summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorUros Majstorovic <majstor@majstor.org>2017-09-06 18:19:00 +0200
committerUros Majstorovic <majstor@majstor.org>2017-09-06 18:19:00 +0200
commitc129b10bf7c851d94002767aa09e06c526cacb7d (patch)
treea0ca2c8d603a233c43c888dddb4ee541d4ba34e5
parentb83e58e21ea7dda57ddfda47bd1539d15abe687f (diff)
frad/defrag implemented
-rw-r--r--code/core/core.c249
-rw-r--r--code/core/core.h39
-rw-r--r--code/core/rbuf.h1
-rw-r--r--code/core/rbuf_recv.c4
-rw-r--r--code/core/rbuf_send.c9
-rw-r--r--code/test/basic.c4
-rw-r--r--code/test/client.c2
-rw-r--r--code/test/echo.c2
-rw-r--r--code/test/server.c2
-rw-r--r--code/test/stress.c6
-rw-r--r--code/test/vc_client.c2
-rw-r--r--code/test/vc_server.c2
-rw-r--r--code/test/voip.c2
-rw-r--r--code/vconn/vconn.c5
14 files changed, 237 insertions, 92 deletions
diff --git a/code/core/core.c b/code/core/core.c
index 8e21e82..ce607ef 100644
--- a/code/core/core.c
+++ b/code/core/core.c
@@ -1184,7 +1184,7 @@ ssize_t ecp_pkt_handle(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *parent,
}
} else {
ecp_seq_t seq_offset = seq_p - seq_c;
- if (seq_offset < ECP_MAX_SEQ_FORWARD) {
+ if (seq_offset < ECP_MAX_SEQ_FWD) {
if (seq_offset < ECP_SIZE_ACKB) {
seq_map = seq_map << seq_offset;
} else {
@@ -1280,6 +1280,106 @@ ssize_t ecp_pkt_recv(ECPSocket *sock, ECPNetAddr *addr, unsigned char *packet, s
return rv;
}
+int ecp_seq_item_init(ECPSeqItem *seq_item) {
+ memset(seq_item, 0, sizeof(ECPSeqItem));
+
+ return ECP_OK;
+}
+
+int ecp_frag_iter_init(ECPFragIter *iter, unsigned char *buffer, size_t buf_size) {
+ memset(iter, 0, sizeof(ECPFragIter));
+ iter->buffer = buffer;
+ iter->buf_size = buf_size;
+
+ return ECP_OK;
+}
+
+unsigned char ecp_msg_get_type(unsigned char *msg) {
+ return msg[0];
+}
+
+unsigned char *ecp_msg_get_content(unsigned char *msg, size_t msg_size) {
+ size_t offset = 1 + ECP_SIZE_MT_FLAG(msg[0]);
+
+ if (msg_size < offset) return NULL;
+ return msg + offset;
+}
+
+int ecp_msg_get_frag(unsigned char *msg, size_t msg_size, unsigned char *frag_cnt, unsigned char *frag_tot) {
+ if (!(msg[0] & ECP_MTYPE_FLAG_FRAG)) return ECP_ERR;
+ if (msg_size < 3) return ECP_ERR;
+
+ *frag_cnt = msg[1];
+ *frag_tot = msg[2];
+
+ return ECP_OK;
+}
+
+int ecp_msg_get_pts(unsigned char *msg, size_t msg_size, ecp_pts_t *pts) {
+ unsigned char mtype = msg[0];
+ size_t offset = 1 + ECP_SIZE_MT_FRAG(mtype);
+
+ if (!(mtype & ECP_MTYPE_FLAG_PTS)) return ECP_ERR;
+ if (msg_size < offset + sizeof(ecp_pts_t)) return ECP_ERR;
+
+ *pts = \
+ (msg[offset] << 24) | \
+ (msg[offset + 1] << 16) | \
+ (msg[offset + 2] << 8) | \
+ (msg[offset + 3]);
+
+ return ECP_OK;
+}
+
+int ecp_msg_defrag(ECPFragIter *iter, ecp_seq_t seq, unsigned char *msg_in, size_t msg_in_size, unsigned char **msg_out, size_t *msg_out_size) {
+ unsigned char frag_cnt, frag_tot;
+ int rv = ecp_msg_get_frag(msg_in, msg_in_size, &frag_cnt, &frag_tot);
+ if (rv == ECP_OK) {
+ unsigned char mtype = ecp_msg_get_type(msg_in) & (~ECP_MTYPE_FLAG_FRAG);
+ unsigned char *content = NULL;
+ size_t content_size = 0;
+ size_t buf_offset = 0;
+
+ content = ecp_msg_get_content(msg_in, msg_in_size);
+ if (content == NULL) return ECP_ERR_MIN_MSG;
+
+ content_size = msg_in_size - (content - msg_in);
+ if (frag_cnt == 0) {
+ iter->seq = seq;
+ iter->frag_cnt = 0;
+ iter->frag_size = content_size;
+ iter->buffer[0] = mtype;
+ if (ECP_SIZE_MT_FLAG(mtype)) {
+ memcpy(iter->buffer + 1, msg_in, ECP_SIZE_MT_FLAG(mtype));
+ }
+ } else {
+ if (iter->seq + frag_cnt != seq) {
+ iter->seq = seq - frag_cnt;
+ return ECP_ERR_ITER;
+ }
+ if (iter->frag_cnt != frag_cnt) return ECP_ERR_ITER;
+ }
+ if ((1 + ECP_SIZE_MT_FLAG(mtype) + iter->frag_size * frag_tot) > iter->buf_size) return ECP_ERR_SIZE;
+
+ buf_offset = 1 + ECP_SIZE_MT_FLAG(mtype) + iter->frag_size * frag_cnt;
+ memcpy(iter->buffer + buf_offset, content, content_size);
+ iter->frag_cnt++;
+
+ if (iter->frag_cnt == frag_tot) {
+ *msg_out = iter->buffer;
+ *msg_out_size = buf_offset + content_size;
+ return ECP_OK;
+ } else {
+ return ECP_ITER_NEXT;
+ }
+ } else {
+ iter->seq = seq;
+ *msg_out = msg_in;
+ *msg_out_size = msg_in_size;
+ return ECP_OK;
+ }
+}
+
ssize_t ecp_msg_handle(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) {
ecp_conn_handler_msg_t *handler = NULL;
ssize_t rv = 0;
@@ -1287,18 +1387,24 @@ ssize_t ecp_msg_handle(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, s
unsigned char *content = NULL;
size_t rem_size = msg_size;
- if (msg_size < 1) return ECP_ERR_MIN_MSG;
-
while (rem_size) {
mtype = ecp_msg_get_type(msg);
- content = ecp_msg_get_content(msg, rem_size);
- if (content == NULL) return ECP_ERR_MIN_MSG;
-
- rem_size -= content - msg;
-
if ((mtype & ECP_MTYPE_MASK) >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE;
ecp_timer_pop(conn, mtype);
+
+#ifdef ECP_WITH_RBUF
+ if (conn->rbuf.recv && conn->rbuf.recv->frag_iter) {
+ int _rv = ecp_msg_defrag(conn->rbuf.recv->frag_iter, seq, msg, msg_size, &msg, &rem_size);
+ if (_rv < 0) return _rv;
+ if (_rv == ECP_ITER_NEXT) return msg_size;
+ }
+#endif
+
+ content = ecp_msg_get_content(msg, rem_size);
+ if (content == NULL) return ECP_ERR_MIN_MSG;
+ rem_size -= content - msg;
+
handler = conn->sock->ctx->handler[conn->type] ? conn->sock->ctx->handler[conn->type]->msg[mtype & ECP_MTYPE_MASK] : NULL;
if (handler) {
rv = handler(conn, seq, mtype, content, rem_size);
@@ -1308,80 +1414,47 @@ ssize_t ecp_msg_handle(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, s
rem_size -= rv;
msg = content + rv;
} else {
- return msg_size-rem_size-1;
+ return msg_size - rem_size - 1;
}
}
return msg_size;
}
-int ecp_seq_item_init(ECPSeqItem *seq_item) {
- memset(seq_item, 0, sizeof(ECPSeqItem));
-
- return ECP_OK;
-}
-
void ecp_pld_set_type(unsigned char *payload, unsigned char mtype) {
payload[ECP_SIZE_PLD_HDR] = mtype;
}
-unsigned char ecp_pld_get_type(unsigned char *payload) {
- return payload[ECP_SIZE_PLD_HDR];
-}
-
-unsigned char *ecp_pld_get_buf(unsigned char *payload, unsigned char mtype) {
- size_t offset = 0;
- if (mtype & ECP_MTYPE_FLAG_FRAG) offset += 2;
- if (mtype & ECP_MTYPE_FLAG_PTS) offset += sizeof(ecp_pts_t);
-
- return payload + ECP_SIZE_PLD_HDR + 1 + offset;
-}
-
-unsigned char ecp_msg_get_type(unsigned char *msg) {
- return msg[0];
-}
-
-unsigned char *ecp_msg_get_content(unsigned char *msg, size_t msg_size) {
- size_t offset = 0;
- unsigned char mtype = msg[0];
-
- if (mtype & ECP_MTYPE_FLAG_FRAG) offset += 2;
- if (mtype & ECP_MTYPE_FLAG_PTS) offset += sizeof(ecp_pts_t);
-
- if (msg_size < 1 + offset) return NULL;
- return msg + 1 + offset;
-}
-
-int ecp_msg_get_frag(unsigned char *msg, size_t msg_size, unsigned char *frag_cnt, unsigned char *frag_tot) {
- unsigned char mtype = msg[0];
+int ecp_pld_set_frag(unsigned char *payload, unsigned char mtype, unsigned char frag_cnt, unsigned char frag_tot) {
+ size_t offset = ECP_SIZE_PLD_HDR + 1;
if (!(mtype & ECP_MTYPE_FLAG_FRAG)) return ECP_ERR;
- if (msg_size < 3) return ECP_ERR;
-
- *frag_cnt = msg[1];
- *frag_tot = msg[2];
-
+ payload[offset] = frag_cnt;
+ payload[offset + 1] = frag_tot;
return ECP_OK;
}
-int ecp_msg_get_pts(unsigned char *msg, size_t msg_size, ecp_pts_t *pts) {
- size_t offset = 0;
- unsigned char mtype = msg[0];
+int ecp_pld_set_pts(unsigned char *payload, unsigned char mtype, ecp_pts_t pts) {
+ size_t offset = ECP_SIZE_PLD_HDR + 1 + ECP_SIZE_MT_FRAG(mtype);
if (!(mtype & ECP_MTYPE_FLAG_PTS)) return ECP_ERR;
- if (mtype & ECP_MTYPE_FLAG_FRAG) offset += 2;
- if (msg_size < 1 + offset + sizeof(ecp_pts_t)) return ECP_ERR;
-
- *pts = \
- (msg[1 + offset] << 24) | \
- (msg[2 + offset] << 16) | \
- (msg[3 + offset] << 8) | \
- (msg[4 + offset]);
+ payload[offset] = (pts & 0xFF000000) >> 24;
+ payload[offset + 1] = (pts & 0x00FF0000) >> 16;
+ payload[offset + 2] = (pts & 0x0000FF00) >> 8;
+ payload[offset + 3] = (pts & 0x000000FF);
return ECP_OK;
}
+unsigned char ecp_pld_get_type(unsigned char *payload) {
+ return payload[ECP_SIZE_PLD_HDR];
+}
+
+unsigned char *ecp_pld_get_buf(unsigned char *payload, unsigned char mtype) {
+ return payload + ECP_SIZE_PLD_HDR + 1 + ECP_SIZE_MT_FLAG(mtype);
+}
+
static ssize_t pld_send(ECPConnection *conn, ECPTimerItem *ti, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size, ECPSeqItem *si) {
unsigned char packet[ECP_MAX_PKT];
ECPSocket *sock = conn->sock;
@@ -1438,8 +1511,60 @@ ssize_t ecp_pld_send_raw(ECPSocket *sock, ECPConnection *parent, ECPNetAddr *add
return ecp_pkt_send(sock, parent ? &_addr : addr, packet, rv);
}
-ssize_t ecp_send(ECPConnection *conn, unsigned char *payload, size_t payload_size) {
- return ecp_pld_send(conn, payload, payload_size);
+ssize_t ecp_send(ECPConnection *conn, unsigned char mtype, unsigned char *content, size_t content_size) {
+ ssize_t rv = 0;
+ unsigned char payload[ECP_MAX_PLD];
+ int pkt_cnt = 0;
+ int vc_cnt = conn->pcount + 1;
+ size_t size_max = ECP_MAX_PKT - (ECP_SIZE_PKT_HDR + ECP_AEAD_SIZE_TAG + ECP_SIZE_SEQ + 1) * vc_cnt;
+
+ if (content_size + ECP_SIZE_MT_FLAG(mtype) > size_max) {
+ int i;
+ int _rv = ECP_OK;
+ size_t frag_size, frag_size_final;
+ ecp_seq_t seq_start = 0;
+ ECPSeqItem seq_item;
+
+ _rv = ecp_seq_item_init(&seq_item);
+ if (_rv) return _rv;
+
+ mtype |= ECP_MTYPE_FLAG_FRAG;
+ frag_size = size_max - ECP_SIZE_MT_FLAG(mtype);
+ pkt_cnt = content_size / frag_size;
+ frag_size_final = content_size - frag_size * pkt_cnt;
+ if (frag_size_final) pkt_cnt++;
+
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_lock(&conn->mutex);
+#endif
+
+ seq_start = conn->seq_out + 1;
+ conn->seq_out += pkt_cnt;
+
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_unlock(&conn->mutex);
+#endif
+
+ seq_item.seq_w = 1;
+ for (i=0; i<pkt_cnt; i++) {
+ if ((i == pkt_cnt - 1) && frag_size_final) frag_size = frag_size_final;
+ ecp_pld_set_type(payload, mtype);
+ ecp_pld_set_frag(payload, mtype, i, pkt_cnt);
+ memcpy(ecp_pld_get_buf(payload, mtype), content, frag_size);
+ content += frag_size;
+
+ seq_item.seq = seq_start + i;
+ ssize_t _rv = pld_send(conn, NULL, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload, ECP_SIZE_PLD(frag_size, mtype), &seq_item);
+ if (_rv < 0) return _rv;
+
+ rv += _rv;
+ }
+ } else {
+ ecp_pld_set_type(payload, mtype);
+ memcpy(ecp_pld_get_buf(payload, mtype), content, content_size);
+ rv = ecp_pld_send(conn, payload, ECP_SIZE_PLD(content_size, mtype));
+ }
+ return rv;
}
ssize_t ecp_receive(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ecp_cts_t timeout) {
diff --git a/code/core/core.h b/code/core/core.h
index cdb4b62..aa883ce 100644
--- a/code/core/core.h
+++ b/code/core/core.h
@@ -1,7 +1,11 @@
#define ECP_OK 0
+#define ECP_ITER_NEXT 1
+
#define ECP_ERR -1
#define ECP_ERR_TIMEOUT -2
#define ECP_ERR_ALLOC -3
+#define ECP_ERR_SIZE -4
+#define ECP_ERR_ITER -5
#define ECP_ERR_MAX_SOCK_CONN -10
#define ECP_ERR_MAX_CTYPE -11
@@ -39,6 +43,7 @@
#define ECP_MAX_CTYPE 8
#define ECP_MAX_MTYPE 16
#define ECP_MAX_MTYPE_SYS 4
+#define ECP_MAX_SEQ_FWD 1024
#define ECP_SIZE_PKT_HDR (ECP_SIZE_PROTO+1+ECP_ECDH_SIZE_KEY+ECP_AEAD_SIZE_NONCE)
#define ECP_SIZE_PLD_HDR (ECP_SIZE_SEQ)
@@ -73,8 +78,6 @@
#define ECP_CONN_FLAG_REG 0x01
#define ECP_CONN_FLAG_OPEN 0x02
-#define ECP_MAX_SEQ_FORWARD 1024
-
#define ecp_conn_is_reg(conn) ((conn->flags) & ECP_CONN_FLAG_REG)
#define ecp_conn_is_open(conn) ((conn->flags) & ECP_CONN_FLAG_OPEN)
@@ -86,6 +89,7 @@
struct ECPConnection;
struct ECPSocket;
struct ECPSeqItem;
+struct ECPFragIter;
typedef long ssize_t;
@@ -109,9 +113,10 @@ typedef uint32_t ecp_seq_t;
#define ECP_SEQ_LTE(a,b) ((ecp_seq_t)((ecp_seq_t)(b) - (ecp_seq_t)(a)) < ECP_SEQ_HALF)
-#define ECP_SIZE_FRAG(F) ((F) & ECP_MTYPE_FLAG_FRAG ? 2 : 0)
-#define ECP_SIZE_PTS(F) ((F) & ECP_MTYPE_FLAG_PTS ? sizeof(ecp_pts_t) : 0)
-#define ECP_SIZE_PLD(X,F) ((X) + ECP_SIZE_PLD_HDR+1+ECP_SIZE_FRAG(F)+ECP_SIZE_PTS(F))
+#define ECP_SIZE_MT_FRAG(F) ((F) & ECP_MTYPE_FLAG_FRAG ? 2 : 0)
+#define ECP_SIZE_MT_PTS(F) ((F) & ECP_MTYPE_FLAG_PTS ? sizeof(ecp_pts_t) : 0)
+#define ECP_SIZE_MT_FLAG(F) (ECP_SIZE_MT_FRAG(F)+ECP_SIZE_MT_PTS(F))
+#define ECP_SIZE_PLD(X,F) ((X) + ECP_SIZE_PLD_HDR+1+ECP_SIZE_MT_FLAG(F))
#define ECP_SIZE_PKT(X,F) (ECP_SIZE_PKT_HDR+ECP_SIZE_PLD(X,F)+ECP_AEAD_SIZE_TAG)
#ifdef ECP_WITH_PTHREAD
@@ -232,6 +237,13 @@ typedef struct ECPSeqItem {
#endif
} ECPSeqItem;
+typedef struct ECPFragIter {
+ ecp_seq_t seq;
+ size_t frag_size;
+ unsigned char frag_cnt;
+ unsigned char *buffer;
+ size_t buf_size;
+} ECPFragIter;
typedef struct ECPConnHandler {
ecp_conn_handler_msg_t *msg[ECP_MAX_MTYPE];
@@ -309,6 +321,7 @@ typedef struct ECPConnection {
pthread_mutex_t mutex;
#endif
struct ECPConnection *parent;
+ unsigned short pcount;
void *conn_data;
} ECPConnection;
@@ -362,22 +375,28 @@ ssize_t ecp_pkt_handle(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *parent,
ssize_t ecp_pkt_send(ECPSocket *sock, ECPNetAddr *addr, unsigned char *packet, size_t pkt_size);
ssize_t ecp_pkt_recv(ECPSocket *sock, ECPNetAddr *addr, unsigned char *packet, size_t pkt_size);
-ssize_t ecp_msg_handle(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size);
int ecp_seq_item_init(ECPSeqItem *seq_item);
-void ecp_pld_set_type(unsigned char *payload, unsigned char mtype);
-unsigned char *ecp_pld_get_buf(unsigned char *payload, unsigned char mtype);
-unsigned char ecp_pld_get_type(unsigned char *payload);
+int ecp_frag_iter_init(ECPFragIter *iter, unsigned char *buffer, size_t buf_size);
+
unsigned char ecp_msg_get_type(unsigned char *msg);
unsigned char *ecp_msg_get_content(unsigned char *msg, size_t msg_size);
int ecp_msg_get_frag(unsigned char *msg, size_t msg_size, unsigned char *frag_cnt, unsigned char *frag_tot);
int ecp_msg_get_pts(unsigned char *msg, size_t msg_size, ecp_pts_t *pts);
+int ecp_msg_defrag(ECPFragIter *iter, ecp_seq_t seq, unsigned char *msg_in, size_t msg_in_size, unsigned char **msg_out, size_t *msg_out_size);
+ssize_t ecp_msg_handle(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size);
+
+void ecp_pld_set_type(unsigned char *payload, unsigned char mtype);
+int ecp_pld_set_frag(unsigned char *payload, unsigned char mtype, unsigned char frag_cnt, unsigned char frag_tot);
+int ecp_pld_set_pts(unsigned char *payload, unsigned char mtype, ecp_pts_t pts);
+unsigned char *ecp_pld_get_buf(unsigned char *payload, unsigned char mtype);
+unsigned char ecp_pld_get_type(unsigned char *payload);
ssize_t ecp_pld_send(ECPConnection *conn, unsigned char *payload, size_t payload_size);
ssize_t ecp_pld_send_wtimer(ECPConnection *conn, ECPTimerItem *ti, unsigned char *payload, size_t payload_size);
ssize_t ecp_pld_send_ll(ECPConnection *conn, ECPTimerItem *ti, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size);
ssize_t ecp_pld_send_raw(ECPSocket *sock, ECPConnection *parent, ECPNetAddr *addr, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size);
-ssize_t ecp_send(ECPConnection *conn, unsigned char *payload, size_t payload_size);
+ssize_t ecp_send(ECPConnection *conn, unsigned char mtype, unsigned char *content, size_t content_size);
ssize_t ecp_receive(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ecp_cts_t timeout);
int ecp_receiver(ECPSocket *sock);
diff --git a/code/core/rbuf.h b/code/core/rbuf.h
index c3e6ff9..3644f9c 100644
--- a/code/core/rbuf.h
+++ b/code/core/rbuf.h
@@ -65,6 +65,7 @@ typedef struct ECPRBRecv {
#ifdef ECP_WITH_MSGQ
ECPConnMsgQ msgq;
#endif
+ struct ECPFragIter *frag_iter;
} ECPRBRecv;
typedef struct ECPRBSend {
diff --git a/code/core/rbuf_recv.c b/code/core/rbuf_recv.c
index 0970f6a..7b90b89 100644
--- a/code/core/rbuf_recv.c
+++ b/code/core/rbuf_recv.c
@@ -59,7 +59,6 @@ static void msg_flush(ECPConnection *conn) {
int rv = ECP_OK;
ecp_pts_t msg_pts;
ecp_seq_t seq = buf->rbuf.seq_start + i;
- unsigned char mtype = ecp_msg_get_type(buf->rbuf.msg[idx].msg);
unsigned char frag_tot;
unsigned char frag_cnt;
@@ -279,9 +278,6 @@ ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *m
int do_ack = 0;
unsigned char mtype;
- if (buf == NULL) return ECP_ERR;
- if (msg_size < 1) return ECP_ERR_MIN_MSG;
-
mtype = ecp_msg_get_type(msg) & ECP_MTYPE_MASK;
if ((mtype == ECP_MTYPE_RBACK) || (mtype == ECP_MTYPE_RBFLUSH)) return ecp_msg_handle(conn, seq, msg, msg_size);
diff --git a/code/core/rbuf_send.c b/code/core/rbuf_send.c
index d582e67..685280d 100644
--- a/code/core/rbuf_send.c
+++ b/code/core/rbuf_send.c
@@ -307,9 +307,10 @@ ssize_t ecp_rbuf_pkt_send(ECPRBSend *buf, ECPSocket *sock, ECPNetAddr *addr, ECP
if (buf->flags & ECP_RBUF_FLAG_CCONTROL) {
int _rv = ECP_OK;
- #ifdef ECP_WITH_PTHREAD
+
+#ifdef ECP_WITH_PTHREAD
pthread_mutex_lock(&buf->mutex);
- #endif
+#endif
if (ECP_SEQ_LT(buf->rbuf.seq_max, seq)) buf->rbuf.seq_max = seq;
@@ -337,9 +338,9 @@ ssize_t ecp_rbuf_pkt_send(ECPRBSend *buf, ECPSocket *sock, ECPNetAddr *addr, ECP
buf->in_transit++;
}
- #ifdef ECP_WITH_PTHREAD
+#ifdef ECP_WITH_PTHREAD
pthread_mutex_unlock(&buf->mutex);
- #endif
+#endif
if (_rv) return _rv;
}
diff --git a/code/test/basic.c b/code/test/basic.c
index dae2819..5627fd3 100644
--- a/code/test/basic.c
+++ b/code/test/basic.c
@@ -35,7 +35,7 @@ ssize_t handle_open_c(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsign
ecp_pld_set_type(payload, MTYPE_MSG);
strcpy((char *)buf, msg);
- ssize_t _rv = ecp_send(conn, payload, sizeof(payload));
+ ssize_t _rv = ecp_pld_send(conn, payload, sizeof(payload));
return s;
}
@@ -53,7 +53,7 @@ ssize_t handle_msg_s(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsigne
ecp_pld_set_type(payload, MTYPE_MSG);
strcpy((char *)buf, msg);
- ssize_t _rv = ecp_send(conn, payload, sizeof(payload));
+ ssize_t _rv = ecp_pld_send(conn, payload, sizeof(payload));
return s;
}
diff --git a/code/test/client.c b/code/test/client.c
index 992479b..edde70b 100644
--- a/code/test/client.c
+++ b/code/test/client.c
@@ -31,7 +31,7 @@ ssize_t handle_open_c(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsign
ecp_pld_set_type(payload, MTYPE_MSG);
strcpy((char *)buf, msg);
- ssize_t _rv = ecp_send(conn, payload, sizeof(payload));
+ ssize_t _rv = ecp_pld_send(conn, payload, sizeof(payload));
return s;
}
diff --git a/code/test/echo.c b/code/test/echo.c
index 593b904..aa27529 100644
--- a/code/test/echo.c
+++ b/code/test/echo.c
@@ -15,7 +15,7 @@ ECPConnHandler handler_s;
#define MTYPE_MSG 8
ssize_t handle_msg_s(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsigned char *p, ssize_t s) {
- ssize_t rv = ecp_send(conn, p-ECP_SIZE_PLD_HDR-1, ECP_SIZE_PLD_HDR+1+s);
+ ssize_t rv = ecp_pld_send(conn, p-ECP_SIZE_PLD_HDR-1, ECP_SIZE_PLD_HDR+1+s);
return s;
}
diff --git a/code/test/server.c b/code/test/server.c
index 471164e..1b2e2b1 100644
--- a/code/test/server.c
+++ b/code/test/server.c
@@ -23,7 +23,7 @@ ssize_t handle_msg_s(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsigne
ecp_pld_set_type(payload, MTYPE_MSG);
strcpy((char *)buf, msg);
- ssize_t _rv = ecp_send(conn, payload, sizeof(payload));
+ ssize_t _rv = ecp_pld_send(conn, payload, sizeof(payload));
return s;
}
diff --git a/code/test/stress.c b/code/test/stress.c
index d1ead1d..0e46f84 100644
--- a/code/test/stress.c
+++ b/code/test/stress.c
@@ -86,7 +86,7 @@ void *sender(ECPConnection *c) {
usleep(rnd % (2000000/msg_rate));
ecp_pld_set_type(payload, MTYPE_MSG);
- ssize_t _rv = ecp_send(c, payload, sizeof(payload));
+ ssize_t _rv = ecp_pld_send(c, payload, sizeof(payload));
if (c_start && (_rv > 0)) {
pthread_mutex_lock(&t_mtx[idx]);
t_sent[idx]++;
@@ -121,14 +121,14 @@ ssize_t handle_msg_c(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsigne
}
// ecp_pld_set_type(payload, MTYPE_MSG);
- // ssize_t _rv = ecp_send(c, payload, sizeof(payload));
+ // ssize_t _rv = ecp_pld_send(c, payload, sizeof(payload));
return s;
}
ssize_t handle_msg_s(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsigned char *p, ssize_t s) {
unsigned char payload[ECP_SIZE_PLD(1000, 0)];
ecp_pld_set_type(payload, MTYPE_MSG);
- ssize_t _rv = ecp_send(conn, payload, sizeof(payload));
+ ssize_t _rv = ecp_pld_send(conn, payload, sizeof(payload));
return s;
}
diff --git a/code/test/vc_client.c b/code/test/vc_client.c
index d5cb8f2..eb23d05 100644
--- a/code/test/vc_client.c
+++ b/code/test/vc_client.c
@@ -37,7 +37,7 @@ ssize_t handle_open(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsigned
ecp_pld_set_type(payload, MTYPE_MSG);
strcpy((char *)buf, msg);
- ssize_t _rv = ecp_send(conn, payload, sizeof(payload));
+ ssize_t _rv = ecp_pld_send(conn, payload, sizeof(payload));
return s;
}
diff --git a/code/test/vc_server.c b/code/test/vc_server.c
index 47c6267..455f1a0 100644
--- a/code/test/vc_server.c
+++ b/code/test/vc_server.c
@@ -32,7 +32,7 @@ ssize_t handle_msg(ECPConnection *conn, ecp_seq_t sq, unsigned char t, unsigned
ecp_pld_set_type(payload, MTYPE_MSG);
strcpy((char *)buf, msg);
- ssize_t _rv = ecp_send(conn, payload, sizeof(payload));
+ ssize_t _rv = ecp_pld_send(conn, payload, sizeof(payload));
return s;
}
diff --git a/code/test/voip.c b/code/test/voip.c
index db1ea9f..4c3d30b 100644
--- a/code/test/voip.c
+++ b/code/test/voip.c
@@ -191,6 +191,6 @@ int main(int argc, char *argv[]) {
opus_buf = ecp_pld_get_buf(payload, 0);
opus_int32 len = a_read(handle_cpt, alsa_in_buf, alsa_frames, opus_enc, opus_buf, ECP_MAX_MSG);
if (len < 0) continue;
- ssize_t _rv = ecp_send(&conn, payload, len);
+ ssize_t _rv = ecp_pld_send(&conn, payload, len);
}
} \ No newline at end of file
diff --git a/code/vconn/vconn.c b/code/vconn/vconn.c
index 4eebb62..740f400 100644
--- a/code/vconn/vconn.c
+++ b/code/vconn/vconn.c
@@ -483,6 +483,7 @@ int ecp_vconn_init(ECPConnection *conn, ECPNode *conn_node, ECPVConnection vconn
if (rv) return rv;
conn->parent = (ECPConnection *)&vconn[size-1];
+ conn->pcount = size;
for (i=0; i<size; i++) {
rv = ecp_conn_create((ECPConnection *)&vconn[i], sock, ECP_CTYPE_VCONN);
if (rv) return rv;
@@ -495,7 +496,9 @@ int ecp_vconn_init(ECPConnection *conn, ECPNode *conn_node, ECPVConnection vconn
} else {
vconn[i].b.parent = (ECPConnection *)&vconn[i-1];
}
- if (i == size-1) {
+ vconn[i].b.pcount = i;
+
+ if (i == size - 1) {
vconn[i].next = conn;
} else {
vconn[i].next = (ECPConnection *)&vconn[i+1];