summaryrefslogtreecommitdiff
path: root/ecp/src/ecp/msgq.c
diff options
context:
space:
mode:
Diffstat (limited to 'ecp/src/ecp/msgq.c')
-rw-r--r--ecp/src/ecp/msgq.c123
1 files changed, 65 insertions, 58 deletions
diff --git a/ecp/src/ecp/msgq.c b/ecp/src/ecp/msgq.c
index 8f81d8a..8a5408b 100644
--- a/ecp/src/ecp/msgq.c
+++ b/ecp/src/ecp/msgq.c
@@ -1,12 +1,14 @@
-#include "core.h"
-
-#ifdef ECP_WITH_MSGQ
-
#include <sys/time.h>
+#include <stdlib.h>
+#include <string.h>
-#define MSG_IDX_MASK(idx) ((idx) & ((ECP_MSGQ_MAX_MSG) - 1))
+#include "core.h"
+#include "rbuf.h"
+#include "msgq.h"
-static struct timespec *abstime_ts(struct timespec *ts, ecp_cts_t msec) {
+#define MSGQ_IDX_MASK(idx) ((idx) & ((ECP_MSGQ_MAX_MSG) - 1))
+
+static struct timespec *abstime_ts(struct timespec *ts, ecp_sts_t msec) {
struct timeval tv;
uint64_t us_start;
@@ -18,16 +20,15 @@ static struct timespec *abstime_ts(struct timespec *ts, ecp_cts_t msec) {
return ts;
}
-int ecp_conn_msgq_create(ECPConnMsgQ *msgq) {
+int ecp_msgq_create(ECPRBConn *conn, ECPMsgQ *msgq) {
int i;
int rv;
- memset(msgq, 0, sizeof(ECPConnMsgQ));
+ if (conn->recv == NULL) return ECP_ERR;
- rv = pthread_mutex_init(&msgq->mutex, NULL);
- if (rv) return ECP_ERR;
+ memset(msgq, 0, sizeof(ECPMsgQ));
- for (i=0; i<ECP_MAX_MTYPE; i++) {
+ for (i=0; i<ECP_MSGQ_MAX_MTYPE; i++) {
rv = pthread_cond_init(&msgq->cond[i], NULL);
if (rv) {
int j;
@@ -35,40 +36,42 @@ int ecp_conn_msgq_create(ECPConnMsgQ *msgq) {
for (j=0; j<i; j++) {
pthread_cond_destroy(&msgq->cond[j]);
}
- pthread_mutex_destroy(&msgq->mutex);
return ECP_ERR;
}
}
+ conn->recv->msgq = msgq;
return ECP_OK;
}
-void ecp_conn_msgq_destroy(ECPConnMsgQ *msgq) {
+void ecp_msgq_destroy(ECPRBConn *conn) {
+ ECPMsgQ *msgq = conn->recv->msgq;
int i;
- for (i=0; i<ECP_MAX_MTYPE; i++) {
+ for (i=0; i<ECP_MSGQ_MAX_MTYPE; i++) {
pthread_cond_destroy(&msgq->cond[i]);
}
- pthread_mutex_destroy(&msgq->mutex);
+
+ conn->recv->msgq = NULL;
}
-int ecp_conn_msgq_start(ECPConnMsgQ *msgq, ecp_seq_t seq) {
+void ecp_msgq_start(ECPRBConn *conn, ecp_seq_t seq) {
+ ECPMsgQ *msgq = conn->recv->msgq;
+
msgq->seq_max = seq;
msgq->seq_start = seq + 1;
-
- return ECP_OK;
}
-int ecp_conn_msgq_push(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype) {
- ECPRBRecv *buf = conn->rbuf.recv;
- ECPConnMsgQ *msgq = &buf->msgq;
+int ecp_msgq_push(ECPRBConn *conn, ecp_seq_t seq, unsigned char mtype) {
+ ECPRBRecv *buf = conn->recv;
+ ECPMsgQ *msgq = buf->msgq;
- if (mtype >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE;
+ if (mtype >= ECP_MSGQ_MAX_MTYPE) return ECP_ERR_MTYPE;
if ((unsigned short)(msgq->idx_w[mtype] - msgq->idx_r[mtype]) == ECP_MSGQ_MAX_MSG) return ECP_ERR_FULL;
if (msgq->idx_w[mtype] == msgq->idx_r[mtype]) pthread_cond_signal(&msgq->cond[mtype]);
- msgq->seq_msg[mtype][MSG_IDX_MASK(msgq->idx_w[mtype])] = seq;
+ msgq->seq_msg[mtype][MSGQ_IDX_MASK(msgq->idx_w[mtype])] = seq;
msgq->idx_w[mtype]++;
if (ECP_SEQ_LT(msgq->seq_max, seq)) msgq->seq_max = seq;
@@ -76,68 +79,72 @@ int ecp_conn_msgq_push(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype)
return ECP_OK;
}
-ssize_t ecp_conn_msgq_pop(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ecp_cts_t timeout) {
- ECPRBRecv *buf = conn->rbuf.recv;
- ECPConnMsgQ *msgq = &buf->msgq;
+ssize_t ecp_msgq_pop(ECPRBConn *conn, unsigned char mtype, unsigned char *msg, size_t _msg_size, ecp_sts_t timeout) {
+ ECPRBRecv *buf = conn->recv;
+ ECPMsgQ *msgq = buf->msgq;
ECPRBuffer *rbuf = &buf->rbuf;
- ecp_seq_t seq;
+ unsigned char *pld_buf;
unsigned char *msg_buf;
- unsigned char *content;
+ size_t pld_size, hdr_size, msg_size;
+ ecp_seq_t seq;
unsigned short idx;
- int _rv;
- ssize_t rv;
+ int rv;
- if (mtype >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE;
+ if (mtype >= ECP_MSGQ_MAX_MTYPE) return ECP_ERR_MTYPE;
if (msgq->idx_r[mtype] == msgq->idx_w[mtype]) {
if (timeout == -1) {
- pthread_cond_wait(&msgq->cond[mtype], &msgq->mutex);
+ pthread_cond_wait(&msgq->cond[mtype], &buf->mutex);
} else {
struct timespec ts;
- _rv = pthread_cond_timedwait(&msgq->cond[mtype], &msgq->mutex, abstime_ts(&ts, timeout));
- if (_rv) return ECP_ERR_TIMEOUT;
+ rv = pthread_cond_timedwait(&msgq->cond[mtype], &buf->mutex, abstime_ts(&ts, timeout));
+ if (rv) return ECP_ERR_TIMEOUT;
}
}
- seq = msgq->seq_msg[mtype][MSG_IDX_MASK(msgq->idx_r[mtype])];
-
- _rv = _ecp_rbuf_msg_idx(rbuf, seq, &idx);
- if (_rv) return ECP_ERR;
+ seq = msgq->seq_msg[mtype][MSGQ_IDX_MASK(msgq->idx_r[mtype])];
- msg_buf = rbuf->arr.msg[idx].buf;
- rv = rbuf->arr.msg[idx].size;
-
- content = ecp_msg_get_content(msg_buf, rv);
- if (content == NULL) {
- rv = ECP_ERR;
- goto msgq_pop_fin;
- }
+ rv = _ecp_rbuf_msg_idx(rbuf, seq, &idx);
+ if (rv) return ECP_ERR;
- rv -= content - msg_buf;
- if (msg_size < rv) {
- rv = ECP_ERR_FULL;
- goto msgq_pop_fin;
- }
+ pld_buf = rbuf->arr.pld[idx].buf;
+ pld_size = rbuf->arr.pld[idx].size;
- memcpy(msg, content, rv);
+ msg_buf = ecp_pld_get_msg(pld_buf, pld_size);
+ if (msg_buf == NULL) return ECP_ERR;
+ hdr_size = msg_buf - pld_buf;
+ msg_size = pld_size - hdr_size;
- rbuf->arr.msg[idx].flags &= ~ECP_RBUF_FLAG_IN_MSGQ;
- // if (rbuf->arr.msg[idx].flags == 0);
+ rbuf->arr.pld[idx].flags &= ~ECP_RBUF_FLAG_IN_MSGQ;
+ // if (rbuf->arr.pld[idx].flags == 0);
-msgq_pop_fin:
msgq->idx_r[mtype]++;
if (msgq->seq_start == seq) {
int i;
unsigned short msg_cnt = msgq->seq_max - msgq->seq_start + 1;
for (i=0; i<msg_cnt; i++) {
- if (rbuf->arr.msg[idx].flags & ECP_RBUF_FLAG_IN_MSGQ) break;
+ if (rbuf->arr.pld[idx].flags & ECP_RBUF_FLAG_IN_MSGQ) break;
idx = ECP_RBUF_IDX_MASK(idx + 1, rbuf->arr_size);
}
msgq->seq_start += i;
}
- return rv;
+ if (_msg_size < msg_size) return ECP_ERR_FULL;
+ if (msg_size) memcpy(msg, msg_buf, msg_size);
+ return msg_size;
}
-#endif /* ECP_WITH_MSGQ */ \ No newline at end of file
+ssize_t ecp_msg_receive(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ecp_sts_t timeout) {
+ ECPRBConn *_conn;
+ ssize_t rv;
+
+ _conn = ecp_rbuf_get_rbconn(conn);
+ if (_conn == NULL) return ECP_ERR;
+
+ pthread_mutex_lock(&_conn->recv->mutex);
+ rv = ecp_msgq_pop(_conn, mtype, msg, msg_size, timeout);
+ pthread_mutex_unlock(&_conn->recv->mutex);
+
+ return rv;
+}