diff options
Diffstat (limited to 'code/core/rbuf_recv.c')
-rw-r--r-- | code/core/rbuf_recv.c | 187 |
1 files changed, 95 insertions, 92 deletions
diff --git a/code/core/rbuf_recv.c b/code/core/rbuf_recv.c index 281bec8..a6db276 100644 --- a/code/core/rbuf_recv.c +++ b/code/core/rbuf_recv.c @@ -7,92 +7,65 @@ #define ACK_SIZE (sizeof(ecp_ack_t)*8) #define ACK_MASK_FIRST ((ecp_ack_t)1 << (ACK_SIZE - 1)) -#define SEQ_LT(a,b) ((ecp_seq_t)((ecp_seq_t)(a) - (ecp_seq_t)(b)) > SEQ_HALF) -#define SEQ_LTE(a,b) ((ecp_seq_t)((ecp_seq_t)(b) - (ecp_seq_t)(a)) < SEQ_HALF) - -#define IDX_MASK(a) (a & (ECP_MAX_RBUF_MSGR-1)) -/* If ECP_MAX_RBUF_MSGR not pow of 2: -#define IDX_MASK(a) (a % ECP_MAX_RBUF_MSGR) -*/ - -static int msg_idx(ECPRBRecvBuffer *rbuf, ecp_seq_t seq) { - ecp_seq_t seq_offset = seq - rbuf->seq_start; - - // This also checks for seq_start <= seq if seq type range >> ECP_MAX_RBUF_MSGR - if (seq_offset < ECP_MAX_RBUF_MSGR) return IDX_MASK(rbuf->msg_start + seq_offset); - return ECP_ERR_RBUF_IDX; -} - -ssize_t msg_store(ECPRBRecvBuffer *rbuf, ecp_seq_t seq, unsigned char *msg, size_t msg_size) { - int idx; - ecp_seq_t seq_offset = seq - rbuf->seq_start; - - // This also checks for seq_start <= seq if seq type range >> ECP_MAX_RBUF_MSGR - if (seq_offset >= ECP_MAX_RBUF_MSGR) return ECP_ERR_RBUF_IDX; +static ssize_t msg_store(ECPRBRecv *buf, ecp_seq_t seq, unsigned char *msg, size_t msg_size) { + ssize_t rv = ecp_rbuf_msg_store(&buf->rbuf, seq, msg, msg_size, ECP_RBUF_FLAG_PRESENT, ECP_RBUF_FLAG_PRESENT); + if (rv < 0) return ECP_ERR_RBUF_DUP; - if (SEQ_LT(rbuf->seq_max, seq)) rbuf->seq_max = seq; - idx = msg_idx(rbuf, seq); - if (idx < 0) return idx; - if (rbuf->msg[idx].present) return ECP_ERR_RBUF_DUP; - - rbuf->msg[idx].present = 1; - memcpy(rbuf->msg[idx].msg, msg, msg_size); - rbuf->msg[idx].size = msg_size; - - return msg_size; + if (ECP_RBUF_SEQ_LT(buf->seq_max, seq)) buf->seq_max = seq; + return rv; } -static int msg_flush(ECPConnection *conn, ECPRBRecvBuffer *rbuf) { - int idx = rbuf->msg_start; - ecp_seq_t msg_cnt = rbuf->seq_max - rbuf->seq_start + 1; +static void msg_flush(ECPConnection *conn) { + ECPRBRecv *buf = conn->rbuf.recv; + ecp_seq_t msg_cnt = buf->seq_max - buf->rbuf.seq_start + 1; ecp_seq_t i = 0; + unsigned int idx = buf->rbuf.msg_start; for (i=0; i<msg_cnt; i++) { - if (rbuf->reliable && !rbuf->msg[idx].present) break; - if (rbuf->deliver_delay && msg_cnt - i < rbuf->deliver_delay) break; - if (rbuf->msg[idx].present) { - rbuf->msg[idx].present = 0; - // deliver idx + if (buf->reliable && !(buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_PRESENT)) break; + if (buf->deliver_delay && msg_cnt - i < buf->deliver_delay) break; + if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_PRESENT) { + buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_PRESENT; + ecp_msg_handle(conn, buf->rbuf.seq_start + i, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size); } - idx = IDX_MASK(idx + 1); + idx = ECP_RBUF_IDX_MASK(idx + 1, buf->rbuf.msg_size); } - rbuf->msg_start = idx; - rbuf->seq_start += i; - - return ECP_OK; + buf->rbuf.msg_start = idx; + buf->rbuf.seq_start += i; } -static int ack_shift(ECPRBRecvBuffer *rbuf) { +static int ack_shift(ECPRBRecv *buf) { int do_ack = 0; int idx; int i; - if (rbuf->reliable && ((rbuf->ack_map & ACK_MASK_FIRST) == 0)) return 0; + if (buf->reliable && ((buf->ack_map & ACK_MASK_FIRST) == 0)) return 0; - idx = msg_idx(rbuf, rbuf->seq_ack); + idx = ecp_rbuf_msg_idx(&buf->rbuf, buf->seq_ack); if (idx < 0) return idx; - while (SEQ_LT(rbuf->seq_ack, rbuf->seq_max)) { - idx = IDX_MASK(idx + 1); - rbuf->seq_ack++; + while (ECP_RBUF_SEQ_LT(buf->seq_ack, buf->seq_max)) { + idx = ECP_RBUF_IDX_MASK(idx + 1, buf->rbuf.msg_size); + buf->seq_ack++; - if (rbuf->msg[idx].present && (rbuf->ack_map == ACK_FULL)) continue; + if ((buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_PRESENT) && (buf->ack_map == ACK_FULL)) continue; - rbuf->ack_map = rbuf->ack_map << 1; - rbuf->ack_map |= rbuf->msg[idx].present; - if (!do_ack && !rbuf->msg[idx].present && SEQ_LTE(rbuf->seq_ack, rbuf->seq_max - 2 * rbuf->hole_max)) { + buf->ack_map = buf->ack_map << 1; + if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_PRESENT) { + buf->ack_map |= 1; + } else if (!do_ack && ECP_RBUF_SEQ_LTE(buf->seq_ack, buf->seq_max - 2 * buf->hole_max)) { do_ack = 1; } - if ((rbuf->ack_map & ACK_MASK_FIRST) == 0) break; + if ((buf->ack_map & ACK_MASK_FIRST) == 0) break; } - if (!do_ack && (rbuf->seq_ack == rbuf->seq_max) && ((rbuf->ack_map & rbuf->hole_mask_full) != rbuf->hole_mask_full)) { - ecp_ack_t hole_mask = rbuf->ack_map; + if (!do_ack && (buf->seq_ack == buf->seq_max) && ((buf->ack_map & buf->hole_mask_full) != buf->hole_mask_full)) { + ecp_ack_t hole_mask = buf->ack_map; - for (i=0; i<rbuf->hole_max-1; i++) { + for (i=0; i<buf->hole_max-1; i++) { hole_mask = hole_mask >> 1; - if ((hole_mask & rbuf->hole_mask_empty) == 0) { + if ((hole_mask & buf->hole_mask_empty) == 0) { do_ack = 1; break; } @@ -102,62 +75,92 @@ static int ack_shift(ECPRBRecvBuffer *rbuf) { return do_ack; } -int ecp_rbuf_recv_init(ECPRBRecvBuffer *rbuf, ecp_seq_t seq, unsigned char hole_max) { - memset(rbuf, 0, sizeof(ECPRBRecvBuffer)); - rbuf->hole_max = hole_max; - rbuf->hole_mask_full = ~(~((ecp_ack_t)0) << (hole_max * 2)); - rbuf->hole_mask_empty = ~(~((ecp_ack_t)0) << (hole_max + 1)); - rbuf->seq_ack = seq; - rbuf->seq_max = seq; - rbuf->seq_start = seq + 1; - rbuf->ack_map = ACK_FULL; +int ecp_rbuf_recv_create(ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size) { + memset(buf, 0, sizeof(ECPRBRecv)); + memset(msg, 0, sizeof(ECPRBMessage) * msg_size); + ecp_rbuf_init(&buf->rbuf, msg, msg_size); + buf->ack_map = ACK_FULL; + + return ECP_OK; +} + +int ecp_rbuf_recv_start(ECPRBRecv *buf, ecp_seq_t seq) { + buf->seq_ack = seq; + buf->seq_max = seq; + buf->rbuf.seq_start = seq + 1; + + return ECP_OK; +} + +int ecp_rbuf_recv_set_hole(ECPRBRecv *buf, unsigned short hole_max) { + buf->hole_max = hole_max; + buf->hole_mask_full = ~(~((ecp_ack_t)0) << (hole_max * 2)); + buf->hole_mask_empty = ~(~((ecp_ack_t)0) << (hole_max + 1)); + + return ECP_OK; +} + +int ecp_rbuf_recv_set_delay(ECPRBRecv *buf, unsigned short delay) { + buf->deliver_delay = delay; + if (buf->hole_max < delay - 1) { + ecp_rbuf_recv_set_hole(buf, delay - 1); + } return ECP_OK; } ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) { ssize_t rv; - int do_ack = 0; - ECPRBRecvBuffer *rbuf = conn->rbuf.recv; + ecp_seq_t ack_pkt = 0; + int do_ack = 0; + ECPRBRecv *buf = conn->rbuf.recv; - if (rbuf == NULL) return ECP_ERR; + if (buf == NULL) return ECP_ERR; - if (SEQ_LTE(seq, rbuf->seq_ack)) { - ecp_seq_t seq_offset = rbuf->seq_ack - seq; + if (ECP_RBUF_SEQ_LT(buf->seq_max, seq)) ack_pkt = seq - buf->seq_max; + if (ECP_RBUF_SEQ_LTE(seq, buf->seq_ack)) { + ecp_seq_t seq_offset = buf->seq_ack - seq; if (seq_offset < ACK_SIZE) { ecp_ack_t ack_mask = ((ecp_ack_t)1 << seq_offset); - if (ack_mask & rbuf->ack_map) return ECP_ERR_RBUF_DUP; + if (ack_mask & buf->ack_map) return ECP_ERR_RBUF_DUP; - rv = msg_store(rbuf, seq, msg, msg_size); + rv = msg_store(buf, seq, msg, msg_size); if (rv < 0) return rv; - rbuf->ack_map |= ack_mask; - do_ack = ack_shift(rbuf); + buf->ack_map |= ack_mask; + do_ack = ack_shift(buf); } else { return ECP_ERR_RBUF_IDX; } } else { - if ((rbuf->ack_map == ACK_FULL) && (seq == (ecp_seq_t)(rbuf->seq_ack + 1))) { - if (rbuf->deliver_delay) { - rv = msg_store(rbuf, seq, msg, msg_size); + if ((buf->ack_map == ACK_FULL) && (seq == (ecp_seq_t)(buf->seq_ack + 1))) { + if (buf->deliver_delay) { + rv = msg_store(buf, seq, msg, msg_size); if (rv < 0) return rv; } else { - // rv = deliver - rbuf->seq_max++; - rbuf->seq_start++; + rv = ecp_msg_handle(conn, seq, msg, msg_size); + buf->seq_max++; + buf->rbuf.seq_start++; } - rbuf->seq_ack++; + buf->seq_ack++; } else { - rv = msg_store(rbuf, seq, msg, msg_size); + rv = msg_store(buf, seq, msg, msg_size); if (rv < 0) return rv; - do_ack = ack_shift(rbuf); + do_ack = ack_shift(buf); } } - // XXX - // update do_ack for pps - // should send acks more aggresively when ack_map is not full (freq > RTT) - // now send acks as per do_ack + if (ack_pkt && !do_ack) { + buf->ack_pkt += ack_pkt; + // should send acks more aggresively when reliable and ack_map is not full (rate ~ PPS * RTT) + if (buf->ack_pkt > buf->ack_rate) do_ack = 1; + } + if (do_ack) { + buf->ack_pkt = 0; + // send ack (with seq = 0) + } + // XXX should handle close + msg_flush(conn); return rv; -}
\ No newline at end of file +} |