From 5f55d9d4d14635678e7f582215e3642de2e232a4 Mon Sep 17 00:00:00 2001 From: Uros Majstorovic Date: Mon, 6 May 2024 02:08:31 +0200 Subject: new ecp directory and vconn server --- ecp/server/vlink.c | 360 +++++++++++++++++++++-------------------------------- 1 file changed, 140 insertions(+), 220 deletions(-) (limited to 'ecp/server/vlink.c') diff --git a/ecp/server/vlink.c b/ecp/server/vlink.c index e98dacc..d0c865b 100644 --- a/ecp/server/vlink.c +++ b/ecp/server/vlink.c @@ -3,318 +3,238 @@ #include #include -#include #include #include #include -#include "server.h" #include "dir.h" +#include "vlink.h" #include "ht.h" -#include "vlink.h" +#include "server.h" static ecp_ht_table_t *vlink_conn = NULL; -static ecp_ht_table_t *vlink_node = NULL; -static pthread_mutex_t vlink_conn_mutex; -static pthread_mutex_t vlink_node_mutex; -static pthread_t vlink_open_thd; +static pthread_rwlock_t vlink_conn_rwlock; static pthread_t vlink_keyx_thd; static SRVConfig *srv_config; -void vlink_handle_err(ECPConnection *conn, unsigned char mtype, int err) { - switch (mtype) { - case ECP_MTYPE_OPEN_REP: { - if (err == ECP_ERR_TIMEOUT) { - int rv; - - rv = vlink_insert_node(conn); - if (rv) LOG(LOG_ERR, "vlink insert node err:%d\n", rv); - } - ecp_conn_close(conn); - break; - } - } -} - -int vlink_handle_open(ECPConnection *conn, ECP2Buffer *bufs) { - int rv; - - rv = ecp_vlink_handle_open(conn, bufs); - if (rv) return rv; - - if (ecp_conn_is_inb(conn)) return ECP_OK; - - pthread_mutex_lock(&vlink_conn_mutex); - rv = ht_insert_conn(vlink_conn, conn); - pthread_mutex_unlock(&vlink_conn_mutex); - - if (rv) { - ecp_vlink_handle_close(conn); - return rv; - } - - return ECP_OK; -} - -void vlink_handle_close(ECPConnection *conn) { - int rv; - - ecp_vlink_handle_close(conn); +#define MAX(X, Y) (((X) > (Y)) ? (X) : (Y)) - if (ecp_conn_is_inb(conn)) return; - - rv = vlink_insert_node(conn); - if (rv) LOG(LOG_ERR, "vlink insert node err:%d\n", rv); -} - -int vlink_open_conn(ECPSocket *sock, ECPNode *node) { +int vlink_open_conn(ECPSocket *sock, ECPNode *node, ECPConnection **_conn) { ECPConnection *conn; int rv; + *_conn = NULL; conn = malloc(sizeof(ECPConnection)); if (conn == NULL) return ECP_ERR_ALLOC; ecp_vlink_init(conn, sock); rv = ecp_conn_open(conn, node); - return rv; + if (rv) return rv; + + *_conn = conn; + return ECP_OK; } void vlink_new_node(ECPSocket *sock, ECPDirItem *dir_item) { - if ((dir_item->capabilities & ECP_DIR_CAP_VCONN) && (memcmp(&dir_item->node.key_perma.public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) < 0)) { - int rv; + ECPConnection *conn; + int rv; - LOG(LOG_DEBUG, "vlink open connection\n"); - rv = vlink_open_conn(sock, &dir_item->node); - if (rv) LOG(LOG_ERR, "vlink open connection err:%d\n", rv); - } -} + if (!(dir_item->capabilities & ECP_DIR_CAP_VCONN) || (memcmp(&dir_item->node.key_perma.public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) == 0)) return; -int vlink_insert_node(ECPConnection *conn) { - DirNode *node; - int rv = ECP_OK; + pthread_rwlock_rdlock(&vlink_conn_rwlock); + conn = ecp_ht_search(vlink_conn, &dir_item->node.key_perma.public); + pthread_rwlock_unlock(&vlink_conn_rwlock); - node = dir_search_conn(conn); - if (node) { - pthread_mutex_lock(&vlink_node_mutex); - rv = ht_insert_node(vlink_node, node); - pthread_mutex_unlock(&vlink_node_mutex); + if (conn) return; - if (rv) { - pthread_mutex_lock(&node->mutex); - node->refcount--; - pthread_mutex_unlock(&node->mutex); - } + rv = vlink_open_conn(sock, &dir_item->node, &conn); + if (rv) { + LOG(LOG_ERR, "vlink_new_node: open conn err:%d\n", rv); + return; } - return rv; -} - -void vlink_open(ECPSocket *sock) { - struct hashtable_itr itr; - DirNode *node; - DirNode *node_next; - DirNode *open_node[MAX_OPEN_CNT]; - int open_cnt; - int i; - int rv; - - node_next = NULL; - do { - open_cnt = 0; - pthread_mutex_lock(&vlink_node_mutex); - - if (ecp_ht_count(vlink_node) > 0) { - ecp_ht_itr_create(&itr, vlink_node); - if (node_next) { - rv = ecp_ht_itr_search(&itr, node_next); - if (rv) { - LOG(LOG_ERR, "vlink open itr search err:%d\n", rv); - break; - } - node_next = NULL; - } - do { - node = ecp_ht_itr_value(&itr); - rv = ecp_ht_itr_remove(&itr); - - open_node[open_cnt] = node; - open_cnt++; - if (open_cnt == MAX_OPEN_CNT) { - if (!rv) { - node_next = ecp_ht_itr_key(&itr); - } else { - node_next = NULL; - } - } - } while (rv == ECP_OK); - } + pthread_rwlock_wrlock(&vlink_conn_rwlock); + rv = ht_insert_conn(vlink_conn, conn); + pthread_rwlock_unlock(&vlink_conn_rwlock); - pthread_mutex_unlock(&vlink_node_mutex); + if (rv) { + LOG(LOG_ERR, "vlink_new_node: ins conn err:%d\n", rv); + ecp_conn_close(conn); + return; + } - for (i=0; imutex); + LOG(LOG_DEBUG, "vlink_new_node: new connection\n"); +} - if (open_node[i]->zombie) { - open_node[i]->refcount--; +void vlink_del_node(ECPDirItem *dir_item) { + ECPConnection *conn; - pthread_mutex_unlock(&open_node[i]->mutex); - } else { - pthread_mutex_unlock(&open_node[i]->mutex); + if (!(dir_item->capabilities & ECP_DIR_CAP_VCONN) || (memcmp(&dir_item->node.key_perma.public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) == 0)) return; - LOG(LOG_DEBUG, "vlink open connection\n"); - rv = vlink_open_conn(sock, &open_node[i]->dir_item.node); - if (rv) LOG(LOG_ERR, "vlink open connection err:%d\n", rv); - } - } - } while (node_next); + pthread_rwlock_rdlock(&vlink_conn_rwlock); + conn = ecp_ht_search(vlink_conn, &dir_item->node.key_perma.public); + if (conn) ecp_conn_set_uflags(conn, VLINK_UFLAG_DISCONNECT); + pthread_rwlock_unlock(&vlink_conn_rwlock); } -void vlink_keyx(void) { +void vlink_keyx(ECPSocket *sock, int keyx_period) { struct hashtable_itr itr; - ecp_sts_t now; ECPConnection *conn; - ECPConnection *keyx_next; - ECPConnection *keyx_conn[MAX_KEYX_CNT]; - int keyx_conn_z[MAX_KEYX_CNT]; + ecp_ecdh_public_t *conn_next; + ECPConnection *keyx_conn[MAX_NODE_ANNOUNCE]; + unsigned int ht_count; int keyx_cnt; - int is_zombie; int i; int rv; - now = ecp_tm_get_s(); - keyx_next = NULL; + conn_next = NULL; do { + uint32_t delay; + uint32_t rnd; + keyx_cnt = 0; - pthread_mutex_lock(&vlink_conn_mutex); + pthread_rwlock_rdlock(&vlink_conn_rwlock); - if (ecp_ht_count(vlink_conn) > 0) { + ht_count = ecp_ht_count(vlink_conn); + if (ht_count > 0) { ecp_ht_itr_create(&itr, vlink_conn); - if (keyx_next) { - rv = ecp_ht_itr_search(&itr, keyx_next); + if (conn_next) { + rv = ecp_ht_itr_search(&itr, conn_next); if (rv) { - LOG(LOG_ERR, "vlink keyx itr search err:%d\n", rv); + LOG(LOG_ERR, "vlink_keyx: itr search err:%d\n", rv); break; } - keyx_next = NULL; + conn_next = NULL; } do { conn = ecp_ht_itr_value(&itr); - is_zombie = ecp_conn_is_zombie(conn, now, CONN_EXP_TIME); - if (is_zombie) { - rv = ecp_ht_itr_remove(&itr); + if (keyx_cnt < MAX_NODE_ANNOUNCE) { + keyx_conn[keyx_cnt] = conn; + keyx_cnt++; } else { - rv = ecp_ht_itr_advance(&itr); + conn_next = ecp_ht_itr_key(&itr); + break; } - keyx_conn[keyx_cnt] = conn; - keyx_conn_z[keyx_cnt] = is_zombie; - keyx_cnt++; - - if (keyx_cnt == MAX_KEYX_CNT) { - if (!rv) { - keyx_next = ecp_ht_itr_key(&itr); - } else { - keyx_next = NULL; - } - } + rv = ecp_ht_itr_advance(&itr); } while (rv == ECP_OK); } - pthread_mutex_unlock(&vlink_conn_mutex); + /* no need to copy conn_next key, since this is the only thread that can remove connection */ + pthread_rwlock_unlock(&vlink_conn_rwlock); + /* we are counting delay for total hashtable entries; delay for absolute max ht_count (256 msgs with max load, keyx period of 600s) is ~70ms */ + delay = keyx_period * 1000000 / MAX(ht_count, 100); for (i=0; isock, &conn->remote, &_conn); + if (rv) { + LOG(LOG_ERR, "vlink_keyx: conn open err:%d\n", rv); + continue; + } + + pthread_rwlock_wrlock(&vlink_conn_rwlock); + ht_remove_conn(vlink_conn, conn); + rv = ht_insert_conn(vlink_conn, _conn); + pthread_rwlock_unlock(&vlink_conn_rwlock); + + ecp_conn_close(conn); + if (rv) { + LOG(LOG_ERR, "vlink_keyx: conn insert err:%d\n", rv); + ecp_conn_close(_conn); + } + } else if (is_open) { ssize_t _rv; - LOG(LOG_DEBUG, "vlink send keyx\n"); - _rv = ecp_send_keyx_req(keyx_conn[i], 1); - if (_rv < 0) LOG(LOG_ERR, "vlink send keyx err:%ld\n", _rv); + _rv = ecp_send_keyx_req(conn, 1); + if (_rv < 0) LOG(LOG_ERR, "vlink_keyx: send keyx req err:%ld\n", _rv); + } + + if (delay) { + /* randomize over delay / 2 and delay + delay / 2 */ + rnd = arc4random_uniform(delay + 1); + usleep(delay / 2 + rnd); } } - } while (keyx_next); + + } while (conn_next); + + LOG(LOG_DEBUG, "vlink_keyx: connection count:%u\n", ht_count); } -static void *_vlink_open(void *arg) { +static void *_vlink_keyx(void *arg) { ECPSocket *sock = arg; + uint32_t rnd; + time_t tv_sec; + int delay; while (1) { - LOG(LOG_DEBUG, "vlink open...\n"); - vlink_open(sock); - sleep(10); - } + tv_sec = time(NULL); - return NULL; -} + vlink_keyx(sock, KEYX_PERIOD); -static void *_vlink_keyx(void *arg) { - while (1) { - LOG(LOG_DEBUG, "vlink keyx...\n"); - vlink_keyx(); - sleep(10); + /* resolution is 1s */ + delay = KEYX_PERIOD - (time(NULL) - tv_sec); + if (delay > 0) { + rnd = arc4random_uniform(delay + 1); + sleep(delay / 2 + rnd); + } } return NULL; } -int vlink_start_open(ECPSocket *sock) { - int rv; - - rv = pthread_create(&vlink_open_thd, NULL, _vlink_open, sock); - if (rv) return ECP_ERR; - return ECP_OK; -} -int vlink_start_keyx(void) { +int vlink_start_keyx(ECPSocket *sock) { int rv; - rv = pthread_create(&vlink_keyx_thd, NULL, _vlink_keyx, NULL); + rv = pthread_create(&vlink_keyx_thd, NULL, _vlink_keyx, sock); if (rv) return ECP_ERR; return ECP_OK; } -int vlink_init(ECPContext *ctx) { - ECPConnHandler *handler; +int vlink_init(ECPSocket *sock) { + ECPContext *ctx = sock->ctx; int rv; srv_config = srv_get_config(); - handler = ecp_ctx_get_handler(ctx, ECP_CTYPE_VLINK); - if (handler == NULL) return ECP_ERR; - - handler->handle_open = vlink_handle_open; - handler->handle_close = vlink_handle_close; - - rv = pthread_mutex_init(&vlink_conn_mutex, NULL); - if (rv) { - return ECP_ERR; - } - - rv = pthread_mutex_init(&vlink_node_mutex, NULL); - if (rv) { - pthread_mutex_destroy(&vlink_conn_mutex); - return ECP_ERR; - } + rv = pthread_rwlock_init(&vlink_conn_rwlock, NULL); + if (rv) return ECP_ERR; - rv = ECP_OK; vlink_conn = ecp_ht_create_keys(); - if (vlink_conn == NULL) rv = ECP_ERR_ALLOC; - if (!rv) { - vlink_node = ecp_ht_create_keys(); - if (vlink_node == NULL) rv = ECP_ERR_ALLOC; - } - - if (rv) { - pthread_mutex_destroy(&vlink_node_mutex); - pthread_mutex_destroy(&vlink_conn_mutex); - if (vlink_node) ecp_ht_destroy(vlink_node); - if (vlink_conn) ecp_ht_destroy(vlink_conn); - return rv; + if (vlink_conn == NULL) { + pthread_rwlock_destroy(&vlink_conn_rwlock); + return ECP_ERR_ALLOC; } return ECP_OK; -- cgit v1.2.3