summaryrefslogtreecommitdiff
path: root/ecp/src/rbuf_recv.c
diff options
context:
space:
mode:
Diffstat (limited to 'ecp/src/rbuf_recv.c')
-rw-r--r--ecp/src/rbuf_recv.c293
1 files changed, 146 insertions, 147 deletions
diff --git a/ecp/src/rbuf_recv.c b/ecp/src/rbuf_recv.c
index f2bb1c2..98b472c 100644
--- a/ecp/src/rbuf_recv.c
+++ b/ecp/src/rbuf_recv.c
@@ -5,130 +5,115 @@
#define ACK_RATE 8
#define ACK_MASK_FIRST ((ecp_ack_t)1 << (ECP_SIZE_ACKB - 1))
-static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size, ECP2Buffer *b) {
+static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size, unsigned char mtype) {
ECPRBRecv *buf = conn->rbuf.recv;
- unsigned char flags = ECP_RBUF_FLAG_IN_RBUF;
- unsigned char mtype;
- ssize_t rv;
- int _rv;
+ ECPRBuffer *rbuf = &buf->rbuf;
+ unsigned short idx;
+ unsigned char flags;
+ int skip;
+ int rv;
- _rv = ecp_msg_get_type(msg, msg_size, &mtype);
- if (_rv) return _rv;
+ rv = _ecp_rbuf_msg_idx(rbuf, seq, &idx);
+ if (rv) return rv;
- mtype &= ECP_MTYPE_MASK;
- if (mtype < ECP_MAX_MTYPE_SYS) flags |= ECP_RBUF_FLAG_SYS;
+ if (rbuf->arr.msg[idx].flags) return ECP_ERR_RBUF_DUP;
#ifdef ECP_WITH_MSGQ
if (buf->flags & ECP_RBUF_FLAG_MSGQ) {
ecp_seq_t seq_offset;
- int _rv = ECP_OK;
pthread_mutex_lock(&buf->msgq.mutex);
+
seq_offset = seq - buf->msgq.seq_start;
- if (seq_offset >= buf->rbuf.msg_size) _rv = ECP_ERR_RBUF_FULL;
+ if (seq_offset >= rbuf->arr_size) rv = ECP_ERR_FULL;
+
pthread_mutex_unlock(&buf->msgq.mutex);
- if (_rv) return _rv;
+ if (rv) return rv;
}
#endif
- rv = _ecp_rbuf_msg_store(&buf->rbuf, seq, -1, msg, msg_size, ECP_RBUF_FLAG_IN_RBUF | ECP_RBUF_FLAG_IN_MSGQ, flags);
- if (rv < 0) return rv;
+ skip = ecp_rbuf_skip(mtype);
+ flags = ECP_RBUF_FLAG_IN_RBUF;
+ if (skip) flags |= ECP_RBUF_FLAG_SKIP;
+ rbuf->arr.msg[idx].flags = flags;
- if (ECP_SEQ_LT(buf->rbuf.seq_max, seq)) buf->rbuf.seq_max = seq;
- if (flags & ECP_RBUF_FLAG_SYS) ecp_conn_handle_msg(conn, seq, msg, msg_size, b);
+ if (skip) return 0;
- return rv;
+ memcpy(rbuf->arr.msg[idx].buf, msg, msg_size);
+ rbuf->arr.msg[idx].size = msg_size;
+ if (ECP_SEQ_LT(rbuf->seq_max, seq)) rbuf->seq_max = seq;
+
+ return msg_size;
}
static void msg_flush(ECPConnection *conn, ECP2Buffer *b) {
ECPRBRecv *buf = conn->rbuf.recv;
+ ECPRBuffer *rbuf = &buf->rbuf;
+ ecp_seq_t seq;
+ unsigned short idx;
+ int i;
#ifdef ECP_WITH_MSGQ
if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_lock(&buf->msgq.mutex);
#endif
- ecp_seq_t msg_cnt = buf->rbuf.seq_max - buf->rbuf.seq_start + 1;
- ecp_seq_t seq_next = buf->rbuf.seq_start;
- ecp_seq_t i = 0;
- unsigned int idx = buf->rbuf.msg_start;
+ seq = rbuf->seq_start;
+ idx = rbuf->idx_start;
- if (buf->timer_pts) {
- ecp_timer_pop(conn, ECP_MTYPE_RBFLUSH_PTS);
- buf->timer_pts = 0;
- }
+ unsigned short msg_cnt = rbuf->seq_max - rbuf->seq_start + 1;
- for (i=0; i<msg_cnt; i++) {
- if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_IN_RBUF) {
- if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_SYS) {
- buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_SYS;
- } else {
+ while (ECP_SEQ_LTE(seq, rbuf->seq_max)) {
+ if (rbuf->arr.msg[idx].flags & ECP_RBUF_FLAG_IN_RBUF) {
+ if (!(rbuf->arr.msg[idx].flags & ECP_RBUF_FLAG_SKIP)) {
ecp_pts_t msg_pts;
- ecp_seq_t seq = buf->rbuf.seq_start + i;
- unsigned char frag_tot;
- unsigned char frag_cnt;
- uint16_t frag_size;
int rv;
- rv = ecp_msg_get_frag(buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size, &frag_cnt, &frag_tot, &frag_size);
- if (!rv && (frag_cnt != 0) && (seq != seq_next)) {
- ecp_seq_t seq_fend = seq + (ecp_seq_t)(frag_tot - frag_cnt - 1);
-
- if (ECP_SEQ_LT(buf->rbuf.seq_max, seq_fend) || (buf->hole_max && ((ecp_seq_t)(buf->rbuf.seq_max - seq_fend) <= buf->hole_max))) {
- ecp_seq_t seq_fbeg = seq - frag_cnt;
- ecp_seq_t seq_offset = ECP_SEQ_LT(seq_next, seq_fbeg) ? seq - seq_fbeg : seq - seq_next;
-
- i -= seq_offset;
- idx = ECP_RBUF_IDX_MASK(idx - seq_offset, buf->rbuf.msg_size);
- break;
- }
- }
+ rv = ecp_msg_get_pts(rbuf->arr.msg[idx].buf, rbuf->arr.msg[idx].size, &msg_pts);
+ if (!rv && buf->deliver_delay) {
+ ecp_cts_t now = ecp_tm_abstime_ms(0);
- rv = ecp_msg_get_pts(buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size, &msg_pts);
- if (!rv) {
- ecp_pts_t now = ecp_tm_abstime_ms(0);
+ msg_pts += buf->deliver_delay;
if (ECP_PTS_LT(now, msg_pts)) {
- ECPTimerItem ti;
- ecp_seq_t seq_offset = seq - seq_next;
+ if (!(rbuf->arr.msg[idx].flags & ECP_RBUF_FLAG_IN_TIMER)) {
+ ECPTimerItem ti;
- rv = ecp_timer_item_init(&ti, conn, ECP_MTYPE_RBFLUSH_PTS, 0, msg_pts - now);
- if (!rv) rv = ecp_timer_push(&ti);
- if (!rv) buf->timer_pts = 1;
-
- i -= seq_offset;
- idx = ECP_RBUF_IDX_MASK(idx - seq_offset, buf->rbuf.msg_size);
+ rv = ecp_timer_item_init(&ti, conn, ECP_MTYPE_RBTIMER, 0, msg_pts - now);
+ if (!rv) rv = ecp_timer_push(&ti);
+ if (!rv) rbuf->arr.msg[idx].flags |= ECP_RBUF_FLAG_IN_TIMER;
+ }
break;
+ } else if (rbuf->arr.msg[idx].flags & ECP_RBUF_FLAG_IN_TIMER) {
+ rbuf->arr.msg[idx].flags &= ~ECP_RBUF_FLAG_IN_TIMER;
}
}
- seq_next = seq + 1;
#ifdef ECP_WITH_MSGQ
if (buf->flags & ECP_RBUF_FLAG_MSGQ) {
unsigned char mtype;
- rv = ecp_msg_get_type(buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size, &mtype);
- if (!rv) rv = ecp_conn_msgq_push(conn, seq, mtype);
+ rv = ecp_msg_get_type(rbuf->arr.msg[idx].buf, rbuf->arr.msg[idx].size, &mtype);
+ if (!rv) rv = ecp_conn_msgq_push(conn, seq, mtype & ECP_MTYPE_MASK);
if (rv) break;
- buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_IN_MSGQ;
+ rbuf->arr.msg[idx].flags |= ECP_RBUF_FLAG_IN_MSGQ;
} else
-
#endif
- ecp_conn_handle_msg(conn, seq, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size, b);
+ ecp_conn_handle_msg(conn, seq, rbuf->arr.msg[idx].buf, rbuf->arr.msg[idx].size, b);
+ } else {
+ rbuf->arr.msg[idx].flags &= ~ECP_RBUF_FLAG_SKIP;
}
- buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_IN_RBUF;
+ rbuf->arr.msg[idx].flags &= ~ECP_RBUF_FLAG_IN_RBUF;
+ // if (rbuf->arr.msg[idx].flags == 0);
} else {
if (buf->flags & ECP_RBUF_FLAG_RELIABLE) break;
- if (buf->hole_max) {
- ecp_seq_t seq = buf->rbuf.seq_start + i;
- ecp_seq_t seq_offset = buf->rbuf.seq_max - seq;
- if (seq_offset <= buf->hole_max) break;
- }
+ if (!ECP_SEQ_LT(seq, rbuf->seq_max - buf->hole_max)) break;
}
- idx = ECP_RBUF_IDX_MASK(idx + 1, buf->rbuf.msg_size);
+ seq++;
+ idx = ECP_RBUF_IDX_MASK(idx + 1, rbuf->arr_size);
}
- buf->rbuf.seq_start += i;
- buf->rbuf.msg_start = idx;
+ rbuf->seq_start = seq;
+ rbuf->idx_start = idx;
#ifdef ECP_WITH_MSGQ
if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_unlock(&buf->msgq.mutex);
@@ -136,7 +121,7 @@ static void msg_flush(ECPConnection *conn, ECP2Buffer *b) {
}
-static int ack_send(ECPConnection *conn) {
+static int ack_send(ECPConnection *conn, ecp_seq_t seq_ack, ecp_seq_t ack_map) {
ECPRBRecv *buf = conn->rbuf.recv;
ECPBuffer packet;
ECPBuffer payload;
@@ -152,56 +137,53 @@ static int ack_send(ECPConnection *conn) {
ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_RBACK);
_buf = ecp_pld_get_buf(payload.buffer, payload.size);
- _buf[0] = (buf->seq_ack & 0xFF000000) >> 24;
- _buf[1] = (buf->seq_ack & 0x00FF0000) >> 16;
- _buf[2] = (buf->seq_ack & 0x0000FF00) >> 8;
- _buf[3] = (buf->seq_ack & 0x000000FF);
- _buf[4] = (buf->ack_map & 0xFF000000) >> 24;
- _buf[5] = (buf->ack_map & 0x00FF0000) >> 16;
- _buf[6] = (buf->ack_map & 0x0000FF00) >> 8;
- _buf[7] = (buf->ack_map & 0x000000FF);
-
- rv = ecp_rbuf_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), ECP_MTYPE_RBACK), 0, 0);
+ _buf[0] = (seq_ack & 0xFF000000) >> 24;
+ _buf[1] = (seq_ack & 0x00FF0000) >> 16;
+ _buf[2] = (seq_ack & 0x0000FF00) >> 8;
+ _buf[3] = (seq_ack & 0x000000FF);
+ _buf[4] = (ack_map & 0xFF000000) >> 24;
+ _buf[5] = (ack_map & 0x00FF0000) >> 16;
+ _buf[6] = (ack_map & 0x0000FF00) >> 8;
+ _buf[7] = (ack_map & 0x000000FF);
+
+ rv = ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), ECP_MTYPE_RBACK), 0);
if (rv < 0) return rv;
buf->ack_pkt = 0;
- buf->ack_do = 0;
+
return ECP_OK;
}
static int ack_shift(ECPRBRecv *buf) {
+ ECPRBuffer *rbuf = &buf->rbuf;
+ unsigned short idx;
int do_ack = 0;
int in_rbuf = 0;
- int idx;
- int i;
+ int rv;
if ((buf->flags & ECP_RBUF_FLAG_RELIABLE) && ((buf->ack_map & ACK_MASK_FIRST) == 0)) return 0;
- while (ECP_SEQ_LT(buf->seq_ack, buf->rbuf.seq_max)) {
+ /* walks through messages that are not delivered yet, so no need for msgq mutex lock */
+ while (ECP_SEQ_LT(buf->seq_ack, rbuf->seq_max)) {
buf->seq_ack++;
- in_rbuf = ECP_SEQ_LT(buf->seq_ack, buf->rbuf.seq_start) ? 1 : buf->rbuf.msg[ECP_RBUF_IDX_MASK(buf->rbuf.msg_start + buf->seq_ack - buf->rbuf.seq_start, buf->rbuf.msg_size)].flags & ECP_RBUF_FLAG_IN_RBUF;
-
+ rv = _ecp_rbuf_msg_idx(rbuf, buf->seq_ack, &idx);
+ if (!rv) {
+ in_rbuf = rbuf->arr.msg[idx].flags & ECP_RBUF_FLAG_IN_RBUF;
+ } else {
+ in_rbuf = 1;
+ }
if (in_rbuf && (buf->ack_map == ECP_ACK_FULL)) continue;
buf->ack_map = buf->ack_map << 1;
if (in_rbuf) {
buf->ack_map |= 1;
- } else if (!do_ack && ECP_SEQ_LTE(buf->seq_ack, buf->rbuf.seq_max - 2 * buf->hole_max)) {
+ } else if (!do_ack && ECP_SEQ_LT(buf->seq_ack, rbuf->seq_max - buf->hole_max)) {
do_ack = 1;
}
- if ((buf->ack_map & ACK_MASK_FIRST) == 0) break;
- }
-
- if (!do_ack && (buf->seq_ack == buf->rbuf.seq_max) && ((buf->ack_map & buf->hole_mask_full) != buf->hole_mask_full)) {
- ecp_ack_t hole_mask = buf->ack_map;
-
- for (i=0; i<buf->hole_max-1; i++) {
- hole_mask = hole_mask >> 1;
- if ((hole_mask & buf->hole_mask_empty) == 0) {
- do_ack = 1;
- break;
- }
+ if ((buf->flags & ECP_RBUF_FLAG_RELIABLE) && ((buf->ack_map & ACK_MASK_FIRST) == 0)) {
+ do_ack = 1;
+ break;
}
}
@@ -209,34 +191,38 @@ static int ack_shift(ECPRBRecv *buf) {
}
ssize_t ecp_rbuf_handle_flush(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size, ECP2Buffer *b) {
- if (size < 0) return size;
-
ECPRBRecv *buf = conn->rbuf.recv;
+
if (buf == NULL) return ECP_ERR;
+ if (size < 0) return size;
ecp_tr_release(b->packet, 1);
- ack_send(conn);
+ ack_send(conn, buf->seq_ack, buf->ack_map);
return 0;
}
-ssize_t ecp_rbuf_handle_flush_pts(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size, ECP2Buffer *b) {
+ssize_t ecp_rbuf_handle_timer(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size, ECP2Buffer *b) {
ECPRBRecv *buf = conn->rbuf.recv;
+
if (buf == NULL) return ECP_ERR;
- buf->timer_pts = 0;
msg_flush(conn, b);
return 0;
}
-int ecp_rbuf_recv_create(ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size) {
+int ecp_rbuf_recv_create(ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned short msg_size) {
+ ECPRBuffer *rbuf = &buf->rbuf;
int rv;
+ if (msg == NULL) return ECP_ERR;
+
memset(buf, 0, sizeof(ECPRBRecv));
- rv = _ecp_rbuf_init(&buf->rbuf, msg, msg_size);
- if (rv) return rv;
+ memset(msg, 0, sizeof(ECPRBMessage) * msg_size);
buf->ack_map = ECP_ACK_FULL;
buf->ack_rate = ACK_RATE;
+ rbuf->arr.msg = msg;
+ rbuf->arr_size = msg_size;
#ifdef ECP_WITH_MSGQ
rv = ecp_conn_msgq_create(&buf->msgq);
@@ -251,22 +237,23 @@ void ecp_rbuf_recv_destroy(ECPConnection *conn) {
ECPRBRecv *buf = conn->rbuf.recv;
if (buf == NULL) return;
+
#ifdef ECP_WITH_MSGQ
ecp_conn_msgq_destroy(&buf->msgq);
#endif
-
conn->rbuf.recv = NULL;
}
int ecp_rbuf_recv_start(ECPConnection *conn, ecp_seq_t seq) {
- int rv;
ECPRBRecv *buf = conn->rbuf.recv;
+ ECPRBuffer *rbuf = &buf->rbuf;
+ int rv;
if (buf == NULL) return ECP_ERR;
seq--;
buf->seq_ack = seq;
- rv = _ecp_rbuf_start(&buf->rbuf, seq);
+ rv = _ecp_rbuf_start(rbuf, seq);
if (rv) return rv;
#ifdef ECP_WITH_MSGQ
@@ -283,13 +270,11 @@ int ecp_rbuf_set_hole(ECPConnection *conn, unsigned short hole_max) {
ECPRBRecv *buf = conn->rbuf.recv;
buf->hole_max = hole_max;
- buf->hole_mask_full = ~(~((ecp_ack_t)1) << (hole_max * 2));
- buf->hole_mask_empty = ~(~((ecp_ack_t)1) << (hole_max + 1));
return ECP_OK;
}
-int ecp_rbuf_set_delay(ECPConnection *conn, ecp_pts_t delay) {
+int ecp_rbuf_set_delay(ECPConnection *conn, ecp_cts_t delay) {
ECPRBRecv *buf = conn->rbuf.recv;
buf->deliver_delay = delay;
@@ -299,69 +284,83 @@ int ecp_rbuf_set_delay(ECPConnection *conn, ecp_pts_t delay) {
ssize_t ecp_rbuf_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size, ECP2Buffer *b) {
ECPRBRecv *buf = conn->rbuf.recv;
- ecp_seq_t ack_pkt = 0;
- ssize_t rv;
- int _rv;
+ ECPRBuffer *rbuf = &buf->rbuf;
unsigned char mtype;
+ unsigned short ack_pkt = 0;
+ int do_ack = 0;
+ int _rv;
+ ssize_t rv;
_rv = ecp_msg_get_type(msg, msg_size, &mtype);
if (_rv) return _rv;
- mtype &= ECP_MTYPE_MASK;
- if ((mtype == ECP_MTYPE_RBACK) || (mtype == ECP_MTYPE_RBFLUSH)) return ecp_conn_handle_msg(conn, seq, msg, msg_size, b);
-
- if (ECP_SEQ_LT(buf->rbuf.seq_max, seq)) {
- ack_pkt = seq - buf->rbuf.seq_max;
- buf->ack_pkt += ack_pkt;
- if (buf->ack_pkt > buf->ack_rate) buf->ack_do = 1;
+ if (ECP_SEQ_LT(rbuf->seq_max, seq)) {
+ ack_pkt = seq - rbuf->seq_max;
}
if (ECP_SEQ_LTE(seq, buf->seq_ack)) {
ecp_seq_t seq_offset = buf->seq_ack - seq;
if (seq_offset < ECP_SIZE_ACKB) {
- ecp_ack_t ack_mask = ((ecp_ack_t)1 << seq_offset);
-
- if (ack_mask & buf->ack_map) return ECP_ERR_RBUF_DUP;
+ ecp_ack_t ack_bit = ((ecp_ack_t)1 << seq_offset);
- buf->ack_map |= ack_mask;
- buf->ack_do = buf->ack_do || ack_shift(buf);
+ if (ack_bit & buf->ack_map) return ECP_ERR_RBUF_DUP;
- rv = msg_store(conn, seq, msg, msg_size, b);
+ rv = msg_store(conn, seq, msg, msg_size, mtype);
if (rv < 0) return rv;
+
+ buf->ack_map |= ack_bit;
+ do_ack = ack_shift(buf);
} else {
return ECP_ERR_RBUF_DUP;
}
} else {
- if ((buf->ack_map == ECP_ACK_FULL) && (seq == (ecp_seq_t)(buf->seq_ack + 1))) {
+ unsigned short msg_cnt = rbuf->seq_max - rbuf->seq_start + 1;
+
+ if ((msg_cnt == 0) && (seq == (ecp_seq_t)(buf->seq_ack + 1))) {
if ((buf->flags & ECP_RBUF_FLAG_MSGQ) || buf->deliver_delay) {
- rv = msg_store(conn, seq, msg, msg_size, b);
+ rv = msg_store(conn, seq, msg, msg_size, mtype);
if (rv < 0) return rv;
} else {
- ecp_conn_handle_msg(conn, seq, msg, msg_size, b);
- rv = msg_size;
- buf->rbuf.seq_max++;
- buf->rbuf.seq_start++;
- buf->rbuf.msg_start = ECP_RBUF_IDX_MASK(buf->rbuf.msg_start + 1, buf->rbuf.msg_size);
+ /* receive buffer is empty, so no need for msgq mutex lock */
+ rv = 0;
+ rbuf->seq_max++;
+ rbuf->seq_start++;
+ rbuf->idx_start = ECP_RBUF_IDX_MASK(rbuf->idx_start + 1, rbuf->arr_size);
}
buf->seq_ack++;
} else {
- rv = msg_store(conn, seq, msg, msg_size, b);
+ rv = msg_store(conn, seq, msg, msg_size, mtype);
if (rv < 0) return rv;
- buf->ack_do = buf->ack_do || ack_shift(buf);
+ do_ack = ack_shift(buf);
}
}
msg_flush(conn, b);
- if (!(mtype < ECP_MAX_MTYPE_SYS) && buf->ack_do) {
- int _rv;
-
- _rv = ack_send(conn);
+ if (ack_pkt) {
+ buf->ack_pkt += ack_pkt;
+ if (!do_ack && (buf->ack_pkt > buf->ack_rate)) do_ack = 1;
+ }
+ if (do_ack) {
+ ecp_seq_t seq_ack = buf->seq_ack;
+ ecp_seq_t ack_map = buf->ack_map;
+
+ /* account for missing mackets within hole_max range */
+ if (buf->hole_max && (buf->seq_ack == rbuf->seq_max)) {
+ unsigned short h_bits = buf->hole_max + 1;
+ ecp_seq_t h_mask = ~(~((ecp_seq_t)0) << h_bits);
+
+ if ((ack_map & h_mask) != h_mask) {
+ h_mask = ~(~((ecp_seq_t)0) >> h_bits);
+ seq_ack -= h_bits;
+ ack_map = (ack_map >> h_bits) | h_mask;
+ }
+ }
+ _rv = ack_send(conn, seq_ack, ack_map);
if (_rv) return _rv;
}
return rv;
}
-
ECPFragIter *ecp_rbuf_get_frag_iter(ECPConnection *conn) {
if (conn->rbuf.recv) return conn->rbuf.recv->frag_iter;
return NULL;