summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorUros Majstorovic <majstor@majstor.org>2017-08-26 21:59:08 +0200
committerUros Majstorovic <majstor@majstor.org>2017-08-26 21:59:08 +0200
commitb83e58e21ea7dda57ddfda47bd1539d15abe687f (patch)
tree801339717059b1be494e872d90836b4e4564d462
parente800e8df9fbe633d09a534ae07eb361833572f1a (diff)
fragments and packet timestamp implemented
-rw-r--r--code/core/core.c112
-rw-r--r--code/core/core.h50
-rw-r--r--code/core/msgq.c4
-rw-r--r--code/core/msgq.h2
-rw-r--r--code/core/posix/time.c6
-rw-r--r--code/core/rbuf.h9
-rw-r--r--code/core/rbuf_recv.c94
-rw-r--r--code/core/rbuf_send.c87
-rw-r--r--code/core/timer.c18
-rw-r--r--code/core/timer.h12
-rw-r--r--code/test/basic.c8
-rw-r--r--code/test/client.c4
-rw-r--r--code/test/echo.c2
-rw-r--r--code/test/server.c4
-rw-r--r--code/test/stress.c6
-rw-r--r--code/test/vc_client.c4
-rw-r--r--code/test/vc_server.c4
-rw-r--r--code/test/voip.c2
-rw-r--r--code/vconn/vconn.c24
19 files changed, 294 insertions, 158 deletions
diff --git a/code/core/core.c b/code/core/core.c
index ca097c4..8e21e82 100644
--- a/code/core/core.c
+++ b/code/core/core.c
@@ -528,7 +528,7 @@ void ecp_conn_unregister(ECPConnection *conn) {
}
static ssize_t _conn_send_kget(ECPConnection *conn, ECPTimerItem *ti) {
- unsigned char payload[ECP_SIZE_PLD(0)];
+ unsigned char payload[ECP_SIZE_PLD(0, 0)];
ecp_pld_set_type(payload, ECP_MTYPE_KGET_REQ);
return ecp_pld_send_ll(conn, ti, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, payload, sizeof(payload));
@@ -565,7 +565,7 @@ int ecp_conn_open(ECPConnection *conn, ECPNode *node) {
return ECP_OK;
}
-int ecp_conn_close(ECPConnection *conn, unsigned int timeout) {
+int ecp_conn_close(ECPConnection *conn, ecp_cts_t timeout) {
ECPSocket *sock = conn->sock;
int refcount = 0;
@@ -653,14 +653,15 @@ int ecp_conn_handler_init(ECPConnHandler *handler) {
#ifdef ECP_WITH_RBUF
handler->msg[ECP_MTYPE_RBACK] = ecp_rbuf_handle_ack;
handler->msg[ECP_MTYPE_RBFLUSH] = ecp_rbuf_handle_flush;
+ handler->msg[ECP_MTYPE_RBFLUSH_PTS] = ecp_rbuf_handle_flush_pts;
#endif
handler->conn_open = ecp_conn_send_open;
return ECP_OK;
}
static ssize_t _conn_send_open(ECPConnection *conn, ECPTimerItem *ti) {
- unsigned char payload[ECP_SIZE_PLD(1)];
- unsigned char *buf = ecp_pld_get_buf(payload);
+ unsigned char payload[ECP_SIZE_PLD(1, 0)];
+ unsigned char *buf = ecp_pld_get_buf(payload, 0);
ecp_pld_set_type(payload, ECP_MTYPE_OPEN_REQ);
buf[0] = conn->type;
@@ -777,7 +778,7 @@ ssize_t ecp_conn_handle_open(ECPConnection *conn, ecp_seq_t seq, unsigned char m
return 0;
} else {
- unsigned char payload[ECP_SIZE_PLD(0)];
+ unsigned char payload[ECP_SIZE_PLD(0, 0)];
unsigned char ctype = 0;
if (conn->out) return ECP_ERR;
@@ -826,8 +827,8 @@ ssize_t ecp_conn_handle_kget(ECPConnection *conn, ecp_seq_t seq, unsigned char m
return ECP_ECDH_SIZE_KEY+1;
} else {
- unsigned char payload[ECP_SIZE_PLD(ECP_ECDH_SIZE_KEY+1)];
- unsigned char *buf = ecp_pld_get_buf(payload);
+ unsigned char payload[ECP_SIZE_PLD(ECP_ECDH_SIZE_KEY+1, 0)];
+ unsigned char *buf = ecp_pld_get_buf(payload, 0);
ecp_pld_set_type(payload, ECP_MTYPE_KGET_REP);
if (conn->out) return ECP_ERR;
@@ -850,7 +851,7 @@ ssize_t ecp_conn_handle_kput(ECPConnection *conn, ecp_seq_t seq, unsigned char m
if (!conn->out) return ECP_ERR;
return 0;
} else {
- unsigned char payload[ECP_SIZE_PLD(0)];
+ unsigned char payload[ECP_SIZE_PLD(0, 0)];
if (conn->out) return ECP_ERR;
if (size < ECP_ECDH_SIZE_KEY+1) return ECP_ERR;
@@ -872,8 +873,8 @@ ssize_t ecp_conn_handle_exec(ECPConnection *conn, ecp_seq_t seq, unsigned char m
}
static ssize_t _conn_send_kput(ECPConnection *conn, ECPTimerItem *ti) {
- unsigned char payload[ECP_SIZE_PLD(ECP_ECDH_SIZE_KEY+1)];
- unsigned char *buf = ecp_pld_get_buf(payload);
+ unsigned char payload[ECP_SIZE_PLD(ECP_ECDH_SIZE_KEY+1, 0)];
+ unsigned char *buf = ecp_pld_get_buf(payload, 0);
int rv = ECP_OK;
ecp_pld_set_type(payload, ECP_MTYPE_KPUT_REQ);
@@ -1031,10 +1032,7 @@ ssize_t ecp_conn_pack(ECPConnection *conn, unsigned char *packet, size_t pkt_siz
}
#ifdef ECP_WITH_RBUF
- if (conn->rbuf.send && !si->rb_pass) {
- si->rb_mtype = ecp_pld_get_type(payload);
- rv = ecp_rbuf_pkt_prep(conn->rbuf.send, si);
- }
+ if (conn->rbuf.send) rv = ecp_rbuf_pkt_prep(conn->rbuf.send, si, payload);
#endif
if (!rv && !si->seq_w) conn->seq_out = _seq;
@@ -1137,7 +1135,7 @@ ssize_t ecp_pkt_handle(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *parent,
}
pld_size = sock->ctx->cr.aead_dec(payload, ECP_MAX_PLD, packet+ECP_SIZE_PKT_HDR, pkt_size-ECP_SIZE_PKT_HDR, &shsec, packet+ECP_SIZE_PROTO+1+ECP_ECDH_SIZE_KEY);
- if (pld_size < ECP_SIZE_MSG_HDR) rv = ECP_ERR_DECRYPT;
+ if (pld_size < ECP_SIZE_PLD_HDR+1) rv = ECP_ERR_DECRYPT;
if (rv) goto pkt_handle_err;
seq_p = \
@@ -1150,14 +1148,14 @@ ssize_t ecp_pkt_handle(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *parent,
if (conn == NULL) {
if (payload[ECP_SIZE_PLD_HDR] == ECP_MTYPE_OPEN_REQ) {
- rv = sock->conn_new(sock, &conn, parent, s_idx, c_idx, packet+ECP_SIZE_PROTO+1, &shsec, payload+ECP_SIZE_MSG_HDR, pld_size-ECP_SIZE_MSG_HDR);
+ rv = sock->conn_new(sock, &conn, parent, s_idx, c_idx, packet+ECP_SIZE_PROTO+1, &shsec, payload+ECP_SIZE_PLD_HDR+1, pld_size-ECP_SIZE_PLD_HDR-1);
if (rv) return rv;
seq_map = 1;
seq_n = seq_p;
} else if (payload[ECP_SIZE_PLD_HDR] == ECP_MTYPE_KGET_REQ) {
- unsigned char payload_[ECP_SIZE_PLD(ECP_ECDH_SIZE_KEY+1)];
- unsigned char *buf = ecp_pld_get_buf(payload_);
+ unsigned char payload_[ECP_SIZE_PLD(ECP_ECDH_SIZE_KEY+1, 0)];
+ unsigned char *buf = ecp_pld_get_buf(payload_, 0);
ecp_pld_set_type(payload_, ECP_MTYPE_KGET_REP);
rv = ecp_sock_dhkey_get_curr(sock, buf, buf+1);
@@ -1286,26 +1284,29 @@ ssize_t ecp_msg_handle(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, s
ecp_conn_handler_msg_t *handler = NULL;
ssize_t rv = 0;
unsigned char mtype = 0;
+ unsigned char *content = NULL;
size_t rem_size = msg_size;
if (msg_size < 1) return ECP_ERR_MIN_MSG;
while (rem_size) {
- mtype = msg[0];
- msg++;
- rem_size--;
-
+ mtype = ecp_msg_get_type(msg);
+ content = ecp_msg_get_content(msg, rem_size);
+ if (content == NULL) return ECP_ERR_MIN_MSG;
+
+ rem_size -= content - msg;
+
if ((mtype & ECP_MTYPE_MASK) >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE;
ecp_timer_pop(conn, mtype);
handler = conn->sock->ctx->handler[conn->type] ? conn->sock->ctx->handler[conn->type]->msg[mtype & ECP_MTYPE_MASK] : NULL;
if (handler) {
- rv = handler(conn, seq, mtype, msg, rem_size);
+ rv = handler(conn, seq, mtype, content, rem_size);
if (rv < 0) return rv;
if (rv > rem_size) return ECP_ERR;
rem_size -= rv;
- msg += rv;
+ msg = content + rv;
} else {
return msg_size-rem_size-1;
}
@@ -1320,10 +1321,6 @@ int ecp_seq_item_init(ECPSeqItem *seq_item) {
return ECP_OK;
}
-unsigned char *ecp_pld_get_buf(unsigned char *payload) {
- return payload+ECP_SIZE_MSG_HDR;
-}
-
void ecp_pld_set_type(unsigned char *payload, unsigned char mtype) {
payload[ECP_SIZE_PLD_HDR] = mtype;
}
@@ -1332,6 +1329,59 @@ unsigned char ecp_pld_get_type(unsigned char *payload) {
return payload[ECP_SIZE_PLD_HDR];
}
+unsigned char *ecp_pld_get_buf(unsigned char *payload, unsigned char mtype) {
+ size_t offset = 0;
+ if (mtype & ECP_MTYPE_FLAG_FRAG) offset += 2;
+ if (mtype & ECP_MTYPE_FLAG_PTS) offset += sizeof(ecp_pts_t);
+
+ return payload + ECP_SIZE_PLD_HDR + 1 + offset;
+}
+
+unsigned char ecp_msg_get_type(unsigned char *msg) {
+ return msg[0];
+}
+
+unsigned char *ecp_msg_get_content(unsigned char *msg, size_t msg_size) {
+ size_t offset = 0;
+ unsigned char mtype = msg[0];
+
+ if (mtype & ECP_MTYPE_FLAG_FRAG) offset += 2;
+ if (mtype & ECP_MTYPE_FLAG_PTS) offset += sizeof(ecp_pts_t);
+
+ if (msg_size < 1 + offset) return NULL;
+ return msg + 1 + offset;
+}
+
+int ecp_msg_get_frag(unsigned char *msg, size_t msg_size, unsigned char *frag_cnt, unsigned char *frag_tot) {
+ unsigned char mtype = msg[0];
+
+ if (!(mtype & ECP_MTYPE_FLAG_FRAG)) return ECP_ERR;
+ if (msg_size < 3) return ECP_ERR;
+
+ *frag_cnt = msg[1];
+ *frag_tot = msg[2];
+
+ return ECP_OK;
+}
+
+int ecp_msg_get_pts(unsigned char *msg, size_t msg_size, ecp_pts_t *pts) {
+ size_t offset = 0;
+ unsigned char mtype = msg[0];
+
+ if (!(mtype & ECP_MTYPE_FLAG_PTS)) return ECP_ERR;
+
+ if (mtype & ECP_MTYPE_FLAG_FRAG) offset += 2;
+ if (msg_size < 1 + offset + sizeof(ecp_pts_t)) return ECP_ERR;
+
+ *pts = \
+ (msg[1 + offset] << 24) | \
+ (msg[2 + offset] << 16) | \
+ (msg[3 + offset] << 8) | \
+ (msg[4 + offset]);
+
+ return ECP_OK;
+}
+
static ssize_t pld_send(ECPConnection *conn, ECPTimerItem *ti, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size, ECPSeqItem *si) {
unsigned char packet[ECP_MAX_PKT];
ECPSocket *sock = conn->sock;
@@ -1342,7 +1392,7 @@ static ssize_t pld_send(ECPConnection *conn, ECPTimerItem *ti, unsigned char s_i
if (rv < 0) return rv;
#ifdef ECP_WITH_RBUF
- if (conn->rbuf.send && !si->rb_pass) return ecp_rbuf_pkt_send(conn->rbuf.send, conn->sock, &addr, ti, si, packet, rv);
+ if (conn->rbuf.send) return ecp_rbuf_pkt_send(conn->rbuf.send, conn->sock, &addr, ti, si, packet, rv);
#endif
if (ti) {
@@ -1392,7 +1442,7 @@ ssize_t ecp_send(ECPConnection *conn, unsigned char *payload, size_t payload_siz
return ecp_pld_send(conn, payload, payload_size);
}
-ssize_t ecp_receive(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, unsigned int timeout) {
+ssize_t ecp_receive(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ecp_cts_t timeout) {
#ifdef ECP_WITH_RBUF
#ifdef ECP_WITH_MSGQ
pthread_mutex_lock(&conn->rbuf.recv->msgq.mutex);
@@ -1421,7 +1471,7 @@ static int recv_p(ECPSocket *sock) {
}
int ecp_receiver(ECPSocket *sock) {
- unsigned int next = 0;
+ ecp_cts_t next = 0;
sock->running = 1;
while(sock->running) {
int rv = sock->ctx->tr.poll(&sock->sock, next ? next : sock->poll_timeout);
diff --git a/code/core/core.h b/code/core/core.h
index 27d1e0d..cdb4b62 100644
--- a/code/core/core.h
+++ b/code/core/core.h
@@ -8,8 +8,10 @@
#define ECP_ERR_MAX_MTYPE -12
#define ECP_ERR_MIN_PKT -13
#define ECP_ERR_MAX_PLD -14
+// XXX ???
#define ECP_ERR_MIN_MSG -15
#define ECP_ERR_MAX_MSG -16
+//
#define ECP_ERR_NET_ADDR -17
#define ECP_ERR_CONN_NOT_FOUND -20
@@ -40,21 +42,21 @@
#define ECP_SIZE_PKT_HDR (ECP_SIZE_PROTO+1+ECP_ECDH_SIZE_KEY+ECP_AEAD_SIZE_NONCE)
#define ECP_SIZE_PLD_HDR (ECP_SIZE_SEQ)
-#define ECP_SIZE_MSG_HDR (ECP_SIZE_PLD_HDR+1)
#define ECP_MAX_PKT 1412
#define ECP_MAX_PLD (ECP_MAX_PKT-ECP_SIZE_PKT_HDR-ECP_AEAD_SIZE_TAG)
-#define ECP_MAX_MSG (ECP_MAX_PLD-ECP_SIZE_MSG_HDR)
+#define ECP_MAX_MSG (ECP_MAX_PLD-ECP_SIZE_PLD_HDR-1)
-#define ECP_MIN_PKT (ECP_SIZE_PKT_HDR+ECP_SIZE_MSG_HDR+ECP_AEAD_SIZE_TAG)
+#define ECP_MIN_PKT (ECP_SIZE_PKT_HDR+ECP_SIZE_PLD_HDR+1+ECP_AEAD_SIZE_TAG)
#define ECP_POLL_TIMEOUT 500
#define ECP_ECDH_IDX_INV 0xFF
#define ECP_ECDH_IDX_PERMA 0x0F
-#define ECP_MTYPE_MASK 0x3f
-#define ECP_MTYPE_FLAG_TIMER 0x80
-#define ECP_MTYPE_FLAG_REP 0x40
+#define ECP_MTYPE_FLAG_FRAG 0x80
+#define ECP_MTYPE_FLAG_PTS 0x40
+#define ECP_MTYPE_FLAG_REP 0x20
+#define ECP_MTYPE_MASK 0x1f
#define ECP_MTYPE_OPEN 0x00
#define ECP_MTYPE_KGET 0x01
@@ -68,9 +70,6 @@
#define ECP_MTYPE_KPUT_REQ (ECP_MTYPE_KPUT)
#define ECP_MTYPE_KPUT_REP (ECP_MTYPE_KPUT | ECP_MTYPE_FLAG_REP)
-#define ECP_SIZE_PLD(X) ((X) + ECP_SIZE_MSG_HDR)
-#define ECP_SIZE_PKT(X) ((X) + ECP_SIZE_PKT_HDR+ECP_SIZE_MSG_HDR+ECP_AEAD_SIZE_TAG)
-
#define ECP_CONN_FLAG_REG 0x01
#define ECP_CONN_FLAG_OPEN 0x02
@@ -94,11 +93,27 @@ typedef uint32_t ecp_ack_t;
#define ECP_SIZE_ACKB (sizeof(ecp_ack_t)*8)
#define ECP_ACK_FULL (~(ecp_ack_t)0)
+typedef uint32_t ecp_cts_t;
+#define ECP_CTS_HALF ((ecp_cts_t)1 << (sizeof(ecp_cts_t) * 8 - 1))
+#define ECP_CTS_LT(a, b) ((ecp_cts_t)((ecp_cts_t)(a) - (ecp_cts_t)(b)) > ECP_CTS_HALF)
+#define ECP_CTS_LTE(a,b) ((ecp_cts_t)((ecp_cts_t)(b) - (ecp_cts_t)(a)) < ECP_CTS_HALF)
+
+typedef uint32_t ecp_pts_t;
+#define ECP_PTS_HALF ((ecp_pts_t)1 << (sizeof(ecp_pts_t) * 8 - 1))
+#define ECP_PTS_LT(a, b) ((ecp_pts_t)((ecp_pts_t)(a) - (ecp_pts_t)(b)) > ECP_PTS_HALF)
+#define ECP_PTS_LTE(a,b) ((ecp_pts_t)((ecp_pts_t)(b) - (ecp_pts_t)(a)) < ECP_PTS_HALF)
+
typedef uint32_t ecp_seq_t;
#define ECP_SEQ_HALF ((ecp_seq_t)1 << (sizeof(ecp_seq_t) * 8 - 1))
#define ECP_SEQ_LT(a,b) ((ecp_seq_t)((ecp_seq_t)(a) - (ecp_seq_t)(b)) > ECP_SEQ_HALF)
#define ECP_SEQ_LTE(a,b) ((ecp_seq_t)((ecp_seq_t)(b) - (ecp_seq_t)(a)) < ECP_SEQ_HALF)
+
+#define ECP_SIZE_FRAG(F) ((F) & ECP_MTYPE_FLAG_FRAG ? 2 : 0)
+#define ECP_SIZE_PTS(F) ((F) & ECP_MTYPE_FLAG_PTS ? sizeof(ecp_pts_t) : 0)
+#define ECP_SIZE_PLD(X,F) ((X) + ECP_SIZE_PLD_HDR+1+ECP_SIZE_FRAG(F)+ECP_SIZE_PTS(F))
+#define ECP_SIZE_PKT(X,F) (ECP_SIZE_PKT_HDR+ECP_SIZE_PLD(X,F)+ECP_AEAD_SIZE_TAG)
+
#ifdef ECP_WITH_PTHREAD
#include <pthread.h>
#endif
@@ -165,8 +180,8 @@ typedef struct ECPTransportIface {
typedef struct ECPTimeIface {
int init;
- unsigned int (*abstime_ms) (unsigned int);
- void (*sleep_ms) (unsigned int);
+ ecp_cts_t (*abstime_ms) (ecp_cts_t);
+ void (*sleep_ms) (ecp_cts_t);
} ECPTimeIface;
#ifdef ECP_WITH_HTABLE
@@ -255,7 +270,7 @@ typedef struct ECPContext {
typedef struct ECPSocket {
ECPContext *ctx;
unsigned char running;
- unsigned int poll_timeout;
+ int poll_timeout;
ECPNetSock sock;
ECPDHKey key_perma;
ECPDHKey key[ECP_MAX_SOCK_KEY];
@@ -325,7 +340,7 @@ void ecp_conn_unregister(ECPConnection *conn);
int ecp_conn_init(ECPConnection *conn, ECPNode *node);
int ecp_conn_open(ECPConnection *conn, ECPNode *node);
-int ecp_conn_close(ECPConnection *conn, unsigned int timeout);
+int ecp_conn_close(ECPConnection *conn, ecp_cts_t timeout);
int ecp_conn_handler_init(ECPConnHandler *handler);
ssize_t ecp_conn_send_open(ECPConnection *conn);
@@ -349,16 +364,21 @@ ssize_t ecp_pkt_recv(ECPSocket *sock, ECPNetAddr *addr, unsigned char *packet, s
ssize_t ecp_msg_handle(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size);
int ecp_seq_item_init(ECPSeqItem *seq_item);
-unsigned char *ecp_pld_get_buf(unsigned char *payload);
void ecp_pld_set_type(unsigned char *payload, unsigned char mtype);
+unsigned char *ecp_pld_get_buf(unsigned char *payload, unsigned char mtype);
unsigned char ecp_pld_get_type(unsigned char *payload);
+unsigned char ecp_msg_get_type(unsigned char *msg);
+unsigned char *ecp_msg_get_content(unsigned char *msg, size_t msg_size);
+int ecp_msg_get_frag(unsigned char *msg, size_t msg_size, unsigned char *frag_cnt, unsigned char *frag_tot);
+int ecp_msg_get_pts(unsigned char *msg, size_t msg_size, ecp_pts_t *pts);
+
ssize_t ecp_pld_send(ECPConnection *conn, unsigned char *payload, size_t payload_size);
ssize_t ecp_pld_send_wtimer(ECPConnection *conn, ECPTimerItem *ti, unsigned char *payload, size_t payload_size);
ssize_t ecp_pld_send_ll(ECPConnection *conn, ECPTimerItem *ti, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size);
ssize_t ecp_pld_send_raw(ECPSocket *sock, ECPConnection *parent, ECPNetAddr *addr, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size);
ssize_t ecp_send(ECPConnection *conn, unsigned char *payload, size_t payload_size);
-ssize_t ecp_receive(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, unsigned int timeout);
+ssize_t ecp_receive(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ecp_cts_t timeout);
int ecp_receiver(ECPSocket *sock);
int ecp_start_receiver(ECPSocket *sock);
diff --git a/code/core/msgq.c b/code/core/msgq.c
index 7356eb8..743478f 100644
--- a/code/core/msgq.c
+++ b/code/core/msgq.c
@@ -9,7 +9,7 @@
#define MSG_IDX_MASK(idx) ((idx) & ((ECP_MSGQ_MAX_MSG) - 1))
-static struct timespec *abstime_ts(struct timespec *ts, unsigned int msec) {
+static struct timespec *abstime_ts(struct timespec *ts, ecp_cts_t msec) {
struct timeval tv;
uint64_t us_start;
@@ -94,7 +94,7 @@ int ecp_conn_msgq_push(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype)
return ECP_OK;
}
-ssize_t ecp_conn_msgq_pop(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, unsigned int timeout) {
+ssize_t ecp_conn_msgq_pop(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ecp_cts_t timeout) {
ECPRBRecv *buf = conn->rbuf.recv;
ECPConnMsgQ *msgq = buf ? &buf->msgq : NULL;
ssize_t rv = ECP_OK;
diff --git a/code/core/msgq.h b/code/core/msgq.h
index 75c795e..dd0b9a8 100644
--- a/code/core/msgq.h
+++ b/code/core/msgq.h
@@ -21,6 +21,6 @@ void ecp_conn_msgq_destroy(struct ECPConnection *conn);
int ecp_conn_msgq_start(struct ECPConnection *conn, ecp_seq_t seq);
int ecp_conn_msgq_push(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype);
-ssize_t ecp_conn_msgq_pop(struct ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, unsigned int timeout);
+ssize_t ecp_conn_msgq_pop(struct ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ecp_cts_t timeout);
#endif /* ECP_WITH_MSGQ */ \ No newline at end of file
diff --git a/code/core/posix/time.c b/code/core/posix/time.c
index 3dd3920..c7a238a 100644
--- a/code/core/posix/time.c
+++ b/code/core/posix/time.c
@@ -3,16 +3,16 @@
#include <core.h>
-static unsigned int t_abstime_ms(unsigned int msec) {
+static ecp_cts_t t_abstime_ms(ecp_cts_t msec) {
struct timeval tv;
- unsigned int ms_now;
+ ecp_cts_t ms_now;
gettimeofday(&tv, NULL);
ms_now = tv.tv_sec * 1000 + tv.tv_usec / 1000;
return ms_now + msec;
}
-static void t_sleep_ms(unsigned int msec) {
+static void t_sleep_ms(ecp_cts_t msec) {
usleep(msec*1000);
}
diff --git a/code/core/rbuf.h b/code/core/rbuf.h
index d10370a..c3e6ff9 100644
--- a/code/core/rbuf.h
+++ b/code/core/rbuf.h
@@ -9,6 +9,7 @@
#define ECP_MTYPE_RBACK 0x04
#define ECP_MTYPE_RBFLUSH 0x05
+#define ECP_MTYPE_RBFLUSH_PTS 0x06
#define ECP_ERR_RBUF_DUP -100
#define ECP_ERR_RBUF_FULL -101
@@ -51,9 +52,10 @@ typedef struct ECPRBTimer {
typedef struct ECPRBRecv {
unsigned char flags;
unsigned char flush;
- unsigned short deliver_delay;
+ unsigned char timer_pts;
unsigned short hole_max;
unsigned short ack_rate;
+ ecp_pts_t deliver_delay;
ecp_seq_t seq_ack;
ecp_seq_t ack_pkt;
ecp_ack_t ack_map;
@@ -100,15 +102,16 @@ int ecp_rbuf_conn_start(struct ECPConnection *conn, ecp_seq_t seq);
int ecp_rbuf_recv_create(struct ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size);
void ecp_rbuf_recv_destroy(struct ECPConnection *conn);
int ecp_rbuf_recv_set_hole(struct ECPConnection *conn, unsigned short hole_max);
-int ecp_rbuf_recv_set_delay(struct ECPConnection *conn, unsigned short delay);
+int ecp_rbuf_recv_set_delay(struct ECPConnection *conn, ecp_pts_t delay);
int ecp_rbuf_recv_start(struct ECPConnection *conn, ecp_seq_t seq);
ssize_t ecp_rbuf_recv_store(struct ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size);
int ecp_rbuf_send_create(struct ECPConnection *conn, ECPRBSend *buf, ECPRBMessage *msg, unsigned int msg_size);
void ecp_rbuf_send_destroy(struct ECPConnection *conn);
int ecp_rbuf_send_start(struct ECPConnection *conn);
-int ecp_rbuf_pkt_prep(ECPRBSend *buf, struct ECPSeqItem *si);
+int ecp_rbuf_pkt_prep(ECPRBSend *buf, struct ECPSeqItem *si, unsigned char *payload);
ssize_t ecp_rbuf_pkt_send(ECPRBSend *buf, struct ECPSocket *sock, ECPNetAddr *addr, ECPTimerItem *ti, struct ECPSeqItem *si, unsigned char *packet, size_t pkt_size);
ssize_t ecp_rbuf_handle_ack(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size);
ssize_t ecp_rbuf_handle_flush(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size);
+ssize_t ecp_rbuf_handle_flush_pts(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size);
diff --git a/code/core/rbuf_recv.c b/code/core/rbuf_recv.c
index fba8219..0970f6a 100644
--- a/code/core/rbuf_recv.c
+++ b/code/core/rbuf_recv.c
@@ -8,7 +8,7 @@
static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) {
ECPRBRecv *buf = conn->rbuf.recv;
unsigned char flags = ECP_RBUF_FLAG_IN_RBUF;
- unsigned char mtype = msg[0] & ECP_MTYPE_MASK;
+ unsigned char mtype = ecp_msg_get_type(msg) & ECP_MTYPE_MASK;
if (mtype < ECP_MAX_MTYPE_SYS) flags |= ECP_RBUF_FLAG_SYS;
@@ -42,28 +42,77 @@ static void msg_flush(ECPConnection *conn) {
#endif
ecp_seq_t msg_cnt = buf->rbuf.seq_max - buf->rbuf.seq_start + 1;
+ ecp_seq_t seq_next = buf->rbuf.seq_start;
ecp_seq_t i = 0;
unsigned int idx = buf->rbuf.msg_start;
+ if (buf->timer_pts) {
+ ecp_timer_pop(conn, ECP_MTYPE_RBFLUSH_PTS);
+ buf->timer_pts = 0;
+ }
+
for (i=0; i<msg_cnt; i++) {
- if ((buf->flags & ECP_RBUF_FLAG_RELIABLE) && !(buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_IN_RBUF)) break;
- if (buf->deliver_delay && (msg_cnt - i < buf->deliver_delay)) break;
if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_IN_RBUF) {
- int rv = 0;
- ecp_seq_t seq = buf->rbuf.seq_start + i;
if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_SYS) {
buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_SYS;
- } else if (buf->flags & ECP_RBUF_FLAG_MSGQ) {
+ } else {
+ int rv = ECP_OK;
+ ecp_pts_t msg_pts;
+ ecp_seq_t seq = buf->rbuf.seq_start + i;
+ unsigned char mtype = ecp_msg_get_type(buf->rbuf.msg[idx].msg);
+ unsigned char frag_tot;
+ unsigned char frag_cnt;
+
+ rv = ecp_msg_get_frag(buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size, &frag_cnt, &frag_tot);
+ if ((rv == ECP_OK) && (frag_cnt != 0) && (seq != seq_next)) {
+ ecp_seq_t seq_fend = seq + (ecp_seq_t)(frag_tot - frag_cnt - 1);
+
+ if (ECP_SEQ_LT(buf->rbuf.seq_max, seq_fend) || (buf->hole_max && ((ecp_seq_t)(buf->rbuf.seq_max - seq_fend) <= buf->hole_max))) {
+ ecp_seq_t seq_fbeg = seq - frag_cnt;
+ ecp_seq_t seq_offset = ECP_SEQ_LT(seq_next, seq_fbeg) ? seq - seq_fbeg : seq - seq_next;
+
+ i -= seq_offset;
+ idx = ECP_RBUF_IDX_MASK(idx - seq_offset, buf->rbuf.msg_size);
+ break;
+ }
+ }
+
+ rv = ecp_msg_get_pts(buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size, &msg_pts);
+ if (rv == ECP_OK) {
+ ecp_pts_t now = conn->sock->ctx->tm.abstime_ms(0);
+ if (ECP_PTS_LT(now, msg_pts)) {
+ ECPTimerItem ti;
+ ecp_seq_t seq_offset = seq - seq_next;
+
+ rv = ecp_timer_item_init(&ti, conn, ECP_MTYPE_RBFLUSH_PTS, 0, msg_pts - now);
+ if (!rv) rv = ecp_timer_push(&ti);
+ if (!rv) buf->timer_pts = 1;
+
+ i -= seq_offset;
+ idx = ECP_RBUF_IDX_MASK(idx - seq_offset, buf->rbuf.msg_size);
+ break;
+ }
+ }
+
+ seq_next = seq + 1;
+ if (buf->flags & ECP_RBUF_FLAG_MSGQ) {
#ifdef ECP_WITH_MSGQ
- unsigned char mtype = buf->rbuf.msg[idx].msg[0];
- int rv = ecp_conn_msgq_push(conn, seq, mtype);
- if (rv) break;
- buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_IN_MSGQ;
+ rv = ecp_conn_msgq_push(conn, seq, ecp_msg_get_type(buf->rbuf.msg[idx].msg));
+ if (rv) break;
+ buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_IN_MSGQ;
#endif
- } else {
- ecp_msg_handle(conn, seq, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size);
+ } else {
+ ecp_msg_handle(conn, seq, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size);
+ }
}
buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_IN_RBUF;
+ } else {
+ if (buf->flags & ECP_RBUF_FLAG_RELIABLE) break;
+ if (buf->hole_max) {
+ ecp_seq_t seq = buf->rbuf.seq_start + i;
+ ecp_seq_t seq_offset = buf->rbuf.seq_max - seq;
+ if (seq_offset <= buf->hole_max) break;
+ }
}
idx = ECP_RBUF_IDX_MASK(idx + 1, buf->rbuf.msg_size);
}
@@ -78,8 +127,8 @@ static void msg_flush(ECPConnection *conn) {
static int ack_send(ECPConnection *conn) {
ECPRBRecv *buf = conn->rbuf.recv;
- unsigned char payload[ECP_SIZE_PLD(sizeof(ecp_seq_t) + sizeof(ecp_ack_t))];
- unsigned char *buf_ = ecp_pld_get_buf(payload);
+ unsigned char payload[ECP_SIZE_PLD(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), 0)];
+ unsigned char *buf_ = ecp_pld_get_buf(payload, 0);
ssize_t rv;
ecp_pld_set_type(payload, ECP_MTYPE_RBACK);
@@ -150,6 +199,16 @@ ssize_t ecp_rbuf_handle_flush(ECPConnection *conn, ecp_seq_t seq, unsigned char
return 0;
}
+ssize_t ecp_rbuf_handle_flush_pts(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size) {
+ ECPRBRecv *buf = conn->rbuf.recv;
+
+ if (buf == NULL) return ECP_ERR;
+
+ buf->timer_pts = 0;
+ msg_flush(conn);
+ return 0;
+}
+
int ecp_rbuf_recv_create(ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size) {
int rv;
@@ -185,13 +244,10 @@ int ecp_rbuf_recv_set_hole(ECPConnection *conn, unsigned short hole_max) {
return ECP_OK;
}
-int ecp_rbuf_recv_set_delay(ECPConnection *conn, unsigned short delay) {
+int ecp_rbuf_recv_set_delay(ECPConnection *conn, ecp_pts_t delay) {
ECPRBRecv *buf = conn->rbuf.recv;
buf->deliver_delay = delay;
- if (buf->hole_max < delay - 1) {
- ecp_rbuf_recv_set_hole(conn, delay - 1);
- }
return ECP_OK;
}
@@ -226,7 +282,7 @@ ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *m
if (buf == NULL) return ECP_ERR;
if (msg_size < 1) return ECP_ERR_MIN_MSG;
- mtype = msg[0] & ECP_MTYPE_MASK;
+ mtype = ecp_msg_get_type(msg) & ECP_MTYPE_MASK;
if ((mtype == ECP_MTYPE_RBACK) || (mtype == ECP_MTYPE_RBFLUSH)) return ecp_msg_handle(conn, seq, msg, msg_size);
if (ECP_SEQ_LT(buf->rbuf.seq_max, seq)) ack_pkt = seq - buf->rbuf.seq_max;
diff --git a/code/core/rbuf_send.c b/code/core/rbuf_send.c
index e0c7f29..d582e67 100644
--- a/code/core/rbuf_send.c
+++ b/code/core/rbuf_send.c
@@ -5,7 +5,7 @@
#define NACK_RATE_UNIT 10000
static ssize_t flush_send(ECPConnection *conn, ECPTimerItem *ti) {
- unsigned char payload[ECP_SIZE_PLD(0)];
+ unsigned char payload[ECP_SIZE_PLD(0, 0)];
ecp_pld_set_type(payload, ECP_MTYPE_RBFLUSH);
if (ti == NULL) {
@@ -127,7 +127,7 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt
if (buf->flags & ECP_RBUF_FLAG_RELIABLE) {
unsigned int _idx = ECP_RBUF_IDX_MASK(idx + 1 - ECP_SIZE_ACKB + i, rbuf->msg_size);
if ((rbuf->msg[_idx].size == 0) || (rbuf->msg[_idx].flags & ECP_RBUF_FLAG_SYS)) {
- unsigned char payload[ECP_SIZE_PLD(0)];
+ unsigned char payload[ECP_SIZE_PLD(0, 0)];
ecp_pld_set_type(payload, ECP_MTYPE_NOP);
ecp_rbuf_pld_send(conn, payload, sizeof(payload), seq_ack + i);
} else {
@@ -235,7 +235,7 @@ int ecp_rbuf_send_start(ECPConnection *conn) {
int ecp_rbuf_flush(ECPConnection *conn) {
ECPRBSend *buf = conn->rbuf.send;
- unsigned char payload[ECP_SIZE_PLD(0)];
+ unsigned char payload[ECP_SIZE_PLD(0, 0)];
ecp_seq_t seq;
if (buf == NULL) return ECP_ERR;
@@ -269,7 +269,9 @@ int ecp_rbuf_flush(ECPConnection *conn) {
return ECP_OK;
}
-int ecp_rbuf_pkt_prep(ECPRBSend *buf, ECPSeqItem *si) {
+int ecp_rbuf_pkt_prep(ECPRBSend *buf, ECPSeqItem *si, unsigned char *payload) {
+ if (si->rb_pass) return ECP_OK;
+
#ifdef ECP_WITH_PTHREAD
pthread_mutex_lock(&buf->mutex);
#endif
@@ -280,6 +282,7 @@ int ecp_rbuf_pkt_prep(ECPRBSend *buf, ECPSeqItem *si) {
if (idx < 0) return idx;
+ si->rb_mtype = ecp_pld_get_type(payload);
si->rb_idx = idx;
buf->rbuf.msg[idx].size = 0;
buf->rbuf.msg[idx].flags = 0;
@@ -288,54 +291,58 @@ int ecp_rbuf_pkt_prep(ECPRBSend *buf, ECPSeqItem *si) {
}
ssize_t ecp_rbuf_pkt_send(ECPRBSend *buf, ECPSocket *sock, ECPNetAddr *addr, ECPTimerItem *ti, ECPSeqItem *si, unsigned char *packet, size_t pkt_size) {
- unsigned char flags = 0;
int do_send = 1;
ssize_t rv = 0;
- ecp_seq_t seq = si->seq;
- unsigned int idx = si->rb_idx;
- unsigned char mtype = si->rb_mtype & ECP_MTYPE_MASK;
-
- if (mtype < ECP_MAX_MTYPE_SYS) flags |= ECP_RBUF_FLAG_SYS;
-
- rv = ecp_rbuf_msg_store(&buf->rbuf, seq, idx, packet, pkt_size, 0, flags);
- if (rv < 0) return rv;
+
+ if (!si->rb_pass) {
+ unsigned char flags = 0;
+ ecp_seq_t seq = si->seq;
+ unsigned int idx = si->rb_idx;
+ unsigned char mtype = si->rb_mtype & ECP_MTYPE_MASK;
- if (buf->flags & ECP_RBUF_FLAG_CCONTROL) {
-#ifdef ECP_WITH_PTHREAD
- pthread_mutex_lock(&buf->mutex);
-#endif
+ if (mtype < ECP_MAX_MTYPE_SYS) flags |= ECP_RBUF_FLAG_SYS;
- if (ECP_SEQ_LT(buf->rbuf.seq_max, seq)) buf->rbuf.seq_max = seq;
+ rv = ecp_rbuf_msg_store(&buf->rbuf, seq, idx, packet, pkt_size, 0, flags);
+ if (rv < 0) return rv;
- if (buf->cnt_cc || (buf->in_transit >= buf->win_size)) {
- if (!buf->cnt_cc) buf->seq_cc = seq;
- buf->cnt_cc++;
- buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_IN_CCONTROL;
- do_send = 0;
- if (ti) {
- ECPRBTimer *timer = &buf->timer;
- ECPRBTimerItem *item = &timer->item[timer->idx_w];
+ if (buf->flags & ECP_RBUF_FLAG_CCONTROL) {
+ int _rv = ECP_OK;
+ #ifdef ECP_WITH_PTHREAD
+ pthread_mutex_lock(&buf->mutex);
+ #endif
+
+ if (ECP_SEQ_LT(buf->rbuf.seq_max, seq)) buf->rbuf.seq_max = seq;
+
+ if (buf->cnt_cc || (buf->in_transit >= buf->win_size)) {
+ if (!buf->cnt_cc) buf->seq_cc = seq;
+ buf->cnt_cc++;
+ buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_IN_CCONTROL;
+ do_send = 0;
+ if (ti) {
+ ECPRBTimer *timer = &buf->timer;
+ ECPRBTimerItem *item = &timer->item[timer->idx_w];
- if (!item->occupied) {
- item->occupied = 1;
- item->item = *ti;
- buf->rbuf.msg[idx].idx_t = timer->idx_w;
- timer->idx_w = (timer->idx_w) % ECP_MAX_TIMER;
+ if (!item->occupied) {
+ item->occupied = 1;
+ item->item = *ti;
+ buf->rbuf.msg[idx].idx_t = timer->idx_w;
+ timer->idx_w = (timer->idx_w) % ECP_MAX_TIMER;
+ } else {
+ _rv = ECP_ERR_MAX_TIMER;
+ }
} else {
- rv = ECP_ERR_MAX_TIMER;
+ buf->rbuf.msg[idx].idx_t = -1;
}
} else {
- buf->rbuf.msg[idx].idx_t = -1;
+ buf->in_transit++;
}
- } else {
- buf->in_transit++;
- }
-#ifdef ECP_WITH_PTHREAD
- pthread_mutex_unlock(&buf->mutex);
-#endif
+ #ifdef ECP_WITH_PTHREAD
+ pthread_mutex_unlock(&buf->mutex);
+ #endif
- if (rv) return rv;
+ if (_rv) return _rv;
+ }
}
if (do_send) {
diff --git a/code/core/timer.c b/code/core/timer.c
index ea516df..9d2569e 100644
--- a/code/core/timer.c
+++ b/code/core/timer.c
@@ -20,7 +20,7 @@ void ecp_timer_destroy(ECPTimer *timer) {
#endif
}
-int ecp_timer_item_init(ECPTimerItem *ti, ECPConnection *conn, unsigned char mtype, unsigned short cnt, unsigned int timeout) {
+int ecp_timer_item_init(ECPTimerItem *ti, ECPConnection *conn, unsigned char mtype, short cnt, ecp_cts_t timeout) {
if ((mtype & ECP_MTYPE_MASK) >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE;
if (ti == NULL) return ECP_ERR;
@@ -61,7 +61,7 @@ int ecp_timer_push(ECPTimerItem *ti) {
if (!rv) {
for (i=timer->head; i>=0; i--) {
- if (timer->item[i].abstime >= ti->abstime) {
+ if (ECP_CTS_LTE(ti->abstime, timer->item[i].abstime)) {
if (i != timer->head) memmove(timer->item+i+2, timer->item+i+1, sizeof(ECPTimerItem) * (timer->head-i));
timer->item[i+1] = *ti;
timer->head++;
@@ -151,22 +151,22 @@ void ecp_timer_remove(ECPConnection *conn) {
}
-unsigned int ecp_timer_exe(ECPSocket *sock) {
+ecp_cts_t ecp_timer_exe(ECPSocket *sock) {
int i;
- unsigned int ret = 0;
+ ecp_cts_t ret = 0;
ECPTimer *timer = &sock->timer;
ECPTimerItem to_exec[ECP_MAX_TIMER];
int to_exec_size = 0;
- unsigned int now = sock->ctx->tm.abstime_ms(0);
+ ecp_cts_t now = sock->ctx->tm.abstime_ms(0);
#ifdef ECP_WITH_PTHREAD
pthread_mutex_lock(&timer->mutex);
#endif
for (i=timer->head; i>=0; i--) {
- unsigned int abstime = timer->item[i].abstime;
+ ecp_cts_t abstime = timer->item[i].abstime;
- if (abstime > now) {
+ if (ECP_CTS_LT(now, abstime)) {
ret = abstime - now;
break;
}
@@ -191,7 +191,7 @@ unsigned int ecp_timer_exe(ECPSocket *sock) {
ecp_timer_retry_t *retry = to_exec[i].retry;
ecp_conn_handler_msg_t *handler = conn->sock->ctx->handler[conn->type] ? conn->sock->ctx->handler[conn->type]->msg[mtype & ECP_MTYPE_MASK] : NULL;
- if (to_exec[i].cnt) {
+ if (to_exec[i].cnt > 0) {
ssize_t _rv = 0;
to_exec[i].cnt--;
if (retry) {
@@ -219,7 +219,7 @@ unsigned int ecp_timer_exe(ECPSocket *sock) {
return ret;
}
-ssize_t ecp_timer_send(ECPConnection *conn, ecp_timer_retry_t *send_f, unsigned char mtype, unsigned short cnt, unsigned int timeout) {
+ssize_t ecp_timer_send(ECPConnection *conn, ecp_timer_retry_t *send_f, unsigned char mtype, short cnt, ecp_cts_t timeout) {
int rv = ECP_OK;
ECPTimerItem ti;
diff --git a/code/core/timer.h b/code/core/timer.h
index ba25b6c..5919a42 100644
--- a/code/core/timer.h
+++ b/code/core/timer.h
@@ -15,9 +15,9 @@ typedef ssize_t ecp_timer_retry_t (struct ECPConnection *, struct ECPTimerItem *
typedef struct ECPTimerItem {
struct ECPConnection *conn;
unsigned char mtype;
- unsigned short cnt;
- unsigned int abstime;
- unsigned int timeout;
+ short cnt;
+ ecp_cts_t abstime;
+ ecp_cts_t timeout;
ecp_timer_retry_t *retry;
unsigned char *pld;
size_t pld_size;
@@ -33,9 +33,9 @@ typedef struct ECPTimer {
int ecp_timer_create(ECPTimer *timer);
void ecp_timer_destroy(ECPTimer *timer);
-int ecp_timer_item_init(ECPTimerItem *ti, struct ECPConnection *conn, unsigned char mtype, unsigned short cnt, unsigned int timeout);
+int ecp_timer_item_init(ECPTimerItem *ti, struct ECPConnection *conn, unsigned char mtype, short cnt, ecp_cts_t timeout);
int ecp_timer_push(ECPTimerItem *ti);
void ecp_timer_pop(struct ECPConnection *conn, unsigned char mtype);
void ecp_timer_remove(struct ECPConnection *conn);
-unsigned int ecp_timer_exe(struct ECPSocket *sock);
-ssize_t ecp_timer_send(struct ECPConnection *conn, ecp_timer_retry_t *send_f, unsigned char mtype, unsigned short cnt, unsigned int timeout); \ No newline at end of file
+ecp_cts_t ecp_timer_exe(struct ECPSocket *sock);
+ssize_t ecp_timer_send(struct ECPConnection *conn, ecp_timer_retry_t *send_f, unsigned char mtype, short cnt, ecp_cts_t timeout); \ No newline at end of file
diff --git a/code/test/basic.c b/code/test/basic.c
index 809d453..dae2819 100644
--- a/code/test/basic.c
+++ b/code/test/basic.c
@@ -29,8 +29,8 @@ ssize_t handle_open_c(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsign
return s;
}
- unsigned char payload[ECP_SIZE_PLD(1000)];
- unsigned char *buf = ecp_pld_get_buf(payload);
+ unsigned char payload[ECP_SIZE_PLD(1000, 0)];
+ unsigned char *buf = ecp_pld_get_buf(payload, 0);
char *msg = "PERA JE CAR!";
ecp_pld_set_type(payload, MTYPE_MSG);
@@ -47,8 +47,8 @@ ssize_t handle_msg_c(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsigne
ssize_t handle_msg_s(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsigned char *p, ssize_t s) {
printf("MSG S:%s size:%ld\n", p, s);
- unsigned char payload[ECP_SIZE_PLD(1000)];
- unsigned char *buf = ecp_pld_get_buf(payload);
+ unsigned char payload[ECP_SIZE_PLD(1000, 0)];
+ unsigned char *buf = ecp_pld_get_buf(payload, 0);
char *msg = "VAISTINU JE CAR!";
ecp_pld_set_type(payload, MTYPE_MSG);
diff --git a/code/test/client.c b/code/test/client.c
index 1d5dc6c..992479b 100644
--- a/code/test/client.c
+++ b/code/test/client.c
@@ -25,8 +25,8 @@ ssize_t handle_open_c(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsign
return s;
}
- unsigned char payload[ECP_SIZE_PLD(1000)];
- unsigned char *buf = ecp_pld_get_buf(payload);
+ unsigned char payload[ECP_SIZE_PLD(1000, 0)];
+ unsigned char *buf = ecp_pld_get_buf(payload, 0);
char *msg = "PERA JE CAR!";
ecp_pld_set_type(payload, MTYPE_MSG);
diff --git a/code/test/echo.c b/code/test/echo.c
index fe17901..593b904 100644
--- a/code/test/echo.c
+++ b/code/test/echo.c
@@ -15,7 +15,7 @@ ECPConnHandler handler_s;
#define MTYPE_MSG 8
ssize_t handle_msg_s(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsigned char *p, ssize_t s) {
- ssize_t rv = ecp_send(conn, p-ECP_SIZE_MSG_HDR, ECP_SIZE_MSG_HDR+s);
+ ssize_t rv = ecp_send(conn, p-ECP_SIZE_PLD_HDR-1, ECP_SIZE_PLD_HDR+1+s);
return s;
}
diff --git a/code/test/server.c b/code/test/server.c
index 6851cdf..471164e 100644
--- a/code/test/server.c
+++ b/code/test/server.c
@@ -17,8 +17,8 @@ ECPConnHandler handler_s;
ssize_t handle_msg_s(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsigned char *p, ssize_t s) {
printf("MSG S:%s size:%ld\n", p, s);
- unsigned char payload[ECP_SIZE_PLD(1000)];
- unsigned char *buf = ecp_pld_get_buf(payload);
+ unsigned char payload[ECP_SIZE_PLD(1000, 0)];
+ unsigned char *buf = ecp_pld_get_buf(payload, 0);
char *msg = "VAISTINU JE CAR!";
ecp_pld_set_type(payload, MTYPE_MSG);
diff --git a/code/test/stress.c b/code/test/stress.c
index 081c0cb..d1ead1d 100644
--- a/code/test/stress.c
+++ b/code/test/stress.c
@@ -77,7 +77,7 @@ static void catchINFO(int sig) {
void *sender(ECPConnection *c) {
int idx = (int)(c->conn_data);
- unsigned char payload[ECP_SIZE_PLD(1000)];
+ unsigned char payload[ECP_SIZE_PLD(1000, 0)];
printf("OPEN:%d\n", idx);
while(1) {
@@ -112,7 +112,7 @@ ssize_t handle_open_c(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsign
ssize_t handle_msg_c(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsigned char *p, ssize_t s) {
int idx = (int)(conn->conn_data);
- unsigned char payload[ECP_SIZE_PLD(1000)];
+ unsigned char payload[ECP_SIZE_PLD(1000, 0)];
if (c_start) {
pthread_mutex_lock(&t_mtx[idx]);
@@ -126,7 +126,7 @@ ssize_t handle_msg_c(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsigne
}
ssize_t handle_msg_s(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsigned char *p, ssize_t s) {
- unsigned char payload[ECP_SIZE_PLD(1000)];
+ unsigned char payload[ECP_SIZE_PLD(1000, 0)];
ecp_pld_set_type(payload, MTYPE_MSG);
ssize_t _rv = ecp_send(conn, payload, sizeof(payload));
return s;
diff --git a/code/test/vc_client.c b/code/test/vc_client.c
index 8f4b084..d5cb8f2 100644
--- a/code/test/vc_client.c
+++ b/code/test/vc_client.c
@@ -31,8 +31,8 @@ ssize_t handle_open(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsigned
printf("OPEN!\n");
- unsigned char payload[ECP_SIZE_PLD(1000)];
- unsigned char *buf = ecp_pld_get_buf(payload);
+ unsigned char payload[ECP_SIZE_PLD(1000, 0)];
+ unsigned char *buf = ecp_pld_get_buf(payload, 0);
char *msg = "PERA JE CAR!";
ecp_pld_set_type(payload, MTYPE_MSG);
diff --git a/code/test/vc_server.c b/code/test/vc_server.c
index fb6b923..47c6267 100644
--- a/code/test/vc_server.c
+++ b/code/test/vc_server.c
@@ -26,8 +26,8 @@ ssize_t handle_open(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsigned
ssize_t handle_msg(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsigned char *p, ssize_t s) {
printf("MSG S:%s size:%ld\n", p, s);
- unsigned char payload[ECP_SIZE_PLD(1000)];
- unsigned char *buf = ecp_pld_get_buf(payload);
+ unsigned char payload[ECP_SIZE_PLD(1000, 0)];
+ unsigned char *buf = ecp_pld_get_buf(payload, 0);
char *msg = "VAISTINU JE CAR!";
ecp_pld_set_type(payload, MTYPE_MSG);
diff --git a/code/test/voip.c b/code/test/voip.c
index 8c53d01..db1ea9f 100644
--- a/code/test/voip.c
+++ b/code/test/voip.c
@@ -188,7 +188,7 @@ int main(int argc, char *argv[]) {
while(1) {
ecp_pld_set_type(payload, MTYPE_MSG);
- opus_buf = ecp_pld_get_buf(payload);
+ opus_buf = ecp_pld_get_buf(payload, 0);
opus_int32 len = a_read(handle_cpt, alsa_in_buf, alsa_frames, opus_enc, opus_buf, ECP_MAX_MSG);
if (len < 0) continue;
ssize_t _rv = ecp_send(&conn, payload, len);
diff --git a/code/vconn/vconn.c b/code/vconn/vconn.c
index c59aefb..4eebb62 100644
--- a/code/vconn/vconn.c
+++ b/code/vconn/vconn.c
@@ -70,7 +70,7 @@ static void vconn_destroy(ECPConnection *conn) {
}
static ssize_t _vconn_send_open(ECPConnection *conn, ECPTimerItem *ti) {
- unsigned char payload[ECP_SIZE_PLD(0)];
+ unsigned char payload[ECP_SIZE_PLD(0, 0)];
ecp_pld_set_type(payload, ECP_MTYPE_KGET_REQ);
return ecp_pld_send_ll(conn, ti, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, payload, sizeof(payload));
@@ -185,9 +185,9 @@ static ssize_t vconn_handle_relay(ECPConnection *conn, ecp_seq_t seq, unsigned c
if (conn_out == NULL) return ECP_ERR;
- payload = msg - ECP_SIZE_MSG_HDR;
+ payload = msg - ECP_SIZE_PLD_HDR - 1;
ecp_pld_set_type(payload, ECP_MTYPE_EXEC);
- rv = ecp_pld_send(conn_out, payload, ECP_SIZE_MSG_HDR+size);
+ rv = ecp_pld_send(conn_out, payload, size + msg - payload);
#ifdef ECP_WITH_PTHREAD
pthread_mutex_lock(&conn_out->mutex);
@@ -257,8 +257,8 @@ static void vlink_destroy(ECPConnection *conn) {
static ssize_t _vlink_send_open(ECPConnection *conn, ECPTimerItem *ti) {
ECPSocket *sock = conn->sock;
ECPContext *ctx = sock->ctx;
- unsigned char payload[ECP_SIZE_PLD(ECP_ECDH_SIZE_KEY+1)];
- unsigned char *buf = ecp_pld_get_buf(payload);
+ unsigned char payload[ECP_SIZE_PLD(ECP_ECDH_SIZE_KEY+1, 0)];
+ unsigned char *buf = ecp_pld_get_buf(payload, 0);
int rv = ECP_OK;
// XXX server should verify perma_key
@@ -347,9 +347,9 @@ static ssize_t vlink_handle_relay(ECPConnection *conn, ecp_seq_t seq, unsigned c
if (conn == NULL) return ECP_ERR;
- payload = msg - ECP_SIZE_MSG_HDR;
+ payload = msg - ECP_SIZE_PLD_HDR - 1;
ecp_pld_set_type(payload, ECP_MTYPE_EXEC);
- rv = ecp_pld_send(conn, payload, ECP_SIZE_MSG_HDR+size);
+ rv = ecp_pld_send(conn, payload, size + msg - payload);
#ifdef ECP_WITH_PTHREAD
pthread_mutex_lock(&conn->mutex);
@@ -373,11 +373,11 @@ static ssize_t vconn_set_msg(ECPConnection *conn, unsigned char *pld_out, size_t
unsigned char *buf = NULL;
int rv;
- if (pld_out_size < ECP_SIZE_MSG_HDR+2+2*ECP_ECDH_SIZE_KEY) return ECP_ERR;
+ if (pld_out_size < ECP_SIZE_PLD_HDR + 3 + 2 * ECP_ECDH_SIZE_KEY) return ECP_ERR;
if (conn_next == NULL) return ECP_ERR;
ecp_pld_set_type(pld_out, ECP_MTYPE_OPEN_REQ);
- buf = ecp_pld_get_buf(pld_out);
+ buf = ecp_pld_get_buf(pld_out, 0);
buf[0] = ECP_CTYPE_VCONN;
rv = ecp_conn_dhkey_get_curr(conn_next, NULL, buf+1);
@@ -386,12 +386,12 @@ static ssize_t vconn_set_msg(ECPConnection *conn, unsigned char *pld_out, size_t
memcpy(buf+1+ECP_ECDH_SIZE_KEY, ctx->cr.dh_pub_get_buf(&conn_next->node.public), ECP_ECDH_SIZE_KEY);
buf[1+2*ECP_ECDH_SIZE_KEY] = ECP_MTYPE_RELAY;
- return ECP_SIZE_MSG_HDR+2+2*ECP_ECDH_SIZE_KEY;
+ return ECP_SIZE_PLD_HDR + 3 + 2 * ECP_ECDH_SIZE_KEY;
}
}
ecp_pld_set_type(pld_out, ECP_MTYPE_RELAY);
- return ECP_SIZE_MSG_HDR;
+ return ECP_SIZE_PLD_HDR + 1;
}
@@ -506,7 +506,7 @@ int ecp_vconn_init(ECPConnection *conn, ECPNode *conn_node, ECPVConnection vconn
}
static ssize_t _vconn_send_kget(ECPConnection *conn, ECPTimerItem *ti) {
- unsigned char payload[ECP_SIZE_PLD(0)];
+ unsigned char payload[ECP_SIZE_PLD(0, 0)];
ecp_pld_set_type(payload, ECP_MTYPE_KGET_REQ);
return ecp_pld_send_ll(conn, ti, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, payload, sizeof(payload));