summaryrefslogtreecommitdiff
path: root/ecp/src/ecp/ext
diff options
context:
space:
mode:
Diffstat (limited to 'ecp/src/ecp/ext')
-rw-r--r--ecp/src/ecp/ext/Makefile14
-rw-r--r--ecp/src/ecp/ext/frag.c126
-rw-r--r--ecp/src/ecp/ext/frag.h7
-rw-r--r--ecp/src/ecp/ext/msgq.c19
-rw-r--r--ecp/src/ecp/ext/msgq.h3
-rw-r--r--ecp/src/ecp/ext/rbuf.c52
-rw-r--r--ecp/src/ecp/ext/rbuf.h16
-rw-r--r--ecp/src/ecp/ext/rbuf_ext.c (renamed from ecp/src/ecp/ext/ext.c)46
-rw-r--r--ecp/src/ecp/ext/rbuf_recv.c33
-rw-r--r--ecp/src/ecp/ext/rbuf_send.c17
10 files changed, 192 insertions, 141 deletions
diff --git a/ecp/src/ecp/ext/Makefile b/ecp/src/ecp/ext/Makefile
index 0e61e83..b20258f 100644
--- a/ecp/src/ecp/ext/Makefile
+++ b/ecp/src/ecp/ext/Makefile
@@ -1,6 +1,18 @@
include ../common.mk
-obj = ext.o frag.o rbuf.o rbuf_send.o rbuf_recv.o msgq.o
+obj =
+
+ifeq ($(with_frag),yes)
+obj += frag.o
+endif
+
+ifeq ($(with_rbuf),yes)
+obj += rbuf.o rbuf_send.o rbuf_recv.o rbuf_ext.o
+endif
+
+ifeq ($(with_msgq),yes)
+obj += msgq.o
+endif
%.o: %.c
diff --git a/ecp/src/ecp/ext/frag.c b/ecp/src/ecp/ext/frag.c
index 58941ba..8795470 100644
--- a/ecp/src/ecp/ext/frag.c
+++ b/ecp/src/ecp/ext/frag.c
@@ -1,12 +1,11 @@
#include <stdlib.h>
#include <string.h>
-#include <core.h>
+#include <ecp/core.h>
-#include "rbuf.h"
#include "frag.h"
-int ecp_frag_iter_init(ECPRBConn *conn, ECPFragIter *iter, unsigned char *buffer, size_t buf_size) {
+int ecp_frag_iter_init(ECPConnection *conn, ECPFragIter *iter, unsigned char *buffer, size_t buf_size) {
memset(iter, 0, sizeof(ECPFragIter));
iter->buffer = buffer;
iter->buf_size = buf_size;
@@ -15,17 +14,64 @@ int ecp_frag_iter_init(ECPRBConn *conn, ECPFragIter *iter, unsigned char *buffer
return ECP_OK;
}
-ECPFragIter *ecp_frag_iter_get(ECPRBConn *conn) {
- return conn->iter;
-}
-
void ecp_frag_iter_reset(ECPFragIter *iter) {
iter->seq = 0;
iter->frag_cnt = 0;
iter->pld_size = 0;
}
-ssize_t ecp_msg_frag(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ECPBuffer *packet, ECPBuffer *payload) {
+ssize_t ecp_pld_defrag(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *payload, size_t pld_size) {
+ ECPFragIter *iter = conn->iter;
+ unsigned char *msg;
+ unsigned char frag_cnt, frag_tot;
+ uint16_t frag_size;
+ size_t hdr_size, msg_size;
+ size_t buf_offset;
+ int rv;
+
+ rv = ecp_pld_get_frag(payload, pld_size, &frag_cnt, &frag_tot, &frag_size);
+ if (rv) return ECP_ERR;
+
+ msg = ecp_pld_get_msg(payload, pld_size);
+ if (msg == NULL) return ECP_ERR;
+ hdr_size = msg - payload;
+
+ msg_size = pld_size - hdr_size;
+ if (msg_size == 0) return ECP_ERR;
+
+ if (iter->pld_size && (iter->seq + frag_cnt != seq)) ecp_frag_iter_reset(iter);
+
+ if (iter->pld_size == 0) {
+ iter->seq = seq - frag_cnt;
+ iter->frag_cnt = 0;
+ }
+
+ mtype &= ~ECP_MTYPE_FLAG_FRAG;
+ buf_offset = ECP_SIZE_MTYPE + ECP_SIZE_MT_FLAG(mtype) + frag_size * frag_cnt;
+ if (buf_offset + msg_size > iter->buf_size) return ECP_ERR_SIZE;
+ memcpy(iter->buffer + buf_offset, msg, msg_size);
+
+ if (frag_cnt == 0) {
+ if (ECP_SIZE_MTYPE + ECP_SIZE_MT_FLAG(mtype) > iter->buf_size) return ECP_ERR_SIZE;
+
+ iter->buffer[0] = mtype;
+ if (ECP_SIZE_MT_FLAG(mtype)) {
+ memcpy(iter->buffer + ECP_SIZE_MTYPE, payload + ECP_SIZE_MTYPE, ECP_SIZE_MT_FLAG(mtype));
+ }
+ msg_size += ECP_SIZE_MTYPE + ECP_SIZE_MT_FLAG(mtype);
+ }
+
+ iter->frag_cnt++;
+ iter->pld_size += msg_size;
+ if (iter->frag_cnt == frag_tot) {
+ ecp_pld_handle_one(conn, iter->seq, iter->buffer, iter->pld_size, NULL);
+ ecp_frag_iter_reset(iter);
+ }
+
+ return pld_size;
+}
+
+ssize_t ecp_msg_send_wfrag(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ECPBuffer *packet, ECPBuffer *payload) {
unsigned char *msg_buf;
unsigned char *pld_buf;
size_t pld_size;
@@ -72,58 +118,26 @@ ssize_t ecp_msg_frag(ECPConnection *conn, unsigned char mtype, unsigned char *ms
return msg_size;
}
-ssize_t ecp_pld_defrag(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *payload, size_t pld_size) {
- ECPRBConn *_conn = NULL;
- ECPFragIter *iter = NULL;
- unsigned char *msg;
- unsigned char frag_cnt, frag_tot;
- uint16_t frag_size;
- size_t hdr_size, msg_size;
- size_t buf_offset;
- int rv;
-
- _conn = ecp_rbuf_get_rbconn(conn);
- if (_conn) iter = ecp_frag_iter_get(_conn);
- if (iter == NULL) ECP_ERR;
-
- rv = ecp_pld_get_frag(payload, pld_size, &frag_cnt, &frag_tot, &frag_size);
- if (rv) return ECP_ERR;
-
- msg = ecp_pld_get_msg(payload, pld_size);
- if (msg == NULL) return ECP_ERR;
- hdr_size = msg - payload;
-
- msg_size = pld_size - hdr_size;
- if (msg_size == 0) return ECP_ERR;
-
- if (iter->pld_size && (iter->seq + frag_cnt != seq)) ecp_frag_iter_reset(iter);
-
- if (iter->pld_size == 0) {
- iter->seq = seq - frag_cnt;
- iter->frag_cnt = 0;
- }
-
- mtype &= ~ECP_MTYPE_FLAG_FRAG;
- buf_offset = ECP_SIZE_MTYPE + ECP_SIZE_MT_FLAG(mtype) + frag_size * frag_cnt;
- if (buf_offset + msg_size > iter->buf_size) return ECP_ERR_SIZE;
- memcpy(iter->buffer + buf_offset, msg, msg_size);
+ssize_t ecp_ext_pld_handle_one(ECPConnection *conn, ecp_seq_t seq, unsigned char *payload, size_t pld_size, ECP2Buffer *bufs) {
+ if (conn->iter) {
+ unsigned char mtype;
+ int rv;
- if (frag_cnt == 0) {
- if (ECP_SIZE_MTYPE + ECP_SIZE_MT_FLAG(mtype) > iter->buf_size) return ECP_ERR_SIZE;
+ rv = ecp_pld_get_type(payload, pld_size, &mtype);
+ if (rv) return rv;
- iter->buffer[0] = mtype;
- if (ECP_SIZE_MT_FLAG(mtype)) {
- memcpy(iter->buffer + ECP_SIZE_MTYPE, payload + ECP_SIZE_MTYPE, ECP_SIZE_MT_FLAG(mtype));
+ if (mtype & ECP_MTYPE_FLAG_FRAG) {
+ return ecp_pld_defrag(conn, seq, mtype, payload, pld_size);
}
- msg_size += ECP_SIZE_MTYPE + ECP_SIZE_MT_FLAG(mtype);
}
- iter->frag_cnt++;
- iter->pld_size += msg_size;
- if (iter->frag_cnt == frag_tot) {
- ecp_pld_handle_one(conn, iter->seq, iter->buffer, iter->pld_size, NULL);
- ecp_frag_iter_reset(iter);
- }
+ return 0;
+}
- return pld_size;
+ssize_t ecp_ext_msg_send(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ECPBuffer *packet, ECPBuffer *payload) {
+ if (ECP_SIZE_PKT_BUF(msg_size, mtype, conn) > ECP_MAX_PKT) {
+ return ecp_msg_send_wfrag(conn, mtype, msg, msg_size, packet, payload);
+ } else {
+ return 0;
+ }
}
diff --git a/ecp/src/ecp/ext/frag.h b/ecp/src/ecp/ext/frag.h
index 1400c1d..2bf35d1 100644
--- a/ecp/src/ecp/ext/frag.h
+++ b/ecp/src/ecp/ext/frag.h
@@ -6,8 +6,7 @@ typedef struct ECPFragIter {
size_t pld_size;
} ECPFragIter;
-int ecp_frag_iter_init(ECPRBConn *conn, ECPFragIter *iter, unsigned char *buffer, size_t buf_size);
-ECPFragIter *ecp_frag_iter_get(ECPRBConn *conn);
+int ecp_frag_iter_init(ECPConnection *conn, ECPFragIter *iter, unsigned char *buffer, size_t buf_size);
void ecp_frag_iter_reset(ECPFragIter *iter);
-ssize_t ecp_msg_frag(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ECPBuffer *packet, ECPBuffer *payload);
-ssize_t ecp_pld_defrag(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *payload, size_t pld_size); \ No newline at end of file
+ssize_t ecp_pld_defrag(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *payload, size_t pld_size);
+ssize_t ecp_msg_send_wfrag(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ECPBuffer *packet, ECPBuffer *payload);
diff --git a/ecp/src/ecp/ext/msgq.c b/ecp/src/ecp/ext/msgq.c
index e383b03..7172e51 100644
--- a/ecp/src/ecp/ext/msgq.c
+++ b/ecp/src/ecp/ext/msgq.c
@@ -2,7 +2,7 @@
#include <stdlib.h>
#include <string.h>
-#include <core.h>
+#include <ecp/core.h>
#include "rbuf.h"
#include "msgq.h"
@@ -21,13 +21,15 @@ static struct timespec *abstime_ts(struct timespec *ts, ecp_sts_t msec) {
return ts;
}
-int ecp_msgq_create(ECPRBConn *conn, ECPMsgQ *msgq) {
- int i;
- int rv;
+void ecp_msgq_init(ECPRBConn *conn, ECPMsgQ *msgq) {
+ memset(msgq, 0, sizeof(ECPMsgQ));
- if (conn->recv == NULL) return ECP_ERR;
+ conn->recv->msgq = msgq;
+}
- memset(msgq, 0, sizeof(ECPMsgQ));
+int ecp_msgq_create(ECPRBConn *conn) {
+ ECPMsgQ *msgq = conn->recv->msgq;
+ int i, rv;
for (i=0; i<ECP_MSGQ_MAX_MTYPE; i++) {
rv = pthread_cond_init(&msgq->cond[i], NULL);
@@ -40,7 +42,6 @@ int ecp_msgq_create(ECPRBConn *conn, ECPMsgQ *msgq) {
return ECP_ERR;
}
}
- conn->recv->msgq = msgq;
return ECP_OK;
}
@@ -52,8 +53,6 @@ void ecp_msgq_destroy(ECPRBConn *conn) {
for (i=0; i<ECP_MSGQ_MAX_MTYPE; i++) {
pthread_cond_destroy(&msgq->cond[i]);
}
-
- conn->recv->msgq = NULL;
}
void ecp_msgq_start(ECPRBConn *conn, ecp_seq_t seq) {
@@ -117,6 +116,8 @@ ssize_t ecp_msgq_pop(ECPRBConn *conn, unsigned char mtype, unsigned char *msg, s
msg_size = pld_size - hdr_size;
rbuf->arr.pld[idx].flags &= ~ECP_RBUF_FLAG_IN_MSGQ;
+ // if (rbuf->arr.pld[idx].flags == 0);
+
msgq->idx_r[mtype]++;
if (msgq->seq_start == seq) {
int i;
diff --git a/ecp/src/ecp/ext/msgq.h b/ecp/src/ecp/ext/msgq.h
index dddb6e7..669a5af 100644
--- a/ecp/src/ecp/ext/msgq.h
+++ b/ecp/src/ecp/ext/msgq.h
@@ -11,7 +11,8 @@ typedef struct ECPMsgQ {
pthread_cond_t cond[ECP_MSGQ_MAX_MTYPE];
} ECPMsgQ;
-int ecp_msgq_create(ECPRBConn *conn, ECPMsgQ *msgq);
+void ecp_msgq_init(ECPRBConn *conn, ECPMsgQ *msgq);
+int ecp_msgq_create(ECPRBConn *conn);
void ecp_msgq_destroy(ECPRBConn *conn);
void ecp_msgq_start(ECPRBConn *conn, ecp_seq_t seq);
int ecp_msgq_push(ECPRBConn *conn, ecp_seq_t seq, unsigned char mtype);
diff --git a/ecp/src/ecp/ext/rbuf.c b/ecp/src/ecp/ext/rbuf.c
index 70ee0d2..e1fab7b 100644
--- a/ecp/src/ecp/ext/rbuf.c
+++ b/ecp/src/ecp/ext/rbuf.c
@@ -1,6 +1,6 @@
#include <stdlib.h>
-#include <core.h>
+#include <ecp/core.h>
#include "rbuf.h"
@@ -28,41 +28,35 @@ int _ecp_rbuf_msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq, unsigned short *idx) {
return ECP_OK;
}
-void ecp_rbuf_conn_init(ECPRBConn *conn) {
+void ecp_rbuf_init(ECPRBConn *conn) {
ECPConnection *_conn = ecp_rbuf_get_conn(conn);
ecp_conn_set_flags(_conn, ECP_CONN_FLAG_RBUF);
conn->send = NULL;
conn->recv = NULL;
- conn->iter = NULL;
}
-int ecp_rbuf_conn_create(ECPRBConn *conn, ECPSocket *sock, unsigned char type) {
- ECPConnection *_conn = ecp_rbuf_get_conn(conn);
+int ecp_rbuf_create(ECPRBConn *conn) {
int rv;
- rv = ecp_conn_create(_conn, sock, type);
- if (rv) return rv;
-
- ecp_rbuf_conn_init(conn);
- return ECP_OK;
-}
-
-int ecp_rbuf_conn_create_inb(ECPRBConn *conn, ECPSocket *sock, unsigned char type) {
- ECPConnection *_conn = ecp_rbuf_get_conn(conn);
- int rv;
-
- rv = ecp_conn_create_inb(_conn, sock, type);
- if (rv) return rv;
+ if (conn->send) {
+ rv = ecp_rbsend_create(conn);
+ if (rv) return rv;
+ }
+ if (conn->recv) {
+ rv = ecp_rbrecv_create(conn);
+ if (rv) {
+ if (conn->send) ecp_rbsend_destroy(conn);
+ return rv;
+ }
+ }
- ecp_rbuf_conn_init(conn);
return ECP_OK;
}
void ecp_rbuf_destroy(ECPRBConn *conn) {
if (conn->send) ecp_rbsend_destroy(conn);
if (conn->recv) ecp_rbrecv_destroy(conn);
- conn->iter = NULL;
}
void ecp_rbuf_start(ECPRBConn *conn) {
@@ -71,14 +65,32 @@ void ecp_rbuf_start(ECPRBConn *conn) {
if (conn->send) {
ecp_seq_t seq_out;
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_lock(&_conn->mutex);
+#endif
+
seq_out = (ecp_seq_t)(_conn->nonce_out);
+
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_unlock(&_conn->mutex);
+#endif
+
ecp_rbsend_start(conn, seq_out);
}
if (conn->recv) {
ecp_seq_t seq_in;
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_lock(&_conn->mutex);
+#endif
+
seq_in = (ecp_seq_t)(_conn->nonce_in);
+
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_unlock(&_conn->mutex);
+#endif
+
ecp_rbrecv_start(conn, seq_in);
}
}
diff --git a/ecp/src/ecp/ext/rbuf.h b/ecp/src/ecp/ext/rbuf.h
index 36ff963..bf6e6df 100644
--- a/ecp/src/ecp/ext/rbuf.h
+++ b/ecp/src/ecp/ext/rbuf.h
@@ -87,16 +87,14 @@ typedef struct ECPRBConn {
ECPConnection b;
ECPRBRecv *recv;
ECPRBSend *send;
- struct ECPFragIter *iter;
} ECPRBConn;
ECPRBConn *ecp_rbuf_get_rbconn(ECPConnection *conn);
ECPConnection *ecp_rbuf_get_conn(ECPRBConn *conn);
void _ecp_rbuf_start(ECPRBuffer *rbuf, ecp_seq_t seq);
int _ecp_rbuf_msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq, unsigned short *idx);
-void ecp_rbuf_conn_init(ECPRBConn *conn);
-int ecp_rbuf_conn_create(ECPRBConn *conn, ECPSocket *sock, unsigned char type);
-int ecp_rbuf_conn_create_inb(ECPRBConn *conn, ECPSocket *sock, unsigned char type);
+void ecp_rbuf_init(ECPRBConn *conn);
+int ecp_rbuf_create(ECPRBConn *conn);
void ecp_rbuf_destroy(ECPRBConn *conn);
void ecp_rbuf_start(ECPRBConn *conn);
ssize_t ecp_rbuf_msg_handle(ECPRBConn *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, size_t msg_size, ECP2Buffer *bufs);
@@ -105,19 +103,23 @@ int ecp_rbuf_err_handle(ECPRBConn *conn, unsigned char mtype, int err);
/* send */
ssize_t ecp_rbuf_send_flush(ECPRBConn *conn);
ssize_t ecp_rbuf_handle_ack(ECPRBConn *conn, unsigned char *msg, size_t msg_size);
-int ecp_rbsend_create(ECPRBConn *conn, ECPRBSend *buf, ECPRBPacket *pkt, unsigned short pkt_size);
+void ecp_rbsend_init(ECPRBConn *conn, ECPRBSend *buf, ECPRBPacket *pkt, unsigned short pkt_size);
+int ecp_rbsend_create(ECPRBConn *conn);
void ecp_rbsend_destroy(ECPRBConn *conn);
void ecp_rbsend_start(ECPRBConn *conn, ecp_seq_t seq);
int ecp_rbuf_set_wsize(ECPRBConn *conn, ecp_win_t size);
int ecp_rbuf_flush(ECPRBConn *conn);
ssize_t ecp_rbuf_pld_send(ECPRBConn *conn, ECPBuffer *payload, size_t pld_size, ECPBuffer *packet, size_t pkt_size, ECPTimerItem *ti);
+/* recv */
ssize_t ecp_rbuf_handle_nop(ECPRBConn *conn, unsigned char *msg, size_t msg_size);
ssize_t ecp_rbuf_handle_flush(ECPRBConn *conn);
-void ecp_rbuf_handle_timer(ECPRBConn *conn) ;
-int ecp_rbrecv_create(ECPRBConn *conn, ECPRBRecv *buf, ECPRBPayload *pld, unsigned short pld_size);
+void ecp_rbuf_handle_timer(ECPRBConn *conn);
+void ecp_rbrecv_init(ECPRBConn *conn, ECPRBRecv *buf, ECPRBPayload *pld, unsigned short pld_size);
+int ecp_rbrecv_create(ECPRBConn *conn);
void ecp_rbrecv_destroy(ECPRBConn *conn);
void ecp_rbrecv_start(ECPRBConn *conn, ecp_seq_t seq);
+
int ecp_rbuf_set_hole(ECPRBConn *conn, unsigned short hole_max);
int ecp_rbuf_set_delay(ECPRBConn *conn, ecp_sts_t delay);
ssize_t ecp_rbuf_store(ECPRBConn *conn, ecp_seq_t seq, unsigned char *pld, size_t pld_size);
diff --git a/ecp/src/ecp/ext/ext.c b/ecp/src/ecp/ext/rbuf_ext.c
index d407e80..67b6a8d 100644
--- a/ecp/src/ecp/ext/ext.c
+++ b/ecp/src/ecp/ext/rbuf_ext.c
@@ -1,10 +1,9 @@
#include <stdlib.h>
-#include <core.h>
-#include <ext.h>
+#include <ecp/core.h>
+#include <ecp/ext.h>
#include "rbuf.h"
-#include "frag.h"
int ecp_ext_err_handle(ECPConnection *conn, unsigned char mtype, int err) {
ECPRBConn *_conn = ecp_rbuf_get_rbconn(conn);
@@ -13,44 +12,41 @@ int ecp_ext_err_handle(ECPConnection *conn, unsigned char mtype, int err) {
return ECP_PASS;
}
-int ecp_ext_conn_open(ECPConnection *conn) {
+int ecp_ext_conn_create(ECPConnection *conn) {
ECPRBConn *_conn = ecp_rbuf_get_rbconn(conn);
+ int rv = ECP_OK;
- if (_conn) ecp_rbuf_start(_conn);
- return ECP_OK;
+ if (_conn) rv = ecp_rbuf_create(_conn);
+ return rv;
}
void ecp_ext_conn_destroy(ECPConnection *conn) {
ECPRBConn *_conn = ecp_rbuf_get_rbconn(conn);
+
if (_conn) ecp_rbuf_destroy(_conn);
}
-ssize_t ecp_ext_msg_handle(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, size_t msg_size, ECP2Buffer *bufs) {
+int ecp_ext_conn_open(ECPConnection *conn) {
ECPRBConn *_conn = ecp_rbuf_get_rbconn(conn);
- if (_conn) return ecp_rbuf_msg_handle(_conn, seq, mtype, msg, msg_size, bufs);
- return 0;
+ if (_conn) ecp_rbuf_start(_conn);
+ return ECP_OK;
}
-ssize_t ecp_ext_pld_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *payload, size_t pld_size, ECP2Buffer *bufs) {
+void ecp_ext_conn_close(ECPConnection *conn) {}
+
+ssize_t ecp_ext_msg_handle(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, size_t msg_size, ECP2Buffer *bufs) {
ECPRBConn *_conn = ecp_rbuf_get_rbconn(conn);
- if (_conn && _conn->recv) return ecp_rbuf_store(_conn, seq, payload, pld_size);
+ if (_conn) return ecp_rbuf_msg_handle(_conn, seq, mtype, msg, msg_size, bufs);
return 0;
}
ssize_t ecp_ext_pld_handle(ECPConnection *conn, ecp_seq_t seq, unsigned char *payload, size_t pld_size, ECP2Buffer *bufs) {
- unsigned char mtype;
- int rv;
-
- rv = ecp_pld_get_type(payload, pld_size, &mtype);
- if (rv) return rv;
+ ECPRBConn *_conn = ecp_rbuf_get_rbconn(conn);
- if (mtype & ECP_MTYPE_FLAG_FRAG) {
- return ecp_pld_defrag(conn, seq, mtype, payload, pld_size);
- } else {
- return 0;
- }
+ if (_conn && _conn->recv) return ecp_rbuf_store(_conn, seq, payload, pld_size);
+ return 0;
}
ssize_t ecp_ext_pld_send(ECPConnection *conn, ECPBuffer *payload, size_t pld_size, ECPBuffer *packet, size_t pkt_size, unsigned char flags, ECPTimerItem *ti, ecp_tr_addr_t *addr) {
@@ -59,11 +55,3 @@ ssize_t ecp_ext_pld_send(ECPConnection *conn, ECPBuffer *payload, size_t pld_siz
if (_conn && _conn->send) return ecp_rbuf_pld_send(_conn, payload, pld_size, packet, pkt_size, ti);
return 0;
}
-
-ssize_t ecp_ext_msg_send(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ECPBuffer *packet, ECPBuffer *payload) {
- if (ECP_SIZE_PKT_BUF(msg_size, mtype, conn) > ECP_MAX_PKT) {
- return ecp_msg_frag(conn, mtype, msg, msg_size, packet, payload);
- } else {
- return 0;
- }
-}
diff --git a/ecp/src/ecp/ext/rbuf_recv.c b/ecp/src/ecp/ext/rbuf_recv.c
index 81327d5..b981e3f 100644
--- a/ecp/src/ecp/ext/rbuf_recv.c
+++ b/ecp/src/ecp/ext/rbuf_recv.c
@@ -1,8 +1,8 @@
#include <stdlib.h>
#include <string.h>
-#include <core.h>
-#include <tm.h>
+#include <ecp/core.h>
+#include <ecp/tm.h>
#include "rbuf.h"
@@ -89,7 +89,7 @@ static void msg_flush(ECPRBConn *conn) {
rv = ecp_pld_get_pts(rbuf->arr.pld[idx].buf, rbuf->arr.pld[idx].size, &msg_pts);
if (!rv && buf->deliver_delay) {
- ecp_sts_t now = ecp_tm_abstime_ms(0);
+ ecp_sts_t now = ecp_tm_get_ms();
msg_pts += buf->deliver_delay;
if (ECP_PTS_LT(now, msg_pts)) {
@@ -122,6 +122,7 @@ static void msg_flush(ECPRBConn *conn) {
rbuf->arr.pld[idx].flags &= ~ECP_RBUF_FLAG_SKIP;
}
rbuf->arr.pld[idx].flags &= ~ECP_RBUF_FLAG_IN_RBUF;
+ // if (rbuf->arr.pld[idx].flags == 0);
} else {
if (buf->flags & ECP_RBUF_FLAG_RELIABLE) break;
if (!ECP_SEQ_LT(seq, rbuf->seq_max - buf->hole_max)) break;
@@ -274,9 +275,8 @@ void ecp_rbuf_handle_timer(ECPRBConn *conn) {
#endif
}
-int ecp_rbrecv_create(ECPRBConn *conn, ECPRBRecv *buf, ECPRBPayload *pld, unsigned short pld_size) {
+void ecp_rbrecv_init(ECPRBConn *conn, ECPRBRecv *buf, ECPRBPayload *pld, unsigned short pld_size) {
ECPRBuffer *rbuf = &buf->rbuf;
- int rv;
memset(buf, 0, sizeof(ECPRBRecv));
memset(pld, 0, sizeof(ECPRBPayload) * pld_size);
@@ -286,12 +286,31 @@ int ecp_rbrecv_create(ECPRBConn *conn, ECPRBRecv *buf, ECPRBPayload *pld, unsign
rbuf->arr.pld = pld;
rbuf->arr_size = pld_size;
+ conn->recv = buf;
+}
+
+int ecp_rbrecv_create(ECPRBConn *conn) {
+ ECPRBRecv *buf = conn->recv;
+ int rv;
+
#ifdef ECP_WITH_PTHREAD
rv = pthread_mutex_init(&buf->mutex, NULL);
if (rv) return ECP_ERR;
#endif
- conn->recv = buf;
+#ifdef ECP_WITH_MSGQ
+ if (buf->msgq) {
+ rv = ecp_msgq_create(conn);
+ if (rv) {
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_destroy(&buf->mutex);
+#endif
+
+ return ECP_ERR;
+ }
+ }
+#endif
+
return ECP_OK;
}
@@ -305,8 +324,6 @@ void ecp_rbrecv_destroy(ECPRBConn *conn) {
#ifdef ECP_WITH_MSGQ
if (buf->msgq) ecp_msgq_destroy(conn);
#endif
-
- conn->recv = NULL;
}
void ecp_rbrecv_start(ECPRBConn *conn, ecp_seq_t seq) {
diff --git a/ecp/src/ecp/ext/rbuf_send.c b/ecp/src/ecp/ext/rbuf_send.c
index d3d2f04..40d526c 100644
--- a/ecp/src/ecp/ext/rbuf_send.c
+++ b/ecp/src/ecp/ext/rbuf_send.c
@@ -1,7 +1,7 @@
#include <stdlib.h>
#include <string.h>
-#include <core.h>
+#include <ecp/core.h>
#include "rbuf.h"
@@ -43,6 +43,7 @@ static void cc_flush(ECPRBConn *conn, unsigned char flags) {
}
if (!(buf->flags & ECP_RBUF_FLAG_RELIABLE)) {
rbuf->arr.pkt[idx].flags = 0;
+ // if (rbuf->arr.pkt[idx].flags == 0);
}
buf->seq_cc++;
idx = ECP_RBUF_IDX_MASK(idx + 1, rbuf->arr_size);
@@ -209,6 +210,7 @@ ssize_t ecp_rbuf_handle_ack(ECPRBConn *conn, unsigned char *msg, size_t msg_size
idx = rbuf->idx_start;
for (i=0; i<msg_cnt; i++) {
rbuf->arr.pkt[idx].flags = 0;
+ // if (rbuf->arr.pkt[idx].flags == 0);
idx = ECP_RBUF_IDX_MASK(idx + 1, rbuf->arr_size);
}
rbuf->seq_start = seq_start;
@@ -237,9 +239,8 @@ handle_ack_fin:
return rsize;
}
-int ecp_rbsend_create(ECPRBConn *conn, ECPRBSend *buf, ECPRBPacket *pkt, unsigned short pkt_size) {
+void ecp_rbsend_init(ECPRBConn *conn, ECPRBSend *buf, ECPRBPacket *pkt, unsigned short pkt_size) {
ECPRBuffer *rbuf = &buf->rbuf;
- int rv;
memset(buf, 0, sizeof(ECPRBRecv));
memset(pkt, 0, sizeof(ECPRBPacket) * pkt_size);
@@ -247,12 +248,18 @@ int ecp_rbsend_create(ECPRBConn *conn, ECPRBSend *buf, ECPRBPacket *pkt, unsigne
rbuf->arr.pkt = pkt;
rbuf->arr_size = pkt_size;
+ conn->send = buf;
+}
+
+int ecp_rbsend_create(ECPRBConn *conn) {
+ ECPRBSend *buf = conn->send;
+ int rv;
+
#ifdef ECP_WITH_PTHREAD
rv = pthread_mutex_init(&buf->mutex, NULL);
if (rv) return ECP_ERR;
#endif
- conn->send = buf;
return ECP_OK;
}
@@ -262,8 +269,6 @@ void ecp_rbsend_destroy(ECPRBConn *conn) {
#ifdef ECP_WITH_PTHREAD
pthread_mutex_destroy(&buf->mutex);
#endif
-
- conn->send = NULL;
}
void ecp_rbsend_start(ECPRBConn *conn, ecp_seq_t seq) {