summaryrefslogtreecommitdiff
path: root/ecp/src/ecp/ext/msgq.c
diff options
context:
space:
mode:
Diffstat (limited to 'ecp/src/ecp/ext/msgq.c')
-rw-r--r--ecp/src/ecp/ext/msgq.c150
1 files changed, 0 insertions, 150 deletions
diff --git a/ecp/src/ecp/ext/msgq.c b/ecp/src/ecp/ext/msgq.c
deleted file mode 100644
index 7172e51..0000000
--- a/ecp/src/ecp/ext/msgq.c
+++ /dev/null
@@ -1,150 +0,0 @@
-#include <sys/time.h>
-#include <stdlib.h>
-#include <string.h>
-
-#include <ecp/core.h>
-
-#include "rbuf.h"
-#include "msgq.h"
-
-#define MSGQ_IDX_MASK(idx) ((idx) & ((ECP_MSGQ_MAX_MSG) - 1))
-
-static struct timespec *abstime_ts(struct timespec *ts, ecp_sts_t msec) {
- struct timeval tv;
- uint64_t us_start;
-
- gettimeofday(&tv, NULL);
- us_start = tv.tv_sec * (uint64_t) 1000000 + tv.tv_usec;
- us_start += msec * 1000;
- ts->tv_sec = us_start / 1000000;
- ts->tv_nsec = (us_start % 1000000) * 1000;
- return ts;
-}
-
-void ecp_msgq_init(ECPRBConn *conn, ECPMsgQ *msgq) {
- memset(msgq, 0, sizeof(ECPMsgQ));
-
- conn->recv->msgq = msgq;
-}
-
-int ecp_msgq_create(ECPRBConn *conn) {
- ECPMsgQ *msgq = conn->recv->msgq;
- int i, rv;
-
- for (i=0; i<ECP_MSGQ_MAX_MTYPE; i++) {
- rv = pthread_cond_init(&msgq->cond[i], NULL);
- if (rv) {
- int j;
-
- for (j=0; j<i; j++) {
- pthread_cond_destroy(&msgq->cond[j]);
- }
- return ECP_ERR;
- }
- }
-
- return ECP_OK;
-}
-
-void ecp_msgq_destroy(ECPRBConn *conn) {
- ECPMsgQ *msgq = conn->recv->msgq;
- int i;
-
- for (i=0; i<ECP_MSGQ_MAX_MTYPE; i++) {
- pthread_cond_destroy(&msgq->cond[i]);
- }
-}
-
-void ecp_msgq_start(ECPRBConn *conn, ecp_seq_t seq) {
- ECPMsgQ *msgq = conn->recv->msgq;
-
- msgq->seq_max = seq;
- msgq->seq_start = seq + 1;
-}
-
-int ecp_msgq_push(ECPRBConn *conn, ecp_seq_t seq, unsigned char mtype) {
- ECPRBRecv *buf = conn->recv;
- ECPMsgQ *msgq = buf->msgq;
-
- if (mtype >= ECP_MSGQ_MAX_MTYPE) return ECP_ERR_MTYPE;
-
- if ((unsigned short)(msgq->idx_w[mtype] - msgq->idx_r[mtype]) == ECP_MSGQ_MAX_MSG) return ECP_ERR_FULL;
- if (msgq->idx_w[mtype] == msgq->idx_r[mtype]) pthread_cond_signal(&msgq->cond[mtype]);
-
- msgq->seq_msg[mtype][MSGQ_IDX_MASK(msgq->idx_w[mtype])] = seq;
- msgq->idx_w[mtype]++;
-
- if (ECP_SEQ_LT(msgq->seq_max, seq)) msgq->seq_max = seq;
-
- return ECP_OK;
-}
-
-ssize_t ecp_msgq_pop(ECPRBConn *conn, unsigned char mtype, unsigned char *msg, size_t _msg_size, ecp_sts_t timeout) {
- ECPRBRecv *buf = conn->recv;
- ECPMsgQ *msgq = buf->msgq;
- ECPRBuffer *rbuf = &buf->rbuf;
- unsigned char *pld_buf;
- unsigned char *msg_buf;
- size_t pld_size, hdr_size, msg_size;
- ecp_seq_t seq;
- unsigned short idx;
- int rv;
-
- if (mtype >= ECP_MSGQ_MAX_MTYPE) return ECP_ERR_MTYPE;
-
- if (msgq->idx_r[mtype] == msgq->idx_w[mtype]) {
- if (timeout == -1) {
- pthread_cond_wait(&msgq->cond[mtype], &buf->mutex);
- } else {
- struct timespec ts;
-
- rv = pthread_cond_timedwait(&msgq->cond[mtype], &buf->mutex, abstime_ts(&ts, timeout));
- if (rv) return ECP_ERR_TIMEOUT;
- }
- }
- seq = msgq->seq_msg[mtype][MSGQ_IDX_MASK(msgq->idx_r[mtype])];
-
- rv = _ecp_rbuf_msg_idx(rbuf, seq, &idx);
- if (rv) return ECP_ERR;
-
- pld_buf = rbuf->arr.pld[idx].buf;
- pld_size = rbuf->arr.pld[idx].size;
-
- msg_buf = ecp_pld_get_msg(pld_buf, pld_size);
- if (msg_buf == NULL) return ECP_ERR;
- hdr_size = msg_buf - pld_buf;
- msg_size = pld_size - hdr_size;
-
- rbuf->arr.pld[idx].flags &= ~ECP_RBUF_FLAG_IN_MSGQ;
- // if (rbuf->arr.pld[idx].flags == 0);
-
- msgq->idx_r[mtype]++;
- if (msgq->seq_start == seq) {
- int i;
- unsigned short msg_cnt = msgq->seq_max - msgq->seq_start + 1;
-
- for (i=0; i<msg_cnt; i++) {
- if (rbuf->arr.pld[idx].flags & ECP_RBUF_FLAG_IN_MSGQ) break;
- idx = ECP_RBUF_IDX_MASK(idx + 1, rbuf->arr_size);
- }
- msgq->seq_start += i;
- }
-
- if (_msg_size < msg_size) return ECP_ERR_FULL;
- if (msg_size) memcpy(msg, msg_buf, msg_size);
- return msg_size;
-}
-
-ssize_t ecp_msg_receive(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ecp_sts_t timeout) {
- ECPRBConn *_conn;
- ssize_t rv;
-
- _conn = ecp_rbuf_get_rbconn(conn);
- if (_conn == NULL) return ECP_ERR;
-
- pthread_mutex_lock(&_conn->recv->mutex);
- rv = ecp_msgq_pop(_conn, mtype, msg, msg_size, timeout);
- pthread_mutex_unlock(&_conn->recv->mutex);
-
- return rv;
-}