diff options
Diffstat (limited to 'code/core/rbuf_recv.c')
-rw-r--r-- | code/core/rbuf_recv.c | 94 |
1 files changed, 75 insertions, 19 deletions
diff --git a/code/core/rbuf_recv.c b/code/core/rbuf_recv.c index fba8219..0970f6a 100644 --- a/code/core/rbuf_recv.c +++ b/code/core/rbuf_recv.c @@ -8,7 +8,7 @@ static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) { ECPRBRecv *buf = conn->rbuf.recv; unsigned char flags = ECP_RBUF_FLAG_IN_RBUF; - unsigned char mtype = msg[0] & ECP_MTYPE_MASK; + unsigned char mtype = ecp_msg_get_type(msg) & ECP_MTYPE_MASK; if (mtype < ECP_MAX_MTYPE_SYS) flags |= ECP_RBUF_FLAG_SYS; @@ -42,28 +42,77 @@ static void msg_flush(ECPConnection *conn) { #endif ecp_seq_t msg_cnt = buf->rbuf.seq_max - buf->rbuf.seq_start + 1; + ecp_seq_t seq_next = buf->rbuf.seq_start; ecp_seq_t i = 0; unsigned int idx = buf->rbuf.msg_start; + if (buf->timer_pts) { + ecp_timer_pop(conn, ECP_MTYPE_RBFLUSH_PTS); + buf->timer_pts = 0; + } + for (i=0; i<msg_cnt; i++) { - if ((buf->flags & ECP_RBUF_FLAG_RELIABLE) && !(buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_IN_RBUF)) break; - if (buf->deliver_delay && (msg_cnt - i < buf->deliver_delay)) break; if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_IN_RBUF) { - int rv = 0; - ecp_seq_t seq = buf->rbuf.seq_start + i; if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_SYS) { buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_SYS; - } else if (buf->flags & ECP_RBUF_FLAG_MSGQ) { + } else { + int rv = ECP_OK; + ecp_pts_t msg_pts; + ecp_seq_t seq = buf->rbuf.seq_start + i; + unsigned char mtype = ecp_msg_get_type(buf->rbuf.msg[idx].msg); + unsigned char frag_tot; + unsigned char frag_cnt; + + rv = ecp_msg_get_frag(buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size, &frag_cnt, &frag_tot); + if ((rv == ECP_OK) && (frag_cnt != 0) && (seq != seq_next)) { + ecp_seq_t seq_fend = seq + (ecp_seq_t)(frag_tot - frag_cnt - 1); + + if (ECP_SEQ_LT(buf->rbuf.seq_max, seq_fend) || (buf->hole_max && ((ecp_seq_t)(buf->rbuf.seq_max - seq_fend) <= buf->hole_max))) { + ecp_seq_t seq_fbeg = seq - frag_cnt; + ecp_seq_t seq_offset = ECP_SEQ_LT(seq_next, seq_fbeg) ? seq - seq_fbeg : seq - seq_next; + + i -= seq_offset; + idx = ECP_RBUF_IDX_MASK(idx - seq_offset, buf->rbuf.msg_size); + break; + } + } + + rv = ecp_msg_get_pts(buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size, &msg_pts); + if (rv == ECP_OK) { + ecp_pts_t now = conn->sock->ctx->tm.abstime_ms(0); + if (ECP_PTS_LT(now, msg_pts)) { + ECPTimerItem ti; + ecp_seq_t seq_offset = seq - seq_next; + + rv = ecp_timer_item_init(&ti, conn, ECP_MTYPE_RBFLUSH_PTS, 0, msg_pts - now); + if (!rv) rv = ecp_timer_push(&ti); + if (!rv) buf->timer_pts = 1; + + i -= seq_offset; + idx = ECP_RBUF_IDX_MASK(idx - seq_offset, buf->rbuf.msg_size); + break; + } + } + + seq_next = seq + 1; + if (buf->flags & ECP_RBUF_FLAG_MSGQ) { #ifdef ECP_WITH_MSGQ - unsigned char mtype = buf->rbuf.msg[idx].msg[0]; - int rv = ecp_conn_msgq_push(conn, seq, mtype); - if (rv) break; - buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_IN_MSGQ; + rv = ecp_conn_msgq_push(conn, seq, ecp_msg_get_type(buf->rbuf.msg[idx].msg)); + if (rv) break; + buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_IN_MSGQ; #endif - } else { - ecp_msg_handle(conn, seq, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size); + } else { + ecp_msg_handle(conn, seq, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size); + } } buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_IN_RBUF; + } else { + if (buf->flags & ECP_RBUF_FLAG_RELIABLE) break; + if (buf->hole_max) { + ecp_seq_t seq = buf->rbuf.seq_start + i; + ecp_seq_t seq_offset = buf->rbuf.seq_max - seq; + if (seq_offset <= buf->hole_max) break; + } } idx = ECP_RBUF_IDX_MASK(idx + 1, buf->rbuf.msg_size); } @@ -78,8 +127,8 @@ static void msg_flush(ECPConnection *conn) { static int ack_send(ECPConnection *conn) { ECPRBRecv *buf = conn->rbuf.recv; - unsigned char payload[ECP_SIZE_PLD(sizeof(ecp_seq_t) + sizeof(ecp_ack_t))]; - unsigned char *buf_ = ecp_pld_get_buf(payload); + unsigned char payload[ECP_SIZE_PLD(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), 0)]; + unsigned char *buf_ = ecp_pld_get_buf(payload, 0); ssize_t rv; ecp_pld_set_type(payload, ECP_MTYPE_RBACK); @@ -150,6 +199,16 @@ ssize_t ecp_rbuf_handle_flush(ECPConnection *conn, ecp_seq_t seq, unsigned char return 0; } +ssize_t ecp_rbuf_handle_flush_pts(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size) { + ECPRBRecv *buf = conn->rbuf.recv; + + if (buf == NULL) return ECP_ERR; + + buf->timer_pts = 0; + msg_flush(conn); + return 0; +} + int ecp_rbuf_recv_create(ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size) { int rv; @@ -185,13 +244,10 @@ int ecp_rbuf_recv_set_hole(ECPConnection *conn, unsigned short hole_max) { return ECP_OK; } -int ecp_rbuf_recv_set_delay(ECPConnection *conn, unsigned short delay) { +int ecp_rbuf_recv_set_delay(ECPConnection *conn, ecp_pts_t delay) { ECPRBRecv *buf = conn->rbuf.recv; buf->deliver_delay = delay; - if (buf->hole_max < delay - 1) { - ecp_rbuf_recv_set_hole(conn, delay - 1); - } return ECP_OK; } @@ -226,7 +282,7 @@ ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *m if (buf == NULL) return ECP_ERR; if (msg_size < 1) return ECP_ERR_MIN_MSG; - mtype = msg[0] & ECP_MTYPE_MASK; + mtype = ecp_msg_get_type(msg) & ECP_MTYPE_MASK; if ((mtype == ECP_MTYPE_RBACK) || (mtype == ECP_MTYPE_RBFLUSH)) return ecp_msg_handle(conn, seq, msg, msg_size); if (ECP_SEQ_LT(buf->rbuf.seq_max, seq)) ack_pkt = seq - buf->rbuf.seq_max; |