From 50a392349a2e06ea5ff08e35cfb2558a1c97b993 Mon Sep 17 00:00:00 2001 From: Uros Majstorovic Date: Wed, 19 Jul 2017 15:19:54 +0200 Subject: ring buffer fixes --- code/core/Makefile | 5 +- code/core/rbuf.c | 31 +++++++++ code/core/rbuf.h | 60 ++++++++++++---- code/core/rbuf_recv.c | 187 +++++++++++++++++++++++++------------------------- code/core/rbuf_send.c | 11 +++ code/proxy/proxy.c | 3 +- 6 files changed, 189 insertions(+), 108 deletions(-) create mode 100644 code/core/rbuf.c create mode 100644 code/core/rbuf_send.c (limited to 'code') diff --git a/code/core/Makefile b/code/core/Makefile index 550c0cf..84090f1 100644 --- a/code/core/Makefile +++ b/code/core/Makefile @@ -1,10 +1,13 @@ MAKE=make CFLAGS = -I. -pthread -O3 $(PIC) -obj = core.o timer.o rbuf_recv.o +obj = core.o timer.o rbuf.o rbuf_send.o rbuf_recv.o subdirs = crypto posix htable +%.o: %.c + $(CC) $(CFLAGS) -c $< + %.o: %.c %.h $(CC) $(CFLAGS) -c $< diff --git a/code/core/rbuf.c b/code/core/rbuf.c new file mode 100644 index 0000000..54843fa --- /dev/null +++ b/code/core/rbuf.c @@ -0,0 +1,31 @@ +#include "core.h" + +#include + +int ecp_rbuf_init(ECPRBuffer *rbuf, ECPRBMessage *msg, unsigned int msg_size) { + rbuf->msg = msg; + rbuf->msg_size = msg_size; + + return ECP_OK; +} + +int ecp_rbuf_msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq) { + ecp_seq_t seq_offset = seq - rbuf->seq_start; + + // This also checks for seq_start <= seq if seq type range >> rbuf->msg_size + if (seq_offset < rbuf->msg_size) return ECP_RBUF_IDX_MASK(rbuf->msg_start + seq_offset, rbuf->msg_size); + return ECP_ERR_RBUF_IDX; +} + +ssize_t ecp_rbuf_msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, unsigned char *msg, size_t msg_size, unsigned char test_flags, unsigned char set_flags) { + int idx = ecp_rbuf_msg_idx(rbuf, seq); + if (idx < 0) return idx; + if (test_flags && (test_flags & rbuf->msg[idx].flags)) return ECP_ERR_RBUF_FLAG; + + memcpy(rbuf->msg[idx].msg, msg, msg_size); + rbuf->msg[idx].size = msg_size; + rbuf->msg[idx].flags = set_flags; + + return msg_size; +} + diff --git a/code/core/rbuf.h b/code/core/rbuf.h index 7804602..172f043 100644 --- a/code/core/rbuf.h +++ b/code/core/rbuf.h @@ -1,34 +1,66 @@ -#define ECP_ERR_RBUF_IDX -100 -#define ECP_ERR_RBUF_DUP -101 +#define ECP_RBUF_FLAG_PRESENT 1 +#define ECP_RBUF_SEQ_LT(a,b) ((ecp_seq_t)((ecp_seq_t)(a) - (ecp_seq_t)(b)) > SEQ_HALF) +#define ECP_RBUF_SEQ_LTE(a,b) ((ecp_seq_t)((ecp_seq_t)(b) - (ecp_seq_t)(a)) < SEQ_HALF) -#define ECP_MAX_RBUF_MSGR 256 +#define ECP_RBUF_IDX_MASK(idx, size) ((idx) & ((size) - 1)) +/* If size not 2^x: +#define ECP_RBUF_IDX_MASK(idx, size) ((idx) % (size)) +*/ + +#define ECP_ERR_RBUF_FLAG -100 +#define ECP_ERR_RBUF_IDX -101 +#define ECP_ERR_RBUF_DUP -102 typedef uint32_t ecp_ack_t; typedef struct ECPRBMessage { unsigned char msg[ECP_MAX_MSG]; ssize_t size; - char present; + unsigned char flags; } ECPRBMessage; -typedef struct ECPRBRecvBuffer { +typedef struct ECPRBuffer { + ecp_seq_t seq_start; + unsigned int msg_size; + unsigned int msg_start; + ECPRBMessage *msg; +} ECPRBuffer; + +typedef struct ECPRBRecv { unsigned char reliable; - unsigned char deliver_delay; - unsigned char hole_max; - int msg_start; + unsigned short deliver_delay; + unsigned short hole_max; + unsigned short ack_rate; ecp_seq_t seq_ack; ecp_seq_t seq_max; - ecp_seq_t seq_start; + ecp_seq_t ack_pkt; ecp_ack_t ack_map; ecp_ack_t hole_mask_full; ecp_ack_t hole_mask_empty; - ECPRBMessage msg[ECP_MAX_RBUF_MSGR]; -} ECPRBRecvBuffer; + ECPRBuffer rbuf; +} ECPRBRecv; +typedef struct ECPRBSend { + unsigned char reliable; + ECPRBuffer rbuf; +} ECPRBSend; typedef struct ECPConnRBuffer { - ECPRBRecvBuffer *recv; - // ECPSBuffer *send; + ECPRBRecv *recv; + ECPRBSend *send; } ECPConnRBuffer; -ssize_t ecp_rbuf_recv_store(struct ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size); \ No newline at end of file + +int ecp_rbuf_init(ECPRBuffer *rbuf, ECPRBMessage *msg, unsigned int msg_size); +int ecp_rbuf_msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq); +ssize_t ecp_rbuf_msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, unsigned char *msg, size_t msg_size, unsigned char test_flags, unsigned char set_flags); + +int ecp_rbuf_recv_create(ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size); +int ecp_rbuf_recv_start(ECPRBRecv *buf, ecp_seq_t seq); +int ecp_rbuf_recv_set_hole(ECPRBRecv *buf, unsigned short hole_max); +int ecp_rbuf_recv_set_delay(ECPRBRecv *buf, unsigned short delay); +ssize_t ecp_rbuf_recv_store(struct ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size); + + +ssize_t ecp_rbuf_send_store(struct ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size); + diff --git a/code/core/rbuf_recv.c b/code/core/rbuf_recv.c index 281bec8..a6db276 100644 --- a/code/core/rbuf_recv.c +++ b/code/core/rbuf_recv.c @@ -7,92 +7,65 @@ #define ACK_SIZE (sizeof(ecp_ack_t)*8) #define ACK_MASK_FIRST ((ecp_ack_t)1 << (ACK_SIZE - 1)) -#define SEQ_LT(a,b) ((ecp_seq_t)((ecp_seq_t)(a) - (ecp_seq_t)(b)) > SEQ_HALF) -#define SEQ_LTE(a,b) ((ecp_seq_t)((ecp_seq_t)(b) - (ecp_seq_t)(a)) < SEQ_HALF) - -#define IDX_MASK(a) (a & (ECP_MAX_RBUF_MSGR-1)) -/* If ECP_MAX_RBUF_MSGR not pow of 2: -#define IDX_MASK(a) (a % ECP_MAX_RBUF_MSGR) -*/ - -static int msg_idx(ECPRBRecvBuffer *rbuf, ecp_seq_t seq) { - ecp_seq_t seq_offset = seq - rbuf->seq_start; - - // This also checks for seq_start <= seq if seq type range >> ECP_MAX_RBUF_MSGR - if (seq_offset < ECP_MAX_RBUF_MSGR) return IDX_MASK(rbuf->msg_start + seq_offset); - return ECP_ERR_RBUF_IDX; -} - -ssize_t msg_store(ECPRBRecvBuffer *rbuf, ecp_seq_t seq, unsigned char *msg, size_t msg_size) { - int idx; - ecp_seq_t seq_offset = seq - rbuf->seq_start; - - // This also checks for seq_start <= seq if seq type range >> ECP_MAX_RBUF_MSGR - if (seq_offset >= ECP_MAX_RBUF_MSGR) return ECP_ERR_RBUF_IDX; +static ssize_t msg_store(ECPRBRecv *buf, ecp_seq_t seq, unsigned char *msg, size_t msg_size) { + ssize_t rv = ecp_rbuf_msg_store(&buf->rbuf, seq, msg, msg_size, ECP_RBUF_FLAG_PRESENT, ECP_RBUF_FLAG_PRESENT); + if (rv < 0) return ECP_ERR_RBUF_DUP; - if (SEQ_LT(rbuf->seq_max, seq)) rbuf->seq_max = seq; - idx = msg_idx(rbuf, seq); - if (idx < 0) return idx; - if (rbuf->msg[idx].present) return ECP_ERR_RBUF_DUP; - - rbuf->msg[idx].present = 1; - memcpy(rbuf->msg[idx].msg, msg, msg_size); - rbuf->msg[idx].size = msg_size; - - return msg_size; + if (ECP_RBUF_SEQ_LT(buf->seq_max, seq)) buf->seq_max = seq; + return rv; } -static int msg_flush(ECPConnection *conn, ECPRBRecvBuffer *rbuf) { - int idx = rbuf->msg_start; - ecp_seq_t msg_cnt = rbuf->seq_max - rbuf->seq_start + 1; +static void msg_flush(ECPConnection *conn) { + ECPRBRecv *buf = conn->rbuf.recv; + ecp_seq_t msg_cnt = buf->seq_max - buf->rbuf.seq_start + 1; ecp_seq_t i = 0; + unsigned int idx = buf->rbuf.msg_start; for (i=0; ireliable && !rbuf->msg[idx].present) break; - if (rbuf->deliver_delay && msg_cnt - i < rbuf->deliver_delay) break; - if (rbuf->msg[idx].present) { - rbuf->msg[idx].present = 0; - // deliver idx + if (buf->reliable && !(buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_PRESENT)) break; + if (buf->deliver_delay && msg_cnt - i < buf->deliver_delay) break; + if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_PRESENT) { + buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_PRESENT; + ecp_msg_handle(conn, buf->rbuf.seq_start + i, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size); } - idx = IDX_MASK(idx + 1); + idx = ECP_RBUF_IDX_MASK(idx + 1, buf->rbuf.msg_size); } - rbuf->msg_start = idx; - rbuf->seq_start += i; - - return ECP_OK; + buf->rbuf.msg_start = idx; + buf->rbuf.seq_start += i; } -static int ack_shift(ECPRBRecvBuffer *rbuf) { +static int ack_shift(ECPRBRecv *buf) { int do_ack = 0; int idx; int i; - if (rbuf->reliable && ((rbuf->ack_map & ACK_MASK_FIRST) == 0)) return 0; + if (buf->reliable && ((buf->ack_map & ACK_MASK_FIRST) == 0)) return 0; - idx = msg_idx(rbuf, rbuf->seq_ack); + idx = ecp_rbuf_msg_idx(&buf->rbuf, buf->seq_ack); if (idx < 0) return idx; - while (SEQ_LT(rbuf->seq_ack, rbuf->seq_max)) { - idx = IDX_MASK(idx + 1); - rbuf->seq_ack++; + while (ECP_RBUF_SEQ_LT(buf->seq_ack, buf->seq_max)) { + idx = ECP_RBUF_IDX_MASK(idx + 1, buf->rbuf.msg_size); + buf->seq_ack++; - if (rbuf->msg[idx].present && (rbuf->ack_map == ACK_FULL)) continue; + if ((buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_PRESENT) && (buf->ack_map == ACK_FULL)) continue; - rbuf->ack_map = rbuf->ack_map << 1; - rbuf->ack_map |= rbuf->msg[idx].present; - if (!do_ack && !rbuf->msg[idx].present && SEQ_LTE(rbuf->seq_ack, rbuf->seq_max - 2 * rbuf->hole_max)) { + buf->ack_map = buf->ack_map << 1; + if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_PRESENT) { + buf->ack_map |= 1; + } else if (!do_ack && ECP_RBUF_SEQ_LTE(buf->seq_ack, buf->seq_max - 2 * buf->hole_max)) { do_ack = 1; } - if ((rbuf->ack_map & ACK_MASK_FIRST) == 0) break; + if ((buf->ack_map & ACK_MASK_FIRST) == 0) break; } - if (!do_ack && (rbuf->seq_ack == rbuf->seq_max) && ((rbuf->ack_map & rbuf->hole_mask_full) != rbuf->hole_mask_full)) { - ecp_ack_t hole_mask = rbuf->ack_map; + if (!do_ack && (buf->seq_ack == buf->seq_max) && ((buf->ack_map & buf->hole_mask_full) != buf->hole_mask_full)) { + ecp_ack_t hole_mask = buf->ack_map; - for (i=0; ihole_max-1; i++) { + for (i=0; ihole_max-1; i++) { hole_mask = hole_mask >> 1; - if ((hole_mask & rbuf->hole_mask_empty) == 0) { + if ((hole_mask & buf->hole_mask_empty) == 0) { do_ack = 1; break; } @@ -102,62 +75,92 @@ static int ack_shift(ECPRBRecvBuffer *rbuf) { return do_ack; } -int ecp_rbuf_recv_init(ECPRBRecvBuffer *rbuf, ecp_seq_t seq, unsigned char hole_max) { - memset(rbuf, 0, sizeof(ECPRBRecvBuffer)); - rbuf->hole_max = hole_max; - rbuf->hole_mask_full = ~(~((ecp_ack_t)0) << (hole_max * 2)); - rbuf->hole_mask_empty = ~(~((ecp_ack_t)0) << (hole_max + 1)); - rbuf->seq_ack = seq; - rbuf->seq_max = seq; - rbuf->seq_start = seq + 1; - rbuf->ack_map = ACK_FULL; +int ecp_rbuf_recv_create(ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size) { + memset(buf, 0, sizeof(ECPRBRecv)); + memset(msg, 0, sizeof(ECPRBMessage) * msg_size); + ecp_rbuf_init(&buf->rbuf, msg, msg_size); + buf->ack_map = ACK_FULL; + + return ECP_OK; +} + +int ecp_rbuf_recv_start(ECPRBRecv *buf, ecp_seq_t seq) { + buf->seq_ack = seq; + buf->seq_max = seq; + buf->rbuf.seq_start = seq + 1; + + return ECP_OK; +} + +int ecp_rbuf_recv_set_hole(ECPRBRecv *buf, unsigned short hole_max) { + buf->hole_max = hole_max; + buf->hole_mask_full = ~(~((ecp_ack_t)0) << (hole_max * 2)); + buf->hole_mask_empty = ~(~((ecp_ack_t)0) << (hole_max + 1)); + + return ECP_OK; +} + +int ecp_rbuf_recv_set_delay(ECPRBRecv *buf, unsigned short delay) { + buf->deliver_delay = delay; + if (buf->hole_max < delay - 1) { + ecp_rbuf_recv_set_hole(buf, delay - 1); + } return ECP_OK; } ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) { ssize_t rv; - int do_ack = 0; - ECPRBRecvBuffer *rbuf = conn->rbuf.recv; + ecp_seq_t ack_pkt = 0; + int do_ack = 0; + ECPRBRecv *buf = conn->rbuf.recv; - if (rbuf == NULL) return ECP_ERR; + if (buf == NULL) return ECP_ERR; - if (SEQ_LTE(seq, rbuf->seq_ack)) { - ecp_seq_t seq_offset = rbuf->seq_ack - seq; + if (ECP_RBUF_SEQ_LT(buf->seq_max, seq)) ack_pkt = seq - buf->seq_max; + if (ECP_RBUF_SEQ_LTE(seq, buf->seq_ack)) { + ecp_seq_t seq_offset = buf->seq_ack - seq; if (seq_offset < ACK_SIZE) { ecp_ack_t ack_mask = ((ecp_ack_t)1 << seq_offset); - if (ack_mask & rbuf->ack_map) return ECP_ERR_RBUF_DUP; + if (ack_mask & buf->ack_map) return ECP_ERR_RBUF_DUP; - rv = msg_store(rbuf, seq, msg, msg_size); + rv = msg_store(buf, seq, msg, msg_size); if (rv < 0) return rv; - rbuf->ack_map |= ack_mask; - do_ack = ack_shift(rbuf); + buf->ack_map |= ack_mask; + do_ack = ack_shift(buf); } else { return ECP_ERR_RBUF_IDX; } } else { - if ((rbuf->ack_map == ACK_FULL) && (seq == (ecp_seq_t)(rbuf->seq_ack + 1))) { - if (rbuf->deliver_delay) { - rv = msg_store(rbuf, seq, msg, msg_size); + if ((buf->ack_map == ACK_FULL) && (seq == (ecp_seq_t)(buf->seq_ack + 1))) { + if (buf->deliver_delay) { + rv = msg_store(buf, seq, msg, msg_size); if (rv < 0) return rv; } else { - // rv = deliver - rbuf->seq_max++; - rbuf->seq_start++; + rv = ecp_msg_handle(conn, seq, msg, msg_size); + buf->seq_max++; + buf->rbuf.seq_start++; } - rbuf->seq_ack++; + buf->seq_ack++; } else { - rv = msg_store(rbuf, seq, msg, msg_size); + rv = msg_store(buf, seq, msg, msg_size); if (rv < 0) return rv; - do_ack = ack_shift(rbuf); + do_ack = ack_shift(buf); } } - // XXX - // update do_ack for pps - // should send acks more aggresively when ack_map is not full (freq > RTT) - // now send acks as per do_ack + if (ack_pkt && !do_ack) { + buf->ack_pkt += ack_pkt; + // should send acks more aggresively when reliable and ack_map is not full (rate ~ PPS * RTT) + if (buf->ack_pkt > buf->ack_rate) do_ack = 1; + } + if (do_ack) { + buf->ack_pkt = 0; + // send ack (with seq = 0) + } + // XXX should handle close + msg_flush(conn); return rv; -} \ No newline at end of file +} diff --git a/code/core/rbuf_send.c b/code/core/rbuf_send.c new file mode 100644 index 0000000..932ecdf --- /dev/null +++ b/code/core/rbuf_send.c @@ -0,0 +1,11 @@ +#include "core.h" + +#include + +static ssize_t handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size) { + return size; +} + +ssize_t ecp_rbuf_send_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) { + return ecp_rbuf_msg_store(&conn->rbuf.send->rbuf, seq, msg, msg_size, 0, 0); +} \ No newline at end of file diff --git a/code/proxy/proxy.c b/code/proxy/proxy.c index 0f90c20..ba21ca1 100644 --- a/code/proxy/proxy.c +++ b/code/proxy/proxy.c @@ -389,6 +389,7 @@ int ecp_proxy_init(ECPContext *ctx) { rv = ecp_conn_handler_init(&handler_b); if (rv) return rv; + handler_b.conn_create = proxyb_create; handler_b.conn_destroy = proxyb_destroy; handler_b.conn_open = proxyb_open; @@ -402,7 +403,7 @@ int ecp_proxy_init(ECPContext *ctx) { ctx->pr.pack_raw = proxy_pack_raw; #ifdef ECP_WITH_PTHREAD - pthread_mutex_init(&key_next_mutex, NULL); + pthread_mutex_init(&key_perma_mutex, NULL); pthread_mutex_init(&key_next_mutex, NULL); #endif -- cgit v1.2.3