diff options
author | Uros Majstorovic <majstor@majstor.org> | 2017-08-14 19:56:24 +0200 |
---|---|---|
committer | Uros Majstorovic <majstor@majstor.org> | 2017-08-14 19:56:24 +0200 |
commit | 38e2385f5846860916f8880d818b3b024b8c7dd9 (patch) | |
tree | 7bb01d9c38df29b49bf87c50317ec67c61c6e2a7 /code/core | |
parent | db44820eb01106f7780c7126e53885e8b34c8aea (diff) |
msgq implementation
Diffstat (limited to 'code/core')
-rw-r--r-- | code/core/Makefile | 2 | ||||
-rw-r--r-- | code/core/TODO | 7 | ||||
-rw-r--r-- | code/core/config.h | 1 | ||||
-rw-r--r-- | code/core/core.c | 4 | ||||
-rw-r--r-- | code/core/core.h | 3 | ||||
-rw-r--r-- | code/core/msgq.c | 112 | ||||
-rw-r--r-- | code/core/msgq.h | 23 | ||||
-rw-r--r-- | code/core/rbuf.h | 4 | ||||
-rw-r--r-- | code/core/rbuf_recv.c | 66 |
9 files changed, 130 insertions, 92 deletions
diff --git a/code/core/Makefile b/code/core/Makefile index 84090f1..ed0f760 100644 --- a/code/core/Makefile +++ b/code/core/Makefile @@ -1,7 +1,7 @@ MAKE=make CFLAGS = -I. -pthread -O3 $(PIC) -obj = core.o timer.o rbuf.o rbuf_send.o rbuf_recv.o +obj = core.o timer.o rbuf.o rbuf_send.o rbuf_recv.o msgq.o subdirs = crypto posix htable diff --git a/code/core/TODO b/code/core/TODO index ade028d..0ee7499 100644 --- a/code/core/TODO +++ b/code/core/TODO @@ -1,8 +1,9 @@ + check for nonce -+ check for seq +- check for seq (without rbuf) - memzero keys after usage - implement socket message queue -core: -- msgq marker +rbuf: +- implement _wait variants of open / send +- consider adding one buffer for all msgs (frag. issue)
\ No newline at end of file diff --git a/code/core/config.h b/code/core/config.h index 7e2e416..ad67d8a 100644 --- a/code/core/config.h +++ b/code/core/config.h @@ -1,4 +1,5 @@ #define ECP_WITH_PTHREAD 1 #define ECP_WITH_HTABLE 1 #define ECP_WITH_RBUF 1 +#define ECP_WITH_MSGQ 1 #define ECP_DEBUG 1
\ No newline at end of file diff --git a/code/core/core.c b/code/core/core.c index 25680f3..d890291 100644 --- a/code/core/core.c +++ b/code/core/core.c @@ -1278,9 +1278,9 @@ ssize_t ecp_send(ECPConnection *conn, unsigned char *payload, size_t payload_siz ssize_t ecp_receive(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, unsigned int timeout) { #ifdef ECP_WITH_MSGQ - pthread_mutex_lock(&conn->mutex); + pthread_mutex_lock(&conn->rbuf.recv->msgq.mutex); ssize_t rv = ecp_conn_msgq_pop(conn, mtype, msg, msg_size, timeout); - pthread_mutex_unlock(&conn->mutex); + pthread_mutex_unlock(&conn->rbuf.recv->msgq.mutex); return rv; #else return ECP_ERR_NOT_IMPLEMENTED; diff --git a/code/core/core.h b/code/core/core.h index 9738fb4..cb08bc9 100644 --- a/code/core/core.h +++ b/code/core/core.h @@ -93,6 +93,9 @@ typedef uint32_t ecp_seq_t; #include "timer.h" #ifdef ECP_WITH_RBUF +#ifdef ECP_WITH_MSGQ +#include "msgq.h" +#endif #include "rbuf.h" #endif diff --git a/code/core/msgq.c b/code/core/msgq.c index 2ba0ed5..c441acd 100644 --- a/code/core/msgq.c +++ b/code/core/msgq.c @@ -7,7 +7,7 @@ #define MIN(a,b) (((a)<(b))?(a):(b)) #define MAX(a,b) (((a)>(b))?(a):(b)) -#define MSG_NEXT(msgi, max_msgs) ((msgi + 1) % max_msgs) +#define MSG_IDX_MASK(idx) ((idx) & ((ECP_MAX_CONN_MSG) - 1)) static struct timespec *abstime_ts(struct timespec *ts, unsigned int msec) { struct timeval tv; @@ -22,16 +22,27 @@ static struct timespec *abstime_ts(struct timespec *ts, unsigned int msec) { } int ecp_conn_msgq_create(ECPConnection *conn) { + ECPRBRecv *buf = conn->rbuf.recv; + ECPConnMsgQ *msgq = buf ? &buf->msgq : NULL; int i; + int rv; + + if (msgq == NULL) return ECP_ERR; + + memset(msgq, 0, sizeof(ECPConnMsgQ)); + rv = pthread_mutex_init(&msgq->mutex, NULL); + if (rv) return ECP_ERR; + for (i=0; i<ECP_MAX_MTYPE; i++) { - int rv = pthread_cond_init(&conn->msgq.cond[i], NULL); + rv = pthread_cond_init(&msgq->cond[i], NULL); if (rv) { int j; for (j=0; j<i; j++) { - pthread_cond_destroy(&conn->msgq.cond[j]); + pthread_cond_destroy(&msgq->cond[j]); } + pthread_mutex_destroy(&msgq->mutex); return ECP_ERR; } } @@ -40,81 +51,78 @@ int ecp_conn_msgq_create(ECPConnection *conn) { } void ecp_conn_msgq_destroy(ECPConnection *conn) { + ECPRBRecv *buf = conn->rbuf.recv; + ECPConnMsgQ *msgq = buf ? &buf->msgq : NULL; int i; + if (msgq == NULL) return; + for (i=0; i<ECP_MAX_MTYPE; i++) { - pthread_cond_destroy(&conn->msgq.cond[i]); + pthread_cond_destroy(&msgq->cond[i]); } + pthread_mutex_destroy(&msgq->mutex); } -ssize_t ecp_conn_msgq_push(ECPConnection *conn, unsigned char *msg, size_t msg_size) { - ECPConnMsgQ *msgq = &conn->msgq; - unsigned short msg_idx = msgq->empty_idx; - unsigned short i; - unsigned short done = 0; - unsigned char mtype; - - if (msg_size == 0) return ECP_OK; +int ecp_conn_msgq_push(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype) { + ECPRBRecv *buf = conn->rbuf.recv; + ECPConnMsgQ *msgq = buf ? &buf->msgq : NULL; - mtype = *msg; - msg++; - msg_size--; - - if (mtype >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE; - if (msg_size >= ECP_MAX_MSG) return ECP_ERR_MAX_MSG; - if (msg_size < 1) return ECP_ERR_MIN_MSG; + if (msgq == NULL) return ECP_ERR; - for (i=0; i<ECP_MAX_CONN_MSG; i++) { - if (!msgq->occupied[msg_idx]) { - ECPMessage *message = &msgq->msg[msg_idx]; - if (msg_size > 0) memcpy(message->msg, msg, msg_size); - message->size = msg_size; - if (msgq->r_idx[mtype] == msgq->w_idx[mtype]) pthread_cond_signal(&msgq->cond[mtype]); - msgq->msg_idx[mtype][msgq->w_idx[mtype]] = msg_idx; - msgq->w_idx[mtype] = MSG_NEXT(msgq->w_idx[mtype], ECP_MAX_CONN_MSG+1); - - msgq->empty_idx = MSG_NEXT(msg_idx, ECP_MAX_CONN_MSG); - msgq->occupied[msg_idx] = 1; - done = 1; - break; - } - msg_idx = MSG_NEXT(msg_idx, ECP_MAX_CONN_MSG); - } - if (done) { - return msg_size+1; - } else { - return ECP_ERR_MAX_CONN_MSG; - } + if (msgq->idx_w[mtype] - msgq->idx_r[mtype] == ECP_MAX_CONN_MSG) return ECP_ERR_MAX_CONN_MSG; + if (msgq->idx_w[mtype] == msgq->idx_r[mtype]) pthread_cond_signal(&msgq->cond[mtype]); + + msgq->seq_msg[mtype][MSG_IDX_MASK(msgq->idx_w[mtype])] = seq; + msgq->idx_w[mtype]++; + + return ECP_OK; } ssize_t ecp_conn_msgq_pop(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, unsigned int timeout) { - ECPConnMsgQ *msgq = &conn->msgq; - ECPMessage *message; + ECPRBRecv *buf = conn->rbuf.recv; + ECPConnMsgQ *msgq = buf ? &buf->msgq : NULL; ssize_t rv = ECP_OK; - unsigned short msg_idx; + ecp_seq_t seq; + int idx; + if (msgq == NULL) return ECP_ERR; if (mtype >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE; - if (msgq->r_idx[mtype] == msgq->w_idx[mtype]) { + if (msgq->idx_r[mtype] == msgq->idx_w[mtype]) { if (timeout == -1) { - pthread_cond_wait(&msgq->cond[mtype], &conn->mutex); + pthread_cond_wait(&msgq->cond[mtype], &msgq->mutex); } else { struct timespec ts; - int _rv = pthread_cond_timedwait(&msgq->cond[mtype], &conn->mutex, abstime_ts(&ts, timeout)); + int _rv = pthread_cond_timedwait(&msgq->cond[mtype], &msgq->mutex, abstime_ts(&ts, timeout)); if (_rv) rv = ECP_ERR_TIMEOUT; } } if (!rv) { - msg_idx = msgq->msg_idx[mtype][msgq->r_idx[mtype]]; - msgq->r_idx[mtype] = MSG_NEXT(msgq->r_idx[mtype], ECP_MAX_CONN_MSG+1); - msgq->occupied[msg_idx] = 0; - message = &msgq->msg[msg_idx]; - rv = message->size; + seq = msgq->seq_msg[mtype][MSG_IDX_MASK(msgq->idx_r[mtype])]; + msgq->idx_r[mtype]++; + idx = ecp_rbuf_msg_idx(&buf->rbuf, seq); + if (idx < 0) rv = idx; + } + if (!rv) { + buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_RECEIVED; + if (buf->rbuf.seq_start == seq) { + int i, _idx = idx; + ecp_seq_t msg_cnt = buf->rbuf.seq_max - buf->rbuf.seq_start + 1; + + for (i=0; i<msg_cnt; i++) { + if (buf->rbuf.msg[_idx].flags & ECP_RBUF_FLAG_RECEIVED) break; + _idx = ECP_RBUF_IDX_MASK(_idx + 1, buf->rbuf.msg_size); + } + buf->rbuf.seq_start += i; + buf->rbuf.msg_start = _idx; + } + rv = buf->rbuf.msg[idx].size; if (rv >= 0) { rv = MIN(msg_size, rv); - memcpy(msg, message->msg, rv); + memcpy(msg, buf->rbuf.msg[idx].msg, rv); } } + return rv; } diff --git a/code/core/msgq.h b/code/core/msgq.h index 9d16f63..54319e0 100644 --- a/code/core/msgq.h +++ b/code/core/msgq.h @@ -3,30 +3,21 @@ #include <pthread.h> #include <string.h> -#define ECP_MAX_CONN_MSG 8 +#define ECP_MAX_CONN_MSG 32 #define ECP_ERR_MAX_CONN_MSG -100 -struct ECPConnection; - -typedef struct ECPMessage { - unsigned char msg[ECP_MAX_MSG]; - ssize_t size; -} ECPMessage; - typedef struct ECPConnMsgQ { - unsigned short empty_idx; - unsigned short occupied[ECP_MAX_CONN_MSG]; - unsigned short w_idx[ECP_MAX_MTYPE]; - unsigned short r_idx[ECP_MAX_MTYPE]; - unsigned short msg_idx[ECP_MAX_MTYPE][ECP_MAX_CONN_MSG+1]; - ECPMessage msg[ECP_MAX_CONN_MSG]; - pthread_mutex_t mutex; + unsigned short idx_w[ECP_MAX_MTYPE]; + unsigned short idx_r[ECP_MAX_MTYPE]; + ecp_seq_t seq_msg[ECP_MAX_MTYPE][ECP_MAX_CONN_MSG]; pthread_cond_t cond[ECP_MAX_MTYPE]; + pthread_mutex_t mutex; } ECPConnMsgQ; int ecp_conn_msgq_create(struct ECPConnection *conn); void ecp_conn_msgq_destroy(struct ECPConnection *conn); -ssize_t ecp_conn_msgq_push(struct ECPConnection *conn, unsigned char *msg, size_t msg_size); + +int ecp_conn_msgq_push(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype); ssize_t ecp_conn_msgq_pop(struct ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, unsigned int timeout); #endif /* ECP_WITH_PTHREAD */
\ No newline at end of file diff --git a/code/core/rbuf.h b/code/core/rbuf.h index 9bb9701..67ac8c9 100644 --- a/code/core/rbuf.h +++ b/code/core/rbuf.h @@ -42,7 +42,6 @@ typedef struct ECPRBMessage { unsigned char msg[ECP_MAX_PKT]; ssize_t size; unsigned char flags; - ECPNetAddr addr; } ECPRBMessage; typedef struct ECPRBuffer { @@ -65,6 +64,9 @@ typedef struct ECPRBRecv { ecp_ack_t hole_mask_full; ecp_ack_t hole_mask_empty; ECPRBuffer rbuf; +#ifdef ECP_WITH_MSGQ + ECPConnMsgQ msgq; +#endif } ECPRBRecv; typedef struct ECPRBSend { diff --git a/code/core/rbuf_recv.c b/code/core/rbuf_recv.c index f86cd13..ad9c53c 100644 --- a/code/core/rbuf_recv.c +++ b/code/core/rbuf_recv.c @@ -32,9 +32,20 @@ static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, rv = ecp_rbuf_msg_store(&buf->rbuf, seq, -1, msg, msg_size, ECP_RBUF_FLAG_RECEIVED, flags); if (rv < 0) return ECP_ERR_RBUF_DUP; - if (flags & ECP_RBUF_FLAG_DELIVERED) ecp_msg_handle(conn, seq, msg, msg_size); - if (ECP_RBUF_SEQ_LT(buf->rbuf.seq_max, seq)) buf->rbuf.seq_max = seq; + + if (flags & ECP_RBUF_FLAG_DELIVERED) { +#ifdef ECP_WITH_MSGQ + if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_unlock(&buf->msgq.mutex); +#endif + + ecp_msg_handle(conn, seq, msg, msg_size); + +#ifdef ECP_WITH_MSGQ + if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_lock(&buf->msgq.mutex); +#endif + } + return rv; } @@ -48,17 +59,24 @@ static void msg_flush(ECPConnection *conn) { if ((buf->flags & ECP_RBUF_FLAG_RELIABLE) && !(buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_RECEIVED)) break; if (buf->deliver_delay && (msg_cnt - i < buf->deliver_delay)) break; if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_RECEIVED) { - buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_RECEIVED; + ssize_t rv = 0; if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_DELIVERED) { buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_DELIVERED; } else { - ecp_msg_handle(conn, buf->rbuf.seq_start + i, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size); + rv = ecp_msg_handle(conn, buf->rbuf.seq_start + i, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size); + } + if (!(buf->flags & ECP_RBUF_FLAG_MSGQ)) { + buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_RECEIVED; + } else if (rv < 0) { + break; } } idx = ECP_RBUF_IDX_MASK(idx + 1, buf->rbuf.msg_size); } - buf->rbuf.msg_start = idx; - buf->rbuf.seq_start += i; + if (!(buf->flags & ECP_RBUF_FLAG_MSGQ)) { + buf->rbuf.seq_start += i; + buf->rbuf.msg_start = idx; + } } static int ack_send(ECPConnection *conn) { @@ -177,31 +195,38 @@ ssize_t ecp_conn_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned ch ECPRBRecv *buf = conn->rbuf.recv; ecp_seq_t ack_pkt = 0; ssize_t rv; + int _rv = ECP_OK; int do_ack = 0; if (buf == NULL) return ECP_ERR; if (msg_size < 1) return ECP_ERR_MIN_MSG; +#ifdef ECP_WITH_MSGQ + if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_lock(&buf->msgq.mutex); +#endif + if (ECP_RBUF_SEQ_LT(buf->rbuf.seq_max, seq)) ack_pkt = seq - buf->rbuf.seq_max; if (ECP_RBUF_SEQ_LTE(seq, buf->seq_ack)) { ecp_seq_t seq_offset = buf->seq_ack - seq; if (seq_offset < ECP_RBUF_ACK_SIZE) { ecp_ack_t ack_mask = ((ecp_ack_t)1 << seq_offset); - if (ack_mask & buf->ack_map) return ECP_ERR_RBUF_DUP; - buf->ack_map |= ack_mask; - do_ack = ack_shift(buf); + if (ack_mask & buf->ack_map) _rv = ECP_ERR_RBUF_DUP; + if (!_rv) { + buf->ack_map |= ack_mask; + do_ack = ack_shift(buf); - rv = msg_store(conn, seq, msg, msg_size); - if (rv < 0) return rv; + rv = msg_store(conn, seq, msg, msg_size); + if (rv < 0) _rv = rv; + } } else { - return ECP_ERR_RBUF_IDX; + _rv = ECP_ERR_RBUF_IDX; } } else { if ((buf->ack_map == ECP_RBUF_ACK_FULL) && (seq == (ecp_seq_t)(buf->seq_ack + 1))) { if ((buf->flags & ECP_RBUF_FLAG_MSGQ) || buf->deliver_delay) { rv = msg_store(conn, seq, msg, msg_size); - if (rv < 0) return rv; + if (rv < 0) _rv = rv; } else { ecp_msg_handle(conn, seq, msg, msg_size); rv = msg_size; @@ -209,14 +234,22 @@ ssize_t ecp_conn_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned ch buf->rbuf.seq_start++; buf->rbuf.msg_start = ECP_RBUF_IDX_MASK(buf->rbuf.msg_start + 1, buf->rbuf.msg_size); } - buf->seq_ack++; + if (!_rv) buf->seq_ack++; } else { rv = msg_store(conn, seq, msg, msg_size); - if (rv < 0) return rv; + if (rv < 0) _rv = rv; - do_ack = ack_shift(buf); + if (!_rv) do_ack = ack_shift(buf); } } + if (!_rv) msg_flush(conn); + +#ifdef ECP_WITH_MSGQ + if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_unlock(&buf->msgq.mutex); +#endif + + if (_rv) return _rv; + if (buf->flush) { buf->flush = 0; do_ack = 1; @@ -230,7 +263,6 @@ ssize_t ecp_conn_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned ch int _rv = ack_send(conn); if (_rv) return _rv; } - msg_flush(conn); return rv; } |