diff options
author | Uros Majstorovic <majstor@majstor.org> | 2017-08-14 19:56:24 +0200 |
---|---|---|
committer | Uros Majstorovic <majstor@majstor.org> | 2017-08-14 19:56:24 +0200 |
commit | 38e2385f5846860916f8880d818b3b024b8c7dd9 (patch) | |
tree | 7bb01d9c38df29b49bf87c50317ec67c61c6e2a7 /code/core/rbuf_recv.c | |
parent | db44820eb01106f7780c7126e53885e8b34c8aea (diff) |
msgq implementation
Diffstat (limited to 'code/core/rbuf_recv.c')
-rw-r--r-- | code/core/rbuf_recv.c | 66 |
1 files changed, 49 insertions, 17 deletions
diff --git a/code/core/rbuf_recv.c b/code/core/rbuf_recv.c index f86cd13..ad9c53c 100644 --- a/code/core/rbuf_recv.c +++ b/code/core/rbuf_recv.c @@ -32,9 +32,20 @@ static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, rv = ecp_rbuf_msg_store(&buf->rbuf, seq, -1, msg, msg_size, ECP_RBUF_FLAG_RECEIVED, flags); if (rv < 0) return ECP_ERR_RBUF_DUP; - if (flags & ECP_RBUF_FLAG_DELIVERED) ecp_msg_handle(conn, seq, msg, msg_size); - if (ECP_RBUF_SEQ_LT(buf->rbuf.seq_max, seq)) buf->rbuf.seq_max = seq; + + if (flags & ECP_RBUF_FLAG_DELIVERED) { +#ifdef ECP_WITH_MSGQ + if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_unlock(&buf->msgq.mutex); +#endif + + ecp_msg_handle(conn, seq, msg, msg_size); + +#ifdef ECP_WITH_MSGQ + if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_lock(&buf->msgq.mutex); +#endif + } + return rv; } @@ -48,17 +59,24 @@ static void msg_flush(ECPConnection *conn) { if ((buf->flags & ECP_RBUF_FLAG_RELIABLE) && !(buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_RECEIVED)) break; if (buf->deliver_delay && (msg_cnt - i < buf->deliver_delay)) break; if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_RECEIVED) { - buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_RECEIVED; + ssize_t rv = 0; if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_DELIVERED) { buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_DELIVERED; } else { - ecp_msg_handle(conn, buf->rbuf.seq_start + i, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size); + rv = ecp_msg_handle(conn, buf->rbuf.seq_start + i, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size); + } + if (!(buf->flags & ECP_RBUF_FLAG_MSGQ)) { + buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_RECEIVED; + } else if (rv < 0) { + break; } } idx = ECP_RBUF_IDX_MASK(idx + 1, buf->rbuf.msg_size); } - buf->rbuf.msg_start = idx; - buf->rbuf.seq_start += i; + if (!(buf->flags & ECP_RBUF_FLAG_MSGQ)) { + buf->rbuf.seq_start += i; + buf->rbuf.msg_start = idx; + } } static int ack_send(ECPConnection *conn) { @@ -177,31 +195,38 @@ ssize_t ecp_conn_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned ch ECPRBRecv *buf = conn->rbuf.recv; ecp_seq_t ack_pkt = 0; ssize_t rv; + int _rv = ECP_OK; int do_ack = 0; if (buf == NULL) return ECP_ERR; if (msg_size < 1) return ECP_ERR_MIN_MSG; +#ifdef ECP_WITH_MSGQ + if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_lock(&buf->msgq.mutex); +#endif + if (ECP_RBUF_SEQ_LT(buf->rbuf.seq_max, seq)) ack_pkt = seq - buf->rbuf.seq_max; if (ECP_RBUF_SEQ_LTE(seq, buf->seq_ack)) { ecp_seq_t seq_offset = buf->seq_ack - seq; if (seq_offset < ECP_RBUF_ACK_SIZE) { ecp_ack_t ack_mask = ((ecp_ack_t)1 << seq_offset); - if (ack_mask & buf->ack_map) return ECP_ERR_RBUF_DUP; - buf->ack_map |= ack_mask; - do_ack = ack_shift(buf); + if (ack_mask & buf->ack_map) _rv = ECP_ERR_RBUF_DUP; + if (!_rv) { + buf->ack_map |= ack_mask; + do_ack = ack_shift(buf); - rv = msg_store(conn, seq, msg, msg_size); - if (rv < 0) return rv; + rv = msg_store(conn, seq, msg, msg_size); + if (rv < 0) _rv = rv; + } } else { - return ECP_ERR_RBUF_IDX; + _rv = ECP_ERR_RBUF_IDX; } } else { if ((buf->ack_map == ECP_RBUF_ACK_FULL) && (seq == (ecp_seq_t)(buf->seq_ack + 1))) { if ((buf->flags & ECP_RBUF_FLAG_MSGQ) || buf->deliver_delay) { rv = msg_store(conn, seq, msg, msg_size); - if (rv < 0) return rv; + if (rv < 0) _rv = rv; } else { ecp_msg_handle(conn, seq, msg, msg_size); rv = msg_size; @@ -209,14 +234,22 @@ ssize_t ecp_conn_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned ch buf->rbuf.seq_start++; buf->rbuf.msg_start = ECP_RBUF_IDX_MASK(buf->rbuf.msg_start + 1, buf->rbuf.msg_size); } - buf->seq_ack++; + if (!_rv) buf->seq_ack++; } else { rv = msg_store(conn, seq, msg, msg_size); - if (rv < 0) return rv; + if (rv < 0) _rv = rv; - do_ack = ack_shift(buf); + if (!_rv) do_ack = ack_shift(buf); } } + if (!_rv) msg_flush(conn); + +#ifdef ECP_WITH_MSGQ + if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_unlock(&buf->msgq.mutex); +#endif + + if (_rv) return _rv; + if (buf->flush) { buf->flush = 0; do_ack = 1; @@ -230,7 +263,6 @@ ssize_t ecp_conn_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned ch int _rv = ack_send(conn); if (_rv) return _rv; } - msg_flush(conn); return rv; } |