summaryrefslogtreecommitdiff
path: root/ecp/src/msgq.c
diff options
context:
space:
mode:
authorUros Majstorovic <majstor@majstor.org>2022-01-30 02:04:24 +0100
committerUros Majstorovic <majstor@majstor.org>2022-01-30 02:04:24 +0100
commit483b954dff83b86877184718acdd66533cb694ac (patch)
treef4f3b594416468674ba6321e2fb690649d852697 /ecp/src/msgq.c
parentceb32f60ef7d6210883acf1f17500f87cac8888c (diff)
code cleanup and fixes mainly rbuf
Diffstat (limited to 'ecp/src/msgq.c')
-rw-r--r--ecp/src/msgq.c68
1 files changed, 36 insertions, 32 deletions
diff --git a/ecp/src/msgq.c b/ecp/src/msgq.c
index 61c7b02..8f81d8a 100644
--- a/ecp/src/msgq.c
+++ b/ecp/src/msgq.c
@@ -4,9 +4,6 @@
#include <sys/time.h>
-#define MIN(a,b) (((a)<(b))?(a):(b))
-#define MAX(a,b) (((a)>(b))?(a):(b))
-
#define MSG_IDX_MASK(idx) ((idx) & ((ECP_MSGQ_MAX_MSG) - 1))
static struct timespec *abstime_ts(struct timespec *ts, ecp_cts_t msec) {
@@ -25,8 +22,6 @@ int ecp_conn_msgq_create(ECPConnMsgQ *msgq) {
int i;
int rv;
- if (msgq == NULL) return ECP_ERR;
-
memset(msgq, 0, sizeof(ECPConnMsgQ));
rv = pthread_mutex_init(&msgq->mutex, NULL);
@@ -51,8 +46,6 @@ int ecp_conn_msgq_create(ECPConnMsgQ *msgq) {
void ecp_conn_msgq_destroy(ECPConnMsgQ *msgq) {
int i;
- if (msgq == NULL) return;
-
for (i=0; i<ECP_MAX_MTYPE; i++) {
pthread_cond_destroy(&msgq->cond[i]);
}
@@ -60,8 +53,6 @@ void ecp_conn_msgq_destroy(ECPConnMsgQ *msgq) {
}
int ecp_conn_msgq_start(ECPConnMsgQ *msgq, ecp_seq_t seq) {
- if (msgq == NULL) return ECP_ERR;
-
msgq->seq_max = seq;
msgq->seq_start = seq + 1;
@@ -70,13 +61,11 @@ int ecp_conn_msgq_start(ECPConnMsgQ *msgq, ecp_seq_t seq) {
int ecp_conn_msgq_push(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype) {
ECPRBRecv *buf = conn->rbuf.recv;
- ECPConnMsgQ *msgq = buf ? &buf->msgq : NULL;
+ ECPConnMsgQ *msgq = &buf->msgq;
- mtype &= ECP_MTYPE_MASK;
- if (msgq == NULL) return ECP_ERR;
if (mtype >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE;
- if ((unsigned short)(msgq->idx_w[mtype] - msgq->idx_r[mtype]) == ECP_MSGQ_MAX_MSG) return ECP_MSGQ_ERR_MAX_MSG;
+ if ((unsigned short)(msgq->idx_w[mtype] - msgq->idx_r[mtype]) == ECP_MSGQ_MAX_MSG) return ECP_ERR_FULL;
if (msgq->idx_w[mtype] == msgq->idx_r[mtype]) pthread_cond_signal(&msgq->cond[mtype]);
msgq->seq_msg[mtype][MSG_IDX_MASK(msgq->idx_w[mtype])] = seq;
@@ -89,13 +78,15 @@ int ecp_conn_msgq_push(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype)
ssize_t ecp_conn_msgq_pop(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ecp_cts_t timeout) {
ECPRBRecv *buf = conn->rbuf.recv;
- ECPConnMsgQ *msgq = buf ? &buf->msgq : NULL;
+ ECPConnMsgQ *msgq = &buf->msgq;
+ ECPRBuffer *rbuf = &buf->rbuf;
ecp_seq_t seq;
- ecp_seq_t seq_offset;
- unsigned int idx;
+ unsigned char *msg_buf;
+ unsigned char *content;
+ unsigned short idx;
+ int _rv;
ssize_t rv;
- if (msgq == NULL) return ECP_ERR;
if (mtype >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE;
if (msgq->idx_r[mtype] == msgq->idx_w[mtype]) {
@@ -103,35 +94,48 @@ ssize_t ecp_conn_msgq_pop(ECPConnection *conn, unsigned char mtype, unsigned cha
pthread_cond_wait(&msgq->cond[mtype], &msgq->mutex);
} else {
struct timespec ts;
- int _rv;
_rv = pthread_cond_timedwait(&msgq->cond[mtype], &msgq->mutex, abstime_ts(&ts, timeout));
if (_rv) return ECP_ERR_TIMEOUT;
}
}
-
seq = msgq->seq_msg[mtype][MSG_IDX_MASK(msgq->idx_r[mtype])];
- seq_offset = seq - buf->rbuf.seq_start;
- idx = ECP_RBUF_IDX_MASK(buf->rbuf.msg_start + seq_offset, buf->rbuf.msg_size);
- msgq->idx_r[mtype]++;
- buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_IN_MSGQ;
+ _rv = _ecp_rbuf_msg_idx(rbuf, seq, &idx);
+ if (_rv) return ECP_ERR;
+ msg_buf = rbuf->arr.msg[idx].buf;
+ rv = rbuf->arr.msg[idx].size;
+
+ content = ecp_msg_get_content(msg_buf, rv);
+ if (content == NULL) {
+ rv = ECP_ERR;
+ goto msgq_pop_fin;
+ }
+
+ rv -= content - msg_buf;
+ if (msg_size < rv) {
+ rv = ECP_ERR_FULL;
+ goto msgq_pop_fin;
+ }
+
+ memcpy(msg, content, rv);
+
+ rbuf->arr.msg[idx].flags &= ~ECP_RBUF_FLAG_IN_MSGQ;
+ // if (rbuf->arr.msg[idx].flags == 0);
+
+msgq_pop_fin:
+ msgq->idx_r[mtype]++;
if (msgq->seq_start == seq) {
- int i, _idx = idx;
- ecp_seq_t msg_cnt = msgq->seq_max - msgq->seq_start + 1;
+ int i;
+ unsigned short msg_cnt = msgq->seq_max - msgq->seq_start + 1;
for (i=0; i<msg_cnt; i++) {
- if (buf->rbuf.msg[_idx].flags & ECP_RBUF_FLAG_IN_MSGQ) break;
- _idx = ECP_RBUF_IDX_MASK(_idx + 1, buf->rbuf.msg_size);
+ if (rbuf->arr.msg[idx].flags & ECP_RBUF_FLAG_IN_MSGQ) break;
+ idx = ECP_RBUF_IDX_MASK(idx + 1, rbuf->arr_size);
}
msgq->seq_start += i;
}
- rv = buf->rbuf.msg[idx].size - 1;
- if (rv >= 0) {
- rv = MIN(msg_size, rv);
- memcpy(msg, buf->rbuf.msg[idx].msg + 1, rv);
- }
return rv;
}