From 5d20e9bafc3571f37eb0d9b74699d023d2d3d13a Mon Sep 17 00:00:00 2001 From: Uros Majstorovic Date: Fri, 18 Aug 2017 20:35:21 +0200 Subject: timer fixed; rbuf almost implemented --- code/core/core.c | 78 +++++++++++++++++++++------------- code/core/core.h | 10 +++-- code/core/msgq.c | 2 + code/core/rbuf.c | 114 ++++++++++++++++++++++++++++++++++++++------------ code/core/rbuf.h | 56 +++++++++++++++---------- code/core/rbuf_recv.c | 48 ++++++++++----------- code/core/rbuf_send.c | 63 +++++++++++++++++++--------- code/core/timer.c | 8 +--- code/vconn/vconn.c | 38 ++++++----------- 9 files changed, 260 insertions(+), 157 deletions(-) (limited to 'code') diff --git a/code/core/core.c b/code/core/core.c index d64aa04..b319afb 100644 --- a/code/core/core.c +++ b/code/core/core.c @@ -517,7 +517,7 @@ static ssize_t _conn_send_kget(ECPConnection *conn, ECPTimerItem *ti) { unsigned char payload[ECP_SIZE_PLD(0)]; ecp_pld_set_type(payload, ECP_MTYPE_KGET_REQ); - return ecp_pld_send_wkey(conn, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, payload, sizeof(payload)); + return ecp_pld_send_ll(conn, ti, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, payload, sizeof(payload)); } int ecp_conn_init(ECPConnection *conn, ECPNode *node) { @@ -616,7 +616,7 @@ static ssize_t _conn_send_open(ECPConnection *conn, ECPTimerItem *ti) { ecp_pld_set_type(payload, ECP_MTYPE_OPEN_REQ); buf[0] = conn->type; - return ecp_pld_send(conn, payload, sizeof(payload)); + return ecp_pld_send_wtimer(conn, ti, payload, sizeof(payload)); } ssize_t ecp_conn_send_open(ECPConnection *conn) { @@ -706,7 +706,7 @@ ssize_t ecp_conn_handle_open(ECPConnection *conn, ecp_seq_t seq, unsigned char m #ifdef ECP_WITH_RBUF if (!is_open && conn->rbuf.recv) { - rv = ecp_conn_rbuf_recv_start(conn, seq); + rv = ecp_rbuf_recv_start(conn, seq); if (rv) return rv; } #endif @@ -815,7 +815,7 @@ static ssize_t _conn_send_kput(ECPConnection *conn, ECPTimerItem *ti) { rv = ecp_conn_dhkey_get_curr(conn, buf, buf+1); if (rv) return rv; - return ecp_pld_send(conn, payload, sizeof(payload)); + return ecp_pld_send_wtimer(conn, ti, payload, sizeof(payload)); } int ecp_conn_dhkey_new(ECPConnection *conn) { @@ -920,7 +920,7 @@ ssize_t ecp_pack_raw(ECPSocket *sock, ECPConnection *parent, unsigned char *pack return ecp_pack(ctx, packet, pkt_size, s_idx, c_idx, public, shsec, nonce, seq, payload, payload_size); } -ssize_t ecp_conn_pack(ECPConnection *conn, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size, ECPNetAddr *addr, ecp_seq_t *seq, int *rbuf_idx) { +ssize_t ecp_conn_pack(ECPConnection *conn, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size, ECPNetAddr *addr, void *_rbuf_info) { ecp_aead_key_t shsec; ecp_dh_public_t public; ecp_seq_t _seq; @@ -957,26 +957,28 @@ ssize_t ecp_conn_pack(ECPConnection *conn, unsigned char *packet, size_t pkt_siz } } if (!rv) { - _seq = conn->seq_out + 1; #ifdef ECP_WITH_RBUF - if (conn->rbuf.send && rbuf_idx) { - ECPRBSend *buf = conn->rbuf.send; -#ifdef ECP_WITH_PTHREAD - pthread_mutex_lock(&buf->mutex); -#endif - - *rbuf_idx = ecp_rbuf_msg_idx(&buf->rbuf, _seq); - if (*rbuf_idx < 0) rv = *rbuf_idx; + ECPRBInfo *rbuf_info = _rbuf_info; -#ifdef ECP_WITH_PTHREAD - pthread_mutex_unlock(&buf->mutex); -#endif + if (rbuf_info) { + if (rbuf_info->seq_force) { + _seq = rbuf_info->seq; + } else { + unsigned char mtype = ecp_pld_get_type(payload); + _seq = conn->seq_out + 1; + + rv = ecp_rbuf_pkt_prep(conn->rbuf.send, _seq, mtype, rbuf_info); + if (!rv) conn->seq_out = _seq; + } + } else { + _seq = conn->seq_out + 1; + conn->seq_out = _seq; } -#endif - } - if (!rv) { +#else + _seq = conn->seq_out + 1; conn->seq_out = _seq; - if (addr) *addr = conn->node.addr; +#endif + if (!rv && addr) *addr = conn->node.addr; } #ifdef ECP_WITH_PTHREAD @@ -996,7 +998,6 @@ ssize_t ecp_conn_pack(ECPConnection *conn, unsigned char *packet, size_t pkt_siz pthread_mutex_unlock(&conn->mutex); #endif - if (seq) *seq = _seq; return _rv; } @@ -1150,7 +1151,7 @@ ssize_t ecp_pkt_handle(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *parent, #endif #ifdef ECP_WITH_RBUF - if (conn->rbuf.recv) proc_size = ecp_conn_rbuf_recv_store(conn, p_seq, payload+pld_size-cnt_size, cnt_size); + if (conn->rbuf.recv) proc_size = ecp_rbuf_recv_store(conn, p_seq, payload+pld_size-cnt_size, cnt_size); #endif if (proc_size == 0) proc_size = ecp_msg_handle(conn, p_seq, payload+pld_size-cnt_size, cnt_size); @@ -1247,25 +1248,42 @@ unsigned char ecp_pld_get_type(unsigned char *payload) { } ssize_t ecp_pld_send(ECPConnection *conn, unsigned char *payload, size_t payload_size) { - return ecp_pld_send_wkey(conn, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload, payload_size); + return ecp_pld_send_ll(conn, NULL, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload, payload_size); } -ssize_t ecp_pld_send_wkey(ECPConnection *conn, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size) { +ssize_t ecp_pld_send_wtimer(ECPConnection *conn, ECPTimerItem *ti, unsigned char *payload, size_t payload_size) { + return ecp_pld_send_ll(conn, ti, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload, payload_size); +} + +ssize_t ecp_pld_send_ll(ECPConnection *conn, ECPTimerItem *ti, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size) { unsigned char packet[ECP_MAX_PKT]; ECPSocket *sock = conn->sock; ECPContext *ctx = sock->ctx; ECPNetAddr addr; - ecp_seq_t seq; - ssize_t rv; - int rbuf_idx = -1; + int _rv = ECP_OK; + void *_rbuf_info = NULL; - rv = ctx->pack(conn, packet, ECP_MAX_PKT, s_idx, c_idx, payload, payload_size, &addr, &seq, &rbuf_idx); +#ifdef ECP_WITH_RBUF + ECPRBInfo rbuf_info; + + if (conn->rbuf.send) { + _rv = ecp_rbuf_info_init(&rbuf_info); + if (_rv) return _rv; + _rbuf_info = &rbuf_info; + } +#endif + + ssize_t rv = ctx->pack(conn, packet, ECP_MAX_PKT, s_idx, c_idx, payload, payload_size, &addr, _rbuf_info); if (rv < 0) return rv; #ifdef ECP_WITH_RBUF - if (conn->rbuf.send) return ecp_conn_rbuf_pkt_send(conn, &addr, packet, rv, seq, rbuf_idx); + if (conn->rbuf.send) return ecp_rbuf_pkt_send(conn->rbuf.send, conn->sock, &addr, ti, packet, rv, _rbuf_info); #endif + if (ti) { + _rv = ecp_timer_push(ti); + if (_rv) return _rv; + } return ecp_pkt_send(sock, &addr, packet, rv); } diff --git a/code/core/core.h b/code/core/core.h index 9738fb4..04fb5ee 100644 --- a/code/core/core.h +++ b/code/core/core.h @@ -36,7 +36,7 @@ #define ECP_MAX_NODE_KEY 2 #define ECP_MAX_CTYPE 8 #define ECP_MAX_MTYPE 16 -#define ECP_MAX_MTYPE_SYS 8 +#define ECP_MAX_MTYPE_SYS 4 #define ECP_SIZE_PKT_HDR (ECP_SIZE_PROTO+1+ECP_ECDH_SIZE_KEY+ECP_AEAD_SIZE_NONCE) #define ECP_SIZE_PLD_HDR (ECP_SIZE_SEQ) @@ -59,6 +59,7 @@ #define ECP_MTYPE_OPEN 0x00 #define ECP_MTYPE_KGET 0x01 #define ECP_MTYPE_KPUT 0x02 +#define ECP_MTYPE_NOP 0x03 #define ECP_MTYPE_OPEN_REQ (ECP_MTYPE_OPEN) #define ECP_MTYPE_OPEN_REP (ECP_MTYPE_OPEN | ECP_MTYPE_FLAG_REP) @@ -221,7 +222,7 @@ typedef struct ECPContext { #ifdef ECP_WITH_HTABLE ECPHTableIface ht; #endif - ssize_t (*pack) (struct ECPConnection *conn, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size, ECPNetAddr *addr, ecp_seq_t *seq, int *rbuf_idx); + ssize_t (*pack) (struct ECPConnection *conn, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size, ECPNetAddr *addr, void *_rbuf_info); ssize_t (*pack_raw) (struct ECPSocket *sock, struct ECPConnection *parent, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size, ECPNetAddr *addr); ECPConnHandler *handler[ECP_MAX_CTYPE]; } ECPContext; @@ -315,7 +316,7 @@ int ecp_conn_dhkey_get_curr(ECPConnection *conn, unsigned char *idx, unsigned ch ssize_t ecp_pack(ECPContext *ctx, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size); ssize_t ecp_pack_raw(ECPSocket *sock, ECPConnection *parent, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size, ECPNetAddr *addr); -ssize_t ecp_conn_pack(ECPConnection *conn, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size, ECPNetAddr *addr, ecp_seq_t *seq, int *rbuf_idx); +ssize_t ecp_conn_pack(ECPConnection *conn, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size, ECPNetAddr *addr, void *_rbuf_info); ssize_t ecp_pkt_handle(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *parent, unsigned char *packet, size_t pkt_size); ssize_t ecp_pkt_send(ECPSocket *sock, ECPNetAddr *addr, unsigned char *packet, size_t pkt_size); @@ -326,7 +327,8 @@ unsigned char *ecp_pld_get_buf(unsigned char *payload); void ecp_pld_set_type(unsigned char *payload, unsigned char mtype); unsigned char ecp_pld_get_type(unsigned char *payload); ssize_t ecp_pld_send(ECPConnection *conn, unsigned char *payload, size_t payload_size); -ssize_t ecp_pld_send_wkey(ECPConnection *conn, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size); +ssize_t ecp_pld_send_wtimer(ECPConnection *conn, ECPTimerItem *ti, unsigned char *payload, size_t payload_size); +ssize_t ecp_pld_send_ll(ECPConnection *conn, ECPTimerItem *ti, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size); ssize_t ecp_pld_send_raw(ECPSocket *sock, ECPConnection *parent, ECPNetAddr *addr, unsigned char s_idx, unsigned char c_idx, ecp_dh_public_t *public, ecp_aead_key_t *shsec, unsigned char *nonce, ecp_seq_t seq, unsigned char *payload, size_t payload_size); ssize_t ecp_send(ECPConnection *conn, unsigned char *payload, size_t payload_size); diff --git a/code/core/msgq.c b/code/core/msgq.c index 15629c8..4c1252b 100644 --- a/code/core/msgq.c +++ b/code/core/msgq.c @@ -79,7 +79,9 @@ int ecp_conn_msgq_push(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype) ECPRBRecv *buf = conn->rbuf.recv; ECPConnMsgQ *msgq = buf ? &buf->msgq : NULL; + mtype &= ECP_MTYPE_MASK; if (msgq == NULL) return ECP_ERR; + if (mtype >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE; if (msgq->idx_w[mtype] - msgq->idx_r[mtype] == ECP_MSGQ_MAX_MSG) return ECP_MSGQ_ERR_MAX_MSG; if (msgq->idx_w[mtype] == msgq->idx_r[mtype]) pthread_cond_signal(&msgq->cond[mtype]); diff --git a/code/core/rbuf.c b/code/core/rbuf.c index b384774..ee9a7fc 100644 --- a/code/core/rbuf.c +++ b/code/core/rbuf.c @@ -37,33 +37,88 @@ ssize_t ecp_rbuf_msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, int idx, unsigned ch if (rbuf->msg == NULL) return 0; if (test_flags && (test_flags & rbuf->msg[idx].flags)) return ECP_ERR_RBUF_DUP; - if (ECP_RBUF_SEQ_LT(rbuf->seq_max, seq)) rbuf->seq_max = seq; - - if (!(set_flags & ECP_RBUF_FLAG_DELIVERED)) { - memcpy(rbuf->msg[idx].msg, msg, msg_size); - rbuf->msg[idx].size = msg_size; - } + if (msg_size) memcpy(rbuf->msg[idx].msg, msg, msg_size); + rbuf->msg[idx].size = msg_size; rbuf->msg[idx].flags = set_flags; return msg_size; } -ssize_t ecp_conn_rbuf_pkt_send(ECPConnection *conn, ECPNetAddr *addr, unsigned char *packet, size_t pkt_size, ecp_seq_t seq, int idx) { - int do_send = 1; - ECPRBSend *buf = conn->rbuf.send; +ssize_t ecp_rbuf_pld_send(ECPConnection *conn, unsigned char *payload, size_t payload_size, ecp_seq_t seq) { + unsigned char packet[ECP_MAX_PKT]; + ECPSocket *sock = conn->sock; + ECPContext *ctx = sock->ctx; + ECPNetAddr addr; + ECPRBInfo rbuf_info; + ssize_t rv; - ssize_t rv = ecp_rbuf_msg_store(&buf->rbuf, seq, idx, packet, pkt_size, 0, 0); + int _rv = ecp_rbuf_info_init(&rbuf_info); + if (_rv) return _rv; + + rbuf_info.seq = seq; + rbuf_info.seq_force = 1; + + rv = ctx->pack(conn, packet, ECP_MAX_PKT, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload, payload_size, &addr, &rbuf_info); if (rv < 0) return rv; - if (buf->win_size) { + rv = ecp_pkt_send(sock, &addr, packet, rv); + if (rv < 0) return rv; + + return rv; +} + +int ecp_rbuf_info_init(ECPRBInfo *rbuf_info) { + memset(rbuf_info, 0, sizeof(ECPRBInfo)); + + return ECP_OK; +} + +int ecp_rbuf_pkt_prep(ECPRBSend *buf, ecp_seq_t seq, unsigned char mtype, ECPRBInfo *rbuf_info) { + if (buf == NULL) return ECP_ERR; + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_lock(&buf->mutex); +#endif + int idx = ecp_rbuf_msg_idx(&buf->rbuf, seq); +#ifdef ECP_WITH_PTHREAD + pthread_mutex_unlock(&buf->mutex); +#endif + + if (idx < 0) return idx; + + rbuf_info->seq = seq; + rbuf_info->idx = idx; + rbuf_info->mtype = mtype; + buf->rbuf.msg[idx].size = 0; + buf->rbuf.msg[idx].flags = 0; + + return ECP_OK; +} + +ssize_t ecp_rbuf_pkt_send(ECPRBSend *buf, ECPSocket *sock, ECPNetAddr *addr, ECPTimerItem *ti, unsigned char *packet, size_t pkt_size, ECPRBInfo *rbuf_info) { + unsigned char flags = 0; + int do_send = 1; + ssize_t rv = 0; + ecp_seq_t seq = rbuf_info->seq; + unsigned int idx = rbuf_info->idx; + unsigned char mtype = rbuf_info->mtype & ECP_MTYPE_MASK; + + if (mtype < ECP_MAX_MTYPE_SYS) flags |= ECP_RBUF_FLAG_SYS; + + rv = ecp_rbuf_msg_store(&buf->rbuf, seq, idx, packet, pkt_size, 0, flags); + if (rv < 0) return rv; + + if (buf->flags & ECP_RBUF_FLAG_CCONTROL) { #ifdef ECP_WITH_PTHREAD pthread_mutex_lock(&buf->mutex); #endif - if (buf->cc_wait || (buf->in_transit >= buf->win_size)) { - if (!buf->cc_wait) buf->seq_cc = seq; - buf->cc_wait++; - buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_CCWAIT; + if (ECP_RBUF_SEQ_LT(buf->rbuf.seq_max, seq)) buf->rbuf.seq_max = seq; + + if (buf->cnt_cc || (buf->in_transit >= buf->win_size)) { + if (!buf->cnt_cc) buf->seq_cc = seq; + buf->cnt_cc++; + buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_IN_CCONTROL; do_send = 0; } else { buf->in_transit++; @@ -74,36 +129,43 @@ ssize_t ecp_conn_rbuf_pkt_send(ECPConnection *conn, ECPNetAddr *addr, unsigned c #endif } - if (do_send) rv = ecp_pkt_send(conn->sock, addr, packet, pkt_size); + if (do_send) { + int _rv; + if (ti) { + _rv = ecp_timer_push(ti); + if (_rv) return _rv; + } + rv = ecp_pkt_send(sock, addr, packet, pkt_size); + } return rv; } -int ecp_conn_rbuf_create(ECPConnection *conn, ECPRBSend *buf_s, ECPRBMessage *msg_s, unsigned int msg_s_size, ECPRBRecv *buf_r, ECPRBMessage *msg_r, unsigned int msg_r_size) { +int ecp_rbuf_conn_create(ECPConnection *conn, ECPRBSend *buf_s, ECPRBMessage *msg_s, unsigned int msg_s_size, ECPRBRecv *buf_r, ECPRBMessage *msg_r, unsigned int msg_r_size) { int rv; - rv = ecp_conn_rbuf_send_create(conn, buf_s, msg_s, msg_s_size); + rv = ecp_rbuf_send_create(conn, buf_s, msg_s, msg_s_size); if (rv) return rv; - rv = ecp_conn_rbuf_recv_create(conn, buf_r, msg_r, msg_r_size); + rv = ecp_rbuf_recv_create(conn, buf_r, msg_r, msg_r_size); if (rv) { - ecp_conn_rbuf_send_destroy(conn); + ecp_rbuf_send_destroy(conn); return rv; } return ECP_OK; } -void ecp_conn_rbuf_destroy(ECPConnection *conn) { - ecp_conn_rbuf_send_destroy(conn); - ecp_conn_rbuf_recv_destroy(conn); +void ecp_rbuf_conn_destroy(ECPConnection *conn) { + ecp_rbuf_send_destroy(conn); + ecp_rbuf_recv_destroy(conn); } -int ecp_conn_rbuf_start(ECPConnection *conn, ecp_seq_t seq) { - int rv = ecp_conn_rbuf_send_start(conn); +int ecp_rbuf_conn_start(ECPConnection *conn, ecp_seq_t seq) { + int rv = ecp_rbuf_send_start(conn); if (rv) return rv; if (!conn->out) { - rv = ecp_conn_rbuf_recv_start(conn, seq); + rv = ecp_rbuf_recv_start(conn, seq); if (rv) return rv; } diff --git a/code/core/rbuf.h b/code/core/rbuf.h index b09b19c..1332dee 100644 --- a/code/core/rbuf.h +++ b/code/core/rbuf.h @@ -1,10 +1,11 @@ #define ECP_RBUF_FLAG_IN_RBUF 0x01 #define ECP_RBUF_FLAG_IN_MSGQ 0x02 -#define ECP_RBUF_FLAG_DELIVERED 0x04 -#define ECP_RBUF_FLAG_CCWAIT 0x08 +#define ECP_RBUF_FLAG_IN_CCONTROL 0x04 +#define ECP_RBUF_FLAG_SYS 0x80 -#define ECP_RBUF_FLAG_RELIABLE 0x01 -#define ECP_RBUF_FLAG_MSGQ 0x02 +#define ECP_RBUF_FLAG_CCONTROL 0x01 +#define ECP_RBUF_FLAG_RELIABLE 0x02 +#define ECP_RBUF_FLAG_MSGQ 0x04 #define ECP_MTYPE_RBACK 0x04 #define ECP_MTYPE_RBFLUSH 0x05 @@ -66,9 +67,9 @@ typedef struct ECPRBSend { unsigned char flush; ecp_win_t win_size; ecp_win_t in_transit; - ecp_win_t cc_wait; - ecp_seq_t seq_flush; + ecp_win_t cnt_cc; ecp_seq_t seq_cc; + ecp_seq_t seq_flush; unsigned int nack_rate; ECPRBuffer rbuf; #ifdef ECP_WITH_PTHREAD @@ -81,28 +82,37 @@ typedef struct ECPConnRBuffer { ECPRBSend *send; } ECPConnRBuffer; +typedef struct ECPRBInfo { + ecp_seq_t seq; + unsigned int idx; + unsigned char mtype; + unsigned char seq_force; +} ECPRBInfo; int ecp_rbuf_init(ECPRBuffer *rbuf, ECPRBMessage *msg, unsigned int msg_size); int ecp_rbuf_start(ECPRBuffer *rbuf, ecp_seq_t seq); int ecp_rbuf_msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq); ssize_t ecp_rbuf_msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, int idx, unsigned char *msg, size_t msg_size, unsigned char test_flags, unsigned char set_flags); -ssize_t ecp_conn_rbuf_pkt_send(struct ECPConnection *conn, ECPNetAddr *addr, unsigned char *packet, size_t pkt_size, ecp_seq_t seq, int idx); - -int ecp_conn_rbuf_create(struct ECPConnection *conn, ECPRBSend *buf_s, ECPRBMessage *msg_s, unsigned int msg_s_size, ECPRBRecv *buf_r, ECPRBMessage *msg_r, unsigned int msg_r_size); -void ecp_conn_rbuf_destroy(struct ECPConnection *conn); -int ecp_conn_rbuf_start(struct ECPConnection *conn, ecp_seq_t seq); - -int ecp_conn_rbuf_recv_create(struct ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size); -void ecp_conn_rbuf_recv_destroy(struct ECPConnection *conn); -int ecp_conn_rbuf_recv_set_hole(struct ECPConnection *conn, unsigned short hole_max); -int ecp_conn_rbuf_recv_set_delay(struct ECPConnection *conn, unsigned short delay); -int ecp_conn_rbuf_recv_start(struct ECPConnection *conn, ecp_seq_t seq); -ssize_t ecp_conn_rbuf_recv_store(struct ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size); - -int ecp_conn_rbuf_send_create(struct ECPConnection *conn, ECPRBSend *buf, ECPRBMessage *msg, unsigned int msg_size); -void ecp_conn_rbuf_send_destroy(struct ECPConnection *conn); -int ecp_conn_rbuf_send_start(struct ECPConnection *conn); -ssize_t ecp_conn_rbuf_send_store(struct ECPConnection *conn, ecp_seq_t seq, int idx, unsigned char *msg, size_t msg_size); +ssize_t ecp_rbuf_pld_send(struct ECPConnection *conn, unsigned char *payload, size_t payload_size, ecp_seq_t seq); + +int ecp_rbuf_info_init(ECPRBInfo *rbuf_info); +int ecp_rbuf_pkt_prep(ECPRBSend *buf, ecp_seq_t seq, unsigned char mtype, ECPRBInfo *rbuf_info); +ssize_t ecp_rbuf_pkt_send(ECPRBSend *buf, struct ECPSocket *sock, ECPNetAddr *addr, ECPTimerItem *ti, unsigned char *packet, size_t pkt_size, ECPRBInfo *rbuf_info); + +int ecp_rbuf_conn_create(struct ECPConnection *conn, ECPRBSend *buf_s, ECPRBMessage *msg_s, unsigned int msg_s_size, ECPRBRecv *buf_r, ECPRBMessage *msg_r, unsigned int msg_r_size); +void ecp_rbuf_conn_destroy(struct ECPConnection *conn); +int ecp_rbuf_conn_start(struct ECPConnection *conn, ecp_seq_t seq); + +int ecp_rbuf_recv_create(struct ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size); +void ecp_rbuf_recv_destroy(struct ECPConnection *conn); +int ecp_rbuf_recv_set_hole(struct ECPConnection *conn, unsigned short hole_max); +int ecp_rbuf_recv_set_delay(struct ECPConnection *conn, unsigned short delay); +int ecp_rbuf_recv_start(struct ECPConnection *conn, ecp_seq_t seq); +ssize_t ecp_rbuf_recv_store(struct ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size); + +int ecp_rbuf_send_create(struct ECPConnection *conn, ECPRBSend *buf, ECPRBMessage *msg, unsigned int msg_size); +void ecp_rbuf_send_destroy(struct ECPConnection *conn); +int ecp_rbuf_send_start(struct ECPConnection *conn); ssize_t ecp_rbuf_handle_ack(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size); ssize_t ecp_rbuf_handle_flush(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size); diff --git a/code/core/rbuf_recv.c b/code/core/rbuf_recv.c index e00b63f..d78ffce 100644 --- a/code/core/rbuf_recv.c +++ b/code/core/rbuf_recv.c @@ -7,32 +7,31 @@ static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) { ECPRBRecv *buf = conn->rbuf.recv; - int rv = ECP_OK; - ssize_t _rv = 0; unsigned char flags = ECP_RBUF_FLAG_IN_RBUF; unsigned char mtype = msg[0] & ECP_MTYPE_MASK; - if (mtype < ECP_MAX_MTYPE_SYS) { - flags |= ECP_RBUF_FLAG_DELIVERED; - ecp_msg_handle(conn, seq, msg, msg_size); - } + if (mtype < ECP_MAX_MTYPE_SYS) flags |= ECP_RBUF_FLAG_SYS; #ifdef ECP_WITH_MSGQ if (buf->flags & ECP_RBUF_FLAG_MSGQ) { + int rv = ECP_OK; + pthread_mutex_lock(&buf->msgq.mutex); ecp_seq_t seq_offset = seq - buf->msgq.seq_start; if (seq_offset >= buf->rbuf.msg_size) rv = ECP_ERR_RBUF_FULL; + pthread_mutex_unlock(&buf->msgq.mutex); + + if (rv) return rv; } #endif - if (!rv) _rv = ecp_rbuf_msg_store(&buf->rbuf, seq, -1, msg, msg_size, ECP_RBUF_FLAG_IN_RBUF | ECP_RBUF_FLAG_IN_MSGQ, flags); + ssize_t rv = ecp_rbuf_msg_store(&buf->rbuf, seq, -1, msg, msg_size, ECP_RBUF_FLAG_IN_RBUF | ECP_RBUF_FLAG_IN_MSGQ, flags); + if (rv < 0) return rv; -#ifdef ECP_WITH_MSGQ - if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_unlock(&buf->msgq.mutex); -#endif + if (ECP_RBUF_SEQ_LT(buf->rbuf.seq_max, seq)) buf->rbuf.seq_max = seq; + if (flags & ECP_RBUF_FLAG_SYS) ecp_msg_handle(conn, seq, msg, msg_size); - if (rv) return rv; - return _rv; + return rv; } static void msg_flush(ECPConnection *conn) { @@ -52,11 +51,11 @@ static void msg_flush(ECPConnection *conn) { if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_IN_RBUF) { int rv = 0; ecp_seq_t seq = buf->rbuf.seq_start + i; - if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_DELIVERED) { - buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_DELIVERED; + if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_SYS) { + buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_SYS; } else if (buf->flags & ECP_RBUF_FLAG_MSGQ) { #ifdef ECP_WITH_MSGQ - unsigned char mtype = buf->rbuf.msg[idx].msg[0] & ECP_MTYPE_MASK; + unsigned char mtype = buf->rbuf.msg[idx].msg[0]; int rv = ecp_conn_msgq_push(conn, seq, mtype); if (rv) break; buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_IN_MSGQ; @@ -92,8 +91,8 @@ static int ack_send(ECPConnection *conn) { buf_[5] = (buf->ack_map & 0x00FF0000) >> 16; buf_[6] = (buf->ack_map & 0x0000FF00) >> 8; buf_[7] = (buf->ack_map & 0x000000FF); - - rv = ecp_pld_send(conn, payload, sizeof(payload)); + + rv = ecp_rbuf_pld_send(conn, payload, sizeof(payload), 0); if (rv < 0) return rv; buf->ack_pkt = 0; @@ -151,7 +150,7 @@ ssize_t ecp_rbuf_handle_flush(ECPConnection *conn, ecp_seq_t seq, unsigned char return 0; } -int ecp_conn_rbuf_recv_create(ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size) { +int ecp_rbuf_recv_create(ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size) { int rv; memset(buf, 0, sizeof(ECPRBRecv)); @@ -170,13 +169,13 @@ int ecp_conn_rbuf_recv_create(ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage return ECP_OK; } -void ecp_conn_rbuf_recv_destroy(ECPConnection *conn) { +void ecp_rbuf_recv_destroy(ECPConnection *conn) { #ifdef ECP_WITH_MSGQ ecp_conn_msgq_destroy(conn); #endif } -int ecp_conn_rbuf_recv_set_hole(ECPConnection *conn, unsigned short hole_max) { +int ecp_rbuf_recv_set_hole(ECPConnection *conn, unsigned short hole_max) { ECPRBRecv *buf = conn->rbuf.recv; buf->hole_max = hole_max; @@ -186,18 +185,18 @@ int ecp_conn_rbuf_recv_set_hole(ECPConnection *conn, unsigned short hole_max) { return ECP_OK; } -int ecp_conn_rbuf_recv_set_delay(ECPConnection *conn, unsigned short delay) { +int ecp_rbuf_recv_set_delay(ECPConnection *conn, unsigned short delay) { ECPRBRecv *buf = conn->rbuf.recv; buf->deliver_delay = delay; if (buf->hole_max < delay - 1) { - ecp_conn_rbuf_recv_set_hole(conn, delay - 1); + ecp_rbuf_recv_set_hole(conn, delay - 1); } return ECP_OK; } -int ecp_conn_rbuf_recv_start(ECPConnection *conn, ecp_seq_t seq) { +int ecp_rbuf_recv_start(ECPConnection *conn, ecp_seq_t seq) { int rv; ECPRBRecv *buf = conn->rbuf.recv; @@ -217,7 +216,7 @@ int ecp_conn_rbuf_recv_start(ECPConnection *conn, ecp_seq_t seq) { return ECP_OK; } -ssize_t ecp_conn_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) { +ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) { ECPRBRecv *buf = conn->rbuf.recv; ecp_seq_t ack_pkt = 0; ssize_t rv; @@ -226,6 +225,7 @@ ssize_t ecp_conn_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned ch if (buf == NULL) return ECP_ERR; if (msg_size < 1) return ECP_ERR_MIN_MSG; + // XXX mtype == RBACK | RBFLUSH handle imediately if (ECP_RBUF_SEQ_LT(buf->rbuf.seq_max, seq)) ack_pkt = seq - buf->rbuf.seq_max; if (ECP_RBUF_SEQ_LTE(seq, buf->seq_ack)) { ecp_seq_t seq_offset = buf->seq_ack - seq; diff --git a/code/core/rbuf_send.c b/code/core/rbuf_send.c index 06bd2e2..e7020cb 100644 --- a/code/core/rbuf_send.c +++ b/code/core/rbuf_send.c @@ -4,11 +4,20 @@ #define NACK_RATE_UNIT 10000 -static ssize_t _rbuf_send_flush(ECPConnection *conn, ECPTimerItem *ti) { +static ssize_t flush_send(ECPConnection *conn, ECPTimerItem *ti) { unsigned char payload[ECP_SIZE_PLD(0)]; ecp_pld_set_type(payload, ECP_MTYPE_RBFLUSH); - return ecp_pld_send(conn, payload, sizeof(payload)); + if (ti == NULL) { + ECPTimerItem _ti; + int rv = ecp_timer_item_init(&_ti, conn, ECP_MTYPE_RBACK, 3, 500); + if (rv) return rv; + + _ti.retry = flush_send; + rv = ecp_timer_push(&_ti); + if (rv) return rv; + } + return ecp_rbuf_pld_send(conn, payload, sizeof(payload), 0); } static void cc_flush(ECPConnection *conn) { @@ -24,8 +33,8 @@ static void cc_flush(ECPConnection *conn) { unsigned int _idx = idx; for (i=0; imsg[_idx].flags & ECP_RBUF_FLAG_CCWAIT)) break; - rbuf->msg[_idx].flags &= ~ECP_RBUF_FLAG_CCWAIT; + if (!(rbuf->msg[_idx].flags & ECP_RBUF_FLAG_IN_CCONTROL)) break; + rbuf->msg[_idx].flags &= ~ECP_RBUF_FLAG_IN_CCONTROL; _idx = ECP_RBUF_IDX_MASK(_idx + 1, rbuf->msg_size); } pkt_to_send = i; @@ -45,7 +54,7 @@ static void cc_flush(ECPConnection *conn) { #endif buf->in_transit += (ecp_win_t)pkt_to_send; - buf->cc_wait -= (ecp_win_t)pkt_to_send; + buf->cnt_cc -= (ecp_win_t)pkt_to_send; buf->seq_cc += (ecp_seq_t)pkt_to_send; } } @@ -85,7 +94,7 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt if (!rv) { seq_ack++; - buf->in_transit -= seq_ack - rbuf->seq_start; + if (buf->flags & ECP_RBUF_FLAG_CCONTROL) buf->in_transit = buf->cnt_cc ? buf->seq_cc - seq_ack : rbuf->seq_max - seq_ack + 1; if (ack_map != ECP_RBUF_ACK_FULL) { int nack_first = 0; @@ -104,7 +113,11 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt nack_cnt++; if (buf->flags & ECP_RBUF_FLAG_RELIABLE) { unsigned int _idx = ECP_RBUF_IDX_MASK(idx + 1 - ECP_RBUF_ACK_SIZE + i, rbuf->msg_size); - ecp_pkt_send(conn->sock, &conn->node.addr, rbuf->msg[_idx].msg, rbuf->msg[_idx].size); + if ((rbuf->msg[_idx].size == 0) || (rbuf->msg[_idx].flags & ECP_RBUF_FLAG_SYS)) { + + } else { + ecp_pkt_send(conn->sock, &conn->node.addr, rbuf->msg[_idx].msg, rbuf->msg[_idx].size); + } if (!nack_first) { nack_first = 1; seq_start = seq_ack + i; @@ -118,7 +131,7 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt pthread_mutex_lock(&buf->mutex); #endif - buf->in_transit += nack_cnt; + if (buf->flags & ECP_RBUF_FLAG_CCONTROL) buf->in_transit += nack_cnt; buf->nack_rate = (buf->nack_rate * 7 + ((ECP_RBUF_ACK_SIZE - nack_cnt) * NACK_RATE_UNIT) / ECP_RBUF_ACK_SIZE) / 8; if (buf->flags & ECP_RBUF_FLAG_RELIABLE) { rbuf->seq_start = seq_start; @@ -138,7 +151,7 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt do_flush = 1; } } - if (buf->cc_wait) cc_flush(conn); + if (buf->cnt_cc) cc_flush(conn); } #ifdef ECP_WITH_PTHREAD @@ -147,11 +160,14 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt if (rv) return rv; - if (do_flush) ecp_timer_send(conn, _rbuf_send_flush, ECP_MTYPE_RBACK, 3, 500); + if (do_flush) { + ssize_t _rv = flush_send(conn, NULL); + if (_rv < 0) return _rv; + } return rsize; } -int ecp_conn_rbuf_send_create(ECPConnection *conn, ECPRBSend *buf, ECPRBMessage *msg, unsigned int msg_size) { +int ecp_rbuf_send_create(ECPConnection *conn, ECPRBSend *buf, ECPRBMessage *msg, unsigned int msg_size) { int rv; memset(buf, 0, sizeof(ECPRBRecv)); @@ -167,7 +183,7 @@ int ecp_conn_rbuf_send_create(ECPConnection *conn, ECPRBSend *buf, ECPRBMessage return ECP_OK; } -void ecp_conn_rbuf_send_destroy(ECPConnection *conn) { +void ecp_rbuf_send_destroy(ECPConnection *conn) { ECPRBSend *buf = conn->rbuf.send; if (buf == NULL) return; @@ -177,7 +193,7 @@ void ecp_conn_rbuf_send_destroy(ECPConnection *conn) { #endif } -int ecp_conn_rbuf_send_set_wsize(ECPConnection *conn, ecp_win_t size) { +int ecp_rbuf_send_set_wsize(ECPConnection *conn, ecp_win_t size) { ECPRBSend *buf = conn->rbuf.send; if (buf == NULL) return ECP_ERR; @@ -187,7 +203,7 @@ int ecp_conn_rbuf_send_set_wsize(ECPConnection *conn, ecp_win_t size) { #endif buf->win_size = size; - if (buf->cc_wait) cc_flush(conn); + if (buf->cnt_cc) cc_flush(conn); #ifdef ECP_WITH_PTHREAD pthread_mutex_unlock(&buf->mutex); @@ -196,7 +212,7 @@ int ecp_conn_rbuf_send_set_wsize(ECPConnection *conn, ecp_win_t size) { return ECP_OK; } -int ecp_conn_rbuf_send_start(ECPConnection *conn) { +int ecp_rbuf_send_start(ECPConnection *conn) { ECPRBSend *buf = conn->rbuf.send; if (buf == NULL) return ECP_ERR; @@ -204,29 +220,38 @@ int ecp_conn_rbuf_send_start(ECPConnection *conn) { return ecp_rbuf_start(&buf->rbuf, conn->seq_out); } -int ecp_conn_rbuf_flush(ECPConnection *conn) { +int ecp_rbuf_flush(ECPConnection *conn) { ECPRBSend *buf = conn->rbuf.send; unsigned char payload[ECP_SIZE_PLD(0)]; ecp_seq_t seq; if (buf == NULL) return ECP_ERR; - // XXX flush seq +#ifdef ECP_WITH_PTHREAD + pthread_mutex_lock(&conn->mutex); +#endif + seq = conn->seq_out; +#ifdef ECP_WITH_PTHREAD + pthread_mutex_unlock(&conn->mutex); +#endif + #ifdef ECP_WITH_PTHREAD pthread_mutex_lock(&buf->mutex); #endif + if (buf->flush) { if (ECP_RBUF_SEQ_LT(buf->seq_flush, seq)) buf->seq_flush = seq; } else { buf->flush = 1; buf->seq_flush = seq; } + #ifdef ECP_WITH_PTHREAD pthread_mutex_unlock(&buf->mutex); #endif - ssize_t _rv = ecp_timer_send(conn, _rbuf_send_flush, ECP_MTYPE_RBACK, 3, 500); - if (_rv < 0) return _rv; + ssize_t rv = flush_send(conn, 0); + if (rv < 0) return rv; return ECP_OK; } diff --git a/code/core/timer.c b/code/core/timer.c index 8368742..a2d3fa5 100644 --- a/code/core/timer.c +++ b/code/core/timer.c @@ -196,10 +196,9 @@ unsigned int ecp_timer_exe(ECPSocket *sock) { _rv = retry(conn, to_exec+i); if (_rv < 0) rv = _rv; } else { - _rv = ecp_pld_send(conn, pld, pld_size); + _rv = ecp_pld_send_wtimer(conn, to_exec+i, pld, pld_size); if (_rv < 0) rv = _rv; } - if (!rv) rv = ecp_timer_push(to_exec+i); if (rv && (rv != ECP_ERR_CLOSED) && handler) handler(conn, 0, mtype, NULL, rv); } else if (handler) { handler(conn, 0, mtype, NULL, ECP_ERR_TIMEOUT); @@ -224,8 +223,5 @@ ssize_t ecp_timer_send(ECPConnection *conn, ecp_timer_retry_t *send_f, unsigned if (rv) return rv; ti.retry = send_f; - rv = ecp_timer_push(&ti); - if (rv) return rv; - - return send_f(conn, NULL); + return send_f(conn, &ti); } diff --git a/code/vconn/vconn.c b/code/vconn/vconn.c index 33fa80b..25b6583 100644 --- a/code/vconn/vconn.c +++ b/code/vconn/vconn.c @@ -69,21 +69,11 @@ static void vconn_destroy(ECPConnection *conn) { #endif } -static ssize_t _vconn_send_open(ECPConnection *conn) { - ECPVConnection *conn_v = (ECPVConnection *)conn; - ECPConnection *conn_next = conn_v->next; +static ssize_t _vconn_send_open(ECPConnection *conn, ECPTimerItem *ti) { unsigned char payload[ECP_SIZE_PLD(0)]; - if (conn_next == NULL) return ECP_ERR; - ecp_pld_set_type(payload, ECP_MTYPE_KGET_REQ); - return ecp_pld_send_wkey(conn_next, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, payload, sizeof(payload)); -} - -static ssize_t _vconn_retry_kget(ECPConnection *conn, ECPTimerItem *ti) { - if (conn->parent == NULL) return ECP_ERR; - - return _vconn_send_open(conn->parent); + return ecp_pld_send_ll(conn, ti, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, payload, sizeof(payload)); } static ssize_t vconn_open(ECPConnection *conn) { @@ -92,14 +82,12 @@ static ssize_t vconn_open(ECPConnection *conn) { ECPVConnection *conn_v = (ECPVConnection *)conn; ECPConnection *conn_next = conn_v->next; - rv = ecp_timer_item_init(&ti, conn_next, ECP_MTYPE_KGET_REP, 3, 3000); - if (rv) return rv; + if (conn_next == NULL) return ECP_ERR; - ti.retry = _vconn_retry_kget; - rv = ecp_timer_push(&ti); - if (rv) return rv; + ssize_t _rv = ecp_timer_send(conn_next, _vconn_send_open, ECP_MTYPE_KGET_REP, 3, 3000); + if (_rv < 0) return _rv; - return _vconn_send_open(conn); + return _rv; } static ssize_t vconn_handle_open(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size) { @@ -278,7 +266,7 @@ static ssize_t _vlink_send_open(ECPConnection *conn, ECPTimerItem *ti) { buf[0] = conn->type; memcpy(buf+1, ctx->cr.dh_pub_get_buf(&sock->key_perma.public), ECP_ECDH_SIZE_KEY); - return ecp_pld_send(conn, payload, sizeof(payload)); + return ecp_pld_send_wtimer(conn, ti, payload, sizeof(payload)); } static ssize_t vlink_open(ECPConnection *conn) { @@ -407,7 +395,7 @@ static ssize_t vconn_set_msg(ECPConnection *conn, unsigned char *pld_out, size_t } -static ssize_t vconn_pack(ECPConnection *conn, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size, ECPNetAddr *addr, ecp_seq_t *seq, int *rbuf_idx) { +static ssize_t vconn_pack(ECPConnection *conn, unsigned char *packet, size_t pkt_size, unsigned char s_idx, unsigned char c_idx, unsigned char *payload, size_t payload_size, ECPNetAddr *addr, void *_rbuf_info) { ECPContext *ctx = conn->sock->ctx; if (conn->parent) { @@ -415,12 +403,12 @@ static ssize_t vconn_pack(ECPConnection *conn, unsigned char *packet, size_t pkt ssize_t rv, hdr_size = vconn_set_msg(conn->parent, payload_, sizeof(payload_), payload, payload_size); if (hdr_size < 0) return hdr_size; - rv = ecp_conn_pack(conn, payload_+hdr_size, ECP_MAX_PLD-hdr_size, s_idx, c_idx, payload, payload_size, NULL, seq, rbuf_idx); + rv = ecp_conn_pack(conn, payload_+hdr_size, ECP_MAX_PLD-hdr_size, s_idx, c_idx, payload, payload_size, NULL, _rbuf_info); if (rv < 0) return rv; - return vconn_pack(conn->parent, packet, pkt_size, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload_, rv+hdr_size, addr, NULL, NULL); + return vconn_pack(conn->parent, packet, pkt_size, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload_, rv+hdr_size, addr, NULL); } else { - return ecp_conn_pack(conn, packet, pkt_size, s_idx, c_idx, payload, payload_size, addr, seq, rbuf_idx); + return ecp_conn_pack(conn, packet, pkt_size, s_idx, c_idx, payload, payload_size, addr, _rbuf_info); } } @@ -435,7 +423,7 @@ static ssize_t vconn_pack_raw(ECPSocket *sock, ECPConnection *parent, unsigned c rv = ecp_pack(ctx, payload_+hdr_size, ECP_MAX_PLD-hdr_size, s_idx, c_idx, public, shsec, nonce, seq, payload, payload_size); if (rv < 0) return rv; - return vconn_pack(parent, packet, pkt_size, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload_, rv+hdr_size, addr, NULL, NULL); + return vconn_pack(parent, packet, pkt_size, ECP_ECDH_IDX_INV, ECP_ECDH_IDX_INV, payload_, rv+hdr_size, addr, NULL); } else { return ecp_pack(ctx, packet, pkt_size, s_idx, c_idx, public, shsec, nonce, seq, payload, payload_size); } @@ -521,7 +509,7 @@ static ssize_t _vconn_send_kget(ECPConnection *conn, ECPTimerItem *ti) { unsigned char payload[ECP_SIZE_PLD(0)]; ecp_pld_set_type(payload, ECP_MTYPE_KGET_REQ); - return ecp_pld_send_wkey(conn, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, payload, sizeof(payload)); + return ecp_pld_send_ll(conn, ti, ECP_ECDH_IDX_PERMA, ECP_ECDH_IDX_INV, payload, sizeof(payload)); } int ecp_vconn_open(ECPConnection *conn, ECPNode *conn_node, ECPVConnection vconn[], ECPNode vconn_node[], int size) { -- cgit v1.2.3