summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorUros Majstorovic <majstor@majstor.org>2017-07-19 15:19:54 +0200
committerUros Majstorovic <majstor@majstor.org>2017-07-19 15:19:54 +0200
commit50a392349a2e06ea5ff08e35cfb2558a1c97b993 (patch)
tree5139b68b7b27e290e4ae88ea4e5b127b724af55a
parent0967b98ce307a80a7576d3da0c939a757f4e6154 (diff)
ring buffer fixes
-rw-r--r--code/core/Makefile5
-rw-r--r--code/core/rbuf.c31
-rw-r--r--code/core/rbuf.h60
-rw-r--r--code/core/rbuf_recv.c187
-rw-r--r--code/core/rbuf_send.c11
-rw-r--r--code/proxy/proxy.c3
6 files changed, 189 insertions, 108 deletions
diff --git a/code/core/Makefile b/code/core/Makefile
index 550c0cf..84090f1 100644
--- a/code/core/Makefile
+++ b/code/core/Makefile
@@ -1,10 +1,13 @@
MAKE=make
CFLAGS = -I. -pthread -O3 $(PIC)
-obj = core.o timer.o rbuf_recv.o
+obj = core.o timer.o rbuf.o rbuf_send.o rbuf_recv.o
subdirs = crypto posix htable
+%.o: %.c
+ $(CC) $(CFLAGS) -c $<
+
%.o: %.c %.h
$(CC) $(CFLAGS) -c $<
diff --git a/code/core/rbuf.c b/code/core/rbuf.c
new file mode 100644
index 0000000..54843fa
--- /dev/null
+++ b/code/core/rbuf.c
@@ -0,0 +1,31 @@
+#include "core.h"
+
+#include <string.h>
+
+int ecp_rbuf_init(ECPRBuffer *rbuf, ECPRBMessage *msg, unsigned int msg_size) {
+ rbuf->msg = msg;
+ rbuf->msg_size = msg_size;
+
+ return ECP_OK;
+}
+
+int ecp_rbuf_msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq) {
+ ecp_seq_t seq_offset = seq - rbuf->seq_start;
+
+ // This also checks for seq_start <= seq if seq type range >> rbuf->msg_size
+ if (seq_offset < rbuf->msg_size) return ECP_RBUF_IDX_MASK(rbuf->msg_start + seq_offset, rbuf->msg_size);
+ return ECP_ERR_RBUF_IDX;
+}
+
+ssize_t ecp_rbuf_msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, unsigned char *msg, size_t msg_size, unsigned char test_flags, unsigned char set_flags) {
+ int idx = ecp_rbuf_msg_idx(rbuf, seq);
+ if (idx < 0) return idx;
+ if (test_flags && (test_flags & rbuf->msg[idx].flags)) return ECP_ERR_RBUF_FLAG;
+
+ memcpy(rbuf->msg[idx].msg, msg, msg_size);
+ rbuf->msg[idx].size = msg_size;
+ rbuf->msg[idx].flags = set_flags;
+
+ return msg_size;
+}
+
diff --git a/code/core/rbuf.h b/code/core/rbuf.h
index 7804602..172f043 100644
--- a/code/core/rbuf.h
+++ b/code/core/rbuf.h
@@ -1,34 +1,66 @@
-#define ECP_ERR_RBUF_IDX -100
-#define ECP_ERR_RBUF_DUP -101
+#define ECP_RBUF_FLAG_PRESENT 1
+#define ECP_RBUF_SEQ_LT(a,b) ((ecp_seq_t)((ecp_seq_t)(a) - (ecp_seq_t)(b)) > SEQ_HALF)
+#define ECP_RBUF_SEQ_LTE(a,b) ((ecp_seq_t)((ecp_seq_t)(b) - (ecp_seq_t)(a)) < SEQ_HALF)
-#define ECP_MAX_RBUF_MSGR 256
+#define ECP_RBUF_IDX_MASK(idx, size) ((idx) & ((size) - 1))
+/* If size not 2^x:
+#define ECP_RBUF_IDX_MASK(idx, size) ((idx) % (size))
+*/
+
+#define ECP_ERR_RBUF_FLAG -100
+#define ECP_ERR_RBUF_IDX -101
+#define ECP_ERR_RBUF_DUP -102
typedef uint32_t ecp_ack_t;
typedef struct ECPRBMessage {
unsigned char msg[ECP_MAX_MSG];
ssize_t size;
- char present;
+ unsigned char flags;
} ECPRBMessage;
-typedef struct ECPRBRecvBuffer {
+typedef struct ECPRBuffer {
+ ecp_seq_t seq_start;
+ unsigned int msg_size;
+ unsigned int msg_start;
+ ECPRBMessage *msg;
+} ECPRBuffer;
+
+typedef struct ECPRBRecv {
unsigned char reliable;
- unsigned char deliver_delay;
- unsigned char hole_max;
- int msg_start;
+ unsigned short deliver_delay;
+ unsigned short hole_max;
+ unsigned short ack_rate;
ecp_seq_t seq_ack;
ecp_seq_t seq_max;
- ecp_seq_t seq_start;
+ ecp_seq_t ack_pkt;
ecp_ack_t ack_map;
ecp_ack_t hole_mask_full;
ecp_ack_t hole_mask_empty;
- ECPRBMessage msg[ECP_MAX_RBUF_MSGR];
-} ECPRBRecvBuffer;
+ ECPRBuffer rbuf;
+} ECPRBRecv;
+typedef struct ECPRBSend {
+ unsigned char reliable;
+ ECPRBuffer rbuf;
+} ECPRBSend;
typedef struct ECPConnRBuffer {
- ECPRBRecvBuffer *recv;
- // ECPSBuffer *send;
+ ECPRBRecv *recv;
+ ECPRBSend *send;
} ECPConnRBuffer;
-ssize_t ecp_rbuf_recv_store(struct ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size); \ No newline at end of file
+
+int ecp_rbuf_init(ECPRBuffer *rbuf, ECPRBMessage *msg, unsigned int msg_size);
+int ecp_rbuf_msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq);
+ssize_t ecp_rbuf_msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, unsigned char *msg, size_t msg_size, unsigned char test_flags, unsigned char set_flags);
+
+int ecp_rbuf_recv_create(ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size);
+int ecp_rbuf_recv_start(ECPRBRecv *buf, ecp_seq_t seq);
+int ecp_rbuf_recv_set_hole(ECPRBRecv *buf, unsigned short hole_max);
+int ecp_rbuf_recv_set_delay(ECPRBRecv *buf, unsigned short delay);
+ssize_t ecp_rbuf_recv_store(struct ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size);
+
+
+ssize_t ecp_rbuf_send_store(struct ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size);
+
diff --git a/code/core/rbuf_recv.c b/code/core/rbuf_recv.c
index 281bec8..a6db276 100644
--- a/code/core/rbuf_recv.c
+++ b/code/core/rbuf_recv.c
@@ -7,92 +7,65 @@
#define ACK_SIZE (sizeof(ecp_ack_t)*8)
#define ACK_MASK_FIRST ((ecp_ack_t)1 << (ACK_SIZE - 1))
-#define SEQ_LT(a,b) ((ecp_seq_t)((ecp_seq_t)(a) - (ecp_seq_t)(b)) > SEQ_HALF)
-#define SEQ_LTE(a,b) ((ecp_seq_t)((ecp_seq_t)(b) - (ecp_seq_t)(a)) < SEQ_HALF)
-
-#define IDX_MASK(a) (a & (ECP_MAX_RBUF_MSGR-1))
-/* If ECP_MAX_RBUF_MSGR not pow of 2:
-#define IDX_MASK(a) (a % ECP_MAX_RBUF_MSGR)
-*/
-
-static int msg_idx(ECPRBRecvBuffer *rbuf, ecp_seq_t seq) {
- ecp_seq_t seq_offset = seq - rbuf->seq_start;
-
- // This also checks for seq_start <= seq if seq type range >> ECP_MAX_RBUF_MSGR
- if (seq_offset < ECP_MAX_RBUF_MSGR) return IDX_MASK(rbuf->msg_start + seq_offset);
- return ECP_ERR_RBUF_IDX;
-}
-
-ssize_t msg_store(ECPRBRecvBuffer *rbuf, ecp_seq_t seq, unsigned char *msg, size_t msg_size) {
- int idx;
- ecp_seq_t seq_offset = seq - rbuf->seq_start;
-
- // This also checks for seq_start <= seq if seq type range >> ECP_MAX_RBUF_MSGR
- if (seq_offset >= ECP_MAX_RBUF_MSGR) return ECP_ERR_RBUF_IDX;
+static ssize_t msg_store(ECPRBRecv *buf, ecp_seq_t seq, unsigned char *msg, size_t msg_size) {
+ ssize_t rv = ecp_rbuf_msg_store(&buf->rbuf, seq, msg, msg_size, ECP_RBUF_FLAG_PRESENT, ECP_RBUF_FLAG_PRESENT);
+ if (rv < 0) return ECP_ERR_RBUF_DUP;
- if (SEQ_LT(rbuf->seq_max, seq)) rbuf->seq_max = seq;
- idx = msg_idx(rbuf, seq);
- if (idx < 0) return idx;
- if (rbuf->msg[idx].present) return ECP_ERR_RBUF_DUP;
-
- rbuf->msg[idx].present = 1;
- memcpy(rbuf->msg[idx].msg, msg, msg_size);
- rbuf->msg[idx].size = msg_size;
-
- return msg_size;
+ if (ECP_RBUF_SEQ_LT(buf->seq_max, seq)) buf->seq_max = seq;
+ return rv;
}
-static int msg_flush(ECPConnection *conn, ECPRBRecvBuffer *rbuf) {
- int idx = rbuf->msg_start;
- ecp_seq_t msg_cnt = rbuf->seq_max - rbuf->seq_start + 1;
+static void msg_flush(ECPConnection *conn) {
+ ECPRBRecv *buf = conn->rbuf.recv;
+ ecp_seq_t msg_cnt = buf->seq_max - buf->rbuf.seq_start + 1;
ecp_seq_t i = 0;
+ unsigned int idx = buf->rbuf.msg_start;
for (i=0; i<msg_cnt; i++) {
- if (rbuf->reliable && !rbuf->msg[idx].present) break;
- if (rbuf->deliver_delay && msg_cnt - i < rbuf->deliver_delay) break;
- if (rbuf->msg[idx].present) {
- rbuf->msg[idx].present = 0;
- // deliver idx
+ if (buf->reliable && !(buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_PRESENT)) break;
+ if (buf->deliver_delay && msg_cnt - i < buf->deliver_delay) break;
+ if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_PRESENT) {
+ buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_PRESENT;
+ ecp_msg_handle(conn, buf->rbuf.seq_start + i, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size);
}
- idx = IDX_MASK(idx + 1);
+ idx = ECP_RBUF_IDX_MASK(idx + 1, buf->rbuf.msg_size);
}
- rbuf->msg_start = idx;
- rbuf->seq_start += i;
-
- return ECP_OK;
+ buf->rbuf.msg_start = idx;
+ buf->rbuf.seq_start += i;
}
-static int ack_shift(ECPRBRecvBuffer *rbuf) {
+static int ack_shift(ECPRBRecv *buf) {
int do_ack = 0;
int idx;
int i;
- if (rbuf->reliable && ((rbuf->ack_map & ACK_MASK_FIRST) == 0)) return 0;
+ if (buf->reliable && ((buf->ack_map & ACK_MASK_FIRST) == 0)) return 0;
- idx = msg_idx(rbuf, rbuf->seq_ack);
+ idx = ecp_rbuf_msg_idx(&buf->rbuf, buf->seq_ack);
if (idx < 0) return idx;
- while (SEQ_LT(rbuf->seq_ack, rbuf->seq_max)) {
- idx = IDX_MASK(idx + 1);
- rbuf->seq_ack++;
+ while (ECP_RBUF_SEQ_LT(buf->seq_ack, buf->seq_max)) {
+ idx = ECP_RBUF_IDX_MASK(idx + 1, buf->rbuf.msg_size);
+ buf->seq_ack++;
- if (rbuf->msg[idx].present && (rbuf->ack_map == ACK_FULL)) continue;
+ if ((buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_PRESENT) && (buf->ack_map == ACK_FULL)) continue;
- rbuf->ack_map = rbuf->ack_map << 1;
- rbuf->ack_map |= rbuf->msg[idx].present;
- if (!do_ack && !rbuf->msg[idx].present && SEQ_LTE(rbuf->seq_ack, rbuf->seq_max - 2 * rbuf->hole_max)) {
+ buf->ack_map = buf->ack_map << 1;
+ if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_PRESENT) {
+ buf->ack_map |= 1;
+ } else if (!do_ack && ECP_RBUF_SEQ_LTE(buf->seq_ack, buf->seq_max - 2 * buf->hole_max)) {
do_ack = 1;
}
- if ((rbuf->ack_map & ACK_MASK_FIRST) == 0) break;
+ if ((buf->ack_map & ACK_MASK_FIRST) == 0) break;
}
- if (!do_ack && (rbuf->seq_ack == rbuf->seq_max) && ((rbuf->ack_map & rbuf->hole_mask_full) != rbuf->hole_mask_full)) {
- ecp_ack_t hole_mask = rbuf->ack_map;
+ if (!do_ack && (buf->seq_ack == buf->seq_max) && ((buf->ack_map & buf->hole_mask_full) != buf->hole_mask_full)) {
+ ecp_ack_t hole_mask = buf->ack_map;
- for (i=0; i<rbuf->hole_max-1; i++) {
+ for (i=0; i<buf->hole_max-1; i++) {
hole_mask = hole_mask >> 1;
- if ((hole_mask & rbuf->hole_mask_empty) == 0) {
+ if ((hole_mask & buf->hole_mask_empty) == 0) {
do_ack = 1;
break;
}
@@ -102,62 +75,92 @@ static int ack_shift(ECPRBRecvBuffer *rbuf) {
return do_ack;
}
-int ecp_rbuf_recv_init(ECPRBRecvBuffer *rbuf, ecp_seq_t seq, unsigned char hole_max) {
- memset(rbuf, 0, sizeof(ECPRBRecvBuffer));
- rbuf->hole_max = hole_max;
- rbuf->hole_mask_full = ~(~((ecp_ack_t)0) << (hole_max * 2));
- rbuf->hole_mask_empty = ~(~((ecp_ack_t)0) << (hole_max + 1));
- rbuf->seq_ack = seq;
- rbuf->seq_max = seq;
- rbuf->seq_start = seq + 1;
- rbuf->ack_map = ACK_FULL;
+int ecp_rbuf_recv_create(ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size) {
+ memset(buf, 0, sizeof(ECPRBRecv));
+ memset(msg, 0, sizeof(ECPRBMessage) * msg_size);
+ ecp_rbuf_init(&buf->rbuf, msg, msg_size);
+ buf->ack_map = ACK_FULL;
+
+ return ECP_OK;
+}
+
+int ecp_rbuf_recv_start(ECPRBRecv *buf, ecp_seq_t seq) {
+ buf->seq_ack = seq;
+ buf->seq_max = seq;
+ buf->rbuf.seq_start = seq + 1;
+
+ return ECP_OK;
+}
+
+int ecp_rbuf_recv_set_hole(ECPRBRecv *buf, unsigned short hole_max) {
+ buf->hole_max = hole_max;
+ buf->hole_mask_full = ~(~((ecp_ack_t)0) << (hole_max * 2));
+ buf->hole_mask_empty = ~(~((ecp_ack_t)0) << (hole_max + 1));
+
+ return ECP_OK;
+}
+
+int ecp_rbuf_recv_set_delay(ECPRBRecv *buf, unsigned short delay) {
+ buf->deliver_delay = delay;
+ if (buf->hole_max < delay - 1) {
+ ecp_rbuf_recv_set_hole(buf, delay - 1);
+ }
return ECP_OK;
}
ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) {
ssize_t rv;
- int do_ack = 0;
- ECPRBRecvBuffer *rbuf = conn->rbuf.recv;
+ ecp_seq_t ack_pkt = 0;
+ int do_ack = 0;
+ ECPRBRecv *buf = conn->rbuf.recv;
- if (rbuf == NULL) return ECP_ERR;
+ if (buf == NULL) return ECP_ERR;
- if (SEQ_LTE(seq, rbuf->seq_ack)) {
- ecp_seq_t seq_offset = rbuf->seq_ack - seq;
+ if (ECP_RBUF_SEQ_LT(buf->seq_max, seq)) ack_pkt = seq - buf->seq_max;
+ if (ECP_RBUF_SEQ_LTE(seq, buf->seq_ack)) {
+ ecp_seq_t seq_offset = buf->seq_ack - seq;
if (seq_offset < ACK_SIZE) {
ecp_ack_t ack_mask = ((ecp_ack_t)1 << seq_offset);
- if (ack_mask & rbuf->ack_map) return ECP_ERR_RBUF_DUP;
+ if (ack_mask & buf->ack_map) return ECP_ERR_RBUF_DUP;
- rv = msg_store(rbuf, seq, msg, msg_size);
+ rv = msg_store(buf, seq, msg, msg_size);
if (rv < 0) return rv;
- rbuf->ack_map |= ack_mask;
- do_ack = ack_shift(rbuf);
+ buf->ack_map |= ack_mask;
+ do_ack = ack_shift(buf);
} else {
return ECP_ERR_RBUF_IDX;
}
} else {
- if ((rbuf->ack_map == ACK_FULL) && (seq == (ecp_seq_t)(rbuf->seq_ack + 1))) {
- if (rbuf->deliver_delay) {
- rv = msg_store(rbuf, seq, msg, msg_size);
+ if ((buf->ack_map == ACK_FULL) && (seq == (ecp_seq_t)(buf->seq_ack + 1))) {
+ if (buf->deliver_delay) {
+ rv = msg_store(buf, seq, msg, msg_size);
if (rv < 0) return rv;
} else {
- // rv = deliver
- rbuf->seq_max++;
- rbuf->seq_start++;
+ rv = ecp_msg_handle(conn, seq, msg, msg_size);
+ buf->seq_max++;
+ buf->rbuf.seq_start++;
}
- rbuf->seq_ack++;
+ buf->seq_ack++;
} else {
- rv = msg_store(rbuf, seq, msg, msg_size);
+ rv = msg_store(buf, seq, msg, msg_size);
if (rv < 0) return rv;
- do_ack = ack_shift(rbuf);
+ do_ack = ack_shift(buf);
}
}
- // XXX
- // update do_ack for pps
- // should send acks more aggresively when ack_map is not full (freq > RTT)
- // now send acks as per do_ack
+ if (ack_pkt && !do_ack) {
+ buf->ack_pkt += ack_pkt;
+ // should send acks more aggresively when reliable and ack_map is not full (rate ~ PPS * RTT)
+ if (buf->ack_pkt > buf->ack_rate) do_ack = 1;
+ }
+ if (do_ack) {
+ buf->ack_pkt = 0;
+ // send ack (with seq = 0)
+ }
+ // XXX should handle close
+ msg_flush(conn);
return rv;
-} \ No newline at end of file
+}
diff --git a/code/core/rbuf_send.c b/code/core/rbuf_send.c
new file mode 100644
index 0000000..932ecdf
--- /dev/null
+++ b/code/core/rbuf_send.c
@@ -0,0 +1,11 @@
+#include "core.h"
+
+#include <string.h>
+
+static ssize_t handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size) {
+ return size;
+}
+
+ssize_t ecp_rbuf_send_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) {
+ return ecp_rbuf_msg_store(&conn->rbuf.send->rbuf, seq, msg, msg_size, 0, 0);
+} \ No newline at end of file
diff --git a/code/proxy/proxy.c b/code/proxy/proxy.c
index 0f90c20..ba21ca1 100644
--- a/code/proxy/proxy.c
+++ b/code/proxy/proxy.c
@@ -389,6 +389,7 @@ int ecp_proxy_init(ECPContext *ctx) {
rv = ecp_conn_handler_init(&handler_b);
if (rv) return rv;
+
handler_b.conn_create = proxyb_create;
handler_b.conn_destroy = proxyb_destroy;
handler_b.conn_open = proxyb_open;
@@ -402,7 +403,7 @@ int ecp_proxy_init(ECPContext *ctx) {
ctx->pr.pack_raw = proxy_pack_raw;
#ifdef ECP_WITH_PTHREAD
- pthread_mutex_init(&key_next_mutex, NULL);
+ pthread_mutex_init(&key_perma_mutex, NULL);
pthread_mutex_init(&key_next_mutex, NULL);
#endif