From d580fc0e00bf4b7acd3f9ee3345fad32223b40db Mon Sep 17 00:00:00 2001 From: Uros Majstorovic Date: Fri, 21 Jul 2017 00:33:35 +0200 Subject: send ring buffer implementation; not done --- code/core/core.c | 1 + code/core/rbuf.c | 8 ++++++- code/core/rbuf.h | 26 ++++++++++++++------- code/core/rbuf_recv.c | 16 ++++++------- code/core/rbuf_send.c | 64 ++++++++++++++++++++++++++++++++++++++++++++++++++- 5 files changed, 96 insertions(+), 19 deletions(-) (limited to 'code/core') diff --git a/code/core/core.c b/code/core/core.c index 78b1028..812e48c 100644 --- a/code/core/core.c +++ b/code/core/core.c @@ -1214,6 +1214,7 @@ ssize_t ecp_pld_send_wkey(ECPConnection *conn, unsigned char s_idx, unsigned cha rv = ecp_proxy_pack(conn, &addr, packet, ECP_MAX_PKT, s_idx, c_idx, payload, payload_size); if (rv < 0) return rv; + // XXX insert ecp_rbuf_send_store return ecp_pkt_send(sock, &addr, packet, rv); } diff --git a/code/core/rbuf.c b/code/core/rbuf.c index 54843fa..d6c9c6e 100644 --- a/code/core/rbuf.c +++ b/code/core/rbuf.c @@ -4,7 +4,11 @@ int ecp_rbuf_init(ECPRBuffer *rbuf, ECPRBMessage *msg, unsigned int msg_size) { rbuf->msg = msg; - rbuf->msg_size = msg_size; + if (msg_size) { + rbuf->msg_size = msg_size; + } else { + rbuf->msg_size = ECP_RBUF_SEQ_HALF; + } return ECP_OK; } @@ -20,6 +24,8 @@ int ecp_rbuf_msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq) { ssize_t ecp_rbuf_msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, unsigned char *msg, size_t msg_size, unsigned char test_flags, unsigned char set_flags) { int idx = ecp_rbuf_msg_idx(rbuf, seq); if (idx < 0) return idx; + + if (rbuf->msg == NULL) return 0; if (test_flags && (test_flags & rbuf->msg[idx].flags)) return ECP_ERR_RBUF_FLAG; memcpy(rbuf->msg[idx].msg, msg, msg_size); diff --git a/code/core/rbuf.h b/code/core/rbuf.h index 172f043..3577211 100644 --- a/code/core/rbuf.h +++ b/code/core/rbuf.h @@ -1,20 +1,27 @@ #define ECP_RBUF_FLAG_PRESENT 1 -#define ECP_RBUF_SEQ_LT(a,b) ((ecp_seq_t)((ecp_seq_t)(a) - (ecp_seq_t)(b)) > SEQ_HALF) -#define ECP_RBUF_SEQ_LTE(a,b) ((ecp_seq_t)((ecp_seq_t)(b) - (ecp_seq_t)(a)) < SEQ_HALF) - -#define ECP_RBUF_IDX_MASK(idx, size) ((idx) & ((size) - 1)) -/* If size not 2^x: -#define ECP_RBUF_IDX_MASK(idx, size) ((idx) % (size)) -*/ #define ECP_ERR_RBUF_FLAG -100 #define ECP_ERR_RBUF_IDX -101 #define ECP_ERR_RBUF_DUP -102 typedef uint32_t ecp_ack_t; +typedef uint32_t ecp_win_t; + +#define ECP_RBUF_SEQ_HALF ((ecp_seq_t)1 << (sizeof(ecp_seq_t)*8-1)) + +#define ECP_RBUF_ACK_FULL (~(ecp_ack_t)0) +#define ECP_RBUF_ACK_SIZE (sizeof(ecp_ack_t)*8) + +#define ECP_RBUF_SEQ_LT(a,b) ((ecp_seq_t)((ecp_seq_t)(a) - (ecp_seq_t)(b)) > ECP_RBUF_SEQ_HALF) +#define ECP_RBUF_SEQ_LTE(a,b) ((ecp_seq_t)((ecp_seq_t)(b) - (ecp_seq_t)(a)) < ECP_RBUF_SEQ_HALF) + +#define ECP_RBUF_IDX_MASK(idx, size) ((idx) & ((size) - 1)) +/* If size not 2^x: +#define ECP_RBUF_IDX_MASK(idx, size) ((idx) % (size)) +*/ typedef struct ECPRBMessage { - unsigned char msg[ECP_MAX_MSG]; + unsigned char msg[ECP_MAX_PKT]; ssize_t size; unsigned char flags; } ECPRBMessage; @@ -42,6 +49,9 @@ typedef struct ECPRBRecv { typedef struct ECPRBSend { unsigned char reliable; + ecp_win_t win_size; + ecp_win_t in_transit; + unsigned int nack_rate; ECPRBuffer rbuf; } ECPRBSend; diff --git a/code/core/rbuf_recv.c b/code/core/rbuf_recv.c index a6db276..9b96be2 100644 --- a/code/core/rbuf_recv.c +++ b/code/core/rbuf_recv.c @@ -2,10 +2,8 @@ #include -#define SEQ_HALF ((ecp_seq_t)1 << (sizeof(ecp_seq_t)*8-1)) -#define ACK_FULL (~(ecp_ack_t)0) -#define ACK_SIZE (sizeof(ecp_ack_t)*8) -#define ACK_MASK_FIRST ((ecp_ack_t)1 << (ACK_SIZE - 1)) +#define ACK_RATE 8 +#define ACK_MASK_FIRST ((ecp_ack_t)1 << (ECP_RBUF_ACK_SIZE - 1)) static ssize_t msg_store(ECPRBRecv *buf, ecp_seq_t seq, unsigned char *msg, size_t msg_size) { ssize_t rv = ecp_rbuf_msg_store(&buf->rbuf, seq, msg, msg_size, ECP_RBUF_FLAG_PRESENT, ECP_RBUF_FLAG_PRESENT); @@ -48,7 +46,7 @@ static int ack_shift(ECPRBRecv *buf) { idx = ECP_RBUF_IDX_MASK(idx + 1, buf->rbuf.msg_size); buf->seq_ack++; - if ((buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_PRESENT) && (buf->ack_map == ACK_FULL)) continue; + if ((buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_PRESENT) && (buf->ack_map == ECP_RBUF_ACK_FULL)) continue; buf->ack_map = buf->ack_map << 1; if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_PRESENT) { @@ -79,8 +77,8 @@ int ecp_rbuf_recv_create(ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_siz memset(buf, 0, sizeof(ECPRBRecv)); memset(msg, 0, sizeof(ECPRBMessage) * msg_size); ecp_rbuf_init(&buf->rbuf, msg, msg_size); - buf->ack_map = ACK_FULL; - + buf->ack_map = ECP_RBUF_ACK_FULL; + buf->ack_rate = ACK_RATE; return ECP_OK; } @@ -120,7 +118,7 @@ ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *m if (ECP_RBUF_SEQ_LT(buf->seq_max, seq)) ack_pkt = seq - buf->seq_max; if (ECP_RBUF_SEQ_LTE(seq, buf->seq_ack)) { ecp_seq_t seq_offset = buf->seq_ack - seq; - if (seq_offset < ACK_SIZE) { + if (seq_offset < ECP_RBUF_ACK_SIZE) { ecp_ack_t ack_mask = ((ecp_ack_t)1 << seq_offset); if (ack_mask & buf->ack_map) return ECP_ERR_RBUF_DUP; @@ -134,7 +132,7 @@ ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *m return ECP_ERR_RBUF_IDX; } } else { - if ((buf->ack_map == ACK_FULL) && (seq == (ecp_seq_t)(buf->seq_ack + 1))) { + if ((buf->ack_map == ECP_RBUF_ACK_FULL) && (seq == (ecp_seq_t)(buf->seq_ack + 1))) { if (buf->deliver_delay) { rv = msg_store(buf, seq, msg, msg_size); if (rv < 0) return rv; diff --git a/code/core/rbuf_send.c b/code/core/rbuf_send.c index 932ecdf..8f0cc38 100644 --- a/code/core/rbuf_send.c +++ b/code/core/rbuf_send.c @@ -2,10 +2,72 @@ #include +#define ACK_RATE_UNIT 10000 + static ssize_t handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size) { + int idx = 0; + ecp_seq_t seq_ack = 0; + ecp_ack_t ack_map = 0; + + ECPRBSend *buf = conn->rbuf.send; + ECPRBuffer *rbuf = &buf->rbuf; + + if (ack_map != ECP_RBUF_ACK_FULL) { + int i; + int nack = 0; + ecp_win_t ack_cnt = 0; + + seq_ack -= ECP_RBUF_ACK_SIZE - 1; + + idx = ecp_rbuf_msg_idx(rbuf, seq_ack); + if (idx < 0) return idx; + + if (buf->reliable) buf->in_transit -= seq_ack - rbuf->seq_start; + for (i=0; ireliable) { + // if resend packet (seq_ack + i) + if (!nack) { + nack = 1; + + rbuf->seq_start = seq_ack + i; + rbuf->msg_start = ECP_RBUF_IDX_MASK(idx + i, rbuf->msg_size); + } + } + } + if (buf->reliable) { + buf->in_transit -= ack_cnt; + } else { + rbuf->seq_start = seq_ack + ECP_RBUF_ACK_SIZE; + buf->nack_rate = (buf->nack_rate * 7 + ((ECP_RBUF_ACK_SIZE - ack_cnt) * ACK_RATE_UNIT) / ECP_RBUF_ACK_SIZE) / 8; + } + } else { + seq_ack++; + + idx = ecp_rbuf_msg_idx(rbuf, seq_ack); + if (idx < 0) return idx; + + rbuf->seq_start = seq_ack; + if (buf->reliable) { + buf->in_transit -= seq_ack - rbuf->seq_start; + rbuf->msg_start = idx; + } else { + buf->nack_rate = (buf->nack_rate * 7) / 8; + } + } return size; } +int ecp_rbuf_send_create(ECPRBSend *buf, ECPRBMessage *msg, unsigned int msg_size) { + memset(buf, 0, sizeof(ECPRBRecv)); + memset(msg, 0, sizeof(ECPRBMessage) * msg_size); + ecp_rbuf_init(&buf->rbuf, msg, msg_size); + return ECP_OK; +} + ssize_t ecp_rbuf_send_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) { - return ecp_rbuf_msg_store(&conn->rbuf.send->rbuf, seq, msg, msg_size, 0, 0); + ECPRBSend *buf = conn->rbuf.send; + if (buf->reliable) buf->in_transit++; + return ecp_rbuf_msg_store(&buf->rbuf, seq, msg, msg_size, 0, 0); } \ No newline at end of file -- cgit v1.2.3