summaryrefslogtreecommitdiff
path: root/ecp
diff options
context:
space:
mode:
authorUros Majstorovic <majstor@majstor.org>2024-04-23 17:57:46 +0200
committerUros Majstorovic <majstor@majstor.org>2024-04-23 17:57:46 +0200
commit40f4f88f04e834a9b9849dd6bcda78c1a1893506 (patch)
tree9e71c91d10a59d6aaf38fb5543f499b7f96cbb25 /ecp
parent65950182afb58a8bc41368e561ff9d1676660681 (diff)
fixed reference counter / garbage collector
Diffstat (limited to 'ecp')
-rw-r--r--ecp/src/ecp/core.c120
-rw-r--r--ecp/src/ecp/core.h6
-rw-r--r--ecp/src/ecp/timer.c48
3 files changed, 104 insertions, 70 deletions
diff --git a/ecp/src/ecp/core.c b/ecp/src/ecp/core.c
index 3def373..1e24d0b 100644
--- a/ecp/src/ecp/core.c
+++ b/ecp/src/ecp/core.c
@@ -456,22 +456,22 @@ static void conn_table_expire(ECPSocket *sock, ecp_sts_t to, ecp_conn_expired_t
for (i=0; i<sock->conn_table.size; i++) {
conn = sock->conn_table.arr[i];
- if (ecp_conn_is_gc(conn)) {
#ifdef ECP_WITH_PTHREAD
- pthread_mutex_lock(&conn->mutex);
+ pthread_mutex_lock(&conn->mutex);
#endif
- expired = conn_expired(conn, now, to);
+ expired = 0;
+ if (_ecp_conn_in_gct(conn)) expired = conn_expired(conn, now, to);
+ if (expired) _ecp_conn_pull_gct(conn);
#ifdef ECP_WITH_PTHREAD
- pthread_mutex_unlock(&conn->mutex);
+ pthread_mutex_unlock(&conn->mutex);
#endif
- if (expired) {
- to_remove[remove_cnt] = conn;
- remove_cnt++;
- if (remove_cnt == ECP_MAX_EXP) break;
- }
+ if (expired) {
+ to_remove[remove_cnt] = conn;
+ remove_cnt++;
+ if (remove_cnt == ECP_MAX_EXP) break;
}
}
@@ -964,30 +964,36 @@ int ecp_conn_create(ECPConnection *conn, ECPConnection *parent) {
pcount = parent ? parent->pcount + 1 : 0;
if (pcount > ECP_MAX_PARENT) return ECP_ERR_MAX_PARENT;
+ if (parent) {
+ rv = ecp_conn_refcount_inc(parent);
+ if (rv) return rv;
+ conn->parent = parent;
+ conn->pcount = pcount;
+ }
#endif
#ifdef ECP_WITH_PTHREAD
rv = pthread_mutex_init(&conn->mutex, NULL);
- if (rv) return ECP_ERR;
+ if (rv) {
+#ifdef ECP_WITH_VCONN
+ if (conn->parent) ecp_conn_refcount_dec(conn->parent);
+#endif
+
+ return ECP_ERR;
+ }
#endif
rv = ecp_ext_conn_create(conn);
if (rv) {
+#ifdef ECP_WITH_VCONN
+ if (conn->parent) ecp_conn_refcount_dec(conn->parent);
+#endif
#ifdef ECP_WITH_PTHREAD
pthread_mutex_destroy(&conn->mutex);
#endif
return ECP_ERR;
}
-
-#ifdef ECP_WITH_VCONN
- if (parent) {
- conn->parent = parent;
- conn->pcount = pcount;
-
- ecp_conn_refcount_inc(parent);
- }
-#endif
if (!ecp_conn_is_gc(conn)) conn->refcount++;
return ECP_OK;
@@ -1110,21 +1116,36 @@ int ecp_conn_insert_gc(ECPConnection *conn) {
ECPSocket *sock = conn->sock;
int rv = ECP_OK;
-#ifdef ECP_WITH_HTABLE
+ rv = ecp_conn_refcount_inc(conn);
+ if (rv) return rv;
+
#ifdef ECP_WITH_PTHREAD
pthread_mutex_lock(&sock->conn_table.mutex_gc);
#endif
+#ifdef ECP_WITH_HTABLE
+
rv = ecp_ht_insert(sock->conn_table.keys_gc, &conn->remote.key_perma.public, conn);
+#else /* ECP_WITH_HTABLE */
+
#ifdef ECP_WITH_PTHREAD
- pthread_mutex_unlock(&sock->conn_table.mutex_gc);
+ pthread_mutex_lock(&conn->mutex);
#endif
- if (!rv) ecp_conn_refcount_inc(conn);
+ _ecp_conn_push_gct(conn);
+
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_unlock(&conn->mutex);
+#endif
#endif /* ECP_WITH_HTABLE */
+#ifdef ECP_WITH_PTHREAD
+ pthread_mutex_unlock(&sock->conn_table.mutex_gc);
+#endif
+
+ if (rv) ecp_conn_refcount_dec(conn);
return rv;
}
@@ -1150,15 +1171,15 @@ void ecp_conn_remove(ECPConnection *conn) {
void ecp_conn_remove_addr(ECPConnection *conn) {
ECPSocket *sock = conn->sock;
+ if (!ecp_conn_is_outb(conn)) return;
+
#ifdef ECP_WITH_PTHREAD
pthread_mutex_lock(&sock->conn_table.mutex);
- pthread_mutex_lock(&conn->mutex);
#endif
conn_table_remove_addr(conn);
#ifdef ECP_WITH_PTHREAD
- pthread_mutex_unlock(&conn->mutex);
pthread_mutex_unlock(&sock->conn_table.mutex);
#endif
@@ -1168,36 +1189,35 @@ void ecp_conn_remove_gc(ECPConnection *conn) {
ECPSocket *sock = conn->sock;
ECPConnection *_conn = NULL;
-#ifdef ECP_WITH_HTABLE
#ifdef ECP_WITH_PTHREAD
pthread_mutex_lock(&sock->conn_table.mutex_gc);
#endif
+#ifdef ECP_WITH_HTABLE
_conn = ecp_ht_remove_kv(sock->conn_table.keys_gc, &conn->remote.key_perma.public, conn);
+#else /* ECP_WITH_HTABLE */
#ifdef ECP_WITH_PTHREAD
- pthread_mutex_unlock(&sock->conn_table.mutex_gc);
-#endif
+ pthread_mutex_lock(&conn->mutex);
#endif
- if (_conn) {
- ecp_conn_refcount_dec(conn);
- } else {
- int destroy = 0;
+ if (_ecp_conn_in_gct(conn)) {
+ _ecp_conn_pull_gct(conn);
+ _conn = conn;
+ }
#ifdef ECP_WITH_PTHREAD
- pthread_mutex_lock(&conn->mutex);
+ pthread_mutex_unlock(&conn->mutex);
#endif
- if ((conn->refcount == 0) && !_ecp_conn_is_reg(conn)) destroy = 1;
+#endif /* ECP_WITH_HTABLE */
#ifdef ECP_WITH_PTHREAD
- pthread_mutex_unlock(&conn->mutex);
+ pthread_mutex_unlock(&sock->conn_table.mutex_gc);
#endif
- if (destroy) {
- _ecp_conn_close(conn);
- }
+ if (_conn) {
+ ecp_conn_refcount_dec(conn);
}
}
@@ -1300,35 +1320,39 @@ int ecp_conn_is_zombie(ECPConnection *conn, ecp_sts_t now, ecp_sts_t to) {
return z;
}
-void ecp_conn_refcount_inc(ECPConnection *conn) {
+int ecp_conn_refcount_inc(ECPConnection *conn) {
+ int is_reg;
+
#ifdef ECP_WITH_PTHREAD
pthread_mutex_lock(&conn->mutex);
#endif
- conn->refcount++;
+ is_reg = _ecp_conn_is_reg(conn);
+ if (is_reg) conn->refcount++;
#ifdef ECP_WITH_PTHREAD
pthread_mutex_unlock(&conn->mutex);
#endif
+
+ if (!is_reg) return ECP_ERR_CLOSED;
+ return ECP_OK;
}
void ecp_conn_refcount_dec(ECPConnection *conn) {
- int is_reg;
- unsigned short refcount;
+ int destroy = 0;
#ifdef ECP_WITH_PTHREAD
pthread_mutex_lock(&conn->mutex);
#endif
conn->refcount--;
- refcount = conn->refcount;
- is_reg = _ecp_conn_is_reg(conn);
+ if ((conn->refcount == 0) && !_ecp_conn_is_reg(conn)) destroy = 1;
#ifdef ECP_WITH_PTHREAD
pthread_mutex_unlock(&conn->mutex);
#endif
- if (!is_reg && (refcount == 0)) {
+ if (destroy) {
_ecp_conn_close(conn);
}
}
@@ -2292,7 +2316,13 @@ ssize_t ecp_unpack(ECPSocket *sock, ECPConnection *parent, ecp_tr_addr_t *addr,
}
if (rv < 0) conn = NULL;
- if (conn) ecp_conn_refcount_inc(conn);
+ if (conn) {
+ _rv = ecp_conn_refcount_inc(conn);
+ if (_rv) {
+ rv = _rv;
+ conn = NULL;
+ }
+ }
#ifdef ECP_WITH_PTHREAD
pthread_mutex_unlock(&sock->conn_table.mutex);
diff --git a/ecp/src/ecp/core.h b/ecp/src/ecp/core.h
index f988c19..7ee7a9a 100644
--- a/ecp/src/ecp/core.h
+++ b/ecp/src/ecp/core.h
@@ -158,6 +158,7 @@
/* mutable flags */
#define ECP_CONN_FLAG_REG 0x01
#define ECP_CONN_FLAG_OPEN 0x02
+#define ECP_CONN_FLAG_GCT 0x04
#define ECP_SEND_FLAG_REPLY 0x01
#define ECP_SEND_FLAG_MORE 0x02
@@ -170,14 +171,17 @@
#define ecp_conn_is_sys(conn) ((conn)->type & ECP_CTYPE_FLAG_SYS)
#define _ecp_conn_is_reg(conn) ((conn)->flags & ECP_CONN_FLAG_REG)
#define _ecp_conn_is_open(conn) ((conn)->flags & ECP_CONN_FLAG_OPEN)
+#define _ecp_conn_in_gct(conn) ((conn)->flags & ECP_CONN_FLAG_GCT)
#define ecp_conn_set_inb(conn) ((conn)->flags_im |= ECP_CONN_FLAG_INB)
#define ecp_conn_set_outb(conn) ((conn)->flags_im &= ~ECP_CONN_FLAG_INB)
#define _ecp_conn_set_reg(conn) ((conn)->flags |= ECP_CONN_FLAG_REG)
#define _ecp_conn_set_open(conn) ((conn)->flags |= ECP_CONN_FLAG_OPEN)
+#define _ecp_conn_push_gct(conn) ((conn)->flags |= ECP_CONN_FLAG_GCT)
#define _ecp_conn_clr_reg(conn) ((conn)->flags &= ~ECP_CONN_FLAG_REG)
#define _ecp_conn_clr_open(conn) ((conn)->flags &= ~ECP_CONN_FLAG_OPEN)
+#define _ecp_conn_pull_gct(conn) ((conn)->flags &= ~ECP_CONN_FLAG_GCT)
typedef uint32_t ecp_ack_t;
#define ECP_SIZE_ACKB (sizeof(ecp_ack_t)*8)
@@ -406,7 +410,7 @@ void _ecp_conn_close(ECPConnection *conn);
void ecp_conn_close(ECPConnection *conn);
int _ecp_conn_expired(ECPConnection *conn, ecp_sts_t now, ecp_sts_t to);
int ecp_conn_is_zombie(ECPConnection *conn, ecp_sts_t now, ecp_sts_t to);
-void ecp_conn_refcount_inc(ECPConnection *conn);
+int ecp_conn_refcount_inc(ECPConnection *conn);
void ecp_conn_refcount_dec(ECPConnection *conn);
void ecp_conn_set_flags(ECPConnection *conn, unsigned char flags);
diff --git a/ecp/src/ecp/timer.c b/ecp/src/ecp/timer.c
index c8e03b9..6b58303 100644
--- a/ecp/src/ecp/timer.c
+++ b/ecp/src/ecp/timer.c
@@ -34,7 +34,7 @@ void ecp_timer_item_init(ECPTimerItem *ti, ECPConnection *conn, unsigned char mt
}
int ecp_timer_push(ECPTimerItem *ti) {
- int i, is_reg, rv = ECP_OK;
+ int i, rv = ECP_OK;
ECPConnection *conn = ti->conn;
ECPTimer *timer = &conn->sock->timer;
@@ -42,39 +42,39 @@ int ecp_timer_push(ECPTimerItem *ti) {
#ifdef ECP_WITH_PTHREAD
pthread_mutex_lock(&timer->mutex);
- pthread_mutex_lock(&conn->mutex);
#endif
- is_reg = _ecp_conn_is_reg(conn);
- if (is_reg) conn->refcount++;
-#ifdef ECP_WITH_PTHREAD
- pthread_mutex_unlock(&conn->mutex);
-#endif
-
- if (!is_reg) rv = ECP_ERR_CLOSED;
- if (!rv && (timer->head == ECP_MAX_TIMER-1)) rv = ECP_ERR_MAX_TIMER;
+ rv = ecp_conn_refcount_inc(conn);
+ if (rv) {
+ conn = NULL;
+ goto timer_push_fin;
+ }
+ if (timer->head == ECP_MAX_TIMER-1) {
+ rv = ECP_ERR_MAX_TIMER;
+ goto timer_push_fin;
+ }
- if (!rv) {
- for (i=timer->head; i>=0; i--) {
- if (ECP_STS_LTE(ti->abstime, timer->item[i].abstime)) {
- if (i != timer->head) memmove(timer->item+i+2, timer->item+i+1, sizeof(ECPTimerItem) * (timer->head-i));
- timer->item[i+1] = *ti;
- timer->head++;
- break;
- }
- }
- if (i == -1) {
- if (timer->head != -1) memmove(timer->item+1, timer->item, sizeof(ECPTimerItem) * (timer->head+1));
- timer->item[0] = *ti;
+ for (i=timer->head; i>=0; i--) {
+ if (ECP_STS_LTE(ti->abstime, timer->item[i].abstime)) {
+ if (i != timer->head) memmove(timer->item+i+2, timer->item+i+1, sizeof(ECPTimerItem) * (timer->head-i));
+ timer->item[i+1] = *ti;
timer->head++;
+ break;
}
- ecp_tm_timer_set(ti->timeout);
}
+ if (i == -1) {
+ if (timer->head != -1) memmove(timer->item+1, timer->item, sizeof(ECPTimerItem) * (timer->head+1));
+ timer->item[0] = *ti;
+ timer->head++;
+ }
+ ecp_tm_timer_set(ti->timeout);
+
+timer_push_fin:
#ifdef ECP_WITH_PTHREAD
pthread_mutex_unlock(&timer->mutex);
#endif
- if (rv && is_reg) {
+ if (rv && conn) {
ecp_conn_refcount_dec(conn);
}
return rv;