diff options
Diffstat (limited to 'ecp/src/msgq.c')
-rw-r--r-- | ecp/src/msgq.c | 65 |
1 files changed, 34 insertions, 31 deletions
diff --git a/ecp/src/msgq.c b/ecp/src/msgq.c index 201af4a..61c7b02 100644 --- a/ecp/src/msgq.c +++ b/ecp/src/msgq.c @@ -12,7 +12,7 @@ static struct timespec *abstime_ts(struct timespec *ts, ecp_cts_t msec) { struct timeval tv; uint64_t us_start; - + gettimeofday(&tv, NULL); us_start = tv.tv_sec * (uint64_t) 1000000 + tv.tv_usec; us_start += msec * 1000; @@ -26,9 +26,9 @@ int ecp_conn_msgq_create(ECPConnMsgQ *msgq) { int rv; if (msgq == NULL) return ECP_ERR; - + memset(msgq, 0, sizeof(ECPConnMsgQ)); - + rv = pthread_mutex_init(&msgq->mutex, NULL); if (rv) return ECP_ERR; @@ -50,7 +50,7 @@ int ecp_conn_msgq_create(ECPConnMsgQ *msgq) { void ecp_conn_msgq_destroy(ECPConnMsgQ *msgq) { int i; - + if (msgq == NULL) return; for (i=0; i<ECP_MAX_MTYPE; i++) { @@ -64,7 +64,7 @@ int ecp_conn_msgq_start(ECPConnMsgQ *msgq, ecp_seq_t seq) { msgq->seq_max = seq; msgq->seq_start = seq + 1; - + return ECP_OK; } @@ -75,23 +75,25 @@ int ecp_conn_msgq_push(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype) mtype &= ECP_MTYPE_MASK; if (msgq == NULL) return ECP_ERR; if (mtype >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE; - + if ((unsigned short)(msgq->idx_w[mtype] - msgq->idx_r[mtype]) == ECP_MSGQ_MAX_MSG) return ECP_MSGQ_ERR_MAX_MSG; if (msgq->idx_w[mtype] == msgq->idx_r[mtype]) pthread_cond_signal(&msgq->cond[mtype]); - + msgq->seq_msg[mtype][MSG_IDX_MASK(msgq->idx_w[mtype])] = seq; msgq->idx_w[mtype]++; if (ECP_SEQ_LT(msgq->seq_max, seq)) msgq->seq_max = seq; - + return ECP_OK; } ssize_t ecp_conn_msgq_pop(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ecp_cts_t timeout) { ECPRBRecv *buf = conn->rbuf.recv; ECPConnMsgQ *msgq = buf ? &buf->msgq : NULL; - ssize_t rv = ECP_OK; - int idx; + ecp_seq_t seq; + ecp_seq_t seq_offset; + unsigned int idx; + ssize_t rv; if (msgq == NULL) return ECP_ERR; if (mtype >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE; @@ -101,33 +103,34 @@ ssize_t ecp_conn_msgq_pop(ECPConnection *conn, unsigned char mtype, unsigned cha pthread_cond_wait(&msgq->cond[mtype], &msgq->mutex); } else { struct timespec ts; - int _rv = pthread_cond_timedwait(&msgq->cond[mtype], &msgq->mutex, abstime_ts(&ts, timeout)); - if (_rv) rv = ECP_ERR_TIMEOUT; + int _rv; + + _rv = pthread_cond_timedwait(&msgq->cond[mtype], &msgq->mutex, abstime_ts(&ts, timeout)); + if (_rv) return ECP_ERR_TIMEOUT; } } - if (!rv) { - ecp_seq_t seq = msgq->seq_msg[mtype][MSG_IDX_MASK(msgq->idx_r[mtype])]; - ecp_seq_t seq_offset = seq - buf->rbuf.seq_start; - unsigned int idx = ECP_RBUF_IDX_MASK(buf->rbuf.msg_start + seq_offset, buf->rbuf.msg_size); - msgq->idx_r[mtype]++; - buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_IN_MSGQ; + seq = msgq->seq_msg[mtype][MSG_IDX_MASK(msgq->idx_r[mtype])]; + seq_offset = seq - buf->rbuf.seq_start; + idx = ECP_RBUF_IDX_MASK(buf->rbuf.msg_start + seq_offset, buf->rbuf.msg_size); - if (msgq->seq_start == seq) { - int i, _idx = idx; - ecp_seq_t msg_cnt = msgq->seq_max - msgq->seq_start + 1; + msgq->idx_r[mtype]++; + buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_IN_MSGQ; - for (i=0; i<msg_cnt; i++) { - if (buf->rbuf.msg[_idx].flags & ECP_RBUF_FLAG_IN_MSGQ) break; - _idx = ECP_RBUF_IDX_MASK(_idx + 1, buf->rbuf.msg_size); - } - msgq->seq_start += i; - } - rv = buf->rbuf.msg[idx].size - 1; - if (rv >= 0) { - rv = MIN(msg_size, rv); - memcpy(msg, buf->rbuf.msg[idx].msg + 1, rv); + if (msgq->seq_start == seq) { + int i, _idx = idx; + ecp_seq_t msg_cnt = msgq->seq_max - msgq->seq_start + 1; + + for (i=0; i<msg_cnt; i++) { + if (buf->rbuf.msg[_idx].flags & ECP_RBUF_FLAG_IN_MSGQ) break; + _idx = ECP_RBUF_IDX_MASK(_idx + 1, buf->rbuf.msg_size); } + msgq->seq_start += i; + } + rv = buf->rbuf.msg[idx].size - 1; + if (rv >= 0) { + rv = MIN(msg_size, rv); + memcpy(msg, buf->rbuf.msg[idx].msg + 1, rv); } return rv; |