summaryrefslogtreecommitdiff
path: root/ecp
diff options
context:
space:
mode:
authorUros Majstorovic <majstor@majstor.org>2022-01-30 02:04:24 +0100
committerUros Majstorovic <majstor@majstor.org>2022-01-30 02:04:24 +0100
commit483b954dff83b86877184718acdd66533cb694ac (patch)
treef4f3b594416468674ba6321e2fb690649d852697 /ecp
parentceb32f60ef7d6210883acf1f17500f87cac8888c (diff)
code cleanup and fixes mainly rbuf
Diffstat (limited to 'ecp')
-rw-r--r--ecp/src/TODO2
-rw-r--r--ecp/src/core.c79
-rw-r--r--ecp/src/core.h40
-rw-r--r--ecp/src/msgq.c68
-rw-r--r--ecp/src/msgq.h1
-rw-r--r--ecp/src/rbuf.c214
-rw-r--r--ecp/src/rbuf.h61
-rw-r--r--ecp/src/rbuf_recv.c293
-rw-r--r--ecp/src/rbuf_send.c255
-rw-r--r--ecp/src/vconn/vconn.c28
-rw-r--r--ecp/test/frag.c2
11 files changed, 555 insertions, 488 deletions
diff --git a/ecp/src/TODO b/ecp/src/TODO
index 982e382..01c2a28 100644
--- a/ecp/src/TODO
+++ b/ecp/src/TODO
@@ -7,5 +7,3 @@
rbuf:
- implement _wait variants of open / send
- msgq should subtract ECP_MAX_MTYPE_SYS from mtype (make ECPConnMsgQ smaller)
-- consider adding one buffer for all msgs (frag. issue)
-
diff --git a/ecp/src/core.c b/ecp/src/core.c
index a8f8f01..90ee925 100644
--- a/ecp/src/core.c
+++ b/ecp/src/core.c
@@ -311,7 +311,6 @@ static ECPDHKey *conn_dhkey_get(ECPConnection *conn, unsigned char idx) {
}
static int conn_dhkey_new_pair(ECPConnection *conn, ECPDHKey *key) {
- /* called when client makes new key pair */
ECPSocket *sock = conn->sock;
conn->key_curr = conn->key_curr == ECP_ECDH_IDX_INV ? 0 : (conn->key_curr+1) % ECP_MAX_CONN_KEY;
@@ -349,8 +348,8 @@ static void conn_dhkey_del_pair(ECPConnection *conn, unsigned char idx) {
conn->key_idx_map[idx] = ECP_ECDH_IDX_INV;
}
+/* remote client obtained our key */
static int conn_dhkey_new_pub_local(ECPConnection *conn, unsigned char idx) {
- // Remote obtained our key
unsigned char new = conn->key_idx_curr == ECP_ECDH_IDX_INV ? 0 : (conn->key_idx_curr+1) % ECP_MAX_NODE_KEY;
int i;
@@ -367,8 +366,8 @@ static int conn_dhkey_new_pub_local(ECPConnection *conn, unsigned char idx) {
return ECP_OK;
}
+/* this client obtained remote key */
static int conn_dhkey_new_pub_remote(ECPConnection *conn, unsigned char idx, unsigned char *public) {
- // We obtained remote key
ECPSocket *sock = conn->sock;
ECPDHRKeyBucket *remote = &conn->remote;
unsigned char new = remote->key_curr == ECP_ECDH_IDX_INV ? 0 : (remote->key_curr+1) % ECP_MAX_NODE_KEY;
@@ -649,7 +648,7 @@ int ecp_conn_open(ECPConnection *conn, ECPNode *node) {
return ECP_OK;
}
-void _ecp_conn_close(ECPConnection *conn) {
+static void conn_close(ECPConnection *conn) {
ECPContext *ctx = conn->sock->ctx;
if (ecp_conn_is_inb(conn) && conn->parent) {
@@ -672,7 +671,7 @@ int ecp_conn_close(ECPConnection *conn) {
if (refcount) return ECP_ERR_BUSY;
- _ecp_conn_close(conn);
+ conn_close(conn);
return ECP_OK;
}
@@ -737,7 +736,7 @@ void ecp_conn_refcount_dec(ECPConnection *conn) {
pthread_mutex_unlock(&conn->mutex);
#endif
- if (!is_reg && (refcount == 0)) _ecp_conn_close(conn);
+ if (!is_reg && (refcount == 0)) conn_close(conn);
}
int ecp_conn_handler_init(ECPConnHandler *handler) {
@@ -751,7 +750,7 @@ int ecp_conn_handler_init(ECPConnHandler *handler) {
#ifdef ECP_WITH_RBUF
handler->msg[ECP_MTYPE_RBACK] = ecp_rbuf_handle_ack;
handler->msg[ECP_MTYPE_RBFLUSH] = ecp_rbuf_handle_flush;
- handler->msg[ECP_MTYPE_RBFLUSH_PTS] = ecp_rbuf_handle_flush_pts;
+ handler->msg[ECP_MTYPE_RBTIMER] = ecp_rbuf_handle_timer;
#endif
return ECP_OK;
}
@@ -867,7 +866,7 @@ static ssize_t _conn_send_kget(ECPConnection *conn, ECPTimerItem *ti) {
payload.size = ECP_SIZE_PLD_BUF(0, ECP_MTYPE_KGET_REQ, conn);
ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_KGET_REQ);
- return _ecp_pld_send(conn, &packet, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, NULL, &payload, ECP_SIZE_PLD(0, ECP_MTYPE_KGET_REQ), 0, ti);
+ return _ecp_pld_send(conn, &packet, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, &payload, ECP_SIZE_PLD(0, ECP_MTYPE_KGET_REQ), 0, ti);
}
static ssize_t _conn_send_kput(ECPConnection *conn, ECPTimerItem *ti) {
@@ -903,7 +902,7 @@ static ssize_t _conn_send_dir(ECPConnection *conn, ECPTimerItem *ti) {
payload.size = ECP_SIZE_PLD_BUF(0, ECP_MTYPE_DIR_REQ, conn);
ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_DIR_REQ);
- return _ecp_pld_send(conn, &packet, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, NULL, &payload, ECP_SIZE_PLD(0, ECP_MTYPE_DIR_REQ), 0, ti);
+ return _ecp_pld_send(conn, &packet, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, &payload, ECP_SIZE_PLD(0, ECP_MTYPE_DIR_REQ), 0, ti);
}
ssize_t ecp_conn_send_open(ECPConnection *conn) {
@@ -1114,7 +1113,7 @@ ssize_t ecp_conn_handle_msg(ECPConnection *conn, ecp_seq_t seq, unsigned char *m
while (rem_size) {
_rv = ecp_msg_get_type(msg, rem_size, &mtype);
- if (_rv) return ECP_ERR_MIN_MSG;
+ if (_rv) return ECP_ERR;
if ((mtype & ECP_MTYPE_MASK) >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE;
ecp_timer_pop(conn, mtype);
@@ -1133,7 +1132,7 @@ ssize_t ecp_conn_handle_msg(ECPConnection *conn, ecp_seq_t seq, unsigned char *m
}
content = ecp_msg_get_content(msg, rem_size);
- if (content == NULL) return ECP_ERR_MIN_MSG;
+ if (content == NULL) return ECP_ERR;
rem_size -= content - msg;
handler = ecp_conn_get_msg_handler(conn, mtype & ECP_MTYPE_MASK);
@@ -1209,7 +1208,6 @@ ssize_t ecp_sock_handle_kget(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *p
return msg_size;
}
-
ssize_t _ecp_pack(ECPContext *ctx, unsigned char *packet, size_t pkt_size, ECPPktMeta *pkt_meta, unsigned char *payload, size_t pld_size) {
ssize_t rv;
unsigned char s_idx, c_idx;
@@ -1246,7 +1244,7 @@ ssize_t ecp_pack(ECPContext *ctx, ECPConnection *parent, ECPBuffer *packet, ECPP
}
#endif
-ssize_t _ecp_pack_conn(ECPConnection *conn, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, ECPSeqItem *si, unsigned char *payload, size_t pld_size, ECPNetAddr *addr) {
+ssize_t _ecp_pack_conn(ECPConnection *conn, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t pld_size, ECPNetAddr *addr, ECPSeqItem *si) {
ECPPktMeta pkt_meta;
int rv;
ssize_t _rv;
@@ -1254,6 +1252,7 @@ ssize_t _ecp_pack_conn(ECPConnection *conn, unsigned char *packet, size_t pkt_si
#ifdef ECP_WITH_PTHREAD
pthread_mutex_lock(&conn->mutex);
#endif
+
if (s_idx == ECP_ECDH_IDX_INV) {
if (ecp_conn_is_outb(conn)) {
if (conn->remote.key_curr != ECP_ECDH_IDX_INV) s_idx = conn->remote.key[conn->remote.key_curr].idx;
@@ -1326,11 +1325,11 @@ ssize_t _ecp_pack_conn(ECPConnection *conn, unsigned char *packet, size_t pkt_si
}
#ifndef ECP_WITH_VCONN
-ssize_t ecp_pack_conn(ECPConnection *conn, ECPBuffer *packet, unsigned char s_idx, unsigned char c_idx, ECPSeqItem *si, ECPBuffer *payload, size_t pld_size, ECPNetAddr *addr) {
+ssize_t ecp_pack_conn(ECPConnection *conn, ECPBuffer *packet, unsigned char s_idx, unsigned char c_idx, ECPBuffer *payload, size_t pld_size, ECPNetAddr *addr, ECPSeqItem *si) {
if ((packet == NULL) || (packet->buffer == NULL)) return ECP_ERR;
if ((payload == NULL) || (payload->buffer == NULL)) return ECP_ERR;
- return _ecp_pack_conn(conn, packet->buffer, packet->size, s_idx, c_idx, si, payload->buffer, pld_size, addr);
+ return _ecp_pack_conn(conn, packet->buffer, packet->size, s_idx, c_idx, payload->buffer, pld_size, addr, si);
}
#endif
@@ -1683,13 +1682,13 @@ int ecp_msg_defrag(ECPFragIter *iter, ecp_seq_t seq, unsigned char mtype, unsign
int rv;
rv = ecp_msg_get_frag(msg_in, msg_in_size, &frag_cnt, &frag_tot, &frag_size);
- if (rv) return ECP_ERR_MIN_MSG;
+ if (rv) return ECP_ERR;
content = ecp_msg_get_content(msg_in, msg_in_size);
- if (content == NULL) return ECP_ERR_MIN_MSG;
+ if (content == NULL) return ECP_ERR;
msg_size = msg_in_size - (content - msg_in);
- if (msg_size == 0) return ECP_ERR_MIN_MSG;
+ if (msg_size == 0) return ECP_ERR;
if (iter->msg_size && (iter->seq + frag_cnt != seq)) ecp_frag_iter_reset(iter);
@@ -1725,6 +1724,8 @@ int ecp_msg_defrag(ECPFragIter *iter, ecp_seq_t seq, unsigned char mtype, unsign
}
int ecp_pld_get_type(unsigned char *payload, size_t pld_size, unsigned char *mtype) {
+ if (pld_size < ECP_SIZE_PLD_HDR) return ECP_ERR;
+
payload += ECP_SIZE_PLD_HDR;
pld_size -= ECP_SIZE_PLD_HDR;
@@ -1732,6 +1733,8 @@ int ecp_pld_get_type(unsigned char *payload, size_t pld_size, unsigned char *mty
}
int ecp_pld_set_type(unsigned char *payload, size_t pld_size, unsigned char mtype) {
+ if (pld_size < ECP_SIZE_PLD_HDR) return ECP_ERR;
+
payload += ECP_SIZE_PLD_HDR;
pld_size -= ECP_SIZE_PLD_HDR;
@@ -1739,6 +1742,8 @@ int ecp_pld_set_type(unsigned char *payload, size_t pld_size, unsigned char mtyp
}
int ecp_pld_get_frag(unsigned char *payload, size_t pld_size, unsigned char *frag_cnt, unsigned char *frag_tot, uint16_t *frag_size) {
+ if (pld_size < ECP_SIZE_PLD_HDR) return ECP_ERR;
+
payload += ECP_SIZE_PLD_HDR;
pld_size -= ECP_SIZE_PLD_HDR;
@@ -1746,6 +1751,8 @@ int ecp_pld_get_frag(unsigned char *payload, size_t pld_size, unsigned char *fra
}
int ecp_pld_set_frag(unsigned char *payload, size_t pld_size, unsigned char frag_cnt, unsigned char frag_tot, uint16_t frag_size) {
+ if (pld_size < ECP_SIZE_PLD_HDR) return ECP_ERR;
+
payload += ECP_SIZE_PLD_HDR;
pld_size -= ECP_SIZE_PLD_HDR;
@@ -1753,6 +1760,8 @@ int ecp_pld_set_frag(unsigned char *payload, size_t pld_size, unsigned char frag
}
int ecp_pld_get_pts(unsigned char *payload, size_t pld_size, ecp_pts_t *pts) {
+ if (pld_size < ECP_SIZE_PLD_HDR) return ECP_ERR;
+
payload += ECP_SIZE_PLD_HDR;
pld_size -= ECP_SIZE_PLD_HDR;
@@ -1760,6 +1769,8 @@ int ecp_pld_get_pts(unsigned char *payload, size_t pld_size, ecp_pts_t *pts) {
}
int ecp_pld_set_pts(unsigned char *payload, size_t pld_size, ecp_pts_t pts) {
+ if (pld_size < ECP_SIZE_PLD_HDR) return ECP_ERR;
+
payload += ECP_SIZE_PLD_HDR;
pld_size -= ECP_SIZE_PLD_HDR;
@@ -1767,18 +1778,20 @@ int ecp_pld_set_pts(unsigned char *payload, size_t pld_size, ecp_pts_t pts) {
}
unsigned char *ecp_pld_get_buf(unsigned char *payload, size_t pld_size) {
+ if (pld_size < ECP_SIZE_PLD_HDR) return NULL;
+
payload += ECP_SIZE_PLD_HDR;
pld_size -= ECP_SIZE_PLD_HDR;
return ecp_msg_get_content(payload, pld_size);
}
-static ssize_t pld_send(ECPConnection *conn, ECPBuffer *packet, unsigned char s_idx, unsigned char c_idx, ECPSeqItem *si, ECPBuffer *payload, size_t pld_size, unsigned char flags, ECPTimerItem *ti) {
+ssize_t __ecp_pld_send(ECPConnection *conn, ECPBuffer *packet, unsigned char s_idx, unsigned char c_idx, ECPBuffer *payload, size_t pld_size, unsigned char flags, ECPTimerItem *ti, ECPSeqItem *si) {
ECPSocket *sock = conn->sock;
ECPNetAddr addr;
ssize_t rv;
- rv = ecp_pack_conn(conn, packet, s_idx, c_idx, si, payload, pld_size, &addr);
+ rv = ecp_pack_conn(conn, packet, s_idx, c_idx, payload, pld_size, &addr, si);
if (rv < 0) return rv;
#ifdef ECP_WITH_RBUF
@@ -1797,35 +1810,43 @@ static ssize_t pld_send(ECPConnection *conn, ECPBuffer *packet, unsigned char s_
return ecp_pkt_send(sock, &addr, packet, rv, flags);
}
-ssize_t _ecp_pld_send(ECPConnection *conn, ECPBuffer *packet, unsigned char s_idx, unsigned char c_idx, ECPSeqItem *si, ECPBuffer *payload, size_t pld_size, unsigned char flags, ECPTimerItem *ti) {
#ifdef ECP_WITH_RBUF
- if ((si == NULL) && conn->rbuf.send) {
+
+ssize_t _ecp_pld_send(ECPConnection *conn, ECPBuffer *packet, unsigned char s_idx, unsigned char c_idx, ECPBuffer *payload, size_t pld_size, unsigned char flags, ECPTimerItem *ti) {
+ if (conn->rbuf.send) {
ECPSeqItem seq_item;
int rv;
rv = ecp_seq_item_init(&seq_item);
if (rv) return rv;
- return pld_send(conn, packet, s_idx, c_idx, &seq_item, payload, pld_size, flags, ti);
+ return __ecp_pld_send(conn, packet, s_idx, c_idx, payload, pld_size, flags, ti, &seq_item);
}
-#endif
- return pld_send(conn, packet, s_idx, c_idx, NULL, payload, pld_size, flags, ti);
+ return __ecp_pld_send(conn, packet, s_idx, c_idx, payload, pld_size, flags, ti, NULL);
}
+#else
+
+ssize_t _ecp_pld_send(ECPConnection *conn, ECPBuffer *packet, unsigned char s_idx, unsigned char c_idx, ECPBuffer *payload, size_t pld_size, unsigned char flags, ECPTimerItem *ti) {
+ return __ecp_pld_send(conn, packet, s_idx, c_idx, payload, pld_size, flags, ti, NULL);
+}
+
+#endif
+
ssize_t ecp_pld_send(ECPConnection *conn, ECPBuffer *packet, ECPBuffer *payload, size_t pld_size, unsigned char flags) {
- return _ecp_pld_send(conn, packet, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, NULL, payload, pld_size, flags, NULL);
+ return _ecp_pld_send(conn, packet, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload, pld_size, flags, NULL);
}
ssize_t ecp_pld_send_wtimer(ECPConnection *conn, ECPBuffer *packet, ECPBuffer *payload, size_t pld_size, unsigned char flags, ECPTimerItem *ti) {
- return _ecp_pld_send(conn, packet, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, NULL, payload, pld_size, flags, ti);
+ return _ecp_pld_send(conn, packet, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload, pld_size, flags, ti);
}
ssize_t ecp_pld_send_tr(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *parent, ECPBuffer *packet, ECPPktMeta *pkt_meta, ECPBuffer *payload, size_t pld_size, unsigned char flags) {
ECPNetAddr _addr;
ssize_t rv;
- rv = ecp_pack(sock->ctx, parent, packet, pkt_meta, payload, pld_size, &_addr);
+ rv = ecp_pack(sock->ctx, parent, packet, pkt_meta, payload, pld_size, addr ? NULL : &_addr);
if (rv < 0) return rv;
return ecp_pkt_send(sock, addr ? addr : &_addr, packet, rv, flags);
@@ -1887,7 +1908,7 @@ ssize_t ecp_send(ECPConnection *conn, unsigned char mtype, unsigned char *conten
content += frag_size;
seq_item.seq = seq_start + i;
- _rv = _ecp_pld_send(conn, &packet, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, &seq_item, &payload, ECP_SIZE_PLD(frag_size, mtype), 0, NULL);
+ _rv = __ecp_pld_send(conn, &packet, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, &payload, ECP_SIZE_PLD(frag_size, mtype), 0, NULL, &seq_item);
if (_rv < 0) return _rv;
rv += _rv;
diff --git a/ecp/src/core.h b/ecp/src/core.h
index bf04947..d842e8b 100644
--- a/ecp/src/core.h
+++ b/ecp/src/core.h
@@ -18,19 +18,14 @@
#define ECP_ERR_SIZE -4
#define ECP_ERR_ITER -5
#define ECP_ERR_BUSY -6
+#define ECP_ERR_EMPTY -7
+#define ECP_ERR_FULL -8
#define ECP_ERR_MAX_SOCK_CONN -10
#define ECP_ERR_MAX_CTYPE -11
#define ECP_ERR_MAX_MTYPE -12
-#define ECP_ERR_MIN_PKT -13
-#define ECP_ERR_MAX_PLD -14
-// XXX ???
-#define ECP_ERR_MIN_MSG -15
-#define ECP_ERR_MAX_MSG -16
-//
-#define ECP_ERR_NET_ADDR -17
-
-#define ECP_ERR_CONN_NOT_FOUND -20
+#define ECP_ERR_NET_ADDR -13
+
#define ECP_ERR_ECDH_KEY_DUP -21
#define ECP_ERR_ECDH_IDX -22
#define ECP_ERR_ECDH_IDX_LOCAL -23
@@ -42,7 +37,6 @@
#define ECP_ERR_RECV -29
#define ECP_ERR_SEQ -30
#define ECP_ERR_CLOSED -31
-#define ECP_ERR_HANDLE -32
#define ECP_ERR_NOT_IMPLEMENTED -99
#define ECP_SIZE_PROTO 2
@@ -98,19 +92,19 @@
#define ECP_SEND_FLAG_REPLY 0x01
#define ECP_SEND_FLAG_MORE 0x02
-#define ecp_conn_is_inb(conn) (!((conn)->flags_ro & ECP_CONN_FLAG_OUTB))
-#define ecp_conn_is_outb(conn) ((conn)->flags_ro & ECP_CONN_FLAG_OUTB)
-#define ecp_conn_is_new(conn) ((conn)->flags_ro & ECP_CONN_FLAG_NEW)
+#define ecp_conn_is_inb(conn) (!((conn)->flags_im & ECP_CONN_FLAG_OUTB))
+#define ecp_conn_is_outb(conn) ((conn)->flags_im & ECP_CONN_FLAG_OUTB)
+#define ecp_conn_is_new(conn) ((conn)->flags_im & ECP_CONN_FLAG_NEW)
#define ecp_conn_is_reg(conn) ((conn)->flags & ECP_CONN_FLAG_REG)
#define ecp_conn_is_open(conn) ((conn)->flags & ECP_CONN_FLAG_OPEN)
-#define ecp_conn_set_outb(conn) ((conn)->flags_ro |= ECP_CONN_FLAG_OUTB)
-#define ecp_conn_set_new(conn) ((conn)->flags_ro |= ECP_CONN_FLAG_NEW)
+#define ecp_conn_set_outb(conn) ((conn)->flags_im |= ECP_CONN_FLAG_OUTB)
+#define ecp_conn_set_new(conn) ((conn)->flags_im |= ECP_CONN_FLAG_NEW)
#define ecp_conn_set_reg(conn) ((conn)->flags |= ECP_CONN_FLAG_REG)
#define ecp_conn_set_open(conn) ((conn)->flags |= ECP_CONN_FLAG_OPEN)
-#define ecp_conn_clr_outb(conn) ((conn)->flags_ro &= ~ECP_CONN_FLAG_OUTB)
-#define ecp_conn_clr_new(conn) ((conn)->flags_ro &= ~ECP_CONN_FLAG_NEW)
+#define ecp_conn_clr_outb(conn) ((conn)->flags_im &= ~ECP_CONN_FLAG_OUTB)
+#define ecp_conn_clr_new(conn) ((conn)->flags_im &= ~ECP_CONN_FLAG_NEW)
#define ecp_conn_clr_reg(conn) ((conn)->flags &= ~ECP_CONN_FLAG_REG)
#define ecp_conn_clr_open(conn) ((conn)->flags &= ~ECP_CONN_FLAG_OPEN)
@@ -224,7 +218,7 @@ typedef struct ECPSeqItem {
#ifdef ECP_WITH_RBUF
unsigned char rb_pass;
unsigned char rb_mtype;
- unsigned int rb_idx;
+ unsigned short rb_idx;
#endif
} ECPSeqItem;
@@ -293,7 +287,7 @@ typedef struct ECPSocket {
typedef struct ECPConnection {
unsigned char type;
unsigned char flags;
- unsigned char flags_ro;
+ unsigned char flags_im;
unsigned short refcount;
ecp_seq_t seq_out;
ecp_seq_t seq_in;
@@ -347,7 +341,6 @@ void ecp_conn_unregister(ECPConnection *conn, unsigned short *refcount);
int ecp_conn_get_dirlist(ECPConnection *conn, ECPNode *node);
int ecp_conn_open(ECPConnection *conn, ECPNode *node);
-void _ecp_conn_close(ECPConnection *conn);
int ecp_conn_close(ECPConnection *conn);
int ecp_conn_reset(ECPConnection *conn);
void ecp_conn_refcount_inc(ECPConnection *conn);
@@ -377,8 +370,8 @@ ssize_t ecp_sock_handle_kget(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *p
ssize_t _ecp_pack(ECPContext *ctx, unsigned char *packet, size_t pkt_size, ECPPktMeta *pkt_meta, unsigned char *payload, size_t pld_size);
ssize_t ecp_pack(ECPContext *ctx, ECPConnection *parent, ECPBuffer *packet, ECPPktMeta *pkt_meta, ECPBuffer *payload, size_t pld_size, ECPNetAddr *addr) ;
-ssize_t _ecp_pack_conn(ECPConnection *conn, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, ECPSeqItem *si, unsigned char *payload, size_t pld_size, ECPNetAddr *addr);
-ssize_t ecp_pack_conn(ECPConnection *conn, ECPBuffer *packet, unsigned char s_idx, unsigned char c_idx, ECPSeqItem *si, ECPBuffer *payload, size_t pld_size, ECPNetAddr *addr);
+ssize_t _ecp_pack_conn(ECPConnection *conn, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t pld_size, ECPNetAddr *addr, ECPSeqItem *si);
+ssize_t ecp_pack_conn(ECPConnection *conn, ECPBuffer *packet, unsigned char s_idx, unsigned char c_idx, ECPBuffer *payload, size_t pld_size, ECPNetAddr *addr, ECPSeqItem *si);
ssize_t ecp_unpack(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *parent, ECP2Buffer *bufs, size_t pkt_size, ECPConnection **_conn, ecp_seq_t *_seq);
ssize_t ecp_pkt_handle(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *parent, ECP2Buffer *bufs, size_t pkt_size);
@@ -401,7 +394,8 @@ int ecp_pld_get_pts(unsigned char *payload, size_t pld_size, ecp_pts_t *pts);
int ecp_pld_set_pts(unsigned char *payload, size_t pld_size, ecp_pts_t pts);
unsigned char *ecp_pld_get_buf(unsigned char *payload, size_t pld_size);
-ssize_t _ecp_pld_send(ECPConnection *conn, ECPBuffer *packet, unsigned char s_idx, unsigned char c_idx, ECPSeqItem *si, ECPBuffer *payload, size_t pld_size, unsigned char flags, ECPTimerItem *ti);
+ssize_t __ecp_pld_send(ECPConnection *conn, ECPBuffer *packet, unsigned char s_idx, unsigned char c_idx, ECPBuffer *payload, size_t pld_size, unsigned char flags, ECPTimerItem *ti, ECPSeqItem *si);
+ssize_t _ecp_pld_send(ECPConnection *conn, ECPBuffer *packet, unsigned char s_idx, unsigned char c_idx, ECPBuffer *payload, size_t pld_size, unsigned char flags, ECPTimerItem *ti);
ssize_t ecp_pld_send(ECPConnection *conn, ECPBuffer *packet, ECPBuffer *payload, size_t pld_size, unsigned char flags);
ssize_t ecp_pld_send_wtimer(ECPConnection *conn, ECPBuffer *packet, ECPBuffer *payload, size_t pld_size, unsigned char flags, ECPTimerItem *ti);
ssize_t ecp_pld_send_tr(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *parent, ECPBuffer *packet, ECPPktMeta *pkt_meta, ECPBuffer *payload, size_t pld_size, unsigned char flags);
diff --git a/ecp/src/msgq.c b/ecp/src/msgq.c
index 61c7b02..8f81d8a 100644
--- a/ecp/src/msgq.c
+++ b/ecp/src/msgq.c
@@ -4,9 +4,6 @@
#include <sys/time.h>
-#define MIN(a,b) (((a)<(b))?(a):(b))
-#define MAX(a,b) (((a)>(b))?(a):(b))
-
#define MSG_IDX_MASK(idx) ((idx) & ((ECP_MSGQ_MAX_MSG) - 1))
static struct timespec *abstime_ts(struct timespec *ts, ecp_cts_t msec) {
@@ -25,8 +22,6 @@ int ecp_conn_msgq_create(ECPConnMsgQ *msgq) {
int i;
int rv;
- if (msgq == NULL) return ECP_ERR;
-
memset(msgq, 0, sizeof(ECPConnMsgQ));
rv = pthread_mutex_init(&msgq->mutex, NULL);
@@ -51,8 +46,6 @@ int ecp_conn_msgq_create(ECPConnMsgQ *msgq) {
void ecp_conn_msgq_destroy(ECPConnMsgQ *msgq) {
int i;
- if (msgq == NULL) return;
-
for (i=0; i<ECP_MAX_MTYPE; i++) {
pthread_cond_destroy(&msgq->cond[i]);
}
@@ -60,8 +53,6 @@ void ecp_conn_msgq_destroy(ECPConnMsgQ *msgq) {
}
int ecp_conn_msgq_start(ECPConnMsgQ *msgq, ecp_seq_t seq) {
- if (msgq == NULL) return ECP_ERR;
-
msgq->seq_max = seq;
msgq->seq_start = seq + 1;
@@ -70,13 +61,11 @@ int ecp_conn_msgq_start(ECPConnMsgQ *msgq, ecp_seq_t seq) {
int ecp_conn_msgq_push(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype) {
ECPRBRecv *buf = conn->rbuf.recv;
- ECPConnMsgQ *msgq = buf ? &buf->msgq : NULL;
+ ECPConnMsgQ *msgq = &buf->msgq;
- mtype &= ECP_MTYPE_MASK;
- if (msgq == NULL) return ECP_ERR;
if (mtype >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE;
- if ((unsigned short)(msgq->idx_w[mtype] - msgq->idx_r[mtype]) == ECP_MSGQ_MAX_MSG) return ECP_MSGQ_ERR_MAX_MSG;
+ if ((unsigned short)(msgq->idx_w[mtype] - msgq->idx_r[mtype]) == ECP_MSGQ_MAX_MSG) return ECP_ERR_FULL;
if (msgq->idx_w[mtype] == msgq->idx_r[mtype]) pthread_cond_signal(&msgq->cond[mtype]);
msgq->seq_msg[mtype][MSG_IDX_MASK(msgq->idx_w[mtype])] = seq;
@@ -89,13 +78,15 @@ int ecp_conn_msgq_push(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype)
ssize_t ecp_conn_msgq_pop(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ecp_cts_t timeout) {
ECPRBRecv *buf = conn->rbuf.recv;
- ECPConnMsgQ *msgq = buf ? &buf->msgq : NULL;
+ ECPConnMsgQ *msgq = &buf->msgq;
+ ECPRBuffer *rbuf = &buf->rbuf;
ecp_seq_t seq;
- ecp_seq_t seq_offset;
- unsigned int idx;
+ unsigned char *msg_buf;
+ unsigned char *content;
+ unsigned short idx;
+ int _rv;
ssize_t rv;
- if (msgq == NULL) return ECP_ERR;
if (mtype >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE;
if (msgq->idx_r[mtype] == msgq->idx_w[mtype]) {
@@ -103,35 +94,48 @@ ssize_t ecp_conn_msgq_pop(ECPConnection *conn, unsigned char mtype, unsigned cha
pthread_cond_wait(&msgq->cond[mtype], &msgq->mutex);
} else {
struct timespec ts;
- int _rv;
_rv = pthread_cond_timedwait(&msgq->cond[mtype], &msgq->mutex, abstime_ts(&ts, timeout));
if (_rv) return ECP_ERR_TIMEOUT;
}
}
-
seq = msgq->seq_msg[mtype][MSG_IDX_MASK(msgq->idx_r[mtype])];
- seq_offset = seq - buf->rbuf.seq_start;
- idx = ECP_RBUF_IDX_MASK(buf->rbuf.msg_start + seq_offset, buf->rbuf.msg_size);
- msgq->idx_r[mtype]++;
- buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_IN_MSGQ;
+ _rv = _ecp_rbuf_msg_idx(rbuf, seq, &idx);
+ if (_rv) return ECP_ERR;
+ msg_buf = rbuf->arr.msg[idx].buf;
+ rv = rbuf->arr.msg[idx].size;
+
+ content = ecp_msg_get_content(msg_buf, rv);
+ if (content == NULL) {
+ rv = ECP_ERR;
+ goto msgq_pop_fin;
+ }
+
+ rv -= content - msg_buf;
+ if (msg_size < rv) {
+ rv = ECP_ERR_FULL;
+ goto msgq_pop_fin;
+ }
+
+ memcpy(msg, content, rv);
+
+ rbuf->arr.msg[idx].flags &= ~ECP_RBUF_FLAG_IN_MSGQ;
+ // if (rbuf->arr.msg[idx].flags == 0);
+
+msgq_pop_fin:
+ msgq->idx_r[mtype]++;
if (msgq->seq_start == seq) {
- int i, _idx = idx;
- ecp_seq_t msg_cnt = msgq->seq_max - msgq->seq_start + 1;
+ int i;
+ unsigned short msg_cnt = msgq->seq_max - msgq->seq_start + 1;
for (i=0; i<msg_cnt; i++) {
- if (buf->rbuf.msg[_idx].flags & ECP_RBUF_FLAG_IN_MSGQ) break;
- _idx = ECP_RBUF_IDX_MASK(_idx + 1, buf->rbuf.msg_size);
+ if (rbuf->arr.msg[idx].flags & ECP_RBUF_FLAG_IN_MSGQ) break;
+ idx = ECP_RBUF_IDX_MASK(idx + 1, rbuf->arr_size);
}
msgq->seq_start += i;
}
- rv = buf->rbuf.msg[idx].size - 1;
- if (rv >= 0) {
- rv = MIN(msg_size, rv);
- memcpy(msg, buf->rbuf.msg[idx].msg + 1, rv);
- }
return rv;
}
diff --git a/ecp/src/msgq.h b/ecp/src/msgq.h
index a81ff36..394209f 100644
--- a/ecp/src/msgq.h
+++ b/ecp/src/msgq.h
@@ -1,7 +1,6 @@
#ifdef ECP_WITH_MSGQ
#define ECP_MSGQ_MAX_MSG 32
-#define ECP_MSGQ_ERR_MAX_MSG -110
typedef struct ECPConnMsgQ {
unsigned short idx_w[ECP_MAX_MTYPE];
diff --git a/ecp/src/rbuf.c b/ecp/src/rbuf.c
index 9680d14..be8e9f3 100644
--- a/ecp/src/rbuf.c
+++ b/ecp/src/rbuf.c
@@ -1,18 +1,5 @@
#include "core.h"
-int _ecp_rbuf_init(ECPRBuffer *rbuf, ECPRBMessage *msg, unsigned int msg_size) {
- rbuf->msg = msg;
- if (msg_size) {
- if (msg == NULL) return ECP_ERR;
- rbuf->msg_size = msg_size;
- memset(rbuf->msg, 0, sizeof(ECPRBMessage) * msg_size);
- } else {
- rbuf->msg_size = ECP_SEQ_HALF;
- }
-
- return ECP_OK;
-}
-
int _ecp_rbuf_start(ECPRBuffer *rbuf, ecp_seq_t seq) {
rbuf->seq_max = seq;
rbuf->seq_start = seq + 1;
@@ -20,29 +7,17 @@ int _ecp_rbuf_start(ECPRBuffer *rbuf, ecp_seq_t seq) {
return ECP_OK;
}
-int _ecp_rbuf_msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq) {
+int _ecp_rbuf_msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq, unsigned short *idx) {
ecp_seq_t seq_offset = seq - rbuf->seq_start;
- // This also checks for seq_start <= seq if seq type range >> rbuf->msg_size
- if (seq_offset < rbuf->msg_size) return ECP_RBUF_IDX_MASK(rbuf->msg_start + seq_offset, rbuf->msg_size);
- return ECP_ERR_RBUF_FULL;
-}
+ /* This also checks for seq_start <= seq if seq type range >> rbuf->arr_size */
+ if (seq_offset >= rbuf->arr_size) return ECP_ERR_FULL;
-ssize_t _ecp_rbuf_msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, int idx, unsigned char *msg, size_t msg_size, unsigned char test_flags, unsigned char set_flags) {
- idx = idx < 0 ? _ecp_rbuf_msg_idx(rbuf, seq) : idx;
- if (idx < 0) return idx;
-
- if (rbuf->msg == NULL) return 0;
- if (test_flags && (test_flags & rbuf->msg[idx].flags)) return ECP_ERR_RBUF_DUP;
-
- if (msg_size) memcpy(rbuf->msg[idx].msg, msg, msg_size);
- rbuf->msg[idx].size = msg_size;
- rbuf->msg[idx].flags = set_flags;
-
- return msg_size;
+ *idx = ECP_RBUF_IDX_MASK(rbuf->idx_start + seq_offset, rbuf->arr_size);
+ return ECP_OK;
}
-int ecp_rbuf_create(ECPConnection *conn, ECPRBSend *buf_s, ECPRBMessage *msg_s, unsigned int msg_s_size, ECPRBRecv *buf_r, ECPRBMessage *msg_r, unsigned int msg_r_size) {
+int ecp_rbuf_create(ECPConnection *conn, ECPRBSend *buf_s, ECPRBPacket *msg_s, unsigned int msg_s_size, ECPRBRecv *buf_r, ECPRBMessage *msg_r, unsigned int msg_r_size) {
int rv;
if (buf_s) {
@@ -72,39 +47,16 @@ void ecp_rbuf_destroy(ECPConnection *conn) {
ecp_rbuf_recv_destroy(conn);
}
-ssize_t ecp_rbuf_pld_send(ECPConnection *conn, ECPBuffer *packet, ECPBuffer *payload, size_t pld_size, unsigned char flags, ecp_seq_t seq) {
- ECPSocket *sock = conn->sock;
- ECPContext *ctx = sock->ctx;
- ECPNetAddr addr;
- ECPSeqItem seq_item;
- ssize_t rv;
- int _rv;
-
- _rv = ecp_seq_item_init(&seq_item);
- if (_rv) return _rv;
-
- seq_item.seq = seq;
- seq_item.seq_w = 1;
- seq_item.rb_pass = 1;
-
- rv = ecp_pack_conn(conn, packet, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, &seq_item, payload, pld_size, &addr);
- if (rv < 0) return rv;
-
- rv = ecp_pkt_send(sock, &addr, packet, rv, flags);
- if (rv < 0) return rv;
-
- return rv;
-}
-
int ecp_rbuf_handle_seq(ECPConnection *conn, unsigned char mtype) {
if (conn->rbuf.recv || (mtype == ECP_MTYPE_RBACK) || (mtype == ECP_MTYPE_RBFLUSH)) return 1;
return 0;
}
int ecp_rbuf_set_seq(ECPConnection *conn, ECPSeqItem *si, unsigned char *payload, size_t pld_size) {
- ECPRBSend *buf;
+ ECPRBSend *buf = conn->rbuf.send;
+ ECPRBuffer *rbuf = &buf->rbuf;
unsigned char mtype;
- int idx;
+ unsigned short idx;
int rv;
if (si->rb_pass) return ECP_OK;
@@ -113,86 +65,142 @@ int ecp_rbuf_set_seq(ECPConnection *conn, ECPSeqItem *si, unsigned char *payload
#ifdef ECP_WITH_PTHREAD
pthread_mutex_lock(&buf->mutex);
#endif
- idx = _ecp_rbuf_msg_idx(&buf->rbuf, si->seq);
+ rv = _ecp_rbuf_msg_idx(rbuf, si->seq, &idx);
#ifdef ECP_WITH_PTHREAD
pthread_mutex_unlock(&buf->mutex);
#endif
-
- if (idx < 0) return idx;
+ if (rv) return rv;
rv = ecp_pld_get_type(payload, pld_size, &mtype);
if (rv) return rv;
si->rb_mtype = mtype;
si->rb_idx = idx;
- buf->rbuf.msg[idx].size = 0;
- buf->rbuf.msg[idx].flags = 0;
+ rbuf->arr.pkt[idx].size = 0;
+ rbuf->arr.pkt[idx].flags = 0;
return ECP_OK;
}
+ssize_t ecp_rbuf_pld_send(ECPConnection *conn, ECPBuffer *packet, ECPBuffer *payload, size_t pld_size, unsigned char flags, ecp_seq_t seq) {
+ ECPSocket *sock = conn->sock;
+ ECPContext *ctx = sock->ctx;
+ ECPNetAddr addr;
+ ECPSeqItem seq_item;
+ int _rv;
+ ssize_t rv;
+
+ _rv = ecp_seq_item_init(&seq_item);
+ if (_rv) return _rv;
+
+ seq_item.seq = seq;
+ seq_item.seq_w = 1;
+ seq_item.rb_pass = 1;
+
+ rv = ecp_pack_conn(conn, packet, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload, pld_size, &addr, &seq_item);
+ if (rv < 0) return rv;
+
+ rv = ecp_pkt_send(sock, &addr, packet, rv, flags);
+ if (rv < 0) return rv;
+
+ return rv;
+}
+
ssize_t ecp_rbuf_pkt_send(ECPConnection *conn, ECPSocket *sock, ECPNetAddr *addr, ECPBuffer *packet, size_t pkt_size, unsigned char flags, ECPTimerItem *ti, ECPSeqItem *si) {
- ECPRBSend *buf;
- int do_send = 1;
+ ECPRBSend *buf = conn->rbuf.send;
+ ECPRBuffer *rbuf = &buf->rbuf;
+ ecp_seq_t seq = si->seq;
+ unsigned short idx = si->rb_idx;
+ unsigned char mtype = si->rb_mtype & ECP_MTYPE_MASK;
+ unsigned char _flags;
+ int rb_rel;
+ int rb_cc;
+ int do_send;
+ int do_skip;
+ int _rv = ECP_OK;
ssize_t rv = 0;
- buf = conn->rbuf.send;
- if (!si->rb_pass) {
- unsigned char flags = 0;
- ecp_seq_t seq = si->seq;
- unsigned int idx = si->rb_idx;
- unsigned char mtype = si->rb_mtype & ECP_MTYPE_MASK;
+ if (pkt_size == 0) return ECP_ERR;
+
+ do_send = 1;
+ do_skip = ecp_rbuf_skip(mtype);
+
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_lock(&buf->mutex);
+#endif
+
+ rb_rel = (buf->flags & ECP_RBUF_FLAG_RELIABLE);
+ rb_cc = ((buf->flags & ECP_RBUF_FLAG_CCONTROL) && (buf->cnt_cc || (buf->in_transit >= buf->win_size)));
+
+ if (rb_rel || rb_cc) {
+ ECPRBTimer *rb_timer = NULL;
+ ECPRBTimerItem *rb_ti = NULL;
- if (mtype < ECP_MAX_MTYPE_SYS) flags |= ECP_RBUF_FLAG_SYS;
+ _flags = ECP_RBUF_FLAG_IN_RBUF;
+ if (do_skip) {
+ _flags |= ECP_RBUF_FLAG_SKIP;
+ } else {
+ do_send = 0;
+ }
- rv = _ecp_rbuf_msg_store(&buf->rbuf, seq, idx, packet->buffer, pkt_size, 0, flags);
- if (rv < 0) return rv;
+ if (rbuf->arr.pkt[idx].flags) _rv = ECP_ERR_RBUF_DUP;
- if (buf->flags & ECP_RBUF_FLAG_CCONTROL) {
- int _rv = ECP_OK;
+ if (!_rv && !do_send && ti) {
+ rb_timer = buf->timer;
#ifdef ECP_WITH_PTHREAD
- pthread_mutex_lock(&buf->mutex);
+ pthread_mutex_lock(&rb_timer->mutex);
#endif
- if (ECP_SEQ_LT(buf->rbuf.seq_max, seq)) buf->rbuf.seq_max = seq;
-
- if (buf->cnt_cc || (buf->in_transit >= buf->win_size)) {
- if (!buf->cnt_cc) buf->seq_cc = seq;
- buf->cnt_cc++;
- buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_IN_CCONTROL;
- do_send = 0;
- if (ti) {
- ECPRBTimer *timer = &buf->timer;
- ECPRBTimerItem *item = &timer->item[timer->idx_w];
-
- if (!item->occupied) {
- item->occupied = 1;
- item->item = *ti;
- buf->rbuf.msg[idx].idx_t = timer->idx_w;
- timer->idx_w = (timer->idx_w) % ECP_MAX_TIMER;
- } else {
- _rv = ECP_ERR_MAX_TIMER;
- }
- } else {
- buf->rbuf.msg[idx].idx_t = -1;
- }
+ rb_ti = &rb_timer->item[rb_timer->idx_w];
+ if (rb_ti->empty) {
+ rb_ti->empty = 0;
+ rb_ti->item = *ti;
+ rb_timer->idx_w = (rb_timer->idx_w + 1) % ECP_MAX_TIMER;
} else {
- buf->in_transit++;
+ _rv = ECP_ERR_MAX_TIMER;
}
#ifdef ECP_WITH_PTHREAD
- pthread_mutex_unlock(&buf->mutex);
+ pthread_mutex_unlock(&rb_timer->mutex);
#endif
+ }
- if (_rv) return _rv;
+ if (_rv) {
+ rv = _rv;
+ goto pkt_send_fin;
+ }
+
+ if (!do_send) {
+ memcpy(rbuf->arr.pkt[idx].buf, packet->buffer, pkt_size);
+ rbuf->arr.pkt[idx].size = pkt_size;
+ rbuf->arr.pkt[idx].timer = rb_ti;
+ rv = pkt_size;
+ }
+ rbuf->arr.pkt[idx].flags = _flags;
+
+ if (ECP_SEQ_LT(rbuf->seq_max, seq)) rbuf->seq_max = seq;
+
+ if (rb_cc && !do_send) {
+ if (buf->cnt_cc == 0) buf->seq_cc = seq;
+ buf->cnt_cc++;
}
}
+ if ((buf->flags & ECP_RBUF_FLAG_CCONTROL) && !do_skip && do_send) {
+ buf->in_transit++;
+ }
+
+pkt_send_fin:
+
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_unlock(&buf->mutex);
+#endif
+
+ if (rv < 0) return rv;
+
if (do_send) {
if (ti) {
- int _rv;
-
_rv = ecp_timer_push(ti);
if (_rv) return _rv;
}
diff --git a/ecp/src/rbuf.h b/ecp/src/rbuf.h
index 891f29d..31aeb39 100644
--- a/ecp/src/rbuf.h
+++ b/ecp/src/rbuf.h
@@ -1,7 +1,7 @@
#define ECP_RBUF_FLAG_IN_RBUF 0x01
#define ECP_RBUF_FLAG_IN_MSGQ 0x02
-#define ECP_RBUF_FLAG_IN_CCONTROL 0x04
-#define ECP_RBUF_FLAG_SYS 0x80
+#define ECP_RBUF_FLAG_IN_TIMER 0x04
+#define ECP_RBUF_FLAG_SKIP 0x08
#define ECP_RBUF_FLAG_CCONTROL 0x01
#define ECP_RBUF_FLAG_RELIABLE 0x02
@@ -9,58 +9,66 @@
#define ECP_MTYPE_RBACK 0x04
#define ECP_MTYPE_RBFLUSH 0x05
-#define ECP_MTYPE_RBFLUSH_PTS 0x06
+#define ECP_MTYPE_RBTIMER 0x06
#define ECP_MTYPE_NOP 0x07
#define ECP_ERR_RBUF_DUP -100
-#define ECP_ERR_RBUF_FULL -101
typedef uint32_t ecp_win_t;
/* size must be power of 2 */
#define ECP_RBUF_IDX_MASK(idx, size) ((idx) & ((size) - 1))
+#define ecp_rbuf_skip(mtype) ((mtype & ECP_MTYPE_MASK) < ECP_MTYPE_NOP ? 1 : 0)
#ifdef ECP_WITH_MSGQ
#include "msgq.h"
#endif
typedef struct ECPRBTimerItem {
- unsigned char occupied;
ECPTimerItem item;
+ unsigned char empty;
} ECPRBTimerItem;
typedef struct ECPRBTimer {
ECPRBTimerItem item[ECP_MAX_TIMER];
unsigned short idx_w;
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_t mutex;
+#endif
} ECPRBTimer;
typedef struct ECPRBMessage {
- unsigned char msg[ECP_MAX_PKT];
- ssize_t size;
+ unsigned char buf[ECP_MAX_MSG];
+ size_t size;
unsigned char flags;
- short idx_t;
} ECPRBMessage;
+typedef struct ECPRBPacket {
+ unsigned char buf[ECP_MAX_PKT];
+ size_t size;
+ unsigned char flags;
+ ECPRBTimerItem *timer;
+} ECPRBPacket;
+
typedef struct ECPRBuffer {
ecp_seq_t seq_start;
ecp_seq_t seq_max;
- unsigned int msg_size;
- unsigned int msg_start;
- ECPRBMessage *msg;
+ unsigned short arr_size;
+ unsigned short idx_start;
+ union {
+ ECPRBMessage *msg;
+ ECPRBPacket *pkt;
+ } arr;
} ECPRBuffer;
typedef struct ECPRBRecv {
unsigned char flags;
- unsigned char timer_pts;
- unsigned char ack_do;
+ ecp_cts_t deliver_delay;
unsigned short hole_max;
unsigned short ack_rate;
- ecp_pts_t deliver_delay;
+ unsigned short ack_pkt;
ecp_seq_t seq_ack;
- ecp_seq_t ack_pkt;
ecp_ack_t ack_map;
- ecp_ack_t hole_mask_full;
- ecp_ack_t hole_mask_empty;
ECPRBuffer rbuf;
#ifdef ECP_WITH_MSGQ
ECPConnMsgQ msgq;
@@ -70,15 +78,16 @@ typedef struct ECPRBRecv {
typedef struct ECPRBSend {
unsigned char flags;
- unsigned char flush;
ecp_win_t win_size;
ecp_win_t in_transit;
ecp_win_t cnt_cc;
ecp_seq_t seq_cc;
ecp_seq_t seq_flush;
+ ecp_seq_t seq_nack;
+ unsigned char flush;
unsigned int nack_rate;
ECPRBuffer rbuf;
- ECPRBTimer timer;
+ ECPRBTimer *timer;
#ifdef ECP_WITH_PTHREAD
pthread_mutex_t mutex;
#endif
@@ -89,28 +98,26 @@ typedef struct ECPConnRBuffer {
ECPRBSend *send;
} ECPConnRBuffer;
-int _ecp_rbuf_init(ECPRBuffer *rbuf, ECPRBMessage *msg, unsigned int msg_size);
int _ecp_rbuf_start(ECPRBuffer *rbuf, ecp_seq_t seq);
-int _ecp_rbuf_msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq);
-ssize_t _ecp_rbuf_msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, int idx, unsigned char *msg, size_t msg_size, unsigned char test_flags, unsigned char set_flags);
+int _ecp_rbuf_msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq, unsigned short *idx);
-int ecp_rbuf_create(struct ECPConnection *conn, ECPRBSend *buf_s, ECPRBMessage *msg_s, unsigned int msg_s_size, ECPRBRecv *buf_r, ECPRBMessage *msg_r, unsigned int msg_r_size);
+int ecp_rbuf_create(struct ECPConnection *conn, ECPRBSend *buf_s, ECPRBPacket *msg_s, unsigned int msg_s_size, ECPRBRecv *buf_r, ECPRBMessage *msg_r, unsigned int msg_r_size);
void ecp_rbuf_destroy(struct ECPConnection *conn);
ssize_t ecp_rbuf_pld_send(struct ECPConnection *conn, struct ECPBuffer *packet, struct ECPBuffer *payload, size_t pld_size, unsigned char flags, ecp_seq_t seq);
int ecp_rbuf_handle_seq(struct ECPConnection *conn, unsigned char mtype);
int ecp_rbuf_set_seq(struct ECPConnection *conn, struct ECPSeqItem *si, unsigned char *payload, size_t pld_size);
ssize_t ecp_rbuf_pkt_send(struct ECPConnection *conn, struct ECPSocket *sock, ECPNetAddr *addr, struct ECPBuffer *packet, size_t pkt_size, unsigned char flags, ECPTimerItem *ti, struct ECPSeqItem *si);
-int ecp_rbuf_recv_create(struct ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size);
+int ecp_rbuf_recv_create(struct ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned short msg_size);
void ecp_rbuf_recv_destroy(struct ECPConnection *conn);
int ecp_rbuf_recv_start(struct ECPConnection *conn, ecp_seq_t seq);
int ecp_rbuf_set_hole(struct ECPConnection *conn, unsigned short hole_max);
-int ecp_rbuf_set_delay(struct ECPConnection *conn, ecp_pts_t delay);
+int ecp_rbuf_set_delay(struct ECPConnection *conn, ecp_cts_t delay);
ssize_t ecp_rbuf_store(struct ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size, struct ECP2Buffer *b);
struct ECPFragIter *ecp_rbuf_get_frag_iter(struct ECPConnection *conn);
-int ecp_rbuf_send_create(struct ECPConnection *conn, ECPRBSend *buf, ECPRBMessage *msg, unsigned int msg_size);
+int ecp_rbuf_send_create(struct ECPConnection *conn, ECPRBSend *buf, ECPRBPacket *msg, unsigned short msg_size);
void ecp_rbuf_send_destroy(struct ECPConnection *conn);
int ecp_rbuf_send_start(struct ECPConnection *conn);
int ecp_rbuf_flush(struct ECPConnection *conn);
@@ -118,4 +125,4 @@ int ecp_rbuf_set_wsize(struct ECPConnection *conn, ecp_win_t size);
ssize_t ecp_rbuf_handle_ack(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size, struct ECP2Buffer *b);
ssize_t ecp_rbuf_handle_flush(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size, struct ECP2Buffer *b);
-ssize_t ecp_rbuf_handle_flush_pts(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size, struct ECP2Buffer *b);
+ssize_t ecp_rbuf_handle_timer(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size, struct ECP2Buffer *b);
diff --git a/ecp/src/rbuf_recv.c b/ecp/src/rbuf_recv.c
index f2bb1c2..98b472c 100644
--- a/ecp/src/rbuf_recv.c
+++ b/ecp/src/rbuf_recv.c
@@ -5,130 +5,115 @@
#define ACK_RATE 8
#define ACK_MASK_FIRST ((ecp_ack_t)1 << (ECP_SIZE_ACKB - 1))
-static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size, ECP2Buffer *b) {
+static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size, unsigned char mtype) {
ECPRBRecv *buf = conn->rbuf.recv;
- unsigned char flags = ECP_RBUF_FLAG_IN_RBUF;
- unsigned char mtype;
- ssize_t rv;
- int _rv;
+ ECPRBuffer *rbuf = &buf->rbuf;
+ unsigned short idx;
+ unsigned char flags;
+ int skip;
+ int rv;
- _rv = ecp_msg_get_type(msg, msg_size, &mtype);
- if (_rv) return _rv;
+ rv = _ecp_rbuf_msg_idx(rbuf, seq, &idx);
+ if (rv) return rv;
- mtype &= ECP_MTYPE_MASK;
- if (mtype < ECP_MAX_MTYPE_SYS) flags |= ECP_RBUF_FLAG_SYS;
+ if (rbuf->arr.msg[idx].flags) return ECP_ERR_RBUF_DUP;
#ifdef ECP_WITH_MSGQ
if (buf->flags & ECP_RBUF_FLAG_MSGQ) {
ecp_seq_t seq_offset;
- int _rv = ECP_OK;
pthread_mutex_lock(&buf->msgq.mutex);
+
seq_offset = seq - buf->msgq.seq_start;
- if (seq_offset >= buf->rbuf.msg_size) _rv = ECP_ERR_RBUF_FULL;
+ if (seq_offset >= rbuf->arr_size) rv = ECP_ERR_FULL;
+
pthread_mutex_unlock(&buf->msgq.mutex);
- if (_rv) return _rv;
+ if (rv) return rv;
}
#endif
- rv = _ecp_rbuf_msg_store(&buf->rbuf, seq, -1, msg, msg_size, ECP_RBUF_FLAG_IN_RBUF | ECP_RBUF_FLAG_IN_MSGQ, flags);
- if (rv < 0) return rv;
+ skip = ecp_rbuf_skip(mtype);
+ flags = ECP_RBUF_FLAG_IN_RBUF;
+ if (skip) flags |= ECP_RBUF_FLAG_SKIP;
+ rbuf->arr.msg[idx].flags = flags;
- if (ECP_SEQ_LT(buf->rbuf.seq_max, seq)) buf->rbuf.seq_max = seq;
- if (flags & ECP_RBUF_FLAG_SYS) ecp_conn_handle_msg(conn, seq, msg, msg_size, b);
+ if (skip) return 0;
- return rv;
+ memcpy(rbuf->arr.msg[idx].buf, msg, msg_size);
+ rbuf->arr.msg[idx].size = msg_size;
+ if (ECP_SEQ_LT(rbuf->seq_max, seq)) rbuf->seq_max = seq;
+
+ return msg_size;
}
static void msg_flush(ECPConnection *conn, ECP2Buffer *b) {
ECPRBRecv *buf = conn->rbuf.recv;
+ ECPRBuffer *rbuf = &buf->rbuf;
+ ecp_seq_t seq;
+ unsigned short idx;
+ int i;
#ifdef ECP_WITH_MSGQ
if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_lock(&buf->msgq.mutex);
#endif
- ecp_seq_t msg_cnt = buf->rbuf.seq_max - buf->rbuf.seq_start + 1;
- ecp_seq_t seq_next = buf->rbuf.seq_start;
- ecp_seq_t i = 0;
- unsigned int idx = buf->rbuf.msg_start;
+ seq = rbuf->seq_start;
+ idx = rbuf->idx_start;
- if (buf->timer_pts) {
- ecp_timer_pop(conn, ECP_MTYPE_RBFLUSH_PTS);
- buf->timer_pts = 0;
- }
+ unsigned short msg_cnt = rbuf->seq_max - rbuf->seq_start + 1;
- for (i=0; i<msg_cnt; i++) {
- if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_IN_RBUF) {
- if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_SYS) {
- buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_SYS;
- } else {
+ while (ECP_SEQ_LTE(seq, rbuf->seq_max)) {
+ if (rbuf->arr.msg[idx].flags & ECP_RBUF_FLAG_IN_RBUF) {
+ if (!(rbuf->arr.msg[idx].flags & ECP_RBUF_FLAG_SKIP)) {
ecp_pts_t msg_pts;
- ecp_seq_t seq = buf->rbuf.seq_start + i;
- unsigned char frag_tot;
- unsigned char frag_cnt;
- uint16_t frag_size;
int rv;
- rv = ecp_msg_get_frag(buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size, &frag_cnt, &frag_tot, &frag_size);
- if (!rv && (frag_cnt != 0) && (seq != seq_next)) {
- ecp_seq_t seq_fend = seq + (ecp_seq_t)(frag_tot - frag_cnt - 1);
-
- if (ECP_SEQ_LT(buf->rbuf.seq_max, seq_fend) || (buf->hole_max && ((ecp_seq_t)(buf->rbuf.seq_max - seq_fend) <= buf->hole_max))) {
- ecp_seq_t seq_fbeg = seq - frag_cnt;
- ecp_seq_t seq_offset = ECP_SEQ_LT(seq_next, seq_fbeg) ? seq - seq_fbeg : seq - seq_next;
-
- i -= seq_offset;
- idx = ECP_RBUF_IDX_MASK(idx - seq_offset, buf->rbuf.msg_size);
- break;
- }
- }
+ rv = ecp_msg_get_pts(rbuf->arr.msg[idx].buf, rbuf->arr.msg[idx].size, &msg_pts);
+ if (!rv && buf->deliver_delay) {
+ ecp_cts_t now = ecp_tm_abstime_ms(0);
- rv = ecp_msg_get_pts(buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size, &msg_pts);
- if (!rv) {
- ecp_pts_t now = ecp_tm_abstime_ms(0);
+ msg_pts += buf->deliver_delay;
if (ECP_PTS_LT(now, msg_pts)) {
- ECPTimerItem ti;
- ecp_seq_t seq_offset = seq - seq_next;
+ if (!(rbuf->arr.msg[idx].flags & ECP_RBUF_FLAG_IN_TIMER)) {
+ ECPTimerItem ti;
- rv = ecp_timer_item_init(&ti, conn, ECP_MTYPE_RBFLUSH_PTS, 0, msg_pts - now);
- if (!rv) rv = ecp_timer_push(&ti);
- if (!rv) buf->timer_pts = 1;
-
- i -= seq_offset;
- idx = ECP_RBUF_IDX_MASK(idx - seq_offset, buf->rbuf.msg_size);
+ rv = ecp_timer_item_init(&ti, conn, ECP_MTYPE_RBTIMER, 0, msg_pts - now);
+ if (!rv) rv = ecp_timer_push(&ti);
+ if (!rv) rbuf->arr.msg[idx].flags |= ECP_RBUF_FLAG_IN_TIMER;
+ }
break;
+ } else if (rbuf->arr.msg[idx].flags & ECP_RBUF_FLAG_IN_TIMER) {
+ rbuf->arr.msg[idx].flags &= ~ECP_RBUF_FLAG_IN_TIMER;
}
}
- seq_next = seq + 1;
#ifdef ECP_WITH_MSGQ
if (buf->flags & ECP_RBUF_FLAG_MSGQ) {
unsigned char mtype;
- rv = ecp_msg_get_type(buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size, &mtype);
- if (!rv) rv = ecp_conn_msgq_push(conn, seq, mtype);
+ rv = ecp_msg_get_type(rbuf->arr.msg[idx].buf, rbuf->arr.msg[idx].size, &mtype);
+ if (!rv) rv = ecp_conn_msgq_push(conn, seq, mtype & ECP_MTYPE_MASK);
if (rv) break;
- buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_IN_MSGQ;
+ rbuf->arr.msg[idx].flags |= ECP_RBUF_FLAG_IN_MSGQ;
} else
-
#endif
- ecp_conn_handle_msg(conn, seq, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size, b);
+ ecp_conn_handle_msg(conn, seq, rbuf->arr.msg[idx].buf, rbuf->arr.msg[idx].size, b);
+ } else {
+ rbuf->arr.msg[idx].flags &= ~ECP_RBUF_FLAG_SKIP;
}
- buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_IN_RBUF;
+ rbuf->arr.msg[idx].flags &= ~ECP_RBUF_FLAG_IN_RBUF;
+ // if (rbuf->arr.msg[idx].flags == 0);
} else {
if (buf->flags & ECP_RBUF_FLAG_RELIABLE) break;
- if (buf->hole_max) {
- ecp_seq_t seq = buf->rbuf.seq_start + i;
- ecp_seq_t seq_offset = buf->rbuf.seq_max - seq;
- if (seq_offset <= buf->hole_max) break;
- }
+ if (!ECP_SEQ_LT(seq, rbuf->seq_max - buf->hole_max)) break;
}
- idx = ECP_RBUF_IDX_MASK(idx + 1, buf->rbuf.msg_size);
+ seq++;
+ idx = ECP_RBUF_IDX_MASK(idx + 1, rbuf->arr_size);
}
- buf->rbuf.seq_start += i;
- buf->rbuf.msg_start = idx;
+ rbuf->seq_start = seq;
+ rbuf->idx_start = idx;
#ifdef ECP_WITH_MSGQ
if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_unlock(&buf->msgq.mutex);
@@ -136,7 +121,7 @@ static void msg_flush(ECPConnection *conn, ECP2Buffer *b) {
}
-static int ack_send(ECPConnection *conn) {
+static int ack_send(ECPConnection *conn, ecp_seq_t seq_ack, ecp_seq_t ack_map) {
ECPRBRecv *buf = conn->rbuf.recv;
ECPBuffer packet;
ECPBuffer payload;
@@ -152,56 +137,53 @@ static int ack_send(ECPConnection *conn) {
ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_RBACK);
_buf = ecp_pld_get_buf(payload.buffer, payload.size);
- _buf[0] = (buf->seq_ack & 0xFF000000) >> 24;
- _buf[1] = (buf->seq_ack & 0x00FF0000) >> 16;
- _buf[2] = (buf->seq_ack & 0x0000FF00) >> 8;
- _buf[3] = (buf->seq_ack & 0x000000FF);
- _buf[4] = (buf->ack_map & 0xFF000000) >> 24;
- _buf[5] = (buf->ack_map & 0x00FF0000) >> 16;
- _buf[6] = (buf->ack_map & 0x0000FF00) >> 8;
- _buf[7] = (buf->ack_map & 0x000000FF);
-
- rv = ecp_rbuf_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), ECP_MTYPE_RBACK), 0, 0);
+ _buf[0] = (seq_ack & 0xFF000000) >> 24;
+ _buf[1] = (seq_ack & 0x00FF0000) >> 16;
+ _buf[2] = (seq_ack & 0x0000FF00) >> 8;
+ _buf[3] = (seq_ack & 0x000000FF);
+ _buf[4] = (ack_map & 0xFF000000) >> 24;
+ _buf[5] = (ack_map & 0x00FF0000) >> 16;
+ _buf[6] = (ack_map & 0x0000FF00) >> 8;
+ _buf[7] = (ack_map & 0x000000FF);
+
+ rv = ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), ECP_MTYPE_RBACK), 0);
if (rv < 0) return rv;
buf->ack_pkt = 0;
- buf->ack_do = 0;
+
return ECP_OK;
}
static int ack_shift(ECPRBRecv *buf) {
+ ECPRBuffer *rbuf = &buf->rbuf;
+ unsigned short idx;
int do_ack = 0;
int in_rbuf = 0;
- int idx;
- int i;
+ int rv;
if ((buf->flags & ECP_RBUF_FLAG_RELIABLE) && ((buf->ack_map & ACK_MASK_FIRST) == 0)) return 0;
- while (ECP_SEQ_LT(buf->seq_ack, buf->rbuf.seq_max)) {
+ /* walks through messages that are not delivered yet, so no need for msgq mutex lock */
+ while (ECP_SEQ_LT(buf->seq_ack, rbuf->seq_max)) {
buf->seq_ack++;
- in_rbuf = ECP_SEQ_LT(buf->seq_ack, buf->rbuf.seq_start) ? 1 : buf->rbuf.msg[ECP_RBUF_IDX_MASK(buf->rbuf.msg_start + buf->seq_ack - buf->rbuf.seq_start, buf->rbuf.msg_size)].flags & ECP_RBUF_FLAG_IN_RBUF;
-
+ rv = _ecp_rbuf_msg_idx(rbuf, buf->seq_ack, &idx);
+ if (!rv) {
+ in_rbuf = rbuf->arr.msg[idx].flags & ECP_RBUF_FLAG_IN_RBUF;
+ } else {
+ in_rbuf = 1;
+ }
if (in_rbuf && (buf->ack_map == ECP_ACK_FULL)) continue;
buf->ack_map = buf->ack_map << 1;
if (in_rbuf) {
buf->ack_map |= 1;
- } else if (!do_ack && ECP_SEQ_LTE(buf->seq_ack, buf->rbuf.seq_max - 2 * buf->hole_max)) {
+ } else if (!do_ack && ECP_SEQ_LT(buf->seq_ack, rbuf->seq_max - buf->hole_max)) {
do_ack = 1;
}
- if ((buf->ack_map & ACK_MASK_FIRST) == 0) break;
- }
-
- if (!do_ack && (buf->seq_ack == buf->rbuf.seq_max) && ((buf->ack_map & buf->hole_mask_full) != buf->hole_mask_full)) {
- ecp_ack_t hole_mask = buf->ack_map;
-
- for (i=0; i<buf->hole_max-1; i++) {
- hole_mask = hole_mask >> 1;
- if ((hole_mask & buf->hole_mask_empty) == 0) {
- do_ack = 1;
- break;
- }
+ if ((buf->flags & ECP_RBUF_FLAG_RELIABLE) && ((buf->ack_map & ACK_MASK_FIRST) == 0)) {
+ do_ack = 1;
+ break;
}
}
@@ -209,34 +191,38 @@ static int ack_shift(ECPRBRecv *buf) {
}
ssize_t ecp_rbuf_handle_flush(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size, ECP2Buffer *b) {
- if (size < 0) return size;
-
ECPRBRecv *buf = conn->rbuf.recv;
+
if (buf == NULL) return ECP_ERR;
+ if (size < 0) return size;
ecp_tr_release(b->packet, 1);
- ack_send(conn);
+ ack_send(conn, buf->seq_ack, buf->ack_map);
return 0;
}
-ssize_t ecp_rbuf_handle_flush_pts(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size, ECP2Buffer *b) {
+ssize_t ecp_rbuf_handle_timer(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size, ECP2Buffer *b) {
ECPRBRecv *buf = conn->rbuf.recv;
+
if (buf == NULL) return ECP_ERR;
- buf->timer_pts = 0;
msg_flush(conn, b);
return 0;
}
-int ecp_rbuf_recv_create(ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size) {
+int ecp_rbuf_recv_create(ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned short msg_size) {
+ ECPRBuffer *rbuf = &buf->rbuf;
int rv;
+ if (msg == NULL) return ECP_ERR;
+
memset(buf, 0, sizeof(ECPRBRecv));
- rv = _ecp_rbuf_init(&buf->rbuf, msg, msg_size);
- if (rv) return rv;
+ memset(msg, 0, sizeof(ECPRBMessage) * msg_size);
buf->ack_map = ECP_ACK_FULL;
buf->ack_rate = ACK_RATE;
+ rbuf->arr.msg = msg;
+ rbuf->arr_size = msg_size;
#ifdef ECP_WITH_MSGQ
rv = ecp_conn_msgq_create(&buf->msgq);
@@ -251,22 +237,23 @@ void ecp_rbuf_recv_destroy(ECPConnection *conn) {
ECPRBRecv *buf = conn->rbuf.recv;
if (buf == NULL) return;
+
#ifdef ECP_WITH_MSGQ
ecp_conn_msgq_destroy(&buf->msgq);
#endif
-
conn->rbuf.recv = NULL;
}
int ecp_rbuf_recv_start(ECPConnection *conn, ecp_seq_t seq) {
- int rv;
ECPRBRecv *buf = conn->rbuf.recv;
+ ECPRBuffer *rbuf = &buf->rbuf;
+ int rv;
if (buf == NULL) return ECP_ERR;
seq--;
buf->seq_ack = seq;
- rv = _ecp_rbuf_start(&buf->rbuf, seq);
+ rv = _ecp_rbuf_start(rbuf, seq);
if (rv) return rv;
#ifdef ECP_WITH_MSGQ
@@ -283,13 +270,11 @@ int ecp_rbuf_set_hole(ECPConnection *conn, unsigned short hole_max) {
ECPRBRecv *buf = conn->rbuf.recv;
buf->hole_max = hole_max;
- buf->hole_mask_full = ~(~((ecp_ack_t)1) << (hole_max * 2));
- buf->hole_mask_empty = ~(~((ecp_ack_t)1) << (hole_max + 1));
return ECP_OK;
}
-int ecp_rbuf_set_delay(ECPConnection *conn, ecp_pts_t delay) {
+int ecp_rbuf_set_delay(ECPConnection *conn, ecp_cts_t delay) {
ECPRBRecv *buf = conn->rbuf.recv;
buf->deliver_delay = delay;
@@ -299,69 +284,83 @@ int ecp_rbuf_set_delay(ECPConnection *conn, ecp_pts_t delay) {
ssize_t ecp_rbuf_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size, ECP2Buffer *b) {
ECPRBRecv *buf = conn->rbuf.recv;
- ecp_seq_t ack_pkt = 0;
- ssize_t rv;
- int _rv;
+ ECPRBuffer *rbuf = &buf->rbuf;
unsigned char mtype;
+ unsigned short ack_pkt = 0;
+ int do_ack = 0;
+ int _rv;
+ ssize_t rv;
_rv = ecp_msg_get_type(msg, msg_size, &mtype);
if (_rv) return _rv;
- mtype &= ECP_MTYPE_MASK;
- if ((mtype == ECP_MTYPE_RBACK) || (mtype == ECP_MTYPE_RBFLUSH)) return ecp_conn_handle_msg(conn, seq, msg, msg_size, b);
-
- if (ECP_SEQ_LT(buf->rbuf.seq_max, seq)) {
- ack_pkt = seq - buf->rbuf.seq_max;
- buf->ack_pkt += ack_pkt;
- if (buf->ack_pkt > buf->ack_rate) buf->ack_do = 1;
+ if (ECP_SEQ_LT(rbuf->seq_max, seq)) {
+ ack_pkt = seq - rbuf->seq_max;
}
if (ECP_SEQ_LTE(seq, buf->seq_ack)) {
ecp_seq_t seq_offset = buf->seq_ack - seq;
if (seq_offset < ECP_SIZE_ACKB) {
- ecp_ack_t ack_mask = ((ecp_ack_t)1 << seq_offset);
-
- if (ack_mask & buf->ack_map) return ECP_ERR_RBUF_DUP;
+ ecp_ack_t ack_bit = ((ecp_ack_t)1 << seq_offset);
- buf->ack_map |= ack_mask;
- buf->ack_do = buf->ack_do || ack_shift(buf);
+ if (ack_bit & buf->ack_map) return ECP_ERR_RBUF_DUP;
- rv = msg_store(conn, seq, msg, msg_size, b);
+ rv = msg_store(conn, seq, msg, msg_size, mtype);
if (rv < 0) return rv;
+
+ buf->ack_map |= ack_bit;
+ do_ack = ack_shift(buf);
} else {
return ECP_ERR_RBUF_DUP;
}
} else {
- if ((buf->ack_map == ECP_ACK_FULL) && (seq == (ecp_seq_t)(buf->seq_ack + 1))) {
+ unsigned short msg_cnt = rbuf->seq_max - rbuf->seq_start + 1;
+
+ if ((msg_cnt == 0) && (seq == (ecp_seq_t)(buf->seq_ack + 1))) {
if ((buf->flags & ECP_RBUF_FLAG_MSGQ) || buf->deliver_delay) {
- rv = msg_store(conn, seq, msg, msg_size, b);
+ rv = msg_store(conn, seq, msg, msg_size, mtype);
if (rv < 0) return rv;
} else {
- ecp_conn_handle_msg(conn, seq, msg, msg_size, b);
- rv = msg_size;
- buf->rbuf.seq_max++;
- buf->rbuf.seq_start++;
- buf->rbuf.msg_start = ECP_RBUF_IDX_MASK(buf->rbuf.msg_start + 1, buf->rbuf.msg_size);
+ /* receive buffer is empty, so no need for msgq mutex lock */
+ rv = 0;
+ rbuf->seq_max++;
+ rbuf->seq_start++;
+ rbuf->idx_start = ECP_RBUF_IDX_MASK(rbuf->idx_start + 1, rbuf->arr_size);
}
buf->seq_ack++;
} else {
- rv = msg_store(conn, seq, msg, msg_size, b);
+ rv = msg_store(conn, seq, msg, msg_size, mtype);
if (rv < 0) return rv;
- buf->ack_do = buf->ack_do || ack_shift(buf);
+ do_ack = ack_shift(buf);
}
}
msg_flush(conn, b);
- if (!(mtype < ECP_MAX_MTYPE_SYS) && buf->ack_do) {
- int _rv;
-
- _rv = ack_send(conn);
+ if (ack_pkt) {
+ buf->ack_pkt += ack_pkt;
+ if (!do_ack && (buf->ack_pkt > buf->ack_rate)) do_ack = 1;
+ }
+ if (do_ack) {
+ ecp_seq_t seq_ack = buf->seq_ack;
+ ecp_seq_t ack_map = buf->ack_map;
+
+ /* account for missing mackets within hole_max range */
+ if (buf->hole_max && (buf->seq_ack == rbuf->seq_max)) {
+ unsigned short h_bits = buf->hole_max + 1;
+ ecp_seq_t h_mask = ~(~((ecp_seq_t)0) << h_bits);
+
+ if ((ack_map & h_mask) != h_mask) {
+ h_mask = ~(~((ecp_seq_t)0) >> h_bits);
+ seq_ack -= h_bits;
+ ack_map = (ack_map >> h_bits) | h_mask;
+ }
+ }
+ _rv = ack_send(conn, seq_ack, ack_map);
if (_rv) return _rv;
}
return rv;
}
-
ECPFragIter *ecp_rbuf_get_frag_iter(ECPConnection *conn) {
if (conn->rbuf.recv) return conn->rbuf.recv->frag_iter;
return NULL;
diff --git a/ecp/src/rbuf_send.c b/ecp/src/rbuf_send.c
index faf2a7d..9f29010 100644
--- a/ecp/src/rbuf_send.c
+++ b/ecp/src/rbuf_send.c
@@ -1,7 +1,10 @@
#include "core.h"
#include "tr.h"
-#define NACK_RATE_UNIT 10000
+#define NACK_RATE_UNIT 10000
+
+#define MIN(X, Y) (((X) < (Y)) ? (X) : (Y))
+#define MAX(X, Y) (((X) > (Y)) ? (X) : (Y))
static ssize_t flush_send(ECPConnection *conn, ECPTimerItem *ti) {
ECPBuffer packet;
@@ -26,76 +29,83 @@ static ssize_t flush_send(ECPConnection *conn, ECPTimerItem *ti) {
rv = ecp_timer_push(&_ti);
if (rv) return rv;
}
- return ecp_rbuf_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(0, ECP_MTYPE_RBFLUSH), 0, 0);
+ return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(0, ECP_MTYPE_RBFLUSH), 0);
}
-static void cc_flush(ECPConnection *conn) {
+static void cc_flush(ECPConnection *conn, unsigned char flags) {
ECPRBSend *buf = conn->rbuf.send;
ECPRBuffer *rbuf = &buf->rbuf;
- ecp_seq_t pkt_buf_cnt = rbuf->seq_max - rbuf->seq_start + 1;
- ecp_win_t pkt_cc_cnt = buf->win_size > buf->in_transit ? buf->win_size - buf->in_transit : 0;
- int pkt_to_send = pkt_buf_cnt < pkt_cc_cnt ? pkt_buf_cnt : pkt_cc_cnt;
- int i;
+ unsigned short idx;
+ int rv;
- ECPTimerItem ti[ECP_MAX_TIMER];
- unsigned short max_t = 0;
+ rv = _ecp_rbuf_msg_idx(rbuf, buf->seq_cc, &idx);
+ if (rv) return;
- if (pkt_to_send) {
- unsigned int idx = _ecp_rbuf_msg_idx(rbuf, buf->seq_cc);
- unsigned int _idx = idx;
+ while (ECP_SEQ_LTE(buf->seq_cc, rbuf->seq_max)) {
+ ECPRBTimerItem *ti;
+ ECPBuffer packet;
- for (i=0; i<pkt_to_send; i++) {
- if (!(rbuf->msg[_idx].flags & ECP_RBUF_FLAG_IN_CCONTROL)) break;
- rbuf->msg[_idx].flags &= ~ECP_RBUF_FLAG_IN_CCONTROL;
- if (rbuf->msg[_idx].idx_t != -1) {
- ECPRBTimer *timer = &buf->timer;
- ECPRBTimerItem *item = &timer->item[rbuf->msg[_idx].idx_t];
-
- item->occupied = 0;
- ti[max_t] = item->item;
- rbuf->msg[_idx].idx_t = max_t;
- max_t++;
- }
- _idx = ECP_RBUF_IDX_MASK(_idx + 1, rbuf->msg_size);
- }
- pkt_to_send = i;
- _idx = idx;
+ if ((buf->cnt_cc == 0) || (buf->in_transit >= buf->win_size)) break;
+ if ((rbuf->arr.pkt[idx].flags & ECP_RBUF_FLAG_IN_RBUF) && !(rbuf->arr.pkt[idx].flags & ECP_RBUF_FLAG_SKIP)) {
#ifdef ECP_WITH_PTHREAD
- pthread_mutex_unlock(&buf->mutex);
+ pthread_mutex_unlock(&buf->mutex);
#endif
- for (i=0; i<pkt_to_send; i++) {
- ECPBuffer packet;
+ ti = rbuf->arr.pkt[idx].timer;
+ if (ti) ecp_timer_push(&ti->item);
+ packet.buffer = rbuf->arr.pkt[idx].buf;
+ packet.size = ECP_MAX_PKT;
+ ecp_pkt_send(conn->sock, &conn->node.addr, &packet, rbuf->arr.pkt[idx].size, flags);
- if (rbuf->msg[_idx].idx_t != -1) ecp_timer_push(&ti[rbuf->msg[_idx].idx_t]);
- packet.buffer = rbuf->msg[_idx].msg;
- packet.size = rbuf->msg[_idx].size;
- ecp_pkt_send(conn->sock, &conn->node.addr, &packet, rbuf->msg[_idx].size, 0);
- _idx = ECP_RBUF_IDX_MASK(_idx + 1, rbuf->msg_size);
- }
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_lock(&buf->mutex);
+#endif
+ if (ti) {
#ifdef ECP_WITH_PTHREAD
- pthread_mutex_lock(&buf->mutex);
+ pthread_mutex_lock(&buf->timer->mutex);
#endif
+ ti->empty = 1;
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_lock(&buf->timer->mutex);
+#endif
+ }
- buf->in_transit += (ecp_win_t)pkt_to_send;
- buf->cnt_cc -= (ecp_win_t)pkt_to_send;
- buf->seq_cc += (ecp_seq_t)pkt_to_send;
+ buf->cnt_cc--;
+ buf->in_transit++;
+ }
+ if (!(buf->flags & ECP_RBUF_FLAG_RELIABLE)) {
+ rbuf->arr.pkt[idx].flags = 0;
+ // if (rbuf->arr.pkt[idx].flags == 0);
+ }
+ buf->seq_cc++;
+ idx = ECP_RBUF_IDX_MASK(idx + 1, rbuf->arr_size);
+ }
+ if (!(buf->flags & ECP_RBUF_FLAG_RELIABLE)) {
+ rbuf->seq_start = buf->seq_cc;
+ rbuf->idx_start = idx;
}
}
ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size, ECP2Buffer *b) {
ECPRBSend *buf;
+ ECPRBuffer *rbuf;
ssize_t rsize = sizeof(ecp_seq_t)+sizeof(ecp_ack_t);
ecp_seq_t seq_ack = 0;
ecp_ack_t ack_map = 0;
- int i;
+ ecp_seq_t seq_start;
+ ecp_seq_t seq_max;
+ unsigned short idx;
+ unsigned short msg_cnt;
int do_flush = 0;
- int rv = ECP_OK;
+ int i;
+ int rv;
buf = conn->rbuf.send;
if (buf == NULL) return size;
+
+ rbuf = &buf->rbuf;
if (size < 0) return size;
if (size < rsize) return ECP_ERR;
@@ -111,89 +121,108 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt
(msg[7]);
ecp_tr_release(b->packet, 1);
- ecp_tr_flag_set(ECP_SEND_FLAG_MORE);
#ifdef ECP_WITH_PTHREAD
pthread_mutex_lock(&buf->mutex);
#endif
- ECPRBuffer *rbuf = &buf->rbuf;
- int idx = _ecp_rbuf_msg_idx(rbuf, seq_ack);
- if (idx < 0) rv = idx;
+ seq_max = (buf->cnt_cc ? buf->seq_cc - 1 : rbuf->seq_max);
+ if (ECP_SEQ_LT(seq_max, seq_ack)) return ECP_ERR;
- if (!rv) {
- seq_ack++;
- if (buf->flags & ECP_RBUF_FLAG_CCONTROL) buf->in_transit = buf->cnt_cc ? buf->seq_cc - seq_ack : rbuf->seq_max - seq_ack + 1;
+ if (buf->flags & ECP_RBUF_FLAG_RELIABLE) {
+ rv = _ecp_rbuf_msg_idx(rbuf, seq_ack, &idx);
+ if (rv) goto handle_ack_fin;
+ }
- if (ack_map != ECP_ACK_FULL) {
- int nack_first = 0;
- unsigned int msg_start;
- ecp_seq_t seq_start;
- ecp_win_t nack_cnt = 0;
+ if (buf->flags & ECP_RBUF_FLAG_CCONTROL) {
+ buf->in_transit = seq_max - seq_ack;
+ }
- seq_ack -= ECP_SIZE_ACKB;
+ if (ack_map != ECP_ACK_FULL) {
+ ecp_ack_t ack_mask = (ecp_ack_t)1 << (ECP_SIZE_ACKB - 1);
+ unsigned short nack_cnt = 0;
+ int nack_first = 0;
+
+ seq_ack -= (ECP_SIZE_ACKB - 1);
+ if (buf->flags & ECP_RBUF_FLAG_RELIABLE) {
+ rv = _ecp_rbuf_msg_idx(rbuf, seq_ack, &idx);
+ if (rv) goto handle_ack_fin;
+ }
#ifdef ECP_WITH_PTHREAD
- pthread_mutex_unlock(&buf->mutex);
+ pthread_mutex_unlock(&buf->mutex);
#endif
- for (i=0; i<ECP_SIZE_ACKB; i++) {
- if ((ack_map & ((ecp_ack_t)1 << (ECP_SIZE_ACKB - i - 1))) == 0) {
+ for (i=0; i<ECP_SIZE_ACKB; i++) {
+ if ((ack_map & ack_mask) == 0) {
+ if (ECP_SEQ_LT(buf->seq_nack, seq_ack)) {
nack_cnt++;
- if (buf->flags & ECP_RBUF_FLAG_RELIABLE) {
- unsigned int _idx = ECP_RBUF_IDX_MASK(idx + 1 - ECP_SIZE_ACKB + i, rbuf->msg_size);
- if ((rbuf->msg[_idx].size == 0) || (rbuf->msg[_idx].flags & ECP_RBUF_FLAG_SYS)) {
- ECPBuffer packet;
- ECPBuffer payload;
- unsigned char pkt_buf[ECP_SIZE_PKT_BUF(0, ECP_MTYPE_NOP, conn)];
- unsigned char pld_buf[ECP_SIZE_PLD_BUF(0, ECP_MTYPE_NOP, conn)];
-
- packet.buffer = pkt_buf;
- packet.size = ECP_SIZE_PKT_BUF(0, ECP_MTYPE_NOP, conn);
- payload.buffer = pld_buf;
- payload.size = ECP_SIZE_PLD_BUF(0, ECP_MTYPE_NOP, conn);
-
- ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_NOP);
- ecp_rbuf_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(0, ECP_MTYPE_NOP), 0, seq_ack + i);
- } else {
- ECPBuffer packet;
- packet.buffer = rbuf->msg[_idx].msg;
- packet.size = rbuf->msg[_idx].size;
- ecp_pkt_send(conn->sock, &conn->node.addr, &packet, rbuf->msg[_idx].size, 0);
- }
- if (!nack_first) {
- nack_first = 1;
- seq_start = seq_ack + i;
- msg_start = _idx;
+ buf->seq_nack = seq_ack;
+ }
+
+ if (buf->flags & ECP_RBUF_FLAG_RELIABLE) {
+ if (!(rbuf->arr.pkt[idx].flags & ECP_RBUF_FLAG_IN_RBUF) || (rbuf->arr.pkt[idx].flags & ECP_RBUF_FLAG_SKIP)) {
+ ECPBuffer packet;
+ ECPBuffer payload;
+ unsigned char pkt_buf[ECP_SIZE_PKT_BUF(0, ECP_MTYPE_NOP, conn)];
+ unsigned char pld_buf[ECP_SIZE_PLD_BUF(0, ECP_MTYPE_NOP, conn)];
+
+ packet.buffer = pkt_buf;
+ packet.size = ECP_SIZE_PKT_BUF(0, ECP_MTYPE_NOP, conn);
+ payload.buffer = pld_buf;
+ payload.size = ECP_SIZE_PLD_BUF(0, ECP_MTYPE_NOP, conn);
+
+ ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_NOP);
+ ecp_rbuf_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(0, ECP_MTYPE_NOP), ECP_SEND_FLAG_MORE, seq_ack);
+ } else {
+ ECPBuffer packet;
+
+ packet.buffer = rbuf->arr.pkt[idx].buf;
+ packet.size = ECP_MAX_PKT;
+ ecp_pkt_send(conn->sock, &conn->node.addr, &packet, rbuf->arr.pkt[idx].size, ECP_SEND_FLAG_MORE);
+ if (buf->flags & ECP_RBUF_FLAG_CCONTROL) {
+ buf->in_transit++;
}
}
+ if (!nack_first) {
+ nack_first = 1;
+ seq_start = seq_ack;
+ }
}
}
+ seq_ack++;
+ ack_mask = ack_mask >> 1;
+ if (buf->flags & ECP_RBUF_FLAG_RELIABLE) idx = ECP_RBUF_IDX_MASK(idx + 1, rbuf->arr_size);
+ }
#ifdef ECP_WITH_PTHREAD
- pthread_mutex_lock(&buf->mutex);
+ pthread_mutex_lock(&buf->mutex);
#endif
- if (buf->flags & ECP_RBUF_FLAG_CCONTROL) buf->in_transit += nack_cnt;
- buf->nack_rate = (buf->nack_rate * 7 + ((ECP_SIZE_ACKB - nack_cnt) * NACK_RATE_UNIT) / ECP_SIZE_ACKB) / 8;
- if (buf->flags & ECP_RBUF_FLAG_RELIABLE) {
- rbuf->seq_start = seq_start;
- rbuf->msg_start = msg_start;
- } else {
- rbuf->seq_start = seq_ack + ECP_SIZE_ACKB;
- rbuf->msg_start = ECP_RBUF_IDX_MASK(idx + 1, rbuf->msg_size);
- }
- } else {
- buf->nack_rate = (buf->nack_rate * 7) / 8;
- rbuf->seq_start = seq_ack;
- rbuf->msg_start = ECP_RBUF_IDX_MASK(idx + 1, rbuf->msg_size);
- }
- if (buf->flush) {
- if (ECP_SEQ_LT(buf->seq_flush, rbuf->seq_start)) buf->flush = 0;
- if (buf->flush) do_flush = 1;
+ buf->nack_rate = (buf->nack_rate * 7 + ((ECP_SIZE_ACKB - nack_cnt) * NACK_RATE_UNIT) / ECP_SIZE_ACKB) / 8;
+ } else {
+ buf->nack_rate = (buf->nack_rate * 7) / 8;
+ seq_start = seq_ack + 1;
+ }
+
+ if (buf->flags & ECP_RBUF_FLAG_RELIABLE) {
+ msg_cnt = seq_start - rbuf->seq_start;
+ idx = rbuf->idx_start;
+ for (i=0; i<msg_cnt; i++) {
+ rbuf->arr.pkt[idx].flags = 0;
+ // if (rbuf->arr.pkt[idx].flags == 0);
+ idx = ECP_RBUF_IDX_MASK(idx + 1, rbuf->arr_size);
}
- if (buf->cnt_cc) cc_flush(conn);
+ rbuf->seq_start = seq_start;
+ rbuf->idx_start = idx;
}
+ if (buf->flush) {
+ if (ECP_SEQ_LT(buf->seq_flush, rbuf->seq_start)) buf->flush = 0;
+ if (buf->flush) do_flush = 1;
+ }
+ if (buf->cnt_cc) cc_flush(conn, ECP_SEND_FLAG_MORE);
+
+handle_ack_fin:
#ifdef ECP_WITH_PTHREAD
pthread_mutex_unlock(&buf->mutex);
@@ -204,21 +233,27 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt
_rv = flush_send(conn, NULL);
if (_rv < 0) rv = _rv;
+ } else {
+ // ecp_tr_nomore();
}
- ecp_tr_flag_clear(ECP_SEND_FLAG_MORE);
ecp_tr_release(b->packet, 0);
if (rv) return rv;
return rsize;
}
-int ecp_rbuf_send_create(ECPConnection *conn, ECPRBSend *buf, ECPRBMessage *msg, unsigned int msg_size) {
+int ecp_rbuf_send_create(ECPConnection *conn, ECPRBSend *buf, ECPRBPacket *pkt, unsigned short pkt_size) {
+ ECPRBuffer *rbuf = &buf->rbuf;
int rv;
+ if (pkt == NULL) return ECP_ERR;
+
memset(buf, 0, sizeof(ECPRBRecv));
- rv = _ecp_rbuf_init(&buf->rbuf, msg, msg_size);
- if (rv) return rv;
+ memset(pkt, 0, sizeof(ECPRBPacket) * pkt_size);
+
+ rbuf->arr.pkt = pkt;
+ rbuf->arr_size = pkt_size;
#ifdef ECP_WITH_PTHREAD
rv = pthread_mutex_init(&buf->mutex, NULL);
@@ -243,10 +278,12 @@ void ecp_rbuf_send_destroy(ECPConnection *conn) {
int ecp_rbuf_send_start(ECPConnection *conn) {
ECPRBSend *buf = conn->rbuf.send;
+ ECPRBuffer *rbuf = &buf->rbuf;
if (buf == NULL) return ECP_ERR;
- return _ecp_rbuf_start(&buf->rbuf, conn->seq_out);
+ buf->seq_nack = conn->seq_out;
+ return _ecp_rbuf_start(rbuf, conn->seq_out);
}
int ecp_rbuf_set_wsize(ECPConnection *conn, ecp_win_t size) {
@@ -259,7 +296,7 @@ int ecp_rbuf_set_wsize(ECPConnection *conn, ecp_win_t size) {
#endif
buf->win_size = size;
- if (buf->cnt_cc) cc_flush(conn);
+ if (buf->cnt_cc) cc_flush(conn, 0);
#ifdef ECP_WITH_PTHREAD
pthread_mutex_unlock(&buf->mutex);
diff --git a/ecp/src/vconn/vconn.c b/ecp/src/vconn/vconn.c
index d60f5f2..d3eec44 100644
--- a/ecp/src/vconn/vconn.c
+++ b/ecp/src/vconn/vconn.c
@@ -64,7 +64,7 @@ static ssize_t _vconn_send_open(ECPConnection *conn, ECPTimerItem *ti) {
payload.size = ECP_SIZE_PLD_BUF(0, ECP_MTYPE_KGET_REQ, conn);
ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_KGET_REQ);
- return _ecp_pld_send(conn, &packet, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, NULL, &payload, ECP_SIZE_PLD(0, ECP_MTYPE_KGET_REQ), 0, ti);
+ return _ecp_pld_send(conn, &packet, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, &payload, ECP_SIZE_PLD(0, ECP_MTYPE_KGET_REQ), 0, ti);
}
static ssize_t vconn_send_open(ECPConnection *conn) {
@@ -188,7 +188,7 @@ static ssize_t vconn_handle_relay(ECPConnection *conn, ecp_seq_t seq, unsigned c
if (b == NULL) return ECP_ERR;
if (size < 0) return size;
- if (size < ECP_MIN_PKT) return ECP_ERR_MIN_PKT;
+ if (size < ECP_MIN_PKT) return ECP_ERR;
#ifdef ECP_WITH_PTHREAD
pthread_mutex_lock(&key_perma_mutex);
@@ -337,7 +337,7 @@ static ssize_t vlink_handle_relay(ECPConnection *conn, ecp_seq_t seq, unsigned c
if (b == NULL) return ECP_ERR;
if (size < 0) return size;
- if (size < ECP_MIN_PKT) return ECP_ERR_MIN_PKT;
+ if (size < ECP_MIN_PKT) return ECP_ERR;
#ifdef ECP_WITH_PTHREAD
pthread_mutex_lock(&key_next_mutex);
@@ -456,7 +456,7 @@ ssize_t ecp_pack(ECPContext *ctx, ECPConnection *parent, ECPBuffer *packet, ECPP
if (payload->size < rv+hdr_size) return ECP_ERR;
memcpy(payload->buffer, packet->buffer, rv+hdr_size);
- return ecp_pack_conn(parent, packet, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, NULL, payload, rv+hdr_size, addr);
+ return ecp_pack_conn(parent, packet, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload, rv+hdr_size, addr, NULL);
} else {
return _ecp_pack(ctx, packet->buffer, packet->size, pkt_meta, payload->buffer, pld_size);
}
@@ -487,7 +487,7 @@ ssize_t ecp_pack(ECPContext *ctx, ECPConnection *parent, ECPBuffer *packet, ECPP
rv = _ecp_pack(ctx, _payload.buffer+hdr_size, _payload.size-hdr_size, pkt_meta, payload->buffer, pld_size);
if (rv < 0) return rv;
- return ecp_pack_conn(parent, packet, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, NULL, &_payload, rv+hdr_size, addr);
+ return ecp_pack_conn(parent, packet, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, &_payload, rv+hdr_size, addr, NULL);
} else {
return _ecp_pack(ctx, packet->buffer, packet->size, pkt_meta, payload->buffer, pld_size);
}
@@ -498,7 +498,7 @@ ssize_t ecp_pack(ECPContext *ctx, ECPConnection *parent, ECPBuffer *packet, ECPP
#ifdef ECP_MEM_TINY
/* Memory limited version */
-ssize_t ecp_pack_conn(ECPConnection *conn, ECPBuffer *packet, unsigned char s_idx, unsigned char c_idx, ECPSeqItem *si, ECPBuffer *payload, size_t pld_size, ECPNetAddr *addr) {
+ssize_t ecp_pack_conn(ECPConnection *conn, ECPBuffer *packet, unsigned char s_idx, unsigned char c_idx, ECPBuffer *payload, size_t pld_size, ECPNetAddr *addr, ECPSeqItem *si) {
if ((packet == NULL) || (packet->buffer == NULL)) return ECP_ERR;
if ((payload == NULL) || (payload->buffer == NULL)) return ECP_ERR;
@@ -513,21 +513,21 @@ ssize_t ecp_pack_conn(ECPConnection *conn, ECPBuffer *packet, unsigned char s_id
hdr_size = vconn_set_msg(conn->parent, packet, mtype);
if (hdr_size < 0) return hdr_size;
- rv = _ecp_pack_conn(conn, packet->buffer+hdr_size, packet->size-hdr_size, s_idx, c_idx, si, payload->buffer, pld_size, NULL);
+ rv = _ecp_pack_conn(conn, packet->buffer+hdr_size, packet->size-hdr_size, s_idx, c_idx, payload->buffer, pld_size, NULL, si);
if (rv < 0) return rv;
if (payload->size < rv+hdr_size) return ECP_ERR;
memcpy(payload->buffer, packet->buffer, rv+hdr_size);
- return ecp_pack_conn(conn->parent, packet, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, NULL, payload, rv+hdr_size, addr);
+ return ecp_pack_conn(conn->parent, packet, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload, rv+hdr_size, addr, NULL);
} else {
- return _ecp_pack_conn(conn, packet->buffer, packet->size, s_idx, c_idx, si, payload->buffer, pld_size, addr);
+ return _ecp_pack_conn(conn, packet->buffer, packet->size, s_idx, c_idx, payload->buffer, pld_size, addr, si);
}
}
#else
-ssize_t ecp_pack_conn(ECPConnection *conn, ECPBuffer *packet, unsigned char s_idx, unsigned char c_idx, ECPSeqItem *si, ECPBuffer *payload, size_t pld_size, ECPNetAddr *addr) {
+ssize_t ecp_pack_conn(ECPConnection *conn, ECPBuffer *packet, unsigned char s_idx, unsigned char c_idx, ECPBuffer *payload, size_t pld_size, ECPNetAddr *addr, ECPSeqItem *si) {
if ((packet == NULL) || (packet->buffer == NULL)) return ECP_ERR;
if ((payload == NULL) || (payload->buffer == NULL)) return ECP_ERR;
@@ -547,12 +547,12 @@ ssize_t ecp_pack_conn(ECPConnection *conn, ECPBuffer *packet, unsigned char s_id
hdr_size = vconn_set_msg(conn->parent, &_payload, mtype);
if (hdr_size < 0) return hdr_size;
- rv = _ecp_pack_conn(conn, _payload.buffer+hdr_size, _payload.size-hdr_size, s_idx, c_idx, si, payload->buffer, pld_size, NULL);
+ rv = _ecp_pack_conn(conn, _payload.buffer+hdr_size, _payload.size-hdr_size, s_idx, c_idx, payload->buffer, pld_size, NULL, si);
if (rv < 0) return rv;
- return ecp_pack_conn(conn->parent, packet, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, NULL, &_payload, rv+hdr_size, addr);
+ return ecp_pack_conn(conn->parent, packet, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, &_payload, rv+hdr_size, addr, NULL);
} else {
- return _ecp_pack_conn(conn, packet->buffer, packet->size, s_idx, c_idx, si, payload->buffer, pld_size, addr);
+ return _ecp_pack_conn(conn, packet->buffer, packet->size, s_idx, c_idx, payload->buffer, pld_size, addr, si);
}
}
@@ -662,7 +662,7 @@ static ssize_t _vconn_send_kget(ECPConnection *conn, ECPTimerItem *ti) {
payload.size = ECP_SIZE_PLD_BUF(0, ECP_MTYPE_KGET_REQ, conn);
ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_KGET_REQ);
- return _ecp_pld_send(conn, &packet, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, NULL, &payload, ECP_SIZE_PLD(0, ECP_MTYPE_KGET_REQ), 0, ti);
+ return _ecp_pld_send(conn, &packet, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, &payload, ECP_SIZE_PLD(0, ECP_MTYPE_KGET_REQ), 0, ti);
}
int ecp_vconn_open(ECPConnection *conn, ECPNode *conn_node, ECPVConnOut vconn[], ECPNode vconn_node[], int size) {
diff --git a/ecp/test/frag.c b/ecp/test/frag.c
index 73e45cb..70b20de 100644
--- a/ecp/test/frag.c
+++ b/ecp/test/frag.c
@@ -46,7 +46,7 @@ ssize_t handle_msg_c(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsigne
printf("MSG C:%s size:%ld\n", p, s);
ECPRBuffer *rbuf = &conn->rbuf.recv->rbuf;
- printf("RBUF: %d %d %d %d\n", rbuf->seq_start, rbuf->seq_max, rbuf->msg_start, rbuf->msg_size);
+ printf("RBUF: %d %d %d %d\n", rbuf->seq_start, rbuf->seq_max, rbuf->idx_start, rbuf->arr_size);
return s;
}