summaryrefslogtreecommitdiff
path: root/ecp/src
diff options
context:
space:
mode:
authorUros Majstorovic <majstor@majstor.org>2024-05-10 19:45:55 +0200
committerUros Majstorovic <majstor@majstor.org>2024-05-10 19:45:55 +0200
commit54f3d8e3accc3b28b076f3ed09385b852c815824 (patch)
tree9470d41ea4750149ebd6617952067205e4676e64 /ecp/src
parent9089e3b41cefeb20cf504fec51bbb5177aa00304 (diff)
timer support for frag messages
Diffstat (limited to 'ecp/src')
-rw-r--r--ecp/src/ecp/core.c73
-rw-r--r--ecp/src/ecp/core.h4
-rw-r--r--ecp/src/ecp/timer.c24
-rw-r--r--ecp/src/ecp/timer.h3
4 files changed, 65 insertions, 39 deletions
diff --git a/ecp/src/ecp/core.c b/ecp/src/ecp/core.c
index 2741df2..33debdb 100644
--- a/ecp/src/ecp/core.c
+++ b/ecp/src/ecp/core.c
@@ -2245,6 +2245,7 @@ ssize_t ecp_msg_handle(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype,
ssize_t ecp_pld_handle_one(ECPConnection *conn, ecp_seq_t seq, unsigned char *payload, size_t pld_size, ECP2Buffer *bufs) {
unsigned char mtype;
unsigned char *msg;
+ unsigned char frag_tot;
size_t hdr_size, msg_size;
ssize_t rv;
int _rv;
@@ -2252,13 +2253,20 @@ ssize_t ecp_pld_handle_one(ECPConnection *conn, ecp_seq_t seq, unsigned char *pa
_rv = ecp_pld_get_type(payload, pld_size, &mtype);
if (_rv) return _rv;
+ frag_tot = 0;
+ if (mtype & ECP_MTYPE_FLAG_FRAG) {
+ _rv = ecp_pld_get_frag(payload, pld_size, NULL, &frag_tot, NULL);
+ if (_rv) return _rv;
+ }
+
+ ecp_timer_pop(conn, mtype, frag_tot);
+
msg = ecp_pld_get_msg(payload, pld_size);
if (msg == NULL) return ECP_ERR;
hdr_size = msg - payload;
msg_size = pld_size - hdr_size;
rv = ecp_msg_handle(conn, seq, mtype, msg, msg_size, bufs);
- ecp_timer_pop(conn, mtype);
if (rv < 0) {
ecp_err_handle(conn, mtype, rv);
return rv;
@@ -2417,35 +2425,40 @@ ssize_t ecp_unpack(ECPSocket *sock, ECPConnection *parent, ecp_tr_addr_t *addr,
ecp_buf2nonce(&nonce_pkt, nonce_buf);
if (is_open) {
+ ecp_nonce_t nonce_offset;
+
if (ECP_NONCE_LTE(nonce_pkt, nonce_conn)) {
- ecp_nonce_t nonce_offset = nonce_conn - nonce_pkt;
+ ecp_ack_t _nonce_map;
- if (nonce_offset < ECP_SIZE_ACKB) {
- ecp_ack_t nonce_mask = ((ecp_ack_t)1 << nonce_offset);
+ nonce_offset = nonce_conn - nonce_pkt;
+ if (nonce_offset >= ECP_SIZE_ACKB) {
+ rv = ECP_ERR_SEQ;
+ goto unpack_fin;
+ }
- if (nonce_mask & nonce_map) _rv = ECP_ERR_SEQ;
- if (!_rv) nonce_in = nonce_conn;
- } else {
- _rv = ECP_ERR_SEQ;
+ _nonce_map = ((ecp_ack_t)1 << nonce_offset);
+ if (nonce_map & _nonce_map) {
+ rv = ECP_ERR_SEQ;
+ goto unpack_fin;
}
+
+ nonce_map |= _nonce_map;
+ nonce_in = nonce_conn;
} else {
- ecp_nonce_t nonce_offset = nonce_pkt - nonce_conn;
+ nonce_offset = nonce_pkt - nonce_conn;
- if (nonce_offset < ECP_MAX_SEQ_FWD) {
- if (nonce_offset < ECP_SIZE_ACKB) {
- nonce_map = nonce_map << nonce_offset;
- } else {
- nonce_map = 0;
- }
- nonce_map |= 1;
- nonce_in = nonce_pkt;
+ if (nonce_offset >= ECP_MAX_SEQ_FWD) {
+ rv = ECP_ERR_SEQ;
+ goto unpack_fin;
+ }
+
+ if (nonce_offset < ECP_SIZE_ACKB) {
+ nonce_map = nonce_map << nonce_offset;
} else {
- _rv = ECP_ERR_SEQ;
+ nonce_map = 0;
}
- }
- if (_rv) {
- rv = _rv;
- goto unpack_fin;
+ nonce_map |= 1;
+ nonce_in = nonce_pkt;
}
}
@@ -2876,23 +2889,23 @@ int ecp_pld_set_type(unsigned char *pld, size_t pld_size, unsigned char mtype) {
return ECP_OK;
}
-int ecp_pld_get_frag(unsigned char *pld, size_t pld_size, unsigned char *frag_cnt, unsigned char *frag_tot, uint16_t *frag_size) {
+int ecp_pld_get_frag(unsigned char *pld, size_t pld_size, unsigned char *frag_cnt, unsigned char *frag_tot, uint16_t *frag_sz) {
if (pld_size < ECP_SIZE_MTYPE) return ECP_ERR_SIZE;
if (!(pld[0] & ECP_MTYPE_FLAG_FRAG)) return ECP_ERR;
if (pld_size < (ECP_SIZE_MTYPE + ECP_SIZE_MT_FLAG(pld[0]))) return ECP_ERR_SIZE;
if (pld[2] == 0) return ECP_ERR;
- *frag_cnt = pld[1];
- *frag_tot = pld[2];
- *frag_size = \
+ if (frag_cnt) *frag_cnt = pld[1];
+ if (frag_tot) *frag_tot = pld[2];
+ if (frag_sz) *frag_sz = \
((uint16_t)pld[3] << 8) | \
((uint16_t)pld[4]);
return ECP_OK;
}
-int ecp_pld_set_frag(unsigned char *pld, size_t pld_size, unsigned char frag_cnt, unsigned char frag_tot, uint16_t frag_size) {
+int ecp_pld_set_frag(unsigned char *pld, size_t pld_size, unsigned char frag_cnt, unsigned char frag_tot, uint16_t frag_sz) {
if (pld_size < ECP_SIZE_MTYPE) return ECP_ERR_SIZE;
if (!(pld[0] & ECP_MTYPE_FLAG_FRAG)) return ECP_ERR;
@@ -2900,8 +2913,8 @@ int ecp_pld_set_frag(unsigned char *pld, size_t pld_size, unsigned char frag_cnt
pld[1] = frag_cnt;
pld[2] = frag_tot;
- pld[3] = frag_size >> 8;
- pld[4] = frag_size;
+ pld[3] = frag_sz >> 8;
+ pld[4] = frag_sz;
return ECP_OK;
}
@@ -2915,7 +2928,7 @@ int ecp_pld_get_pts(unsigned char *pld, size_t pld_size, ecp_pts_t *pts) {
if (pld_size < (ECP_SIZE_MTYPE + ECP_SIZE_MT_FLAG(pld[0]))) return ECP_ERR_SIZE;
offset = ECP_SIZE_MTYPE + ECP_SIZE_MT_FRAG(pld[0]);
- *pts = \
+ if (pts) *pts = \
((ecp_pts_t)pld[offset] << 24) | \
((ecp_pts_t)pld[offset + 1] << 16) | \
((ecp_pts_t)pld[offset + 2] << 8) | \
diff --git a/ecp/src/ecp/core.h b/ecp/src/ecp/core.h
index 3ae635a..9e6a775 100644
--- a/ecp/src/ecp/core.h
+++ b/ecp/src/ecp/core.h
@@ -493,8 +493,8 @@ ssize_t ecp_pack_conn(ECPConnection *conn, ECPBuffer *packet, unsigned char s_id
int ecp_pld_get_type(unsigned char *pld, size_t pld_size, unsigned char *mtype);
int ecp_pld_set_type(unsigned char *pld, size_t pld_size, unsigned char mtype);
-int ecp_pld_get_frag(unsigned char *pld, size_t pld_size, unsigned char *frag_cnt, unsigned char *frag_tot, uint16_t *frag_size);
-int ecp_pld_set_frag(unsigned char *pld, size_t pld_size, unsigned char frag_cnt, unsigned char frag_tot, uint16_t frag_size);
+int ecp_pld_get_frag(unsigned char *pld, size_t pld_size, unsigned char *frag_cnt, unsigned char *frag_tot, uint16_t *frag_sz);
+int ecp_pld_set_frag(unsigned char *pld, size_t pld_size, unsigned char frag_cnt, unsigned char frag_tot, uint16_t frag_sz);
int ecp_pld_get_pts(unsigned char *pld, size_t pld_size, ecp_pts_t *pts);
int ecp_pld_set_pts(unsigned char *pld, size_t pld_size, ecp_pts_t pts);
unsigned char *ecp_pld_get_msg(unsigned char *pld, size_t pld_size);
diff --git a/ecp/src/ecp/timer.c b/ecp/src/ecp/timer.c
index 00478ed..7e54344 100644
--- a/ecp/src/ecp/timer.c
+++ b/ecp/src/ecp/timer.c
@@ -27,6 +27,7 @@ void ecp_timer_destroy(ECPTimer *timer) {
void ecp_timer_item_init(ECPTimerItem *ti, ECPConnection *conn, unsigned char mtype, ecp_timer_retry_t retry_f, unsigned short cnt, ecp_sts_t timeout) {
ti->conn = conn;
ti->mtype = mtype;
+ ti->frag_cnt = 0;
ti->retry = retry_f;
ti->cnt = cnt;
ti->timeout = timeout;
@@ -34,9 +35,9 @@ void ecp_timer_item_init(ECPTimerItem *ti, ECPConnection *conn, unsigned char mt
}
int ecp_timer_push(ECPTimerItem *ti) {
- int i, rv = ECP_OK;
ECPConnection *conn = ti->conn;
ECPTimer *timer = &conn->sock->timer;
+ int i, rv = ECP_OK;
ti->abstime = ecp_tm_get_ms() + ti->timeout;
@@ -80,9 +81,9 @@ timer_push_fin:
return rv;
}
-void ecp_timer_pop(ECPConnection *conn, unsigned char mtype) {
- int i;
+void ecp_timer_pop(ECPConnection *conn, unsigned char mtype, unsigned char frag_tot) {
ECPTimer *timer = &conn->sock->timer;
+ int i;
#ifdef ECP_WITH_PTHREAD
pthread_mutex_lock(&timer->mutex);
@@ -90,7 +91,16 @@ void ecp_timer_pop(ECPConnection *conn, unsigned char mtype) {
for (i=timer->head; i>=0; i--) {
ECPConnection *curr_conn = timer->item[i].conn;
+
if ((conn == curr_conn) && (mtype == timer->item[i].mtype)) {
+ if (mtype & ECP_MTYPE_FLAG_FRAG) {
+ if (timer->item[i].frag_cnt) {
+ timer->item[i].frag_cnt--;
+ } else if (frag_tot) {
+ timer->item[i].frag_cnt = frag_tot - 1;
+ }
+ if (timer->item[i].frag_cnt != 0) break;
+ }
if (i != timer->head) {
memmove(timer->item+i, timer->item+i+1, sizeof(ECPTimerItem) * (timer->head-i));
memset(timer->item+timer->head, 0, sizeof(ECPTimerItem));
@@ -109,8 +119,8 @@ void ecp_timer_pop(ECPConnection *conn, unsigned char mtype) {
}
void ecp_timer_remove(ECPConnection *conn) {
- int i;
ECPTimer *timer = &conn->sock->timer;
+ int i;
#ifdef ECP_WITH_PTHREAD
pthread_mutex_lock(&timer->mutex);
@@ -118,6 +128,7 @@ void ecp_timer_remove(ECPConnection *conn) {
for (i=timer->head; i>=0; i--) {
ECPConnection *curr_conn = timer->item[i].conn;
+
if (conn == curr_conn) {
if (i != timer->head) {
memmove(timer->item+i, timer->item+i+1, sizeof(ECPTimerItem) * (timer->head-i));
@@ -136,12 +147,12 @@ void ecp_timer_remove(ECPConnection *conn) {
}
ecp_sts_t ecp_timer_exe(ECPSocket *sock) {
- int i;
ecp_sts_t ret = 0;
ECPTimer *timer = &sock->timer;
ECPTimerItem to_exec[ECP_MAX_TIMER];
- int to_exec_size = 0;
ecp_sts_t now = ecp_tm_get_ms();
+ int to_exec_size = 0;
+ int i;
#ifdef ECP_WITH_PTHREAD
pthread_mutex_lock(&timer->mutex);
@@ -175,6 +186,7 @@ ecp_sts_t ecp_timer_exe(ECPSocket *sock) {
rv = ECP_OK;
if (to_exec[i].cnt > 0) {
to_exec[i].cnt--;
+ to_exec[i].frag_cnt = 0;
if (retry) {
ssize_t _rv;
diff --git a/ecp/src/ecp/timer.h b/ecp/src/ecp/timer.h
index 9079a5e..2ac9173 100644
--- a/ecp/src/ecp/timer.h
+++ b/ecp/src/ecp/timer.h
@@ -9,6 +9,7 @@ typedef ssize_t (*ecp_timer_retry_t) (struct ECPConnection *, struct ECPTimerIte
typedef struct ECPTimerItem {
struct ECPConnection *conn;
unsigned char mtype;
+ unsigned char frag_cnt;
unsigned short cnt;
ecp_sts_t abstime;
ecp_sts_t timeout;
@@ -27,7 +28,7 @@ int ecp_timer_create(ECPTimer *timer);
void ecp_timer_destroy(ECPTimer *timer);
void ecp_timer_item_init(ECPTimerItem *ti, struct ECPConnection *conn, unsigned char mtype, ecp_timer_retry_t retry_f, unsigned short cnt, ecp_sts_t timeout);
int ecp_timer_push(ECPTimerItem *ti);
-void ecp_timer_pop(struct ECPConnection *conn, unsigned char mtype);
+void ecp_timer_pop(struct ECPConnection *conn, unsigned char mtype, unsigned char frag_tot);
void ecp_timer_remove(struct ECPConnection *conn);
ecp_sts_t ecp_timer_exe(struct ECPSocket *sock);
ssize_t ecp_timer_send(struct ECPConnection *conn, ecp_timer_retry_t send_f, unsigned char mtype, unsigned short cnt, ecp_sts_t timeout);