summaryrefslogtreecommitdiff
path: root/code/core/rbuf.c
diff options
context:
space:
mode:
authorUros Majstorovic <majstor@majstor.org>2017-08-13 20:17:30 +0200
committerUros Majstorovic <majstor@majstor.org>2017-08-13 20:17:30 +0200
commit8a59506a23eb3133fa510c19993865f661aec0f4 (patch)
tree9e4bef53e7694f8e6c6177cac2859f84db486cb0 /code/core/rbuf.c
parent37fa8fc50753900f536e7ce857a63ac557e572ba (diff)
rbuf congestion controll implemented
Diffstat (limited to 'code/core/rbuf.c')
-rw-r--r--code/core/rbuf.c73
1 files changed, 54 insertions, 19 deletions
diff --git a/code/core/rbuf.c b/code/core/rbuf.c
index d92c3f1..f13e06e 100644
--- a/code/core/rbuf.c
+++ b/code/core/rbuf.c
@@ -15,6 +15,13 @@ int ecp_rbuf_init(ECPRBuffer *rbuf, ECPRBMessage *msg, unsigned int msg_size) {
return ECP_OK;
}
+int ecp_rbuf_start(ECPRBuffer *rbuf, ecp_seq_t seq) {
+ rbuf->seq_max = seq;
+ rbuf->seq_start = seq + 1;
+
+ return ECP_OK;
+}
+
int ecp_rbuf_msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq) {
ecp_seq_t seq_offset = seq - rbuf->seq_start;
@@ -39,37 +46,65 @@ ssize_t ecp_rbuf_msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, int idx, unsigned ch
return msg_size;
}
-int ecp_conn_rbuf_start(ECPConnection *conn, ecp_seq_t seq) {
- int rv = ecp_conn_rbuf_send_start(conn);
- if (rv) return rv;
-
- if (!conn->out) {
- rv = ecp_conn_rbuf_recv_start(conn, seq);
- if (rv) return rv;
- }
-
- return ECP_OK;
-}
-
ssize_t ecp_conn_rbuf_pkt_send(ECPConnection *conn, ECPNetAddr *addr, unsigned char *packet, size_t pkt_size, ecp_seq_t seq, int idx) {
- int do_send;
+ int do_send = 1;
ECPRBSend *buf = conn->rbuf.send;
ssize_t rv = ecp_rbuf_msg_store(&buf->rbuf, seq, idx, packet, pkt_size, 0, 0);
if (rv < 0) return rv;
+ if (buf->win_size) {
#ifdef ECP_WITH_PTHREAD
- pthread_mutex_lock(&buf->mutex);
+ pthread_mutex_lock(&buf->mutex);
#endif
- if (buf->in_transit < buf->win_size) {
- buf->in_transit++;
- do_send = 1;
- }
+
+ if (buf->cc_wait || (buf->in_transit >= buf->win_size)) {
+ if (!buf->cc_wait) buf->seq_cc = seq;
+ buf->cc_wait++;
+ buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_CCWAIT;
+ do_send = 0;
+ } else {
+ buf->in_transit++;
+ }
+
#ifdef ECP_WITH_PTHREAD
- pthread_mutex_unlock(&buf->mutex);
+ pthread_mutex_unlock(&buf->mutex);
#endif
+ }
if (do_send) rv = ecp_pkt_send(conn->sock, addr, packet, pkt_size);
return rv;
}
+int ecp_conn_rbuf_create(ECPConnection *conn, ECPRBSend *buf_s, ECPRBMessage *msg_s, unsigned int msg_s_size, ECPRBRecv *buf_r, ECPRBMessage *msg_r, unsigned int msg_r_size) {
+ int rv;
+
+ rv = ecp_conn_rbuf_send_create(conn, buf_s, msg_s, msg_s_size);
+ if (rv) return rv;
+
+ rv = ecp_conn_rbuf_recv_create(conn, buf_r, msg_r, msg_r_size);
+ if (rv) {
+ ecp_conn_rbuf_send_destroy(conn);
+ return rv;
+ }
+
+ return ECP_OK;
+}
+
+void ecp_conn_rbuf_destroy(ECPConnection *conn) {
+ ecp_conn_rbuf_send_destroy(conn);
+ ecp_conn_rbuf_recv_destroy(conn);
+}
+
+int ecp_conn_rbuf_start(ECPConnection *conn, ecp_seq_t seq) {
+ int rv = ecp_conn_rbuf_send_start(conn);
+ if (rv) return rv;
+
+ if (!conn->out) {
+ rv = ecp_conn_rbuf_recv_start(conn, seq);
+ if (rv) return rv;
+ }
+
+ return ECP_OK;
+}
+