diff options
| author | Uros Majstorovic <majstor@majstor.org> | 2021-08-29 16:25:40 +0200 | 
|---|---|---|
| committer | Uros Majstorovic <majstor@majstor.org> | 2021-08-29 16:25:40 +0200 | 
| commit | ff7ffade2f5686ae977af578cb87040cc4654994 (patch) | |
| tree | 9b44f006a0d70535095928dcff06d7f26abe59e2 /ecp/src/msgq.c | |
| parent | cdaa2560c0271585317450b57047a7f812d8366e (diff) | |
ecp code cleanup
Diffstat (limited to 'ecp/src/msgq.c')
| -rw-r--r-- | ecp/src/msgq.c | 65 | 
1 files changed, 34 insertions, 31 deletions
| diff --git a/ecp/src/msgq.c b/ecp/src/msgq.c index 201af4a..61c7b02 100644 --- a/ecp/src/msgq.c +++ b/ecp/src/msgq.c @@ -12,7 +12,7 @@  static struct timespec *abstime_ts(struct timespec *ts, ecp_cts_t msec) {      struct timeval tv;      uint64_t us_start; -     +      gettimeofday(&tv, NULL);      us_start = tv.tv_sec * (uint64_t) 1000000 + tv.tv_usec;      us_start += msec * 1000; @@ -26,9 +26,9 @@ int ecp_conn_msgq_create(ECPConnMsgQ *msgq) {      int rv;      if (msgq == NULL) return ECP_ERR; -     +      memset(msgq, 0, sizeof(ECPConnMsgQ)); -     +      rv = pthread_mutex_init(&msgq->mutex, NULL);      if (rv) return ECP_ERR; @@ -50,7 +50,7 @@ int ecp_conn_msgq_create(ECPConnMsgQ *msgq) {  void ecp_conn_msgq_destroy(ECPConnMsgQ *msgq) {      int i; -     +      if (msgq == NULL) return;      for (i=0; i<ECP_MAX_MTYPE; i++) { @@ -64,7 +64,7 @@ int ecp_conn_msgq_start(ECPConnMsgQ *msgq, ecp_seq_t seq) {      msgq->seq_max = seq;      msgq->seq_start = seq + 1; -     +      return ECP_OK;  } @@ -75,23 +75,25 @@ int ecp_conn_msgq_push(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype)      mtype &= ECP_MTYPE_MASK;      if (msgq == NULL) return ECP_ERR;      if (mtype >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE; -     +      if ((unsigned short)(msgq->idx_w[mtype] - msgq->idx_r[mtype]) == ECP_MSGQ_MAX_MSG) return ECP_MSGQ_ERR_MAX_MSG;      if (msgq->idx_w[mtype] == msgq->idx_r[mtype]) pthread_cond_signal(&msgq->cond[mtype]); -     +      msgq->seq_msg[mtype][MSG_IDX_MASK(msgq->idx_w[mtype])] = seq;      msgq->idx_w[mtype]++;      if (ECP_SEQ_LT(msgq->seq_max, seq)) msgq->seq_max = seq; -     +      return ECP_OK;  }  ssize_t ecp_conn_msgq_pop(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, ecp_cts_t timeout) {      ECPRBRecv *buf = conn->rbuf.recv;      ECPConnMsgQ *msgq = buf ? &buf->msgq : NULL; -    ssize_t rv = ECP_OK; -    int idx; +    ecp_seq_t seq; +    ecp_seq_t seq_offset; +    unsigned int idx; +    ssize_t rv;      if (msgq == NULL) return ECP_ERR;      if (mtype >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE; @@ -101,33 +103,34 @@ ssize_t ecp_conn_msgq_pop(ECPConnection *conn, unsigned char mtype, unsigned cha              pthread_cond_wait(&msgq->cond[mtype], &msgq->mutex);          } else {              struct timespec ts; -            int _rv = pthread_cond_timedwait(&msgq->cond[mtype], &msgq->mutex, abstime_ts(&ts, timeout)); -            if (_rv) rv = ECP_ERR_TIMEOUT; +            int _rv; + +            _rv = pthread_cond_timedwait(&msgq->cond[mtype], &msgq->mutex, abstime_ts(&ts, timeout)); +            if (_rv) return ECP_ERR_TIMEOUT;          }      } -    if (!rv) { -        ecp_seq_t seq = msgq->seq_msg[mtype][MSG_IDX_MASK(msgq->idx_r[mtype])]; -        ecp_seq_t seq_offset = seq - buf->rbuf.seq_start; -        unsigned int idx = ECP_RBUF_IDX_MASK(buf->rbuf.msg_start + seq_offset, buf->rbuf.msg_size); -        msgq->idx_r[mtype]++; -        buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_IN_MSGQ; +    seq = msgq->seq_msg[mtype][MSG_IDX_MASK(msgq->idx_r[mtype])]; +    seq_offset = seq - buf->rbuf.seq_start; +    idx = ECP_RBUF_IDX_MASK(buf->rbuf.msg_start + seq_offset, buf->rbuf.msg_size); -        if (msgq->seq_start == seq) { -            int i, _idx = idx; -            ecp_seq_t msg_cnt = msgq->seq_max - msgq->seq_start + 1; +    msgq->idx_r[mtype]++; +    buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_IN_MSGQ; -            for (i=0; i<msg_cnt; i++) { -                if (buf->rbuf.msg[_idx].flags & ECP_RBUF_FLAG_IN_MSGQ) break; -                _idx = ECP_RBUF_IDX_MASK(_idx + 1, buf->rbuf.msg_size); -            } -            msgq->seq_start += i; -        } -        rv = buf->rbuf.msg[idx].size - 1; -        if (rv >= 0) { -            rv = MIN(msg_size, rv); -            memcpy(msg, buf->rbuf.msg[idx].msg + 1, rv); +    if (msgq->seq_start == seq) { +        int i, _idx = idx; +        ecp_seq_t msg_cnt = msgq->seq_max - msgq->seq_start + 1; + +        for (i=0; i<msg_cnt; i++) { +            if (buf->rbuf.msg[_idx].flags & ECP_RBUF_FLAG_IN_MSGQ) break; +            _idx = ECP_RBUF_IDX_MASK(_idx + 1, buf->rbuf.msg_size);          } +        msgq->seq_start += i; +    } +    rv = buf->rbuf.msg[idx].size - 1; +    if (rv >= 0) { +        rv = MIN(msg_size, rv); +        memcpy(msg, buf->rbuf.msg[idx].msg + 1, rv);      }      return rv; | 
