From 483b954dff83b86877184718acdd66533cb694ac Mon Sep 17 00:00:00 2001 From: Uros Majstorovic Date: Sun, 30 Jan 2022 02:04:24 +0100 Subject: code cleanup and fixes mainly rbuf --- ecp/src/TODO | 2 - ecp/src/core.c | 79 +++++++++----- ecp/src/core.h | 40 +++---- ecp/src/msgq.c | 68 ++++++------ ecp/src/msgq.h | 1 - ecp/src/rbuf.c | 214 ++++++++++++++++++------------------ ecp/src/rbuf.h | 61 ++++++----- ecp/src/rbuf_recv.c | 293 +++++++++++++++++++++++++------------------------- ecp/src/rbuf_send.c | 255 ++++++++++++++++++++++++------------------- ecp/src/vconn/vconn.c | 28 ++--- ecp/test/frag.c | 2 +- 11 files changed, 555 insertions(+), 488 deletions(-) (limited to 'ecp') diff --git a/ecp/src/TODO b/ecp/src/TODO index 982e382..01c2a28 100644 --- a/ecp/src/TODO +++ b/ecp/src/TODO @@ -7,5 +7,3 @@ rbuf: - implement _wait variants of open / send - msgq should subtract ECP_MAX_MTYPE_SYS from mtype (make ECPConnMsgQ smaller) -- consider adding one buffer for all msgs (frag. issue) - diff --git a/ecp/src/core.c b/ecp/src/core.c index a8f8f01..90ee925 100644 --- a/ecp/src/core.c +++ b/ecp/src/core.c @@ -311,7 +311,6 @@ static ECPDHKey *conn_dhkey_get(ECPConnection *conn, unsigned char idx) { } static int conn_dhkey_new_pair(ECPConnection *conn, ECPDHKey *key) { - /* called when client makes new key pair */ ECPSocket *sock = conn->sock; conn->key_curr = conn->key_curr == ECP_ECDH_IDX_INV ? 0 : (conn->key_curr+1) % ECP_MAX_CONN_KEY; @@ -349,8 +348,8 @@ static void conn_dhkey_del_pair(ECPConnection *conn, unsigned char idx) { conn->key_idx_map[idx] = ECP_ECDH_IDX_INV; } +/* remote client obtained our key */ static int conn_dhkey_new_pub_local(ECPConnection *conn, unsigned char idx) { - // Remote obtained our key unsigned char new = conn->key_idx_curr == ECP_ECDH_IDX_INV ? 0 : (conn->key_idx_curr+1) % ECP_MAX_NODE_KEY; int i; @@ -367,8 +366,8 @@ static int conn_dhkey_new_pub_local(ECPConnection *conn, unsigned char idx) { return ECP_OK; } +/* this client obtained remote key */ static int conn_dhkey_new_pub_remote(ECPConnection *conn, unsigned char idx, unsigned char *public) { - // We obtained remote key ECPSocket *sock = conn->sock; ECPDHRKeyBucket *remote = &conn->remote; unsigned char new = remote->key_curr == ECP_ECDH_IDX_INV ? 0 : (remote->key_curr+1) % ECP_MAX_NODE_KEY; @@ -649,7 +648,7 @@ int ecp_conn_open(ECPConnection *conn, ECPNode *node) { return ECP_OK; } -void _ecp_conn_close(ECPConnection *conn) { +static void conn_close(ECPConnection *conn) { ECPContext *ctx = conn->sock->ctx; if (ecp_conn_is_inb(conn) && conn->parent) { @@ -672,7 +671,7 @@ int ecp_conn_close(ECPConnection *conn) { if (refcount) return ECP_ERR_BUSY; - _ecp_conn_close(conn); + conn_close(conn); return ECP_OK; } @@ -737,7 +736,7 @@ void ecp_conn_refcount_dec(ECPConnection *conn) { pthread_mutex_unlock(&conn->mutex); #endif - if (!is_reg && (refcount == 0)) _ecp_conn_close(conn); + if (!is_reg && (refcount == 0)) conn_close(conn); } int ecp_conn_handler_init(ECPConnHandler *handler) { @@ -751,7 +750,7 @@ int ecp_conn_handler_init(ECPConnHandler *handler) { #ifdef ECP_WITH_RBUF handler->msg[ECP_MTYPE_RBACK] = ecp_rbuf_handle_ack; handler->msg[ECP_MTYPE_RBFLUSH] = ecp_rbuf_handle_flush; - handler->msg[ECP_MTYPE_RBFLUSH_PTS] = ecp_rbuf_handle_flush_pts; + handler->msg[ECP_MTYPE_RBTIMER] = ecp_rbuf_handle_timer; #endif return ECP_OK; } @@ -867,7 +866,7 @@ static ssize_t _conn_send_kget(ECPConnection *conn, ECPTimerItem *ti) { payload.size = ECP_SIZE_PLD_BUF(0, ECP_MTYPE_KGET_REQ, conn); ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_KGET_REQ); - return _ecp_pld_send(conn, &packet, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, NULL, &payload, ECP_SIZE_PLD(0, ECP_MTYPE_KGET_REQ), 0, ti); + return _ecp_pld_send(conn, &packet, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, &payload, ECP_SIZE_PLD(0, ECP_MTYPE_KGET_REQ), 0, ti); } static ssize_t _conn_send_kput(ECPConnection *conn, ECPTimerItem *ti) { @@ -903,7 +902,7 @@ static ssize_t _conn_send_dir(ECPConnection *conn, ECPTimerItem *ti) { payload.size = ECP_SIZE_PLD_BUF(0, ECP_MTYPE_DIR_REQ, conn); ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_DIR_REQ); - return _ecp_pld_send(conn, &packet, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, NULL, &payload, ECP_SIZE_PLD(0, ECP_MTYPE_DIR_REQ), 0, ti); + return _ecp_pld_send(conn, &packet, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, &payload, ECP_SIZE_PLD(0, ECP_MTYPE_DIR_REQ), 0, ti); } ssize_t ecp_conn_send_open(ECPConnection *conn) { @@ -1114,7 +1113,7 @@ ssize_t ecp_conn_handle_msg(ECPConnection *conn, ecp_seq_t seq, unsigned char *m while (rem_size) { _rv = ecp_msg_get_type(msg, rem_size, &mtype); - if (_rv) return ECP_ERR_MIN_MSG; + if (_rv) return ECP_ERR; if ((mtype & ECP_MTYPE_MASK) >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE; ecp_timer_pop(conn, mtype); @@ -1133,7 +1132,7 @@ ssize_t ecp_conn_handle_msg(ECPConnection *conn, ecp_seq_t seq, unsigned char *m } content = ecp_msg_get_content(msg, rem_size); - if (content == NULL) return ECP_ERR_MIN_MSG; + if (content == NULL) return ECP_ERR; rem_size -= content - msg; handler = ecp_conn_get_msg_handler(conn, mtype & ECP_MTYPE_MASK); @@ -1209,7 +1208,6 @@ ssize_t ecp_sock_handle_kget(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *p return msg_size; } - ssize_t _ecp_pack(ECPContext *ctx, unsigned char *packet, size_t pkt_size, ECPPktMeta *pkt_meta, unsigned char *payload, size_t pld_size) { ssize_t rv; unsigned char s_idx, c_idx; @@ -1246,7 +1244,7 @@ ssize_t ecp_pack(ECPContext *ctx, ECPConnection *parent, ECPBuffer *packet, ECPP } #endif -ssize_t _ecp_pack_conn(ECPConnection *conn, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, ECPSeqItem *si, unsigned char *payload, size_t pld_size, ECPNetAddr *addr) { +ssize_t _ecp_pack_conn(ECPConnection *conn, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t pld_size, ECPNetAddr *addr, ECPSeqItem *si) { ECPPktMeta pkt_meta; int rv; ssize_t _rv; @@ -1254,6 +1252,7 @@ ssize_t _ecp_pack_conn(ECPConnection *conn, unsigned char *packet, size_t pkt_si #ifdef ECP_WITH_PTHREAD pthread_mutex_lock(&conn->mutex); #endif + if (s_idx == ECP_ECDH_IDX_INV) { if (ecp_conn_is_outb(conn)) { if (conn->remote.key_curr != ECP_ECDH_IDX_INV) s_idx = conn->remote.key[conn->remote.key_curr].idx; @@ -1326,11 +1325,11 @@ ssize_t _ecp_pack_conn(ECPConnection *conn, unsigned char *packet, size_t pkt_si } #ifndef ECP_WITH_VCONN -ssize_t ecp_pack_conn(ECPConnection *conn, ECPBuffer *packet, unsigned char s_idx, unsigned char c_idx, ECPSeqItem *si, ECPBuffer *payload, size_t pld_size, ECPNetAddr *addr) { +ssize_t ecp_pack_conn(ECPConnection *conn, ECPBuffer *packet, unsigned char s_idx, unsigned char c_idx, ECPBuffer *payload, size_t pld_size, ECPNetAddr *addr, ECPSeqItem *si) { if ((packet == NULL) || (packet->buffer == NULL)) return ECP_ERR; if ((payload == NULL) || (payload->buffer == NULL)) return ECP_ERR; - return _ecp_pack_conn(conn, packet->buffer, packet->size, s_idx, c_idx, si, payload->buffer, pld_size, addr); + return _ecp_pack_conn(conn, packet->buffer, packet->size, s_idx, c_idx, payload->buffer, pld_size, addr, si); } #endif @@ -1683,13 +1682,13 @@ int ecp_msg_defrag(ECPFragIter *iter, ecp_seq_t seq, unsigned char mtype, unsign int rv; rv = ecp_msg_get_frag(msg_in, msg_in_size, &frag_cnt, &frag_tot, &frag_size); - if (rv) return ECP_ERR_MIN_MSG; + if (rv) return ECP_ERR; content = ecp_msg_get_content(msg_in, msg_in_size); - if (content == NULL) return ECP_ERR_MIN_MSG; + if (content == NULL) return ECP_ERR; msg_size = msg_in_size - (content - msg_in); - if (msg_size == 0) return ECP_ERR_MIN_MSG; + if (msg_size == 0) return ECP_ERR; if (iter->msg_size && (iter->seq + frag_cnt != seq)) ecp_frag_iter_reset(iter); @@ -1725,6 +1724,8 @@ int ecp_msg_defrag(ECPFragIter *iter, ecp_seq_t seq, unsigned char mtype, unsign } int ecp_pld_get_type(unsigned char *payload, size_t pld_size, unsigned char *mtype) { + if (pld_size < ECP_SIZE_PLD_HDR) return ECP_ERR; + payload += ECP_SIZE_PLD_HDR; pld_size -= ECP_SIZE_PLD_HDR; @@ -1732,6 +1733,8 @@ int ecp_pld_get_type(unsigned char *payload, size_t pld_size, unsigned char *mty } int ecp_pld_set_type(unsigned char *payload, size_t pld_size, unsigned char mtype) { + if (pld_size < ECP_SIZE_PLD_HDR) return ECP_ERR; + payload += ECP_SIZE_PLD_HDR; pld_size -= ECP_SIZE_PLD_HDR; @@ -1739,6 +1742,8 @@ int ecp_pld_set_type(unsigned char *payload, size_t pld_size, unsigned char mtyp } int ecp_pld_get_frag(unsigned char *payload, size_t pld_size, unsigned char *frag_cnt, unsigned char *frag_tot, uint16_t *frag_size) { + if (pld_size < ECP_SIZE_PLD_HDR) return ECP_ERR; + payload += ECP_SIZE_PLD_HDR; pld_size -= ECP_SIZE_PLD_HDR; @@ -1746,6 +1751,8 @@ int ecp_pld_get_frag(unsigned char *payload, size_t pld_size, unsigned char *fra } int ecp_pld_set_frag(unsigned char *payload, size_t pld_size, unsigned char frag_cnt, unsigned char frag_tot, uint16_t frag_size) { + if (pld_size < ECP_SIZE_PLD_HDR) return ECP_ERR; + payload += ECP_SIZE_PLD_HDR; pld_size -= ECP_SIZE_PLD_HDR; @@ -1753,6 +1760,8 @@ int ecp_pld_set_frag(unsigned char *payload, size_t pld_size, unsigned char frag } int ecp_pld_get_pts(unsigned char *payload, size_t pld_size, ecp_pts_t *pts) { + if (pld_size < ECP_SIZE_PLD_HDR) return ECP_ERR; + payload += ECP_SIZE_PLD_HDR; pld_size -= ECP_SIZE_PLD_HDR; @@ -1760,6 +1769,8 @@ int ecp_pld_get_pts(unsigned char *payload, size_t pld_size, ecp_pts_t *pts) { } int ecp_pld_set_pts(unsigned char *payload, size_t pld_size, ecp_pts_t pts) { + if (pld_size < ECP_SIZE_PLD_HDR) return ECP_ERR; + payload += ECP_SIZE_PLD_HDR; pld_size -= ECP_SIZE_PLD_HDR; @@ -1767,18 +1778,20 @@ int ecp_pld_set_pts(unsigned char *payload, size_t pld_size, ecp_pts_t pts) { } unsigned char *ecp_pld_get_buf(unsigned char *payload, size_t pld_size) { + if (pld_size < ECP_SIZE_PLD_HDR) return NULL; + payload += ECP_SIZE_PLD_HDR; pld_size -= ECP_SIZE_PLD_HDR; return ecp_msg_get_content(payload, pld_size); } -static ssize_t pld_send(ECPConnection *conn, ECPBuffer *packet, unsigned char s_idx, unsigned char c_idx, ECPSeqItem *si, ECPBuffer *payload, size_t pld_size, unsigned char flags, ECPTimerItem *ti) { +ssize_t __ecp_pld_send(ECPConnection *conn, ECPBuffer *packet, unsigned char s_idx, unsigned char c_idx, ECPBuffer *payload, size_t pld_size, unsigned char flags, ECPTimerItem *ti, ECPSeqItem *si) { ECPSocket *sock = conn->sock; ECPNetAddr addr; ssize_t rv; - rv = ecp_pack_conn(conn, packet, s_idx, c_idx, si, payload, pld_size, &addr); + rv = ecp_pack_conn(conn, packet, s_idx, c_idx, payload, pld_size, &addr, si); if (rv < 0) return rv; #ifdef ECP_WITH_RBUF @@ -1797,35 +1810,43 @@ static ssize_t pld_send(ECPConnection *conn, ECPBuffer *packet, unsigned char s_ return ecp_pkt_send(sock, &addr, packet, rv, flags); } -ssize_t _ecp_pld_send(ECPConnection *conn, ECPBuffer *packet, unsigned char s_idx, unsigned char c_idx, ECPSeqItem *si, ECPBuffer *payload, size_t pld_size, unsigned char flags, ECPTimerItem *ti) { #ifdef ECP_WITH_RBUF - if ((si == NULL) && conn->rbuf.send) { + +ssize_t _ecp_pld_send(ECPConnection *conn, ECPBuffer *packet, unsigned char s_idx, unsigned char c_idx, ECPBuffer *payload, size_t pld_size, unsigned char flags, ECPTimerItem *ti) { + if (conn->rbuf.send) { ECPSeqItem seq_item; int rv; rv = ecp_seq_item_init(&seq_item); if (rv) return rv; - return pld_send(conn, packet, s_idx, c_idx, &seq_item, payload, pld_size, flags, ti); + return __ecp_pld_send(conn, packet, s_idx, c_idx, payload, pld_size, flags, ti, &seq_item); } -#endif - return pld_send(conn, packet, s_idx, c_idx, NULL, payload, pld_size, flags, ti); + return __ecp_pld_send(conn, packet, s_idx, c_idx, payload, pld_size, flags, ti, NULL); } +#else + +ssize_t _ecp_pld_send(ECPConnection *conn, ECPBuffer *packet, unsigned char s_idx, unsigned char c_idx, ECPBuffer *payload, size_t pld_size, unsigned char flags, ECPTimerItem *ti) { + return __ecp_pld_send(conn, packet, s_idx, c_idx, payload, pld_size, flags, ti, NULL); +} + +#endif + ssize_t ecp_pld_send(ECPConnection *conn, ECPBuffer *packet, ECPBuffer *payload, size_t pld_size, unsigned char flags) { - return _ecp_pld_send(conn, packet, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, NULL, payload, pld_size, flags, NULL); + return _ecp_pld_send(conn, packet, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload, pld_size, flags, NULL); } ssize_t ecp_pld_send_wtimer(ECPConnection *conn, ECPBuffer *packet, ECPBuffer *payload, size_t pld_size, unsigned char flags, ECPTimerItem *ti) { - return _ecp_pld_send(conn, packet, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, NULL, payload, pld_size, flags, ti); + return _ecp_pld_send(conn, packet, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload, pld_size, flags, ti); } ssize_t ecp_pld_send_tr(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *parent, ECPBuffer *packet, ECPPktMeta *pkt_meta, ECPBuffer *payload, size_t pld_size, unsigned char flags) { ECPNetAddr _addr; ssize_t rv; - rv = ecp_pack(sock->ctx, parent, packet, pkt_meta, payload, pld_size, &_addr); + rv = ecp_pack(sock->ctx, parent, packet, pkt_meta, payload, pld_size, addr ? NULL : &_addr); if (rv < 0) return rv; return ecp_pkt_send(sock, addr ? addr : &_addr, packet, rv, flags); @@ -1887,7 +1908,7 @@ ssize_t ecp_send(ECPConnection *conn, unsigned char mtype, unsigned char *conten content += frag_size; seq_item.seq = seq_start + i; - _rv = _ecp_pld_send(conn, &packet, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, &seq_item, &payload, ECP_SIZE_PLD(frag_size, mtype), 0, NULL); + _rv = __ecp_pld_send(conn, &packet, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, &payload, ECP_SIZE_PLD(frag_size, mtype), 0, NULL, &seq_item); if (_rv < 0) return _rv; rv += _rv; diff --git a/ecp/src/core.h b/ecp/src/core.h index bf04947..d842e8b 100644 --- a/ecp/src/core.h +++ b/ecp/src/core.h @@ -18,19 +18,14 @@ #define ECP_ERR_SIZE -4 #define ECP_ERR_ITER -5 #define ECP_ERR_BUSY -6 +#define ECP_ERR_EMPTY -7 +#define ECP_ERR_FULL -8 #define ECP_ERR_MAX_SOCK_CONN -10 #define ECP_ERR_MAX_CTYPE -11 #define ECP_ERR_MAX_MTYPE -12 -#define ECP_ERR_MIN_PKT -13 -#define ECP_ERR_MAX_PLD -14 -// XXX ??? -#define ECP_ERR_MIN_MSG -15 -#define ECP_ERR_MAX_MSG -16 -// -#define ECP_ERR_NET_ADDR -17 - -#define ECP_ERR_CONN_NOT_FOUND -20 +#define ECP_ERR_NET_ADDR -13 + #define ECP_ERR_ECDH_KEY_DUP -21 #define ECP_ERR_ECDH_IDX -22 #define ECP_ERR_ECDH_IDX_LOCAL -23 @@ -42,7 +37,6 @@ #define ECP_ERR_RECV -29 #define ECP_ERR_SEQ -30 #define ECP_ERR_CLOSED -31 -#define ECP_ERR_HANDLE -32 #define ECP_ERR_NOT_IMPLEMENTED -99 #define ECP_SIZE_PROTO 2 @@ -98,19 +92,19 @@ #define ECP_SEND_FLAG_REPLY 0x01 #define ECP_SEND_FLAG_MORE 0x02 -#define ecp_conn_is_inb(conn) (!((conn)->flags_ro & ECP_CONN_FLAG_OUTB)) -#define ecp_conn_is_outb(conn) ((conn)->flags_ro & ECP_CONN_FLAG_OUTB) -#define ecp_conn_is_new(conn) ((conn)->flags_ro & ECP_CONN_FLAG_NEW) +#define ecp_conn_is_inb(conn) (!((conn)->flags_im & ECP_CONN_FLAG_OUTB)) +#define ecp_conn_is_outb(conn) ((conn)->flags_im & ECP_CONN_FLAG_OUTB) +#define ecp_conn_is_new(conn) ((conn)->flags_im & ECP_CONN_FLAG_NEW) #define ecp_conn_is_reg(conn) ((conn)->flags & ECP_CONN_FLAG_REG) #define ecp_conn_is_open(conn) ((conn)->flags & ECP_CONN_FLAG_OPEN) -#define ecp_conn_set_outb(conn) ((conn)->flags_ro |= ECP_CONN_FLAG_OUTB) -#define ecp_conn_set_new(conn) ((conn)->flags_ro |= ECP_CONN_FLAG_NEW) +#define ecp_conn_set_outb(conn) ((conn)->flags_im |= ECP_CONN_FLAG_OUTB) +#define ecp_conn_set_new(conn) ((conn)->flags_im |= ECP_CONN_FLAG_NEW) #define ecp_conn_set_reg(conn) ((conn)->flags |= ECP_CONN_FLAG_REG) #define ecp_conn_set_open(conn) ((conn)->flags |= ECP_CONN_FLAG_OPEN) -#define ecp_conn_clr_outb(conn) ((conn)->flags_ro &= ~ECP_CONN_FLAG_OUTB) -#define ecp_conn_clr_new(conn) ((conn)->flags_ro &= ~ECP_CONN_FLAG_NEW) +#define ecp_conn_clr_outb(conn) ((conn)->flags_im &= ~ECP_CONN_FLAG_OUTB) +#define ecp_conn_clr_new(conn) ((conn)->flags_im &= ~ECP_CONN_FLAG_NEW) #define ecp_conn_clr_reg(conn) ((conn)->flags &= ~ECP_CONN_FLAG_REG) #define ecp_conn_clr_open(conn) ((conn)->flags &= ~ECP_CONN_FLAG_OPEN) @@ -224,7 +218,7 @@ typedef struct ECPSeqItem { #ifdef ECP_WITH_RBUF unsigned char rb_pass; unsigned char rb_mtype; - unsigned int rb_idx; + unsigned short rb_idx; #endif } ECPSeqItem; @@ -293,7 +287,7 @@ typedef struct ECPSocket { typedef struct ECPConnection { unsigned char type; unsigned char flags; - unsigned char flags_ro; + unsigned char flags_im; unsigned short refcount; ecp_seq_t seq_out; ecp_seq_t seq_in; @@ -347,7 +341,6 @@ void ecp_conn_unregister(ECPConnection *conn, unsigned short *refcount); int ecp_conn_get_dirlist(ECPConnection *conn, ECPNode *node); int ecp_conn_open(ECPConnection *conn, ECPNode *node); -void _ecp_conn_close(ECPConnection *conn); int ecp_conn_close(ECPConnection *conn); int ecp_conn_reset(ECPConnection *conn); void ecp_conn_refcount_inc(ECPConnection *conn); @@ -377,8 +370,8 @@ ssize_t ecp_sock_handle_kget(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *p ssize_t _ecp_pack(ECPContext *ctx, unsigned char *packet, size_t pkt_size, ECPPktMeta *pkt_meta, unsigned char *payload, size_t pld_size); ssize_t ecp_pack(ECPContext *ctx, ECPConnection *parent, ECPBuffer *packet, ECPPktMeta *pkt_meta, ECPBuffer *payload, size_t pld_size, ECPNetAddr *addr) ; -ssize_t _ecp_pack_conn(ECPConnection *conn, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, ECPSeqItem *si, unsigned char *payload, size_t pld_size, ECPNetAddr *addr); -ssize_t ecp_pack_conn(ECPConnection *conn, ECPBuffer *packet, unsigned char s_idx, unsigned char c_idx, ECPSeqItem *si, ECPBuffer *payload, size_t pld_size, ECPNetAddr *addr); +ssize_t _ecp_pack_conn(ECPConnection *conn, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t pld_size, ECPNetAddr *addr, ECPSeqItem *si); +ssize_t ecp_pack_conn(ECPConnection *conn, ECPBuffer *packet, unsigned char s_idx, unsigned char c_idx, ECPBuffer *payload, size_t pld_size, ECPNetAddr *addr, ECPSeqItem *si); ssize_t ecp_unpack(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *parent, ECP2Buffer *bufs, size_t pkt_size, ECPConnection **_conn, ecp_seq_t *_seq); ssize_t ecp_pkt_handle(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *parent, ECP2Buffer *bufs, size_t pkt_size); @@ -401,7 +394,8 @@ int ecp_pld_get_pts(unsigned char *payload, size_t pld_size, ecp_pts_t *pts); int ecp_pld_set_pts(unsigned char *payload, size_t pld_size, ecp_pts_t pts); unsigned char *ecp_pld_get_buf(unsigned char *payload, size_t pld_size); -ssize_t _ecp_pld_send(ECPConnection *conn, ECPBuffer *packet, unsigned char s_idx, unsigned char c_idx, ECPSeqItem *si, ECPBuffer *payload, size_t pld_size, unsigned char flags, ECPTimerItem *ti); +ssize_t __ecp_pld_send(ECPConnection *conn, ECPBuffer *packet, unsigned char s_idx, unsigned char c_idx, ECPBuffer *payload, size_t pld_size, unsigned char flags, ECPTimerItem *ti, ECPSeqItem *si); +ssize_t _ecp_pld_send(ECPConnection *conn, ECPBuffer *packet, unsigned char s_idx, unsigned char c_idx, ECPBuffer *payload, size_t pld_size, unsigned char flags, ECPTimerItem *ti); ssize_t ecp_pld_send(ECPConnection *conn, ECPBuffer *packet, ECPBuffer *payload, size_t pld_size, unsigned char flags); ssize_t ecp_pld_send_wtimer(ECPConnection *conn, ECPBuffer *packet, ECPBuffer *payload, size_t pld_size, unsigned char flags, ECPTimerItem *ti); ssize_t ecp_pld_send_tr(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *parent, ECPBuffer *packet, ECPPktMeta *pkt_meta, ECPBuffer *payload, size_t pld_size, unsigned char flags); diff --git a/ecp/src/msgq.c b/ecp/src/msgq.c index 61c7b02..8f81d8a 100644 --- a/ecp/src/msgq.c +++ b/ecp/src/msgq.c @@ -4,9 +4,6 @@ #include -#define MIN(a,b) (((a)<(b))?(a):(b)) -#define MAX(a,b) (((a)>(b))?(a):(b)) - #define MSG_IDX_MASK(idx) ((idx) & ((ECP_MSGQ_MAX_MSG) - 1)) static struct timespec *abstime_ts(struct timespec *ts, ecp_cts_t msec) { @@ -25,8 +22,6 @@ int ecp_conn_msgq_create(ECPConnMsgQ *msgq) { int i; int rv; - if (msgq == NULL) return ECP_ERR; - memset(msgq, 0, sizeof(ECPConnMsgQ)); rv = pthread_mutex_init(&msgq->mutex, NULL); @@ -51,8 +46,6 @@ int ecp_conn_msgq_create(ECPConnMsgQ *msgq) { void ecp_conn_msgq_destroy(ECPConnMsgQ *msgq) { int i; - if (msgq == NULL) return; - for (i=0; icond[i]); } @@ -60,8 +53,6 @@ void ecp_conn_msgq_destroy(ECPConnMsgQ *msgq) { } int ecp_conn_msgq_start(ECPConnMsgQ *msgq, ecp_seq_t seq) { - if (msgq == NULL) return ECP_ERR; - msgq->seq_max = seq; msgq->seq_start = seq + 1; @@ -70,13 +61,11 @@ int ecp_conn_msgq_start(ECPConnMsgQ *msgq, ecp_seq_t seq) { int ecp_conn_msgq_push(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype) { ECPRBRecv *buf = conn->rbuf.recv; - ECPConnMsgQ *msgq = buf ? &buf->msgq : NULL; + ECPConnMsgQ *msgq = &buf->msgq; - mtype &= ECP_MTYPE_MASK; - if (msgq == NULL) return ECP_ERR; if (mtype >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE; - if ((unsigned short)(msgq->idx_w[mtype] - msgq->idx_r[mtype]) == ECP_MSGQ_MAX_MSG) return ECP_MSGQ_ERR_MAX_MSG; + if ((unsigned short)(msgq->idx_w[mtype] - msgq->idx_r[mtype]) == ECP_MSGQ_MAX_MSG) return ECP_ERR_FULL; if (msgq->idx_w[mtype] == msgq->idx_r[mtype]) pthread_cond_signal(&msgq->cond[mtype]); msgq->seq_msg[mtype][MSG_IDX_MASK(msgq->idx_w[mtype])] = seq; @@ -89,13 +78,15 @@ int ecp_conn_msgq_push(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype) ssize_t ecp_conn_msgq_pop(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ecp_cts_t timeout) { ECPRBRecv *buf = conn->rbuf.recv; - ECPConnMsgQ *msgq = buf ? &buf->msgq : NULL; + ECPConnMsgQ *msgq = &buf->msgq; + ECPRBuffer *rbuf = &buf->rbuf; ecp_seq_t seq; - ecp_seq_t seq_offset; - unsigned int idx; + unsigned char *msg_buf; + unsigned char *content; + unsigned short idx; + int _rv; ssize_t rv; - if (msgq == NULL) return ECP_ERR; if (mtype >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE; if (msgq->idx_r[mtype] == msgq->idx_w[mtype]) { @@ -103,35 +94,48 @@ ssize_t ecp_conn_msgq_pop(ECPConnection *conn, unsigned char mtype, unsigned cha pthread_cond_wait(&msgq->cond[mtype], &msgq->mutex); } else { struct timespec ts; - int _rv; _rv = pthread_cond_timedwait(&msgq->cond[mtype], &msgq->mutex, abstime_ts(&ts, timeout)); if (_rv) return ECP_ERR_TIMEOUT; } } - seq = msgq->seq_msg[mtype][MSG_IDX_MASK(msgq->idx_r[mtype])]; - seq_offset = seq - buf->rbuf.seq_start; - idx = ECP_RBUF_IDX_MASK(buf->rbuf.msg_start + seq_offset, buf->rbuf.msg_size); - msgq->idx_r[mtype]++; - buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_IN_MSGQ; + _rv = _ecp_rbuf_msg_idx(rbuf, seq, &idx); + if (_rv) return ECP_ERR; + msg_buf = rbuf->arr.msg[idx].buf; + rv = rbuf->arr.msg[idx].size; + + content = ecp_msg_get_content(msg_buf, rv); + if (content == NULL) { + rv = ECP_ERR; + goto msgq_pop_fin; + } + + rv -= content - msg_buf; + if (msg_size < rv) { + rv = ECP_ERR_FULL; + goto msgq_pop_fin; + } + + memcpy(msg, content, rv); + + rbuf->arr.msg[idx].flags &= ~ECP_RBUF_FLAG_IN_MSGQ; + // if (rbuf->arr.msg[idx].flags == 0); + +msgq_pop_fin: + msgq->idx_r[mtype]++; if (msgq->seq_start == seq) { - int i, _idx = idx; - ecp_seq_t msg_cnt = msgq->seq_max - msgq->seq_start + 1; + int i; + unsigned short msg_cnt = msgq->seq_max - msgq->seq_start + 1; for (i=0; irbuf.msg[_idx].flags & ECP_RBUF_FLAG_IN_MSGQ) break; - _idx = ECP_RBUF_IDX_MASK(_idx + 1, buf->rbuf.msg_size); + if (rbuf->arr.msg[idx].flags & ECP_RBUF_FLAG_IN_MSGQ) break; + idx = ECP_RBUF_IDX_MASK(idx + 1, rbuf->arr_size); } msgq->seq_start += i; } - rv = buf->rbuf.msg[idx].size - 1; - if (rv >= 0) { - rv = MIN(msg_size, rv); - memcpy(msg, buf->rbuf.msg[idx].msg + 1, rv); - } return rv; } diff --git a/ecp/src/msgq.h b/ecp/src/msgq.h index a81ff36..394209f 100644 --- a/ecp/src/msgq.h +++ b/ecp/src/msgq.h @@ -1,7 +1,6 @@ #ifdef ECP_WITH_MSGQ #define ECP_MSGQ_MAX_MSG 32 -#define ECP_MSGQ_ERR_MAX_MSG -110 typedef struct ECPConnMsgQ { unsigned short idx_w[ECP_MAX_MTYPE]; diff --git a/ecp/src/rbuf.c b/ecp/src/rbuf.c index 9680d14..be8e9f3 100644 --- a/ecp/src/rbuf.c +++ b/ecp/src/rbuf.c @@ -1,18 +1,5 @@ #include "core.h" -int _ecp_rbuf_init(ECPRBuffer *rbuf, ECPRBMessage *msg, unsigned int msg_size) { - rbuf->msg = msg; - if (msg_size) { - if (msg == NULL) return ECP_ERR; - rbuf->msg_size = msg_size; - memset(rbuf->msg, 0, sizeof(ECPRBMessage) * msg_size); - } else { - rbuf->msg_size = ECP_SEQ_HALF; - } - - return ECP_OK; -} - int _ecp_rbuf_start(ECPRBuffer *rbuf, ecp_seq_t seq) { rbuf->seq_max = seq; rbuf->seq_start = seq + 1; @@ -20,29 +7,17 @@ int _ecp_rbuf_start(ECPRBuffer *rbuf, ecp_seq_t seq) { return ECP_OK; } -int _ecp_rbuf_msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq) { +int _ecp_rbuf_msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq, unsigned short *idx) { ecp_seq_t seq_offset = seq - rbuf->seq_start; - // This also checks for seq_start <= seq if seq type range >> rbuf->msg_size - if (seq_offset < rbuf->msg_size) return ECP_RBUF_IDX_MASK(rbuf->msg_start + seq_offset, rbuf->msg_size); - return ECP_ERR_RBUF_FULL; -} + /* This also checks for seq_start <= seq if seq type range >> rbuf->arr_size */ + if (seq_offset >= rbuf->arr_size) return ECP_ERR_FULL; -ssize_t _ecp_rbuf_msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, int idx, unsigned char *msg, size_t msg_size, unsigned char test_flags, unsigned char set_flags) { - idx = idx < 0 ? _ecp_rbuf_msg_idx(rbuf, seq) : idx; - if (idx < 0) return idx; - - if (rbuf->msg == NULL) return 0; - if (test_flags && (test_flags & rbuf->msg[idx].flags)) return ECP_ERR_RBUF_DUP; - - if (msg_size) memcpy(rbuf->msg[idx].msg, msg, msg_size); - rbuf->msg[idx].size = msg_size; - rbuf->msg[idx].flags = set_flags; - - return msg_size; + *idx = ECP_RBUF_IDX_MASK(rbuf->idx_start + seq_offset, rbuf->arr_size); + return ECP_OK; } -int ecp_rbuf_create(ECPConnection *conn, ECPRBSend *buf_s, ECPRBMessage *msg_s, unsigned int msg_s_size, ECPRBRecv *buf_r, ECPRBMessage *msg_r, unsigned int msg_r_size) { +int ecp_rbuf_create(ECPConnection *conn, ECPRBSend *buf_s, ECPRBPacket *msg_s, unsigned int msg_s_size, ECPRBRecv *buf_r, ECPRBMessage *msg_r, unsigned int msg_r_size) { int rv; if (buf_s) { @@ -72,39 +47,16 @@ void ecp_rbuf_destroy(ECPConnection *conn) { ecp_rbuf_recv_destroy(conn); } -ssize_t ecp_rbuf_pld_send(ECPConnection *conn, ECPBuffer *packet, ECPBuffer *payload, size_t pld_size, unsigned char flags, ecp_seq_t seq) { - ECPSocket *sock = conn->sock; - ECPContext *ctx = sock->ctx; - ECPNetAddr addr; - ECPSeqItem seq_item; - ssize_t rv; - int _rv; - - _rv = ecp_seq_item_init(&seq_item); - if (_rv) return _rv; - - seq_item.seq = seq; - seq_item.seq_w = 1; - seq_item.rb_pass = 1; - - rv = ecp_pack_conn(conn, packet, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, &seq_item, payload, pld_size, &addr); - if (rv < 0) return rv; - - rv = ecp_pkt_send(sock, &addr, packet, rv, flags); - if (rv < 0) return rv; - - return rv; -} - int ecp_rbuf_handle_seq(ECPConnection *conn, unsigned char mtype) { if (conn->rbuf.recv || (mtype == ECP_MTYPE_RBACK) || (mtype == ECP_MTYPE_RBFLUSH)) return 1; return 0; } int ecp_rbuf_set_seq(ECPConnection *conn, ECPSeqItem *si, unsigned char *payload, size_t pld_size) { - ECPRBSend *buf; + ECPRBSend *buf = conn->rbuf.send; + ECPRBuffer *rbuf = &buf->rbuf; unsigned char mtype; - int idx; + unsigned short idx; int rv; if (si->rb_pass) return ECP_OK; @@ -113,86 +65,142 @@ int ecp_rbuf_set_seq(ECPConnection *conn, ECPSeqItem *si, unsigned char *payload #ifdef ECP_WITH_PTHREAD pthread_mutex_lock(&buf->mutex); #endif - idx = _ecp_rbuf_msg_idx(&buf->rbuf, si->seq); + rv = _ecp_rbuf_msg_idx(rbuf, si->seq, &idx); #ifdef ECP_WITH_PTHREAD pthread_mutex_unlock(&buf->mutex); #endif - - if (idx < 0) return idx; + if (rv) return rv; rv = ecp_pld_get_type(payload, pld_size, &mtype); if (rv) return rv; si->rb_mtype = mtype; si->rb_idx = idx; - buf->rbuf.msg[idx].size = 0; - buf->rbuf.msg[idx].flags = 0; + rbuf->arr.pkt[idx].size = 0; + rbuf->arr.pkt[idx].flags = 0; return ECP_OK; } +ssize_t ecp_rbuf_pld_send(ECPConnection *conn, ECPBuffer *packet, ECPBuffer *payload, size_t pld_size, unsigned char flags, ecp_seq_t seq) { + ECPSocket *sock = conn->sock; + ECPContext *ctx = sock->ctx; + ECPNetAddr addr; + ECPSeqItem seq_item; + int _rv; + ssize_t rv; + + _rv = ecp_seq_item_init(&seq_item); + if (_rv) return _rv; + + seq_item.seq = seq; + seq_item.seq_w = 1; + seq_item.rb_pass = 1; + + rv = ecp_pack_conn(conn, packet, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload, pld_size, &addr, &seq_item); + if (rv < 0) return rv; + + rv = ecp_pkt_send(sock, &addr, packet, rv, flags); + if (rv < 0) return rv; + + return rv; +} + ssize_t ecp_rbuf_pkt_send(ECPConnection *conn, ECPSocket *sock, ECPNetAddr *addr, ECPBuffer *packet, size_t pkt_size, unsigned char flags, ECPTimerItem *ti, ECPSeqItem *si) { - ECPRBSend *buf; - int do_send = 1; + ECPRBSend *buf = conn->rbuf.send; + ECPRBuffer *rbuf = &buf->rbuf; + ecp_seq_t seq = si->seq; + unsigned short idx = si->rb_idx; + unsigned char mtype = si->rb_mtype & ECP_MTYPE_MASK; + unsigned char _flags; + int rb_rel; + int rb_cc; + int do_send; + int do_skip; + int _rv = ECP_OK; ssize_t rv = 0; - buf = conn->rbuf.send; - if (!si->rb_pass) { - unsigned char flags = 0; - ecp_seq_t seq = si->seq; - unsigned int idx = si->rb_idx; - unsigned char mtype = si->rb_mtype & ECP_MTYPE_MASK; + if (pkt_size == 0) return ECP_ERR; + + do_send = 1; + do_skip = ecp_rbuf_skip(mtype); + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_lock(&buf->mutex); +#endif + + rb_rel = (buf->flags & ECP_RBUF_FLAG_RELIABLE); + rb_cc = ((buf->flags & ECP_RBUF_FLAG_CCONTROL) && (buf->cnt_cc || (buf->in_transit >= buf->win_size))); + + if (rb_rel || rb_cc) { + ECPRBTimer *rb_timer = NULL; + ECPRBTimerItem *rb_ti = NULL; - if (mtype < ECP_MAX_MTYPE_SYS) flags |= ECP_RBUF_FLAG_SYS; + _flags = ECP_RBUF_FLAG_IN_RBUF; + if (do_skip) { + _flags |= ECP_RBUF_FLAG_SKIP; + } else { + do_send = 0; + } - rv = _ecp_rbuf_msg_store(&buf->rbuf, seq, idx, packet->buffer, pkt_size, 0, flags); - if (rv < 0) return rv; + if (rbuf->arr.pkt[idx].flags) _rv = ECP_ERR_RBUF_DUP; - if (buf->flags & ECP_RBUF_FLAG_CCONTROL) { - int _rv = ECP_OK; + if (!_rv && !do_send && ti) { + rb_timer = buf->timer; #ifdef ECP_WITH_PTHREAD - pthread_mutex_lock(&buf->mutex); + pthread_mutex_lock(&rb_timer->mutex); #endif - if (ECP_SEQ_LT(buf->rbuf.seq_max, seq)) buf->rbuf.seq_max = seq; - - if (buf->cnt_cc || (buf->in_transit >= buf->win_size)) { - if (!buf->cnt_cc) buf->seq_cc = seq; - buf->cnt_cc++; - buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_IN_CCONTROL; - do_send = 0; - if (ti) { - ECPRBTimer *timer = &buf->timer; - ECPRBTimerItem *item = &timer->item[timer->idx_w]; - - if (!item->occupied) { - item->occupied = 1; - item->item = *ti; - buf->rbuf.msg[idx].idx_t = timer->idx_w; - timer->idx_w = (timer->idx_w) % ECP_MAX_TIMER; - } else { - _rv = ECP_ERR_MAX_TIMER; - } - } else { - buf->rbuf.msg[idx].idx_t = -1; - } + rb_ti = &rb_timer->item[rb_timer->idx_w]; + if (rb_ti->empty) { + rb_ti->empty = 0; + rb_ti->item = *ti; + rb_timer->idx_w = (rb_timer->idx_w + 1) % ECP_MAX_TIMER; } else { - buf->in_transit++; + _rv = ECP_ERR_MAX_TIMER; } #ifdef ECP_WITH_PTHREAD - pthread_mutex_unlock(&buf->mutex); + pthread_mutex_unlock(&rb_timer->mutex); #endif + } - if (_rv) return _rv; + if (_rv) { + rv = _rv; + goto pkt_send_fin; + } + + if (!do_send) { + memcpy(rbuf->arr.pkt[idx].buf, packet->buffer, pkt_size); + rbuf->arr.pkt[idx].size = pkt_size; + rbuf->arr.pkt[idx].timer = rb_ti; + rv = pkt_size; + } + rbuf->arr.pkt[idx].flags = _flags; + + if (ECP_SEQ_LT(rbuf->seq_max, seq)) rbuf->seq_max = seq; + + if (rb_cc && !do_send) { + if (buf->cnt_cc == 0) buf->seq_cc = seq; + buf->cnt_cc++; } } + if ((buf->flags & ECP_RBUF_FLAG_CCONTROL) && !do_skip && do_send) { + buf->in_transit++; + } + +pkt_send_fin: + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_unlock(&buf->mutex); +#endif + + if (rv < 0) return rv; + if (do_send) { if (ti) { - int _rv; - _rv = ecp_timer_push(ti); if (_rv) return _rv; } diff --git a/ecp/src/rbuf.h b/ecp/src/rbuf.h index 891f29d..31aeb39 100644 --- a/ecp/src/rbuf.h +++ b/ecp/src/rbuf.h @@ -1,7 +1,7 @@ #define ECP_RBUF_FLAG_IN_RBUF 0x01 #define ECP_RBUF_FLAG_IN_MSGQ 0x02 -#define ECP_RBUF_FLAG_IN_CCONTROL 0x04 -#define ECP_RBUF_FLAG_SYS 0x80 +#define ECP_RBUF_FLAG_IN_TIMER 0x04 +#define ECP_RBUF_FLAG_SKIP 0x08 #define ECP_RBUF_FLAG_CCONTROL 0x01 #define ECP_RBUF_FLAG_RELIABLE 0x02 @@ -9,58 +9,66 @@ #define ECP_MTYPE_RBACK 0x04 #define ECP_MTYPE_RBFLUSH 0x05 -#define ECP_MTYPE_RBFLUSH_PTS 0x06 +#define ECP_MTYPE_RBTIMER 0x06 #define ECP_MTYPE_NOP 0x07 #define ECP_ERR_RBUF_DUP -100 -#define ECP_ERR_RBUF_FULL -101 typedef uint32_t ecp_win_t; /* size must be power of 2 */ #define ECP_RBUF_IDX_MASK(idx, size) ((idx) & ((size) - 1)) +#define ecp_rbuf_skip(mtype) ((mtype & ECP_MTYPE_MASK) < ECP_MTYPE_NOP ? 1 : 0) #ifdef ECP_WITH_MSGQ #include "msgq.h" #endif typedef struct ECPRBTimerItem { - unsigned char occupied; ECPTimerItem item; + unsigned char empty; } ECPRBTimerItem; typedef struct ECPRBTimer { ECPRBTimerItem item[ECP_MAX_TIMER]; unsigned short idx_w; +#ifdef ECP_WITH_PTHREAD + pthread_mutex_t mutex; +#endif } ECPRBTimer; typedef struct ECPRBMessage { - unsigned char msg[ECP_MAX_PKT]; - ssize_t size; + unsigned char buf[ECP_MAX_MSG]; + size_t size; unsigned char flags; - short idx_t; } ECPRBMessage; +typedef struct ECPRBPacket { + unsigned char buf[ECP_MAX_PKT]; + size_t size; + unsigned char flags; + ECPRBTimerItem *timer; +} ECPRBPacket; + typedef struct ECPRBuffer { ecp_seq_t seq_start; ecp_seq_t seq_max; - unsigned int msg_size; - unsigned int msg_start; - ECPRBMessage *msg; + unsigned short arr_size; + unsigned short idx_start; + union { + ECPRBMessage *msg; + ECPRBPacket *pkt; + } arr; } ECPRBuffer; typedef struct ECPRBRecv { unsigned char flags; - unsigned char timer_pts; - unsigned char ack_do; + ecp_cts_t deliver_delay; unsigned short hole_max; unsigned short ack_rate; - ecp_pts_t deliver_delay; + unsigned short ack_pkt; ecp_seq_t seq_ack; - ecp_seq_t ack_pkt; ecp_ack_t ack_map; - ecp_ack_t hole_mask_full; - ecp_ack_t hole_mask_empty; ECPRBuffer rbuf; #ifdef ECP_WITH_MSGQ ECPConnMsgQ msgq; @@ -70,15 +78,16 @@ typedef struct ECPRBRecv { typedef struct ECPRBSend { unsigned char flags; - unsigned char flush; ecp_win_t win_size; ecp_win_t in_transit; ecp_win_t cnt_cc; ecp_seq_t seq_cc; ecp_seq_t seq_flush; + ecp_seq_t seq_nack; + unsigned char flush; unsigned int nack_rate; ECPRBuffer rbuf; - ECPRBTimer timer; + ECPRBTimer *timer; #ifdef ECP_WITH_PTHREAD pthread_mutex_t mutex; #endif @@ -89,28 +98,26 @@ typedef struct ECPConnRBuffer { ECPRBSend *send; } ECPConnRBuffer; -int _ecp_rbuf_init(ECPRBuffer *rbuf, ECPRBMessage *msg, unsigned int msg_size); int _ecp_rbuf_start(ECPRBuffer *rbuf, ecp_seq_t seq); -int _ecp_rbuf_msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq); -ssize_t _ecp_rbuf_msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, int idx, unsigned char *msg, size_t msg_size, unsigned char test_flags, unsigned char set_flags); +int _ecp_rbuf_msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq, unsigned short *idx); -int ecp_rbuf_create(struct ECPConnection *conn, ECPRBSend *buf_s, ECPRBMessage *msg_s, unsigned int msg_s_size, ECPRBRecv *buf_r, ECPRBMessage *msg_r, unsigned int msg_r_size); +int ecp_rbuf_create(struct ECPConnection *conn, ECPRBSend *buf_s, ECPRBPacket *msg_s, unsigned int msg_s_size, ECPRBRecv *buf_r, ECPRBMessage *msg_r, unsigned int msg_r_size); void ecp_rbuf_destroy(struct ECPConnection *conn); ssize_t ecp_rbuf_pld_send(struct ECPConnection *conn, struct ECPBuffer *packet, struct ECPBuffer *payload, size_t pld_size, unsigned char flags, ecp_seq_t seq); int ecp_rbuf_handle_seq(struct ECPConnection *conn, unsigned char mtype); int ecp_rbuf_set_seq(struct ECPConnection *conn, struct ECPSeqItem *si, unsigned char *payload, size_t pld_size); ssize_t ecp_rbuf_pkt_send(struct ECPConnection *conn, struct ECPSocket *sock, ECPNetAddr *addr, struct ECPBuffer *packet, size_t pkt_size, unsigned char flags, ECPTimerItem *ti, struct ECPSeqItem *si); -int ecp_rbuf_recv_create(struct ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size); +int ecp_rbuf_recv_create(struct ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned short msg_size); void ecp_rbuf_recv_destroy(struct ECPConnection *conn); int ecp_rbuf_recv_start(struct ECPConnection *conn, ecp_seq_t seq); int ecp_rbuf_set_hole(struct ECPConnection *conn, unsigned short hole_max); -int ecp_rbuf_set_delay(struct ECPConnection *conn, ecp_pts_t delay); +int ecp_rbuf_set_delay(struct ECPConnection *conn, ecp_cts_t delay); ssize_t ecp_rbuf_store(struct ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size, struct ECP2Buffer *b); struct ECPFragIter *ecp_rbuf_get_frag_iter(struct ECPConnection *conn); -int ecp_rbuf_send_create(struct ECPConnection *conn, ECPRBSend *buf, ECPRBMessage *msg, unsigned int msg_size); +int ecp_rbuf_send_create(struct ECPConnection *conn, ECPRBSend *buf, ECPRBPacket *msg, unsigned short msg_size); void ecp_rbuf_send_destroy(struct ECPConnection *conn); int ecp_rbuf_send_start(struct ECPConnection *conn); int ecp_rbuf_flush(struct ECPConnection *conn); @@ -118,4 +125,4 @@ int ecp_rbuf_set_wsize(struct ECPConnection *conn, ecp_win_t size); ssize_t ecp_rbuf_handle_ack(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size, struct ECP2Buffer *b); ssize_t ecp_rbuf_handle_flush(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size, struct ECP2Buffer *b); -ssize_t ecp_rbuf_handle_flush_pts(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size, struct ECP2Buffer *b); +ssize_t ecp_rbuf_handle_timer(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size, struct ECP2Buffer *b); diff --git a/ecp/src/rbuf_recv.c b/ecp/src/rbuf_recv.c index f2bb1c2..98b472c 100644 --- a/ecp/src/rbuf_recv.c +++ b/ecp/src/rbuf_recv.c @@ -5,130 +5,115 @@ #define ACK_RATE 8 #define ACK_MASK_FIRST ((ecp_ack_t)1 << (ECP_SIZE_ACKB - 1)) -static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size, ECP2Buffer *b) { +static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size, unsigned char mtype) { ECPRBRecv *buf = conn->rbuf.recv; - unsigned char flags = ECP_RBUF_FLAG_IN_RBUF; - unsigned char mtype; - ssize_t rv; - int _rv; + ECPRBuffer *rbuf = &buf->rbuf; + unsigned short idx; + unsigned char flags; + int skip; + int rv; - _rv = ecp_msg_get_type(msg, msg_size, &mtype); - if (_rv) return _rv; + rv = _ecp_rbuf_msg_idx(rbuf, seq, &idx); + if (rv) return rv; - mtype &= ECP_MTYPE_MASK; - if (mtype < ECP_MAX_MTYPE_SYS) flags |= ECP_RBUF_FLAG_SYS; + if (rbuf->arr.msg[idx].flags) return ECP_ERR_RBUF_DUP; #ifdef ECP_WITH_MSGQ if (buf->flags & ECP_RBUF_FLAG_MSGQ) { ecp_seq_t seq_offset; - int _rv = ECP_OK; pthread_mutex_lock(&buf->msgq.mutex); + seq_offset = seq - buf->msgq.seq_start; - if (seq_offset >= buf->rbuf.msg_size) _rv = ECP_ERR_RBUF_FULL; + if (seq_offset >= rbuf->arr_size) rv = ECP_ERR_FULL; + pthread_mutex_unlock(&buf->msgq.mutex); - if (_rv) return _rv; + if (rv) return rv; } #endif - rv = _ecp_rbuf_msg_store(&buf->rbuf, seq, -1, msg, msg_size, ECP_RBUF_FLAG_IN_RBUF | ECP_RBUF_FLAG_IN_MSGQ, flags); - if (rv < 0) return rv; + skip = ecp_rbuf_skip(mtype); + flags = ECP_RBUF_FLAG_IN_RBUF; + if (skip) flags |= ECP_RBUF_FLAG_SKIP; + rbuf->arr.msg[idx].flags = flags; - if (ECP_SEQ_LT(buf->rbuf.seq_max, seq)) buf->rbuf.seq_max = seq; - if (flags & ECP_RBUF_FLAG_SYS) ecp_conn_handle_msg(conn, seq, msg, msg_size, b); + if (skip) return 0; - return rv; + memcpy(rbuf->arr.msg[idx].buf, msg, msg_size); + rbuf->arr.msg[idx].size = msg_size; + if (ECP_SEQ_LT(rbuf->seq_max, seq)) rbuf->seq_max = seq; + + return msg_size; } static void msg_flush(ECPConnection *conn, ECP2Buffer *b) { ECPRBRecv *buf = conn->rbuf.recv; + ECPRBuffer *rbuf = &buf->rbuf; + ecp_seq_t seq; + unsigned short idx; + int i; #ifdef ECP_WITH_MSGQ if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_lock(&buf->msgq.mutex); #endif - ecp_seq_t msg_cnt = buf->rbuf.seq_max - buf->rbuf.seq_start + 1; - ecp_seq_t seq_next = buf->rbuf.seq_start; - ecp_seq_t i = 0; - unsigned int idx = buf->rbuf.msg_start; + seq = rbuf->seq_start; + idx = rbuf->idx_start; - if (buf->timer_pts) { - ecp_timer_pop(conn, ECP_MTYPE_RBFLUSH_PTS); - buf->timer_pts = 0; - } + unsigned short msg_cnt = rbuf->seq_max - rbuf->seq_start + 1; - for (i=0; irbuf.msg[idx].flags & ECP_RBUF_FLAG_IN_RBUF) { - if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_SYS) { - buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_SYS; - } else { + while (ECP_SEQ_LTE(seq, rbuf->seq_max)) { + if (rbuf->arr.msg[idx].flags & ECP_RBUF_FLAG_IN_RBUF) { + if (!(rbuf->arr.msg[idx].flags & ECP_RBUF_FLAG_SKIP)) { ecp_pts_t msg_pts; - ecp_seq_t seq = buf->rbuf.seq_start + i; - unsigned char frag_tot; - unsigned char frag_cnt; - uint16_t frag_size; int rv; - rv = ecp_msg_get_frag(buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size, &frag_cnt, &frag_tot, &frag_size); - if (!rv && (frag_cnt != 0) && (seq != seq_next)) { - ecp_seq_t seq_fend = seq + (ecp_seq_t)(frag_tot - frag_cnt - 1); - - if (ECP_SEQ_LT(buf->rbuf.seq_max, seq_fend) || (buf->hole_max && ((ecp_seq_t)(buf->rbuf.seq_max - seq_fend) <= buf->hole_max))) { - ecp_seq_t seq_fbeg = seq - frag_cnt; - ecp_seq_t seq_offset = ECP_SEQ_LT(seq_next, seq_fbeg) ? seq - seq_fbeg : seq - seq_next; - - i -= seq_offset; - idx = ECP_RBUF_IDX_MASK(idx - seq_offset, buf->rbuf.msg_size); - break; - } - } + rv = ecp_msg_get_pts(rbuf->arr.msg[idx].buf, rbuf->arr.msg[idx].size, &msg_pts); + if (!rv && buf->deliver_delay) { + ecp_cts_t now = ecp_tm_abstime_ms(0); - rv = ecp_msg_get_pts(buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size, &msg_pts); - if (!rv) { - ecp_pts_t now = ecp_tm_abstime_ms(0); + msg_pts += buf->deliver_delay; if (ECP_PTS_LT(now, msg_pts)) { - ECPTimerItem ti; - ecp_seq_t seq_offset = seq - seq_next; + if (!(rbuf->arr.msg[idx].flags & ECP_RBUF_FLAG_IN_TIMER)) { + ECPTimerItem ti; - rv = ecp_timer_item_init(&ti, conn, ECP_MTYPE_RBFLUSH_PTS, 0, msg_pts - now); - if (!rv) rv = ecp_timer_push(&ti); - if (!rv) buf->timer_pts = 1; - - i -= seq_offset; - idx = ECP_RBUF_IDX_MASK(idx - seq_offset, buf->rbuf.msg_size); + rv = ecp_timer_item_init(&ti, conn, ECP_MTYPE_RBTIMER, 0, msg_pts - now); + if (!rv) rv = ecp_timer_push(&ti); + if (!rv) rbuf->arr.msg[idx].flags |= ECP_RBUF_FLAG_IN_TIMER; + } break; + } else if (rbuf->arr.msg[idx].flags & ECP_RBUF_FLAG_IN_TIMER) { + rbuf->arr.msg[idx].flags &= ~ECP_RBUF_FLAG_IN_TIMER; } } - seq_next = seq + 1; #ifdef ECP_WITH_MSGQ if (buf->flags & ECP_RBUF_FLAG_MSGQ) { unsigned char mtype; - rv = ecp_msg_get_type(buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size, &mtype); - if (!rv) rv = ecp_conn_msgq_push(conn, seq, mtype); + rv = ecp_msg_get_type(rbuf->arr.msg[idx].buf, rbuf->arr.msg[idx].size, &mtype); + if (!rv) rv = ecp_conn_msgq_push(conn, seq, mtype & ECP_MTYPE_MASK); if (rv) break; - buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_IN_MSGQ; + rbuf->arr.msg[idx].flags |= ECP_RBUF_FLAG_IN_MSGQ; } else - #endif - ecp_conn_handle_msg(conn, seq, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size, b); + ecp_conn_handle_msg(conn, seq, rbuf->arr.msg[idx].buf, rbuf->arr.msg[idx].size, b); + } else { + rbuf->arr.msg[idx].flags &= ~ECP_RBUF_FLAG_SKIP; } - buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_IN_RBUF; + rbuf->arr.msg[idx].flags &= ~ECP_RBUF_FLAG_IN_RBUF; + // if (rbuf->arr.msg[idx].flags == 0); } else { if (buf->flags & ECP_RBUF_FLAG_RELIABLE) break; - if (buf->hole_max) { - ecp_seq_t seq = buf->rbuf.seq_start + i; - ecp_seq_t seq_offset = buf->rbuf.seq_max - seq; - if (seq_offset <= buf->hole_max) break; - } + if (!ECP_SEQ_LT(seq, rbuf->seq_max - buf->hole_max)) break; } - idx = ECP_RBUF_IDX_MASK(idx + 1, buf->rbuf.msg_size); + seq++; + idx = ECP_RBUF_IDX_MASK(idx + 1, rbuf->arr_size); } - buf->rbuf.seq_start += i; - buf->rbuf.msg_start = idx; + rbuf->seq_start = seq; + rbuf->idx_start = idx; #ifdef ECP_WITH_MSGQ if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_unlock(&buf->msgq.mutex); @@ -136,7 +121,7 @@ static void msg_flush(ECPConnection *conn, ECP2Buffer *b) { } -static int ack_send(ECPConnection *conn) { +static int ack_send(ECPConnection *conn, ecp_seq_t seq_ack, ecp_seq_t ack_map) { ECPRBRecv *buf = conn->rbuf.recv; ECPBuffer packet; ECPBuffer payload; @@ -152,56 +137,53 @@ static int ack_send(ECPConnection *conn) { ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_RBACK); _buf = ecp_pld_get_buf(payload.buffer, payload.size); - _buf[0] = (buf->seq_ack & 0xFF000000) >> 24; - _buf[1] = (buf->seq_ack & 0x00FF0000) >> 16; - _buf[2] = (buf->seq_ack & 0x0000FF00) >> 8; - _buf[3] = (buf->seq_ack & 0x000000FF); - _buf[4] = (buf->ack_map & 0xFF000000) >> 24; - _buf[5] = (buf->ack_map & 0x00FF0000) >> 16; - _buf[6] = (buf->ack_map & 0x0000FF00) >> 8; - _buf[7] = (buf->ack_map & 0x000000FF); - - rv = ecp_rbuf_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), ECP_MTYPE_RBACK), 0, 0); + _buf[0] = (seq_ack & 0xFF000000) >> 24; + _buf[1] = (seq_ack & 0x00FF0000) >> 16; + _buf[2] = (seq_ack & 0x0000FF00) >> 8; + _buf[3] = (seq_ack & 0x000000FF); + _buf[4] = (ack_map & 0xFF000000) >> 24; + _buf[5] = (ack_map & 0x00FF0000) >> 16; + _buf[6] = (ack_map & 0x0000FF00) >> 8; + _buf[7] = (ack_map & 0x000000FF); + + rv = ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), ECP_MTYPE_RBACK), 0); if (rv < 0) return rv; buf->ack_pkt = 0; - buf->ack_do = 0; + return ECP_OK; } static int ack_shift(ECPRBRecv *buf) { + ECPRBuffer *rbuf = &buf->rbuf; + unsigned short idx; int do_ack = 0; int in_rbuf = 0; - int idx; - int i; + int rv; if ((buf->flags & ECP_RBUF_FLAG_RELIABLE) && ((buf->ack_map & ACK_MASK_FIRST) == 0)) return 0; - while (ECP_SEQ_LT(buf->seq_ack, buf->rbuf.seq_max)) { + /* walks through messages that are not delivered yet, so no need for msgq mutex lock */ + while (ECP_SEQ_LT(buf->seq_ack, rbuf->seq_max)) { buf->seq_ack++; - in_rbuf = ECP_SEQ_LT(buf->seq_ack, buf->rbuf.seq_start) ? 1 : buf->rbuf.msg[ECP_RBUF_IDX_MASK(buf->rbuf.msg_start + buf->seq_ack - buf->rbuf.seq_start, buf->rbuf.msg_size)].flags & ECP_RBUF_FLAG_IN_RBUF; - + rv = _ecp_rbuf_msg_idx(rbuf, buf->seq_ack, &idx); + if (!rv) { + in_rbuf = rbuf->arr.msg[idx].flags & ECP_RBUF_FLAG_IN_RBUF; + } else { + in_rbuf = 1; + } if (in_rbuf && (buf->ack_map == ECP_ACK_FULL)) continue; buf->ack_map = buf->ack_map << 1; if (in_rbuf) { buf->ack_map |= 1; - } else if (!do_ack && ECP_SEQ_LTE(buf->seq_ack, buf->rbuf.seq_max - 2 * buf->hole_max)) { + } else if (!do_ack && ECP_SEQ_LT(buf->seq_ack, rbuf->seq_max - buf->hole_max)) { do_ack = 1; } - if ((buf->ack_map & ACK_MASK_FIRST) == 0) break; - } - - if (!do_ack && (buf->seq_ack == buf->rbuf.seq_max) && ((buf->ack_map & buf->hole_mask_full) != buf->hole_mask_full)) { - ecp_ack_t hole_mask = buf->ack_map; - - for (i=0; ihole_max-1; i++) { - hole_mask = hole_mask >> 1; - if ((hole_mask & buf->hole_mask_empty) == 0) { - do_ack = 1; - break; - } + if ((buf->flags & ECP_RBUF_FLAG_RELIABLE) && ((buf->ack_map & ACK_MASK_FIRST) == 0)) { + do_ack = 1; + break; } } @@ -209,34 +191,38 @@ static int ack_shift(ECPRBRecv *buf) { } ssize_t ecp_rbuf_handle_flush(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size, ECP2Buffer *b) { - if (size < 0) return size; - ECPRBRecv *buf = conn->rbuf.recv; + if (buf == NULL) return ECP_ERR; + if (size < 0) return size; ecp_tr_release(b->packet, 1); - ack_send(conn); + ack_send(conn, buf->seq_ack, buf->ack_map); return 0; } -ssize_t ecp_rbuf_handle_flush_pts(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size, ECP2Buffer *b) { +ssize_t ecp_rbuf_handle_timer(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size, ECP2Buffer *b) { ECPRBRecv *buf = conn->rbuf.recv; + if (buf == NULL) return ECP_ERR; - buf->timer_pts = 0; msg_flush(conn, b); return 0; } -int ecp_rbuf_recv_create(ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size) { +int ecp_rbuf_recv_create(ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned short msg_size) { + ECPRBuffer *rbuf = &buf->rbuf; int rv; + if (msg == NULL) return ECP_ERR; + memset(buf, 0, sizeof(ECPRBRecv)); - rv = _ecp_rbuf_init(&buf->rbuf, msg, msg_size); - if (rv) return rv; + memset(msg, 0, sizeof(ECPRBMessage) * msg_size); buf->ack_map = ECP_ACK_FULL; buf->ack_rate = ACK_RATE; + rbuf->arr.msg = msg; + rbuf->arr_size = msg_size; #ifdef ECP_WITH_MSGQ rv = ecp_conn_msgq_create(&buf->msgq); @@ -251,22 +237,23 @@ void ecp_rbuf_recv_destroy(ECPConnection *conn) { ECPRBRecv *buf = conn->rbuf.recv; if (buf == NULL) return; + #ifdef ECP_WITH_MSGQ ecp_conn_msgq_destroy(&buf->msgq); #endif - conn->rbuf.recv = NULL; } int ecp_rbuf_recv_start(ECPConnection *conn, ecp_seq_t seq) { - int rv; ECPRBRecv *buf = conn->rbuf.recv; + ECPRBuffer *rbuf = &buf->rbuf; + int rv; if (buf == NULL) return ECP_ERR; seq--; buf->seq_ack = seq; - rv = _ecp_rbuf_start(&buf->rbuf, seq); + rv = _ecp_rbuf_start(rbuf, seq); if (rv) return rv; #ifdef ECP_WITH_MSGQ @@ -283,13 +270,11 @@ int ecp_rbuf_set_hole(ECPConnection *conn, unsigned short hole_max) { ECPRBRecv *buf = conn->rbuf.recv; buf->hole_max = hole_max; - buf->hole_mask_full = ~(~((ecp_ack_t)1) << (hole_max * 2)); - buf->hole_mask_empty = ~(~((ecp_ack_t)1) << (hole_max + 1)); return ECP_OK; } -int ecp_rbuf_set_delay(ECPConnection *conn, ecp_pts_t delay) { +int ecp_rbuf_set_delay(ECPConnection *conn, ecp_cts_t delay) { ECPRBRecv *buf = conn->rbuf.recv; buf->deliver_delay = delay; @@ -299,69 +284,83 @@ int ecp_rbuf_set_delay(ECPConnection *conn, ecp_pts_t delay) { ssize_t ecp_rbuf_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size, ECP2Buffer *b) { ECPRBRecv *buf = conn->rbuf.recv; - ecp_seq_t ack_pkt = 0; - ssize_t rv; - int _rv; + ECPRBuffer *rbuf = &buf->rbuf; unsigned char mtype; + unsigned short ack_pkt = 0; + int do_ack = 0; + int _rv; + ssize_t rv; _rv = ecp_msg_get_type(msg, msg_size, &mtype); if (_rv) return _rv; - mtype &= ECP_MTYPE_MASK; - if ((mtype == ECP_MTYPE_RBACK) || (mtype == ECP_MTYPE_RBFLUSH)) return ecp_conn_handle_msg(conn, seq, msg, msg_size, b); - - if (ECP_SEQ_LT(buf->rbuf.seq_max, seq)) { - ack_pkt = seq - buf->rbuf.seq_max; - buf->ack_pkt += ack_pkt; - if (buf->ack_pkt > buf->ack_rate) buf->ack_do = 1; + if (ECP_SEQ_LT(rbuf->seq_max, seq)) { + ack_pkt = seq - rbuf->seq_max; } if (ECP_SEQ_LTE(seq, buf->seq_ack)) { ecp_seq_t seq_offset = buf->seq_ack - seq; if (seq_offset < ECP_SIZE_ACKB) { - ecp_ack_t ack_mask = ((ecp_ack_t)1 << seq_offset); - - if (ack_mask & buf->ack_map) return ECP_ERR_RBUF_DUP; + ecp_ack_t ack_bit = ((ecp_ack_t)1 << seq_offset); - buf->ack_map |= ack_mask; - buf->ack_do = buf->ack_do || ack_shift(buf); + if (ack_bit & buf->ack_map) return ECP_ERR_RBUF_DUP; - rv = msg_store(conn, seq, msg, msg_size, b); + rv = msg_store(conn, seq, msg, msg_size, mtype); if (rv < 0) return rv; + + buf->ack_map |= ack_bit; + do_ack = ack_shift(buf); } else { return ECP_ERR_RBUF_DUP; } } else { - if ((buf->ack_map == ECP_ACK_FULL) && (seq == (ecp_seq_t)(buf->seq_ack + 1))) { + unsigned short msg_cnt = rbuf->seq_max - rbuf->seq_start + 1; + + if ((msg_cnt == 0) && (seq == (ecp_seq_t)(buf->seq_ack + 1))) { if ((buf->flags & ECP_RBUF_FLAG_MSGQ) || buf->deliver_delay) { - rv = msg_store(conn, seq, msg, msg_size, b); + rv = msg_store(conn, seq, msg, msg_size, mtype); if (rv < 0) return rv; } else { - ecp_conn_handle_msg(conn, seq, msg, msg_size, b); - rv = msg_size; - buf->rbuf.seq_max++; - buf->rbuf.seq_start++; - buf->rbuf.msg_start = ECP_RBUF_IDX_MASK(buf->rbuf.msg_start + 1, buf->rbuf.msg_size); + /* receive buffer is empty, so no need for msgq mutex lock */ + rv = 0; + rbuf->seq_max++; + rbuf->seq_start++; + rbuf->idx_start = ECP_RBUF_IDX_MASK(rbuf->idx_start + 1, rbuf->arr_size); } buf->seq_ack++; } else { - rv = msg_store(conn, seq, msg, msg_size, b); + rv = msg_store(conn, seq, msg, msg_size, mtype); if (rv < 0) return rv; - buf->ack_do = buf->ack_do || ack_shift(buf); + do_ack = ack_shift(buf); } } msg_flush(conn, b); - if (!(mtype < ECP_MAX_MTYPE_SYS) && buf->ack_do) { - int _rv; - - _rv = ack_send(conn); + if (ack_pkt) { + buf->ack_pkt += ack_pkt; + if (!do_ack && (buf->ack_pkt > buf->ack_rate)) do_ack = 1; + } + if (do_ack) { + ecp_seq_t seq_ack = buf->seq_ack; + ecp_seq_t ack_map = buf->ack_map; + + /* account for missing mackets within hole_max range */ + if (buf->hole_max && (buf->seq_ack == rbuf->seq_max)) { + unsigned short h_bits = buf->hole_max + 1; + ecp_seq_t h_mask = ~(~((ecp_seq_t)0) << h_bits); + + if ((ack_map & h_mask) != h_mask) { + h_mask = ~(~((ecp_seq_t)0) >> h_bits); + seq_ack -= h_bits; + ack_map = (ack_map >> h_bits) | h_mask; + } + } + _rv = ack_send(conn, seq_ack, ack_map); if (_rv) return _rv; } return rv; } - ECPFragIter *ecp_rbuf_get_frag_iter(ECPConnection *conn) { if (conn->rbuf.recv) return conn->rbuf.recv->frag_iter; return NULL; diff --git a/ecp/src/rbuf_send.c b/ecp/src/rbuf_send.c index faf2a7d..9f29010 100644 --- a/ecp/src/rbuf_send.c +++ b/ecp/src/rbuf_send.c @@ -1,7 +1,10 @@ #include "core.h" #include "tr.h" -#define NACK_RATE_UNIT 10000 +#define NACK_RATE_UNIT 10000 + +#define MIN(X, Y) (((X) < (Y)) ? (X) : (Y)) +#define MAX(X, Y) (((X) > (Y)) ? (X) : (Y)) static ssize_t flush_send(ECPConnection *conn, ECPTimerItem *ti) { ECPBuffer packet; @@ -26,76 +29,83 @@ static ssize_t flush_send(ECPConnection *conn, ECPTimerItem *ti) { rv = ecp_timer_push(&_ti); if (rv) return rv; } - return ecp_rbuf_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(0, ECP_MTYPE_RBFLUSH), 0, 0); + return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(0, ECP_MTYPE_RBFLUSH), 0); } -static void cc_flush(ECPConnection *conn) { +static void cc_flush(ECPConnection *conn, unsigned char flags) { ECPRBSend *buf = conn->rbuf.send; ECPRBuffer *rbuf = &buf->rbuf; - ecp_seq_t pkt_buf_cnt = rbuf->seq_max - rbuf->seq_start + 1; - ecp_win_t pkt_cc_cnt = buf->win_size > buf->in_transit ? buf->win_size - buf->in_transit : 0; - int pkt_to_send = pkt_buf_cnt < pkt_cc_cnt ? pkt_buf_cnt : pkt_cc_cnt; - int i; + unsigned short idx; + int rv; - ECPTimerItem ti[ECP_MAX_TIMER]; - unsigned short max_t = 0; + rv = _ecp_rbuf_msg_idx(rbuf, buf->seq_cc, &idx); + if (rv) return; - if (pkt_to_send) { - unsigned int idx = _ecp_rbuf_msg_idx(rbuf, buf->seq_cc); - unsigned int _idx = idx; + while (ECP_SEQ_LTE(buf->seq_cc, rbuf->seq_max)) { + ECPRBTimerItem *ti; + ECPBuffer packet; - for (i=0; imsg[_idx].flags & ECP_RBUF_FLAG_IN_CCONTROL)) break; - rbuf->msg[_idx].flags &= ~ECP_RBUF_FLAG_IN_CCONTROL; - if (rbuf->msg[_idx].idx_t != -1) { - ECPRBTimer *timer = &buf->timer; - ECPRBTimerItem *item = &timer->item[rbuf->msg[_idx].idx_t]; - - item->occupied = 0; - ti[max_t] = item->item; - rbuf->msg[_idx].idx_t = max_t; - max_t++; - } - _idx = ECP_RBUF_IDX_MASK(_idx + 1, rbuf->msg_size); - } - pkt_to_send = i; - _idx = idx; + if ((buf->cnt_cc == 0) || (buf->in_transit >= buf->win_size)) break; + if ((rbuf->arr.pkt[idx].flags & ECP_RBUF_FLAG_IN_RBUF) && !(rbuf->arr.pkt[idx].flags & ECP_RBUF_FLAG_SKIP)) { #ifdef ECP_WITH_PTHREAD - pthread_mutex_unlock(&buf->mutex); + pthread_mutex_unlock(&buf->mutex); #endif - for (i=0; iarr.pkt[idx].timer; + if (ti) ecp_timer_push(&ti->item); + packet.buffer = rbuf->arr.pkt[idx].buf; + packet.size = ECP_MAX_PKT; + ecp_pkt_send(conn->sock, &conn->node.addr, &packet, rbuf->arr.pkt[idx].size, flags); - if (rbuf->msg[_idx].idx_t != -1) ecp_timer_push(&ti[rbuf->msg[_idx].idx_t]); - packet.buffer = rbuf->msg[_idx].msg; - packet.size = rbuf->msg[_idx].size; - ecp_pkt_send(conn->sock, &conn->node.addr, &packet, rbuf->msg[_idx].size, 0); - _idx = ECP_RBUF_IDX_MASK(_idx + 1, rbuf->msg_size); - } +#ifdef ECP_WITH_PTHREAD + pthread_mutex_lock(&buf->mutex); +#endif + if (ti) { #ifdef ECP_WITH_PTHREAD - pthread_mutex_lock(&buf->mutex); + pthread_mutex_lock(&buf->timer->mutex); #endif + ti->empty = 1; +#ifdef ECP_WITH_PTHREAD + pthread_mutex_lock(&buf->timer->mutex); +#endif + } - buf->in_transit += (ecp_win_t)pkt_to_send; - buf->cnt_cc -= (ecp_win_t)pkt_to_send; - buf->seq_cc += (ecp_seq_t)pkt_to_send; + buf->cnt_cc--; + buf->in_transit++; + } + if (!(buf->flags & ECP_RBUF_FLAG_RELIABLE)) { + rbuf->arr.pkt[idx].flags = 0; + // if (rbuf->arr.pkt[idx].flags == 0); + } + buf->seq_cc++; + idx = ECP_RBUF_IDX_MASK(idx + 1, rbuf->arr_size); + } + if (!(buf->flags & ECP_RBUF_FLAG_RELIABLE)) { + rbuf->seq_start = buf->seq_cc; + rbuf->idx_start = idx; } } ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size, ECP2Buffer *b) { ECPRBSend *buf; + ECPRBuffer *rbuf; ssize_t rsize = sizeof(ecp_seq_t)+sizeof(ecp_ack_t); ecp_seq_t seq_ack = 0; ecp_ack_t ack_map = 0; - int i; + ecp_seq_t seq_start; + ecp_seq_t seq_max; + unsigned short idx; + unsigned short msg_cnt; int do_flush = 0; - int rv = ECP_OK; + int i; + int rv; buf = conn->rbuf.send; if (buf == NULL) return size; + + rbuf = &buf->rbuf; if (size < 0) return size; if (size < rsize) return ECP_ERR; @@ -111,89 +121,108 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt (msg[7]); ecp_tr_release(b->packet, 1); - ecp_tr_flag_set(ECP_SEND_FLAG_MORE); #ifdef ECP_WITH_PTHREAD pthread_mutex_lock(&buf->mutex); #endif - ECPRBuffer *rbuf = &buf->rbuf; - int idx = _ecp_rbuf_msg_idx(rbuf, seq_ack); - if (idx < 0) rv = idx; + seq_max = (buf->cnt_cc ? buf->seq_cc - 1 : rbuf->seq_max); + if (ECP_SEQ_LT(seq_max, seq_ack)) return ECP_ERR; - if (!rv) { - seq_ack++; - if (buf->flags & ECP_RBUF_FLAG_CCONTROL) buf->in_transit = buf->cnt_cc ? buf->seq_cc - seq_ack : rbuf->seq_max - seq_ack + 1; + if (buf->flags & ECP_RBUF_FLAG_RELIABLE) { + rv = _ecp_rbuf_msg_idx(rbuf, seq_ack, &idx); + if (rv) goto handle_ack_fin; + } - if (ack_map != ECP_ACK_FULL) { - int nack_first = 0; - unsigned int msg_start; - ecp_seq_t seq_start; - ecp_win_t nack_cnt = 0; + if (buf->flags & ECP_RBUF_FLAG_CCONTROL) { + buf->in_transit = seq_max - seq_ack; + } - seq_ack -= ECP_SIZE_ACKB; + if (ack_map != ECP_ACK_FULL) { + ecp_ack_t ack_mask = (ecp_ack_t)1 << (ECP_SIZE_ACKB - 1); + unsigned short nack_cnt = 0; + int nack_first = 0; + + seq_ack -= (ECP_SIZE_ACKB - 1); + if (buf->flags & ECP_RBUF_FLAG_RELIABLE) { + rv = _ecp_rbuf_msg_idx(rbuf, seq_ack, &idx); + if (rv) goto handle_ack_fin; + } #ifdef ECP_WITH_PTHREAD - pthread_mutex_unlock(&buf->mutex); + pthread_mutex_unlock(&buf->mutex); #endif - for (i=0; iseq_nack, seq_ack)) { nack_cnt++; - if (buf->flags & ECP_RBUF_FLAG_RELIABLE) { - unsigned int _idx = ECP_RBUF_IDX_MASK(idx + 1 - ECP_SIZE_ACKB + i, rbuf->msg_size); - if ((rbuf->msg[_idx].size == 0) || (rbuf->msg[_idx].flags & ECP_RBUF_FLAG_SYS)) { - ECPBuffer packet; - ECPBuffer payload; - unsigned char pkt_buf[ECP_SIZE_PKT_BUF(0, ECP_MTYPE_NOP, conn)]; - unsigned char pld_buf[ECP_SIZE_PLD_BUF(0, ECP_MTYPE_NOP, conn)]; - - packet.buffer = pkt_buf; - packet.size = ECP_SIZE_PKT_BUF(0, ECP_MTYPE_NOP, conn); - payload.buffer = pld_buf; - payload.size = ECP_SIZE_PLD_BUF(0, ECP_MTYPE_NOP, conn); - - ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_NOP); - ecp_rbuf_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(0, ECP_MTYPE_NOP), 0, seq_ack + i); - } else { - ECPBuffer packet; - packet.buffer = rbuf->msg[_idx].msg; - packet.size = rbuf->msg[_idx].size; - ecp_pkt_send(conn->sock, &conn->node.addr, &packet, rbuf->msg[_idx].size, 0); - } - if (!nack_first) { - nack_first = 1; - seq_start = seq_ack + i; - msg_start = _idx; + buf->seq_nack = seq_ack; + } + + if (buf->flags & ECP_RBUF_FLAG_RELIABLE) { + if (!(rbuf->arr.pkt[idx].flags & ECP_RBUF_FLAG_IN_RBUF) || (rbuf->arr.pkt[idx].flags & ECP_RBUF_FLAG_SKIP)) { + ECPBuffer packet; + ECPBuffer payload; + unsigned char pkt_buf[ECP_SIZE_PKT_BUF(0, ECP_MTYPE_NOP, conn)]; + unsigned char pld_buf[ECP_SIZE_PLD_BUF(0, ECP_MTYPE_NOP, conn)]; + + packet.buffer = pkt_buf; + packet.size = ECP_SIZE_PKT_BUF(0, ECP_MTYPE_NOP, conn); + payload.buffer = pld_buf; + payload.size = ECP_SIZE_PLD_BUF(0, ECP_MTYPE_NOP, conn); + + ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_NOP); + ecp_rbuf_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(0, ECP_MTYPE_NOP), ECP_SEND_FLAG_MORE, seq_ack); + } else { + ECPBuffer packet; + + packet.buffer = rbuf->arr.pkt[idx].buf; + packet.size = ECP_MAX_PKT; + ecp_pkt_send(conn->sock, &conn->node.addr, &packet, rbuf->arr.pkt[idx].size, ECP_SEND_FLAG_MORE); + if (buf->flags & ECP_RBUF_FLAG_CCONTROL) { + buf->in_transit++; } } + if (!nack_first) { + nack_first = 1; + seq_start = seq_ack; + } } } + seq_ack++; + ack_mask = ack_mask >> 1; + if (buf->flags & ECP_RBUF_FLAG_RELIABLE) idx = ECP_RBUF_IDX_MASK(idx + 1, rbuf->arr_size); + } #ifdef ECP_WITH_PTHREAD - pthread_mutex_lock(&buf->mutex); + pthread_mutex_lock(&buf->mutex); #endif - if (buf->flags & ECP_RBUF_FLAG_CCONTROL) buf->in_transit += nack_cnt; - buf->nack_rate = (buf->nack_rate * 7 + ((ECP_SIZE_ACKB - nack_cnt) * NACK_RATE_UNIT) / ECP_SIZE_ACKB) / 8; - if (buf->flags & ECP_RBUF_FLAG_RELIABLE) { - rbuf->seq_start = seq_start; - rbuf->msg_start = msg_start; - } else { - rbuf->seq_start = seq_ack + ECP_SIZE_ACKB; - rbuf->msg_start = ECP_RBUF_IDX_MASK(idx + 1, rbuf->msg_size); - } - } else { - buf->nack_rate = (buf->nack_rate * 7) / 8; - rbuf->seq_start = seq_ack; - rbuf->msg_start = ECP_RBUF_IDX_MASK(idx + 1, rbuf->msg_size); - } - if (buf->flush) { - if (ECP_SEQ_LT(buf->seq_flush, rbuf->seq_start)) buf->flush = 0; - if (buf->flush) do_flush = 1; + buf->nack_rate = (buf->nack_rate * 7 + ((ECP_SIZE_ACKB - nack_cnt) * NACK_RATE_UNIT) / ECP_SIZE_ACKB) / 8; + } else { + buf->nack_rate = (buf->nack_rate * 7) / 8; + seq_start = seq_ack + 1; + } + + if (buf->flags & ECP_RBUF_FLAG_RELIABLE) { + msg_cnt = seq_start - rbuf->seq_start; + idx = rbuf->idx_start; + for (i=0; iarr.pkt[idx].flags = 0; + // if (rbuf->arr.pkt[idx].flags == 0); + idx = ECP_RBUF_IDX_MASK(idx + 1, rbuf->arr_size); } - if (buf->cnt_cc) cc_flush(conn); + rbuf->seq_start = seq_start; + rbuf->idx_start = idx; } + if (buf->flush) { + if (ECP_SEQ_LT(buf->seq_flush, rbuf->seq_start)) buf->flush = 0; + if (buf->flush) do_flush = 1; + } + if (buf->cnt_cc) cc_flush(conn, ECP_SEND_FLAG_MORE); + +handle_ack_fin: #ifdef ECP_WITH_PTHREAD pthread_mutex_unlock(&buf->mutex); @@ -204,21 +233,27 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt _rv = flush_send(conn, NULL); if (_rv < 0) rv = _rv; + } else { + // ecp_tr_nomore(); } - ecp_tr_flag_clear(ECP_SEND_FLAG_MORE); ecp_tr_release(b->packet, 0); if (rv) return rv; return rsize; } -int ecp_rbuf_send_create(ECPConnection *conn, ECPRBSend *buf, ECPRBMessage *msg, unsigned int msg_size) { +int ecp_rbuf_send_create(ECPConnection *conn, ECPRBSend *buf, ECPRBPacket *pkt, unsigned short pkt_size) { + ECPRBuffer *rbuf = &buf->rbuf; int rv; + if (pkt == NULL) return ECP_ERR; + memset(buf, 0, sizeof(ECPRBRecv)); - rv = _ecp_rbuf_init(&buf->rbuf, msg, msg_size); - if (rv) return rv; + memset(pkt, 0, sizeof(ECPRBPacket) * pkt_size); + + rbuf->arr.pkt = pkt; + rbuf->arr_size = pkt_size; #ifdef ECP_WITH_PTHREAD rv = pthread_mutex_init(&buf->mutex, NULL); @@ -243,10 +278,12 @@ void ecp_rbuf_send_destroy(ECPConnection *conn) { int ecp_rbuf_send_start(ECPConnection *conn) { ECPRBSend *buf = conn->rbuf.send; + ECPRBuffer *rbuf = &buf->rbuf; if (buf == NULL) return ECP_ERR; - return _ecp_rbuf_start(&buf->rbuf, conn->seq_out); + buf->seq_nack = conn->seq_out; + return _ecp_rbuf_start(rbuf, conn->seq_out); } int ecp_rbuf_set_wsize(ECPConnection *conn, ecp_win_t size) { @@ -259,7 +296,7 @@ int ecp_rbuf_set_wsize(ECPConnection *conn, ecp_win_t size) { #endif buf->win_size = size; - if (buf->cnt_cc) cc_flush(conn); + if (buf->cnt_cc) cc_flush(conn, 0); #ifdef ECP_WITH_PTHREAD pthread_mutex_unlock(&buf->mutex); diff --git a/ecp/src/vconn/vconn.c b/ecp/src/vconn/vconn.c index d60f5f2..d3eec44 100644 --- a/ecp/src/vconn/vconn.c +++ b/ecp/src/vconn/vconn.c @@ -64,7 +64,7 @@ static ssize_t _vconn_send_open(ECPConnection *conn, ECPTimerItem *ti) { payload.size = ECP_SIZE_PLD_BUF(0, ECP_MTYPE_KGET_REQ, conn); ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_KGET_REQ); - return _ecp_pld_send(conn, &packet, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, NULL, &payload, ECP_SIZE_PLD(0, ECP_MTYPE_KGET_REQ), 0, ti); + return _ecp_pld_send(conn, &packet, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, &payload, ECP_SIZE_PLD(0, ECP_MTYPE_KGET_REQ), 0, ti); } static ssize_t vconn_send_open(ECPConnection *conn) { @@ -188,7 +188,7 @@ static ssize_t vconn_handle_relay(ECPConnection *conn, ecp_seq_t seq, unsigned c if (b == NULL) return ECP_ERR; if (size < 0) return size; - if (size < ECP_MIN_PKT) return ECP_ERR_MIN_PKT; + if (size < ECP_MIN_PKT) return ECP_ERR; #ifdef ECP_WITH_PTHREAD pthread_mutex_lock(&key_perma_mutex); @@ -337,7 +337,7 @@ static ssize_t vlink_handle_relay(ECPConnection *conn, ecp_seq_t seq, unsigned c if (b == NULL) return ECP_ERR; if (size < 0) return size; - if (size < ECP_MIN_PKT) return ECP_ERR_MIN_PKT; + if (size < ECP_MIN_PKT) return ECP_ERR; #ifdef ECP_WITH_PTHREAD pthread_mutex_lock(&key_next_mutex); @@ -456,7 +456,7 @@ ssize_t ecp_pack(ECPContext *ctx, ECPConnection *parent, ECPBuffer *packet, ECPP if (payload->size < rv+hdr_size) return ECP_ERR; memcpy(payload->buffer, packet->buffer, rv+hdr_size); - return ecp_pack_conn(parent, packet, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, NULL, payload, rv+hdr_size, addr); + return ecp_pack_conn(parent, packet, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload, rv+hdr_size, addr, NULL); } else { return _ecp_pack(ctx, packet->buffer, packet->size, pkt_meta, payload->buffer, pld_size); } @@ -487,7 +487,7 @@ ssize_t ecp_pack(ECPContext *ctx, ECPConnection *parent, ECPBuffer *packet, ECPP rv = _ecp_pack(ctx, _payload.buffer+hdr_size, _payload.size-hdr_size, pkt_meta, payload->buffer, pld_size); if (rv < 0) return rv; - return ecp_pack_conn(parent, packet, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, NULL, &_payload, rv+hdr_size, addr); + return ecp_pack_conn(parent, packet, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, &_payload, rv+hdr_size, addr, NULL); } else { return _ecp_pack(ctx, packet->buffer, packet->size, pkt_meta, payload->buffer, pld_size); } @@ -498,7 +498,7 @@ ssize_t ecp_pack(ECPContext *ctx, ECPConnection *parent, ECPBuffer *packet, ECPP #ifdef ECP_MEM_TINY /* Memory limited version */ -ssize_t ecp_pack_conn(ECPConnection *conn, ECPBuffer *packet, unsigned char s_idx, unsigned char c_idx, ECPSeqItem *si, ECPBuffer *payload, size_t pld_size, ECPNetAddr *addr) { +ssize_t ecp_pack_conn(ECPConnection *conn, ECPBuffer *packet, unsigned char s_idx, unsigned char c_idx, ECPBuffer *payload, size_t pld_size, ECPNetAddr *addr, ECPSeqItem *si) { if ((packet == NULL) || (packet->buffer == NULL)) return ECP_ERR; if ((payload == NULL) || (payload->buffer == NULL)) return ECP_ERR; @@ -513,21 +513,21 @@ ssize_t ecp_pack_conn(ECPConnection *conn, ECPBuffer *packet, unsigned char s_id hdr_size = vconn_set_msg(conn->parent, packet, mtype); if (hdr_size < 0) return hdr_size; - rv = _ecp_pack_conn(conn, packet->buffer+hdr_size, packet->size-hdr_size, s_idx, c_idx, si, payload->buffer, pld_size, NULL); + rv = _ecp_pack_conn(conn, packet->buffer+hdr_size, packet->size-hdr_size, s_idx, c_idx, payload->buffer, pld_size, NULL, si); if (rv < 0) return rv; if (payload->size < rv+hdr_size) return ECP_ERR; memcpy(payload->buffer, packet->buffer, rv+hdr_size); - return ecp_pack_conn(conn->parent, packet, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, NULL, payload, rv+hdr_size, addr); + return ecp_pack_conn(conn->parent, packet, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload, rv+hdr_size, addr, NULL); } else { - return _ecp_pack_conn(conn, packet->buffer, packet->size, s_idx, c_idx, si, payload->buffer, pld_size, addr); + return _ecp_pack_conn(conn, packet->buffer, packet->size, s_idx, c_idx, payload->buffer, pld_size, addr, si); } } #else -ssize_t ecp_pack_conn(ECPConnection *conn, ECPBuffer *packet, unsigned char s_idx, unsigned char c_idx, ECPSeqItem *si, ECPBuffer *payload, size_t pld_size, ECPNetAddr *addr) { +ssize_t ecp_pack_conn(ECPConnection *conn, ECPBuffer *packet, unsigned char s_idx, unsigned char c_idx, ECPBuffer *payload, size_t pld_size, ECPNetAddr *addr, ECPSeqItem *si) { if ((packet == NULL) || (packet->buffer == NULL)) return ECP_ERR; if ((payload == NULL) || (payload->buffer == NULL)) return ECP_ERR; @@ -547,12 +547,12 @@ ssize_t ecp_pack_conn(ECPConnection *conn, ECPBuffer *packet, unsigned char s_id hdr_size = vconn_set_msg(conn->parent, &_payload, mtype); if (hdr_size < 0) return hdr_size; - rv = _ecp_pack_conn(conn, _payload.buffer+hdr_size, _payload.size-hdr_size, s_idx, c_idx, si, payload->buffer, pld_size, NULL); + rv = _ecp_pack_conn(conn, _payload.buffer+hdr_size, _payload.size-hdr_size, s_idx, c_idx, payload->buffer, pld_size, NULL, si); if (rv < 0) return rv; - return ecp_pack_conn(conn->parent, packet, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, NULL, &_payload, rv+hdr_size, addr); + return ecp_pack_conn(conn->parent, packet, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, &_payload, rv+hdr_size, addr, NULL); } else { - return _ecp_pack_conn(conn, packet->buffer, packet->size, s_idx, c_idx, si, payload->buffer, pld_size, addr); + return _ecp_pack_conn(conn, packet->buffer, packet->size, s_idx, c_idx, payload->buffer, pld_size, addr, si); } } @@ -662,7 +662,7 @@ static ssize_t _vconn_send_kget(ECPConnection *conn, ECPTimerItem *ti) { payload.size = ECP_SIZE_PLD_BUF(0, ECP_MTYPE_KGET_REQ, conn); ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_KGET_REQ); - return _ecp_pld_send(conn, &packet, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, NULL, &payload, ECP_SIZE_PLD(0, ECP_MTYPE_KGET_REQ), 0, ti); + return _ecp_pld_send(conn, &packet, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, &payload, ECP_SIZE_PLD(0, ECP_MTYPE_KGET_REQ), 0, ti); } int ecp_vconn_open(ECPConnection *conn, ECPNode *conn_node, ECPVConnOut vconn[], ECPNode vconn_node[], int size) { diff --git a/ecp/test/frag.c b/ecp/test/frag.c index 73e45cb..70b20de 100644 --- a/ecp/test/frag.c +++ b/ecp/test/frag.c @@ -46,7 +46,7 @@ ssize_t handle_msg_c(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsigne printf("MSG C:%s size:%ld\n", p, s); ECPRBuffer *rbuf = &conn->rbuf.recv->rbuf; - printf("RBUF: %d %d %d %d\n", rbuf->seq_start, rbuf->seq_max, rbuf->msg_start, rbuf->msg_size); + printf("RBUF: %d %d %d %d\n", rbuf->seq_start, rbuf->seq_max, rbuf->idx_start, rbuf->arr_size); return s; } -- cgit v1.2.3