summaryrefslogtreecommitdiff
path: root/code/core/rbuf_recv.c
diff options
context:
space:
mode:
authorUros Majstorovic <majstor@majstor.org>2017-08-14 19:56:24 +0200
committerUros Majstorovic <majstor@majstor.org>2017-08-14 19:56:24 +0200
commit38e2385f5846860916f8880d818b3b024b8c7dd9 (patch)
tree7bb01d9c38df29b49bf87c50317ec67c61c6e2a7 /code/core/rbuf_recv.c
parentdb44820eb01106f7780c7126e53885e8b34c8aea (diff)
msgq implementation
Diffstat (limited to 'code/core/rbuf_recv.c')
-rw-r--r--code/core/rbuf_recv.c66
1 files changed, 49 insertions, 17 deletions
diff --git a/code/core/rbuf_recv.c b/code/core/rbuf_recv.c
index f86cd13..ad9c53c 100644
--- a/code/core/rbuf_recv.c
+++ b/code/core/rbuf_recv.c
@@ -32,9 +32,20 @@ static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg,
rv = ecp_rbuf_msg_store(&buf->rbuf, seq, -1, msg, msg_size, ECP_RBUF_FLAG_RECEIVED, flags);
if (rv < 0) return ECP_ERR_RBUF_DUP;
- if (flags & ECP_RBUF_FLAG_DELIVERED) ecp_msg_handle(conn, seq, msg, msg_size);
-
if (ECP_RBUF_SEQ_LT(buf->rbuf.seq_max, seq)) buf->rbuf.seq_max = seq;
+
+ if (flags & ECP_RBUF_FLAG_DELIVERED) {
+#ifdef ECP_WITH_MSGQ
+ if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_unlock(&buf->msgq.mutex);
+#endif
+
+ ecp_msg_handle(conn, seq, msg, msg_size);
+
+#ifdef ECP_WITH_MSGQ
+ if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_lock(&buf->msgq.mutex);
+#endif
+ }
+
return rv;
}
@@ -48,17 +59,24 @@ static void msg_flush(ECPConnection *conn) {
if ((buf->flags & ECP_RBUF_FLAG_RELIABLE) && !(buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_RECEIVED)) break;
if (buf->deliver_delay && (msg_cnt - i < buf->deliver_delay)) break;
if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_RECEIVED) {
- buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_RECEIVED;
+ ssize_t rv = 0;
if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_DELIVERED) {
buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_DELIVERED;
} else {
- ecp_msg_handle(conn, buf->rbuf.seq_start + i, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size);
+ rv = ecp_msg_handle(conn, buf->rbuf.seq_start + i, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size);
+ }
+ if (!(buf->flags & ECP_RBUF_FLAG_MSGQ)) {
+ buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_RECEIVED;
+ } else if (rv < 0) {
+ break;
}
}
idx = ECP_RBUF_IDX_MASK(idx + 1, buf->rbuf.msg_size);
}
- buf->rbuf.msg_start = idx;
- buf->rbuf.seq_start += i;
+ if (!(buf->flags & ECP_RBUF_FLAG_MSGQ)) {
+ buf->rbuf.seq_start += i;
+ buf->rbuf.msg_start = idx;
+ }
}
static int ack_send(ECPConnection *conn) {
@@ -177,31 +195,38 @@ ssize_t ecp_conn_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned ch
ECPRBRecv *buf = conn->rbuf.recv;
ecp_seq_t ack_pkt = 0;
ssize_t rv;
+ int _rv = ECP_OK;
int do_ack = 0;
if (buf == NULL) return ECP_ERR;
if (msg_size < 1) return ECP_ERR_MIN_MSG;
+#ifdef ECP_WITH_MSGQ
+ if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_lock(&buf->msgq.mutex);
+#endif
+
if (ECP_RBUF_SEQ_LT(buf->rbuf.seq_max, seq)) ack_pkt = seq - buf->rbuf.seq_max;
if (ECP_RBUF_SEQ_LTE(seq, buf->seq_ack)) {
ecp_seq_t seq_offset = buf->seq_ack - seq;
if (seq_offset < ECP_RBUF_ACK_SIZE) {
ecp_ack_t ack_mask = ((ecp_ack_t)1 << seq_offset);
- if (ack_mask & buf->ack_map) return ECP_ERR_RBUF_DUP;
- buf->ack_map |= ack_mask;
- do_ack = ack_shift(buf);
+ if (ack_mask & buf->ack_map) _rv = ECP_ERR_RBUF_DUP;
+ if (!_rv) {
+ buf->ack_map |= ack_mask;
+ do_ack = ack_shift(buf);
- rv = msg_store(conn, seq, msg, msg_size);
- if (rv < 0) return rv;
+ rv = msg_store(conn, seq, msg, msg_size);
+ if (rv < 0) _rv = rv;
+ }
} else {
- return ECP_ERR_RBUF_IDX;
+ _rv = ECP_ERR_RBUF_IDX;
}
} else {
if ((buf->ack_map == ECP_RBUF_ACK_FULL) && (seq == (ecp_seq_t)(buf->seq_ack + 1))) {
if ((buf->flags & ECP_RBUF_FLAG_MSGQ) || buf->deliver_delay) {
rv = msg_store(conn, seq, msg, msg_size);
- if (rv < 0) return rv;
+ if (rv < 0) _rv = rv;
} else {
ecp_msg_handle(conn, seq, msg, msg_size);
rv = msg_size;
@@ -209,14 +234,22 @@ ssize_t ecp_conn_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned ch
buf->rbuf.seq_start++;
buf->rbuf.msg_start = ECP_RBUF_IDX_MASK(buf->rbuf.msg_start + 1, buf->rbuf.msg_size);
}
- buf->seq_ack++;
+ if (!_rv) buf->seq_ack++;
} else {
rv = msg_store(conn, seq, msg, msg_size);
- if (rv < 0) return rv;
+ if (rv < 0) _rv = rv;
- do_ack = ack_shift(buf);
+ if (!_rv) do_ack = ack_shift(buf);
}
}
+ if (!_rv) msg_flush(conn);
+
+#ifdef ECP_WITH_MSGQ
+ if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_unlock(&buf->msgq.mutex);
+#endif
+
+ if (_rv) return _rv;
+
if (buf->flush) {
buf->flush = 0;
do_ack = 1;
@@ -230,7 +263,6 @@ ssize_t ecp_conn_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned ch
int _rv = ack_send(conn);
if (_rv) return _rv;
}
- msg_flush(conn);
return rv;
}