diff options
| author | Uros Majstorovic <majstor@majstor.org> | 2017-08-16 21:21:33 +0200 | 
|---|---|---|
| committer | Uros Majstorovic <majstor@majstor.org> | 2017-08-16 21:21:33 +0200 | 
| commit | 8f44e2151cb3f91b220c4a3393a06068d0ee7302 (patch) | |
| tree | 8320c304887ac9b0d922bc27b51330c084f34ee0 /code/core/rbuf_recv.c | |
| parent | 38e2385f5846860916f8880d818b3b024b8c7dd9 (diff) | |
fixed rbuf; fixed error code for pthread_mitex_init
Diffstat (limited to 'code/core/rbuf_recv.c')
| -rw-r--r-- | code/core/rbuf_recv.c | 155 | 
1 files changed, 84 insertions, 71 deletions
| diff --git a/code/core/rbuf_recv.c b/code/core/rbuf_recv.c index ad9c53c..e00b63f 100644 --- a/code/core/rbuf_recv.c +++ b/code/core/rbuf_recv.c @@ -5,78 +5,76 @@  #define ACK_RATE            8  #define ACK_MASK_FIRST      ((ecp_ack_t)1 << (ECP_RBUF_ACK_SIZE - 1)) -static ssize_t handle_flush(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size) { -    if (size < 0) return size; -     -    ECPRBRecv *buf = conn->rbuf.recv; -    unsigned char payload[ECP_SIZE_PLD(0)]; -     -    if (buf == NULL) return ECP_ERR; - -    buf->flush = 1; -    return 0; -} -  static ssize_t msg_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) {      ECPRBRecv *buf = conn->rbuf.recv; -    ssize_t rv = 0; -    unsigned char flags; +    int rv = ECP_OK; +    ssize_t _rv = 0; +    unsigned char flags = ECP_RBUF_FLAG_IN_RBUF;      unsigned char mtype = msg[0] & ECP_MTYPE_MASK;      if (mtype < ECP_MAX_MTYPE_SYS) { -        flags = ECP_RBUF_FLAG_RECEIVED | ECP_RBUF_FLAG_DELIVERED; -    } else { -        flags = ECP_RBUF_FLAG_RECEIVED; +        flags |= ECP_RBUF_FLAG_DELIVERED; +        ecp_msg_handle(conn, seq, msg, msg_size);      } -    rv = ecp_rbuf_msg_store(&buf->rbuf, seq, -1, msg, msg_size, ECP_RBUF_FLAG_RECEIVED, flags); -    if (rv < 0) return ECP_ERR_RBUF_DUP; -     -    if (ECP_RBUF_SEQ_LT(buf->rbuf.seq_max, seq)) buf->rbuf.seq_max = seq; - -    if (flags & ECP_RBUF_FLAG_DELIVERED) {  #ifdef ECP_WITH_MSGQ -        if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_unlock(&buf->msgq.mutex); +    if (buf->flags & ECP_RBUF_FLAG_MSGQ) { +        pthread_mutex_lock(&buf->msgq.mutex); +        ecp_seq_t seq_offset = seq - buf->msgq.seq_start; +        if (seq_offset >= buf->rbuf.msg_size) rv = ECP_ERR_RBUF_FULL; +    }  #endif -        ecp_msg_handle(conn, seq, msg, msg_size); +    if (!rv) _rv = ecp_rbuf_msg_store(&buf->rbuf, seq, -1, msg, msg_size, ECP_RBUF_FLAG_IN_RBUF | ECP_RBUF_FLAG_IN_MSGQ, flags);  #ifdef ECP_WITH_MSGQ -        if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_lock(&buf->msgq.mutex); +    if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_unlock(&buf->msgq.mutex);  #endif -    } -    return rv; +    if (rv) return rv; +    return _rv;  }  static void msg_flush(ECPConnection *conn) {      ECPRBRecv *buf = conn->rbuf.recv; + +#ifdef ECP_WITH_MSGQ +    if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_lock(&buf->msgq.mutex); +#endif +      ecp_seq_t msg_cnt = buf->rbuf.seq_max - buf->rbuf.seq_start + 1;      ecp_seq_t i = 0;      unsigned int idx = buf->rbuf.msg_start;      for (i=0; i<msg_cnt; i++) { -        if ((buf->flags & ECP_RBUF_FLAG_RELIABLE) && !(buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_RECEIVED)) break; +        if ((buf->flags & ECP_RBUF_FLAG_RELIABLE) && !(buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_IN_RBUF)) break;          if (buf->deliver_delay && (msg_cnt - i < buf->deliver_delay)) break; -        if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_RECEIVED) { -            ssize_t rv = 0; +        if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_IN_RBUF) { +            int rv = 0; +            ecp_seq_t seq = buf->rbuf.seq_start + i;              if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_DELIVERED) {                  buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_DELIVERED; +            } else if (buf->flags & ECP_RBUF_FLAG_MSGQ) { +#ifdef ECP_WITH_MSGQ +                unsigned char mtype = buf->rbuf.msg[idx].msg[0] & ECP_MTYPE_MASK; +                int rv = ecp_conn_msgq_push(conn, seq, mtype); +                if (rv) break; +                buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_IN_MSGQ; +#endif              } else { -                rv = ecp_msg_handle(conn, buf->rbuf.seq_start + i, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size); -            } -            if (!(buf->flags & ECP_RBUF_FLAG_MSGQ)) { -                buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_RECEIVED; -            } else if (rv < 0) { -                break; +                ecp_msg_handle(conn, seq, buf->rbuf.msg[idx].msg, buf->rbuf.msg[idx].size);              } +            buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_IN_RBUF;          }          idx = ECP_RBUF_IDX_MASK(idx + 1, buf->rbuf.msg_size);      } -    if (!(buf->flags & ECP_RBUF_FLAG_MSGQ)) { -        buf->rbuf.seq_start += i; -        buf->rbuf.msg_start = idx; -    } +    buf->rbuf.seq_start += i; +    buf->rbuf.msg_start = idx; + +#ifdef ECP_WITH_MSGQ +    if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_unlock(&buf->msgq.mutex); +#endif +  }  static int ack_send(ECPConnection *conn) { @@ -116,10 +114,10 @@ static int ack_shift(ECPRBRecv *buf) {          idx = ECP_RBUF_IDX_MASK(idx + 1, buf->rbuf.msg_size);          buf->seq_ack++; -        if ((buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_RECEIVED) && (buf->ack_map == ECP_RBUF_ACK_FULL)) continue; +        if ((buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_IN_RBUF) && (buf->ack_map == ECP_RBUF_ACK_FULL)) continue;          buf->ack_map = buf->ack_map << 1; -        if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_RECEIVED) { +        if (buf->rbuf.msg[idx].flags & ECP_RBUF_FLAG_IN_RBUF) {              buf->ack_map |= 1;          } else if (!do_ack && ECP_RBUF_SEQ_LTE(buf->seq_ack, buf->rbuf.seq_max - 2 * buf->hole_max)) {              do_ack = 1; @@ -143,6 +141,16 @@ static int ack_shift(ECPRBRecv *buf) {      return do_ack;  } +ssize_t ecp_rbuf_handle_flush(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size) { +    if (size < 0) return size; +     +    ECPRBRecv *buf = conn->rbuf.recv; +    if (buf == NULL) return ECP_ERR; + +    buf->flush = 1; +    return 0; +} +  int ecp_conn_rbuf_recv_create(ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage *msg, unsigned int msg_size) {      int rv; @@ -152,13 +160,20 @@ int ecp_conn_rbuf_recv_create(ECPConnection *conn, ECPRBRecv *buf, ECPRBMessage      buf->ack_map = ECP_RBUF_ACK_FULL;      buf->ack_rate = ACK_RATE; + +#ifdef ECP_WITH_MSGQ +    rv = ecp_conn_msgq_create(conn); +    if (rv) return rv; +#endif +      conn->rbuf.recv = buf; -          return ECP_OK;  }  void ecp_conn_rbuf_recv_destroy(ECPConnection *conn) { -     +#ifdef ECP_WITH_MSGQ +    ecp_conn_msgq_destroy(conn); +#endif      }  int ecp_conn_rbuf_recv_set_hole(ECPConnection *conn, unsigned short hole_max) { @@ -183,50 +198,55 @@ int ecp_conn_rbuf_recv_set_delay(ECPConnection *conn, unsigned short delay) {  }  int ecp_conn_rbuf_recv_start(ECPConnection *conn, ecp_seq_t seq) { +    int rv;      ECPRBRecv *buf = conn->rbuf.recv;      if (buf == NULL) return ECP_ERR;      buf->seq_ack = seq; -    return ecp_rbuf_start(&buf->rbuf, seq); +    rv = ecp_rbuf_start(&buf->rbuf, seq); +    if (rv) return rv; + +#ifdef ECP_WITH_MSGQ +    if (buf->flags & ECP_RBUF_FLAG_MSGQ) { +        rv = ecp_conn_msgq_start(conn, seq); +        if (rv) return rv; +    } +#endif +     +    return ECP_OK;  }  ssize_t ecp_conn_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *msg, size_t msg_size) {      ECPRBRecv *buf = conn->rbuf.recv;      ecp_seq_t ack_pkt = 0;      ssize_t rv; -    int _rv = ECP_OK;      int do_ack = 0;       if (buf == NULL) return ECP_ERR;      if (msg_size < 1) return ECP_ERR_MIN_MSG; -#ifdef ECP_WITH_MSGQ -    if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_lock(&buf->msgq.mutex); -#endif -      if (ECP_RBUF_SEQ_LT(buf->rbuf.seq_max, seq)) ack_pkt = seq - buf->rbuf.seq_max;      if (ECP_RBUF_SEQ_LTE(seq, buf->seq_ack)) {          ecp_seq_t seq_offset = buf->seq_ack - seq;          if (seq_offset < ECP_RBUF_ACK_SIZE) {              ecp_ack_t ack_mask = ((ecp_ack_t)1 << seq_offset); -            if (ack_mask & buf->ack_map) _rv = ECP_ERR_RBUF_DUP; -            if (!_rv) { -                buf->ack_map |= ack_mask; -                do_ack = ack_shift(buf); +            if (ack_mask & buf->ack_map) return ECP_ERR_RBUF_DUP; -                rv = msg_store(conn, seq, msg, msg_size); -                if (rv < 0) _rv = rv; -            } +            buf->ack_map |= ack_mask; +            do_ack = ack_shift(buf); + +            rv = msg_store(conn, seq, msg, msg_size); +            if (rv < 0) return rv;          } else { -            _rv = ECP_ERR_RBUF_IDX; +            return ECP_ERR_RBUF_DUP;          }      } else {          if ((buf->ack_map == ECP_RBUF_ACK_FULL) && (seq == (ecp_seq_t)(buf->seq_ack + 1))) {              if ((buf->flags & ECP_RBUF_FLAG_MSGQ) || buf->deliver_delay) {                  rv = msg_store(conn, seq, msg, msg_size); -                if (rv < 0) _rv = rv; +                if (rv < 0) return rv;              } else {                  ecp_msg_handle(conn, seq, msg, msg_size);                  rv = msg_size; @@ -234,22 +254,15 @@ ssize_t ecp_conn_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned ch                  buf->rbuf.seq_start++;                  buf->rbuf.msg_start = ECP_RBUF_IDX_MASK(buf->rbuf.msg_start + 1, buf->rbuf.msg_size);              } -            if (!_rv) buf->seq_ack++; +            buf->seq_ack++;          } else {              rv = msg_store(conn, seq, msg, msg_size); -            if (rv < 0) _rv = rv; +            if (rv < 0) return rv; -            if (!_rv) do_ack = ack_shift(buf); +            do_ack = ack_shift(buf);          }      } -    if (!_rv) msg_flush(conn); - -#ifdef ECP_WITH_MSGQ -    if (buf->flags & ECP_RBUF_FLAG_MSGQ) pthread_mutex_unlock(&buf->msgq.mutex); -#endif -     -    if (_rv) return _rv; -     +    msg_flush(conn);      if (buf->flush) {          buf->flush = 0;          do_ack = 1; | 
