summaryrefslogtreecommitdiff
path: root/code/core/rbuf_send.c
diff options
context:
space:
mode:
authorUros Majstorovic <majstor@majstor.org>2017-08-13 20:17:30 +0200
committerUros Majstorovic <majstor@majstor.org>2017-08-13 20:17:30 +0200
commit8a59506a23eb3133fa510c19993865f661aec0f4 (patch)
tree9e4bef53e7694f8e6c6177cac2859f84db486cb0 /code/core/rbuf_send.c
parent37fa8fc50753900f536e7ce857a63ac557e572ba (diff)
rbuf congestion controll implemented
Diffstat (limited to 'code/core/rbuf_send.c')
-rw-r--r--code/core/rbuf_send.c76
1 files changed, 64 insertions, 12 deletions
diff --git a/code/core/rbuf_send.c b/code/core/rbuf_send.c
index 877b086..4176726 100644
--- a/code/core/rbuf_send.c
+++ b/code/core/rbuf_send.c
@@ -11,11 +11,44 @@ static ssize_t _rbuf_send_flush(ECPConnection *conn, ECPTimerItem *ti) {
return ecp_pld_send(conn, payload, sizeof(payload));
}
+static void cc_flush(ECPConnection *conn) {
+ ECPRBSend *buf = conn->rbuf.send;
+ ECPRBuffer *rbuf = &buf->rbuf;
+ ecp_seq_t pkt_buf_cnt = rbuf->seq_max - rbuf->seq_start + 1;
+ ecp_win_t pkt_cc_cnt = buf->win_size > buf->in_transit ? buf->win_size - buf->in_transit : 0;
+ int pkt_to_send = pkt_buf_cnt < pkt_cc_cnt ? pkt_buf_cnt : pkt_cc_cnt;
+ int i;
+
+ if (pkt_to_send) {
+ unsigned int _idx = ecp_rbuf_msg_idx(rbuf, buf->seq_cc);
+
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_unlock(&buf->mutex);
+#endif
+
+ for (i=0; i<pkt_to_send; i++) {
+ if (!(rbuf->msg[_idx].flags & ECP_RBUF_FLAG_CCWAIT)) break;
+ ecp_pkt_send(conn->sock, &conn->node.addr, rbuf->msg[_idx].msg, rbuf->msg[_idx].size);
+ rbuf->msg[_idx].flags &= ~ECP_RBUF_FLAG_CCWAIT;
+ _idx = ECP_RBUF_IDX_MASK(_idx + 1, rbuf->msg_size);
+ }
+
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_lock(&buf->mutex);
+#endif
+
+ buf->in_transit += (ecp_win_t)i;
+ buf->cc_wait -= (ecp_win_t)i;
+ buf->seq_cc += (ecp_seq_t)i;
+ }
+}
+
static ssize_t handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size) {
ECPRBSend *buf;
ssize_t rsize = sizeof(ecp_seq_t)+sizeof(ecp_ack_t);
ecp_seq_t seq_ack = 0;
ecp_ack_t ack_map = 0;
+ int i;
int do_flush = 0;
int rv = ECP_OK;
@@ -43,14 +76,11 @@ static ssize_t handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mtyp
int idx = ecp_rbuf_msg_idx(rbuf, seq_ack);
if (idx < 0) rv = idx;
- int is_reliable = buf->flags & ECP_RBUF_FLAG_RELIABLE;
-
if (!rv) {
seq_ack++;
buf->in_transit -= seq_ack - rbuf->seq_start;
if (ack_map != ECP_RBUF_ACK_FULL) {
- int i;
int nack_first = 0;
unsigned int msg_start;
ecp_seq_t seq_start;
@@ -66,12 +96,12 @@ static ssize_t handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mtyp
if ((ack_map & ((ecp_ack_t)1 << (ECP_RBUF_ACK_SIZE - i - 1))) == 0) {
nack_cnt++;
if (buf->flags & ECP_RBUF_FLAG_RELIABLE) {
- idx = ECP_RBUF_IDX_MASK(idx + 1 - ECP_RBUF_ACK_SIZE + i, rbuf->msg_size);
- ecp_pkt_send(conn->sock, &conn->node.addr, rbuf->msg[idx].msg, rbuf->msg[idx].size);
+ unsigned int _idx = ECP_RBUF_IDX_MASK(idx + 1 - ECP_RBUF_ACK_SIZE + i, rbuf->msg_size);
+ ecp_pkt_send(conn->sock, &conn->node.addr, rbuf->msg[_idx].msg, rbuf->msg[_idx].size);
if (!nack_first) {
nack_first = 1;
seq_start = seq_ack + i;
- msg_start = idx;
+ msg_start = _idx;
}
}
}
@@ -88,13 +118,12 @@ static ssize_t handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mtyp
rbuf->msg_start = msg_start;
} else {
rbuf->seq_start = seq_ack + ECP_RBUF_ACK_SIZE;
+ rbuf->msg_start = ECP_RBUF_IDX_MASK(idx + 1, rbuf->msg_size);
}
} else {
- rbuf->seq_start = seq_ack;
buf->nack_rate = (buf->nack_rate * 7) / 8;
- if (buf->flags & ECP_RBUF_FLAG_RELIABLE) {
- rbuf->msg_start = ECP_RBUF_IDX_MASK(idx + 1, rbuf->msg_size);
- }
+ rbuf->seq_start = seq_ack;
+ rbuf->msg_start = ECP_RBUF_IDX_MASK(idx + 1, rbuf->msg_size);
}
if (buf->flush) {
if (ECP_RBUF_SEQ_LT(buf->seq_flush, rbuf->seq_start)) buf->flush = 0;
@@ -102,6 +131,7 @@ static ssize_t handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mtyp
do_flush = 1;
}
}
+ if (buf->cc_wait) cc_flush(conn);
}
#ifdef ECP_WITH_PTHREAD
@@ -126,15 +156,37 @@ int ecp_conn_rbuf_send_create(ECPConnection *conn, ECPRBSend *buf, ECPRBMessage
return ECP_OK;
}
-int ecp_conn_rbuf_send_start(ECPConnection *conn) {
+void ecp_conn_rbuf_send_destroy(ECPConnection *conn) {
+
+}
+
+int ecp_conn_rbuf_send_set_wsize(ECPConnection *conn, ecp_win_t size) {
ECPRBSend *buf = conn->rbuf.send;
if (buf == NULL) return ECP_ERR;
- buf->rbuf.seq_start = conn->seq_out;
+
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_lock(&buf->mutex);
+#endif
+
+ buf->win_size = size;
+ if (buf->cc_wait) cc_flush(conn);
+
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_unlock(&buf->mutex);
+#endif
return ECP_OK;
}
+int ecp_conn_rbuf_send_start(ECPConnection *conn) {
+ ECPRBSend *buf = conn->rbuf.send;
+
+ if (buf == NULL) return ECP_ERR;
+
+ return ecp_rbuf_start(&buf->rbuf, conn->seq_out);
+}
+
int ecp_conn_rbuf_flush(ECPConnection *conn) {
ECPRBSend *buf = conn->rbuf.send;
unsigned char payload[ECP_SIZE_PLD(0)];