summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorUros Majstorovic <majstor@majstor.org>2017-08-14 19:56:24 +0200
committerUros Majstorovic <majstor@majstor.org>2017-08-14 19:56:24 +0200
commit38e2385f5846860916f8880d818b3b024b8c7dd9 (patch)
tree7bb01d9c38df29b49bf87c50317ec67c61c6e2a7
parentdb44820eb01106f7780c7126e53885e8b34c8aea (diff)
msgq implementation
-rw-r--r--code/core/Makefile2
-rw-r--r--code/core/TODO7
-rw-r--r--code/core/config.h1
-rw-r--r--code/core/core.c4
-rw-r--r--code/core/core.h3
-rw-r--r--code/core/msgq.c112
-rw-r--r--code/core/msgq.h23
-rw-r--r--code/core/rbuf.h4
-rw-r--r--code/core/rbuf_recv.c66
9 files changed, 130 insertions, 92 deletions
diff --git a/code/core/Makefile b/code/core/Makefile
index 84090f1..ed0f760 100644
--- a/code/core/Makefile
+++ b/code/core/Makefile
@@ -1,7 +1,7 @@
MAKE=make
CFLAGS = -I. -pthread -O3 $(PIC)
-obj = core.o timer.o rbuf.o rbuf_send.o rbuf_recv.o
+obj = core.o timer.o rbuf.o rbuf_send.o rbuf_recv.o msgq.o
subdirs = crypto posix htable
diff --git a/code/core/TODO b/code/core/TODO
index ade028d..0ee7499 100644
--- a/code/core/TODO
+++ b/code/core/TODO
@@ -1,8 +1,9 @@
+ check for nonce
-+ check for seq
+- check for seq (without rbuf)
- memzero keys after usage
- implement socket message queue
-core:
-- msgq marker
+rbuf:
+- implement _wait variants of open / send
+- consider adding one buffer for all msgs (frag. issue) \ No newline at end of file
diff --git a/code/core/config.h b/code/core/config.h
index 7e2e416..ad67d8a 100644
--- a/code/core/config.h
+++ b/code/core/config.h
@@ -1,4 +1,5 @@
#define ECP_WITH_PTHREAD 1
#define ECP_WITH_HTABLE 1
#define ECP_WITH_RBUF 1
+#define ECP_WITH_MSGQ 1
#define ECP_DEBUG 1 \ No newline at end of file
diff --git a/code/core/core.c b/code/core/core.c
index 25680f3..d890291 100644
--- a/code/core/core.c
+++ b/code/core/core.c
@@ -1278,9 +1278,9 @@ ssize_t ecp_send(ECPConnection *conn, unsigned char *payload, size_t payload_siz
ssize_t ecp_receive(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, unsigned int timeout) {
#ifdef ECP_WITH_MSGQ
- pthread_mutex_lock(&conn->mutex);
+ pthread_mutex_lock(&conn->rbuf.recv->msgq.mutex);
ssize_t rv = ecp_conn_msgq_pop(conn, mtype, msg, msg_size, timeout);
- pthread_mutex_unlock(&conn->mutex);
+ pthread_mutex_unlock(&conn->rbuf.recv->msgq.mutex);
return rv;
#else
return ECP_ERR_NOT_IMPLEMENTED;
diff --git a/code/core/core.h b/code/core/core.h
index 9738fb4..cb08bc9 100644
--- a/code/core/core.h
+++ b/code/core/core.h
@@ -93,6 +93,9 @@ typedef uint32_t ecp_seq_t;
#include "timer.h"
#ifdef ECP_WITH_RBUF
+#ifdef ECP_WITH_MSGQ
+#include "msgq.h"
+#endif
#include "rbuf.h"
#endif
diff --git a/code/core/msgq.c b/code/core/msgq.c
index 2ba0ed5..c441acd 100644
--- a/code/core/msgq.c
+++ b/code/core/msgq.c
@@ -7,7 +7,7 @@
#define MIN(a,b) (((a)<(b))?(a):(b))
#define MAX(a,b) (((a)>(b))?(a):(b))
-#define MSG_NEXT(msgi, max_msgs) ((msgi + 1) % max_msgs)
+#define MSG_IDX_MASK(idx) ((idx) & ((ECP_MAX_CONN_MSG) - 1))
static struct timespec *abstime_ts(struct timespec *ts, unsigned int msec) {
struct timeval tv;
@@ -22,16 +22,27 @@ static struct timespec *abstime_ts(struct timespec *ts, unsigned int msec) {
}
int ecp_conn_msgq_create(ECPConnection *conn) {
+ ECPRBRecv *buf = conn->rbuf.recv;
+ ECPConnMsgQ *msgq = buf ? &buf->msgq : NULL;
int i;
+ int rv;
+
+ if (msgq == NULL) return ECP_ERR;
+
+ memset(msgq, 0, sizeof(ECPConnMsgQ));
+ rv = pthread_mutex_init(&msgq->mutex, NULL);
+ if (rv) return ECP_ERR;
+
for (i=0; i<ECP_MAX_MTYPE; i++) {
- int rv = pthread_cond_init(&conn->msgq.cond[i], NULL);
+ rv = pthread_cond_init(&msgq->cond[i], NULL);
if (rv) {
int j;
for (j=0; j<i; j++) {
- pthread_cond_destroy(&conn->msgq.cond[j]);
+ pthread_cond_destroy(&msgq->cond[j]);
}
+ pthread_mutex_destroy(&msgq->mutex);
return ECP_ERR;
}
}
@@ -40,81 +51,78 @@ int ecp_conn_msgq_create(ECPConnection *conn) {
}
void ecp_conn_msgq_destroy(ECPConnection *conn) {
+ ECPRBRecv *buf = conn->rbuf.recv;
+ ECPConnMsgQ *msgq = buf ? &buf->msgq : NULL;
int i;
+ if (msgq == NULL) return;
+
for (i=0; i<ECP_MAX_MTYPE; i++) {
- pthread_cond_destroy(&conn->msgq.cond[i]);
+ pthread_cond_destroy(&msgq->cond[i]);
}
+ pthread_mutex_destroy(&msgq->mutex);
}
-ssize_t ecp_conn_msgq_push(ECPConnection *conn, unsigned char *msg, size_t msg_size) {
- ECPConnMsgQ *msgq = &conn->msgq;
- unsigned short msg_idx = msgq->empty_idx;
- unsigned short i;
- unsigned short done = 0;
- unsigned char mtype;
-
- if (msg_size == 0) return ECP_OK;
+int ecp_conn_msgq_push(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype) {
+ ECPRBRecv *buf = conn->rbuf.recv;
+ ECPConnMsgQ *msgq = buf ? &buf->msgq : NULL;
- mtype = *msg;
- msg++;
- msg_size--;
-
- if (mtype >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE;
- if (msg_size >= ECP_MAX_MSG) return ECP_ERR_MAX_MSG;
- if (msg_size < 1) return ECP_ERR_MIN_MSG;
+ if (msgq == NULL) return ECP_ERR;
- for (i=0; i<ECP_MAX_CONN_MSG; i++) {
- if (!msgq->occupied[msg_idx]) {
- ECPMessage *message = &msgq->msg[msg_idx];
- if (msg_size > 0) memcpy(message->msg, msg, msg_size);
- message->size = msg_size;
- if (msgq->r_idx[mtype] == msgq->w_idx[mtype]) pthread_cond_signal(&msgq->cond[mtype]);
- msgq->msg_idx[mtype][msgq->w_idx[mtype]] = msg_idx;
- msgq->w_idx[mtype] = MSG_NEXT(msgq->w_idx[mtype], ECP_MAX_CONN_MSG+1);
-
- msgq->empty_idx = MSG_NEXT(msg_idx, ECP_MAX_CONN_MSG);
- msgq->occupied[msg_idx] = 1;
- done = 1;
- break;
- }
- msg_idx = MSG_NEXT(msg_idx, ECP_MAX_CONN_MSG);
- }
- if (done) {
- return msg_size+1;
- } else {
- return ECP_ERR_MAX_CONN_MSG;
- }
+ if (msgq->idx_w[mtype] - msgq->idx_r[mtype] == ECP_MAX_CONN_MSG) return ECP_ERR_MAX_CONN_MSG;
+ if (msgq->idx_w[mtype] == msgq->idx_r[mtype]) pthread_cond_signal(&msgq->cond[mtype]);
+
+ msgq->seq_msg[mtype][MSG_IDX_MASK(msgq->idx_w[mtype])] = seq;
+ msgq->idx_w[mtype]++;
+
+ return ECP_OK;
}
ssize_t ecp_conn_msgq_pop(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, unsigned int timeout) {
- ECPConnMsgQ *msgq = &conn->msgq;
- ECPMessage *message;
+ ECPRBRecv *buf = conn->rbuf.recv;
+ ECPConnMsgQ *msgq = buf ? &buf->msgq : NULL;
ssize_t rv = ECP_OK;
- unsigned short msg_idx;
+ ecp_seq_t seq;
+ int idx;
+ if (msgq == NULL) return ECP_ERR;
if (mtype >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE;
- if (msgq->r_idx[mtype] == msgq->w_idx[mtype]) {
+ if (msgq->idx_r[mtype] == msgq->idx_w[mtype]) {
if (timeout == -1) {
- pthread_cond_wait(&msgq->cond[mtype], &conn->mutex);
+ pthread_cond_wait(&msgq->cond[mtype], &msgq->mutex);
} else {
struct timespec ts;
- int _rv = pthread_cond_timedwait(&msgq->cond[mtype], &conn->mutex, abstime_ts(&ts, timeout));
+ int _rv = pthread_cond_timedwait(&msgq->cond[mtype], &msgq->mutex, abstime_ts(&ts, timeout));
if (_rv) rv = ECP_ERR_TIMEOUT;
}
}
if (!rv) {
- msg_idx = msgq->msg_idx[mtype][msgq->r_idx[mtype]];
- msgq->r_idx[mtype] = MSG_NEXT(msgq->r_idx[mtype], ECP_MAX_CONN_MSG+1);
- msgq->occupied[msg_idx] = 0;
- message = &msgq->msg[msg_idx];
- rv = message->size;
+ seq = msgq->seq_msg[mtype][MSG_IDX_MASK(msgq->idx_r[mtype])];
+ msgq->idx_r[mtype]++;
+ idx = ecp_rbuf_msg_idx(&buf->rbuf, seq);
+ if (idx < 0) rv = idx;
+ }
+ if (!rv) {
+ buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_RECEIVED;
+ if (buf->rbuf.seq_start == seq) {
+ int i, _idx = idx;
+ ecp_seq_t msg_cnt = buf->rbuf.seq_max - buf->rbuf.seq_start + 1;
+
+ for (i=0; i<msg_cnt; i++) {
+ if (buf->rbuf.msg[_idx].flags & ECP_RBUF_FLAG_RECEIVED) break;
+ _idx = ECP_RBUF_IDX_MASK(_idx + 1, buf->rbuf.msg_size);
+ }
+ buf->rbuf.seq_start += i;
+ buf->rbuf.msg_start = _idx;
+ }
+ rv = buf->rbuf.msg[idx].size;
if (rv >= 0) {
rv = MIN(msg_size, rv);
- memcpy(msg, message->msg, rv);
+ memcpy(msg, buf->rbuf.msg[idx].msg, rv);
}
}
+
return rv;
}
diff --git a/code/core/msgq.h b/code/core/msgq.h
index 9d16f63..54319e0 100644
--- a/code/core/msgq.h
+++ b/code/core/msgq.h
@@ -3,30 +3,21 @@
#include <pthread.h>
#include <string.h>
-#define ECP_MAX_CONN_MSG 8
+#define ECP_MAX_CONN_MSG 32
#define ECP_ERR_MAX_CONN_MSG -100
-struct ECPConnection;
-
-typedef struct ECPMessage {
- unsigned char msg[ECP_MAX_MSG];
- ssize_t size;
-} ECPMessage;
-
typedef struct ECPConnMsgQ {
- unsigned short empty_idx;
- unsigned short occupied[ECP_MAX_CONN_MSG];
- unsigned short w_idx[ECP_MAX_MTYPE];
- unsigned short r_idx[ECP_MAX_MTYPE];
- unsigned short msg_idx[ECP_MAX_MTYPE][ECP_MAX_CONN_MSG+1];
- ECPMessage msg[ECP_MAX_CONN_MSG];
- pthread_mutex_t mutex;
+ unsigned short idx_w[ECP_MAX_MTYPE];
+ unsigned short idx_r[ECP_MAX_MTYPE];
+ ecp_seq_t seq_msg[ECP_MAX_MTYPE][ECP_MAX_CONN_MSG];
pthread_cond_t cond[ECP_MAX_MTYPE];
+ pthread_mutex_t mutex;
} ECPConnMsgQ;
int ecp_conn_msgq_create(struct ECPConnection *conn);
void ecp_conn_msgq_destroy(struct ECPConnection *conn);
-ssize_t ecp_conn_msgq_push(struct ECPConnection *conn, unsigned char *msg, size_t msg_size);
+
+int ecp_conn_msgq_push(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype);
ssize_t ecp_conn_msgq_pop(struct ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, unsigned int timeout);
#endif /* ECP_WITH_PTHREAD */ \ No newline at end of file
diff --git a/code/core/rbuf.h b/code/core/rbuf.h
index 9bb9701..67ac8c9 100644
--- a/code/core/rbuf.h
+++ b/code/core/rbuf.h
@@ -42,7 +42,6 @@ typedef struct ECPRBMessage {
unsigned char msg[ECP_MAX_PKT];
ssize_t size;
unsigned char flags;
- ECPNetAddr addr;
} ECPRBMessage;
typedef struct ECPRBuffer {
@@ -65,6 +64,9 @@ typedef struct ECPRBRecv {
ecp_ack_t hole_mask_full;
ecp_ack_t hole_mask_empty;
ECPRBuffer rbuf;
+#ifdef ECP_WITH_MSGQ
+ ECPConnMsgQ msgq;
+#endif
} ECPRBRecv;
typedef struct ECPRBSend {
diff --git a/code/core/rbuf_recv.c b/code/core/rbuf_recv.c
index f86cd13..ad9c53c 100644
--- a/code/core/rbuf_recv.c
+++ b/code/core/rbuf_recv.c
@@ -32,9 +32,20 @@ static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg,
rv = ecp_rbuf_msg_store(&buf->rbuf, seq, -1, msg, msg_size, ECP_RBUF_FLAG_RECEIVED, flags);
if (rv < 0) return ECP_ERR_RBUF_DUP;
- if (flags & ECP_RBUF_FLAG_DELIVERED) ecp_msg_handle(conn, seq, msg, msg_size);
-
if (ECP_RBUF_SEQ_LT(buf->rbuf.seq_max, seq)) buf->rbuf.seq_max = seq;
+
+ if (flags & ECP_RBUF_FLAG_DELIVERED) {
+#ifdef ECP_WITH_MSGQ
+ if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_unlock(&buf->msgq.mutex);
+#endif
+
+ ecp_msg_handle(conn, seq, msg, msg_size);
+
+#ifdef ECP_WITH_MSGQ
+ if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_lock(&buf->msgq.mutex);
+#endif
+ }
+
return rv;
}
@@ -48,17 +59,24 @@ static void msg_flush(ECPConnection *conn) {
if ((buf->flags & ECP_RBUF_FLAG_RELIABLE) && !(buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_RECEIVED)) break;
if (buf->deliver_delay && (msg_cnt - i < buf->deliver_delay)) break;
if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_RECEIVED) {
- buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_RECEIVED;
+ ssize_t rv = 0;
if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_DELIVERED) {
buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_DELIVERED;
} else {
- ecp_msg_handle(conn, buf->rbuf.seq_start + i, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size);
+ rv = ecp_msg_handle(conn, buf->rbuf.seq_start + i, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size);
+ }
+ if (!(buf->flags & ECP_RBUF_FLAG_MSGQ)) {
+ buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_RECEIVED;
+ } else if (rv < 0) {
+ break;
}
}
idx = ECP_RBUF_IDX_MASK(idx + 1, buf->rbuf.msg_size);
}
- buf->rbuf.msg_start = idx;
- buf->rbuf.seq_start += i;
+ if (!(buf->flags & ECP_RBUF_FLAG_MSGQ)) {
+ buf->rbuf.seq_start += i;
+ buf->rbuf.msg_start = idx;
+ }
}
static int ack_send(ECPConnection *conn) {
@@ -177,31 +195,38 @@ ssize_t ecp_conn_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned ch
ECPRBRecv *buf = conn->rbuf.recv;
ecp_seq_t ack_pkt = 0;
ssize_t rv;
+ int _rv = ECP_OK;
int do_ack = 0;
if (buf == NULL) return ECP_ERR;
if (msg_size < 1) return ECP_ERR_MIN_MSG;
+#ifdef ECP_WITH_MSGQ
+ if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_lock(&buf->msgq.mutex);
+#endif
+
if (ECP_RBUF_SEQ_LT(buf->rbuf.seq_max, seq)) ack_pkt = seq - buf->rbuf.seq_max;
if (ECP_RBUF_SEQ_LTE(seq, buf->seq_ack)) {
ecp_seq_t seq_offset = buf->seq_ack - seq;
if (seq_offset < ECP_RBUF_ACK_SIZE) {
ecp_ack_t ack_mask = ((ecp_ack_t)1 << seq_offset);
- if (ack_mask & buf->ack_map) return ECP_ERR_RBUF_DUP;
- buf->ack_map |= ack_mask;
- do_ack = ack_shift(buf);
+ if (ack_mask & buf->ack_map) _rv = ECP_ERR_RBUF_DUP;
+ if (!_rv) {
+ buf->ack_map |= ack_mask;
+ do_ack = ack_shift(buf);
- rv = msg_store(conn, seq, msg, msg_size);
- if (rv < 0) return rv;
+ rv = msg_store(conn, seq, msg, msg_size);
+ if (rv < 0) _rv = rv;
+ }
} else {
- return ECP_ERR_RBUF_IDX;
+ _rv = ECP_ERR_RBUF_IDX;
}
} else {
if ((buf->ack_map == ECP_RBUF_ACK_FULL) && (seq == (ecp_seq_t)(buf->seq_ack + 1))) {
if ((buf->flags & ECP_RBUF_FLAG_MSGQ) || buf->deliver_delay) {
rv = msg_store(conn, seq, msg, msg_size);
- if (rv < 0) return rv;
+ if (rv < 0) _rv = rv;
} else {
ecp_msg_handle(conn, seq, msg, msg_size);
rv = msg_size;
@@ -209,14 +234,22 @@ ssize_t ecp_conn_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned ch
buf->rbuf.seq_start++;
buf->rbuf.msg_start = ECP_RBUF_IDX_MASK(buf->rbuf.msg_start + 1, buf->rbuf.msg_size);
}
- buf->seq_ack++;
+ if (!_rv) buf->seq_ack++;
} else {
rv = msg_store(conn, seq, msg, msg_size);
- if (rv < 0) return rv;
+ if (rv < 0) _rv = rv;
- do_ack = ack_shift(buf);
+ if (!_rv) do_ack = ack_shift(buf);
}
}
+ if (!_rv) msg_flush(conn);
+
+#ifdef ECP_WITH_MSGQ
+ if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_unlock(&buf->msgq.mutex);
+#endif
+
+ if (_rv) return _rv;
+
if (buf->flush) {
buf->flush = 0;
do_ack = 1;
@@ -230,7 +263,6 @@ ssize_t ecp_conn_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned ch
int _rv = ack_send(conn);
if (_rv) return _rv;
}
- msg_flush(conn);
return rv;
}