From 483b954dff83b86877184718acdd66533cb694ac Mon Sep 17 00:00:00 2001 From: Uros Majstorovic Date: Sun, 30 Jan 2022 02:04:24 +0100 Subject: code cleanup and fixes mainly rbuf --- ecp/src/rbuf_send.c | 255 ++++++++++++++++++++++++++++++---------------------- 1 file changed, 146 insertions(+), 109 deletions(-) (limited to 'ecp/src/rbuf_send.c') diff --git a/ecp/src/rbuf_send.c b/ecp/src/rbuf_send.c index faf2a7d..9f29010 100644 --- a/ecp/src/rbuf_send.c +++ b/ecp/src/rbuf_send.c @@ -1,7 +1,10 @@ #include "core.h" #include "tr.h" -#define NACK_RATE_UNIT 10000 +#define NACK_RATE_UNIT 10000 + +#define MIN(X, Y) (((X) < (Y)) ? (X) : (Y)) +#define MAX(X, Y) (((X) > (Y)) ? (X) : (Y)) static ssize_t flush_send(ECPConnection *conn, ECPTimerItem *ti) { ECPBuffer packet; @@ -26,76 +29,83 @@ static ssize_t flush_send(ECPConnection *conn, ECPTimerItem *ti) { rv = ecp_timer_push(&_ti); if (rv) return rv; } - return ecp_rbuf_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(0, ECP_MTYPE_RBFLUSH), 0, 0); + return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(0, ECP_MTYPE_RBFLUSH), 0); } -static void cc_flush(ECPConnection *conn) { +static void cc_flush(ECPConnection *conn, unsigned char flags) { ECPRBSend *buf = conn->rbuf.send; ECPRBuffer *rbuf = &buf->rbuf; - ecp_seq_t pkt_buf_cnt = rbuf->seq_max - rbuf->seq_start + 1; - ecp_win_t pkt_cc_cnt = buf->win_size > buf->in_transit ? buf->win_size - buf->in_transit : 0; - int pkt_to_send = pkt_buf_cnt < pkt_cc_cnt ? pkt_buf_cnt : pkt_cc_cnt; - int i; + unsigned short idx; + int rv; - ECPTimerItem ti[ECP_MAX_TIMER]; - unsigned short max_t = 0; + rv = _ecp_rbuf_msg_idx(rbuf, buf->seq_cc, &idx); + if (rv) return; - if (pkt_to_send) { - unsigned int idx = _ecp_rbuf_msg_idx(rbuf, buf->seq_cc); - unsigned int _idx = idx; + while (ECP_SEQ_LTE(buf->seq_cc, rbuf->seq_max)) { + ECPRBTimerItem *ti; + ECPBuffer packet; - for (i=0; imsg[_idx].flags & ECP_RBUF_FLAG_IN_CCONTROL)) break; - rbuf->msg[_idx].flags &= ~ECP_RBUF_FLAG_IN_CCONTROL; - if (rbuf->msg[_idx].idx_t != -1) { - ECPRBTimer *timer = &buf->timer; - ECPRBTimerItem *item = &timer->item[rbuf->msg[_idx].idx_t]; - - item->occupied = 0; - ti[max_t] = item->item; - rbuf->msg[_idx].idx_t = max_t; - max_t++; - } - _idx = ECP_RBUF_IDX_MASK(_idx + 1, rbuf->msg_size); - } - pkt_to_send = i; - _idx = idx; + if ((buf->cnt_cc == 0) || (buf->in_transit >= buf->win_size)) break; + if ((rbuf->arr.pkt[idx].flags & ECP_RBUF_FLAG_IN_RBUF) && !(rbuf->arr.pkt[idx].flags & ECP_RBUF_FLAG_SKIP)) { #ifdef ECP_WITH_PTHREAD - pthread_mutex_unlock(&buf->mutex); + pthread_mutex_unlock(&buf->mutex); #endif - for (i=0; iarr.pkt[idx].timer; + if (ti) ecp_timer_push(&ti->item); + packet.buffer = rbuf->arr.pkt[idx].buf; + packet.size = ECP_MAX_PKT; + ecp_pkt_send(conn->sock, &conn->node.addr, &packet, rbuf->arr.pkt[idx].size, flags); - if (rbuf->msg[_idx].idx_t != -1) ecp_timer_push(&ti[rbuf->msg[_idx].idx_t]); - packet.buffer = rbuf->msg[_idx].msg; - packet.size = rbuf->msg[_idx].size; - ecp_pkt_send(conn->sock, &conn->node.addr, &packet, rbuf->msg[_idx].size, 0); - _idx = ECP_RBUF_IDX_MASK(_idx + 1, rbuf->msg_size); - } +#ifdef ECP_WITH_PTHREAD + pthread_mutex_lock(&buf->mutex); +#endif + if (ti) { #ifdef ECP_WITH_PTHREAD - pthread_mutex_lock(&buf->mutex); + pthread_mutex_lock(&buf->timer->mutex); #endif + ti->empty = 1; +#ifdef ECP_WITH_PTHREAD + pthread_mutex_lock(&buf->timer->mutex); +#endif + } - buf->in_transit += (ecp_win_t)pkt_to_send; - buf->cnt_cc -= (ecp_win_t)pkt_to_send; - buf->seq_cc += (ecp_seq_t)pkt_to_send; + buf->cnt_cc--; + buf->in_transit++; + } + if (!(buf->flags & ECP_RBUF_FLAG_RELIABLE)) { + rbuf->arr.pkt[idx].flags = 0; + // if (rbuf->arr.pkt[idx].flags == 0); + } + buf->seq_cc++; + idx = ECP_RBUF_IDX_MASK(idx + 1, rbuf->arr_size); + } + if (!(buf->flags & ECP_RBUF_FLAG_RELIABLE)) { + rbuf->seq_start = buf->seq_cc; + rbuf->idx_start = idx; } } ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size, ECP2Buffer *b) { ECPRBSend *buf; + ECPRBuffer *rbuf; ssize_t rsize = sizeof(ecp_seq_t)+sizeof(ecp_ack_t); ecp_seq_t seq_ack = 0; ecp_ack_t ack_map = 0; - int i; + ecp_seq_t seq_start; + ecp_seq_t seq_max; + unsigned short idx; + unsigned short msg_cnt; int do_flush = 0; - int rv = ECP_OK; + int i; + int rv; buf = conn->rbuf.send; if (buf == NULL) return size; + + rbuf = &buf->rbuf; if (size < 0) return size; if (size < rsize) return ECP_ERR; @@ -111,89 +121,108 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt (msg[7]); ecp_tr_release(b->packet, 1); - ecp_tr_flag_set(ECP_SEND_FLAG_MORE); #ifdef ECP_WITH_PTHREAD pthread_mutex_lock(&buf->mutex); #endif - ECPRBuffer *rbuf = &buf->rbuf; - int idx = _ecp_rbuf_msg_idx(rbuf, seq_ack); - if (idx < 0) rv = idx; + seq_max = (buf->cnt_cc ? buf->seq_cc - 1 : rbuf->seq_max); + if (ECP_SEQ_LT(seq_max, seq_ack)) return ECP_ERR; - if (!rv) { - seq_ack++; - if (buf->flags & ECP_RBUF_FLAG_CCONTROL) buf->in_transit = buf->cnt_cc ? buf->seq_cc - seq_ack : rbuf->seq_max - seq_ack + 1; + if (buf->flags & ECP_RBUF_FLAG_RELIABLE) { + rv = _ecp_rbuf_msg_idx(rbuf, seq_ack, &idx); + if (rv) goto handle_ack_fin; + } - if (ack_map != ECP_ACK_FULL) { - int nack_first = 0; - unsigned int msg_start; - ecp_seq_t seq_start; - ecp_win_t nack_cnt = 0; + if (buf->flags & ECP_RBUF_FLAG_CCONTROL) { + buf->in_transit = seq_max - seq_ack; + } - seq_ack -= ECP_SIZE_ACKB; + if (ack_map != ECP_ACK_FULL) { + ecp_ack_t ack_mask = (ecp_ack_t)1 << (ECP_SIZE_ACKB - 1); + unsigned short nack_cnt = 0; + int nack_first = 0; + + seq_ack -= (ECP_SIZE_ACKB - 1); + if (buf->flags & ECP_RBUF_FLAG_RELIABLE) { + rv = _ecp_rbuf_msg_idx(rbuf, seq_ack, &idx); + if (rv) goto handle_ack_fin; + } #ifdef ECP_WITH_PTHREAD - pthread_mutex_unlock(&buf->mutex); + pthread_mutex_unlock(&buf->mutex); #endif - for (i=0; iseq_nack, seq_ack)) { nack_cnt++; - if (buf->flags & ECP_RBUF_FLAG_RELIABLE) { - unsigned int _idx = ECP_RBUF_IDX_MASK(idx + 1 - ECP_SIZE_ACKB + i, rbuf->msg_size); - if ((rbuf->msg[_idx].size == 0) || (rbuf->msg[_idx].flags & ECP_RBUF_FLAG_SYS)) { - ECPBuffer packet; - ECPBuffer payload; - unsigned char pkt_buf[ECP_SIZE_PKT_BUF(0, ECP_MTYPE_NOP, conn)]; - unsigned char pld_buf[ECP_SIZE_PLD_BUF(0, ECP_MTYPE_NOP, conn)]; - - packet.buffer = pkt_buf; - packet.size = ECP_SIZE_PKT_BUF(0, ECP_MTYPE_NOP, conn); - payload.buffer = pld_buf; - payload.size = ECP_SIZE_PLD_BUF(0, ECP_MTYPE_NOP, conn); - - ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_NOP); - ecp_rbuf_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(0, ECP_MTYPE_NOP), 0, seq_ack + i); - } else { - ECPBuffer packet; - packet.buffer = rbuf->msg[_idx].msg; - packet.size = rbuf->msg[_idx].size; - ecp_pkt_send(conn->sock, &conn->node.addr, &packet, rbuf->msg[_idx].size, 0); - } - if (!nack_first) { - nack_first = 1; - seq_start = seq_ack + i; - msg_start = _idx; + buf->seq_nack = seq_ack; + } + + if (buf->flags & ECP_RBUF_FLAG_RELIABLE) { + if (!(rbuf->arr.pkt[idx].flags & ECP_RBUF_FLAG_IN_RBUF) || (rbuf->arr.pkt[idx].flags & ECP_RBUF_FLAG_SKIP)) { + ECPBuffer packet; + ECPBuffer payload; + unsigned char pkt_buf[ECP_SIZE_PKT_BUF(0, ECP_MTYPE_NOP, conn)]; + unsigned char pld_buf[ECP_SIZE_PLD_BUF(0, ECP_MTYPE_NOP, conn)]; + + packet.buffer = pkt_buf; + packet.size = ECP_SIZE_PKT_BUF(0, ECP_MTYPE_NOP, conn); + payload.buffer = pld_buf; + payload.size = ECP_SIZE_PLD_BUF(0, ECP_MTYPE_NOP, conn); + + ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_NOP); + ecp_rbuf_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(0, ECP_MTYPE_NOP), ECP_SEND_FLAG_MORE, seq_ack); + } else { + ECPBuffer packet; + + packet.buffer = rbuf->arr.pkt[idx].buf; + packet.size = ECP_MAX_PKT; + ecp_pkt_send(conn->sock, &conn->node.addr, &packet, rbuf->arr.pkt[idx].size, ECP_SEND_FLAG_MORE); + if (buf->flags & ECP_RBUF_FLAG_CCONTROL) { + buf->in_transit++; } } + if (!nack_first) { + nack_first = 1; + seq_start = seq_ack; + } } } + seq_ack++; + ack_mask = ack_mask >> 1; + if (buf->flags & ECP_RBUF_FLAG_RELIABLE) idx = ECP_RBUF_IDX_MASK(idx + 1, rbuf->arr_size); + } #ifdef ECP_WITH_PTHREAD - pthread_mutex_lock(&buf->mutex); + pthread_mutex_lock(&buf->mutex); #endif - if (buf->flags & ECP_RBUF_FLAG_CCONTROL) buf->in_transit += nack_cnt; - buf->nack_rate = (buf->nack_rate * 7 + ((ECP_SIZE_ACKB - nack_cnt) * NACK_RATE_UNIT) / ECP_SIZE_ACKB) / 8; - if (buf->flags & ECP_RBUF_FLAG_RELIABLE) { - rbuf->seq_start = seq_start; - rbuf->msg_start = msg_start; - } else { - rbuf->seq_start = seq_ack + ECP_SIZE_ACKB; - rbuf->msg_start = ECP_RBUF_IDX_MASK(idx + 1, rbuf->msg_size); - } - } else { - buf->nack_rate = (buf->nack_rate * 7) / 8; - rbuf->seq_start = seq_ack; - rbuf->msg_start = ECP_RBUF_IDX_MASK(idx + 1, rbuf->msg_size); - } - if (buf->flush) { - if (ECP_SEQ_LT(buf->seq_flush, rbuf->seq_start)) buf->flush = 0; - if (buf->flush) do_flush = 1; + buf->nack_rate = (buf->nack_rate * 7 + ((ECP_SIZE_ACKB - nack_cnt) * NACK_RATE_UNIT) / ECP_SIZE_ACKB) / 8; + } else { + buf->nack_rate = (buf->nack_rate * 7) / 8; + seq_start = seq_ack + 1; + } + + if (buf->flags & ECP_RBUF_FLAG_RELIABLE) { + msg_cnt = seq_start - rbuf->seq_start; + idx = rbuf->idx_start; + for (i=0; iarr.pkt[idx].flags = 0; + // if (rbuf->arr.pkt[idx].flags == 0); + idx = ECP_RBUF_IDX_MASK(idx + 1, rbuf->arr_size); } - if (buf->cnt_cc) cc_flush(conn); + rbuf->seq_start = seq_start; + rbuf->idx_start = idx; } + if (buf->flush) { + if (ECP_SEQ_LT(buf->seq_flush, rbuf->seq_start)) buf->flush = 0; + if (buf->flush) do_flush = 1; + } + if (buf->cnt_cc) cc_flush(conn, ECP_SEND_FLAG_MORE); + +handle_ack_fin: #ifdef ECP_WITH_PTHREAD pthread_mutex_unlock(&buf->mutex); @@ -204,21 +233,27 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt _rv = flush_send(conn, NULL); if (_rv < 0) rv = _rv; + } else { + // ecp_tr_nomore(); } - ecp_tr_flag_clear(ECP_SEND_FLAG_MORE); ecp_tr_release(b->packet, 0); if (rv) return rv; return rsize; } -int ecp_rbuf_send_create(ECPConnection *conn, ECPRBSend *buf, ECPRBMessage *msg, unsigned int msg_size) { +int ecp_rbuf_send_create(ECPConnection *conn, ECPRBSend *buf, ECPRBPacket *pkt, unsigned short pkt_size) { + ECPRBuffer *rbuf = &buf->rbuf; int rv; + if (pkt == NULL) return ECP_ERR; + memset(buf, 0, sizeof(ECPRBRecv)); - rv = _ecp_rbuf_init(&buf->rbuf, msg, msg_size); - if (rv) return rv; + memset(pkt, 0, sizeof(ECPRBPacket) * pkt_size); + + rbuf->arr.pkt = pkt; + rbuf->arr_size = pkt_size; #ifdef ECP_WITH_PTHREAD rv = pthread_mutex_init(&buf->mutex, NULL); @@ -243,10 +278,12 @@ void ecp_rbuf_send_destroy(ECPConnection *conn) { int ecp_rbuf_send_start(ECPConnection *conn) { ECPRBSend *buf = conn->rbuf.send; + ECPRBuffer *rbuf = &buf->rbuf; if (buf == NULL) return ECP_ERR; - return _ecp_rbuf_start(&buf->rbuf, conn->seq_out); + buf->seq_nack = conn->seq_out; + return _ecp_rbuf_start(rbuf, conn->seq_out); } int ecp_rbuf_set_wsize(ECPConnection *conn, ecp_win_t size) { @@ -259,7 +296,7 @@ int ecp_rbuf_set_wsize(ECPConnection *conn, ecp_win_t size) { #endif buf->win_size = size; - if (buf->cnt_cc) cc_flush(conn); + if (buf->cnt_cc) cc_flush(conn, 0); #ifdef ECP_WITH_PTHREAD pthread_mutex_unlock(&buf->mutex); -- cgit v1.2.3