diff options
| author | Uros Majstorovic <majstor@majstor.org> | 2017-08-14 19:56:24 +0200 | 
|---|---|---|
| committer | Uros Majstorovic <majstor@majstor.org> | 2017-08-14 19:56:24 +0200 | 
| commit | 38e2385f5846860916f8880d818b3b024b8c7dd9 (patch) | |
| tree | 7bb01d9c38df29b49bf87c50317ec67c61c6e2a7 /code/core | |
| parent | db44820eb01106f7780c7126e53885e8b34c8aea (diff) | |
msgq implementation
Diffstat (limited to 'code/core')
| -rw-r--r-- | code/core/Makefile | 2 | ||||
| -rw-r--r-- | code/core/TODO | 7 | ||||
| -rw-r--r-- | code/core/config.h | 1 | ||||
| -rw-r--r-- | code/core/core.c | 4 | ||||
| -rw-r--r-- | code/core/core.h | 3 | ||||
| -rw-r--r-- | code/core/msgq.c | 112 | ||||
| -rw-r--r-- | code/core/msgq.h | 23 | ||||
| -rw-r--r-- | code/core/rbuf.h | 4 | ||||
| -rw-r--r-- | code/core/rbuf_recv.c | 66 | 
9 files changed, 130 insertions, 92 deletions
| diff --git a/code/core/Makefile b/code/core/Makefile index 84090f1..ed0f760 100644 --- a/code/core/Makefile +++ b/code/core/Makefile @@ -1,7 +1,7 @@  MAKE=make  CFLAGS = -I. -pthread -O3 $(PIC) -obj = core.o timer.o rbuf.o rbuf_send.o rbuf_recv.o +obj = core.o timer.o rbuf.o rbuf_send.o rbuf_recv.o msgq.o  subdirs = crypto posix htable diff --git a/code/core/TODO b/code/core/TODO index ade028d..0ee7499 100644 --- a/code/core/TODO +++ b/code/core/TODO @@ -1,8 +1,9 @@  + check for nonce -+ check for seq +- check for seq (without rbuf)  - memzero keys after usage  - implement socket message queue -core: -- msgq marker +rbuf: +- implement _wait variants of open / send +- consider adding one buffer for all msgs (frag. issue)
\ No newline at end of file diff --git a/code/core/config.h b/code/core/config.h index 7e2e416..ad67d8a 100644 --- a/code/core/config.h +++ b/code/core/config.h @@ -1,4 +1,5 @@  #define ECP_WITH_PTHREAD    1  #define ECP_WITH_HTABLE     1  #define ECP_WITH_RBUF       1 +#define ECP_WITH_MSGQ       1  #define ECP_DEBUG           1
\ No newline at end of file diff --git a/code/core/core.c b/code/core/core.c index 25680f3..d890291 100644 --- a/code/core/core.c +++ b/code/core/core.c @@ -1278,9 +1278,9 @@ ssize_t ecp_send(ECPConnection *conn, unsigned char *payload, size_t payload_siz  ssize_t ecp_receive(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, unsigned int timeout) {  #ifdef ECP_WITH_MSGQ -    pthread_mutex_lock(&conn->mutex); +    pthread_mutex_lock(&conn->rbuf.recv->msgq.mutex);      ssize_t rv = ecp_conn_msgq_pop(conn, mtype, msg, msg_size, timeout); -    pthread_mutex_unlock(&conn->mutex); +    pthread_mutex_unlock(&conn->rbuf.recv->msgq.mutex);      return rv;  #else      return ECP_ERR_NOT_IMPLEMENTED; diff --git a/code/core/core.h b/code/core/core.h index 9738fb4..cb08bc9 100644 --- a/code/core/core.h +++ b/code/core/core.h @@ -93,6 +93,9 @@ typedef uint32_t ecp_seq_t;  #include "timer.h"  #ifdef ECP_WITH_RBUF +#ifdef ECP_WITH_MSGQ +#include "msgq.h" +#endif  #include "rbuf.h"  #endif diff --git a/code/core/msgq.c b/code/core/msgq.c index 2ba0ed5..c441acd 100644 --- a/code/core/msgq.c +++ b/code/core/msgq.c @@ -7,7 +7,7 @@  #define MIN(a,b) (((a)<(b))?(a):(b))  #define MAX(a,b) (((a)>(b))?(a):(b)) -#define MSG_NEXT(msgi, max_msgs)    ((msgi + 1) % max_msgs) +#define MSG_IDX_MASK(idx)    ((idx) & ((ECP_MAX_CONN_MSG) - 1))  static struct timespec *abstime_ts(struct timespec *ts, unsigned int msec) {      struct timeval tv; @@ -22,16 +22,27 @@ static struct timespec *abstime_ts(struct timespec *ts, unsigned int msec) {  }  int ecp_conn_msgq_create(ECPConnection *conn) { +    ECPRBRecv *buf = conn->rbuf.recv; +    ECPConnMsgQ *msgq = buf ? &buf->msgq : NULL;      int i; +    int rv; + +    if (msgq == NULL) return ECP_ERR; +     +    memset(msgq, 0, sizeof(ECPConnMsgQ)); +    rv = pthread_mutex_init(&msgq->mutex, NULL); +    if (rv) return ECP_ERR; +      for (i=0; i<ECP_MAX_MTYPE; i++) { -        int rv = pthread_cond_init(&conn->msgq.cond[i], NULL); +        rv = pthread_cond_init(&msgq->cond[i], NULL);          if (rv) {              int j;              for (j=0; j<i; j++) { -                pthread_cond_destroy(&conn->msgq.cond[j]); +                pthread_cond_destroy(&msgq->cond[j]);              } +            pthread_mutex_destroy(&msgq->mutex);              return ECP_ERR;          }      } @@ -40,81 +51,78 @@ int ecp_conn_msgq_create(ECPConnection *conn) {  }  void ecp_conn_msgq_destroy(ECPConnection *conn) { +    ECPRBRecv *buf = conn->rbuf.recv; +    ECPConnMsgQ *msgq = buf ? &buf->msgq : NULL;      int i; +    if (msgq == NULL) return; +      for (i=0; i<ECP_MAX_MTYPE; i++) { -        pthread_cond_destroy(&conn->msgq.cond[i]); +        pthread_cond_destroy(&msgq->cond[i]);      } +    pthread_mutex_destroy(&msgq->mutex);  } -ssize_t ecp_conn_msgq_push(ECPConnection *conn, unsigned char *msg, size_t msg_size) { -    ECPConnMsgQ *msgq = &conn->msgq; -    unsigned short msg_idx = msgq->empty_idx; -    unsigned short i; -    unsigned short done = 0; -    unsigned char mtype; -     -    if (msg_size == 0) return ECP_OK; +int ecp_conn_msgq_push(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype) { +    ECPRBRecv *buf = conn->rbuf.recv; +    ECPConnMsgQ *msgq = buf ? &buf->msgq : NULL; -    mtype = *msg; -    msg++; -    msg_size--; - -    if (mtype >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE; -    if (msg_size >= ECP_MAX_MSG) return ECP_ERR_MAX_MSG; -    if (msg_size < 1) return ECP_ERR_MIN_MSG; +    if (msgq == NULL) return ECP_ERR; -    for (i=0; i<ECP_MAX_CONN_MSG; i++) { -        if (!msgq->occupied[msg_idx]) { -            ECPMessage *message = &msgq->msg[msg_idx]; -            if (msg_size > 0) memcpy(message->msg, msg, msg_size); -            message->size = msg_size; -            if (msgq->r_idx[mtype] == msgq->w_idx[mtype]) pthread_cond_signal(&msgq->cond[mtype]); -            msgq->msg_idx[mtype][msgq->w_idx[mtype]] = msg_idx; -            msgq->w_idx[mtype] = MSG_NEXT(msgq->w_idx[mtype], ECP_MAX_CONN_MSG+1); - -            msgq->empty_idx = MSG_NEXT(msg_idx, ECP_MAX_CONN_MSG); -            msgq->occupied[msg_idx] = 1; -            done = 1; -            break; -        } -        msg_idx = MSG_NEXT(msg_idx, ECP_MAX_CONN_MSG); -    } -    if (done) { -        return msg_size+1; -    } else { -        return ECP_ERR_MAX_CONN_MSG; -    } +    if (msgq->idx_w[mtype] - msgq->idx_r[mtype] == ECP_MAX_CONN_MSG) return ECP_ERR_MAX_CONN_MSG; +    if (msgq->idx_w[mtype] == msgq->idx_r[mtype]) pthread_cond_signal(&msgq->cond[mtype]); +     +    msgq->seq_msg[mtype][MSG_IDX_MASK(msgq->idx_w[mtype])] = seq; +    msgq->idx_w[mtype]++; +     +    return ECP_OK;  }  ssize_t ecp_conn_msgq_pop(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, unsigned int timeout) { -    ECPConnMsgQ *msgq = &conn->msgq; -    ECPMessage *message; +    ECPRBRecv *buf = conn->rbuf.recv; +    ECPConnMsgQ *msgq = buf ? &buf->msgq : NULL;      ssize_t rv = ECP_OK; -    unsigned short msg_idx; +    ecp_seq_t seq; +    int idx; +    if (msgq == NULL) return ECP_ERR;      if (mtype >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE; -    if (msgq->r_idx[mtype] == msgq->w_idx[mtype]) { +    if (msgq->idx_r[mtype] == msgq->idx_w[mtype]) {          if (timeout == -1) { -            pthread_cond_wait(&msgq->cond[mtype], &conn->mutex); +            pthread_cond_wait(&msgq->cond[mtype], &msgq->mutex);          } else {              struct timespec ts; -            int _rv = pthread_cond_timedwait(&msgq->cond[mtype], &conn->mutex, abstime_ts(&ts, timeout)); +            int _rv = pthread_cond_timedwait(&msgq->cond[mtype], &msgq->mutex, abstime_ts(&ts, timeout));              if (_rv) rv = ECP_ERR_TIMEOUT;          }      }      if (!rv) { -        msg_idx = msgq->msg_idx[mtype][msgq->r_idx[mtype]]; -        msgq->r_idx[mtype] = MSG_NEXT(msgq->r_idx[mtype], ECP_MAX_CONN_MSG+1); -        msgq->occupied[msg_idx] = 0; -        message = &msgq->msg[msg_idx]; -        rv = message->size; +        seq = msgq->seq_msg[mtype][MSG_IDX_MASK(msgq->idx_r[mtype])]; +        msgq->idx_r[mtype]++; +        idx = ecp_rbuf_msg_idx(&buf->rbuf, seq); +        if (idx < 0) rv = idx; +    } +    if (!rv) { +        buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_RECEIVED; +        if (buf->rbuf.seq_start == seq) { +            int i, _idx = idx; +            ecp_seq_t msg_cnt = buf->rbuf.seq_max - buf->rbuf.seq_start + 1; + +            for (i=0; i<msg_cnt; i++) { +                if (buf->rbuf.msg[_idx].flags & ECP_RBUF_FLAG_RECEIVED) break; +                _idx = ECP_RBUF_IDX_MASK(_idx + 1, buf->rbuf.msg_size); +            } +            buf->rbuf.seq_start += i; +            buf->rbuf.msg_start = _idx; +        } +        rv = buf->rbuf.msg[idx].size;          if (rv >= 0) {              rv = MIN(msg_size, rv); -            memcpy(msg, message->msg, rv); +            memcpy(msg, buf->rbuf.msg[idx].msg, rv);          }      } +      return rv;  } diff --git a/code/core/msgq.h b/code/core/msgq.h index 9d16f63..54319e0 100644 --- a/code/core/msgq.h +++ b/code/core/msgq.h @@ -3,30 +3,21 @@  #include <pthread.h>  #include <string.h> -#define ECP_MAX_CONN_MSG        8 +#define ECP_MAX_CONN_MSG        32  #define ECP_ERR_MAX_CONN_MSG    -100 -struct ECPConnection; - -typedef struct ECPMessage { -    unsigned char msg[ECP_MAX_MSG]; -    ssize_t size; -} ECPMessage; -  typedef struct ECPConnMsgQ { -    unsigned short empty_idx; -    unsigned short occupied[ECP_MAX_CONN_MSG]; -    unsigned short w_idx[ECP_MAX_MTYPE]; -    unsigned short r_idx[ECP_MAX_MTYPE]; -    unsigned short msg_idx[ECP_MAX_MTYPE][ECP_MAX_CONN_MSG+1]; -    ECPMessage msg[ECP_MAX_CONN_MSG]; -    pthread_mutex_t mutex; +    unsigned short idx_w[ECP_MAX_MTYPE]; +    unsigned short idx_r[ECP_MAX_MTYPE]; +    ecp_seq_t seq_msg[ECP_MAX_MTYPE][ECP_MAX_CONN_MSG];      pthread_cond_t cond[ECP_MAX_MTYPE]; +    pthread_mutex_t mutex;  } ECPConnMsgQ;  int ecp_conn_msgq_create(struct ECPConnection *conn);  void ecp_conn_msgq_destroy(struct ECPConnection *conn); -ssize_t ecp_conn_msgq_push(struct ECPConnection *conn, unsigned char *msg, size_t msg_size); + +int ecp_conn_msgq_push(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype);  ssize_t ecp_conn_msgq_pop(struct ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, unsigned int timeout);  #endif  /* ECP_WITH_PTHREAD */
\ No newline at end of file diff --git a/code/core/rbuf.h b/code/core/rbuf.h index 9bb9701..67ac8c9 100644 --- a/code/core/rbuf.h +++ b/code/core/rbuf.h @@ -42,7 +42,6 @@ typedef struct ECPRBMessage {      unsigned char msg[ECP_MAX_PKT];      ssize_t size;      unsigned char flags; -    ECPNetAddr addr;  } ECPRBMessage;  typedef struct ECPRBuffer { @@ -65,6 +64,9 @@ typedef struct ECPRBRecv {      ecp_ack_t hole_mask_full;      ecp_ack_t hole_mask_empty;      ECPRBuffer rbuf; +#ifdef ECP_WITH_MSGQ +    ECPConnMsgQ msgq; +#endif  } ECPRBRecv;  typedef struct ECPRBSend { diff --git a/code/core/rbuf_recv.c b/code/core/rbuf_recv.c index f86cd13..ad9c53c 100644 --- a/code/core/rbuf_recv.c +++ b/code/core/rbuf_recv.c @@ -32,9 +32,20 @@ static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg,      rv = ecp_rbuf_msg_store(&buf->rbuf, seq, -1, msg, msg_size, ECP_RBUF_FLAG_RECEIVED, flags);      if (rv < 0) return ECP_ERR_RBUF_DUP; -    if (flags & ECP_RBUF_FLAG_DELIVERED) ecp_msg_handle(conn, seq, msg, msg_size); -      if (ECP_RBUF_SEQ_LT(buf->rbuf.seq_max, seq)) buf->rbuf.seq_max = seq; + +    if (flags & ECP_RBUF_FLAG_DELIVERED) { +#ifdef ECP_WITH_MSGQ +        if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_unlock(&buf->msgq.mutex); +#endif + +        ecp_msg_handle(conn, seq, msg, msg_size); + +#ifdef ECP_WITH_MSGQ +        if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_lock(&buf->msgq.mutex); +#endif +    } +      return rv;  } @@ -48,17 +59,24 @@ static void msg_flush(ECPConnection *conn) {          if ((buf->flags & ECP_RBUF_FLAG_RELIABLE) && !(buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_RECEIVED)) break;          if (buf->deliver_delay && (msg_cnt - i < buf->deliver_delay)) break;          if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_RECEIVED) { -            buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_RECEIVED; +            ssize_t rv = 0;              if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_DELIVERED) {                  buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_DELIVERED;              } else { -                ecp_msg_handle(conn, buf->rbuf.seq_start + i, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size); +                rv = ecp_msg_handle(conn, buf->rbuf.seq_start + i, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size); +            } +            if (!(buf->flags & ECP_RBUF_FLAG_MSGQ)) { +                buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_RECEIVED; +            } else if (rv < 0) { +                break;              }          }          idx = ECP_RBUF_IDX_MASK(idx + 1, buf->rbuf.msg_size);      } -    buf->rbuf.msg_start = idx; -    buf->rbuf.seq_start += i; +    if (!(buf->flags & ECP_RBUF_FLAG_MSGQ)) { +        buf->rbuf.seq_start += i; +        buf->rbuf.msg_start = idx; +    }  }  static int ack_send(ECPConnection *conn) { @@ -177,31 +195,38 @@ ssize_t ecp_conn_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned ch      ECPRBRecv *buf = conn->rbuf.recv;      ecp_seq_t ack_pkt = 0;      ssize_t rv; +    int _rv = ECP_OK;      int do_ack = 0;       if (buf == NULL) return ECP_ERR;      if (msg_size < 1) return ECP_ERR_MIN_MSG; +#ifdef ECP_WITH_MSGQ +    if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_lock(&buf->msgq.mutex); +#endif +      if (ECP_RBUF_SEQ_LT(buf->rbuf.seq_max, seq)) ack_pkt = seq - buf->rbuf.seq_max;      if (ECP_RBUF_SEQ_LTE(seq, buf->seq_ack)) {          ecp_seq_t seq_offset = buf->seq_ack - seq;          if (seq_offset < ECP_RBUF_ACK_SIZE) {              ecp_ack_t ack_mask = ((ecp_ack_t)1 << seq_offset); -            if (ack_mask & buf->ack_map) return ECP_ERR_RBUF_DUP; -            buf->ack_map |= ack_mask; -            do_ack = ack_shift(buf); +            if (ack_mask & buf->ack_map) _rv = ECP_ERR_RBUF_DUP; +            if (!_rv) { +                buf->ack_map |= ack_mask; +                do_ack = ack_shift(buf); -            rv = msg_store(conn, seq, msg, msg_size); -            if (rv < 0) return rv; +                rv = msg_store(conn, seq, msg, msg_size); +                if (rv < 0) _rv = rv; +            }          } else { -            return ECP_ERR_RBUF_IDX; +            _rv = ECP_ERR_RBUF_IDX;          }      } else {          if ((buf->ack_map == ECP_RBUF_ACK_FULL) && (seq == (ecp_seq_t)(buf->seq_ack + 1))) {              if ((buf->flags & ECP_RBUF_FLAG_MSGQ) || buf->deliver_delay) {                  rv = msg_store(conn, seq, msg, msg_size); -                if (rv < 0) return rv; +                if (rv < 0) _rv = rv;              } else {                  ecp_msg_handle(conn, seq, msg, msg_size);                  rv = msg_size; @@ -209,14 +234,22 @@ ssize_t ecp_conn_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned ch                  buf->rbuf.seq_start++;                  buf->rbuf.msg_start = ECP_RBUF_IDX_MASK(buf->rbuf.msg_start + 1, buf->rbuf.msg_size);              } -            buf->seq_ack++; +            if (!_rv) buf->seq_ack++;          } else {              rv = msg_store(conn, seq, msg, msg_size); -            if (rv < 0) return rv; +            if (rv < 0) _rv = rv; -            do_ack = ack_shift(buf); +            if (!_rv) do_ack = ack_shift(buf);          }      } +    if (!_rv) msg_flush(conn); + +#ifdef ECP_WITH_MSGQ +    if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_unlock(&buf->msgq.mutex); +#endif +     +    if (_rv) return _rv; +          if (buf->flush) {          buf->flush = 0;          do_ack = 1; @@ -230,7 +263,6 @@ ssize_t ecp_conn_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned ch          int _rv = ack_send(conn);          if (_rv) return _rv;      } -    msg_flush(conn);      return rv;  } | 
