From 38e2385f5846860916f8880d818b3b024b8c7dd9 Mon Sep 17 00:00:00 2001 From: Uros Majstorovic Date: Mon, 14 Aug 2017 19:56:24 +0200 Subject: msgq implementation --- code/core/msgq.c | 112 +++++++++++++++++++++++++++++-------------------------- 1 file changed, 60 insertions(+), 52 deletions(-) (limited to 'code/core/msgq.c') diff --git a/code/core/msgq.c b/code/core/msgq.c index 2ba0ed5..c441acd 100644 --- a/code/core/msgq.c +++ b/code/core/msgq.c @@ -7,7 +7,7 @@ #define MIN(a,b) (((a)<(b))?(a):(b)) #define MAX(a,b) (((a)>(b))?(a):(b)) -#define MSG_NEXT(msgi, max_msgs) ((msgi + 1) % max_msgs) +#define MSG_IDX_MASK(idx) ((idx) & ((ECP_MAX_CONN_MSG) - 1)) static struct timespec *abstime_ts(struct timespec *ts, unsigned int msec) { struct timeval tv; @@ -22,16 +22,27 @@ static struct timespec *abstime_ts(struct timespec *ts, unsigned int msec) { } int ecp_conn_msgq_create(ECPConnection *conn) { + ECPRBRecv *buf = conn->rbuf.recv; + ECPConnMsgQ *msgq = buf ? &buf->msgq : NULL; int i; + int rv; + + if (msgq == NULL) return ECP_ERR; + + memset(msgq, 0, sizeof(ECPConnMsgQ)); + rv = pthread_mutex_init(&msgq->mutex, NULL); + if (rv) return ECP_ERR; + for (i=0; imsgq.cond[i], NULL); + rv = pthread_cond_init(&msgq->cond[i], NULL); if (rv) { int j; for (j=0; jmsgq.cond[j]); + pthread_cond_destroy(&msgq->cond[j]); } + pthread_mutex_destroy(&msgq->mutex); return ECP_ERR; } } @@ -40,81 +51,78 @@ int ecp_conn_msgq_create(ECPConnection *conn) { } void ecp_conn_msgq_destroy(ECPConnection *conn) { + ECPRBRecv *buf = conn->rbuf.recv; + ECPConnMsgQ *msgq = buf ? &buf->msgq : NULL; int i; + if (msgq == NULL) return; + for (i=0; imsgq.cond[i]); + pthread_cond_destroy(&msgq->cond[i]); } + pthread_mutex_destroy(&msgq->mutex); } -ssize_t ecp_conn_msgq_push(ECPConnection *conn, unsigned char *msg, size_t msg_size) { - ECPConnMsgQ *msgq = &conn->msgq; - unsigned short msg_idx = msgq->empty_idx; - unsigned short i; - unsigned short done = 0; - unsigned char mtype; - - if (msg_size == 0) return ECP_OK; +int ecp_conn_msgq_push(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype) { + ECPRBRecv *buf = conn->rbuf.recv; + ECPConnMsgQ *msgq = buf ? &buf->msgq : NULL; - mtype = *msg; - msg++; - msg_size--; - - if (mtype >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE; - if (msg_size >= ECP_MAX_MSG) return ECP_ERR_MAX_MSG; - if (msg_size < 1) return ECP_ERR_MIN_MSG; + if (msgq == NULL) return ECP_ERR; - for (i=0; ioccupied[msg_idx]) { - ECPMessage *message = &msgq->msg[msg_idx]; - if (msg_size > 0) memcpy(message->msg, msg, msg_size); - message->size = msg_size; - if (msgq->r_idx[mtype] == msgq->w_idx[mtype]) pthread_cond_signal(&msgq->cond[mtype]); - msgq->msg_idx[mtype][msgq->w_idx[mtype]] = msg_idx; - msgq->w_idx[mtype] = MSG_NEXT(msgq->w_idx[mtype], ECP_MAX_CONN_MSG+1); - - msgq->empty_idx = MSG_NEXT(msg_idx, ECP_MAX_CONN_MSG); - msgq->occupied[msg_idx] = 1; - done = 1; - break; - } - msg_idx = MSG_NEXT(msg_idx, ECP_MAX_CONN_MSG); - } - if (done) { - return msg_size+1; - } else { - return ECP_ERR_MAX_CONN_MSG; - } + if (msgq->idx_w[mtype] - msgq->idx_r[mtype] == ECP_MAX_CONN_MSG) return ECP_ERR_MAX_CONN_MSG; + if (msgq->idx_w[mtype] == msgq->idx_r[mtype]) pthread_cond_signal(&msgq->cond[mtype]); + + msgq->seq_msg[mtype][MSG_IDX_MASK(msgq->idx_w[mtype])] = seq; + msgq->idx_w[mtype]++; + + return ECP_OK; } ssize_t ecp_conn_msgq_pop(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, unsigned int timeout) { - ECPConnMsgQ *msgq = &conn->msgq; - ECPMessage *message; + ECPRBRecv *buf = conn->rbuf.recv; + ECPConnMsgQ *msgq = buf ? &buf->msgq : NULL; ssize_t rv = ECP_OK; - unsigned short msg_idx; + ecp_seq_t seq; + int idx; + if (msgq == NULL) return ECP_ERR; if (mtype >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE; - if (msgq->r_idx[mtype] == msgq->w_idx[mtype]) { + if (msgq->idx_r[mtype] == msgq->idx_w[mtype]) { if (timeout == -1) { - pthread_cond_wait(&msgq->cond[mtype], &conn->mutex); + pthread_cond_wait(&msgq->cond[mtype], &msgq->mutex); } else { struct timespec ts; - int _rv = pthread_cond_timedwait(&msgq->cond[mtype], &conn->mutex, abstime_ts(&ts, timeout)); + int _rv = pthread_cond_timedwait(&msgq->cond[mtype], &msgq->mutex, abstime_ts(&ts, timeout)); if (_rv) rv = ECP_ERR_TIMEOUT; } } if (!rv) { - msg_idx = msgq->msg_idx[mtype][msgq->r_idx[mtype]]; - msgq->r_idx[mtype] = MSG_NEXT(msgq->r_idx[mtype], ECP_MAX_CONN_MSG+1); - msgq->occupied[msg_idx] = 0; - message = &msgq->msg[msg_idx]; - rv = message->size; + seq = msgq->seq_msg[mtype][MSG_IDX_MASK(msgq->idx_r[mtype])]; + msgq->idx_r[mtype]++; + idx = ecp_rbuf_msg_idx(&buf->rbuf, seq); + if (idx < 0) rv = idx; + } + if (!rv) { + buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_RECEIVED; + if (buf->rbuf.seq_start == seq) { + int i, _idx = idx; + ecp_seq_t msg_cnt = buf->rbuf.seq_max - buf->rbuf.seq_start + 1; + + for (i=0; irbuf.msg[_idx].flags & ECP_RBUF_FLAG_RECEIVED) break; + _idx = ECP_RBUF_IDX_MASK(_idx + 1, buf->rbuf.msg_size); + } + buf->rbuf.seq_start += i; + buf->rbuf.msg_start = _idx; + } + rv = buf->rbuf.msg[idx].size; if (rv >= 0) { rv = MIN(msg_size, rv); - memcpy(msg, message->msg, rv); + memcpy(msg, buf->rbuf.msg[idx].msg, rv); } } + return rv; } -- cgit v1.2.3