summaryrefslogtreecommitdiff
path: root/code/core/rbuf_recv.c
diff options
context:
space:
mode:
authorUros Majstorovic <majstor@majstor.org>2017-08-10 21:02:16 +0200
committerUros Majstorovic <majstor@majstor.org>2017-08-10 21:02:16 +0200
commitcbba099541d27400ad45083a4b1102b86f9e8dea (patch)
treee260d7da3d07b4de0e989726917f08d23c55f70d /code/core/rbuf_recv.c
parente9ced8e60689c6f46ac4fc31013b88f4c3f4fa80 (diff)
rbuffer almost implemented
Diffstat (limited to 'code/core/rbuf_recv.c')
-rw-r--r--code/core/rbuf_recv.c149
1 files changed, 110 insertions, 39 deletions
diff --git a/code/core/rbuf_recv.c b/code/core/rbuf_recv.c
index 222daa7..b2a5ecf 100644
--- a/code/core/rbuf_recv.c
+++ b/code/core/rbuf_recv.c
@@ -5,10 +5,35 @@
#define ACK_RATE 8
#define ACK_MASK_FIRST ((ecp_ack_t)1 << (ECP_RBUF_ACK_SIZE - 1))
-static ssize_t msg_store(ECPRBRecv *buf, ecp_seq_t seq, unsigned char *msg, size_t msg_size) {
- ssize_t rv = ecp_rbuf_msg_store(&buf->rbuf, seq, msg, msg_size, ECP_RBUF_FLAG_PRESENT, ECP_RBUF_FLAG_PRESENT);
+static ssize_t handle_flush(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size) {
+ if (size < 0) return size;
+
+ ECPRBRecv *buf = conn->rbuf.recv;
+ unsigned char payload[ECP_SIZE_PLD(0)];
+
+ if (buf == NULL) return ECP_ERR;
+
+ buf->flush = 1;
+ return 0;
+}
+
+static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) {
+ ECPRBRecv *buf = conn->rbuf.recv;
+ ssize_t rv = 0;
+ unsigned char flags;
+ unsigned char mtype = msg[0] & ECP_MTYPE_MASK;
+
+ if (mtype < ECP_MAX_MTYPE_SYS) {
+ flags = ECP_RBUF_FLAG_RECEIVED | ECP_RBUF_FLAG_DELIVERED;
+ } else {
+ flags = ECP_RBUF_FLAG_RECEIVED;
+ }
+
+ rv = ecp_rbuf_msg_store(&buf->rbuf, seq, -1, msg, msg_size, ECP_RBUF_FLAG_RECEIVED, flags);
if (rv < 0) return ECP_ERR_RBUF_DUP;
+ if (flags & ECP_RBUF_FLAG_DELIVERED) ecp_msg_handle(conn, seq, msg, msg_size);
+
if (ECP_RBUF_SEQ_LT(buf->seq_max, seq)) buf->seq_max = seq;
return rv;
}
@@ -20,11 +45,15 @@ static void msg_flush(ECPConnection *conn) {
unsigned int idx = buf->rbuf.msg_start;
for (i=0; i<msg_cnt; i++) {
- if (buf->reliable && !(buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_PRESENT)) break;
- if (buf->deliver_delay && msg_cnt - i < buf->deliver_delay) break;
- if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_PRESENT) {
- buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_PRESENT;
- ecp_msg_handle(conn, buf->rbuf.seq_start + i, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size);
+ if ((buf->flags & ECP_RBUF_FLAG_RELIABLE) && !(buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_RECEIVED)) break;
+ if (buf->deliver_delay && (msg_cnt - i < buf->deliver_delay)) break;
+ if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_RECEIVED) {
+ buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_RECEIVED;
+ if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_DELIVERED) {
+ buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_DELIVERED;
+ } else {
+ ecp_msg_handle(conn, buf->rbuf.seq_start + i, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size);
+ }
}
idx = ECP_RBUF_IDX_MASK(idx + 1, buf->rbuf.msg_size);
}
@@ -32,12 +61,35 @@ static void msg_flush(ECPConnection *conn) {
buf->rbuf.seq_start += i;
}
+static int ack_send(ECPConnection *conn) {
+ ECPRBRecv *buf = conn->rbuf.recv;
+ unsigned char payload[ECP_SIZE_PLD(sizeof(ecp_seq_t) + sizeof(ecp_ack_t))];
+ unsigned char *buf_ = ecp_pld_get_buf(payload);
+ ssize_t rv;
+
+ ecp_pld_set_type(payload, ECP_MTYPE_RBACK);
+ buf_[0] = (buf->seq_ack & 0xFF000000) >> 24;
+ buf_[1] = (buf->seq_ack & 0x00FF0000) >> 16;
+ buf_[2] = (buf->seq_ack & 0x0000FF00) >> 8;
+ buf_[3] = (buf->seq_ack & 0x000000FF);
+ buf_[4] = (buf->ack_map & 0xFF000000) >> 24;
+ buf_[5] = (buf->ack_map & 0x00FF0000) >> 16;
+ buf_[6] = (buf->ack_map & 0x0000FF00) >> 8;
+ buf_[7] = (buf->ack_map & 0x000000FF);
+
+ rv = ecp_pld_send(conn, payload, sizeof(payload));
+ if (rv < 0) return rv;
+
+ buf->ack_pkt = 0;
+ return ECP_OK;
+}
+
static int ack_shift(ECPRBRecv *buf) {
int do_ack = 0;
int idx;
int i;
- if (buf->reliable && ((buf->ack_map & ACK_MASK_FIRST) == 0)) return 0;
+ if ((buf->flags & ECP_RBUF_FLAG_RELIABLE) && ((buf->ack_map & ACK_MASK_FIRST) == 0)) return 0;
idx = ecp_rbuf_msg_idx(&buf->rbuf, buf->seq_ack);
if (idx < 0) return idx;
@@ -46,10 +98,10 @@ static int ack_shift(ECPRBRecv *buf) {
idx = ECP_RBUF_IDX_MASK(idx + 1, buf->rbuf.msg_size);
buf->seq_ack++;
- if ((buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_PRESENT) && (buf->ack_map == ECP_RBUF_ACK_FULL)) continue;
+ if ((buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_RECEIVED) && (buf->ack_map == ECP_RBUF_ACK_FULL)) continue;
buf->ack_map = buf->ack_map << 1;
- if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_PRESENT) {
+ if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_RECEIVED) {
buf->ack_map |= 1;
} else if (!do_ack && ECP_RBUF_SEQ_LTE(buf->seq_ack, buf->seq_max - 2 * buf->hole_max)) {
do_ack = 1;
@@ -73,47 +125,61 @@ static int ack_shift(ECPRBRecv *buf) {
return do_ack;
}
-int ecp_rbuf_recv_create(ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size) {
+int ecp_conn_rbuf_recv_create(ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size) {
+ int rv;
+
memset(buf, 0, sizeof(ECPRBRecv));
- ecp_rbuf_init(&buf->rbuf, msg, msg_size);
+ rv = ecp_rbuf_init(&buf->rbuf, msg, msg_size);
+ if (rv) return rv;
+
buf->ack_map = ECP_RBUF_ACK_FULL;
buf->ack_rate = ACK_RATE;
-
+ conn->rbuf.recv = buf;
+
return ECP_OK;
}
-int ecp_rbuf_recv_start(ECPRBRecv *buf, ecp_seq_t seq) {
- buf->seq_ack = seq;
- buf->seq_max = seq;
- buf->rbuf.seq_start = seq + 1;
-
- return ECP_OK;
-}
+int ecp_conn_rbuf_recv_set_hole(ECPConnection *conn, unsigned short hole_max) {
+ ECPRBRecv *buf = conn->rbuf.recv;
-int ecp_rbuf_recv_set_hole(ECPRBRecv *buf, unsigned short hole_max) {
buf->hole_max = hole_max;
- buf->hole_mask_full = ~(~((ecp_ack_t)0) << (hole_max * 2));
- buf->hole_mask_empty = ~(~((ecp_ack_t)0) << (hole_max + 1));
+ buf->hole_mask_full = ~(~((ecp_ack_t)1) << (hole_max * 2));
+ buf->hole_mask_empty = ~(~((ecp_ack_t)1) << (hole_max + 1));
return ECP_OK;
}
-int ecp_rbuf_recv_set_delay(ECPRBRecv *buf, unsigned short delay) {
+int ecp_conn_rbuf_recv_set_delay(ECPConnection *conn, unsigned short delay) {
+ ECPRBRecv *buf = conn->rbuf.recv;
+
buf->deliver_delay = delay;
if (buf->hole_max < delay - 1) {
- ecp_rbuf_recv_set_hole(buf, delay - 1);
+ ecp_conn_rbuf_recv_set_hole(conn, delay - 1);
}
return ECP_OK;
}
-ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) {
- ssize_t rv;
+int ecp_conn_rbuf_recv_start(ECPConnection *conn, ecp_seq_t seq) {
+ ECPRBRecv *buf = conn->rbuf.recv;
+
+ if (buf == NULL) return ECP_ERR;
+
+ buf->seq_ack = seq;
+ buf->seq_max = seq;
+ buf->rbuf.seq_start = seq + 1;
+
+ return ECP_OK;
+}
+
+ssize_t ecp_conn_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) {
+ ECPRBRecv *buf = conn->rbuf.recv;
ecp_seq_t ack_pkt = 0;
+ ssize_t rv;
int do_ack = 0;
- ECPRBRecv *buf = conn->rbuf.recv;
if (buf == NULL) return ECP_ERR;
+ if (msg_size < 1) return ECP_ERR_MIN_MSG;
if (ECP_RBUF_SEQ_LT(buf->seq_max, seq)) ack_pkt = seq - buf->seq_max;
if (ECP_RBUF_SEQ_LTE(seq, buf->seq_ack)) {
@@ -122,43 +188,48 @@ ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *m
ecp_ack_t ack_mask = ((ecp_ack_t)1 << seq_offset);
if (ack_mask & buf->ack_map) return ECP_ERR_RBUF_DUP;
-
- rv = msg_store(buf, seq, msg, msg_size);
- if (rv < 0) return rv;
-
buf->ack_map |= ack_mask;
do_ack = ack_shift(buf);
+
+ rv = msg_store(conn, seq, msg, msg_size);
+ if (rv < 0) return rv;
} else {
return ECP_ERR_RBUF_IDX;
}
} else {
if ((buf->ack_map == ECP_RBUF_ACK_FULL) && (seq == (ecp_seq_t)(buf->seq_ack + 1))) {
- if (buf->deliver_delay) {
- rv = msg_store(buf, seq, msg, msg_size);
+ if ((buf->flags & ECP_RBUF_FLAG_MSGQ) || buf->deliver_delay) {
+ rv = msg_store(conn, seq, msg, msg_size);
if (rv < 0) return rv;
} else {
- rv = ecp_msg_handle(conn, seq, msg, msg_size);
+ ecp_msg_handle(conn, seq, msg, msg_size);
+ rv = msg_size;
buf->seq_max++;
buf->rbuf.seq_start++;
+ buf->rbuf.msg_start = ECP_RBUF_IDX_MASK(buf->rbuf.msg_start + 1, buf->rbuf.msg_size);
}
buf->seq_ack++;
} else {
- rv = msg_store(buf, seq, msg, msg_size);
+ rv = msg_store(conn, seq, msg, msg_size);
if (rv < 0) return rv;
do_ack = ack_shift(buf);
}
}
+ if (buf->flush) {
+ buf->flush = 0;
+ do_ack = 1;
+ }
if (ack_pkt && !do_ack) {
buf->ack_pkt += ack_pkt;
// should send acks more aggresively when reliable and ack_map is not full (rate ~ PPS * RTT)
if (buf->ack_pkt > buf->ack_rate) do_ack = 1;
}
if (do_ack) {
- buf->ack_pkt = 0;
- // send ack (with seq = 0)
+ int _rv = ack_send(conn);
+ if (_rv) return _rv;
}
- // XXX should handle close
msg_flush(conn);
return rv;
}
+