summaryrefslogtreecommitdiff
path: root/code/core/rbuf_recv.c
diff options
context:
space:
mode:
Diffstat (limited to 'code/core/rbuf_recv.c')
-rw-r--r--code/core/rbuf_recv.c48
1 files changed, 24 insertions, 24 deletions
diff --git a/code/core/rbuf_recv.c b/code/core/rbuf_recv.c
index e00b63f..d78ffce 100644
--- a/code/core/rbuf_recv.c
+++ b/code/core/rbuf_recv.c
@@ -7,32 +7,31 @@
static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) {
ECPRBRecv *buf = conn->rbuf.recv;
- int rv = ECP_OK;
- ssize_t _rv = 0;
unsigned char flags = ECP_RBUF_FLAG_IN_RBUF;
unsigned char mtype = msg[0] & ECP_MTYPE_MASK;
- if (mtype < ECP_MAX_MTYPE_SYS) {
- flags |= ECP_RBUF_FLAG_DELIVERED;
- ecp_msg_handle(conn, seq, msg, msg_size);
- }
+ if (mtype < ECP_MAX_MTYPE_SYS) flags |= ECP_RBUF_FLAG_SYS;
#ifdef ECP_WITH_MSGQ
if (buf->flags & ECP_RBUF_FLAG_MSGQ) {
+ int rv = ECP_OK;
+
pthread_mutex_lock(&buf->msgq.mutex);
ecp_seq_t seq_offset = seq - buf->msgq.seq_start;
if (seq_offset >= buf->rbuf.msg_size) rv = ECP_ERR_RBUF_FULL;
+ pthread_mutex_unlock(&buf->msgq.mutex);
+
+ if (rv) return rv;
}
#endif
- if (!rv) _rv = ecp_rbuf_msg_store(&buf->rbuf, seq, -1, msg, msg_size, ECP_RBUF_FLAG_IN_RBUF | ECP_RBUF_FLAG_IN_MSGQ, flags);
+ ssize_t rv = ecp_rbuf_msg_store(&buf->rbuf, seq, -1, msg, msg_size, ECP_RBUF_FLAG_IN_RBUF | ECP_RBUF_FLAG_IN_MSGQ, flags);
+ if (rv < 0) return rv;
-#ifdef ECP_WITH_MSGQ
- if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_unlock(&buf->msgq.mutex);
-#endif
+ if (ECP_RBUF_SEQ_LT(buf->rbuf.seq_max, seq)) buf->rbuf.seq_max = seq;
+ if (flags & ECP_RBUF_FLAG_SYS) ecp_msg_handle(conn, seq, msg, msg_size);
- if (rv) return rv;
- return _rv;
+ return rv;
}
static void msg_flush(ECPConnection *conn) {
@@ -52,11 +51,11 @@ static void msg_flush(ECPConnection *conn) {
if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_IN_RBUF) {
int rv = 0;
ecp_seq_t seq = buf->rbuf.seq_start + i;
- if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_DELIVERED) {
- buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_DELIVERED;
+ if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_SYS) {
+ buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_SYS;
} else if (buf->flags & ECP_RBUF_FLAG_MSGQ) {
#ifdef ECP_WITH_MSGQ
- unsigned char mtype = buf->rbuf.msg[idx].msg[0] & ECP_MTYPE_MASK;
+ unsigned char mtype = buf->rbuf.msg[idx].msg[0];
int rv = ecp_conn_msgq_push(conn, seq, mtype);
if (rv) break;
buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_IN_MSGQ;
@@ -92,8 +91,8 @@ static int ack_send(ECPConnection *conn) {
buf_[5] = (buf->ack_map & 0x00FF0000) >> 16;
buf_[6] = (buf->ack_map & 0x0000FF00) >> 8;
buf_[7] = (buf->ack_map & 0x000000FF);
-
- rv = ecp_pld_send(conn, payload, sizeof(payload));
+
+ rv = ecp_rbuf_pld_send(conn, payload, sizeof(payload), 0);
if (rv < 0) return rv;
buf->ack_pkt = 0;
@@ -151,7 +150,7 @@ ssize_t ecp_rbuf_handle_flush(ECPConnection *conn, ecp_seq_t seq, unsigned char
return 0;
}
-int ecp_conn_rbuf_recv_create(ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size) {
+int ecp_rbuf_recv_create(ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size) {
int rv;
memset(buf, 0, sizeof(ECPRBRecv));
@@ -170,13 +169,13 @@ int ecp_conn_rbuf_recv_create(ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage
return ECP_OK;
}
-void ecp_conn_rbuf_recv_destroy(ECPConnection *conn) {
+void ecp_rbuf_recv_destroy(ECPConnection *conn) {
#ifdef ECP_WITH_MSGQ
ecp_conn_msgq_destroy(conn);
#endif
}
-int ecp_conn_rbuf_recv_set_hole(ECPConnection *conn, unsigned short hole_max) {
+int ecp_rbuf_recv_set_hole(ECPConnection *conn, unsigned short hole_max) {
ECPRBRecv *buf = conn->rbuf.recv;
buf->hole_max = hole_max;
@@ -186,18 +185,18 @@ int ecp_conn_rbuf_recv_set_hole(ECPConnection *conn, unsigned short hole_max) {
return ECP_OK;
}
-int ecp_conn_rbuf_recv_set_delay(ECPConnection *conn, unsigned short delay) {
+int ecp_rbuf_recv_set_delay(ECPConnection *conn, unsigned short delay) {
ECPRBRecv *buf = conn->rbuf.recv;
buf->deliver_delay = delay;
if (buf->hole_max < delay - 1) {
- ecp_conn_rbuf_recv_set_hole(conn, delay - 1);
+ ecp_rbuf_recv_set_hole(conn, delay - 1);
}
return ECP_OK;
}
-int ecp_conn_rbuf_recv_start(ECPConnection *conn, ecp_seq_t seq) {
+int ecp_rbuf_recv_start(ECPConnection *conn, ecp_seq_t seq) {
int rv;
ECPRBRecv *buf = conn->rbuf.recv;
@@ -217,7 +216,7 @@ int ecp_conn_rbuf_recv_start(ECPConnection *conn, ecp_seq_t seq) {
return ECP_OK;
}
-ssize_t ecp_conn_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) {
+ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) {
ECPRBRecv *buf = conn->rbuf.recv;
ecp_seq_t ack_pkt = 0;
ssize_t rv;
@@ -226,6 +225,7 @@ ssize_t ecp_conn_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned ch
if (buf == NULL) return ECP_ERR;
if (msg_size < 1) return ECP_ERR_MIN_MSG;
+ // XXX mtype == RBACK | RBFLUSH handle imediately
if (ECP_RBUF_SEQ_LT(buf->rbuf.seq_max, seq)) ack_pkt = seq - buf->rbuf.seq_max;
if (ECP_RBUF_SEQ_LTE(seq, buf->seq_ack)) {
ecp_seq_t seq_offset = buf->seq_ack - seq;