#include #include #include #include #include #include #include #include #include #include "dir.h" #include "vlink.h" #include "ht.h" #include "server.h" static ecp_ht_table_t *vlink_conn = NULL; static pthread_rwlock_t vlink_conn_rwlock; static pthread_t vlink_keyx_thd; static SRVConfig *srv_config; #define MAX(X, Y) (((X) > (Y)) ? (X) : (Y)) int vlink_open_conn(ECPSocket *sock, ECPNode *node, ECPConnection **_conn) { ECPConnection *conn; int rv; *_conn = NULL; conn = malloc(sizeof(ECPConnection)); if (conn == NULL) return ECP_ERR_ALLOC; ecp_vlink_init(conn, sock); rv = ecp_conn_open(conn, node); if (rv) { free(conn); return rv; } *_conn = conn; return ECP_OK; } void vlink_new_node(ECPSocket *sock, ECPDirItem *dir_item) { ECPConnection *conn; int rv; if (!(dir_item->roles & ECP_ROLE_VCONN) || (memcmp(&dir_item->node.key_perma.public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) == 0)) return; pthread_rwlock_rdlock(&vlink_conn_rwlock); conn = ecp_ht_search(vlink_conn, &dir_item->node.key_perma.public); pthread_rwlock_unlock(&vlink_conn_rwlock); if (conn) return; rv = vlink_open_conn(sock, &dir_item->node, &conn); if (rv) { LOG(LOG_ERR, "vlink_new_node: open conn err:%d\n", rv); return; } pthread_rwlock_wrlock(&vlink_conn_rwlock); rv = ht_insert_conn(vlink_conn, conn); pthread_rwlock_unlock(&vlink_conn_rwlock); if (rv) { LOG(LOG_ERR, "vlink_new_node: ins conn err:%d\n", rv); ecp_conn_close(conn); return; } LOG(LOG_DEBUG, "vlink_new_node: new connection\n"); } void vlink_del_node(ECPDirItem *dir_item) { ECPConnection *conn; if (!(dir_item->roles & ECP_ROLE_VCONN) || (memcmp(&dir_item->node.key_perma.public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) == 0)) return; pthread_rwlock_rdlock(&vlink_conn_rwlock); conn = ecp_ht_search(vlink_conn, &dir_item->node.key_perma.public); if (conn) ecp_conn_set_uflags(conn, VLINK_UFLAG_DISCONNECT); pthread_rwlock_unlock(&vlink_conn_rwlock); } void vlink_keyx(ECPSocket *sock, int keyx_period) { struct hashtable_itr itr; ECPConnection *conn; ecp_ecdh_public_t *conn_next; ECPConnection *keyx_conn[MAX_NODE_ANNOUNCE]; unsigned int ht_count; int keyx_cnt; int i; int rv; conn_next = NULL; do { uint32_t delay; uint32_t rnd; keyx_cnt = 0; pthread_rwlock_rdlock(&vlink_conn_rwlock); ht_count = ecp_ht_count(vlink_conn); if (ht_count > 0) { ecp_ht_itr_create(&itr, vlink_conn); if (conn_next) { rv = ecp_ht_itr_search(&itr, conn_next); if (rv) { LOG(LOG_ERR, "vlink_keyx: itr search err:%d\n", rv); break; } conn_next = NULL; } do { conn = ecp_ht_itr_value(&itr); if (keyx_cnt < MAX_NODE_ANNOUNCE) { keyx_conn[keyx_cnt] = conn; keyx_cnt++; } else { conn_next = ecp_ht_itr_key(&itr); break; } rv = ecp_ht_itr_advance(&itr); } while (rv == ECP_OK); } /* no need to copy conn_next key, since this is the only thread that can remove connection */ pthread_rwlock_unlock(&vlink_conn_rwlock); /* we are counting delay for total hashtable entries; delay for absolute max ht_count (256 msgs with max load, keyx period of 600s) is ~70ms */ delay = keyx_period * 1000000 / MAX(ht_count, 100); for (i=0; isock, &conn->remote.key_perma.public))) { ecp_conn_close(_conn); ecp_conn_refcount_dec(_conn); } pthread_rwlock_wrlock(&vlink_conn_rwlock); ht_remove_conn(vlink_conn, conn); pthread_rwlock_unlock(&vlink_conn_rwlock); ecp_conn_close(conn); } else if (!is_reg || is_z) { ECPConnection *_conn = NULL; LOG(LOG_DEBUG, "vlink_keyx: reconnect\n"); rv = vlink_open_conn(conn->sock, &conn->remote, &_conn); if (rv) { LOG(LOG_ERR, "vlink_keyx: conn open err:%d\n", rv); continue; } pthread_rwlock_wrlock(&vlink_conn_rwlock); ht_remove_conn(vlink_conn, conn); rv = ht_insert_conn(vlink_conn, _conn); pthread_rwlock_unlock(&vlink_conn_rwlock); ecp_conn_close(conn); if (rv) { LOG(LOG_ERR, "vlink_keyx: conn insert err:%d\n", rv); ecp_conn_close(_conn); } } else if (is_open) { ssize_t _rv; _rv = ecp_send_keyx_req(conn, 1); if (_rv < 0) LOG(LOG_ERR, "vlink_keyx: send keyx req err:%ld\n", _rv); } if (delay) { /* randomize over delay / 2 and delay + delay / 2 */ rnd = arc4random_uniform(delay + 1); usleep(delay / 2 + rnd); } } } while (conn_next); LOG(LOG_DEBUG, "vlink_keyx: connection count:%u\n", ht_count); } static void *_vlink_keyx(void *arg) { ECPSocket *sock = arg; uint32_t rnd; time_t tv_sec; int delay; while (1) { tv_sec = time(NULL); vlink_keyx(sock, KEYX_PERIOD); /* resolution is 1s */ delay = KEYX_PERIOD - (time(NULL) - tv_sec); if (delay > 0) { rnd = arc4random_uniform(delay + 1); sleep(delay / 2 + rnd); } } return NULL; } int vlink_start_keyx(ECPSocket *sock) { int rv; rv = pthread_create(&vlink_keyx_thd, NULL, _vlink_keyx, sock); if (rv) return ECP_ERR; return ECP_OK; } int vlink_init(ECPSocket *sock) { ECPContext *ctx = sock->ctx; int rv; srv_config = srv_get_config(); rv = pthread_rwlock_init(&vlink_conn_rwlock, NULL); if (rv) return ECP_ERR; vlink_conn = ecp_ht_create_keys(); if (vlink_conn == NULL) { pthread_rwlock_destroy(&vlink_conn_rwlock); return ECP_ERR_ALLOC; } return ECP_OK; }