summaryrefslogtreecommitdiff
path: root/code/ecp/rbuf_recv.c
diff options
context:
space:
mode:
Diffstat (limited to 'code/ecp/rbuf_recv.c')
-rw-r--r--code/ecp/rbuf_recv.c55
1 files changed, 26 insertions, 29 deletions
diff --git a/code/ecp/rbuf_recv.c b/code/ecp/rbuf_recv.c
index 44b5113..c262f1d 100644
--- a/code/ecp/rbuf_recv.c
+++ b/code/ecp/rbuf_recv.c
@@ -5,7 +5,7 @@
#define ACK_RATE 8
#define ACK_MASK_FIRST ((ecp_ack_t)1 << (ECP_SIZE_ACKB - 1))
-static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) {
+static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size, ECP2Buffer *b) {
ECPRBRecv *buf = conn->rbuf.recv;
unsigned char flags = ECP_RBUF_FLAG_IN_RBUF;
unsigned char mtype = ecp_msg_get_type(msg) & ECP_MTYPE_MASK;
@@ -29,12 +29,12 @@ static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg,
if (rv < 0) return rv;
if (ECP_SEQ_LT(buf->rbuf.seq_max, seq)) buf->rbuf.seq_max = seq;
- if (flags & ECP_RBUF_FLAG_SYS) ecp_msg_handle(conn, seq, msg, msg_size, NULL);
+ if (flags & ECP_RBUF_FLAG_SYS) ecp_msg_handle(conn, seq, msg, msg_size, b);
return rv;
}
-static void msg_flush(ECPConnection *conn) {
+static void msg_flush(ECPConnection *conn, ECP2Buffer *b) {
ECPRBRecv *buf = conn->rbuf.recv;
#ifdef ECP_WITH_MSGQ
@@ -101,7 +101,7 @@ static void msg_flush(ECPConnection *conn) {
buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_IN_MSGQ;
#endif
} else {
- ecp_msg_handle(conn, seq, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size, NULL);
+ ecp_msg_handle(conn, seq, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size, b);
}
}
buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_IN_RBUF;
@@ -148,10 +148,11 @@ static int ack_send(ECPConnection *conn) {
buf_[6] = (buf->ack_map & 0x0000FF00) >> 8;
buf_[7] = (buf->ack_map & 0x000000FF);
- rv = ecp_rbuf_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), ECP_MTYPE_RBACK), 0);
+ rv = ecp_rbuf_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), ECP_MTYPE_RBACK), 0, 0);
if (rv < 0) return rv;
buf->ack_pkt = 0;
+ buf->ack_do = 0;
return ECP_OK;
}
@@ -170,7 +171,7 @@ static int ack_shift(ECPRBRecv *buf) {
if (in_rbuf && (buf->ack_map == ECP_ACK_FULL)) continue;
buf->ack_map = buf->ack_map << 1;
- if (in_rbuf & ECP_RBUF_FLAG_IN_RBUF) {
+ if (in_rbuf) {
buf->ack_map |= 1;
} else if (!do_ack && ECP_SEQ_LTE(buf->seq_ack, buf->rbuf.seq_max - 2 * buf->hole_max)) {
do_ack = 1;
@@ -197,10 +198,12 @@ static int ack_shift(ECPRBRecv *buf) {
ssize_t ecp_rbuf_handle_flush(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size, ECP2Buffer *b) {
if (size < 0) return size;
+ ECPContext *ctx = conn->sock->ctx;
ECPRBRecv *buf = conn->rbuf.recv;
if (buf == NULL) return ECP_ERR;
- buf->flush = 1;
+ if (ctx->tr.buf_free) ctx->tr.buf_free(b, ECP_SEND_FLAG_MORE);
+ ack_send(conn);
return 0;
}
@@ -209,7 +212,7 @@ ssize_t ecp_rbuf_handle_flush_pts(ECPConnection *conn, ecp_seq_t seq, unsigned c
if (buf == NULL) return ECP_ERR;
buf->timer_pts = 0;
- msg_flush(conn);
+ msg_flush(conn, b);
return 0;
}
@@ -282,17 +285,20 @@ int ecp_rbuf_recv_start(ECPConnection *conn, ecp_seq_t seq) {
return ECP_OK;
}
-ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) {
+ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size, ECP2Buffer *b) {
ECPRBRecv *buf = conn->rbuf.recv;
ecp_seq_t ack_pkt = 0;
ssize_t rv;
- int do_ack = 0;
unsigned char mtype;
mtype = ecp_msg_get_type(msg) & ECP_MTYPE_MASK;
- if ((mtype == ECP_MTYPE_RBACK) || (mtype == ECP_MTYPE_RBFLUSH)) return ecp_msg_handle(conn, seq, msg, msg_size, NULL);
+ if ((mtype == ECP_MTYPE_RBACK) || (mtype == ECP_MTYPE_RBFLUSH)) return ecp_msg_handle(conn, seq, msg, msg_size, b);
- if (ECP_SEQ_LT(buf->rbuf.seq_max, seq)) ack_pkt = seq - buf->rbuf.seq_max;
+ if (ECP_SEQ_LT(buf->rbuf.seq_max, seq)) {
+ ack_pkt = seq - buf->rbuf.seq_max;
+ buf->ack_pkt += ack_pkt;
+ if (buf->ack_pkt > buf->ack_rate) buf->ack_do = 1;
+ }
if (ECP_SEQ_LTE(seq, buf->seq_ack)) {
ecp_seq_t seq_offset = buf->seq_ack - seq;
if (seq_offset < ECP_SIZE_ACKB) {
@@ -301,9 +307,9 @@ ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *m
if (ack_mask & buf->ack_map) return ECP_ERR_RBUF_DUP;
buf->ack_map |= ack_mask;
- do_ack = ack_shift(buf);
+ buf->ack_do = buf->ack_do || ack_shift(buf);
- rv = msg_store(conn, seq, msg, msg_size);
+ rv = msg_store(conn, seq, msg, msg_size, b);
if (rv < 0) return rv;
} else {
return ECP_ERR_RBUF_DUP;
@@ -311,10 +317,10 @@ ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *m
} else {
if ((buf->ack_map == ECP_ACK_FULL) && (seq == (ecp_seq_t)(buf->seq_ack + 1))) {
if ((buf->flags & ECP_RBUF_FLAG_MSGQ) || buf->deliver_delay) {
- rv = msg_store(conn, seq, msg, msg_size);
+ rv = msg_store(conn, seq, msg, msg_size, b);
if (rv < 0) return rv;
} else {
- ecp_msg_handle(conn, seq, msg, msg_size, NULL);
+ ecp_msg_handle(conn, seq, msg, msg_size, b);
rv = msg_size;
buf->rbuf.seq_max++;
buf->rbuf.seq_start++;
@@ -322,23 +328,14 @@ ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *m
}
buf->seq_ack++;
} else {
- rv = msg_store(conn, seq, msg, msg_size);
+ rv = msg_store(conn, seq, msg, msg_size, b);
if (rv < 0) return rv;
- do_ack = ack_shift(buf);
+ buf->ack_do = buf->ack_do || ack_shift(buf);
}
}
- msg_flush(conn);
- if (buf->flush) {
- buf->flush = 0;
- do_ack = 1;
- }
- if (ack_pkt && !do_ack) {
- buf->ack_pkt += ack_pkt;
- // should send acks more aggresively when reliable and ack_map is not full (rate ~ PPS * RTT)
- if (buf->ack_pkt > buf->ack_rate) do_ack = 1;
- }
- if (do_ack) {
+ msg_flush(conn, b);
+ if (!(mtype < ECP_MAX_MTYPE_SYS) && buf->ack_do) {
int _rv = ack_send(conn);
if (_rv) return _rv;
}