diff options
Diffstat (limited to 'ecp/src/ecp/msgq.c')
-rw-r--r-- | ecp/src/ecp/msgq.c | 123 |
1 files changed, 65 insertions, 58 deletions
diff --git a/ecp/src/ecp/msgq.c b/ecp/src/ecp/msgq.c index 8f81d8a..8a5408b 100644 --- a/ecp/src/ecp/msgq.c +++ b/ecp/src/ecp/msgq.c @@ -1,12 +1,14 @@ -#include "core.h" - -#ifdef ECP_WITH_MSGQ - #include <sys/time.h> +#include <stdlib.h> +#include <string.h> -#define MSG_IDX_MASK(idx) ((idx) & ((ECP_MSGQ_MAX_MSG) - 1)) +#include "core.h" +#include "rbuf.h" +#include "msgq.h" -static struct timespec *abstime_ts(struct timespec *ts, ecp_cts_t msec) { +#define MSGQ_IDX_MASK(idx) ((idx) & ((ECP_MSGQ_MAX_MSG) - 1)) + +static struct timespec *abstime_ts(struct timespec *ts, ecp_sts_t msec) { struct timeval tv; uint64_t us_start; @@ -18,16 +20,15 @@ static struct timespec *abstime_ts(struct timespec *ts, ecp_cts_t msec) { return ts; } -int ecp_conn_msgq_create(ECPConnMsgQ *msgq) { +int ecp_msgq_create(ECPRBConn *conn, ECPMsgQ *msgq) { int i; int rv; - memset(msgq, 0, sizeof(ECPConnMsgQ)); + if (conn->recv == NULL) return ECP_ERR; - rv = pthread_mutex_init(&msgq->mutex, NULL); - if (rv) return ECP_ERR; + memset(msgq, 0, sizeof(ECPMsgQ)); - for (i=0; i<ECP_MAX_MTYPE; i++) { + for (i=0; i<ECP_MSGQ_MAX_MTYPE; i++) { rv = pthread_cond_init(&msgq->cond[i], NULL); if (rv) { int j; @@ -35,40 +36,42 @@ int ecp_conn_msgq_create(ECPConnMsgQ *msgq) { for (j=0; j<i; j++) { pthread_cond_destroy(&msgq->cond[j]); } - pthread_mutex_destroy(&msgq->mutex); return ECP_ERR; } } + conn->recv->msgq = msgq; return ECP_OK; } -void ecp_conn_msgq_destroy(ECPConnMsgQ *msgq) { +void ecp_msgq_destroy(ECPRBConn *conn) { + ECPMsgQ *msgq = conn->recv->msgq; int i; - for (i=0; i<ECP_MAX_MTYPE; i++) { + for (i=0; i<ECP_MSGQ_MAX_MTYPE; i++) { pthread_cond_destroy(&msgq->cond[i]); } - pthread_mutex_destroy(&msgq->mutex); + + conn->recv->msgq = NULL; } -int ecp_conn_msgq_start(ECPConnMsgQ *msgq, ecp_seq_t seq) { +void ecp_msgq_start(ECPRBConn *conn, ecp_seq_t seq) { + ECPMsgQ *msgq = conn->recv->msgq; + msgq->seq_max = seq; msgq->seq_start = seq + 1; - - return ECP_OK; } -int ecp_conn_msgq_push(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype) { - ECPRBRecv *buf = conn->rbuf.recv; - ECPConnMsgQ *msgq = &buf->msgq; +int ecp_msgq_push(ECPRBConn *conn, ecp_seq_t seq, unsigned char mtype) { + ECPRBRecv *buf = conn->recv; + ECPMsgQ *msgq = buf->msgq; - if (mtype >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE; + if (mtype >= ECP_MSGQ_MAX_MTYPE) return ECP_ERR_MTYPE; if ((unsigned short)(msgq->idx_w[mtype] - msgq->idx_r[mtype]) == ECP_MSGQ_MAX_MSG) return ECP_ERR_FULL; if (msgq->idx_w[mtype] == msgq->idx_r[mtype]) pthread_cond_signal(&msgq->cond[mtype]); - msgq->seq_msg[mtype][MSG_IDX_MASK(msgq->idx_w[mtype])] = seq; + msgq->seq_msg[mtype][MSGQ_IDX_MASK(msgq->idx_w[mtype])] = seq; msgq->idx_w[mtype]++; if (ECP_SEQ_LT(msgq->seq_max, seq)) msgq->seq_max = seq; @@ -76,68 +79,72 @@ int ecp_conn_msgq_push(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype) return ECP_OK; } -ssize_t ecp_conn_msgq_pop(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ecp_cts_t timeout) { - ECPRBRecv *buf = conn->rbuf.recv; - ECPConnMsgQ *msgq = &buf->msgq; +ssize_t ecp_msgq_pop(ECPRBConn *conn, unsigned char mtype, unsigned char *msg, size_t _msg_size, ecp_sts_t timeout) { + ECPRBRecv *buf = conn->recv; + ECPMsgQ *msgq = buf->msgq; ECPRBuffer *rbuf = &buf->rbuf; - ecp_seq_t seq; + unsigned char *pld_buf; unsigned char *msg_buf; - unsigned char *content; + size_t pld_size, hdr_size, msg_size; + ecp_seq_t seq; unsigned short idx; - int _rv; - ssize_t rv; + int rv; - if (mtype >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE; + if (mtype >= ECP_MSGQ_MAX_MTYPE) return ECP_ERR_MTYPE; if (msgq->idx_r[mtype] == msgq->idx_w[mtype]) { if (timeout == -1) { - pthread_cond_wait(&msgq->cond[mtype], &msgq->mutex); + pthread_cond_wait(&msgq->cond[mtype], &buf->mutex); } else { struct timespec ts; - _rv = pthread_cond_timedwait(&msgq->cond[mtype], &msgq->mutex, abstime_ts(&ts, timeout)); - if (_rv) return ECP_ERR_TIMEOUT; + rv = pthread_cond_timedwait(&msgq->cond[mtype], &buf->mutex, abstime_ts(&ts, timeout)); + if (rv) return ECP_ERR_TIMEOUT; } } - seq = msgq->seq_msg[mtype][MSG_IDX_MASK(msgq->idx_r[mtype])]; - - _rv = _ecp_rbuf_msg_idx(rbuf, seq, &idx); - if (_rv) return ECP_ERR; + seq = msgq->seq_msg[mtype][MSGQ_IDX_MASK(msgq->idx_r[mtype])]; - msg_buf = rbuf->arr.msg[idx].buf; - rv = rbuf->arr.msg[idx].size; - - content = ecp_msg_get_content(msg_buf, rv); - if (content == NULL) { - rv = ECP_ERR; - goto msgq_pop_fin; - } + rv = _ecp_rbuf_msg_idx(rbuf, seq, &idx); + if (rv) return ECP_ERR; - rv -= content - msg_buf; - if (msg_size < rv) { - rv = ECP_ERR_FULL; - goto msgq_pop_fin; - } + pld_buf = rbuf->arr.pld[idx].buf; + pld_size = rbuf->arr.pld[idx].size; - memcpy(msg, content, rv); + msg_buf = ecp_pld_get_msg(pld_buf, pld_size); + if (msg_buf == NULL) return ECP_ERR; + hdr_size = msg_buf - pld_buf; + msg_size = pld_size - hdr_size; - rbuf->arr.msg[idx].flags &= ~ECP_RBUF_FLAG_IN_MSGQ; - // if (rbuf->arr.msg[idx].flags == 0); + rbuf->arr.pld[idx].flags &= ~ECP_RBUF_FLAG_IN_MSGQ; + // if (rbuf->arr.pld[idx].flags == 0); -msgq_pop_fin: msgq->idx_r[mtype]++; if (msgq->seq_start == seq) { int i; unsigned short msg_cnt = msgq->seq_max - msgq->seq_start + 1; for (i=0; i<msg_cnt; i++) { - if (rbuf->arr.msg[idx].flags & ECP_RBUF_FLAG_IN_MSGQ) break; + if (rbuf->arr.pld[idx].flags & ECP_RBUF_FLAG_IN_MSGQ) break; idx = ECP_RBUF_IDX_MASK(idx + 1, rbuf->arr_size); } msgq->seq_start += i; } - return rv; + if (_msg_size < msg_size) return ECP_ERR_FULL; + if (msg_size) memcpy(msg, msg_buf, msg_size); + return msg_size; } -#endif /* ECP_WITH_MSGQ */
\ No newline at end of file +ssize_t ecp_msg_receive(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ecp_sts_t timeout) { + ECPRBConn *_conn; + ssize_t rv; + + _conn = ecp_rbuf_get_rbconn(conn); + if (_conn == NULL) return ECP_ERR; + + pthread_mutex_lock(&_conn->recv->mutex); + rv = ecp_msgq_pop(_conn, mtype, msg, msg_size, timeout); + pthread_mutex_unlock(&_conn->recv->mutex); + + return rv; +} |