summaryrefslogtreecommitdiff
path: root/code
diff options
context:
space:
mode:
authorUros Majstorovic <majstor@majstor.org>2017-08-19 19:16:18 +0200
committerUros Majstorovic <majstor@majstor.org>2017-08-19 19:16:18 +0200
commit050bf97c2dc6c22e7b3134e281ee892e5351439f (patch)
treedc44faa97f20c03bc0d3dd6d0a1e4d121c9562dd /code
parent5d20e9bafc3571f37eb0d9b74699d023d2d3d13a (diff)
timer improved; rbuf finally implemented
Diffstat (limited to 'code')
-rw-r--r--code/core/TODO4
-rw-r--r--code/core/htable/htable.c2
-rw-r--r--code/core/rbuf.c79
-rw-r--r--code/core/rbuf.h19
-rw-r--r--code/core/rbuf_recv.c5
-rw-r--r--code/core/rbuf_send.c105
-rw-r--r--code/core/timer.c14
7 files changed, 138 insertions, 90 deletions
diff --git a/code/core/TODO b/code/core/TODO
index b1e7d12..bd4cc70 100644
--- a/code/core/TODO
+++ b/code/core/TODO
@@ -6,8 +6,6 @@
rbuf:
- implement _wait variants of open / send
+- msgq should subtract ECP_MTYPE_SYS from mtype
- consider adding one buffer for all msgs (frag. issue)
-should implement:
-- send flush (seq)
-- sending ack with congestion control
diff --git a/code/core/htable/htable.c b/code/core/htable/htable.c
index 82dbcbc..6330ef8 100644
--- a/code/core/htable/htable.c
+++ b/code/core/htable/htable.c
@@ -26,6 +26,7 @@ static ECPConnection *h_search(void *h, unsigned char *k) {
return hashtable_search(h, k);
}
+#ifdef ECP_WITH_HTABLE
int ecp_htable_init(ECPHTableIface *h) {
h->init = 1;
h->create = h_create;
@@ -35,3 +36,4 @@ int ecp_htable_init(ECPHTableIface *h) {
h->search = h_search;
return ECP_OK;
}
+#endif \ No newline at end of file
diff --git a/code/core/rbuf.c b/code/core/rbuf.c
index ee9a7fc..f4ec3a4 100644
--- a/code/core/rbuf.c
+++ b/code/core/rbuf.c
@@ -44,6 +44,12 @@ ssize_t ecp_rbuf_msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, int idx, unsigned ch
return msg_size;
}
+int ecp_rbuf_info_init(ECPRBInfo *rbuf_info) {
+ memset(rbuf_info, 0, sizeof(ECPRBInfo));
+
+ return ECP_OK;
+}
+
ssize_t ecp_rbuf_pld_send(ECPConnection *conn, unsigned char *payload, size_t payload_size, ecp_seq_t seq) {
unsigned char packet[ECP_MAX_PKT];
ECPSocket *sock = conn->sock;
@@ -67,79 +73,6 @@ ssize_t ecp_rbuf_pld_send(ECPConnection *conn, unsigned char *payload, size_t pa
return rv;
}
-int ecp_rbuf_info_init(ECPRBInfo *rbuf_info) {
- memset(rbuf_info, 0, sizeof(ECPRBInfo));
-
- return ECP_OK;
-}
-
-int ecp_rbuf_pkt_prep(ECPRBSend *buf, ecp_seq_t seq, unsigned char mtype, ECPRBInfo *rbuf_info) {
- if (buf == NULL) return ECP_ERR;
-
-#ifdef ECP_WITH_PTHREAD
- pthread_mutex_lock(&buf->mutex);
-#endif
- int idx = ecp_rbuf_msg_idx(&buf->rbuf, seq);
-#ifdef ECP_WITH_PTHREAD
- pthread_mutex_unlock(&buf->mutex);
-#endif
-
- if (idx < 0) return idx;
-
- rbuf_info->seq = seq;
- rbuf_info->idx = idx;
- rbuf_info->mtype = mtype;
- buf->rbuf.msg[idx].size = 0;
- buf->rbuf.msg[idx].flags = 0;
-
- return ECP_OK;
-}
-
-ssize_t ecp_rbuf_pkt_send(ECPRBSend *buf, ECPSocket *sock, ECPNetAddr *addr, ECPTimerItem *ti, unsigned char *packet, size_t pkt_size, ECPRBInfo *rbuf_info) {
- unsigned char flags = 0;
- int do_send = 1;
- ssize_t rv = 0;
- ecp_seq_t seq = rbuf_info->seq;
- unsigned int idx = rbuf_info->idx;
- unsigned char mtype = rbuf_info->mtype & ECP_MTYPE_MASK;
-
- if (mtype < ECP_MAX_MTYPE_SYS) flags |= ECP_RBUF_FLAG_SYS;
-
- rv = ecp_rbuf_msg_store(&buf->rbuf, seq, idx, packet, pkt_size, 0, flags);
- if (rv < 0) return rv;
-
- if (buf->flags & ECP_RBUF_FLAG_CCONTROL) {
-#ifdef ECP_WITH_PTHREAD
- pthread_mutex_lock(&buf->mutex);
-#endif
-
- if (ECP_RBUF_SEQ_LT(buf->rbuf.seq_max, seq)) buf->rbuf.seq_max = seq;
-
- if (buf->cnt_cc || (buf->in_transit >= buf->win_size)) {
- if (!buf->cnt_cc) buf->seq_cc = seq;
- buf->cnt_cc++;
- buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_IN_CCONTROL;
- do_send = 0;
- } else {
- buf->in_transit++;
- }
-
-#ifdef ECP_WITH_PTHREAD
- pthread_mutex_unlock(&buf->mutex);
-#endif
- }
-
- if (do_send) {
- int _rv;
- if (ti) {
- _rv = ecp_timer_push(ti);
- if (_rv) return _rv;
- }
- rv = ecp_pkt_send(sock, addr, packet, pkt_size);
- }
- return rv;
-}
-
int ecp_rbuf_conn_create(ECPConnection *conn, ECPRBSend *buf_s, ECPRBMessage *msg_s, unsigned int msg_s_size, ECPRBRecv *buf_r, ECPRBMessage *msg_r, unsigned int msg_r_size) {
int rv;
diff --git a/code/core/rbuf.h b/code/core/rbuf.h
index 1332dee..77663ad 100644
--- a/code/core/rbuf.h
+++ b/code/core/rbuf.h
@@ -35,6 +35,7 @@ typedef struct ECPRBMessage {
unsigned char msg[ECP_MAX_PKT];
ssize_t size;
unsigned char flags;
+ short idx_t;
} ECPRBMessage;
typedef struct ECPRBuffer {
@@ -45,6 +46,17 @@ typedef struct ECPRBuffer {
ECPRBMessage *msg;
} ECPRBuffer;
+
+typedef struct ECPRBTimerItem {
+ unsigned char occupied;
+ ECPTimerItem item;
+} ECPRBTimerItem;
+
+typedef struct ECPRBTimer {
+ ECPRBTimerItem item[ECP_MAX_TIMER];
+ unsigned short idx_w;
+} ECPRBTimer;
+
typedef struct ECPRBRecv {
unsigned char flags;
unsigned char flush;
@@ -72,6 +84,7 @@ typedef struct ECPRBSend {
ecp_seq_t seq_flush;
unsigned int nack_rate;
ECPRBuffer rbuf;
+ ECPRBTimer timer;
#ifdef ECP_WITH_PTHREAD
pthread_mutex_t mutex;
#endif
@@ -93,11 +106,9 @@ int ecp_rbuf_init(ECPRBuffer *rbuf, ECPRBMessage *msg, unsigned int msg_size);
int ecp_rbuf_start(ECPRBuffer *rbuf, ecp_seq_t seq);
int ecp_rbuf_msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq);
ssize_t ecp_rbuf_msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, int idx, unsigned char *msg, size_t msg_size, unsigned char test_flags, unsigned char set_flags);
-ssize_t ecp_rbuf_pld_send(struct ECPConnection *conn, unsigned char *payload, size_t payload_size, ecp_seq_t seq);
int ecp_rbuf_info_init(ECPRBInfo *rbuf_info);
-int ecp_rbuf_pkt_prep(ECPRBSend *buf, ecp_seq_t seq, unsigned char mtype, ECPRBInfo *rbuf_info);
-ssize_t ecp_rbuf_pkt_send(ECPRBSend *buf, struct ECPSocket *sock, ECPNetAddr *addr, ECPTimerItem *ti, unsigned char *packet, size_t pkt_size, ECPRBInfo *rbuf_info);
+ssize_t ecp_rbuf_pld_send(struct ECPConnection *conn, unsigned char *payload, size_t payload_size, ecp_seq_t seq);
int ecp_rbuf_conn_create(struct ECPConnection *conn, ECPRBSend *buf_s, ECPRBMessage *msg_s, unsigned int msg_s_size, ECPRBRecv *buf_r, ECPRBMessage *msg_r, unsigned int msg_r_size);
void ecp_rbuf_conn_destroy(struct ECPConnection *conn);
@@ -113,6 +124,8 @@ ssize_t ecp_rbuf_recv_store(struct ECPConnection *conn, ecp_seq_t seq, unsigned
int ecp_rbuf_send_create(struct ECPConnection *conn, ECPRBSend *buf, ECPRBMessage *msg, unsigned int msg_size);
void ecp_rbuf_send_destroy(struct ECPConnection *conn);
int ecp_rbuf_send_start(struct ECPConnection *conn);
+int ecp_rbuf_pkt_prep(ECPRBSend *buf, ecp_seq_t seq, unsigned char mtype, ECPRBInfo *rbuf_info);
+ssize_t ecp_rbuf_pkt_send(ECPRBSend *buf, struct ECPSocket *sock, ECPNetAddr *addr, ECPTimerItem *ti, unsigned char *packet, size_t pkt_size, ECPRBInfo *rbuf_info);
ssize_t ecp_rbuf_handle_ack(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size);
ssize_t ecp_rbuf_handle_flush(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size);
diff --git a/code/core/rbuf_recv.c b/code/core/rbuf_recv.c
index d78ffce..e73c602 100644
--- a/code/core/rbuf_recv.c
+++ b/code/core/rbuf_recv.c
@@ -221,11 +221,14 @@ ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *m
ecp_seq_t ack_pkt = 0;
ssize_t rv;
int do_ack = 0;
+ unsigned char mtype;
if (buf == NULL) return ECP_ERR;
if (msg_size < 1) return ECP_ERR_MIN_MSG;
- // XXX mtype == RBACK | RBFLUSH handle imediately
+ mtype = msg[0] & ECP_MTYPE_MASK;
+ if ((mtype == ECP_MTYPE_RBACK) || (mtype == ECP_MTYPE_RBFLUSH)) return ecp_msg_handle(conn, seq, msg, msg_size);
+
if (ECP_RBUF_SEQ_LT(buf->rbuf.seq_max, seq)) ack_pkt = seq - buf->rbuf.seq_max;
if (ECP_RBUF_SEQ_LTE(seq, buf->seq_ack)) {
ecp_seq_t seq_offset = buf->seq_ack - seq;
diff --git a/code/core/rbuf_send.c b/code/core/rbuf_send.c
index e7020cb..b6cdbac 100644
--- a/code/core/rbuf_send.c
+++ b/code/core/rbuf_send.c
@@ -27,6 +27,9 @@ static void cc_flush(ECPConnection *conn) {
ecp_win_t pkt_cc_cnt = buf->win_size > buf->in_transit ? buf->win_size - buf->in_transit : 0;
int pkt_to_send = pkt_buf_cnt < pkt_cc_cnt ? pkt_buf_cnt : pkt_cc_cnt;
int i;
+
+ ECPTimerItem ti[ECP_MAX_TIMER];
+ unsigned short max_t = 0;
if (pkt_to_send) {
unsigned int idx = ecp_rbuf_msg_idx(rbuf, buf->seq_cc);
@@ -35,6 +38,15 @@ static void cc_flush(ECPConnection *conn) {
for (i=0; i<pkt_to_send; i++) {
if (!(rbuf->msg[_idx].flags & ECP_RBUF_FLAG_IN_CCONTROL)) break;
rbuf->msg[_idx].flags &= ~ECP_RBUF_FLAG_IN_CCONTROL;
+ if (rbuf->msg[_idx].idx_t != -1) {
+ ECPRBTimer *timer = &buf->timer;
+ ECPRBTimerItem *item = &timer->item[rbuf->msg[_idx].idx_t];
+
+ item->occupied = 0;
+ ti[max_t] = item->item;
+ rbuf->msg[_idx].idx_t = max_t;
+ max_t++;
+ }
_idx = ECP_RBUF_IDX_MASK(_idx + 1, rbuf->msg_size);
}
pkt_to_send = i;
@@ -45,6 +57,7 @@ static void cc_flush(ECPConnection *conn) {
#endif
for (i=0; i<pkt_to_send; i++) {
+ if (rbuf->msg[_idx].idx_t != -1) ecp_timer_push(&ti[rbuf->msg[_idx].idx_t]);
ecp_pkt_send(conn->sock, &conn->node.addr, rbuf->msg[_idx].msg, rbuf->msg[_idx].size);
_idx = ECP_RBUF_IDX_MASK(_idx + 1, rbuf->msg_size);
}
@@ -114,7 +127,9 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt
if (buf->flags & ECP_RBUF_FLAG_RELIABLE) {
unsigned int _idx = ECP_RBUF_IDX_MASK(idx + 1 - ECP_RBUF_ACK_SIZE + i, rbuf->msg_size);
if ((rbuf->msg[_idx].size == 0) || (rbuf->msg[_idx].flags & ECP_RBUF_FLAG_SYS)) {
-
+ unsigned char payload[ECP_SIZE_PLD(0)];
+ ecp_pld_set_type(payload, ECP_MTYPE_NOP);
+ ecp_rbuf_pld_send(conn, payload, sizeof(payload), seq_ack + i);
} else {
ecp_pkt_send(conn->sock, &conn->node.addr, rbuf->msg[_idx].msg, rbuf->msg[_idx].size);
}
@@ -147,9 +162,7 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt
}
if (buf->flush) {
if (ECP_RBUF_SEQ_LT(buf->seq_flush, rbuf->seq_start)) buf->flush = 0;
- if (buf->flush) {
- do_flush = 1;
- }
+ if (buf->flush) do_flush = 1;
}
if (buf->cnt_cc) cc_flush(conn);
}
@@ -250,9 +263,91 @@ int ecp_rbuf_flush(ECPConnection *conn) {
pthread_mutex_unlock(&buf->mutex);
#endif
- ssize_t rv = flush_send(conn, 0);
+ ssize_t rv = flush_send(conn, NULL);
if (rv < 0) return rv;
return ECP_OK;
}
+int ecp_rbuf_pkt_prep(ECPRBSend *buf, ecp_seq_t seq, unsigned char mtype, ECPRBInfo *rbuf_info) {
+ if (buf == NULL) return ECP_ERR;
+
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_lock(&buf->mutex);
+#endif
+ int idx = ecp_rbuf_msg_idx(&buf->rbuf, seq);
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_unlock(&buf->mutex);
+#endif
+
+ if (idx < 0) return idx;
+
+ rbuf_info->seq = seq;
+ rbuf_info->idx = idx;
+ rbuf_info->mtype = mtype;
+ buf->rbuf.msg[idx].size = 0;
+ buf->rbuf.msg[idx].flags = 0;
+
+ return ECP_OK;
+}
+
+ssize_t ecp_rbuf_pkt_send(ECPRBSend *buf, ECPSocket *sock, ECPNetAddr *addr, ECPTimerItem *ti, unsigned char *packet, size_t pkt_size, ECPRBInfo *rbuf_info) {
+ unsigned char flags = 0;
+ int do_send = 1;
+ ssize_t rv = 0;
+ ecp_seq_t seq = rbuf_info->seq;
+ unsigned int idx = rbuf_info->idx;
+ unsigned char mtype = rbuf_info->mtype & ECP_MTYPE_MASK;
+
+ if (mtype < ECP_MAX_MTYPE_SYS) flags |= ECP_RBUF_FLAG_SYS;
+
+ rv = ecp_rbuf_msg_store(&buf->rbuf, seq, idx, packet, pkt_size, 0, flags);
+ if (rv < 0) return rv;
+
+ if (buf->flags & ECP_RBUF_FLAG_CCONTROL) {
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_lock(&buf->mutex);
+#endif
+
+ if (ECP_RBUF_SEQ_LT(buf->rbuf.seq_max, seq)) buf->rbuf.seq_max = seq;
+
+ if (buf->cnt_cc || (buf->in_transit >= buf->win_size)) {
+ if (!buf->cnt_cc) buf->seq_cc = seq;
+ buf->cnt_cc++;
+ buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_IN_CCONTROL;
+ do_send = 0;
+ if (ti) {
+ ECPRBTimer *timer = &buf->timer;
+ ECPRBTimerItem *item = &timer->item[timer->idx_w];
+
+ if (!item->occupied) {
+ item->occupied = 1;
+ item->item = *ti;
+ buf->rbuf.msg[idx].idx_t = timer->idx_w;
+ timer->idx_w = (timer->idx_w) % ECP_MAX_TIMER;
+ } else {
+ rv = ECP_ERR_MAX_TIMER;
+ }
+ } else {
+ buf->rbuf.msg[idx].idx_t = -1;
+ }
+ } else {
+ buf->in_transit++;
+ }
+
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_unlock(&buf->mutex);
+#endif
+
+ if (rv) return rv;
+ }
+
+ if (do_send) {
+ if (ti) {
+ int _rv = ecp_timer_push(ti);
+ if (_rv) return _rv;
+ }
+ rv = ecp_pkt_send(sock, addr, packet, pkt_size);
+ }
+ return rv;
+}
diff --git a/code/core/timer.c b/code/core/timer.c
index a2d3fa5..ea516df 100644
--- a/code/core/timer.c
+++ b/code/core/timer.c
@@ -21,15 +21,16 @@ void ecp_timer_destroy(ECPTimer *timer) {
}
int ecp_timer_item_init(ECPTimerItem *ti, ECPConnection *conn, unsigned char mtype, unsigned short cnt, unsigned int timeout) {
- unsigned int abstime = conn->sock->ctx->tm.abstime_ms(timeout);
-
if ((mtype & ECP_MTYPE_MASK) >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE;
+ if (ti == NULL) return ECP_ERR;
+ if (conn == NULL) return ECP_ERR;
+
ti->conn = conn;
ti->mtype = mtype;
ti->cnt = cnt-1;
ti->timeout = timeout;
- ti->abstime = abstime;
+ ti->abstime = 0;
ti->retry = NULL;
ti->pld = NULL;
ti->pld_size = 0;
@@ -40,7 +41,10 @@ int ecp_timer_item_init(ECPTimerItem *ti, ECPConnection *conn, unsigned char mty
int ecp_timer_push(ECPTimerItem *ti) {
int i, is_reg, rv = ECP_OK;
ECPConnection *conn = ti->conn;
+ ECPContext *ctx = conn->sock->ctx;
ECPTimer *timer = &conn->sock->timer;
+
+ ti->abstime = ctx->tm.abstime_ms(ti->timeout);
#ifdef ECP_WITH_PTHREAD
pthread_mutex_lock(&timer->mutex);
@@ -181,7 +185,6 @@ unsigned int ecp_timer_exe(ECPSocket *sock) {
for (i=to_exec_size-1; i>=0; i--) {
int rv = ECP_OK;
ECPConnection *conn = to_exec[i].conn;
- ECPSocket *sock = conn->sock;
unsigned char mtype = to_exec[i].mtype;
unsigned char *pld = to_exec[i].pld;
unsigned char pld_size = to_exec[i].pld_size;
@@ -191,7 +194,6 @@ unsigned int ecp_timer_exe(ECPSocket *sock) {
if (to_exec[i].cnt) {
ssize_t _rv = 0;
to_exec[i].cnt--;
- to_exec[i].abstime = now + to_exec[i].timeout;
if (retry) {
_rv = retry(conn, to_exec+i);
if (_rv < 0) rv = _rv;
@@ -203,6 +205,7 @@ unsigned int ecp_timer_exe(ECPSocket *sock) {
} else if (handler) {
handler(conn, 0, mtype, NULL, ECP_ERR_TIMEOUT);
}
+
#ifdef ECP_WITH_PTHREAD
pthread_mutex_lock(&conn->mutex);
#endif
@@ -210,6 +213,7 @@ unsigned int ecp_timer_exe(ECPSocket *sock) {
#ifdef ECP_WITH_PTHREAD
pthread_mutex_unlock(&conn->mutex);
#endif
+
}
return ret;