From 2a46bdf517eb5fcb8ba59c398a32859c6496475d Mon Sep 17 00:00:00 2001 From: Uros Majstorovic Date: Tue, 31 Aug 2021 11:14:41 +0200 Subject: transient messages added; code cleanup --- ecp/src/rbuf_recv.c | 59 +++++++++++++++++++++++++++++++++-------------------- 1 file changed, 37 insertions(+), 22 deletions(-) (limited to 'ecp/src/rbuf_recv.c') diff --git a/ecp/src/rbuf_recv.c b/ecp/src/rbuf_recv.c index e949da7..17bbd01 100644 --- a/ecp/src/rbuf_recv.c +++ b/ecp/src/rbuf_recv.c @@ -8,13 +8,18 @@ static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size, ECP2Buffer *b) { ECPRBRecv *buf = conn->rbuf.recv; unsigned char flags = ECP_RBUF_FLAG_IN_RBUF; - unsigned char mtype = ecp_msg_get_type(msg) & ECP_MTYPE_MASK; + unsigned char mtype; ssize_t rv; + int _rv; + + _rv = ecp_msg_get_type(msg, msg_size, &mtype); + if (_rv) return _rv; + mtype &= ECP_MTYPE_MASK; if (mtype < ECP_MAX_MTYPE_SYS) flags |= ECP_RBUF_FLAG_SYS; -#ifdef ECP_WITH_MSGQ if (buf->flags & ECP_RBUF_FLAG_MSGQ) { +#ifdef ECP_WITH_MSGQ ecp_seq_t seq_offset; int _rv = ECP_OK; @@ -24,14 +29,14 @@ static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, pthread_mutex_unlock(&buf->msgq.mutex); if (_rv) return _rv; - } #endif + } rv = ecp_rbuf_msg_store(&buf->rbuf, seq, -1, msg, msg_size, ECP_RBUF_FLAG_IN_RBUF | ECP_RBUF_FLAG_IN_MSGQ, flags); if (rv < 0) return rv; if (ECP_SEQ_LT(buf->rbuf.seq_max, seq)) buf->rbuf.seq_max = seq; - if (flags & ECP_RBUF_FLAG_SYS) ecp_msg_handle(conn, seq, msg, msg_size, b); + if (flags & ECP_RBUF_FLAG_SYS) ecp_conn_handle_msg(conn, seq, msg, msg_size, b); return rv; } @@ -62,9 +67,10 @@ static void msg_flush(ECPConnection *conn, ECP2Buffer *b) { ecp_seq_t seq = buf->rbuf.seq_start + i; unsigned char frag_tot; unsigned char frag_cnt; + uint16_t frag_size; int rv; - rv = ecp_msg_get_frag(buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size, &frag_cnt, &frag_tot); + rv = ecp_msg_get_frag(buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size, &frag_cnt, &frag_tot, &frag_size); if (!rv && (frag_cnt != 0) && (seq != seq_next)) { ecp_seq_t seq_fend = seq + (ecp_seq_t)(frag_tot - frag_cnt - 1); @@ -98,12 +104,16 @@ static void msg_flush(ECPConnection *conn, ECP2Buffer *b) { seq_next = seq + 1; if (buf->flags & ECP_RBUF_FLAG_MSGQ) { #ifdef ECP_WITH_MSGQ - rv = ecp_conn_msgq_push(conn, seq, ecp_msg_get_type(buf->rbuf.msg[idx].msg)); + unsigned char mtype; + + rv = ecp_msg_get_type(buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size, &mtype); + if (!rv) rv = ecp_conn_msgq_push(conn, seq, mtype); if (rv) break; + buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_IN_MSGQ; #endif } else { - ecp_msg_handle(conn, seq, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size, b); + ecp_conn_handle_msg(conn, seq, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size, b); } } buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_IN_RBUF; @@ -132,7 +142,7 @@ static int ack_send(ECPConnection *conn) { ECPBuffer payload; unsigned char pkt_buf[ECP_SIZE_PKT_BUF(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), ECP_MTYPE_RBACK, conn)]; unsigned char pld_buf[ECP_SIZE_PLD_BUF(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), ECP_MTYPE_RBACK, conn)]; - unsigned char *buf_ = ecp_pld_get_buf(pld_buf, ECP_MTYPE_RBACK); + unsigned char *_buf; ssize_t rv; packet.buffer = pkt_buf; @@ -140,15 +150,16 @@ static int ack_send(ECPConnection *conn) { payload.buffer = pld_buf; payload.size = ECP_SIZE_PLD_BUF(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), ECP_MTYPE_RBACK, conn); - ecp_pld_set_type(pld_buf, ECP_MTYPE_RBACK); - buf_[0] = (buf->seq_ack & 0xFF000000) >> 24; - buf_[1] = (buf->seq_ack & 0x00FF0000) >> 16; - buf_[2] = (buf->seq_ack & 0x0000FF00) >> 8; - buf_[3] = (buf->seq_ack & 0x000000FF); - buf_[4] = (buf->ack_map & 0xFF000000) >> 24; - buf_[5] = (buf->ack_map & 0x00FF0000) >> 16; - buf_[6] = (buf->ack_map & 0x0000FF00) >> 8; - buf_[7] = (buf->ack_map & 0x000000FF); + ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_RBACK); + _buf = ecp_pld_get_buf(payload.buffer, payload.size); + _buf[0] = (buf->seq_ack & 0xFF000000) >> 24; + _buf[1] = (buf->seq_ack & 0x00FF0000) >> 16; + _buf[2] = (buf->seq_ack & 0x0000FF00) >> 8; + _buf[3] = (buf->seq_ack & 0x000000FF); + _buf[4] = (buf->ack_map & 0xFF000000) >> 24; + _buf[5] = (buf->ack_map & 0x00FF0000) >> 16; + _buf[6] = (buf->ack_map & 0x0000FF00) >> 8; + _buf[7] = (buf->ack_map & 0x000000FF); rv = ecp_rbuf_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), ECP_MTYPE_RBACK), 0, 0); if (rv < 0) return rv; @@ -258,12 +269,12 @@ int ecp_rbuf_recv_start(ECPConnection *conn, ecp_seq_t seq) { rv = ecp_rbuf_start(&buf->rbuf, seq); if (rv) return rv; -#ifdef ECP_WITH_MSGQ if (buf->flags & ECP_RBUF_FLAG_MSGQ) { +#ifdef ECP_WITH_MSGQ rv = ecp_conn_msgq_start(&buf->msgq, seq); if (rv) return rv; - } #endif + } return ECP_OK; } @@ -290,10 +301,14 @@ ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *m ECPRBRecv *buf = conn->rbuf.recv; ecp_seq_t ack_pkt = 0; ssize_t rv; + int _rv; unsigned char mtype; - mtype = ecp_msg_get_type(msg) & ECP_MTYPE_MASK; - if ((mtype == ECP_MTYPE_RBACK) || (mtype == ECP_MTYPE_RBFLUSH)) return ecp_msg_handle(conn, seq, msg, msg_size, b); + _rv = ecp_msg_get_type(msg, msg_size, &mtype); + if (_rv) return _rv; + + mtype &= ECP_MTYPE_MASK; + if ((mtype == ECP_MTYPE_RBACK) || (mtype == ECP_MTYPE_RBFLUSH)) return ecp_conn_handle_msg(conn, seq, msg, msg_size, b); if (ECP_SEQ_LT(buf->rbuf.seq_max, seq)) { ack_pkt = seq - buf->rbuf.seq_max; @@ -321,7 +336,7 @@ ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *m rv = msg_store(conn, seq, msg, msg_size, b); if (rv < 0) return rv; } else { - ecp_msg_handle(conn, seq, msg, msg_size, b); + ecp_conn_handle_msg(conn, seq, msg, msg_size, b); rv = msg_size; buf->rbuf.seq_max++; buf->rbuf.seq_start++; -- cgit v1.2.3