summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorUros Majstorovic <majstor@majstor.org>2017-07-18 18:11:29 +0200
committerUros Majstorovic <majstor@majstor.org>2017-07-18 18:11:29 +0200
commit0967b98ce307a80a7576d3da0c939a757f4e6154 (patch)
treeb25c7000dc1c2d0d0281903c02d24e8cbf62be4b
parent8f53a56d06b128406cba3ce6f13696eb823e6a11 (diff)
ring buffer fixes
-rw-r--r--code/core/rbuf.h10
-rw-r--r--code/core/rbuf_recv.c57
2 files changed, 38 insertions, 29 deletions
diff --git a/code/core/rbuf.h b/code/core/rbuf.h
index 9cf643d..7804602 100644
--- a/code/core/rbuf.h
+++ b/code/core/rbuf.h
@@ -1,5 +1,5 @@
-#define ECP_ERR_RBUF_IDX -1
-#define ECP_ERR_RBUF_DUP -1
+#define ECP_ERR_RBUF_IDX -100
+#define ECP_ERR_RBUF_DUP -101
#define ECP_MAX_RBUF_MSGR 256
@@ -11,7 +11,7 @@ typedef struct ECPRBMessage {
char present;
} ECPRBMessage;
-typedef struct ECPRBuffer {
+typedef struct ECPRBRecvBuffer {
unsigned char reliable;
unsigned char deliver_delay;
unsigned char hole_max;
@@ -23,11 +23,11 @@ typedef struct ECPRBuffer {
ecp_ack_t hole_mask_full;
ecp_ack_t hole_mask_empty;
ECPRBMessage msg[ECP_MAX_RBUF_MSGR];
-} ECPRBuffer;
+} ECPRBRecvBuffer;
typedef struct ECPConnRBuffer {
- ECPRBuffer *recv;
+ ECPRBRecvBuffer *recv;
// ECPSBuffer *send;
} ECPConnRBuffer;
diff --git a/code/core/rbuf_recv.c b/code/core/rbuf_recv.c
index c2ab965..281bec8 100644
--- a/code/core/rbuf_recv.c
+++ b/code/core/rbuf_recv.c
@@ -15,31 +15,34 @@
#define IDX_MASK(a) (a % ECP_MAX_RBUF_MSGR)
*/
-static int msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq) {
+static int msg_idx(ECPRBRecvBuffer *rbuf, ecp_seq_t seq) {
ecp_seq_t seq_offset = seq - rbuf->seq_start;
-
+
+ // This also checks for seq_start <= seq if seq type range >> ECP_MAX_RBUF_MSGR
if (seq_offset < ECP_MAX_RBUF_MSGR) return IDX_MASK(rbuf->msg_start + seq_offset);
return ECP_ERR_RBUF_IDX;
}
-static int msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, unsigned char *msg, size_t msg_size, unsigned char delivered) {
+ssize_t msg_store(ECPRBRecvBuffer *rbuf, ecp_seq_t seq, unsigned char *msg, size_t msg_size) {
int idx;
ecp_seq_t seq_offset = seq - rbuf->seq_start;
+
+ // This also checks for seq_start <= seq if seq type range >> ECP_MAX_RBUF_MSGR
if (seq_offset >= ECP_MAX_RBUF_MSGR) return ECP_ERR_RBUF_IDX;
if (SEQ_LT(rbuf->seq_max, seq)) rbuf->seq_max = seq;
- if (delivered) return ECP_OK;
-
idx = msg_idx(rbuf, seq);
if (idx < 0) return idx;
if (rbuf->msg[idx].present) return ECP_ERR_RBUF_DUP;
rbuf->msg[idx].present = 1;
+ memcpy(rbuf->msg[idx].msg, msg, msg_size);
+ rbuf->msg[idx].size = msg_size;
- return ECP_OK;
+ return msg_size;
}
-static int msg_flush(ECPConnection *conn, ECPRBuffer *rbuf) {
+static int msg_flush(ECPConnection *conn, ECPRBRecvBuffer *rbuf) {
int idx = rbuf->msg_start;
ecp_seq_t msg_cnt = rbuf->seq_max - rbuf->seq_start + 1;
ecp_seq_t i = 0;
@@ -59,7 +62,7 @@ static int msg_flush(ECPConnection *conn, ECPRBuffer *rbuf) {
return ECP_OK;
}
-static int ack_shift(ECPRBuffer *rbuf) {
+static int ack_shift(ECPRBRecvBuffer *rbuf) {
int do_ack = 0;
int idx;
int i;
@@ -99,8 +102,8 @@ static int ack_shift(ECPRBuffer *rbuf) {
return do_ack;
}
-int ecp_rbuf_recv_init(ECPRBuffer *rbuf, ecp_seq_t seq, unsigned char hole_max) {
- memset(rbuf, 0, sizeof(ECPRBuffer));
+int ecp_rbuf_recv_init(ECPRBRecvBuffer *rbuf, ecp_seq_t seq, unsigned char hole_max) {
+ memset(rbuf, 0, sizeof(ECPRBRecvBuffer));
rbuf->hole_max = hole_max;
rbuf->hole_mask_full = ~(~((ecp_ack_t)0) << (hole_max * 2));
rbuf->hole_mask_empty = ~(~((ecp_ack_t)0) << (hole_max + 1));
@@ -113,35 +116,41 @@ int ecp_rbuf_recv_init(ECPRBuffer *rbuf, ecp_seq_t seq, unsigned char hole_max)
}
ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) {
- int rv;
+ ssize_t rv;
int do_ack = 0;
- ECPRBuffer *rbuf = conn->rbuf.recv;
+ ECPRBRecvBuffer *rbuf = conn->rbuf.recv;
if (rbuf == NULL) return ECP_ERR;
if (SEQ_LTE(seq, rbuf->seq_ack)) {
- if ((ecp_seq_t)(rbuf->seq_ack - seq) < ACK_SIZE) {
- rv = msg_store(rbuf, seq, msg, msg_size, 0);
- if (rv) return rv;
+ ecp_seq_t seq_offset = rbuf->seq_ack - seq;
+ if (seq_offset < ACK_SIZE) {
+ ecp_ack_t ack_mask = ((ecp_ack_t)1 << seq_offset);
+
+ if (ack_mask & rbuf->ack_map) return ECP_ERR_RBUF_DUP;
+
+ rv = msg_store(rbuf, seq, msg, msg_size);
+ if (rv < 0) return rv;
- rbuf->ack_map |= (1 << (ecp_seq_t)(rbuf->seq_ack - seq));
+ rbuf->ack_map |= ack_mask;
do_ack = ack_shift(rbuf);
} else {
return ECP_ERR_RBUF_IDX;
}
} else {
if ((rbuf->ack_map == ACK_FULL) && (seq == (ecp_seq_t)(rbuf->seq_ack + 1))) {
- rv = msg_store(rbuf, seq, msg, msg_size, !rbuf->deliver_delay);
- if (rv) return rv;
-
- if (!rbuf->deliver_delay) {
+ if (rbuf->deliver_delay) {
+ rv = msg_store(rbuf, seq, msg, msg_size);
+ if (rv < 0) return rv;
+ } else {
+ // rv = deliver
+ rbuf->seq_max++;
rbuf->seq_start++;
- // deliver
}
rbuf->seq_ack++;
} else {
- rv = msg_store(rbuf, seq, msg, msg_size, 0);
- if (rv) return rv;
+ rv = msg_store(rbuf, seq, msg, msg_size);
+ if (rv < 0) return rv;
do_ack = ack_shift(rbuf);
}
@@ -150,5 +159,5 @@ ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *m
// update do_ack for pps
// should send acks more aggresively when ack_map is not full (freq > RTT)
// now send acks as per do_ack
- return msg_size;
+ return rv;
} \ No newline at end of file