summaryrefslogtreecommitdiff
path: root/ecp
diff options
context:
space:
mode:
authorUros Majstorovic <majstor@majstor.org>2024-05-14 00:52:49 +0200
committerUros Majstorovic <majstor@majstor.org>2024-05-14 00:52:49 +0200
commit7de54ce67127e010fde241089f262421818ea7be (patch)
tree648ae1e07b78bfca3bdac8e5bd952ce1c19d78f2 /ecp
parent52f23e15fc485f6b297568fad64a8582e68e9da8 (diff)
added fragmented messages support to ecp core; added timer and connection params
Diffstat (limited to 'ecp')
-rw-r--r--ecp/src/ecp/common.mk5
-rw-r--r--ecp/src/ecp/core.c132
-rw-r--r--ecp/src/ecp/core.h17
-rw-r--r--ecp/src/ecp/ext.h12
-rw-r--r--ecp/src/ecp/ext/Makefile4
-rw-r--r--ecp/src/ecp/ext/frag.c143
-rw-r--r--ecp/src/ecp/ext/frag.h12
-rw-r--r--ecp/src/ecp/ext/rbuf_recv.c2
-rw-r--r--ecp/src/ecp/ext/rbuf_send.c2
-rw-r--r--ecp/src/ecp/timer.c37
-rw-r--r--ecp/src/ecp/timer.h8
-rw-r--r--ecp/src/platform/posix/features_tmpl.mk1
12 files changed, 169 insertions, 206 deletions
diff --git a/ecp/src/ecp/common.mk b/ecp/src/ecp/common.mk
index ad8277a..60f7087 100644
--- a/ecp/src/ecp/common.mk
+++ b/ecp/src/ecp/common.mk
@@ -23,11 +23,6 @@ CFLAGS += -DECP_WITH_VCONN=1
subdirs += vconn
endif
-ifeq ($(with_frag),yes)
-CFLAGS += -DECP_WITH_FRAG=1
-ext_subdir = yes
-endif
-
ifeq ($(with_rbuf),yes)
CFLAGS += -DECP_WITH_RBUF=1
ext_subdir = yes
diff --git a/ecp/src/ecp/core.c b/ecp/src/ecp/core.c
index 66ed814..92b0ef5 100644
--- a/ecp/src/ecp/core.c
+++ b/ecp/src/ecp/core.c
@@ -1147,6 +1147,39 @@ int ecp_conn_set_closed(ECPConnection *conn) {
return is_cls ? ECP_ERR_CLOSED : ECP_OK;
}
+void *ecp_conn_set_param(ECPConnection *conn, void *param) {
+ void *r;
+
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_lock(&conn->mutex);
+#endif
+
+ r = conn->param;
+ conn->param = param;
+
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_unlock(&conn->mutex);
+#endif
+
+ return r;
+}
+
+void *ecp_conn_get_param(ECPConnection *conn) {
+ void *r;
+
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_lock(&conn->mutex);
+#endif
+
+ r = conn->param;
+
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_unlock(&conn->mutex);
+#endif
+
+ return r;
+}
+
void ecp_conn_set_remote_key(ECPConnection *conn, ECPDHPub *key) {
conn->remote.key_perma = *key;
}
@@ -1852,7 +1885,7 @@ ssize_t ecp_send_init_req(ECPConnection *conn, int retry) {
if (retry) {
ECPTimerItem ti;
- ecp_timer_item_init(&ti, conn, ECP_MTYPE_OPEN_REP, _retry_ireq, ECP_SEND_TRIES-1, ECP_SEND_TIMEOUT);
+ ecp_timer_item_init(&ti, conn, ECP_MTYPE_OPEN_REP, _retry_ireq, ECP_SEND_TRIES-1, ECP_SEND_TIMEOUT, NULL);
rv = _send_ireq(conn, &ti);
} else {
@@ -2230,7 +2263,7 @@ ssize_t ecp_send_keyx_req(ECPConnection *conn, int retry) {
ssize_t rv;
if (retry) {
- rv = ecp_timer_send(conn, _send_kxreq, ECP_MTYPE_KEYX_REP, ECP_SEND_TRIES, ECP_SEND_TIMEOUT);
+ rv = ecp_timer_send(conn, _send_kxreq, ECP_MTYPE_KEYX_REP, ECP_SEND_TRIES, ECP_SEND_TIMEOUT, NULL);
} else {
rv = _send_kxreq(conn, NULL);
}
@@ -2315,7 +2348,7 @@ ssize_t ecp_msg_handle(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype,
ssize_t ecp_pld_handle_one(ECPConnection *conn, ecp_seq_t seq, unsigned char *payload, size_t pld_size, ECP2Buffer *bufs) {
unsigned char mtype;
unsigned char *msg;
- unsigned char frag_tot;
+ unsigned char frag_cnt, frag_tot;
size_t hdr_size, msg_size;
ssize_t rv;
int _rv;
@@ -2323,13 +2356,14 @@ ssize_t ecp_pld_handle_one(ECPConnection *conn, ecp_seq_t seq, unsigned char *pa
_rv = ecp_pld_get_type(payload, pld_size, &mtype);
if (_rv) return _rv;
+ frag_cnt = 0;
frag_tot = 0;
if (mtype & ECP_MTYPE_FLAG_FRAG) {
- _rv = ecp_pld_get_frag(payload, pld_size, NULL, &frag_tot, NULL);
+ _rv = ecp_pld_get_frag(payload, pld_size, &frag_cnt, &frag_tot, NULL);
if (_rv) return _rv;
}
- ecp_timer_pop(conn, mtype, frag_tot);
+ ecp_timer_pop(conn, mtype, seq, frag_cnt, frag_tot);
msg = ecp_pld_get_msg(payload, pld_size);
if (msg == NULL) return ECP_ERR;
@@ -2352,12 +2386,6 @@ ssize_t ecp_pld_handle(ECPConnection *conn, ecp_seq_t seq, unsigned char *payloa
size_t pld_size = _pld_size;
ssize_t rv;
- rv = ecp_ext_pld_handle_one(conn, seq, payload, pld_size, bufs);
- if (rv < 0) return rv;
-
- payload += rv;
- pld_size -= rv;
-
while (pld_size) {
rv = ecp_pld_handle_one(conn, seq, payload, pld_size, bufs);
if (rv < 0) return rv;
@@ -3031,6 +3059,85 @@ unsigned char *ecp_pld_get_msg(unsigned char *pld, size_t pld_size) {
return pld + ECP_SIZE_MTYPE + ECP_SIZE_MT_FLAG(pld[0]);
}
+unsigned char *ecp_msg_get_pld(unsigned char mtype, unsigned char *msg) {
+ return msg - (ECP_SIZE_MTYPE + ECP_SIZE_MT_FLAG(mtype));
+}
+
+int ecp_msg_get_frag(unsigned char mtype, unsigned char *msg, unsigned char *frag_cnt, unsigned char *frag_tot, uint16_t *frag_size) {
+ unsigned char *payload;
+ size_t hdr_size;
+
+ if (!(mtype & ECP_MTYPE_FLAG_FRAG)) return ECP_ERR;
+
+ payload = ecp_msg_get_pld(mtype, msg);
+ hdr_size = msg - payload;
+
+ ecp_pld_get_frag(payload, hdr_size, frag_cnt, frag_tot, frag_size);
+
+ return ECP_OK;
+}
+
+int ecp_msg_get_pts(unsigned char mtype, unsigned char *msg, ecp_pts_t *pts) {
+ unsigned char *payload;
+ size_t hdr_size;
+
+ if (!(mtype & ECP_MTYPE_FLAG_PTS)) return ECP_ERR;
+
+ payload = ecp_msg_get_pld(mtype, msg);
+ hdr_size = msg - payload;
+
+ ecp_pld_get_pts(payload, hdr_size, pts);
+
+ return ECP_OK;
+}
+
+int ecp_frag_start(ECPConnection *conn, ecp_seq_t seq, unsigned char frag_cnt, unsigned char frag_tot, int *is_first, int *is_last) {
+ int rv;
+
+ if (is_first) *is_first = 0;
+ if (is_last) *is_last = 0;
+
+ rv = ECP_OK;
+
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_lock(&conn->mutex);
+#endif
+
+ if (conn->frag_cnt) {
+ ecp_seq_t seq_frag = conn->frag_seq + frag_cnt;
+
+ if (ECP_SEQ_LT(seq_frag, seq)) {
+ conn->frag_cnt = 0;
+ } else if (conn->frag_err || (seq_frag != seq)) {
+ rv = ECP_ERR_FRAG;
+ if (seq_frag != seq) return rv;
+ }
+ }
+ if (conn->frag_cnt) {
+ conn->frag_cnt--;
+ } else if (frag_tot) {
+ conn->frag_cnt = frag_tot - 1;
+ conn->frag_seq = seq - frag_cnt;
+ conn->frag_err = 0;
+ if (is_first) *is_first = 1;
+ }
+ if (conn->frag_cnt == 0) {
+ if (is_last) *is_last = 1;
+ }
+
+ return rv;
+}
+
+void ecp_frag_end(ECPConnection *conn, ecp_seq_t seq, unsigned char frag_cnt, int err) {
+ if (err && conn->frag_cnt && (conn->frag_seq + frag_cnt == seq)) {
+ conn->frag_err = 1;
+ }
+
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_unlock(&conn->mutex);
+#endif
+}
+
ssize_t _ecp_pld_send(ECPConnection *conn, ECPBuffer *packet, unsigned char s_idx, unsigned char c_idx, uint16_t zpad, unsigned char *cookie, ecp_nonce_t *n, ECPBuffer *payload, size_t pld_size, unsigned char flags, ECPTimerItem *ti) {
ecp_tr_addr_t addr;
size_t pkt_size;
@@ -3091,9 +3198,6 @@ ssize_t ecp_msg_send(ECPConnection *conn, unsigned char mtype, unsigned char *ms
payload.buffer = pld_buf;
payload.size = ECP_MAX_PLD;
- rv = ecp_ext_msg_send(conn, mtype, msg, msg_size, &packet, &payload);
- if (rv) return rv;
-
if (ECP_SIZE_PKT_BUF(ECP_SIZE_PLD(msg_size, mtype), conn) <= ECP_MAX_PKT) {
ecp_pld_set_type(pld_buf, ECP_MAX_PLD, mtype);
msg_buf = ecp_pld_get_msg(pld_buf, ECP_MAX_PLD);
diff --git a/ecp/src/ecp/core.h b/ecp/src/ecp/core.h
index 45e24dc..f5d21bb 100644
--- a/ecp/src/ecp/core.h
+++ b/ecp/src/ecp/core.h
@@ -29,6 +29,7 @@
#define ECP_ERR_VBOX -23
#define ECP_ERR_KEYID -24
#define ECP_ERR_SEQ -25
+#define ECP_ERR_FRAG -26
#define ECP_ERR_ENCRYPT -30
#define ECP_ERR_DECRYPT -31
@@ -223,9 +224,6 @@ typedef uint64_t ecp_nonce_t;
struct ECP2Buffer;
struct ECPSocket;
struct ECPConnection;
-#ifdef ECP_WITH_FRAG
-struct ECPFragIter;
-#endif
#ifdef ECP_WITH_HTABLE
#include "htable/htable.h"
@@ -373,14 +371,14 @@ typedef struct ECPConnection {
ECPDHShkey shkey[ECP_MAX_NODE_KEY][ECP_MAX_NODE_KEY];
ecp_sts_t access_ts;
ecp_sts_t keyx_ts;
+ ecp_seq_t frag_seq;
+ unsigned char frag_cnt;
+ unsigned char frag_err;
void *param;
#ifdef ECP_WITH_VCONN
struct ECPConnection *parent;
unsigned short pcount;
#endif
-#ifdef ECP_WITH_FRAG
- struct ECPFragIter *iter;
-#endif
#ifdef ECP_WITH_PTHREAD
pthread_mutex_t mutex;
#endif
@@ -428,6 +426,8 @@ int ecp_conn_test_uflags(ECPConnection *conn, unsigned char flags);
int ecp_conn_is_reg(ECPConnection *conn);
int ecp_conn_is_open(ECPConnection *conn);
int ecp_conn_set_closed(ECPConnection *conn);
+void *ecp_conn_set_param(ECPConnection *conn, void *param);
+void *ecp_conn_get_param(ECPConnection *conn);
void ecp_conn_set_remote_key(ECPConnection *conn, ECPDHPub *key);
void ecp_conn_set_remote_addr(ECPConnection *conn, ecp_tr_addr_t *addr);
int ecp_conn_create(ECPConnection *conn, ECPConnection *parent);
@@ -510,6 +510,11 @@ int ecp_pld_set_frag(unsigned char *pld, size_t pld_size, unsigned char frag_cnt
int ecp_pld_get_pts(unsigned char *pld, size_t pld_size, ecp_pts_t *pts);
int ecp_pld_set_pts(unsigned char *pld, size_t pld_size, ecp_pts_t pts);
unsigned char *ecp_pld_get_msg(unsigned char *pld, size_t pld_size);
+unsigned char *ecp_msg_get_pld(unsigned char mtype, unsigned char *msg);
+int ecp_msg_get_frag(unsigned char mtype, unsigned char *msg, unsigned char *frag_cnt, unsigned char *frag_tot, uint16_t *frag_size);
+int ecp_msg_get_pts(unsigned char mtype, unsigned char *msg, ecp_pts_t *pts);
+int ecp_frag_start(ECPConnection *conn, ecp_seq_t seq, unsigned char frag_cnt, unsigned char frag_tot, int *is_first, int *is_last);
+void ecp_frag_end(ECPConnection *conn, ecp_seq_t seq, unsigned char frag_cnt, int err);
ssize_t _ecp_pld_send(ECPConnection *conn, ECPBuffer *packet, unsigned char s_idx, unsigned char c_idx, uint16_t zpad, unsigned char *cookie, ecp_nonce_t *n, ECPBuffer *payload, size_t pld_size, unsigned char flags, ECPTimerItem *ti);
ssize_t ecp_pld_send(ECPConnection *conn, ECPBuffer *packet, ECPBuffer *payload, size_t pld_size, unsigned char flags);
diff --git a/ecp/src/ecp/ext.h b/ecp/src/ecp/ext.h
index d661329..835aa95 100644
--- a/ecp/src/ecp/ext.h
+++ b/ecp/src/ecp/ext.h
@@ -1,15 +1,3 @@
-#ifdef ECP_WITH_FRAG
-
-ssize_t ecp_ext_pld_handle_one(ECPConnection *conn, ecp_seq_t seq, unsigned char *payload, size_t pld_size, ECP2Buffer *bufs);
-ssize_t ecp_ext_msg_send(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ECPBuffer *packet, ECPBuffer *payload);
-
-#else
-
-#define ecp_ext_pld_handle_one(c,s,p,sz,b) (0)
-#define ecp_ext_msg_send(c,t,m,sz,p1,p2) (0)
-
-#endif
-
#ifdef ECP_WITH_RBUF
int ecp_ext_err_handle(ECPConnection *conn, unsigned char mtype, int err);
diff --git a/ecp/src/ecp/ext/Makefile b/ecp/src/ecp/ext/Makefile
index b20258f..67c9cbc 100644
--- a/ecp/src/ecp/ext/Makefile
+++ b/ecp/src/ecp/ext/Makefile
@@ -2,10 +2,6 @@ include ../common.mk
obj =
-ifeq ($(with_frag),yes)
-obj += frag.o
-endif
-
ifeq ($(with_rbuf),yes)
obj += rbuf.o rbuf_send.o rbuf_recv.o rbuf_ext.o
endif
diff --git a/ecp/src/ecp/ext/frag.c b/ecp/src/ecp/ext/frag.c
deleted file mode 100644
index f348da6..0000000
--- a/ecp/src/ecp/ext/frag.c
+++ /dev/null
@@ -1,143 +0,0 @@
-#include <stdlib.h>
-#include <string.h>
-
-#include <ecp/core.h>
-
-#include "frag.h"
-
-int ecp_frag_iter_init(ECPConnection *conn, ECPFragIter *iter, unsigned char *buffer, size_t buf_size) {
- memset(iter, 0, sizeof(ECPFragIter));
- iter->buffer = buffer;
- iter->buf_size = buf_size;
-
- conn->iter = iter;
- return ECP_OK;
-}
-
-void ecp_frag_iter_reset(ECPFragIter *iter) {
- iter->seq = 0;
- iter->frag_cnt = 0;
- iter->pld_size = 0;
-}
-
-ssize_t ecp_pld_defrag(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *payload, size_t pld_size) {
- ECPFragIter *iter = conn->iter;
- unsigned char *msg;
- unsigned char frag_cnt, frag_tot;
- uint16_t frag_size;
- size_t hdr_size, msg_size;
- size_t buf_offset;
- int rv;
-
- rv = ecp_pld_get_frag(payload, pld_size, &frag_cnt, &frag_tot, &frag_size);
- if (rv) return ECP_ERR;
-
- msg = ecp_pld_get_msg(payload, pld_size);
- if (msg == NULL) return ECP_ERR;
- hdr_size = msg - payload;
-
- msg_size = pld_size - hdr_size;
- if (msg_size == 0) return ECP_ERR;
-
- if (iter->pld_size && (iter->seq + frag_cnt != seq)) ecp_frag_iter_reset(iter);
-
- if (iter->pld_size == 0) {
- iter->seq = seq - frag_cnt;
- iter->frag_cnt = 0;
- }
-
- mtype &= ~ECP_MTYPE_FLAG_FRAG;
- buf_offset = ECP_SIZE_MTYPE + ECP_SIZE_MT_FLAG(mtype) + frag_size * frag_cnt;
- if (buf_offset + msg_size > iter->buf_size) return ECP_ERR_SIZE;
- memcpy(iter->buffer + buf_offset, msg, msg_size);
-
- if (frag_cnt == 0) {
- if (ECP_SIZE_MTYPE + ECP_SIZE_MT_FLAG(mtype) > iter->buf_size) return ECP_ERR_SIZE;
-
- iter->buffer[0] = mtype;
- if (ECP_SIZE_MT_FLAG(mtype)) {
- memcpy(iter->buffer + ECP_SIZE_MTYPE, payload + ECP_SIZE_MTYPE, ECP_SIZE_MT_FLAG(mtype));
- }
- msg_size += ECP_SIZE_MTYPE + ECP_SIZE_MT_FLAG(mtype);
- }
-
- iter->frag_cnt++;
- iter->pld_size += msg_size;
- if (iter->frag_cnt == frag_tot) {
- ecp_pld_handle_one(conn, iter->seq, iter->buffer, iter->pld_size, NULL);
- ecp_frag_iter_reset(iter);
- }
-
- return pld_size;
-}
-
-ssize_t ecp_msg_send_wfrag(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ECPBuffer *packet, ECPBuffer *payload) {
- unsigned char *msg_buf;
- unsigned char *pld_buf;
- size_t pld_size;
- size_t frag_size, frag_size_final;
- ecp_nonce_t nonce, nonce_start;
- int pkt_cnt = 0;
- int i;
-
- mtype |= ECP_MTYPE_FLAG_FRAG;
- frag_size = ECP_MAX_PKT - ECP_SIZE_PKT_BUF(ECP_SIZE_PLD(0, mtype), conn);
- pkt_cnt = msg_size / frag_size;
- frag_size_final = msg_size - frag_size * pkt_cnt;
- if (frag_size_final) pkt_cnt++;
-
-#ifdef ECP_WITH_PTHREAD
- pthread_mutex_lock(&conn->mutex);
-#endif
-
- nonce_start = conn->nonce_out;
- conn->nonce_out += pkt_cnt;
-
-#ifdef ECP_WITH_PTHREAD
- pthread_mutex_unlock(&conn->mutex);
-#endif
-
- pld_buf = payload->buffer;
- pld_size = payload->size;
- for (i=0; i<pkt_cnt; i++) {
- ssize_t rv;
-
- ecp_pld_set_type(pld_buf, pld_size, mtype);
- ecp_pld_set_frag(pld_buf, pld_size, i, pkt_cnt, frag_size);
- msg_buf = ecp_pld_get_msg(pld_buf, pld_size);
-
- if ((i == pkt_cnt - 1) && frag_size_final) frag_size = frag_size_final;
- memcpy(msg_buf, msg, frag_size);
- msg += frag_size;
- nonce = nonce_start + i;
-
- rv = ecp_pld_send_wnonce(conn, packet, payload, ECP_SIZE_PLD(frag_size, mtype), 0, &nonce);
- if (rv < 0) return rv;
- }
-
- return msg_size;
-}
-
-ssize_t ecp_ext_pld_handle_one(ECPConnection *conn, ecp_seq_t seq, unsigned char *payload, size_t pld_size, ECP2Buffer *bufs) {
- if (conn->iter) {
- unsigned char mtype;
- int rv;
-
- rv = ecp_pld_get_type(payload, pld_size, &mtype);
- if (rv) return rv;
-
- if (mtype & ECP_MTYPE_FLAG_FRAG) {
- return ecp_pld_defrag(conn, seq, mtype, payload, pld_size);
- }
- }
-
- return 0;
-}
-
-ssize_t ecp_ext_msg_send(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ECPBuffer *packet, ECPBuffer *payload) {
- if (ECP_SIZE_PKT_BUF(ECP_SIZE_PLD(msg_size, mtype), conn) > ECP_MAX_PKT) {
- return ecp_msg_send_wfrag(conn, mtype, msg, msg_size, packet, payload);
- } else {
- return 0;
- }
-}
diff --git a/ecp/src/ecp/ext/frag.h b/ecp/src/ecp/ext/frag.h
deleted file mode 100644
index 2bf35d1..0000000
--- a/ecp/src/ecp/ext/frag.h
+++ /dev/null
@@ -1,12 +0,0 @@
-typedef struct ECPFragIter {
- ecp_seq_t seq;
- unsigned char frag_cnt;
- unsigned char *buffer;
- size_t buf_size;
- size_t pld_size;
-} ECPFragIter;
-
-int ecp_frag_iter_init(ECPConnection *conn, ECPFragIter *iter, unsigned char *buffer, size_t buf_size);
-void ecp_frag_iter_reset(ECPFragIter *iter);
-ssize_t ecp_pld_defrag(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *payload, size_t pld_size);
-ssize_t ecp_msg_send_wfrag(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ECPBuffer *packet, ECPBuffer *payload);
diff --git a/ecp/src/ecp/ext/rbuf_recv.c b/ecp/src/ecp/ext/rbuf_recv.c
index 1282a78..777af66 100644
--- a/ecp/src/ecp/ext/rbuf_recv.c
+++ b/ecp/src/ecp/ext/rbuf_recv.c
@@ -96,7 +96,7 @@ static void msg_flush(ECPRBConn *conn) {
if (!(rbuf->arr.pld[idx].flags & ECP_RBUF_FLAG_IN_TIMER)) {
ECPTimerItem ti;
- ecp_timer_item_init(&ti, _conn, ECP_MTYPE_RBTIMER, NULL, 0, msg_pts - now);
+ ecp_timer_item_init(&ti, _conn, ECP_MTYPE_RBTIMER, NULL, 0, msg_pts - now, NULL);
rv = ecp_timer_push(&ti);
if (!rv) rbuf->arr.pld[idx].flags |= ECP_RBUF_FLAG_IN_TIMER;
}
diff --git a/ecp/src/ecp/ext/rbuf_send.c b/ecp/src/ecp/ext/rbuf_send.c
index 5716f92..c235ad9 100644
--- a/ecp/src/ecp/ext/rbuf_send.c
+++ b/ecp/src/ecp/ext/rbuf_send.c
@@ -73,7 +73,7 @@ static ssize_t _rbuf_send_flush(ECPConnection *_conn, ECPTimerItem *ti) {
ssize_t ecp_rbuf_send_flush(ECPRBConn *conn) {
ECPConnection *_conn = ecp_rbuf_get_conn(conn);
- return ecp_timer_send(_conn, _rbuf_send_flush, ECP_MTYPE_RBACK, 3, 500);
+ return ecp_timer_send(_conn, _rbuf_send_flush, ECP_MTYPE_RBACK, 3, 500, NULL);
}
ssize_t ecp_rbuf_handle_ack(ECPRBConn *conn, unsigned char *msg, size_t msg_size) {
diff --git a/ecp/src/ecp/timer.c b/ecp/src/ecp/timer.c
index 7e54344..3cacca0 100644
--- a/ecp/src/ecp/timer.c
+++ b/ecp/src/ecp/timer.c
@@ -24,7 +24,7 @@ void ecp_timer_destroy(ECPTimer *timer) {
#endif
}
-void ecp_timer_item_init(ECPTimerItem *ti, ECPConnection *conn, unsigned char mtype, ecp_timer_retry_t retry_f, unsigned short cnt, ecp_sts_t timeout) {
+void ecp_timer_item_init(ECPTimerItem *ti, ECPConnection *conn, unsigned char mtype, ecp_timer_retry_t retry_f, unsigned short cnt, ecp_sts_t timeout, void *param) {
ti->conn = conn;
ti->mtype = mtype;
ti->frag_cnt = 0;
@@ -32,6 +32,11 @@ void ecp_timer_item_init(ECPTimerItem *ti, ECPConnection *conn, unsigned char mt
ti->cnt = cnt;
ti->timeout = timeout;
ti->abstime = 0;
+ ti->param = param;
+}
+
+void *ecp_timer_get_param(ECPTimerItem *ti) {
+ return ti->param;
}
int ecp_timer_push(ECPTimerItem *ti) {
@@ -81,7 +86,7 @@ timer_push_fin:
return rv;
}
-void ecp_timer_pop(ECPConnection *conn, unsigned char mtype, unsigned char frag_tot) {
+void ecp_timer_pop(ECPConnection *conn, unsigned char mtype, ecp_seq_t seq, unsigned char frag_cnt, unsigned char frag_tot) {
ECPTimer *timer = &conn->sock->timer;
int i;
@@ -94,6 +99,30 @@ void ecp_timer_pop(ECPConnection *conn, unsigned char mtype, unsigned char frag_
if ((conn == curr_conn) && (mtype == timer->item[i].mtype)) {
if (mtype & ECP_MTYPE_FLAG_FRAG) {
+ int rv;
+
+ rv = ECP_OK;
+
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_lock(&conn->mutex);
+#endif
+
+ if (conn->frag_cnt) {
+ ecp_seq_t seq_frag = conn->frag_seq + frag_cnt;
+
+ if (ECP_SEQ_LT(seq_frag, seq)) {
+ timer->item[i].frag_cnt = 0;
+ } else if (seq_frag != seq) {
+ rv = ECP_ERR_FRAG;
+ }
+ }
+
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_unlock(&conn->mutex);
+#endif
+
+ if (rv) break;
+
if (timer->item[i].frag_cnt) {
timer->item[i].frag_cnt--;
} else if (frag_tot) {
@@ -220,12 +249,12 @@ ecp_sts_t ecp_timer_exe(ECPSocket *sock) {
return ret;
}
-ssize_t ecp_timer_send(ECPConnection *conn, ecp_timer_retry_t send_f, unsigned char mtype, unsigned short cnt, ecp_sts_t timeout) {
+ssize_t ecp_timer_send(ECPConnection *conn, ecp_timer_retry_t send_f, unsigned char mtype, unsigned short cnt, ecp_sts_t timeout, void *param) {
ECPTimerItem ti;
if (cnt == 0) return ECP_ERR;
- ecp_timer_item_init(&ti, conn, mtype, send_f, cnt-1, timeout);
+ ecp_timer_item_init(&ti, conn, mtype, send_f, cnt-1, timeout, param);
return send_f(conn, &ti);
}
diff --git a/ecp/src/ecp/timer.h b/ecp/src/ecp/timer.h
index 2ac9173..2b44060 100644
--- a/ecp/src/ecp/timer.h
+++ b/ecp/src/ecp/timer.h
@@ -14,6 +14,7 @@ typedef struct ECPTimerItem {
ecp_sts_t abstime;
ecp_sts_t timeout;
ecp_timer_retry_t retry;
+ void *param;
} ECPTimerItem;
typedef struct ECPTimer {
@@ -26,9 +27,10 @@ typedef struct ECPTimer {
int ecp_timer_create(ECPTimer *timer);
void ecp_timer_destroy(ECPTimer *timer);
-void ecp_timer_item_init(ECPTimerItem *ti, struct ECPConnection *conn, unsigned char mtype, ecp_timer_retry_t retry_f, unsigned short cnt, ecp_sts_t timeout);
+void ecp_timer_item_init(ECPTimerItem *ti, struct ECPConnection *conn, unsigned char mtype, ecp_timer_retry_t retry_f, unsigned short cnt, ecp_sts_t timeout, void *param);
+void *ecp_timer_get_param(ECPTimerItem *ti);
int ecp_timer_push(ECPTimerItem *ti);
-void ecp_timer_pop(struct ECPConnection *conn, unsigned char mtype, unsigned char frag_tot);
+void ecp_timer_pop(struct ECPConnection *conn, unsigned char mtype, ecp_seq_t seq, unsigned char frag_cnt, unsigned char frag_tot);
void ecp_timer_remove(struct ECPConnection *conn);
ecp_sts_t ecp_timer_exe(struct ECPSocket *sock);
-ssize_t ecp_timer_send(struct ECPConnection *conn, ecp_timer_retry_t send_f, unsigned char mtype, unsigned short cnt, ecp_sts_t timeout);
+ssize_t ecp_timer_send(struct ECPConnection *conn, ecp_timer_retry_t send_f, unsigned char mtype, unsigned short cnt, ecp_sts_t timeout, void *param);
diff --git a/ecp/src/platform/posix/features_tmpl.mk b/ecp/src/platform/posix/features_tmpl.mk
index 4efdc90..a840328 100644
--- a/ecp/src/platform/posix/features_tmpl.mk
+++ b/ecp/src/platform/posix/features_tmpl.mk
@@ -3,6 +3,5 @@ with_htable = yes
with_vconn = yes
with_dir = yes
with_debug = yes
-#with_frag = yes
#with_rbuf = yes
#with_msgq = yes