From 810dde21ee65653c15606917b19566cfbaaf165e Mon Sep 17 00:00:00 2001 From: Uros Majstorovic Date: Tue, 9 Aug 2022 21:54:45 +0200 Subject: ecp server added --- ecp/server/dir.c | 551 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 551 insertions(+) create mode 100644 ecp/server/dir.c (limited to 'ecp/server/dir.c') diff --git a/ecp/server/dir.c b/ecp/server/dir.c new file mode 100644 index 0000000..0066bac --- /dev/null +++ b/ecp/server/dir.c @@ -0,0 +1,551 @@ +#include +#include +#include + +#include +#include +#include +#include + +#include "server.h" +#include "vlink.h" +#include "dir.h" +#include "ht.h" + +static ecp_ht_table_t *dir_shadow = NULL; +static pthread_rwlock_t dir_shadow_rwlock; +static pthread_rwlock_t dir_online_rwlock; +static pthread_t dir_announce_thd; + +static DirList dir_online; + +static SRVConfig *srv_config; + +ssize_t dir_send_ann(ECPConnection *conn) { + ECPBuffer packet; + ECPBuffer payload; + unsigned char pkt_buf[ECP_SIZE_PKT_BUF(sizeof(uint16_t), ECP_MTYPE_DIR_ANN, conn)]; + unsigned char pld_buf[ECP_SIZE_PLD_BUF(sizeof(uint16_t), ECP_MTYPE_DIR_ANN, conn)]; + unsigned char *msg; + + packet.buffer = pkt_buf; + packet.size = sizeof(pkt_buf); + payload.buffer = pld_buf; + payload.size = sizeof(pld_buf); + + ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_DIR_ANN); + msg = ecp_pld_get_msg(payload.buffer, payload.size); + msg[0] = srv_config->capabilities >> 8; + msg[1] = srv_config->capabilities; + + return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(sizeof(uint16_t), ECP_MTYPE_DIR_ANN), 0); +} + +ssize_t dir_send_shadow(ECPConnection *conn) { + ECPBuffer packet; + ECPBuffer payload; + unsigned char pkt_buf[ECP_MAX_PKT]; + unsigned char pld_buf[ECP_MAX_PLD]; + unsigned char *msg; + + struct hashtable_itr itr; + DirNode *node; + uint16_t count; + ecp_sts_t access_ts; + size_t msg_size; + + packet.buffer = pkt_buf; + packet.size = ECP_MAX_PKT; + payload.buffer = pld_buf; + payload.size = ECP_MAX_PLD; + + ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_DIR_SHADOW); + msg = ecp_pld_get_msg(payload.buffer, payload.size); + msg_size = 0; + + pthread_rwlock_rdlock(&dir_shadow_rwlock); + + count = ecp_ht_count(dir_shadow); + msg[0] = count >> 8; + msg[1] = count; + msg += sizeof(uint16_t); + msg_size += sizeof(uint16_t); + + if (count > 0) { + size_t rv; + int _rv; + + ecp_ht_itr_create(&itr, dir_shadow); + do { + node = ecp_ht_itr_value(&itr); + + pthread_mutex_lock(&node->mutex); + access_ts = node->access_ts; + pthread_mutex_unlock(&node->mutex); + + rv = ecp_dir_item_serialize(&node->dir_item, msg); + msg += rv; + msg_size += rv; + + msg[0] = access_ts >> 24; + msg[1] = access_ts >> 16; + msg[2] = access_ts >> 8; + msg[3] = access_ts; + msg += sizeof(uint32_t); + msg_size += sizeof(uint32_t); + + _rv = ecp_ht_itr_advance(&itr); + } while (_rv == ECP_OK); + } + + pthread_rwlock_unlock(&dir_shadow_rwlock); + + return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(msg_size, ECP_MTYPE_DIR_SHADOW), 0); +} + +ssize_t dir_send_online(ECPConnection *conn) { + ECPBuffer packet; + ECPBuffer payload; + unsigned char pkt_buf[ECP_MAX_PKT]; + unsigned char pld_buf[ECP_MAX_PLD]; + unsigned char *msg; + size_t msg_size; + + packet.buffer = pkt_buf; + packet.size = ECP_MAX_PKT; + payload.buffer = pld_buf; + payload.size = ECP_MAX_PLD; + + ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_DIR_REP); + msg = ecp_pld_get_msg(payload.buffer, payload.size); + + pthread_rwlock_rdlock(&dir_online_rwlock); + + msg[0] = dir_online.count >> 8; + msg[1] = dir_online.count; + msg += sizeof(uint16_t); + msg_size = sizeof(uint16_t) + dir_online.count * ECP_SIZE_DIR_ITEM; + memcpy(msg, dir_online.msg, dir_online.count * ECP_SIZE_DIR_ITEM); + + pthread_rwlock_unlock(&dir_online_rwlock); + + return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(msg_size, ECP_MTYPE_DIR_REP), 0); +} + +ssize_t dir_handle_ann(ECPConnection *conn, unsigned char *msg, size_t msg_size) { + ECPDirItem dir_item; + int is_new; + uint16_t capabilities; + ecp_sts_t now; + ssize_t rsize; + ssize_t rv; + + rsize = sizeof(uint16_t); + if (msg_size < rsize) return ECP_ERR_SIZE; + + capabilities = \ + ((uint16_t)msg[0] << 8) | \ + ((uint16_t)msg[1]); + + memset(&dir_item, 0, sizeof(ECPDirItem)); + dir_item.node = conn->remote; + dir_item.capabilities = capabilities; + + now = ecp_tm_get_s(); + is_new = dir_process_item(&dir_item, now); + if (is_new) vlink_new_node(conn->sock, &dir_item); + + rv = dir_send_shadow(conn); + if (rv < 0) return rv; + + return rsize; +} + +ssize_t dir_handle_req(ECPConnection *conn, unsigned char *msg, size_t msg_size) { + ssize_t rv; + + rv = dir_send_online(conn); + if (rv < 0) return rv; + + return 0; +} + +ssize_t dir_handle_shadow(ECPConnection *conn, unsigned char *msg, size_t msg_size) { + ecp_sts_t now; + uint16_t count; + size_t rsize; + int i; + + if (msg_size < sizeof(uint16_t)) return ECP_ERR_SIZE; + + count = \ + ((uint16_t)msg[0] << 8) | \ + ((uint16_t)msg[1]); + + rsize = sizeof(uint16_t) + count * (ECP_SIZE_DIR_ITEM + sizeof(uint32_t)); + if (msg_size < rsize) return ECP_ERR_SIZE; + + msg += sizeof(uint16_t); + + now = ecp_tm_get_s(); + for (i=0; isock, &dir_item); + } + } + + return rsize; +} + +ssize_t dir_handle_msg(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, size_t msg_size, struct ECP2Buffer *b) { + switch (mtype) { + case ECP_MTYPE_DIR_REQ: { + if (ecp_conn_is_outb(conn)) return ECP_ERR; + return dir_handle_req(conn, msg, msg_size); + } + + case ECP_MTYPE_DIR_ANN: { + if (ecp_conn_is_outb(conn)) return ECP_ERR; + if (!conn->remote.key_perma.valid) return ECP_ERR_VBOX; + return dir_handle_ann(conn, msg, msg_size); + } + + case ECP_MTYPE_DIR_SHADOW: { + if (ecp_conn_is_inb(conn)) return ECP_ERR; + return dir_handle_shadow(conn, msg, msg_size); + } + + default: + return ECP_ERR_MTYPE; + } +} + +int dir_handle_open(ECPConnection *conn, ECP2Buffer *bufs) { + ssize_t rv; + + if (ecp_conn_is_inb(conn)) return ECP_OK; + + rv = dir_send_ann(conn); + if (rv < 0) return rv; + + return ECP_OK; +} + +void dir_online_switch(void) { + struct hashtable_itr itr; + DirNode *node; + unsigned char *msg; + + pthread_rwlock_rdlock(&dir_shadow_rwlock); + pthread_rwlock_wrlock(&dir_online_rwlock); + + msg = dir_online.msg; + dir_online.count = ecp_ht_count(dir_shadow); + + if (dir_online.count > 0) { + size_t _rv; + int rv; + + ecp_ht_itr_create(&itr, dir_shadow); + do { + node = ecp_ht_itr_value(&itr); + + _rv = ecp_dir_item_serialize(&node->dir_item, msg); + msg += _rv; + + rv = ecp_ht_itr_advance(&itr); + } while (rv == ECP_OK); + } + + pthread_rwlock_unlock(&dir_online_rwlock); + pthread_rwlock_unlock(&dir_shadow_rwlock); +} + +int dir_process_item(ECPDirItem *dir_item, ecp_sts_t access_ts) { + DirNode *node; + int is_new = 0; + int rv = ECP_OK; + + pthread_rwlock_rdlock(&dir_shadow_rwlock); + + node = ht_search_item(dir_shadow, dir_item); + if (node) { + pthread_mutex_lock(&node->mutex); + if (!node->zombie) node->access_ts = access_ts; + pthread_mutex_unlock(&node->mutex); + + pthread_rwlock_unlock(&dir_shadow_rwlock); + } else { + pthread_rwlock_unlock(&dir_shadow_rwlock); + + LOG(LOG_DEBUG, "dir new node\n"); + + node = dir_create_node(dir_item, access_ts); + if (node == NULL) rv = ECP_ERR_ALLOC; + + if (!rv) { + pthread_rwlock_wrlock(&dir_shadow_rwlock); + if (ecp_ht_count(dir_shadow) > MAX_DIR_ITEM) rv = ECP_ERR_FULL; + if (!rv) { + if (ht_search_node(dir_shadow, node) == NULL) { + rv = ht_insert_node(dir_shadow, node); + } else { + rv = ECP_ERR_DUP; + } + } + pthread_rwlock_unlock(&dir_shadow_rwlock); + } + + if (!rv) { + is_new = 1; + /* always switch */ + dir_online_switch(); + } else { + if (node) dir_destroy_node(node); + LOG(LOG_ERR, "dir create node err:%d\n", rv); + } + } + + return is_new; +} + +DirNode *dir_create_node(ECPDirItem *dir_item, ecp_sts_t access_ts) { + DirNode *node; + int rv; + + node = malloc(sizeof(DirNode)); + if (node == NULL) return NULL; + + memset(node, 0, sizeof(DirNode)); + + rv = pthread_mutex_init(&node->mutex, NULL); + if (rv) { + free(node); + return NULL; + } + + node->dir_item = *dir_item; + node->access_ts = access_ts; + + return node; +} + +void dir_destroy_node(DirNode *node) { + pthread_mutex_destroy(&node->mutex); + free(node); +} + +int dir_open_conn(ECPSocket *sock, ECPNode *node) { + ECPConnection *conn; + int rv; + + conn = malloc(sizeof(ECPConnection)); + if (conn == NULL) return ECP_ERR_ALLOC; + + ecp_dir_conn_init(conn, sock); + ecp_conn_set_flags(conn, ECP_CONN_FLAG_GC | ECP_CONN_FLAG_VBOX); + rv = ecp_conn_try_open(conn, node); + return rv; +} + +DirNode *dir_search_conn(ECPConnection *conn) { + DirNode *node; + DirNode *ret = NULL; + + pthread_rwlock_rdlock(&dir_shadow_rwlock); + + node = ht_search_conn(dir_shadow, conn); + if (node) { + pthread_mutex_lock(&node->mutex); + if (!node->zombie) { + node->refcount++; + ret = node; + } else { + ret = NULL; + } + pthread_mutex_unlock(&node->mutex); + } + + pthread_rwlock_unlock(&dir_shadow_rwlock); + + return ret; +} + +void dir_announce(ECPSocket *sock) { + static DirNode *expire_node[MAX_EXPIRE_CNT]; + static int expire_node_z[MAX_EXPIRE_CNT]; + static int expire_cnt; + + struct hashtable_itr itr; + ecp_sts_t now; + DirNode *node; + DirNode *node_next; + DirNode *announce_node[MAX_ANNOUNCE_CNT]; + int announce_cnt; + int refcount; + int i; + int rv; + + now = ecp_tm_get_s(); + node_next = NULL; + do { + announce_cnt = 0; + pthread_rwlock_rdlock(&dir_shadow_rwlock); + + if (ecp_ht_count(dir_shadow) > 0) { + ecp_ht_itr_create(&itr, dir_shadow); + if (node_next) { + rv = ecp_ht_itr_search(&itr, node_next); + if (rv) { + LOG(LOG_ERR, "dir ann itr search err:%d\n", rv); + break; + } + node_next = NULL; + } + do { + node = ecp_ht_itr_value(&itr); + rv = ecp_ht_itr_advance(&itr); + + pthread_mutex_lock(&node->mutex); + if (ECP_STS_LT(node->access_ts, now) && (now - node->access_ts > NODE_EXP_TIME)) { + node->zombie = 1; + pthread_mutex_unlock(&node->mutex); + + if (expire_cnt < MAX_EXPIRE_CNT) { + expire_node[expire_cnt] = node; + expire_node_z[expire_cnt] = 1; + expire_cnt++; + } + } else { + pthread_mutex_unlock(&node->mutex); + + if ((node->dir_item.capabilities & ECP_DIR_CAP_DIR) && (memcmp(node->dir_item.node.key_perma.public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) != 0)) { + announce_node[announce_cnt] = node; + announce_cnt++; + + if (announce_cnt == MAX_ANNOUNCE_CNT) { + if (!rv) { + node_next = ecp_ht_itr_key(&itr); + } else { + node_next = NULL; + } + break; + } + } + } + } while (rv == ECP_OK); + } + + pthread_rwlock_unlock(&dir_shadow_rwlock); + + if (expire_cnt) { + pthread_rwlock_wrlock(&dir_shadow_rwlock); + for (i=0; idir_item.node); + if (rv) LOG(LOG_ERR, "dir open connection err:%d\n", rv); + } + + i = expire_cnt - 1; + while (i >= 0) { + node = expire_node[i]; + + pthread_mutex_lock(&node->mutex); + refcount = node->refcount; + pthread_mutex_unlock(&node->mutex); + + if (refcount == 0) { + int j = i; + + expire_cnt--; + while (j < expire_cnt) { + expire_node[j] = expire_node[j+1]; + j++; + } + expire_node[j] = NULL; + dir_destroy_node(node); + } + i--; + } + } while (node_next); +} + +static void *_dir_announce(void *arg) { + ECPSocket *sock = arg; + + while (1) { + LOG(LOG_DEBUG, "dir announce...\n"); + dir_announce(sock); + sleep(10); + } + + return NULL; +} + +int dir_start_announce(ECPSocket *sock) { + int rv; + + rv = pthread_create(&dir_announce_thd, NULL, _dir_announce, sock); + if (rv) return ECP_ERR; + return ECP_OK; +} + +int dir_init(ECPContext *ctx) { + ECPConnHandler *handler; + int rv; + + srv_config = srv_get_config(); + + handler = malloc(sizeof(ECPConnHandler)); + if (handler == NULL) return ECP_ERR_ALLOC; + + ecp_conn_handler_init(handler, dir_handle_msg, dir_handle_open, NULL, NULL); + rv = ecp_ctx_set_handler(ctx, ECP_CTYPE_DIR, handler); + if (rv) return rv; + + rv = pthread_rwlock_init(&dir_shadow_rwlock, NULL); + if (rv) { + return ECP_ERR; + } + + rv = pthread_rwlock_init(&dir_online_rwlock, NULL); + if (rv) { + pthread_rwlock_destroy(&dir_shadow_rwlock); + return ECP_ERR; + } + + dir_shadow = ecp_ht_create_keys(); + if (dir_shadow == NULL) { + pthread_rwlock_destroy(&dir_shadow_rwlock); + pthread_rwlock_destroy(&dir_online_rwlock); + return ECP_ERR_ALLOC; + } + + return ECP_OK; +} -- cgit v1.2.3