summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorUros Majstorovic <majstor@majstor.org>2017-07-21 00:33:35 +0200
committerUros Majstorovic <majstor@majstor.org>2017-07-21 00:33:35 +0200
commitd580fc0e00bf4b7acd3f9ee3345fad32223b40db (patch)
treef1b128e938ea37a437d1ec05686e5abc2b724238
parent50a392349a2e06ea5ff08e35cfb2558a1c97b993 (diff)
send ring buffer implementation; not done
-rw-r--r--code/core/core.c1
-rw-r--r--code/core/rbuf.c8
-rw-r--r--code/core/rbuf.h26
-rw-r--r--code/core/rbuf_recv.c16
-rw-r--r--code/core/rbuf_send.c64
5 files changed, 96 insertions, 19 deletions
diff --git a/code/core/core.c b/code/core/core.c
index 78b1028..812e48c 100644
--- a/code/core/core.c
+++ b/code/core/core.c
@@ -1214,6 +1214,7 @@ ssize_t ecp_pld_send_wkey(ECPConnection *conn, unsigned char s_idx, unsigned cha
rv = ecp_proxy_pack(conn, &addr, packet, ECP_MAX_PKT, s_idx, c_idx, payload, payload_size);
if (rv < 0) return rv;
+ // XXX insert ecp_rbuf_send_store
return ecp_pkt_send(sock, &addr, packet, rv);
}
diff --git a/code/core/rbuf.c b/code/core/rbuf.c
index 54843fa..d6c9c6e 100644
--- a/code/core/rbuf.c
+++ b/code/core/rbuf.c
@@ -4,7 +4,11 @@
int ecp_rbuf_init(ECPRBuffer *rbuf, ECPRBMessage *msg, unsigned int msg_size) {
rbuf->msg = msg;
- rbuf->msg_size = msg_size;
+ if (msg_size) {
+ rbuf->msg_size = msg_size;
+ } else {
+ rbuf->msg_size = ECP_RBUF_SEQ_HALF;
+ }
return ECP_OK;
}
@@ -20,6 +24,8 @@ int ecp_rbuf_msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq) {
ssize_t ecp_rbuf_msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, unsigned char *msg, size_t msg_size, unsigned char test_flags, unsigned char set_flags) {
int idx = ecp_rbuf_msg_idx(rbuf, seq);
if (idx < 0) return idx;
+
+ if (rbuf->msg == NULL) return 0;
if (test_flags && (test_flags & rbuf->msg[idx].flags)) return ECP_ERR_RBUF_FLAG;
memcpy(rbuf->msg[idx].msg, msg, msg_size);
diff --git a/code/core/rbuf.h b/code/core/rbuf.h
index 172f043..3577211 100644
--- a/code/core/rbuf.h
+++ b/code/core/rbuf.h
@@ -1,20 +1,27 @@
#define ECP_RBUF_FLAG_PRESENT 1
-#define ECP_RBUF_SEQ_LT(a,b) ((ecp_seq_t)((ecp_seq_t)(a) - (ecp_seq_t)(b)) > SEQ_HALF)
-#define ECP_RBUF_SEQ_LTE(a,b) ((ecp_seq_t)((ecp_seq_t)(b) - (ecp_seq_t)(a)) < SEQ_HALF)
-
-#define ECP_RBUF_IDX_MASK(idx, size) ((idx) & ((size) - 1))
-/* If size not 2^x:
-#define ECP_RBUF_IDX_MASK(idx, size) ((idx) % (size))
-*/
#define ECP_ERR_RBUF_FLAG -100
#define ECP_ERR_RBUF_IDX -101
#define ECP_ERR_RBUF_DUP -102
typedef uint32_t ecp_ack_t;
+typedef uint32_t ecp_win_t;
+
+#define ECP_RBUF_SEQ_HALF ((ecp_seq_t)1 << (sizeof(ecp_seq_t)*8-1))
+
+#define ECP_RBUF_ACK_FULL (~(ecp_ack_t)0)
+#define ECP_RBUF_ACK_SIZE (sizeof(ecp_ack_t)*8)
+
+#define ECP_RBUF_SEQ_LT(a,b) ((ecp_seq_t)((ecp_seq_t)(a) - (ecp_seq_t)(b)) > ECP_RBUF_SEQ_HALF)
+#define ECP_RBUF_SEQ_LTE(a,b) ((ecp_seq_t)((ecp_seq_t)(b) - (ecp_seq_t)(a)) < ECP_RBUF_SEQ_HALF)
+
+#define ECP_RBUF_IDX_MASK(idx, size) ((idx) & ((size) - 1))
+/* If size not 2^x:
+#define ECP_RBUF_IDX_MASK(idx, size) ((idx) % (size))
+*/
typedef struct ECPRBMessage {
- unsigned char msg[ECP_MAX_MSG];
+ unsigned char msg[ECP_MAX_PKT];
ssize_t size;
unsigned char flags;
} ECPRBMessage;
@@ -42,6 +49,9 @@ typedef struct ECPRBRecv {
typedef struct ECPRBSend {
unsigned char reliable;
+ ecp_win_t win_size;
+ ecp_win_t in_transit;
+ unsigned int nack_rate;
ECPRBuffer rbuf;
} ECPRBSend;
diff --git a/code/core/rbuf_recv.c b/code/core/rbuf_recv.c
index a6db276..9b96be2 100644
--- a/code/core/rbuf_recv.c
+++ b/code/core/rbuf_recv.c
@@ -2,10 +2,8 @@
#include <string.h>
-#define SEQ_HALF ((ecp_seq_t)1 << (sizeof(ecp_seq_t)*8-1))
-#define ACK_FULL (~(ecp_ack_t)0)
-#define ACK_SIZE (sizeof(ecp_ack_t)*8)
-#define ACK_MASK_FIRST ((ecp_ack_t)1 << (ACK_SIZE - 1))
+#define ACK_RATE 8
+#define ACK_MASK_FIRST ((ecp_ack_t)1 << (ECP_RBUF_ACK_SIZE - 1))
static ssize_t msg_store(ECPRBRecv *buf, ecp_seq_t seq, unsigned char *msg, size_t msg_size) {
ssize_t rv = ecp_rbuf_msg_store(&buf->rbuf, seq, msg, msg_size, ECP_RBUF_FLAG_PRESENT, ECP_RBUF_FLAG_PRESENT);
@@ -48,7 +46,7 @@ static int ack_shift(ECPRBRecv *buf) {
idx = ECP_RBUF_IDX_MASK(idx + 1, buf->rbuf.msg_size);
buf->seq_ack++;
- if ((buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_PRESENT) && (buf->ack_map == ACK_FULL)) continue;
+ if ((buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_PRESENT) && (buf->ack_map == ECP_RBUF_ACK_FULL)) continue;
buf->ack_map = buf->ack_map << 1;
if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_PRESENT) {
@@ -79,8 +77,8 @@ int ecp_rbuf_recv_create(ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_siz
memset(buf, 0, sizeof(ECPRBRecv));
memset(msg, 0, sizeof(ECPRBMessage) * msg_size);
ecp_rbuf_init(&buf->rbuf, msg, msg_size);
- buf->ack_map = ACK_FULL;
-
+ buf->ack_map = ECP_RBUF_ACK_FULL;
+ buf->ack_rate = ACK_RATE;
return ECP_OK;
}
@@ -120,7 +118,7 @@ ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *m
if (ECP_RBUF_SEQ_LT(buf->seq_max, seq)) ack_pkt = seq - buf->seq_max;
if (ECP_RBUF_SEQ_LTE(seq, buf->seq_ack)) {
ecp_seq_t seq_offset = buf->seq_ack - seq;
- if (seq_offset < ACK_SIZE) {
+ if (seq_offset < ECP_RBUF_ACK_SIZE) {
ecp_ack_t ack_mask = ((ecp_ack_t)1 << seq_offset);
if (ack_mask & buf->ack_map) return ECP_ERR_RBUF_DUP;
@@ -134,7 +132,7 @@ ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *m
return ECP_ERR_RBUF_IDX;
}
} else {
- if ((buf->ack_map == ACK_FULL) && (seq == (ecp_seq_t)(buf->seq_ack + 1))) {
+ if ((buf->ack_map == ECP_RBUF_ACK_FULL) && (seq == (ecp_seq_t)(buf->seq_ack + 1))) {
if (buf->deliver_delay) {
rv = msg_store(buf, seq, msg, msg_size);
if (rv < 0) return rv;
diff --git a/code/core/rbuf_send.c b/code/core/rbuf_send.c
index 932ecdf..8f0cc38 100644
--- a/code/core/rbuf_send.c
+++ b/code/core/rbuf_send.c
@@ -2,10 +2,72 @@
#include <string.h>
+#define ACK_RATE_UNIT 10000
+
static ssize_t handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size) {
+ int idx = 0;
+ ecp_seq_t seq_ack = 0;
+ ecp_ack_t ack_map = 0;
+
+ ECPRBSend *buf = conn->rbuf.send;
+ ECPRBuffer *rbuf = &buf->rbuf;
+
+ if (ack_map != ECP_RBUF_ACK_FULL) {
+ int i;
+ int nack = 0;
+ ecp_win_t ack_cnt = 0;
+
+ seq_ack -= ECP_RBUF_ACK_SIZE - 1;
+
+ idx = ecp_rbuf_msg_idx(rbuf, seq_ack);
+ if (idx < 0) return idx;
+
+ if (buf->reliable) buf->in_transit -= seq_ack - rbuf->seq_start;
+ for (i=0; i<ECP_RBUF_ACK_SIZE; i++) {
+ if (ack_map & ((ecp_ack_t)1 << (ECP_RBUF_ACK_SIZE - i - 1))) {
+ ack_cnt++;
+ } else if (buf->reliable) {
+ // if resend packet (seq_ack + i)
+ if (!nack) {
+ nack = 1;
+
+ rbuf->seq_start = seq_ack + i;
+ rbuf->msg_start = ECP_RBUF_IDX_MASK(idx + i, rbuf->msg_size);
+ }
+ }
+ }
+ if (buf->reliable) {
+ buf->in_transit -= ack_cnt;
+ } else {
+ rbuf->seq_start = seq_ack + ECP_RBUF_ACK_SIZE;
+ buf->nack_rate = (buf->nack_rate * 7 + ((ECP_RBUF_ACK_SIZE - ack_cnt) * ACK_RATE_UNIT) / ECP_RBUF_ACK_SIZE) / 8;
+ }
+ } else {
+ seq_ack++;
+
+ idx = ecp_rbuf_msg_idx(rbuf, seq_ack);
+ if (idx < 0) return idx;
+
+ rbuf->seq_start = seq_ack;
+ if (buf->reliable) {
+ buf->in_transit -= seq_ack - rbuf->seq_start;
+ rbuf->msg_start = idx;
+ } else {
+ buf->nack_rate = (buf->nack_rate * 7) / 8;
+ }
+ }
return size;
}
+int ecp_rbuf_send_create(ECPRBSend *buf, ECPRBMessage *msg, unsigned int msg_size) {
+ memset(buf, 0, sizeof(ECPRBRecv));
+ memset(msg, 0, sizeof(ECPRBMessage) * msg_size);
+ ecp_rbuf_init(&buf->rbuf, msg, msg_size);
+ return ECP_OK;
+}
+
ssize_t ecp_rbuf_send_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) {
- return ecp_rbuf_msg_store(&conn->rbuf.send->rbuf, seq, msg, msg_size, 0, 0);
+ ECPRBSend *buf = conn->rbuf.send;
+ if (buf->reliable) buf->in_transit++;
+ return ecp_rbuf_msg_store(&buf->rbuf, seq, msg, msg_size, 0, 0);
} \ No newline at end of file