diff options
| -rw-r--r-- | code/core/TODO | 4 | ||||
| -rw-r--r-- | code/core/htable/htable.c | 2 | ||||
| -rw-r--r-- | code/core/rbuf.c | 79 | ||||
| -rw-r--r-- | code/core/rbuf.h | 19 | ||||
| -rw-r--r-- | code/core/rbuf_recv.c | 5 | ||||
| -rw-r--r-- | code/core/rbuf_send.c | 105 | ||||
| -rw-r--r-- | code/core/timer.c | 14 | 
7 files changed, 138 insertions, 90 deletions
| diff --git a/code/core/TODO b/code/core/TODO index b1e7d12..bd4cc70 100644 --- a/code/core/TODO +++ b/code/core/TODO @@ -6,8 +6,6 @@  rbuf:  - implement _wait variants of open / send +- msgq should subtract ECP_MTYPE_SYS from mtype  - consider adding one buffer for all msgs (frag. issue) -should implement: -- send flush (seq) -- sending ack with congestion control diff --git a/code/core/htable/htable.c b/code/core/htable/htable.c index 82dbcbc..6330ef8 100644 --- a/code/core/htable/htable.c +++ b/code/core/htable/htable.c @@ -26,6 +26,7 @@ static ECPConnection *h_search(void *h, unsigned char *k) {      return hashtable_search(h, k);  } +#ifdef ECP_WITH_HTABLE  int ecp_htable_init(ECPHTableIface *h) {      h->init = 1;      h->create = h_create; @@ -35,3 +36,4 @@ int ecp_htable_init(ECPHTableIface *h) {      h->search = h_search;      return ECP_OK;  } +#endif
\ No newline at end of file diff --git a/code/core/rbuf.c b/code/core/rbuf.c index ee9a7fc..f4ec3a4 100644 --- a/code/core/rbuf.c +++ b/code/core/rbuf.c @@ -44,6 +44,12 @@ ssize_t ecp_rbuf_msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, int idx, unsigned ch      return msg_size;  } +int ecp_rbuf_info_init(ECPRBInfo *rbuf_info) { +    memset(rbuf_info, 0, sizeof(ECPRBInfo)); + +    return ECP_OK; +} +  ssize_t ecp_rbuf_pld_send(ECPConnection *conn, unsigned char *payload, size_t payload_size, ecp_seq_t seq) {      unsigned char packet[ECP_MAX_PKT];      ECPSocket *sock = conn->sock; @@ -67,79 +73,6 @@ ssize_t ecp_rbuf_pld_send(ECPConnection *conn, unsigned char *payload, size_t pa      return rv;  } -int ecp_rbuf_info_init(ECPRBInfo *rbuf_info) { -    memset(rbuf_info, 0, sizeof(ECPRBInfo)); - -    return ECP_OK; -} - -int ecp_rbuf_pkt_prep(ECPRBSend *buf, ecp_seq_t seq, unsigned char mtype, ECPRBInfo *rbuf_info) { -    if (buf == NULL) return ECP_ERR; -     -#ifdef ECP_WITH_PTHREAD -    pthread_mutex_lock(&buf->mutex); -#endif -    int idx = ecp_rbuf_msg_idx(&buf->rbuf, seq); -#ifdef ECP_WITH_PTHREAD -    pthread_mutex_unlock(&buf->mutex); -#endif - -    if (idx < 0) return idx; -     -    rbuf_info->seq = seq; -    rbuf_info->idx = idx; -    rbuf_info->mtype = mtype; -    buf->rbuf.msg[idx].size = 0; -    buf->rbuf.msg[idx].flags = 0; - -    return ECP_OK;  -} - -ssize_t ecp_rbuf_pkt_send(ECPRBSend *buf, ECPSocket *sock, ECPNetAddr *addr, ECPTimerItem *ti, unsigned char *packet, size_t pkt_size, ECPRBInfo *rbuf_info) { -    unsigned char flags = 0; -    int do_send = 1; -    ssize_t rv = 0; -    ecp_seq_t seq = rbuf_info->seq; -    unsigned int idx = rbuf_info->idx; -    unsigned char mtype = rbuf_info->mtype & ECP_MTYPE_MASK; - -    if (mtype < ECP_MAX_MTYPE_SYS) flags |= ECP_RBUF_FLAG_SYS; - -    rv = ecp_rbuf_msg_store(&buf->rbuf, seq, idx, packet, pkt_size, 0, flags); -    if (rv < 0) return rv; - -    if (buf->flags & ECP_RBUF_FLAG_CCONTROL) { -#ifdef ECP_WITH_PTHREAD -        pthread_mutex_lock(&buf->mutex); -#endif - -        if (ECP_RBUF_SEQ_LT(buf->rbuf.seq_max, seq)) buf->rbuf.seq_max = seq; - -        if (buf->cnt_cc || (buf->in_transit >= buf->win_size)) { -            if (!buf->cnt_cc) buf->seq_cc = seq; -            buf->cnt_cc++; -            buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_IN_CCONTROL; -            do_send = 0; -        } else { -            buf->in_transit++; -        } -         -#ifdef ECP_WITH_PTHREAD -        pthread_mutex_unlock(&buf->mutex); -#endif -    } - -    if (do_send) { -        int _rv; -        if (ti) { -            _rv = ecp_timer_push(ti); -            if (_rv) return _rv; -        } -        rv = ecp_pkt_send(sock, addr, packet, pkt_size); -    } -    return rv; -} -  int ecp_rbuf_conn_create(ECPConnection *conn, ECPRBSend *buf_s, ECPRBMessage *msg_s, unsigned int msg_s_size, ECPRBRecv *buf_r, ECPRBMessage *msg_r, unsigned int msg_r_size) {      int rv; diff --git a/code/core/rbuf.h b/code/core/rbuf.h index 1332dee..77663ad 100644 --- a/code/core/rbuf.h +++ b/code/core/rbuf.h @@ -35,6 +35,7 @@ typedef struct ECPRBMessage {      unsigned char msg[ECP_MAX_PKT];      ssize_t size;      unsigned char flags; +    short idx_t;  } ECPRBMessage;  typedef struct ECPRBuffer { @@ -45,6 +46,17 @@ typedef struct ECPRBuffer {      ECPRBMessage *msg;  } ECPRBuffer; + +typedef struct ECPRBTimerItem { +    unsigned char occupied; +    ECPTimerItem item; +} ECPRBTimerItem; + +typedef struct ECPRBTimer { +    ECPRBTimerItem item[ECP_MAX_TIMER]; +    unsigned short idx_w; +} ECPRBTimer; +  typedef struct ECPRBRecv {      unsigned char flags;      unsigned char flush; @@ -72,6 +84,7 @@ typedef struct ECPRBSend {      ecp_seq_t seq_flush;      unsigned int nack_rate;      ECPRBuffer rbuf; +    ECPRBTimer timer;  #ifdef ECP_WITH_PTHREAD      pthread_mutex_t mutex;  #endif @@ -93,11 +106,9 @@ int ecp_rbuf_init(ECPRBuffer *rbuf, ECPRBMessage *msg, unsigned int msg_size);  int ecp_rbuf_start(ECPRBuffer *rbuf, ecp_seq_t seq);  int ecp_rbuf_msg_idx(ECPRBuffer *rbuf, ecp_seq_t seq);  ssize_t ecp_rbuf_msg_store(ECPRBuffer *rbuf, ecp_seq_t seq, int idx, unsigned char *msg, size_t msg_size, unsigned char test_flags, unsigned char set_flags); -ssize_t ecp_rbuf_pld_send(struct ECPConnection *conn, unsigned char *payload, size_t payload_size, ecp_seq_t seq);  int ecp_rbuf_info_init(ECPRBInfo *rbuf_info); -int ecp_rbuf_pkt_prep(ECPRBSend *buf, ecp_seq_t seq, unsigned char mtype, ECPRBInfo *rbuf_info); -ssize_t ecp_rbuf_pkt_send(ECPRBSend *buf, struct ECPSocket *sock, ECPNetAddr *addr, ECPTimerItem *ti, unsigned char *packet, size_t pkt_size, ECPRBInfo *rbuf_info); +ssize_t ecp_rbuf_pld_send(struct ECPConnection *conn, unsigned char *payload, size_t payload_size, ecp_seq_t seq);  int ecp_rbuf_conn_create(struct ECPConnection *conn, ECPRBSend *buf_s, ECPRBMessage *msg_s, unsigned int msg_s_size, ECPRBRecv *buf_r, ECPRBMessage *msg_r, unsigned int msg_r_size);  void ecp_rbuf_conn_destroy(struct ECPConnection *conn); @@ -113,6 +124,8 @@ ssize_t ecp_rbuf_recv_store(struct ECPConnection *conn, ecp_seq_t seq, unsigned  int ecp_rbuf_send_create(struct ECPConnection *conn, ECPRBSend *buf, ECPRBMessage *msg, unsigned int msg_size);  void ecp_rbuf_send_destroy(struct ECPConnection *conn);  int ecp_rbuf_send_start(struct ECPConnection *conn); +int ecp_rbuf_pkt_prep(ECPRBSend *buf, ecp_seq_t seq, unsigned char mtype, ECPRBInfo *rbuf_info); +ssize_t ecp_rbuf_pkt_send(ECPRBSend *buf, struct ECPSocket *sock, ECPNetAddr *addr, ECPTimerItem *ti, unsigned char *packet, size_t pkt_size, ECPRBInfo *rbuf_info);  ssize_t ecp_rbuf_handle_ack(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size);  ssize_t ecp_rbuf_handle_flush(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, ssize_t size); diff --git a/code/core/rbuf_recv.c b/code/core/rbuf_recv.c index d78ffce..e73c602 100644 --- a/code/core/rbuf_recv.c +++ b/code/core/rbuf_recv.c @@ -221,11 +221,14 @@ ssize_t ecp_rbuf_recv_store(ECPConnection *conn, ecp_seq_t seq, unsigned char *m      ecp_seq_t ack_pkt = 0;      ssize_t rv;      int do_ack = 0;  +    unsigned char mtype;      if (buf == NULL) return ECP_ERR;      if (msg_size < 1) return ECP_ERR_MIN_MSG; -    // XXX mtype == RBACK | RBFLUSH handle imediately +    mtype = msg[0] & ECP_MTYPE_MASK; +    if ((mtype == ECP_MTYPE_RBACK) || (mtype == ECP_MTYPE_RBFLUSH)) return ecp_msg_handle(conn, seq, msg, msg_size); +      if (ECP_RBUF_SEQ_LT(buf->rbuf.seq_max, seq)) ack_pkt = seq - buf->rbuf.seq_max;      if (ECP_RBUF_SEQ_LTE(seq, buf->seq_ack)) {          ecp_seq_t seq_offset = buf->seq_ack - seq; diff --git a/code/core/rbuf_send.c b/code/core/rbuf_send.c index e7020cb..b6cdbac 100644 --- a/code/core/rbuf_send.c +++ b/code/core/rbuf_send.c @@ -27,6 +27,9 @@ static void cc_flush(ECPConnection *conn) {      ecp_win_t pkt_cc_cnt = buf->win_size > buf->in_transit ? buf->win_size - buf->in_transit : 0;      int pkt_to_send = pkt_buf_cnt < pkt_cc_cnt ? pkt_buf_cnt : pkt_cc_cnt;      int i; + +    ECPTimerItem ti[ECP_MAX_TIMER]; +    unsigned short max_t = 0;      if (pkt_to_send) {          unsigned int idx = ecp_rbuf_msg_idx(rbuf, buf->seq_cc); @@ -35,6 +38,15 @@ static void cc_flush(ECPConnection *conn) {          for (i=0; i<pkt_to_send; i++) {              if (!(rbuf->msg[_idx].flags & ECP_RBUF_FLAG_IN_CCONTROL)) break;              rbuf->msg[_idx].flags &= ~ECP_RBUF_FLAG_IN_CCONTROL; +            if (rbuf->msg[_idx].idx_t != -1) { +                ECPRBTimer *timer = &buf->timer; +                ECPRBTimerItem *item = &timer->item[rbuf->msg[_idx].idx_t]; + +                item->occupied = 0; +                ti[max_t] = item->item; +                rbuf->msg[_idx].idx_t = max_t; +                max_t++; +            }              _idx = ECP_RBUF_IDX_MASK(_idx + 1, rbuf->msg_size);          }          pkt_to_send = i; @@ -45,6 +57,7 @@ static void cc_flush(ECPConnection *conn) {  #endif          for (i=0; i<pkt_to_send; i++) { +            if (rbuf->msg[_idx].idx_t != -1) ecp_timer_push(&ti[rbuf->msg[_idx].idx_t]);              ecp_pkt_send(conn->sock, &conn->node.addr, rbuf->msg[_idx].msg, rbuf->msg[_idx].size);              _idx = ECP_RBUF_IDX_MASK(_idx + 1, rbuf->msg_size);          } @@ -114,7 +127,9 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt                      if (buf->flags & ECP_RBUF_FLAG_RELIABLE) {                          unsigned int _idx = ECP_RBUF_IDX_MASK(idx + 1 - ECP_RBUF_ACK_SIZE + i, rbuf->msg_size);                          if ((rbuf->msg[_idx].size == 0) || (rbuf->msg[_idx].flags & ECP_RBUF_FLAG_SYS)) { -                             +                            unsigned char payload[ECP_SIZE_PLD(0)]; +                            ecp_pld_set_type(payload, ECP_MTYPE_NOP); +                            ecp_rbuf_pld_send(conn, payload, sizeof(payload), seq_ack + i);                          } else {                              ecp_pkt_send(conn->sock, &conn->node.addr, rbuf->msg[_idx].msg, rbuf->msg[_idx].size);                          } @@ -147,9 +162,7 @@ ssize_t ecp_rbuf_handle_ack(ECPConnection *conn, ecp_seq_t seq, unsigned char mt          }          if (buf->flush) {              if (ECP_RBUF_SEQ_LT(buf->seq_flush, rbuf->seq_start)) buf->flush = 0; -            if (buf->flush) { -                do_flush = 1; -            } +            if (buf->flush) do_flush = 1;          }          if (buf->cnt_cc) cc_flush(conn);      } @@ -250,9 +263,91 @@ int ecp_rbuf_flush(ECPConnection *conn) {      pthread_mutex_unlock(&buf->mutex);  #endif -    ssize_t rv = flush_send(conn, 0); +    ssize_t rv = flush_send(conn, NULL);      if (rv < 0) return rv;      return ECP_OK;  } +int ecp_rbuf_pkt_prep(ECPRBSend *buf, ecp_seq_t seq, unsigned char mtype, ECPRBInfo *rbuf_info) { +    if (buf == NULL) return ECP_ERR; +     +#ifdef ECP_WITH_PTHREAD +    pthread_mutex_lock(&buf->mutex); +#endif +    int idx = ecp_rbuf_msg_idx(&buf->rbuf, seq); +#ifdef ECP_WITH_PTHREAD +    pthread_mutex_unlock(&buf->mutex); +#endif + +    if (idx < 0) return idx; +     +    rbuf_info->seq = seq; +    rbuf_info->idx = idx; +    rbuf_info->mtype = mtype; +    buf->rbuf.msg[idx].size = 0; +    buf->rbuf.msg[idx].flags = 0; + +    return ECP_OK;  +} + +ssize_t ecp_rbuf_pkt_send(ECPRBSend *buf, ECPSocket *sock, ECPNetAddr *addr, ECPTimerItem *ti, unsigned char *packet, size_t pkt_size, ECPRBInfo *rbuf_info) { +    unsigned char flags = 0; +    int do_send = 1; +    ssize_t rv = 0; +    ecp_seq_t seq = rbuf_info->seq; +    unsigned int idx = rbuf_info->idx; +    unsigned char mtype = rbuf_info->mtype & ECP_MTYPE_MASK; + +    if (mtype < ECP_MAX_MTYPE_SYS) flags |= ECP_RBUF_FLAG_SYS; + +    rv = ecp_rbuf_msg_store(&buf->rbuf, seq, idx, packet, pkt_size, 0, flags); +    if (rv < 0) return rv; + +    if (buf->flags & ECP_RBUF_FLAG_CCONTROL) { +#ifdef ECP_WITH_PTHREAD +        pthread_mutex_lock(&buf->mutex); +#endif + +        if (ECP_RBUF_SEQ_LT(buf->rbuf.seq_max, seq)) buf->rbuf.seq_max = seq; + +        if (buf->cnt_cc || (buf->in_transit >= buf->win_size)) { +            if (!buf->cnt_cc) buf->seq_cc = seq; +            buf->cnt_cc++; +            buf->rbuf.msg[idx].flags |= ECP_RBUF_FLAG_IN_CCONTROL; +            do_send = 0; +            if (ti) { +                ECPRBTimer *timer = &buf->timer; +                ECPRBTimerItem *item = &timer->item[timer->idx_w]; +                     +                if (!item->occupied) { +                    item->occupied = 1; +                    item->item = *ti; +                    buf->rbuf.msg[idx].idx_t = timer->idx_w; +                    timer->idx_w = (timer->idx_w) % ECP_MAX_TIMER; +                } else { +                    rv = ECP_ERR_MAX_TIMER; +                } +            } else { +                buf->rbuf.msg[idx].idx_t = -1; +            } +        } else { +            buf->in_transit++; +        } +         +#ifdef ECP_WITH_PTHREAD +        pthread_mutex_unlock(&buf->mutex); +#endif +         +        if (rv) return rv; +    } + +    if (do_send) { +        if (ti) { +            int _rv = ecp_timer_push(ti); +            if (_rv) return _rv; +        } +        rv = ecp_pkt_send(sock, addr, packet, pkt_size); +    } +    return rv; +} diff --git a/code/core/timer.c b/code/core/timer.c index a2d3fa5..ea516df 100644 --- a/code/core/timer.c +++ b/code/core/timer.c @@ -21,15 +21,16 @@ void ecp_timer_destroy(ECPTimer *timer) {  }  int ecp_timer_item_init(ECPTimerItem *ti, ECPConnection *conn, unsigned char mtype, unsigned short cnt, unsigned int timeout) { -    unsigned int abstime = conn->sock->ctx->tm.abstime_ms(timeout); -      if ((mtype & ECP_MTYPE_MASK) >= ECP_MAX_MTYPE) return ECP_ERR_MAX_MTYPE; +    if (ti == NULL) return ECP_ERR; +    if (conn == NULL) return ECP_ERR; +          ti->conn = conn;      ti->mtype = mtype;      ti->cnt = cnt-1;      ti->timeout = timeout; -    ti->abstime = abstime; +    ti->abstime = 0;      ti->retry = NULL;      ti->pld = NULL;      ti->pld_size = 0; @@ -40,7 +41,10 @@ int ecp_timer_item_init(ECPTimerItem *ti, ECPConnection *conn, unsigned char mty  int ecp_timer_push(ECPTimerItem *ti) {      int i, is_reg, rv = ECP_OK;      ECPConnection *conn = ti->conn; +    ECPContext *ctx = conn->sock->ctx;      ECPTimer *timer = &conn->sock->timer; +     +    ti->abstime = ctx->tm.abstime_ms(ti->timeout);  #ifdef ECP_WITH_PTHREAD      pthread_mutex_lock(&timer->mutex); @@ -181,7 +185,6 @@ unsigned int ecp_timer_exe(ECPSocket *sock) {      for (i=to_exec_size-1; i>=0; i--) {          int rv = ECP_OK;          ECPConnection *conn = to_exec[i].conn; -        ECPSocket *sock = conn->sock;          unsigned char mtype = to_exec[i].mtype;          unsigned char *pld = to_exec[i].pld;          unsigned char pld_size = to_exec[i].pld_size; @@ -191,7 +194,6 @@ unsigned int ecp_timer_exe(ECPSocket *sock) {          if (to_exec[i].cnt) {              ssize_t _rv = 0;              to_exec[i].cnt--; -            to_exec[i].abstime = now + to_exec[i].timeout;              if (retry) {                  _rv = retry(conn, to_exec+i);                  if (_rv < 0) rv = _rv; @@ -203,6 +205,7 @@ unsigned int ecp_timer_exe(ECPSocket *sock) {          } else if (handler) {              handler(conn, 0, mtype, NULL, ECP_ERR_TIMEOUT);          } +  #ifdef ECP_WITH_PTHREAD          pthread_mutex_lock(&conn->mutex);  #endif @@ -210,6 +213,7 @@ unsigned int ecp_timer_exe(ECPSocket *sock) {  #ifdef ECP_WITH_PTHREAD          pthread_mutex_unlock(&conn->mutex);  #endif +      }      return ret; | 
