From 0967b98ce307a80a7576d3da0c939a757f4e6154 Mon Sep 17 00:00:00 2001 From: Uros Majstorovic Date: Tue, 18 Jul 2017 18:11:29 +0200 Subject: ring buffer fixes --- code/core/rbuf.h | 10 ++++----- code/core/rbuf_recv.c | 57 +++++++++++++++++++++++++++++---------------------- 2 files changed, 38 insertions(+), 29 deletions(-) (limited to 'code') diff --git a/code/core/rbuf.h b/code/core/rbuf.h index 9cf643d..7804602 100644 --- a/code/core/rbuf.h +++ b/code/core/rbuf.h @@ -1,5 +1,5 @@ -#define ECP_ERR_RBUF_IDX -1 -#define ECP_ERR_RBUF_DUP -1 +#define ECP_ERR_RBUF_IDX -100 +#define ECP_ERR_RBUF_DUP -101 #define ECP_MAX_RBUF_MSGR 256 @@ -11,7 +11,7 @@ typedef struct ECPRBMessage { char present; } ECPRBMessage; -typedef struct ECPRBuffer { +typedef struct ECPRBRecvBuffer { unsigned char reliable; unsigned char deliver_delay; unsigned char hole_max; @@ -23,11 +23,11 @@ typedef struct ECPRBuffer { ecp_ack_t hole_mask_full; ecp_ack_t hole_mask_empty; ECPRBMessage msg[ECP_MAX_RBUF_MSGR]; -} ECPRBuffer; +} ECPRBRecvBuffer; typedef struct ECPConnRBuffer { - ECPRBuffer *recv; + ECPRBRecvBuffer *recv; // ECPSBuffer *send; } ECPConnRBuffer; diff --git a/code/core/rbuf_recv.c b/code/core/rbuf_recv.c index c2ab965..281bec8 100644 --- a/code/core/rbuf_recv.c +++ b/code/core/rbuf_recv.c @@ -15,31 +15,34 @@ #define IDX_MASK(a) (a % ECP_MAX_RBUF_MSGR) */ -static int msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq) { +static int msg_idx(ECPRBRecvBuffer *rbuf, ecp_seq_t seq) { ecp_seq_t seq_offset = seq - rbuf->seq_start; - + + // This also checks for seq_start <= seq if seq type range >> ECP_MAX_RBUF_MSGR if (seq_offset < ECP_MAX_RBUF_MSGR) return IDX_MASK(rbuf->msg_start + seq_offset); return ECP_ERR_RBUF_IDX; } -static int msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, unsigned char *msg, size_t msg_size, unsigned char delivered) { +ssize_t msg_store(ECPRBRecvBuffer *rbuf, ecp_seq_t seq, unsigned char *msg, size_t msg_size) { int idx; ecp_seq_t seq_offset = seq - rbuf->seq_start; + + // This also checks for seq_start <= seq if seq type range >> ECP_MAX_RBUF_MSGR if (seq_offset >= ECP_MAX_RBUF_MSGR) return ECP_ERR_RBUF_IDX; if (SEQ_LT(rbuf->seq_max, seq)) rbuf->seq_max = seq; - if (delivered) return ECP_OK; - idx = msg_idx(rbuf, seq); if (idx < 0) return idx; if (rbuf->msg[idx].present) return ECP_ERR_RBUF_DUP; rbuf->msg[idx].present = 1; + memcpy(rbuf->msg[idx].msg, msg, msg_size); + rbuf->msg[idx].size = msg_size; - return ECP_OK; + return msg_size; } -static int msg_flush(ECPConnection *conn, ECPRBuffer *rbuf) { +static int msg_flush(ECPConnection *conn, ECPRBRecvBuffer *rbuf) { int idx = rbuf->msg_start; ecp_seq_t msg_cnt = rbuf->seq_max - rbuf->seq_start + 1; ecp_seq_t i = 0; @@ -59,7 +62,7 @@ static int msg_flush(ECPConnection *conn, ECPRBuffer *rbuf) { return ECP_OK; } -static int ack_shift(ECPRBuffer *rbuf) { +static int ack_shift(ECPRBRecvBuffer *rbuf) { int do_ack = 0; int idx; int i; @@ -99,8 +102,8 @@ static int ack_shift(ECPRBuffer *rbuf) { return do_ack; } -int ecp_rbuf_recv_init(ECPRBuffer *rbuf, ecp_seq_t seq, unsigned char hole_max) { - memset(rbuf, 0, sizeof(ECPRBuffer)); +int ecp_rbuf_recv_init(ECPRBRecvBuffer *rbuf, ecp_seq_t seq, unsigned char hole_max) { + memset(rbuf, 0, sizeof(ECPRBRecvBuffer)); rbuf->hole_max = hole_max; rbuf->hole_mask_full = ~(~((ecp_ack_t)0) << (hole_max * 2)); rbuf->hole_mask_empty = ~(~((ecp_ack_t)0) << (hole_max + 1)); @@ -113,35 +116,41 @@ int ecp_rbuf_recv_init(ECPRBuffer *rbuf, ecp_seq_t seq, unsigned char hole_max) } ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) { - int rv; + ssize_t rv; int do_ack = 0; - ECPRBuffer *rbuf = conn->rbuf.recv; + ECPRBRecvBuffer *rbuf = conn->rbuf.recv; if (rbuf == NULL) return ECP_ERR; if (SEQ_LTE(seq, rbuf->seq_ack)) { - if ((ecp_seq_t)(rbuf->seq_ack - seq) < ACK_SIZE) { - rv = msg_store(rbuf, seq, msg, msg_size, 0); - if (rv) return rv; + ecp_seq_t seq_offset = rbuf->seq_ack - seq; + if (seq_offset < ACK_SIZE) { + ecp_ack_t ack_mask = ((ecp_ack_t)1 << seq_offset); + + if (ack_mask & rbuf->ack_map) return ECP_ERR_RBUF_DUP; + + rv = msg_store(rbuf, seq, msg, msg_size); + if (rv < 0) return rv; - rbuf->ack_map |= (1 << (ecp_seq_t)(rbuf->seq_ack - seq)); + rbuf->ack_map |= ack_mask; do_ack = ack_shift(rbuf); } else { return ECP_ERR_RBUF_IDX; } } else { if ((rbuf->ack_map == ACK_FULL) && (seq == (ecp_seq_t)(rbuf->seq_ack + 1))) { - rv = msg_store(rbuf, seq, msg, msg_size, !rbuf->deliver_delay); - if (rv) return rv; - - if (!rbuf->deliver_delay) { + if (rbuf->deliver_delay) { + rv = msg_store(rbuf, seq, msg, msg_size); + if (rv < 0) return rv; + } else { + // rv = deliver + rbuf->seq_max++; rbuf->seq_start++; - // deliver } rbuf->seq_ack++; } else { - rv = msg_store(rbuf, seq, msg, msg_size, 0); - if (rv) return rv; + rv = msg_store(rbuf, seq, msg, msg_size); + if (rv < 0) return rv; do_ack = ack_shift(rbuf); } @@ -150,5 +159,5 @@ ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *m // update do_ack for pps // should send acks more aggresively when ack_map is not full (freq > RTT) // now send acks as per do_ack - return msg_size; + return rv; } \ No newline at end of file -- cgit v1.2.3