From c129b10bf7c851d94002767aa09e06c526cacb7d Mon Sep 17 00:00:00 2001 From: Uros Majstorovic Date: Wed, 6 Sep 2017 18:19:00 +0200 Subject: frad/defrag implemented --- code/core/core.c | 249 +++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 187 insertions(+), 62 deletions(-) (limited to 'code/core/core.c') diff --git a/code/core/core.c b/code/core/core.c index 8e21e82..ce607ef 100644 --- a/code/core/core.c +++ b/code/core/core.c @@ -1184,7 +1184,7 @@ ssize_t ecp_pkt_handle(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *parent, } } else { ecp_seq_t seq_offset = seq_p - seq_c; - if (seq_offset < ECP_MAX_SEQ_FORWARD) { + if (seq_offset < ECP_MAX_SEQ_FWD) { if (seq_offset < ECP_SIZE_ACKB) { seq_map = seq_map << seq_offset; } else { @@ -1280,6 +1280,106 @@ ssize_t ecp_pkt_recv(ECPSocket *sock, ECPNetAddr *addr, unsigned char *packet, s return rv; } +int ecp_seq_item_init(ECPSeqItem *seq_item) { + memset(seq_item, 0, sizeof(ECPSeqItem)); + + return ECP_OK; +} + +int ecp_frag_iter_init(ECPFragIter *iter, unsigned char *buffer, size_t buf_size) { + memset(iter, 0, sizeof(ECPFragIter)); + iter->buffer = buffer; + iter->buf_size = buf_size; + + return ECP_OK; +} + +unsigned char ecp_msg_get_type(unsigned char *msg) { + return msg[0]; +} + +unsigned char *ecp_msg_get_content(unsigned char *msg, size_t msg_size) { + size_t offset = 1 + ECP_SIZE_MT_FLAG(msg[0]); + + if (msg_size < offset) return NULL; + return msg + offset; +} + +int ecp_msg_get_frag(unsigned char *msg, size_t msg_size, unsigned char *frag_cnt, unsigned char *frag_tot) { + if (!(msg[0] & ECP_MTYPE_FLAG_FRAG)) return ECP_ERR; + if (msg_size < 3) return ECP_ERR; + + *frag_cnt = msg[1]; + *frag_tot = msg[2]; + + return ECP_OK; +} + +int ecp_msg_get_pts(unsigned char *msg, size_t msg_size, ecp_pts_t *pts) { + unsigned char mtype = msg[0]; + size_t offset = 1 + ECP_SIZE_MT_FRAG(mtype); + + if (!(mtype & ECP_MTYPE_FLAG_PTS)) return ECP_ERR; + if (msg_size < offset + sizeof(ecp_pts_t)) return ECP_ERR; + + *pts = \ + (msg[offset] << 24) | \ + (msg[offset + 1] << 16) | \ + (msg[offset + 2] << 8) | \ + (msg[offset + 3]); + + return ECP_OK; +} + +int ecp_msg_defrag(ECPFragIter *iter, ecp_seq_t seq, unsigned char *msg_in, size_t msg_in_size, unsigned char **msg_out, size_t *msg_out_size) { + unsigned char frag_cnt, frag_tot; + int rv = ecp_msg_get_frag(msg_in, msg_in_size, &frag_cnt, &frag_tot); + if (rv == ECP_OK) { + unsigned char mtype = ecp_msg_get_type(msg_in) & (~ECP_MTYPE_FLAG_FRAG); + unsigned char *content = NULL; + size_t content_size = 0; + size_t buf_offset = 0; + + content = ecp_msg_get_content(msg_in, msg_in_size); + if (content == NULL) return ECP_ERR_MIN_MSG; + + content_size = msg_in_size - (content - msg_in); + if (frag_cnt == 0) { + iter->seq = seq; + iter->frag_cnt = 0; + iter->frag_size = content_size; + iter->buffer[0] = mtype; + if (ECP_SIZE_MT_FLAG(mtype)) { + memcpy(iter->buffer + 1, msg_in, ECP_SIZE_MT_FLAG(mtype)); + } + } else { + if (iter->seq + frag_cnt != seq) { + iter->seq = seq - frag_cnt; + return ECP_ERR_ITER; + } + if (iter->frag_cnt != frag_cnt) return ECP_ERR_ITER; + } + if ((1 + ECP_SIZE_MT_FLAG(mtype) + iter->frag_size * frag_tot) > iter->buf_size) return ECP_ERR_SIZE; + + buf_offset = 1 + ECP_SIZE_MT_FLAG(mtype) + iter->frag_size * frag_cnt; + memcpy(iter->buffer + buf_offset, content, content_size); + iter->frag_cnt++; + + if (iter->frag_cnt == frag_tot) { + *msg_out = iter->buffer; + *msg_out_size = buf_offset + content_size; + return ECP_OK; + } else { + return ECP_ITER_NEXT; + } + } else { + iter->seq = seq; + *msg_out = msg_in; + *msg_out_size = msg_in_size; + return ECP_OK; + } +} + ssize_t ecp_msg_handle(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) { ecp_conn_handler_msg_t *handler = NULL; ssize_t rv = 0; @@ -1287,18 +1387,24 @@ ssize_t ecp_msg_handle(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, s unsigned char *content = NULL; size_t rem_size = msg_size; - if (msg_size < 1) return ECP_ERR_MIN_MSG; - while (rem_size) { mtype = ecp_msg_get_type(msg); - content = ecp_msg_get_content(msg, rem_size); - if (content == NULL) return ECP_ERR_MIN_MSG; - - rem_size -= content - msg; - if ((mtype & ECP_MTYPE_MASK) >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE; ecp_timer_pop(conn, mtype); + +#ifdef ECP_WITH_RBUF + if (conn->rbuf.recv && conn->rbuf.recv->frag_iter) { + int _rv = ecp_msg_defrag(conn->rbuf.recv->frag_iter, seq, msg, msg_size, &msg, &rem_size); + if (_rv < 0) return _rv; + if (_rv == ECP_ITER_NEXT) return msg_size; + } +#endif + + content = ecp_msg_get_content(msg, rem_size); + if (content == NULL) return ECP_ERR_MIN_MSG; + rem_size -= content - msg; + handler = conn->sock->ctx->handler[conn->type] ? conn->sock->ctx->handler[conn->type]->msg[mtype & ECP_MTYPE_MASK] : NULL; if (handler) { rv = handler(conn, seq, mtype, content, rem_size); @@ -1308,80 +1414,47 @@ ssize_t ecp_msg_handle(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, s rem_size -= rv; msg = content + rv; } else { - return msg_size-rem_size-1; + return msg_size - rem_size - 1; } } return msg_size; } -int ecp_seq_item_init(ECPSeqItem *seq_item) { - memset(seq_item, 0, sizeof(ECPSeqItem)); - - return ECP_OK; -} - void ecp_pld_set_type(unsigned char *payload, unsigned char mtype) { payload[ECP_SIZE_PLD_HDR] = mtype; } -unsigned char ecp_pld_get_type(unsigned char *payload) { - return payload[ECP_SIZE_PLD_HDR]; -} - -unsigned char *ecp_pld_get_buf(unsigned char *payload, unsigned char mtype) { - size_t offset = 0; - if (mtype & ECP_MTYPE_FLAG_FRAG) offset += 2; - if (mtype & ECP_MTYPE_FLAG_PTS) offset += sizeof(ecp_pts_t); - - return payload + ECP_SIZE_PLD_HDR + 1 + offset; -} - -unsigned char ecp_msg_get_type(unsigned char *msg) { - return msg[0]; -} - -unsigned char *ecp_msg_get_content(unsigned char *msg, size_t msg_size) { - size_t offset = 0; - unsigned char mtype = msg[0]; - - if (mtype & ECP_MTYPE_FLAG_FRAG) offset += 2; - if (mtype & ECP_MTYPE_FLAG_PTS) offset += sizeof(ecp_pts_t); - - if (msg_size < 1 + offset) return NULL; - return msg + 1 + offset; -} - -int ecp_msg_get_frag(unsigned char *msg, size_t msg_size, unsigned char *frag_cnt, unsigned char *frag_tot) { - unsigned char mtype = msg[0]; +int ecp_pld_set_frag(unsigned char *payload, unsigned char mtype, unsigned char frag_cnt, unsigned char frag_tot) { + size_t offset = ECP_SIZE_PLD_HDR + 1; if (!(mtype & ECP_MTYPE_FLAG_FRAG)) return ECP_ERR; - if (msg_size < 3) return ECP_ERR; - - *frag_cnt = msg[1]; - *frag_tot = msg[2]; - + payload[offset] = frag_cnt; + payload[offset + 1] = frag_tot; return ECP_OK; } -int ecp_msg_get_pts(unsigned char *msg, size_t msg_size, ecp_pts_t *pts) { - size_t offset = 0; - unsigned char mtype = msg[0]; +int ecp_pld_set_pts(unsigned char *payload, unsigned char mtype, ecp_pts_t pts) { + size_t offset = ECP_SIZE_PLD_HDR + 1 + ECP_SIZE_MT_FRAG(mtype); if (!(mtype & ECP_MTYPE_FLAG_PTS)) return ECP_ERR; - if (mtype & ECP_MTYPE_FLAG_FRAG) offset += 2; - if (msg_size < 1 + offset + sizeof(ecp_pts_t)) return ECP_ERR; - - *pts = \ - (msg[1 + offset] << 24) | \ - (msg[2 + offset] << 16) | \ - (msg[3 + offset] << 8) | \ - (msg[4 + offset]); + payload[offset] = (pts & 0xFF000000) >> 24; + payload[offset + 1] = (pts & 0x00FF0000) >> 16; + payload[offset + 2] = (pts & 0x0000FF00) >> 8; + payload[offset + 3] = (pts & 0x000000FF); return ECP_OK; } +unsigned char ecp_pld_get_type(unsigned char *payload) { + return payload[ECP_SIZE_PLD_HDR]; +} + +unsigned char *ecp_pld_get_buf(unsigned char *payload, unsigned char mtype) { + return payload + ECP_SIZE_PLD_HDR + 1 + ECP_SIZE_MT_FLAG(mtype); +} + static ssize_t pld_send(ECPConnection *conn, ECPTimerItem *ti, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size, ECPSeqItem *si) { unsigned char packet[ECP_MAX_PKT]; ECPSocket *sock = conn->sock; @@ -1438,8 +1511,60 @@ ssize_t ecp_pld_send_raw(ECPSocket *sock, ECPConnection *parent, ECPNetAddr *add return ecp_pkt_send(sock, parent ? &_addr : addr, packet, rv); } -ssize_t ecp_send(ECPConnection *conn, unsigned char *payload, size_t payload_size) { - return ecp_pld_send(conn, payload, payload_size); +ssize_t ecp_send(ECPConnection *conn, unsigned char mtype, unsigned char *content, size_t content_size) { + ssize_t rv = 0; + unsigned char payload[ECP_MAX_PLD]; + int pkt_cnt = 0; + int vc_cnt = conn->pcount + 1; + size_t size_max = ECP_MAX_PKT - (ECP_SIZE_PKT_HDR + ECP_AEAD_SIZE_TAG + ECP_SIZE_SEQ + 1) * vc_cnt; + + if (content_size + ECP_SIZE_MT_FLAG(mtype) > size_max) { + int i; + int _rv = ECP_OK; + size_t frag_size, frag_size_final; + ecp_seq_t seq_start = 0; + ECPSeqItem seq_item; + + _rv = ecp_seq_item_init(&seq_item); + if (_rv) return _rv; + + mtype |= ECP_MTYPE_FLAG_FRAG; + frag_size = size_max - ECP_SIZE_MT_FLAG(mtype); + pkt_cnt = content_size / frag_size; + frag_size_final = content_size - frag_size * pkt_cnt; + if (frag_size_final) pkt_cnt++; + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_lock(&conn->mutex); +#endif + + seq_start = conn->seq_out + 1; + conn->seq_out += pkt_cnt; + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_unlock(&conn->mutex); +#endif + + seq_item.seq_w = 1; + for (i=0; i