From 5f55d9d4d14635678e7f582215e3642de2e232a4 Mon Sep 17 00:00:00 2001 From: Uros Majstorovic Date: Mon, 6 May 2024 02:08:31 +0200 Subject: new ecp directory and vconn server --- ecp/server/dir.c | 1053 +++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 767 insertions(+), 286 deletions(-) (limited to 'ecp/server/dir.c') diff --git a/ecp/server/dir.c b/ecp/server/dir.c index 0066bac..7244fd9 100644 --- a/ecp/server/dir.c +++ b/ecp/server/dir.c @@ -1,31 +1,122 @@ #include #include #include +#include #include -#include +#include #include #include -#include "server.h" -#include "vlink.h" #include "dir.h" +#include "vlink.h" #include "ht.h" +#include "acl.h" + +#include "timer.h" +#include "server.h" static ecp_ht_table_t *dir_shadow = NULL; static pthread_rwlock_t dir_shadow_rwlock; static pthread_rwlock_t dir_online_rwlock; +static pthread_rwlock_t dir_timer_rwlock; static pthread_t dir_announce_thd; -static DirList dir_online; +static DIROnline *dir_online = NULL; +static unsigned int dir_vkey_req; +static int dir_process_enable; static SRVConfig *srv_config; +#define MAX(X, Y) (((X) > (Y)) ? (X) : (Y)) + +/* messages from clients */ +ssize_t dir_send_online(ECPConnection *conn, uint8_t region) { + DIRList *list; + ECPBuffer packet; + ECPBuffer payload; + unsigned char pkt_buf[ECP_MAX_PKT]; + unsigned char pld_buf[ECP_MAX_PLD]; + unsigned char *msg; + size_t msg_size; + ssize_t rv; + int i; + + packet.buffer = pkt_buf; + packet.size = ECP_MAX_PKT; + payload.buffer = pld_buf; + payload.size = ECP_MAX_PLD; + + if (dir_online == NULL) return ECP_ERR; + + rv = 0; + pthread_rwlock_rdlock(&dir_online_rwlock); + list = &dir_online->list[region]; + + if (list->msg_count == 0) goto send_online_fin; + + for (i=0; imsg_count; i++) { + ssize_t rv_snd; + + ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_DIR_REP); + msg = ecp_pld_get_msg(payload.buffer, payload.size); + msg_size = 0; + + msg[0] = i; // frag_cnt + msg[1] = list->msg_count; // frag_tot + msg[2] = list->msg[i].count; // item_cnt + msg[3] = region; // region + msg[4] = dir_online->serial >> 8; // serial + msg[5] = dir_online->serial; + + msg += 4 + sizeof(uint16_t); + msg_size += 4 + sizeof(uint16_t); + + memcpy(msg, list->msg[i].buffer, list->msg[i].count * ECP_SIZE_DIR_ITEM); + msg_size += list->msg[i].count * ECP_SIZE_DIR_ITEM; + + rv_snd = ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(msg_size, ECP_MTYPE_DIR_REP), 0); + if (rv_snd < 0) { + rv = rv_snd; + break; + } + + rv += rv_snd; + } + +send_online_fin: + pthread_rwlock_unlock(&dir_online_rwlock); + return rv; +} + +ssize_t dir_handle_client_msg(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, size_t msg_size, struct ECP2Buffer *b) { + switch (mtype) { + case ECP_MTYPE_DIR_REQ: { + uint8_t region; + ssize_t rv; + + if (msg_size < 1) return ECP_ERR_SIZE; + + region = *msg; + if (region >= MAX_REGION) return ECP_ERR; + + rv = dir_send_online(conn, region); + if (rv < 0) return rv; + + return 1; + } + + default: + return ECP_ERR_MTYPE; + } +} + +/* messages from other servers */ ssize_t dir_send_ann(ECPConnection *conn) { ECPBuffer packet; ECPBuffer payload; - unsigned char pkt_buf[ECP_SIZE_PKT_BUF(sizeof(uint16_t), ECP_MTYPE_DIR_ANN, conn)]; - unsigned char pld_buf[ECP_SIZE_PLD_BUF(sizeof(uint16_t), ECP_MTYPE_DIR_ANN, conn)]; + unsigned char pkt_buf[ECP_SIZE_PKT_BUF(2, MTYPE_DIR_ANN, conn)]; + unsigned char pld_buf[ECP_SIZE_PLD_BUF(2, MTYPE_DIR_ANN, conn)]; unsigned char *msg; packet.buffer = pkt_buf; @@ -33,12 +124,12 @@ ssize_t dir_send_ann(ECPConnection *conn) { payload.buffer = pld_buf; payload.size = sizeof(pld_buf); - ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_DIR_ANN); + ecp_pld_set_type(payload.buffer, payload.size, MTYPE_DIR_ANN); msg = ecp_pld_get_msg(payload.buffer, payload.size); - msg[0] = srv_config->capabilities >> 8; + msg[0] = srv_config->region; msg[1] = srv_config->capabilities; - return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(sizeof(uint16_t), ECP_MTYPE_DIR_ANN), 0); + return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(2, MTYPE_DIR_ANN), 0); } ssize_t dir_send_shadow(ECPConnection *conn) { @@ -49,111 +140,170 @@ ssize_t dir_send_shadow(ECPConnection *conn) { unsigned char *msg; struct hashtable_itr itr; - DirNode *node; - uint16_t count; - ecp_sts_t access_ts; + DIRNode *node; + ecp_ecdh_public_t *node_next; + uint8_t count; size_t msg_size; + ssize_t rv; packet.buffer = pkt_buf; packet.size = ECP_MAX_PKT; payload.buffer = pld_buf; payload.size = ECP_MAX_PLD; - ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_DIR_SHADOW); - msg = ecp_pld_get_msg(payload.buffer, payload.size); - msg_size = 0; + rv = 0; + node_next = NULL; + do { + ssize_t rv_snd; - pthread_rwlock_rdlock(&dir_shadow_rwlock); + ecp_pld_set_type(payload.buffer, payload.size, MTYPE_DIR_SHADOW); + msg = ecp_pld_get_msg(payload.buffer, payload.size); - count = ecp_ht_count(dir_shadow); - msg[0] = count >> 8; - msg[1] = count; - msg += sizeof(uint16_t); - msg_size += sizeof(uint16_t); + memset(msg, 0, 4 + sizeof(uint16_t)); // frag_cnt, frag_tot, item_cnt, region, serial - if (count > 0) { - size_t rv; - int _rv; + pthread_rwlock_rdlock(&dir_shadow_rwlock); - ecp_ht_itr_create(&itr, dir_shadow); - do { - node = ecp_ht_itr_value(&itr); + if (ecp_ht_count(dir_shadow) > 0) { + unsigned char *_msg; + size_t _msg_size; + int _rv; - pthread_mutex_lock(&node->mutex); - access_ts = node->access_ts; - pthread_mutex_unlock(&node->mutex); + _msg = msg + 4 + sizeof(uint16_t); + _msg_size = 0; + ecp_ht_itr_create(&itr, dir_shadow); + if (node_next) { + _rv = ecp_ht_itr_search(&itr, node_next); + if (_rv) { + LOG(LOG_ERR, "dir_send_shadow: itr search err:%d\n", _rv); + break; + } + node_next = NULL; + } - rv = ecp_dir_item_serialize(&node->dir_item, msg); - msg += rv; - msg_size += rv; + count = 0; + do { + node = ecp_ht_itr_value(&itr); - msg[0] = access_ts >> 24; - msg[1] = access_ts >> 16; - msg[2] = access_ts >> 8; - msg[3] = access_ts; - msg += sizeof(uint32_t); - msg_size += sizeof(uint32_t); + pthread_mutex_lock(&node->mutex); + if (node->verified || node->ann_local) { + if (count < MAX_DIR_ITEM_IN_MSG) { + size_t rv_ser; + + rv_ser = ecp_dir_item_serialize(&node->dir_item, _msg); + _msg += rv_ser; + _msg_size += rv_ser; + count++; + } else { + node_next = ecp_ht_itr_key(&itr); + break; + } + } + pthread_mutex_unlock(&node->mutex); - _rv = ecp_ht_itr_advance(&itr); - } while (_rv == ECP_OK); - } + _rv = ecp_ht_itr_advance(&itr); + } while (_rv == ECP_OK); + msg[2] = count; + msg_size = 4 + sizeof(uint16_t) + _msg_size; + } - pthread_rwlock_unlock(&dir_shadow_rwlock); + /* no need to copy node_next key, since announce is verified nodes will not be removed during online switch node removal */ + pthread_rwlock_unlock(&dir_shadow_rwlock); + + rv_snd = ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(msg_size, MTYPE_DIR_SHADOW), 0); + if (rv_snd < 0) { + rv = rv_snd; + break; + } + + rv += rv_snd; + } while (node_next); - return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(msg_size, ECP_MTYPE_DIR_SHADOW), 0); + return rv; } -ssize_t dir_send_online(ECPConnection *conn) { +ssize_t dir_send_origin_rep(ECPConnection *conn, ecp_ecdh_public_t *public) { ECPBuffer packet; ECPBuffer payload; unsigned char pkt_buf[ECP_MAX_PKT]; unsigned char pld_buf[ECP_MAX_PLD]; unsigned char *msg; - size_t msg_size; + unsigned short vkey_max; + size_t msg_size, msg_size_max, hdr_size; + DIRNode *node; + int i; packet.buffer = pkt_buf; packet.size = ECP_MAX_PKT; payload.buffer = pld_buf; payload.size = ECP_MAX_PLD; - ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_DIR_REP); + ecp_pld_set_type(payload.buffer, payload.size, MTYPE_DIR_ORIGIN_REP); msg = ecp_pld_get_msg(payload.buffer, payload.size); + hdr_size = msg - payload.buffer; + vkey_max = (payload.size - hdr_size - 1) / sizeof(ecp_ecdh_public_t); + msg_size_max = 1 + vkey_max * sizeof(ecp_ecdh_public_t); - pthread_rwlock_rdlock(&dir_online_rwlock); + pthread_rwlock_rdlock(&dir_shadow_rwlock); - msg[0] = dir_online.count >> 8; - msg[1] = dir_online.count; - msg += sizeof(uint16_t); - msg_size = sizeof(uint16_t) + dir_online.count * ECP_SIZE_DIR_ITEM; - memcpy(msg, dir_online.msg, dir_online.count * ECP_SIZE_DIR_ITEM); + node = ecp_ht_search(dir_shadow, public); + if (node) pthread_mutex_lock(&node->mutex); - pthread_rwlock_unlock(&dir_online_rwlock); + pthread_rwlock_unlock(&dir_shadow_rwlock); + + if (node == NULL) return ECP_ERR; - return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(msg_size, ECP_MTYPE_DIR_REP), 0); + msg_size = 0; + + *msg = node->vkey_cnt; + msg++; + msg_size++; + + for (i=0; ivkey_cnt; i++) { + memcpy(msg, &node->vkey[i], sizeof(ecp_ecdh_public_t)); + msg += sizeof(ecp_ecdh_public_t); + msg_size += sizeof(ecp_ecdh_public_t); + if (msg_size == msg_size_max) break; + } + pthread_mutex_unlock(&node->mutex); + + return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(msg_size, MTYPE_DIR_ORIGIN_REP), 0); } ssize_t dir_handle_ann(ECPConnection *conn, unsigned char *msg, size_t msg_size) { ECPDirItem dir_item; - int is_new; - uint16_t capabilities; - ecp_sts_t now; - ssize_t rsize; + uint8_t region; + uint8_t capabilities; + size_t rsize; ssize_t rv; + int ann_enable; - rsize = sizeof(uint16_t); + rsize = 2; if (msg_size < rsize) return ECP_ERR_SIZE; - capabilities = \ - ((uint16_t)msg[0] << 8) | \ - ((uint16_t)msg[1]); + pthread_rwlock_rdlock(&dir_timer_rwlock); + ann_enable = (dir_process_enable > PROC_BLOCK_ANN); + pthread_rwlock_unlock(&dir_timer_rwlock); + + if (!ann_enable) return rsize; + + region = msg[0]; + capabilities = msg[1]; + if ((capabilities & ECP_DIR_CAP_DIR) && !acl_dir_inlist(&conn->remote.key_perma.public)) { + LOG(LOG_ERR, "dir_handle_ann: not a directory server\n"); + return ECP_ERR; + } memset(&dir_item, 0, sizeof(ECPDirItem)); dir_item.node = conn->remote; + dir_item.region = region; dir_item.capabilities = capabilities; - now = ecp_tm_get_s(); - is_new = dir_process_item(&dir_item, now); - if (is_new) vlink_new_node(conn->sock, &dir_item); + if (dir_item.region >= MAX_REGION) { + LOG(LOG_ERR, "dir_handle_ann: bad region\n"); + return ECP_ERR; + } + + dir_process_item(&dir_item, conn->sock, &srv_config->key_perma.public); rv = dir_send_shadow(conn); if (rv < 0) return rv; @@ -161,348 +311,573 @@ ssize_t dir_handle_ann(ECPConnection *conn, unsigned char *msg, size_t msg_size) return rsize; } -ssize_t dir_handle_req(ECPConnection *conn, unsigned char *msg, size_t msg_size) { - ssize_t rv; - - rv = dir_send_online(conn); - if (rv < 0) return rv; - - return 0; -} - ssize_t dir_handle_shadow(ECPConnection *conn, unsigned char *msg, size_t msg_size) { - ecp_sts_t now; - uint16_t count; + uint8_t count; size_t rsize; - int i; + int i, ann_enable; - if (msg_size < sizeof(uint16_t)) return ECP_ERR_SIZE; + if (msg_size < 4 + sizeof(uint16_t)) return ECP_ERR_SIZE; - count = \ - ((uint16_t)msg[0] << 8) | \ - ((uint16_t)msg[1]); + count = msg[2]; + msg += 4 + sizeof(uint16_t); // frag_cnt, frag_tot, item_cnt, region, serial - rsize = sizeof(uint16_t) + count * (ECP_SIZE_DIR_ITEM + sizeof(uint32_t)); + rsize = 4 + sizeof(uint16_t) + count * ECP_SIZE_DIR_ITEM; if (msg_size < rsize) return ECP_ERR_SIZE; - msg += sizeof(uint16_t); + pthread_rwlock_rdlock(&dir_timer_rwlock); + ann_enable = (dir_process_enable > PROC_BLOCK_ANN); + pthread_rwlock_unlock(&dir_timer_rwlock); + + if (!ann_enable) return rsize; - now = ecp_tm_get_s(); for (i=0; isock, &dir_item); + if (dir_item.region >= MAX_REGION) { + LOG(LOG_ERR, "dir_handle_shadow: bad region\n"); + return ECP_ERR; } + + dir_process_item(&dir_item, conn->sock, &conn->remote.key_perma.public); } return rsize; } +ssize_t dir_handle_origin_req(ECPConnection *conn, unsigned char *msg, size_t msg_size) { + size_t rsize; + ssize_t rv; + + rsize = ECP_SIZE_ECDH_PUB; + if (msg_size < rsize) return ECP_ERR_SIZE; + + rv = dir_send_origin_rep(conn, (ecp_ecdh_public_t *)msg); + if (rv < 0) return rv; + + return rsize; +} + ssize_t dir_handle_msg(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, size_t msg_size, struct ECP2Buffer *b) { switch (mtype) { - case ECP_MTYPE_DIR_REQ: { - if (ecp_conn_is_outb(conn)) return ECP_ERR; - return dir_handle_req(conn, msg, msg_size); - } + case MTYPE_DIR_ANN: { + int is_dir; + + is_dir = srv_config->capabilities & ECP_DIR_CAP_DIR; + if (!is_dir || ecp_conn_is_outb(conn)) return ECP_ERR; - case ECP_MTYPE_DIR_ANN: { - if (ecp_conn_is_outb(conn)) return ECP_ERR; - if (!conn->remote.key_perma.valid) return ECP_ERR_VBOX; return dir_handle_ann(conn, msg, msg_size); } - case ECP_MTYPE_DIR_SHADOW: { + case MTYPE_DIR_SHADOW: { if (ecp_conn_is_inb(conn)) return ECP_ERR; + return dir_handle_shadow(conn, msg, msg_size); } + case MTYPE_DIR_ORIGIN_REQ: { + int is_dir; + + is_dir = srv_config->capabilities & ECP_DIR_CAP_DIR; + if (!is_dir || ecp_conn_is_outb(conn)) return ECP_ERR; + + return dir_handle_origin_req(conn, msg, msg_size); + } + default: return ECP_ERR_MTYPE; } } -int dir_handle_open(ECPConnection *conn, ECP2Buffer *bufs) { - ssize_t rv; - - if (ecp_conn_is_inb(conn)) return ECP_OK; - - rv = dir_send_ann(conn); - if (rv < 0) return rv; - - return ECP_OK; -} +void dir_process_item(ECPDirItem *dir_item, ECPSocket *sock, ecp_ecdh_public_t *s_public) { + DIRNode *node; + unsigned int vkey_req_dir; + int is_verified = 0; + int rv = ECP_OK; -void dir_online_switch(void) { - struct hashtable_itr itr; - DirNode *node; - unsigned char *msg; + pthread_rwlock_rdlock(&dir_online_rwlock); + vkey_req_dir = dir_vkey_req; + pthread_rwlock_unlock(&dir_online_rwlock); pthread_rwlock_rdlock(&dir_shadow_rwlock); - pthread_rwlock_wrlock(&dir_online_rwlock); - - msg = dir_online.msg; - dir_online.count = ecp_ht_count(dir_shadow); - - if (dir_online.count > 0) { - size_t _rv; - int rv; - ecp_ht_itr_create(&itr, dir_shadow); - do { - node = ecp_ht_itr_value(&itr); + node = ht_search_item(dir_shadow, dir_item); + if (node == NULL) { + pthread_rwlock_unlock(&dir_shadow_rwlock); - _rv = ecp_dir_item_serialize(&node->dir_item, msg); - msg += _rv; + rv = dir_create_node(dir_item, sock, &node); + if (!rv) { + pthread_rwlock_wrlock(&dir_shadow_rwlock); + if (ecp_ht_count(dir_shadow) > MAX_DIR_ITEM) rv = ECP_ERR_FULL; + if (!rv) rv = ht_insert_node(dir_shadow, node); + if (rv) pthread_rwlock_unlock(&dir_shadow_rwlock); + } + if (rv) { + if (node) { + dir_destroy_node(node); + node = NULL; + } + LOG(LOG_ERR, "dir_process_item: err:%d\n", rv); + return; + } - rv = ecp_ht_itr_advance(&itr); - } while (rv == ECP_OK); + LOG(LOG_DEBUG, "dir_process_item: new node\n"); } - pthread_rwlock_unlock(&dir_online_rwlock); + pthread_mutex_lock(&node->mutex); pthread_rwlock_unlock(&dir_shadow_rwlock); -} -int dir_process_item(ECPDirItem *dir_item, ecp_sts_t access_ts) { - DirNode *node; - int is_new = 0; - int rv = ECP_OK; + if (node->zombie) goto process_item_fin; - pthread_rwlock_rdlock(&dir_shadow_rwlock); - - node = ht_search_item(dir_shadow, dir_item); - if (node) { - pthread_mutex_lock(&node->mutex); - if (!node->zombie) node->access_ts = access_ts; - pthread_mutex_unlock(&node->mutex); + if (!node->verified) { + int key_ex = 0; + int i; - pthread_rwlock_unlock(&dir_shadow_rwlock); - } else { - pthread_rwlock_unlock(&dir_shadow_rwlock); + if (memcmp(s_public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) == 0) { + node->ann_local = 1; + } - LOG(LOG_DEBUG, "dir new node\n"); + for (i=0; ivkey_cnt; i++) { + if (memcmp(s_public, &node->vkey[i], sizeof(node->vkey[i])) == 0) { + key_ex = 1; + break; + } + } + if (!key_ex) { + unsigned int vkey_req; - node = dir_create_node(dir_item, access_ts); - if (node == NULL) rv = ECP_ERR_ALLOC; + if (node->dir_item.capabilities & ECP_DIR_CAP_DIR) { + vkey_req = vkey_req_dir; + } else { + vkey_req = MIN_VKEY_REQ; + } - if (!rv) { - pthread_rwlock_wrlock(&dir_shadow_rwlock); - if (ecp_ht_count(dir_shadow) > MAX_DIR_ITEM) rv = ECP_ERR_FULL; - if (!rv) { - if (ht_search_node(dir_shadow, node) == NULL) { - rv = ht_insert_node(dir_shadow, node); - } else { - rv = ECP_ERR_DUP; + if (node->vkey_cnt < vkey_req) { + /* vkey list is used to track origin of particular server */ + memcpy(&node->vkey[node->vkey_cnt], s_public, sizeof(node->vkey[node->vkey_cnt])); + node->vkey_cnt++; + if (node->vkey_cnt == vkey_req) { + node->verified = 1; + node->is_new = 0; + is_verified = 1; } } - pthread_rwlock_unlock(&dir_shadow_rwlock); - } - if (!rv) { - is_new = 1; - /* always switch */ - dir_online_switch(); - } else { - if (node) dir_destroy_node(node); - LOG(LOG_ERR, "dir create node err:%d\n", rv); + /* protocol does not support server properties change */ + if (!ecp_dir_item_eq(dir_item, &node->dir_item)) { + char buffer[ECP_SIZE_ECDH_KEY_BUF]; + + ecp_key2str(buffer, (uint8_t *)&node->dir_item.node.key_perma.public); + LOG(LOG_ERR, "dir_process_item: dir entry changed from:%s\n", buffer); + } } } - return is_new; +process_item_fin: + pthread_mutex_unlock(&node->mutex); + + if (is_verified) { + LOG(LOG_DEBUG, "dir_process_item: node verified\n"); + + rv = acl_add_key(&node->dir_item); + if (rv) LOG(LOG_ERR, "dir_process_item: acl add key err:%d\n", rv); + + vlink_new_node(sock, &node->dir_item); + } } -DirNode *dir_create_node(ECPDirItem *dir_item, ecp_sts_t access_ts) { - DirNode *node; +int dir_open_conn(DIRNode *node, ECPSocket *sock) { int rv; - node = malloc(sizeof(DirNode)); - if (node == NULL) return NULL; + node->conn = malloc(sizeof(ECPConnection)); + if (node->conn == NULL) return ECP_ERR_ALLOC; + + ecp_conn_init(node->conn, sock, CTYPE_DIR); + ecp_conn_set_flags(node->conn, ECP_CONN_FLAG_VBOX); + + rv = ecp_conn_open(node->conn, &node->dir_item.node); + if (rv) node->conn = NULL; - memset(node, 0, sizeof(DirNode)); + return rv; +} - rv = pthread_mutex_init(&node->mutex, NULL); +int dir_create_node(ECPDirItem *dir_item, ECPSocket *sock, DIRNode **node) { + DIRNode *_node; + int rv; + + *node = NULL; + _node = malloc(sizeof(DIRNode)); + if (_node == NULL) return ECP_ERR_ALLOC; + + memset(_node, 0, sizeof(DIRNode)); + rv = pthread_mutex_init(&_node->mutex, NULL); if (rv) { - free(node); - return NULL; + free(_node); + return ECP_ERR; } - node->dir_item = *dir_item; - node->access_ts = access_ts; + _node->dir_item = *dir_item; + _node->is_new = 1; + + /* open connection to other directory servers */ + if ((dir_item->capabilities & ECP_DIR_CAP_DIR) && (memcmp(&dir_item->node.key_perma.public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) != 0)) { + rv = dir_open_conn(_node, sock); + if (rv) { + pthread_mutex_destroy(&_node->mutex); + free(_node); + return rv; + } + } - return node; + *node = _node; + return ECP_OK; } -void dir_destroy_node(DirNode *node) { +void dir_destroy_node(DIRNode *node) { + if (node->conn) ecp_conn_close(node->conn); pthread_mutex_destroy(&node->mutex); free(node); } -int dir_open_conn(ECPSocket *sock, ECPNode *node) { - ECPConnection *conn; - int rv; +static int online_switch_expired(ECPConnection *conn, ecp_sts_t now) { + if (conn->type == CTYPE_DIR) return 1; + return _ecp_conn_is_zombie(conn, now, CONN_EXPIRE_TO); +} - conn = malloc(sizeof(ECPConnection)); - if (conn == NULL) return ECP_ERR_ALLOC; +static void remove_nodes(DIRNode *remove_node[], int remove_cnt) { + int i; - ecp_dir_conn_init(conn, sock); - ecp_conn_set_flags(conn, ECP_CONN_FLAG_GC | ECP_CONN_FLAG_VBOX); - rv = ecp_conn_try_open(conn, node); - return rv; + pthread_rwlock_wrlock(&dir_shadow_rwlock); + for (i=0; idir_item); + dir_destroy_node(remove_node[i]); + } } -DirNode *dir_search_conn(ECPConnection *conn) { - DirNode *node; - DirNode *ret = NULL; +void dir_online_switch(ECPSocket *sock, int inc_serial) { + struct hashtable_itr itr; + DIRNode *node; + uint8_t count[MAX_REGION]; + uint8_t msg_count[MAX_REGION]; + unsigned char *msg[MAX_REGION]; + ecp_ecdh_public_t *node_next; + DIRNode *remove_node[MAX_NODE_REMOVE]; + unsigned int dir_cnt, node_cnt; + int i, remove_cnt; + int rv; pthread_rwlock_rdlock(&dir_shadow_rwlock); + pthread_rwlock_wrlock(&dir_online_rwlock); - node = ht_search_conn(dir_shadow, conn); - if (node) { - pthread_mutex_lock(&node->mutex); - if (!node->zombie) { - node->refcount++; - ret = node; - } else { - ret = NULL; + if (dir_online) { + for (i=0; ilist[i].msg_count = 0; + count[i] = 0; + msg_count[i] = 0; + msg[i] = dir_online->list[i].msg[0].buffer; + } + } + + dir_cnt = 0; + node_cnt = 0; + remove_cnt = 0; + + if (ecp_ht_count(dir_shadow) > 0) { + uint8_t region; + int verified; + + ecp_ht_itr_create(&itr, dir_shadow); + + node_next = NULL; + do { + node = ecp_ht_itr_value(&itr); + + pthread_mutex_lock(&node->mutex); + region = node->dir_item.region; + verified = node->verified; + + if (verified) { + if (dir_online) { + size_t rv_ser; + + rv_ser = ecp_dir_item_serialize(&node->dir_item, msg[0]); + msg[0] += rv_ser; + count[0]++; + if (region) { + rv_ser = ecp_dir_item_serialize(&node->dir_item, msg[region]); + msg[region] += rv_ser; + count[region]++; + } + } + if (node->dir_item.capabilities & ECP_DIR_CAP_DIR) { + dir_cnt++; + } + node_cnt++; + } else { + node->zombie = 1; + if (node_next == NULL) { + if (remove_cnt < MAX_NODE_REMOVE) { + remove_node[remove_cnt] = node; + remove_cnt++; + } else { + node_next = ecp_ht_itr_key(&itr); + break; + } + } + } + node->verified = 0; + node->ann_local = 0; + memset(&node->vkey, 0, sizeof(node->vkey)); + node->vkey_cnt = 0; + pthread_mutex_unlock(&node->mutex); + + /* let's reconnect all directory servers + node->conn is currently immutable since announce is disabled well before online switch */ + if (node->conn) ecp_conn_set_uflags(node->conn, DIR_UFLAG_RECONNECT); + + if (dir_online && verified) { + if (count[0] == MAX_DIR_ITEM_IN_MSG) { + dir_online->list[0].msg[msg_count[0]].count = count[0]; + msg_count[0]++; + msg[0] = dir_online->list[0].msg[msg_count[0]].buffer; + count[0] = 0; + } + if (region && count[region] == MAX_DIR_ITEM_IN_MSG) { + dir_online->list[region].msg[msg_count[region]].count = count[region]; + msg_count[region]++; + msg[region] = dir_online->list[region].msg[msg_count[region]].buffer; + count[region] = 0; + } + } + + rv = ecp_ht_itr_advance(&itr); + } while (rv == ECP_OK); + + if (dir_online) { + for (i=0; ilist[i].msg[msg_count[i]].count = count[i]; + dir_online->list[i].msg_count = msg_count[i] + 1; + } else { + dir_online->list[i].msg_count = msg_count[i]; + } + } } - pthread_mutex_unlock(&node->mutex); } + dir_vkey_req = dir_cnt / 2 + 1; + if (dir_vkey_req > MAX_VKEY) { + LOG(LOG_ERR, "dir_online_switch: number of dir vkeys required > MAX_VKEY (%d)\n", MAX_VKEY); + dir_vkey_req = MAX_VKEY; + } + + if (dir_online && inc_serial) dir_online->serial++; + + pthread_rwlock_unlock(&dir_online_rwlock); pthread_rwlock_unlock(&dir_shadow_rwlock); - return ret; + rv = acl_reset_ht(); + if (rv) LOG(LOG_ERR, "dir_online_switch: acl reset err:%d\n", rv); + + ecp_sock_expire(sock, online_switch_expired); + + if (remove_cnt) { + remove_nodes(remove_node, remove_cnt); + while (node_next) { + remove_cnt = 0; + pthread_rwlock_rdlock(&dir_shadow_rwlock); + + ecp_ht_itr_create(&itr, dir_shadow); + if (node_next) { + rv = ecp_ht_itr_search(&itr, node_next); + if (rv) { + LOG(LOG_ERR, "dir_online_switch: itr search err:%d\n", rv); + break; + } + node_next = NULL; + } + do { + node = ecp_ht_itr_value(&itr); + + pthread_mutex_lock(&node->mutex); + if (node->zombie) { + if (remove_cnt < MAX_NODE_REMOVE) { + remove_node[remove_cnt] = node; + remove_cnt++; + } else { + node_next = ecp_ht_itr_key(&itr); + break; + } + } + pthread_mutex_unlock(&node->mutex); + + rv = ecp_ht_itr_advance(&itr); + } while (rv == ECP_OK); + + /* no need to copy node_next key, since this is the only thread that can destroy node */ + pthread_rwlock_unlock(&dir_shadow_rwlock); + + if (remove_cnt) remove_nodes(remove_node, remove_cnt); + } + } + + if (srv_config->capabilities & ECP_DIR_CAP_DIR) acl_load_ht(); + + LOG(LOG_DEBUG, "dir_online_switch: node:%d dir:%d serial:%d\n", node_cnt, dir_cnt, dir_online ? dir_online->serial : 0); +} + +void dir_announce_allow(void) { + pthread_rwlock_wrlock(&dir_timer_rwlock); + dir_process_enable = PROC_ALLOW_ALL; + pthread_rwlock_unlock(&dir_timer_rwlock); +} + +void dir_announce_block(void) { + pthread_rwlock_wrlock(&dir_timer_rwlock); + dir_process_enable = PROC_BLOCK_ANN; + pthread_rwlock_unlock(&dir_timer_rwlock); + + LOG(LOG_DEBUG, "dir_announce_block\n"); } -void dir_announce(ECPSocket *sock) { - static DirNode *expire_node[MAX_EXPIRE_CNT]; - static int expire_node_z[MAX_EXPIRE_CNT]; - static int expire_cnt; +void dit_init_serial(uint16_t serial) { + if (dir_online) { + pthread_rwlock_wrlock(&dir_online_rwlock); + dir_online->serial = serial; + pthread_rwlock_unlock(&dir_online_rwlock); + } +} +void dir_announce(ECPSocket *sock, int ann_period) { struct hashtable_itr itr; - ecp_sts_t now; - DirNode *node; - DirNode *node_next; - DirNode *announce_node[MAX_ANNOUNCE_CNT]; + DIRNode *node; + ecp_ecdh_public_t *node_next; + DIRNode *announce_node[MAX_NODE_ANNOUNCE]; + unsigned int ht_count; int announce_cnt; - int refcount; int i; int rv; - now = ecp_tm_get_s(); node_next = NULL; do { + uint32_t delay; + uint32_t rnd; + announce_cnt = 0; pthread_rwlock_rdlock(&dir_shadow_rwlock); - if (ecp_ht_count(dir_shadow) > 0) { + ht_count = ecp_ht_count(dir_shadow); + if (ht_count > 0) { ecp_ht_itr_create(&itr, dir_shadow); if (node_next) { rv = ecp_ht_itr_search(&itr, node_next); if (rv) { - LOG(LOG_ERR, "dir ann itr search err:%d\n", rv); + LOG(LOG_ERR, "dir_announce: itr search err:%d\n", rv); break; } node_next = NULL; } do { node = ecp_ht_itr_value(&itr); - rv = ecp_ht_itr_advance(&itr); - pthread_mutex_lock(&node->mutex); - if (ECP_STS_LT(node->access_ts, now) && (now - node->access_ts > NODE_EXP_TIME)) { - node->zombie = 1; - pthread_mutex_unlock(&node->mutex); - - if (expire_cnt < MAX_EXPIRE_CNT) { - expire_node[expire_cnt] = node; - expire_node_z[expire_cnt] = 1; - expire_cnt++; - } - } else { - pthread_mutex_unlock(&node->mutex); - - if ((node->dir_item.capabilities & ECP_DIR_CAP_DIR) && (memcmp(node->dir_item.node.key_perma.public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) != 0)) { + if ((node->dir_item.capabilities & ECP_DIR_CAP_DIR) && (memcmp(&node->dir_item.node.key_perma.public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) != 0)) { + if (announce_cnt < MAX_NODE_ANNOUNCE) { announce_node[announce_cnt] = node; announce_cnt++; - - if (announce_cnt == MAX_ANNOUNCE_CNT) { - if (!rv) { - node_next = ecp_ht_itr_key(&itr); - } else { - node_next = NULL; - } - break; - } + } else { + node_next = ecp_ht_itr_key(&itr); + break; } } + + rv = ecp_ht_itr_advance(&itr); } while (rv == ECP_OK); } + /* no need to copy node_next key, since announce is disabled during online switch node removal */ pthread_rwlock_unlock(&dir_shadow_rwlock); - if (expire_cnt) { - pthread_rwlock_wrlock(&dir_shadow_rwlock); - for (i=0; idir_item.node); - if (rv) LOG(LOG_ERR, "dir open connection err:%d\n", rv); - } - - i = expire_cnt - 1; - while (i >= 0) { - node = expire_node[i]; - - pthread_mutex_lock(&node->mutex); - refcount = node->refcount; - pthread_mutex_unlock(&node->mutex); + ECPConnection *conn; + int is_reg, is_open, is_z; + ecp_sts_t now; + + conn = announce_node[i]->conn; + if (conn == NULL) { + rv = dir_open_conn(announce_node[i], sock); + if (rv) LOG(LOG_ERR, "dir_announce: conn open err:%d\n", rv); + continue; + } - if (refcount == 0) { - int j = i; + is_reg = is_open = is_z = 0; + now = ecp_tm_get_s(); + + ecp_conn_lock(conn); + is_reg = _ecp_conn_is_reg(conn); + if (is_reg) is_open = _ecp_conn_is_open(conn); + if (is_open) is_z = _ecp_conn_is_zombie(conn, now, ANN_PERIOD * 3) || _ecp_conn_test_uflags(conn, DIR_UFLAG_RECONNECT); + ecp_conn_unlock(conn); + + if (!is_reg || is_z) { + LOG(LOG_DEBUG, "dir_announce: reconnect\n"); + ecp_conn_close(conn); + rv = dir_open_conn(announce_node[i], sock); + if (rv) LOG(LOG_ERR, "dir_announce: conn open err:%d\n", rv); + } else if (is_open) { + ssize_t _rv; + + _rv = dir_send_ann(conn); + if (_rv < 0) LOG(LOG_ERR, "dir_announce: send ann err:%ld\n", _rv); + } - expire_cnt--; - while (j < expire_cnt) { - expire_node[j] = expire_node[j+1]; - j++; - } - expire_node[j] = NULL; - dir_destroy_node(node); + if (delay) { + /* randomize over delay / 2 and delay + delay / 2 */ + rnd = arc4random_uniform(delay + 1); + usleep(delay / 2 + rnd); } - i--; } + } while (node_next); + + LOG(LOG_DEBUG, "dir_announce: node count:%u\n", ht_count); } static void *_dir_announce(void *arg) { ECPSocket *sock = arg; + uint32_t rnd; + time_t tv_sec; + int delay; + int ann_enable; while (1) { - LOG(LOG_DEBUG, "dir announce...\n"); - dir_announce(sock); - sleep(10); + tv_sec = time(NULL); + + pthread_rwlock_rdlock(&dir_timer_rwlock); + ann_enable = (dir_process_enable > PROC_BLOCK_ANN); + pthread_rwlock_unlock(&dir_timer_rwlock); + + if (ann_enable) { + dir_announce(sock, ANN_PERIOD); + } else { + sleep(ANN_BLOCK_TIME + ANN_PERIOD); + } + + /* resolution is 1s */ + delay = ANN_PERIOD - (time(NULL) - tv_sec); + if (delay > 0) { + rnd = arc4random_uniform(delay + 1); + sleep(delay / 2 + rnd); + } } return NULL; @@ -516,18 +891,72 @@ int dir_start_announce(ECPSocket *sock) { return ECP_OK; } -int dir_init(ECPContext *ctx) { - ECPConnHandler *handler; +void dir_init_switch(ECPSocket *sock, int init_ann) { + int i; + + dir_announce_allow(); + for (i=0; ictx; + ECPConnHandler *handler, *c_handler; + int is_dir; + int rv; + + /* dir_process_enable and dir_online.serial will be set from timer */ + dir_vkey_req = 1; + srv_config = srv_get_config(); + is_dir = srv_config->capabilities & ECP_DIR_CAP_DIR; rv = pthread_rwlock_init(&dir_shadow_rwlock, NULL); if (rv) { @@ -540,12 +969,64 @@ int dir_init(ECPContext *ctx) { return ECP_ERR; } + rv = pthread_rwlock_init(&dir_timer_rwlock, NULL); + if (rv) { + pthread_rwlock_destroy(&dir_online_rwlock); + pthread_rwlock_destroy(&dir_shadow_rwlock); + return ECP_ERR; + } + dir_shadow = ecp_ht_create_keys(); if (dir_shadow == NULL) { + pthread_rwlock_destroy(&dir_timer_rwlock); + pthread_rwlock_destroy(&dir_online_rwlock); pthread_rwlock_destroy(&dir_shadow_rwlock); + return ECP_ERR_ALLOC; + } + + handler = malloc(sizeof(ECPConnHandler)); + if (handler == NULL) { + ecp_ht_destroy(dir_shadow); + pthread_rwlock_destroy(&dir_timer_rwlock); pthread_rwlock_destroy(&dir_online_rwlock); + pthread_rwlock_destroy(&dir_shadow_rwlock); return ECP_ERR_ALLOC; } + ecp_conn_handler_init(handler, dir_handle_msg, NULL, NULL, NULL); + + if (is_dir) { + c_handler = malloc(sizeof(ECPConnHandler)); + dir_online = malloc(sizeof(DIROnline)); + if ((c_handler == NULL) || (dir_online == NULL)) { + free(handler); + ecp_ht_destroy(dir_shadow); + pthread_rwlock_destroy(&dir_timer_rwlock); + pthread_rwlock_destroy(&dir_online_rwlock); + pthread_rwlock_destroy(&dir_shadow_rwlock); + if (dir_online) free(dir_online); + if (c_handler) free(c_handler); + return ECP_ERR_ALLOC; + } + ecp_conn_handler_init(c_handler, dir_handle_client_msg, NULL, NULL, NULL); + memset(dir_online, 0, sizeof(DIROnline)); + } + + rv = timer_init(sock); + if (rv) { + free(handler); + if (is_dir) { + free(dir_online); + free(c_handler); + } + ecp_ht_destroy(dir_shadow); + pthread_rwlock_destroy(&dir_timer_rwlock); + pthread_rwlock_destroy(&dir_online_rwlock); + pthread_rwlock_destroy(&dir_shadow_rwlock); + return rv; + } + + ecp_ctx_set_handler(ctx, CTYPE_DIR, handler); + if (is_dir) ecp_ctx_set_handler(ctx, ECP_CTYPE_DIR, c_handler); return ECP_OK; } -- cgit v1.2.3