From fb521b5d33ccb57c2f56d0548f172edbe31e9e91 Mon Sep 17 00:00:00 2001 From: Uros Majstorovic Date: Sat, 12 Mar 2022 13:41:28 +0100 Subject: dir/ext source files moved --- ecp/src/ecp/Makefile | 8 +- ecp/src/ecp/core.c | 2 +- ecp/src/ecp/dir.c | 203 -------------- ecp/src/ecp/dir.h | 32 --- ecp/src/ecp/dir/dir.c | 203 ++++++++++++++ ecp/src/ecp/dir/dir.h | 32 +++ ecp/src/ecp/dir/dir_srv.c | 88 ++++++ ecp/src/ecp/dir/dir_srv.h | 10 + ecp/src/ecp/dir_srv.c | 88 ------ ecp/src/ecp/dir_srv.h | 10 - ecp/src/ecp/ext/msgq.c | 151 +++++++++++ ecp/src/ecp/ext/msgq.h | 19 ++ ecp/src/ecp/ext/rbuf.c | 113 ++++++++ ecp/src/ecp/ext/rbuf.h | 123 +++++++++ ecp/src/ecp/ext/rbuf_recv.c | 473 +++++++++++++++++++++++++++++++++ ecp/src/ecp/ext/rbuf_send.c | 419 +++++++++++++++++++++++++++++ ecp/src/ecp/msgq.c | 150 ----------- ecp/src/ecp/msgq.h | 19 -- ecp/src/ecp/rbuf.c | 112 -------- ecp/src/ecp/rbuf.h | 123 --------- ecp/src/ecp/rbuf_recv.c | 473 --------------------------------- ecp/src/ecp/rbuf_send.c | 418 ----------------------------- ecp/src/platform/posix/platform_obj.mk | 3 +- 23 files changed, 1640 insertions(+), 1632 deletions(-) delete mode 100644 ecp/src/ecp/dir.c delete mode 100644 ecp/src/ecp/dir.h create mode 100644 ecp/src/ecp/dir/dir.c create mode 100644 ecp/src/ecp/dir/dir.h create mode 100644 ecp/src/ecp/dir/dir_srv.c create mode 100644 ecp/src/ecp/dir/dir_srv.h delete mode 100644 ecp/src/ecp/dir_srv.c delete mode 100644 ecp/src/ecp/dir_srv.h create mode 100644 ecp/src/ecp/ext/msgq.c create mode 100644 ecp/src/ecp/ext/msgq.h create mode 100644 ecp/src/ecp/ext/rbuf.c create mode 100644 ecp/src/ecp/ext/rbuf.h create mode 100644 ecp/src/ecp/ext/rbuf_recv.c create mode 100644 ecp/src/ecp/ext/rbuf_send.c delete mode 100644 ecp/src/ecp/msgq.c delete mode 100644 ecp/src/ecp/msgq.h delete mode 100644 ecp/src/ecp/rbuf.c delete mode 100644 ecp/src/ecp/rbuf.h delete mode 100644 ecp/src/ecp/rbuf_recv.c delete mode 100644 ecp/src/ecp/rbuf_send.c diff --git a/ecp/src/ecp/Makefile b/ecp/src/ecp/Makefile index 4590dd3..2bdd563 100644 --- a/ecp/src/ecp/Makefile +++ b/ecp/src/ecp/Makefile @@ -1,6 +1,6 @@ include common.mk -obj = core.o timer.o dir.o dir_srv.o +obj = core.o timer.o subdirs = crypto $(platform_dir) build_dir = ../../build-$(platform) @@ -31,6 +31,12 @@ install: all if [ -f vconn/libecpvconn.a ]; then \ install vconn/libecpvconn.a $(build_dir); \ fi + if [ -f dir/libecpext.a ]; then \ + install vconn/libecpext.a $(build_dir); \ + fi + if [ -f dir/libecpdir.a ]; then \ + install vconn/libecpdir.a $(build_dir); \ + fi clean: for i in $(subdirs); do \ diff --git a/ecp/src/ecp/core.c b/ecp/src/ecp/core.c index d59ea8f..badb09e 100644 --- a/ecp/src/ecp/core.c +++ b/ecp/src/ecp/core.c @@ -13,7 +13,7 @@ #endif #ifdef ECP_WITH_DIR -#include "dir.h" +#include "dir/dir.h" #endif #include "cr.h" diff --git a/ecp/src/ecp/dir.c b/ecp/src/ecp/dir.c deleted file mode 100644 index 0e07486..0000000 --- a/ecp/src/ecp/dir.c +++ /dev/null @@ -1,203 +0,0 @@ -#include -#include - -#include "core.h" -#include "cr.h" - -#include "dir.h" -#include "dir_srv.h" - -static int dir_update(ECPDirList *list, ECPDirItem *item) { - int i; - - for (i=0; icount; i++) { - if (memcmp(&list->item[i].node.key_perma.public, &item->node.key_perma.public, sizeof(item->node.key_perma.public)) == 0) { - return ECP_OK; - } - } - - if (list->count == ECP_MAX_DIR_ITEM) return ECP_ERR_SIZE; - - list->item[list->count] = *item; - list->count++; - - return ECP_OK; -} - -ssize_t ecp_dir_parse(ECPDirList *list, unsigned char *buf, size_t buf_size) { - ECPDirItem item; - size_t rsize; - uint16_t count; - int i; - int rv; - - if (buf_size < sizeof(uint16_t)) return ECP_ERR_SIZE; - - count = \ - (buf[0] << 8) | \ - (buf[1]); - - rsize = sizeof(uint16_t) + count * ECP_SIZE_DIR_ITEM; - if (buf_size < rsize) return ECP_ERR_SIZE; - - buf += sizeof(uint16_t); - for (i=0; icount * ECP_SIZE_DIR_ITEM; - if (buf_size < rsize) return ECP_ERR_SIZE; - - buf[0] = (list->count & 0xFF00) >> 8; - buf[1] = (list->count & 0x00FF); - buf += sizeof(uint16_t); - for (i=0; icount; i++) { - ecp_dir_item_serialize(&list->item[i], buf); - buf += ECP_SIZE_DIR_ITEM; - } - - return rsize; -} - -void ecp_dir_item_parse(ECPDirItem *item, unsigned char *buf) { - ECPDHPub *key; - ecp_tr_addr_t *addr; - - key = &item->node.key_perma; - addr = &item->node.addr; - - memcpy(&key->public, buf, sizeof(key->public)); - buf += sizeof(key->public); - - memcpy(&addr->host, buf, sizeof(addr->host)); - buf += sizeof(addr->host); - - addr->port = \ - (buf[0] << 8) | \ - (buf[1]); - buf += sizeof(uint16_t); - - item->capabilities = \ - (buf[0] << 8) | \ - (buf[1]); - buf += sizeof(uint16_t); -} - -void ecp_dir_item_serialize(ECPDirItem *item, unsigned char *buf) { - ECPDHPub *key; - ecp_tr_addr_t *addr; - - key = &item->node.key_perma; - addr = &item->node.addr; - - memcpy(buf, &key->public, sizeof(key->public)); - buf += sizeof(key->public); - - memcpy(buf, &addr->host, sizeof(addr->host)); - buf += sizeof(addr->host); - - buf[0] = (addr->port & 0xFF00) >> 8; - buf[1] = (addr->port & 0x00FF); - buf += sizeof(uint16_t); - - buf[0] = (item->capabilities & 0xFF00) >> 8; - buf[1] = (item->capabilities & 0x00FF); - buf += sizeof(uint16_t); -} - -ssize_t ecp_dir_handle(ECPConnection *conn, unsigned char *msg, size_t msg_size, ECP2Buffer *b) { - ecp_dir_handler_t handler; - - handler = ecp_get_dir_handler(conn); - if (handler) { - return handler(conn, msg, msg_size, b); - } else { - return ECP_ERR_HANDLER; - } -} - -int ecp_dir_handle_open(ECPConnection *conn, ECP2Buffer *b) { - ssize_t rv; - - if (ecp_conn_is_inb(conn)) return ECP_OK; - -#ifdef ECP_WITH_DIRSRV - rv = ecp_dir_send_upd(conn); -#else - rv = ecp_dir_send_req(conn); -#endif - if (rv < 0) return rv; - - return ECP_OK; -} - -ssize_t ecp_dir_handle_msg(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, size_t msg_size, ECP2Buffer *b) { - switch (mtype) { -#ifdef ECP_WITH_DIRSRV - case ECP_MTYPE_DIR_UPD: { - return ecp_dir_handle_upd(conn, msg, msg_size); - } - - case ECP_MTYPE_DIR_REQ: { - return ecp_dir_handle_req(conn, msg, msg_size); - } -#endif - - case ECP_MTYPE_DIR_REP: { -#ifdef ECP_WITH_DIRSRV - return ecp_dir_handle_rep(conn, msg, msg_size); -#else - return ecp_dir_handle(conn, msg, msg_size, b); -#endif - } - - default: - return ECP_ERR_MTYPE; - } -} - -// iterator for client - -static ssize_t _dir_send_req(ECPConnection *conn, ECPTimerItem *ti) { - ECPBuffer packet; - ECPBuffer payload; - unsigned char pkt_buf[ECP_SIZE_PKT_BUF(0, ECP_MTYPE_DIR_REQ, conn)]; - unsigned char pld_buf[ECP_SIZE_PLD_BUF(0, ECP_MTYPE_DIR_REQ, conn)]; - - packet.buffer = pkt_buf; - packet.size = sizeof(pkt_buf); - payload.buffer = pld_buf; - payload.size = sizeof(pld_buf); - - ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_DIR_REQ); - return ecp_pld_send_wtimer(conn, &packet, &payload, ECP_SIZE_PLD(0, ECP_MTYPE_DIR_REQ), 0, ti); -} - -ssize_t ecp_dir_send_req(ECPConnection *conn) { - return ecp_timer_send(conn, _dir_send_req, ECP_MTYPE_DIR_REP, ECP_SEND_TRIES, ECP_SEND_TIMEOUT); -} - -int ecp_dir_get(ECPConnection *conn, ECPSocket *sock, ECPNode *node) { - int rv; - ssize_t _rv; - - rv = ecp_conn_create(conn, sock, ECP_CTYPE_DIR); - if (rv) return rv; - - rv = ecp_conn_open(conn, node); - if (rv) return rv; - - return ECP_OK; -} diff --git a/ecp/src/ecp/dir.h b/ecp/src/ecp/dir.h deleted file mode 100644 index e9f8e1e..0000000 --- a/ecp/src/ecp/dir.h +++ /dev/null @@ -1,32 +0,0 @@ -#define ECP_MAX_DIR_ITEM 30 -#define ECP_SIZE_DIR_ITEM 40 - -#define ECP_MTYPE_DIR_UPD 0x00 -#define ECP_MTYPE_DIR_REQ 0x01 -#define ECP_MTYPE_DIR_REP 0x02 - -#define ECP_CTYPE_DIR (0x00 | ECP_CTYPE_FLAG_SYS) - -typedef struct ECPDirItem { - ECPNode node; - uint16_t capabilities; -} ECPDirItem; - -typedef struct ECPDirList { - ECPDirItem item[ECP_MAX_DIR_ITEM]; - uint16_t count; -} ECPDirList; - -ssize_t ecp_dir_parse(ECPDirList *list, unsigned char *buf, size_t buf_size); -ssize_t ecp_dir_serialize(ECPDirList *list, unsigned char *buf, size_t buf_size); - -void ecp_dir_item_parse(ECPDirItem *item, unsigned char *buf); -void ecp_dir_item_serialize(ECPDirItem *item, unsigned char *buf); - -ssize_t ecp_dir_handle(ECPConnection *conn, unsigned char *msg, size_t msg_size, ECP2Buffer *b); - -int ecp_dir_handle_open(ECPConnection *conn, ECP2Buffer *b); -ssize_t ecp_dir_handle_msg(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, size_t msg_size, ECP2Buffer *b); - -ssize_t ecp_dir_send_req(ECPConnection *conn); -int ecp_dir_get(ECPConnection *conn, ECPSocket *sock, ECPNode *node); \ No newline at end of file diff --git a/ecp/src/ecp/dir/dir.c b/ecp/src/ecp/dir/dir.c new file mode 100644 index 0000000..7c4b0d6 --- /dev/null +++ b/ecp/src/ecp/dir/dir.c @@ -0,0 +1,203 @@ +#include +#include + +#include +#include + +#include "dir.h" +#include "dir_srv.h" + +static int dir_update(ECPDirList *list, ECPDirItem *item) { + int i; + + for (i=0; icount; i++) { + if (memcmp(&list->item[i].node.key_perma.public, &item->node.key_perma.public, sizeof(item->node.key_perma.public)) == 0) { + return ECP_OK; + } + } + + if (list->count == ECP_MAX_DIR_ITEM) return ECP_ERR_SIZE; + + list->item[list->count] = *item; + list->count++; + + return ECP_OK; +} + +ssize_t ecp_dir_parse(ECPDirList *list, unsigned char *buf, size_t buf_size) { + ECPDirItem item; + size_t rsize; + uint16_t count; + int i; + int rv; + + if (buf_size < sizeof(uint16_t)) return ECP_ERR_SIZE; + + count = \ + (buf[0] << 8) | \ + (buf[1]); + + rsize = sizeof(uint16_t) + count * ECP_SIZE_DIR_ITEM; + if (buf_size < rsize) return ECP_ERR_SIZE; + + buf += sizeof(uint16_t); + for (i=0; icount * ECP_SIZE_DIR_ITEM; + if (buf_size < rsize) return ECP_ERR_SIZE; + + buf[0] = (list->count & 0xFF00) >> 8; + buf[1] = (list->count & 0x00FF); + buf += sizeof(uint16_t); + for (i=0; icount; i++) { + ecp_dir_item_serialize(&list->item[i], buf); + buf += ECP_SIZE_DIR_ITEM; + } + + return rsize; +} + +void ecp_dir_item_parse(ECPDirItem *item, unsigned char *buf) { + ECPDHPub *key; + ecp_tr_addr_t *addr; + + key = &item->node.key_perma; + addr = &item->node.addr; + + memcpy(&key->public, buf, sizeof(key->public)); + buf += sizeof(key->public); + + memcpy(&addr->host, buf, sizeof(addr->host)); + buf += sizeof(addr->host); + + addr->port = \ + (buf[0] << 8) | \ + (buf[1]); + buf += sizeof(uint16_t); + + item->capabilities = \ + (buf[0] << 8) | \ + (buf[1]); + buf += sizeof(uint16_t); +} + +void ecp_dir_item_serialize(ECPDirItem *item, unsigned char *buf) { + ECPDHPub *key; + ecp_tr_addr_t *addr; + + key = &item->node.key_perma; + addr = &item->node.addr; + + memcpy(buf, &key->public, sizeof(key->public)); + buf += sizeof(key->public); + + memcpy(buf, &addr->host, sizeof(addr->host)); + buf += sizeof(addr->host); + + buf[0] = (addr->port & 0xFF00) >> 8; + buf[1] = (addr->port & 0x00FF); + buf += sizeof(uint16_t); + + buf[0] = (item->capabilities & 0xFF00) >> 8; + buf[1] = (item->capabilities & 0x00FF); + buf += sizeof(uint16_t); +} + +ssize_t ecp_dir_handle(ECPConnection *conn, unsigned char *msg, size_t msg_size, ECP2Buffer *b) { + ecp_dir_handler_t handler; + + handler = ecp_get_dir_handler(conn); + if (handler) { + return handler(conn, msg, msg_size, b); + } else { + return ECP_ERR_HANDLER; + } +} + +int ecp_dir_handle_open(ECPConnection *conn, ECP2Buffer *b) { + ssize_t rv; + + if (ecp_conn_is_inb(conn)) return ECP_OK; + +#ifdef ECP_WITH_DIRSRV + rv = ecp_dir_send_upd(conn); +#else + rv = ecp_dir_send_req(conn); +#endif + if (rv < 0) return rv; + + return ECP_OK; +} + +ssize_t ecp_dir_handle_msg(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, size_t msg_size, ECP2Buffer *b) { + switch (mtype) { +#ifdef ECP_WITH_DIRSRV + case ECP_MTYPE_DIR_UPD: { + return ecp_dir_handle_upd(conn, msg, msg_size); + } + + case ECP_MTYPE_DIR_REQ: { + return ecp_dir_handle_req(conn, msg, msg_size); + } +#endif + + case ECP_MTYPE_DIR_REP: { +#ifdef ECP_WITH_DIRSRV + return ecp_dir_handle_rep(conn, msg, msg_size); +#else + return ecp_dir_handle(conn, msg, msg_size, b); +#endif + } + + default: + return ECP_ERR_MTYPE; + } +} + +// iterator for client + +static ssize_t _dir_send_req(ECPConnection *conn, ECPTimerItem *ti) { + ECPBuffer packet; + ECPBuffer payload; + unsigned char pkt_buf[ECP_SIZE_PKT_BUF(0, ECP_MTYPE_DIR_REQ, conn)]; + unsigned char pld_buf[ECP_SIZE_PLD_BUF(0, ECP_MTYPE_DIR_REQ, conn)]; + + packet.buffer = pkt_buf; + packet.size = sizeof(pkt_buf); + payload.buffer = pld_buf; + payload.size = sizeof(pld_buf); + + ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_DIR_REQ); + return ecp_pld_send_wtimer(conn, &packet, &payload, ECP_SIZE_PLD(0, ECP_MTYPE_DIR_REQ), 0, ti); +} + +ssize_t ecp_dir_send_req(ECPConnection *conn) { + return ecp_timer_send(conn, _dir_send_req, ECP_MTYPE_DIR_REP, ECP_SEND_TRIES, ECP_SEND_TIMEOUT); +} + +int ecp_dir_get(ECPConnection *conn, ECPSocket *sock, ECPNode *node) { + int rv; + ssize_t _rv; + + rv = ecp_conn_create(conn, sock, ECP_CTYPE_DIR); + if (rv) return rv; + + rv = ecp_conn_open(conn, node); + if (rv) return rv; + + return ECP_OK; +} diff --git a/ecp/src/ecp/dir/dir.h b/ecp/src/ecp/dir/dir.h new file mode 100644 index 0000000..e9f8e1e --- /dev/null +++ b/ecp/src/ecp/dir/dir.h @@ -0,0 +1,32 @@ +#define ECP_MAX_DIR_ITEM 30 +#define ECP_SIZE_DIR_ITEM 40 + +#define ECP_MTYPE_DIR_UPD 0x00 +#define ECP_MTYPE_DIR_REQ 0x01 +#define ECP_MTYPE_DIR_REP 0x02 + +#define ECP_CTYPE_DIR (0x00 | ECP_CTYPE_FLAG_SYS) + +typedef struct ECPDirItem { + ECPNode node; + uint16_t capabilities; +} ECPDirItem; + +typedef struct ECPDirList { + ECPDirItem item[ECP_MAX_DIR_ITEM]; + uint16_t count; +} ECPDirList; + +ssize_t ecp_dir_parse(ECPDirList *list, unsigned char *buf, size_t buf_size); +ssize_t ecp_dir_serialize(ECPDirList *list, unsigned char *buf, size_t buf_size); + +void ecp_dir_item_parse(ECPDirItem *item, unsigned char *buf); +void ecp_dir_item_serialize(ECPDirItem *item, unsigned char *buf); + +ssize_t ecp_dir_handle(ECPConnection *conn, unsigned char *msg, size_t msg_size, ECP2Buffer *b); + +int ecp_dir_handle_open(ECPConnection *conn, ECP2Buffer *b); +ssize_t ecp_dir_handle_msg(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, size_t msg_size, ECP2Buffer *b); + +ssize_t ecp_dir_send_req(ECPConnection *conn); +int ecp_dir_get(ECPConnection *conn, ECPSocket *sock, ECPNode *node); \ No newline at end of file diff --git a/ecp/src/ecp/dir/dir_srv.c b/ecp/src/ecp/dir/dir_srv.c new file mode 100644 index 0000000..b4d030d --- /dev/null +++ b/ecp/src/ecp/dir/dir_srv.c @@ -0,0 +1,88 @@ +#include + +#include +#include + +#include "dir.h" +#include "dir_srv.h" + +int ecp_dir_init(ECPContext *ctx, ECPDirList *dir_online, ECPDirList *dir_shadow) { + ctx->dir_online = dir_online; + ctx->dir_shadow = dir_shadow; + + return ECP_OK; +} + +ssize_t ecp_dir_send_list(ECPConnection *conn, unsigned char mtype, ECPDirList *list) { + ECPBuffer packet; + ECPBuffer payload; + unsigned char pkt_buf[ECP_MAX_PKT]; + unsigned char pld_buf[ECP_MAX_PLD]; + unsigned char *msg; + size_t hdr_size; + size_t msg_size; + ssize_t rv; + + packet.buffer = pkt_buf; + packet.size = ECP_MAX_PKT; + payload.buffer = pld_buf; + payload.size = ECP_MAX_PLD; + + ecp_pld_set_type(payload.buffer, payload.size, mtype); + msg = ecp_pld_get_msg(payload.buffer, payload.size); + hdr_size = msg - payload.buffer; + msg_size = payload.size - hdr_size; + + rv = ecp_dir_serialize(list, msg, msg_size); + if (rv < 0) return rv; + + rv = ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(rv, mtype), 0); + return rv; +} + +ssize_t ecp_dir_send_upd(ECPConnection *conn) { + ECPContext *ctx = conn->sock->ctx; + ssize_t rv; + + rv = ecp_dir_send_list(conn, ECP_MTYPE_DIR_UPD, ctx->dir_shadow); + return rv; +} + +ssize_t ecp_dir_handle_upd(ECPConnection *conn, unsigned char *msg, size_t msg_size) { + ECPContext *ctx = conn->sock->ctx; + ssize_t rsize; + ssize_t rv; + + rsize = ecp_dir_parse(ctx->dir_shadow, msg, msg_size); + if (rsize < 0) return rsize; + + rv = ecp_dir_send_list(conn, ECP_MTYPE_DIR_REP, ctx->dir_shadow); + if (rv < 0) return rv; + + return rsize; +} + +ssize_t ecp_dir_handle_req(ECPConnection *conn, unsigned char *msg, size_t msg_size) { + ssize_t rv; + + rv = ecp_dir_send_rep(conn); + if (rv < 0) return rv; + + return 0; +} + +ssize_t ecp_dir_send_rep(ECPConnection *conn) { + ECPContext *ctx = conn->sock->ctx; + ssize_t rv; + + rv = ecp_dir_send_list(conn, ECP_MTYPE_DIR_REP, ctx->dir_online); + return rv; +} + +ssize_t ecp_dir_handle_rep(ECPConnection *conn, unsigned char *msg, size_t msg_size) { + ECPContext *ctx = conn->sock->ctx; + ssize_t rv; + + rv = ecp_dir_parse(ctx->dir_shadow, msg, msg_size); + return rv; +} \ No newline at end of file diff --git a/ecp/src/ecp/dir/dir_srv.h b/ecp/src/ecp/dir/dir_srv.h new file mode 100644 index 0000000..85606c8 --- /dev/null +++ b/ecp/src/ecp/dir/dir_srv.h @@ -0,0 +1,10 @@ +int ecp_dir_init(ECPContext *ctx, ECPDirList *dir_online, ECPDirList *dir_shadow); +ssize_t ecp_dir_send_list(ECPConnection *conn, unsigned char mtype, ECPDirList *list); + +ssize_t ecp_dir_send_upd(ECPConnection *conn); +ssize_t ecp_dir_handle_upd(ECPConnection *conn, unsigned char *msg, size_t msg_size); + +ssize_t ecp_dir_handle_req(ECPConnection *conn, unsigned char *msg, size_t msg_size); + +ssize_t ecp_dir_send_rep(ECPConnection *conn); +ssize_t ecp_dir_handle_rep(ECPConnection *conn, unsigned char *msg, size_t msg_size); diff --git a/ecp/src/ecp/dir_srv.c b/ecp/src/ecp/dir_srv.c deleted file mode 100644 index 952d393..0000000 --- a/ecp/src/ecp/dir_srv.c +++ /dev/null @@ -1,88 +0,0 @@ -#include - -#include "core.h" -#include "cr.h" - -#include "dir.h" -#include "dir_srv.h" - -int ecp_dir_init(ECPContext *ctx, ECPDirList *dir_online, ECPDirList *dir_shadow) { - ctx->dir_online = dir_online; - ctx->dir_shadow = dir_shadow; - - return ECP_OK; -} - -ssize_t ecp_dir_send_list(ECPConnection *conn, unsigned char mtype, ECPDirList *list) { - ECPBuffer packet; - ECPBuffer payload; - unsigned char pkt_buf[ECP_MAX_PKT]; - unsigned char pld_buf[ECP_MAX_PLD]; - unsigned char *msg; - size_t hdr_size; - size_t msg_size; - ssize_t rv; - - packet.buffer = pkt_buf; - packet.size = ECP_MAX_PKT; - payload.buffer = pld_buf; - payload.size = ECP_MAX_PLD; - - ecp_pld_set_type(payload.buffer, payload.size, mtype); - msg = ecp_pld_get_msg(payload.buffer, payload.size); - hdr_size = msg - payload.buffer; - msg_size = payload.size - hdr_size; - - rv = ecp_dir_serialize(list, msg, msg_size); - if (rv < 0) return rv; - - rv = ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(rv, mtype), 0); - return rv; -} - -ssize_t ecp_dir_send_upd(ECPConnection *conn) { - ECPContext *ctx = conn->sock->ctx; - ssize_t rv; - - rv = ecp_dir_send_list(conn, ECP_MTYPE_DIR_UPD, ctx->dir_shadow); - return rv; -} - -ssize_t ecp_dir_handle_upd(ECPConnection *conn, unsigned char *msg, size_t msg_size) { - ECPContext *ctx = conn->sock->ctx; - ssize_t rsize; - ssize_t rv; - - rsize = ecp_dir_parse(ctx->dir_shadow, msg, msg_size); - if (rsize < 0) return rsize; - - rv = ecp_dir_send_list(conn, ECP_MTYPE_DIR_REP, ctx->dir_shadow); - if (rv < 0) return rv; - - return rsize; -} - -ssize_t ecp_dir_handle_req(ECPConnection *conn, unsigned char *msg, size_t msg_size) { - ssize_t rv; - - rv = ecp_dir_send_rep(conn); - if (rv < 0) return rv; - - return 0; -} - -ssize_t ecp_dir_send_rep(ECPConnection *conn) { - ECPContext *ctx = conn->sock->ctx; - ssize_t rv; - - rv = ecp_dir_send_list(conn, ECP_MTYPE_DIR_REP, ctx->dir_online); - return rv; -} - -ssize_t ecp_dir_handle_rep(ECPConnection *conn, unsigned char *msg, size_t msg_size) { - ECPContext *ctx = conn->sock->ctx; - ssize_t rv; - - rv = ecp_dir_parse(ctx->dir_shadow, msg, msg_size); - return rv; -} \ No newline at end of file diff --git a/ecp/src/ecp/dir_srv.h b/ecp/src/ecp/dir_srv.h deleted file mode 100644 index 85606c8..0000000 --- a/ecp/src/ecp/dir_srv.h +++ /dev/null @@ -1,10 +0,0 @@ -int ecp_dir_init(ECPContext *ctx, ECPDirList *dir_online, ECPDirList *dir_shadow); -ssize_t ecp_dir_send_list(ECPConnection *conn, unsigned char mtype, ECPDirList *list); - -ssize_t ecp_dir_send_upd(ECPConnection *conn); -ssize_t ecp_dir_handle_upd(ECPConnection *conn, unsigned char *msg, size_t msg_size); - -ssize_t ecp_dir_handle_req(ECPConnection *conn, unsigned char *msg, size_t msg_size); - -ssize_t ecp_dir_send_rep(ECPConnection *conn); -ssize_t ecp_dir_handle_rep(ECPConnection *conn, unsigned char *msg, size_t msg_size); diff --git a/ecp/src/ecp/ext/msgq.c b/ecp/src/ecp/ext/msgq.c new file mode 100644 index 0000000..63f5aa6 --- /dev/null +++ b/ecp/src/ecp/ext/msgq.c @@ -0,0 +1,151 @@ +#include +#include +#include + +#include + +#include "rbuf.h" +#include "msgq.h" + +#define MSGQ_IDX_MASK(idx) ((idx) & ((ECP_MSGQ_MAX_MSG) - 1)) + +static struct timespec *abstime_ts(struct timespec *ts, ecp_sts_t msec) { + struct timeval tv; + uint64_t us_start; + + gettimeofday(&tv, NULL); + us_start = tv.tv_sec * (uint64_t) 1000000 + tv.tv_usec; + us_start += msec * 1000; + ts->tv_sec = us_start / 1000000; + ts->tv_nsec = (us_start % 1000000) * 1000; + return ts; +} + +int ecp_msgq_create(ECPRBConn *conn, ECPMsgQ *msgq) { + int i; + int rv; + + if (conn->recv == NULL) return ECP_ERR; + + memset(msgq, 0, sizeof(ECPMsgQ)); + + for (i=0; icond[i], NULL); + if (rv) { + int j; + + for (j=0; jcond[j]); + } + return ECP_ERR; + } + } + conn->recv->msgq = msgq; + + return ECP_OK; +} + +void ecp_msgq_destroy(ECPRBConn *conn) { + ECPMsgQ *msgq = conn->recv->msgq; + int i; + + for (i=0; icond[i]); + } + + conn->recv->msgq = NULL; +} + +void ecp_msgq_start(ECPRBConn *conn, ecp_seq_t seq) { + ECPMsgQ *msgq = conn->recv->msgq; + + msgq->seq_max = seq; + msgq->seq_start = seq + 1; +} + +int ecp_msgq_push(ECPRBConn *conn, ecp_seq_t seq, unsigned char mtype) { + ECPRBRecv *buf = conn->recv; + ECPMsgQ *msgq = buf->msgq; + + if (mtype >= ECP_MSGQ_MAX_MTYPE) return ECP_ERR_MTYPE; + + if ((unsigned short)(msgq->idx_w[mtype] - msgq->idx_r[mtype]) == ECP_MSGQ_MAX_MSG) return ECP_ERR_FULL; + if (msgq->idx_w[mtype] == msgq->idx_r[mtype]) pthread_cond_signal(&msgq->cond[mtype]); + + msgq->seq_msg[mtype][MSGQ_IDX_MASK(msgq->idx_w[mtype])] = seq; + msgq->idx_w[mtype]++; + + if (ECP_SEQ_LT(msgq->seq_max, seq)) msgq->seq_max = seq; + + return ECP_OK; +} + +ssize_t ecp_msgq_pop(ECPRBConn *conn, unsigned char mtype, unsigned char *msg, size_t _msg_size, ecp_sts_t timeout) { + ECPRBRecv *buf = conn->recv; + ECPMsgQ *msgq = buf->msgq; + ECPRBuffer *rbuf = &buf->rbuf; + unsigned char *pld_buf; + unsigned char *msg_buf; + size_t pld_size, hdr_size, msg_size; + ecp_seq_t seq; + unsigned short idx; + int rv; + + if (mtype >= ECP_MSGQ_MAX_MTYPE) return ECP_ERR_MTYPE; + + if (msgq->idx_r[mtype] == msgq->idx_w[mtype]) { + if (timeout == -1) { + pthread_cond_wait(&msgq->cond[mtype], &buf->mutex); + } else { + struct timespec ts; + + rv = pthread_cond_timedwait(&msgq->cond[mtype], &buf->mutex, abstime_ts(&ts, timeout)); + if (rv) return ECP_ERR_TIMEOUT; + } + } + seq = msgq->seq_msg[mtype][MSGQ_IDX_MASK(msgq->idx_r[mtype])]; + + rv = _ecp_rbuf_msg_idx(rbuf, seq, &idx); + if (rv) return ECP_ERR; + + pld_buf = rbuf->arr.pld[idx].buf; + pld_size = rbuf->arr.pld[idx].size; + + msg_buf = ecp_pld_get_msg(pld_buf, pld_size); + if (msg_buf == NULL) return ECP_ERR; + hdr_size = msg_buf - pld_buf; + msg_size = pld_size - hdr_size; + + rbuf->arr.pld[idx].flags &= ~ECP_RBUF_FLAG_IN_MSGQ; + // if (rbuf->arr.pld[idx].flags == 0); + + msgq->idx_r[mtype]++; + if (msgq->seq_start == seq) { + int i; + unsigned short msg_cnt = msgq->seq_max - msgq->seq_start + 1; + + for (i=0; iarr.pld[idx].flags & ECP_RBUF_FLAG_IN_MSGQ) break; + idx = ECP_RBUF_IDX_MASK(idx + 1, rbuf->arr_size); + } + msgq->seq_start += i; + } + + if (_msg_size < msg_size) return ECP_ERR_FULL; + if (msg_size) memcpy(msg, msg_buf, msg_size); + return msg_size; +} + +ssize_t ecp_msg_receive(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ecp_sts_t timeout) { + ECPRBConn *_conn; + ssize_t rv; + + _conn = ecp_rbuf_get_rbconn(conn); + if (_conn == NULL) return ECP_ERR; + + pthread_mutex_lock(&_conn->recv->mutex); + rv = ecp_msgq_pop(_conn, mtype, msg, msg_size, timeout); + pthread_mutex_unlock(&_conn->recv->mutex); + + return rv; +} diff --git a/ecp/src/ecp/ext/msgq.h b/ecp/src/ecp/ext/msgq.h new file mode 100644 index 0000000..dddb6e7 --- /dev/null +++ b/ecp/src/ecp/ext/msgq.h @@ -0,0 +1,19 @@ +#define ECP_MSGQ_MAX_MSG 32 + +#define ECP_MSGQ_MAX_MTYPE (ECP_MAX_MTYPE) + +typedef struct ECPMsgQ { + unsigned short idx_w[ECP_MSGQ_MAX_MTYPE]; + unsigned short idx_r[ECP_MSGQ_MAX_MTYPE]; + ecp_seq_t seq_start; + ecp_seq_t seq_max; + ecp_seq_t seq_msg[ECP_MSGQ_MAX_MTYPE][ECP_MSGQ_MAX_MSG]; + pthread_cond_t cond[ECP_MSGQ_MAX_MTYPE]; +} ECPMsgQ; + +int ecp_msgq_create(ECPRBConn *conn, ECPMsgQ *msgq); +void ecp_msgq_destroy(ECPRBConn *conn); +void ecp_msgq_start(ECPRBConn *conn, ecp_seq_t seq); +int ecp_msgq_push(ECPRBConn *conn, ecp_seq_t seq, unsigned char mtype); +ssize_t ecp_msgq_pop(ECPRBConn *conn, unsigned char mtype, unsigned char *msg, size_t _msg_size, ecp_sts_t timeout); +ssize_t ecp_msg_receive(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ecp_sts_t timeout); \ No newline at end of file diff --git a/ecp/src/ecp/ext/rbuf.c b/ecp/src/ecp/ext/rbuf.c new file mode 100644 index 0000000..70ee0d2 --- /dev/null +++ b/ecp/src/ecp/ext/rbuf.c @@ -0,0 +1,113 @@ +#include + +#include + +#include "rbuf.h" + +ECPRBConn *ecp_rbuf_get_rbconn(ECPConnection *conn) { + if (ecp_conn_has_rbuf(conn)) return (ECPRBConn *)conn; + return NULL; +} + +ECPConnection *ecp_rbuf_get_conn(ECPRBConn *conn) { + return &conn->b; +} + +void _ecp_rbuf_start(ECPRBuffer *rbuf, ecp_seq_t seq) { + rbuf->seq_max = seq; + rbuf->seq_start = seq + 1; +} + +int _ecp_rbuf_msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq, unsigned short *idx) { + ecp_seq_t seq_offset = seq - rbuf->seq_start; + + /* This also checks for seq_start <= seq if seq type range >> rbuf->arr_size */ + if (seq_offset >= rbuf->arr_size) return ECP_ERR_FULL; + + if (idx) *idx = ECP_RBUF_IDX_MASK(rbuf->idx_start + seq_offset, rbuf->arr_size); + return ECP_OK; +} + +void ecp_rbuf_conn_init(ECPRBConn *conn) { + ECPConnection *_conn = ecp_rbuf_get_conn(conn); + + ecp_conn_set_flags(_conn, ECP_CONN_FLAG_RBUF); + conn->send = NULL; + conn->recv = NULL; + conn->iter = NULL; +} + +int ecp_rbuf_conn_create(ECPRBConn *conn, ECPSocket *sock, unsigned char type) { + ECPConnection *_conn = ecp_rbuf_get_conn(conn); + int rv; + + rv = ecp_conn_create(_conn, sock, type); + if (rv) return rv; + + ecp_rbuf_conn_init(conn); + return ECP_OK; +} + +int ecp_rbuf_conn_create_inb(ECPRBConn *conn, ECPSocket *sock, unsigned char type) { + ECPConnection *_conn = ecp_rbuf_get_conn(conn); + int rv; + + rv = ecp_conn_create_inb(_conn, sock, type); + if (rv) return rv; + + ecp_rbuf_conn_init(conn); + return ECP_OK; +} + +void ecp_rbuf_destroy(ECPRBConn *conn) { + if (conn->send) ecp_rbsend_destroy(conn); + if (conn->recv) ecp_rbrecv_destroy(conn); + conn->iter = NULL; +} + +void ecp_rbuf_start(ECPRBConn *conn) { + ECPConnection *_conn = ecp_rbuf_get_conn(conn); + + if (conn->send) { + ecp_seq_t seq_out; + + seq_out = (ecp_seq_t)(_conn->nonce_out); + ecp_rbsend_start(conn, seq_out); + } + + if (conn->recv) { + ecp_seq_t seq_in; + + seq_in = (ecp_seq_t)(_conn->nonce_in); + ecp_rbrecv_start(conn, seq_in); + } +} + +ssize_t ecp_rbuf_msg_handle(ECPRBConn *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, size_t msg_size, ECP2Buffer *bufs) { + switch (mtype) { + case ECP_MTYPE_RBACK: + if (conn->send) return ecp_rbuf_handle_ack(conn, msg, msg_size); + break; + + case ECP_MTYPE_RBNOP: + if (conn->recv) return ecp_rbuf_handle_nop(conn, msg, msg_size); + break; + + case ECP_MTYPE_RBFLUSH: + if (conn->recv) return ecp_rbuf_handle_flush(conn); + break; + + default: + break; + } + + return ECP_ERR_MTYPE; +} + +int ecp_rbuf_err_handle(ECPRBConn *conn, unsigned char mtype, int err) { + if (conn->recv && (mtype == ECP_MTYPE_RBTIMER)) { + ecp_rbuf_handle_timer(conn); + return ECP_OK; + } + return ECP_PASS; +} diff --git a/ecp/src/ecp/ext/rbuf.h b/ecp/src/ecp/ext/rbuf.h new file mode 100644 index 0000000..36ff963 --- /dev/null +++ b/ecp/src/ecp/ext/rbuf.h @@ -0,0 +1,123 @@ +#define ECP_RBUF_FLAG_IN_RBUF 0x01 +#define ECP_RBUF_FLAG_IN_MSGQ 0x02 +#define ECP_RBUF_FLAG_IN_TIMER 0x04 +#define ECP_RBUF_FLAG_SKIP 0x08 + +#define ECP_RBUF_FLAG_CCONTROL 0x01 +#define ECP_RBUF_FLAG_RELIABLE 0x02 + + +#define ECP_MTYPE_RBNOP (0x08 | ECP_MTYPE_FLAG_SYS) +#define ECP_MTYPE_RBACK (0x09 | ECP_MTYPE_FLAG_SYS) +#define ECP_MTYPE_RBFLUSH (0x0a | ECP_MTYPE_FLAG_SYS) +#define ECP_MTYPE_RBTIMER (0x0b | ECP_MTYPE_FLAG_SYS) + +#define ECP_ERR_RBUF_DUP -100 +#define ECP_ERR_RBUF_TIMER -101 + +#define ecp_rbuf_skip(mtype) (mtype & ECP_MTYPE_FLAG_SYS) + +/* size must be power of 2 */ +#define ECP_RBUF_IDX_MASK(idx, size) ((idx) & ((size) - 1)) + +typedef uint32_t ecp_win_t; + +struct ECPMsgQ; +struct ECPFragIter; + +typedef struct ECPRBPayload { + unsigned char buf[ECP_MAX_PLD]; + size_t size; + unsigned char flags; +} ECPRBPayload; + +typedef struct ECPRBPacket { + unsigned char buf[ECP_MAX_PKT]; + size_t size; + unsigned char flags; +} ECPRBPacket; + +typedef struct ECPRBuffer { + ecp_seq_t seq_start; + ecp_seq_t seq_max; + unsigned short arr_size; + unsigned short idx_start; + union { + ECPRBPayload *pld; + ECPRBPacket *pkt; + } arr; +} ECPRBuffer; + +typedef struct ECPRBRecv { + unsigned char start; + unsigned char flags; + ecp_sts_t deliver_delay; + unsigned short hole_max; + unsigned short ack_rate; + unsigned short ack_pkt; + ecp_seq_t seq_ack; + ecp_ack_t ack_map; + ECPRBuffer rbuf; +#ifdef ECP_WITH_PTHREAD + pthread_mutex_t mutex; +#endif +#ifdef ECP_WITH_MSGQ + struct ECPMsgQ *msgq; +#endif +} ECPRBRecv; + +typedef struct ECPRBSend { + unsigned char start; + unsigned char flags; + ecp_win_t win_size; + ecp_win_t in_transit; + ecp_win_t cnt_cc; + ecp_seq_t seq_cc; + ecp_seq_t seq_flush; + ecp_seq_t seq_nack; + unsigned char flush; + unsigned int nack_rate; + ECPRBuffer rbuf; +#ifdef ECP_WITH_PTHREAD + pthread_mutex_t mutex; +#endif +} ECPRBSend; + +typedef struct ECPRBConn { + ECPConnection b; + ECPRBRecv *recv; + ECPRBSend *send; + struct ECPFragIter *iter; +} ECPRBConn; + +ECPRBConn *ecp_rbuf_get_rbconn(ECPConnection *conn); +ECPConnection *ecp_rbuf_get_conn(ECPRBConn *conn); +void _ecp_rbuf_start(ECPRBuffer *rbuf, ecp_seq_t seq); +int _ecp_rbuf_msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq, unsigned short *idx); +void ecp_rbuf_conn_init(ECPRBConn *conn); +int ecp_rbuf_conn_create(ECPRBConn *conn, ECPSocket *sock, unsigned char type); +int ecp_rbuf_conn_create_inb(ECPRBConn *conn, ECPSocket *sock, unsigned char type); +void ecp_rbuf_destroy(ECPRBConn *conn); +void ecp_rbuf_start(ECPRBConn *conn); +ssize_t ecp_rbuf_msg_handle(ECPRBConn *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, size_t msg_size, ECP2Buffer *bufs); +int ecp_rbuf_err_handle(ECPRBConn *conn, unsigned char mtype, int err); + +/* send */ +ssize_t ecp_rbuf_send_flush(ECPRBConn *conn); +ssize_t ecp_rbuf_handle_ack(ECPRBConn *conn, unsigned char *msg, size_t msg_size); +int ecp_rbsend_create(ECPRBConn *conn, ECPRBSend *buf, ECPRBPacket *pkt, unsigned short pkt_size); +void ecp_rbsend_destroy(ECPRBConn *conn); +void ecp_rbsend_start(ECPRBConn *conn, ecp_seq_t seq); +int ecp_rbuf_set_wsize(ECPRBConn *conn, ecp_win_t size); +int ecp_rbuf_flush(ECPRBConn *conn); +ssize_t ecp_rbuf_pld_send(ECPRBConn *conn, ECPBuffer *payload, size_t pld_size, ECPBuffer *packet, size_t pkt_size, ECPTimerItem *ti); + +ssize_t ecp_rbuf_handle_nop(ECPRBConn *conn, unsigned char *msg, size_t msg_size); +ssize_t ecp_rbuf_handle_flush(ECPRBConn *conn); +void ecp_rbuf_handle_timer(ECPRBConn *conn) ; +int ecp_rbrecv_create(ECPRBConn *conn, ECPRBRecv *buf, ECPRBPayload *pld, unsigned short pld_size); +void ecp_rbrecv_destroy(ECPRBConn *conn); +void ecp_rbrecv_start(ECPRBConn *conn, ecp_seq_t seq); +int ecp_rbuf_set_hole(ECPRBConn *conn, unsigned short hole_max); +int ecp_rbuf_set_delay(ECPRBConn *conn, ecp_sts_t delay); +ssize_t ecp_rbuf_store(ECPRBConn *conn, ecp_seq_t seq, unsigned char *pld, size_t pld_size); diff --git a/ecp/src/ecp/ext/rbuf_recv.c b/ecp/src/ecp/ext/rbuf_recv.c new file mode 100644 index 0000000..4c95be6 --- /dev/null +++ b/ecp/src/ecp/ext/rbuf_recv.c @@ -0,0 +1,473 @@ +#include +#include + +#include +#include + +#include "rbuf.h" + +#ifdef ECP_WITH_MSGQ +#include "msgq.h" +#endif + +#define ACK_RATE 8 +#define ACK_MASK_FIRST ((ecp_ack_t)1 << (ECP_SIZE_ACKB - 1)) + +static ssize_t msg_store(ECPRBConn *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *pld, size_t pld_size) { + ECPConnection *_conn = ecp_rbuf_get_conn(conn); + ECPRBRecv *buf = conn->recv; + ECPRBuffer *rbuf = &buf->rbuf; + unsigned short idx; + unsigned char flags; + int skip; + int rv; + + rv = _ecp_rbuf_msg_idx(rbuf, seq, &idx); + if (rv) return rv; + + if (rbuf->arr.pld[idx].flags) return ECP_ERR_RBUF_DUP; + +#ifdef ECP_WITH_MSGQ + if (buf->msgq) { + ecp_seq_t seq_offset; + +#ifndef ECP_WITH_RTHD + pthread_mutex_lock(&buf->mutex); +#endif + + seq_offset = seq - buf->msgq->seq_start; + if (seq_offset >= rbuf->arr_size) rv = ECP_ERR_FULL; + +#ifndef ECP_WITH_RTHD + pthread_mutex_unlock(&buf->mutex); +#endif + + if (rv) return rv; + } +#endif + + skip = ecp_rbuf_skip(mtype); + flags = ECP_RBUF_FLAG_IN_RBUF; + if (skip) flags |= ECP_RBUF_FLAG_SKIP; + rbuf->arr.pld[idx].flags = flags; + + if ((mtype == ECP_MTYPE_RBNOP) && pld) { + return ecp_pld_handle_one(_conn, seq, pld, pld_size, NULL); + } else if (skip) { + return 0; + } + + if (pld && pld_size) memcpy(rbuf->arr.pld[idx].buf, pld, pld_size); + rbuf->arr.pld[idx].size = pld_size; + if (ECP_SEQ_LT(rbuf->seq_max, seq)) rbuf->seq_max = seq; + + return pld_size; +} + +static void msg_flush(ECPRBConn *conn) { + ECPConnection *_conn = ecp_rbuf_get_conn(conn); + ECPRBRecv *buf = conn->recv; + ECPRBuffer *rbuf = &buf->rbuf; + ecp_seq_t seq; + unsigned short idx; + int i; + +#ifdef ECP_WITH_MSGQ +#ifndef ECP_WITH_RTHD + if (buf->msgq) pthread_mutex_lock(&buf->mutex); +#endif +#endif + + seq = rbuf->seq_start; + idx = rbuf->idx_start; + + while (ECP_SEQ_LTE(seq, rbuf->seq_max)) { + if (rbuf->arr.pld[idx].flags & ECP_RBUF_FLAG_IN_RBUF) { + if (!(rbuf->arr.pld[idx].flags & ECP_RBUF_FLAG_SKIP)) { + ecp_pts_t msg_pts; + int rv; + + rv = ecp_pld_get_pts(rbuf->arr.pld[idx].buf, rbuf->arr.pld[idx].size, &msg_pts); + if (!rv && buf->deliver_delay) { + ecp_sts_t now = ecp_tm_abstime_ms(0); + + msg_pts += buf->deliver_delay; + if (ECP_PTS_LT(now, msg_pts)) { + if (!(rbuf->arr.pld[idx].flags & ECP_RBUF_FLAG_IN_TIMER)) { + ECPTimerItem ti; + + ecp_timer_item_init(&ti, _conn, ECP_MTYPE_RBTIMER, NULL, 0, msg_pts - now); + rv = ecp_timer_push(&ti); + if (!rv) rbuf->arr.pld[idx].flags |= ECP_RBUF_FLAG_IN_TIMER; + } + break; + } else if (rbuf->arr.pld[idx].flags & ECP_RBUF_FLAG_IN_TIMER) { + rbuf->arr.pld[idx].flags &= ~ECP_RBUF_FLAG_IN_TIMER; + } + } + +#ifdef ECP_WITH_MSGQ + if (buf->msgq) { + unsigned char mtype; + + rv = ecp_pld_get_type(rbuf->arr.pld[idx].buf, rbuf->arr.pld[idx].size, &mtype); + if (!rv) rv = ecp_msgq_push(conn, seq, mtype & ECP_MTYPE_MASK); + if (rv) break; + + rbuf->arr.pld[idx].flags |= ECP_RBUF_FLAG_IN_MSGQ; + } else +#endif + ecp_pld_handle(_conn, seq, rbuf->arr.pld[idx].buf, rbuf->arr.pld[idx].size, NULL); + } else { + rbuf->arr.pld[idx].flags &= ~ECP_RBUF_FLAG_SKIP; + } + rbuf->arr.pld[idx].flags &= ~ECP_RBUF_FLAG_IN_RBUF; + // if (rbuf->arr.pld[idx].flags == 0); + } else { + if (buf->flags & ECP_RBUF_FLAG_RELIABLE) break; + if (!ECP_SEQ_LT(seq, rbuf->seq_max - buf->hole_max)) break; + } + seq++; + idx = ECP_RBUF_IDX_MASK(idx + 1, rbuf->arr_size); + } + rbuf->seq_start = seq; + rbuf->idx_start = idx; + +#ifdef ECP_WITH_MSGQ +#ifndef ECP_WITH_RTHD + if (buf->msgq) pthread_mutex_unlock(&buf->mutex); +#endif +#endif +} + +static int ack_shift(ECPRBConn *conn) { + ECPRBRecv *buf = conn->recv; + ECPRBuffer *rbuf = &buf->rbuf; + unsigned short idx; + int do_ack = 0; + int in_rbuf = 0; + int rv; + + if ((buf->flags & ECP_RBUF_FLAG_RELIABLE) && ((buf->ack_map & ACK_MASK_FIRST) == 0)) return 0; + + /* walks through messages that are not delivered yet, so no need for msgq mutex lock */ + while (ECP_SEQ_LT(buf->seq_ack, rbuf->seq_max)) { + buf->seq_ack++; + rv = _ecp_rbuf_msg_idx(rbuf, buf->seq_ack, &idx); + if (!rv) { + in_rbuf = rbuf->arr.pld[idx].flags & ECP_RBUF_FLAG_IN_RBUF; + } else { + in_rbuf = 1; + } + if (in_rbuf && (buf->ack_map == ECP_ACK_FULL)) continue; + + buf->ack_map = buf->ack_map << 1; + if (in_rbuf) { + buf->ack_map |= 1; + } else if (!do_ack && ECP_SEQ_LT(buf->seq_ack, rbuf->seq_max - buf->hole_max)) { + do_ack = 1; + } + + if ((buf->flags & ECP_RBUF_FLAG_RELIABLE) && ((buf->ack_map & ACK_MASK_FIRST) == 0)) { + do_ack = 1; + break; + } + } + + return do_ack; +} + +static int ack_send(ECPRBConn *conn, ecp_seq_t seq_ack, ecp_seq_t ack_map) { + ECPConnection *_conn = ecp_rbuf_get_conn(conn); + ECPRBRecv *buf = conn->recv; + ECPBuffer packet; + ECPBuffer payload; + unsigned char pkt_buf[ECP_SIZE_PKT_BUF(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), ECP_MTYPE_RBACK, _conn)]; + unsigned char pld_buf[ECP_SIZE_PLD_BUF(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), ECP_MTYPE_RBACK, _conn)]; + unsigned char *_buf; + ssize_t rv; + + packet.buffer = pkt_buf; + packet.size = sizeof(pkt_buf); + payload.buffer = pld_buf; + payload.size = sizeof(pld_buf); + + ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_RBACK); + _buf = ecp_pld_get_msg(payload.buffer, payload.size); + _buf[0] = seq_ack >> 24; + _buf[1] = seq_ack >> 16; + _buf[2] = seq_ack >> 8; + _buf[3] = seq_ack; + _buf[4] = ack_map >> 24; + _buf[5] = ack_map >> 16; + _buf[6] = ack_map >> 8; + _buf[7] = ack_map; + + rv = ecp_pld_send(_conn, &packet, &payload, ECP_SIZE_PLD(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), ECP_MTYPE_RBACK), 0); + if (rv < 0) return rv; + + buf->ack_pkt = 0; + + return ECP_OK; +} + +ssize_t ecp_rbuf_handle_nop(ECPRBConn *conn, unsigned char *msg, size_t msg_size) { + ECPRBRecv *buf = conn->recv; + ECPRBuffer *rbuf = &buf->rbuf; + ecp_seq_t seq_ack; + ecp_ack_t ack_map; + ecp_ack_t ack_mask = (ecp_ack_t)1 << (ECP_SIZE_ACKB - 1); + size_t rsize = sizeof(ecp_seq_t) + sizeof(ecp_ack_t); + int i; + + if (msg_size < rsize) return ECP_ERR_SIZE; + + seq_ack = \ + ((ecp_seq_t)msg[0] << 24) | \ + ((ecp_seq_t)msg[1] << 16) | \ + ((ecp_seq_t)msg[2] << 8) | \ + ((ecp_seq_t)msg[3]); + ack_map = \ + ((ecp_ack_t)msg[4] << 24) | \ + ((ecp_ack_t)msg[5] << 16) | \ + ((ecp_ack_t)msg[6] << 8) | \ + ((ecp_ack_t)msg[7]); + + seq_ack -= (ECP_SIZE_ACKB - 1); + for (i=0; i> 1; + } + + return rsize; +} + +ssize_t ecp_rbuf_handle_flush(ECPRBConn *conn) { + ECPRBRecv *buf = conn->recv; + +#ifdef ECP_WITH_RTHD + pthread_mutex_lock(&buf->mutex); +#endif + + ack_send(conn, buf->seq_ack, buf->ack_map); + +#ifdef ECP_WITH_RTHD + pthread_mutex_unlock(&buf->mutex); +#endif + + return 0; +} + +void ecp_rbuf_handle_timer(ECPRBConn *conn) { + ECPRBRecv *buf = conn->recv; + +#ifdef ECP_WITH_RTHD + pthread_mutex_lock(&buf->mutex); +#endif + + msg_flush(conn); + +#ifdef ECP_WITH_RTHD + pthread_mutex_unlock(&buf->mutex); +#endif +} + +int ecp_rbrecv_create(ECPRBConn *conn, ECPRBRecv *buf, ECPRBPayload *pld, unsigned short pld_size) { + ECPRBuffer *rbuf = &buf->rbuf; + int rv; + + memset(buf, 0, sizeof(ECPRBRecv)); + memset(pld, 0, sizeof(ECPRBPayload) * pld_size); + + buf->ack_map = ECP_ACK_FULL; + buf->ack_rate = ACK_RATE; + rbuf->arr.pld = pld; + rbuf->arr_size = pld_size; + +#ifdef ECP_WITH_PTHREAD + rv = pthread_mutex_init(&buf->mutex, NULL); + if (rv) return ECP_ERR; +#endif + + conn->recv = buf; + return ECP_OK; +} + +void ecp_rbrecv_destroy(ECPRBConn *conn) { + ECPRBRecv *buf = conn->recv; + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_destroy(&buf->mutex); +#endif + +#ifdef ECP_WITH_MSGQ + if (buf->msgq) ecp_msgq_destroy(conn); +#endif + + conn->recv = NULL; +} + +void ecp_rbrecv_start(ECPRBConn *conn, ecp_seq_t seq) { + ECPRBRecv *buf = conn->recv; + ECPRBuffer *rbuf = &buf->rbuf; + +#ifdef ECP_WITH_RTHD + pthread_mutex_lock(&buf->mutex); +#endif + + buf->start = 1; + buf->seq_ack = seq; + _ecp_rbuf_start(rbuf, seq); + +#ifdef ECP_WITH_MSGQ + if (buf->msgq) ecp_msgq_start(conn, seq); +#endif + +#ifdef ECP_WITH_RTHD + pthread_mutex_unlock(&buf->mutex); +#endif +} + +int ecp_rbuf_set_hole(ECPRBConn *conn, unsigned short hole_max) { + ECPRBRecv *buf = conn->recv; + +#ifdef ECP_WITH_RTHD + pthread_mutex_lock(&buf->mutex); +#endif + + buf->hole_max = hole_max; + +#ifdef ECP_WITH_RTHD + pthread_mutex_unlock(&buf->mutex); +#endif + + return ECP_OK; +} + +int ecp_rbuf_set_delay(ECPRBConn *conn, ecp_sts_t delay) { + ECPRBRecv *buf = conn->recv; + +#ifdef ECP_WITH_RTHD + pthread_mutex_lock(&buf->mutex); +#endif + + buf->deliver_delay = delay; + +#ifdef ECP_WITH_RTHD + pthread_mutex_unlock(&buf->mutex); +#endif + + return ECP_OK; +} + +ssize_t ecp_rbuf_store(ECPRBConn *conn, ecp_seq_t seq, unsigned char *pld, size_t pld_size) { + ECPRBRecv *buf = conn->recv; + ECPRBuffer *rbuf = &buf->rbuf; + unsigned char mtype; + unsigned short ack_pkt = 0; + int do_ack = 0; + int _rv; + ssize_t rv; + + _rv = ecp_pld_get_type(pld, pld_size, &mtype); + if (_rv) return _rv; + +#ifdef ECP_WITH_RTHD + pthread_mutex_lock(&buf->mutex); +#endif + + if (!buf->start) { + rv = 0; + goto rbuf_store_fin; + } + + if (ECP_SEQ_LT(rbuf->seq_max, seq)) { + ack_pkt = seq - rbuf->seq_max; + } + if (ECP_SEQ_LTE(seq, buf->seq_ack)) { + ecp_seq_t seq_offset = buf->seq_ack - seq; + if (seq_offset < ECP_SIZE_ACKB) { + ecp_ack_t ack_bit = ((ecp_ack_t)1 << seq_offset); + + if (ack_bit & buf->ack_map) { + rv = ECP_ERR_RBUF_DUP; + goto rbuf_store_fin; + } + + rv = msg_store(conn, seq, mtype, pld, pld_size); + if (rv < 0) goto rbuf_store_fin; + + buf->ack_map |= ack_bit; + /* reliable transport can prevent seq_ack from reaching seq_max */ + if (ECP_SEQ_LT(buf->seq_ack, rbuf->seq_max)) { + do_ack = ack_shift(conn); + } + } else { + rv = ECP_ERR_RBUF_DUP; + goto rbuf_store_fin; + } + } else { + unsigned short msg_cnt = rbuf->seq_max - rbuf->seq_start + 1; + + if ((msg_cnt == 0) && (seq == (ecp_seq_t)(buf->seq_ack + 1))) { + int deliver = 1; +#ifdef ECP_WITH_MSGQ + if (buf->msgq) deliver = 0; +#endif + if (buf->deliver_delay) deliver = 0; + if (deliver) { + /* receive buffer is empty, so no need for msgq mutex lock */ + rv = 0; + rbuf->seq_max++; + rbuf->seq_start++; + rbuf->idx_start = ECP_RBUF_IDX_MASK(rbuf->idx_start + 1, rbuf->arr_size); + } else { + rv = msg_store(conn, seq, mtype, pld, pld_size); + if (rv < 0) goto rbuf_store_fin; + } + buf->seq_ack++; + } else { + rv = msg_store(conn, seq, mtype, pld, pld_size); + if (rv < 0) goto rbuf_store_fin; + + do_ack = ack_shift(conn); + } + } + msg_flush(conn); + if (ack_pkt) { + buf->ack_pkt += ack_pkt; + if (!do_ack && (buf->ack_pkt > buf->ack_rate)) do_ack = 1; + } + if (do_ack) { + ecp_seq_t seq_ack = buf->seq_ack; + ecp_seq_t ack_map = buf->ack_map; + + /* account for missing mackets within hole_max range */ + if (buf->hole_max && (buf->seq_ack == rbuf->seq_max)) { + unsigned short h_bits = buf->hole_max + 1; + ecp_seq_t h_mask = ~(~((ecp_seq_t)0) << h_bits); + + if ((ack_map & h_mask) != h_mask) { + h_mask = ~(~((ecp_seq_t)0) >> h_bits); + seq_ack -= h_bits; + ack_map = (ack_map >> h_bits) | h_mask; + } + } + _rv = ack_send(conn, seq_ack, ack_map); + if (_rv) { + rv = _rv; + goto rbuf_store_fin; + } + } + +rbuf_store_fin: + +#ifdef ECP_WITH_RTHD + pthread_mutex_unlock(&buf->mutex); +#endif + + return rv; +} diff --git a/ecp/src/ecp/ext/rbuf_send.c b/ecp/src/ecp/ext/rbuf_send.c new file mode 100644 index 0000000..3dc777c --- /dev/null +++ b/ecp/src/ecp/ext/rbuf_send.c @@ -0,0 +1,419 @@ +#include +#include + +#include + +#include "rbuf.h" + +#define NACK_RATE_UNIT 10000 + +#define MIN(X, Y) (((X) < (Y)) ? (X) : (Y)) +#define MAX(X, Y) (((X) > (Y)) ? (X) : (Y)) + +static void cc_flush(ECPRBConn *conn, unsigned char flags) { + ECPConnection *_conn = ecp_rbuf_get_conn(conn); + ECPRBSend *buf = conn->send; + ECPRBuffer *rbuf = &buf->rbuf; + unsigned short idx; + int rv; + + rv = _ecp_rbuf_msg_idx(rbuf, buf->seq_cc, &idx); + if (rv) return; + + while (ECP_SEQ_LTE(buf->seq_cc, rbuf->seq_max)) { + ECPBuffer packet; + + if ((buf->cnt_cc == 0) || (buf->in_transit >= buf->win_size)) break; + + if ((rbuf->arr.pkt[idx].flags & ECP_RBUF_FLAG_IN_RBUF) && !(rbuf->arr.pkt[idx].flags & ECP_RBUF_FLAG_SKIP)) { +#ifdef ECP_WITH_PTHREAD + pthread_mutex_unlock(&buf->mutex); +#endif + + packet.buffer = rbuf->arr.pkt[idx].buf; + packet.size = ECP_MAX_PKT; + ecp_pkt_send(_conn->sock, &packet, rbuf->arr.pkt[idx].size, flags, NULL, &_conn->remote.addr); + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_lock(&buf->mutex); +#endif + + buf->cnt_cc--; + buf->in_transit++; + } + if (!(buf->flags & ECP_RBUF_FLAG_RELIABLE)) { + rbuf->arr.pkt[idx].flags = 0; + // if (rbuf->arr.pkt[idx].flags == 0); + } + buf->seq_cc++; + idx = ECP_RBUF_IDX_MASK(idx + 1, rbuf->arr_size); + } + if (!(buf->flags & ECP_RBUF_FLAG_RELIABLE)) { + rbuf->seq_start = buf->seq_cc; + rbuf->idx_start = idx; + } +} + +static ssize_t _rbuf_send_flush(ECPConnection *_conn, ECPTimerItem *ti) { + ECPBuffer packet; + ECPBuffer payload; + unsigned char pkt_buf[ECP_SIZE_PKT_BUF(0, ECP_MTYPE_RBFLUSH, _conn)]; + unsigned char pld_buf[ECP_SIZE_PLD_BUF(0, ECP_MTYPE_RBFLUSH, _conn)]; + + packet.buffer = pkt_buf; + packet.size = sizeof(pkt_buf); + payload.buffer = pld_buf; + payload.size = sizeof(pld_buf); + + ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_RBFLUSH); + + return ecp_pld_send_wtimer(_conn, &packet, &payload, ECP_SIZE_PLD(0, ECP_MTYPE_RBFLUSH), 0, ti); +} + +ssize_t ecp_rbuf_send_flush(ECPRBConn *conn) { + ECPConnection *_conn = ecp_rbuf_get_conn(conn); + + return ecp_timer_send(_conn, _rbuf_send_flush, ECP_MTYPE_RBACK, 3, 500); +} + +ssize_t ecp_rbuf_handle_ack(ECPRBConn *conn, unsigned char *msg, size_t msg_size) { + ECPConnection *_conn = ecp_rbuf_get_conn(conn); + ECPRBSend *buf = conn->send; + ECPRBuffer *rbuf = &buf->rbuf; + ssize_t rsize = sizeof(ecp_seq_t) + sizeof(ecp_ack_t); + ecp_seq_t seq_start; + ecp_seq_t seq_max; + ecp_seq_t seq_ack; + ecp_ack_t ack_map; + unsigned short idx; + unsigned short msg_cnt; + int do_flush = 0; + int i; + int rv; + + if (msg_size < rsize) return ECP_ERR_SIZE; + + seq_ack = \ + ((ecp_seq_t)msg[0] << 24) | \ + ((ecp_seq_t)msg[1] << 16) | \ + ((ecp_seq_t)msg[2] << 8) | \ + ((ecp_seq_t)msg[3]); + ack_map = \ + ((ecp_ack_t)msg[4] << 24) | \ + ((ecp_ack_t)msg[5] << 16) | \ + ((ecp_ack_t)msg[6] << 8) | \ + ((ecp_ack_t)msg[7]); + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_lock(&buf->mutex); +#endif + + seq_max = (buf->cnt_cc ? buf->seq_cc - 1 : rbuf->seq_max); + if (ECP_SEQ_LT(seq_max, seq_ack)) return ECP_ERR; + + if (buf->flags & ECP_RBUF_FLAG_RELIABLE) { + rv = _ecp_rbuf_msg_idx(rbuf, seq_ack, NULL); + if (rv) goto handle_ack_fin; + } + + if (buf->flags & ECP_RBUF_FLAG_CCONTROL) { + buf->in_transit = seq_max - seq_ack; + } + + if (ack_map != ECP_ACK_FULL) { + ecp_ack_t ack_mask = (ecp_ack_t)1 << (ECP_SIZE_ACKB - 1); + ecp_ack_t ack_map_nop = 0; + unsigned short nack_cnt = 0; + int nack_first = 0; + + seq_ack -= (ECP_SIZE_ACKB - 1); + if (buf->flags & ECP_RBUF_FLAG_RELIABLE) { + rv = _ecp_rbuf_msg_idx(rbuf, seq_ack, &idx); + if (rv) goto handle_ack_fin; + } + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_unlock(&buf->mutex); +#endif + + for (i=0; iseq_nack, seq_ack)) { + nack_cnt++; + buf->seq_nack = seq_ack; + } + + if (buf->flags & ECP_RBUF_FLAG_RELIABLE) { + if (!(rbuf->arr.pkt[idx].flags & ECP_RBUF_FLAG_IN_RBUF) || (rbuf->arr.pkt[idx].flags & ECP_RBUF_FLAG_SKIP)) { + ack_map_nop |= ack_mask; + } else { + ECPBuffer packet; + + packet.buffer = rbuf->arr.pkt[idx].buf; + packet.size = ECP_MAX_PKT; + ecp_pkt_send(_conn->sock, &packet, rbuf->arr.pkt[idx].size, ECP_SEND_FLAG_MORE, NULL, &_conn->remote.addr); + if (buf->flags & ECP_RBUF_FLAG_CCONTROL) { + buf->in_transit++; + } + } + if (!nack_first) { + nack_first = 1; + seq_start = seq_ack; + } + } + } + seq_ack++; + ack_mask = ack_mask >> 1; + if (buf->flags & ECP_RBUF_FLAG_RELIABLE) idx = ECP_RBUF_IDX_MASK(idx + 1, rbuf->arr_size); + } + + if ((buf->flags & ECP_RBUF_FLAG_RELIABLE) && ack_map_nop) { + ECPBuffer packet; + ECPBuffer payload; + unsigned char pkt_buf[ECP_SIZE_PKT_BUF(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), ECP_MTYPE_RBNOP, _conn)]; + unsigned char pld_buf[ECP_SIZE_PLD_BUF(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), ECP_MTYPE_RBNOP, _conn)]; + unsigned char *_buf; + + packet.buffer = pkt_buf; + packet.size = sizeof(pkt_buf); + payload.buffer = pld_buf; + payload.size = sizeof(pld_buf); + + ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_RBNOP); + _buf = ecp_pld_get_msg(payload.buffer, payload.size); + + seq_ack--; + _buf[0] = seq_ack >> 24; + _buf[1] = seq_ack >> 16; + _buf[2] = seq_ack >> 8; + _buf[3] = seq_ack; + _buf[4] = ack_map_nop >> 24; + _buf[5] = ack_map_nop >> 16; + _buf[6] = ack_map_nop >> 8; + _buf[7] = ack_map_nop; + + ecp_pld_send(_conn, &packet, &payload, ECP_SIZE_PLD(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), ECP_MTYPE_RBNOP), ECP_SEND_FLAG_MORE); + } + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_lock(&buf->mutex); +#endif + + buf->nack_rate = (buf->nack_rate * 7 + ((ECP_SIZE_ACKB - nack_cnt) * NACK_RATE_UNIT) / ECP_SIZE_ACKB) / 8; + } else { + buf->nack_rate = (buf->nack_rate * 7) / 8; + seq_start = seq_ack + 1; + } + + if (buf->flags & ECP_RBUF_FLAG_RELIABLE) { + msg_cnt = seq_start - rbuf->seq_start; + idx = rbuf->idx_start; + for (i=0; iarr.pkt[idx].flags = 0; + // if (rbuf->arr.pkt[idx].flags == 0); + idx = ECP_RBUF_IDX_MASK(idx + 1, rbuf->arr_size); + } + rbuf->seq_start = seq_start; + rbuf->idx_start = idx; + } + if (buf->flush) { + if (ECP_SEQ_LT(buf->seq_flush, rbuf->seq_start)) buf->flush = 0; + if (buf->flush) do_flush = 1; + } + if (buf->cnt_cc) cc_flush(conn, ECP_SEND_FLAG_MORE); + +handle_ack_fin: + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_unlock(&buf->mutex); +#endif + + if (!rv && do_flush) { + ssize_t _rv; + + _rv = ecp_rbuf_send_flush(conn); + if (_rv < 0) rv = _rv; + } + + if (rv) return rv; + return rsize; +} + +int ecp_rbsend_create(ECPRBConn *conn, ECPRBSend *buf, ECPRBPacket *pkt, unsigned short pkt_size) { + ECPRBuffer *rbuf = &buf->rbuf; + int rv; + + memset(buf, 0, sizeof(ECPRBRecv)); + memset(pkt, 0, sizeof(ECPRBPacket) * pkt_size); + + rbuf->arr.pkt = pkt; + rbuf->arr_size = pkt_size; + +#ifdef ECP_WITH_PTHREAD + rv = pthread_mutex_init(&buf->mutex, NULL); + if (rv) return ECP_ERR; +#endif + + conn->send = buf; + return ECP_OK; +} + +void ecp_rbsend_destroy(ECPRBConn *conn) { + ECPRBSend *buf = conn->send; + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_destroy(&buf->mutex); +#endif + + conn->send = NULL; +} + +void ecp_rbsend_start(ECPRBConn *conn, ecp_seq_t seq) { + ECPRBSend *buf = conn->send; + ECPRBuffer *rbuf = &buf->rbuf; + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_lock(&buf->mutex); +#endif + + buf->start = 1; + buf->seq_nack = seq; + _ecp_rbuf_start(rbuf, seq); + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_unlock(&buf->mutex); +#endif +} + +int ecp_rbuf_set_wsize(ECPRBConn *conn, ecp_win_t size) { + ECPRBSend *buf = conn->send; + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_lock(&buf->mutex); +#endif + + buf->win_size = size; + if (buf->cnt_cc) cc_flush(conn, 0); + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_unlock(&buf->mutex); +#endif + + return ECP_OK; +} + +int ecp_rbuf_flush(ECPRBConn *conn) { + ECPConnection *_conn = ecp_rbuf_get_conn(conn); + ECPRBSend *buf = conn->send; + ecp_seq_t seq; + ssize_t rv; + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_lock(&_conn->mutex); +#endif + seq = (ecp_seq_t)(_conn->nonce_out) - 1; +#ifdef ECP_WITH_PTHREAD + pthread_mutex_unlock(&_conn->mutex); +#endif + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_lock(&buf->mutex); +#endif + + if (buf->flush) { + if (ECP_SEQ_LT(buf->seq_flush, seq)) buf->seq_flush = seq; + } else { + buf->flush = 1; + buf->seq_flush = seq; + } + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_unlock(&buf->mutex); +#endif + + rv = ecp_rbuf_send_flush(conn); + if (rv < 0) return rv; + + return ECP_OK; +} + +ssize_t ecp_rbuf_pld_send(ECPRBConn *conn, ECPBuffer *payload, size_t pld_size, ECPBuffer *packet, size_t pkt_size, ECPTimerItem *ti) { + ECPRBSend *buf = conn->send; + ECPRBuffer *rbuf = &buf->rbuf; + unsigned char mtype; + int rb_rel; + int rb_cc; + int do_send; + int do_skip; + int _rv = ECP_OK; + ssize_t rv = 0; + + _rv = ecp_pld_get_type(payload->buffer, pld_size, &mtype); + if (_rv) return _rv; + + do_send = 1; + do_skip = ecp_rbuf_skip(mtype); + if (ti && !do_skip) return ECP_ERR_RBUF_TIMER; + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_lock(&buf->mutex); +#endif + + if (!buf->start) { + rv = 0; + goto pld_send_fin; + } + + rb_rel = (buf->flags & ECP_RBUF_FLAG_RELIABLE); + rb_cc = ((buf->flags & ECP_RBUF_FLAG_CCONTROL) && (buf->cnt_cc || (buf->in_transit >= buf->win_size))); + + if (rb_rel || rb_cc) { + ecp_seq_t seq; + unsigned short idx; + unsigned char _flags; + + _rv = ecp_pkt_get_seq(packet->buffer, pkt_size, &seq); + if (_rv) { + rv = _rv; + goto pld_send_fin; + } + + _rv = _ecp_rbuf_msg_idx(rbuf, seq, &idx); + if (_rv) rv = ECP_ERR_RBUF_DUP; + if (!rv && rbuf->arr.pkt[idx].flags) rv = ECP_ERR_RBUF_DUP; + if (rv) goto pld_send_fin; + + _flags = ECP_RBUF_FLAG_IN_RBUF; + if (do_skip) { + _flags |= ECP_RBUF_FLAG_SKIP; + } else { + do_send = 0; + } + + rbuf->arr.pkt[idx].flags = _flags; + if (!do_send) { + memcpy(rbuf->arr.pkt[idx].buf, packet->buffer, pkt_size); + rbuf->arr.pkt[idx].size = pkt_size; + rv = pld_size; + } + + if (ECP_SEQ_LT(rbuf->seq_max, seq)) rbuf->seq_max = seq; + + if (rb_cc && !do_send) { + if (buf->cnt_cc == 0) buf->seq_cc = seq; + buf->cnt_cc++; + } + } + + if ((buf->flags & ECP_RBUF_FLAG_CCONTROL) && !do_skip && do_send) { + buf->in_transit++; + } + +pld_send_fin: + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_unlock(&buf->mutex); +#endif + + return rv; +} diff --git a/ecp/src/ecp/msgq.c b/ecp/src/ecp/msgq.c deleted file mode 100644 index 8a5408b..0000000 --- a/ecp/src/ecp/msgq.c +++ /dev/null @@ -1,150 +0,0 @@ -#include -#include -#include - -#include "core.h" -#include "rbuf.h" -#include "msgq.h" - -#define MSGQ_IDX_MASK(idx) ((idx) & ((ECP_MSGQ_MAX_MSG) - 1)) - -static struct timespec *abstime_ts(struct timespec *ts, ecp_sts_t msec) { - struct timeval tv; - uint64_t us_start; - - gettimeofday(&tv, NULL); - us_start = tv.tv_sec * (uint64_t) 1000000 + tv.tv_usec; - us_start += msec * 1000; - ts->tv_sec = us_start / 1000000; - ts->tv_nsec = (us_start % 1000000) * 1000; - return ts; -} - -int ecp_msgq_create(ECPRBConn *conn, ECPMsgQ *msgq) { - int i; - int rv; - - if (conn->recv == NULL) return ECP_ERR; - - memset(msgq, 0, sizeof(ECPMsgQ)); - - for (i=0; icond[i], NULL); - if (rv) { - int j; - - for (j=0; jcond[j]); - } - return ECP_ERR; - } - } - conn->recv->msgq = msgq; - - return ECP_OK; -} - -void ecp_msgq_destroy(ECPRBConn *conn) { - ECPMsgQ *msgq = conn->recv->msgq; - int i; - - for (i=0; icond[i]); - } - - conn->recv->msgq = NULL; -} - -void ecp_msgq_start(ECPRBConn *conn, ecp_seq_t seq) { - ECPMsgQ *msgq = conn->recv->msgq; - - msgq->seq_max = seq; - msgq->seq_start = seq + 1; -} - -int ecp_msgq_push(ECPRBConn *conn, ecp_seq_t seq, unsigned char mtype) { - ECPRBRecv *buf = conn->recv; - ECPMsgQ *msgq = buf->msgq; - - if (mtype >= ECP_MSGQ_MAX_MTYPE) return ECP_ERR_MTYPE; - - if ((unsigned short)(msgq->idx_w[mtype] - msgq->idx_r[mtype]) == ECP_MSGQ_MAX_MSG) return ECP_ERR_FULL; - if (msgq->idx_w[mtype] == msgq->idx_r[mtype]) pthread_cond_signal(&msgq->cond[mtype]); - - msgq->seq_msg[mtype][MSGQ_IDX_MASK(msgq->idx_w[mtype])] = seq; - msgq->idx_w[mtype]++; - - if (ECP_SEQ_LT(msgq->seq_max, seq)) msgq->seq_max = seq; - - return ECP_OK; -} - -ssize_t ecp_msgq_pop(ECPRBConn *conn, unsigned char mtype, unsigned char *msg, size_t _msg_size, ecp_sts_t timeout) { - ECPRBRecv *buf = conn->recv; - ECPMsgQ *msgq = buf->msgq; - ECPRBuffer *rbuf = &buf->rbuf; - unsigned char *pld_buf; - unsigned char *msg_buf; - size_t pld_size, hdr_size, msg_size; - ecp_seq_t seq; - unsigned short idx; - int rv; - - if (mtype >= ECP_MSGQ_MAX_MTYPE) return ECP_ERR_MTYPE; - - if (msgq->idx_r[mtype] == msgq->idx_w[mtype]) { - if (timeout == -1) { - pthread_cond_wait(&msgq->cond[mtype], &buf->mutex); - } else { - struct timespec ts; - - rv = pthread_cond_timedwait(&msgq->cond[mtype], &buf->mutex, abstime_ts(&ts, timeout)); - if (rv) return ECP_ERR_TIMEOUT; - } - } - seq = msgq->seq_msg[mtype][MSGQ_IDX_MASK(msgq->idx_r[mtype])]; - - rv = _ecp_rbuf_msg_idx(rbuf, seq, &idx); - if (rv) return ECP_ERR; - - pld_buf = rbuf->arr.pld[idx].buf; - pld_size = rbuf->arr.pld[idx].size; - - msg_buf = ecp_pld_get_msg(pld_buf, pld_size); - if (msg_buf == NULL) return ECP_ERR; - hdr_size = msg_buf - pld_buf; - msg_size = pld_size - hdr_size; - - rbuf->arr.pld[idx].flags &= ~ECP_RBUF_FLAG_IN_MSGQ; - // if (rbuf->arr.pld[idx].flags == 0); - - msgq->idx_r[mtype]++; - if (msgq->seq_start == seq) { - int i; - unsigned short msg_cnt = msgq->seq_max - msgq->seq_start + 1; - - for (i=0; iarr.pld[idx].flags & ECP_RBUF_FLAG_IN_MSGQ) break; - idx = ECP_RBUF_IDX_MASK(idx + 1, rbuf->arr_size); - } - msgq->seq_start += i; - } - - if (_msg_size < msg_size) return ECP_ERR_FULL; - if (msg_size) memcpy(msg, msg_buf, msg_size); - return msg_size; -} - -ssize_t ecp_msg_receive(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ecp_sts_t timeout) { - ECPRBConn *_conn; - ssize_t rv; - - _conn = ecp_rbuf_get_rbconn(conn); - if (_conn == NULL) return ECP_ERR; - - pthread_mutex_lock(&_conn->recv->mutex); - rv = ecp_msgq_pop(_conn, mtype, msg, msg_size, timeout); - pthread_mutex_unlock(&_conn->recv->mutex); - - return rv; -} diff --git a/ecp/src/ecp/msgq.h b/ecp/src/ecp/msgq.h deleted file mode 100644 index dddb6e7..0000000 --- a/ecp/src/ecp/msgq.h +++ /dev/null @@ -1,19 +0,0 @@ -#define ECP_MSGQ_MAX_MSG 32 - -#define ECP_MSGQ_MAX_MTYPE (ECP_MAX_MTYPE) - -typedef struct ECPMsgQ { - unsigned short idx_w[ECP_MSGQ_MAX_MTYPE]; - unsigned short idx_r[ECP_MSGQ_MAX_MTYPE]; - ecp_seq_t seq_start; - ecp_seq_t seq_max; - ecp_seq_t seq_msg[ECP_MSGQ_MAX_MTYPE][ECP_MSGQ_MAX_MSG]; - pthread_cond_t cond[ECP_MSGQ_MAX_MTYPE]; -} ECPMsgQ; - -int ecp_msgq_create(ECPRBConn *conn, ECPMsgQ *msgq); -void ecp_msgq_destroy(ECPRBConn *conn); -void ecp_msgq_start(ECPRBConn *conn, ecp_seq_t seq); -int ecp_msgq_push(ECPRBConn *conn, ecp_seq_t seq, unsigned char mtype); -ssize_t ecp_msgq_pop(ECPRBConn *conn, unsigned char mtype, unsigned char *msg, size_t _msg_size, ecp_sts_t timeout); -ssize_t ecp_msg_receive(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ecp_sts_t timeout); \ No newline at end of file diff --git a/ecp/src/ecp/rbuf.c b/ecp/src/ecp/rbuf.c deleted file mode 100644 index d2e6e28..0000000 --- a/ecp/src/ecp/rbuf.c +++ /dev/null @@ -1,112 +0,0 @@ -#include - -#include "core.h" -#include "rbuf.h" - -ECPRBConn *ecp_rbuf_get_rbconn(ECPConnection *conn) { - if (ecp_conn_has_rbuf(conn)) return (ECPRBConn *)conn; - return NULL; -} - -ECPConnection *ecp_rbuf_get_conn(ECPRBConn *conn) { - return &conn->b; -} - -void _ecp_rbuf_start(ECPRBuffer *rbuf, ecp_seq_t seq) { - rbuf->seq_max = seq; - rbuf->seq_start = seq + 1; -} - -int _ecp_rbuf_msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq, unsigned short *idx) { - ecp_seq_t seq_offset = seq - rbuf->seq_start; - - /* This also checks for seq_start <= seq if seq type range >> rbuf->arr_size */ - if (seq_offset >= rbuf->arr_size) return ECP_ERR_FULL; - - if (idx) *idx = ECP_RBUF_IDX_MASK(rbuf->idx_start + seq_offset, rbuf->arr_size); - return ECP_OK; -} - -void ecp_rbuf_conn_init(ECPRBConn *conn) { - ECPConnection *_conn = ecp_rbuf_get_conn(conn); - - ecp_conn_set_flags(_conn, ECP_CONN_FLAG_RBUF); - conn->send = NULL; - conn->recv = NULL; - conn->iter = NULL; -} - -int ecp_rbuf_conn_create(ECPRBConn *conn, ECPSocket *sock, unsigned char type) { - ECPConnection *_conn = ecp_rbuf_get_conn(conn); - int rv; - - rv = ecp_conn_create(_conn, sock, type); - if (rv) return rv; - - ecp_rbuf_conn_init(conn); - return ECP_OK; -} - -int ecp_rbuf_conn_create_inb(ECPRBConn *conn, ECPSocket *sock, unsigned char type) { - ECPConnection *_conn = ecp_rbuf_get_conn(conn); - int rv; - - rv = ecp_conn_create_inb(_conn, sock, type); - if (rv) return rv; - - ecp_rbuf_conn_init(conn); - return ECP_OK; -} - -void ecp_rbuf_destroy(ECPRBConn *conn) { - if (conn->send) ecp_rbsend_destroy(conn); - if (conn->recv) ecp_rbrecv_destroy(conn); - conn->iter = NULL; -} - -void ecp_rbuf_start(ECPRBConn *conn) { - ECPConnection *_conn = ecp_rbuf_get_conn(conn); - - if (conn->send) { - ecp_seq_t seq_out; - - seq_out = (ecp_seq_t)(_conn->nonce_out); - ecp_rbsend_start(conn, seq_out); - } - - if (conn->recv) { - ecp_seq_t seq_in; - - seq_in = (ecp_seq_t)(_conn->nonce_in); - ecp_rbrecv_start(conn, seq_in); - } -} - -ssize_t ecp_rbuf_msg_handle(ECPRBConn *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, size_t msg_size, ECP2Buffer *bufs) { - switch (mtype) { - case ECP_MTYPE_RBACK: - if (conn->send) return ecp_rbuf_handle_ack(conn, msg, msg_size); - break; - - case ECP_MTYPE_RBNOP: - if (conn->recv) return ecp_rbuf_handle_nop(conn, msg, msg_size); - break; - - case ECP_MTYPE_RBFLUSH: - if (conn->recv) return ecp_rbuf_handle_flush(conn); - break; - - default: - break; - } - - return ECP_ERR_MTYPE; -} - -int ecp_rbuf_err_handle(ECPRBConn *conn, unsigned char mtype, int err) { - if (conn->recv && (mtype == ECP_MTYPE_RBTIMER)) { - ecp_rbuf_handle_timer(conn); - return ECP_OK; - } - return ECP_PASS; -} diff --git a/ecp/src/ecp/rbuf.h b/ecp/src/ecp/rbuf.h deleted file mode 100644 index 36ff963..0000000 --- a/ecp/src/ecp/rbuf.h +++ /dev/null @@ -1,123 +0,0 @@ -#define ECP_RBUF_FLAG_IN_RBUF 0x01 -#define ECP_RBUF_FLAG_IN_MSGQ 0x02 -#define ECP_RBUF_FLAG_IN_TIMER 0x04 -#define ECP_RBUF_FLAG_SKIP 0x08 - -#define ECP_RBUF_FLAG_CCONTROL 0x01 -#define ECP_RBUF_FLAG_RELIABLE 0x02 - - -#define ECP_MTYPE_RBNOP (0x08 | ECP_MTYPE_FLAG_SYS) -#define ECP_MTYPE_RBACK (0x09 | ECP_MTYPE_FLAG_SYS) -#define ECP_MTYPE_RBFLUSH (0x0a | ECP_MTYPE_FLAG_SYS) -#define ECP_MTYPE_RBTIMER (0x0b | ECP_MTYPE_FLAG_SYS) - -#define ECP_ERR_RBUF_DUP -100 -#define ECP_ERR_RBUF_TIMER -101 - -#define ecp_rbuf_skip(mtype) (mtype & ECP_MTYPE_FLAG_SYS) - -/* size must be power of 2 */ -#define ECP_RBUF_IDX_MASK(idx, size) ((idx) & ((size) - 1)) - -typedef uint32_t ecp_win_t; - -struct ECPMsgQ; -struct ECPFragIter; - -typedef struct ECPRBPayload { - unsigned char buf[ECP_MAX_PLD]; - size_t size; - unsigned char flags; -} ECPRBPayload; - -typedef struct ECPRBPacket { - unsigned char buf[ECP_MAX_PKT]; - size_t size; - unsigned char flags; -} ECPRBPacket; - -typedef struct ECPRBuffer { - ecp_seq_t seq_start; - ecp_seq_t seq_max; - unsigned short arr_size; - unsigned short idx_start; - union { - ECPRBPayload *pld; - ECPRBPacket *pkt; - } arr; -} ECPRBuffer; - -typedef struct ECPRBRecv { - unsigned char start; - unsigned char flags; - ecp_sts_t deliver_delay; - unsigned short hole_max; - unsigned short ack_rate; - unsigned short ack_pkt; - ecp_seq_t seq_ack; - ecp_ack_t ack_map; - ECPRBuffer rbuf; -#ifdef ECP_WITH_PTHREAD - pthread_mutex_t mutex; -#endif -#ifdef ECP_WITH_MSGQ - struct ECPMsgQ *msgq; -#endif -} ECPRBRecv; - -typedef struct ECPRBSend { - unsigned char start; - unsigned char flags; - ecp_win_t win_size; - ecp_win_t in_transit; - ecp_win_t cnt_cc; - ecp_seq_t seq_cc; - ecp_seq_t seq_flush; - ecp_seq_t seq_nack; - unsigned char flush; - unsigned int nack_rate; - ECPRBuffer rbuf; -#ifdef ECP_WITH_PTHREAD - pthread_mutex_t mutex; -#endif -} ECPRBSend; - -typedef struct ECPRBConn { - ECPConnection b; - ECPRBRecv *recv; - ECPRBSend *send; - struct ECPFragIter *iter; -} ECPRBConn; - -ECPRBConn *ecp_rbuf_get_rbconn(ECPConnection *conn); -ECPConnection *ecp_rbuf_get_conn(ECPRBConn *conn); -void _ecp_rbuf_start(ECPRBuffer *rbuf, ecp_seq_t seq); -int _ecp_rbuf_msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq, unsigned short *idx); -void ecp_rbuf_conn_init(ECPRBConn *conn); -int ecp_rbuf_conn_create(ECPRBConn *conn, ECPSocket *sock, unsigned char type); -int ecp_rbuf_conn_create_inb(ECPRBConn *conn, ECPSocket *sock, unsigned char type); -void ecp_rbuf_destroy(ECPRBConn *conn); -void ecp_rbuf_start(ECPRBConn *conn); -ssize_t ecp_rbuf_msg_handle(ECPRBConn *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, size_t msg_size, ECP2Buffer *bufs); -int ecp_rbuf_err_handle(ECPRBConn *conn, unsigned char mtype, int err); - -/* send */ -ssize_t ecp_rbuf_send_flush(ECPRBConn *conn); -ssize_t ecp_rbuf_handle_ack(ECPRBConn *conn, unsigned char *msg, size_t msg_size); -int ecp_rbsend_create(ECPRBConn *conn, ECPRBSend *buf, ECPRBPacket *pkt, unsigned short pkt_size); -void ecp_rbsend_destroy(ECPRBConn *conn); -void ecp_rbsend_start(ECPRBConn *conn, ecp_seq_t seq); -int ecp_rbuf_set_wsize(ECPRBConn *conn, ecp_win_t size); -int ecp_rbuf_flush(ECPRBConn *conn); -ssize_t ecp_rbuf_pld_send(ECPRBConn *conn, ECPBuffer *payload, size_t pld_size, ECPBuffer *packet, size_t pkt_size, ECPTimerItem *ti); - -ssize_t ecp_rbuf_handle_nop(ECPRBConn *conn, unsigned char *msg, size_t msg_size); -ssize_t ecp_rbuf_handle_flush(ECPRBConn *conn); -void ecp_rbuf_handle_timer(ECPRBConn *conn) ; -int ecp_rbrecv_create(ECPRBConn *conn, ECPRBRecv *buf, ECPRBPayload *pld, unsigned short pld_size); -void ecp_rbrecv_destroy(ECPRBConn *conn); -void ecp_rbrecv_start(ECPRBConn *conn, ecp_seq_t seq); -int ecp_rbuf_set_hole(ECPRBConn *conn, unsigned short hole_max); -int ecp_rbuf_set_delay(ECPRBConn *conn, ecp_sts_t delay); -ssize_t ecp_rbuf_store(ECPRBConn *conn, ecp_seq_t seq, unsigned char *pld, size_t pld_size); diff --git a/ecp/src/ecp/rbuf_recv.c b/ecp/src/ecp/rbuf_recv.c deleted file mode 100644 index 1d50494..0000000 --- a/ecp/src/ecp/rbuf_recv.c +++ /dev/null @@ -1,473 +0,0 @@ -#include -#include - -#include "core.h" -#include "tm.h" - -#include "rbuf.h" - -#ifdef ECP_WITH_MSGQ -#include "msgq.h" -#endif - -#define ACK_RATE 8 -#define ACK_MASK_FIRST ((ecp_ack_t)1 << (ECP_SIZE_ACKB - 1)) - -static ssize_t msg_store(ECPRBConn *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *pld, size_t pld_size) { - ECPConnection *_conn = ecp_rbuf_get_conn(conn); - ECPRBRecv *buf = conn->recv; - ECPRBuffer *rbuf = &buf->rbuf; - unsigned short idx; - unsigned char flags; - int skip; - int rv; - - rv = _ecp_rbuf_msg_idx(rbuf, seq, &idx); - if (rv) return rv; - - if (rbuf->arr.pld[idx].flags) return ECP_ERR_RBUF_DUP; - -#ifdef ECP_WITH_MSGQ - if (buf->msgq) { - ecp_seq_t seq_offset; - -#ifndef ECP_WITH_RTHD - pthread_mutex_lock(&buf->mutex); -#endif - - seq_offset = seq - buf->msgq->seq_start; - if (seq_offset >= rbuf->arr_size) rv = ECP_ERR_FULL; - -#ifndef ECP_WITH_RTHD - pthread_mutex_unlock(&buf->mutex); -#endif - - if (rv) return rv; - } -#endif - - skip = ecp_rbuf_skip(mtype); - flags = ECP_RBUF_FLAG_IN_RBUF; - if (skip) flags |= ECP_RBUF_FLAG_SKIP; - rbuf->arr.pld[idx].flags = flags; - - if ((mtype == ECP_MTYPE_RBNOP) && pld) { - return ecp_pld_handle_one(_conn, seq, pld, pld_size, NULL); - } else if (skip) { - return 0; - } - - if (pld && pld_size) memcpy(rbuf->arr.pld[idx].buf, pld, pld_size); - rbuf->arr.pld[idx].size = pld_size; - if (ECP_SEQ_LT(rbuf->seq_max, seq)) rbuf->seq_max = seq; - - return pld_size; -} - -static void msg_flush(ECPRBConn *conn) { - ECPConnection *_conn = ecp_rbuf_get_conn(conn); - ECPRBRecv *buf = conn->recv; - ECPRBuffer *rbuf = &buf->rbuf; - ecp_seq_t seq; - unsigned short idx; - int i; - -#ifdef ECP_WITH_MSGQ -#ifndef ECP_WITH_RTHD - if (buf->msgq) pthread_mutex_lock(&buf->mutex); -#endif -#endif - - seq = rbuf->seq_start; - idx = rbuf->idx_start; - - while (ECP_SEQ_LTE(seq, rbuf->seq_max)) { - if (rbuf->arr.pld[idx].flags & ECP_RBUF_FLAG_IN_RBUF) { - if (!(rbuf->arr.pld[idx].flags & ECP_RBUF_FLAG_SKIP)) { - ecp_pts_t msg_pts; - int rv; - - rv = ecp_pld_get_pts(rbuf->arr.pld[idx].buf, rbuf->arr.pld[idx].size, &msg_pts); - if (!rv && buf->deliver_delay) { - ecp_sts_t now = ecp_tm_abstime_ms(0); - - msg_pts += buf->deliver_delay; - if (ECP_PTS_LT(now, msg_pts)) { - if (!(rbuf->arr.pld[idx].flags & ECP_RBUF_FLAG_IN_TIMER)) { - ECPTimerItem ti; - - ecp_timer_item_init(&ti, _conn, ECP_MTYPE_RBTIMER, NULL, 0, msg_pts - now); - rv = ecp_timer_push(&ti); - if (!rv) rbuf->arr.pld[idx].flags |= ECP_RBUF_FLAG_IN_TIMER; - } - break; - } else if (rbuf->arr.pld[idx].flags & ECP_RBUF_FLAG_IN_TIMER) { - rbuf->arr.pld[idx].flags &= ~ECP_RBUF_FLAG_IN_TIMER; - } - } - -#ifdef ECP_WITH_MSGQ - if (buf->msgq) { - unsigned char mtype; - - rv = ecp_pld_get_type(rbuf->arr.pld[idx].buf, rbuf->arr.pld[idx].size, &mtype); - if (!rv) rv = ecp_msgq_push(conn, seq, mtype & ECP_MTYPE_MASK); - if (rv) break; - - rbuf->arr.pld[idx].flags |= ECP_RBUF_FLAG_IN_MSGQ; - } else -#endif - ecp_pld_handle(_conn, seq, rbuf->arr.pld[idx].buf, rbuf->arr.pld[idx].size, NULL); - } else { - rbuf->arr.pld[idx].flags &= ~ECP_RBUF_FLAG_SKIP; - } - rbuf->arr.pld[idx].flags &= ~ECP_RBUF_FLAG_IN_RBUF; - // if (rbuf->arr.pld[idx].flags == 0); - } else { - if (buf->flags & ECP_RBUF_FLAG_RELIABLE) break; - if (!ECP_SEQ_LT(seq, rbuf->seq_max - buf->hole_max)) break; - } - seq++; - idx = ECP_RBUF_IDX_MASK(idx + 1, rbuf->arr_size); - } - rbuf->seq_start = seq; - rbuf->idx_start = idx; - -#ifdef ECP_WITH_MSGQ -#ifndef ECP_WITH_RTHD - if (buf->msgq) pthread_mutex_unlock(&buf->mutex); -#endif -#endif -} - -static int ack_shift(ECPRBConn *conn) { - ECPRBRecv *buf = conn->recv; - ECPRBuffer *rbuf = &buf->rbuf; - unsigned short idx; - int do_ack = 0; - int in_rbuf = 0; - int rv; - - if ((buf->flags & ECP_RBUF_FLAG_RELIABLE) && ((buf->ack_map & ACK_MASK_FIRST) == 0)) return 0; - - /* walks through messages that are not delivered yet, so no need for msgq mutex lock */ - while (ECP_SEQ_LT(buf->seq_ack, rbuf->seq_max)) { - buf->seq_ack++; - rv = _ecp_rbuf_msg_idx(rbuf, buf->seq_ack, &idx); - if (!rv) { - in_rbuf = rbuf->arr.pld[idx].flags & ECP_RBUF_FLAG_IN_RBUF; - } else { - in_rbuf = 1; - } - if (in_rbuf && (buf->ack_map == ECP_ACK_FULL)) continue; - - buf->ack_map = buf->ack_map << 1; - if (in_rbuf) { - buf->ack_map |= 1; - } else if (!do_ack && ECP_SEQ_LT(buf->seq_ack, rbuf->seq_max - buf->hole_max)) { - do_ack = 1; - } - - if ((buf->flags & ECP_RBUF_FLAG_RELIABLE) && ((buf->ack_map & ACK_MASK_FIRST) == 0)) { - do_ack = 1; - break; - } - } - - return do_ack; -} - -static int ack_send(ECPRBConn *conn, ecp_seq_t seq_ack, ecp_seq_t ack_map) { - ECPConnection *_conn = ecp_rbuf_get_conn(conn); - ECPRBRecv *buf = conn->recv; - ECPBuffer packet; - ECPBuffer payload; - unsigned char pkt_buf[ECP_SIZE_PKT_BUF(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), ECP_MTYPE_RBACK, _conn)]; - unsigned char pld_buf[ECP_SIZE_PLD_BUF(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), ECP_MTYPE_RBACK, _conn)]; - unsigned char *_buf; - ssize_t rv; - - packet.buffer = pkt_buf; - packet.size = sizeof(pkt_buf); - payload.buffer = pld_buf; - payload.size = sizeof(pld_buf); - - ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_RBACK); - _buf = ecp_pld_get_msg(payload.buffer, payload.size); - _buf[0] = seq_ack >> 24; - _buf[1] = seq_ack >> 16; - _buf[2] = seq_ack >> 8; - _buf[3] = seq_ack; - _buf[4] = ack_map >> 24; - _buf[5] = ack_map >> 16; - _buf[6] = ack_map >> 8; - _buf[7] = ack_map; - - rv = ecp_pld_send(_conn, &packet, &payload, ECP_SIZE_PLD(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), ECP_MTYPE_RBACK), 0); - if (rv < 0) return rv; - - buf->ack_pkt = 0; - - return ECP_OK; -} - -ssize_t ecp_rbuf_handle_nop(ECPRBConn *conn, unsigned char *msg, size_t msg_size) { - ECPRBRecv *buf = conn->recv; - ECPRBuffer *rbuf = &buf->rbuf; - ecp_seq_t seq_ack; - ecp_ack_t ack_map; - ecp_ack_t ack_mask = (ecp_ack_t)1 << (ECP_SIZE_ACKB - 1); - size_t rsize = sizeof(ecp_seq_t) + sizeof(ecp_ack_t); - int i; - - if (msg_size < rsize) return ECP_ERR_SIZE; - - seq_ack = \ - ((ecp_seq_t)msg[0] << 24) | \ - ((ecp_seq_t)msg[1] << 16) | \ - ((ecp_seq_t)msg[2] << 8) | \ - ((ecp_seq_t)msg[3]); - ack_map = \ - ((ecp_ack_t)msg[4] << 24) | \ - ((ecp_ack_t)msg[5] << 16) | \ - ((ecp_ack_t)msg[6] << 8) | \ - ((ecp_ack_t)msg[7]); - - seq_ack -= (ECP_SIZE_ACKB - 1); - for (i=0; i> 1; - } - - return rsize; -} - -ssize_t ecp_rbuf_handle_flush(ECPRBConn *conn) { - ECPRBRecv *buf = conn->recv; - -#ifdef ECP_WITH_RTHD - pthread_mutex_lock(&buf->mutex); -#endif - - ack_send(conn, buf->seq_ack, buf->ack_map); - -#ifdef ECP_WITH_RTHD - pthread_mutex_unlock(&buf->mutex); -#endif - - return 0; -} - -void ecp_rbuf_handle_timer(ECPRBConn *conn) { - ECPRBRecv *buf = conn->recv; - -#ifdef ECP_WITH_RTHD - pthread_mutex_lock(&buf->mutex); -#endif - - msg_flush(conn); - -#ifdef ECP_WITH_RTHD - pthread_mutex_unlock(&buf->mutex); -#endif -} - -int ecp_rbrecv_create(ECPRBConn *conn, ECPRBRecv *buf, ECPRBPayload *pld, unsigned short pld_size) { - ECPRBuffer *rbuf = &buf->rbuf; - int rv; - - memset(buf, 0, sizeof(ECPRBRecv)); - memset(pld, 0, sizeof(ECPRBPayload) * pld_size); - - buf->ack_map = ECP_ACK_FULL; - buf->ack_rate = ACK_RATE; - rbuf->arr.pld = pld; - rbuf->arr_size = pld_size; - -#ifdef ECP_WITH_PTHREAD - rv = pthread_mutex_init(&buf->mutex, NULL); - if (rv) return ECP_ERR; -#endif - - conn->recv = buf; - return ECP_OK; -} - -void ecp_rbrecv_destroy(ECPRBConn *conn) { - ECPRBRecv *buf = conn->recv; - -#ifdef ECP_WITH_PTHREAD - pthread_mutex_destroy(&buf->mutex); -#endif - -#ifdef ECP_WITH_MSGQ - if (buf->msgq) ecp_msgq_destroy(conn); -#endif - - conn->recv = NULL; -} - -void ecp_rbrecv_start(ECPRBConn *conn, ecp_seq_t seq) { - ECPRBRecv *buf = conn->recv; - ECPRBuffer *rbuf = &buf->rbuf; - -#ifdef ECP_WITH_RTHD - pthread_mutex_lock(&buf->mutex); -#endif - - buf->start = 1; - buf->seq_ack = seq; - _ecp_rbuf_start(rbuf, seq); - -#ifdef ECP_WITH_MSGQ - if (buf->msgq) ecp_msgq_start(conn, seq); -#endif - -#ifdef ECP_WITH_RTHD - pthread_mutex_unlock(&buf->mutex); -#endif -} - -int ecp_rbuf_set_hole(ECPRBConn *conn, unsigned short hole_max) { - ECPRBRecv *buf = conn->recv; - -#ifdef ECP_WITH_RTHD - pthread_mutex_lock(&buf->mutex); -#endif - - buf->hole_max = hole_max; - -#ifdef ECP_WITH_RTHD - pthread_mutex_unlock(&buf->mutex); -#endif - - return ECP_OK; -} - -int ecp_rbuf_set_delay(ECPRBConn *conn, ecp_sts_t delay) { - ECPRBRecv *buf = conn->recv; - -#ifdef ECP_WITH_RTHD - pthread_mutex_lock(&buf->mutex); -#endif - - buf->deliver_delay = delay; - -#ifdef ECP_WITH_RTHD - pthread_mutex_unlock(&buf->mutex); -#endif - - return ECP_OK; -} - -ssize_t ecp_rbuf_store(ECPRBConn *conn, ecp_seq_t seq, unsigned char *pld, size_t pld_size) { - ECPRBRecv *buf = conn->recv; - ECPRBuffer *rbuf = &buf->rbuf; - unsigned char mtype; - unsigned short ack_pkt = 0; - int do_ack = 0; - int _rv; - ssize_t rv; - - _rv = ecp_pld_get_type(pld, pld_size, &mtype); - if (_rv) return _rv; - -#ifdef ECP_WITH_RTHD - pthread_mutex_lock(&buf->mutex); -#endif - - if (!buf->start) { - rv = 0; - goto rbuf_store_fin; - } - - if (ECP_SEQ_LT(rbuf->seq_max, seq)) { - ack_pkt = seq - rbuf->seq_max; - } - if (ECP_SEQ_LTE(seq, buf->seq_ack)) { - ecp_seq_t seq_offset = buf->seq_ack - seq; - if (seq_offset < ECP_SIZE_ACKB) { - ecp_ack_t ack_bit = ((ecp_ack_t)1 << seq_offset); - - if (ack_bit & buf->ack_map) { - rv = ECP_ERR_RBUF_DUP; - goto rbuf_store_fin; - } - - rv = msg_store(conn, seq, mtype, pld, pld_size); - if (rv < 0) goto rbuf_store_fin; - - buf->ack_map |= ack_bit; - /* reliable transport can prevent seq_ack from reaching seq_max */ - if (ECP_SEQ_LT(buf->seq_ack, rbuf->seq_max)) { - do_ack = ack_shift(conn); - } - } else { - rv = ECP_ERR_RBUF_DUP; - goto rbuf_store_fin; - } - } else { - unsigned short msg_cnt = rbuf->seq_max - rbuf->seq_start + 1; - - if ((msg_cnt == 0) && (seq == (ecp_seq_t)(buf->seq_ack + 1))) { - int deliver = 1; -#ifdef ECP_WITH_MSGQ - if (buf->msgq) deliver = 0; -#endif - if (buf->deliver_delay) deliver = 0; - if (deliver) { - /* receive buffer is empty, so no need for msgq mutex lock */ - rv = 0; - rbuf->seq_max++; - rbuf->seq_start++; - rbuf->idx_start = ECP_RBUF_IDX_MASK(rbuf->idx_start + 1, rbuf->arr_size); - } else { - rv = msg_store(conn, seq, mtype, pld, pld_size); - if (rv < 0) goto rbuf_store_fin; - } - buf->seq_ack++; - } else { - rv = msg_store(conn, seq, mtype, pld, pld_size); - if (rv < 0) goto rbuf_store_fin; - - do_ack = ack_shift(conn); - } - } - msg_flush(conn); - if (ack_pkt) { - buf->ack_pkt += ack_pkt; - if (!do_ack && (buf->ack_pkt > buf->ack_rate)) do_ack = 1; - } - if (do_ack) { - ecp_seq_t seq_ack = buf->seq_ack; - ecp_seq_t ack_map = buf->ack_map; - - /* account for missing mackets within hole_max range */ - if (buf->hole_max && (buf->seq_ack == rbuf->seq_max)) { - unsigned short h_bits = buf->hole_max + 1; - ecp_seq_t h_mask = ~(~((ecp_seq_t)0) << h_bits); - - if ((ack_map & h_mask) != h_mask) { - h_mask = ~(~((ecp_seq_t)0) >> h_bits); - seq_ack -= h_bits; - ack_map = (ack_map >> h_bits) | h_mask; - } - } - _rv = ack_send(conn, seq_ack, ack_map); - if (_rv) { - rv = _rv; - goto rbuf_store_fin; - } - } - -rbuf_store_fin: - -#ifdef ECP_WITH_RTHD - pthread_mutex_unlock(&buf->mutex); -#endif - - return rv; -} diff --git a/ecp/src/ecp/rbuf_send.c b/ecp/src/ecp/rbuf_send.c deleted file mode 100644 index 4a7cb2c..0000000 --- a/ecp/src/ecp/rbuf_send.c +++ /dev/null @@ -1,418 +0,0 @@ -#include -#include - -#include "core.h" -#include "rbuf.h" - -#define NACK_RATE_UNIT 10000 - -#define MIN(X, Y) (((X) < (Y)) ? (X) : (Y)) -#define MAX(X, Y) (((X) > (Y)) ? (X) : (Y)) - -static void cc_flush(ECPRBConn *conn, unsigned char flags) { - ECPConnection *_conn = ecp_rbuf_get_conn(conn); - ECPRBSend *buf = conn->send; - ECPRBuffer *rbuf = &buf->rbuf; - unsigned short idx; - int rv; - - rv = _ecp_rbuf_msg_idx(rbuf, buf->seq_cc, &idx); - if (rv) return; - - while (ECP_SEQ_LTE(buf->seq_cc, rbuf->seq_max)) { - ECPBuffer packet; - - if ((buf->cnt_cc == 0) || (buf->in_transit >= buf->win_size)) break; - - if ((rbuf->arr.pkt[idx].flags & ECP_RBUF_FLAG_IN_RBUF) && !(rbuf->arr.pkt[idx].flags & ECP_RBUF_FLAG_SKIP)) { -#ifdef ECP_WITH_PTHREAD - pthread_mutex_unlock(&buf->mutex); -#endif - - packet.buffer = rbuf->arr.pkt[idx].buf; - packet.size = ECP_MAX_PKT; - ecp_pkt_send(_conn->sock, &packet, rbuf->arr.pkt[idx].size, flags, NULL, &_conn->remote.addr); - -#ifdef ECP_WITH_PTHREAD - pthread_mutex_lock(&buf->mutex); -#endif - - buf->cnt_cc--; - buf->in_transit++; - } - if (!(buf->flags & ECP_RBUF_FLAG_RELIABLE)) { - rbuf->arr.pkt[idx].flags = 0; - // if (rbuf->arr.pkt[idx].flags == 0); - } - buf->seq_cc++; - idx = ECP_RBUF_IDX_MASK(idx + 1, rbuf->arr_size); - } - if (!(buf->flags & ECP_RBUF_FLAG_RELIABLE)) { - rbuf->seq_start = buf->seq_cc; - rbuf->idx_start = idx; - } -} - -static ssize_t _rbuf_send_flush(ECPConnection *_conn, ECPTimerItem *ti) { - ECPBuffer packet; - ECPBuffer payload; - unsigned char pkt_buf[ECP_SIZE_PKT_BUF(0, ECP_MTYPE_RBFLUSH, _conn)]; - unsigned char pld_buf[ECP_SIZE_PLD_BUF(0, ECP_MTYPE_RBFLUSH, _conn)]; - - packet.buffer = pkt_buf; - packet.size = sizeof(pkt_buf); - payload.buffer = pld_buf; - payload.size = sizeof(pld_buf); - - ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_RBFLUSH); - - return ecp_pld_send_wtimer(_conn, &packet, &payload, ECP_SIZE_PLD(0, ECP_MTYPE_RBFLUSH), 0, ti); -} - -ssize_t ecp_rbuf_send_flush(ECPRBConn *conn) { - ECPConnection *_conn = ecp_rbuf_get_conn(conn); - - return ecp_timer_send(_conn, _rbuf_send_flush, ECP_MTYPE_RBACK, 3, 500); -} - -ssize_t ecp_rbuf_handle_ack(ECPRBConn *conn, unsigned char *msg, size_t msg_size) { - ECPConnection *_conn = ecp_rbuf_get_conn(conn); - ECPRBSend *buf = conn->send; - ECPRBuffer *rbuf = &buf->rbuf; - ssize_t rsize = sizeof(ecp_seq_t) + sizeof(ecp_ack_t); - ecp_seq_t seq_start; - ecp_seq_t seq_max; - ecp_seq_t seq_ack; - ecp_ack_t ack_map; - unsigned short idx; - unsigned short msg_cnt; - int do_flush = 0; - int i; - int rv; - - if (msg_size < rsize) return ECP_ERR_SIZE; - - seq_ack = \ - ((ecp_seq_t)msg[0] << 24) | \ - ((ecp_seq_t)msg[1] << 16) | \ - ((ecp_seq_t)msg[2] << 8) | \ - ((ecp_seq_t)msg[3]); - ack_map = \ - ((ecp_ack_t)msg[4] << 24) | \ - ((ecp_ack_t)msg[5] << 16) | \ - ((ecp_ack_t)msg[6] << 8) | \ - ((ecp_ack_t)msg[7]); - -#ifdef ECP_WITH_PTHREAD - pthread_mutex_lock(&buf->mutex); -#endif - - seq_max = (buf->cnt_cc ? buf->seq_cc - 1 : rbuf->seq_max); - if (ECP_SEQ_LT(seq_max, seq_ack)) return ECP_ERR; - - if (buf->flags & ECP_RBUF_FLAG_RELIABLE) { - rv = _ecp_rbuf_msg_idx(rbuf, seq_ack, NULL); - if (rv) goto handle_ack_fin; - } - - if (buf->flags & ECP_RBUF_FLAG_CCONTROL) { - buf->in_transit = seq_max - seq_ack; - } - - if (ack_map != ECP_ACK_FULL) { - ecp_ack_t ack_mask = (ecp_ack_t)1 << (ECP_SIZE_ACKB - 1); - ecp_ack_t ack_map_nop = 0; - unsigned short nack_cnt = 0; - int nack_first = 0; - - seq_ack -= (ECP_SIZE_ACKB - 1); - if (buf->flags & ECP_RBUF_FLAG_RELIABLE) { - rv = _ecp_rbuf_msg_idx(rbuf, seq_ack, &idx); - if (rv) goto handle_ack_fin; - } - -#ifdef ECP_WITH_PTHREAD - pthread_mutex_unlock(&buf->mutex); -#endif - - for (i=0; iseq_nack, seq_ack)) { - nack_cnt++; - buf->seq_nack = seq_ack; - } - - if (buf->flags & ECP_RBUF_FLAG_RELIABLE) { - if (!(rbuf->arr.pkt[idx].flags & ECP_RBUF_FLAG_IN_RBUF) || (rbuf->arr.pkt[idx].flags & ECP_RBUF_FLAG_SKIP)) { - ack_map_nop |= ack_mask; - } else { - ECPBuffer packet; - - packet.buffer = rbuf->arr.pkt[idx].buf; - packet.size = ECP_MAX_PKT; - ecp_pkt_send(_conn->sock, &packet, rbuf->arr.pkt[idx].size, ECP_SEND_FLAG_MORE, NULL, &_conn->remote.addr); - if (buf->flags & ECP_RBUF_FLAG_CCONTROL) { - buf->in_transit++; - } - } - if (!nack_first) { - nack_first = 1; - seq_start = seq_ack; - } - } - } - seq_ack++; - ack_mask = ack_mask >> 1; - if (buf->flags & ECP_RBUF_FLAG_RELIABLE) idx = ECP_RBUF_IDX_MASK(idx + 1, rbuf->arr_size); - } - - if ((buf->flags & ECP_RBUF_FLAG_RELIABLE) && ack_map_nop) { - ECPBuffer packet; - ECPBuffer payload; - unsigned char pkt_buf[ECP_SIZE_PKT_BUF(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), ECP_MTYPE_RBNOP, _conn)]; - unsigned char pld_buf[ECP_SIZE_PLD_BUF(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), ECP_MTYPE_RBNOP, _conn)]; - unsigned char *_buf; - - packet.buffer = pkt_buf; - packet.size = sizeof(pkt_buf); - payload.buffer = pld_buf; - payload.size = sizeof(pld_buf); - - ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_RBNOP); - _buf = ecp_pld_get_msg(payload.buffer, payload.size); - - seq_ack--; - _buf[0] = seq_ack >> 24; - _buf[1] = seq_ack >> 16; - _buf[2] = seq_ack >> 8; - _buf[3] = seq_ack; - _buf[4] = ack_map_nop >> 24; - _buf[5] = ack_map_nop >> 16; - _buf[6] = ack_map_nop >> 8; - _buf[7] = ack_map_nop; - - ecp_pld_send(_conn, &packet, &payload, ECP_SIZE_PLD(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), ECP_MTYPE_RBNOP), ECP_SEND_FLAG_MORE); - } - -#ifdef ECP_WITH_PTHREAD - pthread_mutex_lock(&buf->mutex); -#endif - - buf->nack_rate = (buf->nack_rate * 7 + ((ECP_SIZE_ACKB - nack_cnt) * NACK_RATE_UNIT) / ECP_SIZE_ACKB) / 8; - } else { - buf->nack_rate = (buf->nack_rate * 7) / 8; - seq_start = seq_ack + 1; - } - - if (buf->flags & ECP_RBUF_FLAG_RELIABLE) { - msg_cnt = seq_start - rbuf->seq_start; - idx = rbuf->idx_start; - for (i=0; iarr.pkt[idx].flags = 0; - // if (rbuf->arr.pkt[idx].flags == 0); - idx = ECP_RBUF_IDX_MASK(idx + 1, rbuf->arr_size); - } - rbuf->seq_start = seq_start; - rbuf->idx_start = idx; - } - if (buf->flush) { - if (ECP_SEQ_LT(buf->seq_flush, rbuf->seq_start)) buf->flush = 0; - if (buf->flush) do_flush = 1; - } - if (buf->cnt_cc) cc_flush(conn, ECP_SEND_FLAG_MORE); - -handle_ack_fin: - -#ifdef ECP_WITH_PTHREAD - pthread_mutex_unlock(&buf->mutex); -#endif - - if (!rv && do_flush) { - ssize_t _rv; - - _rv = ecp_rbuf_send_flush(conn); - if (_rv < 0) rv = _rv; - } - - if (rv) return rv; - return rsize; -} - -int ecp_rbsend_create(ECPRBConn *conn, ECPRBSend *buf, ECPRBPacket *pkt, unsigned short pkt_size) { - ECPRBuffer *rbuf = &buf->rbuf; - int rv; - - memset(buf, 0, sizeof(ECPRBRecv)); - memset(pkt, 0, sizeof(ECPRBPacket) * pkt_size); - - rbuf->arr.pkt = pkt; - rbuf->arr_size = pkt_size; - -#ifdef ECP_WITH_PTHREAD - rv = pthread_mutex_init(&buf->mutex, NULL); - if (rv) return ECP_ERR; -#endif - - conn->send = buf; - return ECP_OK; -} - -void ecp_rbsend_destroy(ECPRBConn *conn) { - ECPRBSend *buf = conn->send; - -#ifdef ECP_WITH_PTHREAD - pthread_mutex_destroy(&buf->mutex); -#endif - - conn->send = NULL; -} - -void ecp_rbsend_start(ECPRBConn *conn, ecp_seq_t seq) { - ECPRBSend *buf = conn->send; - ECPRBuffer *rbuf = &buf->rbuf; - -#ifdef ECP_WITH_PTHREAD - pthread_mutex_lock(&buf->mutex); -#endif - - buf->start = 1; - buf->seq_nack = seq; - _ecp_rbuf_start(rbuf, seq); - -#ifdef ECP_WITH_PTHREAD - pthread_mutex_unlock(&buf->mutex); -#endif -} - -int ecp_rbuf_set_wsize(ECPRBConn *conn, ecp_win_t size) { - ECPRBSend *buf = conn->send; - -#ifdef ECP_WITH_PTHREAD - pthread_mutex_lock(&buf->mutex); -#endif - - buf->win_size = size; - if (buf->cnt_cc) cc_flush(conn, 0); - -#ifdef ECP_WITH_PTHREAD - pthread_mutex_unlock(&buf->mutex); -#endif - - return ECP_OK; -} - -int ecp_rbuf_flush(ECPRBConn *conn) { - ECPConnection *_conn = ecp_rbuf_get_conn(conn); - ECPRBSend *buf = conn->send; - ecp_seq_t seq; - ssize_t rv; - -#ifdef ECP_WITH_PTHREAD - pthread_mutex_lock(&_conn->mutex); -#endif - seq = (ecp_seq_t)(_conn->nonce_out) - 1; -#ifdef ECP_WITH_PTHREAD - pthread_mutex_unlock(&_conn->mutex); -#endif - -#ifdef ECP_WITH_PTHREAD - pthread_mutex_lock(&buf->mutex); -#endif - - if (buf->flush) { - if (ECP_SEQ_LT(buf->seq_flush, seq)) buf->seq_flush = seq; - } else { - buf->flush = 1; - buf->seq_flush = seq; - } - -#ifdef ECP_WITH_PTHREAD - pthread_mutex_unlock(&buf->mutex); -#endif - - rv = ecp_rbuf_send_flush(conn); - if (rv < 0) return rv; - - return ECP_OK; -} - -ssize_t ecp_rbuf_pld_send(ECPRBConn *conn, ECPBuffer *payload, size_t pld_size, ECPBuffer *packet, size_t pkt_size, ECPTimerItem *ti) { - ECPRBSend *buf = conn->send; - ECPRBuffer *rbuf = &buf->rbuf; - unsigned char mtype; - int rb_rel; - int rb_cc; - int do_send; - int do_skip; - int _rv = ECP_OK; - ssize_t rv = 0; - - _rv = ecp_pld_get_type(payload->buffer, pld_size, &mtype); - if (_rv) return _rv; - - do_send = 1; - do_skip = ecp_rbuf_skip(mtype); - if (ti && !do_skip) return ECP_ERR_RBUF_TIMER; - -#ifdef ECP_WITH_PTHREAD - pthread_mutex_lock(&buf->mutex); -#endif - - if (!buf->start) { - rv = 0; - goto pld_send_fin; - } - - rb_rel = (buf->flags & ECP_RBUF_FLAG_RELIABLE); - rb_cc = ((buf->flags & ECP_RBUF_FLAG_CCONTROL) && (buf->cnt_cc || (buf->in_transit >= buf->win_size))); - - if (rb_rel || rb_cc) { - ecp_seq_t seq; - unsigned short idx; - unsigned char _flags; - - _rv = ecp_pkt_get_seq(packet->buffer, pkt_size, &seq); - if (_rv) { - rv = _rv; - goto pld_send_fin; - } - - _rv = _ecp_rbuf_msg_idx(rbuf, seq, &idx); - if (_rv) rv = ECP_ERR_RBUF_DUP; - if (!rv && rbuf->arr.pkt[idx].flags) rv = ECP_ERR_RBUF_DUP; - if (rv) goto pld_send_fin; - - _flags = ECP_RBUF_FLAG_IN_RBUF; - if (do_skip) { - _flags |= ECP_RBUF_FLAG_SKIP; - } else { - do_send = 0; - } - - rbuf->arr.pkt[idx].flags = _flags; - if (!do_send) { - memcpy(rbuf->arr.pkt[idx].buf, packet->buffer, pkt_size); - rbuf->arr.pkt[idx].size = pkt_size; - rv = pld_size; - } - - if (ECP_SEQ_LT(rbuf->seq_max, seq)) rbuf->seq_max = seq; - - if (rb_cc && !do_send) { - if (buf->cnt_cc == 0) buf->seq_cc = seq; - buf->cnt_cc++; - } - } - - if ((buf->flags & ECP_RBUF_FLAG_CCONTROL) && !do_skip && do_send) { - buf->in_transit++; - } - -pld_send_fin: - -#ifdef ECP_WITH_PTHREAD - pthread_mutex_unlock(&buf->mutex); -#endif - - return rv; -} diff --git a/ecp/src/platform/posix/platform_obj.mk b/ecp/src/platform/posix/platform_obj.mk index 4157ff4..56a76c3 100644 --- a/ecp/src/platform/posix/platform_obj.mk +++ b/ecp/src/platform/posix/platform_obj.mk @@ -1,2 +1 @@ -obj += ext.o frag.o rbuf.o rbuf_send.o rbuf_recv.o msgq.o -subdirs += htable vconn +subdirs += htable vconn ext dir -- cgit v1.2.3