diff options
| author | Uros Majstorovic <majstor@majstor.org> | 2017-07-17 22:17:51 +0200 | 
|---|---|---|
| committer | Uros Majstorovic <majstor@majstor.org> | 2017-07-17 22:17:51 +0200 | 
| commit | 8f53a56d06b128406cba3ce6f13696eb823e6a11 (patch) | |
| tree | a6046e39346dc65d1901fecda480195e214d9870 | |
| parent | 37f628a88d800123dbad003b122322e8181c3baa (diff) | |
ring buffer initial commit
| -rw-r--r-- | code/core/Makefile | 2 | ||||
| -rw-r--r-- | code/core/config.h | 1 | ||||
| -rw-r--r-- | code/core/core.c | 16 | ||||
| -rw-r--r-- | code/core/core.h | 13 | ||||
| -rw-r--r-- | code/core/rbuf.h | 34 | ||||
| -rw-r--r-- | code/core/rbuf_recv.c | 154 | ||||
| -rw-r--r-- | code/proxy/proxy.c | 2 | ||||
| -rw-r--r-- | code/util/mknode.c | 1 | 
8 files changed, 212 insertions, 11 deletions
diff --git a/code/core/Makefile b/code/core/Makefile index f0e46a1..550c0cf 100644 --- a/code/core/Makefile +++ b/code/core/Makefile @@ -1,7 +1,7 @@  MAKE=make  CFLAGS = -I. -pthread -O3 $(PIC) -obj = core.o timer.o msgq.o +obj = core.o timer.o rbuf_recv.o  subdirs = crypto posix htable diff --git a/code/core/config.h b/code/core/config.h index 94efd29..7f9390f 100644 --- a/code/core/config.h +++ b/code/core/config.h @@ -1,3 +1,4 @@  #define ECP_WITH_PTHREAD    1  #define ECP_WITH_HTABLE     1 +#define ECP_WITH_RBUF       0  #define ECP_DEBUG           1
\ No newline at end of file diff --git a/code/core/core.c b/code/core/core.c index a4ccfc5..78b1028 100644 --- a/code/core/core.c +++ b/code/core/core.c @@ -426,20 +426,24 @@ int ecp_conn_create(ECPConnection *conn, ECPSocket *sock, unsigned char ctype) {  #ifdef ECP_WITH_PTHREAD      int rv = pthread_mutex_init(&conn->mutex, NULL);      if (rv) return ECP_ERR; +#endif +#ifdef ECP_WITH_MSGQ      rv = ecp_conn_msgq_create(conn);      if (rv) {          pthread_mutex_destroy(&conn->mutex);          return ECP_ERR;      }  #endif - +          return ECP_OK;  }  void ecp_conn_destroy(ECPConnection *conn) { -#ifdef ECP_WITH_PTHREAD +#ifdef ECP_WITH_MSGQ      ecp_conn_msgq_destroy(conn); +#endif +#ifdef ECP_WITH_PTHREAD      pthread_mutex_destroy(&conn->mutex);  #endif  } @@ -1087,8 +1091,8 @@ ssize_t ecp_pkt_handle(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *proxy,      cnt_size = pld_size-ECP_SIZE_PLD_HDR; -#ifdef WITH_RBUF -    if  (conn->rbuf.recv) { +#ifdef ECP_WITH_RBUF +    if (conn->rbuf.recv) {          proc_size = ecp_msg_handle(conn, p_seq, payload+pld_size-cnt_size, cnt_size);      } else {          proc_size = ecp_rbuf_recv_store(conn, p_seq, payload+pld_size-cnt_size, cnt_size); @@ -1102,6 +1106,8 @@ ssize_t ecp_pkt_handle(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *proxy,  #ifdef ECP_WITH_PTHREAD      pthread_mutex_lock(&conn->mutex); +#endif +#ifdef ECP_WITH_MSGQ      if (!rv && (cnt_size > 0)) {          proc_size = ecp_conn_msgq_push(conn, payload+pld_size-cnt_size, cnt_size);          if (proc_size < 0) rv = ECP_ERR_HANDLE; @@ -1227,7 +1233,7 @@ ssize_t ecp_send(ECPConnection *conn, unsigned char *payload, size_t payload_siz  }  ssize_t ecp_receive(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, unsigned int timeout) { -#ifdef ECP_WITH_PTHREAD +#ifdef ECP_WITH_MSGQ      pthread_mutex_lock(&conn->mutex);      ssize_t rv = ecp_conn_msgq_pop(conn, mtype, msg, msg_size, timeout);      pthread_mutex_unlock(&conn->mutex); diff --git a/code/core/core.h b/code/core/core.h index 6c680f9..cb9bab6 100644 --- a/code/core/core.h +++ b/code/core/core.h @@ -70,7 +70,10 @@  #include "config.h"  #include <stddef.h> +#include <stdint.h> +  typedef long ssize_t; +typedef uint32_t ecp_seq_t;  #ifdef ECP_WITH_PTHREAD  #include <pthread.h> @@ -80,8 +83,8 @@ typedef long ssize_t;  #include "crypto/crypto.h"  #include "timer.h" -#ifdef ECP_WITH_PTHREAD -#include "msgq.h" +#ifdef ECP_WITH_RBUF +#include "rbuf.h"  #endif  #ifdef ECP_DEBUG @@ -95,8 +98,6 @@ struct ECPContext;  struct ECPSocket;  struct ECPConnection; -typedef uint32_t ecp_seq_t; -  typedef int ecp_rng_t (void *, size_t);  typedef int ecp_conn_handler_new_t (struct ECPSocket *s, struct ECPConnection **c, struct ECPConnection *p, unsigned char s_idx, unsigned char c_idx, unsigned char *pub, ecp_aead_key_t *sh, unsigned char *msg, size_t sz); @@ -250,8 +251,10 @@ typedef struct ECPConnection {      unsigned char key_idx_map[ECP_MAX_SOCK_KEY];      ECPDHShared shared[ECP_MAX_NODE_KEY][ECP_MAX_NODE_KEY];      unsigned char nonce[ECP_AEAD_SIZE_NONCE]; +#ifdef ECP_WITH_RBUF +    ECPConnRBuffer rbuf; +#endif  #ifdef ECP_WITH_PTHREAD -    ECPConnMsgQ msgq;      pthread_mutex_t mutex;  #endif      struct ECPConnection *proxy; diff --git a/code/core/rbuf.h b/code/core/rbuf.h new file mode 100644 index 0000000..9cf643d --- /dev/null +++ b/code/core/rbuf.h @@ -0,0 +1,34 @@ +#define ECP_ERR_RBUF_IDX    -1 +#define ECP_ERR_RBUF_DUP    -1 + +#define ECP_MAX_RBUF_MSGR    256 + +typedef uint32_t ecp_ack_t; + +typedef struct ECPRBMessage { +    unsigned char msg[ECP_MAX_MSG]; +    ssize_t size; +    char present; +} ECPRBMessage; + +typedef struct ECPRBuffer { +    unsigned char reliable; +    unsigned char deliver_delay; +    unsigned char hole_max; +    int msg_start; +    ecp_seq_t seq_ack; +    ecp_seq_t seq_max; +    ecp_seq_t seq_start; +    ecp_ack_t ack_map; +    ecp_ack_t hole_mask_full; +    ecp_ack_t hole_mask_empty; +    ECPRBMessage msg[ECP_MAX_RBUF_MSGR]; +} ECPRBuffer; + + +typedef struct ECPConnRBuffer { +    ECPRBuffer *recv; +    // ECPSBuffer *send; +} ECPConnRBuffer; + +ssize_t ecp_rbuf_recv_store(struct ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size);
\ No newline at end of file diff --git a/code/core/rbuf_recv.c b/code/core/rbuf_recv.c new file mode 100644 index 0000000..c2ab965 --- /dev/null +++ b/code/core/rbuf_recv.c @@ -0,0 +1,154 @@ +#include "core.h" + +#include <string.h> + +#define SEQ_HALF            ((ecp_seq_t)1 << (sizeof(ecp_seq_t)*8-1)) +#define ACK_FULL            (~(ecp_ack_t)0) +#define ACK_SIZE            (sizeof(ecp_ack_t)*8) +#define ACK_MASK_FIRST      ((ecp_ack_t)1 << (ACK_SIZE - 1)) + +#define SEQ_LT(a,b)         ((ecp_seq_t)((ecp_seq_t)(a) - (ecp_seq_t)(b)) > SEQ_HALF) +#define SEQ_LTE(a,b)        ((ecp_seq_t)((ecp_seq_t)(b) - (ecp_seq_t)(a)) < SEQ_HALF) + +#define IDX_MASK(a)         (a & (ECP_MAX_RBUF_MSGR-1)) +/* If ECP_MAX_RBUF_MSGR not pow of 2: +#define IDX_MASK(a)         (a % ECP_MAX_RBUF_MSGR) +*/ + +static int msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq) { +    ecp_seq_t seq_offset = seq - rbuf->seq_start; +         +    if (seq_offset < ECP_MAX_RBUF_MSGR) return IDX_MASK(rbuf->msg_start + seq_offset); +    return ECP_ERR_RBUF_IDX; +} + +static int msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, unsigned char *msg, size_t msg_size, unsigned char delivered) { +    int idx; +    ecp_seq_t seq_offset = seq - rbuf->seq_start; +    if (seq_offset >= ECP_MAX_RBUF_MSGR) return ECP_ERR_RBUF_IDX; +     +    if (SEQ_LT(rbuf->seq_max, seq)) rbuf->seq_max = seq; +    if (delivered) return ECP_OK; +     +    idx = msg_idx(rbuf, seq); +    if (idx < 0) return idx; +    if (rbuf->msg[idx].present) return ECP_ERR_RBUF_DUP; +     +    rbuf->msg[idx].present = 1; + +    return ECP_OK; +} + +static int msg_flush(ECPConnection *conn, ECPRBuffer *rbuf) { +    int idx = rbuf->msg_start; +    ecp_seq_t msg_cnt = rbuf->seq_max - rbuf->seq_start + 1; +    ecp_seq_t i = 0; +     +    for (i=0; i<msg_cnt; i++) { +        if (rbuf->reliable && !rbuf->msg[idx].present) break; +        if (rbuf->deliver_delay && msg_cnt - i < rbuf->deliver_delay) break; +        if (rbuf->msg[idx].present) { +            rbuf->msg[idx].present = 0; +            // deliver idx +        } +        idx = IDX_MASK(idx + 1); +    } +    rbuf->msg_start = idx; +    rbuf->seq_start += i; +     +    return ECP_OK; +} + +static int ack_shift(ECPRBuffer *rbuf) { +    int do_ack = 0; +    int idx; +    int i; +     +    if (rbuf->reliable && ((rbuf->ack_map & ACK_MASK_FIRST) == 0)) return 0; + +    idx = msg_idx(rbuf, rbuf->seq_ack); +    if (idx < 0) return idx; + +    while (SEQ_LT(rbuf->seq_ack, rbuf->seq_max)) { +        idx = IDX_MASK(idx + 1); +        rbuf->seq_ack++; + +        if (rbuf->msg[idx].present && (rbuf->ack_map == ACK_FULL)) continue; +         +        rbuf->ack_map = rbuf->ack_map << 1; +        rbuf->ack_map |= rbuf->msg[idx].present; +        if (!do_ack && !rbuf->msg[idx].present && SEQ_LTE(rbuf->seq_ack, rbuf->seq_max - 2 * rbuf->hole_max)) { +            do_ack = 1; +        } + +        if ((rbuf->ack_map & ACK_MASK_FIRST) == 0) break; +    } +     +    if (!do_ack && (rbuf->seq_ack == rbuf->seq_max) && ((rbuf->ack_map & rbuf->hole_mask_full) != rbuf->hole_mask_full)) { +        ecp_ack_t hole_mask = rbuf->ack_map; + +        for (i=0; i<rbuf->hole_max-1; i++) { +            hole_mask = hole_mask >> 1; +            if ((hole_mask & rbuf->hole_mask_empty) == 0) { +                do_ack = 1; +                break; +            } +        } +    } +     +    return do_ack; +} + +int ecp_rbuf_recv_init(ECPRBuffer *rbuf, ecp_seq_t seq, unsigned char hole_max) { +    memset(rbuf, 0, sizeof(ECPRBuffer)); +    rbuf->hole_max = hole_max; +    rbuf->hole_mask_full = ~(~((ecp_ack_t)0) << (hole_max * 2)); +    rbuf->hole_mask_empty = ~(~((ecp_ack_t)0) << (hole_max + 1)); +    rbuf->seq_ack = seq; +    rbuf->seq_max = seq; +    rbuf->seq_start = seq + 1; +    rbuf->ack_map = ACK_FULL; +     +    return ECP_OK; +} + +ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) { +    int rv; +    int do_ack = 0; +    ECPRBuffer *rbuf = conn->rbuf.recv; +     +    if (rbuf == NULL) return ECP_ERR; +     +    if (SEQ_LTE(seq, rbuf->seq_ack)) { +        if ((ecp_seq_t)(rbuf->seq_ack - seq) < ACK_SIZE) { +            rv = msg_store(rbuf, seq, msg, msg_size, 0); +            if (rv) return rv; + +            rbuf->ack_map |= (1 << (ecp_seq_t)(rbuf->seq_ack - seq)); +            do_ack = ack_shift(rbuf); +        } else { +            return ECP_ERR_RBUF_IDX; +        } +    } else { +        if ((rbuf->ack_map == ACK_FULL) && (seq == (ecp_seq_t)(rbuf->seq_ack + 1))) { +            rv = msg_store(rbuf, seq, msg, msg_size, !rbuf->deliver_delay); +            if (rv) return rv; +             +            if (!rbuf->deliver_delay) { +                rbuf->seq_start++; +                // deliver +            } +            rbuf->seq_ack++; +        } else { +            rv = msg_store(rbuf, seq, msg, msg_size, 0); +            if (rv) return rv; + +            do_ack = ack_shift(rbuf); +        } +    } +    // XXX +    // update do_ack for pps +    // should send acks more aggresively when ack_map is not full (freq > RTT) +    // now send acks as per do_ack +    return msg_size; +}
\ No newline at end of file diff --git a/code/proxy/proxy.c b/code/proxy/proxy.c index 6e84dc3..0f90c20 100644 --- a/code/proxy/proxy.c +++ b/code/proxy/proxy.c @@ -1,6 +1,8 @@  #include "core.h"  #include "proxy.h" +#include <string.h> +  #ifdef ECP_WITH_PTHREAD  static pthread_mutex_t key_perma_mutex;  static pthread_mutex_t key_next_mutex; diff --git a/code/util/mknode.c b/code/util/mknode.c index 8f40e90..1c0a83a 100644 --- a/code/util/mknode.c +++ b/code/util/mknode.c @@ -2,6 +2,7 @@  #include <fcntl.h>  #include <unistd.h>  #include <stdio.h> +#include <string.h>  #include "core.h"  #include "util.h"  | 
