diff options
Diffstat (limited to 'code/core/msgq.c')
| -rw-r--r-- | code/core/msgq.c | 112 | 
1 files changed, 60 insertions, 52 deletions
| diff --git a/code/core/msgq.c b/code/core/msgq.c index 2ba0ed5..c441acd 100644 --- a/code/core/msgq.c +++ b/code/core/msgq.c @@ -7,7 +7,7 @@  #define MIN(a,b) (((a)<(b))?(a):(b))  #define MAX(a,b) (((a)>(b))?(a):(b)) -#define MSG_NEXT(msgi, max_msgs)    ((msgi + 1) % max_msgs) +#define MSG_IDX_MASK(idx)    ((idx) & ((ECP_MAX_CONN_MSG) - 1))  static struct timespec *abstime_ts(struct timespec *ts, unsigned int msec) {      struct timeval tv; @@ -22,16 +22,27 @@ static struct timespec *abstime_ts(struct timespec *ts, unsigned int msec) {  }  int ecp_conn_msgq_create(ECPConnection *conn) { +    ECPRBRecv *buf = conn->rbuf.recv; +    ECPConnMsgQ *msgq = buf ? &buf->msgq : NULL;      int i; +    int rv; + +    if (msgq == NULL) return ECP_ERR; +     +    memset(msgq, 0, sizeof(ECPConnMsgQ)); +    rv = pthread_mutex_init(&msgq->mutex, NULL); +    if (rv) return ECP_ERR; +      for (i=0; i<ECP_MAX_MTYPE; i++) { -        int rv = pthread_cond_init(&conn->msgq.cond[i], NULL); +        rv = pthread_cond_init(&msgq->cond[i], NULL);          if (rv) {              int j;              for (j=0; j<i; j++) { -                pthread_cond_destroy(&conn->msgq.cond[j]); +                pthread_cond_destroy(&msgq->cond[j]);              } +            pthread_mutex_destroy(&msgq->mutex);              return ECP_ERR;          }      } @@ -40,81 +51,78 @@ int ecp_conn_msgq_create(ECPConnection *conn) {  }  void ecp_conn_msgq_destroy(ECPConnection *conn) { +    ECPRBRecv *buf = conn->rbuf.recv; +    ECPConnMsgQ *msgq = buf ? &buf->msgq : NULL;      int i; +    if (msgq == NULL) return; +      for (i=0; i<ECP_MAX_MTYPE; i++) { -        pthread_cond_destroy(&conn->msgq.cond[i]); +        pthread_cond_destroy(&msgq->cond[i]);      } +    pthread_mutex_destroy(&msgq->mutex);  } -ssize_t ecp_conn_msgq_push(ECPConnection *conn, unsigned char *msg, size_t msg_size) { -    ECPConnMsgQ *msgq = &conn->msgq; -    unsigned short msg_idx = msgq->empty_idx; -    unsigned short i; -    unsigned short done = 0; -    unsigned char mtype; -     -    if (msg_size == 0) return ECP_OK; +int ecp_conn_msgq_push(ECPConnection *conn, ecp_seq_t seq, unsigned char mtype) { +    ECPRBRecv *buf = conn->rbuf.recv; +    ECPConnMsgQ *msgq = buf ? &buf->msgq : NULL; -    mtype = *msg; -    msg++; -    msg_size--; - -    if (mtype >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE; -    if (msg_size >= ECP_MAX_MSG) return ECP_ERR_MAX_MSG; -    if (msg_size < 1) return ECP_ERR_MIN_MSG; +    if (msgq == NULL) return ECP_ERR; -    for (i=0; i<ECP_MAX_CONN_MSG; i++) { -        if (!msgq->occupied[msg_idx]) { -            ECPMessage *message = &msgq->msg[msg_idx]; -            if (msg_size > 0) memcpy(message->msg, msg, msg_size); -            message->size = msg_size; -            if (msgq->r_idx[mtype] == msgq->w_idx[mtype]) pthread_cond_signal(&msgq->cond[mtype]); -            msgq->msg_idx[mtype][msgq->w_idx[mtype]] = msg_idx; -            msgq->w_idx[mtype] = MSG_NEXT(msgq->w_idx[mtype], ECP_MAX_CONN_MSG+1); - -            msgq->empty_idx = MSG_NEXT(msg_idx, ECP_MAX_CONN_MSG); -            msgq->occupied[msg_idx] = 1; -            done = 1; -            break; -        } -        msg_idx = MSG_NEXT(msg_idx, ECP_MAX_CONN_MSG); -    } -    if (done) { -        return msg_size+1; -    } else { -        return ECP_ERR_MAX_CONN_MSG; -    } +    if (msgq->idx_w[mtype] - msgq->idx_r[mtype] == ECP_MAX_CONN_MSG) return ECP_ERR_MAX_CONN_MSG; +    if (msgq->idx_w[mtype] == msgq->idx_r[mtype]) pthread_cond_signal(&msgq->cond[mtype]); +     +    msgq->seq_msg[mtype][MSG_IDX_MASK(msgq->idx_w[mtype])] = seq; +    msgq->idx_w[mtype]++; +     +    return ECP_OK;  }  ssize_t ecp_conn_msgq_pop(ECPConnection *conn, unsigned char mtype, unsigned char *msg, size_t msg_size, unsigned int timeout) { -    ECPConnMsgQ *msgq = &conn->msgq; -    ECPMessage *message; +    ECPRBRecv *buf = conn->rbuf.recv; +    ECPConnMsgQ *msgq = buf ? &buf->msgq : NULL;      ssize_t rv = ECP_OK; -    unsigned short msg_idx; +    ecp_seq_t seq; +    int idx; +    if (msgq == NULL) return ECP_ERR;      if (mtype >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE; -    if (msgq->r_idx[mtype] == msgq->w_idx[mtype]) { +    if (msgq->idx_r[mtype] == msgq->idx_w[mtype]) {          if (timeout == -1) { -            pthread_cond_wait(&msgq->cond[mtype], &conn->mutex); +            pthread_cond_wait(&msgq->cond[mtype], &msgq->mutex);          } else {              struct timespec ts; -            int _rv = pthread_cond_timedwait(&msgq->cond[mtype], &conn->mutex, abstime_ts(&ts, timeout)); +            int _rv = pthread_cond_timedwait(&msgq->cond[mtype], &msgq->mutex, abstime_ts(&ts, timeout));              if (_rv) rv = ECP_ERR_TIMEOUT;          }      }      if (!rv) { -        msg_idx = msgq->msg_idx[mtype][msgq->r_idx[mtype]]; -        msgq->r_idx[mtype] = MSG_NEXT(msgq->r_idx[mtype], ECP_MAX_CONN_MSG+1); -        msgq->occupied[msg_idx] = 0; -        message = &msgq->msg[msg_idx]; -        rv = message->size; +        seq = msgq->seq_msg[mtype][MSG_IDX_MASK(msgq->idx_r[mtype])]; +        msgq->idx_r[mtype]++; +        idx = ecp_rbuf_msg_idx(&buf->rbuf, seq); +        if (idx < 0) rv = idx; +    } +    if (!rv) { +        buf->rbuf.msg[idx].flags &= ~ECP_RBUF_FLAG_RECEIVED; +        if (buf->rbuf.seq_start == seq) { +            int i, _idx = idx; +            ecp_seq_t msg_cnt = buf->rbuf.seq_max - buf->rbuf.seq_start + 1; + +            for (i=0; i<msg_cnt; i++) { +                if (buf->rbuf.msg[_idx].flags & ECP_RBUF_FLAG_RECEIVED) break; +                _idx = ECP_RBUF_IDX_MASK(_idx + 1, buf->rbuf.msg_size); +            } +            buf->rbuf.seq_start += i; +            buf->rbuf.msg_start = _idx; +        } +        rv = buf->rbuf.msg[idx].size;          if (rv >= 0) {              rv = MIN(msg_size, rv); -            memcpy(msg, message->msg, rv); +            memcpy(msg, buf->rbuf.msg[idx].msg, rv);          }      } +      return rv;  } | 
