diff options
Diffstat (limited to 'ecp/src/rbuf_recv.c')
-rw-r--r-- | ecp/src/rbuf_recv.c | 293 |
1 files changed, 146 insertions, 147 deletions
diff --git a/ecp/src/rbuf_recv.c b/ecp/src/rbuf_recv.c index f2bb1c2..98b472c 100644 --- a/ecp/src/rbuf_recv.c +++ b/ecp/src/rbuf_recv.c @@ -5,130 +5,115 @@ #define ACK_RATE 8 #define ACK_MASK_FIRST ((ecp_ack_t)1 << (ECP_SIZE_ACKB - 1)) -static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size, ECP2Buffer *b) { +static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size, unsigned char mtype) { ECPRBRecv *buf = conn->rbuf.recv; - unsigned char flags = ECP_RBUF_FLAG_IN_RBUF; - unsigned char mtype; - ssize_t rv; - int _rv; + ECPRBuffer *rbuf = &buf->rbuf; + unsigned short idx; + unsigned char flags; + int skip; + int rv; - _rv = ecp_msg_get_type(msg, msg_size, &mtype); - if (_rv) return _rv; + rv = _ecp_rbuf_msg_idx(rbuf, seq, &idx); + if (rv) return rv; - mtype &= ECP_MTYPE_MASK; - if (mtype < ECP_MAX_MTYPE_SYS) flags |= ECP_RBUF_FLAG_SYS; + if (rbuf->arr.msg[idx].flags) return ECP_ERR_RBUF_DUP; #ifdef ECP_WITH_MSGQ if (buf->flags & ECP_RBUF_FLAG_MSGQ) { ecp_seq_t seq_offset; - int _rv = ECP_OK; pthread_mutex_lock(&buf->msgq.mutex); + seq_offset = seq - buf->msgq.seq_start; - if (seq_offset >= buf->rbuf.msg_size) _rv = ECP_ERR_RBUF_FULL; + if (seq_offset >= rbuf->arr_size) rv = ECP_ERR_FULL; + pthread_mutex_unlock(&buf->msgq.mutex); - if (_rv) return _rv; + if (rv) return rv; } #endif - rv = _ecp_rbuf_msg_store(&buf->rbuf, seq, -1, msg, msg_size, ECP_RBUF_FLAG_IN_RBUF | ECP_RBUF_FLAG_IN_MSGQ, flags); - if (rv < 0) return rv; + skip = ecp_rbuf_skip(mtype); + flags = ECP_RBUF_FLAG_IN_RBUF; + if (skip) flags |= ECP_RBUF_FLAG_SKIP; + rbuf->arr.msg[idx].flags = flags; - if (ECP_SEQ_LT(buf->rbuf.seq_max, seq)) buf->rbuf.seq_max = seq; - if (flags & ECP_RBUF_FLAG_SYS) ecp_conn_handle_msg(conn, seq, msg, msg_size, b); + if (skip) return 0; - return rv; + memcpy(rbuf->arr.msg[idx].buf, msg, msg_size); + rbuf->arr.msg[idx].size = msg_size; + if (ECP_SEQ_LT(rbuf->seq_max, seq)) rbuf->seq_max = seq; + + return msg_size; } static void msg_flush(ECPConnection *conn, ECP2Buffer *b) { ECPRBRecv *buf = conn->rbuf.recv; + ECPRBuffer *rbuf = &buf->rbuf; + ecp_seq_t seq; + unsigned short idx; + int i; #ifdef ECP_WITH_MSGQ if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_lock(&buf->msgq.mutex); #endif - ecp_seq_t msg_cnt = buf->rbuf.seq_max - buf->rbuf.seq_start + 1; - ecp_seq_t seq_next = buf->rbuf.seq_start; - ecp_seq_t i = 0; - unsigned int idx = buf->rbuf.msg_start; + seq = rbuf->seq_start; + idx = rbuf->idx_start; - if (buf->timer_pts) { - ecp_timer_pop(conn, ECP_MTYPE_RBFLUSH_PTS); - buf->timer_pts = 0; - } + unsigned short msg_cnt = rbuf->seq_max - rbuf->seq_start + 1; - for (i=0; i<msg_cnt; i++) { - if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_IN_RBUF) { - if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_SYS) { - buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_SYS; - } else { + while (ECP_SEQ_LTE(seq, rbuf->seq_max)) { + if (rbuf->arr.msg[idx].flags & ECP_RBUF_FLAG_IN_RBUF) { + if (!(rbuf->arr.msg[idx].flags & ECP_RBUF_FLAG_SKIP)) { ecp_pts_t msg_pts; - ecp_seq_t seq = buf->rbuf.seq_start + i; - unsigned char frag_tot; - unsigned char frag_cnt; - uint16_t frag_size; int rv; - rv = ecp_msg_get_frag(buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size, &frag_cnt, &frag_tot, &frag_size); - if (!rv && (frag_cnt != 0) && (seq != seq_next)) { - ecp_seq_t seq_fend = seq + (ecp_seq_t)(frag_tot - frag_cnt - 1); - - if (ECP_SEQ_LT(buf->rbuf.seq_max, seq_fend) || (buf->hole_max && ((ecp_seq_t)(buf->rbuf.seq_max - seq_fend) <= buf->hole_max))) { - ecp_seq_t seq_fbeg = seq - frag_cnt; - ecp_seq_t seq_offset = ECP_SEQ_LT(seq_next, seq_fbeg) ? seq - seq_fbeg : seq - seq_next; - - i -= seq_offset; - idx = ECP_RBUF_IDX_MASK(idx - seq_offset, buf->rbuf.msg_size); - break; - } - } + rv = ecp_msg_get_pts(rbuf->arr.msg[idx].buf, rbuf->arr.msg[idx].size, &msg_pts); + if (!rv && buf->deliver_delay) { + ecp_cts_t now = ecp_tm_abstime_ms(0); - rv = ecp_msg_get_pts(buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size, &msg_pts); - if (!rv) { - ecp_pts_t now = ecp_tm_abstime_ms(0); + msg_pts += buf->deliver_delay; if (ECP_PTS_LT(now, msg_pts)) { - ECPTimerItem ti; - ecp_seq_t seq_offset = seq - seq_next; + if (!(rbuf->arr.msg[idx].flags & ECP_RBUF_FLAG_IN_TIMER)) { + ECPTimerItem ti; - rv = ecp_timer_item_init(&ti, conn, ECP_MTYPE_RBFLUSH_PTS, 0, msg_pts - now); - if (!rv) rv = ecp_timer_push(&ti); - if (!rv) buf->timer_pts = 1; - - i -= seq_offset; - idx = ECP_RBUF_IDX_MASK(idx - seq_offset, buf->rbuf.msg_size); + rv = ecp_timer_item_init(&ti, conn, ECP_MTYPE_RBTIMER, 0, msg_pts - now); + if (!rv) rv = ecp_timer_push(&ti); + if (!rv) rbuf->arr.msg[idx].flags |= ECP_RBUF_FLAG_IN_TIMER; + } break; + } else if (rbuf->arr.msg[idx].flags & ECP_RBUF_FLAG_IN_TIMER) { + rbuf->arr.msg[idx].flags &= ~ECP_RBUF_FLAG_IN_TIMER; } } - seq_next = seq + 1; #ifdef ECP_WITH_MSGQ if (buf->flags & ECP_RBUF_FLAG_MSGQ) { unsigned char mtype; - rv = ecp_msg_get_type(buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size, &mtype); - if (!rv) rv = ecp_conn_msgq_push(conn, seq, mtype); + rv = ecp_msg_get_type(rbuf->arr.msg[idx].buf, rbuf->arr.msg[idx].size, &mtype); + if (!rv) rv = ecp_conn_msgq_push(conn, seq, mtype & ECP_MTYPE_MASK); if (rv) break; - buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_IN_MSGQ; + rbuf->arr.msg[idx].flags |= ECP_RBUF_FLAG_IN_MSGQ; } else - #endif - ecp_conn_handle_msg(conn, seq, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size, b); + ecp_conn_handle_msg(conn, seq, rbuf->arr.msg[idx].buf, rbuf->arr.msg[idx].size, b); + } else { + rbuf->arr.msg[idx].flags &= ~ECP_RBUF_FLAG_SKIP; } - buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_IN_RBUF; + rbuf->arr.msg[idx].flags &= ~ECP_RBUF_FLAG_IN_RBUF; + // if (rbuf->arr.msg[idx].flags == 0); } else { if (buf->flags & ECP_RBUF_FLAG_RELIABLE) break; - if (buf->hole_max) { - ecp_seq_t seq = buf->rbuf.seq_start + i; - ecp_seq_t seq_offset = buf->rbuf.seq_max - seq; - if (seq_offset <= buf->hole_max) break; - } + if (!ECP_SEQ_LT(seq, rbuf->seq_max - buf->hole_max)) break; } - idx = ECP_RBUF_IDX_MASK(idx + 1, buf->rbuf.msg_size); + seq++; + idx = ECP_RBUF_IDX_MASK(idx + 1, rbuf->arr_size); } - buf->rbuf.seq_start += i; - buf->rbuf.msg_start = idx; + rbuf->seq_start = seq; + rbuf->idx_start = idx; #ifdef ECP_WITH_MSGQ if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_unlock(&buf->msgq.mutex); @@ -136,7 +121,7 @@ static void msg_flush(ECPConnection *conn, ECP2Buffer *b) { } -static int ack_send(ECPConnection *conn) { +static int ack_send(ECPConnection *conn, ecp_seq_t seq_ack, ecp_seq_t ack_map) { ECPRBRecv *buf = conn->rbuf.recv; ECPBuffer packet; ECPBuffer payload; @@ -152,56 +137,53 @@ static int ack_send(ECPConnection *conn) { ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_RBACK); _buf = ecp_pld_get_buf(payload.buffer, payload.size); - _buf[0] = (buf->seq_ack & 0xFF000000) >> 24; - _buf[1] = (buf->seq_ack & 0x00FF0000) >> 16; - _buf[2] = (buf->seq_ack & 0x0000FF00) >> 8; - _buf[3] = (buf->seq_ack & 0x000000FF); - _buf[4] = (buf->ack_map & 0xFF000000) >> 24; - _buf[5] = (buf->ack_map & 0x00FF0000) >> 16; - _buf[6] = (buf->ack_map & 0x0000FF00) >> 8; - _buf[7] = (buf->ack_map & 0x000000FF); - - rv = ecp_rbuf_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), ECP_MTYPE_RBACK), 0, 0); + _buf[0] = (seq_ack & 0xFF000000) >> 24; + _buf[1] = (seq_ack & 0x00FF0000) >> 16; + _buf[2] = (seq_ack & 0x0000FF00) >> 8; + _buf[3] = (seq_ack & 0x000000FF); + _buf[4] = (ack_map & 0xFF000000) >> 24; + _buf[5] = (ack_map & 0x00FF0000) >> 16; + _buf[6] = (ack_map & 0x0000FF00) >> 8; + _buf[7] = (ack_map & 0x000000FF); + + rv = ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), ECP_MTYPE_RBACK), 0); if (rv < 0) return rv; buf->ack_pkt = 0; - buf->ack_do = 0; + return ECP_OK; } static int ack_shift(ECPRBRecv *buf) { + ECPRBuffer *rbuf = &buf->rbuf; + unsigned short idx; int do_ack = 0; int in_rbuf = 0; - int idx; - int i; + int rv; if ((buf->flags & ECP_RBUF_FLAG_RELIABLE) && ((buf->ack_map & ACK_MASK_FIRST) == 0)) return 0; - while (ECP_SEQ_LT(buf->seq_ack, buf->rbuf.seq_max)) { + /* walks through messages that are not delivered yet, so no need for msgq mutex lock */ + while (ECP_SEQ_LT(buf->seq_ack, rbuf->seq_max)) { buf->seq_ack++; - in_rbuf = ECP_SEQ_LT(buf->seq_ack, buf->rbuf.seq_start) ? 1 : buf->rbuf.msg[ECP_RBUF_IDX_MASK(buf->rbuf.msg_start + buf->seq_ack - buf->rbuf.seq_start, buf->rbuf.msg_size)].flags & ECP_RBUF_FLAG_IN_RBUF; - + rv = _ecp_rbuf_msg_idx(rbuf, buf->seq_ack, &idx); + if (!rv) { + in_rbuf = rbuf->arr.msg[idx].flags & ECP_RBUF_FLAG_IN_RBUF; + } else { + in_rbuf = 1; + } if (in_rbuf && (buf->ack_map == ECP_ACK_FULL)) continue; buf->ack_map = buf->ack_map << 1; if (in_rbuf) { buf->ack_map |= 1; - } else if (!do_ack && ECP_SEQ_LTE(buf->seq_ack, buf->rbuf.seq_max - 2 * buf->hole_max)) { + } else if (!do_ack && ECP_SEQ_LT(buf->seq_ack, rbuf->seq_max - buf->hole_max)) { do_ack = 1; } - if ((buf->ack_map & ACK_MASK_FIRST) == 0) break; - } - - if (!do_ack && (buf->seq_ack == buf->rbuf.seq_max) && ((buf->ack_map & buf->hole_mask_full) != buf->hole_mask_full)) { - ecp_ack_t hole_mask = buf->ack_map; - - for (i=0; i<buf->hole_max-1; i++) { - hole_mask = hole_mask >> 1; - if ((hole_mask & buf->hole_mask_empty) == 0) { - do_ack = 1; - break; - } + if ((buf->flags & ECP_RBUF_FLAG_RELIABLE) && ((buf->ack_map & ACK_MASK_FIRST) == 0)) { + do_ack = 1; + break; } } @@ -209,34 +191,38 @@ static int ack_shift(ECPRBRecv *buf) { } ssize_t ecp_rbuf_handle_flush(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size, ECP2Buffer *b) { - if (size < 0) return size; - ECPRBRecv *buf = conn->rbuf.recv; + if (buf == NULL) return ECP_ERR; + if (size < 0) return size; ecp_tr_release(b->packet, 1); - ack_send(conn); + ack_send(conn, buf->seq_ack, buf->ack_map); return 0; } -ssize_t ecp_rbuf_handle_flush_pts(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size, ECP2Buffer *b) { +ssize_t ecp_rbuf_handle_timer(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size, ECP2Buffer *b) { ECPRBRecv *buf = conn->rbuf.recv; + if (buf == NULL) return ECP_ERR; - buf->timer_pts = 0; msg_flush(conn, b); return 0; } -int ecp_rbuf_recv_create(ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size) { +int ecp_rbuf_recv_create(ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned short msg_size) { + ECPRBuffer *rbuf = &buf->rbuf; int rv; + if (msg == NULL) return ECP_ERR; + memset(buf, 0, sizeof(ECPRBRecv)); - rv = _ecp_rbuf_init(&buf->rbuf, msg, msg_size); - if (rv) return rv; + memset(msg, 0, sizeof(ECPRBMessage) * msg_size); buf->ack_map = ECP_ACK_FULL; buf->ack_rate = ACK_RATE; + rbuf->arr.msg = msg; + rbuf->arr_size = msg_size; #ifdef ECP_WITH_MSGQ rv = ecp_conn_msgq_create(&buf->msgq); @@ -251,22 +237,23 @@ void ecp_rbuf_recv_destroy(ECPConnection *conn) { ECPRBRecv *buf = conn->rbuf.recv; if (buf == NULL) return; + #ifdef ECP_WITH_MSGQ ecp_conn_msgq_destroy(&buf->msgq); #endif - conn->rbuf.recv = NULL; } int ecp_rbuf_recv_start(ECPConnection *conn, ecp_seq_t seq) { - int rv; ECPRBRecv *buf = conn->rbuf.recv; + ECPRBuffer *rbuf = &buf->rbuf; + int rv; if (buf == NULL) return ECP_ERR; seq--; buf->seq_ack = seq; - rv = _ecp_rbuf_start(&buf->rbuf, seq); + rv = _ecp_rbuf_start(rbuf, seq); if (rv) return rv; #ifdef ECP_WITH_MSGQ @@ -283,13 +270,11 @@ int ecp_rbuf_set_hole(ECPConnection *conn, unsigned short hole_max) { ECPRBRecv *buf = conn->rbuf.recv; buf->hole_max = hole_max; - buf->hole_mask_full = ~(~((ecp_ack_t)1) << (hole_max * 2)); - buf->hole_mask_empty = ~(~((ecp_ack_t)1) << (hole_max + 1)); return ECP_OK; } -int ecp_rbuf_set_delay(ECPConnection *conn, ecp_pts_t delay) { +int ecp_rbuf_set_delay(ECPConnection *conn, ecp_cts_t delay) { ECPRBRecv *buf = conn->rbuf.recv; buf->deliver_delay = delay; @@ -299,69 +284,83 @@ int ecp_rbuf_set_delay(ECPConnection *conn, ecp_pts_t delay) { ssize_t ecp_rbuf_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size, ECP2Buffer *b) { ECPRBRecv *buf = conn->rbuf.recv; - ecp_seq_t ack_pkt = 0; - ssize_t rv; - int _rv; + ECPRBuffer *rbuf = &buf->rbuf; unsigned char mtype; + unsigned short ack_pkt = 0; + int do_ack = 0; + int _rv; + ssize_t rv; _rv = ecp_msg_get_type(msg, msg_size, &mtype); if (_rv) return _rv; - mtype &= ECP_MTYPE_MASK; - if ((mtype == ECP_MTYPE_RBACK) || (mtype == ECP_MTYPE_RBFLUSH)) return ecp_conn_handle_msg(conn, seq, msg, msg_size, b); - - if (ECP_SEQ_LT(buf->rbuf.seq_max, seq)) { - ack_pkt = seq - buf->rbuf.seq_max; - buf->ack_pkt += ack_pkt; - if (buf->ack_pkt > buf->ack_rate) buf->ack_do = 1; + if (ECP_SEQ_LT(rbuf->seq_max, seq)) { + ack_pkt = seq - rbuf->seq_max; } if (ECP_SEQ_LTE(seq, buf->seq_ack)) { ecp_seq_t seq_offset = buf->seq_ack - seq; if (seq_offset < ECP_SIZE_ACKB) { - ecp_ack_t ack_mask = ((ecp_ack_t)1 << seq_offset); - - if (ack_mask & buf->ack_map) return ECP_ERR_RBUF_DUP; + ecp_ack_t ack_bit = ((ecp_ack_t)1 << seq_offset); - buf->ack_map |= ack_mask; - buf->ack_do = buf->ack_do || ack_shift(buf); + if (ack_bit & buf->ack_map) return ECP_ERR_RBUF_DUP; - rv = msg_store(conn, seq, msg, msg_size, b); + rv = msg_store(conn, seq, msg, msg_size, mtype); if (rv < 0) return rv; + + buf->ack_map |= ack_bit; + do_ack = ack_shift(buf); } else { return ECP_ERR_RBUF_DUP; } } else { - if ((buf->ack_map == ECP_ACK_FULL) && (seq == (ecp_seq_t)(buf->seq_ack + 1))) { + unsigned short msg_cnt = rbuf->seq_max - rbuf->seq_start + 1; + + if ((msg_cnt == 0) && (seq == (ecp_seq_t)(buf->seq_ack + 1))) { if ((buf->flags & ECP_RBUF_FLAG_MSGQ) || buf->deliver_delay) { - rv = msg_store(conn, seq, msg, msg_size, b); + rv = msg_store(conn, seq, msg, msg_size, mtype); if (rv < 0) return rv; } else { - ecp_conn_handle_msg(conn, seq, msg, msg_size, b); - rv = msg_size; - buf->rbuf.seq_max++; - buf->rbuf.seq_start++; - buf->rbuf.msg_start = ECP_RBUF_IDX_MASK(buf->rbuf.msg_start + 1, buf->rbuf.msg_size); + /* receive buffer is empty, so no need for msgq mutex lock */ + rv = 0; + rbuf->seq_max++; + rbuf->seq_start++; + rbuf->idx_start = ECP_RBUF_IDX_MASK(rbuf->idx_start + 1, rbuf->arr_size); } buf->seq_ack++; } else { - rv = msg_store(conn, seq, msg, msg_size, b); + rv = msg_store(conn, seq, msg, msg_size, mtype); if (rv < 0) return rv; - buf->ack_do = buf->ack_do || ack_shift(buf); + do_ack = ack_shift(buf); } } msg_flush(conn, b); - if (!(mtype < ECP_MAX_MTYPE_SYS) && buf->ack_do) { - int _rv; - - _rv = ack_send(conn); + if (ack_pkt) { + buf->ack_pkt += ack_pkt; + if (!do_ack && (buf->ack_pkt > buf->ack_rate)) do_ack = 1; + } + if (do_ack) { + ecp_seq_t seq_ack = buf->seq_ack; + ecp_seq_t ack_map = buf->ack_map; + + /* account for missing mackets within hole_max range */ + if (buf->hole_max && (buf->seq_ack == rbuf->seq_max)) { + unsigned short h_bits = buf->hole_max + 1; + ecp_seq_t h_mask = ~(~((ecp_seq_t)0) << h_bits); + + if ((ack_map & h_mask) != h_mask) { + h_mask = ~(~((ecp_seq_t)0) >> h_bits); + seq_ack -= h_bits; + ack_map = (ack_map >> h_bits) | h_mask; + } + } + _rv = ack_send(conn, seq_ack, ack_map); if (_rv) return _rv; } return rv; } - ECPFragIter *ecp_rbuf_get_frag_iter(ECPConnection *conn) { if (conn->rbuf.recv) return conn->rbuf.recv->frag_iter; return NULL; |