summaryrefslogtreecommitdiff
path: root/code/core/rbuf_recv.c
diff options
context:
space:
mode:
Diffstat (limited to 'code/core/rbuf_recv.c')
-rw-r--r--code/core/rbuf_recv.c94
1 files changed, 75 insertions, 19 deletions
diff --git a/code/core/rbuf_recv.c b/code/core/rbuf_recv.c
index fba8219..0970f6a 100644
--- a/code/core/rbuf_recv.c
+++ b/code/core/rbuf_recv.c
@@ -8,7 +8,7 @@
static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) {
ECPRBRecv *buf = conn->rbuf.recv;
unsigned char flags = ECP_RBUF_FLAG_IN_RBUF;
- unsigned char mtype = msg[0] & ECP_MTYPE_MASK;
+ unsigned char mtype = ecp_msg_get_type(msg) & ECP_MTYPE_MASK;
if (mtype < ECP_MAX_MTYPE_SYS) flags |= ECP_RBUF_FLAG_SYS;
@@ -42,28 +42,77 @@ static void msg_flush(ECPConnection *conn) {
#endif
ecp_seq_t msg_cnt = buf->rbuf.seq_max - buf->rbuf.seq_start + 1;
+ ecp_seq_t seq_next = buf->rbuf.seq_start;
ecp_seq_t i = 0;
unsigned int idx = buf->rbuf.msg_start;
+ if (buf->timer_pts) {
+ ecp_timer_pop(conn, ECP_MTYPE_RBFLUSH_PTS);
+ buf->timer_pts = 0;
+ }
+
for (i=0; i<msg_cnt; i++) {
- if ((buf->flags & ECP_RBUF_FLAG_RELIABLE) && !(buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_IN_RBUF)) break;
- if (buf->deliver_delay && (msg_cnt - i < buf->deliver_delay)) break;
if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_IN_RBUF) {
- int rv = 0;
- ecp_seq_t seq = buf->rbuf.seq_start + i;
if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_SYS) {
buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_SYS;
- } else if (buf->flags & ECP_RBUF_FLAG_MSGQ) {
+ } else {
+ int rv = ECP_OK;
+ ecp_pts_t msg_pts;
+ ecp_seq_t seq = buf->rbuf.seq_start + i;
+ unsigned char mtype = ecp_msg_get_type(buf->rbuf.msg[idx].msg);
+ unsigned char frag_tot;
+ unsigned char frag_cnt;
+
+ rv = ecp_msg_get_frag(buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size, &frag_cnt, &frag_tot);
+ if ((rv == ECP_OK) && (frag_cnt != 0) && (seq != seq_next)) {
+ ecp_seq_t seq_fend = seq + (ecp_seq_t)(frag_tot - frag_cnt - 1);
+
+ if (ECP_SEQ_LT(buf->rbuf.seq_max, seq_fend) || (buf->hole_max && ((ecp_seq_t)(buf->rbuf.seq_max - seq_fend) <= buf->hole_max))) {
+ ecp_seq_t seq_fbeg = seq - frag_cnt;
+ ecp_seq_t seq_offset = ECP_SEQ_LT(seq_next, seq_fbeg) ? seq - seq_fbeg : seq - seq_next;
+
+ i -= seq_offset;
+ idx = ECP_RBUF_IDX_MASK(idx - seq_offset, buf->rbuf.msg_size);
+ break;
+ }
+ }
+
+ rv = ecp_msg_get_pts(buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size, &msg_pts);
+ if (rv == ECP_OK) {
+ ecp_pts_t now = conn->sock->ctx->tm.abstime_ms(0);
+ if (ECP_PTS_LT(now, msg_pts)) {
+ ECPTimerItem ti;
+ ecp_seq_t seq_offset = seq - seq_next;
+
+ rv = ecp_timer_item_init(&ti, conn, ECP_MTYPE_RBFLUSH_PTS, 0, msg_pts - now);
+ if (!rv) rv = ecp_timer_push(&ti);
+ if (!rv) buf->timer_pts = 1;
+
+ i -= seq_offset;
+ idx = ECP_RBUF_IDX_MASK(idx - seq_offset, buf->rbuf.msg_size);
+ break;
+ }
+ }
+
+ seq_next = seq + 1;
+ if (buf->flags & ECP_RBUF_FLAG_MSGQ) {
#ifdef ECP_WITH_MSGQ
- unsigned char mtype = buf->rbuf.msg[idx].msg[0];
- int rv = ecp_conn_msgq_push(conn, seq, mtype);
- if (rv) break;
- buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_IN_MSGQ;
+ rv = ecp_conn_msgq_push(conn, seq, ecp_msg_get_type(buf->rbuf.msg[idx].msg));
+ if (rv) break;
+ buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_IN_MSGQ;
#endif
- } else {
- ecp_msg_handle(conn, seq, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size);
+ } else {
+ ecp_msg_handle(conn, seq, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size);
+ }
}
buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_IN_RBUF;
+ } else {
+ if (buf->flags & ECP_RBUF_FLAG_RELIABLE) break;
+ if (buf->hole_max) {
+ ecp_seq_t seq = buf->rbuf.seq_start + i;
+ ecp_seq_t seq_offset = buf->rbuf.seq_max - seq;
+ if (seq_offset <= buf->hole_max) break;
+ }
}
idx = ECP_RBUF_IDX_MASK(idx + 1, buf->rbuf.msg_size);
}
@@ -78,8 +127,8 @@ static void msg_flush(ECPConnection *conn) {
static int ack_send(ECPConnection *conn) {
ECPRBRecv *buf = conn->rbuf.recv;
- unsigned char payload[ECP_SIZE_PLD(sizeof(ecp_seq_t) + sizeof(ecp_ack_t))];
- unsigned char *buf_ = ecp_pld_get_buf(payload);
+ unsigned char payload[ECP_SIZE_PLD(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), 0)];
+ unsigned char *buf_ = ecp_pld_get_buf(payload, 0);
ssize_t rv;
ecp_pld_set_type(payload, ECP_MTYPE_RBACK);
@@ -150,6 +199,16 @@ ssize_t ecp_rbuf_handle_flush(ECPConnection *conn, ecp_seq_t seq, unsigned char
return 0;
}
+ssize_t ecp_rbuf_handle_flush_pts(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size) {
+ ECPRBRecv *buf = conn->rbuf.recv;
+
+ if (buf == NULL) return ECP_ERR;
+
+ buf->timer_pts = 0;
+ msg_flush(conn);
+ return 0;
+}
+
int ecp_rbuf_recv_create(ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size) {
int rv;
@@ -185,13 +244,10 @@ int ecp_rbuf_recv_set_hole(ECPConnection *conn, unsigned short hole_max) {
return ECP_OK;
}
-int ecp_rbuf_recv_set_delay(ECPConnection *conn, unsigned short delay) {
+int ecp_rbuf_recv_set_delay(ECPConnection *conn, ecp_pts_t delay) {
ECPRBRecv *buf = conn->rbuf.recv;
buf->deliver_delay = delay;
- if (buf->hole_max < delay - 1) {
- ecp_rbuf_recv_set_hole(conn, delay - 1);
- }
return ECP_OK;
}
@@ -226,7 +282,7 @@ ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *m
if (buf == NULL) return ECP_ERR;
if (msg_size < 1) return ECP_ERR_MIN_MSG;
- mtype = msg[0] & ECP_MTYPE_MASK;
+ mtype = ecp_msg_get_type(msg) & ECP_MTYPE_MASK;
if ((mtype == ECP_MTYPE_RBACK) || (mtype == ECP_MTYPE_RBFLUSH)) return ecp_msg_handle(conn, seq, msg, msg_size);
if (ECP_SEQ_LT(buf->rbuf.seq_max, seq)) ack_pkt = seq - buf->rbuf.seq_max;