diff options
Diffstat (limited to 'code/ecp/rbuf_recv.c')
-rw-r--r-- | code/ecp/rbuf_recv.c | 55 |
1 files changed, 26 insertions, 29 deletions
diff --git a/code/ecp/rbuf_recv.c b/code/ecp/rbuf_recv.c index 44b5113..c262f1d 100644 --- a/code/ecp/rbuf_recv.c +++ b/code/ecp/rbuf_recv.c @@ -5,7 +5,7 @@ #define ACK_RATE 8 #define ACK_MASK_FIRST ((ecp_ack_t)1 << (ECP_SIZE_ACKB - 1)) -static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) { +static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size, ECP2Buffer *b) { ECPRBRecv *buf = conn->rbuf.recv; unsigned char flags = ECP_RBUF_FLAG_IN_RBUF; unsigned char mtype = ecp_msg_get_type(msg) & ECP_MTYPE_MASK; @@ -29,12 +29,12 @@ static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, if (rv < 0) return rv; if (ECP_SEQ_LT(buf->rbuf.seq_max, seq)) buf->rbuf.seq_max = seq; - if (flags & ECP_RBUF_FLAG_SYS) ecp_msg_handle(conn, seq, msg, msg_size, NULL); + if (flags & ECP_RBUF_FLAG_SYS) ecp_msg_handle(conn, seq, msg, msg_size, b); return rv; } -static void msg_flush(ECPConnection *conn) { +static void msg_flush(ECPConnection *conn, ECP2Buffer *b) { ECPRBRecv *buf = conn->rbuf.recv; #ifdef ECP_WITH_MSGQ @@ -101,7 +101,7 @@ static void msg_flush(ECPConnection *conn) { buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_IN_MSGQ; #endif } else { - ecp_msg_handle(conn, seq, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size, NULL); + ecp_msg_handle(conn, seq, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size, b); } } buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_IN_RBUF; @@ -148,10 +148,11 @@ static int ack_send(ECPConnection *conn) { buf_[6] = (buf->ack_map & 0x0000FF00) >> 8; buf_[7] = (buf->ack_map & 0x000000FF); - rv = ecp_rbuf_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), ECP_MTYPE_RBACK), 0); + rv = ecp_rbuf_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), ECP_MTYPE_RBACK), 0, 0); if (rv < 0) return rv; buf->ack_pkt = 0; + buf->ack_do = 0; return ECP_OK; } @@ -170,7 +171,7 @@ static int ack_shift(ECPRBRecv *buf) { if (in_rbuf && (buf->ack_map == ECP_ACK_FULL)) continue; buf->ack_map = buf->ack_map << 1; - if (in_rbuf & ECP_RBUF_FLAG_IN_RBUF) { + if (in_rbuf) { buf->ack_map |= 1; } else if (!do_ack && ECP_SEQ_LTE(buf->seq_ack, buf->rbuf.seq_max - 2 * buf->hole_max)) { do_ack = 1; @@ -197,10 +198,12 @@ static int ack_shift(ECPRBRecv *buf) { ssize_t ecp_rbuf_handle_flush(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size, ECP2Buffer *b) { if (size < 0) return size; + ECPContext *ctx = conn->sock->ctx; ECPRBRecv *buf = conn->rbuf.recv; if (buf == NULL) return ECP_ERR; - buf->flush = 1; + if (ctx->tr.buf_free) ctx->tr.buf_free(b, ECP_SEND_FLAG_MORE); + ack_send(conn); return 0; } @@ -209,7 +212,7 @@ ssize_t ecp_rbuf_handle_flush_pts(ECPConnection *conn, ecp_seq_t seq, unsigned c if (buf == NULL) return ECP_ERR; buf->timer_pts = 0; - msg_flush(conn); + msg_flush(conn, b); return 0; } @@ -282,17 +285,20 @@ int ecp_rbuf_recv_start(ECPConnection *conn, ecp_seq_t seq) { return ECP_OK; } -ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) { +ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size, ECP2Buffer *b) { ECPRBRecv *buf = conn->rbuf.recv; ecp_seq_t ack_pkt = 0; ssize_t rv; - int do_ack = 0; unsigned char mtype; mtype = ecp_msg_get_type(msg) & ECP_MTYPE_MASK; - if ((mtype == ECP_MTYPE_RBACK) || (mtype == ECP_MTYPE_RBFLUSH)) return ecp_msg_handle(conn, seq, msg, msg_size, NULL); + if ((mtype == ECP_MTYPE_RBACK) || (mtype == ECP_MTYPE_RBFLUSH)) return ecp_msg_handle(conn, seq, msg, msg_size, b); - if (ECP_SEQ_LT(buf->rbuf.seq_max, seq)) ack_pkt = seq - buf->rbuf.seq_max; + if (ECP_SEQ_LT(buf->rbuf.seq_max, seq)) { + ack_pkt = seq - buf->rbuf.seq_max; + buf->ack_pkt += ack_pkt; + if (buf->ack_pkt > buf->ack_rate) buf->ack_do = 1; + } if (ECP_SEQ_LTE(seq, buf->seq_ack)) { ecp_seq_t seq_offset = buf->seq_ack - seq; if (seq_offset < ECP_SIZE_ACKB) { @@ -301,9 +307,9 @@ ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *m if (ack_mask & buf->ack_map) return ECP_ERR_RBUF_DUP; buf->ack_map |= ack_mask; - do_ack = ack_shift(buf); + buf->ack_do = buf->ack_do || ack_shift(buf); - rv = msg_store(conn, seq, msg, msg_size); + rv = msg_store(conn, seq, msg, msg_size, b); if (rv < 0) return rv; } else { return ECP_ERR_RBUF_DUP; @@ -311,10 +317,10 @@ ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *m } else { if ((buf->ack_map == ECP_ACK_FULL) && (seq == (ecp_seq_t)(buf->seq_ack + 1))) { if ((buf->flags & ECP_RBUF_FLAG_MSGQ) || buf->deliver_delay) { - rv = msg_store(conn, seq, msg, msg_size); + rv = msg_store(conn, seq, msg, msg_size, b); if (rv < 0) return rv; } else { - ecp_msg_handle(conn, seq, msg, msg_size, NULL); + ecp_msg_handle(conn, seq, msg, msg_size, b); rv = msg_size; buf->rbuf.seq_max++; buf->rbuf.seq_start++; @@ -322,23 +328,14 @@ ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *m } buf->seq_ack++; } else { - rv = msg_store(conn, seq, msg, msg_size); + rv = msg_store(conn, seq, msg, msg_size, b); if (rv < 0) return rv; - do_ack = ack_shift(buf); + buf->ack_do = buf->ack_do || ack_shift(buf); } } - msg_flush(conn); - if (buf->flush) { - buf->flush = 0; - do_ack = 1; - } - if (ack_pkt && !do_ack) { - buf->ack_pkt += ack_pkt; - // should send acks more aggresively when reliable and ack_map is not full (rate ~ PPS * RTT) - if (buf->ack_pkt > buf->ack_rate) do_ack = 1; - } - if (do_ack) { + msg_flush(conn, b); + if (!(mtype < ECP_MAX_MTYPE_SYS) && buf->ack_do) { int _rv = ack_send(conn); if (_rv) return _rv; } |