diff options
Diffstat (limited to 'code/core/msgq.c')
-rw-r--r-- | code/core/msgq.c | 48 |
1 files changed, 30 insertions, 18 deletions
diff --git a/code/core/msgq.c b/code/core/msgq.c index c441acd..15629c8 100644 --- a/code/core/msgq.c +++ b/code/core/msgq.c @@ -1,13 +1,13 @@ #include "core.h" -#ifdef ECP_WITH_PTHREAD +#ifdef ECP_WITH_MSGQ #include <sys/time.h> #define MIN(a,b) (((a)<(b))?(a):(b)) #define MAX(a,b) (((a)>(b))?(a):(b)) -#define MSG_IDX_MASK(idx) ((idx) & ((ECP_MAX_CONN_MSG) - 1)) +#define MSG_IDX_MASK(idx) ((idx) & ((ECP_MSGQ_MAX_MSG) - 1)) static struct timespec *abstime_ts(struct timespec *ts, unsigned int msec) { struct timeval tv; @@ -63,17 +63,31 @@ void ecp_conn_msgq_destroy(ECPConnection *conn) { pthread_mutex_destroy(&msgq->mutex); } +int ecp_conn_msgq_start(ECPConnection *conn, ecp_seq_t seq) { + ECPRBRecv *buf = conn->rbuf.recv; + ECPConnMsgQ *msgq = buf ? &buf->msgq : NULL; + + if (msgq == NULL) return ECP_ERR; + + msgq->seq_max = seq; + msgq->seq_start = seq + 1; + + return ECP_OK; +} + int ecp_conn_msgq_push(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype) { ECPRBRecv *buf = conn->rbuf.recv; ECPConnMsgQ *msgq = buf ? &buf->msgq : NULL; if (msgq == NULL) return ECP_ERR; - if (msgq->idx_w[mtype] - msgq->idx_r[mtype] == ECP_MAX_CONN_MSG) return ECP_ERR_MAX_CONN_MSG; + if (msgq->idx_w[mtype] - msgq->idx_r[mtype] == ECP_MSGQ_MAX_MSG) return ECP_MSGQ_ERR_MAX_MSG; if (msgq->idx_w[mtype] == msgq->idx_r[mtype]) pthread_cond_signal(&msgq->cond[mtype]); msgq->seq_msg[mtype][MSG_IDX_MASK(msgq->idx_w[mtype])] = seq; msgq->idx_w[mtype]++; + + if (ECP_RBUF_SEQ_LT(msgq->seq_max, seq)) msgq->seq_max = seq; return ECP_OK; } @@ -82,7 +96,6 @@ ssize_t ecp_conn_msgq_pop(ECPConnection *conn, unsigned char mtype, unsigned cha ECPRBRecv *buf = conn->rbuf.recv; ECPConnMsgQ *msgq = buf ? &buf->msgq : NULL; ssize_t rv = ECP_OK; - ecp_seq_t seq; int idx; if (msgq == NULL) return ECP_ERR; @@ -98,32 +111,31 @@ ssize_t ecp_conn_msgq_pop(ECPConnection *conn, unsigned char mtype, unsigned cha } } if (!rv) { - seq = msgq->seq_msg[mtype][MSG_IDX_MASK(msgq->idx_r[mtype])]; + ecp_seq_t seq = msgq->seq_msg[mtype][MSG_IDX_MASK(msgq->idx_r[mtype])]; + ecp_seq_t seq_offset = seq - buf->rbuf.seq_start; + unsigned int idx = ECP_RBUF_IDX_MASK(buf->rbuf.msg_start + seq_offset, buf->rbuf.msg_size); + msgq->idx_r[mtype]++; - idx = ecp_rbuf_msg_idx(&buf->rbuf, seq); - if (idx < 0) rv = idx; - } - if (!rv) { - buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_RECEIVED; - if (buf->rbuf.seq_start == seq) { + buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_IN_MSGQ; + + if (msgq->seq_start == seq) { int i, _idx = idx; - ecp_seq_t msg_cnt = buf->rbuf.seq_max - buf->rbuf.seq_start + 1; + ecp_seq_t msg_cnt = msgq->seq_max - msgq->seq_start + 1; for (i=0; i<msg_cnt; i++) { - if (buf->rbuf.msg[_idx].flags & ECP_RBUF_FLAG_RECEIVED) break; + if (buf->rbuf.msg[_idx].flags & ECP_RBUF_FLAG_IN_MSGQ) break; _idx = ECP_RBUF_IDX_MASK(_idx + 1, buf->rbuf.msg_size); } - buf->rbuf.seq_start += i; - buf->rbuf.msg_start = _idx; + msgq->seq_start += i; } - rv = buf->rbuf.msg[idx].size; + rv = buf->rbuf.msg[idx].size - 1; if (rv >= 0) { rv = MIN(msg_size, rv); - memcpy(msg, buf->rbuf.msg[idx].msg, rv); + memcpy(msg, buf->rbuf.msg[idx].msg + 1, rv); } } return rv; } -#endif /* ECP_WITH_PTHREAD */
\ No newline at end of file +#endif /* ECP_WITH_MSGQ */
\ No newline at end of file |