diff options
Diffstat (limited to 'ecp/src/ecp/ext')
-rw-r--r-- | ecp/src/ecp/ext/msgq.c | 151 | ||||
-rw-r--r-- | ecp/src/ecp/ext/msgq.h | 19 | ||||
-rw-r--r-- | ecp/src/ecp/ext/rbuf.c | 113 | ||||
-rw-r--r-- | ecp/src/ecp/ext/rbuf.h | 123 | ||||
-rw-r--r-- | ecp/src/ecp/ext/rbuf_recv.c | 473 | ||||
-rw-r--r-- | ecp/src/ecp/ext/rbuf_send.c | 419 |
6 files changed, 1298 insertions, 0 deletions
diff --git a/ecp/src/ecp/ext/msgq.c b/ecp/src/ecp/ext/msgq.c new file mode 100644 index 0000000..63f5aa6 --- /dev/null +++ b/ecp/src/ecp/ext/msgq.c @@ -0,0 +1,151 @@ +#include <sys/time.h> +#include <stdlib.h> +#include <string.h> + +#include <core.h> + +#include "rbuf.h" +#include "msgq.h" + +#define MSGQ_IDX_MASK(idx) ((idx) & ((ECP_MSGQ_MAX_MSG) - 1)) + +static struct timespec *abstime_ts(struct timespec *ts, ecp_sts_t msec) { + struct timeval tv; + uint64_t us_start; + + gettimeofday(&tv, NULL); + us_start = tv.tv_sec * (uint64_t) 1000000 + tv.tv_usec; + us_start += msec * 1000; + ts->tv_sec = us_start / 1000000; + ts->tv_nsec = (us_start % 1000000) * 1000; + return ts; +} + +int ecp_msgq_create(ECPRBConn *conn, ECPMsgQ *msgq) { + int i; + int rv; + + if (conn->recv == NULL) return ECP_ERR; + + memset(msgq, 0, sizeof(ECPMsgQ)); + + for (i=0; i<ECP_MSGQ_MAX_MTYPE; i++) { + rv = pthread_cond_init(&msgq->cond[i], NULL); + if (rv) { + int j; + + for (j=0; j<i; j++) { + pthread_cond_destroy(&msgq->cond[j]); + } + return ECP_ERR; + } + } + conn->recv->msgq = msgq; + + return ECP_OK; +} + +void ecp_msgq_destroy(ECPRBConn *conn) { + ECPMsgQ *msgq = conn->recv->msgq; + int i; + + for (i=0; i<ECP_MSGQ_MAX_MTYPE; i++) { + pthread_cond_destroy(&msgq->cond[i]); + } + + conn->recv->msgq = NULL; +} + +void ecp_msgq_start(ECPRBConn *conn, ecp_seq_t seq) { + ECPMsgQ *msgq = conn->recv->msgq; + + msgq->seq_max = seq; + msgq->seq_start = seq + 1; +} + +int ecp_msgq_push(ECPRBConn *conn, ecp_seq_t seq, unsigned char mtype) { + ECPRBRecv *buf = conn->recv; + ECPMsgQ *msgq = buf->msgq; + + if (mtype >= ECP_MSGQ_MAX_MTYPE) return ECP_ERR_MTYPE; + + if ((unsigned short)(msgq->idx_w[mtype] - msgq->idx_r[mtype]) == ECP_MSGQ_MAX_MSG) return ECP_ERR_FULL; + if (msgq->idx_w[mtype] == msgq->idx_r[mtype]) pthread_cond_signal(&msgq->cond[mtype]); + + msgq->seq_msg[mtype][MSGQ_IDX_MASK(msgq->idx_w[mtype])] = seq; + msgq->idx_w[mtype]++; + + if (ECP_SEQ_LT(msgq->seq_max, seq)) msgq->seq_max = seq; + + return ECP_OK; +} + +ssize_t ecp_msgq_pop(ECPRBConn *conn, unsigned char mtype, unsigned char *msg, size_t _msg_size, ecp_sts_t timeout) { + ECPRBRecv *buf = conn->recv; + ECPMsgQ *msgq = buf->msgq; + ECPRBuffer *rbuf = &buf->rbuf; + unsigned char *pld_buf; + unsigned char *msg_buf; + size_t pld_size, hdr_size, msg_size; + ecp_seq_t seq; + unsigned short idx; + int rv; + + if (mtype >= ECP_MSGQ_MAX_MTYPE) return ECP_ERR_MTYPE; + + if (msgq->idx_r[mtype] == msgq->idx_w[mtype]) { + if (timeout == -1) { + pthread_cond_wait(&msgq->cond[mtype], &buf->mutex); + } else { + struct timespec ts; + + rv = pthread_cond_timedwait(&msgq->cond[mtype], &buf->mutex, abstime_ts(&ts, timeout)); + if (rv) return ECP_ERR_TIMEOUT; + } + } + seq = msgq->seq_msg[mtype][MSGQ_IDX_MASK(msgq->idx_r[mtype])]; + + rv = _ecp_rbuf_msg_idx(rbuf, seq, &idx); + if (rv) return ECP_ERR; + + pld_buf = rbuf->arr.pld[idx].buf; + pld_size = rbuf->arr.pld[idx].size; + + msg_buf = ecp_pld_get_msg(pld_buf, pld_size); + if (msg_buf == NULL) return ECP_ERR; + hdr_size = msg_buf - pld_buf; + msg_size = pld_size - hdr_size; + + rbuf->arr.pld[idx].flags &= ~ECP_RBUF_FLAG_IN_MSGQ; + // if (rbuf->arr.pld[idx].flags == 0); + + msgq->idx_r[mtype]++; + if (msgq->seq_start == seq) { + int i; + unsigned short msg_cnt = msgq->seq_max - msgq->seq_start + 1; + + for (i=0; i<msg_cnt; i++) { + if (rbuf->arr.pld[idx].flags & ECP_RBUF_FLAG_IN_MSGQ) break; + idx = ECP_RBUF_IDX_MASK(idx + 1, rbuf->arr_size); + } + msgq->seq_start += i; + } + + if (_msg_size < msg_size) return ECP_ERR_FULL; + if (msg_size) memcpy(msg, msg_buf, msg_size); + return msg_size; +} + +ssize_t ecp_msg_receive(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ecp_sts_t timeout) { + ECPRBConn *_conn; + ssize_t rv; + + _conn = ecp_rbuf_get_rbconn(conn); + if (_conn == NULL) return ECP_ERR; + + pthread_mutex_lock(&_conn->recv->mutex); + rv = ecp_msgq_pop(_conn, mtype, msg, msg_size, timeout); + pthread_mutex_unlock(&_conn->recv->mutex); + + return rv; +} diff --git a/ecp/src/ecp/ext/msgq.h b/ecp/src/ecp/ext/msgq.h new file mode 100644 index 0000000..dddb6e7 --- /dev/null +++ b/ecp/src/ecp/ext/msgq.h @@ -0,0 +1,19 @@ +#define ECP_MSGQ_MAX_MSG 32 + +#define ECP_MSGQ_MAX_MTYPE (ECP_MAX_MTYPE) + +typedef struct ECPMsgQ { + unsigned short idx_w[ECP_MSGQ_MAX_MTYPE]; + unsigned short idx_r[ECP_MSGQ_MAX_MTYPE]; + ecp_seq_t seq_start; + ecp_seq_t seq_max; + ecp_seq_t seq_msg[ECP_MSGQ_MAX_MTYPE][ECP_MSGQ_MAX_MSG]; + pthread_cond_t cond[ECP_MSGQ_MAX_MTYPE]; +} ECPMsgQ; + +int ecp_msgq_create(ECPRBConn *conn, ECPMsgQ *msgq); +void ecp_msgq_destroy(ECPRBConn *conn); +void ecp_msgq_start(ECPRBConn *conn, ecp_seq_t seq); +int ecp_msgq_push(ECPRBConn *conn, ecp_seq_t seq, unsigned char mtype); +ssize_t ecp_msgq_pop(ECPRBConn *conn, unsigned char mtype, unsigned char *msg, size_t _msg_size, ecp_sts_t timeout); +ssize_t ecp_msg_receive(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ecp_sts_t timeout);
\ No newline at end of file diff --git a/ecp/src/ecp/ext/rbuf.c b/ecp/src/ecp/ext/rbuf.c new file mode 100644 index 0000000..70ee0d2 --- /dev/null +++ b/ecp/src/ecp/ext/rbuf.c @@ -0,0 +1,113 @@ +#include <stdlib.h> + +#include <core.h> + +#include "rbuf.h" + +ECPRBConn *ecp_rbuf_get_rbconn(ECPConnection *conn) { + if (ecp_conn_has_rbuf(conn)) return (ECPRBConn *)conn; + return NULL; +} + +ECPConnection *ecp_rbuf_get_conn(ECPRBConn *conn) { + return &conn->b; +} + +void _ecp_rbuf_start(ECPRBuffer *rbuf, ecp_seq_t seq) { + rbuf->seq_max = seq; + rbuf->seq_start = seq + 1; +} + +int _ecp_rbuf_msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq, unsigned short *idx) { + ecp_seq_t seq_offset = seq - rbuf->seq_start; + + /* This also checks for seq_start <= seq if seq type range >> rbuf->arr_size */ + if (seq_offset >= rbuf->arr_size) return ECP_ERR_FULL; + + if (idx) *idx = ECP_RBUF_IDX_MASK(rbuf->idx_start + seq_offset, rbuf->arr_size); + return ECP_OK; +} + +void ecp_rbuf_conn_init(ECPRBConn *conn) { + ECPConnection *_conn = ecp_rbuf_get_conn(conn); + + ecp_conn_set_flags(_conn, ECP_CONN_FLAG_RBUF); + conn->send = NULL; + conn->recv = NULL; + conn->iter = NULL; +} + +int ecp_rbuf_conn_create(ECPRBConn *conn, ECPSocket *sock, unsigned char type) { + ECPConnection *_conn = ecp_rbuf_get_conn(conn); + int rv; + + rv = ecp_conn_create(_conn, sock, type); + if (rv) return rv; + + ecp_rbuf_conn_init(conn); + return ECP_OK; +} + +int ecp_rbuf_conn_create_inb(ECPRBConn *conn, ECPSocket *sock, unsigned char type) { + ECPConnection *_conn = ecp_rbuf_get_conn(conn); + int rv; + + rv = ecp_conn_create_inb(_conn, sock, type); + if (rv) return rv; + + ecp_rbuf_conn_init(conn); + return ECP_OK; +} + +void ecp_rbuf_destroy(ECPRBConn *conn) { + if (conn->send) ecp_rbsend_destroy(conn); + if (conn->recv) ecp_rbrecv_destroy(conn); + conn->iter = NULL; +} + +void ecp_rbuf_start(ECPRBConn *conn) { + ECPConnection *_conn = ecp_rbuf_get_conn(conn); + + if (conn->send) { + ecp_seq_t seq_out; + + seq_out = (ecp_seq_t)(_conn->nonce_out); + ecp_rbsend_start(conn, seq_out); + } + + if (conn->recv) { + ecp_seq_t seq_in; + + seq_in = (ecp_seq_t)(_conn->nonce_in); + ecp_rbrecv_start(conn, seq_in); + } +} + +ssize_t ecp_rbuf_msg_handle(ECPRBConn *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, size_t msg_size, ECP2Buffer *bufs) { + switch (mtype) { + case ECP_MTYPE_RBACK: + if (conn->send) return ecp_rbuf_handle_ack(conn, msg, msg_size); + break; + + case ECP_MTYPE_RBNOP: + if (conn->recv) return ecp_rbuf_handle_nop(conn, msg, msg_size); + break; + + case ECP_MTYPE_RBFLUSH: + if (conn->recv) return ecp_rbuf_handle_flush(conn); + break; + + default: + break; + } + + return ECP_ERR_MTYPE; +} + +int ecp_rbuf_err_handle(ECPRBConn *conn, unsigned char mtype, int err) { + if (conn->recv && (mtype == ECP_MTYPE_RBTIMER)) { + ecp_rbuf_handle_timer(conn); + return ECP_OK; + } + return ECP_PASS; +} diff --git a/ecp/src/ecp/ext/rbuf.h b/ecp/src/ecp/ext/rbuf.h new file mode 100644 index 0000000..36ff963 --- /dev/null +++ b/ecp/src/ecp/ext/rbuf.h @@ -0,0 +1,123 @@ +#define ECP_RBUF_FLAG_IN_RBUF 0x01 +#define ECP_RBUF_FLAG_IN_MSGQ 0x02 +#define ECP_RBUF_FLAG_IN_TIMER 0x04 +#define ECP_RBUF_FLAG_SKIP 0x08 + +#define ECP_RBUF_FLAG_CCONTROL 0x01 +#define ECP_RBUF_FLAG_RELIABLE 0x02 + + +#define ECP_MTYPE_RBNOP (0x08 | ECP_MTYPE_FLAG_SYS) +#define ECP_MTYPE_RBACK (0x09 | ECP_MTYPE_FLAG_SYS) +#define ECP_MTYPE_RBFLUSH (0x0a | ECP_MTYPE_FLAG_SYS) +#define ECP_MTYPE_RBTIMER (0x0b | ECP_MTYPE_FLAG_SYS) + +#define ECP_ERR_RBUF_DUP -100 +#define ECP_ERR_RBUF_TIMER -101 + +#define ecp_rbuf_skip(mtype) (mtype & ECP_MTYPE_FLAG_SYS) + +/* size must be power of 2 */ +#define ECP_RBUF_IDX_MASK(idx, size) ((idx) & ((size) - 1)) + +typedef uint32_t ecp_win_t; + +struct ECPMsgQ; +struct ECPFragIter; + +typedef struct ECPRBPayload { + unsigned char buf[ECP_MAX_PLD]; + size_t size; + unsigned char flags; +} ECPRBPayload; + +typedef struct ECPRBPacket { + unsigned char buf[ECP_MAX_PKT]; + size_t size; + unsigned char flags; +} ECPRBPacket; + +typedef struct ECPRBuffer { + ecp_seq_t seq_start; + ecp_seq_t seq_max; + unsigned short arr_size; + unsigned short idx_start; + union { + ECPRBPayload *pld; + ECPRBPacket *pkt; + } arr; +} ECPRBuffer; + +typedef struct ECPRBRecv { + unsigned char start; + unsigned char flags; + ecp_sts_t deliver_delay; + unsigned short hole_max; + unsigned short ack_rate; + unsigned short ack_pkt; + ecp_seq_t seq_ack; + ecp_ack_t ack_map; + ECPRBuffer rbuf; +#ifdef ECP_WITH_PTHREAD + pthread_mutex_t mutex; +#endif +#ifdef ECP_WITH_MSGQ + struct ECPMsgQ *msgq; +#endif +} ECPRBRecv; + +typedef struct ECPRBSend { + unsigned char start; + unsigned char flags; + ecp_win_t win_size; + ecp_win_t in_transit; + ecp_win_t cnt_cc; + ecp_seq_t seq_cc; + ecp_seq_t seq_flush; + ecp_seq_t seq_nack; + unsigned char flush; + unsigned int nack_rate; + ECPRBuffer rbuf; +#ifdef ECP_WITH_PTHREAD + pthread_mutex_t mutex; +#endif +} ECPRBSend; + +typedef struct ECPRBConn { + ECPConnection b; + ECPRBRecv *recv; + ECPRBSend *send; + struct ECPFragIter *iter; +} ECPRBConn; + +ECPRBConn *ecp_rbuf_get_rbconn(ECPConnection *conn); +ECPConnection *ecp_rbuf_get_conn(ECPRBConn *conn); +void _ecp_rbuf_start(ECPRBuffer *rbuf, ecp_seq_t seq); +int _ecp_rbuf_msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq, unsigned short *idx); +void ecp_rbuf_conn_init(ECPRBConn *conn); +int ecp_rbuf_conn_create(ECPRBConn *conn, ECPSocket *sock, unsigned char type); +int ecp_rbuf_conn_create_inb(ECPRBConn *conn, ECPSocket *sock, unsigned char type); +void ecp_rbuf_destroy(ECPRBConn *conn); +void ecp_rbuf_start(ECPRBConn *conn); +ssize_t ecp_rbuf_msg_handle(ECPRBConn *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, size_t msg_size, ECP2Buffer *bufs); +int ecp_rbuf_err_handle(ECPRBConn *conn, unsigned char mtype, int err); + +/* send */ +ssize_t ecp_rbuf_send_flush(ECPRBConn *conn); +ssize_t ecp_rbuf_handle_ack(ECPRBConn *conn, unsigned char *msg, size_t msg_size); +int ecp_rbsend_create(ECPRBConn *conn, ECPRBSend *buf, ECPRBPacket *pkt, unsigned short pkt_size); +void ecp_rbsend_destroy(ECPRBConn *conn); +void ecp_rbsend_start(ECPRBConn *conn, ecp_seq_t seq); +int ecp_rbuf_set_wsize(ECPRBConn *conn, ecp_win_t size); +int ecp_rbuf_flush(ECPRBConn *conn); +ssize_t ecp_rbuf_pld_send(ECPRBConn *conn, ECPBuffer *payload, size_t pld_size, ECPBuffer *packet, size_t pkt_size, ECPTimerItem *ti); + +ssize_t ecp_rbuf_handle_nop(ECPRBConn *conn, unsigned char *msg, size_t msg_size); +ssize_t ecp_rbuf_handle_flush(ECPRBConn *conn); +void ecp_rbuf_handle_timer(ECPRBConn *conn) ; +int ecp_rbrecv_create(ECPRBConn *conn, ECPRBRecv *buf, ECPRBPayload *pld, unsigned short pld_size); +void ecp_rbrecv_destroy(ECPRBConn *conn); +void ecp_rbrecv_start(ECPRBConn *conn, ecp_seq_t seq); +int ecp_rbuf_set_hole(ECPRBConn *conn, unsigned short hole_max); +int ecp_rbuf_set_delay(ECPRBConn *conn, ecp_sts_t delay); +ssize_t ecp_rbuf_store(ECPRBConn *conn, ecp_seq_t seq, unsigned char *pld, size_t pld_size); diff --git a/ecp/src/ecp/ext/rbuf_recv.c b/ecp/src/ecp/ext/rbuf_recv.c new file mode 100644 index 0000000..4c95be6 --- /dev/null +++ b/ecp/src/ecp/ext/rbuf_recv.c @@ -0,0 +1,473 @@ +#include <stdlib.h> +#include <string.h> + +#include <core.h> +#include <tm.h> + +#include "rbuf.h" + +#ifdef ECP_WITH_MSGQ +#include "msgq.h" +#endif + +#define ACK_RATE 8 +#define ACK_MASK_FIRST ((ecp_ack_t)1 << (ECP_SIZE_ACKB - 1)) + +static ssize_t msg_store(ECPRBConn *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *pld, size_t pld_size) { + ECPConnection *_conn = ecp_rbuf_get_conn(conn); + ECPRBRecv *buf = conn->recv; + ECPRBuffer *rbuf = &buf->rbuf; + unsigned short idx; + unsigned char flags; + int skip; + int rv; + + rv = _ecp_rbuf_msg_idx(rbuf, seq, &idx); + if (rv) return rv; + + if (rbuf->arr.pld[idx].flags) return ECP_ERR_RBUF_DUP; + +#ifdef ECP_WITH_MSGQ + if (buf->msgq) { + ecp_seq_t seq_offset; + +#ifndef ECP_WITH_RTHD + pthread_mutex_lock(&buf->mutex); +#endif + + seq_offset = seq - buf->msgq->seq_start; + if (seq_offset >= rbuf->arr_size) rv = ECP_ERR_FULL; + +#ifndef ECP_WITH_RTHD + pthread_mutex_unlock(&buf->mutex); +#endif + + if (rv) return rv; + } +#endif + + skip = ecp_rbuf_skip(mtype); + flags = ECP_RBUF_FLAG_IN_RBUF; + if (skip) flags |= ECP_RBUF_FLAG_SKIP; + rbuf->arr.pld[idx].flags = flags; + + if ((mtype == ECP_MTYPE_RBNOP) && pld) { + return ecp_pld_handle_one(_conn, seq, pld, pld_size, NULL); + } else if (skip) { + return 0; + } + + if (pld && pld_size) memcpy(rbuf->arr.pld[idx].buf, pld, pld_size); + rbuf->arr.pld[idx].size = pld_size; + if (ECP_SEQ_LT(rbuf->seq_max, seq)) rbuf->seq_max = seq; + + return pld_size; +} + +static void msg_flush(ECPRBConn *conn) { + ECPConnection *_conn = ecp_rbuf_get_conn(conn); + ECPRBRecv *buf = conn->recv; + ECPRBuffer *rbuf = &buf->rbuf; + ecp_seq_t seq; + unsigned short idx; + int i; + +#ifdef ECP_WITH_MSGQ +#ifndef ECP_WITH_RTHD + if (buf->msgq) pthread_mutex_lock(&buf->mutex); +#endif +#endif + + seq = rbuf->seq_start; + idx = rbuf->idx_start; + + while (ECP_SEQ_LTE(seq, rbuf->seq_max)) { + if (rbuf->arr.pld[idx].flags & ECP_RBUF_FLAG_IN_RBUF) { + if (!(rbuf->arr.pld[idx].flags & ECP_RBUF_FLAG_SKIP)) { + ecp_pts_t msg_pts; + int rv; + + rv = ecp_pld_get_pts(rbuf->arr.pld[idx].buf, rbuf->arr.pld[idx].size, &msg_pts); + if (!rv && buf->deliver_delay) { + ecp_sts_t now = ecp_tm_abstime_ms(0); + + msg_pts += buf->deliver_delay; + if (ECP_PTS_LT(now, msg_pts)) { + if (!(rbuf->arr.pld[idx].flags & ECP_RBUF_FLAG_IN_TIMER)) { + ECPTimerItem ti; + + ecp_timer_item_init(&ti, _conn, ECP_MTYPE_RBTIMER, NULL, 0, msg_pts - now); + rv = ecp_timer_push(&ti); + if (!rv) rbuf->arr.pld[idx].flags |= ECP_RBUF_FLAG_IN_TIMER; + } + break; + } else if (rbuf->arr.pld[idx].flags & ECP_RBUF_FLAG_IN_TIMER) { + rbuf->arr.pld[idx].flags &= ~ECP_RBUF_FLAG_IN_TIMER; + } + } + +#ifdef ECP_WITH_MSGQ + if (buf->msgq) { + unsigned char mtype; + + rv = ecp_pld_get_type(rbuf->arr.pld[idx].buf, rbuf->arr.pld[idx].size, &mtype); + if (!rv) rv = ecp_msgq_push(conn, seq, mtype & ECP_MTYPE_MASK); + if (rv) break; + + rbuf->arr.pld[idx].flags |= ECP_RBUF_FLAG_IN_MSGQ; + } else +#endif + ecp_pld_handle(_conn, seq, rbuf->arr.pld[idx].buf, rbuf->arr.pld[idx].size, NULL); + } else { + rbuf->arr.pld[idx].flags &= ~ECP_RBUF_FLAG_SKIP; + } + rbuf->arr.pld[idx].flags &= ~ECP_RBUF_FLAG_IN_RBUF; + // if (rbuf->arr.pld[idx].flags == 0); + } else { + if (buf->flags & ECP_RBUF_FLAG_RELIABLE) break; + if (!ECP_SEQ_LT(seq, rbuf->seq_max - buf->hole_max)) break; + } + seq++; + idx = ECP_RBUF_IDX_MASK(idx + 1, rbuf->arr_size); + } + rbuf->seq_start = seq; + rbuf->idx_start = idx; + +#ifdef ECP_WITH_MSGQ +#ifndef ECP_WITH_RTHD + if (buf->msgq) pthread_mutex_unlock(&buf->mutex); +#endif +#endif +} + +static int ack_shift(ECPRBConn *conn) { + ECPRBRecv *buf = conn->recv; + ECPRBuffer *rbuf = &buf->rbuf; + unsigned short idx; + int do_ack = 0; + int in_rbuf = 0; + int rv; + + if ((buf->flags & ECP_RBUF_FLAG_RELIABLE) && ((buf->ack_map & ACK_MASK_FIRST) == 0)) return 0; + + /* walks through messages that are not delivered yet, so no need for msgq mutex lock */ + while (ECP_SEQ_LT(buf->seq_ack, rbuf->seq_max)) { + buf->seq_ack++; + rv = _ecp_rbuf_msg_idx(rbuf, buf->seq_ack, &idx); + if (!rv) { + in_rbuf = rbuf->arr.pld[idx].flags & ECP_RBUF_FLAG_IN_RBUF; + } else { + in_rbuf = 1; + } + if (in_rbuf && (buf->ack_map == ECP_ACK_FULL)) continue; + + buf->ack_map = buf->ack_map << 1; + if (in_rbuf) { + buf->ack_map |= 1; + } else if (!do_ack && ECP_SEQ_LT(buf->seq_ack, rbuf->seq_max - buf->hole_max)) { + do_ack = 1; + } + + if ((buf->flags & ECP_RBUF_FLAG_RELIABLE) && ((buf->ack_map & ACK_MASK_FIRST) == 0)) { + do_ack = 1; + break; + } + } + + return do_ack; +} + +static int ack_send(ECPRBConn *conn, ecp_seq_t seq_ack, ecp_seq_t ack_map) { + ECPConnection *_conn = ecp_rbuf_get_conn(conn); + ECPRBRecv *buf = conn->recv; + ECPBuffer packet; + ECPBuffer payload; + unsigned char pkt_buf[ECP_SIZE_PKT_BUF(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), ECP_MTYPE_RBACK, _conn)]; + unsigned char pld_buf[ECP_SIZE_PLD_BUF(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), ECP_MTYPE_RBACK, _conn)]; + unsigned char *_buf; + ssize_t rv; + + packet.buffer = pkt_buf; + packet.size = sizeof(pkt_buf); + payload.buffer = pld_buf; + payload.size = sizeof(pld_buf); + + ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_RBACK); + _buf = ecp_pld_get_msg(payload.buffer, payload.size); + _buf[0] = seq_ack >> 24; + _buf[1] = seq_ack >> 16; + _buf[2] = seq_ack >> 8; + _buf[3] = seq_ack; + _buf[4] = ack_map >> 24; + _buf[5] = ack_map >> 16; + _buf[6] = ack_map >> 8; + _buf[7] = ack_map; + + rv = ecp_pld_send(_conn, &packet, &payload, ECP_SIZE_PLD(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), ECP_MTYPE_RBACK), 0); + if (rv < 0) return rv; + + buf->ack_pkt = 0; + + return ECP_OK; +} + +ssize_t ecp_rbuf_handle_nop(ECPRBConn *conn, unsigned char *msg, size_t msg_size) { + ECPRBRecv *buf = conn->recv; + ECPRBuffer *rbuf = &buf->rbuf; + ecp_seq_t seq_ack; + ecp_ack_t ack_map; + ecp_ack_t ack_mask = (ecp_ack_t)1 << (ECP_SIZE_ACKB - 1); + size_t rsize = sizeof(ecp_seq_t) + sizeof(ecp_ack_t); + int i; + + if (msg_size < rsize) return ECP_ERR_SIZE; + + seq_ack = \ + ((ecp_seq_t)msg[0] << 24) | \ + ((ecp_seq_t)msg[1] << 16) | \ + ((ecp_seq_t)msg[2] << 8) | \ + ((ecp_seq_t)msg[3]); + ack_map = \ + ((ecp_ack_t)msg[4] << 24) | \ + ((ecp_ack_t)msg[5] << 16) | \ + ((ecp_ack_t)msg[6] << 8) | \ + ((ecp_ack_t)msg[7]); + + seq_ack -= (ECP_SIZE_ACKB - 1); + for (i=0; i<ECP_SIZE_ACKB; i++) { + if (ack_map & ack_mask) { + msg_store(conn, seq_ack, ECP_MTYPE_RBNOP, NULL, 0); + } + seq_ack++; + ack_mask = ack_mask >> 1; + } + + return rsize; +} + +ssize_t ecp_rbuf_handle_flush(ECPRBConn *conn) { + ECPRBRecv *buf = conn->recv; + +#ifdef ECP_WITH_RTHD + pthread_mutex_lock(&buf->mutex); +#endif + + ack_send(conn, buf->seq_ack, buf->ack_map); + +#ifdef ECP_WITH_RTHD + pthread_mutex_unlock(&buf->mutex); +#endif + + return 0; +} + +void ecp_rbuf_handle_timer(ECPRBConn *conn) { + ECPRBRecv *buf = conn->recv; + +#ifdef ECP_WITH_RTHD + pthread_mutex_lock(&buf->mutex); +#endif + + msg_flush(conn); + +#ifdef ECP_WITH_RTHD + pthread_mutex_unlock(&buf->mutex); +#endif +} + +int ecp_rbrecv_create(ECPRBConn *conn, ECPRBRecv *buf, ECPRBPayload *pld, unsigned short pld_size) { + ECPRBuffer *rbuf = &buf->rbuf; + int rv; + + memset(buf, 0, sizeof(ECPRBRecv)); + memset(pld, 0, sizeof(ECPRBPayload) * pld_size); + + buf->ack_map = ECP_ACK_FULL; + buf->ack_rate = ACK_RATE; + rbuf->arr.pld = pld; + rbuf->arr_size = pld_size; + +#ifdef ECP_WITH_PTHREAD + rv = pthread_mutex_init(&buf->mutex, NULL); + if (rv) return ECP_ERR; +#endif + + conn->recv = buf; + return ECP_OK; +} + +void ecp_rbrecv_destroy(ECPRBConn *conn) { + ECPRBRecv *buf = conn->recv; + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_destroy(&buf->mutex); +#endif + +#ifdef ECP_WITH_MSGQ + if (buf->msgq) ecp_msgq_destroy(conn); +#endif + + conn->recv = NULL; +} + +void ecp_rbrecv_start(ECPRBConn *conn, ecp_seq_t seq) { + ECPRBRecv *buf = conn->recv; + ECPRBuffer *rbuf = &buf->rbuf; + +#ifdef ECP_WITH_RTHD + pthread_mutex_lock(&buf->mutex); +#endif + + buf->start = 1; + buf->seq_ack = seq; + _ecp_rbuf_start(rbuf, seq); + +#ifdef ECP_WITH_MSGQ + if (buf->msgq) ecp_msgq_start(conn, seq); +#endif + +#ifdef ECP_WITH_RTHD + pthread_mutex_unlock(&buf->mutex); +#endif +} + +int ecp_rbuf_set_hole(ECPRBConn *conn, unsigned short hole_max) { + ECPRBRecv *buf = conn->recv; + +#ifdef ECP_WITH_RTHD + pthread_mutex_lock(&buf->mutex); +#endif + + buf->hole_max = hole_max; + +#ifdef ECP_WITH_RTHD + pthread_mutex_unlock(&buf->mutex); +#endif + + return ECP_OK; +} + +int ecp_rbuf_set_delay(ECPRBConn *conn, ecp_sts_t delay) { + ECPRBRecv *buf = conn->recv; + +#ifdef ECP_WITH_RTHD + pthread_mutex_lock(&buf->mutex); +#endif + + buf->deliver_delay = delay; + +#ifdef ECP_WITH_RTHD + pthread_mutex_unlock(&buf->mutex); +#endif + + return ECP_OK; +} + +ssize_t ecp_rbuf_store(ECPRBConn *conn, ecp_seq_t seq, unsigned char *pld, size_t pld_size) { + ECPRBRecv *buf = conn->recv; + ECPRBuffer *rbuf = &buf->rbuf; + unsigned char mtype; + unsigned short ack_pkt = 0; + int do_ack = 0; + int _rv; + ssize_t rv; + + _rv = ecp_pld_get_type(pld, pld_size, &mtype); + if (_rv) return _rv; + +#ifdef ECP_WITH_RTHD + pthread_mutex_lock(&buf->mutex); +#endif + + if (!buf->start) { + rv = 0; + goto rbuf_store_fin; + } + + if (ECP_SEQ_LT(rbuf->seq_max, seq)) { + ack_pkt = seq - rbuf->seq_max; + } + if (ECP_SEQ_LTE(seq, buf->seq_ack)) { + ecp_seq_t seq_offset = buf->seq_ack - seq; + if (seq_offset < ECP_SIZE_ACKB) { + ecp_ack_t ack_bit = ((ecp_ack_t)1 << seq_offset); + + if (ack_bit & buf->ack_map) { + rv = ECP_ERR_RBUF_DUP; + goto rbuf_store_fin; + } + + rv = msg_store(conn, seq, mtype, pld, pld_size); + if (rv < 0) goto rbuf_store_fin; + + buf->ack_map |= ack_bit; + /* reliable transport can prevent seq_ack from reaching seq_max */ + if (ECP_SEQ_LT(buf->seq_ack, rbuf->seq_max)) { + do_ack = ack_shift(conn); + } + } else { + rv = ECP_ERR_RBUF_DUP; + goto rbuf_store_fin; + } + } else { + unsigned short msg_cnt = rbuf->seq_max - rbuf->seq_start + 1; + + if ((msg_cnt == 0) && (seq == (ecp_seq_t)(buf->seq_ack + 1))) { + int deliver = 1; +#ifdef ECP_WITH_MSGQ + if (buf->msgq) deliver = 0; +#endif + if (buf->deliver_delay) deliver = 0; + if (deliver) { + /* receive buffer is empty, so no need for msgq mutex lock */ + rv = 0; + rbuf->seq_max++; + rbuf->seq_start++; + rbuf->idx_start = ECP_RBUF_IDX_MASK(rbuf->idx_start + 1, rbuf->arr_size); + } else { + rv = msg_store(conn, seq, mtype, pld, pld_size); + if (rv < 0) goto rbuf_store_fin; + } + buf->seq_ack++; + } else { + rv = msg_store(conn, seq, mtype, pld, pld_size); + if (rv < 0) goto rbuf_store_fin; + + do_ack = ack_shift(conn); + } + } + msg_flush(conn); + if (ack_pkt) { + buf->ack_pkt += ack_pkt; + if (!do_ack && (buf->ack_pkt > buf->ack_rate)) do_ack = 1; + } + if (do_ack) { + ecp_seq_t seq_ack = buf->seq_ack; + ecp_seq_t ack_map = buf->ack_map; + + /* account for missing mackets within hole_max range */ + if (buf->hole_max && (buf->seq_ack == rbuf->seq_max)) { + unsigned short h_bits = buf->hole_max + 1; + ecp_seq_t h_mask = ~(~((ecp_seq_t)0) << h_bits); + + if ((ack_map & h_mask) != h_mask) { + h_mask = ~(~((ecp_seq_t)0) >> h_bits); + seq_ack -= h_bits; + ack_map = (ack_map >> h_bits) | h_mask; + } + } + _rv = ack_send(conn, seq_ack, ack_map); + if (_rv) { + rv = _rv; + goto rbuf_store_fin; + } + } + +rbuf_store_fin: + +#ifdef ECP_WITH_RTHD + pthread_mutex_unlock(&buf->mutex); +#endif + + return rv; +} diff --git a/ecp/src/ecp/ext/rbuf_send.c b/ecp/src/ecp/ext/rbuf_send.c new file mode 100644 index 0000000..3dc777c --- /dev/null +++ b/ecp/src/ecp/ext/rbuf_send.c @@ -0,0 +1,419 @@ +#include <stdlib.h> +#include <string.h> + +#include <core.h> + +#include "rbuf.h" + +#define NACK_RATE_UNIT 10000 + +#define MIN(X, Y) (((X) < (Y)) ? (X) : (Y)) +#define MAX(X, Y) (((X) > (Y)) ? (X) : (Y)) + +static void cc_flush(ECPRBConn *conn, unsigned char flags) { + ECPConnection *_conn = ecp_rbuf_get_conn(conn); + ECPRBSend *buf = conn->send; + ECPRBuffer *rbuf = &buf->rbuf; + unsigned short idx; + int rv; + + rv = _ecp_rbuf_msg_idx(rbuf, buf->seq_cc, &idx); + if (rv) return; + + while (ECP_SEQ_LTE(buf->seq_cc, rbuf->seq_max)) { + ECPBuffer packet; + + if ((buf->cnt_cc == 0) || (buf->in_transit >= buf->win_size)) break; + + if ((rbuf->arr.pkt[idx].flags & ECP_RBUF_FLAG_IN_RBUF) && !(rbuf->arr.pkt[idx].flags & ECP_RBUF_FLAG_SKIP)) { +#ifdef ECP_WITH_PTHREAD + pthread_mutex_unlock(&buf->mutex); +#endif + + packet.buffer = rbuf->arr.pkt[idx].buf; + packet.size = ECP_MAX_PKT; + ecp_pkt_send(_conn->sock, &packet, rbuf->arr.pkt[idx].size, flags, NULL, &_conn->remote.addr); + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_lock(&buf->mutex); +#endif + + buf->cnt_cc--; + buf->in_transit++; + } + if (!(buf->flags & ECP_RBUF_FLAG_RELIABLE)) { + rbuf->arr.pkt[idx].flags = 0; + // if (rbuf->arr.pkt[idx].flags == 0); + } + buf->seq_cc++; + idx = ECP_RBUF_IDX_MASK(idx + 1, rbuf->arr_size); + } + if (!(buf->flags & ECP_RBUF_FLAG_RELIABLE)) { + rbuf->seq_start = buf->seq_cc; + rbuf->idx_start = idx; + } +} + +static ssize_t _rbuf_send_flush(ECPConnection *_conn, ECPTimerItem *ti) { + ECPBuffer packet; + ECPBuffer payload; + unsigned char pkt_buf[ECP_SIZE_PKT_BUF(0, ECP_MTYPE_RBFLUSH, _conn)]; + unsigned char pld_buf[ECP_SIZE_PLD_BUF(0, ECP_MTYPE_RBFLUSH, _conn)]; + + packet.buffer = pkt_buf; + packet.size = sizeof(pkt_buf); + payload.buffer = pld_buf; + payload.size = sizeof(pld_buf); + + ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_RBFLUSH); + + return ecp_pld_send_wtimer(_conn, &packet, &payload, ECP_SIZE_PLD(0, ECP_MTYPE_RBFLUSH), 0, ti); +} + +ssize_t ecp_rbuf_send_flush(ECPRBConn *conn) { + ECPConnection *_conn = ecp_rbuf_get_conn(conn); + + return ecp_timer_send(_conn, _rbuf_send_flush, ECP_MTYPE_RBACK, 3, 500); +} + +ssize_t ecp_rbuf_handle_ack(ECPRBConn *conn, unsigned char *msg, size_t msg_size) { + ECPConnection *_conn = ecp_rbuf_get_conn(conn); + ECPRBSend *buf = conn->send; + ECPRBuffer *rbuf = &buf->rbuf; + ssize_t rsize = sizeof(ecp_seq_t) + sizeof(ecp_ack_t); + ecp_seq_t seq_start; + ecp_seq_t seq_max; + ecp_seq_t seq_ack; + ecp_ack_t ack_map; + unsigned short idx; + unsigned short msg_cnt; + int do_flush = 0; + int i; + int rv; + + if (msg_size < rsize) return ECP_ERR_SIZE; + + seq_ack = \ + ((ecp_seq_t)msg[0] << 24) | \ + ((ecp_seq_t)msg[1] << 16) | \ + ((ecp_seq_t)msg[2] << 8) | \ + ((ecp_seq_t)msg[3]); + ack_map = \ + ((ecp_ack_t)msg[4] << 24) | \ + ((ecp_ack_t)msg[5] << 16) | \ + ((ecp_ack_t)msg[6] << 8) | \ + ((ecp_ack_t)msg[7]); + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_lock(&buf->mutex); +#endif + + seq_max = (buf->cnt_cc ? buf->seq_cc - 1 : rbuf->seq_max); + if (ECP_SEQ_LT(seq_max, seq_ack)) return ECP_ERR; + + if (buf->flags & ECP_RBUF_FLAG_RELIABLE) { + rv = _ecp_rbuf_msg_idx(rbuf, seq_ack, NULL); + if (rv) goto handle_ack_fin; + } + + if (buf->flags & ECP_RBUF_FLAG_CCONTROL) { + buf->in_transit = seq_max - seq_ack; + } + + if (ack_map != ECP_ACK_FULL) { + ecp_ack_t ack_mask = (ecp_ack_t)1 << (ECP_SIZE_ACKB - 1); + ecp_ack_t ack_map_nop = 0; + unsigned short nack_cnt = 0; + int nack_first = 0; + + seq_ack -= (ECP_SIZE_ACKB - 1); + if (buf->flags & ECP_RBUF_FLAG_RELIABLE) { + rv = _ecp_rbuf_msg_idx(rbuf, seq_ack, &idx); + if (rv) goto handle_ack_fin; + } + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_unlock(&buf->mutex); +#endif + + for (i=0; i<ECP_SIZE_ACKB; i++) { + if ((ack_map & ack_mask) == 0) { + if (ECP_SEQ_LT(buf->seq_nack, seq_ack)) { + nack_cnt++; + buf->seq_nack = seq_ack; + } + + if (buf->flags & ECP_RBUF_FLAG_RELIABLE) { + if (!(rbuf->arr.pkt[idx].flags & ECP_RBUF_FLAG_IN_RBUF) || (rbuf->arr.pkt[idx].flags & ECP_RBUF_FLAG_SKIP)) { + ack_map_nop |= ack_mask; + } else { + ECPBuffer packet; + + packet.buffer = rbuf->arr.pkt[idx].buf; + packet.size = ECP_MAX_PKT; + ecp_pkt_send(_conn->sock, &packet, rbuf->arr.pkt[idx].size, ECP_SEND_FLAG_MORE, NULL, &_conn->remote.addr); + if (buf->flags & ECP_RBUF_FLAG_CCONTROL) { + buf->in_transit++; + } + } + if (!nack_first) { + nack_first = 1; + seq_start = seq_ack; + } + } + } + seq_ack++; + ack_mask = ack_mask >> 1; + if (buf->flags & ECP_RBUF_FLAG_RELIABLE) idx = ECP_RBUF_IDX_MASK(idx + 1, rbuf->arr_size); + } + + if ((buf->flags & ECP_RBUF_FLAG_RELIABLE) && ack_map_nop) { + ECPBuffer packet; + ECPBuffer payload; + unsigned char pkt_buf[ECP_SIZE_PKT_BUF(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), ECP_MTYPE_RBNOP, _conn)]; + unsigned char pld_buf[ECP_SIZE_PLD_BUF(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), ECP_MTYPE_RBNOP, _conn)]; + unsigned char *_buf; + + packet.buffer = pkt_buf; + packet.size = sizeof(pkt_buf); + payload.buffer = pld_buf; + payload.size = sizeof(pld_buf); + + ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_RBNOP); + _buf = ecp_pld_get_msg(payload.buffer, payload.size); + + seq_ack--; + _buf[0] = seq_ack >> 24; + _buf[1] = seq_ack >> 16; + _buf[2] = seq_ack >> 8; + _buf[3] = seq_ack; + _buf[4] = ack_map_nop >> 24; + _buf[5] = ack_map_nop >> 16; + _buf[6] = ack_map_nop >> 8; + _buf[7] = ack_map_nop; + + ecp_pld_send(_conn, &packet, &payload, ECP_SIZE_PLD(sizeof(ecp_seq_t) + sizeof(ecp_ack_t), ECP_MTYPE_RBNOP), ECP_SEND_FLAG_MORE); + } + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_lock(&buf->mutex); +#endif + + buf->nack_rate = (buf->nack_rate * 7 + ((ECP_SIZE_ACKB - nack_cnt) * NACK_RATE_UNIT) / ECP_SIZE_ACKB) / 8; + } else { + buf->nack_rate = (buf->nack_rate * 7) / 8; + seq_start = seq_ack + 1; + } + + if (buf->flags & ECP_RBUF_FLAG_RELIABLE) { + msg_cnt = seq_start - rbuf->seq_start; + idx = rbuf->idx_start; + for (i=0; i<msg_cnt; i++) { + rbuf->arr.pkt[idx].flags = 0; + // if (rbuf->arr.pkt[idx].flags == 0); + idx = ECP_RBUF_IDX_MASK(idx + 1, rbuf->arr_size); + } + rbuf->seq_start = seq_start; + rbuf->idx_start = idx; + } + if (buf->flush) { + if (ECP_SEQ_LT(buf->seq_flush, rbuf->seq_start)) buf->flush = 0; + if (buf->flush) do_flush = 1; + } + if (buf->cnt_cc) cc_flush(conn, ECP_SEND_FLAG_MORE); + +handle_ack_fin: + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_unlock(&buf->mutex); +#endif + + if (!rv && do_flush) { + ssize_t _rv; + + _rv = ecp_rbuf_send_flush(conn); + if (_rv < 0) rv = _rv; + } + + if (rv) return rv; + return rsize; +} + +int ecp_rbsend_create(ECPRBConn *conn, ECPRBSend *buf, ECPRBPacket *pkt, unsigned short pkt_size) { + ECPRBuffer *rbuf = &buf->rbuf; + int rv; + + memset(buf, 0, sizeof(ECPRBRecv)); + memset(pkt, 0, sizeof(ECPRBPacket) * pkt_size); + + rbuf->arr.pkt = pkt; + rbuf->arr_size = pkt_size; + +#ifdef ECP_WITH_PTHREAD + rv = pthread_mutex_init(&buf->mutex, NULL); + if (rv) return ECP_ERR; +#endif + + conn->send = buf; + return ECP_OK; +} + +void ecp_rbsend_destroy(ECPRBConn *conn) { + ECPRBSend *buf = conn->send; + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_destroy(&buf->mutex); +#endif + + conn->send = NULL; +} + +void ecp_rbsend_start(ECPRBConn *conn, ecp_seq_t seq) { + ECPRBSend *buf = conn->send; + ECPRBuffer *rbuf = &buf->rbuf; + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_lock(&buf->mutex); +#endif + + buf->start = 1; + buf->seq_nack = seq; + _ecp_rbuf_start(rbuf, seq); + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_unlock(&buf->mutex); +#endif +} + +int ecp_rbuf_set_wsize(ECPRBConn *conn, ecp_win_t size) { + ECPRBSend *buf = conn->send; + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_lock(&buf->mutex); +#endif + + buf->win_size = size; + if (buf->cnt_cc) cc_flush(conn, 0); + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_unlock(&buf->mutex); +#endif + + return ECP_OK; +} + +int ecp_rbuf_flush(ECPRBConn *conn) { + ECPConnection *_conn = ecp_rbuf_get_conn(conn); + ECPRBSend *buf = conn->send; + ecp_seq_t seq; + ssize_t rv; + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_lock(&_conn->mutex); +#endif + seq = (ecp_seq_t)(_conn->nonce_out) - 1; +#ifdef ECP_WITH_PTHREAD + pthread_mutex_unlock(&_conn->mutex); +#endif + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_lock(&buf->mutex); +#endif + + if (buf->flush) { + if (ECP_SEQ_LT(buf->seq_flush, seq)) buf->seq_flush = seq; + } else { + buf->flush = 1; + buf->seq_flush = seq; + } + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_unlock(&buf->mutex); +#endif + + rv = ecp_rbuf_send_flush(conn); + if (rv < 0) return rv; + + return ECP_OK; +} + +ssize_t ecp_rbuf_pld_send(ECPRBConn *conn, ECPBuffer *payload, size_t pld_size, ECPBuffer *packet, size_t pkt_size, ECPTimerItem *ti) { + ECPRBSend *buf = conn->send; + ECPRBuffer *rbuf = &buf->rbuf; + unsigned char mtype; + int rb_rel; + int rb_cc; + int do_send; + int do_skip; + int _rv = ECP_OK; + ssize_t rv = 0; + + _rv = ecp_pld_get_type(payload->buffer, pld_size, &mtype); + if (_rv) return _rv; + + do_send = 1; + do_skip = ecp_rbuf_skip(mtype); + if (ti && !do_skip) return ECP_ERR_RBUF_TIMER; + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_lock(&buf->mutex); +#endif + + if (!buf->start) { + rv = 0; + goto pld_send_fin; + } + + rb_rel = (buf->flags & ECP_RBUF_FLAG_RELIABLE); + rb_cc = ((buf->flags & ECP_RBUF_FLAG_CCONTROL) && (buf->cnt_cc || (buf->in_transit >= buf->win_size))); + + if (rb_rel || rb_cc) { + ecp_seq_t seq; + unsigned short idx; + unsigned char _flags; + + _rv = ecp_pkt_get_seq(packet->buffer, pkt_size, &seq); + if (_rv) { + rv = _rv; + goto pld_send_fin; + } + + _rv = _ecp_rbuf_msg_idx(rbuf, seq, &idx); + if (_rv) rv = ECP_ERR_RBUF_DUP; + if (!rv && rbuf->arr.pkt[idx].flags) rv = ECP_ERR_RBUF_DUP; + if (rv) goto pld_send_fin; + + _flags = ECP_RBUF_FLAG_IN_RBUF; + if (do_skip) { + _flags |= ECP_RBUF_FLAG_SKIP; + } else { + do_send = 0; + } + + rbuf->arr.pkt[idx].flags = _flags; + if (!do_send) { + memcpy(rbuf->arr.pkt[idx].buf, packet->buffer, pkt_size); + rbuf->arr.pkt[idx].size = pkt_size; + rv = pld_size; + } + + if (ECP_SEQ_LT(rbuf->seq_max, seq)) rbuf->seq_max = seq; + + if (rb_cc && !do_send) { + if (buf->cnt_cc == 0) buf->seq_cc = seq; + buf->cnt_cc++; + } + } + + if ((buf->flags & ECP_RBUF_FLAG_CCONTROL) && !do_skip && do_send) { + buf->in_transit++; + } + +pld_send_fin: + +#ifdef ECP_WITH_PTHREAD + pthread_mutex_unlock(&buf->mutex); +#endif + + return rv; +} |