From b83e58e21ea7dda57ddfda47bd1539d15abe687f Mon Sep 17 00:00:00 2001 From: Uros Majstorovic Date: Sat, 26 Aug 2017 21:59:08 +0200 Subject: fragments and packet timestamp implemented --- code/core/core.c | 112 +++++++++++++++++++++++++++++++++++-------------- code/core/core.h | 50 +++++++++++++++------- code/core/msgq.c | 4 +- code/core/msgq.h | 2 +- code/core/posix/time.c | 6 +-- code/core/rbuf.h | 9 ++-- code/core/rbuf_recv.c | 94 ++++++++++++++++++++++++++++++++--------- code/core/rbuf_send.c | 87 ++++++++++++++++++++------------------ code/core/timer.c | 18 ++++---- code/core/timer.h | 12 +++--- 10 files changed, 265 insertions(+), 129 deletions(-) (limited to 'code/core') diff --git a/code/core/core.c b/code/core/core.c index ca097c4..8e21e82 100644 --- a/code/core/core.c +++ b/code/core/core.c @@ -528,7 +528,7 @@ void ecp_conn_unregister(ECPConnection *conn) { } static ssize_t _conn_send_kget(ECPConnection *conn, ECPTimerItem *ti) { - unsigned char payload[ECP_SIZE_PLD(0)]; + unsigned char payload[ECP_SIZE_PLD(0, 0)]; ecp_pld_set_type(payload, ECP_MTYPE_KGET_REQ); return ecp_pld_send_ll(conn, ti, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, payload, sizeof(payload)); @@ -565,7 +565,7 @@ int ecp_conn_open(ECPConnection *conn, ECPNode *node) { return ECP_OK; } -int ecp_conn_close(ECPConnection *conn, unsigned int timeout) { +int ecp_conn_close(ECPConnection *conn, ecp_cts_t timeout) { ECPSocket *sock = conn->sock; int refcount = 0; @@ -653,14 +653,15 @@ int ecp_conn_handler_init(ECPConnHandler *handler) { #ifdef ECP_WITH_RBUF handler->msg[ECP_MTYPE_RBACK] = ecp_rbuf_handle_ack; handler->msg[ECP_MTYPE_RBFLUSH] = ecp_rbuf_handle_flush; + handler->msg[ECP_MTYPE_RBFLUSH_PTS] = ecp_rbuf_handle_flush_pts; #endif handler->conn_open = ecp_conn_send_open; return ECP_OK; } static ssize_t _conn_send_open(ECPConnection *conn, ECPTimerItem *ti) { - unsigned char payload[ECP_SIZE_PLD(1)]; - unsigned char *buf = ecp_pld_get_buf(payload); + unsigned char payload[ECP_SIZE_PLD(1, 0)]; + unsigned char *buf = ecp_pld_get_buf(payload, 0); ecp_pld_set_type(payload, ECP_MTYPE_OPEN_REQ); buf[0] = conn->type; @@ -777,7 +778,7 @@ ssize_t ecp_conn_handle_open(ECPConnection *conn, ecp_seq_t seq, unsigned char m return 0; } else { - unsigned char payload[ECP_SIZE_PLD(0)]; + unsigned char payload[ECP_SIZE_PLD(0, 0)]; unsigned char ctype = 0; if (conn->out) return ECP_ERR; @@ -826,8 +827,8 @@ ssize_t ecp_conn_handle_kget(ECPConnection *conn, ecp_seq_t seq, unsigned char m return ECP_ECDH_SIZE_KEY+1; } else { - unsigned char payload[ECP_SIZE_PLD(ECP_ECDH_SIZE_KEY+1)]; - unsigned char *buf = ecp_pld_get_buf(payload); + unsigned char payload[ECP_SIZE_PLD(ECP_ECDH_SIZE_KEY+1, 0)]; + unsigned char *buf = ecp_pld_get_buf(payload, 0); ecp_pld_set_type(payload, ECP_MTYPE_KGET_REP); if (conn->out) return ECP_ERR; @@ -850,7 +851,7 @@ ssize_t ecp_conn_handle_kput(ECPConnection *conn, ecp_seq_t seq, unsigned char m if (!conn->out) return ECP_ERR; return 0; } else { - unsigned char payload[ECP_SIZE_PLD(0)]; + unsigned char payload[ECP_SIZE_PLD(0, 0)]; if (conn->out) return ECP_ERR; if (size < ECP_ECDH_SIZE_KEY+1) return ECP_ERR; @@ -872,8 +873,8 @@ ssize_t ecp_conn_handle_exec(ECPConnection *conn, ecp_seq_t seq, unsigned char m } static ssize_t _conn_send_kput(ECPConnection *conn, ECPTimerItem *ti) { - unsigned char payload[ECP_SIZE_PLD(ECP_ECDH_SIZE_KEY+1)]; - unsigned char *buf = ecp_pld_get_buf(payload); + unsigned char payload[ECP_SIZE_PLD(ECP_ECDH_SIZE_KEY+1, 0)]; + unsigned char *buf = ecp_pld_get_buf(payload, 0); int rv = ECP_OK; ecp_pld_set_type(payload, ECP_MTYPE_KPUT_REQ); @@ -1031,10 +1032,7 @@ ssize_t ecp_conn_pack(ECPConnection *conn, unsigned char *packet, size_t pkt_siz } #ifdef ECP_WITH_RBUF - if (conn->rbuf.send && !si->rb_pass) { - si->rb_mtype = ecp_pld_get_type(payload); - rv = ecp_rbuf_pkt_prep(conn->rbuf.send, si); - } + if (conn->rbuf.send) rv = ecp_rbuf_pkt_prep(conn->rbuf.send, si, payload); #endif if (!rv && !si->seq_w) conn->seq_out = _seq; @@ -1137,7 +1135,7 @@ ssize_t ecp_pkt_handle(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *parent, } pld_size = sock->ctx->cr.aead_dec(payload, ECP_MAX_PLD, packet+ECP_SIZE_PKT_HDR, pkt_size-ECP_SIZE_PKT_HDR, &shsec, packet+ECP_SIZE_PROTO+1+ECP_ECDH_SIZE_KEY); - if (pld_size < ECP_SIZE_MSG_HDR) rv = ECP_ERR_DECRYPT; + if (pld_size < ECP_SIZE_PLD_HDR+1) rv = ECP_ERR_DECRYPT; if (rv) goto pkt_handle_err; seq_p = \ @@ -1150,14 +1148,14 @@ ssize_t ecp_pkt_handle(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *parent, if (conn == NULL) { if (payload[ECP_SIZE_PLD_HDR] == ECP_MTYPE_OPEN_REQ) { - rv = sock->conn_new(sock, &conn, parent, s_idx, c_idx, packet+ECP_SIZE_PROTO+1, &shsec, payload+ECP_SIZE_MSG_HDR, pld_size-ECP_SIZE_MSG_HDR); + rv = sock->conn_new(sock, &conn, parent, s_idx, c_idx, packet+ECP_SIZE_PROTO+1, &shsec, payload+ECP_SIZE_PLD_HDR+1, pld_size-ECP_SIZE_PLD_HDR-1); if (rv) return rv; seq_map = 1; seq_n = seq_p; } else if (payload[ECP_SIZE_PLD_HDR] == ECP_MTYPE_KGET_REQ) { - unsigned char payload_[ECP_SIZE_PLD(ECP_ECDH_SIZE_KEY+1)]; - unsigned char *buf = ecp_pld_get_buf(payload_); + unsigned char payload_[ECP_SIZE_PLD(ECP_ECDH_SIZE_KEY+1, 0)]; + unsigned char *buf = ecp_pld_get_buf(payload_, 0); ecp_pld_set_type(payload_, ECP_MTYPE_KGET_REP); rv = ecp_sock_dhkey_get_curr(sock, buf, buf+1); @@ -1286,26 +1284,29 @@ ssize_t ecp_msg_handle(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, s ecp_conn_handler_msg_t *handler = NULL; ssize_t rv = 0; unsigned char mtype = 0; + unsigned char *content = NULL; size_t rem_size = msg_size; if (msg_size < 1) return ECP_ERR_MIN_MSG; while (rem_size) { - mtype = msg[0]; - msg++; - rem_size--; - + mtype = ecp_msg_get_type(msg); + content = ecp_msg_get_content(msg, rem_size); + if (content == NULL) return ECP_ERR_MIN_MSG; + + rem_size -= content - msg; + if ((mtype & ECP_MTYPE_MASK) >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE; ecp_timer_pop(conn, mtype); handler = conn->sock->ctx->handler[conn->type] ? conn->sock->ctx->handler[conn->type]->msg[mtype & ECP_MTYPE_MASK] : NULL; if (handler) { - rv = handler(conn, seq, mtype, msg, rem_size); + rv = handler(conn, seq, mtype, content, rem_size); if (rv < 0) return rv; if (rv > rem_size) return ECP_ERR; rem_size -= rv; - msg += rv; + msg = content + rv; } else { return msg_size-rem_size-1; } @@ -1320,10 +1321,6 @@ int ecp_seq_item_init(ECPSeqItem *seq_item) { return ECP_OK; } -unsigned char *ecp_pld_get_buf(unsigned char *payload) { - return payload+ECP_SIZE_MSG_HDR; -} - void ecp_pld_set_type(unsigned char *payload, unsigned char mtype) { payload[ECP_SIZE_PLD_HDR] = mtype; } @@ -1332,6 +1329,59 @@ unsigned char ecp_pld_get_type(unsigned char *payload) { return payload[ECP_SIZE_PLD_HDR]; } +unsigned char *ecp_pld_get_buf(unsigned char *payload, unsigned char mtype) { + size_t offset = 0; + if (mtype & ECP_MTYPE_FLAG_FRAG) offset += 2; + if (mtype & ECP_MTYPE_FLAG_PTS) offset += sizeof(ecp_pts_t); + + return payload + ECP_SIZE_PLD_HDR + 1 + offset; +} + +unsigned char ecp_msg_get_type(unsigned char *msg) { + return msg[0]; +} + +unsigned char *ecp_msg_get_content(unsigned char *msg, size_t msg_size) { + size_t offset = 0; + unsigned char mtype = msg[0]; + + if (mtype & ECP_MTYPE_FLAG_FRAG) offset += 2; + if (mtype & ECP_MTYPE_FLAG_PTS) offset += sizeof(ecp_pts_t); + + if (msg_size < 1 + offset) return NULL; + return msg + 1 + offset; +} + +int ecp_msg_get_frag(unsigned char *msg, size_t msg_size, unsigned char *frag_cnt, unsigned char *frag_tot) { + unsigned char mtype = msg[0]; + + if (!(mtype & ECP_MTYPE_FLAG_FRAG)) return ECP_ERR; + if (msg_size < 3) return ECP_ERR; + + *frag_cnt = msg[1]; + *frag_tot = msg[2]; + + return ECP_OK; +} + +int ecp_msg_get_pts(unsigned char *msg, size_t msg_size, ecp_pts_t *pts) { + size_t offset = 0; + unsigned char mtype = msg[0]; + + if (!(mtype & ECP_MTYPE_FLAG_PTS)) return ECP_ERR; + + if (mtype & ECP_MTYPE_FLAG_FRAG) offset += 2; + if (msg_size < 1 + offset + sizeof(ecp_pts_t)) return ECP_ERR; + + *pts = \ + (msg[1 + offset] << 24) | \ + (msg[2 + offset] << 16) | \ + (msg[3 + offset] << 8) | \ + (msg[4 + offset]); + + return ECP_OK; +} + static ssize_t pld_send(ECPConnection *conn, ECPTimerItem *ti, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size, ECPSeqItem *si) { unsigned char packet[ECP_MAX_PKT]; ECPSocket *sock = conn->sock; @@ -1342,7 +1392,7 @@ static ssize_t pld_send(ECPConnection *conn, ECPTimerItem *ti, unsigned char s_i if (rv < 0) return rv; #ifdef ECP_WITH_RBUF - if (conn->rbuf.send && !si->rb_pass) return ecp_rbuf_pkt_send(conn->rbuf.send, conn->sock, &addr, ti, si, packet, rv); + if (conn->rbuf.send) return ecp_rbuf_pkt_send(conn->rbuf.send, conn->sock, &addr, ti, si, packet, rv); #endif if (ti) { @@ -1392,7 +1442,7 @@ ssize_t ecp_send(ECPConnection *conn, unsigned char *payload, size_t payload_siz return ecp_pld_send(conn, payload, payload_size); } -ssize_t ecp_receive(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, unsigned int timeout) { +ssize_t ecp_receive(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ecp_cts_t timeout) { #ifdef ECP_WITH_RBUF #ifdef ECP_WITH_MSGQ pthread_mutex_lock(&conn->rbuf.recv->msgq.mutex); @@ -1421,7 +1471,7 @@ static int recv_p(ECPSocket *sock) { } int ecp_receiver(ECPSocket *sock) { - unsigned int next = 0; + ecp_cts_t next = 0; sock->running = 1; while(sock->running) { int rv = sock->ctx->tr.poll(&sock->sock, next ? next : sock->poll_timeout); diff --git a/code/core/core.h b/code/core/core.h index 27d1e0d..cdb4b62 100644 --- a/code/core/core.h +++ b/code/core/core.h @@ -8,8 +8,10 @@ #define ECP_ERR_MAX_MTYPE -12 #define ECP_ERR_MIN_PKT -13 #define ECP_ERR_MAX_PLD -14 +// XXX ??? #define ECP_ERR_MIN_MSG -15 #define ECP_ERR_MAX_MSG -16 +// #define ECP_ERR_NET_ADDR -17 #define ECP_ERR_CONN_NOT_FOUND -20 @@ -40,21 +42,21 @@ #define ECP_SIZE_PKT_HDR (ECP_SIZE_PROTO+1+ECP_ECDH_SIZE_KEY+ECP_AEAD_SIZE_NONCE) #define ECP_SIZE_PLD_HDR (ECP_SIZE_SEQ) -#define ECP_SIZE_MSG_HDR (ECP_SIZE_PLD_HDR+1) #define ECP_MAX_PKT 1412 #define ECP_MAX_PLD (ECP_MAX_PKT-ECP_SIZE_PKT_HDR-ECP_AEAD_SIZE_TAG) -#define ECP_MAX_MSG (ECP_MAX_PLD-ECP_SIZE_MSG_HDR) +#define ECP_MAX_MSG (ECP_MAX_PLD-ECP_SIZE_PLD_HDR-1) -#define ECP_MIN_PKT (ECP_SIZE_PKT_HDR+ECP_SIZE_MSG_HDR+ECP_AEAD_SIZE_TAG) +#define ECP_MIN_PKT (ECP_SIZE_PKT_HDR+ECP_SIZE_PLD_HDR+1+ECP_AEAD_SIZE_TAG) #define ECP_POLL_TIMEOUT 500 #define ECP_ECDH_IDX_INV 0xFF #define ECP_ECDH_IDX_PERMA 0x0F -#define ECP_MTYPE_MASK 0x3f -#define ECP_MTYPE_FLAG_TIMER 0x80 -#define ECP_MTYPE_FLAG_REP 0x40 +#define ECP_MTYPE_FLAG_FRAG 0x80 +#define ECP_MTYPE_FLAG_PTS 0x40 +#define ECP_MTYPE_FLAG_REP 0x20 +#define ECP_MTYPE_MASK 0x1f #define ECP_MTYPE_OPEN 0x00 #define ECP_MTYPE_KGET 0x01 @@ -68,9 +70,6 @@ #define ECP_MTYPE_KPUT_REQ (ECP_MTYPE_KPUT) #define ECP_MTYPE_KPUT_REP (ECP_MTYPE_KPUT | ECP_MTYPE_FLAG_REP) -#define ECP_SIZE_PLD(X) ((X) + ECP_SIZE_MSG_HDR) -#define ECP_SIZE_PKT(X) ((X) + ECP_SIZE_PKT_HDR+ECP_SIZE_MSG_HDR+ECP_AEAD_SIZE_TAG) - #define ECP_CONN_FLAG_REG 0x01 #define ECP_CONN_FLAG_OPEN 0x02 @@ -94,11 +93,27 @@ typedef uint32_t ecp_ack_t; #define ECP_SIZE_ACKB (sizeof(ecp_ack_t)*8) #define ECP_ACK_FULL (~(ecp_ack_t)0) +typedef uint32_t ecp_cts_t; +#define ECP_CTS_HALF ((ecp_cts_t)1 << (sizeof(ecp_cts_t) * 8 - 1)) +#define ECP_CTS_LT(a, b) ((ecp_cts_t)((ecp_cts_t)(a) - (ecp_cts_t)(b)) > ECP_CTS_HALF) +#define ECP_CTS_LTE(a,b) ((ecp_cts_t)((ecp_cts_t)(b) - (ecp_cts_t)(a)) < ECP_CTS_HALF) + +typedef uint32_t ecp_pts_t; +#define ECP_PTS_HALF ((ecp_pts_t)1 << (sizeof(ecp_pts_t) * 8 - 1)) +#define ECP_PTS_LT(a, b) ((ecp_pts_t)((ecp_pts_t)(a) - (ecp_pts_t)(b)) > ECP_PTS_HALF) +#define ECP_PTS_LTE(a,b) ((ecp_pts_t)((ecp_pts_t)(b) - (ecp_pts_t)(a)) < ECP_PTS_HALF) + typedef uint32_t ecp_seq_t; #define ECP_SEQ_HALF ((ecp_seq_t)1 << (sizeof(ecp_seq_t) * 8 - 1)) #define ECP_SEQ_LT(a,b) ((ecp_seq_t)((ecp_seq_t)(a) - (ecp_seq_t)(b)) > ECP_SEQ_HALF) #define ECP_SEQ_LTE(a,b) ((ecp_seq_t)((ecp_seq_t)(b) - (ecp_seq_t)(a)) < ECP_SEQ_HALF) + +#define ECP_SIZE_FRAG(F) ((F) & ECP_MTYPE_FLAG_FRAG ? 2 : 0) +#define ECP_SIZE_PTS(F) ((F) & ECP_MTYPE_FLAG_PTS ? sizeof(ecp_pts_t) : 0) +#define ECP_SIZE_PLD(X,F) ((X) + ECP_SIZE_PLD_HDR+1+ECP_SIZE_FRAG(F)+ECP_SIZE_PTS(F)) +#define ECP_SIZE_PKT(X,F) (ECP_SIZE_PKT_HDR+ECP_SIZE_PLD(X,F)+ECP_AEAD_SIZE_TAG) + #ifdef ECP_WITH_PTHREAD #include #endif @@ -165,8 +180,8 @@ typedef struct ECPTransportIface { typedef struct ECPTimeIface { int init; - unsigned int (*abstime_ms) (unsigned int); - void (*sleep_ms) (unsigned int); + ecp_cts_t (*abstime_ms) (ecp_cts_t); + void (*sleep_ms) (ecp_cts_t); } ECPTimeIface; #ifdef ECP_WITH_HTABLE @@ -255,7 +270,7 @@ typedef struct ECPContext { typedef struct ECPSocket { ECPContext *ctx; unsigned char running; - unsigned int poll_timeout; + int poll_timeout; ECPNetSock sock; ECPDHKey key_perma; ECPDHKey key[ECP_MAX_SOCK_KEY]; @@ -325,7 +340,7 @@ void ecp_conn_unregister(ECPConnection *conn); int ecp_conn_init(ECPConnection *conn, ECPNode *node); int ecp_conn_open(ECPConnection *conn, ECPNode *node); -int ecp_conn_close(ECPConnection *conn, unsigned int timeout); +int ecp_conn_close(ECPConnection *conn, ecp_cts_t timeout); int ecp_conn_handler_init(ECPConnHandler *handler); ssize_t ecp_conn_send_open(ECPConnection *conn); @@ -349,16 +364,21 @@ ssize_t ecp_pkt_recv(ECPSocket *sock, ECPNetAddr *addr, unsigned char *packet, s ssize_t ecp_msg_handle(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size); int ecp_seq_item_init(ECPSeqItem *seq_item); -unsigned char *ecp_pld_get_buf(unsigned char *payload); void ecp_pld_set_type(unsigned char *payload, unsigned char mtype); +unsigned char *ecp_pld_get_buf(unsigned char *payload, unsigned char mtype); unsigned char ecp_pld_get_type(unsigned char *payload); +unsigned char ecp_msg_get_type(unsigned char *msg); +unsigned char *ecp_msg_get_content(unsigned char *msg, size_t msg_size); +int ecp_msg_get_frag(unsigned char *msg, size_t msg_size, unsigned char *frag_cnt, unsigned char *frag_tot); +int ecp_msg_get_pts(unsigned char *msg, size_t msg_size, ecp_pts_t *pts); + ssize_t ecp_pld_send(ECPConnection *conn, unsigned char *payload, size_t payload_size); ssize_t ecp_pld_send_wtimer(ECPConnection *conn, ECPTimerItem *ti, unsigned char *payload, size_t payload_size); ssize_t ecp_pld_send_ll(ECPConnection *conn, ECPTimerItem *ti, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size); ssize_t ecp_pld_send_raw(ECPSocket *sock, ECPConnection *parent, ECPNetAddr *addr, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size); ssize_t ecp_send(ECPConnection *conn, unsigned char *payload, size_t payload_size); -ssize_t ecp_receive(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, unsigned int timeout); +ssize_t ecp_receive(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ecp_cts_t timeout); int ecp_receiver(ECPSocket *sock); int ecp_start_receiver(ECPSocket *sock); diff --git a/code/core/msgq.c b/code/core/msgq.c index 7356eb8..743478f 100644 --- a/code/core/msgq.c +++ b/code/core/msgq.c @@ -9,7 +9,7 @@ #define MSG_IDX_MASK(idx) ((idx) & ((ECP_MSGQ_MAX_MSG) - 1)) -static struct timespec *abstime_ts(struct timespec *ts, unsigned int msec) { +static struct timespec *abstime_ts(struct timespec *ts, ecp_cts_t msec) { struct timeval tv; uint64_t us_start; @@ -94,7 +94,7 @@ int ecp_conn_msgq_push(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype) return ECP_OK; } -ssize_t ecp_conn_msgq_pop(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, unsigned int timeout) { +ssize_t ecp_conn_msgq_pop(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ecp_cts_t timeout) { ECPRBRecv *buf = conn->rbuf.recv; ECPConnMsgQ *msgq = buf ? &buf->msgq : NULL; ssize_t rv = ECP_OK; diff --git a/code/core/msgq.h b/code/core/msgq.h index 75c795e..dd0b9a8 100644 --- a/code/core/msgq.h +++ b/code/core/msgq.h @@ -21,6 +21,6 @@ void ecp_conn_msgq_destroy(struct ECPConnection *conn); int ecp_conn_msgq_start(struct ECPConnection *conn, ecp_seq_t seq); int ecp_conn_msgq_push(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype); -ssize_t ecp_conn_msgq_pop(struct ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, unsigned int timeout); +ssize_t ecp_conn_msgq_pop(struct ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ecp_cts_t timeout); #endif /* ECP_WITH_MSGQ */ \ No newline at end of file diff --git a/code/core/posix/time.c b/code/core/posix/time.c index 3dd3920..c7a238a 100644 --- a/code/core/posix/time.c +++ b/code/core/posix/time.c @@ -3,16 +3,16 @@ #include -static unsigned int t_abstime_ms(unsigned int msec) { +static ecp_cts_t t_abstime_ms(ecp_cts_t msec) { struct timeval tv; - unsigned int ms_now; + ecp_cts_t ms_now; gettimeofday(&tv, NULL); ms_now = tv.tv_sec * 1000 + tv.tv_usec / 1000; return ms_now + msec; } -static void t_sleep_ms(unsigned int msec) { +static void t_sleep_ms(ecp_cts_t msec) { usleep(msec*1000); } diff --git a/code/core/rbuf.h b/code/core/rbuf.h index d10370a..c3e6ff9 100644 --- a/code/core/rbuf.h +++ b/code/core/rbuf.h @@ -9,6 +9,7 @@ #define ECP_MTYPE_RBACK 0x04 #define ECP_MTYPE_RBFLUSH 0x05 +#define ECP_MTYPE_RBFLUSH_PTS 0x06 #define ECP_ERR_RBUF_DUP -100 #define ECP_ERR_RBUF_FULL -101 @@ -51,9 +52,10 @@ typedef struct ECPRBTimer { typedef struct ECPRBRecv { unsigned char flags; unsigned char flush; - unsigned short deliver_delay; + unsigned char timer_pts; unsigned short hole_max; unsigned short ack_rate; + ecp_pts_t deliver_delay; ecp_seq_t seq_ack; ecp_seq_t ack_pkt; ecp_ack_t ack_map; @@ -100,15 +102,16 @@ int ecp_rbuf_conn_start(struct ECPConnection *conn, ecp_seq_t seq); int ecp_rbuf_recv_create(struct ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size); void ecp_rbuf_recv_destroy(struct ECPConnection *conn); int ecp_rbuf_recv_set_hole(struct ECPConnection *conn, unsigned short hole_max); -int ecp_rbuf_recv_set_delay(struct ECPConnection *conn, unsigned short delay); +int ecp_rbuf_recv_set_delay(struct ECPConnection *conn, ecp_pts_t delay); int ecp_rbuf_recv_start(struct ECPConnection *conn, ecp_seq_t seq); ssize_t ecp_rbuf_recv_store(struct ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size); int ecp_rbuf_send_create(struct ECPConnection *conn, ECPRBSend *buf, ECPRBMessage *msg, unsigned int msg_size); void ecp_rbuf_send_destroy(struct ECPConnection *conn); int ecp_rbuf_send_start(struct ECPConnection *conn); -int ecp_rbuf_pkt_prep(ECPRBSend *buf, struct ECPSeqItem *si); +int ecp_rbuf_pkt_prep(ECPRBSend *buf, struct ECPSeqItem *si, unsigned char *payload); ssize_t ecp_rbuf_pkt_send(ECPRBSend *buf, struct ECPSocket *sock, ECPNetAddr *addr, ECPTimerItem *ti, struct ECPSeqItem *si, unsigned char *packet, size_t pkt_size); ssize_t ecp_rbuf_handle_ack(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size); ssize_t ecp_rbuf_handle_flush(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size); +ssize_t ecp_rbuf_handle_flush_pts(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size); diff --git a/code/core/rbuf_recv.c b/code/core/rbuf_recv.c index fba8219..0970f6a 100644 --- a/code/core/rbuf_recv.c +++ b/code/core/rbuf_recv.c @@ -8,7 +8,7 @@ static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) { ECPRBRecv *buf = conn->rbuf.recv; unsigned char flags = ECP_RBUF_FLAG_IN_RBUF; - unsigned char mtype = msg[0] & ECP_MTYPE_MASK; + unsigned char mtype = ecp_msg_get_type(msg) & ECP_MTYPE_MASK; if (mtype < ECP_MAX_MTYPE_SYS) flags |= ECP_RBUF_FLAG_SYS; @@ -42,28 +42,77 @@ static void msg_flush(ECPConnection *conn) { #endif ecp_seq_t msg_cnt = buf->rbuf.seq_max - buf->rbuf.seq_start + 1; + ecp_seq_t seq_next = buf->rbuf.seq_start; ecp_seq_t i = 0; unsigned int idx = buf->rbuf.msg_start; + if (buf->timer_pts) { + ecp_timer_pop(conn, ECP_MTYPE_RBFLUSH_PTS); + buf->timer_pts = 0; + } + for (i=0; iflags & ECP_RBUF_FLAG_RELIABLE) && !(buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_IN_RBUF)) break; - if (buf->deliver_delay && (msg_cnt - i < buf->deliver_delay)) break; if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_IN_RBUF) { - int rv = 0; - ecp_seq_t seq = buf->rbuf.seq_start + i; if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_SYS) { buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_SYS; - } else if (buf->flags & ECP_RBUF_FLAG_MSGQ) { + } else { + int rv = ECP_OK; + ecp_pts_t msg_pts; + ecp_seq_t seq = buf->rbuf.seq_start + i; + unsigned char mtype = ecp_msg_get_type(buf->rbuf.msg[idx].msg); + unsigned char frag_tot; + unsigned char frag_cnt; + + rv = ecp_msg_get_frag(buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size, &frag_cnt, &frag_tot); + if ((rv == ECP_OK) && (frag_cnt != 0) && (seq != seq_next)) { + ecp_seq_t seq_fend = seq + (ecp_seq_t)(frag_tot - frag_cnt - 1); + + if (ECP_SEQ_LT(buf->rbuf.seq_max, seq_fend) || (buf->hole_max && ((ecp_seq_t)(buf->rbuf.seq_max - seq_fend) <= buf->hole_max))) { + ecp_seq_t seq_fbeg = seq - frag_cnt; + ecp_seq_t seq_offset = ECP_SEQ_LT(seq_next, seq_fbeg) ? seq - seq_fbeg : seq - seq_next; + + i -= seq_offset; + idx = ECP_RBUF_IDX_MASK(idx - seq_offset, buf->rbuf.msg_size); + break; + } + } + + rv = ecp_msg_get_pts(buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size, &msg_pts); + if (rv == ECP_OK) { + ecp_pts_t now = conn->sock->ctx->tm.abstime_ms(0); + if (ECP_PTS_LT(now, msg_pts)) { + ECPTimerItem ti; + ecp_seq_t seq_offset = seq - seq_next; + + rv = ecp_timer_item_init(&ti, conn, ECP_MTYPE_RBFLUSH_PTS, 0, msg_pts - now); + if (!rv) rv = ecp_timer_push(&ti); + if (!rv) buf->timer_pts = 1; + + i -= seq_offset; + idx = ECP_RBUF_IDX_MASK(idx - seq_offset, buf->rbuf.msg_size); + break; + } + } + + seq_next = seq + 1; + if (buf->flags & ECP_RBUF_FLAG_MSGQ) { #ifdef ECP_WITH_MSGQ - unsigned char mtype = buf->rbuf.msg[idx].msg[0]; - int rv = ecp_conn_msgq_push(conn, seq, mtype); - if (rv) break; - buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_IN_MSGQ; + rv = ecp_conn_msgq_push(conn, seq, ecp_msg_get_type(buf->rbuf.msg[idx].msg)); + if (rv) break; + buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_IN_MSGQ; #endif - } else { - ecp_msg_handle(conn, seq, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size); + } else { + ecp_msg_handle(conn, seq, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size); + } } buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_IN_RBUF; + } else { + if (buf->flags & ECP_RBUF_FLAG_RELIABLE) break; + if (buf->hole_max) { + ecp_seq_t seq = buf->rbuf.seq_start + i; + ecp_seq_t seq_offset = buf->rbuf.seq_max - seq; + if (seq_offset <= buf->hole_max) break; + } } idx = ECP_RBUF_IDX_MASK(idx + 1, buf->rbuf.msg_size); } @@ -78,8 +127,8 @@ static void msg_flush(ECPConnection *conn) { static int ack_send(ECPConnection *conn) { ECPRBRecv *buf = conn->rbuf.recv; - unsigned char payload[ECP_SIZE_PLD(sizeof(ecp_seq_t) + sizeof(ecp_ack_t))]; - unsigned char *buf_ = ecp_pld_get_buf(payload); + unsigned char payload[ECP_SIZE_PLD(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), 0)]; + unsigned char *buf_ = ecp_pld_get_buf(payload, 0); ssize_t rv; ecp_pld_set_type(payload, ECP_MTYPE_RBACK); @@ -150,6 +199,16 @@ ssize_t ecp_rbuf_handle_flush(ECPConnection *conn, ecp_seq_t seq, unsigned char return 0; } +ssize_t ecp_rbuf_handle_flush_pts(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size) { + ECPRBRecv *buf = conn->rbuf.recv; + + if (buf == NULL) return ECP_ERR; + + buf->timer_pts = 0; + msg_flush(conn); + return 0; +} + int ecp_rbuf_recv_create(ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size) { int rv; @@ -185,13 +244,10 @@ int ecp_rbuf_recv_set_hole(ECPConnection *conn, unsigned short hole_max) { return ECP_OK; } -int ecp_rbuf_recv_set_delay(ECPConnection *conn, unsigned short delay) { +int ecp_rbuf_recv_set_delay(ECPConnection *conn, ecp_pts_t delay) { ECPRBRecv *buf = conn->rbuf.recv; buf->deliver_delay = delay; - if (buf->hole_max < delay - 1) { - ecp_rbuf_recv_set_hole(conn, delay - 1); - } return ECP_OK; } @@ -226,7 +282,7 @@ ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *m if (buf == NULL) return ECP_ERR; if (msg_size < 1) return ECP_ERR_MIN_MSG; - mtype = msg[0] & ECP_MTYPE_MASK; + mtype = ecp_msg_get_type(msg) & ECP_MTYPE_MASK; if ((mtype == ECP_MTYPE_RBACK) || (mtype == ECP_MTYPE_RBFLUSH)) return ecp_msg_handle(conn, seq, msg, msg_size); if (ECP_SEQ_LT(buf->rbuf.seq_max, seq)) ack_pkt = seq - buf->rbuf.seq_max; diff --git a/code/core/rbuf_send.c b/code/core/rbuf_send.c index e0c7f29..d582e67 100644 --- a/code/core/rbuf_send.c +++ b/code/core/rbuf_send.c @@ -5,7 +5,7 @@ #define NACK_RATE_UNIT 10000 static ssize_t flush_send(ECPConnection *conn, ECPTimerItem *ti) { - unsigned char payload[ECP_SIZE_PLD(0)]; + unsigned char payload[ECP_SIZE_PLD(0, 0)]; ecp_pld_set_type(payload, ECP_MTYPE_RBFLUSH); if (ti == NULL) { @@ -127,7 +127,7 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt if (buf->flags & ECP_RBUF_FLAG_RELIABLE) { unsigned int _idx = ECP_RBUF_IDX_MASK(idx + 1 - ECP_SIZE_ACKB + i, rbuf->msg_size); if ((rbuf->msg[_idx].size == 0) || (rbuf->msg[_idx].flags & ECP_RBUF_FLAG_SYS)) { - unsigned char payload[ECP_SIZE_PLD(0)]; + unsigned char payload[ECP_SIZE_PLD(0, 0)]; ecp_pld_set_type(payload, ECP_MTYPE_NOP); ecp_rbuf_pld_send(conn, payload, sizeof(payload), seq_ack + i); } else { @@ -235,7 +235,7 @@ int ecp_rbuf_send_start(ECPConnection *conn) { int ecp_rbuf_flush(ECPConnection *conn) { ECPRBSend *buf = conn->rbuf.send; - unsigned char payload[ECP_SIZE_PLD(0)]; + unsigned char payload[ECP_SIZE_PLD(0, 0)]; ecp_seq_t seq; if (buf == NULL) return ECP_ERR; @@ -269,7 +269,9 @@ int ecp_rbuf_flush(ECPConnection *conn) { return ECP_OK; } -int ecp_rbuf_pkt_prep(ECPRBSend *buf, ECPSeqItem *si) { +int ecp_rbuf_pkt_prep(ECPRBSend *buf, ECPSeqItem *si, unsigned char *payload) { + if (si->rb_pass) return ECP_OK; + #ifdef ECP_WITH_PTHREAD pthread_mutex_lock(&buf->mutex); #endif @@ -280,6 +282,7 @@ int ecp_rbuf_pkt_prep(ECPRBSend *buf, ECPSeqItem *si) { if (idx < 0) return idx; + si->rb_mtype = ecp_pld_get_type(payload); si->rb_idx = idx; buf->rbuf.msg[idx].size = 0; buf->rbuf.msg[idx].flags = 0; @@ -288,54 +291,58 @@ int ecp_rbuf_pkt_prep(ECPRBSend *buf, ECPSeqItem *si) { } ssize_t ecp_rbuf_pkt_send(ECPRBSend *buf, ECPSocket *sock, ECPNetAddr *addr, ECPTimerItem *ti, ECPSeqItem *si, unsigned char *packet, size_t pkt_size) { - unsigned char flags = 0; int do_send = 1; ssize_t rv = 0; - ecp_seq_t seq = si->seq; - unsigned int idx = si->rb_idx; - unsigned char mtype = si->rb_mtype & ECP_MTYPE_MASK; - - if (mtype < ECP_MAX_MTYPE_SYS) flags |= ECP_RBUF_FLAG_SYS; - - rv = ecp_rbuf_msg_store(&buf->rbuf, seq, idx, packet, pkt_size, 0, flags); - if (rv < 0) return rv; + + if (!si->rb_pass) { + unsigned char flags = 0; + ecp_seq_t seq = si->seq; + unsigned int idx = si->rb_idx; + unsigned char mtype = si->rb_mtype & ECP_MTYPE_MASK; - if (buf->flags & ECP_RBUF_FLAG_CCONTROL) { -#ifdef ECP_WITH_PTHREAD - pthread_mutex_lock(&buf->mutex); -#endif + if (mtype < ECP_MAX_MTYPE_SYS) flags |= ECP_RBUF_FLAG_SYS; - if (ECP_SEQ_LT(buf->rbuf.seq_max, seq)) buf->rbuf.seq_max = seq; + rv = ecp_rbuf_msg_store(&buf->rbuf, seq, idx, packet, pkt_size, 0, flags); + if (rv < 0) return rv; - if (buf->cnt_cc || (buf->in_transit >= buf->win_size)) { - if (!buf->cnt_cc) buf->seq_cc = seq; - buf->cnt_cc++; - buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_IN_CCONTROL; - do_send = 0; - if (ti) { - ECPRBTimer *timer = &buf->timer; - ECPRBTimerItem *item = &timer->item[timer->idx_w]; + if (buf->flags & ECP_RBUF_FLAG_CCONTROL) { + int _rv = ECP_OK; + #ifdef ECP_WITH_PTHREAD + pthread_mutex_lock(&buf->mutex); + #endif + + if (ECP_SEQ_LT(buf->rbuf.seq_max, seq)) buf->rbuf.seq_max = seq; + + if (buf->cnt_cc || (buf->in_transit >= buf->win_size)) { + if (!buf->cnt_cc) buf->seq_cc = seq; + buf->cnt_cc++; + buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_IN_CCONTROL; + do_send = 0; + if (ti) { + ECPRBTimer *timer = &buf->timer; + ECPRBTimerItem *item = &timer->item[timer->idx_w]; - if (!item->occupied) { - item->occupied = 1; - item->item = *ti; - buf->rbuf.msg[idx].idx_t = timer->idx_w; - timer->idx_w = (timer->idx_w) % ECP_MAX_TIMER; + if (!item->occupied) { + item->occupied = 1; + item->item = *ti; + buf->rbuf.msg[idx].idx_t = timer->idx_w; + timer->idx_w = (timer->idx_w) % ECP_MAX_TIMER; + } else { + _rv = ECP_ERR_MAX_TIMER; + } } else { - rv = ECP_ERR_MAX_TIMER; + buf->rbuf.msg[idx].idx_t = -1; } } else { - buf->rbuf.msg[idx].idx_t = -1; + buf->in_transit++; } - } else { - buf->in_transit++; - } -#ifdef ECP_WITH_PTHREAD - pthread_mutex_unlock(&buf->mutex); -#endif + #ifdef ECP_WITH_PTHREAD + pthread_mutex_unlock(&buf->mutex); + #endif - if (rv) return rv; + if (_rv) return _rv; + } } if (do_send) { diff --git a/code/core/timer.c b/code/core/timer.c index ea516df..9d2569e 100644 --- a/code/core/timer.c +++ b/code/core/timer.c @@ -20,7 +20,7 @@ void ecp_timer_destroy(ECPTimer *timer) { #endif } -int ecp_timer_item_init(ECPTimerItem *ti, ECPConnection *conn, unsigned char mtype, unsigned short cnt, unsigned int timeout) { +int ecp_timer_item_init(ECPTimerItem *ti, ECPConnection *conn, unsigned char mtype, short cnt, ecp_cts_t timeout) { if ((mtype & ECP_MTYPE_MASK) >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE; if (ti == NULL) return ECP_ERR; @@ -61,7 +61,7 @@ int ecp_timer_push(ECPTimerItem *ti) { if (!rv) { for (i=timer->head; i>=0; i--) { - if (timer->item[i].abstime >= ti->abstime) { + if (ECP_CTS_LTE(ti->abstime, timer->item[i].abstime)) { if (i != timer->head) memmove(timer->item+i+2, timer->item+i+1, sizeof(ECPTimerItem) * (timer->head-i)); timer->item[i+1] = *ti; timer->head++; @@ -151,22 +151,22 @@ void ecp_timer_remove(ECPConnection *conn) { } -unsigned int ecp_timer_exe(ECPSocket *sock) { +ecp_cts_t ecp_timer_exe(ECPSocket *sock) { int i; - unsigned int ret = 0; + ecp_cts_t ret = 0; ECPTimer *timer = &sock->timer; ECPTimerItem to_exec[ECP_MAX_TIMER]; int to_exec_size = 0; - unsigned int now = sock->ctx->tm.abstime_ms(0); + ecp_cts_t now = sock->ctx->tm.abstime_ms(0); #ifdef ECP_WITH_PTHREAD pthread_mutex_lock(&timer->mutex); #endif for (i=timer->head; i>=0; i--) { - unsigned int abstime = timer->item[i].abstime; + ecp_cts_t abstime = timer->item[i].abstime; - if (abstime > now) { + if (ECP_CTS_LT(now, abstime)) { ret = abstime - now; break; } @@ -191,7 +191,7 @@ unsigned int ecp_timer_exe(ECPSocket *sock) { ecp_timer_retry_t *retry = to_exec[i].retry; ecp_conn_handler_msg_t *handler = conn->sock->ctx->handler[conn->type] ? conn->sock->ctx->handler[conn->type]->msg[mtype & ECP_MTYPE_MASK] : NULL; - if (to_exec[i].cnt) { + if (to_exec[i].cnt > 0) { ssize_t _rv = 0; to_exec[i].cnt--; if (retry) { @@ -219,7 +219,7 @@ unsigned int ecp_timer_exe(ECPSocket *sock) { return ret; } -ssize_t ecp_timer_send(ECPConnection *conn, ecp_timer_retry_t *send_f, unsigned char mtype, unsigned short cnt, unsigned int timeout) { +ssize_t ecp_timer_send(ECPConnection *conn, ecp_timer_retry_t *send_f, unsigned char mtype, short cnt, ecp_cts_t timeout) { int rv = ECP_OK; ECPTimerItem ti; diff --git a/code/core/timer.h b/code/core/timer.h index ba25b6c..5919a42 100644 --- a/code/core/timer.h +++ b/code/core/timer.h @@ -15,9 +15,9 @@ typedef ssize_t ecp_timer_retry_t (struct ECPConnection *, struct ECPTimerItem * typedef struct ECPTimerItem { struct ECPConnection *conn; unsigned char mtype; - unsigned short cnt; - unsigned int abstime; - unsigned int timeout; + short cnt; + ecp_cts_t abstime; + ecp_cts_t timeout; ecp_timer_retry_t *retry; unsigned char *pld; size_t pld_size; @@ -33,9 +33,9 @@ typedef struct ECPTimer { int ecp_timer_create(ECPTimer *timer); void ecp_timer_destroy(ECPTimer *timer); -int ecp_timer_item_init(ECPTimerItem *ti, struct ECPConnection *conn, unsigned char mtype, unsigned short cnt, unsigned int timeout); +int ecp_timer_item_init(ECPTimerItem *ti, struct ECPConnection *conn, unsigned char mtype, short cnt, ecp_cts_t timeout); int ecp_timer_push(ECPTimerItem *ti); void ecp_timer_pop(struct ECPConnection *conn, unsigned char mtype); void ecp_timer_remove(struct ECPConnection *conn); -unsigned int ecp_timer_exe(struct ECPSocket *sock); -ssize_t ecp_timer_send(struct ECPConnection *conn, ecp_timer_retry_t *send_f, unsigned char mtype, unsigned short cnt, unsigned int timeout); \ No newline at end of file +ecp_cts_t ecp_timer_exe(struct ECPSocket *sock); +ssize_t ecp_timer_send(struct ECPConnection *conn, ecp_timer_retry_t *send_f, unsigned char mtype, short cnt, ecp_cts_t timeout); \ No newline at end of file -- cgit v1.2.3