From 8f44e2151cb3f91b220c4a3393a06068d0ee7302 Mon Sep 17 00:00:00 2001 From: Uros Majstorovic Date: Wed, 16 Aug 2017 21:21:33 +0200 Subject: fixed rbuf; fixed error code for pthread_mitex_init --- code/core/TODO | 6 +- code/core/core.c | 23 +++++--- code/core/core.h | 3 - code/core/msgq.c | 48 ++++++++++------ code/core/msgq.h | 13 +++-- code/core/rbuf.c | 6 +- code/core/rbuf.h | 36 +++++------- code/core/rbuf_recv.c | 155 +++++++++++++++++++++++++++----------------------- code/core/rbuf_send.c | 16 +++++- code/vconn/vconn.c | 6 +- 10 files changed, 179 insertions(+), 133 deletions(-) diff --git a/code/core/TODO b/code/core/TODO index 0ee7499..b1e7d12 100644 --- a/code/core/TODO +++ b/code/core/TODO @@ -6,4 +6,8 @@ rbuf: - implement _wait variants of open / send -- consider adding one buffer for all msgs (frag. issue) \ No newline at end of file +- consider adding one buffer for all msgs (frag. issue) + +should implement: +- send flush (seq) +- sending ack with congestion control diff --git a/code/core/core.c b/code/core/core.c index d890291..d64aa04 100644 --- a/code/core/core.c +++ b/code/core/core.c @@ -58,20 +58,25 @@ static int ctable_create(ECPSockCTable *conn, ECPContext *ctx) { int rv = ECP_OK; memset(conn, 0, sizeof(ECPSockCTable)); + #ifdef ECP_WITH_HTABLE if (ctx->ht.init) { conn->htable = ctx->ht.create(ctx); if (conn->htable == NULL) return ECP_ERR_ALLOC; } #endif + #ifdef ECP_WITH_PTHREAD rv = pthread_mutex_init(&conn->mutex, NULL); -#endif + if (rv) { #ifdef ECP_WITH_HTABLE - if (rv && ctx->ht.init) ctx->ht.destroy(conn->htable); + if (ctx->ht.init) ctx->ht.destroy(conn->htable); +#endif + return ECP_ERR; + } #endif - return rv; + return ECP_OK; } static void ctable_destroy(ECPSockCTable *conn, ECPContext *ctx) { @@ -217,15 +222,15 @@ int ecp_sock_create(ECPSocket *sock, ECPContext *ctx, ECPDHKey *key) { } #ifdef ECP_WITH_PTHREAD - if (!rv) rv = pthread_mutex_init(&sock->mutex, NULL); + rv = pthread_mutex_init(&sock->mutex, NULL); if (rv) { ecp_timer_destroy(&sock->timer); ctable_destroy(&sock->conn, sock->ctx); - return rv; + return ECP_ERR; } #endif - return rv; + return ECP_OK; } void ecp_sock_destroy(ECPSocket *sock) { @@ -596,6 +601,10 @@ int ecp_conn_handler_init(ECPConnHandler *handler) { handler->msg[ECP_MTYPE_OPEN] = ecp_conn_handle_open; handler->msg[ECP_MTYPE_KGET] = ecp_conn_handle_kget; handler->msg[ECP_MTYPE_KPUT] = ecp_conn_handle_kput; +#ifdef ECP_WITH_RBUF + handler->msg[ECP_MTYPE_RBACK] = ecp_rbuf_handle_ack; + handler->msg[ECP_MTYPE_RBFLUSH] = ecp_rbuf_handle_flush; +#endif handler->conn_open = ecp_conn_send_open; return ECP_OK; } @@ -957,7 +966,7 @@ ssize_t ecp_conn_pack(ECPConnection *conn, unsigned char *packet, size_t pkt_siz #endif *rbuf_idx = ecp_rbuf_msg_idx(&buf->rbuf, _seq); - if (*rbuf_idx < 0) rv = ECP_ERR_RBUF_FULL; + if (*rbuf_idx < 0) rv = *rbuf_idx; #ifdef ECP_WITH_PTHREAD pthread_mutex_unlock(&buf->mutex); diff --git a/code/core/core.h b/code/core/core.h index cb08bc9..9738fb4 100644 --- a/code/core/core.h +++ b/code/core/core.h @@ -93,9 +93,6 @@ typedef uint32_t ecp_seq_t; #include "timer.h" #ifdef ECP_WITH_RBUF -#ifdef ECP_WITH_MSGQ -#include "msgq.h" -#endif #include "rbuf.h" #endif diff --git a/code/core/msgq.c b/code/core/msgq.c index c441acd..15629c8 100644 --- a/code/core/msgq.c +++ b/code/core/msgq.c @@ -1,13 +1,13 @@ #include "core.h" -#ifdef ECP_WITH_PTHREAD +#ifdef ECP_WITH_MSGQ #include #define MIN(a,b) (((a)<(b))?(a):(b)) #define MAX(a,b) (((a)>(b))?(a):(b)) -#define MSG_IDX_MASK(idx) ((idx) & ((ECP_MAX_CONN_MSG) - 1)) +#define MSG_IDX_MASK(idx) ((idx) & ((ECP_MSGQ_MAX_MSG) - 1)) static struct timespec *abstime_ts(struct timespec *ts, unsigned int msec) { struct timeval tv; @@ -63,17 +63,31 @@ void ecp_conn_msgq_destroy(ECPConnection *conn) { pthread_mutex_destroy(&msgq->mutex); } +int ecp_conn_msgq_start(ECPConnection *conn, ecp_seq_t seq) { + ECPRBRecv *buf = conn->rbuf.recv; + ECPConnMsgQ *msgq = buf ? &buf->msgq : NULL; + + if (msgq == NULL) return ECP_ERR; + + msgq->seq_max = seq; + msgq->seq_start = seq + 1; + + return ECP_OK; +} + int ecp_conn_msgq_push(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype) { ECPRBRecv *buf = conn->rbuf.recv; ECPConnMsgQ *msgq = buf ? &buf->msgq : NULL; if (msgq == NULL) return ECP_ERR; - if (msgq->idx_w[mtype] - msgq->idx_r[mtype] == ECP_MAX_CONN_MSG) return ECP_ERR_MAX_CONN_MSG; + if (msgq->idx_w[mtype] - msgq->idx_r[mtype] == ECP_MSGQ_MAX_MSG) return ECP_MSGQ_ERR_MAX_MSG; if (msgq->idx_w[mtype] == msgq->idx_r[mtype]) pthread_cond_signal(&msgq->cond[mtype]); msgq->seq_msg[mtype][MSG_IDX_MASK(msgq->idx_w[mtype])] = seq; msgq->idx_w[mtype]++; + + if (ECP_RBUF_SEQ_LT(msgq->seq_max, seq)) msgq->seq_max = seq; return ECP_OK; } @@ -82,7 +96,6 @@ ssize_t ecp_conn_msgq_pop(ECPConnection *conn, unsigned char mtype, unsigned cha ECPRBRecv *buf = conn->rbuf.recv; ECPConnMsgQ *msgq = buf ? &buf->msgq : NULL; ssize_t rv = ECP_OK; - ecp_seq_t seq; int idx; if (msgq == NULL) return ECP_ERR; @@ -98,32 +111,31 @@ ssize_t ecp_conn_msgq_pop(ECPConnection *conn, unsigned char mtype, unsigned cha } } if (!rv) { - seq = msgq->seq_msg[mtype][MSG_IDX_MASK(msgq->idx_r[mtype])]; + ecp_seq_t seq = msgq->seq_msg[mtype][MSG_IDX_MASK(msgq->idx_r[mtype])]; + ecp_seq_t seq_offset = seq - buf->rbuf.seq_start; + unsigned int idx = ECP_RBUF_IDX_MASK(buf->rbuf.msg_start + seq_offset, buf->rbuf.msg_size); + msgq->idx_r[mtype]++; - idx = ecp_rbuf_msg_idx(&buf->rbuf, seq); - if (idx < 0) rv = idx; - } - if (!rv) { - buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_RECEIVED; - if (buf->rbuf.seq_start == seq) { + buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_IN_MSGQ; + + if (msgq->seq_start == seq) { int i, _idx = idx; - ecp_seq_t msg_cnt = buf->rbuf.seq_max - buf->rbuf.seq_start + 1; + ecp_seq_t msg_cnt = msgq->seq_max - msgq->seq_start + 1; for (i=0; irbuf.msg[_idx].flags & ECP_RBUF_FLAG_RECEIVED) break; + if (buf->rbuf.msg[_idx].flags & ECP_RBUF_FLAG_IN_MSGQ) break; _idx = ECP_RBUF_IDX_MASK(_idx + 1, buf->rbuf.msg_size); } - buf->rbuf.seq_start += i; - buf->rbuf.msg_start = _idx; + msgq->seq_start += i; } - rv = buf->rbuf.msg[idx].size; + rv = buf->rbuf.msg[idx].size - 1; if (rv >= 0) { rv = MIN(msg_size, rv); - memcpy(msg, buf->rbuf.msg[idx].msg, rv); + memcpy(msg, buf->rbuf.msg[idx].msg + 1, rv); } } return rv; } -#endif /* ECP_WITH_PTHREAD */ \ No newline at end of file +#endif /* ECP_WITH_MSGQ */ \ No newline at end of file diff --git a/code/core/msgq.h b/code/core/msgq.h index 54319e0..75c795e 100644 --- a/code/core/msgq.h +++ b/code/core/msgq.h @@ -1,23 +1,26 @@ -#ifdef ECP_WITH_PTHREAD +#ifdef ECP_WITH_MSGQ #include #include -#define ECP_MAX_CONN_MSG 32 -#define ECP_ERR_MAX_CONN_MSG -100 +#define ECP_MSGQ_MAX_MSG 32 +#define ECP_MSGQ_ERR_MAX_MSG -110 typedef struct ECPConnMsgQ { unsigned short idx_w[ECP_MAX_MTYPE]; unsigned short idx_r[ECP_MAX_MTYPE]; - ecp_seq_t seq_msg[ECP_MAX_MTYPE][ECP_MAX_CONN_MSG]; + ecp_seq_t seq_start; + ecp_seq_t seq_max; + ecp_seq_t seq_msg[ECP_MAX_MTYPE][ECP_MSGQ_MAX_MSG]; pthread_cond_t cond[ECP_MAX_MTYPE]; pthread_mutex_t mutex; } ECPConnMsgQ; int ecp_conn_msgq_create(struct ECPConnection *conn); void ecp_conn_msgq_destroy(struct ECPConnection *conn); +int ecp_conn_msgq_start(struct ECPConnection *conn, ecp_seq_t seq); int ecp_conn_msgq_push(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype); ssize_t ecp_conn_msgq_pop(struct ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, unsigned int timeout); -#endif /* ECP_WITH_PTHREAD */ \ No newline at end of file +#endif /* ECP_WITH_MSGQ */ \ No newline at end of file diff --git a/code/core/rbuf.c b/code/core/rbuf.c index f13e06e..b384774 100644 --- a/code/core/rbuf.c +++ b/code/core/rbuf.c @@ -27,7 +27,7 @@ int ecp_rbuf_msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq) { // This also checks for seq_start <= seq if seq type range >> rbuf->msg_size if (seq_offset < rbuf->msg_size) return ECP_RBUF_IDX_MASK(rbuf->msg_start + seq_offset, rbuf->msg_size); - return ECP_ERR_RBUF_IDX; + return ECP_ERR_RBUF_FULL; } ssize_t ecp_rbuf_msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, int idx, unsigned char *msg, size_t msg_size, unsigned char test_flags, unsigned char set_flags) { @@ -35,8 +35,10 @@ ssize_t ecp_rbuf_msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, int idx, unsigned ch if (idx < 0) return idx; if (rbuf->msg == NULL) return 0; - if (test_flags && (test_flags & rbuf->msg[idx].flags)) return ECP_ERR_RBUF_FLAG; + if (test_flags && (test_flags & rbuf->msg[idx].flags)) return ECP_ERR_RBUF_DUP; + if (ECP_RBUF_SEQ_LT(rbuf->seq_max, seq)) rbuf->seq_max = seq; + if (!(set_flags & ECP_RBUF_FLAG_DELIVERED)) { memcpy(rbuf->msg[idx].msg, msg, msg_size); rbuf->msg[idx].size = msg_size; diff --git a/code/core/rbuf.h b/code/core/rbuf.h index 67ac8c9..b09b19c 100644 --- a/code/core/rbuf.h +++ b/code/core/rbuf.h @@ -1,26 +1,16 @@ -#define ECP_RBUF_FLAG_RECEIVED 0x01 -#define ECP_RBUF_FLAG_DELIVERED 0x02 -#define ECP_RBUF_FLAG_CCWAIT 0x04 +#define ECP_RBUF_FLAG_IN_RBUF 0x01 +#define ECP_RBUF_FLAG_IN_MSGQ 0x02 +#define ECP_RBUF_FLAG_DELIVERED 0x04 +#define ECP_RBUF_FLAG_CCWAIT 0x08 #define ECP_RBUF_FLAG_RELIABLE 0x01 #define ECP_RBUF_FLAG_MSGQ 0x02 -#define ECP_MTYPE_RBOPEN 0x04 -#define ECP_MTYPE_RBCLOSE 0x05 -#define ECP_MTYPE_RBFLUSH 0x06 -#define ECP_MTYPE_RBACK 0x07 +#define ECP_MTYPE_RBACK 0x04 +#define ECP_MTYPE_RBFLUSH 0x05 -#define ECP_MTYPE_RBOPEN_REQ (ECP_MTYPE_RBOPEN) -#define ECP_MTYPE_RBOPEN_REP (ECP_MTYPE_RBOPEN | ECP_MTYPE_FLAG_REP) -#define ECP_MTYPE_RBCLOSE_REQ (ECP_MTYPE_RBCLOSE) -#define ECP_MTYPE_RBCLOSE_REP (ECP_MTYPE_RBCLOSE | ECP_MTYPE_FLAG_REP) -#define ECP_MTYPE_RBFLUSH_REQ (ECP_MTYPE_RBFLUSH) -#define ECP_MTYPE_RBFLUSH_REP (ECP_MTYPE_RBFLUSH | ECP_MTYPE_FLAG_REP) - -#define ECP_ERR_RBUF_FLAG -100 -#define ECP_ERR_RBUF_IDX -101 -#define ECP_ERR_RBUF_DUP -102 -#define ECP_ERR_RBUF_FULL -103 +#define ECP_ERR_RBUF_DUP -100 +#define ECP_ERR_RBUF_FULL -101 typedef uint32_t ecp_ack_t; typedef uint32_t ecp_win_t; @@ -33,10 +23,12 @@ typedef uint32_t ecp_win_t; #define ECP_RBUF_SEQ_LT(a,b) ((ecp_seq_t)((ecp_seq_t)(a) - (ecp_seq_t)(b)) > ECP_RBUF_SEQ_HALF) #define ECP_RBUF_SEQ_LTE(a,b) ((ecp_seq_t)((ecp_seq_t)(b) - (ecp_seq_t)(a)) < ECP_RBUF_SEQ_HALF) +/* size must be power of 2 */ #define ECP_RBUF_IDX_MASK(idx, size) ((idx) & ((size) - 1)) -/* If size not 2^x: -#define ECP_RBUF_IDX_MASK(idx, size) ((idx) % (size)) -*/ + +#ifdef ECP_WITH_MSGQ +#include "msgq.h" +#endif typedef struct ECPRBMessage { unsigned char msg[ECP_MAX_PKT]; @@ -112,3 +104,5 @@ void ecp_conn_rbuf_send_destroy(struct ECPConnection *conn); int ecp_conn_rbuf_send_start(struct ECPConnection *conn); ssize_t ecp_conn_rbuf_send_store(struct ECPConnection *conn, ecp_seq_t seq, int idx, unsigned char *msg, size_t msg_size); +ssize_t ecp_rbuf_handle_ack(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size); +ssize_t ecp_rbuf_handle_flush(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size); diff --git a/code/core/rbuf_recv.c b/code/core/rbuf_recv.c index ad9c53c..e00b63f 100644 --- a/code/core/rbuf_recv.c +++ b/code/core/rbuf_recv.c @@ -5,78 +5,76 @@ #define ACK_RATE 8 #define ACK_MASK_FIRST ((ecp_ack_t)1 << (ECP_RBUF_ACK_SIZE - 1)) -static ssize_t handle_flush(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size) { - if (size < 0) return size; - - ECPRBRecv *buf = conn->rbuf.recv; - unsigned char payload[ECP_SIZE_PLD(0)]; - - if (buf == NULL) return ECP_ERR; - - buf->flush = 1; - return 0; -} - static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) { ECPRBRecv *buf = conn->rbuf.recv; - ssize_t rv = 0; - unsigned char flags; + int rv = ECP_OK; + ssize_t _rv = 0; + unsigned char flags = ECP_RBUF_FLAG_IN_RBUF; unsigned char mtype = msg[0] & ECP_MTYPE_MASK; if (mtype < ECP_MAX_MTYPE_SYS) { - flags = ECP_RBUF_FLAG_RECEIVED | ECP_RBUF_FLAG_DELIVERED; - } else { - flags = ECP_RBUF_FLAG_RECEIVED; + flags |= ECP_RBUF_FLAG_DELIVERED; + ecp_msg_handle(conn, seq, msg, msg_size); } - rv = ecp_rbuf_msg_store(&buf->rbuf, seq, -1, msg, msg_size, ECP_RBUF_FLAG_RECEIVED, flags); - if (rv < 0) return ECP_ERR_RBUF_DUP; - - if (ECP_RBUF_SEQ_LT(buf->rbuf.seq_max, seq)) buf->rbuf.seq_max = seq; - - if (flags & ECP_RBUF_FLAG_DELIVERED) { #ifdef ECP_WITH_MSGQ - if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_unlock(&buf->msgq.mutex); + if (buf->flags & ECP_RBUF_FLAG_MSGQ) { + pthread_mutex_lock(&buf->msgq.mutex); + ecp_seq_t seq_offset = seq - buf->msgq.seq_start; + if (seq_offset >= buf->rbuf.msg_size) rv = ECP_ERR_RBUF_FULL; + } #endif - ecp_msg_handle(conn, seq, msg, msg_size); + if (!rv) _rv = ecp_rbuf_msg_store(&buf->rbuf, seq, -1, msg, msg_size, ECP_RBUF_FLAG_IN_RBUF | ECP_RBUF_FLAG_IN_MSGQ, flags); #ifdef ECP_WITH_MSGQ - if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_lock(&buf->msgq.mutex); + if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_unlock(&buf->msgq.mutex); #endif - } - return rv; + if (rv) return rv; + return _rv; } static void msg_flush(ECPConnection *conn) { ECPRBRecv *buf = conn->rbuf.recv; + +#ifdef ECP_WITH_MSGQ + if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_lock(&buf->msgq.mutex); +#endif + ecp_seq_t msg_cnt = buf->rbuf.seq_max - buf->rbuf.seq_start + 1; ecp_seq_t i = 0; unsigned int idx = buf->rbuf.msg_start; for (i=0; iflags & ECP_RBUF_FLAG_RELIABLE) && !(buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_RECEIVED)) break; + if ((buf->flags & ECP_RBUF_FLAG_RELIABLE) && !(buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_IN_RBUF)) break; if (buf->deliver_delay && (msg_cnt - i < buf->deliver_delay)) break; - if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_RECEIVED) { - ssize_t rv = 0; + if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_IN_RBUF) { + int rv = 0; + ecp_seq_t seq = buf->rbuf.seq_start + i; if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_DELIVERED) { buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_DELIVERED; + } else if (buf->flags & ECP_RBUF_FLAG_MSGQ) { +#ifdef ECP_WITH_MSGQ + unsigned char mtype = buf->rbuf.msg[idx].msg[0] & ECP_MTYPE_MASK; + int rv = ecp_conn_msgq_push(conn, seq, mtype); + if (rv) break; + buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_IN_MSGQ; +#endif } else { - rv = ecp_msg_handle(conn, buf->rbuf.seq_start + i, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size); - } - if (!(buf->flags & ECP_RBUF_FLAG_MSGQ)) { - buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_RECEIVED; - } else if (rv < 0) { - break; + ecp_msg_handle(conn, seq, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size); } + buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_IN_RBUF; } idx = ECP_RBUF_IDX_MASK(idx + 1, buf->rbuf.msg_size); } - if (!(buf->flags & ECP_RBUF_FLAG_MSGQ)) { - buf->rbuf.seq_start += i; - buf->rbuf.msg_start = idx; - } + buf->rbuf.seq_start += i; + buf->rbuf.msg_start = idx; + +#ifdef ECP_WITH_MSGQ + if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_unlock(&buf->msgq.mutex); +#endif + } static int ack_send(ECPConnection *conn) { @@ -116,10 +114,10 @@ static int ack_shift(ECPRBRecv *buf) { idx = ECP_RBUF_IDX_MASK(idx + 1, buf->rbuf.msg_size); buf->seq_ack++; - if ((buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_RECEIVED) && (buf->ack_map == ECP_RBUF_ACK_FULL)) continue; + if ((buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_IN_RBUF) && (buf->ack_map == ECP_RBUF_ACK_FULL)) continue; buf->ack_map = buf->ack_map << 1; - if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_RECEIVED) { + if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_IN_RBUF) { buf->ack_map |= 1; } else if (!do_ack && ECP_RBUF_SEQ_LTE(buf->seq_ack, buf->rbuf.seq_max - 2 * buf->hole_max)) { do_ack = 1; @@ -143,6 +141,16 @@ static int ack_shift(ECPRBRecv *buf) { return do_ack; } +ssize_t ecp_rbuf_handle_flush(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size) { + if (size < 0) return size; + + ECPRBRecv *buf = conn->rbuf.recv; + if (buf == NULL) return ECP_ERR; + + buf->flush = 1; + return 0; +} + int ecp_conn_rbuf_recv_create(ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size) { int rv; @@ -152,13 +160,20 @@ int ecp_conn_rbuf_recv_create(ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage buf->ack_map = ECP_RBUF_ACK_FULL; buf->ack_rate = ACK_RATE; + +#ifdef ECP_WITH_MSGQ + rv = ecp_conn_msgq_create(conn); + if (rv) return rv; +#endif + conn->rbuf.recv = buf; - return ECP_OK; } void ecp_conn_rbuf_recv_destroy(ECPConnection *conn) { - +#ifdef ECP_WITH_MSGQ + ecp_conn_msgq_destroy(conn); +#endif } int ecp_conn_rbuf_recv_set_hole(ECPConnection *conn, unsigned short hole_max) { @@ -183,50 +198,55 @@ int ecp_conn_rbuf_recv_set_delay(ECPConnection *conn, unsigned short delay) { } int ecp_conn_rbuf_recv_start(ECPConnection *conn, ecp_seq_t seq) { + int rv; ECPRBRecv *buf = conn->rbuf.recv; if (buf == NULL) return ECP_ERR; buf->seq_ack = seq; - return ecp_rbuf_start(&buf->rbuf, seq); + rv = ecp_rbuf_start(&buf->rbuf, seq); + if (rv) return rv; + +#ifdef ECP_WITH_MSGQ + if (buf->flags & ECP_RBUF_FLAG_MSGQ) { + rv = ecp_conn_msgq_start(conn, seq); + if (rv) return rv; + } +#endif + + return ECP_OK; } ssize_t ecp_conn_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) { ECPRBRecv *buf = conn->rbuf.recv; ecp_seq_t ack_pkt = 0; ssize_t rv; - int _rv = ECP_OK; int do_ack = 0; if (buf == NULL) return ECP_ERR; if (msg_size < 1) return ECP_ERR_MIN_MSG; -#ifdef ECP_WITH_MSGQ - if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_lock(&buf->msgq.mutex); -#endif - if (ECP_RBUF_SEQ_LT(buf->rbuf.seq_max, seq)) ack_pkt = seq - buf->rbuf.seq_max; if (ECP_RBUF_SEQ_LTE(seq, buf->seq_ack)) { ecp_seq_t seq_offset = buf->seq_ack - seq; if (seq_offset < ECP_RBUF_ACK_SIZE) { ecp_ack_t ack_mask = ((ecp_ack_t)1 << seq_offset); - if (ack_mask & buf->ack_map) _rv = ECP_ERR_RBUF_DUP; - if (!_rv) { - buf->ack_map |= ack_mask; - do_ack = ack_shift(buf); + if (ack_mask & buf->ack_map) return ECP_ERR_RBUF_DUP; - rv = msg_store(conn, seq, msg, msg_size); - if (rv < 0) _rv = rv; - } + buf->ack_map |= ack_mask; + do_ack = ack_shift(buf); + + rv = msg_store(conn, seq, msg, msg_size); + if (rv < 0) return rv; } else { - _rv = ECP_ERR_RBUF_IDX; + return ECP_ERR_RBUF_DUP; } } else { if ((buf->ack_map == ECP_RBUF_ACK_FULL) && (seq == (ecp_seq_t)(buf->seq_ack + 1))) { if ((buf->flags & ECP_RBUF_FLAG_MSGQ) || buf->deliver_delay) { rv = msg_store(conn, seq, msg, msg_size); - if (rv < 0) _rv = rv; + if (rv < 0) return rv; } else { ecp_msg_handle(conn, seq, msg, msg_size); rv = msg_size; @@ -234,22 +254,15 @@ ssize_t ecp_conn_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned ch buf->rbuf.seq_start++; buf->rbuf.msg_start = ECP_RBUF_IDX_MASK(buf->rbuf.msg_start + 1, buf->rbuf.msg_size); } - if (!_rv) buf->seq_ack++; + buf->seq_ack++; } else { rv = msg_store(conn, seq, msg, msg_size); - if (rv < 0) _rv = rv; + if (rv < 0) return rv; - if (!_rv) do_ack = ack_shift(buf); + do_ack = ack_shift(buf); } } - if (!_rv) msg_flush(conn); - -#ifdef ECP_WITH_MSGQ - if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_unlock(&buf->msgq.mutex); -#endif - - if (_rv) return _rv; - + msg_flush(conn); if (buf->flush) { buf->flush = 0; do_ack = 1; diff --git a/code/core/rbuf_send.c b/code/core/rbuf_send.c index 78c60c3..06bd2e2 100644 --- a/code/core/rbuf_send.c +++ b/code/core/rbuf_send.c @@ -7,7 +7,7 @@ static ssize_t _rbuf_send_flush(ECPConnection *conn, ECPTimerItem *ti) { unsigned char payload[ECP_SIZE_PLD(0)]; - ecp_pld_set_type(payload, ECP_MTYPE_RBFLUSH_REQ); + ecp_pld_set_type(payload, ECP_MTYPE_RBFLUSH); return ecp_pld_send(conn, payload, sizeof(payload)); } @@ -50,7 +50,7 @@ static void cc_flush(ECPConnection *conn) { } } -static ssize_t handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size) { +ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size) { ECPRBSend *buf; ssize_t rsize = sizeof(ecp_seq_t)+sizeof(ecp_ack_t); ecp_seq_t seq_ack = 0; @@ -158,13 +158,23 @@ int ecp_conn_rbuf_send_create(ECPConnection *conn, ECPRBSend *buf, ECPRBMessage rv = ecp_rbuf_init(&buf->rbuf, msg, msg_size); if (rv) return rv; - conn->rbuf.send = buf; +#ifdef ECP_WITH_PTHREAD + rv = pthread_mutex_init(&buf->mutex, NULL); + if (rv) return ECP_ERR; +#endif + conn->rbuf.send = buf; return ECP_OK; } void ecp_conn_rbuf_send_destroy(ECPConnection *conn) { + ECPRBSend *buf = conn->rbuf.send; + + if (buf == NULL) return; +#ifdef ECP_WITH_PTHREAD + pthread_mutex_destroy(&buf->mutex); +#endif } int ecp_conn_rbuf_send_set_wsize(ECPConnection *conn, ecp_win_t size) { diff --git a/code/vconn/vconn.c b/code/vconn/vconn.c index 3a69092..33fa80b 100644 --- a/code/vconn/vconn.c +++ b/code/vconn/vconn.c @@ -471,14 +471,16 @@ int ecp_ctx_vconn_init(ECPContext *ctx) { ctx->pack_raw = vconn_pack_raw; #ifdef ECP_WITH_PTHREAD - pthread_mutex_init(&key_perma_mutex, NULL); - pthread_mutex_init(&key_next_mutex, NULL); + rv = pthread_mutex_init(&key_perma_mutex, NULL); + if (!rv) pthread_mutex_init(&key_next_mutex, NULL); + if (rv) return ECP_ERR; #endif #ifdef ECP_WITH_HTABLE if (ctx->ht.init) { key_perma_table = ctx->ht.create(ctx); key_next_table = ctx->ht.create(ctx); + if ((key_perma_table == NULL) || (key_next_table == NULL)) return ECP_ERR; } #endif -- cgit v1.2.3