summaryrefslogtreecommitdiff
path: root/code/core/rbuf_send.c
diff options
context:
space:
mode:
authorUros Majstorovic <majstor@majstor.org>2017-08-26 21:59:08 +0200
committerUros Majstorovic <majstor@majstor.org>2017-08-26 21:59:08 +0200
commitb83e58e21ea7dda57ddfda47bd1539d15abe687f (patch)
tree801339717059b1be494e872d90836b4e4564d462 /code/core/rbuf_send.c
parente800e8df9fbe633d09a534ae07eb361833572f1a (diff)
fragments and packet timestamp implemented
Diffstat (limited to 'code/core/rbuf_send.c')
-rw-r--r--code/core/rbuf_send.c87
1 files changed, 47 insertions, 40 deletions
diff --git a/code/core/rbuf_send.c b/code/core/rbuf_send.c
index e0c7f29..d582e67 100644
--- a/code/core/rbuf_send.c
+++ b/code/core/rbuf_send.c
@@ -5,7 +5,7 @@
#define NACK_RATE_UNIT 10000
static ssize_t flush_send(ECPConnection *conn, ECPTimerItem *ti) {
- unsigned char payload[ECP_SIZE_PLD(0)];
+ unsigned char payload[ECP_SIZE_PLD(0, 0)];
ecp_pld_set_type(payload, ECP_MTYPE_RBFLUSH);
if (ti == NULL) {
@@ -127,7 +127,7 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt
if (buf->flags & ECP_RBUF_FLAG_RELIABLE) {
unsigned int _idx = ECP_RBUF_IDX_MASK(idx + 1 - ECP_SIZE_ACKB + i, rbuf->msg_size);
if ((rbuf->msg[_idx].size == 0) || (rbuf->msg[_idx].flags & ECP_RBUF_FLAG_SYS)) {
- unsigned char payload[ECP_SIZE_PLD(0)];
+ unsigned char payload[ECP_SIZE_PLD(0, 0)];
ecp_pld_set_type(payload, ECP_MTYPE_NOP);
ecp_rbuf_pld_send(conn, payload, sizeof(payload), seq_ack + i);
} else {
@@ -235,7 +235,7 @@ int ecp_rbuf_send_start(ECPConnection *conn) {
int ecp_rbuf_flush(ECPConnection *conn) {
ECPRBSend *buf = conn->rbuf.send;
- unsigned char payload[ECP_SIZE_PLD(0)];
+ unsigned char payload[ECP_SIZE_PLD(0, 0)];
ecp_seq_t seq;
if (buf == NULL) return ECP_ERR;
@@ -269,7 +269,9 @@ int ecp_rbuf_flush(ECPConnection *conn) {
return ECP_OK;
}
-int ecp_rbuf_pkt_prep(ECPRBSend *buf, ECPSeqItem *si) {
+int ecp_rbuf_pkt_prep(ECPRBSend *buf, ECPSeqItem *si, unsigned char *payload) {
+ if (si->rb_pass) return ECP_OK;
+
#ifdef ECP_WITH_PTHREAD
pthread_mutex_lock(&buf->mutex);
#endif
@@ -280,6 +282,7 @@ int ecp_rbuf_pkt_prep(ECPRBSend *buf, ECPSeqItem *si) {
if (idx < 0) return idx;
+ si->rb_mtype = ecp_pld_get_type(payload);
si->rb_idx = idx;
buf->rbuf.msg[idx].size = 0;
buf->rbuf.msg[idx].flags = 0;
@@ -288,54 +291,58 @@ int ecp_rbuf_pkt_prep(ECPRBSend *buf, ECPSeqItem *si) {
}
ssize_t ecp_rbuf_pkt_send(ECPRBSend *buf, ECPSocket *sock, ECPNetAddr *addr, ECPTimerItem *ti, ECPSeqItem *si, unsigned char *packet, size_t pkt_size) {
- unsigned char flags = 0;
int do_send = 1;
ssize_t rv = 0;
- ecp_seq_t seq = si->seq;
- unsigned int idx = si->rb_idx;
- unsigned char mtype = si->rb_mtype & ECP_MTYPE_MASK;
-
- if (mtype < ECP_MAX_MTYPE_SYS) flags |= ECP_RBUF_FLAG_SYS;
-
- rv = ecp_rbuf_msg_store(&buf->rbuf, seq, idx, packet, pkt_size, 0, flags);
- if (rv < 0) return rv;
+
+ if (!si->rb_pass) {
+ unsigned char flags = 0;
+ ecp_seq_t seq = si->seq;
+ unsigned int idx = si->rb_idx;
+ unsigned char mtype = si->rb_mtype & ECP_MTYPE_MASK;
- if (buf->flags & ECP_RBUF_FLAG_CCONTROL) {
-#ifdef ECP_WITH_PTHREAD
- pthread_mutex_lock(&buf->mutex);
-#endif
+ if (mtype < ECP_MAX_MTYPE_SYS) flags |= ECP_RBUF_FLAG_SYS;
- if (ECP_SEQ_LT(buf->rbuf.seq_max, seq)) buf->rbuf.seq_max = seq;
+ rv = ecp_rbuf_msg_store(&buf->rbuf, seq, idx, packet, pkt_size, 0, flags);
+ if (rv < 0) return rv;
- if (buf->cnt_cc || (buf->in_transit >= buf->win_size)) {
- if (!buf->cnt_cc) buf->seq_cc = seq;
- buf->cnt_cc++;
- buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_IN_CCONTROL;
- do_send = 0;
- if (ti) {
- ECPRBTimer *timer = &buf->timer;
- ECPRBTimerItem *item = &timer->item[timer->idx_w];
+ if (buf->flags & ECP_RBUF_FLAG_CCONTROL) {
+ int _rv = ECP_OK;
+ #ifdef ECP_WITH_PTHREAD
+ pthread_mutex_lock(&buf->mutex);
+ #endif
+
+ if (ECP_SEQ_LT(buf->rbuf.seq_max, seq)) buf->rbuf.seq_max = seq;
+
+ if (buf->cnt_cc || (buf->in_transit >= buf->win_size)) {
+ if (!buf->cnt_cc) buf->seq_cc = seq;
+ buf->cnt_cc++;
+ buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_IN_CCONTROL;
+ do_send = 0;
+ if (ti) {
+ ECPRBTimer *timer = &buf->timer;
+ ECPRBTimerItem *item = &timer->item[timer->idx_w];
- if (!item->occupied) {
- item->occupied = 1;
- item->item = *ti;
- buf->rbuf.msg[idx].idx_t = timer->idx_w;
- timer->idx_w = (timer->idx_w) % ECP_MAX_TIMER;
+ if (!item->occupied) {
+ item->occupied = 1;
+ item->item = *ti;
+ buf->rbuf.msg[idx].idx_t = timer->idx_w;
+ timer->idx_w = (timer->idx_w) % ECP_MAX_TIMER;
+ } else {
+ _rv = ECP_ERR_MAX_TIMER;
+ }
} else {
- rv = ECP_ERR_MAX_TIMER;
+ buf->rbuf.msg[idx].idx_t = -1;
}
} else {
- buf->rbuf.msg[idx].idx_t = -1;
+ buf->in_transit++;
}
- } else {
- buf->in_transit++;
- }
-#ifdef ECP_WITH_PTHREAD
- pthread_mutex_unlock(&buf->mutex);
-#endif
+ #ifdef ECP_WITH_PTHREAD
+ pthread_mutex_unlock(&buf->mutex);
+ #endif
- if (rv) return rv;
+ if (_rv) return _rv;
+ }
}
if (do_send) {