From 5d20e9bafc3571f37eb0d9b74699d023d2d3d13a Mon Sep 17 00:00:00 2001
From: Uros Majstorovic <majstor@majstor.org>
Date: Fri, 18 Aug 2017 20:35:21 +0200
Subject: timer fixed; rbuf almost implemented

---
 code/core/core.c      |  78 +++++++++++++++++++++-------------
 code/core/core.h      |  10 +++--
 code/core/msgq.c      |   2 +
 code/core/rbuf.c      | 114 ++++++++++++++++++++++++++++++++++++++------------
 code/core/rbuf.h      |  56 +++++++++++++++----------
 code/core/rbuf_recv.c |  48 ++++++++++-----------
 code/core/rbuf_send.c |  63 +++++++++++++++++++---------
 code/core/timer.c     |   8 +---
 code/vconn/vconn.c    |  38 ++++++-----------
 9 files changed, 260 insertions(+), 157 deletions(-)

(limited to 'code')

diff --git a/code/core/core.c b/code/core/core.c
index d64aa04..b319afb 100644
--- a/code/core/core.c
+++ b/code/core/core.c
@@ -517,7 +517,7 @@ static ssize_t _conn_send_kget(ECPConnection *conn, ECPTimerItem *ti) {
     unsigned char payload[ECP_SIZE_PLD(0)];
 
     ecp_pld_set_type(payload, ECP_MTYPE_KGET_REQ);
-    return ecp_pld_send_wkey(conn, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, payload, sizeof(payload));
+    return ecp_pld_send_ll(conn, ti, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, payload, sizeof(payload));
 }
 
 int ecp_conn_init(ECPConnection *conn, ECPNode *node) {
@@ -616,7 +616,7 @@ static ssize_t _conn_send_open(ECPConnection *conn, ECPTimerItem *ti) {
     ecp_pld_set_type(payload, ECP_MTYPE_OPEN_REQ);
     buf[0] = conn->type;
     
-    return ecp_pld_send(conn, payload, sizeof(payload));
+    return ecp_pld_send_wtimer(conn, ti, payload, sizeof(payload));
 }
 
 ssize_t ecp_conn_send_open(ECPConnection *conn) {
@@ -706,7 +706,7 @@ ssize_t ecp_conn_handle_open(ECPConnection *conn, ecp_seq_t seq, unsigned char m
 
 #ifdef ECP_WITH_RBUF
         if (!is_open && conn->rbuf.recv) {
-            rv = ecp_conn_rbuf_recv_start(conn, seq);
+            rv = ecp_rbuf_recv_start(conn, seq);
             if (rv) return rv;
         }
 #endif
@@ -815,7 +815,7 @@ static ssize_t _conn_send_kput(ECPConnection *conn, ECPTimerItem *ti) {
     rv = ecp_conn_dhkey_get_curr(conn, buf, buf+1);
     if (rv) return rv;
 
-    return ecp_pld_send(conn, payload, sizeof(payload));
+    return ecp_pld_send_wtimer(conn, ti, payload, sizeof(payload));
 }
 
 int ecp_conn_dhkey_new(ECPConnection *conn) {
@@ -920,7 +920,7 @@ ssize_t ecp_pack_raw(ECPSocket *sock, ECPConnection *parent, unsigned char *pack
     return ecp_pack(ctx, packet, pkt_size, s_idx, c_idx, public, shsec, nonce, seq, payload, payload_size);
 }
     
-ssize_t ecp_conn_pack(ECPConnection *conn, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size, ECPNetAddr *addr, ecp_seq_t *seq, int *rbuf_idx) {
+ssize_t ecp_conn_pack(ECPConnection *conn, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size, ECPNetAddr *addr, void *_rbuf_info) {
     ecp_aead_key_t shsec;
     ecp_dh_public_t public;
     ecp_seq_t _seq;
@@ -957,26 +957,28 @@ ssize_t ecp_conn_pack(ECPConnection *conn, unsigned char *packet, size_t pkt_siz
         }
     }
     if (!rv) {
-        _seq = conn->seq_out + 1;
 #ifdef ECP_WITH_RBUF
-        if (conn->rbuf.send && rbuf_idx) {
-            ECPRBSend *buf = conn->rbuf.send;
-#ifdef ECP_WITH_PTHREAD
-            pthread_mutex_lock(&buf->mutex);
-#endif
-            
-            *rbuf_idx = ecp_rbuf_msg_idx(&buf->rbuf, _seq);
-            if (*rbuf_idx < 0) rv = *rbuf_idx;
+        ECPRBInfo *rbuf_info = _rbuf_info;
 
-#ifdef ECP_WITH_PTHREAD
-            pthread_mutex_unlock(&buf->mutex);
-#endif
+        if (rbuf_info) {
+            if (rbuf_info->seq_force) {
+                _seq = rbuf_info->seq;
+            } else {
+                unsigned char mtype = ecp_pld_get_type(payload);
+                _seq = conn->seq_out + 1;
+
+                rv = ecp_rbuf_pkt_prep(conn->rbuf.send, _seq, mtype, rbuf_info);
+                if (!rv) conn->seq_out = _seq;
+            }
+        } else {
+            _seq = conn->seq_out + 1;
+            conn->seq_out = _seq;
         }
-#endif
-    }
-    if (!rv) {
+#else
+        _seq = conn->seq_out + 1;
         conn->seq_out = _seq;
-        if (addr) *addr = conn->node.addr;
+#endif
+        if (!rv && addr) *addr = conn->node.addr;
     }
 
 #ifdef ECP_WITH_PTHREAD
@@ -996,7 +998,6 @@ ssize_t ecp_conn_pack(ECPConnection *conn, unsigned char *packet, size_t pkt_siz
     pthread_mutex_unlock(&conn->mutex);
 #endif
 
-    if (seq) *seq = _seq;
     return _rv;
 }
 
@@ -1150,7 +1151,7 @@ ssize_t ecp_pkt_handle(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *parent,
 #endif
 
 #ifdef ECP_WITH_RBUF
-    if (conn->rbuf.recv) proc_size = ecp_conn_rbuf_recv_store(conn, p_seq, payload+pld_size-cnt_size, cnt_size);
+    if (conn->rbuf.recv) proc_size = ecp_rbuf_recv_store(conn, p_seq, payload+pld_size-cnt_size, cnt_size);
 #endif
     if (proc_size == 0) proc_size = ecp_msg_handle(conn, p_seq, payload+pld_size-cnt_size, cnt_size);
 
@@ -1247,25 +1248,42 @@ unsigned char ecp_pld_get_type(unsigned char *payload) {
 }
 
 ssize_t ecp_pld_send(ECPConnection *conn, unsigned char *payload, size_t payload_size) {
-    return ecp_pld_send_wkey(conn, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload, payload_size);
+    return ecp_pld_send_ll(conn, NULL, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload, payload_size);
 }
 
-ssize_t ecp_pld_send_wkey(ECPConnection *conn, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size) {
+ssize_t ecp_pld_send_wtimer(ECPConnection *conn, ECPTimerItem *ti, unsigned char *payload, size_t payload_size) {
+    return ecp_pld_send_ll(conn, ti, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload, payload_size);
+}
+
+ssize_t ecp_pld_send_ll(ECPConnection *conn, ECPTimerItem *ti, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size) {
     unsigned char packet[ECP_MAX_PKT];
     ECPSocket *sock = conn->sock;
     ECPContext *ctx = sock->ctx;
     ECPNetAddr addr;
-    ecp_seq_t seq;
-    ssize_t rv;
-    int rbuf_idx = -1;
+    int _rv = ECP_OK;
+    void *_rbuf_info = NULL;
 
-    rv = ctx->pack(conn, packet, ECP_MAX_PKT, s_idx, c_idx, payload, payload_size, &addr, &seq, &rbuf_idx);
+#ifdef ECP_WITH_RBUF
+    ECPRBInfo rbuf_info;
+
+    if (conn->rbuf.send) {
+        _rv = ecp_rbuf_info_init(&rbuf_info);
+        if (_rv) return _rv;
+        _rbuf_info = &rbuf_info;
+    }
+#endif
+
+    ssize_t rv = ctx->pack(conn, packet, ECP_MAX_PKT, s_idx, c_idx, payload, payload_size, &addr, _rbuf_info);
     if (rv < 0) return rv;
 
 #ifdef ECP_WITH_RBUF
-    if (conn->rbuf.send) return ecp_conn_rbuf_pkt_send(conn, &addr, packet, rv, seq, rbuf_idx);
+    if (conn->rbuf.send) return ecp_rbuf_pkt_send(conn->rbuf.send, conn->sock, &addr, ti, packet, rv, _rbuf_info);
 #endif
     
+    if (ti) {
+        _rv = ecp_timer_push(ti);
+        if (_rv) return _rv;
+    }
     return ecp_pkt_send(sock, &addr, packet, rv);
 }
 
diff --git a/code/core/core.h b/code/core/core.h
index 9738fb4..04fb5ee 100644
--- a/code/core/core.h
+++ b/code/core/core.h
@@ -36,7 +36,7 @@
 #define ECP_MAX_NODE_KEY            2
 #define ECP_MAX_CTYPE               8
 #define ECP_MAX_MTYPE               16
-#define ECP_MAX_MTYPE_SYS           8
+#define ECP_MAX_MTYPE_SYS           4
 
 #define ECP_SIZE_PKT_HDR            (ECP_SIZE_PROTO+1+ECP_ECDH_SIZE_KEY+ECP_AEAD_SIZE_NONCE)
 #define ECP_SIZE_PLD_HDR            (ECP_SIZE_SEQ)
@@ -59,6 +59,7 @@
 #define ECP_MTYPE_OPEN              0x00
 #define ECP_MTYPE_KGET              0x01
 #define ECP_MTYPE_KPUT              0x02
+#define ECP_MTYPE_NOP               0x03
 
 #define ECP_MTYPE_OPEN_REQ          (ECP_MTYPE_OPEN)
 #define ECP_MTYPE_OPEN_REP          (ECP_MTYPE_OPEN | ECP_MTYPE_FLAG_REP)
@@ -221,7 +222,7 @@ typedef struct ECPContext {
 #ifdef ECP_WITH_HTABLE
     ECPHTableIface ht;
 #endif
-    ssize_t (*pack) (struct ECPConnection *conn, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size, ECPNetAddr *addr, ecp_seq_t *seq, int *rbuf_idx);
+    ssize_t (*pack) (struct ECPConnection *conn, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size, ECPNetAddr *addr, void *_rbuf_info);
     ssize_t (*pack_raw) (struct ECPSocket *sock, struct ECPConnection *parent, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size, ECPNetAddr *addr);
     ECPConnHandler *handler[ECP_MAX_CTYPE];
 } ECPContext;
@@ -315,7 +316,7 @@ int ecp_conn_dhkey_get_curr(ECPConnection *conn, unsigned char *idx, unsigned ch
 
 ssize_t ecp_pack(ECPContext *ctx, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size);
 ssize_t ecp_pack_raw(ECPSocket *sock, ECPConnection *parent, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size, ECPNetAddr *addr);
-ssize_t ecp_conn_pack(ECPConnection *conn, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size, ECPNetAddr *addr, ecp_seq_t *seq, int *rbuf_idx);
+ssize_t ecp_conn_pack(ECPConnection *conn, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size, ECPNetAddr *addr, void *_rbuf_info);
 
 ssize_t ecp_pkt_handle(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *parent, unsigned char *packet, size_t pkt_size);
 ssize_t ecp_pkt_send(ECPSocket *sock, ECPNetAddr *addr, unsigned char *packet, size_t pkt_size);
@@ -326,7 +327,8 @@ unsigned char *ecp_pld_get_buf(unsigned char *payload);
 void ecp_pld_set_type(unsigned char *payload, unsigned char mtype);
 unsigned char ecp_pld_get_type(unsigned char *payload);
 ssize_t ecp_pld_send(ECPConnection *conn, unsigned char *payload, size_t payload_size);
-ssize_t ecp_pld_send_wkey(ECPConnection *conn, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size);
+ssize_t ecp_pld_send_wtimer(ECPConnection *conn, ECPTimerItem *ti, unsigned char *payload, size_t payload_size);
+ssize_t ecp_pld_send_ll(ECPConnection *conn, ECPTimerItem *ti, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size);
 ssize_t ecp_pld_send_raw(ECPSocket *sock, ECPConnection *parent, ECPNetAddr *addr, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size);
 
 ssize_t ecp_send(ECPConnection *conn, unsigned char *payload, size_t payload_size);
diff --git a/code/core/msgq.c b/code/core/msgq.c
index 15629c8..4c1252b 100644
--- a/code/core/msgq.c
+++ b/code/core/msgq.c
@@ -79,7 +79,9 @@ int ecp_conn_msgq_push(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype)
     ECPRBRecv *buf = conn->rbuf.recv;
     ECPConnMsgQ *msgq = buf ? &buf->msgq : NULL;
 
+    mtype &= ECP_MTYPE_MASK;
     if (msgq == NULL) return ECP_ERR;
+    if (mtype >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE;
     
     if (msgq->idx_w[mtype] - msgq->idx_r[mtype] == ECP_MSGQ_MAX_MSG) return ECP_MSGQ_ERR_MAX_MSG;
     if (msgq->idx_w[mtype] == msgq->idx_r[mtype]) pthread_cond_signal(&msgq->cond[mtype]);
diff --git a/code/core/rbuf.c b/code/core/rbuf.c
index b384774..ee9a7fc 100644
--- a/code/core/rbuf.c
+++ b/code/core/rbuf.c
@@ -37,33 +37,88 @@ ssize_t ecp_rbuf_msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, int idx, unsigned ch
     if (rbuf->msg == NULL) return 0;
     if (test_flags && (test_flags & rbuf->msg[idx].flags)) return ECP_ERR_RBUF_DUP;
     
-    if (ECP_RBUF_SEQ_LT(rbuf->seq_max, seq)) rbuf->seq_max = seq;
-
-    if (!(set_flags & ECP_RBUF_FLAG_DELIVERED)) {
-        memcpy(rbuf->msg[idx].msg, msg, msg_size);
-        rbuf->msg[idx].size = msg_size;
-    }
+    if (msg_size) memcpy(rbuf->msg[idx].msg, msg, msg_size);
+    rbuf->msg[idx].size = msg_size;
     rbuf->msg[idx].flags = set_flags;
 
     return msg_size;
 }
 
-ssize_t ecp_conn_rbuf_pkt_send(ECPConnection *conn, ECPNetAddr *addr, unsigned char *packet, size_t pkt_size, ecp_seq_t seq, int idx) {
-    int do_send = 1;
-    ECPRBSend *buf = conn->rbuf.send;
+ssize_t ecp_rbuf_pld_send(ECPConnection *conn, unsigned char *payload, size_t payload_size, ecp_seq_t seq) {
+    unsigned char packet[ECP_MAX_PKT];
+    ECPSocket *sock = conn->sock;
+    ECPContext *ctx = sock->ctx;
+    ECPNetAddr addr;
+    ECPRBInfo rbuf_info;
+    ssize_t rv;
     
-    ssize_t rv = ecp_rbuf_msg_store(&buf->rbuf, seq, idx, packet, pkt_size, 0, 0);
+    int _rv = ecp_rbuf_info_init(&rbuf_info);
+    if (_rv) return _rv;
+    
+    rbuf_info.seq = seq;
+    rbuf_info.seq_force = 1;
+
+    rv = ctx->pack(conn, packet, ECP_MAX_PKT, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload, payload_size, &addr, &rbuf_info);
     if (rv < 0) return rv;
 
-    if (buf->win_size) {
+    rv = ecp_pkt_send(sock, &addr, packet, rv);
+    if (rv < 0) return rv;
+
+    return rv;
+}
+
+int ecp_rbuf_info_init(ECPRBInfo *rbuf_info) {
+    memset(rbuf_info, 0, sizeof(ECPRBInfo));
+
+    return ECP_OK;
+}
+
+int ecp_rbuf_pkt_prep(ECPRBSend *buf, ecp_seq_t seq, unsigned char mtype, ECPRBInfo *rbuf_info) {
+    if (buf == NULL) return ECP_ERR;
+    
+#ifdef ECP_WITH_PTHREAD
+    pthread_mutex_lock(&buf->mutex);
+#endif
+    int idx = ecp_rbuf_msg_idx(&buf->rbuf, seq);
+#ifdef ECP_WITH_PTHREAD
+    pthread_mutex_unlock(&buf->mutex);
+#endif
+
+    if (idx < 0) return idx;
+    
+    rbuf_info->seq = seq;
+    rbuf_info->idx = idx;
+    rbuf_info->mtype = mtype;
+    buf->rbuf.msg[idx].size = 0;
+    buf->rbuf.msg[idx].flags = 0;
+
+    return ECP_OK; 
+}
+
+ssize_t ecp_rbuf_pkt_send(ECPRBSend *buf, ECPSocket *sock, ECPNetAddr *addr, ECPTimerItem *ti, unsigned char *packet, size_t pkt_size, ECPRBInfo *rbuf_info) {
+    unsigned char flags = 0;
+    int do_send = 1;
+    ssize_t rv = 0;
+    ecp_seq_t seq = rbuf_info->seq;
+    unsigned int idx = rbuf_info->idx;
+    unsigned char mtype = rbuf_info->mtype & ECP_MTYPE_MASK;
+
+    if (mtype < ECP_MAX_MTYPE_SYS) flags |= ECP_RBUF_FLAG_SYS;
+
+    rv = ecp_rbuf_msg_store(&buf->rbuf, seq, idx, packet, pkt_size, 0, flags);
+    if (rv < 0) return rv;
+
+    if (buf->flags & ECP_RBUF_FLAG_CCONTROL) {
 #ifdef ECP_WITH_PTHREAD
         pthread_mutex_lock(&buf->mutex);
 #endif
 
-        if (buf->cc_wait || (buf->in_transit >= buf->win_size)) {
-            if (!buf->cc_wait) buf->seq_cc = seq;
-            buf->cc_wait++;
-            buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_CCWAIT;
+        if (ECP_RBUF_SEQ_LT(buf->rbuf.seq_max, seq)) buf->rbuf.seq_max = seq;
+
+        if (buf->cnt_cc || (buf->in_transit >= buf->win_size)) {
+            if (!buf->cnt_cc) buf->seq_cc = seq;
+            buf->cnt_cc++;
+            buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_IN_CCONTROL;
             do_send = 0;
         } else {
             buf->in_transit++;
@@ -74,36 +129,43 @@ ssize_t ecp_conn_rbuf_pkt_send(ECPConnection *conn, ECPNetAddr *addr, unsigned c
 #endif
     }
 
-    if (do_send) rv = ecp_pkt_send(conn->sock, addr, packet, pkt_size);
+    if (do_send) {
+        int _rv;
+        if (ti) {
+            _rv = ecp_timer_push(ti);
+            if (_rv) return _rv;
+        }
+        rv = ecp_pkt_send(sock, addr, packet, pkt_size);
+    }
     return rv;
 }
 
-int ecp_conn_rbuf_create(ECPConnection *conn, ECPRBSend *buf_s, ECPRBMessage *msg_s, unsigned int msg_s_size, ECPRBRecv *buf_r, ECPRBMessage *msg_r, unsigned int msg_r_size) {
+int ecp_rbuf_conn_create(ECPConnection *conn, ECPRBSend *buf_s, ECPRBMessage *msg_s, unsigned int msg_s_size, ECPRBRecv *buf_r, ECPRBMessage *msg_r, unsigned int msg_r_size) {
     int rv;
     
-    rv = ecp_conn_rbuf_send_create(conn, buf_s, msg_s, msg_s_size);
+    rv = ecp_rbuf_send_create(conn, buf_s, msg_s, msg_s_size);
     if (rv) return rv;
     
-    rv = ecp_conn_rbuf_recv_create(conn, buf_r, msg_r, msg_r_size);
+    rv = ecp_rbuf_recv_create(conn, buf_r, msg_r, msg_r_size);
     if (rv) {
-        ecp_conn_rbuf_send_destroy(conn);
+        ecp_rbuf_send_destroy(conn);
         return rv;
     }
     
     return ECP_OK;
 }
 
-void ecp_conn_rbuf_destroy(ECPConnection *conn) {
-    ecp_conn_rbuf_send_destroy(conn);
-    ecp_conn_rbuf_recv_destroy(conn);
+void ecp_rbuf_conn_destroy(ECPConnection *conn) {
+    ecp_rbuf_send_destroy(conn);
+    ecp_rbuf_recv_destroy(conn);
 }
 
-int ecp_conn_rbuf_start(ECPConnection *conn, ecp_seq_t seq) {
-    int rv = ecp_conn_rbuf_send_start(conn);
+int ecp_rbuf_conn_start(ECPConnection *conn, ecp_seq_t seq) {
+    int rv = ecp_rbuf_send_start(conn);
     if (rv) return rv;
 
     if (!conn->out) {
-        rv = ecp_conn_rbuf_recv_start(conn, seq);
+        rv = ecp_rbuf_recv_start(conn, seq);
         if (rv) return rv;
     }
     
diff --git a/code/core/rbuf.h b/code/core/rbuf.h
index b09b19c..1332dee 100644
--- a/code/core/rbuf.h
+++ b/code/core/rbuf.h
@@ -1,10 +1,11 @@
 #define ECP_RBUF_FLAG_IN_RBUF       0x01
 #define ECP_RBUF_FLAG_IN_MSGQ       0x02
-#define ECP_RBUF_FLAG_DELIVERED     0x04
-#define ECP_RBUF_FLAG_CCWAIT        0x08
+#define ECP_RBUF_FLAG_IN_CCONTROL   0x04
+#define ECP_RBUF_FLAG_SYS           0x80
 
-#define ECP_RBUF_FLAG_RELIABLE      0x01
-#define ECP_RBUF_FLAG_MSGQ          0x02
+#define ECP_RBUF_FLAG_CCONTROL      0x01
+#define ECP_RBUF_FLAG_RELIABLE      0x02
+#define ECP_RBUF_FLAG_MSGQ          0x04
 
 #define ECP_MTYPE_RBACK             0x04
 #define ECP_MTYPE_RBFLUSH           0x05
@@ -66,9 +67,9 @@ typedef struct ECPRBSend {
     unsigned char flush;
     ecp_win_t win_size;
     ecp_win_t in_transit;
-    ecp_win_t cc_wait;
-    ecp_seq_t seq_flush;
+    ecp_win_t cnt_cc;
     ecp_seq_t seq_cc;
+    ecp_seq_t seq_flush;
     unsigned int nack_rate;
     ECPRBuffer rbuf;
 #ifdef ECP_WITH_PTHREAD
@@ -81,28 +82,37 @@ typedef struct ECPConnRBuffer {
     ECPRBSend *send;
 } ECPConnRBuffer;
 
+typedef struct ECPRBInfo {
+    ecp_seq_t seq;
+    unsigned int idx;
+    unsigned char mtype;
+    unsigned char seq_force;
+} ECPRBInfo;
 
 int ecp_rbuf_init(ECPRBuffer *rbuf, ECPRBMessage *msg, unsigned int msg_size);
 int ecp_rbuf_start(ECPRBuffer *rbuf, ecp_seq_t seq);
 int ecp_rbuf_msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq);
 ssize_t ecp_rbuf_msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, int idx, unsigned char *msg, size_t msg_size, unsigned char test_flags, unsigned char set_flags);
-ssize_t ecp_conn_rbuf_pkt_send(struct ECPConnection *conn, ECPNetAddr *addr, unsigned char *packet, size_t pkt_size, ecp_seq_t seq, int idx);
-
-int ecp_conn_rbuf_create(struct ECPConnection *conn, ECPRBSend *buf_s, ECPRBMessage *msg_s, unsigned int msg_s_size, ECPRBRecv *buf_r, ECPRBMessage *msg_r, unsigned int msg_r_size);
-void ecp_conn_rbuf_destroy(struct ECPConnection *conn);
-int ecp_conn_rbuf_start(struct ECPConnection *conn, ecp_seq_t seq);
-
-int ecp_conn_rbuf_recv_create(struct ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size);
-void ecp_conn_rbuf_recv_destroy(struct ECPConnection *conn);
-int ecp_conn_rbuf_recv_set_hole(struct ECPConnection *conn, unsigned short hole_max);
-int ecp_conn_rbuf_recv_set_delay(struct ECPConnection *conn, unsigned short delay);
-int ecp_conn_rbuf_recv_start(struct ECPConnection *conn, ecp_seq_t seq);
-ssize_t ecp_conn_rbuf_recv_store(struct ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size);
-
-int ecp_conn_rbuf_send_create(struct ECPConnection *conn, ECPRBSend *buf, ECPRBMessage *msg, unsigned int msg_size);
-void ecp_conn_rbuf_send_destroy(struct ECPConnection *conn);
-int ecp_conn_rbuf_send_start(struct ECPConnection *conn);
-ssize_t ecp_conn_rbuf_send_store(struct ECPConnection *conn, ecp_seq_t seq, int idx, unsigned char *msg, size_t msg_size);
+ssize_t ecp_rbuf_pld_send(struct ECPConnection *conn, unsigned char *payload, size_t payload_size, ecp_seq_t seq);
+
+int ecp_rbuf_info_init(ECPRBInfo *rbuf_info);
+int ecp_rbuf_pkt_prep(ECPRBSend *buf, ecp_seq_t seq, unsigned char mtype, ECPRBInfo *rbuf_info);
+ssize_t ecp_rbuf_pkt_send(ECPRBSend *buf, struct ECPSocket *sock, ECPNetAddr *addr, ECPTimerItem *ti, unsigned char *packet, size_t pkt_size, ECPRBInfo *rbuf_info);
+
+int ecp_rbuf_conn_create(struct ECPConnection *conn, ECPRBSend *buf_s, ECPRBMessage *msg_s, unsigned int msg_s_size, ECPRBRecv *buf_r, ECPRBMessage *msg_r, unsigned int msg_r_size);
+void ecp_rbuf_conn_destroy(struct ECPConnection *conn);
+int ecp_rbuf_conn_start(struct ECPConnection *conn, ecp_seq_t seq);
+
+int ecp_rbuf_recv_create(struct ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size);
+void ecp_rbuf_recv_destroy(struct ECPConnection *conn);
+int ecp_rbuf_recv_set_hole(struct ECPConnection *conn, unsigned short hole_max);
+int ecp_rbuf_recv_set_delay(struct ECPConnection *conn, unsigned short delay);
+int ecp_rbuf_recv_start(struct ECPConnection *conn, ecp_seq_t seq);
+ssize_t ecp_rbuf_recv_store(struct ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size);
+
+int ecp_rbuf_send_create(struct ECPConnection *conn, ECPRBSend *buf, ECPRBMessage *msg, unsigned int msg_size);
+void ecp_rbuf_send_destroy(struct ECPConnection *conn);
+int ecp_rbuf_send_start(struct ECPConnection *conn);
 
 ssize_t ecp_rbuf_handle_ack(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size);
 ssize_t ecp_rbuf_handle_flush(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size);
diff --git a/code/core/rbuf_recv.c b/code/core/rbuf_recv.c
index e00b63f..d78ffce 100644
--- a/code/core/rbuf_recv.c
+++ b/code/core/rbuf_recv.c
@@ -7,32 +7,31 @@
 
 static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) {
     ECPRBRecv *buf = conn->rbuf.recv;
-    int rv = ECP_OK;
-    ssize_t _rv = 0;
     unsigned char flags = ECP_RBUF_FLAG_IN_RBUF;
     unsigned char mtype = msg[0] & ECP_MTYPE_MASK;
         
-    if (mtype < ECP_MAX_MTYPE_SYS) {
-        flags |= ECP_RBUF_FLAG_DELIVERED;
-        ecp_msg_handle(conn, seq, msg, msg_size);
-    }
+    if (mtype < ECP_MAX_MTYPE_SYS) flags |= ECP_RBUF_FLAG_SYS;
         
 #ifdef ECP_WITH_MSGQ
     if (buf->flags & ECP_RBUF_FLAG_MSGQ) {
+        int rv = ECP_OK;
+
         pthread_mutex_lock(&buf->msgq.mutex);
         ecp_seq_t seq_offset = seq - buf->msgq.seq_start;
         if (seq_offset >= buf->rbuf.msg_size) rv = ECP_ERR_RBUF_FULL;
+        pthread_mutex_unlock(&buf->msgq.mutex);
+        
+        if (rv) return rv;
     }
 #endif
 
-    if (!rv) _rv = ecp_rbuf_msg_store(&buf->rbuf, seq, -1, msg, msg_size, ECP_RBUF_FLAG_IN_RBUF | ECP_RBUF_FLAG_IN_MSGQ, flags);
+    ssize_t rv = ecp_rbuf_msg_store(&buf->rbuf, seq, -1, msg, msg_size, ECP_RBUF_FLAG_IN_RBUF | ECP_RBUF_FLAG_IN_MSGQ, flags);
+    if (rv < 0) return rv;
 
-#ifdef ECP_WITH_MSGQ
-    if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_unlock(&buf->msgq.mutex);
-#endif
+    if (ECP_RBUF_SEQ_LT(buf->rbuf.seq_max, seq)) buf->rbuf.seq_max = seq;
+    if (flags & ECP_RBUF_FLAG_SYS) ecp_msg_handle(conn, seq, msg, msg_size);
 
-    if (rv) return rv;
-    return _rv;
+    return rv;
 }
 
 static void msg_flush(ECPConnection *conn) {
@@ -52,11 +51,11 @@ static void msg_flush(ECPConnection *conn) {
         if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_IN_RBUF) {
             int rv = 0;
             ecp_seq_t seq = buf->rbuf.seq_start + i;
-            if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_DELIVERED) {
-                buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_DELIVERED;
+            if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_SYS) {
+                buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_SYS;
             } else if (buf->flags & ECP_RBUF_FLAG_MSGQ) {
 #ifdef ECP_WITH_MSGQ
-                unsigned char mtype = buf->rbuf.msg[idx].msg[0] & ECP_MTYPE_MASK;
+                unsigned char mtype = buf->rbuf.msg[idx].msg[0];
                 int rv = ecp_conn_msgq_push(conn, seq, mtype);
                 if (rv) break;
                 buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_IN_MSGQ;
@@ -92,8 +91,8 @@ static int ack_send(ECPConnection *conn) {
     buf_[5] = (buf->ack_map & 0x00FF0000) >> 16;
     buf_[6] = (buf->ack_map & 0x0000FF00) >> 8;
     buf_[7] = (buf->ack_map & 0x000000FF);
-    
-    rv = ecp_pld_send(conn, payload, sizeof(payload));
+
+    rv = ecp_rbuf_pld_send(conn, payload, sizeof(payload), 0);
     if (rv < 0) return rv;
 
     buf->ack_pkt = 0;
@@ -151,7 +150,7 @@ ssize_t ecp_rbuf_handle_flush(ECPConnection *conn, ecp_seq_t seq, unsigned char
     return 0;
 }
 
-int ecp_conn_rbuf_recv_create(ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size) {
+int ecp_rbuf_recv_create(ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size) {
     int rv;
 
     memset(buf, 0, sizeof(ECPRBRecv));
@@ -170,13 +169,13 @@ int ecp_conn_rbuf_recv_create(ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage
     return ECP_OK;
 }
 
-void ecp_conn_rbuf_recv_destroy(ECPConnection *conn) {
+void ecp_rbuf_recv_destroy(ECPConnection *conn) {
 #ifdef ECP_WITH_MSGQ
     ecp_conn_msgq_destroy(conn);
 #endif    
 }
 
-int ecp_conn_rbuf_recv_set_hole(ECPConnection *conn, unsigned short hole_max) {
+int ecp_rbuf_recv_set_hole(ECPConnection *conn, unsigned short hole_max) {
     ECPRBRecv *buf = conn->rbuf.recv;
 
     buf->hole_max = hole_max;
@@ -186,18 +185,18 @@ int ecp_conn_rbuf_recv_set_hole(ECPConnection *conn, unsigned short hole_max) {
     return ECP_OK;
 }
 
-int ecp_conn_rbuf_recv_set_delay(ECPConnection *conn, unsigned short delay) {
+int ecp_rbuf_recv_set_delay(ECPConnection *conn, unsigned short delay) {
     ECPRBRecv *buf = conn->rbuf.recv;
 
     buf->deliver_delay = delay;
     if (buf->hole_max < delay - 1) {
-        ecp_conn_rbuf_recv_set_hole(conn, delay - 1);
+        ecp_rbuf_recv_set_hole(conn, delay - 1);
     }
     
     return ECP_OK;
 }
 
-int ecp_conn_rbuf_recv_start(ECPConnection *conn, ecp_seq_t seq) {
+int ecp_rbuf_recv_start(ECPConnection *conn, ecp_seq_t seq) {
     int rv;
     ECPRBRecv *buf = conn->rbuf.recv;
 
@@ -217,7 +216,7 @@ int ecp_conn_rbuf_recv_start(ECPConnection *conn, ecp_seq_t seq) {
     return ECP_OK;
 }
 
-ssize_t ecp_conn_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) {
+ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) {
     ECPRBRecv *buf = conn->rbuf.recv;
     ecp_seq_t ack_pkt = 0;
     ssize_t rv;
@@ -226,6 +225,7 @@ ssize_t ecp_conn_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned ch
     if (buf == NULL) return ECP_ERR;
     if (msg_size < 1) return ECP_ERR_MIN_MSG;
     
+    // XXX mtype == RBACK | RBFLUSH handle imediately
     if (ECP_RBUF_SEQ_LT(buf->rbuf.seq_max, seq)) ack_pkt = seq - buf->rbuf.seq_max;
     if (ECP_RBUF_SEQ_LTE(seq, buf->seq_ack)) {
         ecp_seq_t seq_offset = buf->seq_ack - seq;
diff --git a/code/core/rbuf_send.c b/code/core/rbuf_send.c
index 06bd2e2..e7020cb 100644
--- a/code/core/rbuf_send.c
+++ b/code/core/rbuf_send.c
@@ -4,11 +4,20 @@
 
 #define NACK_RATE_UNIT   10000
 
-static ssize_t _rbuf_send_flush(ECPConnection *conn, ECPTimerItem *ti) {
+static ssize_t flush_send(ECPConnection *conn, ECPTimerItem *ti) {
     unsigned char payload[ECP_SIZE_PLD(0)];
 
     ecp_pld_set_type(payload, ECP_MTYPE_RBFLUSH);
-    return ecp_pld_send(conn, payload, sizeof(payload));
+    if (ti == NULL) {
+        ECPTimerItem _ti;
+        int rv = ecp_timer_item_init(&_ti, conn, ECP_MTYPE_RBACK, 3, 500);
+        if (rv) return rv;
+
+        _ti.retry = flush_send;
+        rv = ecp_timer_push(&_ti);
+        if (rv) return rv;
+    }
+    return ecp_rbuf_pld_send(conn, payload, sizeof(payload), 0);
 }
 
 static void cc_flush(ECPConnection *conn) {
@@ -24,8 +33,8 @@ static void cc_flush(ECPConnection *conn) {
         unsigned int _idx = idx;
 
         for (i=0; i<pkt_to_send; i++) {
-            if (!(rbuf->msg[_idx].flags & ECP_RBUF_FLAG_CCWAIT)) break;
-            rbuf->msg[_idx].flags &= ~ECP_RBUF_FLAG_CCWAIT;
+            if (!(rbuf->msg[_idx].flags & ECP_RBUF_FLAG_IN_CCONTROL)) break;
+            rbuf->msg[_idx].flags &= ~ECP_RBUF_FLAG_IN_CCONTROL;
             _idx = ECP_RBUF_IDX_MASK(_idx + 1, rbuf->msg_size);
         }
         pkt_to_send = i;
@@ -45,7 +54,7 @@ static void cc_flush(ECPConnection *conn) {
 #endif
 
         buf->in_transit += (ecp_win_t)pkt_to_send;
-        buf->cc_wait -= (ecp_win_t)pkt_to_send; 
+        buf->cnt_cc -= (ecp_win_t)pkt_to_send; 
         buf->seq_cc += (ecp_seq_t)pkt_to_send;
     }
 }
@@ -85,7 +94,7 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt
     
     if (!rv) {
         seq_ack++;
-        buf->in_transit -= seq_ack - rbuf->seq_start;
+        if (buf->flags & ECP_RBUF_FLAG_CCONTROL) buf->in_transit = buf->cnt_cc ? buf->seq_cc - seq_ack : rbuf->seq_max - seq_ack + 1;
 
         if (ack_map != ECP_RBUF_ACK_FULL) {
             int nack_first = 0;
@@ -104,7 +113,11 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt
                     nack_cnt++;
                     if (buf->flags & ECP_RBUF_FLAG_RELIABLE) {
                         unsigned int _idx = ECP_RBUF_IDX_MASK(idx + 1 - ECP_RBUF_ACK_SIZE + i, rbuf->msg_size);
-                        ecp_pkt_send(conn->sock, &conn->node.addr, rbuf->msg[_idx].msg, rbuf->msg[_idx].size);
+                        if ((rbuf->msg[_idx].size == 0) || (rbuf->msg[_idx].flags & ECP_RBUF_FLAG_SYS)) {
+                            
+                        } else {
+                            ecp_pkt_send(conn->sock, &conn->node.addr, rbuf->msg[_idx].msg, rbuf->msg[_idx].size);
+                        }
                         if (!nack_first) {
                             nack_first = 1;
                             seq_start = seq_ack + i;
@@ -118,7 +131,7 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt
             pthread_mutex_lock(&buf->mutex);
 #endif
 
-            buf->in_transit += nack_cnt;
+            if (buf->flags & ECP_RBUF_FLAG_CCONTROL) buf->in_transit += nack_cnt;
             buf->nack_rate = (buf->nack_rate * 7 + ((ECP_RBUF_ACK_SIZE - nack_cnt) * NACK_RATE_UNIT) / ECP_RBUF_ACK_SIZE) / 8;
             if (buf->flags & ECP_RBUF_FLAG_RELIABLE) {
                 rbuf->seq_start = seq_start;
@@ -138,7 +151,7 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt
                 do_flush = 1;
             }
         }
-        if (buf->cc_wait) cc_flush(conn);
+        if (buf->cnt_cc) cc_flush(conn);
     }
 
 #ifdef ECP_WITH_PTHREAD
@@ -147,11 +160,14 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt
 
     if (rv) return rv;
 
-    if (do_flush) ecp_timer_send(conn, _rbuf_send_flush, ECP_MTYPE_RBACK, 3, 500);
+    if (do_flush) {
+        ssize_t _rv = flush_send(conn, NULL);
+        if (_rv < 0) return _rv;
+    }
     return rsize;
 }
 
-int ecp_conn_rbuf_send_create(ECPConnection *conn, ECPRBSend *buf, ECPRBMessage *msg, unsigned int msg_size) {
+int ecp_rbuf_send_create(ECPConnection *conn, ECPRBSend *buf, ECPRBMessage *msg, unsigned int msg_size) {
     int rv;
     
     memset(buf, 0, sizeof(ECPRBRecv));
@@ -167,7 +183,7 @@ int ecp_conn_rbuf_send_create(ECPConnection *conn, ECPRBSend *buf, ECPRBMessage
     return ECP_OK;
 }
 
-void ecp_conn_rbuf_send_destroy(ECPConnection *conn) {
+void ecp_rbuf_send_destroy(ECPConnection *conn) {
     ECPRBSend *buf = conn->rbuf.send;
     
     if (buf == NULL) return;
@@ -177,7 +193,7 @@ void ecp_conn_rbuf_send_destroy(ECPConnection *conn) {
 #endif
 }
 
-int ecp_conn_rbuf_send_set_wsize(ECPConnection *conn, ecp_win_t size) {
+int ecp_rbuf_send_set_wsize(ECPConnection *conn, ecp_win_t size) {
     ECPRBSend *buf = conn->rbuf.send;
 
     if (buf == NULL) return ECP_ERR;
@@ -187,7 +203,7 @@ int ecp_conn_rbuf_send_set_wsize(ECPConnection *conn, ecp_win_t size) {
 #endif
     
     buf->win_size = size;
-    if (buf->cc_wait) cc_flush(conn);
+    if (buf->cnt_cc) cc_flush(conn);
 
 #ifdef ECP_WITH_PTHREAD
     pthread_mutex_unlock(&buf->mutex);
@@ -196,7 +212,7 @@ int ecp_conn_rbuf_send_set_wsize(ECPConnection *conn, ecp_win_t size) {
     return ECP_OK;
 }
 
-int ecp_conn_rbuf_send_start(ECPConnection *conn) {
+int ecp_rbuf_send_start(ECPConnection *conn) {
     ECPRBSend *buf = conn->rbuf.send;
 
     if (buf == NULL) return ECP_ERR;
@@ -204,29 +220,38 @@ int ecp_conn_rbuf_send_start(ECPConnection *conn) {
     return ecp_rbuf_start(&buf->rbuf, conn->seq_out);
 }
 
-int ecp_conn_rbuf_flush(ECPConnection *conn) {
+int ecp_rbuf_flush(ECPConnection *conn) {
     ECPRBSend *buf = conn->rbuf.send;
     unsigned char payload[ECP_SIZE_PLD(0)];
     ecp_seq_t seq;
 
     if (buf == NULL) return ECP_ERR;
 
-    // XXX flush seq
+#ifdef ECP_WITH_PTHREAD
+    pthread_mutex_lock(&conn->mutex);
+#endif
+    seq = conn->seq_out;
+#ifdef ECP_WITH_PTHREAD
+    pthread_mutex_unlock(&conn->mutex);
+#endif
+
 #ifdef ECP_WITH_PTHREAD
     pthread_mutex_lock(&buf->mutex);
 #endif
+
     if (buf->flush) {
         if (ECP_RBUF_SEQ_LT(buf->seq_flush, seq)) buf->seq_flush = seq;
     } else {
         buf->flush = 1;
         buf->seq_flush = seq;
     }
+
 #ifdef ECP_WITH_PTHREAD
     pthread_mutex_unlock(&buf->mutex);
 #endif
 
-    ssize_t _rv = ecp_timer_send(conn, _rbuf_send_flush, ECP_MTYPE_RBACK, 3, 500);
-    if (_rv < 0) return _rv;
+    ssize_t rv = flush_send(conn, 0);
+    if (rv < 0) return rv;
 
     return ECP_OK;
 }
diff --git a/code/core/timer.c b/code/core/timer.c
index 8368742..a2d3fa5 100644
--- a/code/core/timer.c
+++ b/code/core/timer.c
@@ -196,10 +196,9 @@ unsigned int ecp_timer_exe(ECPSocket *sock) {
                 _rv = retry(conn, to_exec+i);
                 if (_rv < 0) rv = _rv;
             } else {
-                _rv = ecp_pld_send(conn, pld, pld_size);
+                _rv = ecp_pld_send_wtimer(conn, to_exec+i, pld, pld_size);
                 if (_rv < 0) rv = _rv;
             }
-            if (!rv) rv = ecp_timer_push(to_exec+i);
             if (rv && (rv != ECP_ERR_CLOSED) && handler) handler(conn, 0, mtype, NULL, rv);
         } else if (handler) {
             handler(conn, 0, mtype, NULL, ECP_ERR_TIMEOUT);
@@ -224,8 +223,5 @@ ssize_t ecp_timer_send(ECPConnection *conn, ecp_timer_retry_t *send_f, unsigned
     if (rv) return rv;
 
     ti.retry = send_f;
-    rv = ecp_timer_push(&ti);
-    if (rv) return rv;
-
-    return send_f(conn, NULL);
+    return send_f(conn, &ti);
 }
diff --git a/code/vconn/vconn.c b/code/vconn/vconn.c
index 33fa80b..25b6583 100644
--- a/code/vconn/vconn.c
+++ b/code/vconn/vconn.c
@@ -69,21 +69,11 @@ static void vconn_destroy(ECPConnection *conn) {
 #endif
 }
 
-static ssize_t _vconn_send_open(ECPConnection *conn) {
-    ECPVConnection *conn_v = (ECPVConnection *)conn;
-    ECPConnection *conn_next = conn_v->next;
+static ssize_t _vconn_send_open(ECPConnection *conn, ECPTimerItem *ti) {
     unsigned char payload[ECP_SIZE_PLD(0)];
 
-    if (conn_next == NULL) return ECP_ERR;
-
     ecp_pld_set_type(payload, ECP_MTYPE_KGET_REQ);
-    return ecp_pld_send_wkey(conn_next, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, payload, sizeof(payload));
-}
-
-static ssize_t _vconn_retry_kget(ECPConnection *conn, ECPTimerItem *ti) {
-    if (conn->parent == NULL) return ECP_ERR;
-    
-    return _vconn_send_open(conn->parent);
+    return ecp_pld_send_ll(conn, ti, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, payload, sizeof(payload));
 }
 
 static ssize_t vconn_open(ECPConnection *conn) {
@@ -92,14 +82,12 @@ static ssize_t vconn_open(ECPConnection *conn) {
     ECPVConnection *conn_v = (ECPVConnection *)conn;
     ECPConnection *conn_next = conn_v->next;
 
-    rv = ecp_timer_item_init(&ti, conn_next, ECP_MTYPE_KGET_REP, 3, 3000);
-    if (rv) return rv;
+    if (conn_next == NULL) return ECP_ERR;
 
-    ti.retry = _vconn_retry_kget;
-    rv = ecp_timer_push(&ti);
-    if (rv) return rv;
+    ssize_t _rv = ecp_timer_send(conn_next, _vconn_send_open, ECP_MTYPE_KGET_REP, 3, 3000);
+    if (_rv < 0) return _rv;
 
-    return _vconn_send_open(conn);
+    return _rv;
 }
 
 static ssize_t vconn_handle_open(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size) {
@@ -278,7 +266,7 @@ static ssize_t _vlink_send_open(ECPConnection *conn, ECPTimerItem *ti) {
     buf[0] = conn->type;
     memcpy(buf+1, ctx->cr.dh_pub_get_buf(&sock->key_perma.public), ECP_ECDH_SIZE_KEY);
 
-    return ecp_pld_send(conn, payload, sizeof(payload));
+    return ecp_pld_send_wtimer(conn, ti, payload, sizeof(payload));
 }
 
 static ssize_t vlink_open(ECPConnection *conn) {
@@ -407,7 +395,7 @@ static ssize_t vconn_set_msg(ECPConnection *conn, unsigned char *pld_out, size_t
 }
 
 
-static ssize_t vconn_pack(ECPConnection *conn, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size, ECPNetAddr *addr, ecp_seq_t *seq, int *rbuf_idx) {
+static ssize_t vconn_pack(ECPConnection *conn, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size, ECPNetAddr *addr, void *_rbuf_info) {
     ECPContext *ctx = conn->sock->ctx;
 
     if (conn->parent) {
@@ -415,12 +403,12 @@ static ssize_t vconn_pack(ECPConnection *conn, unsigned char *packet, size_t pkt
         ssize_t rv, hdr_size = vconn_set_msg(conn->parent, payload_, sizeof(payload_), payload, payload_size);
         if (hdr_size < 0) return hdr_size;
 
-        rv = ecp_conn_pack(conn, payload_+hdr_size, ECP_MAX_PLD-hdr_size, s_idx, c_idx, payload, payload_size, NULL, seq, rbuf_idx);
+        rv = ecp_conn_pack(conn, payload_+hdr_size, ECP_MAX_PLD-hdr_size, s_idx, c_idx, payload, payload_size, NULL, _rbuf_info);
         if (rv < 0) return rv;
 
-        return vconn_pack(conn->parent, packet, pkt_size, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload_, rv+hdr_size, addr, NULL, NULL);
+        return vconn_pack(conn->parent, packet, pkt_size, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload_, rv+hdr_size, addr, NULL);
     } else {
-        return ecp_conn_pack(conn, packet, pkt_size, s_idx, c_idx, payload, payload_size, addr, seq, rbuf_idx);
+        return ecp_conn_pack(conn, packet, pkt_size, s_idx, c_idx, payload, payload_size, addr, _rbuf_info);
     }
 }
 
@@ -435,7 +423,7 @@ static ssize_t vconn_pack_raw(ECPSocket *sock, ECPConnection *parent, unsigned c
         rv = ecp_pack(ctx, payload_+hdr_size, ECP_MAX_PLD-hdr_size, s_idx, c_idx, public, shsec, nonce, seq, payload, payload_size);
         if (rv < 0) return rv;
 
-        return vconn_pack(parent, packet, pkt_size, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload_, rv+hdr_size, addr, NULL, NULL);
+        return vconn_pack(parent, packet, pkt_size, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload_, rv+hdr_size, addr, NULL);
     } else {
         return ecp_pack(ctx, packet, pkt_size, s_idx, c_idx, public, shsec, nonce, seq, payload, payload_size);
     }
@@ -521,7 +509,7 @@ static ssize_t _vconn_send_kget(ECPConnection *conn, ECPTimerItem *ti) {
     unsigned char payload[ECP_SIZE_PLD(0)];
 
     ecp_pld_set_type(payload, ECP_MTYPE_KGET_REQ);
-    return ecp_pld_send_wkey(conn, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, payload, sizeof(payload));
+    return ecp_pld_send_ll(conn, ti, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, payload, sizeof(payload));
 }
 
 int ecp_vconn_open(ECPConnection *conn, ECPNode *conn_node, ECPVConnection vconn[], ECPNode vconn_node[], int size) {
-- 
cgit v1.2.3