summaryrefslogtreecommitdiff
path: root/code/core/rbuf_send.c
diff options
context:
space:
mode:
Diffstat (limited to 'code/core/rbuf_send.c')
-rw-r--r--code/core/rbuf_send.c105
1 files changed, 100 insertions, 5 deletions
diff --git a/code/core/rbuf_send.c b/code/core/rbuf_send.c
index e7020cb..b6cdbac 100644
--- a/code/core/rbuf_send.c
+++ b/code/core/rbuf_send.c
@@ -27,6 +27,9 @@ static void cc_flush(ECPConnection *conn) {
ecp_win_t pkt_cc_cnt = buf->win_size > buf->in_transit ? buf->win_size - buf->in_transit : 0;
int pkt_to_send = pkt_buf_cnt < pkt_cc_cnt ? pkt_buf_cnt : pkt_cc_cnt;
int i;
+
+ ECPTimerItem ti[ECP_MAX_TIMER];
+ unsigned short max_t = 0;
if (pkt_to_send) {
unsigned int idx = ecp_rbuf_msg_idx(rbuf, buf->seq_cc);
@@ -35,6 +38,15 @@ static void cc_flush(ECPConnection *conn) {
for (i=0; i<pkt_to_send; i++) {
if (!(rbuf->msg[_idx].flags & ECP_RBUF_FLAG_IN_CCONTROL)) break;
rbuf->msg[_idx].flags &= ~ECP_RBUF_FLAG_IN_CCONTROL;
+ if (rbuf->msg[_idx].idx_t != -1) {
+ ECPRBTimer *timer = &buf->timer;
+ ECPRBTimerItem *item = &timer->item[rbuf->msg[_idx].idx_t];
+
+ item->occupied = 0;
+ ti[max_t] = item->item;
+ rbuf->msg[_idx].idx_t = max_t;
+ max_t++;
+ }
_idx = ECP_RBUF_IDX_MASK(_idx + 1, rbuf->msg_size);
}
pkt_to_send = i;
@@ -45,6 +57,7 @@ static void cc_flush(ECPConnection *conn) {
#endif
for (i=0; i<pkt_to_send; i++) {
+ if (rbuf->msg[_idx].idx_t != -1) ecp_timer_push(&ti[rbuf->msg[_idx].idx_t]);
ecp_pkt_send(conn->sock, &conn->node.addr, rbuf->msg[_idx].msg, rbuf->msg[_idx].size);
_idx = ECP_RBUF_IDX_MASK(_idx + 1, rbuf->msg_size);
}
@@ -114,7 +127,9 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt
if (buf->flags & ECP_RBUF_FLAG_RELIABLE) {
unsigned int _idx = ECP_RBUF_IDX_MASK(idx + 1 - ECP_RBUF_ACK_SIZE + i, rbuf->msg_size);
if ((rbuf->msg[_idx].size == 0) || (rbuf->msg[_idx].flags & ECP_RBUF_FLAG_SYS)) {
-
+ unsigned char payload[ECP_SIZE_PLD(0)];
+ ecp_pld_set_type(payload, ECP_MTYPE_NOP);
+ ecp_rbuf_pld_send(conn, payload, sizeof(payload), seq_ack + i);
} else {
ecp_pkt_send(conn->sock, &conn->node.addr, rbuf->msg[_idx].msg, rbuf->msg[_idx].size);
}
@@ -147,9 +162,7 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt
}
if (buf->flush) {
if (ECP_RBUF_SEQ_LT(buf->seq_flush, rbuf->seq_start)) buf->flush = 0;
- if (buf->flush) {
- do_flush = 1;
- }
+ if (buf->flush) do_flush = 1;
}
if (buf->cnt_cc) cc_flush(conn);
}
@@ -250,9 +263,91 @@ int ecp_rbuf_flush(ECPConnection *conn) {
pthread_mutex_unlock(&buf->mutex);
#endif
- ssize_t rv = flush_send(conn, 0);
+ ssize_t rv = flush_send(conn, NULL);
if (rv < 0) return rv;
return ECP_OK;
}
+int ecp_rbuf_pkt_prep(ECPRBSend *buf, ecp_seq_t seq, unsigned char mtype, ECPRBInfo *rbuf_info) {
+ if (buf == NULL) return ECP_ERR;
+
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_lock(&buf->mutex);
+#endif
+ int idx = ecp_rbuf_msg_idx(&buf->rbuf, seq);
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_unlock(&buf->mutex);
+#endif
+
+ if (idx < 0) return idx;
+
+ rbuf_info->seq = seq;
+ rbuf_info->idx = idx;
+ rbuf_info->mtype = mtype;
+ buf->rbuf.msg[idx].size = 0;
+ buf->rbuf.msg[idx].flags = 0;
+
+ return ECP_OK;
+}
+
+ssize_t ecp_rbuf_pkt_send(ECPRBSend *buf, ECPSocket *sock, ECPNetAddr *addr, ECPTimerItem *ti, unsigned char *packet, size_t pkt_size, ECPRBInfo *rbuf_info) {
+ unsigned char flags = 0;
+ int do_send = 1;
+ ssize_t rv = 0;
+ ecp_seq_t seq = rbuf_info->seq;
+ unsigned int idx = rbuf_info->idx;
+ unsigned char mtype = rbuf_info->mtype & ECP_MTYPE_MASK;
+
+ if (mtype < ECP_MAX_MTYPE_SYS) flags |= ECP_RBUF_FLAG_SYS;
+
+ rv = ecp_rbuf_msg_store(&buf->rbuf, seq, idx, packet, pkt_size, 0, flags);
+ if (rv < 0) return rv;
+
+ if (buf->flags & ECP_RBUF_FLAG_CCONTROL) {
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_lock(&buf->mutex);
+#endif
+
+ if (ECP_RBUF_SEQ_LT(buf->rbuf.seq_max, seq)) buf->rbuf.seq_max = seq;
+
+ if (buf->cnt_cc || (buf->in_transit >= buf->win_size)) {
+ if (!buf->cnt_cc) buf->seq_cc = seq;
+ buf->cnt_cc++;
+ buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_IN_CCONTROL;
+ do_send = 0;
+ if (ti) {
+ ECPRBTimer *timer = &buf->timer;
+ ECPRBTimerItem *item = &timer->item[timer->idx_w];
+
+ if (!item->occupied) {
+ item->occupied = 1;
+ item->item = *ti;
+ buf->rbuf.msg[idx].idx_t = timer->idx_w;
+ timer->idx_w = (timer->idx_w) % ECP_MAX_TIMER;
+ } else {
+ rv = ECP_ERR_MAX_TIMER;
+ }
+ } else {
+ buf->rbuf.msg[idx].idx_t = -1;
+ }
+ } else {
+ buf->in_transit++;
+ }
+
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_unlock(&buf->mutex);
+#endif
+
+ if (rv) return rv;
+ }
+
+ if (do_send) {
+ if (ti) {
+ int _rv = ecp_timer_push(ti);
+ if (_rv) return _rv;
+ }
+ rv = ecp_pkt_send(sock, addr, packet, pkt_size);
+ }
+ return rv;
+}