diff options
Diffstat (limited to 'code/ecp/rbuf_send.c')
-rw-r--r-- | code/ecp/rbuf_send.c | 34 |
1 files changed, 23 insertions, 11 deletions
diff --git a/code/ecp/rbuf_send.c b/code/ecp/rbuf_send.c index f913e7e..61b29af 100644 --- a/code/ecp/rbuf_send.c +++ b/code/ecp/rbuf_send.c @@ -25,7 +25,7 @@ static ssize_t flush_send(ECPConnection *conn, ECPTimerItem *ti) { rv = ecp_timer_push(&_ti); if (rv) return rv; } - return ecp_rbuf_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(0, ECP_MTYPE_RBFLUSH), 0); + return ecp_rbuf_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(0, ECP_MTYPE_RBFLUSH), 0, 0); } static void cc_flush(ECPConnection *conn) { @@ -65,8 +65,12 @@ static void cc_flush(ECPConnection *conn) { #endif for (i=0; i<pkt_to_send; i++) { + ECPBuffer packet; + if (rbuf->msg[_idx].idx_t != -1) ecp_timer_push(&ti[rbuf->msg[_idx].idx_t]); - ecp_pkt_send(conn->sock, &conn->node.addr, rbuf->msg[_idx].msg, rbuf->msg[_idx].size); + packet.buffer = rbuf->msg[_idx].msg; + packet.size = rbuf->msg[_idx].size; + ecp_pkt_send(conn->sock, &conn->node.addr, &packet, rbuf->msg[_idx].size, 0); _idx = ECP_RBUF_IDX_MASK(_idx + 1, rbuf->msg_size); } @@ -82,6 +86,7 @@ static void cc_flush(ECPConnection *conn) { ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size, ECP2Buffer *b) { ECPRBSend *buf; + ECPContext *ctx = conn->sock->ctx; ssize_t rsize = sizeof(ecp_seq_t)+sizeof(ecp_ack_t); ecp_seq_t seq_ack = 0; ecp_ack_t ack_map = 0; @@ -105,6 +110,8 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt (msg[6] << 8) | \ (msg[7]); + if (ctx->tr.buf_flag_set) ctx->tr.buf_flag_set(b, ECP_SEND_FLAG_MORE); + #ifdef ECP_WITH_PTHREAD pthread_mutex_lock(&buf->mutex); #endif @@ -146,9 +153,12 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt payload.size = ECP_SIZE_PLD_BUF(0, ECP_MTYPE_NOP, conn); ecp_pld_set_type(pld_buf, ECP_MTYPE_NOP); - ecp_rbuf_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(0, ECP_MTYPE_NOP), seq_ack + i); + ecp_rbuf_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(0, ECP_MTYPE_NOP), 0, seq_ack + i); } else { - ecp_pkt_send(conn->sock, &conn->node.addr, rbuf->msg[_idx].msg, rbuf->msg[_idx].size); + ECPBuffer packet; + packet.buffer = rbuf->msg[_idx].msg; + packet.size = rbuf->msg[_idx].size; + ecp_pkt_send(conn->sock, &conn->node.addr, &packet, rbuf->msg[_idx].size, 0); } if (!nack_first) { nack_first = 1; @@ -188,12 +198,14 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt pthread_mutex_unlock(&buf->mutex); #endif - if (rv) return rv; - - if (do_flush) { + if (!rv && do_flush) { ssize_t _rv = flush_send(conn, NULL); - if (_rv < 0) return _rv; + if (_rv < 0) rv = _rv; } + + if (ctx->tr.buf_flag_clear) ctx->tr.buf_flag_clear(b, ECP_SEND_FLAG_MORE); + + if (rv) return rv; return rsize; } @@ -308,7 +320,7 @@ int ecp_rbuf_pkt_prep(ECPRBSend *buf, ECPSeqItem *si, unsigned char mtype) { return ECP_OK; } -ssize_t ecp_rbuf_pkt_send(ECPRBSend *buf, ECPSocket *sock, ECPNetAddr *addr, unsigned char *packet, size_t pkt_size, ECPTimerItem *ti, ECPSeqItem *si) { +ssize_t ecp_rbuf_pkt_send(ECPRBSend *buf, ECPSocket *sock, ECPNetAddr *addr, ECPBuffer *packet, size_t pkt_size, unsigned char flags, ECPTimerItem *ti, ECPSeqItem *si) { int do_send = 1; ssize_t rv = 0; @@ -320,7 +332,7 @@ ssize_t ecp_rbuf_pkt_send(ECPRBSend *buf, ECPSocket *sock, ECPNetAddr *addr, uns if (mtype < ECP_MAX_MTYPE_SYS) flags |= ECP_RBUF_FLAG_SYS; - rv = ecp_rbuf_msg_store(&buf->rbuf, seq, idx, packet, pkt_size, 0, flags); + rv = ecp_rbuf_msg_store(&buf->rbuf, seq, idx, packet->buffer, pkt_size, 0, flags); if (rv < 0) return rv; if (buf->flags & ECP_RBUF_FLAG_CCONTROL) { @@ -369,7 +381,7 @@ ssize_t ecp_rbuf_pkt_send(ECPRBSend *buf, ECPSocket *sock, ECPNetAddr *addr, uns int _rv = ecp_timer_push(ti); if (_rv) return _rv; } - rv = ecp_pkt_send(sock, addr, packet, pkt_size); + rv = ecp_pkt_send(sock, addr, packet, pkt_size, flags); } return rv; } |