#include #include #include #include #include #include #include #include #include #include "dir.h" #include "vlink.h" #include "ht.h" #include "acl.h" #include "timer.h" #include "server.h" static ecp_ht_table_t *dir_shadow = NULL; static pthread_rwlock_t dir_shadow_rwlock; static pthread_rwlock_t dir_online_rwlock; static pthread_rwlock_t dir_timer_rwlock; static pthread_t dir_announce_thd; static DIROnline *dir_online = NULL; static unsigned int dir_vkey_req; static int dir_process_enable; static SRVConfig *srv_config; /* other directory servers */ #define _dir_item_is_dir(dir_item) (((dir_item)->roles & ECP_ROLE_DIR) && (memcmp(&(dir_item)->node.key_perma.public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) != 0)) #define MAX(X, Y) (((X) > (Y)) ? (X) : (Y)) /* messages from clients */ ssize_t dir_send_online(ECPConnection *conn, uint8_t region) { DIRList *list; ECPBuffer packet; ECPBuffer payload; unsigned char pkt_buf[ECP_MAX_PKT]; unsigned char pld_buf[ECP_MAX_PLD]; unsigned char *msg; size_t msg_size; ssize_t rv; int i; packet.buffer = pkt_buf; packet.size = ECP_MAX_PKT; payload.buffer = pld_buf; payload.size = ECP_MAX_PLD; if (dir_online == NULL) return ECP_ERR; rv = 0; pthread_rwlock_rdlock(&dir_online_rwlock); list = &dir_online->list[region]; for (i=0; imsg_count; i++) { ssize_t rv_snd; ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_DIR_REP); ecp_pld_set_frag(payload.buffer, payload.size, i, list->msg_count, 0); msg = ecp_pld_get_msg(payload.buffer, payload.size); msg_size = 0; msg[0] = list->msg[i].count; // item_cnt msg[1] = region; // region msg[2] = dir_online->serial >> 8; // serial msg[3] = dir_online->serial; msg += 2 + sizeof(uint16_t); msg_size += 2 + sizeof(uint16_t); memcpy(msg, list->msg[i].buffer, list->msg[i].count * ECP_SIZE_DIR_ITEM); msg_size += list->msg[i].count * ECP_SIZE_DIR_ITEM; rv_snd = ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(msg_size, ECP_MTYPE_DIR_REP), 0); if (rv_snd < 0) { rv = rv_snd; break; } rv += rv_snd; } pthread_rwlock_unlock(&dir_online_rwlock); return rv; } ssize_t dir_handle_client_msg(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, size_t msg_size, struct ECP2Buffer *b) { switch (mtype) { case ECP_MTYPE_DIR_REQ: { uint8_t region; ssize_t rv; if (msg_size < 1) return ECP_ERR_SIZE; region = *msg; if (region >= MAX_REGION) return ECP_ERR; rv = dir_send_online(conn, region); if (rv < 0) return rv; return 1; } default: return ECP_ERR_MTYPE; } } /* messages from other servers */ ssize_t dir_send_ann(ECPConnection *conn) { ECPBuffer packet; ECPBuffer payload; unsigned char pkt_buf[ECP_SIZE_PKT_BUF(ECP_SIZE_PLD(2, MTYPE_DIR_ANN), conn)]; unsigned char pld_buf[ECP_SIZE_PLD_BUF(ECP_SIZE_PLD(2, MTYPE_DIR_ANN), conn)]; unsigned char *msg; packet.buffer = pkt_buf; packet.size = sizeof(pkt_buf); payload.buffer = pld_buf; payload.size = sizeof(pld_buf); ecp_pld_set_type(payload.buffer, payload.size, MTYPE_DIR_ANN); msg = ecp_pld_get_msg(payload.buffer, payload.size); msg[0] = srv_config->region; msg[1] = srv_config->roles; return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(2, MTYPE_DIR_ANN), 0); } ssize_t dir_send_shadow(ECPConnection *conn) { ECPBuffer packet; ECPBuffer payload; unsigned char pkt_buf[ECP_MAX_PKT]; unsigned char pld_buf[ECP_MAX_PLD]; unsigned char *msg; struct hashtable_itr itr; DIRNode *node; ecp_ecdh_public_t *node_next; uint8_t count; size_t msg_size; ssize_t rv; packet.buffer = pkt_buf; packet.size = ECP_MAX_PKT; payload.buffer = pld_buf; payload.size = ECP_MAX_PLD; rv = 0; node_next = NULL; do { ssize_t rv_snd; ecp_pld_set_type(payload.buffer, payload.size, MTYPE_DIR_SHADOW); msg = ecp_pld_get_msg(payload.buffer, payload.size); pthread_rwlock_rdlock(&dir_shadow_rwlock); count = 0; msg_size = 0; if (ecp_ht_count(dir_shadow) > 0) { unsigned char *_msg; size_t _msg_size; int _rv; _msg = msg + 1; _msg_size = 0; ecp_ht_itr_create(&itr, dir_shadow); if (node_next) { _rv = ecp_ht_itr_search(&itr, node_next); if (_rv) { LOG(LOG_ERR, "dir_send_shadow: itr search err:%d\n", _rv); break; } node_next = NULL; } do { node = ecp_ht_itr_value(&itr); pthread_mutex_lock(&node->mutex); if (node->verified || node->ann_local) { if (count < ECP_MAX_DIR_ITEM_IN_MSG) { size_t rv_ser; rv_ser = ecp_dir_serialize_item(&node->dir_item, _msg); _msg += rv_ser; _msg_size += rv_ser; count++; } else { node_next = ecp_ht_itr_key(&itr); break; } } pthread_mutex_unlock(&node->mutex); _rv = ecp_ht_itr_advance(&itr); } while (_rv == ECP_OK); msg[0] = count; msg_size = 1 + _msg_size; } /* no need to copy node_next key, since announce is verified nodes will not be removed during online switch node removal */ pthread_rwlock_unlock(&dir_shadow_rwlock); if (msg_size) { rv_snd = ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(msg_size, MTYPE_DIR_SHADOW), 0); if (rv_snd < 0) { rv = rv_snd; break; } rv += rv_snd; } } while (node_next); return rv; } ssize_t dir_send_origin_rep(ECPConnection *conn, ecp_ecdh_public_t *public) { ECPBuffer packet; ECPBuffer payload; unsigned char pkt_buf[ECP_MAX_PKT]; unsigned char pld_buf[ECP_MAX_PLD]; unsigned char *msg; unsigned short vkey_max; size_t msg_size, msg_size_max, hdr_size; DIRNode *node; int i; packet.buffer = pkt_buf; packet.size = ECP_MAX_PKT; payload.buffer = pld_buf; payload.size = ECP_MAX_PLD; ecp_pld_set_type(payload.buffer, payload.size, MTYPE_DIR_ORIGIN_REP); msg = ecp_pld_get_msg(payload.buffer, payload.size); hdr_size = msg - payload.buffer; vkey_max = (payload.size - hdr_size - 1) / sizeof(ecp_ecdh_public_t); msg_size_max = 1 + vkey_max * sizeof(ecp_ecdh_public_t); pthread_rwlock_rdlock(&dir_shadow_rwlock); node = ecp_ht_search(dir_shadow, public); if (node) pthread_mutex_lock(&node->mutex); pthread_rwlock_unlock(&dir_shadow_rwlock); if (node == NULL) return ECP_ERR; msg_size = 0; *msg = node->vkey_cnt; msg++; msg_size++; for (i=0; ivkey_cnt; i++) { memcpy(msg, &node->vkey[i], sizeof(ecp_ecdh_public_t)); msg += sizeof(ecp_ecdh_public_t); msg_size += sizeof(ecp_ecdh_public_t); if (msg_size == msg_size_max) break; } pthread_mutex_unlock(&node->mutex); return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(msg_size, MTYPE_DIR_ORIGIN_REP), 0); } ssize_t dir_handle_ann(ECPConnection *conn, unsigned char *msg, size_t msg_size) { ECPDirItem dir_item; uint8_t region; uint8_t roles; size_t rsize; ssize_t rv; int ann_enable; rsize = 2; if (msg_size < rsize) return ECP_ERR_SIZE; pthread_rwlock_rdlock(&dir_timer_rwlock); ann_enable = (dir_process_enable > PROC_BLOCK_ANN); pthread_rwlock_unlock(&dir_timer_rwlock); if (!ann_enable) return rsize; region = msg[0]; roles = msg[1]; if ((roles & ECP_ROLE_DIR) && !acl_dir_inlist(&conn->remote.key_perma.public)) { LOG(LOG_ERR, "dir_handle_ann: not a directory server\n"); return ECP_ERR; } memset(&dir_item, 0, sizeof(ECPDirItem)); dir_item.node = conn->remote; dir_item.region = region; dir_item.roles = roles; if (dir_item.region >= MAX_REGION) { LOG(LOG_ERR, "dir_handle_ann: bad region\n"); return ECP_ERR; } dir_process_item(&dir_item, conn->sock, &srv_config->key_perma.public); rv = dir_send_shadow(conn); if (rv < 0) return rv; return rsize; } ssize_t dir_handle_shadow(ECPConnection *conn, unsigned char *msg, size_t msg_size) { uint8_t count; size_t rsize; int i, ann_enable; if (msg_size < 1) return ECP_ERR_SIZE; count = msg[0]; // item_cnt msg += 1; rsize = 1 + count * ECP_SIZE_DIR_ITEM; if (msg_size < rsize) return ECP_ERR_SIZE; pthread_rwlock_rdlock(&dir_timer_rwlock); ann_enable = (dir_process_enable > PROC_BLOCK_ANN); pthread_rwlock_unlock(&dir_timer_rwlock); if (!ann_enable) return rsize; for (i=0; i= MAX_REGION) { LOG(LOG_ERR, "dir_handle_shadow: bad region\n"); return ECP_ERR; } dir_process_item(&dir_item, conn->sock, &conn->remote.key_perma.public); } return rsize; } ssize_t dir_handle_origin_req(ECPConnection *conn, unsigned char *msg, size_t msg_size) { size_t rsize; ssize_t rv; rsize = ECP_SIZE_ECDH_PUB; if (msg_size < rsize) return ECP_ERR_SIZE; rv = dir_send_origin_rep(conn, (ecp_ecdh_public_t *)msg); if (rv < 0) return rv; return rsize; } ssize_t dir_handle_msg(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, size_t msg_size, struct ECP2Buffer *b) { switch (mtype) { case MTYPE_DIR_ANN: { int is_dir; is_dir = srv_config->roles & ECP_ROLE_DIR; if (!is_dir || ecp_conn_is_outb(conn)) return ECP_ERR; return dir_handle_ann(conn, msg, msg_size); } case MTYPE_DIR_SHADOW: { if (ecp_conn_is_inb(conn)) return ECP_ERR; return dir_handle_shadow(conn, msg, msg_size); } case MTYPE_DIR_ORIGIN_REQ: { int is_dir; is_dir = srv_config->roles & ECP_ROLE_DIR; if (!is_dir || ecp_conn_is_outb(conn)) return ECP_ERR; return dir_handle_origin_req(conn, msg, msg_size); } default: return ECP_ERR_MTYPE; } } void dir_process_item(ECPDirItem *dir_item, ECPSocket *sock, ecp_ecdh_public_t *s_public) { DIRNode *node; unsigned int vkey_req_dir; int is_verified = 0; int rv = ECP_OK; pthread_rwlock_rdlock(&dir_online_rwlock); vkey_req_dir = dir_vkey_req; pthread_rwlock_unlock(&dir_online_rwlock); pthread_rwlock_rdlock(&dir_shadow_rwlock); node = ht_search_item(dir_shadow, dir_item); if (node == NULL) { pthread_rwlock_unlock(&dir_shadow_rwlock); rv = dir_create_node(dir_item, &node); if (!rv) { pthread_rwlock_wrlock(&dir_shadow_rwlock); if (ecp_ht_count(dir_shadow) > MAX_DIR_ITEM) rv = ECP_ERR_FULL; if (!rv) rv = ht_insert_node(dir_shadow, node); if (rv) pthread_rwlock_unlock(&dir_shadow_rwlock); } if (rv) { if (node) dir_destroy_node(node); /* ignore duplicate hashtable inserts */ if (rv != ECP_ERR_DUP) LOG(LOG_ERR, "dir_process_item: create node err:%d\n", rv); return; } /* try to open connection to other directory servers */ if (_dir_item_is_dir(dir_item)) { rv = dir_open_conn(node, sock); if (rv) LOG(LOG_ERR, "dir_process_item: conn open err:%d\n", rv); } LOG(LOG_DEBUG, "dir_process_item: new node\n"); } pthread_mutex_lock(&node->mutex); pthread_rwlock_unlock(&dir_shadow_rwlock); if (node->zombie || (s_public == NULL)) goto process_item_fin; if (!node->verified) { int key_ex = 0; int i; if (memcmp(s_public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) == 0) { node->ann_local = 1; } for (i=0; ivkey_cnt; i++) { if (memcmp(s_public, &node->vkey[i], sizeof(node->vkey[i])) == 0) { key_ex = 1; break; } } if (!key_ex) { unsigned int vkey_req; if (node->dir_item.roles & ECP_ROLE_DIR) { vkey_req = vkey_req_dir; } else { vkey_req = MIN_VKEY_REQ; } if (node->vkey_cnt < vkey_req) { /* vkey list is used to track origin of particular server */ memcpy(&node->vkey[node->vkey_cnt], s_public, sizeof(node->vkey[node->vkey_cnt])); node->vkey_cnt++; if (node->vkey_cnt == vkey_req) { node->verified = 1; node->is_new = 0; is_verified = 1; } } /* protocol does not support server properties change */ if (!ecp_dir_item_eq(dir_item, &node->dir_item)) { char buffer[ECP_SIZE_ECDH_KEY_BUF]; ecp_key2str(buffer, (uint8_t *)&node->dir_item.node.key_perma.public); LOG(LOG_ERR, "dir_process_item: dir entry changed from:%s\n", buffer); } } } process_item_fin: pthread_mutex_unlock(&node->mutex); if (is_verified) { LOG(LOG_DEBUG, "dir_process_item: node verified\n"); rv = acl_add_key(&node->dir_item); if (rv) LOG(LOG_ERR, "dir_process_item: acl add key err:%d\n", rv); vlink_new_node(sock, &node->dir_item); } } int dir_open_conn(DIRNode *node, ECPSocket *sock) { int rv; node->conn = malloc(sizeof(ECPConnection)); if (node->conn == NULL) return ECP_ERR_ALLOC; ecp_conn_init(node->conn, sock, CTYPE_DIR); ecp_conn_set_flags(node->conn, ECP_CONN_FLAG_VBOX); rv = ecp_conn_open(node->conn, &node->dir_item.node.key_perma.public, &node->dir_item.node.addr); if (rv) { free(node->conn); node->conn = NULL; } return rv; } int dir_create_node(ECPDirItem *dir_item, DIRNode **node) { DIRNode *_node; int rv; *node = NULL; _node = malloc(sizeof(DIRNode)); if (_node == NULL) return ECP_ERR_ALLOC; memset(_node, 0, sizeof(DIRNode)); rv = pthread_mutex_init(&_node->mutex, NULL); if (rv) { free(_node); return ECP_ERR; } _node->dir_item = *dir_item; _node->is_new = 1; _node->conn = NULL; *node = _node; return ECP_OK; } void dir_destroy_node(DIRNode *node) { if (node->conn) ecp_conn_close(node->conn); pthread_mutex_destroy(&node->mutex); free(node); } static int online_switch_expired(ECPConnection *conn, ecp_sts_t now) { if (conn->type == CTYPE_DIR) return 1; return conn_expired(conn, now); } static void remove_nodes(DIRNode *remove_node[], int remove_cnt) { int i; pthread_rwlock_wrlock(&dir_shadow_rwlock); for (i=0; idir_item); dir_destroy_node(remove_node[i]); } } void dir_online_switch(ECPSocket *sock, int inc_serial) { struct hashtable_itr itr; DIRNode *node; uint8_t count[MAX_REGION]; uint8_t msg_count[MAX_REGION]; unsigned char *msg[MAX_REGION]; ecp_ecdh_public_t *node_next; DIRNode *remove_node[MAX_NODE_REMOVE]; unsigned int dir_cnt, node_cnt; int i, remove_cnt; int rv; pthread_rwlock_rdlock(&dir_shadow_rwlock); pthread_rwlock_wrlock(&dir_online_rwlock); if (dir_online) { for (i=0; ilist[i].msg_count = 0; count[i] = 0; msg_count[i] = 0; msg[i] = dir_online->list[i].msg[0].buffer; } } dir_cnt = 0; node_cnt = 0; remove_cnt = 0; if (ecp_ht_count(dir_shadow) > 0) { uint8_t region; int verified; ecp_ht_itr_create(&itr, dir_shadow); node_next = NULL; do { node = ecp_ht_itr_value(&itr); pthread_mutex_lock(&node->mutex); region = node->dir_item.region; verified = node->verified; if (verified) { if (dir_online) { size_t rv_ser; rv_ser = ecp_dir_serialize_item(&node->dir_item, msg[0]); msg[0] += rv_ser; count[0]++; if (region) { rv_ser = ecp_dir_serialize_item(&node->dir_item, msg[region]); msg[region] += rv_ser; count[region]++; } } if (node->dir_item.roles & ECP_ROLE_DIR) { dir_cnt++; } node_cnt++; } else { node->zombie = 1; if (node_next == NULL) { if (remove_cnt < MAX_NODE_REMOVE) { remove_node[remove_cnt] = node; remove_cnt++; } else { node_next = ecp_ht_itr_key(&itr); break; } } } node->verified = 0; node->ann_local = 0; memset(&node->vkey, 0, sizeof(node->vkey)); node->vkey_cnt = 0; pthread_mutex_unlock(&node->mutex); /* let's reconnect all directory servers node->conn is currently immutable since announce is disabled well before online switch */ if (node->conn) ecp_conn_set_uflags(node->conn, DIR_UFLAG_RECONNECT); if (dir_online && verified) { if (count[0] == ECP_MAX_DIR_ITEM_IN_MSG) { dir_online->list[0].msg[msg_count[0]].count = count[0]; msg_count[0]++; msg[0] = dir_online->list[0].msg[msg_count[0]].buffer; count[0] = 0; } if (region && count[region] == ECP_MAX_DIR_ITEM_IN_MSG) { dir_online->list[region].msg[msg_count[region]].count = count[region]; msg_count[region]++; msg[region] = dir_online->list[region].msg[msg_count[region]].buffer; count[region] = 0; } } rv = ecp_ht_itr_advance(&itr); } while (rv == ECP_OK); if (dir_online) { for (i=0; ilist[i].msg[msg_count[i]].count = count[i]; dir_online->list[i].msg_count = msg_count[i] + 1; } else { dir_online->list[i].msg_count = msg_count[i]; } } } } dir_vkey_req = dir_cnt / 2 + 1; if (dir_vkey_req > MAX_VKEY) { LOG(LOG_ERR, "dir_online_switch: number of dir vkeys required > MAX_VKEY (%d)\n", MAX_VKEY); dir_vkey_req = MAX_VKEY; } if (dir_online && inc_serial) dir_online->serial++; pthread_rwlock_unlock(&dir_online_rwlock); pthread_rwlock_unlock(&dir_shadow_rwlock); rv = acl_reset_ht(); if (rv) LOG(LOG_ERR, "dir_online_switch: acl reset err:%d\n", rv); ecp_sock_expire(sock, online_switch_expired); if (remove_cnt) { remove_nodes(remove_node, remove_cnt); while (node_next) { remove_cnt = 0; pthread_rwlock_rdlock(&dir_shadow_rwlock); ecp_ht_itr_create(&itr, dir_shadow); if (node_next) { rv = ecp_ht_itr_search(&itr, node_next); if (rv) { LOG(LOG_ERR, "dir_online_switch: itr search err:%d\n", rv); break; } node_next = NULL; } do { node = ecp_ht_itr_value(&itr); pthread_mutex_lock(&node->mutex); if (node->zombie) { if (remove_cnt < MAX_NODE_REMOVE) { remove_node[remove_cnt] = node; remove_cnt++; } else { node_next = ecp_ht_itr_key(&itr); break; } } pthread_mutex_unlock(&node->mutex); rv = ecp_ht_itr_advance(&itr); } while (rv == ECP_OK); /* no need to copy node_next key, since this is the only thread that can destroy node */ pthread_rwlock_unlock(&dir_shadow_rwlock); if (remove_cnt) remove_nodes(remove_node, remove_cnt); } } if (srv_config->roles & ECP_ROLE_DIR) acl_load_ht(); LOG(LOG_DEBUG, "dir_online_switch: node:%d dir:%d serial:%d\n", node_cnt, dir_cnt, dir_online ? dir_online->serial : 0); } void dir_announce_allow(void) { pthread_rwlock_wrlock(&dir_timer_rwlock); dir_process_enable = PROC_ALLOW_ALL; pthread_rwlock_unlock(&dir_timer_rwlock); LOG(LOG_DEBUG, "dir_announce_allow\n"); } void dir_announce_block(void) { pthread_rwlock_wrlock(&dir_timer_rwlock); dir_process_enable = PROC_BLOCK_ANN; pthread_rwlock_unlock(&dir_timer_rwlock); LOG(LOG_DEBUG, "dir_announce_block\n"); } void dit_init_serial(uint16_t serial) { if (dir_online) { pthread_rwlock_wrlock(&dir_online_rwlock); dir_online->serial = serial; pthread_rwlock_unlock(&dir_online_rwlock); } } int dir_init_dir_cnt(unsigned int dir_cnt, uint16_t serial) { int rv = ECP_OK; if (dir_online) { pthread_rwlock_wrlock(&dir_online_rwlock); if (dir_online->serial != serial) rv = ECP_ERR; if (!rv) dir_vkey_req = dir_cnt / 2 + 1; pthread_rwlock_unlock(&dir_online_rwlock); } return rv; } void dir_announce(ECPSocket *sock, int ann_period) { struct hashtable_itr itr; DIRNode *node; ecp_ecdh_public_t *node_next; DIRNode *announce_node[MAX_NODE_ANNOUNCE]; unsigned int ht_count; int announce_cnt; int i; int rv; node_next = NULL; do { uint32_t delay; uint32_t rnd; announce_cnt = 0; pthread_rwlock_rdlock(&dir_shadow_rwlock); ht_count = ecp_ht_count(dir_shadow); if (ht_count > 0) { ecp_ht_itr_create(&itr, dir_shadow); if (node_next) { rv = ecp_ht_itr_search(&itr, node_next); if (rv) { LOG(LOG_ERR, "dir_announce: itr search err:%d\n", rv); break; } node_next = NULL; } do { node = ecp_ht_itr_value(&itr); /* announce to other directory servers */ if (_dir_item_is_dir(&node->dir_item)) { if (announce_cnt < MAX_NODE_ANNOUNCE) { announce_node[announce_cnt] = node; announce_cnt++; } else { node_next = ecp_ht_itr_key(&itr); break; } } rv = ecp_ht_itr_advance(&itr); } while (rv == ECP_OK); } /* no need to copy node_next key, since announce is disabled during online switch node removal */ pthread_rwlock_unlock(&dir_shadow_rwlock); /* we are counting delay for total hashtable entries; delay for absolute max ht_count (256 msgs with max load, announce period of 600s) is ~70ms */ delay = ann_period * 1000000 / MAX(ht_count, 100); for (i=0; iconn; if (conn == NULL) { rv = dir_open_conn(announce_node[i], sock); if (rv) LOG(LOG_ERR, "dir_announce: conn open err:%d\n", rv); continue; } is_reg = is_open = is_z = 0; now = ecp_tm_get_s(); ecp_conn_lock(conn); is_reg = _ecp_conn_is_reg(conn); if (is_reg) is_open = _ecp_conn_is_open(conn); if (is_open) is_z = _ecp_conn_is_zombie(conn, now, ANN_PERIOD * 3) || _ecp_conn_test_uflags(conn, DIR_UFLAG_RECONNECT); ecp_conn_unlock(conn); if (!is_reg || is_z) { LOG(LOG_DEBUG, "dir_announce: reconnect\n"); ecp_conn_close(conn); rv = dir_open_conn(announce_node[i], sock); if (rv) LOG(LOG_ERR, "dir_announce: conn open err:%d\n", rv); } else if (is_open) { ssize_t _rv; _rv = dir_send_ann(conn); if (_rv < 0) LOG(LOG_ERR, "dir_announce: send ann err:%ld\n", _rv); } if (delay) { /* randomize over delay / 2 and delay + delay / 2 */ rnd = arc4random_uniform(delay + 1); usleep(delay / 2 + rnd); } } } while (node_next); LOG(LOG_DEBUG, "dir_announce: node count:%u\n", ht_count); } static void *_dir_announce(void *arg) { ECPSocket *sock = arg; uint32_t rnd; time_t tv_sec; int delay; int ann_enable; while (1) { tv_sec = time(NULL); pthread_rwlock_rdlock(&dir_timer_rwlock); ann_enable = (dir_process_enable > PROC_BLOCK_ANN); pthread_rwlock_unlock(&dir_timer_rwlock); if (ann_enable) { dir_announce(sock, ANN_PERIOD); } else { sleep(ANN_BLOCK_TIME + ANN_PERIOD); } /* resolution is 1s */ delay = ANN_PERIOD - (time(NULL) - tv_sec); if (delay > 0) { rnd = arc4random_uniform(delay + 1); sleep(delay / 2 + rnd); } } return NULL; } int dir_start_announce(ECPSocket *sock) { int rv; rv = pthread_create(&dir_announce_thd, NULL, _dir_announce, sock); if (rv) return ECP_ERR; return ECP_OK; } void dir_init_switch(ECPSocket *sock, int init_ann) { int i; dir_announce_allow(); for (i=0; ictx; ECPConnHandler *handler, *c_handler; int is_dir; int rv; /* dir_process_enable and dir_online.serial will be set from timer */ dir_vkey_req = 1; srv_config = srv_get_config(); is_dir = srv_config->roles & ECP_ROLE_DIR; rv = pthread_rwlock_init(&dir_shadow_rwlock, NULL); if (rv) { return ECP_ERR; } rv = pthread_rwlock_init(&dir_online_rwlock, NULL); if (rv) { pthread_rwlock_destroy(&dir_shadow_rwlock); return ECP_ERR; } rv = pthread_rwlock_init(&dir_timer_rwlock, NULL); if (rv) { pthread_rwlock_destroy(&dir_online_rwlock); pthread_rwlock_destroy(&dir_shadow_rwlock); return ECP_ERR; } dir_shadow = ecp_ht_create_keys(); if (dir_shadow == NULL) { pthread_rwlock_destroy(&dir_timer_rwlock); pthread_rwlock_destroy(&dir_online_rwlock); pthread_rwlock_destroy(&dir_shadow_rwlock); return ECP_ERR_ALLOC; } handler = malloc(sizeof(ECPConnHandler)); if (handler == NULL) { ecp_ht_destroy(dir_shadow); pthread_rwlock_destroy(&dir_timer_rwlock); pthread_rwlock_destroy(&dir_online_rwlock); pthread_rwlock_destroy(&dir_shadow_rwlock); return ECP_ERR_ALLOC; } ecp_conn_handler_init(handler, NULL, NULL, dir_handle_msg, NULL); if (is_dir) { c_handler = malloc(sizeof(ECPConnHandler)); dir_online = malloc(sizeof(DIROnline)); if ((c_handler == NULL) || (dir_online == NULL)) { free(handler); ecp_ht_destroy(dir_shadow); pthread_rwlock_destroy(&dir_timer_rwlock); pthread_rwlock_destroy(&dir_online_rwlock); pthread_rwlock_destroy(&dir_shadow_rwlock); if (dir_online) free(dir_online); if (c_handler) free(c_handler); return ECP_ERR_ALLOC; } ecp_conn_handler_init(c_handler, NULL, NULL, dir_handle_client_msg, NULL); memset(dir_online, 0, sizeof(DIROnline)); } rv = timer_init(sock); if (rv) { free(handler); if (is_dir) { free(dir_online); free(c_handler); } ecp_ht_destroy(dir_shadow); pthread_rwlock_destroy(&dir_timer_rwlock); pthread_rwlock_destroy(&dir_online_rwlock); pthread_rwlock_destroy(&dir_shadow_rwlock); return rv; } ecp_ctx_set_handler(ctx, CTYPE_DIR, handler); if (is_dir) ecp_ctx_set_handler(ctx, ECP_CTYPE_DIR, c_handler); return ECP_OK; }