From 0967b98ce307a80a7576d3da0c939a757f4e6154 Mon Sep 17 00:00:00 2001
From: Uros Majstorovic <majstor@majstor.org>
Date: Tue, 18 Jul 2017 18:11:29 +0200
Subject: ring buffer fixes

---
 code/core/rbuf.h      | 10 ++++-----
 code/core/rbuf_recv.c | 57 +++++++++++++++++++++++++++++----------------------
 2 files changed, 38 insertions(+), 29 deletions(-)

(limited to 'code/core')

diff --git a/code/core/rbuf.h b/code/core/rbuf.h
index 9cf643d..7804602 100644
--- a/code/core/rbuf.h
+++ b/code/core/rbuf.h
@@ -1,5 +1,5 @@
-#define ECP_ERR_RBUF_IDX    -1
-#define ECP_ERR_RBUF_DUP    -1
+#define ECP_ERR_RBUF_IDX    -100
+#define ECP_ERR_RBUF_DUP    -101
 
 #define ECP_MAX_RBUF_MSGR    256
 
@@ -11,7 +11,7 @@ typedef struct ECPRBMessage {
     char present;
 } ECPRBMessage;
 
-typedef struct ECPRBuffer {
+typedef struct ECPRBRecvBuffer {
     unsigned char reliable;
     unsigned char deliver_delay;
     unsigned char hole_max;
@@ -23,11 +23,11 @@ typedef struct ECPRBuffer {
     ecp_ack_t hole_mask_full;
     ecp_ack_t hole_mask_empty;
     ECPRBMessage msg[ECP_MAX_RBUF_MSGR];
-} ECPRBuffer;
+} ECPRBRecvBuffer;
 
 
 typedef struct ECPConnRBuffer {
-    ECPRBuffer *recv;
+    ECPRBRecvBuffer *recv;
     // ECPSBuffer *send;
 } ECPConnRBuffer;
 
diff --git a/code/core/rbuf_recv.c b/code/core/rbuf_recv.c
index c2ab965..281bec8 100644
--- a/code/core/rbuf_recv.c
+++ b/code/core/rbuf_recv.c
@@ -15,31 +15,34 @@
 #define IDX_MASK(a)         (a % ECP_MAX_RBUF_MSGR)
 */
 
-static int msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq) {
+static int msg_idx(ECPRBRecvBuffer *rbuf, ecp_seq_t seq) {
     ecp_seq_t seq_offset = seq - rbuf->seq_start;
-        
+
+    // This also checks for seq_start <= seq if seq type range >> ECP_MAX_RBUF_MSGR
     if (seq_offset < ECP_MAX_RBUF_MSGR) return IDX_MASK(rbuf->msg_start + seq_offset);
     return ECP_ERR_RBUF_IDX;
 }
 
-static int msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, unsigned char *msg, size_t msg_size, unsigned char delivered) {
+ssize_t msg_store(ECPRBRecvBuffer *rbuf, ecp_seq_t seq, unsigned char *msg, size_t msg_size) {
     int idx;
     ecp_seq_t seq_offset = seq - rbuf->seq_start;
+
+    // This also checks for seq_start <= seq if seq type range >> ECP_MAX_RBUF_MSGR
     if (seq_offset >= ECP_MAX_RBUF_MSGR) return ECP_ERR_RBUF_IDX;
     
     if (SEQ_LT(rbuf->seq_max, seq)) rbuf->seq_max = seq;
-    if (delivered) return ECP_OK;
-    
     idx = msg_idx(rbuf, seq);
     if (idx < 0) return idx;
     if (rbuf->msg[idx].present) return ECP_ERR_RBUF_DUP;
     
     rbuf->msg[idx].present = 1;
+    memcpy(rbuf->msg[idx].msg, msg, msg_size);
+    rbuf->msg[idx].size = msg_size;
 
-    return ECP_OK;
+    return msg_size;
 }
 
-static int msg_flush(ECPConnection *conn, ECPRBuffer *rbuf) {
+static int msg_flush(ECPConnection *conn, ECPRBRecvBuffer *rbuf) {
     int idx = rbuf->msg_start;
     ecp_seq_t msg_cnt = rbuf->seq_max - rbuf->seq_start + 1;
     ecp_seq_t i = 0;
@@ -59,7 +62,7 @@ static int msg_flush(ECPConnection *conn, ECPRBuffer *rbuf) {
     return ECP_OK;
 }
 
-static int ack_shift(ECPRBuffer *rbuf) {
+static int ack_shift(ECPRBRecvBuffer *rbuf) {
     int do_ack = 0;
     int idx;
     int i;
@@ -99,8 +102,8 @@ static int ack_shift(ECPRBuffer *rbuf) {
     return do_ack;
 }
 
-int ecp_rbuf_recv_init(ECPRBuffer *rbuf, ecp_seq_t seq, unsigned char hole_max) {
-    memset(rbuf, 0, sizeof(ECPRBuffer));
+int ecp_rbuf_recv_init(ECPRBRecvBuffer *rbuf, ecp_seq_t seq, unsigned char hole_max) {
+    memset(rbuf, 0, sizeof(ECPRBRecvBuffer));
     rbuf->hole_max = hole_max;
     rbuf->hole_mask_full = ~(~((ecp_ack_t)0) << (hole_max * 2));
     rbuf->hole_mask_empty = ~(~((ecp_ack_t)0) << (hole_max + 1));
@@ -113,35 +116,41 @@ int ecp_rbuf_recv_init(ECPRBuffer *rbuf, ecp_seq_t seq, unsigned char hole_max)
 }
 
 ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) {
-    int rv;
+    ssize_t rv;
     int do_ack = 0;
-    ECPRBuffer *rbuf = conn->rbuf.recv;
+    ECPRBRecvBuffer *rbuf = conn->rbuf.recv;
     
     if (rbuf == NULL) return ECP_ERR;
     
     if (SEQ_LTE(seq, rbuf->seq_ack)) {
-        if ((ecp_seq_t)(rbuf->seq_ack - seq) < ACK_SIZE) {
-            rv = msg_store(rbuf, seq, msg, msg_size, 0);
-            if (rv) return rv;
+        ecp_seq_t seq_offset = rbuf->seq_ack - seq;
+        if (seq_offset < ACK_SIZE) {
+            ecp_ack_t ack_mask = ((ecp_ack_t)1 << seq_offset);
+
+            if (ack_mask & rbuf->ack_map) return ECP_ERR_RBUF_DUP;
+            
+            rv = msg_store(rbuf, seq, msg, msg_size);
+            if (rv < 0) return rv;
 
-            rbuf->ack_map |= (1 << (ecp_seq_t)(rbuf->seq_ack - seq));
+            rbuf->ack_map |= ack_mask;
             do_ack = ack_shift(rbuf);
         } else {
             return ECP_ERR_RBUF_IDX;
         }
     } else {
         if ((rbuf->ack_map == ACK_FULL) && (seq == (ecp_seq_t)(rbuf->seq_ack + 1))) {
-            rv = msg_store(rbuf, seq, msg, msg_size, !rbuf->deliver_delay);
-            if (rv) return rv;
-            
-            if (!rbuf->deliver_delay) {
+            if (rbuf->deliver_delay) {
+                rv = msg_store(rbuf, seq, msg, msg_size);
+                if (rv < 0) return rv;
+            } else {
+                // rv = deliver
+                rbuf->seq_max++;
                 rbuf->seq_start++;
-                // deliver
             }
             rbuf->seq_ack++;
         } else {
-            rv = msg_store(rbuf, seq, msg, msg_size, 0);
-            if (rv) return rv;
+            rv = msg_store(rbuf, seq, msg, msg_size);
+            if (rv < 0) return rv;
 
             do_ack = ack_shift(rbuf);
         }
@@ -150,5 +159,5 @@ ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *m
     // update do_ack for pps
     // should send acks more aggresively when ack_map is not full (freq > RTT)
     // now send acks as per do_ack
-    return msg_size;
+    return rv;
 }
\ No newline at end of file
-- 
cgit v1.2.3