diff options
author | Uros Majstorovic <majstor@majstor.org> | 2017-08-26 21:59:08 +0200 |
---|---|---|
committer | Uros Majstorovic <majstor@majstor.org> | 2017-08-26 21:59:08 +0200 |
commit | b83e58e21ea7dda57ddfda47bd1539d15abe687f (patch) | |
tree | 801339717059b1be494e872d90836b4e4564d462 /code | |
parent | e800e8df9fbe633d09a534ae07eb361833572f1a (diff) |
fragments and packet timestamp implemented
Diffstat (limited to 'code')
-rw-r--r-- | code/core/core.c | 112 | ||||
-rw-r--r-- | code/core/core.h | 50 | ||||
-rw-r--r-- | code/core/msgq.c | 4 | ||||
-rw-r--r-- | code/core/msgq.h | 2 | ||||
-rw-r--r-- | code/core/posix/time.c | 6 | ||||
-rw-r--r-- | code/core/rbuf.h | 9 | ||||
-rw-r--r-- | code/core/rbuf_recv.c | 94 | ||||
-rw-r--r-- | code/core/rbuf_send.c | 87 | ||||
-rw-r--r-- | code/core/timer.c | 18 | ||||
-rw-r--r-- | code/core/timer.h | 12 | ||||
-rw-r--r-- | code/test/basic.c | 8 | ||||
-rw-r--r-- | code/test/client.c | 4 | ||||
-rw-r--r-- | code/test/echo.c | 2 | ||||
-rw-r--r-- | code/test/server.c | 4 | ||||
-rw-r--r-- | code/test/stress.c | 6 | ||||
-rw-r--r-- | code/test/vc_client.c | 4 | ||||
-rw-r--r-- | code/test/vc_server.c | 4 | ||||
-rw-r--r-- | code/test/voip.c | 2 | ||||
-rw-r--r-- | code/vconn/vconn.c | 24 |
19 files changed, 294 insertions, 158 deletions
diff --git a/code/core/core.c b/code/core/core.c index ca097c4..8e21e82 100644 --- a/code/core/core.c +++ b/code/core/core.c @@ -528,7 +528,7 @@ void ecp_conn_unregister(ECPConnection *conn) { } static ssize_t _conn_send_kget(ECPConnection *conn, ECPTimerItem *ti) { - unsigned char payload[ECP_SIZE_PLD(0)]; + unsigned char payload[ECP_SIZE_PLD(0, 0)]; ecp_pld_set_type(payload, ECP_MTYPE_KGET_REQ); return ecp_pld_send_ll(conn, ti, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, payload, sizeof(payload)); @@ -565,7 +565,7 @@ int ecp_conn_open(ECPConnection *conn, ECPNode *node) { return ECP_OK; } -int ecp_conn_close(ECPConnection *conn, unsigned int timeout) { +int ecp_conn_close(ECPConnection *conn, ecp_cts_t timeout) { ECPSocket *sock = conn->sock; int refcount = 0; @@ -653,14 +653,15 @@ int ecp_conn_handler_init(ECPConnHandler *handler) { #ifdef ECP_WITH_RBUF handler->msg[ECP_MTYPE_RBACK] = ecp_rbuf_handle_ack; handler->msg[ECP_MTYPE_RBFLUSH] = ecp_rbuf_handle_flush; + handler->msg[ECP_MTYPE_RBFLUSH_PTS] = ecp_rbuf_handle_flush_pts; #endif handler->conn_open = ecp_conn_send_open; return ECP_OK; } static ssize_t _conn_send_open(ECPConnection *conn, ECPTimerItem *ti) { - unsigned char payload[ECP_SIZE_PLD(1)]; - unsigned char *buf = ecp_pld_get_buf(payload); + unsigned char payload[ECP_SIZE_PLD(1, 0)]; + unsigned char *buf = ecp_pld_get_buf(payload, 0); ecp_pld_set_type(payload, ECP_MTYPE_OPEN_REQ); buf[0] = conn->type; @@ -777,7 +778,7 @@ ssize_t ecp_conn_handle_open(ECPConnection *conn, ecp_seq_t seq, unsigned char m return 0; } else { - unsigned char payload[ECP_SIZE_PLD(0)]; + unsigned char payload[ECP_SIZE_PLD(0, 0)]; unsigned char ctype = 0; if (conn->out) return ECP_ERR; @@ -826,8 +827,8 @@ ssize_t ecp_conn_handle_kget(ECPConnection *conn, ecp_seq_t seq, unsigned char m return ECP_ECDH_SIZE_KEY+1; } else { - unsigned char payload[ECP_SIZE_PLD(ECP_ECDH_SIZE_KEY+1)]; - unsigned char *buf = ecp_pld_get_buf(payload); + unsigned char payload[ECP_SIZE_PLD(ECP_ECDH_SIZE_KEY+1, 0)]; + unsigned char *buf = ecp_pld_get_buf(payload, 0); ecp_pld_set_type(payload, ECP_MTYPE_KGET_REP); if (conn->out) return ECP_ERR; @@ -850,7 +851,7 @@ ssize_t ecp_conn_handle_kput(ECPConnection *conn, ecp_seq_t seq, unsigned char m if (!conn->out) return ECP_ERR; return 0; } else { - unsigned char payload[ECP_SIZE_PLD(0)]; + unsigned char payload[ECP_SIZE_PLD(0, 0)]; if (conn->out) return ECP_ERR; if (size < ECP_ECDH_SIZE_KEY+1) return ECP_ERR; @@ -872,8 +873,8 @@ ssize_t ecp_conn_handle_exec(ECPConnection *conn, ecp_seq_t seq, unsigned char m } static ssize_t _conn_send_kput(ECPConnection *conn, ECPTimerItem *ti) { - unsigned char payload[ECP_SIZE_PLD(ECP_ECDH_SIZE_KEY+1)]; - unsigned char *buf = ecp_pld_get_buf(payload); + unsigned char payload[ECP_SIZE_PLD(ECP_ECDH_SIZE_KEY+1, 0)]; + unsigned char *buf = ecp_pld_get_buf(payload, 0); int rv = ECP_OK; ecp_pld_set_type(payload, ECP_MTYPE_KPUT_REQ); @@ -1031,10 +1032,7 @@ ssize_t ecp_conn_pack(ECPConnection *conn, unsigned char *packet, size_t pkt_siz } #ifdef ECP_WITH_RBUF - if (conn->rbuf.send && !si->rb_pass) { - si->rb_mtype = ecp_pld_get_type(payload); - rv = ecp_rbuf_pkt_prep(conn->rbuf.send, si); - } + if (conn->rbuf.send) rv = ecp_rbuf_pkt_prep(conn->rbuf.send, si, payload); #endif if (!rv && !si->seq_w) conn->seq_out = _seq; @@ -1137,7 +1135,7 @@ ssize_t ecp_pkt_handle(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *parent, } pld_size = sock->ctx->cr.aead_dec(payload, ECP_MAX_PLD, packet+ECP_SIZE_PKT_HDR, pkt_size-ECP_SIZE_PKT_HDR, &shsec, packet+ECP_SIZE_PROTO+1+ECP_ECDH_SIZE_KEY); - if (pld_size < ECP_SIZE_MSG_HDR) rv = ECP_ERR_DECRYPT; + if (pld_size < ECP_SIZE_PLD_HDR+1) rv = ECP_ERR_DECRYPT; if (rv) goto pkt_handle_err; seq_p = \ @@ -1150,14 +1148,14 @@ ssize_t ecp_pkt_handle(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *parent, if (conn == NULL) { if (payload[ECP_SIZE_PLD_HDR] == ECP_MTYPE_OPEN_REQ) { - rv = sock->conn_new(sock, &conn, parent, s_idx, c_idx, packet+ECP_SIZE_PROTO+1, &shsec, payload+ECP_SIZE_MSG_HDR, pld_size-ECP_SIZE_MSG_HDR); + rv = sock->conn_new(sock, &conn, parent, s_idx, c_idx, packet+ECP_SIZE_PROTO+1, &shsec, payload+ECP_SIZE_PLD_HDR+1, pld_size-ECP_SIZE_PLD_HDR-1); if (rv) return rv; seq_map = 1; seq_n = seq_p; } else if (payload[ECP_SIZE_PLD_HDR] == ECP_MTYPE_KGET_REQ) { - unsigned char payload_[ECP_SIZE_PLD(ECP_ECDH_SIZE_KEY+1)]; - unsigned char *buf = ecp_pld_get_buf(payload_); + unsigned char payload_[ECP_SIZE_PLD(ECP_ECDH_SIZE_KEY+1, 0)]; + unsigned char *buf = ecp_pld_get_buf(payload_, 0); ecp_pld_set_type(payload_, ECP_MTYPE_KGET_REP); rv = ecp_sock_dhkey_get_curr(sock, buf, buf+1); @@ -1286,26 +1284,29 @@ ssize_t ecp_msg_handle(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, s ecp_conn_handler_msg_t *handler = NULL; ssize_t rv = 0; unsigned char mtype = 0; + unsigned char *content = NULL; size_t rem_size = msg_size; if (msg_size < 1) return ECP_ERR_MIN_MSG; while (rem_size) { - mtype = msg[0]; - msg++; - rem_size--; - + mtype = ecp_msg_get_type(msg); + content = ecp_msg_get_content(msg, rem_size); + if (content == NULL) return ECP_ERR_MIN_MSG; + + rem_size -= content - msg; + if ((mtype & ECP_MTYPE_MASK) >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE; ecp_timer_pop(conn, mtype); handler = conn->sock->ctx->handler[conn->type] ? conn->sock->ctx->handler[conn->type]->msg[mtype & ECP_MTYPE_MASK] : NULL; if (handler) { - rv = handler(conn, seq, mtype, msg, rem_size); + rv = handler(conn, seq, mtype, content, rem_size); if (rv < 0) return rv; if (rv > rem_size) return ECP_ERR; rem_size -= rv; - msg += rv; + msg = content + rv; } else { return msg_size-rem_size-1; } @@ -1320,10 +1321,6 @@ int ecp_seq_item_init(ECPSeqItem *seq_item) { return ECP_OK; } -unsigned char *ecp_pld_get_buf(unsigned char *payload) { - return payload+ECP_SIZE_MSG_HDR; -} - void ecp_pld_set_type(unsigned char *payload, unsigned char mtype) { payload[ECP_SIZE_PLD_HDR] = mtype; } @@ -1332,6 +1329,59 @@ unsigned char ecp_pld_get_type(unsigned char *payload) { return payload[ECP_SIZE_PLD_HDR]; } +unsigned char *ecp_pld_get_buf(unsigned char *payload, unsigned char mtype) { + size_t offset = 0; + if (mtype & ECP_MTYPE_FLAG_FRAG) offset += 2; + if (mtype & ECP_MTYPE_FLAG_PTS) offset += sizeof(ecp_pts_t); + + return payload + ECP_SIZE_PLD_HDR + 1 + offset; +} + +unsigned char ecp_msg_get_type(unsigned char *msg) { + return msg[0]; +} + +unsigned char *ecp_msg_get_content(unsigned char *msg, size_t msg_size) { + size_t offset = 0; + unsigned char mtype = msg[0]; + + if (mtype & ECP_MTYPE_FLAG_FRAG) offset += 2; + if (mtype & ECP_MTYPE_FLAG_PTS) offset += sizeof(ecp_pts_t); + + if (msg_size < 1 + offset) return NULL; + return msg + 1 + offset; +} + +int ecp_msg_get_frag(unsigned char *msg, size_t msg_size, unsigned char *frag_cnt, unsigned char *frag_tot) { + unsigned char mtype = msg[0]; + + if (!(mtype & ECP_MTYPE_FLAG_FRAG)) return ECP_ERR; + if (msg_size < 3) return ECP_ERR; + + *frag_cnt = msg[1]; + *frag_tot = msg[2]; + + return ECP_OK; +} + +int ecp_msg_get_pts(unsigned char *msg, size_t msg_size, ecp_pts_t *pts) { + size_t offset = 0; + unsigned char mtype = msg[0]; + + if (!(mtype & ECP_MTYPE_FLAG_PTS)) return ECP_ERR; + + if (mtype & ECP_MTYPE_FLAG_FRAG) offset += 2; + if (msg_size < 1 + offset + sizeof(ecp_pts_t)) return ECP_ERR; + + *pts = \ + (msg[1 + offset] << 24) | \ + (msg[2 + offset] << 16) | \ + (msg[3 + offset] << 8) | \ + (msg[4 + offset]); + + return ECP_OK; +} + static ssize_t pld_send(ECPConnection *conn, ECPTimerItem *ti, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size, ECPSeqItem *si) { unsigned char packet[ECP_MAX_PKT]; ECPSocket *sock = conn->sock; @@ -1342,7 +1392,7 @@ static ssize_t pld_send(ECPConnection *conn, ECPTimerItem *ti, unsigned char s_i if (rv < 0) return rv; #ifdef ECP_WITH_RBUF - if (conn->rbuf.send && !si->rb_pass) return ecp_rbuf_pkt_send(conn->rbuf.send, conn->sock, &addr, ti, si, packet, rv); + if (conn->rbuf.send) return ecp_rbuf_pkt_send(conn->rbuf.send, conn->sock, &addr, ti, si, packet, rv); #endif if (ti) { @@ -1392,7 +1442,7 @@ ssize_t ecp_send(ECPConnection *conn, unsigned char *payload, size_t payload_siz return ecp_pld_send(conn, payload, payload_size); } -ssize_t ecp_receive(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, unsigned int timeout) { +ssize_t ecp_receive(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ecp_cts_t timeout) { #ifdef ECP_WITH_RBUF #ifdef ECP_WITH_MSGQ pthread_mutex_lock(&conn->rbuf.recv->msgq.mutex); @@ -1421,7 +1471,7 @@ static int recv_p(ECPSocket *sock) { } int ecp_receiver(ECPSocket *sock) { - unsigned int next = 0; + ecp_cts_t next = 0; sock->running = 1; while(sock->running) { int rv = sock->ctx->tr.poll(&sock->sock, next ? next : sock->poll_timeout); diff --git a/code/core/core.h b/code/core/core.h index 27d1e0d..cdb4b62 100644 --- a/code/core/core.h +++ b/code/core/core.h @@ -8,8 +8,10 @@ #define ECP_ERR_MAX_MTYPE -12 #define ECP_ERR_MIN_PKT -13 #define ECP_ERR_MAX_PLD -14 +// XXX ??? #define ECP_ERR_MIN_MSG -15 #define ECP_ERR_MAX_MSG -16 +// #define ECP_ERR_NET_ADDR -17 #define ECP_ERR_CONN_NOT_FOUND -20 @@ -40,21 +42,21 @@ #define ECP_SIZE_PKT_HDR (ECP_SIZE_PROTO+1+ECP_ECDH_SIZE_KEY+ECP_AEAD_SIZE_NONCE) #define ECP_SIZE_PLD_HDR (ECP_SIZE_SEQ) -#define ECP_SIZE_MSG_HDR (ECP_SIZE_PLD_HDR+1) #define ECP_MAX_PKT 1412 #define ECP_MAX_PLD (ECP_MAX_PKT-ECP_SIZE_PKT_HDR-ECP_AEAD_SIZE_TAG) -#define ECP_MAX_MSG (ECP_MAX_PLD-ECP_SIZE_MSG_HDR) +#define ECP_MAX_MSG (ECP_MAX_PLD-ECP_SIZE_PLD_HDR-1) -#define ECP_MIN_PKT (ECP_SIZE_PKT_HDR+ECP_SIZE_MSG_HDR+ECP_AEAD_SIZE_TAG) +#define ECP_MIN_PKT (ECP_SIZE_PKT_HDR+ECP_SIZE_PLD_HDR+1+ECP_AEAD_SIZE_TAG) #define ECP_POLL_TIMEOUT 500 #define ECP_ECDH_IDX_INV 0xFF #define ECP_ECDH_IDX_PERMA 0x0F -#define ECP_MTYPE_MASK 0x3f -#define ECP_MTYPE_FLAG_TIMER 0x80 -#define ECP_MTYPE_FLAG_REP 0x40 +#define ECP_MTYPE_FLAG_FRAG 0x80 +#define ECP_MTYPE_FLAG_PTS 0x40 +#define ECP_MTYPE_FLAG_REP 0x20 +#define ECP_MTYPE_MASK 0x1f #define ECP_MTYPE_OPEN 0x00 #define ECP_MTYPE_KGET 0x01 @@ -68,9 +70,6 @@ #define ECP_MTYPE_KPUT_REQ (ECP_MTYPE_KPUT) #define ECP_MTYPE_KPUT_REP (ECP_MTYPE_KPUT | ECP_MTYPE_FLAG_REP) -#define ECP_SIZE_PLD(X) ((X) + ECP_SIZE_MSG_HDR) -#define ECP_SIZE_PKT(X) ((X) + ECP_SIZE_PKT_HDR+ECP_SIZE_MSG_HDR+ECP_AEAD_SIZE_TAG) - #define ECP_CONN_FLAG_REG 0x01 #define ECP_CONN_FLAG_OPEN 0x02 @@ -94,11 +93,27 @@ typedef uint32_t ecp_ack_t; #define ECP_SIZE_ACKB (sizeof(ecp_ack_t)*8) #define ECP_ACK_FULL (~(ecp_ack_t)0) +typedef uint32_t ecp_cts_t; +#define ECP_CTS_HALF ((ecp_cts_t)1 << (sizeof(ecp_cts_t) * 8 - 1)) +#define ECP_CTS_LT(a, b) ((ecp_cts_t)((ecp_cts_t)(a) - (ecp_cts_t)(b)) > ECP_CTS_HALF) +#define ECP_CTS_LTE(a,b) ((ecp_cts_t)((ecp_cts_t)(b) - (ecp_cts_t)(a)) < ECP_CTS_HALF) + +typedef uint32_t ecp_pts_t; +#define ECP_PTS_HALF ((ecp_pts_t)1 << (sizeof(ecp_pts_t) * 8 - 1)) +#define ECP_PTS_LT(a, b) ((ecp_pts_t)((ecp_pts_t)(a) - (ecp_pts_t)(b)) > ECP_PTS_HALF) +#define ECP_PTS_LTE(a,b) ((ecp_pts_t)((ecp_pts_t)(b) - (ecp_pts_t)(a)) < ECP_PTS_HALF) + typedef uint32_t ecp_seq_t; #define ECP_SEQ_HALF ((ecp_seq_t)1 << (sizeof(ecp_seq_t) * 8 - 1)) #define ECP_SEQ_LT(a,b) ((ecp_seq_t)((ecp_seq_t)(a) - (ecp_seq_t)(b)) > ECP_SEQ_HALF) #define ECP_SEQ_LTE(a,b) ((ecp_seq_t)((ecp_seq_t)(b) - (ecp_seq_t)(a)) < ECP_SEQ_HALF) + +#define ECP_SIZE_FRAG(F) ((F) & ECP_MTYPE_FLAG_FRAG ? 2 : 0) +#define ECP_SIZE_PTS(F) ((F) & ECP_MTYPE_FLAG_PTS ? sizeof(ecp_pts_t) : 0) +#define ECP_SIZE_PLD(X,F) ((X) + ECP_SIZE_PLD_HDR+1+ECP_SIZE_FRAG(F)+ECP_SIZE_PTS(F)) +#define ECP_SIZE_PKT(X,F) (ECP_SIZE_PKT_HDR+ECP_SIZE_PLD(X,F)+ECP_AEAD_SIZE_TAG) + #ifdef ECP_WITH_PTHREAD #include <pthread.h> #endif @@ -165,8 +180,8 @@ typedef struct ECPTransportIface { typedef struct ECPTimeIface { int init; - unsigned int (*abstime_ms) (unsigned int); - void (*sleep_ms) (unsigned int); + ecp_cts_t (*abstime_ms) (ecp_cts_t); + void (*sleep_ms) (ecp_cts_t); } ECPTimeIface; #ifdef ECP_WITH_HTABLE @@ -255,7 +270,7 @@ typedef struct ECPContext { typedef struct ECPSocket { ECPContext *ctx; unsigned char running; - unsigned int poll_timeout; + int poll_timeout; ECPNetSock sock; ECPDHKey key_perma; ECPDHKey key[ECP_MAX_SOCK_KEY]; @@ -325,7 +340,7 @@ void ecp_conn_unregister(ECPConnection *conn); int ecp_conn_init(ECPConnection *conn, ECPNode *node); int ecp_conn_open(ECPConnection *conn, ECPNode *node); -int ecp_conn_close(ECPConnection *conn, unsigned int timeout); +int ecp_conn_close(ECPConnection *conn, ecp_cts_t timeout); int ecp_conn_handler_init(ECPConnHandler *handler); ssize_t ecp_conn_send_open(ECPConnection *conn); @@ -349,16 +364,21 @@ ssize_t ecp_pkt_recv(ECPSocket *sock, ECPNetAddr *addr, unsigned char *packet, s ssize_t ecp_msg_handle(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size); int ecp_seq_item_init(ECPSeqItem *seq_item); -unsigned char *ecp_pld_get_buf(unsigned char *payload); void ecp_pld_set_type(unsigned char *payload, unsigned char mtype); +unsigned char *ecp_pld_get_buf(unsigned char *payload, unsigned char mtype); unsigned char ecp_pld_get_type(unsigned char *payload); +unsigned char ecp_msg_get_type(unsigned char *msg); +unsigned char *ecp_msg_get_content(unsigned char *msg, size_t msg_size); +int ecp_msg_get_frag(unsigned char *msg, size_t msg_size, unsigned char *frag_cnt, unsigned char *frag_tot); +int ecp_msg_get_pts(unsigned char *msg, size_t msg_size, ecp_pts_t *pts); + ssize_t ecp_pld_send(ECPConnection *conn, unsigned char *payload, size_t payload_size); ssize_t ecp_pld_send_wtimer(ECPConnection *conn, ECPTimerItem *ti, unsigned char *payload, size_t payload_size); ssize_t ecp_pld_send_ll(ECPConnection *conn, ECPTimerItem *ti, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size); ssize_t ecp_pld_send_raw(ECPSocket *sock, ECPConnection *parent, ECPNetAddr *addr, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size); ssize_t ecp_send(ECPConnection *conn, unsigned char *payload, size_t payload_size); -ssize_t ecp_receive(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, unsigned int timeout); +ssize_t ecp_receive(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ecp_cts_t timeout); int ecp_receiver(ECPSocket *sock); int ecp_start_receiver(ECPSocket *sock); diff --git a/code/core/msgq.c b/code/core/msgq.c index 7356eb8..743478f 100644 --- a/code/core/msgq.c +++ b/code/core/msgq.c @@ -9,7 +9,7 @@ #define MSG_IDX_MASK(idx) ((idx) & ((ECP_MSGQ_MAX_MSG) - 1)) -static struct timespec *abstime_ts(struct timespec *ts, unsigned int msec) { +static struct timespec *abstime_ts(struct timespec *ts, ecp_cts_t msec) { struct timeval tv; uint64_t us_start; @@ -94,7 +94,7 @@ int ecp_conn_msgq_push(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype) return ECP_OK; } -ssize_t ecp_conn_msgq_pop(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, unsigned int timeout) { +ssize_t ecp_conn_msgq_pop(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ecp_cts_t timeout) { ECPRBRecv *buf = conn->rbuf.recv; ECPConnMsgQ *msgq = buf ? &buf->msgq : NULL; ssize_t rv = ECP_OK; diff --git a/code/core/msgq.h b/code/core/msgq.h index 75c795e..dd0b9a8 100644 --- a/code/core/msgq.h +++ b/code/core/msgq.h @@ -21,6 +21,6 @@ void ecp_conn_msgq_destroy(struct ECPConnection *conn); int ecp_conn_msgq_start(struct ECPConnection *conn, ecp_seq_t seq); int ecp_conn_msgq_push(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype); -ssize_t ecp_conn_msgq_pop(struct ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, unsigned int timeout); +ssize_t ecp_conn_msgq_pop(struct ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ecp_cts_t timeout); #endif /* ECP_WITH_MSGQ */
\ No newline at end of file diff --git a/code/core/posix/time.c b/code/core/posix/time.c index 3dd3920..c7a238a 100644 --- a/code/core/posix/time.c +++ b/code/core/posix/time.c @@ -3,16 +3,16 @@ #include <core.h> -static unsigned int t_abstime_ms(unsigned int msec) { +static ecp_cts_t t_abstime_ms(ecp_cts_t msec) { struct timeval tv; - unsigned int ms_now; + ecp_cts_t ms_now; gettimeofday(&tv, NULL); ms_now = tv.tv_sec * 1000 + tv.tv_usec / 1000; return ms_now + msec; } -static void t_sleep_ms(unsigned int msec) { +static void t_sleep_ms(ecp_cts_t msec) { usleep(msec*1000); } diff --git a/code/core/rbuf.h b/code/core/rbuf.h index d10370a..c3e6ff9 100644 --- a/code/core/rbuf.h +++ b/code/core/rbuf.h @@ -9,6 +9,7 @@ #define ECP_MTYPE_RBACK 0x04 #define ECP_MTYPE_RBFLUSH 0x05 +#define ECP_MTYPE_RBFLUSH_PTS 0x06 #define ECP_ERR_RBUF_DUP -100 #define ECP_ERR_RBUF_FULL -101 @@ -51,9 +52,10 @@ typedef struct ECPRBTimer { typedef struct ECPRBRecv { unsigned char flags; unsigned char flush; - unsigned short deliver_delay; + unsigned char timer_pts; unsigned short hole_max; unsigned short ack_rate; + ecp_pts_t deliver_delay; ecp_seq_t seq_ack; ecp_seq_t ack_pkt; ecp_ack_t ack_map; @@ -100,15 +102,16 @@ int ecp_rbuf_conn_start(struct ECPConnection *conn, ecp_seq_t seq); int ecp_rbuf_recv_create(struct ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size); void ecp_rbuf_recv_destroy(struct ECPConnection *conn); int ecp_rbuf_recv_set_hole(struct ECPConnection *conn, unsigned short hole_max); -int ecp_rbuf_recv_set_delay(struct ECPConnection *conn, unsigned short delay); +int ecp_rbuf_recv_set_delay(struct ECPConnection *conn, ecp_pts_t delay); int ecp_rbuf_recv_start(struct ECPConnection *conn, ecp_seq_t seq); ssize_t ecp_rbuf_recv_store(struct ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size); int ecp_rbuf_send_create(struct ECPConnection *conn, ECPRBSend *buf, ECPRBMessage *msg, unsigned int msg_size); void ecp_rbuf_send_destroy(struct ECPConnection *conn); int ecp_rbuf_send_start(struct ECPConnection *conn); -int ecp_rbuf_pkt_prep(ECPRBSend *buf, struct ECPSeqItem *si); +int ecp_rbuf_pkt_prep(ECPRBSend *buf, struct ECPSeqItem *si, unsigned char *payload); ssize_t ecp_rbuf_pkt_send(ECPRBSend *buf, struct ECPSocket *sock, ECPNetAddr *addr, ECPTimerItem *ti, struct ECPSeqItem *si, unsigned char *packet, size_t pkt_size); ssize_t ecp_rbuf_handle_ack(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size); ssize_t ecp_rbuf_handle_flush(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size); +ssize_t ecp_rbuf_handle_flush_pts(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size); diff --git a/code/core/rbuf_recv.c b/code/core/rbuf_recv.c index fba8219..0970f6a 100644 --- a/code/core/rbuf_recv.c +++ b/code/core/rbuf_recv.c @@ -8,7 +8,7 @@ static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) { ECPRBRecv *buf = conn->rbuf.recv; unsigned char flags = ECP_RBUF_FLAG_IN_RBUF; - unsigned char mtype = msg[0] & ECP_MTYPE_MASK; + unsigned char mtype = ecp_msg_get_type(msg) & ECP_MTYPE_MASK; if (mtype < ECP_MAX_MTYPE_SYS) flags |= ECP_RBUF_FLAG_SYS; @@ -42,28 +42,77 @@ static void msg_flush(ECPConnection *conn) { #endif ecp_seq_t msg_cnt = buf->rbuf.seq_max - buf->rbuf.seq_start + 1; + ecp_seq_t seq_next = buf->rbuf.seq_start; ecp_seq_t i = 0; unsigned int idx = buf->rbuf.msg_start; + if (buf->timer_pts) { + ecp_timer_pop(conn, ECP_MTYPE_RBFLUSH_PTS); + buf->timer_pts = 0; + } + for (i=0; i<msg_cnt; i++) { - if ((buf->flags & ECP_RBUF_FLAG_RELIABLE) && !(buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_IN_RBUF)) break; - if (buf->deliver_delay && (msg_cnt - i < buf->deliver_delay)) break; if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_IN_RBUF) { - int rv = 0; - ecp_seq_t seq = buf->rbuf.seq_start + i; if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_SYS) { buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_SYS; - } else if (buf->flags & ECP_RBUF_FLAG_MSGQ) { + } else { + int rv = ECP_OK; + ecp_pts_t msg_pts; + ecp_seq_t seq = buf->rbuf.seq_start + i; + unsigned char mtype = ecp_msg_get_type(buf->rbuf.msg[idx].msg); + unsigned char frag_tot; + unsigned char frag_cnt; + + rv = ecp_msg_get_frag(buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size, &frag_cnt, &frag_tot); + if ((rv == ECP_OK) && (frag_cnt != 0) && (seq != seq_next)) { + ecp_seq_t seq_fend = seq + (ecp_seq_t)(frag_tot - frag_cnt - 1); + + if (ECP_SEQ_LT(buf->rbuf.seq_max, seq_fend) || (buf->hole_max && ((ecp_seq_t)(buf->rbuf.seq_max - seq_fend) <= buf->hole_max))) { + ecp_seq_t seq_fbeg = seq - frag_cnt; + ecp_seq_t seq_offset = ECP_SEQ_LT(seq_next, seq_fbeg) ? seq - seq_fbeg : seq - seq_next; + + i -= seq_offset; + idx = ECP_RBUF_IDX_MASK(idx - seq_offset, buf->rbuf.msg_size); + break; + } + } + + rv = ecp_msg_get_pts(buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size, &msg_pts); + if (rv == ECP_OK) { + ecp_pts_t now = conn->sock->ctx->tm.abstime_ms(0); + if (ECP_PTS_LT(now, msg_pts)) { + ECPTimerItem ti; + ecp_seq_t seq_offset = seq - seq_next; + + rv = ecp_timer_item_init(&ti, conn, ECP_MTYPE_RBFLUSH_PTS, 0, msg_pts - now); + if (!rv) rv = ecp_timer_push(&ti); + if (!rv) buf->timer_pts = 1; + + i -= seq_offset; + idx = ECP_RBUF_IDX_MASK(idx - seq_offset, buf->rbuf.msg_size); + break; + } + } + + seq_next = seq + 1; + if (buf->flags & ECP_RBUF_FLAG_MSGQ) { #ifdef ECP_WITH_MSGQ - unsigned char mtype = buf->rbuf.msg[idx].msg[0]; - int rv = ecp_conn_msgq_push(conn, seq, mtype); - if (rv) break; - buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_IN_MSGQ; + rv = ecp_conn_msgq_push(conn, seq, ecp_msg_get_type(buf->rbuf.msg[idx].msg)); + if (rv) break; + buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_IN_MSGQ; #endif - } else { - ecp_msg_handle(conn, seq, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size); + } else { + ecp_msg_handle(conn, seq, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size); + } } buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_IN_RBUF; + } else { + if (buf->flags & ECP_RBUF_FLAG_RELIABLE) break; + if (buf->hole_max) { + ecp_seq_t seq = buf->rbuf.seq_start + i; + ecp_seq_t seq_offset = buf->rbuf.seq_max - seq; + if (seq_offset <= buf->hole_max) break; + } } idx = ECP_RBUF_IDX_MASK(idx + 1, buf->rbuf.msg_size); } @@ -78,8 +127,8 @@ static void msg_flush(ECPConnection *conn) { static int ack_send(ECPConnection *conn) { ECPRBRecv *buf = conn->rbuf.recv; - unsigned char payload[ECP_SIZE_PLD(sizeof(ecp_seq_t) + sizeof(ecp_ack_t))]; - unsigned char *buf_ = ecp_pld_get_buf(payload); + unsigned char payload[ECP_SIZE_PLD(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), 0)]; + unsigned char *buf_ = ecp_pld_get_buf(payload, 0); ssize_t rv; ecp_pld_set_type(payload, ECP_MTYPE_RBACK); @@ -150,6 +199,16 @@ ssize_t ecp_rbuf_handle_flush(ECPConnection *conn, ecp_seq_t seq, unsigned char return 0; } +ssize_t ecp_rbuf_handle_flush_pts(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size) { + ECPRBRecv *buf = conn->rbuf.recv; + + if (buf == NULL) return ECP_ERR; + + buf->timer_pts = 0; + msg_flush(conn); + return 0; +} + int ecp_rbuf_recv_create(ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size) { int rv; @@ -185,13 +244,10 @@ int ecp_rbuf_recv_set_hole(ECPConnection *conn, unsigned short hole_max) { return ECP_OK; } -int ecp_rbuf_recv_set_delay(ECPConnection *conn, unsigned short delay) { +int ecp_rbuf_recv_set_delay(ECPConnection *conn, ecp_pts_t delay) { ECPRBRecv *buf = conn->rbuf.recv; buf->deliver_delay = delay; - if (buf->hole_max < delay - 1) { - ecp_rbuf_recv_set_hole(conn, delay - 1); - } return ECP_OK; } @@ -226,7 +282,7 @@ ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *m if (buf == NULL) return ECP_ERR; if (msg_size < 1) return ECP_ERR_MIN_MSG; - mtype = msg[0] & ECP_MTYPE_MASK; + mtype = ecp_msg_get_type(msg) & ECP_MTYPE_MASK; if ((mtype == ECP_MTYPE_RBACK) || (mtype == ECP_MTYPE_RBFLUSH)) return ecp_msg_handle(conn, seq, msg, msg_size); if (ECP_SEQ_LT(buf->rbuf.seq_max, seq)) ack_pkt = seq - buf->rbuf.seq_max; diff --git a/code/core/rbuf_send.c b/code/core/rbuf_send.c index e0c7f29..d582e67 100644 --- a/code/core/rbuf_send.c +++ b/code/core/rbuf_send.c @@ -5,7 +5,7 @@ #define NACK_RATE_UNIT 10000 static ssize_t flush_send(ECPConnection *conn, ECPTimerItem *ti) { - unsigned char payload[ECP_SIZE_PLD(0)]; + unsigned char payload[ECP_SIZE_PLD(0, 0)]; ecp_pld_set_type(payload, ECP_MTYPE_RBFLUSH); if (ti == NULL) { @@ -127,7 +127,7 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt if (buf->flags & ECP_RBUF_FLAG_RELIABLE) { unsigned int _idx = ECP_RBUF_IDX_MASK(idx + 1 - ECP_SIZE_ACKB + i, rbuf->msg_size); if ((rbuf->msg[_idx].size == 0) || (rbuf->msg[_idx].flags & ECP_RBUF_FLAG_SYS)) { - unsigned char payload[ECP_SIZE_PLD(0)]; + unsigned char payload[ECP_SIZE_PLD(0, 0)]; ecp_pld_set_type(payload, ECP_MTYPE_NOP); ecp_rbuf_pld_send(conn, payload, sizeof(payload), seq_ack + i); } else { @@ -235,7 +235,7 @@ int ecp_rbuf_send_start(ECPConnection *conn) { int ecp_rbuf_flush(ECPConnection *conn) { ECPRBSend *buf = conn->rbuf.send; - unsigned char payload[ECP_SIZE_PLD(0)]; + unsigned char payload[ECP_SIZE_PLD(0, 0)]; ecp_seq_t seq; if (buf == NULL) return ECP_ERR; @@ -269,7 +269,9 @@ int ecp_rbuf_flush(ECPConnection *conn) { return ECP_OK; } -int ecp_rbuf_pkt_prep(ECPRBSend *buf, ECPSeqItem *si) { +int ecp_rbuf_pkt_prep(ECPRBSend *buf, ECPSeqItem *si, unsigned char *payload) { + if (si->rb_pass) return ECP_OK; + #ifdef ECP_WITH_PTHREAD pthread_mutex_lock(&buf->mutex); #endif @@ -280,6 +282,7 @@ int ecp_rbuf_pkt_prep(ECPRBSend *buf, ECPSeqItem *si) { if (idx < 0) return idx; + si->rb_mtype = ecp_pld_get_type(payload); si->rb_idx = idx; buf->rbuf.msg[idx].size = 0; buf->rbuf.msg[idx].flags = 0; @@ -288,54 +291,58 @@ int ecp_rbuf_pkt_prep(ECPRBSend *buf, ECPSeqItem *si) { } ssize_t ecp_rbuf_pkt_send(ECPRBSend *buf, ECPSocket *sock, ECPNetAddr *addr, ECPTimerItem *ti, ECPSeqItem *si, unsigned char *packet, size_t pkt_size) { - unsigned char flags = 0; int do_send = 1; ssize_t rv = 0; - ecp_seq_t seq = si->seq; - unsigned int idx = si->rb_idx; - unsigned char mtype = si->rb_mtype & ECP_MTYPE_MASK; - - if (mtype < ECP_MAX_MTYPE_SYS) flags |= ECP_RBUF_FLAG_SYS; - - rv = ecp_rbuf_msg_store(&buf->rbuf, seq, idx, packet, pkt_size, 0, flags); - if (rv < 0) return rv; + + if (!si->rb_pass) { + unsigned char flags = 0; + ecp_seq_t seq = si->seq; + unsigned int idx = si->rb_idx; + unsigned char mtype = si->rb_mtype & ECP_MTYPE_MASK; - if (buf->flags & ECP_RBUF_FLAG_CCONTROL) { -#ifdef ECP_WITH_PTHREAD - pthread_mutex_lock(&buf->mutex); -#endif + if (mtype < ECP_MAX_MTYPE_SYS) flags |= ECP_RBUF_FLAG_SYS; - if (ECP_SEQ_LT(buf->rbuf.seq_max, seq)) buf->rbuf.seq_max = seq; + rv = ecp_rbuf_msg_store(&buf->rbuf, seq, idx, packet, pkt_size, 0, flags); + if (rv < 0) return rv; - if (buf->cnt_cc || (buf->in_transit >= buf->win_size)) { - if (!buf->cnt_cc) buf->seq_cc = seq; - buf->cnt_cc++; - buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_IN_CCONTROL; - do_send = 0; - if (ti) { - ECPRBTimer *timer = &buf->timer; - ECPRBTimerItem *item = &timer->item[timer->idx_w]; + if (buf->flags & ECP_RBUF_FLAG_CCONTROL) { + int _rv = ECP_OK; + #ifdef ECP_WITH_PTHREAD + pthread_mutex_lock(&buf->mutex); + #endif + + if (ECP_SEQ_LT(buf->rbuf.seq_max, seq)) buf->rbuf.seq_max = seq; + + if (buf->cnt_cc || (buf->in_transit >= buf->win_size)) { + if (!buf->cnt_cc) buf->seq_cc = seq; + buf->cnt_cc++; + buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_IN_CCONTROL; + do_send = 0; + if (ti) { + ECPRBTimer *timer = &buf->timer; + ECPRBTimerItem *item = &timer->item[timer->idx_w]; - if (!item->occupied) { - item->occupied = 1; - item->item = *ti; - buf->rbuf.msg[idx].idx_t = timer->idx_w; - timer->idx_w = (timer->idx_w) % ECP_MAX_TIMER; + if (!item->occupied) { + item->occupied = 1; + item->item = *ti; + buf->rbuf.msg[idx].idx_t = timer->idx_w; + timer->idx_w = (timer->idx_w) % ECP_MAX_TIMER; + } else { + _rv = ECP_ERR_MAX_TIMER; + } } else { - rv = ECP_ERR_MAX_TIMER; + buf->rbuf.msg[idx].idx_t = -1; } } else { - buf->rbuf.msg[idx].idx_t = -1; + buf->in_transit++; } - } else { - buf->in_transit++; - } -#ifdef ECP_WITH_PTHREAD - pthread_mutex_unlock(&buf->mutex); -#endif + #ifdef ECP_WITH_PTHREAD + pthread_mutex_unlock(&buf->mutex); + #endif - if (rv) return rv; + if (_rv) return _rv; + } } if (do_send) { diff --git a/code/core/timer.c b/code/core/timer.c index ea516df..9d2569e 100644 --- a/code/core/timer.c +++ b/code/core/timer.c @@ -20,7 +20,7 @@ void ecp_timer_destroy(ECPTimer *timer) { #endif } -int ecp_timer_item_init(ECPTimerItem *ti, ECPConnection *conn, unsigned char mtype, unsigned short cnt, unsigned int timeout) { +int ecp_timer_item_init(ECPTimerItem *ti, ECPConnection *conn, unsigned char mtype, short cnt, ecp_cts_t timeout) { if ((mtype & ECP_MTYPE_MASK) >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE; if (ti == NULL) return ECP_ERR; @@ -61,7 +61,7 @@ int ecp_timer_push(ECPTimerItem *ti) { if (!rv) { for (i=timer->head; i>=0; i--) { - if (timer->item[i].abstime >= ti->abstime) { + if (ECP_CTS_LTE(ti->abstime, timer->item[i].abstime)) { if (i != timer->head) memmove(timer->item+i+2, timer->item+i+1, sizeof(ECPTimerItem) * (timer->head-i)); timer->item[i+1] = *ti; timer->head++; @@ -151,22 +151,22 @@ void ecp_timer_remove(ECPConnection *conn) { } -unsigned int ecp_timer_exe(ECPSocket *sock) { +ecp_cts_t ecp_timer_exe(ECPSocket *sock) { int i; - unsigned int ret = 0; + ecp_cts_t ret = 0; ECPTimer *timer = &sock->timer; ECPTimerItem to_exec[ECP_MAX_TIMER]; int to_exec_size = 0; - unsigned int now = sock->ctx->tm.abstime_ms(0); + ecp_cts_t now = sock->ctx->tm.abstime_ms(0); #ifdef ECP_WITH_PTHREAD pthread_mutex_lock(&timer->mutex); #endif for (i=timer->head; i>=0; i--) { - unsigned int abstime = timer->item[i].abstime; + ecp_cts_t abstime = timer->item[i].abstime; - if (abstime > now) { + if (ECP_CTS_LT(now, abstime)) { ret = abstime - now; break; } @@ -191,7 +191,7 @@ unsigned int ecp_timer_exe(ECPSocket *sock) { ecp_timer_retry_t *retry = to_exec[i].retry; ecp_conn_handler_msg_t *handler = conn->sock->ctx->handler[conn->type] ? conn->sock->ctx->handler[conn->type]->msg[mtype & ECP_MTYPE_MASK] : NULL; - if (to_exec[i].cnt) { + if (to_exec[i].cnt > 0) { ssize_t _rv = 0; to_exec[i].cnt--; if (retry) { @@ -219,7 +219,7 @@ unsigned int ecp_timer_exe(ECPSocket *sock) { return ret; } -ssize_t ecp_timer_send(ECPConnection *conn, ecp_timer_retry_t *send_f, unsigned char mtype, unsigned short cnt, unsigned int timeout) { +ssize_t ecp_timer_send(ECPConnection *conn, ecp_timer_retry_t *send_f, unsigned char mtype, short cnt, ecp_cts_t timeout) { int rv = ECP_OK; ECPTimerItem ti; diff --git a/code/core/timer.h b/code/core/timer.h index ba25b6c..5919a42 100644 --- a/code/core/timer.h +++ b/code/core/timer.h @@ -15,9 +15,9 @@ typedef ssize_t ecp_timer_retry_t (struct ECPConnection *, struct ECPTimerItem * typedef struct ECPTimerItem { struct ECPConnection *conn; unsigned char mtype; - unsigned short cnt; - unsigned int abstime; - unsigned int timeout; + short cnt; + ecp_cts_t abstime; + ecp_cts_t timeout; ecp_timer_retry_t *retry; unsigned char *pld; size_t pld_size; @@ -33,9 +33,9 @@ typedef struct ECPTimer { int ecp_timer_create(ECPTimer *timer); void ecp_timer_destroy(ECPTimer *timer); -int ecp_timer_item_init(ECPTimerItem *ti, struct ECPConnection *conn, unsigned char mtype, unsigned short cnt, unsigned int timeout); +int ecp_timer_item_init(ECPTimerItem *ti, struct ECPConnection *conn, unsigned char mtype, short cnt, ecp_cts_t timeout); int ecp_timer_push(ECPTimerItem *ti); void ecp_timer_pop(struct ECPConnection *conn, unsigned char mtype); void ecp_timer_remove(struct ECPConnection *conn); -unsigned int ecp_timer_exe(struct ECPSocket *sock); -ssize_t ecp_timer_send(struct ECPConnection *conn, ecp_timer_retry_t *send_f, unsigned char mtype, unsigned short cnt, unsigned int timeout);
\ No newline at end of file +ecp_cts_t ecp_timer_exe(struct ECPSocket *sock); +ssize_t ecp_timer_send(struct ECPConnection *conn, ecp_timer_retry_t *send_f, unsigned char mtype, short cnt, ecp_cts_t timeout);
\ No newline at end of file diff --git a/code/test/basic.c b/code/test/basic.c index 809d453..dae2819 100644 --- a/code/test/basic.c +++ b/code/test/basic.c @@ -29,8 +29,8 @@ ssize_t handle_open_c(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsign return s; } - unsigned char payload[ECP_SIZE_PLD(1000)]; - unsigned char *buf = ecp_pld_get_buf(payload); + unsigned char payload[ECP_SIZE_PLD(1000, 0)]; + unsigned char *buf = ecp_pld_get_buf(payload, 0); char *msg = "PERA JE CAR!"; ecp_pld_set_type(payload, MTYPE_MSG); @@ -47,8 +47,8 @@ ssize_t handle_msg_c(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsigne ssize_t handle_msg_s(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsigned char *p, ssize_t s) { printf("MSG S:%s size:%ld\n", p, s); - unsigned char payload[ECP_SIZE_PLD(1000)]; - unsigned char *buf = ecp_pld_get_buf(payload); + unsigned char payload[ECP_SIZE_PLD(1000, 0)]; + unsigned char *buf = ecp_pld_get_buf(payload, 0); char *msg = "VAISTINU JE CAR!"; ecp_pld_set_type(payload, MTYPE_MSG); diff --git a/code/test/client.c b/code/test/client.c index 1d5dc6c..992479b 100644 --- a/code/test/client.c +++ b/code/test/client.c @@ -25,8 +25,8 @@ ssize_t handle_open_c(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsign return s; } - unsigned char payload[ECP_SIZE_PLD(1000)]; - unsigned char *buf = ecp_pld_get_buf(payload); + unsigned char payload[ECP_SIZE_PLD(1000, 0)]; + unsigned char *buf = ecp_pld_get_buf(payload, 0); char *msg = "PERA JE CAR!"; ecp_pld_set_type(payload, MTYPE_MSG); diff --git a/code/test/echo.c b/code/test/echo.c index fe17901..593b904 100644 --- a/code/test/echo.c +++ b/code/test/echo.c @@ -15,7 +15,7 @@ ECPConnHandler handler_s; #define MTYPE_MSG 8 ssize_t handle_msg_s(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsigned char *p, ssize_t s) { - ssize_t rv = ecp_send(conn, p-ECP_SIZE_MSG_HDR, ECP_SIZE_MSG_HDR+s); + ssize_t rv = ecp_send(conn, p-ECP_SIZE_PLD_HDR-1, ECP_SIZE_PLD_HDR+1+s); return s; } diff --git a/code/test/server.c b/code/test/server.c index 6851cdf..471164e 100644 --- a/code/test/server.c +++ b/code/test/server.c @@ -17,8 +17,8 @@ ECPConnHandler handler_s; ssize_t handle_msg_s(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsigned char *p, ssize_t s) { printf("MSG S:%s size:%ld\n", p, s); - unsigned char payload[ECP_SIZE_PLD(1000)]; - unsigned char *buf = ecp_pld_get_buf(payload); + unsigned char payload[ECP_SIZE_PLD(1000, 0)]; + unsigned char *buf = ecp_pld_get_buf(payload, 0); char *msg = "VAISTINU JE CAR!"; ecp_pld_set_type(payload, MTYPE_MSG); diff --git a/code/test/stress.c b/code/test/stress.c index 081c0cb..d1ead1d 100644 --- a/code/test/stress.c +++ b/code/test/stress.c @@ -77,7 +77,7 @@ static void catchINFO(int sig) { void *sender(ECPConnection *c) { int idx = (int)(c->conn_data); - unsigned char payload[ECP_SIZE_PLD(1000)]; + unsigned char payload[ECP_SIZE_PLD(1000, 0)]; printf("OPEN:%d\n", idx); while(1) { @@ -112,7 +112,7 @@ ssize_t handle_open_c(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsign ssize_t handle_msg_c(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsigned char *p, ssize_t s) { int idx = (int)(conn->conn_data); - unsigned char payload[ECP_SIZE_PLD(1000)]; + unsigned char payload[ECP_SIZE_PLD(1000, 0)]; if (c_start) { pthread_mutex_lock(&t_mtx[idx]); @@ -126,7 +126,7 @@ ssize_t handle_msg_c(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsigne } ssize_t handle_msg_s(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsigned char *p, ssize_t s) { - unsigned char payload[ECP_SIZE_PLD(1000)]; + unsigned char payload[ECP_SIZE_PLD(1000, 0)]; ecp_pld_set_type(payload, MTYPE_MSG); ssize_t _rv = ecp_send(conn, payload, sizeof(payload)); return s; diff --git a/code/test/vc_client.c b/code/test/vc_client.c index 8f4b084..d5cb8f2 100644 --- a/code/test/vc_client.c +++ b/code/test/vc_client.c @@ -31,8 +31,8 @@ ssize_t handle_open(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsigned printf("OPEN!\n"); - unsigned char payload[ECP_SIZE_PLD(1000)]; - unsigned char *buf = ecp_pld_get_buf(payload); + unsigned char payload[ECP_SIZE_PLD(1000, 0)]; + unsigned char *buf = ecp_pld_get_buf(payload, 0); char *msg = "PERA JE CAR!"; ecp_pld_set_type(payload, MTYPE_MSG); diff --git a/code/test/vc_server.c b/code/test/vc_server.c index fb6b923..47c6267 100644 --- a/code/test/vc_server.c +++ b/code/test/vc_server.c @@ -26,8 +26,8 @@ ssize_t handle_open(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsigned ssize_t handle_msg(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsigned char *p, ssize_t s) { printf("MSG S:%s size:%ld\n", p, s); - unsigned char payload[ECP_SIZE_PLD(1000)]; - unsigned char *buf = ecp_pld_get_buf(payload); + unsigned char payload[ECP_SIZE_PLD(1000, 0)]; + unsigned char *buf = ecp_pld_get_buf(payload, 0); char *msg = "VAISTINU JE CAR!"; ecp_pld_set_type(payload, MTYPE_MSG); diff --git a/code/test/voip.c b/code/test/voip.c index 8c53d01..db1ea9f 100644 --- a/code/test/voip.c +++ b/code/test/voip.c @@ -188,7 +188,7 @@ int main(int argc, char *argv[]) { while(1) { ecp_pld_set_type(payload, MTYPE_MSG); - opus_buf = ecp_pld_get_buf(payload); + opus_buf = ecp_pld_get_buf(payload, 0); opus_int32 len = a_read(handle_cpt, alsa_in_buf, alsa_frames, opus_enc, opus_buf, ECP_MAX_MSG); if (len < 0) continue; ssize_t _rv = ecp_send(&conn, payload, len); diff --git a/code/vconn/vconn.c b/code/vconn/vconn.c index c59aefb..4eebb62 100644 --- a/code/vconn/vconn.c +++ b/code/vconn/vconn.c @@ -70,7 +70,7 @@ static void vconn_destroy(ECPConnection *conn) { } static ssize_t _vconn_send_open(ECPConnection *conn, ECPTimerItem *ti) { - unsigned char payload[ECP_SIZE_PLD(0)]; + unsigned char payload[ECP_SIZE_PLD(0, 0)]; ecp_pld_set_type(payload, ECP_MTYPE_KGET_REQ); return ecp_pld_send_ll(conn, ti, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, payload, sizeof(payload)); @@ -185,9 +185,9 @@ static ssize_t vconn_handle_relay(ECPConnection *conn, ecp_seq_t seq, unsigned c if (conn_out == NULL) return ECP_ERR; - payload = msg - ECP_SIZE_MSG_HDR; + payload = msg - ECP_SIZE_PLD_HDR - 1; ecp_pld_set_type(payload, ECP_MTYPE_EXEC); - rv = ecp_pld_send(conn_out, payload, ECP_SIZE_MSG_HDR+size); + rv = ecp_pld_send(conn_out, payload, size + msg - payload); #ifdef ECP_WITH_PTHREAD pthread_mutex_lock(&conn_out->mutex); @@ -257,8 +257,8 @@ static void vlink_destroy(ECPConnection *conn) { static ssize_t _vlink_send_open(ECPConnection *conn, ECPTimerItem *ti) { ECPSocket *sock = conn->sock; ECPContext *ctx = sock->ctx; - unsigned char payload[ECP_SIZE_PLD(ECP_ECDH_SIZE_KEY+1)]; - unsigned char *buf = ecp_pld_get_buf(payload); + unsigned char payload[ECP_SIZE_PLD(ECP_ECDH_SIZE_KEY+1, 0)]; + unsigned char *buf = ecp_pld_get_buf(payload, 0); int rv = ECP_OK; // XXX server should verify perma_key @@ -347,9 +347,9 @@ static ssize_t vlink_handle_relay(ECPConnection *conn, ecp_seq_t seq, unsigned c if (conn == NULL) return ECP_ERR; - payload = msg - ECP_SIZE_MSG_HDR; + payload = msg - ECP_SIZE_PLD_HDR - 1; ecp_pld_set_type(payload, ECP_MTYPE_EXEC); - rv = ecp_pld_send(conn, payload, ECP_SIZE_MSG_HDR+size); + rv = ecp_pld_send(conn, payload, size + msg - payload); #ifdef ECP_WITH_PTHREAD pthread_mutex_lock(&conn->mutex); @@ -373,11 +373,11 @@ static ssize_t vconn_set_msg(ECPConnection *conn, unsigned char *pld_out, size_t unsigned char *buf = NULL; int rv; - if (pld_out_size < ECP_SIZE_MSG_HDR+2+2*ECP_ECDH_SIZE_KEY) return ECP_ERR; + if (pld_out_size < ECP_SIZE_PLD_HDR + 3 + 2 * ECP_ECDH_SIZE_KEY) return ECP_ERR; if (conn_next == NULL) return ECP_ERR; ecp_pld_set_type(pld_out, ECP_MTYPE_OPEN_REQ); - buf = ecp_pld_get_buf(pld_out); + buf = ecp_pld_get_buf(pld_out, 0); buf[0] = ECP_CTYPE_VCONN; rv = ecp_conn_dhkey_get_curr(conn_next, NULL, buf+1); @@ -386,12 +386,12 @@ static ssize_t vconn_set_msg(ECPConnection *conn, unsigned char *pld_out, size_t memcpy(buf+1+ECP_ECDH_SIZE_KEY, ctx->cr.dh_pub_get_buf(&conn_next->node.public), ECP_ECDH_SIZE_KEY); buf[1+2*ECP_ECDH_SIZE_KEY] = ECP_MTYPE_RELAY; - return ECP_SIZE_MSG_HDR+2+2*ECP_ECDH_SIZE_KEY; + return ECP_SIZE_PLD_HDR + 3 + 2 * ECP_ECDH_SIZE_KEY; } } ecp_pld_set_type(pld_out, ECP_MTYPE_RELAY); - return ECP_SIZE_MSG_HDR; + return ECP_SIZE_PLD_HDR + 1; } @@ -506,7 +506,7 @@ int ecp_vconn_init(ECPConnection *conn, ECPNode *conn_node, ECPVConnection vconn } static ssize_t _vconn_send_kget(ECPConnection *conn, ECPTimerItem *ti) { - unsigned char payload[ECP_SIZE_PLD(0)]; + unsigned char payload[ECP_SIZE_PLD(0, 0)]; ecp_pld_set_type(payload, ECP_MTYPE_KGET_REQ); return ecp_pld_send_ll(conn, ti, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, payload, sizeof(payload)); |