summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorUros Majstorovic <majstor@majstor.org>2017-08-13 20:17:30 +0200
committerUros Majstorovic <majstor@majstor.org>2017-08-13 20:17:30 +0200
commit8a59506a23eb3133fa510c19993865f661aec0f4 (patch)
tree9e4bef53e7694f8e6c6177cac2859f84db486cb0
parent37fa8fc50753900f536e7ce857a63ac557e572ba (diff)
rbuf congestion controll implemented
-rw-r--r--code/core/core.c22
-rw-r--r--code/core/rbuf.c73
-rw-r--r--code/core/rbuf.h17
-rw-r--r--code/core/rbuf_recv.c23
-rw-r--r--code/core/rbuf_send.c76
5 files changed, 145 insertions, 66 deletions
diff --git a/code/core/core.c b/code/core/core.c
index 7c3bc5a..25680f3 100644
--- a/code/core/core.c
+++ b/code/core/core.c
@@ -466,21 +466,10 @@ int ecp_conn_create(ECPConnection *conn, ECPSocket *sock, unsigned char ctype) {
if (rv) return ECP_ERR;
#endif
-#ifdef ECP_WITH_MSGQ
- rv = ecp_conn_msgq_create(conn);
- if (rv) {
- pthread_mutex_destroy(&conn->mutex);
- return ECP_ERR;
- }
-#endif
-
return ECP_OK;
}
void ecp_conn_destroy(ECPConnection *conn) {
-#ifdef ECP_WITH_MSGQ
- ecp_conn_msgq_destroy(conn);
-#endif
#ifdef ECP_WITH_PTHREAD
pthread_mutex_destroy(&conn->mutex);
#endif
@@ -959,7 +948,7 @@ ssize_t ecp_conn_pack(ECPConnection *conn, unsigned char *packet, size_t pkt_siz
}
}
if (!rv) {
- _seq = conn->seq_out;
+ _seq = conn->seq_out + 1;
#ifdef ECP_WITH_RBUF
if (conn->rbuf.send && rbuf_idx) {
ECPRBSend *buf = conn->rbuf.send;
@@ -977,7 +966,7 @@ ssize_t ecp_conn_pack(ECPConnection *conn, unsigned char *packet, size_t pkt_siz
#endif
}
if (!rv) {
- conn->seq_out = _seq + 1;
+ conn->seq_out = _seq;
if (addr) *addr = conn->node.addr;
}
@@ -1162,13 +1151,6 @@ ssize_t ecp_pkt_handle(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *parent,
#ifdef ECP_WITH_PTHREAD
pthread_mutex_lock(&conn->mutex);
#endif
-#ifdef ECP_WITH_MSGQ
- if (!rv && (cnt_size > 0)) {
- proc_size = ecp_conn_msgq_push(conn, payload+pld_size-cnt_size, cnt_size);
- if (proc_size < 0) rv = ECP_ERR_HANDLE;
- if (!rv) cnt_size -= proc_size;
- }
-#endif
if (!rv) {
conn->seq_in = n_seq;
conn->seq_in_bitmap = seq_bitmap;
diff --git a/code/core/rbuf.c b/code/core/rbuf.c
index d92c3f1..f13e06e 100644
--- a/code/core/rbuf.c
+++ b/code/core/rbuf.c
@@ -15,6 +15,13 @@ int ecp_rbuf_init(ECPRBuffer *rbuf, ECPRBMessage *msg, unsigned int msg_size) {
return ECP_OK;
}
+int ecp_rbuf_start(ECPRBuffer *rbuf, ecp_seq_t seq) {
+ rbuf->seq_max = seq;
+ rbuf->seq_start = seq + 1;
+
+ return ECP_OK;
+}
+
int ecp_rbuf_msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq) {
ecp_seq_t seq_offset = seq - rbuf->seq_start;
@@ -39,37 +46,65 @@ ssize_t ecp_rbuf_msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, int idx, unsigned ch
return msg_size;
}
-int ecp_conn_rbuf_start(ECPConnection *conn, ecp_seq_t seq) {
- int rv = ecp_conn_rbuf_send_start(conn);
- if (rv) return rv;
-
- if (!conn->out) {
- rv = ecp_conn_rbuf_recv_start(conn, seq);
- if (rv) return rv;
- }
-
- return ECP_OK;
-}
-
ssize_t ecp_conn_rbuf_pkt_send(ECPConnection *conn, ECPNetAddr *addr, unsigned char *packet, size_t pkt_size, ecp_seq_t seq, int idx) {
- int do_send;
+ int do_send = 1;
ECPRBSend *buf = conn->rbuf.send;
ssize_t rv = ecp_rbuf_msg_store(&buf->rbuf, seq, idx, packet, pkt_size, 0, 0);
if (rv < 0) return rv;
+ if (buf->win_size) {
#ifdef ECP_WITH_PTHREAD
- pthread_mutex_lock(&buf->mutex);
+ pthread_mutex_lock(&buf->mutex);
#endif
- if (buf->in_transit < buf->win_size) {
- buf->in_transit++;
- do_send = 1;
- }
+
+ if (buf->cc_wait || (buf->in_transit >= buf->win_size)) {
+ if (!buf->cc_wait) buf->seq_cc = seq;
+ buf->cc_wait++;
+ buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_CCWAIT;
+ do_send = 0;
+ } else {
+ buf->in_transit++;
+ }
+
#ifdef ECP_WITH_PTHREAD
- pthread_mutex_unlock(&buf->mutex);
+ pthread_mutex_unlock(&buf->mutex);
#endif
+ }
if (do_send) rv = ecp_pkt_send(conn->sock, addr, packet, pkt_size);
return rv;
}
+int ecp_conn_rbuf_create(ECPConnection *conn, ECPRBSend *buf_s, ECPRBMessage *msg_s, unsigned int msg_s_size, ECPRBRecv *buf_r, ECPRBMessage *msg_r, unsigned int msg_r_size) {
+ int rv;
+
+ rv = ecp_conn_rbuf_send_create(conn, buf_s, msg_s, msg_s_size);
+ if (rv) return rv;
+
+ rv = ecp_conn_rbuf_recv_create(conn, buf_r, msg_r, msg_r_size);
+ if (rv) {
+ ecp_conn_rbuf_send_destroy(conn);
+ return rv;
+ }
+
+ return ECP_OK;
+}
+
+void ecp_conn_rbuf_destroy(ECPConnection *conn) {
+ ecp_conn_rbuf_send_destroy(conn);
+ ecp_conn_rbuf_recv_destroy(conn);
+}
+
+int ecp_conn_rbuf_start(ECPConnection *conn, ecp_seq_t seq) {
+ int rv = ecp_conn_rbuf_send_start(conn);
+ if (rv) return rv;
+
+ if (!conn->out) {
+ rv = ecp_conn_rbuf_recv_start(conn, seq);
+ if (rv) return rv;
+ }
+
+ return ECP_OK;
+}
+
diff --git a/code/core/rbuf.h b/code/core/rbuf.h
index 9e9e53d..9bb9701 100644
--- a/code/core/rbuf.h
+++ b/code/core/rbuf.h
@@ -1,5 +1,6 @@
#define ECP_RBUF_FLAG_RECEIVED 0x01
#define ECP_RBUF_FLAG_DELIVERED 0x02
+#define ECP_RBUF_FLAG_CCWAIT 0x04
#define ECP_RBUF_FLAG_RELIABLE 0x01
#define ECP_RBUF_FLAG_MSGQ 0x02
@@ -46,6 +47,7 @@ typedef struct ECPRBMessage {
typedef struct ECPRBuffer {
ecp_seq_t seq_start;
+ ecp_seq_t seq_max;
unsigned int msg_size;
unsigned int msg_start;
ECPRBMessage *msg;
@@ -58,7 +60,6 @@ typedef struct ECPRBRecv {
unsigned short hole_max;
unsigned short ack_rate;
ecp_seq_t seq_ack;
- ecp_seq_t seq_max;
ecp_seq_t ack_pkt;
ecp_ack_t ack_map;
ecp_ack_t hole_mask_full;
@@ -71,7 +72,9 @@ typedef struct ECPRBSend {
unsigned char flush;
ecp_win_t win_size;
ecp_win_t in_transit;
+ ecp_win_t cc_wait;
ecp_seq_t seq_flush;
+ ecp_seq_t seq_cc;
unsigned int nack_rate;
ECPRBuffer rbuf;
#ifdef ECP_WITH_PTHREAD
@@ -86,18 +89,24 @@ typedef struct ECPConnRBuffer {
int ecp_rbuf_init(ECPRBuffer *rbuf, ECPRBMessage *msg, unsigned int msg_size);
+int ecp_rbuf_start(ECPRBuffer *rbuf, ecp_seq_t seq);
int ecp_rbuf_msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq);
ssize_t ecp_rbuf_msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, int idx, unsigned char *msg, size_t msg_size, unsigned char test_flags, unsigned char set_flags);
-int ecp_conn_rbuf_start(struct ECPConnection *conn, ecp_seq_t seq);
ssize_t ecp_conn_rbuf_pkt_send(struct ECPConnection *conn, ECPNetAddr *addr, unsigned char *packet, size_t pkt_size, ecp_seq_t seq, int idx);
+int ecp_conn_rbuf_create(struct ECPConnection *conn, ECPRBSend *buf_s, ECPRBMessage *msg_s, unsigned int msg_s_size, ECPRBRecv *buf_r, ECPRBMessage *msg_r, unsigned int msg_r_size);
+void ecp_conn_rbuf_destroy(struct ECPConnection *conn);
+int ecp_conn_rbuf_start(struct ECPConnection *conn, ecp_seq_t seq);
+
int ecp_conn_rbuf_recv_create(struct ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size);
+void ecp_conn_rbuf_recv_destroy(struct ECPConnection *conn);
+int ecp_conn_rbuf_recv_set_hole(struct ECPConnection *conn, unsigned short hole_max);
+int ecp_conn_rbuf_recv_set_delay(struct ECPConnection *conn, unsigned short delay);
int ecp_conn_rbuf_recv_start(struct ECPConnection *conn, ecp_seq_t seq);
ssize_t ecp_conn_rbuf_recv_store(struct ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size);
int ecp_conn_rbuf_send_create(struct ECPConnection *conn, ECPRBSend *buf, ECPRBMessage *msg, unsigned int msg_size);
+void ecp_conn_rbuf_send_destroy(struct ECPConnection *conn);
int ecp_conn_rbuf_send_start(struct ECPConnection *conn);
-int ecp_conn_rbuf_recv_set_hole(struct ECPConnection *conn, unsigned short hole_max);
-int ecp_conn_rbuf_recv_set_delay(struct ECPConnection *conn, unsigned short delay);
ssize_t ecp_conn_rbuf_send_store(struct ECPConnection *conn, ecp_seq_t seq, int idx, unsigned char *msg, size_t msg_size);
diff --git a/code/core/rbuf_recv.c b/code/core/rbuf_recv.c
index b2a5ecf..f86cd13 100644
--- a/code/core/rbuf_recv.c
+++ b/code/core/rbuf_recv.c
@@ -34,13 +34,13 @@ static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg,
if (flags & ECP_RBUF_FLAG_DELIVERED) ecp_msg_handle(conn, seq, msg, msg_size);
- if (ECP_RBUF_SEQ_LT(buf->seq_max, seq)) buf->seq_max = seq;
+ if (ECP_RBUF_SEQ_LT(buf->rbuf.seq_max, seq)) buf->rbuf.seq_max = seq;
return rv;
}
static void msg_flush(ECPConnection *conn) {
ECPRBRecv *buf = conn->rbuf.recv;
- ecp_seq_t msg_cnt = buf->seq_max - buf->rbuf.seq_start + 1;
+ ecp_seq_t msg_cnt = buf->rbuf.seq_max - buf->rbuf.seq_start + 1;
ecp_seq_t i = 0;
unsigned int idx = buf->rbuf.msg_start;
@@ -94,7 +94,7 @@ static int ack_shift(ECPRBRecv *buf) {
idx = ecp_rbuf_msg_idx(&buf->rbuf, buf->seq_ack);
if (idx < 0) return idx;
- while (ECP_RBUF_SEQ_LT(buf->seq_ack, buf->seq_max)) {
+ while (ECP_RBUF_SEQ_LT(buf->seq_ack, buf->rbuf.seq_max)) {
idx = ECP_RBUF_IDX_MASK(idx + 1, buf->rbuf.msg_size);
buf->seq_ack++;
@@ -103,14 +103,14 @@ static int ack_shift(ECPRBRecv *buf) {
buf->ack_map = buf->ack_map << 1;
if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_RECEIVED) {
buf->ack_map |= 1;
- } else if (!do_ack && ECP_RBUF_SEQ_LTE(buf->seq_ack, buf->seq_max - 2 * buf->hole_max)) {
+ } else if (!do_ack && ECP_RBUF_SEQ_LTE(buf->seq_ack, buf->rbuf.seq_max - 2 * buf->hole_max)) {
do_ack = 1;
}
if ((buf->ack_map & ACK_MASK_FIRST) == 0) break;
}
- if (!do_ack && (buf->seq_ack == buf->seq_max) && ((buf->ack_map & buf->hole_mask_full) != buf->hole_mask_full)) {
+ if (!do_ack && (buf->seq_ack == buf->rbuf.seq_max) && ((buf->ack_map & buf->hole_mask_full) != buf->hole_mask_full)) {
ecp_ack_t hole_mask = buf->ack_map;
for (i=0; i<buf->hole_max-1; i++) {
@@ -139,6 +139,10 @@ int ecp_conn_rbuf_recv_create(ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage
return ECP_OK;
}
+void ecp_conn_rbuf_recv_destroy(ECPConnection *conn) {
+
+}
+
int ecp_conn_rbuf_recv_set_hole(ECPConnection *conn, unsigned short hole_max) {
ECPRBRecv *buf = conn->rbuf.recv;
@@ -166,10 +170,7 @@ int ecp_conn_rbuf_recv_start(ECPConnection *conn, ecp_seq_t seq) {
if (buf == NULL) return ECP_ERR;
buf->seq_ack = seq;
- buf->seq_max = seq;
- buf->rbuf.seq_start = seq + 1;
-
- return ECP_OK;
+ return ecp_rbuf_start(&buf->rbuf, seq);
}
ssize_t ecp_conn_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) {
@@ -181,7 +182,7 @@ ssize_t ecp_conn_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned ch
if (buf == NULL) return ECP_ERR;
if (msg_size < 1) return ECP_ERR_MIN_MSG;
- if (ECP_RBUF_SEQ_LT(buf->seq_max, seq)) ack_pkt = seq - buf->seq_max;
+ if (ECP_RBUF_SEQ_LT(buf->rbuf.seq_max, seq)) ack_pkt = seq - buf->rbuf.seq_max;
if (ECP_RBUF_SEQ_LTE(seq, buf->seq_ack)) {
ecp_seq_t seq_offset = buf->seq_ack - seq;
if (seq_offset < ECP_RBUF_ACK_SIZE) {
@@ -204,7 +205,7 @@ ssize_t ecp_conn_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned ch
} else {
ecp_msg_handle(conn, seq, msg, msg_size);
rv = msg_size;
- buf->seq_max++;
+ buf->rbuf.seq_max++;
buf->rbuf.seq_start++;
buf->rbuf.msg_start = ECP_RBUF_IDX_MASK(buf->rbuf.msg_start + 1, buf->rbuf.msg_size);
}
diff --git a/code/core/rbuf_send.c b/code/core/rbuf_send.c
index 877b086..4176726 100644
--- a/code/core/rbuf_send.c
+++ b/code/core/rbuf_send.c
@@ -11,11 +11,44 @@ static ssize_t _rbuf_send_flush(ECPConnection *conn, ECPTimerItem *ti) {
return ecp_pld_send(conn, payload, sizeof(payload));
}
+static void cc_flush(ECPConnection *conn) {
+ ECPRBSend *buf = conn->rbuf.send;
+ ECPRBuffer *rbuf = &buf->rbuf;
+ ecp_seq_t pkt_buf_cnt = rbuf->seq_max - rbuf->seq_start + 1;
+ ecp_win_t pkt_cc_cnt = buf->win_size > buf->in_transit ? buf->win_size - buf->in_transit : 0;
+ int pkt_to_send = pkt_buf_cnt < pkt_cc_cnt ? pkt_buf_cnt : pkt_cc_cnt;
+ int i;
+
+ if (pkt_to_send) {
+ unsigned int _idx = ecp_rbuf_msg_idx(rbuf, buf->seq_cc);
+
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_unlock(&buf->mutex);
+#endif
+
+ for (i=0; i<pkt_to_send; i++) {
+ if (!(rbuf->msg[_idx].flags & ECP_RBUF_FLAG_CCWAIT)) break;
+ ecp_pkt_send(conn->sock, &conn->node.addr, rbuf->msg[_idx].msg, rbuf->msg[_idx].size);
+ rbuf->msg[_idx].flags &= ~ECP_RBUF_FLAG_CCWAIT;
+ _idx = ECP_RBUF_IDX_MASK(_idx + 1, rbuf->msg_size);
+ }
+
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_lock(&buf->mutex);
+#endif
+
+ buf->in_transit += (ecp_win_t)i;
+ buf->cc_wait -= (ecp_win_t)i;
+ buf->seq_cc += (ecp_seq_t)i;
+ }
+}
+
static ssize_t handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size) {
ECPRBSend *buf;
ssize_t rsize = sizeof(ecp_seq_t)+sizeof(ecp_ack_t);
ecp_seq_t seq_ack = 0;
ecp_ack_t ack_map = 0;
+ int i;
int do_flush = 0;
int rv = ECP_OK;
@@ -43,14 +76,11 @@ static ssize_t handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mtyp
int idx = ecp_rbuf_msg_idx(rbuf, seq_ack);
if (idx < 0) rv = idx;
- int is_reliable = buf->flags & ECP_RBUF_FLAG_RELIABLE;
-
if (!rv) {
seq_ack++;
buf->in_transit -= seq_ack - rbuf->seq_start;
if (ack_map != ECP_RBUF_ACK_FULL) {
- int i;
int nack_first = 0;
unsigned int msg_start;
ecp_seq_t seq_start;
@@ -66,12 +96,12 @@ static ssize_t handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mtyp
if ((ack_map & ((ecp_ack_t)1 << (ECP_RBUF_ACK_SIZE - i - 1))) == 0) {
nack_cnt++;
if (buf->flags & ECP_RBUF_FLAG_RELIABLE) {
- idx = ECP_RBUF_IDX_MASK(idx + 1 - ECP_RBUF_ACK_SIZE + i, rbuf->msg_size);
- ecp_pkt_send(conn->sock, &conn->node.addr, rbuf->msg[idx].msg, rbuf->msg[idx].size);
+ unsigned int _idx = ECP_RBUF_IDX_MASK(idx + 1 - ECP_RBUF_ACK_SIZE + i, rbuf->msg_size);
+ ecp_pkt_send(conn->sock, &conn->node.addr, rbuf->msg[_idx].msg, rbuf->msg[_idx].size);
if (!nack_first) {
nack_first = 1;
seq_start = seq_ack + i;
- msg_start = idx;
+ msg_start = _idx;
}
}
}
@@ -88,13 +118,12 @@ static ssize_t handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mtyp
rbuf->msg_start = msg_start;
} else {
rbuf->seq_start = seq_ack + ECP_RBUF_ACK_SIZE;
+ rbuf->msg_start = ECP_RBUF_IDX_MASK(idx + 1, rbuf->msg_size);
}
} else {
- rbuf->seq_start = seq_ack;
buf->nack_rate = (buf->nack_rate * 7) / 8;
- if (buf->flags & ECP_RBUF_FLAG_RELIABLE) {
- rbuf->msg_start = ECP_RBUF_IDX_MASK(idx + 1, rbuf->msg_size);
- }
+ rbuf->seq_start = seq_ack;
+ rbuf->msg_start = ECP_RBUF_IDX_MASK(idx + 1, rbuf->msg_size);
}
if (buf->flush) {
if (ECP_RBUF_SEQ_LT(buf->seq_flush, rbuf->seq_start)) buf->flush = 0;
@@ -102,6 +131,7 @@ static ssize_t handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mtyp
do_flush = 1;
}
}
+ if (buf->cc_wait) cc_flush(conn);
}
#ifdef ECP_WITH_PTHREAD
@@ -126,15 +156,37 @@ int ecp_conn_rbuf_send_create(ECPConnection *conn, ECPRBSend *buf, ECPRBMessage
return ECP_OK;
}
-int ecp_conn_rbuf_send_start(ECPConnection *conn) {
+void ecp_conn_rbuf_send_destroy(ECPConnection *conn) {
+
+}
+
+int ecp_conn_rbuf_send_set_wsize(ECPConnection *conn, ecp_win_t size) {
ECPRBSend *buf = conn->rbuf.send;
if (buf == NULL) return ECP_ERR;
- buf->rbuf.seq_start = conn->seq_out;
+
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_lock(&buf->mutex);
+#endif
+
+ buf->win_size = size;
+ if (buf->cc_wait) cc_flush(conn);
+
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_unlock(&buf->mutex);
+#endif
return ECP_OK;
}
+int ecp_conn_rbuf_send_start(ECPConnection *conn) {
+ ECPRBSend *buf = conn->rbuf.send;
+
+ if (buf == NULL) return ECP_ERR;
+
+ return ecp_rbuf_start(&buf->rbuf, conn->seq_out);
+}
+
int ecp_conn_rbuf_flush(ECPConnection *conn) {
ECPRBSend *buf = conn->rbuf.send;
unsigned char payload[ECP_SIZE_PLD(0)];