diff options
Diffstat (limited to 'ecp/src/ecp/ext')
-rw-r--r-- | ecp/src/ecp/ext/Makefile | 14 | ||||
-rw-r--r-- | ecp/src/ecp/ext/frag.c | 126 | ||||
-rw-r--r-- | ecp/src/ecp/ext/frag.h | 7 | ||||
-rw-r--r-- | ecp/src/ecp/ext/msgq.c | 19 | ||||
-rw-r--r-- | ecp/src/ecp/ext/msgq.h | 3 | ||||
-rw-r--r-- | ecp/src/ecp/ext/rbuf.c | 52 | ||||
-rw-r--r-- | ecp/src/ecp/ext/rbuf.h | 16 | ||||
-rw-r--r-- | ecp/src/ecp/ext/rbuf_ext.c (renamed from ecp/src/ecp/ext/ext.c) | 46 | ||||
-rw-r--r-- | ecp/src/ecp/ext/rbuf_recv.c | 33 | ||||
-rw-r--r-- | ecp/src/ecp/ext/rbuf_send.c | 17 |
10 files changed, 192 insertions, 141 deletions
diff --git a/ecp/src/ecp/ext/Makefile b/ecp/src/ecp/ext/Makefile index 0e61e83..b20258f 100644 --- a/ecp/src/ecp/ext/Makefile +++ b/ecp/src/ecp/ext/Makefile @@ -1,6 +1,18 @@ include ../common.mk -obj = ext.o frag.o rbuf.o rbuf_send.o rbuf_recv.o msgq.o +obj = + +ifeq ($(with_frag),yes) +obj += frag.o +endif + +ifeq ($(with_rbuf),yes) +obj += rbuf.o rbuf_send.o rbuf_recv.o rbuf_ext.o +endif + +ifeq ($(with_msgq),yes) +obj += msgq.o +endif %.o: %.c diff --git a/ecp/src/ecp/ext/frag.c b/ecp/src/ecp/ext/frag.c index 58941ba..8795470 100644 --- a/ecp/src/ecp/ext/frag.c +++ b/ecp/src/ecp/ext/frag.c @@ -1,12 +1,11 @@ #include <stdlib.h> #include <string.h> -#include <core.h> +#include <ecp/core.h> -#include "rbuf.h" #include "frag.h" -int ecp_frag_iter_init(ECPRBConn *conn, ECPFragIter *iter, unsigned char *buffer, size_t buf_size) { +int ecp_frag_iter_init(ECPConnection *conn, ECPFragIter *iter, unsigned char *buffer, size_t buf_size) { memset(iter, 0, sizeof(ECPFragIter)); iter->buffer = buffer; iter->buf_size = buf_size; @@ -15,17 +14,64 @@ int ecp_frag_iter_init(ECPRBConn *conn, ECPFragIter *iter, unsigned char *buffer return ECP_OK; } -ECPFragIter *ecp_frag_iter_get(ECPRBConn *conn) { - return conn->iter; -} - void ecp_frag_iter_reset(ECPFragIter *iter) { iter->seq = 0; iter->frag_cnt = 0; iter->pld_size = 0; } -ssize_t ecp_msg_frag(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ECPBuffer *packet, ECPBuffer *payload) { +ssize_t ecp_pld_defrag(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *payload, size_t pld_size) { + ECPFragIter *iter = conn->iter; + unsigned char *msg; + unsigned char frag_cnt, frag_tot; + uint16_t frag_size; + size_t hdr_size, msg_size; + size_t buf_offset; + int rv; + + rv = ecp_pld_get_frag(payload, pld_size, &frag_cnt, &frag_tot, &frag_size); + if (rv) return ECP_ERR; + + msg = ecp_pld_get_msg(payload, pld_size); + if (msg == NULL) return ECP_ERR; + hdr_size = msg - payload; + + msg_size = pld_size - hdr_size; + if (msg_size == 0) return ECP_ERR; + + if (iter->pld_size && (iter->seq + frag_cnt != seq)) ecp_frag_iter_reset(iter); + + if (iter->pld_size == 0) { + iter->seq = seq - frag_cnt; + iter->frag_cnt = 0; + } + + mtype &= ~ECP_MTYPE_FLAG_FRAG; + buf_offset = ECP_SIZE_MTYPE + ECP_SIZE_MT_FLAG(mtype) + frag_size * frag_cnt; + if (buf_offset + msg_size > iter->buf_size) return ECP_ERR_SIZE; + memcpy(iter->buffer + buf_offset, msg, msg_size); + + if (frag_cnt == 0) { + if (ECP_SIZE_MTYPE + ECP_SIZE_MT_FLAG(mtype) > iter->buf_size) return ECP_ERR_SIZE; + + iter->buffer[0] = mtype; + if (ECP_SIZE_MT_FLAG(mtype)) { + memcpy(iter->buffer + ECP_SIZE_MTYPE, payload + ECP_SIZE_MTYPE, ECP_SIZE_MT_FLAG(mtype)); + } + msg_size += ECP_SIZE_MTYPE + ECP_SIZE_MT_FLAG(mtype); + } + + iter->frag_cnt++; + iter->pld_size += msg_size; + if (iter->frag_cnt == frag_tot) { + ecp_pld_handle_one(conn, iter->seq, iter->buffer, iter->pld_size, NULL); + ecp_frag_iter_reset(iter); + } + + return pld_size; +} + +ssize_t ecp_msg_send_wfrag(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ECPBuffer *packet, ECPBuffer *payload) { unsigned char *msg_buf; unsigned char *pld_buf; size_t pld_size; @@ -72,58 +118,26 @@ ssize_t ecp_msg_frag(ECPConnection *conn, unsigned char mtype, unsigned char *ms return msg_size; } -ssize_t ecp_pld_defrag(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *payload, size_t pld_size) { - ECPRBConn *_conn = NULL; - ECPFragIter *iter = NULL; - unsigned char *msg; - unsigned char frag_cnt, frag_tot; - uint16_t frag_size; - size_t hdr_size, msg_size; - size_t buf_offset; - int rv; - - _conn = ecp_rbuf_get_rbconn(conn); - if (_conn) iter = ecp_frag_iter_get(_conn); - if (iter == NULL) ECP_ERR; - - rv = ecp_pld_get_frag(payload, pld_size, &frag_cnt, &frag_tot, &frag_size); - if (rv) return ECP_ERR; - - msg = ecp_pld_get_msg(payload, pld_size); - if (msg == NULL) return ECP_ERR; - hdr_size = msg - payload; - - msg_size = pld_size - hdr_size; - if (msg_size == 0) return ECP_ERR; - - if (iter->pld_size && (iter->seq + frag_cnt != seq)) ecp_frag_iter_reset(iter); - - if (iter->pld_size == 0) { - iter->seq = seq - frag_cnt; - iter->frag_cnt = 0; - } - - mtype &= ~ECP_MTYPE_FLAG_FRAG; - buf_offset = ECP_SIZE_MTYPE + ECP_SIZE_MT_FLAG(mtype) + frag_size * frag_cnt; - if (buf_offset + msg_size > iter->buf_size) return ECP_ERR_SIZE; - memcpy(iter->buffer + buf_offset, msg, msg_size); +ssize_t ecp_ext_pld_handle_one(ECPConnection *conn, ecp_seq_t seq, unsigned char *payload, size_t pld_size, ECP2Buffer *bufs) { + if (conn->iter) { + unsigned char mtype; + int rv; - if (frag_cnt == 0) { - if (ECP_SIZE_MTYPE + ECP_SIZE_MT_FLAG(mtype) > iter->buf_size) return ECP_ERR_SIZE; + rv = ecp_pld_get_type(payload, pld_size, &mtype); + if (rv) return rv; - iter->buffer[0] = mtype; - if (ECP_SIZE_MT_FLAG(mtype)) { - memcpy(iter->buffer + ECP_SIZE_MTYPE, payload + ECP_SIZE_MTYPE, ECP_SIZE_MT_FLAG(mtype)); + if (mtype & ECP_MTYPE_FLAG_FRAG) { + return ecp_pld_defrag(conn, seq, mtype, payload, pld_size); } - msg_size += ECP_SIZE_MTYPE + ECP_SIZE_MT_FLAG(mtype); } - iter->frag_cnt++; - iter->pld_size += msg_size; - if (iter->frag_cnt == frag_tot) { - ecp_pld_handle_one(conn, iter->seq, iter->buffer, iter->pld_size, NULL); - ecp_frag_iter_reset(iter); - } + return 0; +} - return pld_size; +ssize_t ecp_ext_msg_send(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ECPBuffer *packet, ECPBuffer *payload) { + if (ECP_SIZE_PKT_BUF(msg_size, mtype, conn) > ECP_MAX_PKT) { + return ecp_msg_send_wfrag(conn, mtype, msg, msg_size, packet, payload); + } else { + return 0; + } } diff --git a/ecp/src/ecp/ext/frag.h b/ecp/src/ecp/ext/frag.h index 1400c1d..2bf35d1 100644 --- a/ecp/src/ecp/ext/frag.h +++ b/ecp/src/ecp/ext/frag.h @@ -6,8 +6,7 @@ typedef struct ECPFragIter { size_t pld_size; } ECPFragIter; -int ecp_frag_iter_init(ECPRBConn *conn, ECPFragIter *iter, unsigned char *buffer, size_t buf_size); -ECPFragIter *ecp_frag_iter_get(ECPRBConn *conn); +int ecp_frag_iter_init(ECPConnection *conn, ECPFragIter *iter, unsigned char *buffer, size_t buf_size); void ecp_frag_iter_reset(ECPFragIter *iter); -ssize_t ecp_msg_frag(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ECPBuffer *packet, ECPBuffer *payload); -ssize_t ecp_pld_defrag(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *payload, size_t pld_size);
\ No newline at end of file +ssize_t ecp_pld_defrag(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *payload, size_t pld_size); +ssize_t ecp_msg_send_wfrag(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ECPBuffer *packet, ECPBuffer *payload); diff --git a/ecp/src/ecp/ext/msgq.c b/ecp/src/ecp/ext/msgq.c index e383b03..7172e51 100644 --- a/ecp/src/ecp/ext/msgq.c +++ b/ecp/src/ecp/ext/msgq.c @@ -2,7 +2,7 @@ #include <stdlib.h> #include <string.h> -#include <core.h> +#include <ecp/core.h> #include "rbuf.h" #include "msgq.h" @@ -21,13 +21,15 @@ static struct timespec *abstime_ts(struct timespec *ts, ecp_sts_t msec) { return ts; } -int ecp_msgq_create(ECPRBConn *conn, ECPMsgQ *msgq) { - int i; - int rv; +void ecp_msgq_init(ECPRBConn *conn, ECPMsgQ *msgq) { + memset(msgq, 0, sizeof(ECPMsgQ)); - if (conn->recv == NULL) return ECP_ERR; + conn->recv->msgq = msgq; +} - memset(msgq, 0, sizeof(ECPMsgQ)); +int ecp_msgq_create(ECPRBConn *conn) { + ECPMsgQ *msgq = conn->recv->msgq; + int i, rv; for (i=0; i<ECP_MSGQ_MAX_MTYPE; i++) { rv = pthread_cond_init(&msgq->cond[i], NULL); @@ -40,7 +42,6 @@ int ecp_msgq_create(ECPRBConn *conn, ECPMsgQ *msgq) { return ECP_ERR; } } - conn->recv->msgq = msgq; return ECP_OK; } @@ -52,8 +53,6 @@ void ecp_msgq_destroy(ECPRBConn *conn) { for (i=0; i<ECP_MSGQ_MAX_MTYPE; i++) { pthread_cond_destroy(&msgq->cond[i]); } - - conn->recv->msgq = NULL; } void ecp_msgq_start(ECPRBConn *conn, ecp_seq_t seq) { @@ -117,6 +116,8 @@ ssize_t ecp_msgq_pop(ECPRBConn *conn, unsigned char mtype, unsigned char *msg, s msg_size = pld_size - hdr_size; rbuf->arr.pld[idx].flags &= ~ECP_RBUF_FLAG_IN_MSGQ; + // if (rbuf->arr.pld[idx].flags == 0); + msgq->idx_r[mtype]++; if (msgq->seq_start == seq) { int i; diff --git a/ecp/src/ecp/ext/msgq.h b/ecp/src/ecp/ext/msgq.h index dddb6e7..669a5af 100644 --- a/ecp/src/ecp/ext/msgq.h +++ b/ecp/src/ecp/ext/msgq.h @@ -11,7 +11,8 @@ typedef struct ECPMsgQ { pthread_cond_t cond[ECP_MSGQ_MAX_MTYPE]; } ECPMsgQ; -int ecp_msgq_create(ECPRBConn *conn, ECPMsgQ *msgq); +void ecp_msgq_init(ECPRBConn *conn, ECPMsgQ *msgq); +int ecp_msgq_create(ECPRBConn *conn); void ecp_msgq_destroy(ECPRBConn *conn); void ecp_msgq_start(ECPRBConn *conn, ecp_seq_t seq); int ecp_msgq_push(ECPRBConn *conn, ecp_seq_t seq, unsigned char mtype); diff --git a/ecp/src/ecp/ext/rbuf.c b/ecp/src/ecp/ext/rbuf.c index 70ee0d2..e1fab7b 100644 --- a/ecp/src/ecp/ext/rbuf.c +++ b/ecp/src/ecp/ext/rbuf.c @@ -1,6 +1,6 @@ #include <stdlib.h> -#include <core.h> +#include <ecp/core.h> #include "rbuf.h" @@ -28,41 +28,35 @@ int _ecp_rbuf_msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq, unsigned short *idx) { return ECP_OK; } -void ecp_rbuf_conn_init(ECPRBConn *conn) { +void ecp_rbuf_init(ECPRBConn *conn) { ECPConnection *_conn = ecp_rbuf_get_conn(conn); ecp_conn_set_flags(_conn, ECP_CONN_FLAG_RBUF); conn->send = NULL; conn->recv = NULL; - conn->iter = NULL; } -int ecp_rbuf_conn_create(ECPRBConn *conn, ECPSocket *sock, unsigned char type) { - ECPConnection *_conn = ecp_rbuf_get_conn(conn); +int ecp_rbuf_create(ECPRBConn *conn) { int rv; - rv = ecp_conn_create(_conn, sock, type); - if (rv) return rv; - - ecp_rbuf_conn_init(conn); - return ECP_OK; -} - -int ecp_rbuf_conn_create_inb(ECPRBConn *conn, ECPSocket *sock, unsigned char type) { - ECPConnection *_conn = ecp_rbuf_get_conn(conn); - int rv; - - rv = ecp_conn_create_inb(_conn, sock, type); - if (rv) return rv; + if (conn->send) { + rv = ecp_rbsend_create(conn); + if (rv) return rv; + } + if (conn->recv) { + rv = ecp_rbrecv_create(conn); + if (rv) { + if (conn->send) ecp_rbsend_destroy(conn); + return rv; + } + } - ecp_rbuf_conn_init(conn); return ECP_OK; } void ecp_rbuf_destroy(ECPRBConn *conn) { if (conn->send) ecp_rbsend_destroy(conn); if (conn->recv) ecp_rbrecv_destroy(conn); - conn->iter = NULL; } void ecp_rbuf_start(ECPRBConn *conn) { @@ -71,14 +65,32 @@ void ecp_rbuf_start(ECPRBConn *conn) { if (conn->send) { ecp_seq_t seq_out; +#ifdef ECP_WITH_PTHREAD + pthread_mutex_lock(&_conn->mutex); +#endif + seq_out = (ecp_seq_t)(_conn->nonce_out); + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_unlock(&_conn->mutex); +#endif + ecp_rbsend_start(conn, seq_out); } if (conn->recv) { ecp_seq_t seq_in; +#ifdef ECP_WITH_PTHREAD + pthread_mutex_lock(&_conn->mutex); +#endif + seq_in = (ecp_seq_t)(_conn->nonce_in); + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_unlock(&_conn->mutex); +#endif + ecp_rbrecv_start(conn, seq_in); } } diff --git a/ecp/src/ecp/ext/rbuf.h b/ecp/src/ecp/ext/rbuf.h index 36ff963..bf6e6df 100644 --- a/ecp/src/ecp/ext/rbuf.h +++ b/ecp/src/ecp/ext/rbuf.h @@ -87,16 +87,14 @@ typedef struct ECPRBConn { ECPConnection b; ECPRBRecv *recv; ECPRBSend *send; - struct ECPFragIter *iter; } ECPRBConn; ECPRBConn *ecp_rbuf_get_rbconn(ECPConnection *conn); ECPConnection *ecp_rbuf_get_conn(ECPRBConn *conn); void _ecp_rbuf_start(ECPRBuffer *rbuf, ecp_seq_t seq); int _ecp_rbuf_msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq, unsigned short *idx); -void ecp_rbuf_conn_init(ECPRBConn *conn); -int ecp_rbuf_conn_create(ECPRBConn *conn, ECPSocket *sock, unsigned char type); -int ecp_rbuf_conn_create_inb(ECPRBConn *conn, ECPSocket *sock, unsigned char type); +void ecp_rbuf_init(ECPRBConn *conn); +int ecp_rbuf_create(ECPRBConn *conn); void ecp_rbuf_destroy(ECPRBConn *conn); void ecp_rbuf_start(ECPRBConn *conn); ssize_t ecp_rbuf_msg_handle(ECPRBConn *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, size_t msg_size, ECP2Buffer *bufs); @@ -105,19 +103,23 @@ int ecp_rbuf_err_handle(ECPRBConn *conn, unsigned char mtype, int err); /* send */ ssize_t ecp_rbuf_send_flush(ECPRBConn *conn); ssize_t ecp_rbuf_handle_ack(ECPRBConn *conn, unsigned char *msg, size_t msg_size); -int ecp_rbsend_create(ECPRBConn *conn, ECPRBSend *buf, ECPRBPacket *pkt, unsigned short pkt_size); +void ecp_rbsend_init(ECPRBConn *conn, ECPRBSend *buf, ECPRBPacket *pkt, unsigned short pkt_size); +int ecp_rbsend_create(ECPRBConn *conn); void ecp_rbsend_destroy(ECPRBConn *conn); void ecp_rbsend_start(ECPRBConn *conn, ecp_seq_t seq); int ecp_rbuf_set_wsize(ECPRBConn *conn, ecp_win_t size); int ecp_rbuf_flush(ECPRBConn *conn); ssize_t ecp_rbuf_pld_send(ECPRBConn *conn, ECPBuffer *payload, size_t pld_size, ECPBuffer *packet, size_t pkt_size, ECPTimerItem *ti); +/* recv */ ssize_t ecp_rbuf_handle_nop(ECPRBConn *conn, unsigned char *msg, size_t msg_size); ssize_t ecp_rbuf_handle_flush(ECPRBConn *conn); -void ecp_rbuf_handle_timer(ECPRBConn *conn) ; -int ecp_rbrecv_create(ECPRBConn *conn, ECPRBRecv *buf, ECPRBPayload *pld, unsigned short pld_size); +void ecp_rbuf_handle_timer(ECPRBConn *conn); +void ecp_rbrecv_init(ECPRBConn *conn, ECPRBRecv *buf, ECPRBPayload *pld, unsigned short pld_size); +int ecp_rbrecv_create(ECPRBConn *conn); void ecp_rbrecv_destroy(ECPRBConn *conn); void ecp_rbrecv_start(ECPRBConn *conn, ecp_seq_t seq); + int ecp_rbuf_set_hole(ECPRBConn *conn, unsigned short hole_max); int ecp_rbuf_set_delay(ECPRBConn *conn, ecp_sts_t delay); ssize_t ecp_rbuf_store(ECPRBConn *conn, ecp_seq_t seq, unsigned char *pld, size_t pld_size); diff --git a/ecp/src/ecp/ext/ext.c b/ecp/src/ecp/ext/rbuf_ext.c index d407e80..67b6a8d 100644 --- a/ecp/src/ecp/ext/ext.c +++ b/ecp/src/ecp/ext/rbuf_ext.c @@ -1,10 +1,9 @@ #include <stdlib.h> -#include <core.h> -#include <ext.h> +#include <ecp/core.h> +#include <ecp/ext.h> #include "rbuf.h" -#include "frag.h" int ecp_ext_err_handle(ECPConnection *conn, unsigned char mtype, int err) { ECPRBConn *_conn = ecp_rbuf_get_rbconn(conn); @@ -13,44 +12,41 @@ int ecp_ext_err_handle(ECPConnection *conn, unsigned char mtype, int err) { return ECP_PASS; } -int ecp_ext_conn_open(ECPConnection *conn) { +int ecp_ext_conn_create(ECPConnection *conn) { ECPRBConn *_conn = ecp_rbuf_get_rbconn(conn); + int rv = ECP_OK; - if (_conn) ecp_rbuf_start(_conn); - return ECP_OK; + if (_conn) rv = ecp_rbuf_create(_conn); + return rv; } void ecp_ext_conn_destroy(ECPConnection *conn) { ECPRBConn *_conn = ecp_rbuf_get_rbconn(conn); + if (_conn) ecp_rbuf_destroy(_conn); } -ssize_t ecp_ext_msg_handle(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, size_t msg_size, ECP2Buffer *bufs) { +int ecp_ext_conn_open(ECPConnection *conn) { ECPRBConn *_conn = ecp_rbuf_get_rbconn(conn); - if (_conn) return ecp_rbuf_msg_handle(_conn, seq, mtype, msg, msg_size, bufs); - return 0; + if (_conn) ecp_rbuf_start(_conn); + return ECP_OK; } -ssize_t ecp_ext_pld_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *payload, size_t pld_size, ECP2Buffer *bufs) { +void ecp_ext_conn_close(ECPConnection *conn) {} + +ssize_t ecp_ext_msg_handle(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, size_t msg_size, ECP2Buffer *bufs) { ECPRBConn *_conn = ecp_rbuf_get_rbconn(conn); - if (_conn && _conn->recv) return ecp_rbuf_store(_conn, seq, payload, pld_size); + if (_conn) return ecp_rbuf_msg_handle(_conn, seq, mtype, msg, msg_size, bufs); return 0; } ssize_t ecp_ext_pld_handle(ECPConnection *conn, ecp_seq_t seq, unsigned char *payload, size_t pld_size, ECP2Buffer *bufs) { - unsigned char mtype; - int rv; - - rv = ecp_pld_get_type(payload, pld_size, &mtype); - if (rv) return rv; + ECPRBConn *_conn = ecp_rbuf_get_rbconn(conn); - if (mtype & ECP_MTYPE_FLAG_FRAG) { - return ecp_pld_defrag(conn, seq, mtype, payload, pld_size); - } else { - return 0; - } + if (_conn && _conn->recv) return ecp_rbuf_store(_conn, seq, payload, pld_size); + return 0; } ssize_t ecp_ext_pld_send(ECPConnection *conn, ECPBuffer *payload, size_t pld_size, ECPBuffer *packet, size_t pkt_size, unsigned char flags, ECPTimerItem *ti, ecp_tr_addr_t *addr) { @@ -59,11 +55,3 @@ ssize_t ecp_ext_pld_send(ECPConnection *conn, ECPBuffer *payload, size_t pld_siz if (_conn && _conn->send) return ecp_rbuf_pld_send(_conn, payload, pld_size, packet, pkt_size, ti); return 0; } - -ssize_t ecp_ext_msg_send(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ECPBuffer *packet, ECPBuffer *payload) { - if (ECP_SIZE_PKT_BUF(msg_size, mtype, conn) > ECP_MAX_PKT) { - return ecp_msg_frag(conn, mtype, msg, msg_size, packet, payload); - } else { - return 0; - } -} diff --git a/ecp/src/ecp/ext/rbuf_recv.c b/ecp/src/ecp/ext/rbuf_recv.c index 81327d5..b981e3f 100644 --- a/ecp/src/ecp/ext/rbuf_recv.c +++ b/ecp/src/ecp/ext/rbuf_recv.c @@ -1,8 +1,8 @@ #include <stdlib.h> #include <string.h> -#include <core.h> -#include <tm.h> +#include <ecp/core.h> +#include <ecp/tm.h> #include "rbuf.h" @@ -89,7 +89,7 @@ static void msg_flush(ECPRBConn *conn) { rv = ecp_pld_get_pts(rbuf->arr.pld[idx].buf, rbuf->arr.pld[idx].size, &msg_pts); if (!rv && buf->deliver_delay) { - ecp_sts_t now = ecp_tm_abstime_ms(0); + ecp_sts_t now = ecp_tm_get_ms(); msg_pts += buf->deliver_delay; if (ECP_PTS_LT(now, msg_pts)) { @@ -122,6 +122,7 @@ static void msg_flush(ECPRBConn *conn) { rbuf->arr.pld[idx].flags &= ~ECP_RBUF_FLAG_SKIP; } rbuf->arr.pld[idx].flags &= ~ECP_RBUF_FLAG_IN_RBUF; + // if (rbuf->arr.pld[idx].flags == 0); } else { if (buf->flags & ECP_RBUF_FLAG_RELIABLE) break; if (!ECP_SEQ_LT(seq, rbuf->seq_max - buf->hole_max)) break; @@ -274,9 +275,8 @@ void ecp_rbuf_handle_timer(ECPRBConn *conn) { #endif } -int ecp_rbrecv_create(ECPRBConn *conn, ECPRBRecv *buf, ECPRBPayload *pld, unsigned short pld_size) { +void ecp_rbrecv_init(ECPRBConn *conn, ECPRBRecv *buf, ECPRBPayload *pld, unsigned short pld_size) { ECPRBuffer *rbuf = &buf->rbuf; - int rv; memset(buf, 0, sizeof(ECPRBRecv)); memset(pld, 0, sizeof(ECPRBPayload) * pld_size); @@ -286,12 +286,31 @@ int ecp_rbrecv_create(ECPRBConn *conn, ECPRBRecv *buf, ECPRBPayload *pld, unsign rbuf->arr.pld = pld; rbuf->arr_size = pld_size; + conn->recv = buf; +} + +int ecp_rbrecv_create(ECPRBConn *conn) { + ECPRBRecv *buf = conn->recv; + int rv; + #ifdef ECP_WITH_PTHREAD rv = pthread_mutex_init(&buf->mutex, NULL); if (rv) return ECP_ERR; #endif - conn->recv = buf; +#ifdef ECP_WITH_MSGQ + if (buf->msgq) { + rv = ecp_msgq_create(conn); + if (rv) { +#ifdef ECP_WITH_PTHREAD + pthread_mutex_destroy(&buf->mutex); +#endif + + return ECP_ERR; + } + } +#endif + return ECP_OK; } @@ -305,8 +324,6 @@ void ecp_rbrecv_destroy(ECPRBConn *conn) { #ifdef ECP_WITH_MSGQ if (buf->msgq) ecp_msgq_destroy(conn); #endif - - conn->recv = NULL; } void ecp_rbrecv_start(ECPRBConn *conn, ecp_seq_t seq) { diff --git a/ecp/src/ecp/ext/rbuf_send.c b/ecp/src/ecp/ext/rbuf_send.c index d3d2f04..40d526c 100644 --- a/ecp/src/ecp/ext/rbuf_send.c +++ b/ecp/src/ecp/ext/rbuf_send.c @@ -1,7 +1,7 @@ #include <stdlib.h> #include <string.h> -#include <core.h> +#include <ecp/core.h> #include "rbuf.h" @@ -43,6 +43,7 @@ static void cc_flush(ECPRBConn *conn, unsigned char flags) { } if (!(buf->flags & ECP_RBUF_FLAG_RELIABLE)) { rbuf->arr.pkt[idx].flags = 0; + // if (rbuf->arr.pkt[idx].flags == 0); } buf->seq_cc++; idx = ECP_RBUF_IDX_MASK(idx + 1, rbuf->arr_size); @@ -209,6 +210,7 @@ ssize_t ecp_rbuf_handle_ack(ECPRBConn *conn, unsigned char *msg, size_t msg_size idx = rbuf->idx_start; for (i=0; i<msg_cnt; i++) { rbuf->arr.pkt[idx].flags = 0; + // if (rbuf->arr.pkt[idx].flags == 0); idx = ECP_RBUF_IDX_MASK(idx + 1, rbuf->arr_size); } rbuf->seq_start = seq_start; @@ -237,9 +239,8 @@ handle_ack_fin: return rsize; } -int ecp_rbsend_create(ECPRBConn *conn, ECPRBSend *buf, ECPRBPacket *pkt, unsigned short pkt_size) { +void ecp_rbsend_init(ECPRBConn *conn, ECPRBSend *buf, ECPRBPacket *pkt, unsigned short pkt_size) { ECPRBuffer *rbuf = &buf->rbuf; - int rv; memset(buf, 0, sizeof(ECPRBRecv)); memset(pkt, 0, sizeof(ECPRBPacket) * pkt_size); @@ -247,12 +248,18 @@ int ecp_rbsend_create(ECPRBConn *conn, ECPRBSend *buf, ECPRBPacket *pkt, unsigne rbuf->arr.pkt = pkt; rbuf->arr_size = pkt_size; + conn->send = buf; +} + +int ecp_rbsend_create(ECPRBConn *conn) { + ECPRBSend *buf = conn->send; + int rv; + #ifdef ECP_WITH_PTHREAD rv = pthread_mutex_init(&buf->mutex, NULL); if (rv) return ECP_ERR; #endif - conn->send = buf; return ECP_OK; } @@ -262,8 +269,6 @@ void ecp_rbsend_destroy(ECPRBConn *conn) { #ifdef ECP_WITH_PTHREAD pthread_mutex_destroy(&buf->mutex); #endif - - conn->send = NULL; } void ecp_rbsend_start(ECPRBConn *conn, ecp_seq_t seq) { |