From 702f3716075528d6ad390615b5df9b79264b40bb Mon Sep 17 00:00:00 2001 From: Uros Majstorovic Date: Sat, 19 Oct 2019 20:01:25 +0200 Subject: fixed transport buf free/flag set interface --- code/ecp/core.c | 2 +- code/ecp/fe310/transport.c | 16 ++++++++-------- code/ecp/posix/transport.c | 6 +++--- code/ecp/rbuf_recv.c | 48 +++++++++++++++++++++++----------------------- code/ecp/rbuf_send.c | 46 +++++++++++++++++++++++--------------------- code/ecp/tr.h | 6 +++--- 6 files changed, 63 insertions(+), 61 deletions(-) diff --git a/code/ecp/core.c b/code/ecp/core.c index b9efec4..77b7a96 100644 --- a/code/ecp/core.c +++ b/code/ecp/core.c @@ -1215,7 +1215,7 @@ ssize_t ecp_unpack(ECPSocket *sock, ECPNetAddr *addr, ECPConnection *parent, ECP (payload[2] << 8) | \ (payload[3]); - if ((payload[ECP_SIZE_PLD_HDR] & ECP_MTYPE_MASK) < ECP_MAX_MTYPE_SYS) ecp_tr_buf_free(bufs, ECP_SEND_FLAG_MORE); + if ((payload[ECP_SIZE_PLD_HDR] & ECP_MTYPE_MASK) < ECP_MAX_MTYPE_SYS) ecp_tr_release(bufs->packet, 1); if (conn == NULL) { if (payload[ECP_SIZE_PLD_HDR] == ECP_MTYPE_OPEN_REQ) { is_new = 1; diff --git a/code/ecp/fe310/transport.c b/code/ecp/fe310/transport.c index 3493966..a0b4a05 100644 --- a/code/ecp/fe310/transport.c +++ b/code/ecp/fe310/transport.c @@ -94,19 +94,19 @@ ssize_t ecp_tr_recv(ECPSocket *sock, ECPBuffer *packet, ECPNetAddr *addr, int ti return ECP_ERR; } -void ecp_tr_buf_free(ECP2Buffer *b, unsigned char flags) { - if (b && b->packet && b->packet->buffer) { - eos_net_free(b->packet->buffer-EOS_SOCK_SIZE_UDP_HDR, flags & ECP_SEND_FLAG_MORE); - b->packet->buffer = NULL; +void ecp_tr_release(ECPBuffer *packet, unsigned char more) { + if (packet && packet->buffer) { + eos_net_free(packet->buffer-EOS_SOCK_SIZE_UDP_HDR, more); + packet->buffer = NULL; + } else if (!more) { + eos_net_release(); } } -void ecp_tr_buf_flag_set(ECP2Buffer *b, unsigned char flags) { +void ecp_tr_flag_set(unsigned char flags) { _flags |= flags; - if (flags & ECP_SEND_FLAG_MORE) ecp_tr_buf_free(b, flags); } -void ecp_tr_buf_flag_clear(ECP2Buffer *b, unsigned char flags) { +void ecp_tr_flag_clear(unsigned char flags) { _flags &= ~flags; - if (flags & ECP_SEND_FLAG_MORE) eos_net_release(); } diff --git a/code/ecp/posix/transport.c b/code/ecp/posix/transport.c index 3c9b85b..2032ce2 100644 --- a/code/ecp/posix/transport.c +++ b/code/ecp/posix/transport.c @@ -107,7 +107,7 @@ ssize_t ecp_tr_recv(ECPSocket *sock, ECPBuffer *packet, ECPNetAddr *addr, int ti return ECP_ERR_TIMEOUT; } -void ecp_tr_buf_free(ECP2Buffer *b, unsigned char flags) {} -void ecp_tr_buf_flag_set(ECP2Buffer *b, unsigned char flags) {} -void ecp_tr_buf_flag_clear(ECP2Buffer *b, unsigned char flags) {} +void ecp_tr_release(ECPBuffer *packet, unsigned char more) {} +void ecp_tr_flag_set(unsigned char flags) {} +void ecp_tr_flag_clear(unsigned char flags) {} diff --git a/code/ecp/rbuf_recv.c b/code/ecp/rbuf_recv.c index 4bd76cc..b9b16a2 100644 --- a/code/ecp/rbuf_recv.c +++ b/code/ecp/rbuf_recv.c @@ -9,9 +9,9 @@ static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, ECPRBRecv *buf = conn->rbuf.recv; unsigned char flags = ECP_RBUF_FLAG_IN_RBUF; unsigned char mtype = ecp_msg_get_type(msg) & ECP_MTYPE_MASK; - + if (mtype < ECP_MAX_MTYPE_SYS) flags |= ECP_RBUF_FLAG_SYS; - + #ifdef ECP_WITH_MSGQ if (buf->flags & ECP_RBUF_FLAG_MSGQ) { int rv = ECP_OK; @@ -20,7 +20,7 @@ static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, ecp_seq_t seq_offset = seq - buf->msgq.seq_start; if (seq_offset >= buf->rbuf.msg_size) rv = ECP_ERR_RBUF_FULL; pthread_mutex_unlock(&buf->msgq.mutex); - + if (rv) return rv; } #endif @@ -42,15 +42,15 @@ static void msg_flush(ECPConnection *conn, ECP2Buffer *b) { #endif ecp_seq_t msg_cnt = buf->rbuf.seq_max - buf->rbuf.seq_start + 1; - ecp_seq_t seq_next = buf->rbuf.seq_start; + ecp_seq_t seq_next = buf->rbuf.seq_start; ecp_seq_t i = 0; unsigned int idx = buf->rbuf.msg_start; - + if (buf->timer_pts) { ecp_timer_pop(conn, ECP_MTYPE_RBFLUSH_PTS); buf->timer_pts = 0; } - + for (i=0; irbuf.msg[idx].flags & ECP_RBUF_FLAG_IN_RBUF) { if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_SYS) { @@ -65,7 +65,7 @@ static void msg_flush(ECPConnection *conn, ECP2Buffer *b) { rv = ecp_msg_get_frag(buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size, &frag_cnt, &frag_tot); if ((rv == ECP_OK) && (frag_cnt != 0) && (seq != seq_next)) { ecp_seq_t seq_fend = seq + (ecp_seq_t)(frag_tot - frag_cnt - 1); - + if (ECP_SEQ_LT(buf->rbuf.seq_max, seq_fend) || (buf->hole_max && ((ecp_seq_t)(buf->rbuf.seq_max - seq_fend) <= buf->hole_max))) { ecp_seq_t seq_fbeg = seq - frag_cnt; ecp_seq_t seq_offset = ECP_SEQ_LT(seq_next, seq_fbeg) ? seq - seq_fbeg : seq - seq_next; @@ -91,8 +91,8 @@ static void msg_flush(ECPConnection *conn, ECP2Buffer *b) { idx = ECP_RBUF_IDX_MASK(idx - seq_offset, buf->rbuf.msg_size); break; } - } - + } + seq_next = seq + 1; if (buf->flags & ECP_RBUF_FLAG_MSGQ) { #ifdef ECP_WITH_MSGQ @@ -132,7 +132,7 @@ static int ack_send(ECPConnection *conn) { unsigned char pld_buf[ECP_SIZE_PLD_BUF(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), ECP_MTYPE_RBACK, conn)]; unsigned char *buf_ = ecp_pld_get_buf(pld_buf, ECP_MTYPE_RBACK); ssize_t rv; - + packet.buffer = pkt_buf; packet.size = ECP_SIZE_PKT_BUF(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), ECP_MTYPE_RBACK, conn); payload.buffer = pld_buf; @@ -161,7 +161,7 @@ static int ack_shift(ECPRBRecv *buf) { int in_rbuf = 0; int idx; int i; - + if ((buf->flags & ECP_RBUF_FLAG_RELIABLE) && ((buf->ack_map & ACK_MASK_FIRST) == 0)) return 0; while (ECP_SEQ_LT(buf->seq_ack, buf->rbuf.seq_max)) { @@ -169,7 +169,7 @@ static int ack_shift(ECPRBRecv *buf) { in_rbuf = ECP_SEQ_LT(buf->seq_ack, buf->rbuf.seq_start) ? 1 : buf->rbuf.msg[ECP_RBUF_IDX_MASK(buf->rbuf.msg_start + buf->seq_ack - buf->rbuf.seq_start, buf->rbuf.msg_size)].flags & ECP_RBUF_FLAG_IN_RBUF; if (in_rbuf && (buf->ack_map == ECP_ACK_FULL)) continue; - + buf->ack_map = buf->ack_map << 1; if (in_rbuf) { buf->ack_map |= 1; @@ -179,7 +179,7 @@ static int ack_shift(ECPRBRecv *buf) { if ((buf->ack_map & ACK_MASK_FIRST) == 0) break; } - + if (!do_ack && (buf->seq_ack == buf->rbuf.seq_max) && ((buf->ack_map & buf->hole_mask_full) != buf->hole_mask_full)) { ecp_ack_t hole_mask = buf->ack_map; @@ -191,17 +191,17 @@ static int ack_shift(ECPRBRecv *buf) { } } } - + return do_ack; } ssize_t ecp_rbuf_handle_flush(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size, ECP2Buffer *b) { if (size < 0) return size; - + ECPRBRecv *buf = conn->rbuf.recv; if (buf == NULL) return ECP_ERR; - ecp_tr_buf_free(b, ECP_SEND_FLAG_MORE); + ecp_tr_release(b->packet, 1); ack_send(conn); return 0; } @@ -209,7 +209,7 @@ ssize_t ecp_rbuf_handle_flush(ECPConnection *conn, ecp_seq_t seq, unsigned char ssize_t ecp_rbuf_handle_flush_pts(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size, ECP2Buffer *b) { ECPRBRecv *buf = conn->rbuf.recv; if (buf == NULL) return ECP_ERR; - + buf->timer_pts = 0; msg_flush(conn, b); return 0; @@ -221,7 +221,7 @@ int ecp_rbuf_recv_create(ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, memset(buf, 0, sizeof(ECPRBRecv)); rv = ecp_rbuf_init(&buf->rbuf, msg, msg_size); if (rv) return rv; - + buf->ack_map = ECP_ACK_FULL; buf->ack_rate = ACK_RATE; @@ -236,7 +236,7 @@ int ecp_rbuf_recv_create(ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, void ecp_rbuf_recv_destroy(ECPConnection *conn) { ECPRBRecv *buf = conn->rbuf.recv; - + if (buf == NULL) return; #ifdef ECP_WITH_MSGQ ecp_conn_msgq_destroy(&buf->msgq); @@ -250,7 +250,7 @@ int ecp_rbuf_recv_start(ECPConnection *conn, ecp_seq_t seq) { ECPRBRecv *buf = conn->rbuf.recv; if (buf == NULL) return ECP_ERR; - + seq--; buf->seq_ack = seq; rv = ecp_rbuf_start(&buf->rbuf, seq); @@ -262,7 +262,7 @@ int ecp_rbuf_recv_start(ECPConnection *conn, ecp_seq_t seq) { if (rv) return rv; } #endif - + return ECP_OK; } @@ -272,7 +272,7 @@ int ecp_rbuf_recv_set_hole(ECPConnection *conn, unsigned short hole_max) { buf->hole_max = hole_max; buf->hole_mask_full = ~(~((ecp_ack_t)1) << (hole_max * 2)); buf->hole_mask_empty = ~(~((ecp_ack_t)1) << (hole_max + 1)); - + return ECP_OK; } @@ -280,7 +280,7 @@ int ecp_rbuf_recv_set_delay(ECPConnection *conn, ecp_pts_t delay) { ECPRBRecv *buf = conn->rbuf.recv; buf->deliver_delay = delay; - + return ECP_OK; } @@ -289,7 +289,7 @@ ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *m ecp_seq_t ack_pkt = 0; ssize_t rv; unsigned char mtype; - + mtype = ecp_msg_get_type(msg) & ECP_MTYPE_MASK; if ((mtype == ECP_MTYPE_RBACK) || (mtype == ECP_MTYPE_RBFLUSH)) return ecp_msg_handle(conn, seq, msg, msg_size, b); diff --git a/code/ecp/rbuf_send.c b/code/ecp/rbuf_send.c index 82c4272..39fed02 100644 --- a/code/ecp/rbuf_send.c +++ b/code/ecp/rbuf_send.c @@ -8,7 +8,7 @@ static ssize_t flush_send(ECPConnection *conn, ECPTimerItem *ti) { ECPBuffer payload; unsigned char pkt_buf[ECP_SIZE_PKT_BUF(0, ECP_MTYPE_RBFLUSH, conn)]; unsigned char pld_buf[ECP_SIZE_PLD_BUF(0, ECP_MTYPE_RBFLUSH, conn)]; - + packet.buffer = pkt_buf; packet.size = ECP_SIZE_PKT_BUF(0, ECP_MTYPE_RBFLUSH, conn); payload.buffer = pld_buf; @@ -37,7 +37,7 @@ static void cc_flush(ECPConnection *conn) { ECPTimerItem ti[ECP_MAX_TIMER]; unsigned short max_t = 0; - + if (pkt_to_send) { unsigned int idx = ecp_rbuf_msg_idx(rbuf, buf->seq_cc); unsigned int _idx = idx; @@ -78,7 +78,7 @@ static void cc_flush(ECPConnection *conn) { #endif buf->in_transit += (ecp_win_t)pkt_to_send; - buf->cnt_cc -= (ecp_win_t)pkt_to_send; + buf->cnt_cc -= (ecp_win_t)pkt_to_send; buf->seq_cc += (ecp_seq_t)pkt_to_send; } } @@ -91,7 +91,7 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt int i; int do_flush = 0; int rv = ECP_OK; - + buf = conn->rbuf.send; if (buf == NULL) return size; if (size < 0) return size; @@ -108,7 +108,8 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt (msg[6] << 8) | \ (msg[7]); - ecp_tr_buf_flag_set(b, ECP_SEND_FLAG_MORE); + eos_tr_release(b->packet, 1); + ecp_tr_flag_set(ECP_SEND_FLAG_MORE); #ifdef ECP_WITH_PTHREAD pthread_mutex_lock(&buf->mutex); @@ -117,7 +118,7 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt ECPRBuffer *rbuf = &buf->rbuf; int idx = ecp_rbuf_msg_idx(rbuf, seq_ack); if (idx < 0) rv = idx; - + if (!rv) { seq_ack++; if (buf->flags & ECP_RBUF_FLAG_CCONTROL) buf->in_transit = buf->cnt_cc ? buf->seq_cc - seq_ack : rbuf->seq_max - seq_ack + 1; @@ -127,13 +128,13 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt unsigned int msg_start; ecp_seq_t seq_start; ecp_win_t nack_cnt = 0; - + seq_ack -= ECP_SIZE_ACKB; #ifdef ECP_WITH_PTHREAD pthread_mutex_unlock(&buf->mutex); #endif - + for (i=0; ipacket, 0); if (rv) return rv; return rsize; @@ -209,7 +211,7 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt int ecp_rbuf_send_create(ECPConnection *conn, ECPRBSend *buf, ECPRBMessage *msg, unsigned int msg_size) { int rv; - + memset(buf, 0, sizeof(ECPRBRecv)); rv = ecp_rbuf_init(&buf->rbuf, msg, msg_size); if (rv) return rv; @@ -225,13 +227,13 @@ int ecp_rbuf_send_create(ECPConnection *conn, ECPRBSend *buf, ECPRBMessage *msg, void ecp_rbuf_send_destroy(ECPConnection *conn) { ECPRBSend *buf = conn->rbuf.send; - + if (buf == NULL) return; - + #ifdef ECP_WITH_PTHREAD pthread_mutex_destroy(&buf->mutex); #endif - + conn->rbuf.send = NULL; } @@ -247,11 +249,11 @@ int ecp_rbuf_send_set_wsize(ECPConnection *conn, ecp_win_t size) { ECPRBSend *buf = conn->rbuf.send; if (buf == NULL) return ECP_ERR; - + #ifdef ECP_WITH_PTHREAD pthread_mutex_lock(&buf->mutex); #endif - + buf->win_size = size; if (buf->cnt_cc) cc_flush(conn); @@ -309,19 +311,19 @@ int ecp_rbuf_pkt_prep(ECPRBSend *buf, ECPSeqItem *si, unsigned char mtype) { #endif if (idx < 0) return idx; - + si->rb_mtype = mtype; si->rb_idx = idx; buf->rbuf.msg[idx].size = 0; buf->rbuf.msg[idx].flags = 0; - return ECP_OK; + return ECP_OK; } ssize_t ecp_rbuf_pkt_send(ECPRBSend *buf, ECPSocket *sock, ECPNetAddr *addr, ECPBuffer *packet, size_t pkt_size, unsigned char flags, ECPTimerItem *ti, ECPSeqItem *si) { int do_send = 1; ssize_t rv = 0; - + if (!si->rb_pass) { unsigned char flags = 0; ecp_seq_t seq = si->seq; @@ -350,7 +352,7 @@ ssize_t ecp_rbuf_pkt_send(ECPRBSend *buf, ECPSocket *sock, ECPNetAddr *addr, ECP if (ti) { ECPRBTimer *timer = &buf->timer; ECPRBTimerItem *item = &timer->item[timer->idx_w]; - + if (!item->occupied) { item->occupied = 1; item->item = *ti; @@ -365,11 +367,11 @@ ssize_t ecp_rbuf_pkt_send(ECPRBSend *buf, ECPSocket *sock, ECPNetAddr *addr, ECP } else { buf->in_transit++; } - + #ifdef ECP_WITH_PTHREAD pthread_mutex_unlock(&buf->mutex); #endif - + if (_rv) return _rv; } } diff --git a/code/ecp/tr.h b/code/ecp/tr.h index e39dec0..116590b 100644 --- a/code/ecp/tr.h +++ b/code/ecp/tr.h @@ -5,6 +5,6 @@ int ecp_tr_open(ECPSocket *sock, void *addr_s); void ecp_tr_close(ECPSocket *sock); ssize_t ecp_tr_send(ECPSocket *sock, ECPBuffer *packet, size_t msg_size, ECPNetAddr *addr, unsigned char flags); ssize_t ecp_tr_recv(ECPSocket *sock, ECPBuffer *packet, ECPNetAddr *addr, int timeout); -void ecp_tr_buf_free(ECP2Buffer *b, unsigned char flags); -void ecp_tr_buf_flag_set(ECP2Buffer *b, unsigned char flags); -void ecp_tr_buf_flag_clear(ECP2Buffer *b, unsigned char flags); +void ecp_tr_release(ECPBuffer *packet, unsigned char more); +void ecp_tr_flag_set(unsigned char flags); +void ecp_tr_flag_clear(unsigned char flags); -- cgit v1.2.3