summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--code/core/Makefile2
-rw-r--r--code/core/config.h1
-rw-r--r--code/core/core.c16
-rw-r--r--code/core/core.h13
-rw-r--r--code/core/rbuf.h34
-rw-r--r--code/core/rbuf_recv.c154
-rw-r--r--code/proxy/proxy.c2
-rw-r--r--code/util/mknode.c1
8 files changed, 212 insertions, 11 deletions
diff --git a/code/core/Makefile b/code/core/Makefile
index f0e46a1..550c0cf 100644
--- a/code/core/Makefile
+++ b/code/core/Makefile
@@ -1,7 +1,7 @@
MAKE=make
CFLAGS = -I. -pthread -O3 $(PIC)
-obj = core.o timer.o msgq.o
+obj = core.o timer.o rbuf_recv.o
subdirs = crypto posix htable
diff --git a/code/core/config.h b/code/core/config.h
index 94efd29..7f9390f 100644
--- a/code/core/config.h
+++ b/code/core/config.h
@@ -1,3 +1,4 @@
#define ECP_WITH_PTHREAD 1
#define ECP_WITH_HTABLE 1
+#define ECP_WITH_RBUF 0
#define ECP_DEBUG 1 \ No newline at end of file
diff --git a/code/core/core.c b/code/core/core.c
index a4ccfc5..78b1028 100644
--- a/code/core/core.c
+++ b/code/core/core.c
@@ -426,20 +426,24 @@ int ecp_conn_create(ECPConnection *conn, ECPSocket *sock, unsigned char ctype) {
#ifdef ECP_WITH_PTHREAD
int rv = pthread_mutex_init(&conn->mutex, NULL);
if (rv) return ECP_ERR;
+#endif
+#ifdef ECP_WITH_MSGQ
rv = ecp_conn_msgq_create(conn);
if (rv) {
pthread_mutex_destroy(&conn->mutex);
return ECP_ERR;
}
#endif
-
+
return ECP_OK;
}
void ecp_conn_destroy(ECPConnection *conn) {
-#ifdef ECP_WITH_PTHREAD
+#ifdef ECP_WITH_MSGQ
ecp_conn_msgq_destroy(conn);
+#endif
+#ifdef ECP_WITH_PTHREAD
pthread_mutex_destroy(&conn->mutex);
#endif
}
@@ -1087,8 +1091,8 @@ ssize_t ecp_pkt_handle(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *proxy,
cnt_size = pld_size-ECP_SIZE_PLD_HDR;
-#ifdef WITH_RBUF
- if (conn->rbuf.recv) {
+#ifdef ECP_WITH_RBUF
+ if (conn->rbuf.recv) {
proc_size = ecp_msg_handle(conn, p_seq, payload+pld_size-cnt_size, cnt_size);
} else {
proc_size = ecp_rbuf_recv_store(conn, p_seq, payload+pld_size-cnt_size, cnt_size);
@@ -1102,6 +1106,8 @@ ssize_t ecp_pkt_handle(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *proxy,
#ifdef ECP_WITH_PTHREAD
pthread_mutex_lock(&conn->mutex);
+#endif
+#ifdef ECP_WITH_MSGQ
if (!rv && (cnt_size > 0)) {
proc_size = ecp_conn_msgq_push(conn, payload+pld_size-cnt_size, cnt_size);
if (proc_size < 0) rv = ECP_ERR_HANDLE;
@@ -1227,7 +1233,7 @@ ssize_t ecp_send(ECPConnection *conn, unsigned char *payload, size_t payload_siz
}
ssize_t ecp_receive(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, unsigned int timeout) {
-#ifdef ECP_WITH_PTHREAD
+#ifdef ECP_WITH_MSGQ
pthread_mutex_lock(&conn->mutex);
ssize_t rv = ecp_conn_msgq_pop(conn, mtype, msg, msg_size, timeout);
pthread_mutex_unlock(&conn->mutex);
diff --git a/code/core/core.h b/code/core/core.h
index 6c680f9..cb9bab6 100644
--- a/code/core/core.h
+++ b/code/core/core.h
@@ -70,7 +70,10 @@
#include "config.h"
#include <stddef.h>
+#include <stdint.h>
+
typedef long ssize_t;
+typedef uint32_t ecp_seq_t;
#ifdef ECP_WITH_PTHREAD
#include <pthread.h>
@@ -80,8 +83,8 @@ typedef long ssize_t;
#include "crypto/crypto.h"
#include "timer.h"
-#ifdef ECP_WITH_PTHREAD
-#include "msgq.h"
+#ifdef ECP_WITH_RBUF
+#include "rbuf.h"
#endif
#ifdef ECP_DEBUG
@@ -95,8 +98,6 @@ struct ECPContext;
struct ECPSocket;
struct ECPConnection;
-typedef uint32_t ecp_seq_t;
-
typedef int ecp_rng_t (void *, size_t);
typedef int ecp_conn_handler_new_t (struct ECPSocket *s, struct ECPConnection **c, struct ECPConnection *p, unsigned char s_idx, unsigned char c_idx, unsigned char *pub, ecp_aead_key_t *sh, unsigned char *msg, size_t sz);
@@ -250,8 +251,10 @@ typedef struct ECPConnection {
unsigned char key_idx_map[ECP_MAX_SOCK_KEY];
ECPDHShared shared[ECP_MAX_NODE_KEY][ECP_MAX_NODE_KEY];
unsigned char nonce[ECP_AEAD_SIZE_NONCE];
+#ifdef ECP_WITH_RBUF
+ ECPConnRBuffer rbuf;
+#endif
#ifdef ECP_WITH_PTHREAD
- ECPConnMsgQ msgq;
pthread_mutex_t mutex;
#endif
struct ECPConnection *proxy;
diff --git a/code/core/rbuf.h b/code/core/rbuf.h
new file mode 100644
index 0000000..9cf643d
--- /dev/null
+++ b/code/core/rbuf.h
@@ -0,0 +1,34 @@
+#define ECP_ERR_RBUF_IDX -1
+#define ECP_ERR_RBUF_DUP -1
+
+#define ECP_MAX_RBUF_MSGR 256
+
+typedef uint32_t ecp_ack_t;
+
+typedef struct ECPRBMessage {
+ unsigned char msg[ECP_MAX_MSG];
+ ssize_t size;
+ char present;
+} ECPRBMessage;
+
+typedef struct ECPRBuffer {
+ unsigned char reliable;
+ unsigned char deliver_delay;
+ unsigned char hole_max;
+ int msg_start;
+ ecp_seq_t seq_ack;
+ ecp_seq_t seq_max;
+ ecp_seq_t seq_start;
+ ecp_ack_t ack_map;
+ ecp_ack_t hole_mask_full;
+ ecp_ack_t hole_mask_empty;
+ ECPRBMessage msg[ECP_MAX_RBUF_MSGR];
+} ECPRBuffer;
+
+
+typedef struct ECPConnRBuffer {
+ ECPRBuffer *recv;
+ // ECPSBuffer *send;
+} ECPConnRBuffer;
+
+ssize_t ecp_rbuf_recv_store(struct ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size); \ No newline at end of file
diff --git a/code/core/rbuf_recv.c b/code/core/rbuf_recv.c
new file mode 100644
index 0000000..c2ab965
--- /dev/null
+++ b/code/core/rbuf_recv.c
@@ -0,0 +1,154 @@
+#include "core.h"
+
+#include <string.h>
+
+#define SEQ_HALF ((ecp_seq_t)1 << (sizeof(ecp_seq_t)*8-1))
+#define ACK_FULL (~(ecp_ack_t)0)
+#define ACK_SIZE (sizeof(ecp_ack_t)*8)
+#define ACK_MASK_FIRST ((ecp_ack_t)1 << (ACK_SIZE - 1))
+
+#define SEQ_LT(a,b) ((ecp_seq_t)((ecp_seq_t)(a) - (ecp_seq_t)(b)) > SEQ_HALF)
+#define SEQ_LTE(a,b) ((ecp_seq_t)((ecp_seq_t)(b) - (ecp_seq_t)(a)) < SEQ_HALF)
+
+#define IDX_MASK(a) (a & (ECP_MAX_RBUF_MSGR-1))
+/* If ECP_MAX_RBUF_MSGR not pow of 2:
+#define IDX_MASK(a) (a % ECP_MAX_RBUF_MSGR)
+*/
+
+static int msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq) {
+ ecp_seq_t seq_offset = seq - rbuf->seq_start;
+
+ if (seq_offset < ECP_MAX_RBUF_MSGR) return IDX_MASK(rbuf->msg_start + seq_offset);
+ return ECP_ERR_RBUF_IDX;
+}
+
+static int msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, unsigned char *msg, size_t msg_size, unsigned char delivered) {
+ int idx;
+ ecp_seq_t seq_offset = seq - rbuf->seq_start;
+ if (seq_offset >= ECP_MAX_RBUF_MSGR) return ECP_ERR_RBUF_IDX;
+
+ if (SEQ_LT(rbuf->seq_max, seq)) rbuf->seq_max = seq;
+ if (delivered) return ECP_OK;
+
+ idx = msg_idx(rbuf, seq);
+ if (idx < 0) return idx;
+ if (rbuf->msg[idx].present) return ECP_ERR_RBUF_DUP;
+
+ rbuf->msg[idx].present = 1;
+
+ return ECP_OK;
+}
+
+static int msg_flush(ECPConnection *conn, ECPRBuffer *rbuf) {
+ int idx = rbuf->msg_start;
+ ecp_seq_t msg_cnt = rbuf->seq_max - rbuf->seq_start + 1;
+ ecp_seq_t i = 0;
+
+ for (i=0; i<msg_cnt; i++) {
+ if (rbuf->reliable && !rbuf->msg[idx].present) break;
+ if (rbuf->deliver_delay && msg_cnt - i < rbuf->deliver_delay) break;
+ if (rbuf->msg[idx].present) {
+ rbuf->msg[idx].present = 0;
+ // deliver idx
+ }
+ idx = IDX_MASK(idx + 1);
+ }
+ rbuf->msg_start = idx;
+ rbuf->seq_start += i;
+
+ return ECP_OK;
+}
+
+static int ack_shift(ECPRBuffer *rbuf) {
+ int do_ack = 0;
+ int idx;
+ int i;
+
+ if (rbuf->reliable && ((rbuf->ack_map & ACK_MASK_FIRST) == 0)) return 0;
+
+ idx = msg_idx(rbuf, rbuf->seq_ack);
+ if (idx < 0) return idx;
+
+ while (SEQ_LT(rbuf->seq_ack, rbuf->seq_max)) {
+ idx = IDX_MASK(idx + 1);
+ rbuf->seq_ack++;
+
+ if (rbuf->msg[idx].present && (rbuf->ack_map == ACK_FULL)) continue;
+
+ rbuf->ack_map = rbuf->ack_map << 1;
+ rbuf->ack_map |= rbuf->msg[idx].present;
+ if (!do_ack && !rbuf->msg[idx].present && SEQ_LTE(rbuf->seq_ack, rbuf->seq_max - 2 * rbuf->hole_max)) {
+ do_ack = 1;
+ }
+
+ if ((rbuf->ack_map & ACK_MASK_FIRST) == 0) break;
+ }
+
+ if (!do_ack && (rbuf->seq_ack == rbuf->seq_max) && ((rbuf->ack_map & rbuf->hole_mask_full) != rbuf->hole_mask_full)) {
+ ecp_ack_t hole_mask = rbuf->ack_map;
+
+ for (i=0; i<rbuf->hole_max-1; i++) {
+ hole_mask = hole_mask >> 1;
+ if ((hole_mask & rbuf->hole_mask_empty) == 0) {
+ do_ack = 1;
+ break;
+ }
+ }
+ }
+
+ return do_ack;
+}
+
+int ecp_rbuf_recv_init(ECPRBuffer *rbuf, ecp_seq_t seq, unsigned char hole_max) {
+ memset(rbuf, 0, sizeof(ECPRBuffer));
+ rbuf->hole_max = hole_max;
+ rbuf->hole_mask_full = ~(~((ecp_ack_t)0) << (hole_max * 2));
+ rbuf->hole_mask_empty = ~(~((ecp_ack_t)0) << (hole_max + 1));
+ rbuf->seq_ack = seq;
+ rbuf->seq_max = seq;
+ rbuf->seq_start = seq + 1;
+ rbuf->ack_map = ACK_FULL;
+
+ return ECP_OK;
+}
+
+ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) {
+ int rv;
+ int do_ack = 0;
+ ECPRBuffer *rbuf = conn->rbuf.recv;
+
+ if (rbuf == NULL) return ECP_ERR;
+
+ if (SEQ_LTE(seq, rbuf->seq_ack)) {
+ if ((ecp_seq_t)(rbuf->seq_ack - seq) < ACK_SIZE) {
+ rv = msg_store(rbuf, seq, msg, msg_size, 0);
+ if (rv) return rv;
+
+ rbuf->ack_map |= (1 << (ecp_seq_t)(rbuf->seq_ack - seq));
+ do_ack = ack_shift(rbuf);
+ } else {
+ return ECP_ERR_RBUF_IDX;
+ }
+ } else {
+ if ((rbuf->ack_map == ACK_FULL) && (seq == (ecp_seq_t)(rbuf->seq_ack + 1))) {
+ rv = msg_store(rbuf, seq, msg, msg_size, !rbuf->deliver_delay);
+ if (rv) return rv;
+
+ if (!rbuf->deliver_delay) {
+ rbuf->seq_start++;
+ // deliver
+ }
+ rbuf->seq_ack++;
+ } else {
+ rv = msg_store(rbuf, seq, msg, msg_size, 0);
+ if (rv) return rv;
+
+ do_ack = ack_shift(rbuf);
+ }
+ }
+ // XXX
+ // update do_ack for pps
+ // should send acks more aggresively when ack_map is not full (freq > RTT)
+ // now send acks as per do_ack
+ return msg_size;
+} \ No newline at end of file
diff --git a/code/proxy/proxy.c b/code/proxy/proxy.c
index 6e84dc3..0f90c20 100644
--- a/code/proxy/proxy.c
+++ b/code/proxy/proxy.c
@@ -1,6 +1,8 @@
#include "core.h"
#include "proxy.h"
+#include <string.h>
+
#ifdef ECP_WITH_PTHREAD
static pthread_mutex_t key_perma_mutex;
static pthread_mutex_t key_next_mutex;
diff --git a/code/util/mknode.c b/code/util/mknode.c
index 8f40e90..1c0a83a 100644
--- a/code/util/mknode.c
+++ b/code/util/mknode.c
@@ -2,6 +2,7 @@
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
+#include <string.h>
#include "core.h"
#include "util.h"