summaryrefslogtreecommitdiff
path: root/ecp/server/vlink.c
diff options
context:
space:
mode:
authorUros Majstorovic <majstor@majstor.org>2024-05-06 02:08:31 +0200
committerUros Majstorovic <majstor@majstor.org>2024-05-06 02:08:31 +0200
commit5f55d9d4d14635678e7f582215e3642de2e232a4 (patch)
tree3322f643e0fbc16984e8eebfca4de7bd4cf63391 /ecp/server/vlink.c
parent1060b5e4712db12b52944bdcf7f2588cea23382b (diff)
new ecp directory and vconn server
Diffstat (limited to 'ecp/server/vlink.c')
-rw-r--r--ecp/server/vlink.c360
1 files changed, 140 insertions, 220 deletions
diff --git a/ecp/server/vlink.c b/ecp/server/vlink.c
index e98dacc..d0c865b 100644
--- a/ecp/server/vlink.c
+++ b/ecp/server/vlink.c
@@ -3,318 +3,238 @@
#include <string.h>
#include <ecp/core.h>
-#include <ecp/dir/dir.h>
#include <ecp/vconn/vconn.h>
#include <ecp/ht.h>
#include <ecp/tm.h>
-#include "server.h"
#include "dir.h"
+#include "vlink.h"
#include "ht.h"
-#include "vlink.h"
+#include "server.h"
static ecp_ht_table_t *vlink_conn = NULL;
-static ecp_ht_table_t *vlink_node = NULL;
-static pthread_mutex_t vlink_conn_mutex;
-static pthread_mutex_t vlink_node_mutex;
-static pthread_t vlink_open_thd;
+static pthread_rwlock_t vlink_conn_rwlock;
static pthread_t vlink_keyx_thd;
static SRVConfig *srv_config;
-void vlink_handle_err(ECPConnection *conn, unsigned char mtype, int err) {
- switch (mtype) {
- case ECP_MTYPE_OPEN_REP: {
- if (err == ECP_ERR_TIMEOUT) {
- int rv;
-
- rv = vlink_insert_node(conn);
- if (rv) LOG(LOG_ERR, "vlink insert node err:%d\n", rv);
- }
- ecp_conn_close(conn);
- break;
- }
- }
-}
-
-int vlink_handle_open(ECPConnection *conn, ECP2Buffer *bufs) {
- int rv;
-
- rv = ecp_vlink_handle_open(conn, bufs);
- if (rv) return rv;
-
- if (ecp_conn_is_inb(conn)) return ECP_OK;
-
- pthread_mutex_lock(&vlink_conn_mutex);
- rv = ht_insert_conn(vlink_conn, conn);
- pthread_mutex_unlock(&vlink_conn_mutex);
-
- if (rv) {
- ecp_vlink_handle_close(conn);
- return rv;
- }
-
- return ECP_OK;
-}
-
-void vlink_handle_close(ECPConnection *conn) {
- int rv;
-
- ecp_vlink_handle_close(conn);
+#define MAX(X, Y) (((X) > (Y)) ? (X) : (Y))
- if (ecp_conn_is_inb(conn)) return;
-
- rv = vlink_insert_node(conn);
- if (rv) LOG(LOG_ERR, "vlink insert node err:%d\n", rv);
-}
-
-int vlink_open_conn(ECPSocket *sock, ECPNode *node) {
+int vlink_open_conn(ECPSocket *sock, ECPNode *node, ECPConnection **_conn) {
ECPConnection *conn;
int rv;
+ *_conn = NULL;
conn = malloc(sizeof(ECPConnection));
if (conn == NULL) return ECP_ERR_ALLOC;
ecp_vlink_init(conn, sock);
rv = ecp_conn_open(conn, node);
- return rv;
+ if (rv) return rv;
+
+ *_conn = conn;
+ return ECP_OK;
}
void vlink_new_node(ECPSocket *sock, ECPDirItem *dir_item) {
- if ((dir_item->capabilities & ECP_DIR_CAP_VCONN) && (memcmp(&dir_item->node.key_perma.public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) < 0)) {
- int rv;
+ ECPConnection *conn;
+ int rv;
- LOG(LOG_DEBUG, "vlink open connection\n");
- rv = vlink_open_conn(sock, &dir_item->node);
- if (rv) LOG(LOG_ERR, "vlink open connection err:%d\n", rv);
- }
-}
+ if (!(dir_item->capabilities & ECP_DIR_CAP_VCONN) || (memcmp(&dir_item->node.key_perma.public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) == 0)) return;
-int vlink_insert_node(ECPConnection *conn) {
- DirNode *node;
- int rv = ECP_OK;
+ pthread_rwlock_rdlock(&vlink_conn_rwlock);
+ conn = ecp_ht_search(vlink_conn, &dir_item->node.key_perma.public);
+ pthread_rwlock_unlock(&vlink_conn_rwlock);
- node = dir_search_conn(conn);
- if (node) {
- pthread_mutex_lock(&vlink_node_mutex);
- rv = ht_insert_node(vlink_node, node);
- pthread_mutex_unlock(&vlink_node_mutex);
+ if (conn) return;
- if (rv) {
- pthread_mutex_lock(&node->mutex);
- node->refcount--;
- pthread_mutex_unlock(&node->mutex);
- }
+ rv = vlink_open_conn(sock, &dir_item->node, &conn);
+ if (rv) {
+ LOG(LOG_ERR, "vlink_new_node: open conn err:%d\n", rv);
+ return;
}
- return rv;
-}
-
-void vlink_open(ECPSocket *sock) {
- struct hashtable_itr itr;
- DirNode *node;
- DirNode *node_next;
- DirNode *open_node[MAX_OPEN_CNT];
- int open_cnt;
- int i;
- int rv;
-
- node_next = NULL;
- do {
- open_cnt = 0;
- pthread_mutex_lock(&vlink_node_mutex);
-
- if (ecp_ht_count(vlink_node) > 0) {
- ecp_ht_itr_create(&itr, vlink_node);
- if (node_next) {
- rv = ecp_ht_itr_search(&itr, node_next);
- if (rv) {
- LOG(LOG_ERR, "vlink open itr search err:%d\n", rv);
- break;
- }
- node_next = NULL;
- }
- do {
- node = ecp_ht_itr_value(&itr);
- rv = ecp_ht_itr_remove(&itr);
-
- open_node[open_cnt] = node;
- open_cnt++;
- if (open_cnt == MAX_OPEN_CNT) {
- if (!rv) {
- node_next = ecp_ht_itr_key(&itr);
- } else {
- node_next = NULL;
- }
- }
- } while (rv == ECP_OK);
- }
+ pthread_rwlock_wrlock(&vlink_conn_rwlock);
+ rv = ht_insert_conn(vlink_conn, conn);
+ pthread_rwlock_unlock(&vlink_conn_rwlock);
- pthread_mutex_unlock(&vlink_node_mutex);
+ if (rv) {
+ LOG(LOG_ERR, "vlink_new_node: ins conn err:%d\n", rv);
+ ecp_conn_close(conn);
+ return;
+ }
- for (i=0; i<open_cnt; i++) {
- pthread_mutex_lock(&open_node[i]->mutex);
+ LOG(LOG_DEBUG, "vlink_new_node: new connection\n");
+}
- if (open_node[i]->zombie) {
- open_node[i]->refcount--;
+void vlink_del_node(ECPDirItem *dir_item) {
+ ECPConnection *conn;
- pthread_mutex_unlock(&open_node[i]->mutex);
- } else {
- pthread_mutex_unlock(&open_node[i]->mutex);
+ if (!(dir_item->capabilities & ECP_DIR_CAP_VCONN) || (memcmp(&dir_item->node.key_perma.public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) == 0)) return;
- LOG(LOG_DEBUG, "vlink open connection\n");
- rv = vlink_open_conn(sock, &open_node[i]->dir_item.node);
- if (rv) LOG(LOG_ERR, "vlink open connection err:%d\n", rv);
- }
- }
- } while (node_next);
+ pthread_rwlock_rdlock(&vlink_conn_rwlock);
+ conn = ecp_ht_search(vlink_conn, &dir_item->node.key_perma.public);
+ if (conn) ecp_conn_set_uflags(conn, VLINK_UFLAG_DISCONNECT);
+ pthread_rwlock_unlock(&vlink_conn_rwlock);
}
-void vlink_keyx(void) {
+void vlink_keyx(ECPSocket *sock, int keyx_period) {
struct hashtable_itr itr;
- ecp_sts_t now;
ECPConnection *conn;
- ECPConnection *keyx_next;
- ECPConnection *keyx_conn[MAX_KEYX_CNT];
- int keyx_conn_z[MAX_KEYX_CNT];
+ ecp_ecdh_public_t *conn_next;
+ ECPConnection *keyx_conn[MAX_NODE_ANNOUNCE];
+ unsigned int ht_count;
int keyx_cnt;
- int is_zombie;
int i;
int rv;
- now = ecp_tm_get_s();
- keyx_next = NULL;
+ conn_next = NULL;
do {
+ uint32_t delay;
+ uint32_t rnd;
+
keyx_cnt = 0;
- pthread_mutex_lock(&vlink_conn_mutex);
+ pthread_rwlock_rdlock(&vlink_conn_rwlock);
- if (ecp_ht_count(vlink_conn) > 0) {
+ ht_count = ecp_ht_count(vlink_conn);
+ if (ht_count > 0) {
ecp_ht_itr_create(&itr, vlink_conn);
- if (keyx_next) {
- rv = ecp_ht_itr_search(&itr, keyx_next);
+ if (conn_next) {
+ rv = ecp_ht_itr_search(&itr, conn_next);
if (rv) {
- LOG(LOG_ERR, "vlink keyx itr search err:%d\n", rv);
+ LOG(LOG_ERR, "vlink_keyx: itr search err:%d\n", rv);
break;
}
- keyx_next = NULL;
+ conn_next = NULL;
}
do {
conn = ecp_ht_itr_value(&itr);
- is_zombie = ecp_conn_is_zombie(conn, now, CONN_EXP_TIME);
- if (is_zombie) {
- rv = ecp_ht_itr_remove(&itr);
+ if (keyx_cnt < MAX_NODE_ANNOUNCE) {
+ keyx_conn[keyx_cnt] = conn;
+ keyx_cnt++;
} else {
- rv = ecp_ht_itr_advance(&itr);
+ conn_next = ecp_ht_itr_key(&itr);
+ break;
}
- keyx_conn[keyx_cnt] = conn;
- keyx_conn_z[keyx_cnt] = is_zombie;
- keyx_cnt++;
-
- if (keyx_cnt == MAX_KEYX_CNT) {
- if (!rv) {
- keyx_next = ecp_ht_itr_key(&itr);
- } else {
- keyx_next = NULL;
- }
- }
+ rv = ecp_ht_itr_advance(&itr);
} while (rv == ECP_OK);
}
- pthread_mutex_unlock(&vlink_conn_mutex);
+ /* no need to copy conn_next key, since this is the only thread that can remove connection */
+ pthread_rwlock_unlock(&vlink_conn_rwlock);
+ /* we are counting delay for total hashtable entries; delay for absolute max ht_count (256 msgs with max load, keyx period of 600s) is ~70ms */
+ delay = keyx_period * 1000000 / MAX(ht_count, 100);
for (i=0; i<keyx_cnt; i++) {
- if (keyx_conn_z[i]) {
- LOG(LOG_DEBUG, "vlink close connection\n");
- ecp_conn_close(keyx_conn[i]);
- } else {
+ int is_d, is_reg, is_open, is_z;
+ ecp_sts_t now;
+
+ conn = keyx_conn[i];
+
+ is_d = is_reg = is_open = is_z = 0;
+ now = ecp_tm_get_s();
+
+ ecp_conn_lock(conn);
+ is_d = _ecp_conn_test_uflags(conn, VLINK_UFLAG_DISCONNECT);
+ is_reg = _ecp_conn_is_reg(conn);
+ if (is_reg) is_open = _ecp_conn_is_open(conn);
+ if (is_open) is_z = _ecp_conn_is_zombie(conn, now, KEYX_PERIOD * 3);
+ ecp_conn_unlock(conn);
+
+ if (is_d) {
+ LOG(LOG_DEBUG, "vlink_keyx: disconnect\n");
+ // close all inbound connections;
+ pthread_rwlock_wrlock(&vlink_conn_rwlock);
+ ht_remove_conn(vlink_conn, conn);
+ pthread_rwlock_unlock(&vlink_conn_rwlock);
+
+ ecp_conn_close(conn);
+ } else if (!is_reg || is_z) {
+ ECPConnection *_conn = NULL;
+
+ LOG(LOG_DEBUG, "vlink_keyx: reconnect\n");
+ rv = vlink_open_conn(conn->sock, &conn->remote, &_conn);
+ if (rv) {
+ LOG(LOG_ERR, "vlink_keyx: conn open err:%d\n", rv);
+ continue;
+ }
+
+ pthread_rwlock_wrlock(&vlink_conn_rwlock);
+ ht_remove_conn(vlink_conn, conn);
+ rv = ht_insert_conn(vlink_conn, _conn);
+ pthread_rwlock_unlock(&vlink_conn_rwlock);
+
+ ecp_conn_close(conn);
+ if (rv) {
+ LOG(LOG_ERR, "vlink_keyx: conn insert err:%d\n", rv);
+ ecp_conn_close(_conn);
+ }
+ } else if (is_open) {
ssize_t _rv;
- LOG(LOG_DEBUG, "vlink send keyx\n");
- _rv = ecp_send_keyx_req(keyx_conn[i], 1);
- if (_rv < 0) LOG(LOG_ERR, "vlink send keyx err:%ld\n", _rv);
+ _rv = ecp_send_keyx_req(conn, 1);
+ if (_rv < 0) LOG(LOG_ERR, "vlink_keyx: send keyx req err:%ld\n", _rv);
+ }
+
+ if (delay) {
+ /* randomize over delay / 2 and delay + delay / 2 */
+ rnd = arc4random_uniform(delay + 1);
+ usleep(delay / 2 + rnd);
}
}
- } while (keyx_next);
+
+ } while (conn_next);
+
+ LOG(LOG_DEBUG, "vlink_keyx: connection count:%u\n", ht_count);
}
-static void *_vlink_open(void *arg) {
+static void *_vlink_keyx(void *arg) {
ECPSocket *sock = arg;
+ uint32_t rnd;
+ time_t tv_sec;
+ int delay;
while (1) {
- LOG(LOG_DEBUG, "vlink open...\n");
- vlink_open(sock);
- sleep(10);
- }
+ tv_sec = time(NULL);
- return NULL;
-}
+ vlink_keyx(sock, KEYX_PERIOD);
-static void *_vlink_keyx(void *arg) {
- while (1) {
- LOG(LOG_DEBUG, "vlink keyx...\n");
- vlink_keyx();
- sleep(10);
+ /* resolution is 1s */
+ delay = KEYX_PERIOD - (time(NULL) - tv_sec);
+ if (delay > 0) {
+ rnd = arc4random_uniform(delay + 1);
+ sleep(delay / 2 + rnd);
+ }
}
return NULL;
}
-int vlink_start_open(ECPSocket *sock) {
- int rv;
-
- rv = pthread_create(&vlink_open_thd, NULL, _vlink_open, sock);
- if (rv) return ECP_ERR;
- return ECP_OK;
-}
-int vlink_start_keyx(void) {
+int vlink_start_keyx(ECPSocket *sock) {
int rv;
- rv = pthread_create(&vlink_keyx_thd, NULL, _vlink_keyx, NULL);
+ rv = pthread_create(&vlink_keyx_thd, NULL, _vlink_keyx, sock);
if (rv) return ECP_ERR;
return ECP_OK;
}
-int vlink_init(ECPContext *ctx) {
- ECPConnHandler *handler;
+int vlink_init(ECPSocket *sock) {
+ ECPContext *ctx = sock->ctx;
int rv;
srv_config = srv_get_config();
- handler = ecp_ctx_get_handler(ctx, ECP_CTYPE_VLINK);
- if (handler == NULL) return ECP_ERR;
-
- handler->handle_open = vlink_handle_open;
- handler->handle_close = vlink_handle_close;
-
- rv = pthread_mutex_init(&vlink_conn_mutex, NULL);
- if (rv) {
- return ECP_ERR;
- }
-
- rv = pthread_mutex_init(&vlink_node_mutex, NULL);
- if (rv) {
- pthread_mutex_destroy(&vlink_conn_mutex);
- return ECP_ERR;
- }
+ rv = pthread_rwlock_init(&vlink_conn_rwlock, NULL);
+ if (rv) return ECP_ERR;
- rv = ECP_OK;
vlink_conn = ecp_ht_create_keys();
- if (vlink_conn == NULL) rv = ECP_ERR_ALLOC;
- if (!rv) {
- vlink_node = ecp_ht_create_keys();
- if (vlink_node == NULL) rv = ECP_ERR_ALLOC;
- }
-
- if (rv) {
- pthread_mutex_destroy(&vlink_node_mutex);
- pthread_mutex_destroy(&vlink_conn_mutex);
- if (vlink_node) ecp_ht_destroy(vlink_node);
- if (vlink_conn) ecp_ht_destroy(vlink_conn);
- return rv;
+ if (vlink_conn == NULL) {
+ pthread_rwlock_destroy(&vlink_conn_rwlock);
+ return ECP_ERR_ALLOC;
}
return ECP_OK;