From cbba099541d27400ad45083a4b1102b86f9e8dea Mon Sep 17 00:00:00 2001
From: Uros Majstorovic <majstor@majstor.org>
Date: Thu, 10 Aug 2017 21:02:16 +0200
Subject: rbuffer almost implemented

---
 code/core/config.h          |   2 +-
 code/core/core.c            |  74 ++++++++++++------
 code/core/core.h            |  27 +++----
 code/core/htable/htable.c   |   9 +--
 code/core/msgq.c            |   2 +-
 code/core/posix/transport.c |   2 +-
 code/core/rbuf.c            |  49 ++++++++++--
 code/core/rbuf.h            |  72 ++++++++++++------
 code/core/rbuf_recv.c       | 149 ++++++++++++++++++++++++++----------
 code/core/rbuf_send.c       | 182 ++++++++++++++++++++++++++++++++------------
 code/proxy/proxy.c          |  20 ++---
 code/proxy/proxy.h          |   3 +-
 code/test/basic.c           |   4 +-
 code/test/client.c          |   4 +-
 code/test/pr_client.c       |   4 +-
 code/test/stress.c          |   2 +-
 code/test/voip.c            |   8 +-
 17 files changed, 427 insertions(+), 186 deletions(-)

(limited to 'code')

diff --git a/code/core/config.h b/code/core/config.h
index 7f9390f..7e2e416 100644
--- a/code/core/config.h
+++ b/code/core/config.h
@@ -1,4 +1,4 @@
 #define ECP_WITH_PTHREAD    1
 #define ECP_WITH_HTABLE     1
-#define ECP_WITH_RBUF       0
+#define ECP_WITH_RBUF       1
 #define ECP_DEBUG           1
\ No newline at end of file
diff --git a/code/core/core.c b/code/core/core.c
index a725952..069c910 100644
--- a/code/core/core.c
+++ b/code/core/core.c
@@ -521,10 +521,9 @@ void ecp_conn_unregister(ECPConnection *conn) {
 
 static ssize_t _conn_send_kget(ECPConnection *conn, ECPTimerItem *ti) {
     unsigned char payload[ECP_SIZE_PLD(0)];
-    ecp_seq_t seq;
 
     ecp_pld_set_type(payload, ECP_MTYPE_KGET_REQ);
-    return ecp_pld_send_wkey(conn, &seq, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, payload, sizeof(payload));
+    return ecp_pld_send_wkey(conn, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, payload, sizeof(payload));
 }
 
 int ecp_conn_init(ECPConnection *conn, ECPNode *node) {
@@ -686,18 +685,31 @@ int ecp_conn_handle_new(ECPSocket *sock, ECPConnection **_conn, ECPConnection *p
 }
 
 ssize_t ecp_conn_handle_open(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size) {
+    int is_open;
     if (size < 0) return size;
 
 #ifdef ECP_WITH_PTHREAD
     pthread_mutex_lock(&conn->mutex);
 #endif
-    if (!ecp_conn_is_open(conn)) conn->flags |= ECP_CONN_FLAG_OPEN;
+    is_open = ecp_conn_is_open(conn);
 #ifdef ECP_WITH_PTHREAD
     pthread_mutex_unlock(&conn->mutex);
 #endif
 
+    if (!is_open) conn->flags |= ECP_CONN_FLAG_OPEN;
+
     if (mtype & ECP_MTYPE_FLAG_REP) {
+        int rv;
+        
         if (!conn->out) return ECP_ERR;
+
+#ifdef ECP_WITH_RBUF
+        if (!is_open && conn->rbuf.recv) {
+            rv = ecp_conn_rbuf_recv_start(conn, seq);
+            if (rv) return rv;
+        }
+#endif
+
         return 0;
     } else {
         unsigned char payload[ECP_SIZE_PLD(0)];
@@ -901,15 +913,16 @@ ssize_t ecp_pack(ECPContext *ctx, unsigned char *packet, size_t pkt_size, unsign
     return rv+ECP_SIZE_PKT_HDR;
 }
 
-ssize_t ecp_pack_raw(ECPSocket *sock, ECPConnection *proxy, ECPNetAddr *addr, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size) {
+ssize_t ecp_pack_raw(ECPSocket *sock, ECPConnection *proxy, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size, ECPNetAddr *addr) {
     ECPContext *ctx = sock->ctx;
     
     return ecp_pack(ctx, packet, pkt_size, s_idx, c_idx, public, shsec, nonce, seq, payload, payload_size);
 }
     
-ssize_t ecp_conn_pack(ECPConnection *conn, ECPNetAddr *addr, ecp_seq_t *seq, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size) {
+ssize_t ecp_conn_pack(ECPConnection *conn, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size, ECPNetAddr *addr, ecp_seq_t *seq, int *rbuf_idx) {
     ecp_aead_key_t shsec;
     ecp_dh_public_t public;
+    ecp_seq_t _seq;
     unsigned char nonce[ECP_AEAD_SIZE_NONCE];
     int rv = ECP_OK;
 
@@ -941,13 +954,27 @@ ssize_t ecp_conn_pack(ECPConnection *conn, ECPNetAddr *addr, ecp_seq_t *seq, uns
         } else {
             memcpy(&public, &conn->remote.key[conn->remote.key_curr].public, sizeof(public));
         }
-
     }
     if (!rv) {
-        if (seq) {
-            conn->seq_out++;
-            *seq = conn->seq_out;
+        _seq = conn->seq_out;
+#ifdef ECP_WITH_RBUF
+        if (conn->rbuf.send && rbuf_idx) {
+            ECPRBSend *buf = conn->rbuf.send;
+#ifdef ECP_WITH_PTHREAD
+            pthread_mutex_lock(&buf->mutex);
+#endif
+            
+            *rbuf_idx = ecp_rbuf_msg_idx(&buf->rbuf, _seq);
+            if (*rbuf_idx < 0) rv = ECP_ERR_RBUF_FULL;
+
+#ifdef ECP_WITH_PTHREAD
+            pthread_mutex_unlock(&buf->mutex);
+#endif
         }
+#endif
+    }
+    if (!rv) {
+        conn->seq_out = _seq + 1;
         if (addr) *addr = conn->node.addr;
     }
 
@@ -957,7 +984,7 @@ ssize_t ecp_conn_pack(ECPConnection *conn, ECPNetAddr *addr, ecp_seq_t *seq, uns
     
     if (rv) return rv;
     
-    ssize_t _rv = ecp_pack(conn->sock->ctx, packet, pkt_size, s_idx, c_idx, &public, &shsec, nonce, seq ? *seq : 0, payload, payload_size);
+    ssize_t _rv = ecp_pack(conn->sock->ctx, packet, pkt_size, s_idx, c_idx, &public, &shsec, nonce, _seq, payload, payload_size);
     if (_rv < 0) return _rv;
     
 #ifdef ECP_WITH_PTHREAD
@@ -968,6 +995,7 @@ ssize_t ecp_conn_pack(ECPConnection *conn, ECPNetAddr *addr, ecp_seq_t *seq, uns
     pthread_mutex_unlock(&conn->mutex);
 #endif
 
+    if (seq) *seq = _seq;
     return _rv;
 }
 
@@ -1041,7 +1069,7 @@ ssize_t ecp_pkt_handle(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *proxy,
     }
 
     pld_size = sock->ctx->cr.aead_dec(payload, ECP_MAX_PLD, packet+ECP_SIZE_PKT_HDR, pkt_size-ECP_SIZE_PKT_HDR, &shsec, packet+ECP_SIZE_PROTO+1+ECP_ECDH_SIZE_KEY);
-    if (pld_size < ECP_MIN_PLD) rv = ECP_ERR_DECRYPT;
+    if (pld_size < ECP_SIZE_MSG_HDR) rv = ECP_ERR_DECRYPT;
     if (rv) goto pkt_handle_err;
 
     p_seq = \
@@ -1121,7 +1149,7 @@ ssize_t ecp_pkt_handle(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *proxy,
 #endif
 
 #ifdef ECP_WITH_RBUF
-    if (conn->rbuf.recv && conn->rbuf.recv->open) proc_size = ecp_rbuf_recv_store(conn, p_seq, payload+pld_size-cnt_size, cnt_size);
+    if (conn->rbuf.recv) proc_size = ecp_conn_rbuf_recv_store(conn, p_seq, payload+pld_size-cnt_size, cnt_size);
 #endif
     if (proc_size == 0) proc_size = ecp_msg_handle(conn, p_seq, payload+pld_size-cnt_size, cnt_size);
 
@@ -1186,21 +1214,20 @@ ssize_t ecp_msg_handle(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, s
     unsigned char mtype = 0;
     size_t rem_size = msg_size;
     
+    if (msg_size < 1) return ECP_ERR_MIN_MSG;
+    
     while (rem_size) {
         mtype = msg[0];
         msg++;
         rem_size--;
 
         if ((mtype & ECP_MTYPE_MASK) >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE;
-        if (rem_size < ECP_MIN_MSG) return ECP_ERR_MIN_MSG;
 
         ecp_timer_pop(conn, mtype);
         handler = conn->sock->ctx->handler[conn->type] ? conn->sock->ctx->handler[conn->type]->msg[mtype & ECP_MTYPE_MASK] : NULL;
         if (handler) {
             rv = handler(conn, seq, mtype, msg, rem_size);
             if (rv < 0) return rv;
-            if (rv == 0) rv = rem_size;
-            if (rv < ECP_MIN_MSG) rv = ECP_MIN_MSG;
             if (rv > rem_size) return ECP_ERR;
 
             rem_size -= rv;
@@ -1226,26 +1253,25 @@ unsigned char ecp_pld_get_type(unsigned char *payload) {
 }
 
 ssize_t ecp_pld_send(ECPConnection *conn, unsigned char *payload, size_t payload_size) {
-    ecp_seq_t seq;
-    return ecp_pld_send_wkey(conn, &seq, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload, payload_size);
+    return ecp_pld_send_wkey(conn, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload, payload_size);
 }
 
-ssize_t ecp_pld_send_wkey(ECPConnection *conn, ecp_seq_t *seq, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size) {
+ssize_t ecp_pld_send_wkey(ECPConnection *conn, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size) {
     unsigned char packet[ECP_MAX_PKT];
     ECPSocket *sock = conn->sock;
     ECPContext *ctx = sock->ctx;
     ECPNetAddr addr;
+    ecp_seq_t seq;
     ssize_t rv;
+    int rbuf_idx = -1;
 
-    rv = ctx->pack(conn, &addr, seq, packet, ECP_MAX_PKT, s_idx, c_idx, payload, payload_size);
+    rv = ctx->pack(conn, packet, ECP_MAX_PKT, s_idx, c_idx, payload, payload_size, &addr, &seq, &rbuf_idx);
     if (rv < 0) return rv;
 
 #ifdef ECP_WITH_RBUF
-    if (conn->rbuf.send && conn->rbuf.send->open && seq) {
-        ssize_t _rv = ecp_rbuf_send_store(conn, *seq, packet, rv);
-        if (_rv < 0) return _rv;
-    }
+    if (conn->rbuf.send) return ecp_conn_rbuf_pkt_send(conn, &addr, packet, rv, seq, rbuf_idx);
 #endif
+    
     return ecp_pkt_send(sock, &addr, packet, rv);
 }
 
@@ -1255,7 +1281,7 @@ ssize_t ecp_pld_send_raw(ECPSocket *sock, ECPConnection *proxy, ECPNetAddr *addr
     ECPNetAddr _addr;
     ssize_t rv;
 
-    rv = ctx->pack_raw(sock, proxy, &_addr, packet, ECP_MAX_PKT, s_idx, c_idx, public, shsec, nonce, seq, payload, payload_size);
+    rv = ctx->pack_raw(sock, proxy, packet, ECP_MAX_PKT, s_idx, c_idx, public, shsec, nonce, seq, payload, payload_size, &_addr);
     if (rv < 0) return rv;
     
     return ecp_pkt_send(sock, proxy ? &_addr : addr, packet, rv);
diff --git a/code/core/core.h b/code/core/core.h
index 81ec69c..382b2c1 100644
--- a/code/core/core.h
+++ b/code/core/core.h
@@ -36,6 +36,7 @@
 #define ECP_MAX_NODE_KEY            2
 #define ECP_MAX_CTYPE               8
 #define ECP_MAX_MTYPE               16
+#define ECP_MAX_MTYPE_SYS           8
 
 #define ECP_SIZE_PKT_HDR            (ECP_SIZE_PROTO+1+ECP_ECDH_SIZE_KEY+ECP_AEAD_SIZE_NONCE)
 #define ECP_SIZE_PLD_HDR            (ECP_SIZE_SEQ)
@@ -45,21 +46,19 @@
 #define ECP_MAX_PLD                 (ECP_MAX_PKT-ECP_SIZE_PKT_HDR-ECP_AEAD_SIZE_TAG)
 #define ECP_MAX_MSG                 (ECP_MAX_PLD-ECP_SIZE_MSG_HDR)
 
-#define ECP_MIN_MSG                 8
-#define ECP_MIN_PLD                 (ECP_SIZE_MSG_HDR+ECP_MIN_MSG)
-#define ECP_MIN_PKT                 (ECP_SIZE_PKT_HDR+ECP_MIN_PLD+ECP_AEAD_SIZE_TAG)
+#define ECP_MIN_PKT                 (ECP_SIZE_PKT_HDR+ECP_SIZE_MSG_HDR+ECP_AEAD_SIZE_TAG)
 
 #define ECP_POLL_TIMEOUT            500
 #define ECP_ECDH_IDX_INV            0xFF
 #define ECP_ECDH_IDX_PERMA          0x0F
 
-#define ECP_MTYPE_MASK              0x7f
-#define ECP_MTYPE_FLAG_REP          0x80
+#define ECP_MTYPE_MASK              0x3f
+#define ECP_MTYPE_FLAG_TIMER        0x80
+#define ECP_MTYPE_FLAG_REP          0x40
 
 #define ECP_MTYPE_OPEN              0x00
 #define ECP_MTYPE_KGET              0x01
 #define ECP_MTYPE_KPUT              0x02
-#define ECP_MTYPE_EXEC              0x03
 
 #define ECP_MTYPE_OPEN_REQ          (ECP_MTYPE_OPEN)
 #define ECP_MTYPE_OPEN_REP          (ECP_MTYPE_OPEN | ECP_MTYPE_FLAG_REP)
@@ -68,8 +67,8 @@
 #define ECP_MTYPE_KPUT_REQ          (ECP_MTYPE_KPUT)
 #define ECP_MTYPE_KPUT_REP          (ECP_MTYPE_KPUT | ECP_MTYPE_FLAG_REP)
 
-#define ECP_SIZE_PLD(X)             ((X) < ECP_MIN_MSG ? ECP_MIN_MSG + ECP_SIZE_MSG_HDR : (X) + ECP_SIZE_MSG_HDR)
-#define ECP_SIZE_PKT(X)             ((X) < ECP_MIN_MSG ? ECP_MIN_MSG + ECP_SIZE_PKT_HDR+ECP_SIZE_MSG_HDR+ECP_AEAD_SIZE_TAG : (X) + ECP_SIZE_PKT_HDR+ECP_SIZE_MSG_HDR+ECP_AEAD_SIZE_TAG)
+#define ECP_SIZE_PLD(X)             ((X) + ECP_SIZE_MSG_HDR)
+#define ECP_SIZE_PKT(X)             ((X) + ECP_SIZE_PKT_HDR+ECP_SIZE_MSG_HDR+ECP_AEAD_SIZE_TAG)
 
 #define ECP_CONN_FLAG_REG           0x01
 #define ECP_CONN_FLAG_OPEN          0x02
@@ -202,7 +201,9 @@ typedef struct ECPConnHandler {
 typedef struct ECPSockCTable {
     struct ECPConnection *array[ECP_MAX_SOCK_CONN];
     unsigned short size;
+#ifdef ECP_WITH_HTABLE
     void *htable;
+#endif
 #ifdef ECP_WITH_PTHREAD
     pthread_mutex_t mutex;
 #endif
@@ -218,8 +219,8 @@ typedef struct ECPContext {
 #ifdef ECP_WITH_HTABLE
     ECPHTableIface ht;
 #endif
-    ssize_t (*pack) (struct ECPConnection *conn, ECPNetAddr *addr, ecp_seq_t *seq, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size);
-    ssize_t (*pack_raw) (struct ECPSocket *sock, struct ECPConnection *proxy, ECPNetAddr *addr, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size);
+    ssize_t (*pack) (struct ECPConnection *conn, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size, ECPNetAddr *addr, ecp_seq_t *seq, int *rbuf_idx);
+    ssize_t (*pack_raw) (struct ECPSocket *sock, struct ECPConnection *proxy, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size, ECPNetAddr *addr);
     ECPConnHandler *handler[ECP_MAX_CTYPE];
 } ECPContext;
 
@@ -311,8 +312,8 @@ int ecp_conn_dhkey_new_pub(ECPConnection *conn, unsigned char idx, unsigned char
 int ecp_conn_dhkey_get_curr(ECPConnection *conn, unsigned char *idx, unsigned char *public);
 
 ssize_t ecp_pack(ECPContext *ctx, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size);
-ssize_t ecp_pack_raw(ECPSocket *sock, ECPConnection *proxy, ECPNetAddr *addr, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size);
-ssize_t ecp_conn_pack(ECPConnection *conn, ECPNetAddr *addr, ecp_seq_t *seq, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size);
+ssize_t ecp_pack_raw(ECPSocket *sock, ECPConnection *proxy, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size, ECPNetAddr *addr);
+ssize_t ecp_conn_pack(ECPConnection *conn, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size, ECPNetAddr *addr, ecp_seq_t *seq, int *rbuf_idx);
 
 ssize_t ecp_pkt_handle(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *proxy, unsigned char *packet, size_t pkt_size);
 ssize_t ecp_pkt_send(ECPSocket *sock, ECPNetAddr *addr, unsigned char *packet, size_t pkt_size);
@@ -323,7 +324,7 @@ unsigned char *ecp_pld_get_buf(unsigned char *payload);
 void ecp_pld_set_type(unsigned char *payload, unsigned char mtype);
 unsigned char ecp_pld_get_type(unsigned char *payload);
 ssize_t ecp_pld_send(ECPConnection *conn, unsigned char *payload, size_t payload_size);
-ssize_t ecp_pld_send_wkey(ECPConnection *conn, ecp_seq_t *seq, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size);
+ssize_t ecp_pld_send_wkey(ECPConnection *conn, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size);
 ssize_t ecp_pld_send_raw(ECPSocket *sock, ECPConnection *proxy, ECPNetAddr *addr, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size);
 
 ssize_t ecp_send(ECPConnection *conn, unsigned char *payload, size_t payload_size);
diff --git a/code/core/htable/htable.c b/code/core/htable/htable.c
index 552a03b..82dbcbc 100644
--- a/code/core/htable/htable.c
+++ b/code/core/htable/htable.c
@@ -5,11 +5,7 @@
 #include "hashtable.h"
 
 static void *h_create(ECPContext *ctx) {
-    int rv;
-    struct hashtable *h = create_hashtable(1000, (unsigned int (*)(void *))ctx->cr.dh_pub_hash_fn, (int (*)(void *, void *))ctx->cr.dh_pub_hash_eq, NULL, NULL, NULL);
-    if (h == NULL) return NULL;
-        
-    return h;
+    return create_hashtable(1000, (unsigned int (*)(void *))ctx->cr.dh_pub_hash_fn, (int (*)(void *, void *))ctx->cr.dh_pub_hash_eq, NULL, NULL, NULL);
 }
 
 static void h_destroy(void *h) {
@@ -23,7 +19,6 @@ static int h_insert(void *h, unsigned char *k, ECPConnection *v) {
 }
 
 static ECPConnection *h_remove(void *h, unsigned char *k) {
-    printf("REMOVE!!!\n");
     return hashtable_remove(h, k);
 }
 
@@ -38,5 +33,5 @@ int ecp_htable_init(ECPHTableIface *h) {
     h->insert = h_insert;
     h->remove = h_remove;
     h->search = h_search;
-    return 0;
+    return ECP_OK;
 }
diff --git a/code/core/msgq.c b/code/core/msgq.c
index 53c55ea..2ba0ed5 100644
--- a/code/core/msgq.c
+++ b/code/core/msgq.c
@@ -62,7 +62,7 @@ ssize_t ecp_conn_msgq_push(ECPConnection *conn, unsigned char *msg, size_t msg_s
 
     if (mtype >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE;
     if (msg_size >= ECP_MAX_MSG) return ECP_ERR_MAX_MSG;
-    if (msg_size < ECP_MIN_MSG) return ECP_ERR_MIN_MSG;
+    if (msg_size < 1) return ECP_ERR_MIN_MSG;
     
     for (i=0; i<ECP_MAX_CONN_MSG; i++) {
         if (!msgq->occupied[msg_idx]) {
diff --git a/code/core/posix/transport.c b/code/core/posix/transport.c
index da87342..e3fe612 100644
--- a/code/core/posix/transport.c
+++ b/code/core/posix/transport.c
@@ -112,5 +112,5 @@ int ecp_transport_init(ECPTransportIface *t) {
     t->recv = t_recv;
     t->addr_eq = t_addr_eq;
     t->addr_set = t_addr_set;
-    return 0;
+    return ECP_OK;
 }
diff --git a/code/core/rbuf.c b/code/core/rbuf.c
index 0865e67..bb110b9 100644
--- a/code/core/rbuf.c
+++ b/code/core/rbuf.c
@@ -3,14 +3,15 @@
 #include <string.h>
 
 int ecp_rbuf_init(ECPRBuffer *rbuf, ECPRBMessage *msg, unsigned int msg_size) {
-    memset(msg, 0, sizeof(ECPRBMessage) * msg_size);
     rbuf->msg = msg;
     if (msg_size) {
+        if (msg == NULL) return ECP_ERR;
         rbuf->msg_size = msg_size;
+        memset(rbuf->msg, 0, sizeof(ECPRBMessage) * msg_size);
     } else {
         rbuf->msg_size = ECP_RBUF_SEQ_HALF;
     }
-    
+
     return ECP_OK;
 }
 
@@ -22,17 +23,53 @@ int ecp_rbuf_msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq) {
     return ECP_ERR_RBUF_IDX;
 }
 
-ssize_t ecp_rbuf_msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, unsigned char *msg, size_t msg_size, unsigned char test_flags, unsigned char set_flags) {
-    int idx = ecp_rbuf_msg_idx(rbuf, seq);
+ssize_t ecp_rbuf_msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, int idx, unsigned char *msg, size_t msg_size, unsigned char test_flags, unsigned char set_flags) {
+    idx = idx < 0 ? ecp_rbuf_msg_idx(rbuf, seq) : idx;
     if (idx < 0) return idx;
 
     if (rbuf->msg == NULL) return 0;
     if (test_flags && (test_flags & rbuf->msg[idx].flags)) return ECP_ERR_RBUF_FLAG;
     
-    memcpy(rbuf->msg[idx].msg, msg, msg_size);
-    rbuf->msg[idx].size = msg_size;
+    if (!(set_flags & ECP_RBUF_FLAG_DELIVERED)) {
+        memcpy(rbuf->msg[idx].msg, msg, msg_size);
+        rbuf->msg[idx].size = msg_size;
+    }
     rbuf->msg[idx].flags = set_flags;
 
     return msg_size;
 }
 
+int ecp_conn_rbuf_start(ECPConnection *conn, ecp_seq_t seq) {
+    int rv = ecp_conn_rbuf_send_start(conn);
+    if (rv) return rv;
+
+    if (!conn->out) {
+        rv = ecp_conn_rbuf_recv_start(conn, seq);
+        if (rv) return rv;
+    }
+    
+    return ECP_OK;
+}
+
+ssize_t ecp_conn_rbuf_pkt_send(struct ECPConnection *conn, ECPNetAddr *addr, unsigned char *packet, size_t pkt_size, ecp_seq_t seq, int idx) {
+    int do_send;
+    ECPRBSend *buf = conn->rbuf.send;
+    
+    ssize_t rv = ecp_rbuf_msg_store(&buf->rbuf, seq, idx, packet, pkt_size, 0, 0);
+    if (rv < 0) return rv;
+
+#ifdef ECP_WITH_PTHREAD
+    pthread_mutex_lock(&buf->mutex);
+#endif
+    if (buf->in_transit < buf->win_size) {
+        buf->in_transit++;
+        do_send = 1;
+    }
+#ifdef ECP_WITH_PTHREAD
+    pthread_mutex_unlock(&buf->mutex);
+#endif
+
+    if (do_send) rv = ecp_pkt_send(conn->sock, addr, packet, pkt_size);
+    return rv;
+}
+
diff --git a/code/core/rbuf.h b/code/core/rbuf.h
index 4340707..9e9e53d 100644
--- a/code/core/rbuf.h
+++ b/code/core/rbuf.h
@@ -1,21 +1,38 @@
-#define ECP_RBUF_FLAG_PRESENT        1
+#define ECP_RBUF_FLAG_RECEIVED      0x01
+#define ECP_RBUF_FLAG_DELIVERED     0x02
 
-#define ECP_ERR_RBUF_FLAG   -100
-#define ECP_ERR_RBUF_IDX    -101
-#define ECP_ERR_RBUF_DUP    -102
+#define ECP_RBUF_FLAG_RELIABLE      0x01
+#define ECP_RBUF_FLAG_MSGQ          0x02
+
+#define ECP_MTYPE_RBOPEN            0x04
+#define ECP_MTYPE_RBCLOSE           0x05
+#define ECP_MTYPE_RBFLUSH           0x06
+#define ECP_MTYPE_RBACK             0x07
+
+#define ECP_MTYPE_RBOPEN_REQ        (ECP_MTYPE_RBOPEN)
+#define ECP_MTYPE_RBOPEN_REP        (ECP_MTYPE_RBOPEN | ECP_MTYPE_FLAG_REP)
+#define ECP_MTYPE_RBCLOSE_REQ       (ECP_MTYPE_RBCLOSE)
+#define ECP_MTYPE_RBCLOSE_REP       (ECP_MTYPE_RBCLOSE | ECP_MTYPE_FLAG_REP)
+#define ECP_MTYPE_RBFLUSH_REQ       (ECP_MTYPE_RBFLUSH)
+#define ECP_MTYPE_RBFLUSH_REP       (ECP_MTYPE_RBFLUSH | ECP_MTYPE_FLAG_REP)
+
+#define ECP_ERR_RBUF_FLAG           -100
+#define ECP_ERR_RBUF_IDX            -101
+#define ECP_ERR_RBUF_DUP            -102
+#define ECP_ERR_RBUF_FULL           -103
 
 typedef uint32_t ecp_ack_t;
 typedef uint32_t ecp_win_t;
 
-#define ECP_RBUF_SEQ_HALF            ((ecp_seq_t)1 << (sizeof(ecp_seq_t)*8-1))
+#define ECP_RBUF_SEQ_HALF               ((ecp_seq_t)1 << (sizeof(ecp_seq_t)*8-1))
 
-#define ECP_RBUF_ACK_FULL            (~(ecp_ack_t)0)
-#define ECP_RBUF_ACK_SIZE            (sizeof(ecp_ack_t)*8)
+#define ECP_RBUF_ACK_FULL               (~(ecp_ack_t)0)
+#define ECP_RBUF_ACK_SIZE               (sizeof(ecp_ack_t)*8)
 
-#define ECP_RBUF_SEQ_LT(a,b)         ((ecp_seq_t)((ecp_seq_t)(a) - (ecp_seq_t)(b)) > ECP_RBUF_SEQ_HALF)
-#define ECP_RBUF_SEQ_LTE(a,b)        ((ecp_seq_t)((ecp_seq_t)(b) - (ecp_seq_t)(a)) < ECP_RBUF_SEQ_HALF)
+#define ECP_RBUF_SEQ_LT(a,b)            ((ecp_seq_t)((ecp_seq_t)(a) - (ecp_seq_t)(b)) > ECP_RBUF_SEQ_HALF)
+#define ECP_RBUF_SEQ_LTE(a,b)           ((ecp_seq_t)((ecp_seq_t)(b) - (ecp_seq_t)(a)) < ECP_RBUF_SEQ_HALF)
 
-#define ECP_RBUF_IDX_MASK(idx, size) ((idx) & ((size) - 1))
+#define ECP_RBUF_IDX_MASK(idx, size)    ((idx) & ((size) - 1))
 /* If size not 2^x:
 #define ECP_RBUF_IDX_MASK(idx, size) ((idx) % (size))
 */
@@ -35,8 +52,8 @@ typedef struct ECPRBuffer {
 } ECPRBuffer;
 
 typedef struct ECPRBRecv {
-    unsigned char open;
-    unsigned char reliable;
+    unsigned char flags;
+    unsigned char flush;
     unsigned short deliver_delay;
     unsigned short hole_max;
     unsigned short ack_rate;
@@ -50,12 +67,16 @@ typedef struct ECPRBRecv {
 } ECPRBRecv;
 
 typedef struct ECPRBSend {
-    unsigned char open;
-    unsigned char reliable;
+    unsigned char flags;
+    unsigned char flush;
     ecp_win_t win_size;
     ecp_win_t in_transit;
+    ecp_seq_t seq_flush;
     unsigned int nack_rate;
     ECPRBuffer rbuf;
+#ifdef ECP_WITH_PTHREAD
+    pthread_mutex_t mutex;
+#endif
 } ECPRBSend;
 
 typedef struct ECPConnRBuffer {
@@ -66,14 +87,17 @@ typedef struct ECPConnRBuffer {
 
 int ecp_rbuf_init(ECPRBuffer *rbuf, ECPRBMessage *msg, unsigned int msg_size);
 int ecp_rbuf_msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq);
-ssize_t ecp_rbuf_msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, unsigned char *msg, size_t msg_size, unsigned char test_flags, unsigned char set_flags);
-
-int ecp_rbuf_recv_create(ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size);
-int ecp_rbuf_recv_start(ECPRBRecv *buf, ecp_seq_t seq);
-int ecp_rbuf_recv_set_hole(ECPRBRecv *buf, unsigned short hole_max);
-int ecp_rbuf_recv_set_delay(ECPRBRecv *buf, unsigned short delay);
-ssize_t ecp_rbuf_recv_store(struct ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size);
-
-
-ssize_t ecp_rbuf_send_store(struct ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size);
+ssize_t ecp_rbuf_msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, int idx, unsigned char *msg, size_t msg_size, unsigned char test_flags, unsigned char set_flags);
+int ecp_conn_rbuf_start(struct ECPConnection *conn, ecp_seq_t seq);
+ssize_t ecp_conn_rbuf_pkt_send(struct ECPConnection *conn, ECPNetAddr *addr, unsigned char *packet, size_t pkt_size, ecp_seq_t seq, int idx);
+
+int ecp_conn_rbuf_recv_create(struct ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size);
+int ecp_conn_rbuf_recv_start(struct ECPConnection *conn, ecp_seq_t seq);
+ssize_t ecp_conn_rbuf_recv_store(struct ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size);
+
+int ecp_conn_rbuf_send_create(struct ECPConnection *conn, ECPRBSend *buf, ECPRBMessage *msg, unsigned int msg_size);
+int ecp_conn_rbuf_send_start(struct ECPConnection *conn);
+int ecp_conn_rbuf_recv_set_hole(struct ECPConnection *conn, unsigned short hole_max);
+int ecp_conn_rbuf_recv_set_delay(struct ECPConnection *conn, unsigned short delay);
+ssize_t ecp_conn_rbuf_send_store(struct ECPConnection *conn, ecp_seq_t seq, int idx, unsigned char *msg, size_t msg_size);
 
diff --git a/code/core/rbuf_recv.c b/code/core/rbuf_recv.c
index 222daa7..b2a5ecf 100644
--- a/code/core/rbuf_recv.c
+++ b/code/core/rbuf_recv.c
@@ -5,10 +5,35 @@
 #define ACK_RATE            8
 #define ACK_MASK_FIRST      ((ecp_ack_t)1 << (ECP_RBUF_ACK_SIZE - 1))
 
-static ssize_t msg_store(ECPRBRecv *buf, ecp_seq_t seq, unsigned char *msg, size_t msg_size) {
-    ssize_t rv = ecp_rbuf_msg_store(&buf->rbuf, seq, msg, msg_size, ECP_RBUF_FLAG_PRESENT, ECP_RBUF_FLAG_PRESENT);
+static ssize_t handle_flush(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size) {
+    if (size < 0) return size;
+    
+    ECPRBRecv *buf = conn->rbuf.recv;
+    unsigned char payload[ECP_SIZE_PLD(0)];
+    
+    if (buf == NULL) return ECP_ERR;
+
+    buf->flush = 1;
+    return 0;
+}
+
+static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) {
+    ECPRBRecv *buf = conn->rbuf.recv;
+    ssize_t rv = 0;
+    unsigned char flags;
+    unsigned char mtype = msg[0] & ECP_MTYPE_MASK;
+        
+    if (mtype < ECP_MAX_MTYPE_SYS) {
+        flags = ECP_RBUF_FLAG_RECEIVED | ECP_RBUF_FLAG_DELIVERED;
+    } else {
+        flags = ECP_RBUF_FLAG_RECEIVED;
+    }
+        
+    rv = ecp_rbuf_msg_store(&buf->rbuf, seq, -1, msg, msg_size, ECP_RBUF_FLAG_RECEIVED, flags);
     if (rv < 0) return ECP_ERR_RBUF_DUP;
     
+    if (flags & ECP_RBUF_FLAG_DELIVERED) ecp_msg_handle(conn, seq, msg, msg_size);
+
     if (ECP_RBUF_SEQ_LT(buf->seq_max, seq)) buf->seq_max = seq;
     return rv;
 }
@@ -20,11 +45,15 @@ static void msg_flush(ECPConnection *conn) {
     unsigned int idx = buf->rbuf.msg_start;
     
     for (i=0; i<msg_cnt; i++) {
-        if (buf->reliable && !(buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_PRESENT)) break;
-        if (buf->deliver_delay && msg_cnt - i < buf->deliver_delay) break;
-        if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_PRESENT) {
-            buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_PRESENT;
-            ecp_msg_handle(conn, buf->rbuf.seq_start + i, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size);
+        if ((buf->flags & ECP_RBUF_FLAG_RELIABLE) && !(buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_RECEIVED)) break;
+        if (buf->deliver_delay && (msg_cnt - i < buf->deliver_delay)) break;
+        if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_RECEIVED) {
+            buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_RECEIVED;
+            if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_DELIVERED) {
+                buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_DELIVERED;
+            } else {
+                ecp_msg_handle(conn, buf->rbuf.seq_start + i, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size);
+            }
         }
         idx = ECP_RBUF_IDX_MASK(idx + 1, buf->rbuf.msg_size);
     }
@@ -32,12 +61,35 @@ static void msg_flush(ECPConnection *conn) {
     buf->rbuf.seq_start += i;
 }
 
+static int ack_send(ECPConnection *conn) {
+    ECPRBRecv *buf = conn->rbuf.recv;
+    unsigned char payload[ECP_SIZE_PLD(sizeof(ecp_seq_t) + sizeof(ecp_ack_t))];
+    unsigned char *buf_ = ecp_pld_get_buf(payload);
+    ssize_t rv;
+
+    ecp_pld_set_type(payload, ECP_MTYPE_RBACK);
+    buf_[0] = (buf->seq_ack & 0xFF000000) >> 24;
+    buf_[1] = (buf->seq_ack & 0x00FF0000) >> 16;
+    buf_[2] = (buf->seq_ack & 0x0000FF00) >> 8;
+    buf_[3] = (buf->seq_ack & 0x000000FF);
+    buf_[4] = (buf->ack_map & 0xFF000000) >> 24;
+    buf_[5] = (buf->ack_map & 0x00FF0000) >> 16;
+    buf_[6] = (buf->ack_map & 0x0000FF00) >> 8;
+    buf_[7] = (buf->ack_map & 0x000000FF);
+    
+    rv = ecp_pld_send(conn, payload, sizeof(payload));
+    if (rv < 0) return rv;
+
+    buf->ack_pkt = 0;
+    return ECP_OK;
+}
+
 static int ack_shift(ECPRBRecv *buf) {
     int do_ack = 0;
     int idx;
     int i;
     
-    if (buf->reliable && ((buf->ack_map & ACK_MASK_FIRST) == 0)) return 0;
+    if ((buf->flags & ECP_RBUF_FLAG_RELIABLE) && ((buf->ack_map & ACK_MASK_FIRST) == 0)) return 0;
 
     idx = ecp_rbuf_msg_idx(&buf->rbuf, buf->seq_ack);
     if (idx < 0) return idx;
@@ -46,10 +98,10 @@ static int ack_shift(ECPRBRecv *buf) {
         idx = ECP_RBUF_IDX_MASK(idx + 1, buf->rbuf.msg_size);
         buf->seq_ack++;
 
-        if ((buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_PRESENT) && (buf->ack_map == ECP_RBUF_ACK_FULL)) continue;
+        if ((buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_RECEIVED) && (buf->ack_map == ECP_RBUF_ACK_FULL)) continue;
         
         buf->ack_map = buf->ack_map << 1;
-        if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_PRESENT) {
+        if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_RECEIVED) {
             buf->ack_map |= 1;
         } else if (!do_ack && ECP_RBUF_SEQ_LTE(buf->seq_ack, buf->seq_max - 2 * buf->hole_max)) {
             do_ack = 1;
@@ -73,47 +125,61 @@ static int ack_shift(ECPRBRecv *buf) {
     return do_ack;
 }
 
-int ecp_rbuf_recv_create(ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size) {
+int ecp_conn_rbuf_recv_create(ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size) {
+    int rv;
+
     memset(buf, 0, sizeof(ECPRBRecv));
-    ecp_rbuf_init(&buf->rbuf, msg, msg_size);
+    rv = ecp_rbuf_init(&buf->rbuf, msg, msg_size);
+    if (rv) return rv;
+    
     buf->ack_map = ECP_RBUF_ACK_FULL;
     buf->ack_rate = ACK_RATE;
-
+    conn->rbuf.recv = buf;
+    
     return ECP_OK;
 }
 
-int ecp_rbuf_recv_start(ECPRBRecv *buf, ecp_seq_t seq) {
-    buf->seq_ack = seq;
-    buf->seq_max = seq;
-    buf->rbuf.seq_start = seq + 1;
-
-    return ECP_OK;
-}
+int ecp_conn_rbuf_recv_set_hole(ECPConnection *conn, unsigned short hole_max) {
+    ECPRBRecv *buf = conn->rbuf.recv;
 
-int ecp_rbuf_recv_set_hole(ECPRBRecv *buf, unsigned short hole_max) {
     buf->hole_max = hole_max;
-    buf->hole_mask_full = ~(~((ecp_ack_t)0) << (hole_max * 2));
-    buf->hole_mask_empty = ~(~((ecp_ack_t)0) << (hole_max + 1));
+    buf->hole_mask_full = ~(~((ecp_ack_t)1) << (hole_max * 2));
+    buf->hole_mask_empty = ~(~((ecp_ack_t)1) << (hole_max + 1));
     
     return ECP_OK;
 }
 
-int ecp_rbuf_recv_set_delay(ECPRBRecv *buf, unsigned short delay) {
+int ecp_conn_rbuf_recv_set_delay(ECPConnection *conn, unsigned short delay) {
+    ECPRBRecv *buf = conn->rbuf.recv;
+
     buf->deliver_delay = delay;
     if (buf->hole_max < delay - 1) {
-        ecp_rbuf_recv_set_hole(buf, delay - 1);
+        ecp_conn_rbuf_recv_set_hole(conn, delay - 1);
     }
     
     return ECP_OK;
 }
 
-ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) {
-    ssize_t rv;
+int ecp_conn_rbuf_recv_start(ECPConnection *conn, ecp_seq_t seq) {
+    ECPRBRecv *buf = conn->rbuf.recv;
+
+    if (buf == NULL) return ECP_ERR;
+    
+    buf->seq_ack = seq;
+    buf->seq_max = seq;
+    buf->rbuf.seq_start = seq + 1;
+
+    return ECP_OK;
+}
+
+ssize_t ecp_conn_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) {
+    ECPRBRecv *buf = conn->rbuf.recv;
     ecp_seq_t ack_pkt = 0;
+    ssize_t rv;
     int do_ack = 0; 
-    ECPRBRecv *buf = conn->rbuf.recv;
     
     if (buf == NULL) return ECP_ERR;
+    if (msg_size < 1) return ECP_ERR_MIN_MSG;
     
     if (ECP_RBUF_SEQ_LT(buf->seq_max, seq)) ack_pkt = seq - buf->seq_max;
     if (ECP_RBUF_SEQ_LTE(seq, buf->seq_ack)) {
@@ -122,43 +188,48 @@ ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *m
             ecp_ack_t ack_mask = ((ecp_ack_t)1 << seq_offset);
 
             if (ack_mask & buf->ack_map) return ECP_ERR_RBUF_DUP;
-            
-            rv = msg_store(buf, seq, msg, msg_size);
-            if (rv < 0) return rv;
-
             buf->ack_map |= ack_mask;
             do_ack = ack_shift(buf);
+
+            rv = msg_store(conn, seq, msg, msg_size);
+            if (rv < 0) return rv;
         } else {
             return ECP_ERR_RBUF_IDX;
         }
     } else {
         if ((buf->ack_map == ECP_RBUF_ACK_FULL) && (seq == (ecp_seq_t)(buf->seq_ack + 1))) {
-            if (buf->deliver_delay) {
-                rv = msg_store(buf, seq, msg, msg_size);
+            if ((buf->flags & ECP_RBUF_FLAG_MSGQ) || buf->deliver_delay) {
+                rv = msg_store(conn, seq, msg, msg_size);
                 if (rv < 0) return rv;
             } else {
-                rv = ecp_msg_handle(conn, seq, msg, msg_size);
+                ecp_msg_handle(conn, seq, msg, msg_size);
+                rv = msg_size;
                 buf->seq_max++;
                 buf->rbuf.seq_start++;
+                buf->rbuf.msg_start = ECP_RBUF_IDX_MASK(buf->rbuf.msg_start + 1, buf->rbuf.msg_size);
             }
             buf->seq_ack++;
         } else {
-            rv = msg_store(buf, seq, msg, msg_size);
+            rv = msg_store(conn, seq, msg, msg_size);
             if (rv < 0) return rv;
 
             do_ack = ack_shift(buf);
         }
     }
+    if (buf->flush) {
+        buf->flush = 0;
+        do_ack = 1;
+    }
     if (ack_pkt && !do_ack) {
         buf->ack_pkt += ack_pkt;
         // should send acks more aggresively when reliable and ack_map is not full (rate ~ PPS * RTT)
         if (buf->ack_pkt > buf->ack_rate) do_ack = 1;
     }
     if (do_ack) {
-        buf->ack_pkt = 0;
-        // send ack (with seq = 0)
+        int _rv = ack_send(conn);
+        if (_rv) return _rv;
     }
-    // XXX should handle close
     msg_flush(conn);
     return rv;
 }
+
diff --git a/code/core/rbuf_send.c b/code/core/rbuf_send.c
index 5f3bc87..877b086 100644
--- a/code/core/rbuf_send.c
+++ b/code/core/rbuf_send.c
@@ -2,75 +2,163 @@
 
 #include <string.h>
 
-#define ACK_RATE_UNIT   10000
+#define NACK_RATE_UNIT   10000
+
+static ssize_t _rbuf_send_flush(ECPConnection *conn, ECPTimerItem *ti) {
+    unsigned char payload[ECP_SIZE_PLD(0)];
+
+    ecp_pld_set_type(payload, ECP_MTYPE_RBFLUSH_REQ);
+    return ecp_pld_send(conn, payload, sizeof(payload));
+}
 
 static ssize_t handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size) {
-    int idx = 0;
+    ECPRBSend *buf;
+    ssize_t rsize = sizeof(ecp_seq_t)+sizeof(ecp_ack_t);
     ecp_seq_t seq_ack = 0;
     ecp_ack_t ack_map = 0;
+    int do_flush = 0;
+    int rv = ECP_OK;
     
-    ECPRBSend *buf = conn->rbuf.send;
+    buf = conn->rbuf.send;
+    if (buf == NULL) return ECP_ERR;
+    if (size < 0) return size;
+    if (size < rsize) return ECP_ERR;
+
+    seq_ack = \
+        (msg[0] << 24) | \
+        (msg[1] << 16) | \
+        (msg[2] << 8)  | \
+        (msg[3]);
+    ack_map = \
+        (msg[4] << 24) | \
+        (msg[5] << 16) | \
+        (msg[6] << 8)  | \
+        (msg[7]);
+
+#ifdef ECP_WITH_PTHREAD
+    pthread_mutex_lock(&buf->mutex);
+#endif
+
     ECPRBuffer *rbuf = &buf->rbuf;
+    int idx = ecp_rbuf_msg_idx(rbuf, seq_ack);
+    if (idx < 0) rv = idx;
     
-    idx = ecp_rbuf_msg_idx(rbuf, seq_ack);
-    if (idx < 0) return idx;
-    
-    seq_ack++;
-    buf->in_transit -= seq_ack - rbuf->seq_start;
-
-    if (ack_map != ECP_RBUF_ACK_FULL) {
-        int i;
-        int nack = 0;
-        ecp_win_t nack_cnt = 0;
-        
-        seq_ack -= ECP_RBUF_ACK_SIZE;
-        for (i=0; i<ECP_RBUF_ACK_SIZE; i++) {
-            if ((ack_map & ((ecp_ack_t)1 << (ECP_RBUF_ACK_SIZE - i - 1))) == 0) {
-                nack_cnt++;
-                if (buf->reliable) {
-                    idx = ecp_rbuf_msg_idx(rbuf, seq_ack + i);
-                    // resend packet 
-                    // ecp_pkt_send(conn->sock, &conn->node.addr, packet, rv);
-                    if (!nack) {
-                        nack = 1;
-                
-                        rbuf->seq_start = seq_ack + i;
-                        rbuf->msg_start = idx;
+    int is_reliable = buf->flags & ECP_RBUF_FLAG_RELIABLE;
+
+    if (!rv) {
+        seq_ack++;
+        buf->in_transit -= seq_ack - rbuf->seq_start;
+
+        if (ack_map != ECP_RBUF_ACK_FULL) {
+            int i;
+            int nack_first = 0;
+            unsigned int msg_start;
+            ecp_seq_t seq_start;
+            ecp_win_t nack_cnt = 0;
+            
+            seq_ack -= ECP_RBUF_ACK_SIZE;
+
+#ifdef ECP_WITH_PTHREAD
+            pthread_mutex_unlock(&buf->mutex);
+#endif
+            
+            for (i=0; i<ECP_RBUF_ACK_SIZE; i++) {
+                if ((ack_map & ((ecp_ack_t)1 << (ECP_RBUF_ACK_SIZE - i - 1))) == 0) {
+                    nack_cnt++;
+                    if (buf->flags & ECP_RBUF_FLAG_RELIABLE) {
+                        idx = ECP_RBUF_IDX_MASK(idx + 1 - ECP_RBUF_ACK_SIZE + i, rbuf->msg_size);
+                        ecp_pkt_send(conn->sock, &conn->node.addr, rbuf->msg[idx].msg, rbuf->msg[idx].size);
+                        if (!nack_first) {
+                            nack_first = 1;
+                            seq_start = seq_ack + i;
+                            msg_start = idx;
+                        }
                     }
                 }
             }
+
+#ifdef ECP_WITH_PTHREAD
+            pthread_mutex_lock(&buf->mutex);
+#endif
+
+            buf->in_transit += nack_cnt;
+            buf->nack_rate = (buf->nack_rate * 7 + ((ECP_RBUF_ACK_SIZE - nack_cnt) * NACK_RATE_UNIT) / ECP_RBUF_ACK_SIZE) / 8;
+            if (buf->flags & ECP_RBUF_FLAG_RELIABLE) {
+                rbuf->seq_start = seq_start;
+                rbuf->msg_start = msg_start;
+            } else {
+                rbuf->seq_start = seq_ack + ECP_RBUF_ACK_SIZE;
+            }
+        } else {
+            rbuf->seq_start = seq_ack;
+            buf->nack_rate = (buf->nack_rate * 7) / 8;
+            if (buf->flags & ECP_RBUF_FLAG_RELIABLE) {
+                rbuf->msg_start = ECP_RBUF_IDX_MASK(idx + 1, rbuf->msg_size);
+            }
         }
-        buf->in_transit += nack_cnt;
-        buf->nack_rate = (buf->nack_rate * 7 + ((ECP_RBUF_ACK_SIZE - nack_cnt) * ACK_RATE_UNIT) / ECP_RBUF_ACK_SIZE) / 8;
-        if (!buf->reliable) {
-            rbuf->seq_start = seq_ack + ECP_RBUF_ACK_SIZE;
-        }
-    } else {
-        rbuf->seq_start = seq_ack;
-        buf->nack_rate = (buf->nack_rate * 7) / 8;
-        if (buf->reliable) {
-            rbuf->msg_start = ECP_RBUF_IDX_MASK(idx + 1, rbuf->msg_size);
+        if (buf->flush) {
+            if (ECP_RBUF_SEQ_LT(buf->seq_flush, rbuf->seq_start)) buf->flush = 0;
+            if (buf->flush) {
+                do_flush = 1;
+            }
         }
     }
-    return size;
+
+#ifdef ECP_WITH_PTHREAD
+    pthread_mutex_unlock(&buf->mutex);
+#endif
+
+    if (rv) return rv;
+
+    if (do_flush) ecp_timer_send(conn, _rbuf_send_flush, ECP_MTYPE_RBACK, 3, 500);
+    return rsize;
 }
 
-int ecp_rbuf_send_create(ECPRBSend *buf, ECPRBMessage *msg, unsigned int msg_size) {
+int ecp_conn_rbuf_send_create(ECPConnection *conn, ECPRBSend *buf, ECPRBMessage *msg, unsigned int msg_size) {
+    int rv;
+    
     memset(buf, 0, sizeof(ECPRBRecv));
-    ecp_rbuf_init(&buf->rbuf, msg, msg_size);
+    rv = ecp_rbuf_init(&buf->rbuf, msg, msg_size);
+    if (rv) return rv;
+
+    conn->rbuf.send = buf;
 
     return ECP_OK;
 }
 
-int ecp_rbuf_send_start(ECPRBSend *buf, ecp_seq_t seq) {
-    buf->rbuf.seq_start = seq + 1;
+int ecp_conn_rbuf_send_start(ECPConnection *conn) {
+    ECPRBSend *buf = conn->rbuf.send;
+
+    if (buf == NULL) return ECP_ERR;
+    buf->rbuf.seq_start = conn->seq_out;
 
     return ECP_OK;
 }
 
-ssize_t ecp_rbuf_send_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) {
+int ecp_conn_rbuf_flush(ECPConnection *conn) {
     ECPRBSend *buf = conn->rbuf.send;
-    
-    buf->in_transit++;
-    return ecp_rbuf_msg_store(&buf->rbuf, seq, msg, msg_size, 0, 0);
-}
\ No newline at end of file
+    unsigned char payload[ECP_SIZE_PLD(0)];
+    ecp_seq_t seq;
+
+    if (buf == NULL) return ECP_ERR;
+
+    // XXX flush seq
+#ifdef ECP_WITH_PTHREAD
+    pthread_mutex_lock(&buf->mutex);
+#endif
+    if (buf->flush) {
+        if (ECP_RBUF_SEQ_LT(buf->seq_flush, seq)) buf->seq_flush = seq;
+    } else {
+        buf->flush = 1;
+        buf->seq_flush = seq;
+    }
+#ifdef ECP_WITH_PTHREAD
+    pthread_mutex_unlock(&buf->mutex);
+#endif
+
+    ssize_t _rv = ecp_timer_send(conn, _rbuf_send_flush, ECP_MTYPE_RBACK, 3, 500);
+    if (_rv < 0) return _rv;
+
+    return ECP_OK;
+}
+
diff --git a/code/proxy/proxy.c b/code/proxy/proxy.c
index 67d4d01..e9cd93f 100644
--- a/code/proxy/proxy.c
+++ b/code/proxy/proxy.c
@@ -70,12 +70,11 @@ static ssize_t _proxyf_send_open(ECPConnection *conn) {
     ECPConnProxy *conn_p = (ECPConnProxy *)conn;
     ECPConnection *conn_next = conn_p->next;
     unsigned char payload[ECP_SIZE_PLD(0)];
-    ecp_seq_t seq;
 
     if (conn_next == NULL) return ECP_ERR;
 
     ecp_pld_set_type(payload, ECP_MTYPE_KGET_REQ);
-    return ecp_pld_send_wkey(conn_next, &seq, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, payload, sizeof(payload));
+    return ecp_pld_send_wkey(conn_next, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, payload, sizeof(payload));
 }
 
 static ssize_t _proxyf_retry_kget(ECPConnection *conn, ECPTimerItem *ti) {
@@ -363,37 +362,35 @@ static ssize_t proxy_set_msg(ECPConnection *conn, unsigned char *pld_out, size_t
 }
 
 
-static ssize_t proxy_pack(ECPConnection *conn, ECPNetAddr *addr, ecp_seq_t *seq, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size) {
+static ssize_t proxy_pack(ECPConnection *conn, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size, ECPNetAddr *addr, ecp_seq_t *seq, int *rbuf_idx) {
     ECPContext *ctx = conn->sock->ctx;
 
     if (conn->proxy) {
         unsigned char payload_[ECP_MAX_PLD];
-        ecp_seq_t _seq;
         ssize_t rv, hdr_size = proxy_set_msg(conn->proxy, payload_, sizeof(payload_), payload, payload_size);
         if (hdr_size < 0) return hdr_size;
 
-        rv = ecp_conn_pack(conn, NULL, &_seq, payload_+hdr_size, ECP_MAX_PLD-hdr_size, s_idx, c_idx, payload, payload_size);
+        rv = ecp_conn_pack(conn, payload_+hdr_size, ECP_MAX_PLD-hdr_size, s_idx, c_idx, payload, payload_size, NULL, seq, rbuf_idx);
         if (rv < 0) return rv;
 
-        return proxy_pack(conn->proxy, addr, seq, packet, pkt_size, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload_, rv+hdr_size);
+        return proxy_pack(conn->proxy, packet, pkt_size, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload_, rv+hdr_size, addr, NULL, NULL);
     } else {
-        return ecp_conn_pack(conn, addr, seq, packet, pkt_size, s_idx, c_idx, payload, payload_size);
+        return ecp_conn_pack(conn, packet, pkt_size, s_idx, c_idx, payload, payload_size, addr, seq, rbuf_idx);
     }
 }
 
-static ssize_t proxy_pack_raw(ECPSocket *sock, ECPConnection *proxy, ECPNetAddr *addr, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size) {
+static ssize_t proxy_pack_raw(ECPSocket *sock, ECPConnection *proxy, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size, ECPNetAddr *addr) {
     ECPContext *ctx = sock->ctx;
 
     if (proxy) {
         unsigned char payload_[ECP_MAX_PLD];
-        ecp_seq_t _seq;
         ssize_t rv, hdr_size = proxy_set_msg(proxy, payload_, sizeof(payload_), payload, payload_size);
         if (hdr_size < 0) return hdr_size;
 
         rv = ecp_pack(ctx, payload_+hdr_size, ECP_MAX_PLD-hdr_size, s_idx, c_idx, public, shsec, nonce, seq, payload, payload_size);
         if (rv < 0) return rv;
 
-        return proxy_pack(proxy, addr, &_seq, packet, pkt_size, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload_, rv+hdr_size);
+        return proxy_pack(proxy, packet, pkt_size, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload_, rv+hdr_size, addr, NULL, NULL);
     } else {
         return ecp_pack(ctx, packet, pkt_size, s_idx, c_idx, public, shsec, nonce, seq, payload, payload_size);
     }
@@ -472,10 +469,9 @@ int ecp_conn_proxy_init(ECPConnection *conn, ECPNode *conn_node, ECPConnProxy pr
 
 static ssize_t _proxy_send_kget(ECPConnection *conn, ECPTimerItem *ti) {
     unsigned char payload[ECP_SIZE_PLD(0)];
-    ecp_seq_t seq;
 
     ecp_pld_set_type(payload, ECP_MTYPE_KGET_REQ);
-    return ecp_pld_send_wkey(conn, &seq, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, payload, sizeof(payload));
+    return ecp_pld_send_wkey(conn, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, payload, sizeof(payload));
 }
 
 int ecp_conn_proxy_open(ECPConnection *conn, ECPNode *conn_node, ECPConnProxy proxy[], ECPNode proxy_node[], int size) {
diff --git a/code/proxy/proxy.h b/code/proxy/proxy.h
index 1ae2566..6f82cd2 100644
--- a/code/proxy/proxy.h
+++ b/code/proxy/proxy.h
@@ -1,7 +1,8 @@
 #define ECP_CTYPE_PROXYF    1
 #define ECP_CTYPE_PROXYB    2
 
-#define ECP_MTYPE_RELAY     0x04
+#define ECP_MTYPE_RELAY     0x08
+#define ECP_MTYPE_EXEC      0x09
 
 typedef struct ECPConnProxy {
     ECPConnection b;
diff --git a/code/test/basic.c b/code/test/basic.c
index c9f6bd5..809d453 100644
--- a/code/test/basic.c
+++ b/code/test/basic.c
@@ -26,7 +26,7 @@ ssize_t handle_open_c(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsign
     ecp_conn_handle_open(conn, sq, t, p, s);
     if (s < 0) {
         printf("OPEN ERR:%ld\n", s);
-        return 0;
+        return s;
     }
     
     unsigned char payload[ECP_SIZE_PLD(1000)];
@@ -36,7 +36,7 @@ ssize_t handle_open_c(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsign
     ecp_pld_set_type(payload, MTYPE_MSG);
     strcpy((char *)buf, msg);
     ssize_t _rv = ecp_send(conn, payload, sizeof(payload));
-    return 0;
+    return s;
 }
 
 ssize_t handle_msg_c(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsigned char *p, ssize_t s) {
diff --git a/code/test/client.c b/code/test/client.c
index 6e45df4..1d5dc6c 100644
--- a/code/test/client.c
+++ b/code/test/client.c
@@ -22,7 +22,7 @@ ssize_t handle_open_c(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsign
     ecp_conn_handle_open(conn, sq, t, p, s);
     if (s < 0) {
         printf("OPEN ERR:%ld\n", s);
-        return 0;
+        return s;
     }
     
     unsigned char payload[ECP_SIZE_PLD(1000)];
@@ -32,7 +32,7 @@ ssize_t handle_open_c(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsign
     ecp_pld_set_type(payload, MTYPE_MSG);
     strcpy((char *)buf, msg);
     ssize_t _rv = ecp_send(conn, payload, sizeof(payload));
-    return 0;
+    return s;
 }
 
 ssize_t handle_msg_c(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsigned char *p, ssize_t s) {
diff --git a/code/test/pr_client.c b/code/test/pr_client.c
index 7b80d6f..d643dd3 100644
--- a/code/test/pr_client.c
+++ b/code/test/pr_client.c
@@ -26,7 +26,7 @@ ssize_t handle_open(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsigned
     ecp_conn_handle_open(conn, sq, t, p, s);
     if (s < 0) {
         printf("OPEN ERR:%ld\n", s);
-        return 0;
+        return s;
     }
     
     printf("OPEN!\n");
@@ -38,7 +38,7 @@ ssize_t handle_open(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsigned
     ecp_pld_set_type(payload, MTYPE_MSG);
     strcpy((char *)buf, msg);
     ssize_t _rv = ecp_send(conn, payload, sizeof(payload));
-    return 0;
+    return s;
 }
 
 ssize_t handle_msg(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsigned char *p, ssize_t s) {
diff --git a/code/test/stress.c b/code/test/stress.c
index 42663fc..081c0cb 100644
--- a/code/test/stress.c
+++ b/code/test/stress.c
@@ -107,7 +107,7 @@ ssize_t handle_open_c(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsign
         perror(msg);
         exit(1);
     }
-    return 0;
+    return s;
 }
 
 ssize_t handle_msg_c(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsigned char *p, ssize_t s) {
diff --git a/code/test/voip.c b/code/test/voip.c
index 67e1afe..8c53d01 100644
--- a/code/test/voip.c
+++ b/code/test/voip.c
@@ -52,7 +52,7 @@ int a_open(char *dev_name, snd_pcm_t **handle, snd_pcm_hw_params_t **hw_params,
 	frame_size = *nchannels * (bits / 8);
 	*buf_size = frame_size * *frames;
 
-	return 0;
+	return ECP_OK;
 }
 
 int a_prepare(snd_pcm_t *handle, snd_pcm_hw_params_t *hw_params, unsigned char *buf, snd_pcm_uframes_t frames) {
@@ -68,7 +68,7 @@ int a_prepare(snd_pcm_t *handle, snd_pcm_hw_params_t *hw_params, unsigned char *
 
 		for (i=0; i<fragments; i++) snd_pcm_writei(handle, buf, frames);
 	}
-	return 0;
+	return ECP_OK;
 }
 
 opus_int32 a_read(snd_pcm_t *handle, unsigned char *buf, snd_pcm_uframes_t frames, OpusEncoder *enc, unsigned char *opus_buf, opus_int32 opus_size) {
@@ -121,6 +121,8 @@ int a_init(void) {
 	size = opus_decoder_get_size(nchannels);
 	opus_dec = malloc(size);
 	opus_decoder_init(opus_dec, sample_rate, nchannels);
+    
+    return ECP_OK;
 }
 
 ssize_t handle_open_c(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsigned char *p, ssize_t s) {
@@ -129,7 +131,7 @@ ssize_t handle_open_c(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsign
     ecp_conn_handle_open(conn, t, p, s);
     if (s < 0) {
         printf("OPEN ERR:%ld\n", s);
-        return 0;
+        return s;
     }
     
 	a_init();
-- 
cgit v1.2.3