From cbba099541d27400ad45083a4b1102b86f9e8dea Mon Sep 17 00:00:00 2001 From: Uros Majstorovic Date: Thu, 10 Aug 2017 21:02:16 +0200 Subject: rbuffer almost implemented --- code/core/config.h | 2 +- code/core/core.c | 74 ++++++++++++------ code/core/core.h | 27 +++---- code/core/htable/htable.c | 9 +-- code/core/msgq.c | 2 +- code/core/posix/transport.c | 2 +- code/core/rbuf.c | 49 ++++++++++-- code/core/rbuf.h | 72 ++++++++++++------ code/core/rbuf_recv.c | 149 ++++++++++++++++++++++++++---------- code/core/rbuf_send.c | 182 ++++++++++++++++++++++++++++++++------------ 10 files changed, 405 insertions(+), 163 deletions(-) (limited to 'code/core') diff --git a/code/core/config.h b/code/core/config.h index 7f9390f..7e2e416 100644 --- a/code/core/config.h +++ b/code/core/config.h @@ -1,4 +1,4 @@ #define ECP_WITH_PTHREAD 1 #define ECP_WITH_HTABLE 1 -#define ECP_WITH_RBUF 0 +#define ECP_WITH_RBUF 1 #define ECP_DEBUG 1 \ No newline at end of file diff --git a/code/core/core.c b/code/core/core.c index a725952..069c910 100644 --- a/code/core/core.c +++ b/code/core/core.c @@ -521,10 +521,9 @@ void ecp_conn_unregister(ECPConnection *conn) { static ssize_t _conn_send_kget(ECPConnection *conn, ECPTimerItem *ti) { unsigned char payload[ECP_SIZE_PLD(0)]; - ecp_seq_t seq; ecp_pld_set_type(payload, ECP_MTYPE_KGET_REQ); - return ecp_pld_send_wkey(conn, &seq, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, payload, sizeof(payload)); + return ecp_pld_send_wkey(conn, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, payload, sizeof(payload)); } int ecp_conn_init(ECPConnection *conn, ECPNode *node) { @@ -686,18 +685,31 @@ int ecp_conn_handle_new(ECPSocket *sock, ECPConnection **_conn, ECPConnection *p } ssize_t ecp_conn_handle_open(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size) { + int is_open; if (size < 0) return size; #ifdef ECP_WITH_PTHREAD pthread_mutex_lock(&conn->mutex); #endif - if (!ecp_conn_is_open(conn)) conn->flags |= ECP_CONN_FLAG_OPEN; + is_open = ecp_conn_is_open(conn); #ifdef ECP_WITH_PTHREAD pthread_mutex_unlock(&conn->mutex); #endif + if (!is_open) conn->flags |= ECP_CONN_FLAG_OPEN; + if (mtype & ECP_MTYPE_FLAG_REP) { + int rv; + if (!conn->out) return ECP_ERR; + +#ifdef ECP_WITH_RBUF + if (!is_open && conn->rbuf.recv) { + rv = ecp_conn_rbuf_recv_start(conn, seq); + if (rv) return rv; + } +#endif + return 0; } else { unsigned char payload[ECP_SIZE_PLD(0)]; @@ -901,15 +913,16 @@ ssize_t ecp_pack(ECPContext *ctx, unsigned char *packet, size_t pkt_size, unsign return rv+ECP_SIZE_PKT_HDR; } -ssize_t ecp_pack_raw(ECPSocket *sock, ECPConnection *proxy, ECPNetAddr *addr, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size) { +ssize_t ecp_pack_raw(ECPSocket *sock, ECPConnection *proxy, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size, ECPNetAddr *addr) { ECPContext *ctx = sock->ctx; return ecp_pack(ctx, packet, pkt_size, s_idx, c_idx, public, shsec, nonce, seq, payload, payload_size); } -ssize_t ecp_conn_pack(ECPConnection *conn, ECPNetAddr *addr, ecp_seq_t *seq, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size) { +ssize_t ecp_conn_pack(ECPConnection *conn, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size, ECPNetAddr *addr, ecp_seq_t *seq, int *rbuf_idx) { ecp_aead_key_t shsec; ecp_dh_public_t public; + ecp_seq_t _seq; unsigned char nonce[ECP_AEAD_SIZE_NONCE]; int rv = ECP_OK; @@ -941,13 +954,27 @@ ssize_t ecp_conn_pack(ECPConnection *conn, ECPNetAddr *addr, ecp_seq_t *seq, uns } else { memcpy(&public, &conn->remote.key[conn->remote.key_curr].public, sizeof(public)); } - } if (!rv) { - if (seq) { - conn->seq_out++; - *seq = conn->seq_out; + _seq = conn->seq_out; +#ifdef ECP_WITH_RBUF + if (conn->rbuf.send && rbuf_idx) { + ECPRBSend *buf = conn->rbuf.send; +#ifdef ECP_WITH_PTHREAD + pthread_mutex_lock(&buf->mutex); +#endif + + *rbuf_idx = ecp_rbuf_msg_idx(&buf->rbuf, _seq); + if (*rbuf_idx < 0) rv = ECP_ERR_RBUF_FULL; + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_unlock(&buf->mutex); +#endif } +#endif + } + if (!rv) { + conn->seq_out = _seq + 1; if (addr) *addr = conn->node.addr; } @@ -957,7 +984,7 @@ ssize_t ecp_conn_pack(ECPConnection *conn, ECPNetAddr *addr, ecp_seq_t *seq, uns if (rv) return rv; - ssize_t _rv = ecp_pack(conn->sock->ctx, packet, pkt_size, s_idx, c_idx, &public, &shsec, nonce, seq ? *seq : 0, payload, payload_size); + ssize_t _rv = ecp_pack(conn->sock->ctx, packet, pkt_size, s_idx, c_idx, &public, &shsec, nonce, _seq, payload, payload_size); if (_rv < 0) return _rv; #ifdef ECP_WITH_PTHREAD @@ -968,6 +995,7 @@ ssize_t ecp_conn_pack(ECPConnection *conn, ECPNetAddr *addr, ecp_seq_t *seq, uns pthread_mutex_unlock(&conn->mutex); #endif + if (seq) *seq = _seq; return _rv; } @@ -1041,7 +1069,7 @@ ssize_t ecp_pkt_handle(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *proxy, } pld_size = sock->ctx->cr.aead_dec(payload, ECP_MAX_PLD, packet+ECP_SIZE_PKT_HDR, pkt_size-ECP_SIZE_PKT_HDR, &shsec, packet+ECP_SIZE_PROTO+1+ECP_ECDH_SIZE_KEY); - if (pld_size < ECP_MIN_PLD) rv = ECP_ERR_DECRYPT; + if (pld_size < ECP_SIZE_MSG_HDR) rv = ECP_ERR_DECRYPT; if (rv) goto pkt_handle_err; p_seq = \ @@ -1121,7 +1149,7 @@ ssize_t ecp_pkt_handle(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *proxy, #endif #ifdef ECP_WITH_RBUF - if (conn->rbuf.recv && conn->rbuf.recv->open) proc_size = ecp_rbuf_recv_store(conn, p_seq, payload+pld_size-cnt_size, cnt_size); + if (conn->rbuf.recv) proc_size = ecp_conn_rbuf_recv_store(conn, p_seq, payload+pld_size-cnt_size, cnt_size); #endif if (proc_size == 0) proc_size = ecp_msg_handle(conn, p_seq, payload+pld_size-cnt_size, cnt_size); @@ -1186,21 +1214,20 @@ ssize_t ecp_msg_handle(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, s unsigned char mtype = 0; size_t rem_size = msg_size; + if (msg_size < 1) return ECP_ERR_MIN_MSG; + while (rem_size) { mtype = msg[0]; msg++; rem_size--; if ((mtype & ECP_MTYPE_MASK) >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE; - if (rem_size < ECP_MIN_MSG) return ECP_ERR_MIN_MSG; ecp_timer_pop(conn, mtype); handler = conn->sock->ctx->handler[conn->type] ? conn->sock->ctx->handler[conn->type]->msg[mtype & ECP_MTYPE_MASK] : NULL; if (handler) { rv = handler(conn, seq, mtype, msg, rem_size); if (rv < 0) return rv; - if (rv == 0) rv = rem_size; - if (rv < ECP_MIN_MSG) rv = ECP_MIN_MSG; if (rv > rem_size) return ECP_ERR; rem_size -= rv; @@ -1226,26 +1253,25 @@ unsigned char ecp_pld_get_type(unsigned char *payload) { } ssize_t ecp_pld_send(ECPConnection *conn, unsigned char *payload, size_t payload_size) { - ecp_seq_t seq; - return ecp_pld_send_wkey(conn, &seq, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload, payload_size); + return ecp_pld_send_wkey(conn, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload, payload_size); } -ssize_t ecp_pld_send_wkey(ECPConnection *conn, ecp_seq_t *seq, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size) { +ssize_t ecp_pld_send_wkey(ECPConnection *conn, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size) { unsigned char packet[ECP_MAX_PKT]; ECPSocket *sock = conn->sock; ECPContext *ctx = sock->ctx; ECPNetAddr addr; + ecp_seq_t seq; ssize_t rv; + int rbuf_idx = -1; - rv = ctx->pack(conn, &addr, seq, packet, ECP_MAX_PKT, s_idx, c_idx, payload, payload_size); + rv = ctx->pack(conn, packet, ECP_MAX_PKT, s_idx, c_idx, payload, payload_size, &addr, &seq, &rbuf_idx); if (rv < 0) return rv; #ifdef ECP_WITH_RBUF - if (conn->rbuf.send && conn->rbuf.send->open && seq) { - ssize_t _rv = ecp_rbuf_send_store(conn, *seq, packet, rv); - if (_rv < 0) return _rv; - } + if (conn->rbuf.send) return ecp_conn_rbuf_pkt_send(conn, &addr, packet, rv, seq, rbuf_idx); #endif + return ecp_pkt_send(sock, &addr, packet, rv); } @@ -1255,7 +1281,7 @@ ssize_t ecp_pld_send_raw(ECPSocket *sock, ECPConnection *proxy, ECPNetAddr *addr ECPNetAddr _addr; ssize_t rv; - rv = ctx->pack_raw(sock, proxy, &_addr, packet, ECP_MAX_PKT, s_idx, c_idx, public, shsec, nonce, seq, payload, payload_size); + rv = ctx->pack_raw(sock, proxy, packet, ECP_MAX_PKT, s_idx, c_idx, public, shsec, nonce, seq, payload, payload_size, &_addr); if (rv < 0) return rv; return ecp_pkt_send(sock, proxy ? &_addr : addr, packet, rv); diff --git a/code/core/core.h b/code/core/core.h index 81ec69c..382b2c1 100644 --- a/code/core/core.h +++ b/code/core/core.h @@ -36,6 +36,7 @@ #define ECP_MAX_NODE_KEY 2 #define ECP_MAX_CTYPE 8 #define ECP_MAX_MTYPE 16 +#define ECP_MAX_MTYPE_SYS 8 #define ECP_SIZE_PKT_HDR (ECP_SIZE_PROTO+1+ECP_ECDH_SIZE_KEY+ECP_AEAD_SIZE_NONCE) #define ECP_SIZE_PLD_HDR (ECP_SIZE_SEQ) @@ -45,21 +46,19 @@ #define ECP_MAX_PLD (ECP_MAX_PKT-ECP_SIZE_PKT_HDR-ECP_AEAD_SIZE_TAG) #define ECP_MAX_MSG (ECP_MAX_PLD-ECP_SIZE_MSG_HDR) -#define ECP_MIN_MSG 8 -#define ECP_MIN_PLD (ECP_SIZE_MSG_HDR+ECP_MIN_MSG) -#define ECP_MIN_PKT (ECP_SIZE_PKT_HDR+ECP_MIN_PLD+ECP_AEAD_SIZE_TAG) +#define ECP_MIN_PKT (ECP_SIZE_PKT_HDR+ECP_SIZE_MSG_HDR+ECP_AEAD_SIZE_TAG) #define ECP_POLL_TIMEOUT 500 #define ECP_ECDH_IDX_INV 0xFF #define ECP_ECDH_IDX_PERMA 0x0F -#define ECP_MTYPE_MASK 0x7f -#define ECP_MTYPE_FLAG_REP 0x80 +#define ECP_MTYPE_MASK 0x3f +#define ECP_MTYPE_FLAG_TIMER 0x80 +#define ECP_MTYPE_FLAG_REP 0x40 #define ECP_MTYPE_OPEN 0x00 #define ECP_MTYPE_KGET 0x01 #define ECP_MTYPE_KPUT 0x02 -#define ECP_MTYPE_EXEC 0x03 #define ECP_MTYPE_OPEN_REQ (ECP_MTYPE_OPEN) #define ECP_MTYPE_OPEN_REP (ECP_MTYPE_OPEN | ECP_MTYPE_FLAG_REP) @@ -68,8 +67,8 @@ #define ECP_MTYPE_KPUT_REQ (ECP_MTYPE_KPUT) #define ECP_MTYPE_KPUT_REP (ECP_MTYPE_KPUT | ECP_MTYPE_FLAG_REP) -#define ECP_SIZE_PLD(X) ((X) < ECP_MIN_MSG ? ECP_MIN_MSG + ECP_SIZE_MSG_HDR : (X) + ECP_SIZE_MSG_HDR) -#define ECP_SIZE_PKT(X) ((X) < ECP_MIN_MSG ? ECP_MIN_MSG + ECP_SIZE_PKT_HDR+ECP_SIZE_MSG_HDR+ECP_AEAD_SIZE_TAG : (X) + ECP_SIZE_PKT_HDR+ECP_SIZE_MSG_HDR+ECP_AEAD_SIZE_TAG) +#define ECP_SIZE_PLD(X) ((X) + ECP_SIZE_MSG_HDR) +#define ECP_SIZE_PKT(X) ((X) + ECP_SIZE_PKT_HDR+ECP_SIZE_MSG_HDR+ECP_AEAD_SIZE_TAG) #define ECP_CONN_FLAG_REG 0x01 #define ECP_CONN_FLAG_OPEN 0x02 @@ -202,7 +201,9 @@ typedef struct ECPConnHandler { typedef struct ECPSockCTable { struct ECPConnection *array[ECP_MAX_SOCK_CONN]; unsigned short size; +#ifdef ECP_WITH_HTABLE void *htable; +#endif #ifdef ECP_WITH_PTHREAD pthread_mutex_t mutex; #endif @@ -218,8 +219,8 @@ typedef struct ECPContext { #ifdef ECP_WITH_HTABLE ECPHTableIface ht; #endif - ssize_t (*pack) (struct ECPConnection *conn, ECPNetAddr *addr, ecp_seq_t *seq, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size); - ssize_t (*pack_raw) (struct ECPSocket *sock, struct ECPConnection *proxy, ECPNetAddr *addr, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size); + ssize_t (*pack) (struct ECPConnection *conn, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size, ECPNetAddr *addr, ecp_seq_t *seq, int *rbuf_idx); + ssize_t (*pack_raw) (struct ECPSocket *sock, struct ECPConnection *proxy, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size, ECPNetAddr *addr); ECPConnHandler *handler[ECP_MAX_CTYPE]; } ECPContext; @@ -311,8 +312,8 @@ int ecp_conn_dhkey_new_pub(ECPConnection *conn, unsigned char idx, unsigned char int ecp_conn_dhkey_get_curr(ECPConnection *conn, unsigned char *idx, unsigned char *public); ssize_t ecp_pack(ECPContext *ctx, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size); -ssize_t ecp_pack_raw(ECPSocket *sock, ECPConnection *proxy, ECPNetAddr *addr, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size); -ssize_t ecp_conn_pack(ECPConnection *conn, ECPNetAddr *addr, ecp_seq_t *seq, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size); +ssize_t ecp_pack_raw(ECPSocket *sock, ECPConnection *proxy, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size, ECPNetAddr *addr); +ssize_t ecp_conn_pack(ECPConnection *conn, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size, ECPNetAddr *addr, ecp_seq_t *seq, int *rbuf_idx); ssize_t ecp_pkt_handle(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *proxy, unsigned char *packet, size_t pkt_size); ssize_t ecp_pkt_send(ECPSocket *sock, ECPNetAddr *addr, unsigned char *packet, size_t pkt_size); @@ -323,7 +324,7 @@ unsigned char *ecp_pld_get_buf(unsigned char *payload); void ecp_pld_set_type(unsigned char *payload, unsigned char mtype); unsigned char ecp_pld_get_type(unsigned char *payload); ssize_t ecp_pld_send(ECPConnection *conn, unsigned char *payload, size_t payload_size); -ssize_t ecp_pld_send_wkey(ECPConnection *conn, ecp_seq_t *seq, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size); +ssize_t ecp_pld_send_wkey(ECPConnection *conn, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size); ssize_t ecp_pld_send_raw(ECPSocket *sock, ECPConnection *proxy, ECPNetAddr *addr, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size); ssize_t ecp_send(ECPConnection *conn, unsigned char *payload, size_t payload_size); diff --git a/code/core/htable/htable.c b/code/core/htable/htable.c index 552a03b..82dbcbc 100644 --- a/code/core/htable/htable.c +++ b/code/core/htable/htable.c @@ -5,11 +5,7 @@ #include "hashtable.h" static void *h_create(ECPContext *ctx) { - int rv; - struct hashtable *h = create_hashtable(1000, (unsigned int (*)(void *))ctx->cr.dh_pub_hash_fn, (int (*)(void *, void *))ctx->cr.dh_pub_hash_eq, NULL, NULL, NULL); - if (h == NULL) return NULL; - - return h; + return create_hashtable(1000, (unsigned int (*)(void *))ctx->cr.dh_pub_hash_fn, (int (*)(void *, void *))ctx->cr.dh_pub_hash_eq, NULL, NULL, NULL); } static void h_destroy(void *h) { @@ -23,7 +19,6 @@ static int h_insert(void *h, unsigned char *k, ECPConnection *v) { } static ECPConnection *h_remove(void *h, unsigned char *k) { - printf("REMOVE!!!\n"); return hashtable_remove(h, k); } @@ -38,5 +33,5 @@ int ecp_htable_init(ECPHTableIface *h) { h->insert = h_insert; h->remove = h_remove; h->search = h_search; - return 0; + return ECP_OK; } diff --git a/code/core/msgq.c b/code/core/msgq.c index 53c55ea..2ba0ed5 100644 --- a/code/core/msgq.c +++ b/code/core/msgq.c @@ -62,7 +62,7 @@ ssize_t ecp_conn_msgq_push(ECPConnection *conn, unsigned char *msg, size_t msg_s if (mtype >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE; if (msg_size >= ECP_MAX_MSG) return ECP_ERR_MAX_MSG; - if (msg_size < ECP_MIN_MSG) return ECP_ERR_MIN_MSG; + if (msg_size < 1) return ECP_ERR_MIN_MSG; for (i=0; ioccupied[msg_idx]) { diff --git a/code/core/posix/transport.c b/code/core/posix/transport.c index da87342..e3fe612 100644 --- a/code/core/posix/transport.c +++ b/code/core/posix/transport.c @@ -112,5 +112,5 @@ int ecp_transport_init(ECPTransportIface *t) { t->recv = t_recv; t->addr_eq = t_addr_eq; t->addr_set = t_addr_set; - return 0; + return ECP_OK; } diff --git a/code/core/rbuf.c b/code/core/rbuf.c index 0865e67..bb110b9 100644 --- a/code/core/rbuf.c +++ b/code/core/rbuf.c @@ -3,14 +3,15 @@ #include int ecp_rbuf_init(ECPRBuffer *rbuf, ECPRBMessage *msg, unsigned int msg_size) { - memset(msg, 0, sizeof(ECPRBMessage) * msg_size); rbuf->msg = msg; if (msg_size) { + if (msg == NULL) return ECP_ERR; rbuf->msg_size = msg_size; + memset(rbuf->msg, 0, sizeof(ECPRBMessage) * msg_size); } else { rbuf->msg_size = ECP_RBUF_SEQ_HALF; } - + return ECP_OK; } @@ -22,17 +23,53 @@ int ecp_rbuf_msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq) { return ECP_ERR_RBUF_IDX; } -ssize_t ecp_rbuf_msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, unsigned char *msg, size_t msg_size, unsigned char test_flags, unsigned char set_flags) { - int idx = ecp_rbuf_msg_idx(rbuf, seq); +ssize_t ecp_rbuf_msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, int idx, unsigned char *msg, size_t msg_size, unsigned char test_flags, unsigned char set_flags) { + idx = idx < 0 ? ecp_rbuf_msg_idx(rbuf, seq) : idx; if (idx < 0) return idx; if (rbuf->msg == NULL) return 0; if (test_flags && (test_flags & rbuf->msg[idx].flags)) return ECP_ERR_RBUF_FLAG; - memcpy(rbuf->msg[idx].msg, msg, msg_size); - rbuf->msg[idx].size = msg_size; + if (!(set_flags & ECP_RBUF_FLAG_DELIVERED)) { + memcpy(rbuf->msg[idx].msg, msg, msg_size); + rbuf->msg[idx].size = msg_size; + } rbuf->msg[idx].flags = set_flags; return msg_size; } +int ecp_conn_rbuf_start(ECPConnection *conn, ecp_seq_t seq) { + int rv = ecp_conn_rbuf_send_start(conn); + if (rv) return rv; + + if (!conn->out) { + rv = ecp_conn_rbuf_recv_start(conn, seq); + if (rv) return rv; + } + + return ECP_OK; +} + +ssize_t ecp_conn_rbuf_pkt_send(struct ECPConnection *conn, ECPNetAddr *addr, unsigned char *packet, size_t pkt_size, ecp_seq_t seq, int idx) { + int do_send; + ECPRBSend *buf = conn->rbuf.send; + + ssize_t rv = ecp_rbuf_msg_store(&buf->rbuf, seq, idx, packet, pkt_size, 0, 0); + if (rv < 0) return rv; + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_lock(&buf->mutex); +#endif + if (buf->in_transit < buf->win_size) { + buf->in_transit++; + do_send = 1; + } +#ifdef ECP_WITH_PTHREAD + pthread_mutex_unlock(&buf->mutex); +#endif + + if (do_send) rv = ecp_pkt_send(conn->sock, addr, packet, pkt_size); + return rv; +} + diff --git a/code/core/rbuf.h b/code/core/rbuf.h index 4340707..9e9e53d 100644 --- a/code/core/rbuf.h +++ b/code/core/rbuf.h @@ -1,21 +1,38 @@ -#define ECP_RBUF_FLAG_PRESENT 1 +#define ECP_RBUF_FLAG_RECEIVED 0x01 +#define ECP_RBUF_FLAG_DELIVERED 0x02 -#define ECP_ERR_RBUF_FLAG -100 -#define ECP_ERR_RBUF_IDX -101 -#define ECP_ERR_RBUF_DUP -102 +#define ECP_RBUF_FLAG_RELIABLE 0x01 +#define ECP_RBUF_FLAG_MSGQ 0x02 + +#define ECP_MTYPE_RBOPEN 0x04 +#define ECP_MTYPE_RBCLOSE 0x05 +#define ECP_MTYPE_RBFLUSH 0x06 +#define ECP_MTYPE_RBACK 0x07 + +#define ECP_MTYPE_RBOPEN_REQ (ECP_MTYPE_RBOPEN) +#define ECP_MTYPE_RBOPEN_REP (ECP_MTYPE_RBOPEN | ECP_MTYPE_FLAG_REP) +#define ECP_MTYPE_RBCLOSE_REQ (ECP_MTYPE_RBCLOSE) +#define ECP_MTYPE_RBCLOSE_REP (ECP_MTYPE_RBCLOSE | ECP_MTYPE_FLAG_REP) +#define ECP_MTYPE_RBFLUSH_REQ (ECP_MTYPE_RBFLUSH) +#define ECP_MTYPE_RBFLUSH_REP (ECP_MTYPE_RBFLUSH | ECP_MTYPE_FLAG_REP) + +#define ECP_ERR_RBUF_FLAG -100 +#define ECP_ERR_RBUF_IDX -101 +#define ECP_ERR_RBUF_DUP -102 +#define ECP_ERR_RBUF_FULL -103 typedef uint32_t ecp_ack_t; typedef uint32_t ecp_win_t; -#define ECP_RBUF_SEQ_HALF ((ecp_seq_t)1 << (sizeof(ecp_seq_t)*8-1)) +#define ECP_RBUF_SEQ_HALF ((ecp_seq_t)1 << (sizeof(ecp_seq_t)*8-1)) -#define ECP_RBUF_ACK_FULL (~(ecp_ack_t)0) -#define ECP_RBUF_ACK_SIZE (sizeof(ecp_ack_t)*8) +#define ECP_RBUF_ACK_FULL (~(ecp_ack_t)0) +#define ECP_RBUF_ACK_SIZE (sizeof(ecp_ack_t)*8) -#define ECP_RBUF_SEQ_LT(a,b) ((ecp_seq_t)((ecp_seq_t)(a) - (ecp_seq_t)(b)) > ECP_RBUF_SEQ_HALF) -#define ECP_RBUF_SEQ_LTE(a,b) ((ecp_seq_t)((ecp_seq_t)(b) - (ecp_seq_t)(a)) < ECP_RBUF_SEQ_HALF) +#define ECP_RBUF_SEQ_LT(a,b) ((ecp_seq_t)((ecp_seq_t)(a) - (ecp_seq_t)(b)) > ECP_RBUF_SEQ_HALF) +#define ECP_RBUF_SEQ_LTE(a,b) ((ecp_seq_t)((ecp_seq_t)(b) - (ecp_seq_t)(a)) < ECP_RBUF_SEQ_HALF) -#define ECP_RBUF_IDX_MASK(idx, size) ((idx) & ((size) - 1)) +#define ECP_RBUF_IDX_MASK(idx, size) ((idx) & ((size) - 1)) /* If size not 2^x: #define ECP_RBUF_IDX_MASK(idx, size) ((idx) % (size)) */ @@ -35,8 +52,8 @@ typedef struct ECPRBuffer { } ECPRBuffer; typedef struct ECPRBRecv { - unsigned char open; - unsigned char reliable; + unsigned char flags; + unsigned char flush; unsigned short deliver_delay; unsigned short hole_max; unsigned short ack_rate; @@ -50,12 +67,16 @@ typedef struct ECPRBRecv { } ECPRBRecv; typedef struct ECPRBSend { - unsigned char open; - unsigned char reliable; + unsigned char flags; + unsigned char flush; ecp_win_t win_size; ecp_win_t in_transit; + ecp_seq_t seq_flush; unsigned int nack_rate; ECPRBuffer rbuf; +#ifdef ECP_WITH_PTHREAD + pthread_mutex_t mutex; +#endif } ECPRBSend; typedef struct ECPConnRBuffer { @@ -66,14 +87,17 @@ typedef struct ECPConnRBuffer { int ecp_rbuf_init(ECPRBuffer *rbuf, ECPRBMessage *msg, unsigned int msg_size); int ecp_rbuf_msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq); -ssize_t ecp_rbuf_msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, unsigned char *msg, size_t msg_size, unsigned char test_flags, unsigned char set_flags); - -int ecp_rbuf_recv_create(ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size); -int ecp_rbuf_recv_start(ECPRBRecv *buf, ecp_seq_t seq); -int ecp_rbuf_recv_set_hole(ECPRBRecv *buf, unsigned short hole_max); -int ecp_rbuf_recv_set_delay(ECPRBRecv *buf, unsigned short delay); -ssize_t ecp_rbuf_recv_store(struct ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size); - - -ssize_t ecp_rbuf_send_store(struct ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size); +ssize_t ecp_rbuf_msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, int idx, unsigned char *msg, size_t msg_size, unsigned char test_flags, unsigned char set_flags); +int ecp_conn_rbuf_start(struct ECPConnection *conn, ecp_seq_t seq); +ssize_t ecp_conn_rbuf_pkt_send(struct ECPConnection *conn, ECPNetAddr *addr, unsigned char *packet, size_t pkt_size, ecp_seq_t seq, int idx); + +int ecp_conn_rbuf_recv_create(struct ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size); +int ecp_conn_rbuf_recv_start(struct ECPConnection *conn, ecp_seq_t seq); +ssize_t ecp_conn_rbuf_recv_store(struct ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size); + +int ecp_conn_rbuf_send_create(struct ECPConnection *conn, ECPRBSend *buf, ECPRBMessage *msg, unsigned int msg_size); +int ecp_conn_rbuf_send_start(struct ECPConnection *conn); +int ecp_conn_rbuf_recv_set_hole(struct ECPConnection *conn, unsigned short hole_max); +int ecp_conn_rbuf_recv_set_delay(struct ECPConnection *conn, unsigned short delay); +ssize_t ecp_conn_rbuf_send_store(struct ECPConnection *conn, ecp_seq_t seq, int idx, unsigned char *msg, size_t msg_size); diff --git a/code/core/rbuf_recv.c b/code/core/rbuf_recv.c index 222daa7..b2a5ecf 100644 --- a/code/core/rbuf_recv.c +++ b/code/core/rbuf_recv.c @@ -5,10 +5,35 @@ #define ACK_RATE 8 #define ACK_MASK_FIRST ((ecp_ack_t)1 << (ECP_RBUF_ACK_SIZE - 1)) -static ssize_t msg_store(ECPRBRecv *buf, ecp_seq_t seq, unsigned char *msg, size_t msg_size) { - ssize_t rv = ecp_rbuf_msg_store(&buf->rbuf, seq, msg, msg_size, ECP_RBUF_FLAG_PRESENT, ECP_RBUF_FLAG_PRESENT); +static ssize_t handle_flush(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size) { + if (size < 0) return size; + + ECPRBRecv *buf = conn->rbuf.recv; + unsigned char payload[ECP_SIZE_PLD(0)]; + + if (buf == NULL) return ECP_ERR; + + buf->flush = 1; + return 0; +} + +static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) { + ECPRBRecv *buf = conn->rbuf.recv; + ssize_t rv = 0; + unsigned char flags; + unsigned char mtype = msg[0] & ECP_MTYPE_MASK; + + if (mtype < ECP_MAX_MTYPE_SYS) { + flags = ECP_RBUF_FLAG_RECEIVED | ECP_RBUF_FLAG_DELIVERED; + } else { + flags = ECP_RBUF_FLAG_RECEIVED; + } + + rv = ecp_rbuf_msg_store(&buf->rbuf, seq, -1, msg, msg_size, ECP_RBUF_FLAG_RECEIVED, flags); if (rv < 0) return ECP_ERR_RBUF_DUP; + if (flags & ECP_RBUF_FLAG_DELIVERED) ecp_msg_handle(conn, seq, msg, msg_size); + if (ECP_RBUF_SEQ_LT(buf->seq_max, seq)) buf->seq_max = seq; return rv; } @@ -20,11 +45,15 @@ static void msg_flush(ECPConnection *conn) { unsigned int idx = buf->rbuf.msg_start; for (i=0; ireliable && !(buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_PRESENT)) break; - if (buf->deliver_delay && msg_cnt - i < buf->deliver_delay) break; - if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_PRESENT) { - buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_PRESENT; - ecp_msg_handle(conn, buf->rbuf.seq_start + i, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size); + if ((buf->flags & ECP_RBUF_FLAG_RELIABLE) && !(buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_RECEIVED)) break; + if (buf->deliver_delay && (msg_cnt - i < buf->deliver_delay)) break; + if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_RECEIVED) { + buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_RECEIVED; + if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_DELIVERED) { + buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_DELIVERED; + } else { + ecp_msg_handle(conn, buf->rbuf.seq_start + i, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size); + } } idx = ECP_RBUF_IDX_MASK(idx + 1, buf->rbuf.msg_size); } @@ -32,12 +61,35 @@ static void msg_flush(ECPConnection *conn) { buf->rbuf.seq_start += i; } +static int ack_send(ECPConnection *conn) { + ECPRBRecv *buf = conn->rbuf.recv; + unsigned char payload[ECP_SIZE_PLD(sizeof(ecp_seq_t) + sizeof(ecp_ack_t))]; + unsigned char *buf_ = ecp_pld_get_buf(payload); + ssize_t rv; + + ecp_pld_set_type(payload, ECP_MTYPE_RBACK); + buf_[0] = (buf->seq_ack & 0xFF000000) >> 24; + buf_[1] = (buf->seq_ack & 0x00FF0000) >> 16; + buf_[2] = (buf->seq_ack & 0x0000FF00) >> 8; + buf_[3] = (buf->seq_ack & 0x000000FF); + buf_[4] = (buf->ack_map & 0xFF000000) >> 24; + buf_[5] = (buf->ack_map & 0x00FF0000) >> 16; + buf_[6] = (buf->ack_map & 0x0000FF00) >> 8; + buf_[7] = (buf->ack_map & 0x000000FF); + + rv = ecp_pld_send(conn, payload, sizeof(payload)); + if (rv < 0) return rv; + + buf->ack_pkt = 0; + return ECP_OK; +} + static int ack_shift(ECPRBRecv *buf) { int do_ack = 0; int idx; int i; - if (buf->reliable && ((buf->ack_map & ACK_MASK_FIRST) == 0)) return 0; + if ((buf->flags & ECP_RBUF_FLAG_RELIABLE) && ((buf->ack_map & ACK_MASK_FIRST) == 0)) return 0; idx = ecp_rbuf_msg_idx(&buf->rbuf, buf->seq_ack); if (idx < 0) return idx; @@ -46,10 +98,10 @@ static int ack_shift(ECPRBRecv *buf) { idx = ECP_RBUF_IDX_MASK(idx + 1, buf->rbuf.msg_size); buf->seq_ack++; - if ((buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_PRESENT) && (buf->ack_map == ECP_RBUF_ACK_FULL)) continue; + if ((buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_RECEIVED) && (buf->ack_map == ECP_RBUF_ACK_FULL)) continue; buf->ack_map = buf->ack_map << 1; - if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_PRESENT) { + if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_RECEIVED) { buf->ack_map |= 1; } else if (!do_ack && ECP_RBUF_SEQ_LTE(buf->seq_ack, buf->seq_max - 2 * buf->hole_max)) { do_ack = 1; @@ -73,47 +125,61 @@ static int ack_shift(ECPRBRecv *buf) { return do_ack; } -int ecp_rbuf_recv_create(ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size) { +int ecp_conn_rbuf_recv_create(ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size) { + int rv; + memset(buf, 0, sizeof(ECPRBRecv)); - ecp_rbuf_init(&buf->rbuf, msg, msg_size); + rv = ecp_rbuf_init(&buf->rbuf, msg, msg_size); + if (rv) return rv; + buf->ack_map = ECP_RBUF_ACK_FULL; buf->ack_rate = ACK_RATE; - + conn->rbuf.recv = buf; + return ECP_OK; } -int ecp_rbuf_recv_start(ECPRBRecv *buf, ecp_seq_t seq) { - buf->seq_ack = seq; - buf->seq_max = seq; - buf->rbuf.seq_start = seq + 1; - - return ECP_OK; -} +int ecp_conn_rbuf_recv_set_hole(ECPConnection *conn, unsigned short hole_max) { + ECPRBRecv *buf = conn->rbuf.recv; -int ecp_rbuf_recv_set_hole(ECPRBRecv *buf, unsigned short hole_max) { buf->hole_max = hole_max; - buf->hole_mask_full = ~(~((ecp_ack_t)0) << (hole_max * 2)); - buf->hole_mask_empty = ~(~((ecp_ack_t)0) << (hole_max + 1)); + buf->hole_mask_full = ~(~((ecp_ack_t)1) << (hole_max * 2)); + buf->hole_mask_empty = ~(~((ecp_ack_t)1) << (hole_max + 1)); return ECP_OK; } -int ecp_rbuf_recv_set_delay(ECPRBRecv *buf, unsigned short delay) { +int ecp_conn_rbuf_recv_set_delay(ECPConnection *conn, unsigned short delay) { + ECPRBRecv *buf = conn->rbuf.recv; + buf->deliver_delay = delay; if (buf->hole_max < delay - 1) { - ecp_rbuf_recv_set_hole(buf, delay - 1); + ecp_conn_rbuf_recv_set_hole(conn, delay - 1); } return ECP_OK; } -ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) { - ssize_t rv; +int ecp_conn_rbuf_recv_start(ECPConnection *conn, ecp_seq_t seq) { + ECPRBRecv *buf = conn->rbuf.recv; + + if (buf == NULL) return ECP_ERR; + + buf->seq_ack = seq; + buf->seq_max = seq; + buf->rbuf.seq_start = seq + 1; + + return ECP_OK; +} + +ssize_t ecp_conn_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) { + ECPRBRecv *buf = conn->rbuf.recv; ecp_seq_t ack_pkt = 0; + ssize_t rv; int do_ack = 0; - ECPRBRecv *buf = conn->rbuf.recv; if (buf == NULL) return ECP_ERR; + if (msg_size < 1) return ECP_ERR_MIN_MSG; if (ECP_RBUF_SEQ_LT(buf->seq_max, seq)) ack_pkt = seq - buf->seq_max; if (ECP_RBUF_SEQ_LTE(seq, buf->seq_ack)) { @@ -122,43 +188,48 @@ ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *m ecp_ack_t ack_mask = ((ecp_ack_t)1 << seq_offset); if (ack_mask & buf->ack_map) return ECP_ERR_RBUF_DUP; - - rv = msg_store(buf, seq, msg, msg_size); - if (rv < 0) return rv; - buf->ack_map |= ack_mask; do_ack = ack_shift(buf); + + rv = msg_store(conn, seq, msg, msg_size); + if (rv < 0) return rv; } else { return ECP_ERR_RBUF_IDX; } } else { if ((buf->ack_map == ECP_RBUF_ACK_FULL) && (seq == (ecp_seq_t)(buf->seq_ack + 1))) { - if (buf->deliver_delay) { - rv = msg_store(buf, seq, msg, msg_size); + if ((buf->flags & ECP_RBUF_FLAG_MSGQ) || buf->deliver_delay) { + rv = msg_store(conn, seq, msg, msg_size); if (rv < 0) return rv; } else { - rv = ecp_msg_handle(conn, seq, msg, msg_size); + ecp_msg_handle(conn, seq, msg, msg_size); + rv = msg_size; buf->seq_max++; buf->rbuf.seq_start++; + buf->rbuf.msg_start = ECP_RBUF_IDX_MASK(buf->rbuf.msg_start + 1, buf->rbuf.msg_size); } buf->seq_ack++; } else { - rv = msg_store(buf, seq, msg, msg_size); + rv = msg_store(conn, seq, msg, msg_size); if (rv < 0) return rv; do_ack = ack_shift(buf); } } + if (buf->flush) { + buf->flush = 0; + do_ack = 1; + } if (ack_pkt && !do_ack) { buf->ack_pkt += ack_pkt; // should send acks more aggresively when reliable and ack_map is not full (rate ~ PPS * RTT) if (buf->ack_pkt > buf->ack_rate) do_ack = 1; } if (do_ack) { - buf->ack_pkt = 0; - // send ack (with seq = 0) + int _rv = ack_send(conn); + if (_rv) return _rv; } - // XXX should handle close msg_flush(conn); return rv; } + diff --git a/code/core/rbuf_send.c b/code/core/rbuf_send.c index 5f3bc87..877b086 100644 --- a/code/core/rbuf_send.c +++ b/code/core/rbuf_send.c @@ -2,75 +2,163 @@ #include -#define ACK_RATE_UNIT 10000 +#define NACK_RATE_UNIT 10000 + +static ssize_t _rbuf_send_flush(ECPConnection *conn, ECPTimerItem *ti) { + unsigned char payload[ECP_SIZE_PLD(0)]; + + ecp_pld_set_type(payload, ECP_MTYPE_RBFLUSH_REQ); + return ecp_pld_send(conn, payload, sizeof(payload)); +} static ssize_t handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size) { - int idx = 0; + ECPRBSend *buf; + ssize_t rsize = sizeof(ecp_seq_t)+sizeof(ecp_ack_t); ecp_seq_t seq_ack = 0; ecp_ack_t ack_map = 0; + int do_flush = 0; + int rv = ECP_OK; - ECPRBSend *buf = conn->rbuf.send; + buf = conn->rbuf.send; + if (buf == NULL) return ECP_ERR; + if (size < 0) return size; + if (size < rsize) return ECP_ERR; + + seq_ack = \ + (msg[0] << 24) | \ + (msg[1] << 16) | \ + (msg[2] << 8) | \ + (msg[3]); + ack_map = \ + (msg[4] << 24) | \ + (msg[5] << 16) | \ + (msg[6] << 8) | \ + (msg[7]); + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_lock(&buf->mutex); +#endif + ECPRBuffer *rbuf = &buf->rbuf; + int idx = ecp_rbuf_msg_idx(rbuf, seq_ack); + if (idx < 0) rv = idx; - idx = ecp_rbuf_msg_idx(rbuf, seq_ack); - if (idx < 0) return idx; - - seq_ack++; - buf->in_transit -= seq_ack - rbuf->seq_start; - - if (ack_map != ECP_RBUF_ACK_FULL) { - int i; - int nack = 0; - ecp_win_t nack_cnt = 0; - - seq_ack -= ECP_RBUF_ACK_SIZE; - for (i=0; ireliable) { - idx = ecp_rbuf_msg_idx(rbuf, seq_ack + i); - // resend packet - // ecp_pkt_send(conn->sock, &conn->node.addr, packet, rv); - if (!nack) { - nack = 1; - - rbuf->seq_start = seq_ack + i; - rbuf->msg_start = idx; + int is_reliable = buf->flags & ECP_RBUF_FLAG_RELIABLE; + + if (!rv) { + seq_ack++; + buf->in_transit -= seq_ack - rbuf->seq_start; + + if (ack_map != ECP_RBUF_ACK_FULL) { + int i; + int nack_first = 0; + unsigned int msg_start; + ecp_seq_t seq_start; + ecp_win_t nack_cnt = 0; + + seq_ack -= ECP_RBUF_ACK_SIZE; + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_unlock(&buf->mutex); +#endif + + for (i=0; iflags & ECP_RBUF_FLAG_RELIABLE) { + idx = ECP_RBUF_IDX_MASK(idx + 1 - ECP_RBUF_ACK_SIZE + i, rbuf->msg_size); + ecp_pkt_send(conn->sock, &conn->node.addr, rbuf->msg[idx].msg, rbuf->msg[idx].size); + if (!nack_first) { + nack_first = 1; + seq_start = seq_ack + i; + msg_start = idx; + } } } } + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_lock(&buf->mutex); +#endif + + buf->in_transit += nack_cnt; + buf->nack_rate = (buf->nack_rate * 7 + ((ECP_RBUF_ACK_SIZE - nack_cnt) * NACK_RATE_UNIT) / ECP_RBUF_ACK_SIZE) / 8; + if (buf->flags & ECP_RBUF_FLAG_RELIABLE) { + rbuf->seq_start = seq_start; + rbuf->msg_start = msg_start; + } else { + rbuf->seq_start = seq_ack + ECP_RBUF_ACK_SIZE; + } + } else { + rbuf->seq_start = seq_ack; + buf->nack_rate = (buf->nack_rate * 7) / 8; + if (buf->flags & ECP_RBUF_FLAG_RELIABLE) { + rbuf->msg_start = ECP_RBUF_IDX_MASK(idx + 1, rbuf->msg_size); + } } - buf->in_transit += nack_cnt; - buf->nack_rate = (buf->nack_rate * 7 + ((ECP_RBUF_ACK_SIZE - nack_cnt) * ACK_RATE_UNIT) / ECP_RBUF_ACK_SIZE) / 8; - if (!buf->reliable) { - rbuf->seq_start = seq_ack + ECP_RBUF_ACK_SIZE; - } - } else { - rbuf->seq_start = seq_ack; - buf->nack_rate = (buf->nack_rate * 7) / 8; - if (buf->reliable) { - rbuf->msg_start = ECP_RBUF_IDX_MASK(idx + 1, rbuf->msg_size); + if (buf->flush) { + if (ECP_RBUF_SEQ_LT(buf->seq_flush, rbuf->seq_start)) buf->flush = 0; + if (buf->flush) { + do_flush = 1; + } } } - return size; + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_unlock(&buf->mutex); +#endif + + if (rv) return rv; + + if (do_flush) ecp_timer_send(conn, _rbuf_send_flush, ECP_MTYPE_RBACK, 3, 500); + return rsize; } -int ecp_rbuf_send_create(ECPRBSend *buf, ECPRBMessage *msg, unsigned int msg_size) { +int ecp_conn_rbuf_send_create(ECPConnection *conn, ECPRBSend *buf, ECPRBMessage *msg, unsigned int msg_size) { + int rv; + memset(buf, 0, sizeof(ECPRBRecv)); - ecp_rbuf_init(&buf->rbuf, msg, msg_size); + rv = ecp_rbuf_init(&buf->rbuf, msg, msg_size); + if (rv) return rv; + + conn->rbuf.send = buf; return ECP_OK; } -int ecp_rbuf_send_start(ECPRBSend *buf, ecp_seq_t seq) { - buf->rbuf.seq_start = seq + 1; +int ecp_conn_rbuf_send_start(ECPConnection *conn) { + ECPRBSend *buf = conn->rbuf.send; + + if (buf == NULL) return ECP_ERR; + buf->rbuf.seq_start = conn->seq_out; return ECP_OK; } -ssize_t ecp_rbuf_send_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) { +int ecp_conn_rbuf_flush(ECPConnection *conn) { ECPRBSend *buf = conn->rbuf.send; - - buf->in_transit++; - return ecp_rbuf_msg_store(&buf->rbuf, seq, msg, msg_size, 0, 0); -} \ No newline at end of file + unsigned char payload[ECP_SIZE_PLD(0)]; + ecp_seq_t seq; + + if (buf == NULL) return ECP_ERR; + + // XXX flush seq +#ifdef ECP_WITH_PTHREAD + pthread_mutex_lock(&buf->mutex); +#endif + if (buf->flush) { + if (ECP_RBUF_SEQ_LT(buf->seq_flush, seq)) buf->seq_flush = seq; + } else { + buf->flush = 1; + buf->seq_flush = seq; + } +#ifdef ECP_WITH_PTHREAD + pthread_mutex_unlock(&buf->mutex); +#endif + + ssize_t _rv = ecp_timer_send(conn, _rbuf_send_flush, ECP_MTYPE_RBACK, 3, 500); + if (_rv < 0) return _rv; + + return ECP_OK; +} + -- cgit v1.2.3