summaryrefslogtreecommitdiff
path: root/ecp/server/dir.c
diff options
context:
space:
mode:
Diffstat (limited to 'ecp/server/dir.c')
-rw-r--r--ecp/server/dir.c1053
1 files changed, 767 insertions, 286 deletions
diff --git a/ecp/server/dir.c b/ecp/server/dir.c
index 0066bac..7244fd9 100644
--- a/ecp/server/dir.c
+++ b/ecp/server/dir.c
@@ -1,31 +1,122 @@
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
+#include <time.h>
#include <ecp/core.h>
-#include <ecp/dir/dir.h>
+#include <ecp/cr.h>
#include <ecp/ht.h>
#include <ecp/tm.h>
-#include "server.h"
-#include "vlink.h"
#include "dir.h"
+#include "vlink.h"
#include "ht.h"
+#include "acl.h"
+
+#include "timer.h"
+#include "server.h"
static ecp_ht_table_t *dir_shadow = NULL;
static pthread_rwlock_t dir_shadow_rwlock;
static pthread_rwlock_t dir_online_rwlock;
+static pthread_rwlock_t dir_timer_rwlock;
static pthread_t dir_announce_thd;
-static DirList dir_online;
+static DIROnline *dir_online = NULL;
+static unsigned int dir_vkey_req;
+static int dir_process_enable;
static SRVConfig *srv_config;
+#define MAX(X, Y) (((X) > (Y)) ? (X) : (Y))
+
+/* messages from clients */
+ssize_t dir_send_online(ECPConnection *conn, uint8_t region) {
+ DIRList *list;
+ ECPBuffer packet;
+ ECPBuffer payload;
+ unsigned char pkt_buf[ECP_MAX_PKT];
+ unsigned char pld_buf[ECP_MAX_PLD];
+ unsigned char *msg;
+ size_t msg_size;
+ ssize_t rv;
+ int i;
+
+ packet.buffer = pkt_buf;
+ packet.size = ECP_MAX_PKT;
+ payload.buffer = pld_buf;
+ payload.size = ECP_MAX_PLD;
+
+ if (dir_online == NULL) return ECP_ERR;
+
+ rv = 0;
+ pthread_rwlock_rdlock(&dir_online_rwlock);
+ list = &dir_online->list[region];
+
+ if (list->msg_count == 0) goto send_online_fin;
+
+ for (i=0; i<list->msg_count; i++) {
+ ssize_t rv_snd;
+
+ ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_DIR_REP);
+ msg = ecp_pld_get_msg(payload.buffer, payload.size);
+ msg_size = 0;
+
+ msg[0] = i; // frag_cnt
+ msg[1] = list->msg_count; // frag_tot
+ msg[2] = list->msg[i].count; // item_cnt
+ msg[3] = region; // region
+ msg[4] = dir_online->serial >> 8; // serial
+ msg[5] = dir_online->serial;
+
+ msg += 4 + sizeof(uint16_t);
+ msg_size += 4 + sizeof(uint16_t);
+
+ memcpy(msg, list->msg[i].buffer, list->msg[i].count * ECP_SIZE_DIR_ITEM);
+ msg_size += list->msg[i].count * ECP_SIZE_DIR_ITEM;
+
+ rv_snd = ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(msg_size, ECP_MTYPE_DIR_REP), 0);
+ if (rv_snd < 0) {
+ rv = rv_snd;
+ break;
+ }
+
+ rv += rv_snd;
+ }
+
+send_online_fin:
+ pthread_rwlock_unlock(&dir_online_rwlock);
+ return rv;
+}
+
+ssize_t dir_handle_client_msg(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, size_t msg_size, struct ECP2Buffer *b) {
+ switch (mtype) {
+ case ECP_MTYPE_DIR_REQ: {
+ uint8_t region;
+ ssize_t rv;
+
+ if (msg_size < 1) return ECP_ERR_SIZE;
+
+ region = *msg;
+ if (region >= MAX_REGION) return ECP_ERR;
+
+ rv = dir_send_online(conn, region);
+ if (rv < 0) return rv;
+
+ return 1;
+ }
+
+ default:
+ return ECP_ERR_MTYPE;
+ }
+}
+
+/* messages from other servers */
ssize_t dir_send_ann(ECPConnection *conn) {
ECPBuffer packet;
ECPBuffer payload;
- unsigned char pkt_buf[ECP_SIZE_PKT_BUF(sizeof(uint16_t), ECP_MTYPE_DIR_ANN, conn)];
- unsigned char pld_buf[ECP_SIZE_PLD_BUF(sizeof(uint16_t), ECP_MTYPE_DIR_ANN, conn)];
+ unsigned char pkt_buf[ECP_SIZE_PKT_BUF(2, MTYPE_DIR_ANN, conn)];
+ unsigned char pld_buf[ECP_SIZE_PLD_BUF(2, MTYPE_DIR_ANN, conn)];
unsigned char *msg;
packet.buffer = pkt_buf;
@@ -33,12 +124,12 @@ ssize_t dir_send_ann(ECPConnection *conn) {
payload.buffer = pld_buf;
payload.size = sizeof(pld_buf);
- ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_DIR_ANN);
+ ecp_pld_set_type(payload.buffer, payload.size, MTYPE_DIR_ANN);
msg = ecp_pld_get_msg(payload.buffer, payload.size);
- msg[0] = srv_config->capabilities >> 8;
+ msg[0] = srv_config->region;
msg[1] = srv_config->capabilities;
- return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(sizeof(uint16_t), ECP_MTYPE_DIR_ANN), 0);
+ return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(2, MTYPE_DIR_ANN), 0);
}
ssize_t dir_send_shadow(ECPConnection *conn) {
@@ -49,111 +140,170 @@ ssize_t dir_send_shadow(ECPConnection *conn) {
unsigned char *msg;
struct hashtable_itr itr;
- DirNode *node;
- uint16_t count;
- ecp_sts_t access_ts;
+ DIRNode *node;
+ ecp_ecdh_public_t *node_next;
+ uint8_t count;
size_t msg_size;
+ ssize_t rv;
packet.buffer = pkt_buf;
packet.size = ECP_MAX_PKT;
payload.buffer = pld_buf;
payload.size = ECP_MAX_PLD;
- ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_DIR_SHADOW);
- msg = ecp_pld_get_msg(payload.buffer, payload.size);
- msg_size = 0;
+ rv = 0;
+ node_next = NULL;
+ do {
+ ssize_t rv_snd;
- pthread_rwlock_rdlock(&dir_shadow_rwlock);
+ ecp_pld_set_type(payload.buffer, payload.size, MTYPE_DIR_SHADOW);
+ msg = ecp_pld_get_msg(payload.buffer, payload.size);
- count = ecp_ht_count(dir_shadow);
- msg[0] = count >> 8;
- msg[1] = count;
- msg += sizeof(uint16_t);
- msg_size += sizeof(uint16_t);
+ memset(msg, 0, 4 + sizeof(uint16_t)); // frag_cnt, frag_tot, item_cnt, region, serial
- if (count > 0) {
- size_t rv;
- int _rv;
+ pthread_rwlock_rdlock(&dir_shadow_rwlock);
- ecp_ht_itr_create(&itr, dir_shadow);
- do {
- node = ecp_ht_itr_value(&itr);
+ if (ecp_ht_count(dir_shadow) > 0) {
+ unsigned char *_msg;
+ size_t _msg_size;
+ int _rv;
- pthread_mutex_lock(&node->mutex);
- access_ts = node->access_ts;
- pthread_mutex_unlock(&node->mutex);
+ _msg = msg + 4 + sizeof(uint16_t);
+ _msg_size = 0;
+ ecp_ht_itr_create(&itr, dir_shadow);
+ if (node_next) {
+ _rv = ecp_ht_itr_search(&itr, node_next);
+ if (_rv) {
+ LOG(LOG_ERR, "dir_send_shadow: itr search err:%d\n", _rv);
+ break;
+ }
+ node_next = NULL;
+ }
- rv = ecp_dir_item_serialize(&node->dir_item, msg);
- msg += rv;
- msg_size += rv;
+ count = 0;
+ do {
+ node = ecp_ht_itr_value(&itr);
- msg[0] = access_ts >> 24;
- msg[1] = access_ts >> 16;
- msg[2] = access_ts >> 8;
- msg[3] = access_ts;
- msg += sizeof(uint32_t);
- msg_size += sizeof(uint32_t);
+ pthread_mutex_lock(&node->mutex);
+ if (node->verified || node->ann_local) {
+ if (count < MAX_DIR_ITEM_IN_MSG) {
+ size_t rv_ser;
+
+ rv_ser = ecp_dir_item_serialize(&node->dir_item, _msg);
+ _msg += rv_ser;
+ _msg_size += rv_ser;
+ count++;
+ } else {
+ node_next = ecp_ht_itr_key(&itr);
+ break;
+ }
+ }
+ pthread_mutex_unlock(&node->mutex);
- _rv = ecp_ht_itr_advance(&itr);
- } while (_rv == ECP_OK);
- }
+ _rv = ecp_ht_itr_advance(&itr);
+ } while (_rv == ECP_OK);
+ msg[2] = count;
+ msg_size = 4 + sizeof(uint16_t) + _msg_size;
+ }
- pthread_rwlock_unlock(&dir_shadow_rwlock);
+ /* no need to copy node_next key, since announce is verified nodes will not be removed during online switch node removal */
+ pthread_rwlock_unlock(&dir_shadow_rwlock);
+
+ rv_snd = ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(msg_size, MTYPE_DIR_SHADOW), 0);
+ if (rv_snd < 0) {
+ rv = rv_snd;
+ break;
+ }
+
+ rv += rv_snd;
+ } while (node_next);
- return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(msg_size, ECP_MTYPE_DIR_SHADOW), 0);
+ return rv;
}
-ssize_t dir_send_online(ECPConnection *conn) {
+ssize_t dir_send_origin_rep(ECPConnection *conn, ecp_ecdh_public_t *public) {
ECPBuffer packet;
ECPBuffer payload;
unsigned char pkt_buf[ECP_MAX_PKT];
unsigned char pld_buf[ECP_MAX_PLD];
unsigned char *msg;
- size_t msg_size;
+ unsigned short vkey_max;
+ size_t msg_size, msg_size_max, hdr_size;
+ DIRNode *node;
+ int i;
packet.buffer = pkt_buf;
packet.size = ECP_MAX_PKT;
payload.buffer = pld_buf;
payload.size = ECP_MAX_PLD;
- ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_DIR_REP);
+ ecp_pld_set_type(payload.buffer, payload.size, MTYPE_DIR_ORIGIN_REP);
msg = ecp_pld_get_msg(payload.buffer, payload.size);
+ hdr_size = msg - payload.buffer;
+ vkey_max = (payload.size - hdr_size - 1) / sizeof(ecp_ecdh_public_t);
+ msg_size_max = 1 + vkey_max * sizeof(ecp_ecdh_public_t);
- pthread_rwlock_rdlock(&dir_online_rwlock);
+ pthread_rwlock_rdlock(&dir_shadow_rwlock);
- msg[0] = dir_online.count >> 8;
- msg[1] = dir_online.count;
- msg += sizeof(uint16_t);
- msg_size = sizeof(uint16_t) + dir_online.count * ECP_SIZE_DIR_ITEM;
- memcpy(msg, dir_online.msg, dir_online.count * ECP_SIZE_DIR_ITEM);
+ node = ecp_ht_search(dir_shadow, public);
+ if (node) pthread_mutex_lock(&node->mutex);
- pthread_rwlock_unlock(&dir_online_rwlock);
+ pthread_rwlock_unlock(&dir_shadow_rwlock);
+
+ if (node == NULL) return ECP_ERR;
- return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(msg_size, ECP_MTYPE_DIR_REP), 0);
+ msg_size = 0;
+
+ *msg = node->vkey_cnt;
+ msg++;
+ msg_size++;
+
+ for (i=0; i<node->vkey_cnt; i++) {
+ memcpy(msg, &node->vkey[i], sizeof(ecp_ecdh_public_t));
+ msg += sizeof(ecp_ecdh_public_t);
+ msg_size += sizeof(ecp_ecdh_public_t);
+ if (msg_size == msg_size_max) break;
+ }
+ pthread_mutex_unlock(&node->mutex);
+
+ return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(msg_size, MTYPE_DIR_ORIGIN_REP), 0);
}
ssize_t dir_handle_ann(ECPConnection *conn, unsigned char *msg, size_t msg_size) {
ECPDirItem dir_item;
- int is_new;
- uint16_t capabilities;
- ecp_sts_t now;
- ssize_t rsize;
+ uint8_t region;
+ uint8_t capabilities;
+ size_t rsize;
ssize_t rv;
+ int ann_enable;
- rsize = sizeof(uint16_t);
+ rsize = 2;
if (msg_size < rsize) return ECP_ERR_SIZE;
- capabilities = \
- ((uint16_t)msg[0] << 8) | \
- ((uint16_t)msg[1]);
+ pthread_rwlock_rdlock(&dir_timer_rwlock);
+ ann_enable = (dir_process_enable > PROC_BLOCK_ANN);
+ pthread_rwlock_unlock(&dir_timer_rwlock);
+
+ if (!ann_enable) return rsize;
+
+ region = msg[0];
+ capabilities = msg[1];
+ if ((capabilities & ECP_DIR_CAP_DIR) && !acl_dir_inlist(&conn->remote.key_perma.public)) {
+ LOG(LOG_ERR, "dir_handle_ann: not a directory server\n");
+ return ECP_ERR;
+ }
memset(&dir_item, 0, sizeof(ECPDirItem));
dir_item.node = conn->remote;
+ dir_item.region = region;
dir_item.capabilities = capabilities;
- now = ecp_tm_get_s();
- is_new = dir_process_item(&dir_item, now);
- if (is_new) vlink_new_node(conn->sock, &dir_item);
+ if (dir_item.region >= MAX_REGION) {
+ LOG(LOG_ERR, "dir_handle_ann: bad region\n");
+ return ECP_ERR;
+ }
+
+ dir_process_item(&dir_item, conn->sock, &srv_config->key_perma.public);
rv = dir_send_shadow(conn);
if (rv < 0) return rv;
@@ -161,348 +311,573 @@ ssize_t dir_handle_ann(ECPConnection *conn, unsigned char *msg, size_t msg_size)
return rsize;
}
-ssize_t dir_handle_req(ECPConnection *conn, unsigned char *msg, size_t msg_size) {
- ssize_t rv;
-
- rv = dir_send_online(conn);
- if (rv < 0) return rv;
-
- return 0;
-}
-
ssize_t dir_handle_shadow(ECPConnection *conn, unsigned char *msg, size_t msg_size) {
- ecp_sts_t now;
- uint16_t count;
+ uint8_t count;
size_t rsize;
- int i;
+ int i, ann_enable;
- if (msg_size < sizeof(uint16_t)) return ECP_ERR_SIZE;
+ if (msg_size < 4 + sizeof(uint16_t)) return ECP_ERR_SIZE;
- count = \
- ((uint16_t)msg[0] << 8) | \
- ((uint16_t)msg[1]);
+ count = msg[2];
+ msg += 4 + sizeof(uint16_t); // frag_cnt, frag_tot, item_cnt, region, serial
- rsize = sizeof(uint16_t) + count * (ECP_SIZE_DIR_ITEM + sizeof(uint32_t));
+ rsize = 4 + sizeof(uint16_t) + count * ECP_SIZE_DIR_ITEM;
if (msg_size < rsize) return ECP_ERR_SIZE;
- msg += sizeof(uint16_t);
+ pthread_rwlock_rdlock(&dir_timer_rwlock);
+ ann_enable = (dir_process_enable > PROC_BLOCK_ANN);
+ pthread_rwlock_unlock(&dir_timer_rwlock);
+
+ if (!ann_enable) return rsize;
- now = ecp_tm_get_s();
for (i=0; i<count; i++) {
ECPDirItem dir_item;
- ecp_sts_t access_ts;
size_t rv;
rv = ecp_dir_item_parse(&dir_item, msg);
msg += rv;
- access_ts = \
- ((uint32_t)msg[0] << 24) | \
- ((uint32_t)msg[1] << 16) | \
- ((uint32_t)msg[2] << 8) | \
- ((uint32_t)msg[3]);
- msg += sizeof(uint32_t);
-
- if (ECP_STS_LT(now, access_ts) || (now - access_ts < (NODE_EXP_TIME / 2))) {
- int is_new;
-
- is_new = dir_process_item(&dir_item, access_ts);
- if (is_new) vlink_new_node(conn->sock, &dir_item);
+ if (dir_item.region >= MAX_REGION) {
+ LOG(LOG_ERR, "dir_handle_shadow: bad region\n");
+ return ECP_ERR;
}
+
+ dir_process_item(&dir_item, conn->sock, &conn->remote.key_perma.public);
}
return rsize;
}
+ssize_t dir_handle_origin_req(ECPConnection *conn, unsigned char *msg, size_t msg_size) {
+ size_t rsize;
+ ssize_t rv;
+
+ rsize = ECP_SIZE_ECDH_PUB;
+ if (msg_size < rsize) return ECP_ERR_SIZE;
+
+ rv = dir_send_origin_rep(conn, (ecp_ecdh_public_t *)msg);
+ if (rv < 0) return rv;
+
+ return rsize;
+}
+
ssize_t dir_handle_msg(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, size_t msg_size, struct ECP2Buffer *b) {
switch (mtype) {
- case ECP_MTYPE_DIR_REQ: {
- if (ecp_conn_is_outb(conn)) return ECP_ERR;
- return dir_handle_req(conn, msg, msg_size);
- }
+ case MTYPE_DIR_ANN: {
+ int is_dir;
+
+ is_dir = srv_config->capabilities & ECP_DIR_CAP_DIR;
+ if (!is_dir || ecp_conn_is_outb(conn)) return ECP_ERR;
- case ECP_MTYPE_DIR_ANN: {
- if (ecp_conn_is_outb(conn)) return ECP_ERR;
- if (!conn->remote.key_perma.valid) return ECP_ERR_VBOX;
return dir_handle_ann(conn, msg, msg_size);
}
- case ECP_MTYPE_DIR_SHADOW: {
+ case MTYPE_DIR_SHADOW: {
if (ecp_conn_is_inb(conn)) return ECP_ERR;
+
return dir_handle_shadow(conn, msg, msg_size);
}
+ case MTYPE_DIR_ORIGIN_REQ: {
+ int is_dir;
+
+ is_dir = srv_config->capabilities & ECP_DIR_CAP_DIR;
+ if (!is_dir || ecp_conn_is_outb(conn)) return ECP_ERR;
+
+ return dir_handle_origin_req(conn, msg, msg_size);
+ }
+
default:
return ECP_ERR_MTYPE;
}
}
-int dir_handle_open(ECPConnection *conn, ECP2Buffer *bufs) {
- ssize_t rv;
-
- if (ecp_conn_is_inb(conn)) return ECP_OK;
-
- rv = dir_send_ann(conn);
- if (rv < 0) return rv;
-
- return ECP_OK;
-}
+void dir_process_item(ECPDirItem *dir_item, ECPSocket *sock, ecp_ecdh_public_t *s_public) {
+ DIRNode *node;
+ unsigned int vkey_req_dir;
+ int is_verified = 0;
+ int rv = ECP_OK;
-void dir_online_switch(void) {
- struct hashtable_itr itr;
- DirNode *node;
- unsigned char *msg;
+ pthread_rwlock_rdlock(&dir_online_rwlock);
+ vkey_req_dir = dir_vkey_req;
+ pthread_rwlock_unlock(&dir_online_rwlock);
pthread_rwlock_rdlock(&dir_shadow_rwlock);
- pthread_rwlock_wrlock(&dir_online_rwlock);
-
- msg = dir_online.msg;
- dir_online.count = ecp_ht_count(dir_shadow);
-
- if (dir_online.count > 0) {
- size_t _rv;
- int rv;
- ecp_ht_itr_create(&itr, dir_shadow);
- do {
- node = ecp_ht_itr_value(&itr);
+ node = ht_search_item(dir_shadow, dir_item);
+ if (node == NULL) {
+ pthread_rwlock_unlock(&dir_shadow_rwlock);
- _rv = ecp_dir_item_serialize(&node->dir_item, msg);
- msg += _rv;
+ rv = dir_create_node(dir_item, sock, &node);
+ if (!rv) {
+ pthread_rwlock_wrlock(&dir_shadow_rwlock);
+ if (ecp_ht_count(dir_shadow) > MAX_DIR_ITEM) rv = ECP_ERR_FULL;
+ if (!rv) rv = ht_insert_node(dir_shadow, node);
+ if (rv) pthread_rwlock_unlock(&dir_shadow_rwlock);
+ }
+ if (rv) {
+ if (node) {
+ dir_destroy_node(node);
+ node = NULL;
+ }
+ LOG(LOG_ERR, "dir_process_item: err:%d\n", rv);
+ return;
+ }
- rv = ecp_ht_itr_advance(&itr);
- } while (rv == ECP_OK);
+ LOG(LOG_DEBUG, "dir_process_item: new node\n");
}
- pthread_rwlock_unlock(&dir_online_rwlock);
+ pthread_mutex_lock(&node->mutex);
pthread_rwlock_unlock(&dir_shadow_rwlock);
-}
-int dir_process_item(ECPDirItem *dir_item, ecp_sts_t access_ts) {
- DirNode *node;
- int is_new = 0;
- int rv = ECP_OK;
+ if (node->zombie) goto process_item_fin;
- pthread_rwlock_rdlock(&dir_shadow_rwlock);
-
- node = ht_search_item(dir_shadow, dir_item);
- if (node) {
- pthread_mutex_lock(&node->mutex);
- if (!node->zombie) node->access_ts = access_ts;
- pthread_mutex_unlock(&node->mutex);
+ if (!node->verified) {
+ int key_ex = 0;
+ int i;
- pthread_rwlock_unlock(&dir_shadow_rwlock);
- } else {
- pthread_rwlock_unlock(&dir_shadow_rwlock);
+ if (memcmp(s_public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) == 0) {
+ node->ann_local = 1;
+ }
- LOG(LOG_DEBUG, "dir new node\n");
+ for (i=0; i<node->vkey_cnt; i++) {
+ if (memcmp(s_public, &node->vkey[i], sizeof(node->vkey[i])) == 0) {
+ key_ex = 1;
+ break;
+ }
+ }
+ if (!key_ex) {
+ unsigned int vkey_req;
- node = dir_create_node(dir_item, access_ts);
- if (node == NULL) rv = ECP_ERR_ALLOC;
+ if (node->dir_item.capabilities & ECP_DIR_CAP_DIR) {
+ vkey_req = vkey_req_dir;
+ } else {
+ vkey_req = MIN_VKEY_REQ;
+ }
- if (!rv) {
- pthread_rwlock_wrlock(&dir_shadow_rwlock);
- if (ecp_ht_count(dir_shadow) > MAX_DIR_ITEM) rv = ECP_ERR_FULL;
- if (!rv) {
- if (ht_search_node(dir_shadow, node) == NULL) {
- rv = ht_insert_node(dir_shadow, node);
- } else {
- rv = ECP_ERR_DUP;
+ if (node->vkey_cnt < vkey_req) {
+ /* vkey list is used to track origin of particular server */
+ memcpy(&node->vkey[node->vkey_cnt], s_public, sizeof(node->vkey[node->vkey_cnt]));
+ node->vkey_cnt++;
+ if (node->vkey_cnt == vkey_req) {
+ node->verified = 1;
+ node->is_new = 0;
+ is_verified = 1;
}
}
- pthread_rwlock_unlock(&dir_shadow_rwlock);
- }
- if (!rv) {
- is_new = 1;
- /* always switch */
- dir_online_switch();
- } else {
- if (node) dir_destroy_node(node);
- LOG(LOG_ERR, "dir create node err:%d\n", rv);
+ /* protocol does not support server properties change */
+ if (!ecp_dir_item_eq(dir_item, &node->dir_item)) {
+ char buffer[ECP_SIZE_ECDH_KEY_BUF];
+
+ ecp_key2str(buffer, (uint8_t *)&node->dir_item.node.key_perma.public);
+ LOG(LOG_ERR, "dir_process_item: dir entry changed from:%s\n", buffer);
+ }
}
}
- return is_new;
+process_item_fin:
+ pthread_mutex_unlock(&node->mutex);
+
+ if (is_verified) {
+ LOG(LOG_DEBUG, "dir_process_item: node verified\n");
+
+ rv = acl_add_key(&node->dir_item);
+ if (rv) LOG(LOG_ERR, "dir_process_item: acl add key err:%d\n", rv);
+
+ vlink_new_node(sock, &node->dir_item);
+ }
}
-DirNode *dir_create_node(ECPDirItem *dir_item, ecp_sts_t access_ts) {
- DirNode *node;
+int dir_open_conn(DIRNode *node, ECPSocket *sock) {
int rv;
- node = malloc(sizeof(DirNode));
- if (node == NULL) return NULL;
+ node->conn = malloc(sizeof(ECPConnection));
+ if (node->conn == NULL) return ECP_ERR_ALLOC;
+
+ ecp_conn_init(node->conn, sock, CTYPE_DIR);
+ ecp_conn_set_flags(node->conn, ECP_CONN_FLAG_VBOX);
+
+ rv = ecp_conn_open(node->conn, &node->dir_item.node);
+ if (rv) node->conn = NULL;
- memset(node, 0, sizeof(DirNode));
+ return rv;
+}
- rv = pthread_mutex_init(&node->mutex, NULL);
+int dir_create_node(ECPDirItem *dir_item, ECPSocket *sock, DIRNode **node) {
+ DIRNode *_node;
+ int rv;
+
+ *node = NULL;
+ _node = malloc(sizeof(DIRNode));
+ if (_node == NULL) return ECP_ERR_ALLOC;
+
+ memset(_node, 0, sizeof(DIRNode));
+ rv = pthread_mutex_init(&_node->mutex, NULL);
if (rv) {
- free(node);
- return NULL;
+ free(_node);
+ return ECP_ERR;
}
- node->dir_item = *dir_item;
- node->access_ts = access_ts;
+ _node->dir_item = *dir_item;
+ _node->is_new = 1;
+
+ /* open connection to other directory servers */
+ if ((dir_item->capabilities & ECP_DIR_CAP_DIR) && (memcmp(&dir_item->node.key_perma.public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) != 0)) {
+ rv = dir_open_conn(_node, sock);
+ if (rv) {
+ pthread_mutex_destroy(&_node->mutex);
+ free(_node);
+ return rv;
+ }
+ }
- return node;
+ *node = _node;
+ return ECP_OK;
}
-void dir_destroy_node(DirNode *node) {
+void dir_destroy_node(DIRNode *node) {
+ if (node->conn) ecp_conn_close(node->conn);
pthread_mutex_destroy(&node->mutex);
free(node);
}
-int dir_open_conn(ECPSocket *sock, ECPNode *node) {
- ECPConnection *conn;
- int rv;
+static int online_switch_expired(ECPConnection *conn, ecp_sts_t now) {
+ if (conn->type == CTYPE_DIR) return 1;
+ return _ecp_conn_is_zombie(conn, now, CONN_EXPIRE_TO);
+}
- conn = malloc(sizeof(ECPConnection));
- if (conn == NULL) return ECP_ERR_ALLOC;
+static void remove_nodes(DIRNode *remove_node[], int remove_cnt) {
+ int i;
- ecp_dir_conn_init(conn, sock);
- ecp_conn_set_flags(conn, ECP_CONN_FLAG_GC | ECP_CONN_FLAG_VBOX);
- rv = ecp_conn_try_open(conn, node);
- return rv;
+ pthread_rwlock_wrlock(&dir_shadow_rwlock);
+ for (i=0; i<remove_cnt; i++) {
+ ht_remove_node(dir_shadow, remove_node[i]);
+ }
+ pthread_rwlock_unlock(&dir_shadow_rwlock);
+
+ for (i=0; i<remove_cnt; i++) {
+ vlink_del_node(&remove_node[i]->dir_item);
+ dir_destroy_node(remove_node[i]);
+ }
}
-DirNode *dir_search_conn(ECPConnection *conn) {
- DirNode *node;
- DirNode *ret = NULL;
+void dir_online_switch(ECPSocket *sock, int inc_serial) {
+ struct hashtable_itr itr;
+ DIRNode *node;
+ uint8_t count[MAX_REGION];
+ uint8_t msg_count[MAX_REGION];
+ unsigned char *msg[MAX_REGION];
+ ecp_ecdh_public_t *node_next;
+ DIRNode *remove_node[MAX_NODE_REMOVE];
+ unsigned int dir_cnt, node_cnt;
+ int i, remove_cnt;
+ int rv;
pthread_rwlock_rdlock(&dir_shadow_rwlock);
+ pthread_rwlock_wrlock(&dir_online_rwlock);
- node = ht_search_conn(dir_shadow, conn);
- if (node) {
- pthread_mutex_lock(&node->mutex);
- if (!node->zombie) {
- node->refcount++;
- ret = node;
- } else {
- ret = NULL;
+ if (dir_online) {
+ for (i=0; i<MAX_REGION; i++) {
+ dir_online->list[i].msg_count = 0;
+ count[i] = 0;
+ msg_count[i] = 0;
+ msg[i] = dir_online->list[i].msg[0].buffer;
+ }
+ }
+
+ dir_cnt = 0;
+ node_cnt = 0;
+ remove_cnt = 0;
+
+ if (ecp_ht_count(dir_shadow) > 0) {
+ uint8_t region;
+ int verified;
+
+ ecp_ht_itr_create(&itr, dir_shadow);
+
+ node_next = NULL;
+ do {
+ node = ecp_ht_itr_value(&itr);
+
+ pthread_mutex_lock(&node->mutex);
+ region = node->dir_item.region;
+ verified = node->verified;
+
+ if (verified) {
+ if (dir_online) {
+ size_t rv_ser;
+
+ rv_ser = ecp_dir_item_serialize(&node->dir_item, msg[0]);
+ msg[0] += rv_ser;
+ count[0]++;
+ if (region) {
+ rv_ser = ecp_dir_item_serialize(&node->dir_item, msg[region]);
+ msg[region] += rv_ser;
+ count[region]++;
+ }
+ }
+ if (node->dir_item.capabilities & ECP_DIR_CAP_DIR) {
+ dir_cnt++;
+ }
+ node_cnt++;
+ } else {
+ node->zombie = 1;
+ if (node_next == NULL) {
+ if (remove_cnt < MAX_NODE_REMOVE) {
+ remove_node[remove_cnt] = node;
+ remove_cnt++;
+ } else {
+ node_next = ecp_ht_itr_key(&itr);
+ break;
+ }
+ }
+ }
+ node->verified = 0;
+ node->ann_local = 0;
+ memset(&node->vkey, 0, sizeof(node->vkey));
+ node->vkey_cnt = 0;
+ pthread_mutex_unlock(&node->mutex);
+
+ /* let's reconnect all directory servers
+ node->conn is currently immutable since announce is disabled well before online switch */
+ if (node->conn) ecp_conn_set_uflags(node->conn, DIR_UFLAG_RECONNECT);
+
+ if (dir_online && verified) {
+ if (count[0] == MAX_DIR_ITEM_IN_MSG) {
+ dir_online->list[0].msg[msg_count[0]].count = count[0];
+ msg_count[0]++;
+ msg[0] = dir_online->list[0].msg[msg_count[0]].buffer;
+ count[0] = 0;
+ }
+ if (region && count[region] == MAX_DIR_ITEM_IN_MSG) {
+ dir_online->list[region].msg[msg_count[region]].count = count[region];
+ msg_count[region]++;
+ msg[region] = dir_online->list[region].msg[msg_count[region]].buffer;
+ count[region] = 0;
+ }
+ }
+
+ rv = ecp_ht_itr_advance(&itr);
+ } while (rv == ECP_OK);
+
+ if (dir_online) {
+ for (i=0; i<MAX_REGION; i++) {
+ if (count[i]) {
+ dir_online->list[i].msg[msg_count[i]].count = count[i];
+ dir_online->list[i].msg_count = msg_count[i] + 1;
+ } else {
+ dir_online->list[i].msg_count = msg_count[i];
+ }
+ }
}
- pthread_mutex_unlock(&node->mutex);
}
+ dir_vkey_req = dir_cnt / 2 + 1;
+ if (dir_vkey_req > MAX_VKEY) {
+ LOG(LOG_ERR, "dir_online_switch: number of dir vkeys required > MAX_VKEY (%d)\n", MAX_VKEY);
+ dir_vkey_req = MAX_VKEY;
+ }
+
+ if (dir_online && inc_serial) dir_online->serial++;
+
+ pthread_rwlock_unlock(&dir_online_rwlock);
pthread_rwlock_unlock(&dir_shadow_rwlock);
- return ret;
+ rv = acl_reset_ht();
+ if (rv) LOG(LOG_ERR, "dir_online_switch: acl reset err:%d\n", rv);
+
+ ecp_sock_expire(sock, online_switch_expired);
+
+ if (remove_cnt) {
+ remove_nodes(remove_node, remove_cnt);
+ while (node_next) {
+ remove_cnt = 0;
+ pthread_rwlock_rdlock(&dir_shadow_rwlock);
+
+ ecp_ht_itr_create(&itr, dir_shadow);
+ if (node_next) {
+ rv = ecp_ht_itr_search(&itr, node_next);
+ if (rv) {
+ LOG(LOG_ERR, "dir_online_switch: itr search err:%d\n", rv);
+ break;
+ }
+ node_next = NULL;
+ }
+ do {
+ node = ecp_ht_itr_value(&itr);
+
+ pthread_mutex_lock(&node->mutex);
+ if (node->zombie) {
+ if (remove_cnt < MAX_NODE_REMOVE) {
+ remove_node[remove_cnt] = node;
+ remove_cnt++;
+ } else {
+ node_next = ecp_ht_itr_key(&itr);
+ break;
+ }
+ }
+ pthread_mutex_unlock(&node->mutex);
+
+ rv = ecp_ht_itr_advance(&itr);
+ } while (rv == ECP_OK);
+
+ /* no need to copy node_next key, since this is the only thread that can destroy node */
+ pthread_rwlock_unlock(&dir_shadow_rwlock);
+
+ if (remove_cnt) remove_nodes(remove_node, remove_cnt);
+ }
+ }
+
+ if (srv_config->capabilities & ECP_DIR_CAP_DIR) acl_load_ht();
+
+ LOG(LOG_DEBUG, "dir_online_switch: node:%d dir:%d serial:%d\n", node_cnt, dir_cnt, dir_online ? dir_online->serial : 0);
+}
+
+void dir_announce_allow(void) {
+ pthread_rwlock_wrlock(&dir_timer_rwlock);
+ dir_process_enable = PROC_ALLOW_ALL;
+ pthread_rwlock_unlock(&dir_timer_rwlock);
+}
+
+void dir_announce_block(void) {
+ pthread_rwlock_wrlock(&dir_timer_rwlock);
+ dir_process_enable = PROC_BLOCK_ANN;
+ pthread_rwlock_unlock(&dir_timer_rwlock);
+
+ LOG(LOG_DEBUG, "dir_announce_block\n");
}
-void dir_announce(ECPSocket *sock) {
- static DirNode *expire_node[MAX_EXPIRE_CNT];
- static int expire_node_z[MAX_EXPIRE_CNT];
- static int expire_cnt;
+void dit_init_serial(uint16_t serial) {
+ if (dir_online) {
+ pthread_rwlock_wrlock(&dir_online_rwlock);
+ dir_online->serial = serial;
+ pthread_rwlock_unlock(&dir_online_rwlock);
+ }
+}
+void dir_announce(ECPSocket *sock, int ann_period) {
struct hashtable_itr itr;
- ecp_sts_t now;
- DirNode *node;
- DirNode *node_next;
- DirNode *announce_node[MAX_ANNOUNCE_CNT];
+ DIRNode *node;
+ ecp_ecdh_public_t *node_next;
+ DIRNode *announce_node[MAX_NODE_ANNOUNCE];
+ unsigned int ht_count;
int announce_cnt;
- int refcount;
int i;
int rv;
- now = ecp_tm_get_s();
node_next = NULL;
do {
+ uint32_t delay;
+ uint32_t rnd;
+
announce_cnt = 0;
pthread_rwlock_rdlock(&dir_shadow_rwlock);
- if (ecp_ht_count(dir_shadow) > 0) {
+ ht_count = ecp_ht_count(dir_shadow);
+ if (ht_count > 0) {
ecp_ht_itr_create(&itr, dir_shadow);
if (node_next) {
rv = ecp_ht_itr_search(&itr, node_next);
if (rv) {
- LOG(LOG_ERR, "dir ann itr search err:%d\n", rv);
+ LOG(LOG_ERR, "dir_announce: itr search err:%d\n", rv);
break;
}
node_next = NULL;
}
do {
node = ecp_ht_itr_value(&itr);
- rv = ecp_ht_itr_advance(&itr);
- pthread_mutex_lock(&node->mutex);
- if (ECP_STS_LT(node->access_ts, now) && (now - node->access_ts > NODE_EXP_TIME)) {
- node->zombie = 1;
- pthread_mutex_unlock(&node->mutex);
-
- if (expire_cnt < MAX_EXPIRE_CNT) {
- expire_node[expire_cnt] = node;
- expire_node_z[expire_cnt] = 1;
- expire_cnt++;
- }
- } else {
- pthread_mutex_unlock(&node->mutex);
-
- if ((node->dir_item.capabilities & ECP_DIR_CAP_DIR) && (memcmp(node->dir_item.node.key_perma.public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) != 0)) {
+ if ((node->dir_item.capabilities & ECP_DIR_CAP_DIR) && (memcmp(&node->dir_item.node.key_perma.public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) != 0)) {
+ if (announce_cnt < MAX_NODE_ANNOUNCE) {
announce_node[announce_cnt] = node;
announce_cnt++;
-
- if (announce_cnt == MAX_ANNOUNCE_CNT) {
- if (!rv) {
- node_next = ecp_ht_itr_key(&itr);
- } else {
- node_next = NULL;
- }
- break;
- }
+ } else {
+ node_next = ecp_ht_itr_key(&itr);
+ break;
}
}
+
+ rv = ecp_ht_itr_advance(&itr);
} while (rv == ECP_OK);
}
+ /* no need to copy node_next key, since announce is disabled during online switch node removal */
pthread_rwlock_unlock(&dir_shadow_rwlock);
- if (expire_cnt) {
- pthread_rwlock_wrlock(&dir_shadow_rwlock);
- for (i=0; i<expire_cnt; i++) {
- if (expire_node_z[i]) {
- expire_node_z[i] = 0;
- ht_remove_node(dir_shadow, expire_node[i]);
- LOG(LOG_DEBUG, "dir remove node\n");
- }
- }
- pthread_rwlock_unlock(&dir_shadow_rwlock);
- }
-
+ /* we are counting delay for total hashtable entries; delay for absolute max ht_count (256 msgs with max load, announce period of 600s) is ~70ms */
+ delay = ann_period * 1000000 / MAX(ht_count, 100);
for (i=0; i<announce_cnt; i++) {
- rv = dir_open_conn(sock, &announce_node[i]->dir_item.node);
- if (rv) LOG(LOG_ERR, "dir open connection err:%d\n", rv);
- }
-
- i = expire_cnt - 1;
- while (i >= 0) {
- node = expire_node[i];
-
- pthread_mutex_lock(&node->mutex);
- refcount = node->refcount;
- pthread_mutex_unlock(&node->mutex);
+ ECPConnection *conn;
+ int is_reg, is_open, is_z;
+ ecp_sts_t now;
+
+ conn = announce_node[i]->conn;
+ if (conn == NULL) {
+ rv = dir_open_conn(announce_node[i], sock);
+ if (rv) LOG(LOG_ERR, "dir_announce: conn open err:%d\n", rv);
+ continue;
+ }
- if (refcount == 0) {
- int j = i;
+ is_reg = is_open = is_z = 0;
+ now = ecp_tm_get_s();
+
+ ecp_conn_lock(conn);
+ is_reg = _ecp_conn_is_reg(conn);
+ if (is_reg) is_open = _ecp_conn_is_open(conn);
+ if (is_open) is_z = _ecp_conn_is_zombie(conn, now, ANN_PERIOD * 3) || _ecp_conn_test_uflags(conn, DIR_UFLAG_RECONNECT);
+ ecp_conn_unlock(conn);
+
+ if (!is_reg || is_z) {
+ LOG(LOG_DEBUG, "dir_announce: reconnect\n");
+ ecp_conn_close(conn);
+ rv = dir_open_conn(announce_node[i], sock);
+ if (rv) LOG(LOG_ERR, "dir_announce: conn open err:%d\n", rv);
+ } else if (is_open) {
+ ssize_t _rv;
+
+ _rv = dir_send_ann(conn);
+ if (_rv < 0) LOG(LOG_ERR, "dir_announce: send ann err:%ld\n", _rv);
+ }
- expire_cnt--;
- while (j < expire_cnt) {
- expire_node[j] = expire_node[j+1];
- j++;
- }
- expire_node[j] = NULL;
- dir_destroy_node(node);
+ if (delay) {
+ /* randomize over delay / 2 and delay + delay / 2 */
+ rnd = arc4random_uniform(delay + 1);
+ usleep(delay / 2 + rnd);
}
- i--;
}
+
} while (node_next);
+
+ LOG(LOG_DEBUG, "dir_announce: node count:%u\n", ht_count);
}
static void *_dir_announce(void *arg) {
ECPSocket *sock = arg;
+ uint32_t rnd;
+ time_t tv_sec;
+ int delay;
+ int ann_enable;
while (1) {
- LOG(LOG_DEBUG, "dir announce...\n");
- dir_announce(sock);
- sleep(10);
+ tv_sec = time(NULL);
+
+ pthread_rwlock_rdlock(&dir_timer_rwlock);
+ ann_enable = (dir_process_enable > PROC_BLOCK_ANN);
+ pthread_rwlock_unlock(&dir_timer_rwlock);
+
+ if (ann_enable) {
+ dir_announce(sock, ANN_PERIOD);
+ } else {
+ sleep(ANN_BLOCK_TIME + ANN_PERIOD);
+ }
+
+ /* resolution is 1s */
+ delay = ANN_PERIOD - (time(NULL) - tv_sec);
+ if (delay > 0) {
+ rnd = arc4random_uniform(delay + 1);
+ sleep(delay / 2 + rnd);
+ }
}
return NULL;
@@ -516,18 +891,72 @@ int dir_start_announce(ECPSocket *sock) {
return ECP_OK;
}
-int dir_init(ECPContext *ctx) {
- ECPConnHandler *handler;
+void dir_init_switch(ECPSocket *sock, int init_ann) {
+ int i;
+
+ dir_announce_allow();
+ for (i=0; i<init_ann; i++) {
+ dir_announce(sock, 0);
+ sleep(1);
+ }
+ dir_announce_block();
+ LOG(LOG_DEBUG, "init switch sleeping for %ds...\n", init_ann);
+ sleep(init_ann);
+ dir_online_switch(sock, 0);
+ dir_announce_allow();
+ LOG(LOG_DEBUG, "init switch sleeping for %ds...\n", init_ann);
+ sleep(init_ann);
+}
+
+int dir_init_ann(ECPSocket *sock, ECPNode *node) {
+ ECPConnection *conn;
+ int i, is_open;
int rv;
+ ssize_t _rv;
- srv_config = srv_get_config();
+ conn = malloc(sizeof(ECPConnection));
+ if (conn == NULL) return ECP_ERR_ALLOC;
- handler = malloc(sizeof(ECPConnHandler));
- if (handler == NULL) return ECP_ERR_ALLOC;
+ ecp_conn_init(conn, sock, CTYPE_DIR);
+ ecp_conn_set_flags(conn, ECP_CONN_FLAG_VBOX);
+ rv = ecp_conn_open(conn, node);
+ if (rv) {
+ LOG(LOG_ERR, "dir_init_ann: conn open err:%d\n", rv);
+ return rv;
+ }
- ecp_conn_handler_init(handler, dir_handle_msg, dir_handle_open, NULL, NULL);
- rv = ecp_ctx_set_handler(ctx, ECP_CTYPE_DIR, handler);
- if (rv) return rv;
+ is_open = 0;
+ for (i=0; i<ECP_SEND_TRIES; i++) {
+ usleep(ECP_SEND_TIMEOUT * 1000);
+ is_open = ecp_conn_is_open(conn);
+ if (is_open) break;
+ }
+
+ if (!is_open) {
+ LOG(LOG_ERR, "dir_init_ann: conn open timeout\n");
+ ecp_conn_close(conn);
+ return ECP_ERR_TIMEOUT;
+ }
+
+ _rv = dir_send_ann(conn);
+ if (_rv < 0) {
+ LOG(LOG_ERR, "dir_init_ann: send ann err:%ld\n", _rv);
+ return _rv;
+ }
+
+ return ECP_OK;
+}
+
+int dir_init(ECPSocket *sock) {
+ ECPContext *ctx = sock->ctx;
+ ECPConnHandler *handler, *c_handler;
+ int is_dir;
+ int rv;
+
+ /* dir_process_enable and dir_online.serial will be set from timer */
+ dir_vkey_req = 1;
+ srv_config = srv_get_config();
+ is_dir = srv_config->capabilities & ECP_DIR_CAP_DIR;
rv = pthread_rwlock_init(&dir_shadow_rwlock, NULL);
if (rv) {
@@ -540,12 +969,64 @@ int dir_init(ECPContext *ctx) {
return ECP_ERR;
}
+ rv = pthread_rwlock_init(&dir_timer_rwlock, NULL);
+ if (rv) {
+ pthread_rwlock_destroy(&dir_online_rwlock);
+ pthread_rwlock_destroy(&dir_shadow_rwlock);
+ return ECP_ERR;
+ }
+
dir_shadow = ecp_ht_create_keys();
if (dir_shadow == NULL) {
+ pthread_rwlock_destroy(&dir_timer_rwlock);
+ pthread_rwlock_destroy(&dir_online_rwlock);
pthread_rwlock_destroy(&dir_shadow_rwlock);
+ return ECP_ERR_ALLOC;
+ }
+
+ handler = malloc(sizeof(ECPConnHandler));
+ if (handler == NULL) {
+ ecp_ht_destroy(dir_shadow);
+ pthread_rwlock_destroy(&dir_timer_rwlock);
pthread_rwlock_destroy(&dir_online_rwlock);
+ pthread_rwlock_destroy(&dir_shadow_rwlock);
return ECP_ERR_ALLOC;
}
+ ecp_conn_handler_init(handler, dir_handle_msg, NULL, NULL, NULL);
+
+ if (is_dir) {
+ c_handler = malloc(sizeof(ECPConnHandler));
+ dir_online = malloc(sizeof(DIROnline));
+ if ((c_handler == NULL) || (dir_online == NULL)) {
+ free(handler);
+ ecp_ht_destroy(dir_shadow);
+ pthread_rwlock_destroy(&dir_timer_rwlock);
+ pthread_rwlock_destroy(&dir_online_rwlock);
+ pthread_rwlock_destroy(&dir_shadow_rwlock);
+ if (dir_online) free(dir_online);
+ if (c_handler) free(c_handler);
+ return ECP_ERR_ALLOC;
+ }
+ ecp_conn_handler_init(c_handler, dir_handle_client_msg, NULL, NULL, NULL);
+ memset(dir_online, 0, sizeof(DIROnline));
+ }
+
+ rv = timer_init(sock);
+ if (rv) {
+ free(handler);
+ if (is_dir) {
+ free(dir_online);
+ free(c_handler);
+ }
+ ecp_ht_destroy(dir_shadow);
+ pthread_rwlock_destroy(&dir_timer_rwlock);
+ pthread_rwlock_destroy(&dir_online_rwlock);
+ pthread_rwlock_destroy(&dir_shadow_rwlock);
+ return rv;
+ }
+
+ ecp_ctx_set_handler(ctx, CTYPE_DIR, handler);
+ if (is_dir) ecp_ctx_set_handler(ctx, ECP_CTYPE_DIR, c_handler);
return ECP_OK;
}