summaryrefslogtreecommitdiff
path: root/code/core/rbuf_recv.c
diff options
context:
space:
mode:
Diffstat (limited to 'code/core/rbuf_recv.c')
-rw-r--r--code/core/rbuf_recv.c155
1 files changed, 84 insertions, 71 deletions
diff --git a/code/core/rbuf_recv.c b/code/core/rbuf_recv.c
index ad9c53c..e00b63f 100644
--- a/code/core/rbuf_recv.c
+++ b/code/core/rbuf_recv.c
@@ -5,78 +5,76 @@
#define ACK_RATE 8
#define ACK_MASK_FIRST ((ecp_ack_t)1 << (ECP_RBUF_ACK_SIZE - 1))
-static ssize_t handle_flush(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size) {
- if (size < 0) return size;
-
- ECPRBRecv *buf = conn->rbuf.recv;
- unsigned char payload[ECP_SIZE_PLD(0)];
-
- if (buf == NULL) return ECP_ERR;
-
- buf->flush = 1;
- return 0;
-}
-
static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) {
ECPRBRecv *buf = conn->rbuf.recv;
- ssize_t rv = 0;
- unsigned char flags;
+ int rv = ECP_OK;
+ ssize_t _rv = 0;
+ unsigned char flags = ECP_RBUF_FLAG_IN_RBUF;
unsigned char mtype = msg[0] & ECP_MTYPE_MASK;
if (mtype < ECP_MAX_MTYPE_SYS) {
- flags = ECP_RBUF_FLAG_RECEIVED | ECP_RBUF_FLAG_DELIVERED;
- } else {
- flags = ECP_RBUF_FLAG_RECEIVED;
+ flags |= ECP_RBUF_FLAG_DELIVERED;
+ ecp_msg_handle(conn, seq, msg, msg_size);
}
- rv = ecp_rbuf_msg_store(&buf->rbuf, seq, -1, msg, msg_size, ECP_RBUF_FLAG_RECEIVED, flags);
- if (rv < 0) return ECP_ERR_RBUF_DUP;
-
- if (ECP_RBUF_SEQ_LT(buf->rbuf.seq_max, seq)) buf->rbuf.seq_max = seq;
-
- if (flags & ECP_RBUF_FLAG_DELIVERED) {
#ifdef ECP_WITH_MSGQ
- if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_unlock(&buf->msgq.mutex);
+ if (buf->flags & ECP_RBUF_FLAG_MSGQ) {
+ pthread_mutex_lock(&buf->msgq.mutex);
+ ecp_seq_t seq_offset = seq - buf->msgq.seq_start;
+ if (seq_offset >= buf->rbuf.msg_size) rv = ECP_ERR_RBUF_FULL;
+ }
#endif
- ecp_msg_handle(conn, seq, msg, msg_size);
+ if (!rv) _rv = ecp_rbuf_msg_store(&buf->rbuf, seq, -1, msg, msg_size, ECP_RBUF_FLAG_IN_RBUF | ECP_RBUF_FLAG_IN_MSGQ, flags);
#ifdef ECP_WITH_MSGQ
- if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_lock(&buf->msgq.mutex);
+ if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_unlock(&buf->msgq.mutex);
#endif
- }
- return rv;
+ if (rv) return rv;
+ return _rv;
}
static void msg_flush(ECPConnection *conn) {
ECPRBRecv *buf = conn->rbuf.recv;
+
+#ifdef ECP_WITH_MSGQ
+ if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_lock(&buf->msgq.mutex);
+#endif
+
ecp_seq_t msg_cnt = buf->rbuf.seq_max - buf->rbuf.seq_start + 1;
ecp_seq_t i = 0;
unsigned int idx = buf->rbuf.msg_start;
for (i=0; i<msg_cnt; i++) {
- if ((buf->flags & ECP_RBUF_FLAG_RELIABLE) && !(buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_RECEIVED)) break;
+ if ((buf->flags & ECP_RBUF_FLAG_RELIABLE) && !(buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_IN_RBUF)) break;
if (buf->deliver_delay && (msg_cnt - i < buf->deliver_delay)) break;
- if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_RECEIVED) {
- ssize_t rv = 0;
+ if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_IN_RBUF) {
+ int rv = 0;
+ ecp_seq_t seq = buf->rbuf.seq_start + i;
if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_DELIVERED) {
buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_DELIVERED;
+ } else if (buf->flags & ECP_RBUF_FLAG_MSGQ) {
+#ifdef ECP_WITH_MSGQ
+ unsigned char mtype = buf->rbuf.msg[idx].msg[0] & ECP_MTYPE_MASK;
+ int rv = ecp_conn_msgq_push(conn, seq, mtype);
+ if (rv) break;
+ buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_IN_MSGQ;
+#endif
} else {
- rv = ecp_msg_handle(conn, buf->rbuf.seq_start + i, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size);
- }
- if (!(buf->flags & ECP_RBUF_FLAG_MSGQ)) {
- buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_RECEIVED;
- } else if (rv < 0) {
- break;
+ ecp_msg_handle(conn, seq, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size);
}
+ buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_IN_RBUF;
}
idx = ECP_RBUF_IDX_MASK(idx + 1, buf->rbuf.msg_size);
}
- if (!(buf->flags & ECP_RBUF_FLAG_MSGQ)) {
- buf->rbuf.seq_start += i;
- buf->rbuf.msg_start = idx;
- }
+ buf->rbuf.seq_start += i;
+ buf->rbuf.msg_start = idx;
+
+#ifdef ECP_WITH_MSGQ
+ if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_unlock(&buf->msgq.mutex);
+#endif
+
}
static int ack_send(ECPConnection *conn) {
@@ -116,10 +114,10 @@ static int ack_shift(ECPRBRecv *buf) {
idx = ECP_RBUF_IDX_MASK(idx + 1, buf->rbuf.msg_size);
buf->seq_ack++;
- if ((buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_RECEIVED) && (buf->ack_map == ECP_RBUF_ACK_FULL)) continue;
+ if ((buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_IN_RBUF) && (buf->ack_map == ECP_RBUF_ACK_FULL)) continue;
buf->ack_map = buf->ack_map << 1;
- if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_RECEIVED) {
+ if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_IN_RBUF) {
buf->ack_map |= 1;
} else if (!do_ack && ECP_RBUF_SEQ_LTE(buf->seq_ack, buf->rbuf.seq_max - 2 * buf->hole_max)) {
do_ack = 1;
@@ -143,6 +141,16 @@ static int ack_shift(ECPRBRecv *buf) {
return do_ack;
}
+ssize_t ecp_rbuf_handle_flush(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size) {
+ if (size < 0) return size;
+
+ ECPRBRecv *buf = conn->rbuf.recv;
+ if (buf == NULL) return ECP_ERR;
+
+ buf->flush = 1;
+ return 0;
+}
+
int ecp_conn_rbuf_recv_create(ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size) {
int rv;
@@ -152,13 +160,20 @@ int ecp_conn_rbuf_recv_create(ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage
buf->ack_map = ECP_RBUF_ACK_FULL;
buf->ack_rate = ACK_RATE;
+
+#ifdef ECP_WITH_MSGQ
+ rv = ecp_conn_msgq_create(conn);
+ if (rv) return rv;
+#endif
+
conn->rbuf.recv = buf;
-
return ECP_OK;
}
void ecp_conn_rbuf_recv_destroy(ECPConnection *conn) {
-
+#ifdef ECP_WITH_MSGQ
+ ecp_conn_msgq_destroy(conn);
+#endif
}
int ecp_conn_rbuf_recv_set_hole(ECPConnection *conn, unsigned short hole_max) {
@@ -183,50 +198,55 @@ int ecp_conn_rbuf_recv_set_delay(ECPConnection *conn, unsigned short delay) {
}
int ecp_conn_rbuf_recv_start(ECPConnection *conn, ecp_seq_t seq) {
+ int rv;
ECPRBRecv *buf = conn->rbuf.recv;
if (buf == NULL) return ECP_ERR;
buf->seq_ack = seq;
- return ecp_rbuf_start(&buf->rbuf, seq);
+ rv = ecp_rbuf_start(&buf->rbuf, seq);
+ if (rv) return rv;
+
+#ifdef ECP_WITH_MSGQ
+ if (buf->flags & ECP_RBUF_FLAG_MSGQ) {
+ rv = ecp_conn_msgq_start(conn, seq);
+ if (rv) return rv;
+ }
+#endif
+
+ return ECP_OK;
}
ssize_t ecp_conn_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) {
ECPRBRecv *buf = conn->rbuf.recv;
ecp_seq_t ack_pkt = 0;
ssize_t rv;
- int _rv = ECP_OK;
int do_ack = 0;
if (buf == NULL) return ECP_ERR;
if (msg_size < 1) return ECP_ERR_MIN_MSG;
-#ifdef ECP_WITH_MSGQ
- if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_lock(&buf->msgq.mutex);
-#endif
-
if (ECP_RBUF_SEQ_LT(buf->rbuf.seq_max, seq)) ack_pkt = seq - buf->rbuf.seq_max;
if (ECP_RBUF_SEQ_LTE(seq, buf->seq_ack)) {
ecp_seq_t seq_offset = buf->seq_ack - seq;
if (seq_offset < ECP_RBUF_ACK_SIZE) {
ecp_ack_t ack_mask = ((ecp_ack_t)1 << seq_offset);
- if (ack_mask & buf->ack_map) _rv = ECP_ERR_RBUF_DUP;
- if (!_rv) {
- buf->ack_map |= ack_mask;
- do_ack = ack_shift(buf);
+ if (ack_mask & buf->ack_map) return ECP_ERR_RBUF_DUP;
- rv = msg_store(conn, seq, msg, msg_size);
- if (rv < 0) _rv = rv;
- }
+ buf->ack_map |= ack_mask;
+ do_ack = ack_shift(buf);
+
+ rv = msg_store(conn, seq, msg, msg_size);
+ if (rv < 0) return rv;
} else {
- _rv = ECP_ERR_RBUF_IDX;
+ return ECP_ERR_RBUF_DUP;
}
} else {
if ((buf->ack_map == ECP_RBUF_ACK_FULL) && (seq == (ecp_seq_t)(buf->seq_ack + 1))) {
if ((buf->flags & ECP_RBUF_FLAG_MSGQ) || buf->deliver_delay) {
rv = msg_store(conn, seq, msg, msg_size);
- if (rv < 0) _rv = rv;
+ if (rv < 0) return rv;
} else {
ecp_msg_handle(conn, seq, msg, msg_size);
rv = msg_size;
@@ -234,22 +254,15 @@ ssize_t ecp_conn_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned ch
buf->rbuf.seq_start++;
buf->rbuf.msg_start = ECP_RBUF_IDX_MASK(buf->rbuf.msg_start + 1, buf->rbuf.msg_size);
}
- if (!_rv) buf->seq_ack++;
+ buf->seq_ack++;
} else {
rv = msg_store(conn, seq, msg, msg_size);
- if (rv < 0) _rv = rv;
+ if (rv < 0) return rv;
- if (!_rv) do_ack = ack_shift(buf);
+ do_ack = ack_shift(buf);
}
}
- if (!_rv) msg_flush(conn);
-
-#ifdef ECP_WITH_MSGQ
- if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_unlock(&buf->msgq.mutex);
-#endif
-
- if (_rv) return _rv;
-
+ msg_flush(conn);
if (buf->flush) {
buf->flush = 0;
do_ack = 1;