summaryrefslogtreecommitdiff
path: root/ecp/src/rbuf_recv.c
diff options
context:
space:
mode:
authorUros Majstorovic <majstor@majstor.org>2021-08-31 11:14:41 +0200
committerUros Majstorovic <majstor@majstor.org>2021-08-31 11:14:41 +0200
commit2a46bdf517eb5fcb8ba59c398a32859c6496475d (patch)
treed8773d60c6827cf6499b9fda98323feabc10e63f /ecp/src/rbuf_recv.c
parentff7ffade2f5686ae977af578cb87040cc4654994 (diff)
transient messages added; code cleanup
Diffstat (limited to 'ecp/src/rbuf_recv.c')
-rw-r--r--ecp/src/rbuf_recv.c59
1 files changed, 37 insertions, 22 deletions
diff --git a/ecp/src/rbuf_recv.c b/ecp/src/rbuf_recv.c
index e949da7..17bbd01 100644
--- a/ecp/src/rbuf_recv.c
+++ b/ecp/src/rbuf_recv.c
@@ -8,13 +8,18 @@
static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size, ECP2Buffer *b) {
ECPRBRecv *buf = conn->rbuf.recv;
unsigned char flags = ECP_RBUF_FLAG_IN_RBUF;
- unsigned char mtype = ecp_msg_get_type(msg) & ECP_MTYPE_MASK;
+ unsigned char mtype;
ssize_t rv;
+ int _rv;
+
+ _rv = ecp_msg_get_type(msg, msg_size, &mtype);
+ if (_rv) return _rv;
+ mtype &= ECP_MTYPE_MASK;
if (mtype < ECP_MAX_MTYPE_SYS) flags |= ECP_RBUF_FLAG_SYS;
-#ifdef ECP_WITH_MSGQ
if (buf->flags & ECP_RBUF_FLAG_MSGQ) {
+#ifdef ECP_WITH_MSGQ
ecp_seq_t seq_offset;
int _rv = ECP_OK;
@@ -24,14 +29,14 @@ static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg,
pthread_mutex_unlock(&buf->msgq.mutex);
if (_rv) return _rv;
- }
#endif
+ }
rv = ecp_rbuf_msg_store(&buf->rbuf, seq, -1, msg, msg_size, ECP_RBUF_FLAG_IN_RBUF | ECP_RBUF_FLAG_IN_MSGQ, flags);
if (rv < 0) return rv;
if (ECP_SEQ_LT(buf->rbuf.seq_max, seq)) buf->rbuf.seq_max = seq;
- if (flags & ECP_RBUF_FLAG_SYS) ecp_msg_handle(conn, seq, msg, msg_size, b);
+ if (flags & ECP_RBUF_FLAG_SYS) ecp_conn_handle_msg(conn, seq, msg, msg_size, b);
return rv;
}
@@ -62,9 +67,10 @@ static void msg_flush(ECPConnection *conn, ECP2Buffer *b) {
ecp_seq_t seq = buf->rbuf.seq_start + i;
unsigned char frag_tot;
unsigned char frag_cnt;
+ uint16_t frag_size;
int rv;
- rv = ecp_msg_get_frag(buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size, &frag_cnt, &frag_tot);
+ rv = ecp_msg_get_frag(buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size, &frag_cnt, &frag_tot, &frag_size);
if (!rv && (frag_cnt != 0) && (seq != seq_next)) {
ecp_seq_t seq_fend = seq + (ecp_seq_t)(frag_tot - frag_cnt - 1);
@@ -98,12 +104,16 @@ static void msg_flush(ECPConnection *conn, ECP2Buffer *b) {
seq_next = seq + 1;
if (buf->flags & ECP_RBUF_FLAG_MSGQ) {
#ifdef ECP_WITH_MSGQ
- rv = ecp_conn_msgq_push(conn, seq, ecp_msg_get_type(buf->rbuf.msg[idx].msg));
+ unsigned char mtype;
+
+ rv = ecp_msg_get_type(buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size, &mtype);
+ if (!rv) rv = ecp_conn_msgq_push(conn, seq, mtype);
if (rv) break;
+
buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_IN_MSGQ;
#endif
} else {
- ecp_msg_handle(conn, seq, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size, b);
+ ecp_conn_handle_msg(conn, seq, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size, b);
}
}
buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_IN_RBUF;
@@ -132,7 +142,7 @@ static int ack_send(ECPConnection *conn) {
ECPBuffer payload;
unsigned char pkt_buf[ECP_SIZE_PKT_BUF(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), ECP_MTYPE_RBACK, conn)];
unsigned char pld_buf[ECP_SIZE_PLD_BUF(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), ECP_MTYPE_RBACK, conn)];
- unsigned char *buf_ = ecp_pld_get_buf(pld_buf, ECP_MTYPE_RBACK);
+ unsigned char *_buf;
ssize_t rv;
packet.buffer = pkt_buf;
@@ -140,15 +150,16 @@ static int ack_send(ECPConnection *conn) {
payload.buffer = pld_buf;
payload.size = ECP_SIZE_PLD_BUF(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), ECP_MTYPE_RBACK, conn);
- ecp_pld_set_type(pld_buf, ECP_MTYPE_RBACK);
- buf_[0] = (buf->seq_ack & 0xFF000000) >> 24;
- buf_[1] = (buf->seq_ack & 0x00FF0000) >> 16;
- buf_[2] = (buf->seq_ack & 0x0000FF00) >> 8;
- buf_[3] = (buf->seq_ack & 0x000000FF);
- buf_[4] = (buf->ack_map & 0xFF000000) >> 24;
- buf_[5] = (buf->ack_map & 0x00FF0000) >> 16;
- buf_[6] = (buf->ack_map & 0x0000FF00) >> 8;
- buf_[7] = (buf->ack_map & 0x000000FF);
+ ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_RBACK);
+ _buf = ecp_pld_get_buf(payload.buffer, payload.size);
+ _buf[0] = (buf->seq_ack & 0xFF000000) >> 24;
+ _buf[1] = (buf->seq_ack & 0x00FF0000) >> 16;
+ _buf[2] = (buf->seq_ack & 0x0000FF00) >> 8;
+ _buf[3] = (buf->seq_ack & 0x000000FF);
+ _buf[4] = (buf->ack_map & 0xFF000000) >> 24;
+ _buf[5] = (buf->ack_map & 0x00FF0000) >> 16;
+ _buf[6] = (buf->ack_map & 0x0000FF00) >> 8;
+ _buf[7] = (buf->ack_map & 0x000000FF);
rv = ecp_rbuf_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), ECP_MTYPE_RBACK), 0, 0);
if (rv < 0) return rv;
@@ -258,12 +269,12 @@ int ecp_rbuf_recv_start(ECPConnection *conn, ecp_seq_t seq) {
rv = ecp_rbuf_start(&buf->rbuf, seq);
if (rv) return rv;
-#ifdef ECP_WITH_MSGQ
if (buf->flags & ECP_RBUF_FLAG_MSGQ) {
+#ifdef ECP_WITH_MSGQ
rv = ecp_conn_msgq_start(&buf->msgq, seq);
if (rv) return rv;
- }
#endif
+ }
return ECP_OK;
}
@@ -290,10 +301,14 @@ ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *m
ECPRBRecv *buf = conn->rbuf.recv;
ecp_seq_t ack_pkt = 0;
ssize_t rv;
+ int _rv;
unsigned char mtype;
- mtype = ecp_msg_get_type(msg) & ECP_MTYPE_MASK;
- if ((mtype == ECP_MTYPE_RBACK) || (mtype == ECP_MTYPE_RBFLUSH)) return ecp_msg_handle(conn, seq, msg, msg_size, b);
+ _rv = ecp_msg_get_type(msg, msg_size, &mtype);
+ if (_rv) return _rv;
+
+ mtype &= ECP_MTYPE_MASK;
+ if ((mtype == ECP_MTYPE_RBACK) || (mtype == ECP_MTYPE_RBFLUSH)) return ecp_conn_handle_msg(conn, seq, msg, msg_size, b);
if (ECP_SEQ_LT(buf->rbuf.seq_max, seq)) {
ack_pkt = seq - buf->rbuf.seq_max;
@@ -321,7 +336,7 @@ ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *m
rv = msg_store(conn, seq, msg, msg_size, b);
if (rv < 0) return rv;
} else {
- ecp_msg_handle(conn, seq, msg, msg_size, b);
+ ecp_conn_handle_msg(conn, seq, msg, msg_size, b);
rv = msg_size;
buf->rbuf.seq_max++;
buf->rbuf.seq_start++;