diff options
Diffstat (limited to 'code/core/rbuf_recv.c')
-rw-r--r-- | code/core/rbuf_recv.c | 155 |
1 files changed, 84 insertions, 71 deletions
diff --git a/code/core/rbuf_recv.c b/code/core/rbuf_recv.c index ad9c53c..e00b63f 100644 --- a/code/core/rbuf_recv.c +++ b/code/core/rbuf_recv.c @@ -5,78 +5,76 @@ #define ACK_RATE 8 #define ACK_MASK_FIRST ((ecp_ack_t)1 << (ECP_RBUF_ACK_SIZE - 1)) -static ssize_t handle_flush(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size) { - if (size < 0) return size; - - ECPRBRecv *buf = conn->rbuf.recv; - unsigned char payload[ECP_SIZE_PLD(0)]; - - if (buf == NULL) return ECP_ERR; - - buf->flush = 1; - return 0; -} - static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) { ECPRBRecv *buf = conn->rbuf.recv; - ssize_t rv = 0; - unsigned char flags; + int rv = ECP_OK; + ssize_t _rv = 0; + unsigned char flags = ECP_RBUF_FLAG_IN_RBUF; unsigned char mtype = msg[0] & ECP_MTYPE_MASK; if (mtype < ECP_MAX_MTYPE_SYS) { - flags = ECP_RBUF_FLAG_RECEIVED | ECP_RBUF_FLAG_DELIVERED; - } else { - flags = ECP_RBUF_FLAG_RECEIVED; + flags |= ECP_RBUF_FLAG_DELIVERED; + ecp_msg_handle(conn, seq, msg, msg_size); } - rv = ecp_rbuf_msg_store(&buf->rbuf, seq, -1, msg, msg_size, ECP_RBUF_FLAG_RECEIVED, flags); - if (rv < 0) return ECP_ERR_RBUF_DUP; - - if (ECP_RBUF_SEQ_LT(buf->rbuf.seq_max, seq)) buf->rbuf.seq_max = seq; - - if (flags & ECP_RBUF_FLAG_DELIVERED) { #ifdef ECP_WITH_MSGQ - if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_unlock(&buf->msgq.mutex); + if (buf->flags & ECP_RBUF_FLAG_MSGQ) { + pthread_mutex_lock(&buf->msgq.mutex); + ecp_seq_t seq_offset = seq - buf->msgq.seq_start; + if (seq_offset >= buf->rbuf.msg_size) rv = ECP_ERR_RBUF_FULL; + } #endif - ecp_msg_handle(conn, seq, msg, msg_size); + if (!rv) _rv = ecp_rbuf_msg_store(&buf->rbuf, seq, -1, msg, msg_size, ECP_RBUF_FLAG_IN_RBUF | ECP_RBUF_FLAG_IN_MSGQ, flags); #ifdef ECP_WITH_MSGQ - if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_lock(&buf->msgq.mutex); + if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_unlock(&buf->msgq.mutex); #endif - } - return rv; + if (rv) return rv; + return _rv; } static void msg_flush(ECPConnection *conn) { ECPRBRecv *buf = conn->rbuf.recv; + +#ifdef ECP_WITH_MSGQ + if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_lock(&buf->msgq.mutex); +#endif + ecp_seq_t msg_cnt = buf->rbuf.seq_max - buf->rbuf.seq_start + 1; ecp_seq_t i = 0; unsigned int idx = buf->rbuf.msg_start; for (i=0; i<msg_cnt; i++) { - if ((buf->flags & ECP_RBUF_FLAG_RELIABLE) && !(buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_RECEIVED)) break; + if ((buf->flags & ECP_RBUF_FLAG_RELIABLE) && !(buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_IN_RBUF)) break; if (buf->deliver_delay && (msg_cnt - i < buf->deliver_delay)) break; - if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_RECEIVED) { - ssize_t rv = 0; + if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_IN_RBUF) { + int rv = 0; + ecp_seq_t seq = buf->rbuf.seq_start + i; if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_DELIVERED) { buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_DELIVERED; + } else if (buf->flags & ECP_RBUF_FLAG_MSGQ) { +#ifdef ECP_WITH_MSGQ + unsigned char mtype = buf->rbuf.msg[idx].msg[0] & ECP_MTYPE_MASK; + int rv = ecp_conn_msgq_push(conn, seq, mtype); + if (rv) break; + buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_IN_MSGQ; +#endif } else { - rv = ecp_msg_handle(conn, buf->rbuf.seq_start + i, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size); - } - if (!(buf->flags & ECP_RBUF_FLAG_MSGQ)) { - buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_RECEIVED; - } else if (rv < 0) { - break; + ecp_msg_handle(conn, seq, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size); } + buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_IN_RBUF; } idx = ECP_RBUF_IDX_MASK(idx + 1, buf->rbuf.msg_size); } - if (!(buf->flags & ECP_RBUF_FLAG_MSGQ)) { - buf->rbuf.seq_start += i; - buf->rbuf.msg_start = idx; - } + buf->rbuf.seq_start += i; + buf->rbuf.msg_start = idx; + +#ifdef ECP_WITH_MSGQ + if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_unlock(&buf->msgq.mutex); +#endif + } static int ack_send(ECPConnection *conn) { @@ -116,10 +114,10 @@ static int ack_shift(ECPRBRecv *buf) { idx = ECP_RBUF_IDX_MASK(idx + 1, buf->rbuf.msg_size); buf->seq_ack++; - if ((buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_RECEIVED) && (buf->ack_map == ECP_RBUF_ACK_FULL)) continue; + if ((buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_IN_RBUF) && (buf->ack_map == ECP_RBUF_ACK_FULL)) continue; buf->ack_map = buf->ack_map << 1; - if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_RECEIVED) { + if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_IN_RBUF) { buf->ack_map |= 1; } else if (!do_ack && ECP_RBUF_SEQ_LTE(buf->seq_ack, buf->rbuf.seq_max - 2 * buf->hole_max)) { do_ack = 1; @@ -143,6 +141,16 @@ static int ack_shift(ECPRBRecv *buf) { return do_ack; } +ssize_t ecp_rbuf_handle_flush(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size) { + if (size < 0) return size; + + ECPRBRecv *buf = conn->rbuf.recv; + if (buf == NULL) return ECP_ERR; + + buf->flush = 1; + return 0; +} + int ecp_conn_rbuf_recv_create(ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size) { int rv; @@ -152,13 +160,20 @@ int ecp_conn_rbuf_recv_create(ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage buf->ack_map = ECP_RBUF_ACK_FULL; buf->ack_rate = ACK_RATE; + +#ifdef ECP_WITH_MSGQ + rv = ecp_conn_msgq_create(conn); + if (rv) return rv; +#endif + conn->rbuf.recv = buf; - return ECP_OK; } void ecp_conn_rbuf_recv_destroy(ECPConnection *conn) { - +#ifdef ECP_WITH_MSGQ + ecp_conn_msgq_destroy(conn); +#endif } int ecp_conn_rbuf_recv_set_hole(ECPConnection *conn, unsigned short hole_max) { @@ -183,50 +198,55 @@ int ecp_conn_rbuf_recv_set_delay(ECPConnection *conn, unsigned short delay) { } int ecp_conn_rbuf_recv_start(ECPConnection *conn, ecp_seq_t seq) { + int rv; ECPRBRecv *buf = conn->rbuf.recv; if (buf == NULL) return ECP_ERR; buf->seq_ack = seq; - return ecp_rbuf_start(&buf->rbuf, seq); + rv = ecp_rbuf_start(&buf->rbuf, seq); + if (rv) return rv; + +#ifdef ECP_WITH_MSGQ + if (buf->flags & ECP_RBUF_FLAG_MSGQ) { + rv = ecp_conn_msgq_start(conn, seq); + if (rv) return rv; + } +#endif + + return ECP_OK; } ssize_t ecp_conn_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) { ECPRBRecv *buf = conn->rbuf.recv; ecp_seq_t ack_pkt = 0; ssize_t rv; - int _rv = ECP_OK; int do_ack = 0; if (buf == NULL) return ECP_ERR; if (msg_size < 1) return ECP_ERR_MIN_MSG; -#ifdef ECP_WITH_MSGQ - if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_lock(&buf->msgq.mutex); -#endif - if (ECP_RBUF_SEQ_LT(buf->rbuf.seq_max, seq)) ack_pkt = seq - buf->rbuf.seq_max; if (ECP_RBUF_SEQ_LTE(seq, buf->seq_ack)) { ecp_seq_t seq_offset = buf->seq_ack - seq; if (seq_offset < ECP_RBUF_ACK_SIZE) { ecp_ack_t ack_mask = ((ecp_ack_t)1 << seq_offset); - if (ack_mask & buf->ack_map) _rv = ECP_ERR_RBUF_DUP; - if (!_rv) { - buf->ack_map |= ack_mask; - do_ack = ack_shift(buf); + if (ack_mask & buf->ack_map) return ECP_ERR_RBUF_DUP; - rv = msg_store(conn, seq, msg, msg_size); - if (rv < 0) _rv = rv; - } + buf->ack_map |= ack_mask; + do_ack = ack_shift(buf); + + rv = msg_store(conn, seq, msg, msg_size); + if (rv < 0) return rv; } else { - _rv = ECP_ERR_RBUF_IDX; + return ECP_ERR_RBUF_DUP; } } else { if ((buf->ack_map == ECP_RBUF_ACK_FULL) && (seq == (ecp_seq_t)(buf->seq_ack + 1))) { if ((buf->flags & ECP_RBUF_FLAG_MSGQ) || buf->deliver_delay) { rv = msg_store(conn, seq, msg, msg_size); - if (rv < 0) _rv = rv; + if (rv < 0) return rv; } else { ecp_msg_handle(conn, seq, msg, msg_size); rv = msg_size; @@ -234,22 +254,15 @@ ssize_t ecp_conn_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned ch buf->rbuf.seq_start++; buf->rbuf.msg_start = ECP_RBUF_IDX_MASK(buf->rbuf.msg_start + 1, buf->rbuf.msg_size); } - if (!_rv) buf->seq_ack++; + buf->seq_ack++; } else { rv = msg_store(conn, seq, msg, msg_size); - if (rv < 0) _rv = rv; + if (rv < 0) return rv; - if (!_rv) do_ack = ack_shift(buf); + do_ack = ack_shift(buf); } } - if (!_rv) msg_flush(conn); - -#ifdef ECP_WITH_MSGQ - if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_unlock(&buf->msgq.mutex); -#endif - - if (_rv) return _rv; - + msg_flush(conn); if (buf->flush) { buf->flush = 0; do_ack = 1; |