summaryrefslogtreecommitdiff
path: root/code/ecp/rbuf_send.c
diff options
context:
space:
mode:
authorUros Majstorovic <majstor@majstor.org>2018-01-16 23:43:07 +0100
committerUros Majstorovic <majstor@majstor.org>2018-01-16 23:43:07 +0100
commit01c3e3af2394f863323b846fa304ff7e0a30e9df (patch)
tree84b499e6ece88e637ed86bbdd87333613e2433c5 /code/ecp/rbuf_send.c
parent0d0e9facfcea3cf96da3b63285865182fdd5477e (diff)
eos support
Diffstat (limited to 'code/ecp/rbuf_send.c')
-rw-r--r--code/ecp/rbuf_send.c34
1 files changed, 23 insertions, 11 deletions
diff --git a/code/ecp/rbuf_send.c b/code/ecp/rbuf_send.c
index f913e7e..61b29af 100644
--- a/code/ecp/rbuf_send.c
+++ b/code/ecp/rbuf_send.c
@@ -25,7 +25,7 @@ static ssize_t flush_send(ECPConnection *conn, ECPTimerItem *ti) {
rv = ecp_timer_push(&_ti);
if (rv) return rv;
}
- return ecp_rbuf_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(0, ECP_MTYPE_RBFLUSH), 0);
+ return ecp_rbuf_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(0, ECP_MTYPE_RBFLUSH), 0, 0);
}
static void cc_flush(ECPConnection *conn) {
@@ -65,8 +65,12 @@ static void cc_flush(ECPConnection *conn) {
#endif
for (i=0; i<pkt_to_send; i++) {
+ ECPBuffer packet;
+
if (rbuf->msg[_idx].idx_t != -1) ecp_timer_push(&ti[rbuf->msg[_idx].idx_t]);
- ecp_pkt_send(conn->sock, &conn->node.addr, rbuf->msg[_idx].msg, rbuf->msg[_idx].size);
+ packet.buffer = rbuf->msg[_idx].msg;
+ packet.size = rbuf->msg[_idx].size;
+ ecp_pkt_send(conn->sock, &conn->node.addr, &packet, rbuf->msg[_idx].size, 0);
_idx = ECP_RBUF_IDX_MASK(_idx + 1, rbuf->msg_size);
}
@@ -82,6 +86,7 @@ static void cc_flush(ECPConnection *conn) {
ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size, ECP2Buffer *b) {
ECPRBSend *buf;
+ ECPContext *ctx = conn->sock->ctx;
ssize_t rsize = sizeof(ecp_seq_t)+sizeof(ecp_ack_t);
ecp_seq_t seq_ack = 0;
ecp_ack_t ack_map = 0;
@@ -105,6 +110,8 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt
(msg[6] << 8) | \
(msg[7]);
+ if (ctx->tr.buf_flag_set) ctx->tr.buf_flag_set(b, ECP_SEND_FLAG_MORE);
+
#ifdef ECP_WITH_PTHREAD
pthread_mutex_lock(&buf->mutex);
#endif
@@ -146,9 +153,12 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt
payload.size = ECP_SIZE_PLD_BUF(0, ECP_MTYPE_NOP, conn);
ecp_pld_set_type(pld_buf, ECP_MTYPE_NOP);
- ecp_rbuf_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(0, ECP_MTYPE_NOP), seq_ack + i);
+ ecp_rbuf_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(0, ECP_MTYPE_NOP), 0, seq_ack + i);
} else {
- ecp_pkt_send(conn->sock, &conn->node.addr, rbuf->msg[_idx].msg, rbuf->msg[_idx].size);
+ ECPBuffer packet;
+ packet.buffer = rbuf->msg[_idx].msg;
+ packet.size = rbuf->msg[_idx].size;
+ ecp_pkt_send(conn->sock, &conn->node.addr, &packet, rbuf->msg[_idx].size, 0);
}
if (!nack_first) {
nack_first = 1;
@@ -188,12 +198,14 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt
pthread_mutex_unlock(&buf->mutex);
#endif
- if (rv) return rv;
-
- if (do_flush) {
+ if (!rv && do_flush) {
ssize_t _rv = flush_send(conn, NULL);
- if (_rv < 0) return _rv;
+ if (_rv < 0) rv = _rv;
}
+
+ if (ctx->tr.buf_flag_clear) ctx->tr.buf_flag_clear(b, ECP_SEND_FLAG_MORE);
+
+ if (rv) return rv;
return rsize;
}
@@ -308,7 +320,7 @@ int ecp_rbuf_pkt_prep(ECPRBSend *buf, ECPSeqItem *si, unsigned char mtype) {
return ECP_OK;
}
-ssize_t ecp_rbuf_pkt_send(ECPRBSend *buf, ECPSocket *sock, ECPNetAddr *addr, unsigned char *packet, size_t pkt_size, ECPTimerItem *ti, ECPSeqItem *si) {
+ssize_t ecp_rbuf_pkt_send(ECPRBSend *buf, ECPSocket *sock, ECPNetAddr *addr, ECPBuffer *packet, size_t pkt_size, unsigned char flags, ECPTimerItem *ti, ECPSeqItem *si) {
int do_send = 1;
ssize_t rv = 0;
@@ -320,7 +332,7 @@ ssize_t ecp_rbuf_pkt_send(ECPRBSend *buf, ECPSocket *sock, ECPNetAddr *addr, uns
if (mtype < ECP_MAX_MTYPE_SYS) flags |= ECP_RBUF_FLAG_SYS;
- rv = ecp_rbuf_msg_store(&buf->rbuf, seq, idx, packet, pkt_size, 0, flags);
+ rv = ecp_rbuf_msg_store(&buf->rbuf, seq, idx, packet->buffer, pkt_size, 0, flags);
if (rv < 0) return rv;
if (buf->flags & ECP_RBUF_FLAG_CCONTROL) {
@@ -369,7 +381,7 @@ ssize_t ecp_rbuf_pkt_send(ECPRBSend *buf, ECPSocket *sock, ECPNetAddr *addr, uns
int _rv = ecp_timer_push(ti);
if (_rv) return _rv;
}
- rv = ecp_pkt_send(sock, addr, packet, pkt_size);
+ rv = ecp_pkt_send(sock, addr, packet, pkt_size, flags);
}
return rv;
}