#include #include #include "core.h" #include "tm.h" int ecp_timer_create(ECPTimer *timer) { int rv; timer->head = -1; #ifdef ECP_WITH_PTHREAD rv = pthread_mutex_init(&timer->mutex, NULL); if (rv) return ECP_ERR; #endif return ECP_OK; } void ecp_timer_destroy(ECPTimer *timer) { #ifdef ECP_WITH_PTHREAD pthread_mutex_destroy(&timer->mutex); #endif } void ecp_timer_item_init(ECPTimerItem *ti, ECPConnection *conn, unsigned char mtype, ecp_timer_retry_t retry_f, unsigned short cnt, ecp_sts_t timeout, void *param) { ti->conn = conn; ti->mtype = mtype; ti->frag_cnt = 0; ti->retry = retry_f; ti->cnt = cnt; ti->timeout = timeout; ti->abstime = 0; ti->param = param; } void *ecp_timer_get_param(ECPTimerItem *ti) { return ti->param; } int _ecp_timer_push(ECPTimerItem *ti, int refcount_inc) { ECPConnection *conn = ti->conn; ECPTimer *timer = &conn->sock->timer; int i, rv = ECP_OK; ti->abstime = ecp_tm_get_ms() + ti->timeout; #ifdef ECP_WITH_PTHREAD pthread_mutex_lock(&timer->mutex); #endif if (timer->head == ECP_MAX_TIMER-1) { rv = ECP_ERR_MAX_TIMER; goto timer_push_fin; } if (refcount_inc) { rv = ecp_conn_refcount_inc(conn); if (rv) goto timer_push_fin; } for (i=timer->head; i>=0; i--) { if (ECP_STS_LTE(ti->abstime, timer->item[i].abstime)) { if (i != timer->head) memmove(timer->item+i+2, timer->item+i+1, sizeof(ECPTimerItem) * (timer->head-i)); timer->item[i+1] = *ti; timer->head++; break; } } if (i == -1) { if (timer->head != -1) memmove(timer->item+1, timer->item, sizeof(ECPTimerItem) * (timer->head+1)); timer->item[0] = *ti; timer->head++; } ecp_tm_timer_set(ti->timeout); timer_push_fin: #ifdef ECP_WITH_PTHREAD pthread_mutex_unlock(&timer->mutex); #endif return rv; } int ecp_timer_push(ECPTimerItem *ti) { return _ecp_timer_push(ti, 1); } void ecp_timer_pop(ECPConnection *conn, unsigned char mtype, ecp_seq_t seq, unsigned char frag_cnt, unsigned char frag_tot) { ECPTimer *timer = &conn->sock->timer; int i; #ifdef ECP_WITH_PTHREAD pthread_mutex_lock(&timer->mutex); #endif for (i=timer->head; i>=0; i--) { ECPConnection *curr_conn = timer->item[i].conn; if ((conn == curr_conn) && (mtype == timer->item[i].mtype)) { if (mtype & ECP_MTYPE_FLAG_FRAG) { int rv; rv = ECP_OK; #ifdef ECP_WITH_PTHREAD pthread_mutex_lock(&conn->mutex); #endif if (conn->frag_cnt) { ecp_seq_t seq_frag = conn->frag_seq + frag_cnt; if (ECP_SEQ_LT(seq_frag, seq)) { timer->item[i].frag_cnt = 0; } else if (seq_frag != seq) { rv = ECP_ERR_FRAG; } } #ifdef ECP_WITH_PTHREAD pthread_mutex_unlock(&conn->mutex); #endif if (rv) break; if (timer->item[i].frag_cnt) { timer->item[i].frag_cnt--; } else if (frag_tot) { timer->item[i].frag_cnt = frag_tot - 1; } if (timer->item[i].frag_cnt != 0) break; } if (i != timer->head) { memmove(timer->item+i, timer->item+i+1, sizeof(ECPTimerItem) * (timer->head-i)); memset(timer->item+timer->head, 0, sizeof(ECPTimerItem)); } else { memset(timer->item+i, 0, sizeof(ECPTimerItem)); } ecp_conn_refcount_dec(conn); timer->head--; break; } } #ifdef ECP_WITH_PTHREAD pthread_mutex_unlock(&timer->mutex); #endif } void ecp_timer_remove(ECPConnection *conn) { ECPTimer *timer = &conn->sock->timer; int i; #ifdef ECP_WITH_PTHREAD pthread_mutex_lock(&timer->mutex); #endif for (i=timer->head; i>=0; i--) { ECPConnection *curr_conn = timer->item[i].conn; if (conn == curr_conn) { if (i != timer->head) { memmove(timer->item+i, timer->item+i+1, sizeof(ECPTimerItem) * (timer->head-i)); memset(timer->item+timer->head, 0, sizeof(ECPTimerItem)); } else { memset(timer->item+i, 0, sizeof(ECPTimerItem)); } ecp_conn_refcount_dec(conn); timer->head--; } } #ifdef ECP_WITH_PTHREAD pthread_mutex_unlock(&timer->mutex); #endif } ecp_sts_t ecp_timer_exe(ECPSocket *sock) { ECPContext *ctx; ECPTimer *timer = &sock->timer; ECPTimerItem to_exec[ECP_MAX_TIMER]; ecp_sts_t ret = 0; ecp_sts_t now = ecp_tm_get_ms(); int to_exec_size = 0; int i; ctx = sock->ctx; #ifdef ECP_WITH_PTHREAD pthread_mutex_lock(&timer->mutex); #endif for (i=timer->head; i>=0; i--) { ecp_sts_t abstime = timer->item[i].abstime; if (ECP_STS_LT(now, abstime)) { ret = abstime - now; break; } to_exec[to_exec_size] = timer->item[i]; to_exec_size++; } if (i != timer->head) { memset(timer->item+i+1, 0, sizeof(ECPTimerItem) * (timer->head-i)); timer->head = i; } #ifdef ECP_WITH_PTHREAD pthread_mutex_unlock(&timer->mutex); #endif for (i=to_exec_size-1; i>=0; i--) { ECPConnection *conn = to_exec[i].conn; unsigned char mtype = to_exec[i].mtype; ecp_timer_retry_t retry = to_exec[i].retry; if (to_exec[i].cnt > 0) { to_exec[i].cnt--; to_exec[i].frag_cnt = 0; if (retry) { ssize_t rv; rv = retry(conn, to_exec+i); if (rv < 0) ECP_LOG_ERR(sock, "ecp_timer_exe: retry err:%d\n", (int)rv); } } else { int rv; rv = ECP_ERR_TIMEOUT; if (mtype == ECP_MTYPE_OPEN_REP) { int _rv; /* remove only if connection is not already open, avoids race condition with open reply message handler */ _rv = _ecp_conn_remove(conn); if (_rv) rv = ECP_OK; } if (rv) { ecp_err_handle(conn, mtype, rv); } } ecp_conn_refcount_dec(conn); } return ret; } ssize_t ecp_timer_send(ECPConnection *conn, ecp_timer_retry_t send_f, unsigned char mtype, unsigned short cnt, ecp_sts_t timeout, void *param) { ECPTimerItem ti; if (cnt == 0) return ECP_ERR; ecp_timer_item_init(&ti, conn, mtype, send_f, cnt-1, timeout, param); return send_f(conn, &ti); }