From c129b10bf7c851d94002767aa09e06c526cacb7d Mon Sep 17 00:00:00 2001 From: Uros Majstorovic Date: Wed, 6 Sep 2017 18:19:00 +0200 Subject: frad/defrag implemented --- code/core/core.c | 249 +++++++++++++++++++++++++++++++++++++------------- code/core/core.h | 39 ++++++-- code/core/rbuf.h | 1 + code/core/rbuf_recv.c | 4 - code/core/rbuf_send.c | 9 +- 5 files changed, 222 insertions(+), 80 deletions(-) (limited to 'code/core') diff --git a/code/core/core.c b/code/core/core.c index 8e21e82..ce607ef 100644 --- a/code/core/core.c +++ b/code/core/core.c @@ -1184,7 +1184,7 @@ ssize_t ecp_pkt_handle(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *parent, } } else { ecp_seq_t seq_offset = seq_p - seq_c; - if (seq_offset < ECP_MAX_SEQ_FORWARD) { + if (seq_offset < ECP_MAX_SEQ_FWD) { if (seq_offset < ECP_SIZE_ACKB) { seq_map = seq_map << seq_offset; } else { @@ -1280,6 +1280,106 @@ ssize_t ecp_pkt_recv(ECPSocket *sock, ECPNetAddr *addr, unsigned char *packet, s return rv; } +int ecp_seq_item_init(ECPSeqItem *seq_item) { + memset(seq_item, 0, sizeof(ECPSeqItem)); + + return ECP_OK; +} + +int ecp_frag_iter_init(ECPFragIter *iter, unsigned char *buffer, size_t buf_size) { + memset(iter, 0, sizeof(ECPFragIter)); + iter->buffer = buffer; + iter->buf_size = buf_size; + + return ECP_OK; +} + +unsigned char ecp_msg_get_type(unsigned char *msg) { + return msg[0]; +} + +unsigned char *ecp_msg_get_content(unsigned char *msg, size_t msg_size) { + size_t offset = 1 + ECP_SIZE_MT_FLAG(msg[0]); + + if (msg_size < offset) return NULL; + return msg + offset; +} + +int ecp_msg_get_frag(unsigned char *msg, size_t msg_size, unsigned char *frag_cnt, unsigned char *frag_tot) { + if (!(msg[0] & ECP_MTYPE_FLAG_FRAG)) return ECP_ERR; + if (msg_size < 3) return ECP_ERR; + + *frag_cnt = msg[1]; + *frag_tot = msg[2]; + + return ECP_OK; +} + +int ecp_msg_get_pts(unsigned char *msg, size_t msg_size, ecp_pts_t *pts) { + unsigned char mtype = msg[0]; + size_t offset = 1 + ECP_SIZE_MT_FRAG(mtype); + + if (!(mtype & ECP_MTYPE_FLAG_PTS)) return ECP_ERR; + if (msg_size < offset + sizeof(ecp_pts_t)) return ECP_ERR; + + *pts = \ + (msg[offset] << 24) | \ + (msg[offset + 1] << 16) | \ + (msg[offset + 2] << 8) | \ + (msg[offset + 3]); + + return ECP_OK; +} + +int ecp_msg_defrag(ECPFragIter *iter, ecp_seq_t seq, unsigned char *msg_in, size_t msg_in_size, unsigned char **msg_out, size_t *msg_out_size) { + unsigned char frag_cnt, frag_tot; + int rv = ecp_msg_get_frag(msg_in, msg_in_size, &frag_cnt, &frag_tot); + if (rv == ECP_OK) { + unsigned char mtype = ecp_msg_get_type(msg_in) & (~ECP_MTYPE_FLAG_FRAG); + unsigned char *content = NULL; + size_t content_size = 0; + size_t buf_offset = 0; + + content = ecp_msg_get_content(msg_in, msg_in_size); + if (content == NULL) return ECP_ERR_MIN_MSG; + + content_size = msg_in_size - (content - msg_in); + if (frag_cnt == 0) { + iter->seq = seq; + iter->frag_cnt = 0; + iter->frag_size = content_size; + iter->buffer[0] = mtype; + if (ECP_SIZE_MT_FLAG(mtype)) { + memcpy(iter->buffer + 1, msg_in, ECP_SIZE_MT_FLAG(mtype)); + } + } else { + if (iter->seq + frag_cnt != seq) { + iter->seq = seq - frag_cnt; + return ECP_ERR_ITER; + } + if (iter->frag_cnt != frag_cnt) return ECP_ERR_ITER; + } + if ((1 + ECP_SIZE_MT_FLAG(mtype) + iter->frag_size * frag_tot) > iter->buf_size) return ECP_ERR_SIZE; + + buf_offset = 1 + ECP_SIZE_MT_FLAG(mtype) + iter->frag_size * frag_cnt; + memcpy(iter->buffer + buf_offset, content, content_size); + iter->frag_cnt++; + + if (iter->frag_cnt == frag_tot) { + *msg_out = iter->buffer; + *msg_out_size = buf_offset + content_size; + return ECP_OK; + } else { + return ECP_ITER_NEXT; + } + } else { + iter->seq = seq; + *msg_out = msg_in; + *msg_out_size = msg_in_size; + return ECP_OK; + } +} + ssize_t ecp_msg_handle(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) { ecp_conn_handler_msg_t *handler = NULL; ssize_t rv = 0; @@ -1287,18 +1387,24 @@ ssize_t ecp_msg_handle(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, s unsigned char *content = NULL; size_t rem_size = msg_size; - if (msg_size < 1) return ECP_ERR_MIN_MSG; - while (rem_size) { mtype = ecp_msg_get_type(msg); - content = ecp_msg_get_content(msg, rem_size); - if (content == NULL) return ECP_ERR_MIN_MSG; - - rem_size -= content - msg; - if ((mtype & ECP_MTYPE_MASK) >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE; ecp_timer_pop(conn, mtype); + +#ifdef ECP_WITH_RBUF + if (conn->rbuf.recv && conn->rbuf.recv->frag_iter) { + int _rv = ecp_msg_defrag(conn->rbuf.recv->frag_iter, seq, msg, msg_size, &msg, &rem_size); + if (_rv < 0) return _rv; + if (_rv == ECP_ITER_NEXT) return msg_size; + } +#endif + + content = ecp_msg_get_content(msg, rem_size); + if (content == NULL) return ECP_ERR_MIN_MSG; + rem_size -= content - msg; + handler = conn->sock->ctx->handler[conn->type] ? conn->sock->ctx->handler[conn->type]->msg[mtype & ECP_MTYPE_MASK] : NULL; if (handler) { rv = handler(conn, seq, mtype, content, rem_size); @@ -1308,80 +1414,47 @@ ssize_t ecp_msg_handle(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, s rem_size -= rv; msg = content + rv; } else { - return msg_size-rem_size-1; + return msg_size - rem_size - 1; } } return msg_size; } -int ecp_seq_item_init(ECPSeqItem *seq_item) { - memset(seq_item, 0, sizeof(ECPSeqItem)); - - return ECP_OK; -} - void ecp_pld_set_type(unsigned char *payload, unsigned char mtype) { payload[ECP_SIZE_PLD_HDR] = mtype; } -unsigned char ecp_pld_get_type(unsigned char *payload) { - return payload[ECP_SIZE_PLD_HDR]; -} - -unsigned char *ecp_pld_get_buf(unsigned char *payload, unsigned char mtype) { - size_t offset = 0; - if (mtype & ECP_MTYPE_FLAG_FRAG) offset += 2; - if (mtype & ECP_MTYPE_FLAG_PTS) offset += sizeof(ecp_pts_t); - - return payload + ECP_SIZE_PLD_HDR + 1 + offset; -} - -unsigned char ecp_msg_get_type(unsigned char *msg) { - return msg[0]; -} - -unsigned char *ecp_msg_get_content(unsigned char *msg, size_t msg_size) { - size_t offset = 0; - unsigned char mtype = msg[0]; - - if (mtype & ECP_MTYPE_FLAG_FRAG) offset += 2; - if (mtype & ECP_MTYPE_FLAG_PTS) offset += sizeof(ecp_pts_t); - - if (msg_size < 1 + offset) return NULL; - return msg + 1 + offset; -} - -int ecp_msg_get_frag(unsigned char *msg, size_t msg_size, unsigned char *frag_cnt, unsigned char *frag_tot) { - unsigned char mtype = msg[0]; +int ecp_pld_set_frag(unsigned char *payload, unsigned char mtype, unsigned char frag_cnt, unsigned char frag_tot) { + size_t offset = ECP_SIZE_PLD_HDR + 1; if (!(mtype & ECP_MTYPE_FLAG_FRAG)) return ECP_ERR; - if (msg_size < 3) return ECP_ERR; - - *frag_cnt = msg[1]; - *frag_tot = msg[2]; - + payload[offset] = frag_cnt; + payload[offset + 1] = frag_tot; return ECP_OK; } -int ecp_msg_get_pts(unsigned char *msg, size_t msg_size, ecp_pts_t *pts) { - size_t offset = 0; - unsigned char mtype = msg[0]; +int ecp_pld_set_pts(unsigned char *payload, unsigned char mtype, ecp_pts_t pts) { + size_t offset = ECP_SIZE_PLD_HDR + 1 + ECP_SIZE_MT_FRAG(mtype); if (!(mtype & ECP_MTYPE_FLAG_PTS)) return ECP_ERR; - if (mtype & ECP_MTYPE_FLAG_FRAG) offset += 2; - if (msg_size < 1 + offset + sizeof(ecp_pts_t)) return ECP_ERR; - - *pts = \ - (msg[1 + offset] << 24) | \ - (msg[2 + offset] << 16) | \ - (msg[3 + offset] << 8) | \ - (msg[4 + offset]); + payload[offset] = (pts & 0xFF000000) >> 24; + payload[offset + 1] = (pts & 0x00FF0000) >> 16; + payload[offset + 2] = (pts & 0x0000FF00) >> 8; + payload[offset + 3] = (pts & 0x000000FF); return ECP_OK; } +unsigned char ecp_pld_get_type(unsigned char *payload) { + return payload[ECP_SIZE_PLD_HDR]; +} + +unsigned char *ecp_pld_get_buf(unsigned char *payload, unsigned char mtype) { + return payload + ECP_SIZE_PLD_HDR + 1 + ECP_SIZE_MT_FLAG(mtype); +} + static ssize_t pld_send(ECPConnection *conn, ECPTimerItem *ti, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size, ECPSeqItem *si) { unsigned char packet[ECP_MAX_PKT]; ECPSocket *sock = conn->sock; @@ -1438,8 +1511,60 @@ ssize_t ecp_pld_send_raw(ECPSocket *sock, ECPConnection *parent, ECPNetAddr *add return ecp_pkt_send(sock, parent ? &_addr : addr, packet, rv); } -ssize_t ecp_send(ECPConnection *conn, unsigned char *payload, size_t payload_size) { - return ecp_pld_send(conn, payload, payload_size); +ssize_t ecp_send(ECPConnection *conn, unsigned char mtype, unsigned char *content, size_t content_size) { + ssize_t rv = 0; + unsigned char payload[ECP_MAX_PLD]; + int pkt_cnt = 0; + int vc_cnt = conn->pcount + 1; + size_t size_max = ECP_MAX_PKT - (ECP_SIZE_PKT_HDR + ECP_AEAD_SIZE_TAG + ECP_SIZE_SEQ + 1) * vc_cnt; + + if (content_size + ECP_SIZE_MT_FLAG(mtype) > size_max) { + int i; + int _rv = ECP_OK; + size_t frag_size, frag_size_final; + ecp_seq_t seq_start = 0; + ECPSeqItem seq_item; + + _rv = ecp_seq_item_init(&seq_item); + if (_rv) return _rv; + + mtype |= ECP_MTYPE_FLAG_FRAG; + frag_size = size_max - ECP_SIZE_MT_FLAG(mtype); + pkt_cnt = content_size / frag_size; + frag_size_final = content_size - frag_size * pkt_cnt; + if (frag_size_final) pkt_cnt++; + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_lock(&conn->mutex); +#endif + + seq_start = conn->seq_out + 1; + conn->seq_out += pkt_cnt; + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_unlock(&conn->mutex); +#endif + + seq_item.seq_w = 1; + for (i=0; iflags) & ECP_CONN_FLAG_REG) #define ecp_conn_is_open(conn) ((conn->flags) & ECP_CONN_FLAG_OPEN) @@ -86,6 +89,7 @@ struct ECPConnection; struct ECPSocket; struct ECPSeqItem; +struct ECPFragIter; typedef long ssize_t; @@ -109,9 +113,10 @@ typedef uint32_t ecp_seq_t; #define ECP_SEQ_LTE(a,b) ((ecp_seq_t)((ecp_seq_t)(b) - (ecp_seq_t)(a)) < ECP_SEQ_HALF) -#define ECP_SIZE_FRAG(F) ((F) & ECP_MTYPE_FLAG_FRAG ? 2 : 0) -#define ECP_SIZE_PTS(F) ((F) & ECP_MTYPE_FLAG_PTS ? sizeof(ecp_pts_t) : 0) -#define ECP_SIZE_PLD(X,F) ((X) + ECP_SIZE_PLD_HDR+1+ECP_SIZE_FRAG(F)+ECP_SIZE_PTS(F)) +#define ECP_SIZE_MT_FRAG(F) ((F) & ECP_MTYPE_FLAG_FRAG ? 2 : 0) +#define ECP_SIZE_MT_PTS(F) ((F) & ECP_MTYPE_FLAG_PTS ? sizeof(ecp_pts_t) : 0) +#define ECP_SIZE_MT_FLAG(F) (ECP_SIZE_MT_FRAG(F)+ECP_SIZE_MT_PTS(F)) +#define ECP_SIZE_PLD(X,F) ((X) + ECP_SIZE_PLD_HDR+1+ECP_SIZE_MT_FLAG(F)) #define ECP_SIZE_PKT(X,F) (ECP_SIZE_PKT_HDR+ECP_SIZE_PLD(X,F)+ECP_AEAD_SIZE_TAG) #ifdef ECP_WITH_PTHREAD @@ -232,6 +237,13 @@ typedef struct ECPSeqItem { #endif } ECPSeqItem; +typedef struct ECPFragIter { + ecp_seq_t seq; + size_t frag_size; + unsigned char frag_cnt; + unsigned char *buffer; + size_t buf_size; +} ECPFragIter; typedef struct ECPConnHandler { ecp_conn_handler_msg_t *msg[ECP_MAX_MTYPE]; @@ -309,6 +321,7 @@ typedef struct ECPConnection { pthread_mutex_t mutex; #endif struct ECPConnection *parent; + unsigned short pcount; void *conn_data; } ECPConnection; @@ -362,22 +375,28 @@ ssize_t ecp_pkt_handle(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *parent, ssize_t ecp_pkt_send(ECPSocket *sock, ECPNetAddr *addr, unsigned char *packet, size_t pkt_size); ssize_t ecp_pkt_recv(ECPSocket *sock, ECPNetAddr *addr, unsigned char *packet, size_t pkt_size); -ssize_t ecp_msg_handle(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size); int ecp_seq_item_init(ECPSeqItem *seq_item); -void ecp_pld_set_type(unsigned char *payload, unsigned char mtype); -unsigned char *ecp_pld_get_buf(unsigned char *payload, unsigned char mtype); -unsigned char ecp_pld_get_type(unsigned char *payload); +int ecp_frag_iter_init(ECPFragIter *iter, unsigned char *buffer, size_t buf_size); + unsigned char ecp_msg_get_type(unsigned char *msg); unsigned char *ecp_msg_get_content(unsigned char *msg, size_t msg_size); int ecp_msg_get_frag(unsigned char *msg, size_t msg_size, unsigned char *frag_cnt, unsigned char *frag_tot); int ecp_msg_get_pts(unsigned char *msg, size_t msg_size, ecp_pts_t *pts); +int ecp_msg_defrag(ECPFragIter *iter, ecp_seq_t seq, unsigned char *msg_in, size_t msg_in_size, unsigned char **msg_out, size_t *msg_out_size); +ssize_t ecp_msg_handle(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size); + +void ecp_pld_set_type(unsigned char *payload, unsigned char mtype); +int ecp_pld_set_frag(unsigned char *payload, unsigned char mtype, unsigned char frag_cnt, unsigned char frag_tot); +int ecp_pld_set_pts(unsigned char *payload, unsigned char mtype, ecp_pts_t pts); +unsigned char *ecp_pld_get_buf(unsigned char *payload, unsigned char mtype); +unsigned char ecp_pld_get_type(unsigned char *payload); ssize_t ecp_pld_send(ECPConnection *conn, unsigned char *payload, size_t payload_size); ssize_t ecp_pld_send_wtimer(ECPConnection *conn, ECPTimerItem *ti, unsigned char *payload, size_t payload_size); ssize_t ecp_pld_send_ll(ECPConnection *conn, ECPTimerItem *ti, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size); ssize_t ecp_pld_send_raw(ECPSocket *sock, ECPConnection *parent, ECPNetAddr *addr, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size); -ssize_t ecp_send(ECPConnection *conn, unsigned char *payload, size_t payload_size); +ssize_t ecp_send(ECPConnection *conn, unsigned char mtype, unsigned char *content, size_t content_size); ssize_t ecp_receive(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ecp_cts_t timeout); int ecp_receiver(ECPSocket *sock); diff --git a/code/core/rbuf.h b/code/core/rbuf.h index c3e6ff9..3644f9c 100644 --- a/code/core/rbuf.h +++ b/code/core/rbuf.h @@ -65,6 +65,7 @@ typedef struct ECPRBRecv { #ifdef ECP_WITH_MSGQ ECPConnMsgQ msgq; #endif + struct ECPFragIter *frag_iter; } ECPRBRecv; typedef struct ECPRBSend { diff --git a/code/core/rbuf_recv.c b/code/core/rbuf_recv.c index 0970f6a..7b90b89 100644 --- a/code/core/rbuf_recv.c +++ b/code/core/rbuf_recv.c @@ -59,7 +59,6 @@ static void msg_flush(ECPConnection *conn) { int rv = ECP_OK; ecp_pts_t msg_pts; ecp_seq_t seq = buf->rbuf.seq_start + i; - unsigned char mtype = ecp_msg_get_type(buf->rbuf.msg[idx].msg); unsigned char frag_tot; unsigned char frag_cnt; @@ -279,9 +278,6 @@ ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *m int do_ack = 0; unsigned char mtype; - if (buf == NULL) return ECP_ERR; - if (msg_size < 1) return ECP_ERR_MIN_MSG; - mtype = ecp_msg_get_type(msg) & ECP_MTYPE_MASK; if ((mtype == ECP_MTYPE_RBACK) || (mtype == ECP_MTYPE_RBFLUSH)) return ecp_msg_handle(conn, seq, msg, msg_size); diff --git a/code/core/rbuf_send.c b/code/core/rbuf_send.c index d582e67..685280d 100644 --- a/code/core/rbuf_send.c +++ b/code/core/rbuf_send.c @@ -307,9 +307,10 @@ ssize_t ecp_rbuf_pkt_send(ECPRBSend *buf, ECPSocket *sock, ECPNetAddr *addr, ECP if (buf->flags & ECP_RBUF_FLAG_CCONTROL) { int _rv = ECP_OK; - #ifdef ECP_WITH_PTHREAD + +#ifdef ECP_WITH_PTHREAD pthread_mutex_lock(&buf->mutex); - #endif +#endif if (ECP_SEQ_LT(buf->rbuf.seq_max, seq)) buf->rbuf.seq_max = seq; @@ -337,9 +338,9 @@ ssize_t ecp_rbuf_pkt_send(ECPRBSend *buf, ECPSocket *sock, ECPNetAddr *addr, ECP buf->in_transit++; } - #ifdef ECP_WITH_PTHREAD +#ifdef ECP_WITH_PTHREAD pthread_mutex_unlock(&buf->mutex); - #endif +#endif if (_rv) return _rv; } -- cgit v1.2.3