summaryrefslogtreecommitdiff
path: root/code/core/rbuf_send.c
diff options
context:
space:
mode:
Diffstat (limited to 'code/core/rbuf_send.c')
-rw-r--r--code/core/rbuf_send.c63
1 files changed, 44 insertions, 19 deletions
diff --git a/code/core/rbuf_send.c b/code/core/rbuf_send.c
index 06bd2e2..e7020cb 100644
--- a/code/core/rbuf_send.c
+++ b/code/core/rbuf_send.c
@@ -4,11 +4,20 @@
#define NACK_RATE_UNIT 10000
-static ssize_t _rbuf_send_flush(ECPConnection *conn, ECPTimerItem *ti) {
+static ssize_t flush_send(ECPConnection *conn, ECPTimerItem *ti) {
unsigned char payload[ECP_SIZE_PLD(0)];
ecp_pld_set_type(payload, ECP_MTYPE_RBFLUSH);
- return ecp_pld_send(conn, payload, sizeof(payload));
+ if (ti == NULL) {
+ ECPTimerItem _ti;
+ int rv = ecp_timer_item_init(&_ti, conn, ECP_MTYPE_RBACK, 3, 500);
+ if (rv) return rv;
+
+ _ti.retry = flush_send;
+ rv = ecp_timer_push(&_ti);
+ if (rv) return rv;
+ }
+ return ecp_rbuf_pld_send(conn, payload, sizeof(payload), 0);
}
static void cc_flush(ECPConnection *conn) {
@@ -24,8 +33,8 @@ static void cc_flush(ECPConnection *conn) {
unsigned int _idx = idx;
for (i=0; i<pkt_to_send; i++) {
- if (!(rbuf->msg[_idx].flags & ECP_RBUF_FLAG_CCWAIT)) break;
- rbuf->msg[_idx].flags &= ~ECP_RBUF_FLAG_CCWAIT;
+ if (!(rbuf->msg[_idx].flags & ECP_RBUF_FLAG_IN_CCONTROL)) break;
+ rbuf->msg[_idx].flags &= ~ECP_RBUF_FLAG_IN_CCONTROL;
_idx = ECP_RBUF_IDX_MASK(_idx + 1, rbuf->msg_size);
}
pkt_to_send = i;
@@ -45,7 +54,7 @@ static void cc_flush(ECPConnection *conn) {
#endif
buf->in_transit += (ecp_win_t)pkt_to_send;
- buf->cc_wait -= (ecp_win_t)pkt_to_send;
+ buf->cnt_cc -= (ecp_win_t)pkt_to_send;
buf->seq_cc += (ecp_seq_t)pkt_to_send;
}
}
@@ -85,7 +94,7 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt
if (!rv) {
seq_ack++;
- buf->in_transit -= seq_ack - rbuf->seq_start;
+ if (buf->flags & ECP_RBUF_FLAG_CCONTROL) buf->in_transit = buf->cnt_cc ? buf->seq_cc - seq_ack : rbuf->seq_max - seq_ack + 1;
if (ack_map != ECP_RBUF_ACK_FULL) {
int nack_first = 0;
@@ -104,7 +113,11 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt
nack_cnt++;
if (buf->flags & ECP_RBUF_FLAG_RELIABLE) {
unsigned int _idx = ECP_RBUF_IDX_MASK(idx + 1 - ECP_RBUF_ACK_SIZE + i, rbuf->msg_size);
- ecp_pkt_send(conn->sock, &conn->node.addr, rbuf->msg[_idx].msg, rbuf->msg[_idx].size);
+ if ((rbuf->msg[_idx].size == 0) || (rbuf->msg[_idx].flags & ECP_RBUF_FLAG_SYS)) {
+
+ } else {
+ ecp_pkt_send(conn->sock, &conn->node.addr, rbuf->msg[_idx].msg, rbuf->msg[_idx].size);
+ }
if (!nack_first) {
nack_first = 1;
seq_start = seq_ack + i;
@@ -118,7 +131,7 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt
pthread_mutex_lock(&buf->mutex);
#endif
- buf->in_transit += nack_cnt;
+ if (buf->flags & ECP_RBUF_FLAG_CCONTROL) buf->in_transit += nack_cnt;
buf->nack_rate = (buf->nack_rate * 7 + ((ECP_RBUF_ACK_SIZE - nack_cnt) * NACK_RATE_UNIT) / ECP_RBUF_ACK_SIZE) / 8;
if (buf->flags & ECP_RBUF_FLAG_RELIABLE) {
rbuf->seq_start = seq_start;
@@ -138,7 +151,7 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt
do_flush = 1;
}
}
- if (buf->cc_wait) cc_flush(conn);
+ if (buf->cnt_cc) cc_flush(conn);
}
#ifdef ECP_WITH_PTHREAD
@@ -147,11 +160,14 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt
if (rv) return rv;
- if (do_flush) ecp_timer_send(conn, _rbuf_send_flush, ECP_MTYPE_RBACK, 3, 500);
+ if (do_flush) {
+ ssize_t _rv = flush_send(conn, NULL);
+ if (_rv < 0) return _rv;
+ }
return rsize;
}
-int ecp_conn_rbuf_send_create(ECPConnection *conn, ECPRBSend *buf, ECPRBMessage *msg, unsigned int msg_size) {
+int ecp_rbuf_send_create(ECPConnection *conn, ECPRBSend *buf, ECPRBMessage *msg, unsigned int msg_size) {
int rv;
memset(buf, 0, sizeof(ECPRBRecv));
@@ -167,7 +183,7 @@ int ecp_conn_rbuf_send_create(ECPConnection *conn, ECPRBSend *buf, ECPRBMessage
return ECP_OK;
}
-void ecp_conn_rbuf_send_destroy(ECPConnection *conn) {
+void ecp_rbuf_send_destroy(ECPConnection *conn) {
ECPRBSend *buf = conn->rbuf.send;
if (buf == NULL) return;
@@ -177,7 +193,7 @@ void ecp_conn_rbuf_send_destroy(ECPConnection *conn) {
#endif
}
-int ecp_conn_rbuf_send_set_wsize(ECPConnection *conn, ecp_win_t size) {
+int ecp_rbuf_send_set_wsize(ECPConnection *conn, ecp_win_t size) {
ECPRBSend *buf = conn->rbuf.send;
if (buf == NULL) return ECP_ERR;
@@ -187,7 +203,7 @@ int ecp_conn_rbuf_send_set_wsize(ECPConnection *conn, ecp_win_t size) {
#endif
buf->win_size = size;
- if (buf->cc_wait) cc_flush(conn);
+ if (buf->cnt_cc) cc_flush(conn);
#ifdef ECP_WITH_PTHREAD
pthread_mutex_unlock(&buf->mutex);
@@ -196,7 +212,7 @@ int ecp_conn_rbuf_send_set_wsize(ECPConnection *conn, ecp_win_t size) {
return ECP_OK;
}
-int ecp_conn_rbuf_send_start(ECPConnection *conn) {
+int ecp_rbuf_send_start(ECPConnection *conn) {
ECPRBSend *buf = conn->rbuf.send;
if (buf == NULL) return ECP_ERR;
@@ -204,29 +220,38 @@ int ecp_conn_rbuf_send_start(ECPConnection *conn) {
return ecp_rbuf_start(&buf->rbuf, conn->seq_out);
}
-int ecp_conn_rbuf_flush(ECPConnection *conn) {
+int ecp_rbuf_flush(ECPConnection *conn) {
ECPRBSend *buf = conn->rbuf.send;
unsigned char payload[ECP_SIZE_PLD(0)];
ecp_seq_t seq;
if (buf == NULL) return ECP_ERR;
- // XXX flush seq
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_lock(&conn->mutex);
+#endif
+ seq = conn->seq_out;
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_unlock(&conn->mutex);
+#endif
+
#ifdef ECP_WITH_PTHREAD
pthread_mutex_lock(&buf->mutex);
#endif
+
if (buf->flush) {
if (ECP_RBUF_SEQ_LT(buf->seq_flush, seq)) buf->seq_flush = seq;
} else {
buf->flush = 1;
buf->seq_flush = seq;
}
+
#ifdef ECP_WITH_PTHREAD
pthread_mutex_unlock(&buf->mutex);
#endif
- ssize_t _rv = ecp_timer_send(conn, _rbuf_send_flush, ECP_MTYPE_RBACK, 3, 500);
- if (_rv < 0) return _rv;
+ ssize_t rv = flush_send(conn, 0);
+ if (rv < 0) return rv;
return ECP_OK;
}