From 810dde21ee65653c15606917b19566cfbaaf165e Mon Sep 17 00:00:00 2001 From: Uros Majstorovic Date: Tue, 9 Aug 2022 21:54:45 +0200 Subject: ecp server added --- ecp/server/vlink.c | 321 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 321 insertions(+) create mode 100644 ecp/server/vlink.c (limited to 'ecp/server/vlink.c') diff --git a/ecp/server/vlink.c b/ecp/server/vlink.c new file mode 100644 index 0000000..e98dacc --- /dev/null +++ b/ecp/server/vlink.c @@ -0,0 +1,321 @@ +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "server.h" +#include "dir.h" +#include "ht.h" + +#include "vlink.h" + +static ecp_ht_table_t *vlink_conn = NULL; +static ecp_ht_table_t *vlink_node = NULL; +static pthread_mutex_t vlink_conn_mutex; +static pthread_mutex_t vlink_node_mutex; +static pthread_t vlink_open_thd; +static pthread_t vlink_keyx_thd; + +static SRVConfig *srv_config; + +void vlink_handle_err(ECPConnection *conn, unsigned char mtype, int err) { + switch (mtype) { + case ECP_MTYPE_OPEN_REP: { + if (err == ECP_ERR_TIMEOUT) { + int rv; + + rv = vlink_insert_node(conn); + if (rv) LOG(LOG_ERR, "vlink insert node err:%d\n", rv); + } + ecp_conn_close(conn); + break; + } + } +} + +int vlink_handle_open(ECPConnection *conn, ECP2Buffer *bufs) { + int rv; + + rv = ecp_vlink_handle_open(conn, bufs); + if (rv) return rv; + + if (ecp_conn_is_inb(conn)) return ECP_OK; + + pthread_mutex_lock(&vlink_conn_mutex); + rv = ht_insert_conn(vlink_conn, conn); + pthread_mutex_unlock(&vlink_conn_mutex); + + if (rv) { + ecp_vlink_handle_close(conn); + return rv; + } + + return ECP_OK; +} + +void vlink_handle_close(ECPConnection *conn) { + int rv; + + ecp_vlink_handle_close(conn); + + if (ecp_conn_is_inb(conn)) return; + + rv = vlink_insert_node(conn); + if (rv) LOG(LOG_ERR, "vlink insert node err:%d\n", rv); +} + +int vlink_open_conn(ECPSocket *sock, ECPNode *node) { + ECPConnection *conn; + int rv; + + conn = malloc(sizeof(ECPConnection)); + if (conn == NULL) return ECP_ERR_ALLOC; + + ecp_vlink_init(conn, sock); + rv = ecp_conn_open(conn, node); + return rv; +} + +void vlink_new_node(ECPSocket *sock, ECPDirItem *dir_item) { + if ((dir_item->capabilities & ECP_DIR_CAP_VCONN) && (memcmp(&dir_item->node.key_perma.public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) < 0)) { + int rv; + + LOG(LOG_DEBUG, "vlink open connection\n"); + rv = vlink_open_conn(sock, &dir_item->node); + if (rv) LOG(LOG_ERR, "vlink open connection err:%d\n", rv); + } +} + +int vlink_insert_node(ECPConnection *conn) { + DirNode *node; + int rv = ECP_OK; + + node = dir_search_conn(conn); + if (node) { + pthread_mutex_lock(&vlink_node_mutex); + rv = ht_insert_node(vlink_node, node); + pthread_mutex_unlock(&vlink_node_mutex); + + if (rv) { + pthread_mutex_lock(&node->mutex); + node->refcount--; + pthread_mutex_unlock(&node->mutex); + } + } + + return rv; +} + +void vlink_open(ECPSocket *sock) { + struct hashtable_itr itr; + DirNode *node; + DirNode *node_next; + DirNode *open_node[MAX_OPEN_CNT]; + int open_cnt; + int i; + int rv; + + node_next = NULL; + do { + open_cnt = 0; + pthread_mutex_lock(&vlink_node_mutex); + + if (ecp_ht_count(vlink_node) > 0) { + ecp_ht_itr_create(&itr, vlink_node); + if (node_next) { + rv = ecp_ht_itr_search(&itr, node_next); + if (rv) { + LOG(LOG_ERR, "vlink open itr search err:%d\n", rv); + break; + } + node_next = NULL; + } + do { + node = ecp_ht_itr_value(&itr); + rv = ecp_ht_itr_remove(&itr); + + open_node[open_cnt] = node; + open_cnt++; + if (open_cnt == MAX_OPEN_CNT) { + if (!rv) { + node_next = ecp_ht_itr_key(&itr); + } else { + node_next = NULL; + } + } + } while (rv == ECP_OK); + } + + pthread_mutex_unlock(&vlink_node_mutex); + + for (i=0; imutex); + + if (open_node[i]->zombie) { + open_node[i]->refcount--; + + pthread_mutex_unlock(&open_node[i]->mutex); + } else { + pthread_mutex_unlock(&open_node[i]->mutex); + + LOG(LOG_DEBUG, "vlink open connection\n"); + rv = vlink_open_conn(sock, &open_node[i]->dir_item.node); + if (rv) LOG(LOG_ERR, "vlink open connection err:%d\n", rv); + } + } + } while (node_next); +} + +void vlink_keyx(void) { + struct hashtable_itr itr; + ecp_sts_t now; + ECPConnection *conn; + ECPConnection *keyx_next; + ECPConnection *keyx_conn[MAX_KEYX_CNT]; + int keyx_conn_z[MAX_KEYX_CNT]; + int keyx_cnt; + int is_zombie; + int i; + int rv; + + now = ecp_tm_get_s(); + keyx_next = NULL; + do { + keyx_cnt = 0; + pthread_mutex_lock(&vlink_conn_mutex); + + if (ecp_ht_count(vlink_conn) > 0) { + ecp_ht_itr_create(&itr, vlink_conn); + if (keyx_next) { + rv = ecp_ht_itr_search(&itr, keyx_next); + if (rv) { + LOG(LOG_ERR, "vlink keyx itr search err:%d\n", rv); + break; + } + keyx_next = NULL; + } + do { + conn = ecp_ht_itr_value(&itr); + + is_zombie = ecp_conn_is_zombie(conn, now, CONN_EXP_TIME); + if (is_zombie) { + rv = ecp_ht_itr_remove(&itr); + } else { + rv = ecp_ht_itr_advance(&itr); + } + + keyx_conn[keyx_cnt] = conn; + keyx_conn_z[keyx_cnt] = is_zombie; + keyx_cnt++; + + if (keyx_cnt == MAX_KEYX_CNT) { + if (!rv) { + keyx_next = ecp_ht_itr_key(&itr); + } else { + keyx_next = NULL; + } + } + } while (rv == ECP_OK); + } + + pthread_mutex_unlock(&vlink_conn_mutex); + + for (i=0; ihandle_open = vlink_handle_open; + handler->handle_close = vlink_handle_close; + + rv = pthread_mutex_init(&vlink_conn_mutex, NULL); + if (rv) { + return ECP_ERR; + } + + rv = pthread_mutex_init(&vlink_node_mutex, NULL); + if (rv) { + pthread_mutex_destroy(&vlink_conn_mutex); + return ECP_ERR; + } + + rv = ECP_OK; + vlink_conn = ecp_ht_create_keys(); + if (vlink_conn == NULL) rv = ECP_ERR_ALLOC; + if (!rv) { + vlink_node = ecp_ht_create_keys(); + if (vlink_node == NULL) rv = ECP_ERR_ALLOC; + } + + if (rv) { + pthread_mutex_destroy(&vlink_node_mutex); + pthread_mutex_destroy(&vlink_conn_mutex); + if (vlink_node) ecp_ht_destroy(vlink_node); + if (vlink_conn) ecp_ht_destroy(vlink_conn); + return rv; + } + + return ECP_OK; +} -- cgit v1.2.3