diff options
Diffstat (limited to 'ecp/src/msgq.c')
-rw-r--r-- | ecp/src/msgq.c | 68 |
1 files changed, 36 insertions, 32 deletions
diff --git a/ecp/src/msgq.c b/ecp/src/msgq.c index 61c7b02..8f81d8a 100644 --- a/ecp/src/msgq.c +++ b/ecp/src/msgq.c @@ -4,9 +4,6 @@ #include <sys/time.h> -#define MIN(a,b) (((a)<(b))?(a):(b)) -#define MAX(a,b) (((a)>(b))?(a):(b)) - #define MSG_IDX_MASK(idx) ((idx) & ((ECP_MSGQ_MAX_MSG) - 1)) static struct timespec *abstime_ts(struct timespec *ts, ecp_cts_t msec) { @@ -25,8 +22,6 @@ int ecp_conn_msgq_create(ECPConnMsgQ *msgq) { int i; int rv; - if (msgq == NULL) return ECP_ERR; - memset(msgq, 0, sizeof(ECPConnMsgQ)); rv = pthread_mutex_init(&msgq->mutex, NULL); @@ -51,8 +46,6 @@ int ecp_conn_msgq_create(ECPConnMsgQ *msgq) { void ecp_conn_msgq_destroy(ECPConnMsgQ *msgq) { int i; - if (msgq == NULL) return; - for (i=0; i<ECP_MAX_MTYPE; i++) { pthread_cond_destroy(&msgq->cond[i]); } @@ -60,8 +53,6 @@ void ecp_conn_msgq_destroy(ECPConnMsgQ *msgq) { } int ecp_conn_msgq_start(ECPConnMsgQ *msgq, ecp_seq_t seq) { - if (msgq == NULL) return ECP_ERR; - msgq->seq_max = seq; msgq->seq_start = seq + 1; @@ -70,13 +61,11 @@ int ecp_conn_msgq_start(ECPConnMsgQ *msgq, ecp_seq_t seq) { int ecp_conn_msgq_push(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype) { ECPRBRecv *buf = conn->rbuf.recv; - ECPConnMsgQ *msgq = buf ? &buf->msgq : NULL; + ECPConnMsgQ *msgq = &buf->msgq; - mtype &= ECP_MTYPE_MASK; - if (msgq == NULL) return ECP_ERR; if (mtype >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE; - if ((unsigned short)(msgq->idx_w[mtype] - msgq->idx_r[mtype]) == ECP_MSGQ_MAX_MSG) return ECP_MSGQ_ERR_MAX_MSG; + if ((unsigned short)(msgq->idx_w[mtype] - msgq->idx_r[mtype]) == ECP_MSGQ_MAX_MSG) return ECP_ERR_FULL; if (msgq->idx_w[mtype] == msgq->idx_r[mtype]) pthread_cond_signal(&msgq->cond[mtype]); msgq->seq_msg[mtype][MSG_IDX_MASK(msgq->idx_w[mtype])] = seq; @@ -89,13 +78,15 @@ int ecp_conn_msgq_push(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype) ssize_t ecp_conn_msgq_pop(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ecp_cts_t timeout) { ECPRBRecv *buf = conn->rbuf.recv; - ECPConnMsgQ *msgq = buf ? &buf->msgq : NULL; + ECPConnMsgQ *msgq = &buf->msgq; + ECPRBuffer *rbuf = &buf->rbuf; ecp_seq_t seq; - ecp_seq_t seq_offset; - unsigned int idx; + unsigned char *msg_buf; + unsigned char *content; + unsigned short idx; + int _rv; ssize_t rv; - if (msgq == NULL) return ECP_ERR; if (mtype >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE; if (msgq->idx_r[mtype] == msgq->idx_w[mtype]) { @@ -103,35 +94,48 @@ ssize_t ecp_conn_msgq_pop(ECPConnection *conn, unsigned char mtype, unsigned cha pthread_cond_wait(&msgq->cond[mtype], &msgq->mutex); } else { struct timespec ts; - int _rv; _rv = pthread_cond_timedwait(&msgq->cond[mtype], &msgq->mutex, abstime_ts(&ts, timeout)); if (_rv) return ECP_ERR_TIMEOUT; } } - seq = msgq->seq_msg[mtype][MSG_IDX_MASK(msgq->idx_r[mtype])]; - seq_offset = seq - buf->rbuf.seq_start; - idx = ECP_RBUF_IDX_MASK(buf->rbuf.msg_start + seq_offset, buf->rbuf.msg_size); - msgq->idx_r[mtype]++; - buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_IN_MSGQ; + _rv = _ecp_rbuf_msg_idx(rbuf, seq, &idx); + if (_rv) return ECP_ERR; + msg_buf = rbuf->arr.msg[idx].buf; + rv = rbuf->arr.msg[idx].size; + + content = ecp_msg_get_content(msg_buf, rv); + if (content == NULL) { + rv = ECP_ERR; + goto msgq_pop_fin; + } + + rv -= content - msg_buf; + if (msg_size < rv) { + rv = ECP_ERR_FULL; + goto msgq_pop_fin; + } + + memcpy(msg, content, rv); + + rbuf->arr.msg[idx].flags &= ~ECP_RBUF_FLAG_IN_MSGQ; + // if (rbuf->arr.msg[idx].flags == 0); + +msgq_pop_fin: + msgq->idx_r[mtype]++; if (msgq->seq_start == seq) { - int i, _idx = idx; - ecp_seq_t msg_cnt = msgq->seq_max - msgq->seq_start + 1; + int i; + unsigned short msg_cnt = msgq->seq_max - msgq->seq_start + 1; for (i=0; i<msg_cnt; i++) { - if (buf->rbuf.msg[_idx].flags & ECP_RBUF_FLAG_IN_MSGQ) break; - _idx = ECP_RBUF_IDX_MASK(_idx + 1, buf->rbuf.msg_size); + if (rbuf->arr.msg[idx].flags & ECP_RBUF_FLAG_IN_MSGQ) break; + idx = ECP_RBUF_IDX_MASK(idx + 1, rbuf->arr_size); } msgq->seq_start += i; } - rv = buf->rbuf.msg[idx].size - 1; - if (rv >= 0) { - rv = MIN(msg_size, rv); - memcpy(msg, buf->rbuf.msg[idx].msg + 1, rv); - } return rv; } |