summaryrefslogtreecommitdiff
path: root/code/core
diff options
context:
space:
mode:
Diffstat (limited to 'code/core')
-rw-r--r--code/core/core.c78
-rw-r--r--code/core/core.h10
-rw-r--r--code/core/msgq.c2
-rw-r--r--code/core/rbuf.c114
-rw-r--r--code/core/rbuf.h56
-rw-r--r--code/core/rbuf_recv.c48
-rw-r--r--code/core/rbuf_send.c63
-rw-r--r--code/core/timer.c8
8 files changed, 247 insertions, 132 deletions
diff --git a/code/core/core.c b/code/core/core.c
index d64aa04..b319afb 100644
--- a/code/core/core.c
+++ b/code/core/core.c
@@ -517,7 +517,7 @@ static ssize_t _conn_send_kget(ECPConnection *conn, ECPTimerItem *ti) {
unsigned char payload[ECP_SIZE_PLD(0)];
ecp_pld_set_type(payload, ECP_MTYPE_KGET_REQ);
- return ecp_pld_send_wkey(conn, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, payload, sizeof(payload));
+ return ecp_pld_send_ll(conn, ti, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, payload, sizeof(payload));
}
int ecp_conn_init(ECPConnection *conn, ECPNode *node) {
@@ -616,7 +616,7 @@ static ssize_t _conn_send_open(ECPConnection *conn, ECPTimerItem *ti) {
ecp_pld_set_type(payload, ECP_MTYPE_OPEN_REQ);
buf[0] = conn->type;
- return ecp_pld_send(conn, payload, sizeof(payload));
+ return ecp_pld_send_wtimer(conn, ti, payload, sizeof(payload));
}
ssize_t ecp_conn_send_open(ECPConnection *conn) {
@@ -706,7 +706,7 @@ ssize_t ecp_conn_handle_open(ECPConnection *conn, ecp_seq_t seq, unsigned char m
#ifdef ECP_WITH_RBUF
if (!is_open && conn->rbuf.recv) {
- rv = ecp_conn_rbuf_recv_start(conn, seq);
+ rv = ecp_rbuf_recv_start(conn, seq);
if (rv) return rv;
}
#endif
@@ -815,7 +815,7 @@ static ssize_t _conn_send_kput(ECPConnection *conn, ECPTimerItem *ti) {
rv = ecp_conn_dhkey_get_curr(conn, buf, buf+1);
if (rv) return rv;
- return ecp_pld_send(conn, payload, sizeof(payload));
+ return ecp_pld_send_wtimer(conn, ti, payload, sizeof(payload));
}
int ecp_conn_dhkey_new(ECPConnection *conn) {
@@ -920,7 +920,7 @@ ssize_t ecp_pack_raw(ECPSocket *sock, ECPConnection *parent, unsigned char *pack
return ecp_pack(ctx, packet, pkt_size, s_idx, c_idx, public, shsec, nonce, seq, payload, payload_size);
}
-ssize_t ecp_conn_pack(ECPConnection *conn, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size, ECPNetAddr *addr, ecp_seq_t *seq, int *rbuf_idx) {
+ssize_t ecp_conn_pack(ECPConnection *conn, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size, ECPNetAddr *addr, void *_rbuf_info) {
ecp_aead_key_t shsec;
ecp_dh_public_t public;
ecp_seq_t _seq;
@@ -957,26 +957,28 @@ ssize_t ecp_conn_pack(ECPConnection *conn, unsigned char *packet, size_t pkt_siz
}
}
if (!rv) {
- _seq = conn->seq_out + 1;
#ifdef ECP_WITH_RBUF
- if (conn->rbuf.send && rbuf_idx) {
- ECPRBSend *buf = conn->rbuf.send;
-#ifdef ECP_WITH_PTHREAD
- pthread_mutex_lock(&buf->mutex);
-#endif
-
- *rbuf_idx = ecp_rbuf_msg_idx(&buf->rbuf, _seq);
- if (*rbuf_idx < 0) rv = *rbuf_idx;
+ ECPRBInfo *rbuf_info = _rbuf_info;
-#ifdef ECP_WITH_PTHREAD
- pthread_mutex_unlock(&buf->mutex);
-#endif
+ if (rbuf_info) {
+ if (rbuf_info->seq_force) {
+ _seq = rbuf_info->seq;
+ } else {
+ unsigned char mtype = ecp_pld_get_type(payload);
+ _seq = conn->seq_out + 1;
+
+ rv = ecp_rbuf_pkt_prep(conn->rbuf.send, _seq, mtype, rbuf_info);
+ if (!rv) conn->seq_out = _seq;
+ }
+ } else {
+ _seq = conn->seq_out + 1;
+ conn->seq_out = _seq;
}
-#endif
- }
- if (!rv) {
+#else
+ _seq = conn->seq_out + 1;
conn->seq_out = _seq;
- if (addr) *addr = conn->node.addr;
+#endif
+ if (!rv && addr) *addr = conn->node.addr;
}
#ifdef ECP_WITH_PTHREAD
@@ -996,7 +998,6 @@ ssize_t ecp_conn_pack(ECPConnection *conn, unsigned char *packet, size_t pkt_siz
pthread_mutex_unlock(&conn->mutex);
#endif
- if (seq) *seq = _seq;
return _rv;
}
@@ -1150,7 +1151,7 @@ ssize_t ecp_pkt_handle(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *parent,
#endif
#ifdef ECP_WITH_RBUF
- if (conn->rbuf.recv) proc_size = ecp_conn_rbuf_recv_store(conn, p_seq, payload+pld_size-cnt_size, cnt_size);
+ if (conn->rbuf.recv) proc_size = ecp_rbuf_recv_store(conn, p_seq, payload+pld_size-cnt_size, cnt_size);
#endif
if (proc_size == 0) proc_size = ecp_msg_handle(conn, p_seq, payload+pld_size-cnt_size, cnt_size);
@@ -1247,25 +1248,42 @@ unsigned char ecp_pld_get_type(unsigned char *payload) {
}
ssize_t ecp_pld_send(ECPConnection *conn, unsigned char *payload, size_t payload_size) {
- return ecp_pld_send_wkey(conn, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload, payload_size);
+ return ecp_pld_send_ll(conn, NULL, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload, payload_size);
}
-ssize_t ecp_pld_send_wkey(ECPConnection *conn, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size) {
+ssize_t ecp_pld_send_wtimer(ECPConnection *conn, ECPTimerItem *ti, unsigned char *payload, size_t payload_size) {
+ return ecp_pld_send_ll(conn, ti, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload, payload_size);
+}
+
+ssize_t ecp_pld_send_ll(ECPConnection *conn, ECPTimerItem *ti, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size) {
unsigned char packet[ECP_MAX_PKT];
ECPSocket *sock = conn->sock;
ECPContext *ctx = sock->ctx;
ECPNetAddr addr;
- ecp_seq_t seq;
- ssize_t rv;
- int rbuf_idx = -1;
+ int _rv = ECP_OK;
+ void *_rbuf_info = NULL;
- rv = ctx->pack(conn, packet, ECP_MAX_PKT, s_idx, c_idx, payload, payload_size, &addr, &seq, &rbuf_idx);
+#ifdef ECP_WITH_RBUF
+ ECPRBInfo rbuf_info;
+
+ if (conn->rbuf.send) {
+ _rv = ecp_rbuf_info_init(&rbuf_info);
+ if (_rv) return _rv;
+ _rbuf_info = &rbuf_info;
+ }
+#endif
+
+ ssize_t rv = ctx->pack(conn, packet, ECP_MAX_PKT, s_idx, c_idx, payload, payload_size, &addr, _rbuf_info);
if (rv < 0) return rv;
#ifdef ECP_WITH_RBUF
- if (conn->rbuf.send) return ecp_conn_rbuf_pkt_send(conn, &addr, packet, rv, seq, rbuf_idx);
+ if (conn->rbuf.send) return ecp_rbuf_pkt_send(conn->rbuf.send, conn->sock, &addr, ti, packet, rv, _rbuf_info);
#endif
+ if (ti) {
+ _rv = ecp_timer_push(ti);
+ if (_rv) return _rv;
+ }
return ecp_pkt_send(sock, &addr, packet, rv);
}
diff --git a/code/core/core.h b/code/core/core.h
index 9738fb4..04fb5ee 100644
--- a/code/core/core.h
+++ b/code/core/core.h
@@ -36,7 +36,7 @@
#define ECP_MAX_NODE_KEY 2
#define ECP_MAX_CTYPE 8
#define ECP_MAX_MTYPE 16
-#define ECP_MAX_MTYPE_SYS 8
+#define ECP_MAX_MTYPE_SYS 4
#define ECP_SIZE_PKT_HDR (ECP_SIZE_PROTO+1+ECP_ECDH_SIZE_KEY+ECP_AEAD_SIZE_NONCE)
#define ECP_SIZE_PLD_HDR (ECP_SIZE_SEQ)
@@ -59,6 +59,7 @@
#define ECP_MTYPE_OPEN 0x00
#define ECP_MTYPE_KGET 0x01
#define ECP_MTYPE_KPUT 0x02
+#define ECP_MTYPE_NOP 0x03
#define ECP_MTYPE_OPEN_REQ (ECP_MTYPE_OPEN)
#define ECP_MTYPE_OPEN_REP (ECP_MTYPE_OPEN | ECP_MTYPE_FLAG_REP)
@@ -221,7 +222,7 @@ typedef struct ECPContext {
#ifdef ECP_WITH_HTABLE
ECPHTableIface ht;
#endif
- ssize_t (*pack) (struct ECPConnection *conn, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size, ECPNetAddr *addr, ecp_seq_t *seq, int *rbuf_idx);
+ ssize_t (*pack) (struct ECPConnection *conn, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size, ECPNetAddr *addr, void *_rbuf_info);
ssize_t (*pack_raw) (struct ECPSocket *sock, struct ECPConnection *parent, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size, ECPNetAddr *addr);
ECPConnHandler *handler[ECP_MAX_CTYPE];
} ECPContext;
@@ -315,7 +316,7 @@ int ecp_conn_dhkey_get_curr(ECPConnection *conn, unsigned char *idx, unsigned ch
ssize_t ecp_pack(ECPContext *ctx, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size);
ssize_t ecp_pack_raw(ECPSocket *sock, ECPConnection *parent, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size, ECPNetAddr *addr);
-ssize_t ecp_conn_pack(ECPConnection *conn, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size, ECPNetAddr *addr, ecp_seq_t *seq, int *rbuf_idx);
+ssize_t ecp_conn_pack(ECPConnection *conn, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size, ECPNetAddr *addr, void *_rbuf_info);
ssize_t ecp_pkt_handle(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *parent, unsigned char *packet, size_t pkt_size);
ssize_t ecp_pkt_send(ECPSocket *sock, ECPNetAddr *addr, unsigned char *packet, size_t pkt_size);
@@ -326,7 +327,8 @@ unsigned char *ecp_pld_get_buf(unsigned char *payload);
void ecp_pld_set_type(unsigned char *payload, unsigned char mtype);
unsigned char ecp_pld_get_type(unsigned char *payload);
ssize_t ecp_pld_send(ECPConnection *conn, unsigned char *payload, size_t payload_size);
-ssize_t ecp_pld_send_wkey(ECPConnection *conn, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size);
+ssize_t ecp_pld_send_wtimer(ECPConnection *conn, ECPTimerItem *ti, unsigned char *payload, size_t payload_size);
+ssize_t ecp_pld_send_ll(ECPConnection *conn, ECPTimerItem *ti, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size);
ssize_t ecp_pld_send_raw(ECPSocket *sock, ECPConnection *parent, ECPNetAddr *addr, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size);
ssize_t ecp_send(ECPConnection *conn, unsigned char *payload, size_t payload_size);
diff --git a/code/core/msgq.c b/code/core/msgq.c
index 15629c8..4c1252b 100644
--- a/code/core/msgq.c
+++ b/code/core/msgq.c
@@ -79,7 +79,9 @@ int ecp_conn_msgq_push(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype)
ECPRBRecv *buf = conn->rbuf.recv;
ECPConnMsgQ *msgq = buf ? &buf->msgq : NULL;
+ mtype &= ECP_MTYPE_MASK;
if (msgq == NULL) return ECP_ERR;
+ if (mtype >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE;
if (msgq->idx_w[mtype] - msgq->idx_r[mtype] == ECP_MSGQ_MAX_MSG) return ECP_MSGQ_ERR_MAX_MSG;
if (msgq->idx_w[mtype] == msgq->idx_r[mtype]) pthread_cond_signal(&msgq->cond[mtype]);
diff --git a/code/core/rbuf.c b/code/core/rbuf.c
index b384774..ee9a7fc 100644
--- a/code/core/rbuf.c
+++ b/code/core/rbuf.c
@@ -37,33 +37,88 @@ ssize_t ecp_rbuf_msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, int idx, unsigned ch
if (rbuf->msg == NULL) return 0;
if (test_flags && (test_flags & rbuf->msg[idx].flags)) return ECP_ERR_RBUF_DUP;
- if (ECP_RBUF_SEQ_LT(rbuf->seq_max, seq)) rbuf->seq_max = seq;
-
- if (!(set_flags & ECP_RBUF_FLAG_DELIVERED)) {
- memcpy(rbuf->msg[idx].msg, msg, msg_size);
- rbuf->msg[idx].size = msg_size;
- }
+ if (msg_size) memcpy(rbuf->msg[idx].msg, msg, msg_size);
+ rbuf->msg[idx].size = msg_size;
rbuf->msg[idx].flags = set_flags;
return msg_size;
}
-ssize_t ecp_conn_rbuf_pkt_send(ECPConnection *conn, ECPNetAddr *addr, unsigned char *packet, size_t pkt_size, ecp_seq_t seq, int idx) {
- int do_send = 1;
- ECPRBSend *buf = conn->rbuf.send;
+ssize_t ecp_rbuf_pld_send(ECPConnection *conn, unsigned char *payload, size_t payload_size, ecp_seq_t seq) {
+ unsigned char packet[ECP_MAX_PKT];
+ ECPSocket *sock = conn->sock;
+ ECPContext *ctx = sock->ctx;
+ ECPNetAddr addr;
+ ECPRBInfo rbuf_info;
+ ssize_t rv;
- ssize_t rv = ecp_rbuf_msg_store(&buf->rbuf, seq, idx, packet, pkt_size, 0, 0);
+ int _rv = ecp_rbuf_info_init(&rbuf_info);
+ if (_rv) return _rv;
+
+ rbuf_info.seq = seq;
+ rbuf_info.seq_force = 1;
+
+ rv = ctx->pack(conn, packet, ECP_MAX_PKT, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload, payload_size, &addr, &rbuf_info);
if (rv < 0) return rv;
- if (buf->win_size) {
+ rv = ecp_pkt_send(sock, &addr, packet, rv);
+ if (rv < 0) return rv;
+
+ return rv;
+}
+
+int ecp_rbuf_info_init(ECPRBInfo *rbuf_info) {
+ memset(rbuf_info, 0, sizeof(ECPRBInfo));
+
+ return ECP_OK;
+}
+
+int ecp_rbuf_pkt_prep(ECPRBSend *buf, ecp_seq_t seq, unsigned char mtype, ECPRBInfo *rbuf_info) {
+ if (buf == NULL) return ECP_ERR;
+
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_lock(&buf->mutex);
+#endif
+ int idx = ecp_rbuf_msg_idx(&buf->rbuf, seq);
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_unlock(&buf->mutex);
+#endif
+
+ if (idx < 0) return idx;
+
+ rbuf_info->seq = seq;
+ rbuf_info->idx = idx;
+ rbuf_info->mtype = mtype;
+ buf->rbuf.msg[idx].size = 0;
+ buf->rbuf.msg[idx].flags = 0;
+
+ return ECP_OK;
+}
+
+ssize_t ecp_rbuf_pkt_send(ECPRBSend *buf, ECPSocket *sock, ECPNetAddr *addr, ECPTimerItem *ti, unsigned char *packet, size_t pkt_size, ECPRBInfo *rbuf_info) {
+ unsigned char flags = 0;
+ int do_send = 1;
+ ssize_t rv = 0;
+ ecp_seq_t seq = rbuf_info->seq;
+ unsigned int idx = rbuf_info->idx;
+ unsigned char mtype = rbuf_info->mtype & ECP_MTYPE_MASK;
+
+ if (mtype < ECP_MAX_MTYPE_SYS) flags |= ECP_RBUF_FLAG_SYS;
+
+ rv = ecp_rbuf_msg_store(&buf->rbuf, seq, idx, packet, pkt_size, 0, flags);
+ if (rv < 0) return rv;
+
+ if (buf->flags & ECP_RBUF_FLAG_CCONTROL) {
#ifdef ECP_WITH_PTHREAD
pthread_mutex_lock(&buf->mutex);
#endif
- if (buf->cc_wait || (buf->in_transit >= buf->win_size)) {
- if (!buf->cc_wait) buf->seq_cc = seq;
- buf->cc_wait++;
- buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_CCWAIT;
+ if (ECP_RBUF_SEQ_LT(buf->rbuf.seq_max, seq)) buf->rbuf.seq_max = seq;
+
+ if (buf->cnt_cc || (buf->in_transit >= buf->win_size)) {
+ if (!buf->cnt_cc) buf->seq_cc = seq;
+ buf->cnt_cc++;
+ buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_IN_CCONTROL;
do_send = 0;
} else {
buf->in_transit++;
@@ -74,36 +129,43 @@ ssize_t ecp_conn_rbuf_pkt_send(ECPConnection *conn, ECPNetAddr *addr, unsigned c
#endif
}
- if (do_send) rv = ecp_pkt_send(conn->sock, addr, packet, pkt_size);
+ if (do_send) {
+ int _rv;
+ if (ti) {
+ _rv = ecp_timer_push(ti);
+ if (_rv) return _rv;
+ }
+ rv = ecp_pkt_send(sock, addr, packet, pkt_size);
+ }
return rv;
}
-int ecp_conn_rbuf_create(ECPConnection *conn, ECPRBSend *buf_s, ECPRBMessage *msg_s, unsigned int msg_s_size, ECPRBRecv *buf_r, ECPRBMessage *msg_r, unsigned int msg_r_size) {
+int ecp_rbuf_conn_create(ECPConnection *conn, ECPRBSend *buf_s, ECPRBMessage *msg_s, unsigned int msg_s_size, ECPRBRecv *buf_r, ECPRBMessage *msg_r, unsigned int msg_r_size) {
int rv;
- rv = ecp_conn_rbuf_send_create(conn, buf_s, msg_s, msg_s_size);
+ rv = ecp_rbuf_send_create(conn, buf_s, msg_s, msg_s_size);
if (rv) return rv;
- rv = ecp_conn_rbuf_recv_create(conn, buf_r, msg_r, msg_r_size);
+ rv = ecp_rbuf_recv_create(conn, buf_r, msg_r, msg_r_size);
if (rv) {
- ecp_conn_rbuf_send_destroy(conn);
+ ecp_rbuf_send_destroy(conn);
return rv;
}
return ECP_OK;
}
-void ecp_conn_rbuf_destroy(ECPConnection *conn) {
- ecp_conn_rbuf_send_destroy(conn);
- ecp_conn_rbuf_recv_destroy(conn);
+void ecp_rbuf_conn_destroy(ECPConnection *conn) {
+ ecp_rbuf_send_destroy(conn);
+ ecp_rbuf_recv_destroy(conn);
}
-int ecp_conn_rbuf_start(ECPConnection *conn, ecp_seq_t seq) {
- int rv = ecp_conn_rbuf_send_start(conn);
+int ecp_rbuf_conn_start(ECPConnection *conn, ecp_seq_t seq) {
+ int rv = ecp_rbuf_send_start(conn);
if (rv) return rv;
if (!conn->out) {
- rv = ecp_conn_rbuf_recv_start(conn, seq);
+ rv = ecp_rbuf_recv_start(conn, seq);
if (rv) return rv;
}
diff --git a/code/core/rbuf.h b/code/core/rbuf.h
index b09b19c..1332dee 100644
--- a/code/core/rbuf.h
+++ b/code/core/rbuf.h
@@ -1,10 +1,11 @@
#define ECP_RBUF_FLAG_IN_RBUF 0x01
#define ECP_RBUF_FLAG_IN_MSGQ 0x02
-#define ECP_RBUF_FLAG_DELIVERED 0x04
-#define ECP_RBUF_FLAG_CCWAIT 0x08
+#define ECP_RBUF_FLAG_IN_CCONTROL 0x04
+#define ECP_RBUF_FLAG_SYS 0x80
-#define ECP_RBUF_FLAG_RELIABLE 0x01
-#define ECP_RBUF_FLAG_MSGQ 0x02
+#define ECP_RBUF_FLAG_CCONTROL 0x01
+#define ECP_RBUF_FLAG_RELIABLE 0x02
+#define ECP_RBUF_FLAG_MSGQ 0x04
#define ECP_MTYPE_RBACK 0x04
#define ECP_MTYPE_RBFLUSH 0x05
@@ -66,9 +67,9 @@ typedef struct ECPRBSend {
unsigned char flush;
ecp_win_t win_size;
ecp_win_t in_transit;
- ecp_win_t cc_wait;
- ecp_seq_t seq_flush;
+ ecp_win_t cnt_cc;
ecp_seq_t seq_cc;
+ ecp_seq_t seq_flush;
unsigned int nack_rate;
ECPRBuffer rbuf;
#ifdef ECP_WITH_PTHREAD
@@ -81,28 +82,37 @@ typedef struct ECPConnRBuffer {
ECPRBSend *send;
} ECPConnRBuffer;
+typedef struct ECPRBInfo {
+ ecp_seq_t seq;
+ unsigned int idx;
+ unsigned char mtype;
+ unsigned char seq_force;
+} ECPRBInfo;
int ecp_rbuf_init(ECPRBuffer *rbuf, ECPRBMessage *msg, unsigned int msg_size);
int ecp_rbuf_start(ECPRBuffer *rbuf, ecp_seq_t seq);
int ecp_rbuf_msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq);
ssize_t ecp_rbuf_msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, int idx, unsigned char *msg, size_t msg_size, unsigned char test_flags, unsigned char set_flags);
-ssize_t ecp_conn_rbuf_pkt_send(struct ECPConnection *conn, ECPNetAddr *addr, unsigned char *packet, size_t pkt_size, ecp_seq_t seq, int idx);
-
-int ecp_conn_rbuf_create(struct ECPConnection *conn, ECPRBSend *buf_s, ECPRBMessage *msg_s, unsigned int msg_s_size, ECPRBRecv *buf_r, ECPRBMessage *msg_r, unsigned int msg_r_size);
-void ecp_conn_rbuf_destroy(struct ECPConnection *conn);
-int ecp_conn_rbuf_start(struct ECPConnection *conn, ecp_seq_t seq);
-
-int ecp_conn_rbuf_recv_create(struct ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size);
-void ecp_conn_rbuf_recv_destroy(struct ECPConnection *conn);
-int ecp_conn_rbuf_recv_set_hole(struct ECPConnection *conn, unsigned short hole_max);
-int ecp_conn_rbuf_recv_set_delay(struct ECPConnection *conn, unsigned short delay);
-int ecp_conn_rbuf_recv_start(struct ECPConnection *conn, ecp_seq_t seq);
-ssize_t ecp_conn_rbuf_recv_store(struct ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size);
-
-int ecp_conn_rbuf_send_create(struct ECPConnection *conn, ECPRBSend *buf, ECPRBMessage *msg, unsigned int msg_size);
-void ecp_conn_rbuf_send_destroy(struct ECPConnection *conn);
-int ecp_conn_rbuf_send_start(struct ECPConnection *conn);
-ssize_t ecp_conn_rbuf_send_store(struct ECPConnection *conn, ecp_seq_t seq, int idx, unsigned char *msg, size_t msg_size);
+ssize_t ecp_rbuf_pld_send(struct ECPConnection *conn, unsigned char *payload, size_t payload_size, ecp_seq_t seq);
+
+int ecp_rbuf_info_init(ECPRBInfo *rbuf_info);
+int ecp_rbuf_pkt_prep(ECPRBSend *buf, ecp_seq_t seq, unsigned char mtype, ECPRBInfo *rbuf_info);
+ssize_t ecp_rbuf_pkt_send(ECPRBSend *buf, struct ECPSocket *sock, ECPNetAddr *addr, ECPTimerItem *ti, unsigned char *packet, size_t pkt_size, ECPRBInfo *rbuf_info);
+
+int ecp_rbuf_conn_create(struct ECPConnection *conn, ECPRBSend *buf_s, ECPRBMessage *msg_s, unsigned int msg_s_size, ECPRBRecv *buf_r, ECPRBMessage *msg_r, unsigned int msg_r_size);
+void ecp_rbuf_conn_destroy(struct ECPConnection *conn);
+int ecp_rbuf_conn_start(struct ECPConnection *conn, ecp_seq_t seq);
+
+int ecp_rbuf_recv_create(struct ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size);
+void ecp_rbuf_recv_destroy(struct ECPConnection *conn);
+int ecp_rbuf_recv_set_hole(struct ECPConnection *conn, unsigned short hole_max);
+int ecp_rbuf_recv_set_delay(struct ECPConnection *conn, unsigned short delay);
+int ecp_rbuf_recv_start(struct ECPConnection *conn, ecp_seq_t seq);
+ssize_t ecp_rbuf_recv_store(struct ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size);
+
+int ecp_rbuf_send_create(struct ECPConnection *conn, ECPRBSend *buf, ECPRBMessage *msg, unsigned int msg_size);
+void ecp_rbuf_send_destroy(struct ECPConnection *conn);
+int ecp_rbuf_send_start(struct ECPConnection *conn);
ssize_t ecp_rbuf_handle_ack(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size);
ssize_t ecp_rbuf_handle_flush(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size);
diff --git a/code/core/rbuf_recv.c b/code/core/rbuf_recv.c
index e00b63f..d78ffce 100644
--- a/code/core/rbuf_recv.c
+++ b/code/core/rbuf_recv.c
@@ -7,32 +7,31 @@
static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) {
ECPRBRecv *buf = conn->rbuf.recv;
- int rv = ECP_OK;
- ssize_t _rv = 0;
unsigned char flags = ECP_RBUF_FLAG_IN_RBUF;
unsigned char mtype = msg[0] & ECP_MTYPE_MASK;
- if (mtype < ECP_MAX_MTYPE_SYS) {
- flags |= ECP_RBUF_FLAG_DELIVERED;
- ecp_msg_handle(conn, seq, msg, msg_size);
- }
+ if (mtype < ECP_MAX_MTYPE_SYS) flags |= ECP_RBUF_FLAG_SYS;
#ifdef ECP_WITH_MSGQ
if (buf->flags & ECP_RBUF_FLAG_MSGQ) {
+ int rv = ECP_OK;
+
pthread_mutex_lock(&buf->msgq.mutex);
ecp_seq_t seq_offset = seq - buf->msgq.seq_start;
if (seq_offset >= buf->rbuf.msg_size) rv = ECP_ERR_RBUF_FULL;
+ pthread_mutex_unlock(&buf->msgq.mutex);
+
+ if (rv) return rv;
}
#endif
- if (!rv) _rv = ecp_rbuf_msg_store(&buf->rbuf, seq, -1, msg, msg_size, ECP_RBUF_FLAG_IN_RBUF | ECP_RBUF_FLAG_IN_MSGQ, flags);
+ ssize_t rv = ecp_rbuf_msg_store(&buf->rbuf, seq, -1, msg, msg_size, ECP_RBUF_FLAG_IN_RBUF | ECP_RBUF_FLAG_IN_MSGQ, flags);
+ if (rv < 0) return rv;
-#ifdef ECP_WITH_MSGQ
- if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_unlock(&buf->msgq.mutex);
-#endif
+ if (ECP_RBUF_SEQ_LT(buf->rbuf.seq_max, seq)) buf->rbuf.seq_max = seq;
+ if (flags & ECP_RBUF_FLAG_SYS) ecp_msg_handle(conn, seq, msg, msg_size);
- if (rv) return rv;
- return _rv;
+ return rv;
}
static void msg_flush(ECPConnection *conn) {
@@ -52,11 +51,11 @@ static void msg_flush(ECPConnection *conn) {
if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_IN_RBUF) {
int rv = 0;
ecp_seq_t seq = buf->rbuf.seq_start + i;
- if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_DELIVERED) {
- buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_DELIVERED;
+ if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_SYS) {
+ buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_SYS;
} else if (buf->flags & ECP_RBUF_FLAG_MSGQ) {
#ifdef ECP_WITH_MSGQ
- unsigned char mtype = buf->rbuf.msg[idx].msg[0] & ECP_MTYPE_MASK;
+ unsigned char mtype = buf->rbuf.msg[idx].msg[0];
int rv = ecp_conn_msgq_push(conn, seq, mtype);
if (rv) break;
buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_IN_MSGQ;
@@ -92,8 +91,8 @@ static int ack_send(ECPConnection *conn) {
buf_[5] = (buf->ack_map & 0x00FF0000) >> 16;
buf_[6] = (buf->ack_map & 0x0000FF00) >> 8;
buf_[7] = (buf->ack_map & 0x000000FF);
-
- rv = ecp_pld_send(conn, payload, sizeof(payload));
+
+ rv = ecp_rbuf_pld_send(conn, payload, sizeof(payload), 0);
if (rv < 0) return rv;
buf->ack_pkt = 0;
@@ -151,7 +150,7 @@ ssize_t ecp_rbuf_handle_flush(ECPConnection *conn, ecp_seq_t seq, unsigned char
return 0;
}
-int ecp_conn_rbuf_recv_create(ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size) {
+int ecp_rbuf_recv_create(ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size) {
int rv;
memset(buf, 0, sizeof(ECPRBRecv));
@@ -170,13 +169,13 @@ int ecp_conn_rbuf_recv_create(ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage
return ECP_OK;
}
-void ecp_conn_rbuf_recv_destroy(ECPConnection *conn) {
+void ecp_rbuf_recv_destroy(ECPConnection *conn) {
#ifdef ECP_WITH_MSGQ
ecp_conn_msgq_destroy(conn);
#endif
}
-int ecp_conn_rbuf_recv_set_hole(ECPConnection *conn, unsigned short hole_max) {
+int ecp_rbuf_recv_set_hole(ECPConnection *conn, unsigned short hole_max) {
ECPRBRecv *buf = conn->rbuf.recv;
buf->hole_max = hole_max;
@@ -186,18 +185,18 @@ int ecp_conn_rbuf_recv_set_hole(ECPConnection *conn, unsigned short hole_max) {
return ECP_OK;
}
-int ecp_conn_rbuf_recv_set_delay(ECPConnection *conn, unsigned short delay) {
+int ecp_rbuf_recv_set_delay(ECPConnection *conn, unsigned short delay) {
ECPRBRecv *buf = conn->rbuf.recv;
buf->deliver_delay = delay;
if (buf->hole_max < delay - 1) {
- ecp_conn_rbuf_recv_set_hole(conn, delay - 1);
+ ecp_rbuf_recv_set_hole(conn, delay - 1);
}
return ECP_OK;
}
-int ecp_conn_rbuf_recv_start(ECPConnection *conn, ecp_seq_t seq) {
+int ecp_rbuf_recv_start(ECPConnection *conn, ecp_seq_t seq) {
int rv;
ECPRBRecv *buf = conn->rbuf.recv;
@@ -217,7 +216,7 @@ int ecp_conn_rbuf_recv_start(ECPConnection *conn, ecp_seq_t seq) {
return ECP_OK;
}
-ssize_t ecp_conn_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) {
+ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) {
ECPRBRecv *buf = conn->rbuf.recv;
ecp_seq_t ack_pkt = 0;
ssize_t rv;
@@ -226,6 +225,7 @@ ssize_t ecp_conn_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned ch
if (buf == NULL) return ECP_ERR;
if (msg_size < 1) return ECP_ERR_MIN_MSG;
+ // XXX mtype == RBACK | RBFLUSH handle imediately
if (ECP_RBUF_SEQ_LT(buf->rbuf.seq_max, seq)) ack_pkt = seq - buf->rbuf.seq_max;
if (ECP_RBUF_SEQ_LTE(seq, buf->seq_ack)) {
ecp_seq_t seq_offset = buf->seq_ack - seq;
diff --git a/code/core/rbuf_send.c b/code/core/rbuf_send.c
index 06bd2e2..e7020cb 100644
--- a/code/core/rbuf_send.c
+++ b/code/core/rbuf_send.c
@@ -4,11 +4,20 @@
#define NACK_RATE_UNIT 10000
-static ssize_t _rbuf_send_flush(ECPConnection *conn, ECPTimerItem *ti) {
+static ssize_t flush_send(ECPConnection *conn, ECPTimerItem *ti) {
unsigned char payload[ECP_SIZE_PLD(0)];
ecp_pld_set_type(payload, ECP_MTYPE_RBFLUSH);
- return ecp_pld_send(conn, payload, sizeof(payload));
+ if (ti == NULL) {
+ ECPTimerItem _ti;
+ int rv = ecp_timer_item_init(&_ti, conn, ECP_MTYPE_RBACK, 3, 500);
+ if (rv) return rv;
+
+ _ti.retry = flush_send;
+ rv = ecp_timer_push(&_ti);
+ if (rv) return rv;
+ }
+ return ecp_rbuf_pld_send(conn, payload, sizeof(payload), 0);
}
static void cc_flush(ECPConnection *conn) {
@@ -24,8 +33,8 @@ static void cc_flush(ECPConnection *conn) {
unsigned int _idx = idx;
for (i=0; i<pkt_to_send; i++) {
- if (!(rbuf->msg[_idx].flags & ECP_RBUF_FLAG_CCWAIT)) break;
- rbuf->msg[_idx].flags &= ~ECP_RBUF_FLAG_CCWAIT;
+ if (!(rbuf->msg[_idx].flags & ECP_RBUF_FLAG_IN_CCONTROL)) break;
+ rbuf->msg[_idx].flags &= ~ECP_RBUF_FLAG_IN_CCONTROL;
_idx = ECP_RBUF_IDX_MASK(_idx + 1, rbuf->msg_size);
}
pkt_to_send = i;
@@ -45,7 +54,7 @@ static void cc_flush(ECPConnection *conn) {
#endif
buf->in_transit += (ecp_win_t)pkt_to_send;
- buf->cc_wait -= (ecp_win_t)pkt_to_send;
+ buf->cnt_cc -= (ecp_win_t)pkt_to_send;
buf->seq_cc += (ecp_seq_t)pkt_to_send;
}
}
@@ -85,7 +94,7 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt
if (!rv) {
seq_ack++;
- buf->in_transit -= seq_ack - rbuf->seq_start;
+ if (buf->flags & ECP_RBUF_FLAG_CCONTROL) buf->in_transit = buf->cnt_cc ? buf->seq_cc - seq_ack : rbuf->seq_max - seq_ack + 1;
if (ack_map != ECP_RBUF_ACK_FULL) {
int nack_first = 0;
@@ -104,7 +113,11 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt
nack_cnt++;
if (buf->flags & ECP_RBUF_FLAG_RELIABLE) {
unsigned int _idx = ECP_RBUF_IDX_MASK(idx + 1 - ECP_RBUF_ACK_SIZE + i, rbuf->msg_size);
- ecp_pkt_send(conn->sock, &conn->node.addr, rbuf->msg[_idx].msg, rbuf->msg[_idx].size);
+ if ((rbuf->msg[_idx].size == 0) || (rbuf->msg[_idx].flags & ECP_RBUF_FLAG_SYS)) {
+
+ } else {
+ ecp_pkt_send(conn->sock, &conn->node.addr, rbuf->msg[_idx].msg, rbuf->msg[_idx].size);
+ }
if (!nack_first) {
nack_first = 1;
seq_start = seq_ack + i;
@@ -118,7 +131,7 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt
pthread_mutex_lock(&buf->mutex);
#endif
- buf->in_transit += nack_cnt;
+ if (buf->flags & ECP_RBUF_FLAG_CCONTROL) buf->in_transit += nack_cnt;
buf->nack_rate = (buf->nack_rate * 7 + ((ECP_RBUF_ACK_SIZE - nack_cnt) * NACK_RATE_UNIT) / ECP_RBUF_ACK_SIZE) / 8;
if (buf->flags & ECP_RBUF_FLAG_RELIABLE) {
rbuf->seq_start = seq_start;
@@ -138,7 +151,7 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt
do_flush = 1;
}
}
- if (buf->cc_wait) cc_flush(conn);
+ if (buf->cnt_cc) cc_flush(conn);
}
#ifdef ECP_WITH_PTHREAD
@@ -147,11 +160,14 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt
if (rv) return rv;
- if (do_flush) ecp_timer_send(conn, _rbuf_send_flush, ECP_MTYPE_RBACK, 3, 500);
+ if (do_flush) {
+ ssize_t _rv = flush_send(conn, NULL);
+ if (_rv < 0) return _rv;
+ }
return rsize;
}
-int ecp_conn_rbuf_send_create(ECPConnection *conn, ECPRBSend *buf, ECPRBMessage *msg, unsigned int msg_size) {
+int ecp_rbuf_send_create(ECPConnection *conn, ECPRBSend *buf, ECPRBMessage *msg, unsigned int msg_size) {
int rv;
memset(buf, 0, sizeof(ECPRBRecv));
@@ -167,7 +183,7 @@ int ecp_conn_rbuf_send_create(ECPConnection *conn, ECPRBSend *buf, ECPRBMessage
return ECP_OK;
}
-void ecp_conn_rbuf_send_destroy(ECPConnection *conn) {
+void ecp_rbuf_send_destroy(ECPConnection *conn) {
ECPRBSend *buf = conn->rbuf.send;
if (buf == NULL) return;
@@ -177,7 +193,7 @@ void ecp_conn_rbuf_send_destroy(ECPConnection *conn) {
#endif
}
-int ecp_conn_rbuf_send_set_wsize(ECPConnection *conn, ecp_win_t size) {
+int ecp_rbuf_send_set_wsize(ECPConnection *conn, ecp_win_t size) {
ECPRBSend *buf = conn->rbuf.send;
if (buf == NULL) return ECP_ERR;
@@ -187,7 +203,7 @@ int ecp_conn_rbuf_send_set_wsize(ECPConnection *conn, ecp_win_t size) {
#endif
buf->win_size = size;
- if (buf->cc_wait) cc_flush(conn);
+ if (buf->cnt_cc) cc_flush(conn);
#ifdef ECP_WITH_PTHREAD
pthread_mutex_unlock(&buf->mutex);
@@ -196,7 +212,7 @@ int ecp_conn_rbuf_send_set_wsize(ECPConnection *conn, ecp_win_t size) {
return ECP_OK;
}
-int ecp_conn_rbuf_send_start(ECPConnection *conn) {
+int ecp_rbuf_send_start(ECPConnection *conn) {
ECPRBSend *buf = conn->rbuf.send;
if (buf == NULL) return ECP_ERR;
@@ -204,29 +220,38 @@ int ecp_conn_rbuf_send_start(ECPConnection *conn) {
return ecp_rbuf_start(&buf->rbuf, conn->seq_out);
}
-int ecp_conn_rbuf_flush(ECPConnection *conn) {
+int ecp_rbuf_flush(ECPConnection *conn) {
ECPRBSend *buf = conn->rbuf.send;
unsigned char payload[ECP_SIZE_PLD(0)];
ecp_seq_t seq;
if (buf == NULL) return ECP_ERR;
- // XXX flush seq
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_lock(&conn->mutex);
+#endif
+ seq = conn->seq_out;
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_unlock(&conn->mutex);
+#endif
+
#ifdef ECP_WITH_PTHREAD
pthread_mutex_lock(&buf->mutex);
#endif
+
if (buf->flush) {
if (ECP_RBUF_SEQ_LT(buf->seq_flush, seq)) buf->seq_flush = seq;
} else {
buf->flush = 1;
buf->seq_flush = seq;
}
+
#ifdef ECP_WITH_PTHREAD
pthread_mutex_unlock(&buf->mutex);
#endif
- ssize_t _rv = ecp_timer_send(conn, _rbuf_send_flush, ECP_MTYPE_RBACK, 3, 500);
- if (_rv < 0) return _rv;
+ ssize_t rv = flush_send(conn, 0);
+ if (rv < 0) return rv;
return ECP_OK;
}
diff --git a/code/core/timer.c b/code/core/timer.c
index 8368742..a2d3fa5 100644
--- a/code/core/timer.c
+++ b/code/core/timer.c
@@ -196,10 +196,9 @@ unsigned int ecp_timer_exe(ECPSocket *sock) {
_rv = retry(conn, to_exec+i);
if (_rv < 0) rv = _rv;
} else {
- _rv = ecp_pld_send(conn, pld, pld_size);
+ _rv = ecp_pld_send_wtimer(conn, to_exec+i, pld, pld_size);
if (_rv < 0) rv = _rv;
}
- if (!rv) rv = ecp_timer_push(to_exec+i);
if (rv && (rv != ECP_ERR_CLOSED) && handler) handler(conn, 0, mtype, NULL, rv);
} else if (handler) {
handler(conn, 0, mtype, NULL, ECP_ERR_TIMEOUT);
@@ -224,8 +223,5 @@ ssize_t ecp_timer_send(ECPConnection *conn, ecp_timer_retry_t *send_f, unsigned
if (rv) return rv;
ti.retry = send_f;
- rv = ecp_timer_push(&ti);
- if (rv) return rv;
-
- return send_f(conn, NULL);
+ return send_f(conn, &ti);
}