From 5f55d9d4d14635678e7f582215e3642de2e232a4 Mon Sep 17 00:00:00 2001 From: Uros Majstorovic Date: Mon, 6 May 2024 02:08:31 +0200 Subject: new ecp directory and vconn server --- ecp/server/Makefile | 7 +- ecp/server/acl.c | 275 +++++++++++++ ecp/server/acl.h | 21 + ecp/server/dir.c | 1053 +++++++++++++++++++++++++++++++++++-------------- ecp/server/dir.h | 89 +++-- ecp/server/ht.c | 21 +- ecp/server/ht.h | 10 +- ecp/server/server.c | 270 ++++++++----- ecp/server/server.h | 15 +- ecp/server/timer.c | 98 +++++ ecp/server/timer.h | 9 + ecp/server/vlink.c | 360 +++++++---------- ecp/server/vlink.h | 21 +- ecp/src/ecp/dir/dir.c | 24 +- ecp/src/ecp/dir/dir.h | 11 +- 15 files changed, 1602 insertions(+), 682 deletions(-) create mode 100644 ecp/server/acl.c create mode 100644 ecp/server/acl.h create mode 100644 ecp/server/timer.c create mode 100644 ecp/server/timer.h (limited to 'ecp') diff --git a/ecp/server/Makefile b/ecp/server/Makefile index 2105e9f..84070bf 100644 --- a/ecp/server/Makefile +++ b/ecp/server/Makefile @@ -1,9 +1,10 @@ src_dir = ../src include $(src_dir)/ecp/common.mk -CFLAGS += -Wno-int-to-void-pointer-cast +CFLAGS += -I../util -Wno-int-to-void-pointer-cast +LDFLAGS += -lrt -obj = server.o dir.o vlink.o ht.o -dep = ../build-posix/*.a +obj = server.o dir.o vlink.o ht.o acl.o timer.o +dep = ../build-posix/*.a ../util/libecputil.a %.o: %.c $(CC) $(CFLAGS) -c $< diff --git a/ecp/server/acl.c b/ecp/server/acl.c new file mode 100644 index 0000000..d8cdc6e --- /dev/null +++ b/ecp/server/acl.c @@ -0,0 +1,275 @@ +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include + +#include "server.h" +#include "acl.h" + +static SRVConfig *srv_config; + +static ecp_ht_table_t *acl_keys = NULL; +static ecp_ht_table_t *acl_keys_dir = NULL; +static pthread_mutex_t acl_li_mutex; +static pthread_rwlock_t acl_ht_rwlock; +static ACLItem *acl_head = NULL; +static ACLItem *acl_head_dir = NULL; +static int acl_mark = 0; + +ACLItem *acl_create_item(void) { + ACLItem *ret = NULL; + + ret = malloc(sizeof(ACLItem)); + if (ret == NULL) return ret; + memset(&ret->key, 0, sizeof(ret->key)); + ret->key_cnt = 0; + ret->next = NULL; + + return ret; +} + +void acl_destroy_item(ACLItem *item) { + free(item); +} + +void acl_destroy_list(ACLItem *head) { + ACLItem *acl_next; + + while (head) { + acl_next = head->next; + acl_destroy_item(head); + head = acl_next; + } +} + +static int _add_key(ecp_ecdh_public_t *public, uint8_t capabilities) { + int rv; + + if ((acl_keys == NULL) || (acl_keys_dir == NULL)) return ECP_ERR; + + if ((srv_config->capabilities & ECP_DIR_CAP_DIR) || (srv_config->capabilities & capabilities & ECP_DIR_CAP_VCONN)) { + /* directory server accepts all connections + vconn server accepts connections only from other vconn servers */ + rv = ecp_ht_insert_uniq(acl_keys, public, &acl_mark); + if (rv && (rv != ECP_ERR_DUP)) return rv; + } + if (srv_config->capabilities & capabilities & ECP_DIR_CAP_DIR) { + rv = ecp_ht_insert_uniq(acl_keys_dir, public, &acl_mark); + if (rv && (rv != ECP_ERR_DUP)) return rv; + } + + return ECP_OK; +} + +static int _read_file(int fd, ACLItem *head) { + ecp_ecdh_public_t public; + int rv; + + if (head == NULL) return ECP_ERR_ALLOC; + + while (head->next) { + head = head->next; + } + + while(ecp_util_read_key(fd, &public, NULL) == ECP_OK) { + if (head->key_cnt == ACL_MAX_KEY) { + head->next = acl_create_item(); + if (head->next == NULL) return ECP_ERR_ALLOC; + head = head->next; + } + memcpy(&head->key[head->key_cnt], &public, sizeof(head->key[head->key_cnt])); + head->key_cnt++; + } + + return ECP_OK; +} + +static int _li2ht(ACLItem *head, int is_dir) { + int i; + int rv; + + while (head) { + for (i=0; ikey_cnt; i++) { + rv = _add_key(&head->key[i], is_dir ? ECP_DIR_CAP_DIR : 0); + if (rv) return rv; + } + head = head->next; + } + + return ECP_OK; +} + +int acl_add_key(ECPDirItem *dir_item) { + int rv; + + pthread_rwlock_wrlock(&acl_ht_rwlock); + rv = _add_key(&dir_item->node.key_perma.public, dir_item->capabilities); + pthread_rwlock_unlock(&acl_ht_rwlock); + + return rv; +} + +int acl_inlist(ecp_ecdh_public_t *public) { + void *item = NULL; + + pthread_rwlock_rdlock(&acl_ht_rwlock); + if (acl_keys) item = ecp_ht_search(acl_keys, public); + pthread_rwlock_unlock(&acl_ht_rwlock); + + return (item != NULL); +} + +int acl_dir_inlist(ecp_ecdh_public_t *public) { + void *item = NULL; + + pthread_rwlock_rdlock(&acl_ht_rwlock); + if (acl_keys_dir) item = ecp_ht_search(acl_keys_dir, public); + pthread_rwlock_unlock(&acl_ht_rwlock); + + return (item != NULL); +} + +int acl_reset_ht(void) { + int rv = ECP_OK; + + pthread_rwlock_wrlock(&acl_ht_rwlock); + if (acl_keys) ecp_ht_destroy(acl_keys); + if (acl_keys_dir) ecp_ht_destroy(acl_keys_dir); + acl_keys = ecp_ht_create_keys(); + acl_keys_dir = ecp_ht_create_keys(); + if ((acl_keys == NULL) || (acl_keys_dir == NULL)) rv = ECP_ERR_ALLOC; + pthread_rwlock_unlock(&acl_ht_rwlock); + + return rv; +} + +int acl_load_ht(void) { + int rv = ECP_OK; + int _rv; + + pthread_mutex_lock(&acl_li_mutex); + pthread_rwlock_wrlock(&acl_ht_rwlock); + + _rv = _li2ht(acl_head, 0); + if (_rv) rv = ECP_ERR; + _rv = _li2ht(acl_head_dir, 1); + if (_rv) rv = ECP_ERR; + + pthread_rwlock_unlock(&acl_ht_rwlock); + pthread_mutex_unlock(&acl_li_mutex); + + return rv; +} + +int acl_load(void) { + ACLItem *acl_new = NULL; + ACLItem *acl_dir_new = NULL; + int fd = -1; + int fd_dir = -1; + int rv = ECP_OK; + int _rv; + + if ((srv_config->acl_fn == NULL) && (srv_config->acl_fn_dir == NULL)) return ECP_OK; + + if (srv_config->acl_fn) { + fd = open(srv_config->acl_fn, O_RDONLY); + if (fd < 0) { + LOG(LOG_ERR, "acl_load: unable to open: %s\n", srv_config->acl_fn); + return ECP_ERR; + } + } + + if (srv_config->acl_fn_dir) { + fd_dir = open(srv_config->acl_fn_dir, O_RDONLY); + if (fd_dir < 0) { + LOG(LOG_ERR, "acl_load: unable to open: %s\n", srv_config->acl_fn_dir); + close(fd); + return ECP_ERR; + } + } + + pthread_mutex_lock(&acl_li_mutex); + if (fd >= 0) { + acl_new = acl_create_item(); + rv = _read_file(fd, acl_new); + if (rv) { + LOG(LOG_ERR, "acl_load: read from file: %s err:%d\n", srv_config->acl_fn, rv); + acl_destroy_list(acl_new); + goto load_fin; + } + } + + if (fd_dir >= 0) { + acl_dir_new = acl_create_item(); + rv = _read_file(fd_dir, acl_dir_new); + if (rv) { + LOG(LOG_ERR, "acl_load: read from file: %s err:%d\n", srv_config->acl_fn_dir, rv); + acl_destroy_list(acl_dir_new); + acl_destroy_list(acl_new); + goto load_fin; + } + } + if (acl_new) { + acl_destroy_list(acl_head); + acl_head = acl_new; + } + if (acl_dir_new) { + acl_destroy_list(acl_head_dir); + acl_head_dir = acl_dir_new; + } + + pthread_rwlock_wrlock(&acl_ht_rwlock); + if (acl_new) { + _rv = _li2ht(acl_new, 0); + if (_rv) rv = ECP_ERR; + } + if (acl_dir_new) { + _rv = _li2ht(acl_dir_new, 1); + if (_rv) rv = ECP_ERR; + } + + pthread_rwlock_unlock(&acl_ht_rwlock); + +load_fin: + pthread_mutex_unlock(&acl_li_mutex); + + close(fd_dir); + close(fd); + + return rv; +} + +int acl_init(void) { + int rv; + + srv_config = srv_get_config(); + + rv = pthread_mutex_init(&acl_li_mutex, NULL); + if (rv) return ECP_ERR; + + rv = pthread_rwlock_init(&acl_ht_rwlock, NULL); + if (rv) { + pthread_mutex_destroy(&acl_li_mutex); + return ECP_ERR; + } + + acl_keys = ecp_ht_create_keys(); + acl_keys_dir = ecp_ht_create_keys(); + if ((acl_keys == NULL) || (acl_keys_dir == NULL)) { + pthread_rwlock_destroy(&acl_ht_rwlock); + pthread_mutex_destroy(&acl_li_mutex); + if (acl_keys_dir) ecp_ht_destroy(acl_keys_dir); + if (acl_keys) ecp_ht_destroy(acl_keys); + return ECP_ERR_ALLOC; + } + + return ECP_OK; +} diff --git a/ecp/server/acl.h b/ecp/server/acl.h new file mode 100644 index 0000000..5e3f83c --- /dev/null +++ b/ecp/server/acl.h @@ -0,0 +1,21 @@ +#define ACL_MAX_KEY 50 + +typedef struct ACLItem { + ecp_ecdh_public_t key[ACL_MAX_KEY]; + unsigned short key_cnt; + struct ACLItem *next; +} ACLItem; + +ACLItem *acl_create_item(void); +void acl_destroy_item(ACLItem *acl_item); +void acl_destroy_list(ACLItem *head); + +int acl_add_key(ECPDirItem *dir_item); +int acl_inlist(ecp_ecdh_public_t *public); +int acl_dir_inlist(ecp_ecdh_public_t *public); + +int acl_reset_ht(void); +int acl_load_ht(void); + +int acl_load(void); +int acl_init(void); diff --git a/ecp/server/dir.c b/ecp/server/dir.c index 0066bac..7244fd9 100644 --- a/ecp/server/dir.c +++ b/ecp/server/dir.c @@ -1,31 +1,122 @@ #include #include #include +#include #include -#include +#include #include #include -#include "server.h" -#include "vlink.h" #include "dir.h" +#include "vlink.h" #include "ht.h" +#include "acl.h" + +#include "timer.h" +#include "server.h" static ecp_ht_table_t *dir_shadow = NULL; static pthread_rwlock_t dir_shadow_rwlock; static pthread_rwlock_t dir_online_rwlock; +static pthread_rwlock_t dir_timer_rwlock; static pthread_t dir_announce_thd; -static DirList dir_online; +static DIROnline *dir_online = NULL; +static unsigned int dir_vkey_req; +static int dir_process_enable; static SRVConfig *srv_config; +#define MAX(X, Y) (((X) > (Y)) ? (X) : (Y)) + +/* messages from clients */ +ssize_t dir_send_online(ECPConnection *conn, uint8_t region) { + DIRList *list; + ECPBuffer packet; + ECPBuffer payload; + unsigned char pkt_buf[ECP_MAX_PKT]; + unsigned char pld_buf[ECP_MAX_PLD]; + unsigned char *msg; + size_t msg_size; + ssize_t rv; + int i; + + packet.buffer = pkt_buf; + packet.size = ECP_MAX_PKT; + payload.buffer = pld_buf; + payload.size = ECP_MAX_PLD; + + if (dir_online == NULL) return ECP_ERR; + + rv = 0; + pthread_rwlock_rdlock(&dir_online_rwlock); + list = &dir_online->list[region]; + + if (list->msg_count == 0) goto send_online_fin; + + for (i=0; imsg_count; i++) { + ssize_t rv_snd; + + ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_DIR_REP); + msg = ecp_pld_get_msg(payload.buffer, payload.size); + msg_size = 0; + + msg[0] = i; // frag_cnt + msg[1] = list->msg_count; // frag_tot + msg[2] = list->msg[i].count; // item_cnt + msg[3] = region; // region + msg[4] = dir_online->serial >> 8; // serial + msg[5] = dir_online->serial; + + msg += 4 + sizeof(uint16_t); + msg_size += 4 + sizeof(uint16_t); + + memcpy(msg, list->msg[i].buffer, list->msg[i].count * ECP_SIZE_DIR_ITEM); + msg_size += list->msg[i].count * ECP_SIZE_DIR_ITEM; + + rv_snd = ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(msg_size, ECP_MTYPE_DIR_REP), 0); + if (rv_snd < 0) { + rv = rv_snd; + break; + } + + rv += rv_snd; + } + +send_online_fin: + pthread_rwlock_unlock(&dir_online_rwlock); + return rv; +} + +ssize_t dir_handle_client_msg(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, size_t msg_size, struct ECP2Buffer *b) { + switch (mtype) { + case ECP_MTYPE_DIR_REQ: { + uint8_t region; + ssize_t rv; + + if (msg_size < 1) return ECP_ERR_SIZE; + + region = *msg; + if (region >= MAX_REGION) return ECP_ERR; + + rv = dir_send_online(conn, region); + if (rv < 0) return rv; + + return 1; + } + + default: + return ECP_ERR_MTYPE; + } +} + +/* messages from other servers */ ssize_t dir_send_ann(ECPConnection *conn) { ECPBuffer packet; ECPBuffer payload; - unsigned char pkt_buf[ECP_SIZE_PKT_BUF(sizeof(uint16_t), ECP_MTYPE_DIR_ANN, conn)]; - unsigned char pld_buf[ECP_SIZE_PLD_BUF(sizeof(uint16_t), ECP_MTYPE_DIR_ANN, conn)]; + unsigned char pkt_buf[ECP_SIZE_PKT_BUF(2, MTYPE_DIR_ANN, conn)]; + unsigned char pld_buf[ECP_SIZE_PLD_BUF(2, MTYPE_DIR_ANN, conn)]; unsigned char *msg; packet.buffer = pkt_buf; @@ -33,12 +124,12 @@ ssize_t dir_send_ann(ECPConnection *conn) { payload.buffer = pld_buf; payload.size = sizeof(pld_buf); - ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_DIR_ANN); + ecp_pld_set_type(payload.buffer, payload.size, MTYPE_DIR_ANN); msg = ecp_pld_get_msg(payload.buffer, payload.size); - msg[0] = srv_config->capabilities >> 8; + msg[0] = srv_config->region; msg[1] = srv_config->capabilities; - return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(sizeof(uint16_t), ECP_MTYPE_DIR_ANN), 0); + return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(2, MTYPE_DIR_ANN), 0); } ssize_t dir_send_shadow(ECPConnection *conn) { @@ -49,111 +140,170 @@ ssize_t dir_send_shadow(ECPConnection *conn) { unsigned char *msg; struct hashtable_itr itr; - DirNode *node; - uint16_t count; - ecp_sts_t access_ts; + DIRNode *node; + ecp_ecdh_public_t *node_next; + uint8_t count; size_t msg_size; + ssize_t rv; packet.buffer = pkt_buf; packet.size = ECP_MAX_PKT; payload.buffer = pld_buf; payload.size = ECP_MAX_PLD; - ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_DIR_SHADOW); - msg = ecp_pld_get_msg(payload.buffer, payload.size); - msg_size = 0; + rv = 0; + node_next = NULL; + do { + ssize_t rv_snd; - pthread_rwlock_rdlock(&dir_shadow_rwlock); + ecp_pld_set_type(payload.buffer, payload.size, MTYPE_DIR_SHADOW); + msg = ecp_pld_get_msg(payload.buffer, payload.size); - count = ecp_ht_count(dir_shadow); - msg[0] = count >> 8; - msg[1] = count; - msg += sizeof(uint16_t); - msg_size += sizeof(uint16_t); + memset(msg, 0, 4 + sizeof(uint16_t)); // frag_cnt, frag_tot, item_cnt, region, serial - if (count > 0) { - size_t rv; - int _rv; + pthread_rwlock_rdlock(&dir_shadow_rwlock); - ecp_ht_itr_create(&itr, dir_shadow); - do { - node = ecp_ht_itr_value(&itr); + if (ecp_ht_count(dir_shadow) > 0) { + unsigned char *_msg; + size_t _msg_size; + int _rv; - pthread_mutex_lock(&node->mutex); - access_ts = node->access_ts; - pthread_mutex_unlock(&node->mutex); + _msg = msg + 4 + sizeof(uint16_t); + _msg_size = 0; + ecp_ht_itr_create(&itr, dir_shadow); + if (node_next) { + _rv = ecp_ht_itr_search(&itr, node_next); + if (_rv) { + LOG(LOG_ERR, "dir_send_shadow: itr search err:%d\n", _rv); + break; + } + node_next = NULL; + } - rv = ecp_dir_item_serialize(&node->dir_item, msg); - msg += rv; - msg_size += rv; + count = 0; + do { + node = ecp_ht_itr_value(&itr); - msg[0] = access_ts >> 24; - msg[1] = access_ts >> 16; - msg[2] = access_ts >> 8; - msg[3] = access_ts; - msg += sizeof(uint32_t); - msg_size += sizeof(uint32_t); + pthread_mutex_lock(&node->mutex); + if (node->verified || node->ann_local) { + if (count < MAX_DIR_ITEM_IN_MSG) { + size_t rv_ser; + + rv_ser = ecp_dir_item_serialize(&node->dir_item, _msg); + _msg += rv_ser; + _msg_size += rv_ser; + count++; + } else { + node_next = ecp_ht_itr_key(&itr); + break; + } + } + pthread_mutex_unlock(&node->mutex); - _rv = ecp_ht_itr_advance(&itr); - } while (_rv == ECP_OK); - } + _rv = ecp_ht_itr_advance(&itr); + } while (_rv == ECP_OK); + msg[2] = count; + msg_size = 4 + sizeof(uint16_t) + _msg_size; + } - pthread_rwlock_unlock(&dir_shadow_rwlock); + /* no need to copy node_next key, since announce is verified nodes will not be removed during online switch node removal */ + pthread_rwlock_unlock(&dir_shadow_rwlock); + + rv_snd = ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(msg_size, MTYPE_DIR_SHADOW), 0); + if (rv_snd < 0) { + rv = rv_snd; + break; + } + + rv += rv_snd; + } while (node_next); - return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(msg_size, ECP_MTYPE_DIR_SHADOW), 0); + return rv; } -ssize_t dir_send_online(ECPConnection *conn) { +ssize_t dir_send_origin_rep(ECPConnection *conn, ecp_ecdh_public_t *public) { ECPBuffer packet; ECPBuffer payload; unsigned char pkt_buf[ECP_MAX_PKT]; unsigned char pld_buf[ECP_MAX_PLD]; unsigned char *msg; - size_t msg_size; + unsigned short vkey_max; + size_t msg_size, msg_size_max, hdr_size; + DIRNode *node; + int i; packet.buffer = pkt_buf; packet.size = ECP_MAX_PKT; payload.buffer = pld_buf; payload.size = ECP_MAX_PLD; - ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_DIR_REP); + ecp_pld_set_type(payload.buffer, payload.size, MTYPE_DIR_ORIGIN_REP); msg = ecp_pld_get_msg(payload.buffer, payload.size); + hdr_size = msg - payload.buffer; + vkey_max = (payload.size - hdr_size - 1) / sizeof(ecp_ecdh_public_t); + msg_size_max = 1 + vkey_max * sizeof(ecp_ecdh_public_t); - pthread_rwlock_rdlock(&dir_online_rwlock); + pthread_rwlock_rdlock(&dir_shadow_rwlock); - msg[0] = dir_online.count >> 8; - msg[1] = dir_online.count; - msg += sizeof(uint16_t); - msg_size = sizeof(uint16_t) + dir_online.count * ECP_SIZE_DIR_ITEM; - memcpy(msg, dir_online.msg, dir_online.count * ECP_SIZE_DIR_ITEM); + node = ecp_ht_search(dir_shadow, public); + if (node) pthread_mutex_lock(&node->mutex); - pthread_rwlock_unlock(&dir_online_rwlock); + pthread_rwlock_unlock(&dir_shadow_rwlock); + + if (node == NULL) return ECP_ERR; - return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(msg_size, ECP_MTYPE_DIR_REP), 0); + msg_size = 0; + + *msg = node->vkey_cnt; + msg++; + msg_size++; + + for (i=0; ivkey_cnt; i++) { + memcpy(msg, &node->vkey[i], sizeof(ecp_ecdh_public_t)); + msg += sizeof(ecp_ecdh_public_t); + msg_size += sizeof(ecp_ecdh_public_t); + if (msg_size == msg_size_max) break; + } + pthread_mutex_unlock(&node->mutex); + + return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(msg_size, MTYPE_DIR_ORIGIN_REP), 0); } ssize_t dir_handle_ann(ECPConnection *conn, unsigned char *msg, size_t msg_size) { ECPDirItem dir_item; - int is_new; - uint16_t capabilities; - ecp_sts_t now; - ssize_t rsize; + uint8_t region; + uint8_t capabilities; + size_t rsize; ssize_t rv; + int ann_enable; - rsize = sizeof(uint16_t); + rsize = 2; if (msg_size < rsize) return ECP_ERR_SIZE; - capabilities = \ - ((uint16_t)msg[0] << 8) | \ - ((uint16_t)msg[1]); + pthread_rwlock_rdlock(&dir_timer_rwlock); + ann_enable = (dir_process_enable > PROC_BLOCK_ANN); + pthread_rwlock_unlock(&dir_timer_rwlock); + + if (!ann_enable) return rsize; + + region = msg[0]; + capabilities = msg[1]; + if ((capabilities & ECP_DIR_CAP_DIR) && !acl_dir_inlist(&conn->remote.key_perma.public)) { + LOG(LOG_ERR, "dir_handle_ann: not a directory server\n"); + return ECP_ERR; + } memset(&dir_item, 0, sizeof(ECPDirItem)); dir_item.node = conn->remote; + dir_item.region = region; dir_item.capabilities = capabilities; - now = ecp_tm_get_s(); - is_new = dir_process_item(&dir_item, now); - if (is_new) vlink_new_node(conn->sock, &dir_item); + if (dir_item.region >= MAX_REGION) { + LOG(LOG_ERR, "dir_handle_ann: bad region\n"); + return ECP_ERR; + } + + dir_process_item(&dir_item, conn->sock, &srv_config->key_perma.public); rv = dir_send_shadow(conn); if (rv < 0) return rv; @@ -161,348 +311,573 @@ ssize_t dir_handle_ann(ECPConnection *conn, unsigned char *msg, size_t msg_size) return rsize; } -ssize_t dir_handle_req(ECPConnection *conn, unsigned char *msg, size_t msg_size) { - ssize_t rv; - - rv = dir_send_online(conn); - if (rv < 0) return rv; - - return 0; -} - ssize_t dir_handle_shadow(ECPConnection *conn, unsigned char *msg, size_t msg_size) { - ecp_sts_t now; - uint16_t count; + uint8_t count; size_t rsize; - int i; + int i, ann_enable; - if (msg_size < sizeof(uint16_t)) return ECP_ERR_SIZE; + if (msg_size < 4 + sizeof(uint16_t)) return ECP_ERR_SIZE; - count = \ - ((uint16_t)msg[0] << 8) | \ - ((uint16_t)msg[1]); + count = msg[2]; + msg += 4 + sizeof(uint16_t); // frag_cnt, frag_tot, item_cnt, region, serial - rsize = sizeof(uint16_t) + count * (ECP_SIZE_DIR_ITEM + sizeof(uint32_t)); + rsize = 4 + sizeof(uint16_t) + count * ECP_SIZE_DIR_ITEM; if (msg_size < rsize) return ECP_ERR_SIZE; - msg += sizeof(uint16_t); + pthread_rwlock_rdlock(&dir_timer_rwlock); + ann_enable = (dir_process_enable > PROC_BLOCK_ANN); + pthread_rwlock_unlock(&dir_timer_rwlock); + + if (!ann_enable) return rsize; - now = ecp_tm_get_s(); for (i=0; isock, &dir_item); + if (dir_item.region >= MAX_REGION) { + LOG(LOG_ERR, "dir_handle_shadow: bad region\n"); + return ECP_ERR; } + + dir_process_item(&dir_item, conn->sock, &conn->remote.key_perma.public); } return rsize; } +ssize_t dir_handle_origin_req(ECPConnection *conn, unsigned char *msg, size_t msg_size) { + size_t rsize; + ssize_t rv; + + rsize = ECP_SIZE_ECDH_PUB; + if (msg_size < rsize) return ECP_ERR_SIZE; + + rv = dir_send_origin_rep(conn, (ecp_ecdh_public_t *)msg); + if (rv < 0) return rv; + + return rsize; +} + ssize_t dir_handle_msg(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, size_t msg_size, struct ECP2Buffer *b) { switch (mtype) { - case ECP_MTYPE_DIR_REQ: { - if (ecp_conn_is_outb(conn)) return ECP_ERR; - return dir_handle_req(conn, msg, msg_size); - } + case MTYPE_DIR_ANN: { + int is_dir; + + is_dir = srv_config->capabilities & ECP_DIR_CAP_DIR; + if (!is_dir || ecp_conn_is_outb(conn)) return ECP_ERR; - case ECP_MTYPE_DIR_ANN: { - if (ecp_conn_is_outb(conn)) return ECP_ERR; - if (!conn->remote.key_perma.valid) return ECP_ERR_VBOX; return dir_handle_ann(conn, msg, msg_size); } - case ECP_MTYPE_DIR_SHADOW: { + case MTYPE_DIR_SHADOW: { if (ecp_conn_is_inb(conn)) return ECP_ERR; + return dir_handle_shadow(conn, msg, msg_size); } + case MTYPE_DIR_ORIGIN_REQ: { + int is_dir; + + is_dir = srv_config->capabilities & ECP_DIR_CAP_DIR; + if (!is_dir || ecp_conn_is_outb(conn)) return ECP_ERR; + + return dir_handle_origin_req(conn, msg, msg_size); + } + default: return ECP_ERR_MTYPE; } } -int dir_handle_open(ECPConnection *conn, ECP2Buffer *bufs) { - ssize_t rv; - - if (ecp_conn_is_inb(conn)) return ECP_OK; - - rv = dir_send_ann(conn); - if (rv < 0) return rv; - - return ECP_OK; -} +void dir_process_item(ECPDirItem *dir_item, ECPSocket *sock, ecp_ecdh_public_t *s_public) { + DIRNode *node; + unsigned int vkey_req_dir; + int is_verified = 0; + int rv = ECP_OK; -void dir_online_switch(void) { - struct hashtable_itr itr; - DirNode *node; - unsigned char *msg; + pthread_rwlock_rdlock(&dir_online_rwlock); + vkey_req_dir = dir_vkey_req; + pthread_rwlock_unlock(&dir_online_rwlock); pthread_rwlock_rdlock(&dir_shadow_rwlock); - pthread_rwlock_wrlock(&dir_online_rwlock); - - msg = dir_online.msg; - dir_online.count = ecp_ht_count(dir_shadow); - - if (dir_online.count > 0) { - size_t _rv; - int rv; - ecp_ht_itr_create(&itr, dir_shadow); - do { - node = ecp_ht_itr_value(&itr); + node = ht_search_item(dir_shadow, dir_item); + if (node == NULL) { + pthread_rwlock_unlock(&dir_shadow_rwlock); - _rv = ecp_dir_item_serialize(&node->dir_item, msg); - msg += _rv; + rv = dir_create_node(dir_item, sock, &node); + if (!rv) { + pthread_rwlock_wrlock(&dir_shadow_rwlock); + if (ecp_ht_count(dir_shadow) > MAX_DIR_ITEM) rv = ECP_ERR_FULL; + if (!rv) rv = ht_insert_node(dir_shadow, node); + if (rv) pthread_rwlock_unlock(&dir_shadow_rwlock); + } + if (rv) { + if (node) { + dir_destroy_node(node); + node = NULL; + } + LOG(LOG_ERR, "dir_process_item: err:%d\n", rv); + return; + } - rv = ecp_ht_itr_advance(&itr); - } while (rv == ECP_OK); + LOG(LOG_DEBUG, "dir_process_item: new node\n"); } - pthread_rwlock_unlock(&dir_online_rwlock); + pthread_mutex_lock(&node->mutex); pthread_rwlock_unlock(&dir_shadow_rwlock); -} -int dir_process_item(ECPDirItem *dir_item, ecp_sts_t access_ts) { - DirNode *node; - int is_new = 0; - int rv = ECP_OK; + if (node->zombie) goto process_item_fin; - pthread_rwlock_rdlock(&dir_shadow_rwlock); - - node = ht_search_item(dir_shadow, dir_item); - if (node) { - pthread_mutex_lock(&node->mutex); - if (!node->zombie) node->access_ts = access_ts; - pthread_mutex_unlock(&node->mutex); + if (!node->verified) { + int key_ex = 0; + int i; - pthread_rwlock_unlock(&dir_shadow_rwlock); - } else { - pthread_rwlock_unlock(&dir_shadow_rwlock); + if (memcmp(s_public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) == 0) { + node->ann_local = 1; + } - LOG(LOG_DEBUG, "dir new node\n"); + for (i=0; ivkey_cnt; i++) { + if (memcmp(s_public, &node->vkey[i], sizeof(node->vkey[i])) == 0) { + key_ex = 1; + break; + } + } + if (!key_ex) { + unsigned int vkey_req; - node = dir_create_node(dir_item, access_ts); - if (node == NULL) rv = ECP_ERR_ALLOC; + if (node->dir_item.capabilities & ECP_DIR_CAP_DIR) { + vkey_req = vkey_req_dir; + } else { + vkey_req = MIN_VKEY_REQ; + } - if (!rv) { - pthread_rwlock_wrlock(&dir_shadow_rwlock); - if (ecp_ht_count(dir_shadow) > MAX_DIR_ITEM) rv = ECP_ERR_FULL; - if (!rv) { - if (ht_search_node(dir_shadow, node) == NULL) { - rv = ht_insert_node(dir_shadow, node); - } else { - rv = ECP_ERR_DUP; + if (node->vkey_cnt < vkey_req) { + /* vkey list is used to track origin of particular server */ + memcpy(&node->vkey[node->vkey_cnt], s_public, sizeof(node->vkey[node->vkey_cnt])); + node->vkey_cnt++; + if (node->vkey_cnt == vkey_req) { + node->verified = 1; + node->is_new = 0; + is_verified = 1; } } - pthread_rwlock_unlock(&dir_shadow_rwlock); - } - if (!rv) { - is_new = 1; - /* always switch */ - dir_online_switch(); - } else { - if (node) dir_destroy_node(node); - LOG(LOG_ERR, "dir create node err:%d\n", rv); + /* protocol does not support server properties change */ + if (!ecp_dir_item_eq(dir_item, &node->dir_item)) { + char buffer[ECP_SIZE_ECDH_KEY_BUF]; + + ecp_key2str(buffer, (uint8_t *)&node->dir_item.node.key_perma.public); + LOG(LOG_ERR, "dir_process_item: dir entry changed from:%s\n", buffer); + } } } - return is_new; +process_item_fin: + pthread_mutex_unlock(&node->mutex); + + if (is_verified) { + LOG(LOG_DEBUG, "dir_process_item: node verified\n"); + + rv = acl_add_key(&node->dir_item); + if (rv) LOG(LOG_ERR, "dir_process_item: acl add key err:%d\n", rv); + + vlink_new_node(sock, &node->dir_item); + } } -DirNode *dir_create_node(ECPDirItem *dir_item, ecp_sts_t access_ts) { - DirNode *node; +int dir_open_conn(DIRNode *node, ECPSocket *sock) { int rv; - node = malloc(sizeof(DirNode)); - if (node == NULL) return NULL; + node->conn = malloc(sizeof(ECPConnection)); + if (node->conn == NULL) return ECP_ERR_ALLOC; + + ecp_conn_init(node->conn, sock, CTYPE_DIR); + ecp_conn_set_flags(node->conn, ECP_CONN_FLAG_VBOX); + + rv = ecp_conn_open(node->conn, &node->dir_item.node); + if (rv) node->conn = NULL; - memset(node, 0, sizeof(DirNode)); + return rv; +} - rv = pthread_mutex_init(&node->mutex, NULL); +int dir_create_node(ECPDirItem *dir_item, ECPSocket *sock, DIRNode **node) { + DIRNode *_node; + int rv; + + *node = NULL; + _node = malloc(sizeof(DIRNode)); + if (_node == NULL) return ECP_ERR_ALLOC; + + memset(_node, 0, sizeof(DIRNode)); + rv = pthread_mutex_init(&_node->mutex, NULL); if (rv) { - free(node); - return NULL; + free(_node); + return ECP_ERR; } - node->dir_item = *dir_item; - node->access_ts = access_ts; + _node->dir_item = *dir_item; + _node->is_new = 1; + + /* open connection to other directory servers */ + if ((dir_item->capabilities & ECP_DIR_CAP_DIR) && (memcmp(&dir_item->node.key_perma.public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) != 0)) { + rv = dir_open_conn(_node, sock); + if (rv) { + pthread_mutex_destroy(&_node->mutex); + free(_node); + return rv; + } + } - return node; + *node = _node; + return ECP_OK; } -void dir_destroy_node(DirNode *node) { +void dir_destroy_node(DIRNode *node) { + if (node->conn) ecp_conn_close(node->conn); pthread_mutex_destroy(&node->mutex); free(node); } -int dir_open_conn(ECPSocket *sock, ECPNode *node) { - ECPConnection *conn; - int rv; +static int online_switch_expired(ECPConnection *conn, ecp_sts_t now) { + if (conn->type == CTYPE_DIR) return 1; + return _ecp_conn_is_zombie(conn, now, CONN_EXPIRE_TO); +} - conn = malloc(sizeof(ECPConnection)); - if (conn == NULL) return ECP_ERR_ALLOC; +static void remove_nodes(DIRNode *remove_node[], int remove_cnt) { + int i; - ecp_dir_conn_init(conn, sock); - ecp_conn_set_flags(conn, ECP_CONN_FLAG_GC | ECP_CONN_FLAG_VBOX); - rv = ecp_conn_try_open(conn, node); - return rv; + pthread_rwlock_wrlock(&dir_shadow_rwlock); + for (i=0; idir_item); + dir_destroy_node(remove_node[i]); + } } -DirNode *dir_search_conn(ECPConnection *conn) { - DirNode *node; - DirNode *ret = NULL; +void dir_online_switch(ECPSocket *sock, int inc_serial) { + struct hashtable_itr itr; + DIRNode *node; + uint8_t count[MAX_REGION]; + uint8_t msg_count[MAX_REGION]; + unsigned char *msg[MAX_REGION]; + ecp_ecdh_public_t *node_next; + DIRNode *remove_node[MAX_NODE_REMOVE]; + unsigned int dir_cnt, node_cnt; + int i, remove_cnt; + int rv; pthread_rwlock_rdlock(&dir_shadow_rwlock); + pthread_rwlock_wrlock(&dir_online_rwlock); - node = ht_search_conn(dir_shadow, conn); - if (node) { - pthread_mutex_lock(&node->mutex); - if (!node->zombie) { - node->refcount++; - ret = node; - } else { - ret = NULL; + if (dir_online) { + for (i=0; ilist[i].msg_count = 0; + count[i] = 0; + msg_count[i] = 0; + msg[i] = dir_online->list[i].msg[0].buffer; + } + } + + dir_cnt = 0; + node_cnt = 0; + remove_cnt = 0; + + if (ecp_ht_count(dir_shadow) > 0) { + uint8_t region; + int verified; + + ecp_ht_itr_create(&itr, dir_shadow); + + node_next = NULL; + do { + node = ecp_ht_itr_value(&itr); + + pthread_mutex_lock(&node->mutex); + region = node->dir_item.region; + verified = node->verified; + + if (verified) { + if (dir_online) { + size_t rv_ser; + + rv_ser = ecp_dir_item_serialize(&node->dir_item, msg[0]); + msg[0] += rv_ser; + count[0]++; + if (region) { + rv_ser = ecp_dir_item_serialize(&node->dir_item, msg[region]); + msg[region] += rv_ser; + count[region]++; + } + } + if (node->dir_item.capabilities & ECP_DIR_CAP_DIR) { + dir_cnt++; + } + node_cnt++; + } else { + node->zombie = 1; + if (node_next == NULL) { + if (remove_cnt < MAX_NODE_REMOVE) { + remove_node[remove_cnt] = node; + remove_cnt++; + } else { + node_next = ecp_ht_itr_key(&itr); + break; + } + } + } + node->verified = 0; + node->ann_local = 0; + memset(&node->vkey, 0, sizeof(node->vkey)); + node->vkey_cnt = 0; + pthread_mutex_unlock(&node->mutex); + + /* let's reconnect all directory servers + node->conn is currently immutable since announce is disabled well before online switch */ + if (node->conn) ecp_conn_set_uflags(node->conn, DIR_UFLAG_RECONNECT); + + if (dir_online && verified) { + if (count[0] == MAX_DIR_ITEM_IN_MSG) { + dir_online->list[0].msg[msg_count[0]].count = count[0]; + msg_count[0]++; + msg[0] = dir_online->list[0].msg[msg_count[0]].buffer; + count[0] = 0; + } + if (region && count[region] == MAX_DIR_ITEM_IN_MSG) { + dir_online->list[region].msg[msg_count[region]].count = count[region]; + msg_count[region]++; + msg[region] = dir_online->list[region].msg[msg_count[region]].buffer; + count[region] = 0; + } + } + + rv = ecp_ht_itr_advance(&itr); + } while (rv == ECP_OK); + + if (dir_online) { + for (i=0; ilist[i].msg[msg_count[i]].count = count[i]; + dir_online->list[i].msg_count = msg_count[i] + 1; + } else { + dir_online->list[i].msg_count = msg_count[i]; + } + } } - pthread_mutex_unlock(&node->mutex); } + dir_vkey_req = dir_cnt / 2 + 1; + if (dir_vkey_req > MAX_VKEY) { + LOG(LOG_ERR, "dir_online_switch: number of dir vkeys required > MAX_VKEY (%d)\n", MAX_VKEY); + dir_vkey_req = MAX_VKEY; + } + + if (dir_online && inc_serial) dir_online->serial++; + + pthread_rwlock_unlock(&dir_online_rwlock); pthread_rwlock_unlock(&dir_shadow_rwlock); - return ret; + rv = acl_reset_ht(); + if (rv) LOG(LOG_ERR, "dir_online_switch: acl reset err:%d\n", rv); + + ecp_sock_expire(sock, online_switch_expired); + + if (remove_cnt) { + remove_nodes(remove_node, remove_cnt); + while (node_next) { + remove_cnt = 0; + pthread_rwlock_rdlock(&dir_shadow_rwlock); + + ecp_ht_itr_create(&itr, dir_shadow); + if (node_next) { + rv = ecp_ht_itr_search(&itr, node_next); + if (rv) { + LOG(LOG_ERR, "dir_online_switch: itr search err:%d\n", rv); + break; + } + node_next = NULL; + } + do { + node = ecp_ht_itr_value(&itr); + + pthread_mutex_lock(&node->mutex); + if (node->zombie) { + if (remove_cnt < MAX_NODE_REMOVE) { + remove_node[remove_cnt] = node; + remove_cnt++; + } else { + node_next = ecp_ht_itr_key(&itr); + break; + } + } + pthread_mutex_unlock(&node->mutex); + + rv = ecp_ht_itr_advance(&itr); + } while (rv == ECP_OK); + + /* no need to copy node_next key, since this is the only thread that can destroy node */ + pthread_rwlock_unlock(&dir_shadow_rwlock); + + if (remove_cnt) remove_nodes(remove_node, remove_cnt); + } + } + + if (srv_config->capabilities & ECP_DIR_CAP_DIR) acl_load_ht(); + + LOG(LOG_DEBUG, "dir_online_switch: node:%d dir:%d serial:%d\n", node_cnt, dir_cnt, dir_online ? dir_online->serial : 0); +} + +void dir_announce_allow(void) { + pthread_rwlock_wrlock(&dir_timer_rwlock); + dir_process_enable = PROC_ALLOW_ALL; + pthread_rwlock_unlock(&dir_timer_rwlock); +} + +void dir_announce_block(void) { + pthread_rwlock_wrlock(&dir_timer_rwlock); + dir_process_enable = PROC_BLOCK_ANN; + pthread_rwlock_unlock(&dir_timer_rwlock); + + LOG(LOG_DEBUG, "dir_announce_block\n"); } -void dir_announce(ECPSocket *sock) { - static DirNode *expire_node[MAX_EXPIRE_CNT]; - static int expire_node_z[MAX_EXPIRE_CNT]; - static int expire_cnt; +void dit_init_serial(uint16_t serial) { + if (dir_online) { + pthread_rwlock_wrlock(&dir_online_rwlock); + dir_online->serial = serial; + pthread_rwlock_unlock(&dir_online_rwlock); + } +} +void dir_announce(ECPSocket *sock, int ann_period) { struct hashtable_itr itr; - ecp_sts_t now; - DirNode *node; - DirNode *node_next; - DirNode *announce_node[MAX_ANNOUNCE_CNT]; + DIRNode *node; + ecp_ecdh_public_t *node_next; + DIRNode *announce_node[MAX_NODE_ANNOUNCE]; + unsigned int ht_count; int announce_cnt; - int refcount; int i; int rv; - now = ecp_tm_get_s(); node_next = NULL; do { + uint32_t delay; + uint32_t rnd; + announce_cnt = 0; pthread_rwlock_rdlock(&dir_shadow_rwlock); - if (ecp_ht_count(dir_shadow) > 0) { + ht_count = ecp_ht_count(dir_shadow); + if (ht_count > 0) { ecp_ht_itr_create(&itr, dir_shadow); if (node_next) { rv = ecp_ht_itr_search(&itr, node_next); if (rv) { - LOG(LOG_ERR, "dir ann itr search err:%d\n", rv); + LOG(LOG_ERR, "dir_announce: itr search err:%d\n", rv); break; } node_next = NULL; } do { node = ecp_ht_itr_value(&itr); - rv = ecp_ht_itr_advance(&itr); - pthread_mutex_lock(&node->mutex); - if (ECP_STS_LT(node->access_ts, now) && (now - node->access_ts > NODE_EXP_TIME)) { - node->zombie = 1; - pthread_mutex_unlock(&node->mutex); - - if (expire_cnt < MAX_EXPIRE_CNT) { - expire_node[expire_cnt] = node; - expire_node_z[expire_cnt] = 1; - expire_cnt++; - } - } else { - pthread_mutex_unlock(&node->mutex); - - if ((node->dir_item.capabilities & ECP_DIR_CAP_DIR) && (memcmp(node->dir_item.node.key_perma.public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) != 0)) { + if ((node->dir_item.capabilities & ECP_DIR_CAP_DIR) && (memcmp(&node->dir_item.node.key_perma.public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) != 0)) { + if (announce_cnt < MAX_NODE_ANNOUNCE) { announce_node[announce_cnt] = node; announce_cnt++; - - if (announce_cnt == MAX_ANNOUNCE_CNT) { - if (!rv) { - node_next = ecp_ht_itr_key(&itr); - } else { - node_next = NULL; - } - break; - } + } else { + node_next = ecp_ht_itr_key(&itr); + break; } } + + rv = ecp_ht_itr_advance(&itr); } while (rv == ECP_OK); } + /* no need to copy node_next key, since announce is disabled during online switch node removal */ pthread_rwlock_unlock(&dir_shadow_rwlock); - if (expire_cnt) { - pthread_rwlock_wrlock(&dir_shadow_rwlock); - for (i=0; idir_item.node); - if (rv) LOG(LOG_ERR, "dir open connection err:%d\n", rv); - } - - i = expire_cnt - 1; - while (i >= 0) { - node = expire_node[i]; - - pthread_mutex_lock(&node->mutex); - refcount = node->refcount; - pthread_mutex_unlock(&node->mutex); + ECPConnection *conn; + int is_reg, is_open, is_z; + ecp_sts_t now; + + conn = announce_node[i]->conn; + if (conn == NULL) { + rv = dir_open_conn(announce_node[i], sock); + if (rv) LOG(LOG_ERR, "dir_announce: conn open err:%d\n", rv); + continue; + } - if (refcount == 0) { - int j = i; + is_reg = is_open = is_z = 0; + now = ecp_tm_get_s(); + + ecp_conn_lock(conn); + is_reg = _ecp_conn_is_reg(conn); + if (is_reg) is_open = _ecp_conn_is_open(conn); + if (is_open) is_z = _ecp_conn_is_zombie(conn, now, ANN_PERIOD * 3) || _ecp_conn_test_uflags(conn, DIR_UFLAG_RECONNECT); + ecp_conn_unlock(conn); + + if (!is_reg || is_z) { + LOG(LOG_DEBUG, "dir_announce: reconnect\n"); + ecp_conn_close(conn); + rv = dir_open_conn(announce_node[i], sock); + if (rv) LOG(LOG_ERR, "dir_announce: conn open err:%d\n", rv); + } else if (is_open) { + ssize_t _rv; + + _rv = dir_send_ann(conn); + if (_rv < 0) LOG(LOG_ERR, "dir_announce: send ann err:%ld\n", _rv); + } - expire_cnt--; - while (j < expire_cnt) { - expire_node[j] = expire_node[j+1]; - j++; - } - expire_node[j] = NULL; - dir_destroy_node(node); + if (delay) { + /* randomize over delay / 2 and delay + delay / 2 */ + rnd = arc4random_uniform(delay + 1); + usleep(delay / 2 + rnd); } - i--; } + } while (node_next); + + LOG(LOG_DEBUG, "dir_announce: node count:%u\n", ht_count); } static void *_dir_announce(void *arg) { ECPSocket *sock = arg; + uint32_t rnd; + time_t tv_sec; + int delay; + int ann_enable; while (1) { - LOG(LOG_DEBUG, "dir announce...\n"); - dir_announce(sock); - sleep(10); + tv_sec = time(NULL); + + pthread_rwlock_rdlock(&dir_timer_rwlock); + ann_enable = (dir_process_enable > PROC_BLOCK_ANN); + pthread_rwlock_unlock(&dir_timer_rwlock); + + if (ann_enable) { + dir_announce(sock, ANN_PERIOD); + } else { + sleep(ANN_BLOCK_TIME + ANN_PERIOD); + } + + /* resolution is 1s */ + delay = ANN_PERIOD - (time(NULL) - tv_sec); + if (delay > 0) { + rnd = arc4random_uniform(delay + 1); + sleep(delay / 2 + rnd); + } } return NULL; @@ -516,18 +891,72 @@ int dir_start_announce(ECPSocket *sock) { return ECP_OK; } -int dir_init(ECPContext *ctx) { - ECPConnHandler *handler; +void dir_init_switch(ECPSocket *sock, int init_ann) { + int i; + + dir_announce_allow(); + for (i=0; ictx; + ECPConnHandler *handler, *c_handler; + int is_dir; + int rv; + + /* dir_process_enable and dir_online.serial will be set from timer */ + dir_vkey_req = 1; + srv_config = srv_get_config(); + is_dir = srv_config->capabilities & ECP_DIR_CAP_DIR; rv = pthread_rwlock_init(&dir_shadow_rwlock, NULL); if (rv) { @@ -540,12 +969,64 @@ int dir_init(ECPContext *ctx) { return ECP_ERR; } + rv = pthread_rwlock_init(&dir_timer_rwlock, NULL); + if (rv) { + pthread_rwlock_destroy(&dir_online_rwlock); + pthread_rwlock_destroy(&dir_shadow_rwlock); + return ECP_ERR; + } + dir_shadow = ecp_ht_create_keys(); if (dir_shadow == NULL) { + pthread_rwlock_destroy(&dir_timer_rwlock); + pthread_rwlock_destroy(&dir_online_rwlock); pthread_rwlock_destroy(&dir_shadow_rwlock); + return ECP_ERR_ALLOC; + } + + handler = malloc(sizeof(ECPConnHandler)); + if (handler == NULL) { + ecp_ht_destroy(dir_shadow); + pthread_rwlock_destroy(&dir_timer_rwlock); pthread_rwlock_destroy(&dir_online_rwlock); + pthread_rwlock_destroy(&dir_shadow_rwlock); return ECP_ERR_ALLOC; } + ecp_conn_handler_init(handler, dir_handle_msg, NULL, NULL, NULL); + + if (is_dir) { + c_handler = malloc(sizeof(ECPConnHandler)); + dir_online = malloc(sizeof(DIROnline)); + if ((c_handler == NULL) || (dir_online == NULL)) { + free(handler); + ecp_ht_destroy(dir_shadow); + pthread_rwlock_destroy(&dir_timer_rwlock); + pthread_rwlock_destroy(&dir_online_rwlock); + pthread_rwlock_destroy(&dir_shadow_rwlock); + if (dir_online) free(dir_online); + if (c_handler) free(c_handler); + return ECP_ERR_ALLOC; + } + ecp_conn_handler_init(c_handler, dir_handle_client_msg, NULL, NULL, NULL); + memset(dir_online, 0, sizeof(DIROnline)); + } + + rv = timer_init(sock); + if (rv) { + free(handler); + if (is_dir) { + free(dir_online); + free(c_handler); + } + ecp_ht_destroy(dir_shadow); + pthread_rwlock_destroy(&dir_timer_rwlock); + pthread_rwlock_destroy(&dir_online_rwlock); + pthread_rwlock_destroy(&dir_shadow_rwlock); + return rv; + } + + ecp_ctx_set_handler(ctx, CTYPE_DIR, handler); + if (is_dir) ecp_ctx_set_handler(ctx, ECP_CTYPE_DIR, c_handler); return ECP_OK; } diff --git a/ecp/server/dir.h b/ecp/server/dir.h index cbc0c87..24af3bc 100644 --- a/ecp/server/dir.h +++ b/ecp/server/dir.h @@ -1,35 +1,80 @@ -#define MAX_DIR_ITEM 30 -#define MAX_EXPIRE_CNT 100 -#define MAX_ANNOUNCE_CNT 100 +#include -#define NODE_EXP_TIME 86400 +#define MAX_DIR_ITEM_IN_MSG ((ECP_MAX_PLD - (4 + sizeof(uint16_t))) / ECP_SIZE_DIR_ITEM) -typedef struct DirNode { +#define MAX_DIR_MSG 10 +#define MAX_DIR_ITEM (MAX_DIR_MSG * MAX_DIR_ITEM_IN_MSG) +#define MAX_DIR_ITEM_DIR 100 +#define MAX_REGION 10 + +#define MAX_VKEY ((MAX_DIR_ITEM_DIR / 2) + 1) +#define MIN_VKEY_REQ 2 /* minimum number of vkeys required for non directory server */ + +#define MAX_NODE_ANNOUNCE 100 +#define MAX_NODE_REMOVE 100 + +#define CTYPE_DIR 0x00 + +#define MTYPE_DIR_ANN 0x00 +#define MTYPE_DIR_SHADOW 0x01 +#define MTYPE_DIR_ORIGIN_REQ 0x02 +#define MTYPE_DIR_ORIGIN_REP 0x03 + +#define PROC_BLOCK_ALL 0 +#define PROC_BLOCK_ANN 1 +#define PROC_ALLOW_ALL 2 + +#define ANN_PERIOD 600 /* announce priod (s); can't exceed 1h */ +#define CONN_EXPIRE_TO 60 + +#define DIR_UFLAG_RECONNECT 0x80 + +typedef struct DIRNode { ECPDirItem dir_item; - int refcount; int zombie; - ecp_sts_t access_ts; + int verified; + int ann_local; + int is_new; + ecp_ecdh_public_t vkey[MAX_VKEY]; + unsigned short vkey_cnt; + ECPConnection *conn; pthread_mutex_t mutex; -} DirNode; +} DIRNode; + +typedef struct DIRList { + struct { + uint8_t count; + unsigned char buffer[MAX_DIR_ITEM_IN_MSG * ECP_SIZE_DIR_ITEM]; + } msg[MAX_DIR_MSG]; + uint8_t msg_count; +} DIRList; -typedef struct DirList { - unsigned char msg[ECP_MAX_PLD]; - uint16_t count; -} DirList; +typedef struct DIROnline { + DIRList list[MAX_REGION]; + uint16_t serial; +} DIROnline; + +ssize_t dir_send_online(ECPConnection *conn, uint8_t region); +ssize_t dir_handle_client_msg(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, size_t msg_size, struct ECP2Buffer *b); ssize_t dir_send_ann(ECPConnection *conn); ssize_t dir_send_shadow(ECPConnection *conn); -ssize_t dir_send_online(ECPConnection *conn); ssize_t dir_handle_ann(ECPConnection *conn, unsigned char *msg, size_t msg_size); -ssize_t dir_handle_req(ECPConnection *conn, unsigned char *msg, size_t msg_size); ssize_t dir_handle_shadow(ECPConnection *conn, unsigned char *msg, size_t msg_size); ssize_t dir_handle_msg(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, size_t msg_size, struct ECP2Buffer *b); -int dir_handle_open(ECPConnection *conn, ECP2Buffer *bufs); -int dir_process_item(ECPDirItem *dir_item, ecp_sts_t access_ts); -DirNode *dir_create_node(ECPDirItem *dir_item, ecp_sts_t access_ts); -void dir_destroy_node(DirNode *node); -int dir_open_conn(ECPSocket *sock, ECPNode *node); -DirNode *dir_search_conn(ECPConnection *conn); -void dir_announce(ECPSocket *sock); + +void dir_process_item(ECPDirItem *dir_item, ECPSocket *sock, ecp_ecdh_public_t *s_public); +int dir_open_conn(DIRNode *node, ECPSocket *sock); +int dir_create_node(ECPDirItem *dir_item, ECPSocket *sock, DIRNode **node); +void dir_destroy_node(DIRNode *node); + +void dir_online_switch(ECPSocket *sock, int inc_serial); +void dir_remove_nodes(DIRNode *remove_node[], int remove_cnt); +void dir_announce_allow(void); +void dir_announce_block(void); +void dit_init_serial(uint16_t serial); +void dir_announce(ECPSocket *sock, int ann_period); int dir_start_announce(ECPSocket *sock); -int dir_init(ECPContext *ctx); +void dir_init_switch(ECPSocket *sock, int init_ann); +int dir_init_ann(ECPSocket *sock, ECPNode *node); +int dir_init(ECPSocket *sock); diff --git a/ecp/server/ht.c b/ecp/server/ht.c index 0daf8d5..dc67d30 100644 --- a/ecp/server/ht.c +++ b/ecp/server/ht.c @@ -1,34 +1,25 @@ #include -#include #include #include "dir.h" #include "ht.h" -int ht_insert_node(ecp_ht_table_t *table, DirNode *node) { - return ecp_ht_insert(table, &node->dir_item.node.key_perma.public, node); +int ht_insert_node(ecp_ht_table_t *table, DIRNode *node) { + return ecp_ht_insert_uniq(table, &node->dir_item.node.key_perma.public, node); } -void ht_remove_node(ecp_ht_table_t *table, DirNode *node) { +void ht_remove_node(ecp_ht_table_t *table, DIRNode *node) { ecp_ht_remove(table, &node->dir_item.node.key_perma.public); } -void *ht_search_node(ecp_ht_table_t *table, DirNode *node) { - return ecp_ht_search(table, &node->dir_item.node.key_perma.public); +void *ht_search_item(ecp_ht_table_t *table, ECPDirItem *dir_item) { + return ecp_ht_search(table, &dir_item->node.key_perma.public); } int ht_insert_conn(ecp_ht_table_t *table, ECPConnection *conn) { - return ecp_ht_insert(table, &conn->remote.key_perma.public, conn); + return ecp_ht_insert_uniq(table, &conn->remote.key_perma.public, conn); } void ht_remove_conn(ecp_ht_table_t *table, ECPConnection *conn) { ecp_ht_remove(table, &conn->remote.key_perma.public); } - -void *ht_search_conn(ecp_ht_table_t *table, ECPConnection *conn) { - return ecp_ht_search(table, &conn->remote.key_perma.public); -} - -void *ht_search_item(ecp_ht_table_t *table, ECPDirItem *dir_item) { - return ecp_ht_search(table, &dir_item->node.key_perma.public); -} diff --git a/ecp/server/ht.h b/ecp/server/ht.h index cba4691..a9cb02f 100644 --- a/ecp/server/ht.h +++ b/ecp/server/ht.h @@ -1,7 +1,5 @@ -int ht_insert_node(ecp_ht_table_t *table, DirNode *node); -void ht_remove_node(ecp_ht_table_t *table, DirNode *node); -void *ht_search_node(ecp_ht_table_t *table, DirNode *node); +int ht_insert_node(ecp_ht_table_t *table, DIRNode *node); +void ht_remove_node(ecp_ht_table_t *table, DIRNode *node); +void *ht_search_item(ecp_ht_table_t *table, ECPDirItem *dir_item); int ht_insert_conn(ecp_ht_table_t *table, ECPConnection *conn); -void ht_remove_conn(ecp_ht_table_t *table, ECPConnection *conn); -void *ht_search_conn(ecp_ht_table_t *table, ECPConnection *conn); -void *ht_search_item(ecp_ht_table_t *table, ECPDirItem *dir_item); \ No newline at end of file +void ht_remove_conn(ecp_ht_table_t *table, ECPConnection *conn); \ No newline at end of file diff --git a/ecp/server/server.c b/ecp/server/server.c index d34fb19..890d468 100644 --- a/ecp/server/server.c +++ b/ecp/server/server.c @@ -1,41 +1,81 @@ #include -#include +#include +#include +#include #include +#include +#include #include #include -#include #include +#include + #include "dir.h" #include "vlink.h" #include "ht.h" +#include "acl.h" #include "server.h" static SRVConfig srv_config; +static const char *srv_llevel_str[] = { + "DEBUG", + "INFO", + "ERROR" +}; SRVConfig *srv_get_config(void) { return &srv_config; } static void usage(char *arg) { - fprintf(stderr, "Usage: %s [ ]\n", arg); + fprintf(stderr, "Usage: %s [ ] [ ]\n", arg); + exit(1); +} + +static void fail(char *format, ...) { + va_list args; + + va_start(args, format); + vfprintf(stderr, format, args); + va_end(args); exit(1); } static void handle_err(ECPConnection *conn, unsigned char mtype, int err) { - if (conn->type == ECP_CTYPE_VLINK) vlink_handle_err(conn, mtype, err); - LOG(LOG_ERR, "handle error ctype:%d mtype:%d err:%d\n", conn->type, mtype, err); + LOG(LOG_ERR, "handle_err: ctype:%d mtype:%d err:%d\n", conn->type, mtype, err); } -static ECPConnection *conn_new(ECPSocket *sock, unsigned char type) { +static ECPConnection *conn_new(ECPSocket *sock, ECPConnection *parent, unsigned char ctype) { ECPConnection *conn = NULL; - switch (type) { + switch (ctype) { + case CTYPE_DIR: { + if (!(srv_config.capabilities & ECP_DIR_CAP_DIR)) return NULL; + + conn = malloc(sizeof(ECPConnection)); + if (conn) { + ecp_conn_init(conn, sock, ctype); + ecp_conn_set_flags(conn, ECP_CONN_FLAG_VBOX); + } + break; + } + + case ECP_CTYPE_DIR: { + if (!(srv_config.capabilities & ECP_DIR_CAP_DIR)) return NULL; + + conn = malloc(sizeof(ECPConnection)); + if (conn) ecp_conn_init(conn, sock, ctype); + break; + } + case ECP_CTYPE_VCONN: { ECPVConnInb *_conn; + if (!(srv_config.capabilities & ECP_DIR_CAP_VCONN)) return NULL; + _conn = malloc(sizeof(ECPVConnInb)); if (_conn) { ecp_vconn_init_inb(_conn, sock); @@ -45,29 +85,61 @@ static ECPConnection *conn_new(ECPSocket *sock, unsigned char type) { } case ECP_CTYPE_VLINK: { - conn = malloc(sizeof(ECPConnection)); - if (conn) ecp_vlink_init(conn, sock); - break; - } + if (!(srv_config.capabilities & ECP_DIR_CAP_VCONN)) return NULL; - default: { conn = malloc(sizeof(ECPConnection)); - if (conn) ecp_conn_init(conn, sock, type); + if (conn) ecp_vlink_init(conn, sock); break; } } + if (conn) ecp_conn_set_flags(conn, ECP_CONN_FLAG_GC); return conn; } +static int key_check(ECPSocket *sock, ECPConnection *parent, unsigned char ctype, ecp_ecdh_public_t *public) { + switch (ctype) { + case CTYPE_DIR: { + if (public == NULL) return 0; + return acl_inlist(public); + } + + case ECP_CTYPE_VLINK: { + if (public == NULL) return 0; + if (parent == NULL) return acl_inlist(public); + return 1; + } + + default: + return 1; + } +} + static void conn_free(ECPConnection *conn) { free(conn); } +void log_print(int level, char *format, ...) { + va_list args; + time_t t; + char buf[26]; + + if (level >= (sizeof(srv_llevel_str) / sizeof(char *))) return; + + t = time(NULL); + ctime_r(&t, buf); + buf[24] = '\0'; + fprintf(stderr, "%s [%s]: ", buf, srv_llevel_str[level]); + + va_start(args, format); + vfprintf(stderr, format, args); + va_end(args); +} + int ecp_init(ECPContext *ctx, ECPConnHandler *vconn_handler, ECPConnHandler *vlink_handler) { int rv; - rv = ecp_ctx_init(ctx, handle_err, conn_new, conn_free, NULL); + rv = ecp_ctx_init(ctx, handle_err, conn_new, conn_free, key_check); if (rv) return rv; rv = ecp_vconn_handler_init(ctx, vconn_handler); @@ -86,118 +158,120 @@ int main(int argc, char *argv[]) { ECPConnHandler vconn_handler; ECPConnHandler vlink_handler; char *endptr; - int fd; + char *debug_init_str; + int _argc, fd; int rv; - if ((argc < 4) || (argc > 6)) usage(argv[0]); + memset(&srv_config, 0, sizeof(srv_config)); - srv_config.capabilities = (uint16_t)strtol(argv[1], &endptr, 16); - if (endptr[0] != '\0') { - fprintf(stderr, "Bad capabilities\n"); - exit(1); - } + if (argc < 3) usage(argv[0]); - if ((fd = open(argv[2], O_RDONLY)) < 0) { - fprintf(stderr, "Unable to open %s\n", argv[2]); - exit(1); - } - if (read(fd, &srv_config.key_perma.public, sizeof(ecp_ecdh_public_t)) != sizeof(ecp_ecdh_public_t)) { - close(fd); - fprintf(stderr, "Unable to read public key from %s\n", argv[2]); - exit(1); - } - if (read(fd, &srv_config.key_perma.private, sizeof(ecp_ecdh_private_t)) != sizeof(ecp_ecdh_private_t)) { - close(fd); - fprintf(stderr, "Unable to read private key from %s\n", argv[2]); - exit(1); + _argc = 1; + srv_config.region = (uint8_t)strtol(argv[_argc], &endptr, 16); + if (endptr[0] != '\0') fail("Bad region\n"); + if (srv_config.region >= MAX_REGION) fail("Bad region\n"); + _argc++; + + srv_config.capabilities = (uint8_t)strtol(argv[_argc], &endptr, 16); + if (endptr[0] != '\0') fail("Bad capabilities\n"); + _argc++; + + if (srv_config.capabilities & ECP_DIR_CAP_DIR) { + if (argc < 7) usage(argv[0]); + } else { + if (argc < 5) usage(argv[0]); } + + fd = open(argv[_argc], O_RDONLY); + if (fd < 0) fail("Unable to open %s\n", argv[_argc]); + + rv = ecp_util_read_key(fd, &srv_config.key_perma.public, &srv_config.key_perma.private); close(fd); + if (rv) fail("Unable to read key from %s\n", argv[_argc]); srv_config.key_perma.valid = 1; + _argc++; rv = ecp_init(&ctx, &vconn_handler, &vlink_handler); - if (rv) { - fprintf(stderr, "ecp_init RV:%d\n", rv); - exit(1); - } + if (rv) fail("ecp_init err:%d\n", rv); + rv = ecp_sock_create(&sock, &ctx, &srv_config.key_perma); - if (rv) { - fprintf(stderr, "ecp_sock_create RV:%d\n", rv); - exit(1); - } + if (rv) fail("ecp_sock_create err:%d\n", rv); - rv = ecp_addr_init(&addr, argv[3]); - if (rv) { - fprintf(stderr, "ecp_addr_init RV:%d\n", rv); - exit(1); - } + rv = ecp_vconn_sock_create(&sock); + if (rv) fail("ecp_vconn_sock_create err:%d\n", rv); + + rv = ecp_addr_init(&addr, argv[_argc]); + if (rv) fail("ecp_addr_init err:%d\n", rv); + _argc++; rv = ecp_sock_open(&sock, &addr); - if (rv) { - fprintf(stderr, "ecp_sock_open RV:%d\n", rv); - exit(1); - } + if (rv) fail("ecp_sock_open err:%d\n", rv); + srv_config.my_addr = addr; - rv = dir_init(&ctx); - if (rv) { - fprintf(stderr, "dir_init RV:%d\n", rv); - exit(1); - } + rv = acl_init(); + if (rv) fail("acl_init err:%d\n", rv); - rv = vlink_init(&ctx); - if (rv) { - fprintf(stderr, "vlink_init RV:%d\n", rv); - exit(1); - } + if (srv_config.capabilities & ECP_DIR_CAP_DIR) { + srv_config.acl_fn_dir = strdup(argv[_argc]); + _argc++; + srv_config.acl_fn = strdup(argv[_argc]); + _argc++; - rv = dir_start_announce(&sock); - if (rv) { - fprintf(stderr, "dir_start_announce RV:%d\n", rv); - exit(1); + rv = acl_load(); + if (rv) fail("acl_load err:%d\n", rv); } - rv = vlink_start_open(&sock); - if (rv) { - fprintf(stderr, "vlink_start_open RV:%d\n", rv); - exit(1); - } + rv = dir_init(&sock); + if (rv) fail("dir_init err:%d\n", rv); - rv = vlink_start_keyx(); - if (rv) { - fprintf(stderr, "vlink_start_keyx RV:%d\n", rv); - exit(1); - } + rv = vlink_init(&sock); + if (rv) fail("vlink_init err:%d\n", rv); - if (argc == 6) { + rv = ecp_start_receiver(&sock); + if (rv) fail("ecp_start_receiver err:%d\n", rv); + + if (argc == _argc + 2) { ECPNode node; ecp_ecdh_public_t node_pub; ecp_tr_addr_t node_addr; - if ((fd = open(argv[4], O_RDONLY)) < 0) { - fprintf(stderr, "Unable to open %s\n", argv[4]); - exit(1); - } - if (read(fd, &node_pub, sizeof(ecp_ecdh_public_t)) != sizeof(ecp_ecdh_public_t)) { - close(fd); - fprintf(stderr, "Unable to read public key from %s\n", argv[4]); - exit(1); - } - close(fd); + fd = open(argv[_argc], O_RDONLY); + if (fd < 0) fail("Unable to open %s\n", argv[_argc]); - int rv; + rv = ecp_util_read_key(fd, &node_pub, NULL); + close(fd); + if (rv) fail("Unable to read public key from %s\n", argv[_argc]); + _argc++; ecp_node_init(&node, &node_pub, NULL); - rv = ecp_node_set_addr(&node, argv[5]); - if (rv) { - fprintf(stderr, "ecp_node_set_addr RV:%d\n", rv); - exit(1); - } - rv = dir_open_conn(&sock, &node); - if (rv) { - fprintf(stderr, "dir_open_conn RV:%d\n", rv); - exit(1); + rv = ecp_node_set_addr(&node, argv[_argc]); + if (rv) fail("ecp_node_set_addr err:%d\n", rv); + _argc++; + + rv = dir_init_ann(&sock, &node); + if (rv) fail("dir_init_ann err:%d\n", rv); + } + + if (_argc != argc) usage(argv[0]); + + debug_init_str = getenv("ECP_DBG_INIT"); + if (debug_init_str) { + int init_ann; + + init_ann = (int)strtol(debug_init_str, &endptr, 10); + if (endptr[0] == '\0') { + LOG(LOG_DEBUG, "init switch start - number of announces:%d\n", init_ann); + dir_init_switch(&sock, init_ann); + LOG(LOG_DEBUG, "init switch done\n"); } } - ecp_receiver(&sock); -} \ No newline at end of file + rv = dir_start_announce(&sock); + if (rv) fail("dir_start_announce err:%d\n", rv); + + rv = vlink_start_keyx(&sock); + if (rv) fail("vlink_start_keyx err:%d\n", rv); + + while(1) pause(); +} diff --git a/ecp/server/server.h b/ecp/server/server.h index ff8e444..21f2129 100644 --- a/ecp/server/server.h +++ b/ecp/server/server.h @@ -1,15 +1,20 @@ #include -#define LOG_DEBUG 1 -#define LOG_INFO 2 -#define LOG_ERR 3 +#define LOG_DEBUG 0 +#define LOG_INFO 1 +#define LOG_ERR 2 #define LOG_LEVEL LOG_DEBUG -#define LOG(l, ...) (l >= LOG_LEVEL ? fprintf(stderr, __VA_ARGS__) : 0 ) +#define LOG(l, ...) log_print(l, __VA_ARGS__); typedef struct SRVConfig { ECPDHKey key_perma; - uint16_t capabilities; + char *acl_fn; + char *acl_fn_dir; + ecp_tr_addr_t my_addr; + uint8_t region; + uint8_t capabilities; } SRVConfig; SRVConfig *srv_get_config(void); +void log_print(int level, char *format, ...); diff --git a/ecp/server/timer.c b/ecp/server/timer.c new file mode 100644 index 0000000..ab30633 --- /dev/null +++ b/ecp/server/timer.c @@ -0,0 +1,98 @@ +#include +#include +#include + +#include + +#include "dir.h" + +#include "server.h" +#include "timer.h" + +static struct timespec timer_ts_mono; +static timer_t timer_ann_block_id; +static timer_t timer_online_switch_id; + +int timer_set_next(time_t tv_now) { + struct itimerspec its = { 0 }; + time_t tv_midnight; + int rv = 0; + + /* next midnight */ + tv_midnight = (tv_now / ONLINE_SWITCH_PERIOD) * ONLINE_SWITCH_PERIOD + ONLINE_SWITCH_PERIOD; + + if (tv_now < (tv_midnight - ANN_BLOCK_TIME)) { + dir_announce_allow(); + + its.it_value.tv_sec = tv_midnight - ANN_BLOCK_TIME; + rv = timer_settime(timer_ann_block_id, TIMER_ABSTIME, &its, NULL); + if (rv) return ECP_ERR; + } else { + dir_announce_block(); + } + + its.it_value.tv_sec = tv_midnight; + rv = timer_settime(timer_online_switch_id, TIMER_ABSTIME, &its, NULL); + if (rv) return ECP_ERR; + + return ECP_OK; +} + +void timer_ann_block(union sigval timer_data) { + dir_announce_block(); +} + +void timer_online_switch(union sigval timer_data) { + ECPSocket *sock = timer_data.sival_ptr; + struct timespec ts_prev; + int rv; + + ts_prev = timer_ts_mono; + clock_gettime(CLOCK_MONOTONIC, &timer_ts_mono); + + /* exit if someone is messing with realtime clock */ + if ((timer_ts_mono.tv_sec - ts_prev.tv_sec) < (ONLINE_SWITCH_PERIOD / 2)) goto online_switch_fin; + + dir_online_switch(sock, 1); + +online_switch_fin: + rv = timer_set_next(time(NULL)); + if (rv) LOG(LOG_ERR, "timer_online_switch: set next timer err:%d\n", rv); +} + +int timer_init(ECPSocket *sock) { + struct sigevent timer_ann_block_evt = { 0 }; + struct sigevent timer_online_switch_evt = { 0 }; + time_t tv_now; + int rv = 0; + + timer_ann_block_evt.sigev_notify = SIGEV_THREAD; + timer_ann_block_evt.sigev_notify_function = timer_ann_block; + timer_ann_block_evt.sigev_value.sival_ptr = NULL; + rv = timer_create(CLOCK_REALTIME, &timer_ann_block_evt, &timer_ann_block_id); + if (rv) return ECP_ERR; + + timer_online_switch_evt.sigev_notify = SIGEV_THREAD; + timer_online_switch_evt.sigev_notify_function = timer_online_switch; + timer_online_switch_evt.sigev_value.sival_ptr = sock; + rv = timer_create(CLOCK_REALTIME, &timer_online_switch_evt, &timer_online_switch_id); + if (rv) { + timer_delete(timer_ann_block_id); + return ECP_ERR; + } + + /* ensure that first dir_online_switch is executed */ + clock_gettime(CLOCK_MONOTONIC, &timer_ts_mono); + timer_ts_mono.tv_sec -= ONLINE_SWITCH_PERIOD / 2; + + tv_now = time(NULL); + dit_init_serial(tv_now / ONLINE_SWITCH_PERIOD); + rv = timer_set_next(tv_now); + if (rv) { + timer_delete(timer_ann_block_id); + timer_delete(timer_online_switch_id); + return rv; + } + + return ECP_OK; +} diff --git a/ecp/server/timer.h b/ecp/server/timer.h new file mode 100644 index 0000000..526f0b8 --- /dev/null +++ b/ecp/server/timer.h @@ -0,0 +1,9 @@ +#include + +#define ANN_BLOCK_TIME 7200 /* time to block announce before online switch (s) */ +#define ONLINE_SWITCH_PERIOD 86400 /* online switch period (s) */ + +int timer_set_next(time_t tv_now); +void timer_ann_block(union sigval timer_data); +void timer_online_switch(union sigval timer_data); +int timer_init(ECPSocket *sock); diff --git a/ecp/server/vlink.c b/ecp/server/vlink.c index e98dacc..d0c865b 100644 --- a/ecp/server/vlink.c +++ b/ecp/server/vlink.c @@ -3,318 +3,238 @@ #include #include -#include #include #include #include -#include "server.h" #include "dir.h" +#include "vlink.h" #include "ht.h" -#include "vlink.h" +#include "server.h" static ecp_ht_table_t *vlink_conn = NULL; -static ecp_ht_table_t *vlink_node = NULL; -static pthread_mutex_t vlink_conn_mutex; -static pthread_mutex_t vlink_node_mutex; -static pthread_t vlink_open_thd; +static pthread_rwlock_t vlink_conn_rwlock; static pthread_t vlink_keyx_thd; static SRVConfig *srv_config; -void vlink_handle_err(ECPConnection *conn, unsigned char mtype, int err) { - switch (mtype) { - case ECP_MTYPE_OPEN_REP: { - if (err == ECP_ERR_TIMEOUT) { - int rv; - - rv = vlink_insert_node(conn); - if (rv) LOG(LOG_ERR, "vlink insert node err:%d\n", rv); - } - ecp_conn_close(conn); - break; - } - } -} - -int vlink_handle_open(ECPConnection *conn, ECP2Buffer *bufs) { - int rv; - - rv = ecp_vlink_handle_open(conn, bufs); - if (rv) return rv; - - if (ecp_conn_is_inb(conn)) return ECP_OK; - - pthread_mutex_lock(&vlink_conn_mutex); - rv = ht_insert_conn(vlink_conn, conn); - pthread_mutex_unlock(&vlink_conn_mutex); - - if (rv) { - ecp_vlink_handle_close(conn); - return rv; - } - - return ECP_OK; -} - -void vlink_handle_close(ECPConnection *conn) { - int rv; - - ecp_vlink_handle_close(conn); +#define MAX(X, Y) (((X) > (Y)) ? (X) : (Y)) - if (ecp_conn_is_inb(conn)) return; - - rv = vlink_insert_node(conn); - if (rv) LOG(LOG_ERR, "vlink insert node err:%d\n", rv); -} - -int vlink_open_conn(ECPSocket *sock, ECPNode *node) { +int vlink_open_conn(ECPSocket *sock, ECPNode *node, ECPConnection **_conn) { ECPConnection *conn; int rv; + *_conn = NULL; conn = malloc(sizeof(ECPConnection)); if (conn == NULL) return ECP_ERR_ALLOC; ecp_vlink_init(conn, sock); rv = ecp_conn_open(conn, node); - return rv; + if (rv) return rv; + + *_conn = conn; + return ECP_OK; } void vlink_new_node(ECPSocket *sock, ECPDirItem *dir_item) { - if ((dir_item->capabilities & ECP_DIR_CAP_VCONN) && (memcmp(&dir_item->node.key_perma.public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) < 0)) { - int rv; + ECPConnection *conn; + int rv; - LOG(LOG_DEBUG, "vlink open connection\n"); - rv = vlink_open_conn(sock, &dir_item->node); - if (rv) LOG(LOG_ERR, "vlink open connection err:%d\n", rv); - } -} + if (!(dir_item->capabilities & ECP_DIR_CAP_VCONN) || (memcmp(&dir_item->node.key_perma.public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) == 0)) return; -int vlink_insert_node(ECPConnection *conn) { - DirNode *node; - int rv = ECP_OK; + pthread_rwlock_rdlock(&vlink_conn_rwlock); + conn = ecp_ht_search(vlink_conn, &dir_item->node.key_perma.public); + pthread_rwlock_unlock(&vlink_conn_rwlock); - node = dir_search_conn(conn); - if (node) { - pthread_mutex_lock(&vlink_node_mutex); - rv = ht_insert_node(vlink_node, node); - pthread_mutex_unlock(&vlink_node_mutex); + if (conn) return; - if (rv) { - pthread_mutex_lock(&node->mutex); - node->refcount--; - pthread_mutex_unlock(&node->mutex); - } + rv = vlink_open_conn(sock, &dir_item->node, &conn); + if (rv) { + LOG(LOG_ERR, "vlink_new_node: open conn err:%d\n", rv); + return; } - return rv; -} - -void vlink_open(ECPSocket *sock) { - struct hashtable_itr itr; - DirNode *node; - DirNode *node_next; - DirNode *open_node[MAX_OPEN_CNT]; - int open_cnt; - int i; - int rv; - - node_next = NULL; - do { - open_cnt = 0; - pthread_mutex_lock(&vlink_node_mutex); - - if (ecp_ht_count(vlink_node) > 0) { - ecp_ht_itr_create(&itr, vlink_node); - if (node_next) { - rv = ecp_ht_itr_search(&itr, node_next); - if (rv) { - LOG(LOG_ERR, "vlink open itr search err:%d\n", rv); - break; - } - node_next = NULL; - } - do { - node = ecp_ht_itr_value(&itr); - rv = ecp_ht_itr_remove(&itr); - - open_node[open_cnt] = node; - open_cnt++; - if (open_cnt == MAX_OPEN_CNT) { - if (!rv) { - node_next = ecp_ht_itr_key(&itr); - } else { - node_next = NULL; - } - } - } while (rv == ECP_OK); - } + pthread_rwlock_wrlock(&vlink_conn_rwlock); + rv = ht_insert_conn(vlink_conn, conn); + pthread_rwlock_unlock(&vlink_conn_rwlock); - pthread_mutex_unlock(&vlink_node_mutex); + if (rv) { + LOG(LOG_ERR, "vlink_new_node: ins conn err:%d\n", rv); + ecp_conn_close(conn); + return; + } - for (i=0; imutex); + LOG(LOG_DEBUG, "vlink_new_node: new connection\n"); +} - if (open_node[i]->zombie) { - open_node[i]->refcount--; +void vlink_del_node(ECPDirItem *dir_item) { + ECPConnection *conn; - pthread_mutex_unlock(&open_node[i]->mutex); - } else { - pthread_mutex_unlock(&open_node[i]->mutex); + if (!(dir_item->capabilities & ECP_DIR_CAP_VCONN) || (memcmp(&dir_item->node.key_perma.public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) == 0)) return; - LOG(LOG_DEBUG, "vlink open connection\n"); - rv = vlink_open_conn(sock, &open_node[i]->dir_item.node); - if (rv) LOG(LOG_ERR, "vlink open connection err:%d\n", rv); - } - } - } while (node_next); + pthread_rwlock_rdlock(&vlink_conn_rwlock); + conn = ecp_ht_search(vlink_conn, &dir_item->node.key_perma.public); + if (conn) ecp_conn_set_uflags(conn, VLINK_UFLAG_DISCONNECT); + pthread_rwlock_unlock(&vlink_conn_rwlock); } -void vlink_keyx(void) { +void vlink_keyx(ECPSocket *sock, int keyx_period) { struct hashtable_itr itr; - ecp_sts_t now; ECPConnection *conn; - ECPConnection *keyx_next; - ECPConnection *keyx_conn[MAX_KEYX_CNT]; - int keyx_conn_z[MAX_KEYX_CNT]; + ecp_ecdh_public_t *conn_next; + ECPConnection *keyx_conn[MAX_NODE_ANNOUNCE]; + unsigned int ht_count; int keyx_cnt; - int is_zombie; int i; int rv; - now = ecp_tm_get_s(); - keyx_next = NULL; + conn_next = NULL; do { + uint32_t delay; + uint32_t rnd; + keyx_cnt = 0; - pthread_mutex_lock(&vlink_conn_mutex); + pthread_rwlock_rdlock(&vlink_conn_rwlock); - if (ecp_ht_count(vlink_conn) > 0) { + ht_count = ecp_ht_count(vlink_conn); + if (ht_count > 0) { ecp_ht_itr_create(&itr, vlink_conn); - if (keyx_next) { - rv = ecp_ht_itr_search(&itr, keyx_next); + if (conn_next) { + rv = ecp_ht_itr_search(&itr, conn_next); if (rv) { - LOG(LOG_ERR, "vlink keyx itr search err:%d\n", rv); + LOG(LOG_ERR, "vlink_keyx: itr search err:%d\n", rv); break; } - keyx_next = NULL; + conn_next = NULL; } do { conn = ecp_ht_itr_value(&itr); - is_zombie = ecp_conn_is_zombie(conn, now, CONN_EXP_TIME); - if (is_zombie) { - rv = ecp_ht_itr_remove(&itr); + if (keyx_cnt < MAX_NODE_ANNOUNCE) { + keyx_conn[keyx_cnt] = conn; + keyx_cnt++; } else { - rv = ecp_ht_itr_advance(&itr); + conn_next = ecp_ht_itr_key(&itr); + break; } - keyx_conn[keyx_cnt] = conn; - keyx_conn_z[keyx_cnt] = is_zombie; - keyx_cnt++; - - if (keyx_cnt == MAX_KEYX_CNT) { - if (!rv) { - keyx_next = ecp_ht_itr_key(&itr); - } else { - keyx_next = NULL; - } - } + rv = ecp_ht_itr_advance(&itr); } while (rv == ECP_OK); } - pthread_mutex_unlock(&vlink_conn_mutex); + /* no need to copy conn_next key, since this is the only thread that can remove connection */ + pthread_rwlock_unlock(&vlink_conn_rwlock); + /* we are counting delay for total hashtable entries; delay for absolute max ht_count (256 msgs with max load, keyx period of 600s) is ~70ms */ + delay = keyx_period * 1000000 / MAX(ht_count, 100); for (i=0; isock, &conn->remote, &_conn); + if (rv) { + LOG(LOG_ERR, "vlink_keyx: conn open err:%d\n", rv); + continue; + } + + pthread_rwlock_wrlock(&vlink_conn_rwlock); + ht_remove_conn(vlink_conn, conn); + rv = ht_insert_conn(vlink_conn, _conn); + pthread_rwlock_unlock(&vlink_conn_rwlock); + + ecp_conn_close(conn); + if (rv) { + LOG(LOG_ERR, "vlink_keyx: conn insert err:%d\n", rv); + ecp_conn_close(_conn); + } + } else if (is_open) { ssize_t _rv; - LOG(LOG_DEBUG, "vlink send keyx\n"); - _rv = ecp_send_keyx_req(keyx_conn[i], 1); - if (_rv < 0) LOG(LOG_ERR, "vlink send keyx err:%ld\n", _rv); + _rv = ecp_send_keyx_req(conn, 1); + if (_rv < 0) LOG(LOG_ERR, "vlink_keyx: send keyx req err:%ld\n", _rv); + } + + if (delay) { + /* randomize over delay / 2 and delay + delay / 2 */ + rnd = arc4random_uniform(delay + 1); + usleep(delay / 2 + rnd); } } - } while (keyx_next); + + } while (conn_next); + + LOG(LOG_DEBUG, "vlink_keyx: connection count:%u\n", ht_count); } -static void *_vlink_open(void *arg) { +static void *_vlink_keyx(void *arg) { ECPSocket *sock = arg; + uint32_t rnd; + time_t tv_sec; + int delay; while (1) { - LOG(LOG_DEBUG, "vlink open...\n"); - vlink_open(sock); - sleep(10); - } + tv_sec = time(NULL); - return NULL; -} + vlink_keyx(sock, KEYX_PERIOD); -static void *_vlink_keyx(void *arg) { - while (1) { - LOG(LOG_DEBUG, "vlink keyx...\n"); - vlink_keyx(); - sleep(10); + /* resolution is 1s */ + delay = KEYX_PERIOD - (time(NULL) - tv_sec); + if (delay > 0) { + rnd = arc4random_uniform(delay + 1); + sleep(delay / 2 + rnd); + } } return NULL; } -int vlink_start_open(ECPSocket *sock) { - int rv; - - rv = pthread_create(&vlink_open_thd, NULL, _vlink_open, sock); - if (rv) return ECP_ERR; - return ECP_OK; -} -int vlink_start_keyx(void) { +int vlink_start_keyx(ECPSocket *sock) { int rv; - rv = pthread_create(&vlink_keyx_thd, NULL, _vlink_keyx, NULL); + rv = pthread_create(&vlink_keyx_thd, NULL, _vlink_keyx, sock); if (rv) return ECP_ERR; return ECP_OK; } -int vlink_init(ECPContext *ctx) { - ECPConnHandler *handler; +int vlink_init(ECPSocket *sock) { + ECPContext *ctx = sock->ctx; int rv; srv_config = srv_get_config(); - handler = ecp_ctx_get_handler(ctx, ECP_CTYPE_VLINK); - if (handler == NULL) return ECP_ERR; - - handler->handle_open = vlink_handle_open; - handler->handle_close = vlink_handle_close; - - rv = pthread_mutex_init(&vlink_conn_mutex, NULL); - if (rv) { - return ECP_ERR; - } - - rv = pthread_mutex_init(&vlink_node_mutex, NULL); - if (rv) { - pthread_mutex_destroy(&vlink_conn_mutex); - return ECP_ERR; - } + rv = pthread_rwlock_init(&vlink_conn_rwlock, NULL); + if (rv) return ECP_ERR; - rv = ECP_OK; vlink_conn = ecp_ht_create_keys(); - if (vlink_conn == NULL) rv = ECP_ERR_ALLOC; - if (!rv) { - vlink_node = ecp_ht_create_keys(); - if (vlink_node == NULL) rv = ECP_ERR_ALLOC; - } - - if (rv) { - pthread_mutex_destroy(&vlink_node_mutex); - pthread_mutex_destroy(&vlink_conn_mutex); - if (vlink_node) ecp_ht_destroy(vlink_node); - if (vlink_conn) ecp_ht_destroy(vlink_conn); - return rv; + if (vlink_conn == NULL) { + pthread_rwlock_destroy(&vlink_conn_rwlock); + return ECP_ERR_ALLOC; } return ECP_OK; diff --git a/ecp/server/vlink.h b/ecp/server/vlink.h index 78af6b1..3e8022f 100644 --- a/ecp/server/vlink.h +++ b/ecp/server/vlink.h @@ -1,17 +1,12 @@ -#define MAX_KEYX_CNT 100 -#define MAX_OPEN_CNT 100 +#define MAX_KEYX_CNT 100 -#define CONN_EXP_TIME 22 +#define KEYX_PERIOD 600 /* key exchange priod (s); can't exceed 1h */ +#define VLINK_UFLAG_DISCONNECT 0x80 -void vlink_handle_err(ECPConnection *conn, unsigned char mtype, int err); -int vlink_handle_open(ECPConnection *conn, ECP2Buffer *bufs); -void vlink_handle_close(ECPConnection *conn); -int vlink_open_conn(ECPSocket *sock, ECPNode *node); +int vlink_open_conn(ECPSocket *sock, ECPNode *node, ECPConnection **_conn); void vlink_new_node(ECPSocket *sock, ECPDirItem *item); -int vlink_insert_node(ECPConnection *conn); +void vlink_del_node(ECPDirItem *dir_item); -void vlink_keyx(void); -void vlink_open(ECPSocket *sock); -int vlink_start_open(ECPSocket *sock); -int vlink_start_keyx(void); -int vlink_init(ECPContext *ctx); +void vlink_keyx(ECPSocket *sock, int keyx_period); +int vlink_start_keyx(ECPSocket *sock); +int vlink_init(ECPSocket *sock); diff --git a/ecp/src/ecp/dir/dir.c b/ecp/src/ecp/dir/dir.c index d826e24..6258e0e 100644 --- a/ecp/src/ecp/dir/dir.c +++ b/ecp/src/ecp/dir/dir.c @@ -2,10 +2,19 @@ #include #include +#include #include #include "dir.h" +int ecp_dir_item_eq(ECPDirItem *item1, ECPDirItem *item2) { + if (item1->region != item2->region) return 0; + if (item1->capabilities != item2->capabilities) return 0; + if (!item1->node.key_perma.valid || !item1->node.key_perma.valid) return 0; + if (memcmp(&item1->node.key_perma.public, &item2->node.key_perma.public, sizeof(item1->node.key_perma.public)) != 0) return 0; + return ecp_tr_addr_eq(&item1->node.addr, &item2->node.addr); +} + size_t ecp_dir_item_parse(ECPDirItem *item, unsigned char *buf) { ECPDHPub *key; ecp_tr_addr_t *addr; @@ -31,11 +40,10 @@ size_t ecp_dir_item_parse(ECPDirItem *item, unsigned char *buf) { buf += sizeof(uint16_t); rsize += sizeof(uint16_t); - item->capabilities = \ - ((uint16_t)buf[0] << 8) | \ - ((uint16_t)buf[1]); - buf += sizeof(uint16_t); - rsize += sizeof(uint16_t); + item->region = buf[0]; + item->capabilities = buf[1]; + buf += 2; + rsize += 2; return rsize; } @@ -61,10 +69,10 @@ size_t ecp_dir_item_serialize(ECPDirItem *item, unsigned char *buf) { buf += sizeof(uint16_t); rsize += sizeof(uint16_t); - buf[0] = item->capabilities >> 8; + buf[0] = item->region; buf[1] = item->capabilities; - buf += sizeof(uint16_t); - rsize += sizeof(uint16_t); + buf += 2; + rsize += 2; return rsize; } diff --git a/ecp/src/ecp/dir/dir.h b/ecp/src/ecp/dir/dir.h index b28f801..83b97df 100644 --- a/ecp/src/ecp/dir/dir.h +++ b/ecp/src/ecp/dir/dir.h @@ -1,10 +1,7 @@ #define ECP_SIZE_DIR_ITEM 40 -#define ECP_MTYPE_DIR_UPD 0x00 -#define ECP_MTYPE_DIR_REQ 0x01 -#define ECP_MTYPE_DIR_ANN 0x02 -#define ECP_MTYPE_DIR_REP 0x03 -#define ECP_MTYPE_DIR_SHADOW 0x04 +#define ECP_MTYPE_DIR_REQ 0x00 +#define ECP_MTYPE_DIR_REP 0x01 #define ECP_CTYPE_DIR (0x00 | ECP_CTYPE_FLAG_SYS) @@ -13,9 +10,11 @@ typedef struct ECPDirItem { ECPNode node; - uint16_t capabilities; + uint8_t region; + uint8_t capabilities; } ECPDirItem; +int ecp_dir_item_eq(ECPDirItem *item1, ECPDirItem *item2); size_t ecp_dir_item_parse(ECPDirItem *item, unsigned char *buf); size_t ecp_dir_item_serialize(ECPDirItem *item, unsigned char *buf); -- cgit v1.2.3