diff options
Diffstat (limited to 'code/core/rbuf_send.c')
-rw-r--r-- | code/core/rbuf_send.c | 182 |
1 files changed, 135 insertions, 47 deletions
diff --git a/code/core/rbuf_send.c b/code/core/rbuf_send.c index 5f3bc87..877b086 100644 --- a/code/core/rbuf_send.c +++ b/code/core/rbuf_send.c @@ -2,75 +2,163 @@ #include <string.h> -#define ACK_RATE_UNIT 10000 +#define NACK_RATE_UNIT 10000 + +static ssize_t _rbuf_send_flush(ECPConnection *conn, ECPTimerItem *ti) { + unsigned char payload[ECP_SIZE_PLD(0)]; + + ecp_pld_set_type(payload, ECP_MTYPE_RBFLUSH_REQ); + return ecp_pld_send(conn, payload, sizeof(payload)); +} static ssize_t handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size) { - int idx = 0; + ECPRBSend *buf; + ssize_t rsize = sizeof(ecp_seq_t)+sizeof(ecp_ack_t); ecp_seq_t seq_ack = 0; ecp_ack_t ack_map = 0; + int do_flush = 0; + int rv = ECP_OK; - ECPRBSend *buf = conn->rbuf.send; + buf = conn->rbuf.send; + if (buf == NULL) return ECP_ERR; + if (size < 0) return size; + if (size < rsize) return ECP_ERR; + + seq_ack = \ + (msg[0] << 24) | \ + (msg[1] << 16) | \ + (msg[2] << 8) | \ + (msg[3]); + ack_map = \ + (msg[4] << 24) | \ + (msg[5] << 16) | \ + (msg[6] << 8) | \ + (msg[7]); + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_lock(&buf->mutex); +#endif + ECPRBuffer *rbuf = &buf->rbuf; + int idx = ecp_rbuf_msg_idx(rbuf, seq_ack); + if (idx < 0) rv = idx; - idx = ecp_rbuf_msg_idx(rbuf, seq_ack); - if (idx < 0) return idx; - - seq_ack++; - buf->in_transit -= seq_ack - rbuf->seq_start; - - if (ack_map != ECP_RBUF_ACK_FULL) { - int i; - int nack = 0; - ecp_win_t nack_cnt = 0; - - seq_ack -= ECP_RBUF_ACK_SIZE; - for (i=0; i<ECP_RBUF_ACK_SIZE; i++) { - if ((ack_map & ((ecp_ack_t)1 << (ECP_RBUF_ACK_SIZE - i - 1))) == 0) { - nack_cnt++; - if (buf->reliable) { - idx = ecp_rbuf_msg_idx(rbuf, seq_ack + i); - // resend packet - // ecp_pkt_send(conn->sock, &conn->node.addr, packet, rv); - if (!nack) { - nack = 1; - - rbuf->seq_start = seq_ack + i; - rbuf->msg_start = idx; + int is_reliable = buf->flags & ECP_RBUF_FLAG_RELIABLE; + + if (!rv) { + seq_ack++; + buf->in_transit -= seq_ack - rbuf->seq_start; + + if (ack_map != ECP_RBUF_ACK_FULL) { + int i; + int nack_first = 0; + unsigned int msg_start; + ecp_seq_t seq_start; + ecp_win_t nack_cnt = 0; + + seq_ack -= ECP_RBUF_ACK_SIZE; + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_unlock(&buf->mutex); +#endif + + for (i=0; i<ECP_RBUF_ACK_SIZE; i++) { + if ((ack_map & ((ecp_ack_t)1 << (ECP_RBUF_ACK_SIZE - i - 1))) == 0) { + nack_cnt++; + if (buf->flags & ECP_RBUF_FLAG_RELIABLE) { + idx = ECP_RBUF_IDX_MASK(idx + 1 - ECP_RBUF_ACK_SIZE + i, rbuf->msg_size); + ecp_pkt_send(conn->sock, &conn->node.addr, rbuf->msg[idx].msg, rbuf->msg[idx].size); + if (!nack_first) { + nack_first = 1; + seq_start = seq_ack + i; + msg_start = idx; + } } } } + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_lock(&buf->mutex); +#endif + + buf->in_transit += nack_cnt; + buf->nack_rate = (buf->nack_rate * 7 + ((ECP_RBUF_ACK_SIZE - nack_cnt) * NACK_RATE_UNIT) / ECP_RBUF_ACK_SIZE) / 8; + if (buf->flags & ECP_RBUF_FLAG_RELIABLE) { + rbuf->seq_start = seq_start; + rbuf->msg_start = msg_start; + } else { + rbuf->seq_start = seq_ack + ECP_RBUF_ACK_SIZE; + } + } else { + rbuf->seq_start = seq_ack; + buf->nack_rate = (buf->nack_rate * 7) / 8; + if (buf->flags & ECP_RBUF_FLAG_RELIABLE) { + rbuf->msg_start = ECP_RBUF_IDX_MASK(idx + 1, rbuf->msg_size); + } } - buf->in_transit += nack_cnt; - buf->nack_rate = (buf->nack_rate * 7 + ((ECP_RBUF_ACK_SIZE - nack_cnt) * ACK_RATE_UNIT) / ECP_RBUF_ACK_SIZE) / 8; - if (!buf->reliable) { - rbuf->seq_start = seq_ack + ECP_RBUF_ACK_SIZE; - } - } else { - rbuf->seq_start = seq_ack; - buf->nack_rate = (buf->nack_rate * 7) / 8; - if (buf->reliable) { - rbuf->msg_start = ECP_RBUF_IDX_MASK(idx + 1, rbuf->msg_size); + if (buf->flush) { + if (ECP_RBUF_SEQ_LT(buf->seq_flush, rbuf->seq_start)) buf->flush = 0; + if (buf->flush) { + do_flush = 1; + } } } - return size; + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_unlock(&buf->mutex); +#endif + + if (rv) return rv; + + if (do_flush) ecp_timer_send(conn, _rbuf_send_flush, ECP_MTYPE_RBACK, 3, 500); + return rsize; } -int ecp_rbuf_send_create(ECPRBSend *buf, ECPRBMessage *msg, unsigned int msg_size) { +int ecp_conn_rbuf_send_create(ECPConnection *conn, ECPRBSend *buf, ECPRBMessage *msg, unsigned int msg_size) { + int rv; + memset(buf, 0, sizeof(ECPRBRecv)); - ecp_rbuf_init(&buf->rbuf, msg, msg_size); + rv = ecp_rbuf_init(&buf->rbuf, msg, msg_size); + if (rv) return rv; + + conn->rbuf.send = buf; return ECP_OK; } -int ecp_rbuf_send_start(ECPRBSend *buf, ecp_seq_t seq) { - buf->rbuf.seq_start = seq + 1; +int ecp_conn_rbuf_send_start(ECPConnection *conn) { + ECPRBSend *buf = conn->rbuf.send; + + if (buf == NULL) return ECP_ERR; + buf->rbuf.seq_start = conn->seq_out; return ECP_OK; } -ssize_t ecp_rbuf_send_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) { +int ecp_conn_rbuf_flush(ECPConnection *conn) { ECPRBSend *buf = conn->rbuf.send; - - buf->in_transit++; - return ecp_rbuf_msg_store(&buf->rbuf, seq, msg, msg_size, 0, 0); -}
\ No newline at end of file + unsigned char payload[ECP_SIZE_PLD(0)]; + ecp_seq_t seq; + + if (buf == NULL) return ECP_ERR; + + // XXX flush seq +#ifdef ECP_WITH_PTHREAD + pthread_mutex_lock(&buf->mutex); +#endif + if (buf->flush) { + if (ECP_RBUF_SEQ_LT(buf->seq_flush, seq)) buf->seq_flush = seq; + } else { + buf->flush = 1; + buf->seq_flush = seq; + } +#ifdef ECP_WITH_PTHREAD + pthread_mutex_unlock(&buf->mutex); +#endif + + ssize_t _rv = ecp_timer_send(conn, _rbuf_send_flush, ECP_MTYPE_RBACK, 3, 500); + if (_rv < 0) return _rv; + + return ECP_OK; +} + |