From 8f53a56d06b128406cba3ce6f13696eb823e6a11 Mon Sep 17 00:00:00 2001 From: Uros Majstorovic Date: Mon, 17 Jul 2017 22:17:51 +0200 Subject: ring buffer initial commit --- code/core/Makefile | 2 +- code/core/config.h | 1 + code/core/core.c | 16 ++++-- code/core/core.h | 13 +++-- code/core/rbuf.h | 34 +++++++++++ code/core/rbuf_recv.c | 154 ++++++++++++++++++++++++++++++++++++++++++++++++++ code/proxy/proxy.c | 2 + code/util/mknode.c | 1 + 8 files changed, 212 insertions(+), 11 deletions(-) create mode 100644 code/core/rbuf.h create mode 100644 code/core/rbuf_recv.c diff --git a/code/core/Makefile b/code/core/Makefile index f0e46a1..550c0cf 100644 --- a/code/core/Makefile +++ b/code/core/Makefile @@ -1,7 +1,7 @@ MAKE=make CFLAGS = -I. -pthread -O3 $(PIC) -obj = core.o timer.o msgq.o +obj = core.o timer.o rbuf_recv.o subdirs = crypto posix htable diff --git a/code/core/config.h b/code/core/config.h index 94efd29..7f9390f 100644 --- a/code/core/config.h +++ b/code/core/config.h @@ -1,3 +1,4 @@ #define ECP_WITH_PTHREAD 1 #define ECP_WITH_HTABLE 1 +#define ECP_WITH_RBUF 0 #define ECP_DEBUG 1 \ No newline at end of file diff --git a/code/core/core.c b/code/core/core.c index a4ccfc5..78b1028 100644 --- a/code/core/core.c +++ b/code/core/core.c @@ -426,20 +426,24 @@ int ecp_conn_create(ECPConnection *conn, ECPSocket *sock, unsigned char ctype) { #ifdef ECP_WITH_PTHREAD int rv = pthread_mutex_init(&conn->mutex, NULL); if (rv) return ECP_ERR; +#endif +#ifdef ECP_WITH_MSGQ rv = ecp_conn_msgq_create(conn); if (rv) { pthread_mutex_destroy(&conn->mutex); return ECP_ERR; } #endif - + return ECP_OK; } void ecp_conn_destroy(ECPConnection *conn) { -#ifdef ECP_WITH_PTHREAD +#ifdef ECP_WITH_MSGQ ecp_conn_msgq_destroy(conn); +#endif +#ifdef ECP_WITH_PTHREAD pthread_mutex_destroy(&conn->mutex); #endif } @@ -1087,8 +1091,8 @@ ssize_t ecp_pkt_handle(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *proxy, cnt_size = pld_size-ECP_SIZE_PLD_HDR; -#ifdef WITH_RBUF - if (conn->rbuf.recv) { +#ifdef ECP_WITH_RBUF + if (conn->rbuf.recv) { proc_size = ecp_msg_handle(conn, p_seq, payload+pld_size-cnt_size, cnt_size); } else { proc_size = ecp_rbuf_recv_store(conn, p_seq, payload+pld_size-cnt_size, cnt_size); @@ -1102,6 +1106,8 @@ ssize_t ecp_pkt_handle(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *proxy, #ifdef ECP_WITH_PTHREAD pthread_mutex_lock(&conn->mutex); +#endif +#ifdef ECP_WITH_MSGQ if (!rv && (cnt_size > 0)) { proc_size = ecp_conn_msgq_push(conn, payload+pld_size-cnt_size, cnt_size); if (proc_size < 0) rv = ECP_ERR_HANDLE; @@ -1227,7 +1233,7 @@ ssize_t ecp_send(ECPConnection *conn, unsigned char *payload, size_t payload_siz } ssize_t ecp_receive(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, unsigned int timeout) { -#ifdef ECP_WITH_PTHREAD +#ifdef ECP_WITH_MSGQ pthread_mutex_lock(&conn->mutex); ssize_t rv = ecp_conn_msgq_pop(conn, mtype, msg, msg_size, timeout); pthread_mutex_unlock(&conn->mutex); diff --git a/code/core/core.h b/code/core/core.h index 6c680f9..cb9bab6 100644 --- a/code/core/core.h +++ b/code/core/core.h @@ -70,7 +70,10 @@ #include "config.h" #include +#include + typedef long ssize_t; +typedef uint32_t ecp_seq_t; #ifdef ECP_WITH_PTHREAD #include @@ -80,8 +83,8 @@ typedef long ssize_t; #include "crypto/crypto.h" #include "timer.h" -#ifdef ECP_WITH_PTHREAD -#include "msgq.h" +#ifdef ECP_WITH_RBUF +#include "rbuf.h" #endif #ifdef ECP_DEBUG @@ -95,8 +98,6 @@ struct ECPContext; struct ECPSocket; struct ECPConnection; -typedef uint32_t ecp_seq_t; - typedef int ecp_rng_t (void *, size_t); typedef int ecp_conn_handler_new_t (struct ECPSocket *s, struct ECPConnection **c, struct ECPConnection *p, unsigned char s_idx, unsigned char c_idx, unsigned char *pub, ecp_aead_key_t *sh, unsigned char *msg, size_t sz); @@ -250,8 +251,10 @@ typedef struct ECPConnection { unsigned char key_idx_map[ECP_MAX_SOCK_KEY]; ECPDHShared shared[ECP_MAX_NODE_KEY][ECP_MAX_NODE_KEY]; unsigned char nonce[ECP_AEAD_SIZE_NONCE]; +#ifdef ECP_WITH_RBUF + ECPConnRBuffer rbuf; +#endif #ifdef ECP_WITH_PTHREAD - ECPConnMsgQ msgq; pthread_mutex_t mutex; #endif struct ECPConnection *proxy; diff --git a/code/core/rbuf.h b/code/core/rbuf.h new file mode 100644 index 0000000..9cf643d --- /dev/null +++ b/code/core/rbuf.h @@ -0,0 +1,34 @@ +#define ECP_ERR_RBUF_IDX -1 +#define ECP_ERR_RBUF_DUP -1 + +#define ECP_MAX_RBUF_MSGR 256 + +typedef uint32_t ecp_ack_t; + +typedef struct ECPRBMessage { + unsigned char msg[ECP_MAX_MSG]; + ssize_t size; + char present; +} ECPRBMessage; + +typedef struct ECPRBuffer { + unsigned char reliable; + unsigned char deliver_delay; + unsigned char hole_max; + int msg_start; + ecp_seq_t seq_ack; + ecp_seq_t seq_max; + ecp_seq_t seq_start; + ecp_ack_t ack_map; + ecp_ack_t hole_mask_full; + ecp_ack_t hole_mask_empty; + ECPRBMessage msg[ECP_MAX_RBUF_MSGR]; +} ECPRBuffer; + + +typedef struct ECPConnRBuffer { + ECPRBuffer *recv; + // ECPSBuffer *send; +} ECPConnRBuffer; + +ssize_t ecp_rbuf_recv_store(struct ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size); \ No newline at end of file diff --git a/code/core/rbuf_recv.c b/code/core/rbuf_recv.c new file mode 100644 index 0000000..c2ab965 --- /dev/null +++ b/code/core/rbuf_recv.c @@ -0,0 +1,154 @@ +#include "core.h" + +#include + +#define SEQ_HALF ((ecp_seq_t)1 << (sizeof(ecp_seq_t)*8-1)) +#define ACK_FULL (~(ecp_ack_t)0) +#define ACK_SIZE (sizeof(ecp_ack_t)*8) +#define ACK_MASK_FIRST ((ecp_ack_t)1 << (ACK_SIZE - 1)) + +#define SEQ_LT(a,b) ((ecp_seq_t)((ecp_seq_t)(a) - (ecp_seq_t)(b)) > SEQ_HALF) +#define SEQ_LTE(a,b) ((ecp_seq_t)((ecp_seq_t)(b) - (ecp_seq_t)(a)) < SEQ_HALF) + +#define IDX_MASK(a) (a & (ECP_MAX_RBUF_MSGR-1)) +/* If ECP_MAX_RBUF_MSGR not pow of 2: +#define IDX_MASK(a) (a % ECP_MAX_RBUF_MSGR) +*/ + +static int msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq) { + ecp_seq_t seq_offset = seq - rbuf->seq_start; + + if (seq_offset < ECP_MAX_RBUF_MSGR) return IDX_MASK(rbuf->msg_start + seq_offset); + return ECP_ERR_RBUF_IDX; +} + +static int msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, unsigned char *msg, size_t msg_size, unsigned char delivered) { + int idx; + ecp_seq_t seq_offset = seq - rbuf->seq_start; + if (seq_offset >= ECP_MAX_RBUF_MSGR) return ECP_ERR_RBUF_IDX; + + if (SEQ_LT(rbuf->seq_max, seq)) rbuf->seq_max = seq; + if (delivered) return ECP_OK; + + idx = msg_idx(rbuf, seq); + if (idx < 0) return idx; + if (rbuf->msg[idx].present) return ECP_ERR_RBUF_DUP; + + rbuf->msg[idx].present = 1; + + return ECP_OK; +} + +static int msg_flush(ECPConnection *conn, ECPRBuffer *rbuf) { + int idx = rbuf->msg_start; + ecp_seq_t msg_cnt = rbuf->seq_max - rbuf->seq_start + 1; + ecp_seq_t i = 0; + + for (i=0; ireliable && !rbuf->msg[idx].present) break; + if (rbuf->deliver_delay && msg_cnt - i < rbuf->deliver_delay) break; + if (rbuf->msg[idx].present) { + rbuf->msg[idx].present = 0; + // deliver idx + } + idx = IDX_MASK(idx + 1); + } + rbuf->msg_start = idx; + rbuf->seq_start += i; + + return ECP_OK; +} + +static int ack_shift(ECPRBuffer *rbuf) { + int do_ack = 0; + int idx; + int i; + + if (rbuf->reliable && ((rbuf->ack_map & ACK_MASK_FIRST) == 0)) return 0; + + idx = msg_idx(rbuf, rbuf->seq_ack); + if (idx < 0) return idx; + + while (SEQ_LT(rbuf->seq_ack, rbuf->seq_max)) { + idx = IDX_MASK(idx + 1); + rbuf->seq_ack++; + + if (rbuf->msg[idx].present && (rbuf->ack_map == ACK_FULL)) continue; + + rbuf->ack_map = rbuf->ack_map << 1; + rbuf->ack_map |= rbuf->msg[idx].present; + if (!do_ack && !rbuf->msg[idx].present && SEQ_LTE(rbuf->seq_ack, rbuf->seq_max - 2 * rbuf->hole_max)) { + do_ack = 1; + } + + if ((rbuf->ack_map & ACK_MASK_FIRST) == 0) break; + } + + if (!do_ack && (rbuf->seq_ack == rbuf->seq_max) && ((rbuf->ack_map & rbuf->hole_mask_full) != rbuf->hole_mask_full)) { + ecp_ack_t hole_mask = rbuf->ack_map; + + for (i=0; ihole_max-1; i++) { + hole_mask = hole_mask >> 1; + if ((hole_mask & rbuf->hole_mask_empty) == 0) { + do_ack = 1; + break; + } + } + } + + return do_ack; +} + +int ecp_rbuf_recv_init(ECPRBuffer *rbuf, ecp_seq_t seq, unsigned char hole_max) { + memset(rbuf, 0, sizeof(ECPRBuffer)); + rbuf->hole_max = hole_max; + rbuf->hole_mask_full = ~(~((ecp_ack_t)0) << (hole_max * 2)); + rbuf->hole_mask_empty = ~(~((ecp_ack_t)0) << (hole_max + 1)); + rbuf->seq_ack = seq; + rbuf->seq_max = seq; + rbuf->seq_start = seq + 1; + rbuf->ack_map = ACK_FULL; + + return ECP_OK; +} + +ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) { + int rv; + int do_ack = 0; + ECPRBuffer *rbuf = conn->rbuf.recv; + + if (rbuf == NULL) return ECP_ERR; + + if (SEQ_LTE(seq, rbuf->seq_ack)) { + if ((ecp_seq_t)(rbuf->seq_ack - seq) < ACK_SIZE) { + rv = msg_store(rbuf, seq, msg, msg_size, 0); + if (rv) return rv; + + rbuf->ack_map |= (1 << (ecp_seq_t)(rbuf->seq_ack - seq)); + do_ack = ack_shift(rbuf); + } else { + return ECP_ERR_RBUF_IDX; + } + } else { + if ((rbuf->ack_map == ACK_FULL) && (seq == (ecp_seq_t)(rbuf->seq_ack + 1))) { + rv = msg_store(rbuf, seq, msg, msg_size, !rbuf->deliver_delay); + if (rv) return rv; + + if (!rbuf->deliver_delay) { + rbuf->seq_start++; + // deliver + } + rbuf->seq_ack++; + } else { + rv = msg_store(rbuf, seq, msg, msg_size, 0); + if (rv) return rv; + + do_ack = ack_shift(rbuf); + } + } + // XXX + // update do_ack for pps + // should send acks more aggresively when ack_map is not full (freq > RTT) + // now send acks as per do_ack + return msg_size; +} \ No newline at end of file diff --git a/code/proxy/proxy.c b/code/proxy/proxy.c index 6e84dc3..0f90c20 100644 --- a/code/proxy/proxy.c +++ b/code/proxy/proxy.c @@ -1,6 +1,8 @@ #include "core.h" #include "proxy.h" +#include + #ifdef ECP_WITH_PTHREAD static pthread_mutex_t key_perma_mutex; static pthread_mutex_t key_next_mutex; diff --git a/code/util/mknode.c b/code/util/mknode.c index 8f40e90..1c0a83a 100644 --- a/code/util/mknode.c +++ b/code/util/mknode.c @@ -2,6 +2,7 @@ #include #include #include +#include #include "core.h" #include "util.h" -- cgit v1.2.3