summaryrefslogtreecommitdiff
path: root/code/ecp
diff options
context:
space:
mode:
Diffstat (limited to 'code/ecp')
-rw-r--r--code/ecp/core.c2
-rw-r--r--code/ecp/fe310/transport.c16
-rw-r--r--code/ecp/posix/transport.c6
-rw-r--r--code/ecp/rbuf_recv.c48
-rw-r--r--code/ecp/rbuf_send.c46
-rw-r--r--code/ecp/tr.h6
6 files changed, 63 insertions, 61 deletions
diff --git a/code/ecp/core.c b/code/ecp/core.c
index b9efec4..77b7a96 100644
--- a/code/ecp/core.c
+++ b/code/ecp/core.c
@@ -1215,7 +1215,7 @@ ssize_t ecp_unpack(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *parent, ECP
(payload[2] << 8) | \
(payload[3]);
- if ((payload[ECP_SIZE_PLD_HDR] & ECP_MTYPE_MASK) < ECP_MAX_MTYPE_SYS) ecp_tr_buf_free(bufs, ECP_SEND_FLAG_MORE);
+ if ((payload[ECP_SIZE_PLD_HDR] & ECP_MTYPE_MASK) < ECP_MAX_MTYPE_SYS) ecp_tr_release(bufs->packet, 1);
if (conn == NULL) {
if (payload[ECP_SIZE_PLD_HDR] == ECP_MTYPE_OPEN_REQ) {
is_new = 1;
diff --git a/code/ecp/fe310/transport.c b/code/ecp/fe310/transport.c
index 3493966..a0b4a05 100644
--- a/code/ecp/fe310/transport.c
+++ b/code/ecp/fe310/transport.c
@@ -94,19 +94,19 @@ ssize_t ecp_tr_recv(ECPSocket *sock, ECPBuffer *packet, ECPNetAddr *addr, int ti
return ECP_ERR;
}
-void ecp_tr_buf_free(ECP2Buffer *b, unsigned char flags) {
- if (b && b->packet && b->packet->buffer) {
- eos_net_free(b->packet->buffer-EOS_SOCK_SIZE_UDP_HDR, flags & ECP_SEND_FLAG_MORE);
- b->packet->buffer = NULL;
+void ecp_tr_release(ECPBuffer *packet, unsigned char more) {
+ if (packet && packet->buffer) {
+ eos_net_free(packet->buffer-EOS_SOCK_SIZE_UDP_HDR, more);
+ packet->buffer = NULL;
+ } else if (!more) {
+ eos_net_release();
}
}
-void ecp_tr_buf_flag_set(ECP2Buffer *b, unsigned char flags) {
+void ecp_tr_flag_set(unsigned char flags) {
_flags |= flags;
- if (flags & ECP_SEND_FLAG_MORE) ecp_tr_buf_free(b, flags);
}
-void ecp_tr_buf_flag_clear(ECP2Buffer *b, unsigned char flags) {
+void ecp_tr_flag_clear(unsigned char flags) {
_flags &= ~flags;
- if (flags & ECP_SEND_FLAG_MORE) eos_net_release();
}
diff --git a/code/ecp/posix/transport.c b/code/ecp/posix/transport.c
index 3c9b85b..2032ce2 100644
--- a/code/ecp/posix/transport.c
+++ b/code/ecp/posix/transport.c
@@ -107,7 +107,7 @@ ssize_t ecp_tr_recv(ECPSocket *sock, ECPBuffer *packet, ECPNetAddr *addr, int ti
return ECP_ERR_TIMEOUT;
}
-void ecp_tr_buf_free(ECP2Buffer *b, unsigned char flags) {}
-void ecp_tr_buf_flag_set(ECP2Buffer *b, unsigned char flags) {}
-void ecp_tr_buf_flag_clear(ECP2Buffer *b, unsigned char flags) {}
+void ecp_tr_release(ECPBuffer *packet, unsigned char more) {}
+void ecp_tr_flag_set(unsigned char flags) {}
+void ecp_tr_flag_clear(unsigned char flags) {}
diff --git a/code/ecp/rbuf_recv.c b/code/ecp/rbuf_recv.c
index 4bd76cc..b9b16a2 100644
--- a/code/ecp/rbuf_recv.c
+++ b/code/ecp/rbuf_recv.c
@@ -9,9 +9,9 @@ static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg,
ECPRBRecv *buf = conn->rbuf.recv;
unsigned char flags = ECP_RBUF_FLAG_IN_RBUF;
unsigned char mtype = ecp_msg_get_type(msg) & ECP_MTYPE_MASK;
-
+
if (mtype < ECP_MAX_MTYPE_SYS) flags |= ECP_RBUF_FLAG_SYS;
-
+
#ifdef ECP_WITH_MSGQ
if (buf->flags & ECP_RBUF_FLAG_MSGQ) {
int rv = ECP_OK;
@@ -20,7 +20,7 @@ static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg,
ecp_seq_t seq_offset = seq - buf->msgq.seq_start;
if (seq_offset >= buf->rbuf.msg_size) rv = ECP_ERR_RBUF_FULL;
pthread_mutex_unlock(&buf->msgq.mutex);
-
+
if (rv) return rv;
}
#endif
@@ -42,15 +42,15 @@ static void msg_flush(ECPConnection *conn, ECP2Buffer *b) {
#endif
ecp_seq_t msg_cnt = buf->rbuf.seq_max - buf->rbuf.seq_start + 1;
- ecp_seq_t seq_next = buf->rbuf.seq_start;
+ ecp_seq_t seq_next = buf->rbuf.seq_start;
ecp_seq_t i = 0;
unsigned int idx = buf->rbuf.msg_start;
-
+
if (buf->timer_pts) {
ecp_timer_pop(conn, ECP_MTYPE_RBFLUSH_PTS);
buf->timer_pts = 0;
}
-
+
for (i=0; i<msg_cnt; i++) {
if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_IN_RBUF) {
if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_SYS) {
@@ -65,7 +65,7 @@ static void msg_flush(ECPConnection *conn, ECP2Buffer *b) {
rv = ecp_msg_get_frag(buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size, &frag_cnt, &frag_tot);
if ((rv == ECP_OK) && (frag_cnt != 0) && (seq != seq_next)) {
ecp_seq_t seq_fend = seq + (ecp_seq_t)(frag_tot - frag_cnt - 1);
-
+
if (ECP_SEQ_LT(buf->rbuf.seq_max, seq_fend) || (buf->hole_max && ((ecp_seq_t)(buf->rbuf.seq_max - seq_fend) <= buf->hole_max))) {
ecp_seq_t seq_fbeg = seq - frag_cnt;
ecp_seq_t seq_offset = ECP_SEQ_LT(seq_next, seq_fbeg) ? seq - seq_fbeg : seq - seq_next;
@@ -91,8 +91,8 @@ static void msg_flush(ECPConnection *conn, ECP2Buffer *b) {
idx = ECP_RBUF_IDX_MASK(idx - seq_offset, buf->rbuf.msg_size);
break;
}
- }
-
+ }
+
seq_next = seq + 1;
if (buf->flags & ECP_RBUF_FLAG_MSGQ) {
#ifdef ECP_WITH_MSGQ
@@ -132,7 +132,7 @@ static int ack_send(ECPConnection *conn) {
unsigned char pld_buf[ECP_SIZE_PLD_BUF(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), ECP_MTYPE_RBACK, conn)];
unsigned char *buf_ = ecp_pld_get_buf(pld_buf, ECP_MTYPE_RBACK);
ssize_t rv;
-
+
packet.buffer = pkt_buf;
packet.size = ECP_SIZE_PKT_BUF(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), ECP_MTYPE_RBACK, conn);
payload.buffer = pld_buf;
@@ -161,7 +161,7 @@ static int ack_shift(ECPRBRecv *buf) {
int in_rbuf = 0;
int idx;
int i;
-
+
if ((buf->flags & ECP_RBUF_FLAG_RELIABLE) && ((buf->ack_map & ACK_MASK_FIRST) == 0)) return 0;
while (ECP_SEQ_LT(buf->seq_ack, buf->rbuf.seq_max)) {
@@ -169,7 +169,7 @@ static int ack_shift(ECPRBRecv *buf) {
in_rbuf = ECP_SEQ_LT(buf->seq_ack, buf->rbuf.seq_start) ? 1 : buf->rbuf.msg[ECP_RBUF_IDX_MASK(buf->rbuf.msg_start + buf->seq_ack - buf->rbuf.seq_start, buf->rbuf.msg_size)].flags & ECP_RBUF_FLAG_IN_RBUF;
if (in_rbuf && (buf->ack_map == ECP_ACK_FULL)) continue;
-
+
buf->ack_map = buf->ack_map << 1;
if (in_rbuf) {
buf->ack_map |= 1;
@@ -179,7 +179,7 @@ static int ack_shift(ECPRBRecv *buf) {
if ((buf->ack_map & ACK_MASK_FIRST) == 0) break;
}
-
+
if (!do_ack && (buf->seq_ack == buf->rbuf.seq_max) && ((buf->ack_map & buf->hole_mask_full) != buf->hole_mask_full)) {
ecp_ack_t hole_mask = buf->ack_map;
@@ -191,17 +191,17 @@ static int ack_shift(ECPRBRecv *buf) {
}
}
}
-
+
return do_ack;
}
ssize_t ecp_rbuf_handle_flush(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size, ECP2Buffer *b) {
if (size < 0) return size;
-
+
ECPRBRecv *buf = conn->rbuf.recv;
if (buf == NULL) return ECP_ERR;
- ecp_tr_buf_free(b, ECP_SEND_FLAG_MORE);
+ ecp_tr_release(b->packet, 1);
ack_send(conn);
return 0;
}
@@ -209,7 +209,7 @@ ssize_t ecp_rbuf_handle_flush(ECPConnection *conn, ecp_seq_t seq, unsigned char
ssize_t ecp_rbuf_handle_flush_pts(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size, ECP2Buffer *b) {
ECPRBRecv *buf = conn->rbuf.recv;
if (buf == NULL) return ECP_ERR;
-
+
buf->timer_pts = 0;
msg_flush(conn, b);
return 0;
@@ -221,7 +221,7 @@ int ecp_rbuf_recv_create(ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg,
memset(buf, 0, sizeof(ECPRBRecv));
rv = ecp_rbuf_init(&buf->rbuf, msg, msg_size);
if (rv) return rv;
-
+
buf->ack_map = ECP_ACK_FULL;
buf->ack_rate = ACK_RATE;
@@ -236,7 +236,7 @@ int ecp_rbuf_recv_create(ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg,
void ecp_rbuf_recv_destroy(ECPConnection *conn) {
ECPRBRecv *buf = conn->rbuf.recv;
-
+
if (buf == NULL) return;
#ifdef ECP_WITH_MSGQ
ecp_conn_msgq_destroy(&buf->msgq);
@@ -250,7 +250,7 @@ int ecp_rbuf_recv_start(ECPConnection *conn, ecp_seq_t seq) {
ECPRBRecv *buf = conn->rbuf.recv;
if (buf == NULL) return ECP_ERR;
-
+
seq--;
buf->seq_ack = seq;
rv = ecp_rbuf_start(&buf->rbuf, seq);
@@ -262,7 +262,7 @@ int ecp_rbuf_recv_start(ECPConnection *conn, ecp_seq_t seq) {
if (rv) return rv;
}
#endif
-
+
return ECP_OK;
}
@@ -272,7 +272,7 @@ int ecp_rbuf_recv_set_hole(ECPConnection *conn, unsigned short hole_max) {
buf->hole_max = hole_max;
buf->hole_mask_full = ~(~((ecp_ack_t)1) << (hole_max * 2));
buf->hole_mask_empty = ~(~((ecp_ack_t)1) << (hole_max + 1));
-
+
return ECP_OK;
}
@@ -280,7 +280,7 @@ int ecp_rbuf_recv_set_delay(ECPConnection *conn, ecp_pts_t delay) {
ECPRBRecv *buf = conn->rbuf.recv;
buf->deliver_delay = delay;
-
+
return ECP_OK;
}
@@ -289,7 +289,7 @@ ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *m
ecp_seq_t ack_pkt = 0;
ssize_t rv;
unsigned char mtype;
-
+
mtype = ecp_msg_get_type(msg) & ECP_MTYPE_MASK;
if ((mtype == ECP_MTYPE_RBACK) || (mtype == ECP_MTYPE_RBFLUSH)) return ecp_msg_handle(conn, seq, msg, msg_size, b);
diff --git a/code/ecp/rbuf_send.c b/code/ecp/rbuf_send.c
index 82c4272..39fed02 100644
--- a/code/ecp/rbuf_send.c
+++ b/code/ecp/rbuf_send.c
@@ -8,7 +8,7 @@ static ssize_t flush_send(ECPConnection *conn, ECPTimerItem *ti) {
ECPBuffer payload;
unsigned char pkt_buf[ECP_SIZE_PKT_BUF(0, ECP_MTYPE_RBFLUSH, conn)];
unsigned char pld_buf[ECP_SIZE_PLD_BUF(0, ECP_MTYPE_RBFLUSH, conn)];
-
+
packet.buffer = pkt_buf;
packet.size = ECP_SIZE_PKT_BUF(0, ECP_MTYPE_RBFLUSH, conn);
payload.buffer = pld_buf;
@@ -37,7 +37,7 @@ static void cc_flush(ECPConnection *conn) {
ECPTimerItem ti[ECP_MAX_TIMER];
unsigned short max_t = 0;
-
+
if (pkt_to_send) {
unsigned int idx = ecp_rbuf_msg_idx(rbuf, buf->seq_cc);
unsigned int _idx = idx;
@@ -78,7 +78,7 @@ static void cc_flush(ECPConnection *conn) {
#endif
buf->in_transit += (ecp_win_t)pkt_to_send;
- buf->cnt_cc -= (ecp_win_t)pkt_to_send;
+ buf->cnt_cc -= (ecp_win_t)pkt_to_send;
buf->seq_cc += (ecp_seq_t)pkt_to_send;
}
}
@@ -91,7 +91,7 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt
int i;
int do_flush = 0;
int rv = ECP_OK;
-
+
buf = conn->rbuf.send;
if (buf == NULL) return size;
if (size < 0) return size;
@@ -108,7 +108,8 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt
(msg[6] << 8) | \
(msg[7]);
- ecp_tr_buf_flag_set(b, ECP_SEND_FLAG_MORE);
+ eos_tr_release(b->packet, 1);
+ ecp_tr_flag_set(ECP_SEND_FLAG_MORE);
#ifdef ECP_WITH_PTHREAD
pthread_mutex_lock(&buf->mutex);
@@ -117,7 +118,7 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt
ECPRBuffer *rbuf = &buf->rbuf;
int idx = ecp_rbuf_msg_idx(rbuf, seq_ack);
if (idx < 0) rv = idx;
-
+
if (!rv) {
seq_ack++;
if (buf->flags & ECP_RBUF_FLAG_CCONTROL) buf->in_transit = buf->cnt_cc ? buf->seq_cc - seq_ack : rbuf->seq_max - seq_ack + 1;
@@ -127,13 +128,13 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt
unsigned int msg_start;
ecp_seq_t seq_start;
ecp_win_t nack_cnt = 0;
-
+
seq_ack -= ECP_SIZE_ACKB;
#ifdef ECP_WITH_PTHREAD
pthread_mutex_unlock(&buf->mutex);
#endif
-
+
for (i=0; i<ECP_SIZE_ACKB; i++) {
if ((ack_map & ((ecp_ack_t)1 << (ECP_SIZE_ACKB - i - 1))) == 0) {
nack_cnt++;
@@ -144,7 +145,7 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt
ECPBuffer payload;
unsigned char pkt_buf[ECP_SIZE_PKT_BUF(0, ECP_MTYPE_NOP, conn)];
unsigned char pld_buf[ECP_SIZE_PLD_BUF(0, ECP_MTYPE_NOP, conn)];
-
+
packet.buffer = pkt_buf;
packet.size = ECP_SIZE_PKT_BUF(0, ECP_MTYPE_NOP, conn);
payload.buffer = pld_buf;
@@ -201,7 +202,8 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt
if (_rv < 0) rv = _rv;
}
- ecp_tr_buf_flag_clear(b, ECP_SEND_FLAG_MORE);
+ ecp_tr_flag_clear(ECP_SEND_FLAG_MORE);
+ eos_tr_release(b->packet, 0);
if (rv) return rv;
return rsize;
@@ -209,7 +211,7 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt
int ecp_rbuf_send_create(ECPConnection *conn, ECPRBSend *buf, ECPRBMessage *msg, unsigned int msg_size) {
int rv;
-
+
memset(buf, 0, sizeof(ECPRBRecv));
rv = ecp_rbuf_init(&buf->rbuf, msg, msg_size);
if (rv) return rv;
@@ -225,13 +227,13 @@ int ecp_rbuf_send_create(ECPConnection *conn, ECPRBSend *buf, ECPRBMessage *msg,
void ecp_rbuf_send_destroy(ECPConnection *conn) {
ECPRBSend *buf = conn->rbuf.send;
-
+
if (buf == NULL) return;
-
+
#ifdef ECP_WITH_PTHREAD
pthread_mutex_destroy(&buf->mutex);
#endif
-
+
conn->rbuf.send = NULL;
}
@@ -247,11 +249,11 @@ int ecp_rbuf_send_set_wsize(ECPConnection *conn, ecp_win_t size) {
ECPRBSend *buf = conn->rbuf.send;
if (buf == NULL) return ECP_ERR;
-
+
#ifdef ECP_WITH_PTHREAD
pthread_mutex_lock(&buf->mutex);
#endif
-
+
buf->win_size = size;
if (buf->cnt_cc) cc_flush(conn);
@@ -309,19 +311,19 @@ int ecp_rbuf_pkt_prep(ECPRBSend *buf, ECPSeqItem *si, unsigned char mtype) {
#endif
if (idx < 0) return idx;
-
+
si->rb_mtype = mtype;
si->rb_idx = idx;
buf->rbuf.msg[idx].size = 0;
buf->rbuf.msg[idx].flags = 0;
- return ECP_OK;
+ return ECP_OK;
}
ssize_t ecp_rbuf_pkt_send(ECPRBSend *buf, ECPSocket *sock, ECPNetAddr *addr, ECPBuffer *packet, size_t pkt_size, unsigned char flags, ECPTimerItem *ti, ECPSeqItem *si) {
int do_send = 1;
ssize_t rv = 0;
-
+
if (!si->rb_pass) {
unsigned char flags = 0;
ecp_seq_t seq = si->seq;
@@ -350,7 +352,7 @@ ssize_t ecp_rbuf_pkt_send(ECPRBSend *buf, ECPSocket *sock, ECPNetAddr *addr, ECP
if (ti) {
ECPRBTimer *timer = &buf->timer;
ECPRBTimerItem *item = &timer->item[timer->idx_w];
-
+
if (!item->occupied) {
item->occupied = 1;
item->item = *ti;
@@ -365,11 +367,11 @@ ssize_t ecp_rbuf_pkt_send(ECPRBSend *buf, ECPSocket *sock, ECPNetAddr *addr, ECP
} else {
buf->in_transit++;
}
-
+
#ifdef ECP_WITH_PTHREAD
pthread_mutex_unlock(&buf->mutex);
#endif
-
+
if (_rv) return _rv;
}
}
diff --git a/code/ecp/tr.h b/code/ecp/tr.h
index e39dec0..116590b 100644
--- a/code/ecp/tr.h
+++ b/code/ecp/tr.h
@@ -5,6 +5,6 @@ int ecp_tr_open(ECPSocket *sock, void *addr_s);
void ecp_tr_close(ECPSocket *sock);
ssize_t ecp_tr_send(ECPSocket *sock, ECPBuffer *packet, size_t msg_size, ECPNetAddr *addr, unsigned char flags);
ssize_t ecp_tr_recv(ECPSocket *sock, ECPBuffer *packet, ECPNetAddr *addr, int timeout);
-void ecp_tr_buf_free(ECP2Buffer *b, unsigned char flags);
-void ecp_tr_buf_flag_set(ECP2Buffer *b, unsigned char flags);
-void ecp_tr_buf_flag_clear(ECP2Buffer *b, unsigned char flags);
+void ecp_tr_release(ECPBuffer *packet, unsigned char more);
+void ecp_tr_flag_set(unsigned char flags);
+void ecp_tr_flag_clear(unsigned char flags);