summaryrefslogtreecommitdiff
path: root/code
diff options
context:
space:
mode:
authorUros Majstorovic <majstor@majstor.org>2017-08-10 21:02:16 +0200
committerUros Majstorovic <majstor@majstor.org>2017-08-10 21:02:16 +0200
commitcbba099541d27400ad45083a4b1102b86f9e8dea (patch)
treee260d7da3d07b4de0e989726917f08d23c55f70d /code
parente9ced8e60689c6f46ac4fc31013b88f4c3f4fa80 (diff)
rbuffer almost implemented
Diffstat (limited to 'code')
-rw-r--r--code/core/config.h2
-rw-r--r--code/core/core.c74
-rw-r--r--code/core/core.h27
-rw-r--r--code/core/htable/htable.c9
-rw-r--r--code/core/msgq.c2
-rw-r--r--code/core/posix/transport.c2
-rw-r--r--code/core/rbuf.c49
-rw-r--r--code/core/rbuf.h72
-rw-r--r--code/core/rbuf_recv.c149
-rw-r--r--code/core/rbuf_send.c182
-rw-r--r--code/proxy/proxy.c20
-rw-r--r--code/proxy/proxy.h3
-rw-r--r--code/test/basic.c4
-rw-r--r--code/test/client.c4
-rw-r--r--code/test/pr_client.c4
-rw-r--r--code/test/stress.c2
-rw-r--r--code/test/voip.c8
17 files changed, 427 insertions, 186 deletions
diff --git a/code/core/config.h b/code/core/config.h
index 7f9390f..7e2e416 100644
--- a/code/core/config.h
+++ b/code/core/config.h
@@ -1,4 +1,4 @@
#define ECP_WITH_PTHREAD 1
#define ECP_WITH_HTABLE 1
-#define ECP_WITH_RBUF 0
+#define ECP_WITH_RBUF 1
#define ECP_DEBUG 1 \ No newline at end of file
diff --git a/code/core/core.c b/code/core/core.c
index a725952..069c910 100644
--- a/code/core/core.c
+++ b/code/core/core.c
@@ -521,10 +521,9 @@ void ecp_conn_unregister(ECPConnection *conn) {
static ssize_t _conn_send_kget(ECPConnection *conn, ECPTimerItem *ti) {
unsigned char payload[ECP_SIZE_PLD(0)];
- ecp_seq_t seq;
ecp_pld_set_type(payload, ECP_MTYPE_KGET_REQ);
- return ecp_pld_send_wkey(conn, &seq, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, payload, sizeof(payload));
+ return ecp_pld_send_wkey(conn, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, payload, sizeof(payload));
}
int ecp_conn_init(ECPConnection *conn, ECPNode *node) {
@@ -686,18 +685,31 @@ int ecp_conn_handle_new(ECPSocket *sock, ECPConnection **_conn, ECPConnection *p
}
ssize_t ecp_conn_handle_open(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size) {
+ int is_open;
if (size < 0) return size;
#ifdef ECP_WITH_PTHREAD
pthread_mutex_lock(&conn->mutex);
#endif
- if (!ecp_conn_is_open(conn)) conn->flags |= ECP_CONN_FLAG_OPEN;
+ is_open = ecp_conn_is_open(conn);
#ifdef ECP_WITH_PTHREAD
pthread_mutex_unlock(&conn->mutex);
#endif
+ if (!is_open) conn->flags |= ECP_CONN_FLAG_OPEN;
+
if (mtype & ECP_MTYPE_FLAG_REP) {
+ int rv;
+
if (!conn->out) return ECP_ERR;
+
+#ifdef ECP_WITH_RBUF
+ if (!is_open && conn->rbuf.recv) {
+ rv = ecp_conn_rbuf_recv_start(conn, seq);
+ if (rv) return rv;
+ }
+#endif
+
return 0;
} else {
unsigned char payload[ECP_SIZE_PLD(0)];
@@ -901,15 +913,16 @@ ssize_t ecp_pack(ECPContext *ctx, unsigned char *packet, size_t pkt_size, unsign
return rv+ECP_SIZE_PKT_HDR;
}
-ssize_t ecp_pack_raw(ECPSocket *sock, ECPConnection *proxy, ECPNetAddr *addr, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size) {
+ssize_t ecp_pack_raw(ECPSocket *sock, ECPConnection *proxy, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size, ECPNetAddr *addr) {
ECPContext *ctx = sock->ctx;
return ecp_pack(ctx, packet, pkt_size, s_idx, c_idx, public, shsec, nonce, seq, payload, payload_size);
}
-ssize_t ecp_conn_pack(ECPConnection *conn, ECPNetAddr *addr, ecp_seq_t *seq, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size) {
+ssize_t ecp_conn_pack(ECPConnection *conn, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size, ECPNetAddr *addr, ecp_seq_t *seq, int *rbuf_idx) {
ecp_aead_key_t shsec;
ecp_dh_public_t public;
+ ecp_seq_t _seq;
unsigned char nonce[ECP_AEAD_SIZE_NONCE];
int rv = ECP_OK;
@@ -941,13 +954,27 @@ ssize_t ecp_conn_pack(ECPConnection *conn, ECPNetAddr *addr, ecp_seq_t *seq, uns
} else {
memcpy(&public, &conn->remote.key[conn->remote.key_curr].public, sizeof(public));
}
-
}
if (!rv) {
- if (seq) {
- conn->seq_out++;
- *seq = conn->seq_out;
+ _seq = conn->seq_out;
+#ifdef ECP_WITH_RBUF
+ if (conn->rbuf.send && rbuf_idx) {
+ ECPRBSend *buf = conn->rbuf.send;
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_lock(&buf->mutex);
+#endif
+
+ *rbuf_idx = ecp_rbuf_msg_idx(&buf->rbuf, _seq);
+ if (*rbuf_idx < 0) rv = ECP_ERR_RBUF_FULL;
+
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_unlock(&buf->mutex);
+#endif
}
+#endif
+ }
+ if (!rv) {
+ conn->seq_out = _seq + 1;
if (addr) *addr = conn->node.addr;
}
@@ -957,7 +984,7 @@ ssize_t ecp_conn_pack(ECPConnection *conn, ECPNetAddr *addr, ecp_seq_t *seq, uns
if (rv) return rv;
- ssize_t _rv = ecp_pack(conn->sock->ctx, packet, pkt_size, s_idx, c_idx, &public, &shsec, nonce, seq ? *seq : 0, payload, payload_size);
+ ssize_t _rv = ecp_pack(conn->sock->ctx, packet, pkt_size, s_idx, c_idx, &public, &shsec, nonce, _seq, payload, payload_size);
if (_rv < 0) return _rv;
#ifdef ECP_WITH_PTHREAD
@@ -968,6 +995,7 @@ ssize_t ecp_conn_pack(ECPConnection *conn, ECPNetAddr *addr, ecp_seq_t *seq, uns
pthread_mutex_unlock(&conn->mutex);
#endif
+ if (seq) *seq = _seq;
return _rv;
}
@@ -1041,7 +1069,7 @@ ssize_t ecp_pkt_handle(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *proxy,
}
pld_size = sock->ctx->cr.aead_dec(payload, ECP_MAX_PLD, packet+ECP_SIZE_PKT_HDR, pkt_size-ECP_SIZE_PKT_HDR, &shsec, packet+ECP_SIZE_PROTO+1+ECP_ECDH_SIZE_KEY);
- if (pld_size < ECP_MIN_PLD) rv = ECP_ERR_DECRYPT;
+ if (pld_size < ECP_SIZE_MSG_HDR) rv = ECP_ERR_DECRYPT;
if (rv) goto pkt_handle_err;
p_seq = \
@@ -1121,7 +1149,7 @@ ssize_t ecp_pkt_handle(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *proxy,
#endif
#ifdef ECP_WITH_RBUF
- if (conn->rbuf.recv && conn->rbuf.recv->open) proc_size = ecp_rbuf_recv_store(conn, p_seq, payload+pld_size-cnt_size, cnt_size);
+ if (conn->rbuf.recv) proc_size = ecp_conn_rbuf_recv_store(conn, p_seq, payload+pld_size-cnt_size, cnt_size);
#endif
if (proc_size == 0) proc_size = ecp_msg_handle(conn, p_seq, payload+pld_size-cnt_size, cnt_size);
@@ -1186,21 +1214,20 @@ ssize_t ecp_msg_handle(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, s
unsigned char mtype = 0;
size_t rem_size = msg_size;
+ if (msg_size < 1) return ECP_ERR_MIN_MSG;
+
while (rem_size) {
mtype = msg[0];
msg++;
rem_size--;
if ((mtype & ECP_MTYPE_MASK) >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE;
- if (rem_size < ECP_MIN_MSG) return ECP_ERR_MIN_MSG;
ecp_timer_pop(conn, mtype);
handler = conn->sock->ctx->handler[conn->type] ? conn->sock->ctx->handler[conn->type]->msg[mtype & ECP_MTYPE_MASK] : NULL;
if (handler) {
rv = handler(conn, seq, mtype, msg, rem_size);
if (rv < 0) return rv;
- if (rv == 0) rv = rem_size;
- if (rv < ECP_MIN_MSG) rv = ECP_MIN_MSG;
if (rv > rem_size) return ECP_ERR;
rem_size -= rv;
@@ -1226,26 +1253,25 @@ unsigned char ecp_pld_get_type(unsigned char *payload) {
}
ssize_t ecp_pld_send(ECPConnection *conn, unsigned char *payload, size_t payload_size) {
- ecp_seq_t seq;
- return ecp_pld_send_wkey(conn, &seq, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload, payload_size);
+ return ecp_pld_send_wkey(conn, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload, payload_size);
}
-ssize_t ecp_pld_send_wkey(ECPConnection *conn, ecp_seq_t *seq, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size) {
+ssize_t ecp_pld_send_wkey(ECPConnection *conn, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size) {
unsigned char packet[ECP_MAX_PKT];
ECPSocket *sock = conn->sock;
ECPContext *ctx = sock->ctx;
ECPNetAddr addr;
+ ecp_seq_t seq;
ssize_t rv;
+ int rbuf_idx = -1;
- rv = ctx->pack(conn, &addr, seq, packet, ECP_MAX_PKT, s_idx, c_idx, payload, payload_size);
+ rv = ctx->pack(conn, packet, ECP_MAX_PKT, s_idx, c_idx, payload, payload_size, &addr, &seq, &rbuf_idx);
if (rv < 0) return rv;
#ifdef ECP_WITH_RBUF
- if (conn->rbuf.send && conn->rbuf.send->open && seq) {
- ssize_t _rv = ecp_rbuf_send_store(conn, *seq, packet, rv);
- if (_rv < 0) return _rv;
- }
+ if (conn->rbuf.send) return ecp_conn_rbuf_pkt_send(conn, &addr, packet, rv, seq, rbuf_idx);
#endif
+
return ecp_pkt_send(sock, &addr, packet, rv);
}
@@ -1255,7 +1281,7 @@ ssize_t ecp_pld_send_raw(ECPSocket *sock, ECPConnection *proxy, ECPNetAddr *addr
ECPNetAddr _addr;
ssize_t rv;
- rv = ctx->pack_raw(sock, proxy, &_addr, packet, ECP_MAX_PKT, s_idx, c_idx, public, shsec, nonce, seq, payload, payload_size);
+ rv = ctx->pack_raw(sock, proxy, packet, ECP_MAX_PKT, s_idx, c_idx, public, shsec, nonce, seq, payload, payload_size, &_addr);
if (rv < 0) return rv;
return ecp_pkt_send(sock, proxy ? &_addr : addr, packet, rv);
diff --git a/code/core/core.h b/code/core/core.h
index 81ec69c..382b2c1 100644
--- a/code/core/core.h
+++ b/code/core/core.h
@@ -36,6 +36,7 @@
#define ECP_MAX_NODE_KEY 2
#define ECP_MAX_CTYPE 8
#define ECP_MAX_MTYPE 16
+#define ECP_MAX_MTYPE_SYS 8
#define ECP_SIZE_PKT_HDR (ECP_SIZE_PROTO+1+ECP_ECDH_SIZE_KEY+ECP_AEAD_SIZE_NONCE)
#define ECP_SIZE_PLD_HDR (ECP_SIZE_SEQ)
@@ -45,21 +46,19 @@
#define ECP_MAX_PLD (ECP_MAX_PKT-ECP_SIZE_PKT_HDR-ECP_AEAD_SIZE_TAG)
#define ECP_MAX_MSG (ECP_MAX_PLD-ECP_SIZE_MSG_HDR)
-#define ECP_MIN_MSG 8
-#define ECP_MIN_PLD (ECP_SIZE_MSG_HDR+ECP_MIN_MSG)
-#define ECP_MIN_PKT (ECP_SIZE_PKT_HDR+ECP_MIN_PLD+ECP_AEAD_SIZE_TAG)
+#define ECP_MIN_PKT (ECP_SIZE_PKT_HDR+ECP_SIZE_MSG_HDR+ECP_AEAD_SIZE_TAG)
#define ECP_POLL_TIMEOUT 500
#define ECP_ECDH_IDX_INV 0xFF
#define ECP_ECDH_IDX_PERMA 0x0F
-#define ECP_MTYPE_MASK 0x7f
-#define ECP_MTYPE_FLAG_REP 0x80
+#define ECP_MTYPE_MASK 0x3f
+#define ECP_MTYPE_FLAG_TIMER 0x80
+#define ECP_MTYPE_FLAG_REP 0x40
#define ECP_MTYPE_OPEN 0x00
#define ECP_MTYPE_KGET 0x01
#define ECP_MTYPE_KPUT 0x02
-#define ECP_MTYPE_EXEC 0x03
#define ECP_MTYPE_OPEN_REQ (ECP_MTYPE_OPEN)
#define ECP_MTYPE_OPEN_REP (ECP_MTYPE_OPEN | ECP_MTYPE_FLAG_REP)
@@ -68,8 +67,8 @@
#define ECP_MTYPE_KPUT_REQ (ECP_MTYPE_KPUT)
#define ECP_MTYPE_KPUT_REP (ECP_MTYPE_KPUT | ECP_MTYPE_FLAG_REP)
-#define ECP_SIZE_PLD(X) ((X) < ECP_MIN_MSG ? ECP_MIN_MSG + ECP_SIZE_MSG_HDR : (X) + ECP_SIZE_MSG_HDR)
-#define ECP_SIZE_PKT(X) ((X) < ECP_MIN_MSG ? ECP_MIN_MSG + ECP_SIZE_PKT_HDR+ECP_SIZE_MSG_HDR+ECP_AEAD_SIZE_TAG : (X) + ECP_SIZE_PKT_HDR+ECP_SIZE_MSG_HDR+ECP_AEAD_SIZE_TAG)
+#define ECP_SIZE_PLD(X) ((X) + ECP_SIZE_MSG_HDR)
+#define ECP_SIZE_PKT(X) ((X) + ECP_SIZE_PKT_HDR+ECP_SIZE_MSG_HDR+ECP_AEAD_SIZE_TAG)
#define ECP_CONN_FLAG_REG 0x01
#define ECP_CONN_FLAG_OPEN 0x02
@@ -202,7 +201,9 @@ typedef struct ECPConnHandler {
typedef struct ECPSockCTable {
struct ECPConnection *array[ECP_MAX_SOCK_CONN];
unsigned short size;
+#ifdef ECP_WITH_HTABLE
void *htable;
+#endif
#ifdef ECP_WITH_PTHREAD
pthread_mutex_t mutex;
#endif
@@ -218,8 +219,8 @@ typedef struct ECPContext {
#ifdef ECP_WITH_HTABLE
ECPHTableIface ht;
#endif
- ssize_t (*pack) (struct ECPConnection *conn, ECPNetAddr *addr, ecp_seq_t *seq, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size);
- ssize_t (*pack_raw) (struct ECPSocket *sock, struct ECPConnection *proxy, ECPNetAddr *addr, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size);
+ ssize_t (*pack) (struct ECPConnection *conn, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size, ECPNetAddr *addr, ecp_seq_t *seq, int *rbuf_idx);
+ ssize_t (*pack_raw) (struct ECPSocket *sock, struct ECPConnection *proxy, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size, ECPNetAddr *addr);
ECPConnHandler *handler[ECP_MAX_CTYPE];
} ECPContext;
@@ -311,8 +312,8 @@ int ecp_conn_dhkey_new_pub(ECPConnection *conn, unsigned char idx, unsigned char
int ecp_conn_dhkey_get_curr(ECPConnection *conn, unsigned char *idx, unsigned char *public);
ssize_t ecp_pack(ECPContext *ctx, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size);
-ssize_t ecp_pack_raw(ECPSocket *sock, ECPConnection *proxy, ECPNetAddr *addr, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size);
-ssize_t ecp_conn_pack(ECPConnection *conn, ECPNetAddr *addr, ecp_seq_t *seq, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size);
+ssize_t ecp_pack_raw(ECPSocket *sock, ECPConnection *proxy, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size, ECPNetAddr *addr);
+ssize_t ecp_conn_pack(ECPConnection *conn, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size, ECPNetAddr *addr, ecp_seq_t *seq, int *rbuf_idx);
ssize_t ecp_pkt_handle(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *proxy, unsigned char *packet, size_t pkt_size);
ssize_t ecp_pkt_send(ECPSocket *sock, ECPNetAddr *addr, unsigned char *packet, size_t pkt_size);
@@ -323,7 +324,7 @@ unsigned char *ecp_pld_get_buf(unsigned char *payload);
void ecp_pld_set_type(unsigned char *payload, unsigned char mtype);
unsigned char ecp_pld_get_type(unsigned char *payload);
ssize_t ecp_pld_send(ECPConnection *conn, unsigned char *payload, size_t payload_size);
-ssize_t ecp_pld_send_wkey(ECPConnection *conn, ecp_seq_t *seq, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size);
+ssize_t ecp_pld_send_wkey(ECPConnection *conn, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size);
ssize_t ecp_pld_send_raw(ECPSocket *sock, ECPConnection *proxy, ECPNetAddr *addr, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size);
ssize_t ecp_send(ECPConnection *conn, unsigned char *payload, size_t payload_size);
diff --git a/code/core/htable/htable.c b/code/core/htable/htable.c
index 552a03b..82dbcbc 100644
--- a/code/core/htable/htable.c
+++ b/code/core/htable/htable.c
@@ -5,11 +5,7 @@
#include "hashtable.h"
static void *h_create(ECPContext *ctx) {
- int rv;
- struct hashtable *h = create_hashtable(1000, (unsigned int (*)(void *))ctx->cr.dh_pub_hash_fn, (int (*)(void *, void *))ctx->cr.dh_pub_hash_eq, NULL, NULL, NULL);
- if (h == NULL) return NULL;
-
- return h;
+ return create_hashtable(1000, (unsigned int (*)(void *))ctx->cr.dh_pub_hash_fn, (int (*)(void *, void *))ctx->cr.dh_pub_hash_eq, NULL, NULL, NULL);
}
static void h_destroy(void *h) {
@@ -23,7 +19,6 @@ static int h_insert(void *h, unsigned char *k, ECPConnection *v) {
}
static ECPConnection *h_remove(void *h, unsigned char *k) {
- printf("REMOVE!!!\n");
return hashtable_remove(h, k);
}
@@ -38,5 +33,5 @@ int ecp_htable_init(ECPHTableIface *h) {
h->insert = h_insert;
h->remove = h_remove;
h->search = h_search;
- return 0;
+ return ECP_OK;
}
diff --git a/code/core/msgq.c b/code/core/msgq.c
index 53c55ea..2ba0ed5 100644
--- a/code/core/msgq.c
+++ b/code/core/msgq.c
@@ -62,7 +62,7 @@ ssize_t ecp_conn_msgq_push(ECPConnection *conn, unsigned char *msg, size_t msg_s
if (mtype >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE;
if (msg_size >= ECP_MAX_MSG) return ECP_ERR_MAX_MSG;
- if (msg_size < ECP_MIN_MSG) return ECP_ERR_MIN_MSG;
+ if (msg_size < 1) return ECP_ERR_MIN_MSG;
for (i=0; i<ECP_MAX_CONN_MSG; i++) {
if (!msgq->occupied[msg_idx]) {
diff --git a/code/core/posix/transport.c b/code/core/posix/transport.c
index da87342..e3fe612 100644
--- a/code/core/posix/transport.c
+++ b/code/core/posix/transport.c
@@ -112,5 +112,5 @@ int ecp_transport_init(ECPTransportIface *t) {
t->recv = t_recv;
t->addr_eq = t_addr_eq;
t->addr_set = t_addr_set;
- return 0;
+ return ECP_OK;
}
diff --git a/code/core/rbuf.c b/code/core/rbuf.c
index 0865e67..bb110b9 100644
--- a/code/core/rbuf.c
+++ b/code/core/rbuf.c
@@ -3,14 +3,15 @@
#include <string.h>
int ecp_rbuf_init(ECPRBuffer *rbuf, ECPRBMessage *msg, unsigned int msg_size) {
- memset(msg, 0, sizeof(ECPRBMessage) * msg_size);
rbuf->msg = msg;
if (msg_size) {
+ if (msg == NULL) return ECP_ERR;
rbuf->msg_size = msg_size;
+ memset(rbuf->msg, 0, sizeof(ECPRBMessage) * msg_size);
} else {
rbuf->msg_size = ECP_RBUF_SEQ_HALF;
}
-
+
return ECP_OK;
}
@@ -22,17 +23,53 @@ int ecp_rbuf_msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq) {
return ECP_ERR_RBUF_IDX;
}
-ssize_t ecp_rbuf_msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, unsigned char *msg, size_t msg_size, unsigned char test_flags, unsigned char set_flags) {
- int idx = ecp_rbuf_msg_idx(rbuf, seq);
+ssize_t ecp_rbuf_msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, int idx, unsigned char *msg, size_t msg_size, unsigned char test_flags, unsigned char set_flags) {
+ idx = idx < 0 ? ecp_rbuf_msg_idx(rbuf, seq) : idx;
if (idx < 0) return idx;
if (rbuf->msg == NULL) return 0;
if (test_flags && (test_flags & rbuf->msg[idx].flags)) return ECP_ERR_RBUF_FLAG;
- memcpy(rbuf->msg[idx].msg, msg, msg_size);
- rbuf->msg[idx].size = msg_size;
+ if (!(set_flags & ECP_RBUF_FLAG_DELIVERED)) {
+ memcpy(rbuf->msg[idx].msg, msg, msg_size);
+ rbuf->msg[idx].size = msg_size;
+ }
rbuf->msg[idx].flags = set_flags;
return msg_size;
}
+int ecp_conn_rbuf_start(ECPConnection *conn, ecp_seq_t seq) {
+ int rv = ecp_conn_rbuf_send_start(conn);
+ if (rv) return rv;
+
+ if (!conn->out) {
+ rv = ecp_conn_rbuf_recv_start(conn, seq);
+ if (rv) return rv;
+ }
+
+ return ECP_OK;
+}
+
+ssize_t ecp_conn_rbuf_pkt_send(struct ECPConnection *conn, ECPNetAddr *addr, unsigned char *packet, size_t pkt_size, ecp_seq_t seq, int idx) {
+ int do_send;
+ ECPRBSend *buf = conn->rbuf.send;
+
+ ssize_t rv = ecp_rbuf_msg_store(&buf->rbuf, seq, idx, packet, pkt_size, 0, 0);
+ if (rv < 0) return rv;
+
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_lock(&buf->mutex);
+#endif
+ if (buf->in_transit < buf->win_size) {
+ buf->in_transit++;
+ do_send = 1;
+ }
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_unlock(&buf->mutex);
+#endif
+
+ if (do_send) rv = ecp_pkt_send(conn->sock, addr, packet, pkt_size);
+ return rv;
+}
+
diff --git a/code/core/rbuf.h b/code/core/rbuf.h
index 4340707..9e9e53d 100644
--- a/code/core/rbuf.h
+++ b/code/core/rbuf.h
@@ -1,21 +1,38 @@
-#define ECP_RBUF_FLAG_PRESENT 1
+#define ECP_RBUF_FLAG_RECEIVED 0x01
+#define ECP_RBUF_FLAG_DELIVERED 0x02
-#define ECP_ERR_RBUF_FLAG -100
-#define ECP_ERR_RBUF_IDX -101
-#define ECP_ERR_RBUF_DUP -102
+#define ECP_RBUF_FLAG_RELIABLE 0x01
+#define ECP_RBUF_FLAG_MSGQ 0x02
+
+#define ECP_MTYPE_RBOPEN 0x04
+#define ECP_MTYPE_RBCLOSE 0x05
+#define ECP_MTYPE_RBFLUSH 0x06
+#define ECP_MTYPE_RBACK 0x07
+
+#define ECP_MTYPE_RBOPEN_REQ (ECP_MTYPE_RBOPEN)
+#define ECP_MTYPE_RBOPEN_REP (ECP_MTYPE_RBOPEN | ECP_MTYPE_FLAG_REP)
+#define ECP_MTYPE_RBCLOSE_REQ (ECP_MTYPE_RBCLOSE)
+#define ECP_MTYPE_RBCLOSE_REP (ECP_MTYPE_RBCLOSE | ECP_MTYPE_FLAG_REP)
+#define ECP_MTYPE_RBFLUSH_REQ (ECP_MTYPE_RBFLUSH)
+#define ECP_MTYPE_RBFLUSH_REP (ECP_MTYPE_RBFLUSH | ECP_MTYPE_FLAG_REP)
+
+#define ECP_ERR_RBUF_FLAG -100
+#define ECP_ERR_RBUF_IDX -101
+#define ECP_ERR_RBUF_DUP -102
+#define ECP_ERR_RBUF_FULL -103
typedef uint32_t ecp_ack_t;
typedef uint32_t ecp_win_t;
-#define ECP_RBUF_SEQ_HALF ((ecp_seq_t)1 << (sizeof(ecp_seq_t)*8-1))
+#define ECP_RBUF_SEQ_HALF ((ecp_seq_t)1 << (sizeof(ecp_seq_t)*8-1))
-#define ECP_RBUF_ACK_FULL (~(ecp_ack_t)0)
-#define ECP_RBUF_ACK_SIZE (sizeof(ecp_ack_t)*8)
+#define ECP_RBUF_ACK_FULL (~(ecp_ack_t)0)
+#define ECP_RBUF_ACK_SIZE (sizeof(ecp_ack_t)*8)
-#define ECP_RBUF_SEQ_LT(a,b) ((ecp_seq_t)((ecp_seq_t)(a) - (ecp_seq_t)(b)) > ECP_RBUF_SEQ_HALF)
-#define ECP_RBUF_SEQ_LTE(a,b) ((ecp_seq_t)((ecp_seq_t)(b) - (ecp_seq_t)(a)) < ECP_RBUF_SEQ_HALF)
+#define ECP_RBUF_SEQ_LT(a,b) ((ecp_seq_t)((ecp_seq_t)(a) - (ecp_seq_t)(b)) > ECP_RBUF_SEQ_HALF)
+#define ECP_RBUF_SEQ_LTE(a,b) ((ecp_seq_t)((ecp_seq_t)(b) - (ecp_seq_t)(a)) < ECP_RBUF_SEQ_HALF)
-#define ECP_RBUF_IDX_MASK(idx, size) ((idx) & ((size) - 1))
+#define ECP_RBUF_IDX_MASK(idx, size) ((idx) & ((size) - 1))
/* If size not 2^x:
#define ECP_RBUF_IDX_MASK(idx, size) ((idx) % (size))
*/
@@ -35,8 +52,8 @@ typedef struct ECPRBuffer {
} ECPRBuffer;
typedef struct ECPRBRecv {
- unsigned char open;
- unsigned char reliable;
+ unsigned char flags;
+ unsigned char flush;
unsigned short deliver_delay;
unsigned short hole_max;
unsigned short ack_rate;
@@ -50,12 +67,16 @@ typedef struct ECPRBRecv {
} ECPRBRecv;
typedef struct ECPRBSend {
- unsigned char open;
- unsigned char reliable;
+ unsigned char flags;
+ unsigned char flush;
ecp_win_t win_size;
ecp_win_t in_transit;
+ ecp_seq_t seq_flush;
unsigned int nack_rate;
ECPRBuffer rbuf;
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_t mutex;
+#endif
} ECPRBSend;
typedef struct ECPConnRBuffer {
@@ -66,14 +87,17 @@ typedef struct ECPConnRBuffer {
int ecp_rbuf_init(ECPRBuffer *rbuf, ECPRBMessage *msg, unsigned int msg_size);
int ecp_rbuf_msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq);
-ssize_t ecp_rbuf_msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, unsigned char *msg, size_t msg_size, unsigned char test_flags, unsigned char set_flags);
-
-int ecp_rbuf_recv_create(ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size);
-int ecp_rbuf_recv_start(ECPRBRecv *buf, ecp_seq_t seq);
-int ecp_rbuf_recv_set_hole(ECPRBRecv *buf, unsigned short hole_max);
-int ecp_rbuf_recv_set_delay(ECPRBRecv *buf, unsigned short delay);
-ssize_t ecp_rbuf_recv_store(struct ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size);
-
-
-ssize_t ecp_rbuf_send_store(struct ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size);
+ssize_t ecp_rbuf_msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, int idx, unsigned char *msg, size_t msg_size, unsigned char test_flags, unsigned char set_flags);
+int ecp_conn_rbuf_start(struct ECPConnection *conn, ecp_seq_t seq);
+ssize_t ecp_conn_rbuf_pkt_send(struct ECPConnection *conn, ECPNetAddr *addr, unsigned char *packet, size_t pkt_size, ecp_seq_t seq, int idx);
+
+int ecp_conn_rbuf_recv_create(struct ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size);
+int ecp_conn_rbuf_recv_start(struct ECPConnection *conn, ecp_seq_t seq);
+ssize_t ecp_conn_rbuf_recv_store(struct ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size);
+
+int ecp_conn_rbuf_send_create(struct ECPConnection *conn, ECPRBSend *buf, ECPRBMessage *msg, unsigned int msg_size);
+int ecp_conn_rbuf_send_start(struct ECPConnection *conn);
+int ecp_conn_rbuf_recv_set_hole(struct ECPConnection *conn, unsigned short hole_max);
+int ecp_conn_rbuf_recv_set_delay(struct ECPConnection *conn, unsigned short delay);
+ssize_t ecp_conn_rbuf_send_store(struct ECPConnection *conn, ecp_seq_t seq, int idx, unsigned char *msg, size_t msg_size);
diff --git a/code/core/rbuf_recv.c b/code/core/rbuf_recv.c
index 222daa7..b2a5ecf 100644
--- a/code/core/rbuf_recv.c
+++ b/code/core/rbuf_recv.c
@@ -5,10 +5,35 @@
#define ACK_RATE 8
#define ACK_MASK_FIRST ((ecp_ack_t)1 << (ECP_RBUF_ACK_SIZE - 1))
-static ssize_t msg_store(ECPRBRecv *buf, ecp_seq_t seq, unsigned char *msg, size_t msg_size) {
- ssize_t rv = ecp_rbuf_msg_store(&buf->rbuf, seq, msg, msg_size, ECP_RBUF_FLAG_PRESENT, ECP_RBUF_FLAG_PRESENT);
+static ssize_t handle_flush(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size) {
+ if (size < 0) return size;
+
+ ECPRBRecv *buf = conn->rbuf.recv;
+ unsigned char payload[ECP_SIZE_PLD(0)];
+
+ if (buf == NULL) return ECP_ERR;
+
+ buf->flush = 1;
+ return 0;
+}
+
+static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) {
+ ECPRBRecv *buf = conn->rbuf.recv;
+ ssize_t rv = 0;
+ unsigned char flags;
+ unsigned char mtype = msg[0] & ECP_MTYPE_MASK;
+
+ if (mtype < ECP_MAX_MTYPE_SYS) {
+ flags = ECP_RBUF_FLAG_RECEIVED | ECP_RBUF_FLAG_DELIVERED;
+ } else {
+ flags = ECP_RBUF_FLAG_RECEIVED;
+ }
+
+ rv = ecp_rbuf_msg_store(&buf->rbuf, seq, -1, msg, msg_size, ECP_RBUF_FLAG_RECEIVED, flags);
if (rv < 0) return ECP_ERR_RBUF_DUP;
+ if (flags & ECP_RBUF_FLAG_DELIVERED) ecp_msg_handle(conn, seq, msg, msg_size);
+
if (ECP_RBUF_SEQ_LT(buf->seq_max, seq)) buf->seq_max = seq;
return rv;
}
@@ -20,11 +45,15 @@ static void msg_flush(ECPConnection *conn) {
unsigned int idx = buf->rbuf.msg_start;
for (i=0; i<msg_cnt; i++) {
- if (buf->reliable && !(buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_PRESENT)) break;
- if (buf->deliver_delay && msg_cnt - i < buf->deliver_delay) break;
- if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_PRESENT) {
- buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_PRESENT;
- ecp_msg_handle(conn, buf->rbuf.seq_start + i, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size);
+ if ((buf->flags & ECP_RBUF_FLAG_RELIABLE) && !(buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_RECEIVED)) break;
+ if (buf->deliver_delay && (msg_cnt - i < buf->deliver_delay)) break;
+ if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_RECEIVED) {
+ buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_RECEIVED;
+ if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_DELIVERED) {
+ buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_DELIVERED;
+ } else {
+ ecp_msg_handle(conn, buf->rbuf.seq_start + i, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size);
+ }
}
idx = ECP_RBUF_IDX_MASK(idx + 1, buf->rbuf.msg_size);
}
@@ -32,12 +61,35 @@ static void msg_flush(ECPConnection *conn) {
buf->rbuf.seq_start += i;
}
+static int ack_send(ECPConnection *conn) {
+ ECPRBRecv *buf = conn->rbuf.recv;
+ unsigned char payload[ECP_SIZE_PLD(sizeof(ecp_seq_t) + sizeof(ecp_ack_t))];
+ unsigned char *buf_ = ecp_pld_get_buf(payload);
+ ssize_t rv;
+
+ ecp_pld_set_type(payload, ECP_MTYPE_RBACK);
+ buf_[0] = (buf->seq_ack & 0xFF000000) >> 24;
+ buf_[1] = (buf->seq_ack & 0x00FF0000) >> 16;
+ buf_[2] = (buf->seq_ack & 0x0000FF00) >> 8;
+ buf_[3] = (buf->seq_ack & 0x000000FF);
+ buf_[4] = (buf->ack_map & 0xFF000000) >> 24;
+ buf_[5] = (buf->ack_map & 0x00FF0000) >> 16;
+ buf_[6] = (buf->ack_map & 0x0000FF00) >> 8;
+ buf_[7] = (buf->ack_map & 0x000000FF);
+
+ rv = ecp_pld_send(conn, payload, sizeof(payload));
+ if (rv < 0) return rv;
+
+ buf->ack_pkt = 0;
+ return ECP_OK;
+}
+
static int ack_shift(ECPRBRecv *buf) {
int do_ack = 0;
int idx;
int i;
- if (buf->reliable && ((buf->ack_map & ACK_MASK_FIRST) == 0)) return 0;
+ if ((buf->flags & ECP_RBUF_FLAG_RELIABLE) && ((buf->ack_map & ACK_MASK_FIRST) == 0)) return 0;
idx = ecp_rbuf_msg_idx(&buf->rbuf, buf->seq_ack);
if (idx < 0) return idx;
@@ -46,10 +98,10 @@ static int ack_shift(ECPRBRecv *buf) {
idx = ECP_RBUF_IDX_MASK(idx + 1, buf->rbuf.msg_size);
buf->seq_ack++;
- if ((buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_PRESENT) && (buf->ack_map == ECP_RBUF_ACK_FULL)) continue;
+ if ((buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_RECEIVED) && (buf->ack_map == ECP_RBUF_ACK_FULL)) continue;
buf->ack_map = buf->ack_map << 1;
- if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_PRESENT) {
+ if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_RECEIVED) {
buf->ack_map |= 1;
} else if (!do_ack && ECP_RBUF_SEQ_LTE(buf->seq_ack, buf->seq_max - 2 * buf->hole_max)) {
do_ack = 1;
@@ -73,47 +125,61 @@ static int ack_shift(ECPRBRecv *buf) {
return do_ack;
}
-int ecp_rbuf_recv_create(ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size) {
+int ecp_conn_rbuf_recv_create(ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size) {
+ int rv;
+
memset(buf, 0, sizeof(ECPRBRecv));
- ecp_rbuf_init(&buf->rbuf, msg, msg_size);
+ rv = ecp_rbuf_init(&buf->rbuf, msg, msg_size);
+ if (rv) return rv;
+
buf->ack_map = ECP_RBUF_ACK_FULL;
buf->ack_rate = ACK_RATE;
-
+ conn->rbuf.recv = buf;
+
return ECP_OK;
}
-int ecp_rbuf_recv_start(ECPRBRecv *buf, ecp_seq_t seq) {
- buf->seq_ack = seq;
- buf->seq_max = seq;
- buf->rbuf.seq_start = seq + 1;
-
- return ECP_OK;
-}
+int ecp_conn_rbuf_recv_set_hole(ECPConnection *conn, unsigned short hole_max) {
+ ECPRBRecv *buf = conn->rbuf.recv;
-int ecp_rbuf_recv_set_hole(ECPRBRecv *buf, unsigned short hole_max) {
buf->hole_max = hole_max;
- buf->hole_mask_full = ~(~((ecp_ack_t)0) << (hole_max * 2));
- buf->hole_mask_empty = ~(~((ecp_ack_t)0) << (hole_max + 1));
+ buf->hole_mask_full = ~(~((ecp_ack_t)1) << (hole_max * 2));
+ buf->hole_mask_empty = ~(~((ecp_ack_t)1) << (hole_max + 1));
return ECP_OK;
}
-int ecp_rbuf_recv_set_delay(ECPRBRecv *buf, unsigned short delay) {
+int ecp_conn_rbuf_recv_set_delay(ECPConnection *conn, unsigned short delay) {
+ ECPRBRecv *buf = conn->rbuf.recv;
+
buf->deliver_delay = delay;
if (buf->hole_max < delay - 1) {
- ecp_rbuf_recv_set_hole(buf, delay - 1);
+ ecp_conn_rbuf_recv_set_hole(conn, delay - 1);
}
return ECP_OK;
}
-ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) {
- ssize_t rv;
+int ecp_conn_rbuf_recv_start(ECPConnection *conn, ecp_seq_t seq) {
+ ECPRBRecv *buf = conn->rbuf.recv;
+
+ if (buf == NULL) return ECP_ERR;
+
+ buf->seq_ack = seq;
+ buf->seq_max = seq;
+ buf->rbuf.seq_start = seq + 1;
+
+ return ECP_OK;
+}
+
+ssize_t ecp_conn_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) {
+ ECPRBRecv *buf = conn->rbuf.recv;
ecp_seq_t ack_pkt = 0;
+ ssize_t rv;
int do_ack = 0;
- ECPRBRecv *buf = conn->rbuf.recv;
if (buf == NULL) return ECP_ERR;
+ if (msg_size < 1) return ECP_ERR_MIN_MSG;
if (ECP_RBUF_SEQ_LT(buf->seq_max, seq)) ack_pkt = seq - buf->seq_max;
if (ECP_RBUF_SEQ_LTE(seq, buf->seq_ack)) {
@@ -122,43 +188,48 @@ ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *m
ecp_ack_t ack_mask = ((ecp_ack_t)1 << seq_offset);
if (ack_mask & buf->ack_map) return ECP_ERR_RBUF_DUP;
-
- rv = msg_store(buf, seq, msg, msg_size);
- if (rv < 0) return rv;
-
buf->ack_map |= ack_mask;
do_ack = ack_shift(buf);
+
+ rv = msg_store(conn, seq, msg, msg_size);
+ if (rv < 0) return rv;
} else {
return ECP_ERR_RBUF_IDX;
}
} else {
if ((buf->ack_map == ECP_RBUF_ACK_FULL) && (seq == (ecp_seq_t)(buf->seq_ack + 1))) {
- if (buf->deliver_delay) {
- rv = msg_store(buf, seq, msg, msg_size);
+ if ((buf->flags & ECP_RBUF_FLAG_MSGQ) || buf->deliver_delay) {
+ rv = msg_store(conn, seq, msg, msg_size);
if (rv < 0) return rv;
} else {
- rv = ecp_msg_handle(conn, seq, msg, msg_size);
+ ecp_msg_handle(conn, seq, msg, msg_size);
+ rv = msg_size;
buf->seq_max++;
buf->rbuf.seq_start++;
+ buf->rbuf.msg_start = ECP_RBUF_IDX_MASK(buf->rbuf.msg_start + 1, buf->rbuf.msg_size);
}
buf->seq_ack++;
} else {
- rv = msg_store(buf, seq, msg, msg_size);
+ rv = msg_store(conn, seq, msg, msg_size);
if (rv < 0) return rv;
do_ack = ack_shift(buf);
}
}
+ if (buf->flush) {
+ buf->flush = 0;
+ do_ack = 1;
+ }
if (ack_pkt && !do_ack) {
buf->ack_pkt += ack_pkt;
// should send acks more aggresively when reliable and ack_map is not full (rate ~ PPS * RTT)
if (buf->ack_pkt > buf->ack_rate) do_ack = 1;
}
if (do_ack) {
- buf->ack_pkt = 0;
- // send ack (with seq = 0)
+ int _rv = ack_send(conn);
+ if (_rv) return _rv;
}
- // XXX should handle close
msg_flush(conn);
return rv;
}
+
diff --git a/code/core/rbuf_send.c b/code/core/rbuf_send.c
index 5f3bc87..877b086 100644
--- a/code/core/rbuf_send.c
+++ b/code/core/rbuf_send.c
@@ -2,75 +2,163 @@
#include <string.h>
-#define ACK_RATE_UNIT 10000
+#define NACK_RATE_UNIT 10000
+
+static ssize_t _rbuf_send_flush(ECPConnection *conn, ECPTimerItem *ti) {
+ unsigned char payload[ECP_SIZE_PLD(0)];
+
+ ecp_pld_set_type(payload, ECP_MTYPE_RBFLUSH_REQ);
+ return ecp_pld_send(conn, payload, sizeof(payload));
+}
static ssize_t handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size) {
- int idx = 0;
+ ECPRBSend *buf;
+ ssize_t rsize = sizeof(ecp_seq_t)+sizeof(ecp_ack_t);
ecp_seq_t seq_ack = 0;
ecp_ack_t ack_map = 0;
+ int do_flush = 0;
+ int rv = ECP_OK;
- ECPRBSend *buf = conn->rbuf.send;
+ buf = conn->rbuf.send;
+ if (buf == NULL) return ECP_ERR;
+ if (size < 0) return size;
+ if (size < rsize) return ECP_ERR;
+
+ seq_ack = \
+ (msg[0] << 24) | \
+ (msg[1] << 16) | \
+ (msg[2] << 8) | \
+ (msg[3]);
+ ack_map = \
+ (msg[4] << 24) | \
+ (msg[5] << 16) | \
+ (msg[6] << 8) | \
+ (msg[7]);
+
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_lock(&buf->mutex);
+#endif
+
ECPRBuffer *rbuf = &buf->rbuf;
+ int idx = ecp_rbuf_msg_idx(rbuf, seq_ack);
+ if (idx < 0) rv = idx;
- idx = ecp_rbuf_msg_idx(rbuf, seq_ack);
- if (idx < 0) return idx;
-
- seq_ack++;
- buf->in_transit -= seq_ack - rbuf->seq_start;
-
- if (ack_map != ECP_RBUF_ACK_FULL) {
- int i;
- int nack = 0;
- ecp_win_t nack_cnt = 0;
-
- seq_ack -= ECP_RBUF_ACK_SIZE;
- for (i=0; i<ECP_RBUF_ACK_SIZE; i++) {
- if ((ack_map & ((ecp_ack_t)1 << (ECP_RBUF_ACK_SIZE - i - 1))) == 0) {
- nack_cnt++;
- if (buf->reliable) {
- idx = ecp_rbuf_msg_idx(rbuf, seq_ack + i);
- // resend packet
- // ecp_pkt_send(conn->sock, &conn->node.addr, packet, rv);
- if (!nack) {
- nack = 1;
-
- rbuf->seq_start = seq_ack + i;
- rbuf->msg_start = idx;
+ int is_reliable = buf->flags & ECP_RBUF_FLAG_RELIABLE;
+
+ if (!rv) {
+ seq_ack++;
+ buf->in_transit -= seq_ack - rbuf->seq_start;
+
+ if (ack_map != ECP_RBUF_ACK_FULL) {
+ int i;
+ int nack_first = 0;
+ unsigned int msg_start;
+ ecp_seq_t seq_start;
+ ecp_win_t nack_cnt = 0;
+
+ seq_ack -= ECP_RBUF_ACK_SIZE;
+
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_unlock(&buf->mutex);
+#endif
+
+ for (i=0; i<ECP_RBUF_ACK_SIZE; i++) {
+ if ((ack_map & ((ecp_ack_t)1 << (ECP_RBUF_ACK_SIZE - i - 1))) == 0) {
+ nack_cnt++;
+ if (buf->flags & ECP_RBUF_FLAG_RELIABLE) {
+ idx = ECP_RBUF_IDX_MASK(idx + 1 - ECP_RBUF_ACK_SIZE + i, rbuf->msg_size);
+ ecp_pkt_send(conn->sock, &conn->node.addr, rbuf->msg[idx].msg, rbuf->msg[idx].size);
+ if (!nack_first) {
+ nack_first = 1;
+ seq_start = seq_ack + i;
+ msg_start = idx;
+ }
}
}
}
+
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_lock(&buf->mutex);
+#endif
+
+ buf->in_transit += nack_cnt;
+ buf->nack_rate = (buf->nack_rate * 7 + ((ECP_RBUF_ACK_SIZE - nack_cnt) * NACK_RATE_UNIT) / ECP_RBUF_ACK_SIZE) / 8;
+ if (buf->flags & ECP_RBUF_FLAG_RELIABLE) {
+ rbuf->seq_start = seq_start;
+ rbuf->msg_start = msg_start;
+ } else {
+ rbuf->seq_start = seq_ack + ECP_RBUF_ACK_SIZE;
+ }
+ } else {
+ rbuf->seq_start = seq_ack;
+ buf->nack_rate = (buf->nack_rate * 7) / 8;
+ if (buf->flags & ECP_RBUF_FLAG_RELIABLE) {
+ rbuf->msg_start = ECP_RBUF_IDX_MASK(idx + 1, rbuf->msg_size);
+ }
}
- buf->in_transit += nack_cnt;
- buf->nack_rate = (buf->nack_rate * 7 + ((ECP_RBUF_ACK_SIZE - nack_cnt) * ACK_RATE_UNIT) / ECP_RBUF_ACK_SIZE) / 8;
- if (!buf->reliable) {
- rbuf->seq_start = seq_ack + ECP_RBUF_ACK_SIZE;
- }
- } else {
- rbuf->seq_start = seq_ack;
- buf->nack_rate = (buf->nack_rate * 7) / 8;
- if (buf->reliable) {
- rbuf->msg_start = ECP_RBUF_IDX_MASK(idx + 1, rbuf->msg_size);
+ if (buf->flush) {
+ if (ECP_RBUF_SEQ_LT(buf->seq_flush, rbuf->seq_start)) buf->flush = 0;
+ if (buf->flush) {
+ do_flush = 1;
+ }
}
}
- return size;
+
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_unlock(&buf->mutex);
+#endif
+
+ if (rv) return rv;
+
+ if (do_flush) ecp_timer_send(conn, _rbuf_send_flush, ECP_MTYPE_RBACK, 3, 500);
+ return rsize;
}
-int ecp_rbuf_send_create(ECPRBSend *buf, ECPRBMessage *msg, unsigned int msg_size) {
+int ecp_conn_rbuf_send_create(ECPConnection *conn, ECPRBSend *buf, ECPRBMessage *msg, unsigned int msg_size) {
+ int rv;
+
memset(buf, 0, sizeof(ECPRBRecv));
- ecp_rbuf_init(&buf->rbuf, msg, msg_size);
+ rv = ecp_rbuf_init(&buf->rbuf, msg, msg_size);
+ if (rv) return rv;
+
+ conn->rbuf.send = buf;
return ECP_OK;
}
-int ecp_rbuf_send_start(ECPRBSend *buf, ecp_seq_t seq) {
- buf->rbuf.seq_start = seq + 1;
+int ecp_conn_rbuf_send_start(ECPConnection *conn) {
+ ECPRBSend *buf = conn->rbuf.send;
+
+ if (buf == NULL) return ECP_ERR;
+ buf->rbuf.seq_start = conn->seq_out;
return ECP_OK;
}
-ssize_t ecp_rbuf_send_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) {
+int ecp_conn_rbuf_flush(ECPConnection *conn) {
ECPRBSend *buf = conn->rbuf.send;
-
- buf->in_transit++;
- return ecp_rbuf_msg_store(&buf->rbuf, seq, msg, msg_size, 0, 0);
-} \ No newline at end of file
+ unsigned char payload[ECP_SIZE_PLD(0)];
+ ecp_seq_t seq;
+
+ if (buf == NULL) return ECP_ERR;
+
+ // XXX flush seq
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_lock(&buf->mutex);
+#endif
+ if (buf->flush) {
+ if (ECP_RBUF_SEQ_LT(buf->seq_flush, seq)) buf->seq_flush = seq;
+ } else {
+ buf->flush = 1;
+ buf->seq_flush = seq;
+ }
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_unlock(&buf->mutex);
+#endif
+
+ ssize_t _rv = ecp_timer_send(conn, _rbuf_send_flush, ECP_MTYPE_RBACK, 3, 500);
+ if (_rv < 0) return _rv;
+
+ return ECP_OK;
+}
+
diff --git a/code/proxy/proxy.c b/code/proxy/proxy.c
index 67d4d01..e9cd93f 100644
--- a/code/proxy/proxy.c
+++ b/code/proxy/proxy.c
@@ -70,12 +70,11 @@ static ssize_t _proxyf_send_open(ECPConnection *conn) {
ECPConnProxy *conn_p = (ECPConnProxy *)conn;
ECPConnection *conn_next = conn_p->next;
unsigned char payload[ECP_SIZE_PLD(0)];
- ecp_seq_t seq;
if (conn_next == NULL) return ECP_ERR;
ecp_pld_set_type(payload, ECP_MTYPE_KGET_REQ);
- return ecp_pld_send_wkey(conn_next, &seq, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, payload, sizeof(payload));
+ return ecp_pld_send_wkey(conn_next, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, payload, sizeof(payload));
}
static ssize_t _proxyf_retry_kget(ECPConnection *conn, ECPTimerItem *ti) {
@@ -363,37 +362,35 @@ static ssize_t proxy_set_msg(ECPConnection *conn, unsigned char *pld_out, size_t
}
-static ssize_t proxy_pack(ECPConnection *conn, ECPNetAddr *addr, ecp_seq_t *seq, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size) {
+static ssize_t proxy_pack(ECPConnection *conn, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size, ECPNetAddr *addr, ecp_seq_t *seq, int *rbuf_idx) {
ECPContext *ctx = conn->sock->ctx;
if (conn->proxy) {
unsigned char payload_[ECP_MAX_PLD];
- ecp_seq_t _seq;
ssize_t rv, hdr_size = proxy_set_msg(conn->proxy, payload_, sizeof(payload_), payload, payload_size);
if (hdr_size < 0) return hdr_size;
- rv = ecp_conn_pack(conn, NULL, &_seq, payload_+hdr_size, ECP_MAX_PLD-hdr_size, s_idx, c_idx, payload, payload_size);
+ rv = ecp_conn_pack(conn, payload_+hdr_size, ECP_MAX_PLD-hdr_size, s_idx, c_idx, payload, payload_size, NULL, seq, rbuf_idx);
if (rv < 0) return rv;
- return proxy_pack(conn->proxy, addr, seq, packet, pkt_size, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload_, rv+hdr_size);
+ return proxy_pack(conn->proxy, packet, pkt_size, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload_, rv+hdr_size, addr, NULL, NULL);
} else {
- return ecp_conn_pack(conn, addr, seq, packet, pkt_size, s_idx, c_idx, payload, payload_size);
+ return ecp_conn_pack(conn, packet, pkt_size, s_idx, c_idx, payload, payload_size, addr, seq, rbuf_idx);
}
}
-static ssize_t proxy_pack_raw(ECPSocket *sock, ECPConnection *proxy, ECPNetAddr *addr, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size) {
+static ssize_t proxy_pack_raw(ECPSocket *sock, ECPConnection *proxy, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size, ECPNetAddr *addr) {
ECPContext *ctx = sock->ctx;
if (proxy) {
unsigned char payload_[ECP_MAX_PLD];
- ecp_seq_t _seq;
ssize_t rv, hdr_size = proxy_set_msg(proxy, payload_, sizeof(payload_), payload, payload_size);
if (hdr_size < 0) return hdr_size;
rv = ecp_pack(ctx, payload_+hdr_size, ECP_MAX_PLD-hdr_size, s_idx, c_idx, public, shsec, nonce, seq, payload, payload_size);
if (rv < 0) return rv;
- return proxy_pack(proxy, addr, &_seq, packet, pkt_size, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload_, rv+hdr_size);
+ return proxy_pack(proxy, packet, pkt_size, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload_, rv+hdr_size, addr, NULL, NULL);
} else {
return ecp_pack(ctx, packet, pkt_size, s_idx, c_idx, public, shsec, nonce, seq, payload, payload_size);
}
@@ -472,10 +469,9 @@ int ecp_conn_proxy_init(ECPConnection *conn, ECPNode *conn_node, ECPConnProxy pr
static ssize_t _proxy_send_kget(ECPConnection *conn, ECPTimerItem *ti) {
unsigned char payload[ECP_SIZE_PLD(0)];
- ecp_seq_t seq;
ecp_pld_set_type(payload, ECP_MTYPE_KGET_REQ);
- return ecp_pld_send_wkey(conn, &seq, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, payload, sizeof(payload));
+ return ecp_pld_send_wkey(conn, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, payload, sizeof(payload));
}
int ecp_conn_proxy_open(ECPConnection *conn, ECPNode *conn_node, ECPConnProxy proxy[], ECPNode proxy_node[], int size) {
diff --git a/code/proxy/proxy.h b/code/proxy/proxy.h
index 1ae2566..6f82cd2 100644
--- a/code/proxy/proxy.h
+++ b/code/proxy/proxy.h
@@ -1,7 +1,8 @@
#define ECP_CTYPE_PROXYF 1
#define ECP_CTYPE_PROXYB 2
-#define ECP_MTYPE_RELAY 0x04
+#define ECP_MTYPE_RELAY 0x08
+#define ECP_MTYPE_EXEC 0x09
typedef struct ECPConnProxy {
ECPConnection b;
diff --git a/code/test/basic.c b/code/test/basic.c
index c9f6bd5..809d453 100644
--- a/code/test/basic.c
+++ b/code/test/basic.c
@@ -26,7 +26,7 @@ ssize_t handle_open_c(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsign
ecp_conn_handle_open(conn, sq, t, p, s);
if (s < 0) {
printf("OPEN ERR:%ld\n", s);
- return 0;
+ return s;
}
unsigned char payload[ECP_SIZE_PLD(1000)];
@@ -36,7 +36,7 @@ ssize_t handle_open_c(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsign
ecp_pld_set_type(payload, MTYPE_MSG);
strcpy((char *)buf, msg);
ssize_t _rv = ecp_send(conn, payload, sizeof(payload));
- return 0;
+ return s;
}
ssize_t handle_msg_c(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsigned char *p, ssize_t s) {
diff --git a/code/test/client.c b/code/test/client.c
index 6e45df4..1d5dc6c 100644
--- a/code/test/client.c
+++ b/code/test/client.c
@@ -22,7 +22,7 @@ ssize_t handle_open_c(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsign
ecp_conn_handle_open(conn, sq, t, p, s);
if (s < 0) {
printf("OPEN ERR:%ld\n", s);
- return 0;
+ return s;
}
unsigned char payload[ECP_SIZE_PLD(1000)];
@@ -32,7 +32,7 @@ ssize_t handle_open_c(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsign
ecp_pld_set_type(payload, MTYPE_MSG);
strcpy((char *)buf, msg);
ssize_t _rv = ecp_send(conn, payload, sizeof(payload));
- return 0;
+ return s;
}
ssize_t handle_msg_c(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsigned char *p, ssize_t s) {
diff --git a/code/test/pr_client.c b/code/test/pr_client.c
index 7b80d6f..d643dd3 100644
--- a/code/test/pr_client.c
+++ b/code/test/pr_client.c
@@ -26,7 +26,7 @@ ssize_t handle_open(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsigned
ecp_conn_handle_open(conn, sq, t, p, s);
if (s < 0) {
printf("OPEN ERR:%ld\n", s);
- return 0;
+ return s;
}
printf("OPEN!\n");
@@ -38,7 +38,7 @@ ssize_t handle_open(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsigned
ecp_pld_set_type(payload, MTYPE_MSG);
strcpy((char *)buf, msg);
ssize_t _rv = ecp_send(conn, payload, sizeof(payload));
- return 0;
+ return s;
}
ssize_t handle_msg(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsigned char *p, ssize_t s) {
diff --git a/code/test/stress.c b/code/test/stress.c
index 42663fc..081c0cb 100644
--- a/code/test/stress.c
+++ b/code/test/stress.c
@@ -107,7 +107,7 @@ ssize_t handle_open_c(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsign
perror(msg);
exit(1);
}
- return 0;
+ return s;
}
ssize_t handle_msg_c(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsigned char *p, ssize_t s) {
diff --git a/code/test/voip.c b/code/test/voip.c
index 67e1afe..8c53d01 100644
--- a/code/test/voip.c
+++ b/code/test/voip.c
@@ -52,7 +52,7 @@ int a_open(char *dev_name, snd_pcm_t **handle, snd_pcm_hw_params_t **hw_params,
frame_size = *nchannels * (bits / 8);
*buf_size = frame_size * *frames;
- return 0;
+ return ECP_OK;
}
int a_prepare(snd_pcm_t *handle, snd_pcm_hw_params_t *hw_params, unsigned char *buf, snd_pcm_uframes_t frames) {
@@ -68,7 +68,7 @@ int a_prepare(snd_pcm_t *handle, snd_pcm_hw_params_t *hw_params, unsigned char *
for (i=0; i<fragments; i++) snd_pcm_writei(handle, buf, frames);
}
- return 0;
+ return ECP_OK;
}
opus_int32 a_read(snd_pcm_t *handle, unsigned char *buf, snd_pcm_uframes_t frames, OpusEncoder *enc, unsigned char *opus_buf, opus_int32 opus_size) {
@@ -121,6 +121,8 @@ int a_init(void) {
size = opus_decoder_get_size(nchannels);
opus_dec = malloc(size);
opus_decoder_init(opus_dec, sample_rate, nchannels);
+
+ return ECP_OK;
}
ssize_t handle_open_c(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsigned char *p, ssize_t s) {
@@ -129,7 +131,7 @@ ssize_t handle_open_c(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsign
ecp_conn_handle_open(conn, t, p, s);
if (s < 0) {
printf("OPEN ERR:%ld\n", s);
- return 0;
+ return s;
}
a_init();