diff options
author | Uros Majstorovic <majstor@majstor.org> | 2017-08-18 20:35:21 +0200 |
---|---|---|
committer | Uros Majstorovic <majstor@majstor.org> | 2017-08-18 20:35:21 +0200 |
commit | 5d20e9bafc3571f37eb0d9b74699d023d2d3d13a (patch) | |
tree | bb26005ff3ed2564212ac84cfa336ee6e97ffb93 /code/core/rbuf_send.c | |
parent | 8f44e2151cb3f91b220c4a3393a06068d0ee7302 (diff) |
timer fixed; rbuf almost implemented
Diffstat (limited to 'code/core/rbuf_send.c')
-rw-r--r-- | code/core/rbuf_send.c | 63 |
1 files changed, 44 insertions, 19 deletions
diff --git a/code/core/rbuf_send.c b/code/core/rbuf_send.c index 06bd2e2..e7020cb 100644 --- a/code/core/rbuf_send.c +++ b/code/core/rbuf_send.c @@ -4,11 +4,20 @@ #define NACK_RATE_UNIT 10000 -static ssize_t _rbuf_send_flush(ECPConnection *conn, ECPTimerItem *ti) { +static ssize_t flush_send(ECPConnection *conn, ECPTimerItem *ti) { unsigned char payload[ECP_SIZE_PLD(0)]; ecp_pld_set_type(payload, ECP_MTYPE_RBFLUSH); - return ecp_pld_send(conn, payload, sizeof(payload)); + if (ti == NULL) { + ECPTimerItem _ti; + int rv = ecp_timer_item_init(&_ti, conn, ECP_MTYPE_RBACK, 3, 500); + if (rv) return rv; + + _ti.retry = flush_send; + rv = ecp_timer_push(&_ti); + if (rv) return rv; + } + return ecp_rbuf_pld_send(conn, payload, sizeof(payload), 0); } static void cc_flush(ECPConnection *conn) { @@ -24,8 +33,8 @@ static void cc_flush(ECPConnection *conn) { unsigned int _idx = idx; for (i=0; i<pkt_to_send; i++) { - if (!(rbuf->msg[_idx].flags & ECP_RBUF_FLAG_CCWAIT)) break; - rbuf->msg[_idx].flags &= ~ECP_RBUF_FLAG_CCWAIT; + if (!(rbuf->msg[_idx].flags & ECP_RBUF_FLAG_IN_CCONTROL)) break; + rbuf->msg[_idx].flags &= ~ECP_RBUF_FLAG_IN_CCONTROL; _idx = ECP_RBUF_IDX_MASK(_idx + 1, rbuf->msg_size); } pkt_to_send = i; @@ -45,7 +54,7 @@ static void cc_flush(ECPConnection *conn) { #endif buf->in_transit += (ecp_win_t)pkt_to_send; - buf->cc_wait -= (ecp_win_t)pkt_to_send; + buf->cnt_cc -= (ecp_win_t)pkt_to_send; buf->seq_cc += (ecp_seq_t)pkt_to_send; } } @@ -85,7 +94,7 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt if (!rv) { seq_ack++; - buf->in_transit -= seq_ack - rbuf->seq_start; + if (buf->flags & ECP_RBUF_FLAG_CCONTROL) buf->in_transit = buf->cnt_cc ? buf->seq_cc - seq_ack : rbuf->seq_max - seq_ack + 1; if (ack_map != ECP_RBUF_ACK_FULL) { int nack_first = 0; @@ -104,7 +113,11 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt nack_cnt++; if (buf->flags & ECP_RBUF_FLAG_RELIABLE) { unsigned int _idx = ECP_RBUF_IDX_MASK(idx + 1 - ECP_RBUF_ACK_SIZE + i, rbuf->msg_size); - ecp_pkt_send(conn->sock, &conn->node.addr, rbuf->msg[_idx].msg, rbuf->msg[_idx].size); + if ((rbuf->msg[_idx].size == 0) || (rbuf->msg[_idx].flags & ECP_RBUF_FLAG_SYS)) { + + } else { + ecp_pkt_send(conn->sock, &conn->node.addr, rbuf->msg[_idx].msg, rbuf->msg[_idx].size); + } if (!nack_first) { nack_first = 1; seq_start = seq_ack + i; @@ -118,7 +131,7 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt pthread_mutex_lock(&buf->mutex); #endif - buf->in_transit += nack_cnt; + if (buf->flags & ECP_RBUF_FLAG_CCONTROL) buf->in_transit += nack_cnt; buf->nack_rate = (buf->nack_rate * 7 + ((ECP_RBUF_ACK_SIZE - nack_cnt) * NACK_RATE_UNIT) / ECP_RBUF_ACK_SIZE) / 8; if (buf->flags & ECP_RBUF_FLAG_RELIABLE) { rbuf->seq_start = seq_start; @@ -138,7 +151,7 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt do_flush = 1; } } - if (buf->cc_wait) cc_flush(conn); + if (buf->cnt_cc) cc_flush(conn); } #ifdef ECP_WITH_PTHREAD @@ -147,11 +160,14 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt if (rv) return rv; - if (do_flush) ecp_timer_send(conn, _rbuf_send_flush, ECP_MTYPE_RBACK, 3, 500); + if (do_flush) { + ssize_t _rv = flush_send(conn, NULL); + if (_rv < 0) return _rv; + } return rsize; } -int ecp_conn_rbuf_send_create(ECPConnection *conn, ECPRBSend *buf, ECPRBMessage *msg, unsigned int msg_size) { +int ecp_rbuf_send_create(ECPConnection *conn, ECPRBSend *buf, ECPRBMessage *msg, unsigned int msg_size) { int rv; memset(buf, 0, sizeof(ECPRBRecv)); @@ -167,7 +183,7 @@ int ecp_conn_rbuf_send_create(ECPConnection *conn, ECPRBSend *buf, ECPRBMessage return ECP_OK; } -void ecp_conn_rbuf_send_destroy(ECPConnection *conn) { +void ecp_rbuf_send_destroy(ECPConnection *conn) { ECPRBSend *buf = conn->rbuf.send; if (buf == NULL) return; @@ -177,7 +193,7 @@ void ecp_conn_rbuf_send_destroy(ECPConnection *conn) { #endif } -int ecp_conn_rbuf_send_set_wsize(ECPConnection *conn, ecp_win_t size) { +int ecp_rbuf_send_set_wsize(ECPConnection *conn, ecp_win_t size) { ECPRBSend *buf = conn->rbuf.send; if (buf == NULL) return ECP_ERR; @@ -187,7 +203,7 @@ int ecp_conn_rbuf_send_set_wsize(ECPConnection *conn, ecp_win_t size) { #endif buf->win_size = size; - if (buf->cc_wait) cc_flush(conn); + if (buf->cnt_cc) cc_flush(conn); #ifdef ECP_WITH_PTHREAD pthread_mutex_unlock(&buf->mutex); @@ -196,7 +212,7 @@ int ecp_conn_rbuf_send_set_wsize(ECPConnection *conn, ecp_win_t size) { return ECP_OK; } -int ecp_conn_rbuf_send_start(ECPConnection *conn) { +int ecp_rbuf_send_start(ECPConnection *conn) { ECPRBSend *buf = conn->rbuf.send; if (buf == NULL) return ECP_ERR; @@ -204,29 +220,38 @@ int ecp_conn_rbuf_send_start(ECPConnection *conn) { return ecp_rbuf_start(&buf->rbuf, conn->seq_out); } -int ecp_conn_rbuf_flush(ECPConnection *conn) { +int ecp_rbuf_flush(ECPConnection *conn) { ECPRBSend *buf = conn->rbuf.send; unsigned char payload[ECP_SIZE_PLD(0)]; ecp_seq_t seq; if (buf == NULL) return ECP_ERR; - // XXX flush seq +#ifdef ECP_WITH_PTHREAD + pthread_mutex_lock(&conn->mutex); +#endif + seq = conn->seq_out; +#ifdef ECP_WITH_PTHREAD + pthread_mutex_unlock(&conn->mutex); +#endif + #ifdef ECP_WITH_PTHREAD pthread_mutex_lock(&buf->mutex); #endif + if (buf->flush) { if (ECP_RBUF_SEQ_LT(buf->seq_flush, seq)) buf->seq_flush = seq; } else { buf->flush = 1; buf->seq_flush = seq; } + #ifdef ECP_WITH_PTHREAD pthread_mutex_unlock(&buf->mutex); #endif - ssize_t _rv = ecp_timer_send(conn, _rbuf_send_flush, ECP_MTYPE_RBACK, 3, 500); - if (_rv < 0) return _rv; + ssize_t rv = flush_send(conn, 0); + if (rv < 0) return rv; return ECP_OK; } |