diff options
Diffstat (limited to 'code/core/msgq.c')
-rw-r--r-- | code/core/msgq.c | 121 |
1 files changed, 121 insertions, 0 deletions
diff --git a/code/core/msgq.c b/code/core/msgq.c new file mode 100644 index 0000000..53c55ea --- /dev/null +++ b/code/core/msgq.c @@ -0,0 +1,121 @@ +#include "core.h" + +#ifdef ECP_WITH_PTHREAD + +#include <sys/time.h> + +#define MIN(a,b) (((a)<(b))?(a):(b)) +#define MAX(a,b) (((a)>(b))?(a):(b)) + +#define MSG_NEXT(msgi, max_msgs) ((msgi + 1) % max_msgs) + +static struct timespec *abstime_ts(struct timespec *ts, unsigned int msec) { + struct timeval tv; + uint64_t us_start; + + gettimeofday(&tv, NULL); + us_start = tv.tv_sec * (uint64_t) 1000000 + tv.tv_usec; + us_start += msec * 1000; + ts->tv_sec = us_start / 1000000; + ts->tv_nsec = (us_start % 1000000) * 1000; + return ts; +} + +int ecp_conn_msgq_create(ECPConnection *conn) { + int i; + + for (i=0; i<ECP_MAX_MTYPE; i++) { + int rv = pthread_cond_init(&conn->msgq.cond[i], NULL); + if (rv) { + int j; + + for (j=0; j<i; j++) { + pthread_cond_destroy(&conn->msgq.cond[j]); + } + return ECP_ERR; + } + } + + return ECP_OK; +} + +void ecp_conn_msgq_destroy(ECPConnection *conn) { + int i; + + for (i=0; i<ECP_MAX_MTYPE; i++) { + pthread_cond_destroy(&conn->msgq.cond[i]); + } +} + +ssize_t ecp_conn_msgq_push(ECPConnection *conn, unsigned char *msg, size_t msg_size) { + ECPConnMsgQ *msgq = &conn->msgq; + unsigned short msg_idx = msgq->empty_idx; + unsigned short i; + unsigned short done = 0; + unsigned char mtype; + + if (msg_size == 0) return ECP_OK; + + mtype = *msg; + msg++; + msg_size--; + + if (mtype >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE; + if (msg_size >= ECP_MAX_MSG) return ECP_ERR_MAX_MSG; + if (msg_size < ECP_MIN_MSG) return ECP_ERR_MIN_MSG; + + for (i=0; i<ECP_MAX_CONN_MSG; i++) { + if (!msgq->occupied[msg_idx]) { + ECPMessage *message = &msgq->msg[msg_idx]; + if (msg_size > 0) memcpy(message->msg, msg, msg_size); + message->size = msg_size; + if (msgq->r_idx[mtype] == msgq->w_idx[mtype]) pthread_cond_signal(&msgq->cond[mtype]); + msgq->msg_idx[mtype][msgq->w_idx[mtype]] = msg_idx; + msgq->w_idx[mtype] = MSG_NEXT(msgq->w_idx[mtype], ECP_MAX_CONN_MSG+1); + + msgq->empty_idx = MSG_NEXT(msg_idx, ECP_MAX_CONN_MSG); + msgq->occupied[msg_idx] = 1; + done = 1; + break; + } + msg_idx = MSG_NEXT(msg_idx, ECP_MAX_CONN_MSG); + } + if (done) { + return msg_size+1; + } else { + return ECP_ERR_MAX_CONN_MSG; + } +} + +ssize_t ecp_conn_msgq_pop(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, unsigned int timeout) { + ECPConnMsgQ *msgq = &conn->msgq; + ECPMessage *message; + ssize_t rv = ECP_OK; + unsigned short msg_idx; + + if (mtype >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE; + + if (msgq->r_idx[mtype] == msgq->w_idx[mtype]) { + if (timeout == -1) { + pthread_cond_wait(&msgq->cond[mtype], &conn->mutex); + } else { + struct timespec ts; + int _rv = pthread_cond_timedwait(&msgq->cond[mtype], &conn->mutex, abstime_ts(&ts, timeout)); + if (_rv) rv = ECP_ERR_TIMEOUT; + } + } + if (!rv) { + msg_idx = msgq->msg_idx[mtype][msgq->r_idx[mtype]]; + msgq->r_idx[mtype] = MSG_NEXT(msgq->r_idx[mtype], ECP_MAX_CONN_MSG+1); + msgq->occupied[msg_idx] = 0; + message = &msgq->msg[msg_idx]; + rv = message->size; + if (rv >= 0) { + rv = MIN(msg_size, rv); + memcpy(msg, message->msg, rv); + } + } + return rv; +} + +#endif /* ECP_WITH_PTHREAD */
\ No newline at end of file |