summaryrefslogtreecommitdiff
path: root/ecp/server
diff options
context:
space:
mode:
authorUros Majstorovic <majstor@majstor.org>2024-05-06 02:08:31 +0200
committerUros Majstorovic <majstor@majstor.org>2024-05-06 02:08:31 +0200
commit5f55d9d4d14635678e7f582215e3642de2e232a4 (patch)
tree3322f643e0fbc16984e8eebfca4de7bd4cf63391 /ecp/server
parent1060b5e4712db12b52944bdcf7f2588cea23382b (diff)
new ecp directory and vconn server
Diffstat (limited to 'ecp/server')
-rw-r--r--ecp/server/Makefile7
-rw-r--r--ecp/server/acl.c275
-rw-r--r--ecp/server/acl.h21
-rw-r--r--ecp/server/dir.c1053
-rw-r--r--ecp/server/dir.h89
-rw-r--r--ecp/server/ht.c21
-rw-r--r--ecp/server/ht.h10
-rw-r--r--ecp/server/server.c270
-rw-r--r--ecp/server/server.h15
-rw-r--r--ecp/server/timer.c98
-rw-r--r--ecp/server/timer.h9
-rw-r--r--ecp/server/vlink.c360
-rw-r--r--ecp/server/vlink.h21
13 files changed, 1581 insertions, 668 deletions
diff --git a/ecp/server/Makefile b/ecp/server/Makefile
index 2105e9f..84070bf 100644
--- a/ecp/server/Makefile
+++ b/ecp/server/Makefile
@@ -1,9 +1,10 @@
src_dir = ../src
include $(src_dir)/ecp/common.mk
-CFLAGS += -Wno-int-to-void-pointer-cast
+CFLAGS += -I../util -Wno-int-to-void-pointer-cast
+LDFLAGS += -lrt
-obj = server.o dir.o vlink.o ht.o
-dep = ../build-posix/*.a
+obj = server.o dir.o vlink.o ht.o acl.o timer.o
+dep = ../build-posix/*.a ../util/libecputil.a
%.o: %.c
$(CC) $(CFLAGS) -c $<
diff --git a/ecp/server/acl.c b/ecp/server/acl.c
new file mode 100644
index 0000000..d8cdc6e
--- /dev/null
+++ b/ecp/server/acl.c
@@ -0,0 +1,275 @@
+#include <stdlib.h>
+#include <string.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <sys/stat.h>
+
+#include <ecp/core.h>
+#include <ecp/cr.h>
+#include <ecp/ht.h>
+#include <ecp/dir/dir.h>
+
+#include <util.h>
+
+#include "server.h"
+#include "acl.h"
+
+static SRVConfig *srv_config;
+
+static ecp_ht_table_t *acl_keys = NULL;
+static ecp_ht_table_t *acl_keys_dir = NULL;
+static pthread_mutex_t acl_li_mutex;
+static pthread_rwlock_t acl_ht_rwlock;
+static ACLItem *acl_head = NULL;
+static ACLItem *acl_head_dir = NULL;
+static int acl_mark = 0;
+
+ACLItem *acl_create_item(void) {
+ ACLItem *ret = NULL;
+
+ ret = malloc(sizeof(ACLItem));
+ if (ret == NULL) return ret;
+ memset(&ret->key, 0, sizeof(ret->key));
+ ret->key_cnt = 0;
+ ret->next = NULL;
+
+ return ret;
+}
+
+void acl_destroy_item(ACLItem *item) {
+ free(item);
+}
+
+void acl_destroy_list(ACLItem *head) {
+ ACLItem *acl_next;
+
+ while (head) {
+ acl_next = head->next;
+ acl_destroy_item(head);
+ head = acl_next;
+ }
+}
+
+static int _add_key(ecp_ecdh_public_t *public, uint8_t capabilities) {
+ int rv;
+
+ if ((acl_keys == NULL) || (acl_keys_dir == NULL)) return ECP_ERR;
+
+ if ((srv_config->capabilities & ECP_DIR_CAP_DIR) || (srv_config->capabilities & capabilities & ECP_DIR_CAP_VCONN)) {
+ /* directory server accepts all connections
+ vconn server accepts connections only from other vconn servers */
+ rv = ecp_ht_insert_uniq(acl_keys, public, &acl_mark);
+ if (rv && (rv != ECP_ERR_DUP)) return rv;
+ }
+ if (srv_config->capabilities & capabilities & ECP_DIR_CAP_DIR) {
+ rv = ecp_ht_insert_uniq(acl_keys_dir, public, &acl_mark);
+ if (rv && (rv != ECP_ERR_DUP)) return rv;
+ }
+
+ return ECP_OK;
+}
+
+static int _read_file(int fd, ACLItem *head) {
+ ecp_ecdh_public_t public;
+ int rv;
+
+ if (head == NULL) return ECP_ERR_ALLOC;
+
+ while (head->next) {
+ head = head->next;
+ }
+
+ while(ecp_util_read_key(fd, &public, NULL) == ECP_OK) {
+ if (head->key_cnt == ACL_MAX_KEY) {
+ head->next = acl_create_item();
+ if (head->next == NULL) return ECP_ERR_ALLOC;
+ head = head->next;
+ }
+ memcpy(&head->key[head->key_cnt], &public, sizeof(head->key[head->key_cnt]));
+ head->key_cnt++;
+ }
+
+ return ECP_OK;
+}
+
+static int _li2ht(ACLItem *head, int is_dir) {
+ int i;
+ int rv;
+
+ while (head) {
+ for (i=0; i<head->key_cnt; i++) {
+ rv = _add_key(&head->key[i], is_dir ? ECP_DIR_CAP_DIR : 0);
+ if (rv) return rv;
+ }
+ head = head->next;
+ }
+
+ return ECP_OK;
+}
+
+int acl_add_key(ECPDirItem *dir_item) {
+ int rv;
+
+ pthread_rwlock_wrlock(&acl_ht_rwlock);
+ rv = _add_key(&dir_item->node.key_perma.public, dir_item->capabilities);
+ pthread_rwlock_unlock(&acl_ht_rwlock);
+
+ return rv;
+}
+
+int acl_inlist(ecp_ecdh_public_t *public) {
+ void *item = NULL;
+
+ pthread_rwlock_rdlock(&acl_ht_rwlock);
+ if (acl_keys) item = ecp_ht_search(acl_keys, public);
+ pthread_rwlock_unlock(&acl_ht_rwlock);
+
+ return (item != NULL);
+}
+
+int acl_dir_inlist(ecp_ecdh_public_t *public) {
+ void *item = NULL;
+
+ pthread_rwlock_rdlock(&acl_ht_rwlock);
+ if (acl_keys_dir) item = ecp_ht_search(acl_keys_dir, public);
+ pthread_rwlock_unlock(&acl_ht_rwlock);
+
+ return (item != NULL);
+}
+
+int acl_reset_ht(void) {
+ int rv = ECP_OK;
+
+ pthread_rwlock_wrlock(&acl_ht_rwlock);
+ if (acl_keys) ecp_ht_destroy(acl_keys);
+ if (acl_keys_dir) ecp_ht_destroy(acl_keys_dir);
+ acl_keys = ecp_ht_create_keys();
+ acl_keys_dir = ecp_ht_create_keys();
+ if ((acl_keys == NULL) || (acl_keys_dir == NULL)) rv = ECP_ERR_ALLOC;
+ pthread_rwlock_unlock(&acl_ht_rwlock);
+
+ return rv;
+}
+
+int acl_load_ht(void) {
+ int rv = ECP_OK;
+ int _rv;
+
+ pthread_mutex_lock(&acl_li_mutex);
+ pthread_rwlock_wrlock(&acl_ht_rwlock);
+
+ _rv = _li2ht(acl_head, 0);
+ if (_rv) rv = ECP_ERR;
+ _rv = _li2ht(acl_head_dir, 1);
+ if (_rv) rv = ECP_ERR;
+
+ pthread_rwlock_unlock(&acl_ht_rwlock);
+ pthread_mutex_unlock(&acl_li_mutex);
+
+ return rv;
+}
+
+int acl_load(void) {
+ ACLItem *acl_new = NULL;
+ ACLItem *acl_dir_new = NULL;
+ int fd = -1;
+ int fd_dir = -1;
+ int rv = ECP_OK;
+ int _rv;
+
+ if ((srv_config->acl_fn == NULL) && (srv_config->acl_fn_dir == NULL)) return ECP_OK;
+
+ if (srv_config->acl_fn) {
+ fd = open(srv_config->acl_fn, O_RDONLY);
+ if (fd < 0) {
+ LOG(LOG_ERR, "acl_load: unable to open: %s\n", srv_config->acl_fn);
+ return ECP_ERR;
+ }
+ }
+
+ if (srv_config->acl_fn_dir) {
+ fd_dir = open(srv_config->acl_fn_dir, O_RDONLY);
+ if (fd_dir < 0) {
+ LOG(LOG_ERR, "acl_load: unable to open: %s\n", srv_config->acl_fn_dir);
+ close(fd);
+ return ECP_ERR;
+ }
+ }
+
+ pthread_mutex_lock(&acl_li_mutex);
+ if (fd >= 0) {
+ acl_new = acl_create_item();
+ rv = _read_file(fd, acl_new);
+ if (rv) {
+ LOG(LOG_ERR, "acl_load: read from file: %s err:%d\n", srv_config->acl_fn, rv);
+ acl_destroy_list(acl_new);
+ goto load_fin;
+ }
+ }
+
+ if (fd_dir >= 0) {
+ acl_dir_new = acl_create_item();
+ rv = _read_file(fd_dir, acl_dir_new);
+ if (rv) {
+ LOG(LOG_ERR, "acl_load: read from file: %s err:%d\n", srv_config->acl_fn_dir, rv);
+ acl_destroy_list(acl_dir_new);
+ acl_destroy_list(acl_new);
+ goto load_fin;
+ }
+ }
+ if (acl_new) {
+ acl_destroy_list(acl_head);
+ acl_head = acl_new;
+ }
+ if (acl_dir_new) {
+ acl_destroy_list(acl_head_dir);
+ acl_head_dir = acl_dir_new;
+ }
+
+ pthread_rwlock_wrlock(&acl_ht_rwlock);
+ if (acl_new) {
+ _rv = _li2ht(acl_new, 0);
+ if (_rv) rv = ECP_ERR;
+ }
+ if (acl_dir_new) {
+ _rv = _li2ht(acl_dir_new, 1);
+ if (_rv) rv = ECP_ERR;
+ }
+
+ pthread_rwlock_unlock(&acl_ht_rwlock);
+
+load_fin:
+ pthread_mutex_unlock(&acl_li_mutex);
+
+ close(fd_dir);
+ close(fd);
+
+ return rv;
+}
+
+int acl_init(void) {
+ int rv;
+
+ srv_config = srv_get_config();
+
+ rv = pthread_mutex_init(&acl_li_mutex, NULL);
+ if (rv) return ECP_ERR;
+
+ rv = pthread_rwlock_init(&acl_ht_rwlock, NULL);
+ if (rv) {
+ pthread_mutex_destroy(&acl_li_mutex);
+ return ECP_ERR;
+ }
+
+ acl_keys = ecp_ht_create_keys();
+ acl_keys_dir = ecp_ht_create_keys();
+ if ((acl_keys == NULL) || (acl_keys_dir == NULL)) {
+ pthread_rwlock_destroy(&acl_ht_rwlock);
+ pthread_mutex_destroy(&acl_li_mutex);
+ if (acl_keys_dir) ecp_ht_destroy(acl_keys_dir);
+ if (acl_keys) ecp_ht_destroy(acl_keys);
+ return ECP_ERR_ALLOC;
+ }
+
+ return ECP_OK;
+}
diff --git a/ecp/server/acl.h b/ecp/server/acl.h
new file mode 100644
index 0000000..5e3f83c
--- /dev/null
+++ b/ecp/server/acl.h
@@ -0,0 +1,21 @@
+#define ACL_MAX_KEY 50
+
+typedef struct ACLItem {
+ ecp_ecdh_public_t key[ACL_MAX_KEY];
+ unsigned short key_cnt;
+ struct ACLItem *next;
+} ACLItem;
+
+ACLItem *acl_create_item(void);
+void acl_destroy_item(ACLItem *acl_item);
+void acl_destroy_list(ACLItem *head);
+
+int acl_add_key(ECPDirItem *dir_item);
+int acl_inlist(ecp_ecdh_public_t *public);
+int acl_dir_inlist(ecp_ecdh_public_t *public);
+
+int acl_reset_ht(void);
+int acl_load_ht(void);
+
+int acl_load(void);
+int acl_init(void);
diff --git a/ecp/server/dir.c b/ecp/server/dir.c
index 0066bac..7244fd9 100644
--- a/ecp/server/dir.c
+++ b/ecp/server/dir.c
@@ -1,31 +1,122 @@
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
+#include <time.h>
#include <ecp/core.h>
-#include <ecp/dir/dir.h>
+#include <ecp/cr.h>
#include <ecp/ht.h>
#include <ecp/tm.h>
-#include "server.h"
-#include "vlink.h"
#include "dir.h"
+#include "vlink.h"
#include "ht.h"
+#include "acl.h"
+
+#include "timer.h"
+#include "server.h"
static ecp_ht_table_t *dir_shadow = NULL;
static pthread_rwlock_t dir_shadow_rwlock;
static pthread_rwlock_t dir_online_rwlock;
+static pthread_rwlock_t dir_timer_rwlock;
static pthread_t dir_announce_thd;
-static DirList dir_online;
+static DIROnline *dir_online = NULL;
+static unsigned int dir_vkey_req;
+static int dir_process_enable;
static SRVConfig *srv_config;
+#define MAX(X, Y) (((X) > (Y)) ? (X) : (Y))
+
+/* messages from clients */
+ssize_t dir_send_online(ECPConnection *conn, uint8_t region) {
+ DIRList *list;
+ ECPBuffer packet;
+ ECPBuffer payload;
+ unsigned char pkt_buf[ECP_MAX_PKT];
+ unsigned char pld_buf[ECP_MAX_PLD];
+ unsigned char *msg;
+ size_t msg_size;
+ ssize_t rv;
+ int i;
+
+ packet.buffer = pkt_buf;
+ packet.size = ECP_MAX_PKT;
+ payload.buffer = pld_buf;
+ payload.size = ECP_MAX_PLD;
+
+ if (dir_online == NULL) return ECP_ERR;
+
+ rv = 0;
+ pthread_rwlock_rdlock(&dir_online_rwlock);
+ list = &dir_online->list[region];
+
+ if (list->msg_count == 0) goto send_online_fin;
+
+ for (i=0; i<list->msg_count; i++) {
+ ssize_t rv_snd;
+
+ ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_DIR_REP);
+ msg = ecp_pld_get_msg(payload.buffer, payload.size);
+ msg_size = 0;
+
+ msg[0] = i; // frag_cnt
+ msg[1] = list->msg_count; // frag_tot
+ msg[2] = list->msg[i].count; // item_cnt
+ msg[3] = region; // region
+ msg[4] = dir_online->serial >> 8; // serial
+ msg[5] = dir_online->serial;
+
+ msg += 4 + sizeof(uint16_t);
+ msg_size += 4 + sizeof(uint16_t);
+
+ memcpy(msg, list->msg[i].buffer, list->msg[i].count * ECP_SIZE_DIR_ITEM);
+ msg_size += list->msg[i].count * ECP_SIZE_DIR_ITEM;
+
+ rv_snd = ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(msg_size, ECP_MTYPE_DIR_REP), 0);
+ if (rv_snd < 0) {
+ rv = rv_snd;
+ break;
+ }
+
+ rv += rv_snd;
+ }
+
+send_online_fin:
+ pthread_rwlock_unlock(&dir_online_rwlock);
+ return rv;
+}
+
+ssize_t dir_handle_client_msg(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, size_t msg_size, struct ECP2Buffer *b) {
+ switch (mtype) {
+ case ECP_MTYPE_DIR_REQ: {
+ uint8_t region;
+ ssize_t rv;
+
+ if (msg_size < 1) return ECP_ERR_SIZE;
+
+ region = *msg;
+ if (region >= MAX_REGION) return ECP_ERR;
+
+ rv = dir_send_online(conn, region);
+ if (rv < 0) return rv;
+
+ return 1;
+ }
+
+ default:
+ return ECP_ERR_MTYPE;
+ }
+}
+
+/* messages from other servers */
ssize_t dir_send_ann(ECPConnection *conn) {
ECPBuffer packet;
ECPBuffer payload;
- unsigned char pkt_buf[ECP_SIZE_PKT_BUF(sizeof(uint16_t), ECP_MTYPE_DIR_ANN, conn)];
- unsigned char pld_buf[ECP_SIZE_PLD_BUF(sizeof(uint16_t), ECP_MTYPE_DIR_ANN, conn)];
+ unsigned char pkt_buf[ECP_SIZE_PKT_BUF(2, MTYPE_DIR_ANN, conn)];
+ unsigned char pld_buf[ECP_SIZE_PLD_BUF(2, MTYPE_DIR_ANN, conn)];
unsigned char *msg;
packet.buffer = pkt_buf;
@@ -33,12 +124,12 @@ ssize_t dir_send_ann(ECPConnection *conn) {
payload.buffer = pld_buf;
payload.size = sizeof(pld_buf);
- ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_DIR_ANN);
+ ecp_pld_set_type(payload.buffer, payload.size, MTYPE_DIR_ANN);
msg = ecp_pld_get_msg(payload.buffer, payload.size);
- msg[0] = srv_config->capabilities >> 8;
+ msg[0] = srv_config->region;
msg[1] = srv_config->capabilities;
- return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(sizeof(uint16_t), ECP_MTYPE_DIR_ANN), 0);
+ return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(2, MTYPE_DIR_ANN), 0);
}
ssize_t dir_send_shadow(ECPConnection *conn) {
@@ -49,111 +140,170 @@ ssize_t dir_send_shadow(ECPConnection *conn) {
unsigned char *msg;
struct hashtable_itr itr;
- DirNode *node;
- uint16_t count;
- ecp_sts_t access_ts;
+ DIRNode *node;
+ ecp_ecdh_public_t *node_next;
+ uint8_t count;
size_t msg_size;
+ ssize_t rv;
packet.buffer = pkt_buf;
packet.size = ECP_MAX_PKT;
payload.buffer = pld_buf;
payload.size = ECP_MAX_PLD;
- ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_DIR_SHADOW);
- msg = ecp_pld_get_msg(payload.buffer, payload.size);
- msg_size = 0;
+ rv = 0;
+ node_next = NULL;
+ do {
+ ssize_t rv_snd;
- pthread_rwlock_rdlock(&dir_shadow_rwlock);
+ ecp_pld_set_type(payload.buffer, payload.size, MTYPE_DIR_SHADOW);
+ msg = ecp_pld_get_msg(payload.buffer, payload.size);
- count = ecp_ht_count(dir_shadow);
- msg[0] = count >> 8;
- msg[1] = count;
- msg += sizeof(uint16_t);
- msg_size += sizeof(uint16_t);
+ memset(msg, 0, 4 + sizeof(uint16_t)); // frag_cnt, frag_tot, item_cnt, region, serial
- if (count > 0) {
- size_t rv;
- int _rv;
+ pthread_rwlock_rdlock(&dir_shadow_rwlock);
- ecp_ht_itr_create(&itr, dir_shadow);
- do {
- node = ecp_ht_itr_value(&itr);
+ if (ecp_ht_count(dir_shadow) > 0) {
+ unsigned char *_msg;
+ size_t _msg_size;
+ int _rv;
- pthread_mutex_lock(&node->mutex);
- access_ts = node->access_ts;
- pthread_mutex_unlock(&node->mutex);
+ _msg = msg + 4 + sizeof(uint16_t);
+ _msg_size = 0;
+ ecp_ht_itr_create(&itr, dir_shadow);
+ if (node_next) {
+ _rv = ecp_ht_itr_search(&itr, node_next);
+ if (_rv) {
+ LOG(LOG_ERR, "dir_send_shadow: itr search err:%d\n", _rv);
+ break;
+ }
+ node_next = NULL;
+ }
- rv = ecp_dir_item_serialize(&node->dir_item, msg);
- msg += rv;
- msg_size += rv;
+ count = 0;
+ do {
+ node = ecp_ht_itr_value(&itr);
- msg[0] = access_ts >> 24;
- msg[1] = access_ts >> 16;
- msg[2] = access_ts >> 8;
- msg[3] = access_ts;
- msg += sizeof(uint32_t);
- msg_size += sizeof(uint32_t);
+ pthread_mutex_lock(&node->mutex);
+ if (node->verified || node->ann_local) {
+ if (count < MAX_DIR_ITEM_IN_MSG) {
+ size_t rv_ser;
+
+ rv_ser = ecp_dir_item_serialize(&node->dir_item, _msg);
+ _msg += rv_ser;
+ _msg_size += rv_ser;
+ count++;
+ } else {
+ node_next = ecp_ht_itr_key(&itr);
+ break;
+ }
+ }
+ pthread_mutex_unlock(&node->mutex);
- _rv = ecp_ht_itr_advance(&itr);
- } while (_rv == ECP_OK);
- }
+ _rv = ecp_ht_itr_advance(&itr);
+ } while (_rv == ECP_OK);
+ msg[2] = count;
+ msg_size = 4 + sizeof(uint16_t) + _msg_size;
+ }
- pthread_rwlock_unlock(&dir_shadow_rwlock);
+ /* no need to copy node_next key, since announce is verified nodes will not be removed during online switch node removal */
+ pthread_rwlock_unlock(&dir_shadow_rwlock);
+
+ rv_snd = ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(msg_size, MTYPE_DIR_SHADOW), 0);
+ if (rv_snd < 0) {
+ rv = rv_snd;
+ break;
+ }
+
+ rv += rv_snd;
+ } while (node_next);
- return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(msg_size, ECP_MTYPE_DIR_SHADOW), 0);
+ return rv;
}
-ssize_t dir_send_online(ECPConnection *conn) {
+ssize_t dir_send_origin_rep(ECPConnection *conn, ecp_ecdh_public_t *public) {
ECPBuffer packet;
ECPBuffer payload;
unsigned char pkt_buf[ECP_MAX_PKT];
unsigned char pld_buf[ECP_MAX_PLD];
unsigned char *msg;
- size_t msg_size;
+ unsigned short vkey_max;
+ size_t msg_size, msg_size_max, hdr_size;
+ DIRNode *node;
+ int i;
packet.buffer = pkt_buf;
packet.size = ECP_MAX_PKT;
payload.buffer = pld_buf;
payload.size = ECP_MAX_PLD;
- ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_DIR_REP);
+ ecp_pld_set_type(payload.buffer, payload.size, MTYPE_DIR_ORIGIN_REP);
msg = ecp_pld_get_msg(payload.buffer, payload.size);
+ hdr_size = msg - payload.buffer;
+ vkey_max = (payload.size - hdr_size - 1) / sizeof(ecp_ecdh_public_t);
+ msg_size_max = 1 + vkey_max * sizeof(ecp_ecdh_public_t);
- pthread_rwlock_rdlock(&dir_online_rwlock);
+ pthread_rwlock_rdlock(&dir_shadow_rwlock);
- msg[0] = dir_online.count >> 8;
- msg[1] = dir_online.count;
- msg += sizeof(uint16_t);
- msg_size = sizeof(uint16_t) + dir_online.count * ECP_SIZE_DIR_ITEM;
- memcpy(msg, dir_online.msg, dir_online.count * ECP_SIZE_DIR_ITEM);
+ node = ecp_ht_search(dir_shadow, public);
+ if (node) pthread_mutex_lock(&node->mutex);
- pthread_rwlock_unlock(&dir_online_rwlock);
+ pthread_rwlock_unlock(&dir_shadow_rwlock);
+
+ if (node == NULL) return ECP_ERR;
- return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(msg_size, ECP_MTYPE_DIR_REP), 0);
+ msg_size = 0;
+
+ *msg = node->vkey_cnt;
+ msg++;
+ msg_size++;
+
+ for (i=0; i<node->vkey_cnt; i++) {
+ memcpy(msg, &node->vkey[i], sizeof(ecp_ecdh_public_t));
+ msg += sizeof(ecp_ecdh_public_t);
+ msg_size += sizeof(ecp_ecdh_public_t);
+ if (msg_size == msg_size_max) break;
+ }
+ pthread_mutex_unlock(&node->mutex);
+
+ return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(msg_size, MTYPE_DIR_ORIGIN_REP), 0);
}
ssize_t dir_handle_ann(ECPConnection *conn, unsigned char *msg, size_t msg_size) {
ECPDirItem dir_item;
- int is_new;
- uint16_t capabilities;
- ecp_sts_t now;
- ssize_t rsize;
+ uint8_t region;
+ uint8_t capabilities;
+ size_t rsize;
ssize_t rv;
+ int ann_enable;
- rsize = sizeof(uint16_t);
+ rsize = 2;
if (msg_size < rsize) return ECP_ERR_SIZE;
- capabilities = \
- ((uint16_t)msg[0] << 8) | \
- ((uint16_t)msg[1]);
+ pthread_rwlock_rdlock(&dir_timer_rwlock);
+ ann_enable = (dir_process_enable > PROC_BLOCK_ANN);
+ pthread_rwlock_unlock(&dir_timer_rwlock);
+
+ if (!ann_enable) return rsize;
+
+ region = msg[0];
+ capabilities = msg[1];
+ if ((capabilities & ECP_DIR_CAP_DIR) && !acl_dir_inlist(&conn->remote.key_perma.public)) {
+ LOG(LOG_ERR, "dir_handle_ann: not a directory server\n");
+ return ECP_ERR;
+ }
memset(&dir_item, 0, sizeof(ECPDirItem));
dir_item.node = conn->remote;
+ dir_item.region = region;
dir_item.capabilities = capabilities;
- now = ecp_tm_get_s();
- is_new = dir_process_item(&dir_item, now);
- if (is_new) vlink_new_node(conn->sock, &dir_item);
+ if (dir_item.region >= MAX_REGION) {
+ LOG(LOG_ERR, "dir_handle_ann: bad region\n");
+ return ECP_ERR;
+ }
+
+ dir_process_item(&dir_item, conn->sock, &srv_config->key_perma.public);
rv = dir_send_shadow(conn);
if (rv < 0) return rv;
@@ -161,348 +311,573 @@ ssize_t dir_handle_ann(ECPConnection *conn, unsigned char *msg, size_t msg_size)
return rsize;
}
-ssize_t dir_handle_req(ECPConnection *conn, unsigned char *msg, size_t msg_size) {
- ssize_t rv;
-
- rv = dir_send_online(conn);
- if (rv < 0) return rv;
-
- return 0;
-}
-
ssize_t dir_handle_shadow(ECPConnection *conn, unsigned char *msg, size_t msg_size) {
- ecp_sts_t now;
- uint16_t count;
+ uint8_t count;
size_t rsize;
- int i;
+ int i, ann_enable;
- if (msg_size < sizeof(uint16_t)) return ECP_ERR_SIZE;
+ if (msg_size < 4 + sizeof(uint16_t)) return ECP_ERR_SIZE;
- count = \
- ((uint16_t)msg[0] << 8) | \
- ((uint16_t)msg[1]);
+ count = msg[2];
+ msg += 4 + sizeof(uint16_t); // frag_cnt, frag_tot, item_cnt, region, serial
- rsize = sizeof(uint16_t) + count * (ECP_SIZE_DIR_ITEM + sizeof(uint32_t));
+ rsize = 4 + sizeof(uint16_t) + count * ECP_SIZE_DIR_ITEM;
if (msg_size < rsize) return ECP_ERR_SIZE;
- msg += sizeof(uint16_t);
+ pthread_rwlock_rdlock(&dir_timer_rwlock);
+ ann_enable = (dir_process_enable > PROC_BLOCK_ANN);
+ pthread_rwlock_unlock(&dir_timer_rwlock);
+
+ if (!ann_enable) return rsize;
- now = ecp_tm_get_s();
for (i=0; i<count; i++) {
ECPDirItem dir_item;
- ecp_sts_t access_ts;
size_t rv;
rv = ecp_dir_item_parse(&dir_item, msg);
msg += rv;
- access_ts = \
- ((uint32_t)msg[0] << 24) | \
- ((uint32_t)msg[1] << 16) | \
- ((uint32_t)msg[2] << 8) | \
- ((uint32_t)msg[3]);
- msg += sizeof(uint32_t);
-
- if (ECP_STS_LT(now, access_ts) || (now - access_ts < (NODE_EXP_TIME / 2))) {
- int is_new;
-
- is_new = dir_process_item(&dir_item, access_ts);
- if (is_new) vlink_new_node(conn->sock, &dir_item);
+ if (dir_item.region >= MAX_REGION) {
+ LOG(LOG_ERR, "dir_handle_shadow: bad region\n");
+ return ECP_ERR;
}
+
+ dir_process_item(&dir_item, conn->sock, &conn->remote.key_perma.public);
}
return rsize;
}
+ssize_t dir_handle_origin_req(ECPConnection *conn, unsigned char *msg, size_t msg_size) {
+ size_t rsize;
+ ssize_t rv;
+
+ rsize = ECP_SIZE_ECDH_PUB;
+ if (msg_size < rsize) return ECP_ERR_SIZE;
+
+ rv = dir_send_origin_rep(conn, (ecp_ecdh_public_t *)msg);
+ if (rv < 0) return rv;
+
+ return rsize;
+}
+
ssize_t dir_handle_msg(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, size_t msg_size, struct ECP2Buffer *b) {
switch (mtype) {
- case ECP_MTYPE_DIR_REQ: {
- if (ecp_conn_is_outb(conn)) return ECP_ERR;
- return dir_handle_req(conn, msg, msg_size);
- }
+ case MTYPE_DIR_ANN: {
+ int is_dir;
+
+ is_dir = srv_config->capabilities & ECP_DIR_CAP_DIR;
+ if (!is_dir || ecp_conn_is_outb(conn)) return ECP_ERR;
- case ECP_MTYPE_DIR_ANN: {
- if (ecp_conn_is_outb(conn)) return ECP_ERR;
- if (!conn->remote.key_perma.valid) return ECP_ERR_VBOX;
return dir_handle_ann(conn, msg, msg_size);
}
- case ECP_MTYPE_DIR_SHADOW: {
+ case MTYPE_DIR_SHADOW: {
if (ecp_conn_is_inb(conn)) return ECP_ERR;
+
return dir_handle_shadow(conn, msg, msg_size);
}
+ case MTYPE_DIR_ORIGIN_REQ: {
+ int is_dir;
+
+ is_dir = srv_config->capabilities & ECP_DIR_CAP_DIR;
+ if (!is_dir || ecp_conn_is_outb(conn)) return ECP_ERR;
+
+ return dir_handle_origin_req(conn, msg, msg_size);
+ }
+
default:
return ECP_ERR_MTYPE;
}
}
-int dir_handle_open(ECPConnection *conn, ECP2Buffer *bufs) {
- ssize_t rv;
-
- if (ecp_conn_is_inb(conn)) return ECP_OK;
-
- rv = dir_send_ann(conn);
- if (rv < 0) return rv;
-
- return ECP_OK;
-}
+void dir_process_item(ECPDirItem *dir_item, ECPSocket *sock, ecp_ecdh_public_t *s_public) {
+ DIRNode *node;
+ unsigned int vkey_req_dir;
+ int is_verified = 0;
+ int rv = ECP_OK;
-void dir_online_switch(void) {
- struct hashtable_itr itr;
- DirNode *node;
- unsigned char *msg;
+ pthread_rwlock_rdlock(&dir_online_rwlock);
+ vkey_req_dir = dir_vkey_req;
+ pthread_rwlock_unlock(&dir_online_rwlock);
pthread_rwlock_rdlock(&dir_shadow_rwlock);
- pthread_rwlock_wrlock(&dir_online_rwlock);
-
- msg = dir_online.msg;
- dir_online.count = ecp_ht_count(dir_shadow);
-
- if (dir_online.count > 0) {
- size_t _rv;
- int rv;
- ecp_ht_itr_create(&itr, dir_shadow);
- do {
- node = ecp_ht_itr_value(&itr);
+ node = ht_search_item(dir_shadow, dir_item);
+ if (node == NULL) {
+ pthread_rwlock_unlock(&dir_shadow_rwlock);
- _rv = ecp_dir_item_serialize(&node->dir_item, msg);
- msg += _rv;
+ rv = dir_create_node(dir_item, sock, &node);
+ if (!rv) {
+ pthread_rwlock_wrlock(&dir_shadow_rwlock);
+ if (ecp_ht_count(dir_shadow) > MAX_DIR_ITEM) rv = ECP_ERR_FULL;
+ if (!rv) rv = ht_insert_node(dir_shadow, node);
+ if (rv) pthread_rwlock_unlock(&dir_shadow_rwlock);
+ }
+ if (rv) {
+ if (node) {
+ dir_destroy_node(node);
+ node = NULL;
+ }
+ LOG(LOG_ERR, "dir_process_item: err:%d\n", rv);
+ return;
+ }
- rv = ecp_ht_itr_advance(&itr);
- } while (rv == ECP_OK);
+ LOG(LOG_DEBUG, "dir_process_item: new node\n");
}
- pthread_rwlock_unlock(&dir_online_rwlock);
+ pthread_mutex_lock(&node->mutex);
pthread_rwlock_unlock(&dir_shadow_rwlock);
-}
-int dir_process_item(ECPDirItem *dir_item, ecp_sts_t access_ts) {
- DirNode *node;
- int is_new = 0;
- int rv = ECP_OK;
+ if (node->zombie) goto process_item_fin;
- pthread_rwlock_rdlock(&dir_shadow_rwlock);
-
- node = ht_search_item(dir_shadow, dir_item);
- if (node) {
- pthread_mutex_lock(&node->mutex);
- if (!node->zombie) node->access_ts = access_ts;
- pthread_mutex_unlock(&node->mutex);
+ if (!node->verified) {
+ int key_ex = 0;
+ int i;
- pthread_rwlock_unlock(&dir_shadow_rwlock);
- } else {
- pthread_rwlock_unlock(&dir_shadow_rwlock);
+ if (memcmp(s_public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) == 0) {
+ node->ann_local = 1;
+ }
- LOG(LOG_DEBUG, "dir new node\n");
+ for (i=0; i<node->vkey_cnt; i++) {
+ if (memcmp(s_public, &node->vkey[i], sizeof(node->vkey[i])) == 0) {
+ key_ex = 1;
+ break;
+ }
+ }
+ if (!key_ex) {
+ unsigned int vkey_req;
- node = dir_create_node(dir_item, access_ts);
- if (node == NULL) rv = ECP_ERR_ALLOC;
+ if (node->dir_item.capabilities & ECP_DIR_CAP_DIR) {
+ vkey_req = vkey_req_dir;
+ } else {
+ vkey_req = MIN_VKEY_REQ;
+ }
- if (!rv) {
- pthread_rwlock_wrlock(&dir_shadow_rwlock);
- if (ecp_ht_count(dir_shadow) > MAX_DIR_ITEM) rv = ECP_ERR_FULL;
- if (!rv) {
- if (ht_search_node(dir_shadow, node) == NULL) {
- rv = ht_insert_node(dir_shadow, node);
- } else {
- rv = ECP_ERR_DUP;
+ if (node->vkey_cnt < vkey_req) {
+ /* vkey list is used to track origin of particular server */
+ memcpy(&node->vkey[node->vkey_cnt], s_public, sizeof(node->vkey[node->vkey_cnt]));
+ node->vkey_cnt++;
+ if (node->vkey_cnt == vkey_req) {
+ node->verified = 1;
+ node->is_new = 0;
+ is_verified = 1;
}
}
- pthread_rwlock_unlock(&dir_shadow_rwlock);
- }
- if (!rv) {
- is_new = 1;
- /* always switch */
- dir_online_switch();
- } else {
- if (node) dir_destroy_node(node);
- LOG(LOG_ERR, "dir create node err:%d\n", rv);
+ /* protocol does not support server properties change */
+ if (!ecp_dir_item_eq(dir_item, &node->dir_item)) {
+ char buffer[ECP_SIZE_ECDH_KEY_BUF];
+
+ ecp_key2str(buffer, (uint8_t *)&node->dir_item.node.key_perma.public);
+ LOG(LOG_ERR, "dir_process_item: dir entry changed from:%s\n", buffer);
+ }
}
}
- return is_new;
+process_item_fin:
+ pthread_mutex_unlock(&node->mutex);
+
+ if (is_verified) {
+ LOG(LOG_DEBUG, "dir_process_item: node verified\n");
+
+ rv = acl_add_key(&node->dir_item);
+ if (rv) LOG(LOG_ERR, "dir_process_item: acl add key err:%d\n", rv);
+
+ vlink_new_node(sock, &node->dir_item);
+ }
}
-DirNode *dir_create_node(ECPDirItem *dir_item, ecp_sts_t access_ts) {
- DirNode *node;
+int dir_open_conn(DIRNode *node, ECPSocket *sock) {
int rv;
- node = malloc(sizeof(DirNode));
- if (node == NULL) return NULL;
+ node->conn = malloc(sizeof(ECPConnection));
+ if (node->conn == NULL) return ECP_ERR_ALLOC;
+
+ ecp_conn_init(node->conn, sock, CTYPE_DIR);
+ ecp_conn_set_flags(node->conn, ECP_CONN_FLAG_VBOX);
+
+ rv = ecp_conn_open(node->conn, &node->dir_item.node);
+ if (rv) node->conn = NULL;
- memset(node, 0, sizeof(DirNode));
+ return rv;
+}
- rv = pthread_mutex_init(&node->mutex, NULL);
+int dir_create_node(ECPDirItem *dir_item, ECPSocket *sock, DIRNode **node) {
+ DIRNode *_node;
+ int rv;
+
+ *node = NULL;
+ _node = malloc(sizeof(DIRNode));
+ if (_node == NULL) return ECP_ERR_ALLOC;
+
+ memset(_node, 0, sizeof(DIRNode));
+ rv = pthread_mutex_init(&_node->mutex, NULL);
if (rv) {
- free(node);
- return NULL;
+ free(_node);
+ return ECP_ERR;
}
- node->dir_item = *dir_item;
- node->access_ts = access_ts;
+ _node->dir_item = *dir_item;
+ _node->is_new = 1;
+
+ /* open connection to other directory servers */
+ if ((dir_item->capabilities & ECP_DIR_CAP_DIR) && (memcmp(&dir_item->node.key_perma.public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) != 0)) {
+ rv = dir_open_conn(_node, sock);
+ if (rv) {
+ pthread_mutex_destroy(&_node->mutex);
+ free(_node);
+ return rv;
+ }
+ }
- return node;
+ *node = _node;
+ return ECP_OK;
}
-void dir_destroy_node(DirNode *node) {
+void dir_destroy_node(DIRNode *node) {
+ if (node->conn) ecp_conn_close(node->conn);
pthread_mutex_destroy(&node->mutex);
free(node);
}
-int dir_open_conn(ECPSocket *sock, ECPNode *node) {
- ECPConnection *conn;
- int rv;
+static int online_switch_expired(ECPConnection *conn, ecp_sts_t now) {
+ if (conn->type == CTYPE_DIR) return 1;
+ return _ecp_conn_is_zombie(conn, now, CONN_EXPIRE_TO);
+}
- conn = malloc(sizeof(ECPConnection));
- if (conn == NULL) return ECP_ERR_ALLOC;
+static void remove_nodes(DIRNode *remove_node[], int remove_cnt) {
+ int i;
- ecp_dir_conn_init(conn, sock);
- ecp_conn_set_flags(conn, ECP_CONN_FLAG_GC | ECP_CONN_FLAG_VBOX);
- rv = ecp_conn_try_open(conn, node);
- return rv;
+ pthread_rwlock_wrlock(&dir_shadow_rwlock);
+ for (i=0; i<remove_cnt; i++) {
+ ht_remove_node(dir_shadow, remove_node[i]);
+ }
+ pthread_rwlock_unlock(&dir_shadow_rwlock);
+
+ for (i=0; i<remove_cnt; i++) {
+ vlink_del_node(&remove_node[i]->dir_item);
+ dir_destroy_node(remove_node[i]);
+ }
}
-DirNode *dir_search_conn(ECPConnection *conn) {
- DirNode *node;
- DirNode *ret = NULL;
+void dir_online_switch(ECPSocket *sock, int inc_serial) {
+ struct hashtable_itr itr;
+ DIRNode *node;
+ uint8_t count[MAX_REGION];
+ uint8_t msg_count[MAX_REGION];
+ unsigned char *msg[MAX_REGION];
+ ecp_ecdh_public_t *node_next;
+ DIRNode *remove_node[MAX_NODE_REMOVE];
+ unsigned int dir_cnt, node_cnt;
+ int i, remove_cnt;
+ int rv;
pthread_rwlock_rdlock(&dir_shadow_rwlock);
+ pthread_rwlock_wrlock(&dir_online_rwlock);
- node = ht_search_conn(dir_shadow, conn);
- if (node) {
- pthread_mutex_lock(&node->mutex);
- if (!node->zombie) {
- node->refcount++;
- ret = node;
- } else {
- ret = NULL;
+ if (dir_online) {
+ for (i=0; i<MAX_REGION; i++) {
+ dir_online->list[i].msg_count = 0;
+ count[i] = 0;
+ msg_count[i] = 0;
+ msg[i] = dir_online->list[i].msg[0].buffer;
+ }
+ }
+
+ dir_cnt = 0;
+ node_cnt = 0;
+ remove_cnt = 0;
+
+ if (ecp_ht_count(dir_shadow) > 0) {
+ uint8_t region;
+ int verified;
+
+ ecp_ht_itr_create(&itr, dir_shadow);
+
+ node_next = NULL;
+ do {
+ node = ecp_ht_itr_value(&itr);
+
+ pthread_mutex_lock(&node->mutex);
+ region = node->dir_item.region;
+ verified = node->verified;
+
+ if (verified) {
+ if (dir_online) {
+ size_t rv_ser;
+
+ rv_ser = ecp_dir_item_serialize(&node->dir_item, msg[0]);
+ msg[0] += rv_ser;
+ count[0]++;
+ if (region) {
+ rv_ser = ecp_dir_item_serialize(&node->dir_item, msg[region]);
+ msg[region] += rv_ser;
+ count[region]++;
+ }
+ }
+ if (node->dir_item.capabilities & ECP_DIR_CAP_DIR) {
+ dir_cnt++;
+ }
+ node_cnt++;
+ } else {
+ node->zombie = 1;
+ if (node_next == NULL) {
+ if (remove_cnt < MAX_NODE_REMOVE) {
+ remove_node[remove_cnt] = node;
+ remove_cnt++;
+ } else {
+ node_next = ecp_ht_itr_key(&itr);
+ break;
+ }
+ }
+ }
+ node->verified = 0;
+ node->ann_local = 0;
+ memset(&node->vkey, 0, sizeof(node->vkey));
+ node->vkey_cnt = 0;
+ pthread_mutex_unlock(&node->mutex);
+
+ /* let's reconnect all directory servers
+ node->conn is currently immutable since announce is disabled well before online switch */
+ if (node->conn) ecp_conn_set_uflags(node->conn, DIR_UFLAG_RECONNECT);
+
+ if (dir_online && verified) {
+ if (count[0] == MAX_DIR_ITEM_IN_MSG) {
+ dir_online->list[0].msg[msg_count[0]].count = count[0];
+ msg_count[0]++;
+ msg[0] = dir_online->list[0].msg[msg_count[0]].buffer;
+ count[0] = 0;
+ }
+ if (region && count[region] == MAX_DIR_ITEM_IN_MSG) {
+ dir_online->list[region].msg[msg_count[region]].count = count[region];
+ msg_count[region]++;
+ msg[region] = dir_online->list[region].msg[msg_count[region]].buffer;
+ count[region] = 0;
+ }
+ }
+
+ rv = ecp_ht_itr_advance(&itr);
+ } while (rv == ECP_OK);
+
+ if (dir_online) {
+ for (i=0; i<MAX_REGION; i++) {
+ if (count[i]) {
+ dir_online->list[i].msg[msg_count[i]].count = count[i];
+ dir_online->list[i].msg_count = msg_count[i] + 1;
+ } else {
+ dir_online->list[i].msg_count = msg_count[i];
+ }
+ }
}
- pthread_mutex_unlock(&node->mutex);
}
+ dir_vkey_req = dir_cnt / 2 + 1;
+ if (dir_vkey_req > MAX_VKEY) {
+ LOG(LOG_ERR, "dir_online_switch: number of dir vkeys required > MAX_VKEY (%d)\n", MAX_VKEY);
+ dir_vkey_req = MAX_VKEY;
+ }
+
+ if (dir_online && inc_serial) dir_online->serial++;
+
+ pthread_rwlock_unlock(&dir_online_rwlock);
pthread_rwlock_unlock(&dir_shadow_rwlock);
- return ret;
+ rv = acl_reset_ht();
+ if (rv) LOG(LOG_ERR, "dir_online_switch: acl reset err:%d\n", rv);
+
+ ecp_sock_expire(sock, online_switch_expired);
+
+ if (remove_cnt) {
+ remove_nodes(remove_node, remove_cnt);
+ while (node_next) {
+ remove_cnt = 0;
+ pthread_rwlock_rdlock(&dir_shadow_rwlock);
+
+ ecp_ht_itr_create(&itr, dir_shadow);
+ if (node_next) {
+ rv = ecp_ht_itr_search(&itr, node_next);
+ if (rv) {
+ LOG(LOG_ERR, "dir_online_switch: itr search err:%d\n", rv);
+ break;
+ }
+ node_next = NULL;
+ }
+ do {
+ node = ecp_ht_itr_value(&itr);
+
+ pthread_mutex_lock(&node->mutex);
+ if (node->zombie) {
+ if (remove_cnt < MAX_NODE_REMOVE) {
+ remove_node[remove_cnt] = node;
+ remove_cnt++;
+ } else {
+ node_next = ecp_ht_itr_key(&itr);
+ break;
+ }
+ }
+ pthread_mutex_unlock(&node->mutex);
+
+ rv = ecp_ht_itr_advance(&itr);
+ } while (rv == ECP_OK);
+
+ /* no need to copy node_next key, since this is the only thread that can destroy node */
+ pthread_rwlock_unlock(&dir_shadow_rwlock);
+
+ if (remove_cnt) remove_nodes(remove_node, remove_cnt);
+ }
+ }
+
+ if (srv_config->capabilities & ECP_DIR_CAP_DIR) acl_load_ht();
+
+ LOG(LOG_DEBUG, "dir_online_switch: node:%d dir:%d serial:%d\n", node_cnt, dir_cnt, dir_online ? dir_online->serial : 0);
+}
+
+void dir_announce_allow(void) {
+ pthread_rwlock_wrlock(&dir_timer_rwlock);
+ dir_process_enable = PROC_ALLOW_ALL;
+ pthread_rwlock_unlock(&dir_timer_rwlock);
+}
+
+void dir_announce_block(void) {
+ pthread_rwlock_wrlock(&dir_timer_rwlock);
+ dir_process_enable = PROC_BLOCK_ANN;
+ pthread_rwlock_unlock(&dir_timer_rwlock);
+
+ LOG(LOG_DEBUG, "dir_announce_block\n");
}
-void dir_announce(ECPSocket *sock) {
- static DirNode *expire_node[MAX_EXPIRE_CNT];
- static int expire_node_z[MAX_EXPIRE_CNT];
- static int expire_cnt;
+void dit_init_serial(uint16_t serial) {
+ if (dir_online) {
+ pthread_rwlock_wrlock(&dir_online_rwlock);
+ dir_online->serial = serial;
+ pthread_rwlock_unlock(&dir_online_rwlock);
+ }
+}
+void dir_announce(ECPSocket *sock, int ann_period) {
struct hashtable_itr itr;
- ecp_sts_t now;
- DirNode *node;
- DirNode *node_next;
- DirNode *announce_node[MAX_ANNOUNCE_CNT];
+ DIRNode *node;
+ ecp_ecdh_public_t *node_next;
+ DIRNode *announce_node[MAX_NODE_ANNOUNCE];
+ unsigned int ht_count;
int announce_cnt;
- int refcount;
int i;
int rv;
- now = ecp_tm_get_s();
node_next = NULL;
do {
+ uint32_t delay;
+ uint32_t rnd;
+
announce_cnt = 0;
pthread_rwlock_rdlock(&dir_shadow_rwlock);
- if (ecp_ht_count(dir_shadow) > 0) {
+ ht_count = ecp_ht_count(dir_shadow);
+ if (ht_count > 0) {
ecp_ht_itr_create(&itr, dir_shadow);
if (node_next) {
rv = ecp_ht_itr_search(&itr, node_next);
if (rv) {
- LOG(LOG_ERR, "dir ann itr search err:%d\n", rv);
+ LOG(LOG_ERR, "dir_announce: itr search err:%d\n", rv);
break;
}
node_next = NULL;
}
do {
node = ecp_ht_itr_value(&itr);
- rv = ecp_ht_itr_advance(&itr);
- pthread_mutex_lock(&node->mutex);
- if (ECP_STS_LT(node->access_ts, now) && (now - node->access_ts > NODE_EXP_TIME)) {
- node->zombie = 1;
- pthread_mutex_unlock(&node->mutex);
-
- if (expire_cnt < MAX_EXPIRE_CNT) {
- expire_node[expire_cnt] = node;
- expire_node_z[expire_cnt] = 1;
- expire_cnt++;
- }
- } else {
- pthread_mutex_unlock(&node->mutex);
-
- if ((node->dir_item.capabilities & ECP_DIR_CAP_DIR) && (memcmp(node->dir_item.node.key_perma.public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) != 0)) {
+ if ((node->dir_item.capabilities & ECP_DIR_CAP_DIR) && (memcmp(&node->dir_item.node.key_perma.public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) != 0)) {
+ if (announce_cnt < MAX_NODE_ANNOUNCE) {
announce_node[announce_cnt] = node;
announce_cnt++;
-
- if (announce_cnt == MAX_ANNOUNCE_CNT) {
- if (!rv) {
- node_next = ecp_ht_itr_key(&itr);
- } else {
- node_next = NULL;
- }
- break;
- }
+ } else {
+ node_next = ecp_ht_itr_key(&itr);
+ break;
}
}
+
+ rv = ecp_ht_itr_advance(&itr);
} while (rv == ECP_OK);
}
+ /* no need to copy node_next key, since announce is disabled during online switch node removal */
pthread_rwlock_unlock(&dir_shadow_rwlock);
- if (expire_cnt) {
- pthread_rwlock_wrlock(&dir_shadow_rwlock);
- for (i=0; i<expire_cnt; i++) {
- if (expire_node_z[i]) {
- expire_node_z[i] = 0;
- ht_remove_node(dir_shadow, expire_node[i]);
- LOG(LOG_DEBUG, "dir remove node\n");
- }
- }
- pthread_rwlock_unlock(&dir_shadow_rwlock);
- }
-
+ /* we are counting delay for total hashtable entries; delay for absolute max ht_count (256 msgs with max load, announce period of 600s) is ~70ms */
+ delay = ann_period * 1000000 / MAX(ht_count, 100);
for (i=0; i<announce_cnt; i++) {
- rv = dir_open_conn(sock, &announce_node[i]->dir_item.node);
- if (rv) LOG(LOG_ERR, "dir open connection err:%d\n", rv);
- }
-
- i = expire_cnt - 1;
- while (i >= 0) {
- node = expire_node[i];
-
- pthread_mutex_lock(&node->mutex);
- refcount = node->refcount;
- pthread_mutex_unlock(&node->mutex);
+ ECPConnection *conn;
+ int is_reg, is_open, is_z;
+ ecp_sts_t now;
+
+ conn = announce_node[i]->conn;
+ if (conn == NULL) {
+ rv = dir_open_conn(announce_node[i], sock);
+ if (rv) LOG(LOG_ERR, "dir_announce: conn open err:%d\n", rv);
+ continue;
+ }
- if (refcount == 0) {
- int j = i;
+ is_reg = is_open = is_z = 0;
+ now = ecp_tm_get_s();
+
+ ecp_conn_lock(conn);
+ is_reg = _ecp_conn_is_reg(conn);
+ if (is_reg) is_open = _ecp_conn_is_open(conn);
+ if (is_open) is_z = _ecp_conn_is_zombie(conn, now, ANN_PERIOD * 3) || _ecp_conn_test_uflags(conn, DIR_UFLAG_RECONNECT);
+ ecp_conn_unlock(conn);
+
+ if (!is_reg || is_z) {
+ LOG(LOG_DEBUG, "dir_announce: reconnect\n");
+ ecp_conn_close(conn);
+ rv = dir_open_conn(announce_node[i], sock);
+ if (rv) LOG(LOG_ERR, "dir_announce: conn open err:%d\n", rv);
+ } else if (is_open) {
+ ssize_t _rv;
+
+ _rv = dir_send_ann(conn);
+ if (_rv < 0) LOG(LOG_ERR, "dir_announce: send ann err:%ld\n", _rv);
+ }
- expire_cnt--;
- while (j < expire_cnt) {
- expire_node[j] = expire_node[j+1];
- j++;
- }
- expire_node[j] = NULL;
- dir_destroy_node(node);
+ if (delay) {
+ /* randomize over delay / 2 and delay + delay / 2 */
+ rnd = arc4random_uniform(delay + 1);
+ usleep(delay / 2 + rnd);
}
- i--;
}
+
} while (node_next);
+
+ LOG(LOG_DEBUG, "dir_announce: node count:%u\n", ht_count);
}
static void *_dir_announce(void *arg) {
ECPSocket *sock = arg;
+ uint32_t rnd;
+ time_t tv_sec;
+ int delay;
+ int ann_enable;
while (1) {
- LOG(LOG_DEBUG, "dir announce...\n");
- dir_announce(sock);
- sleep(10);
+ tv_sec = time(NULL);
+
+ pthread_rwlock_rdlock(&dir_timer_rwlock);
+ ann_enable = (dir_process_enable > PROC_BLOCK_ANN);
+ pthread_rwlock_unlock(&dir_timer_rwlock);
+
+ if (ann_enable) {
+ dir_announce(sock, ANN_PERIOD);
+ } else {
+ sleep(ANN_BLOCK_TIME + ANN_PERIOD);
+ }
+
+ /* resolution is 1s */
+ delay = ANN_PERIOD - (time(NULL) - tv_sec);
+ if (delay > 0) {
+ rnd = arc4random_uniform(delay + 1);
+ sleep(delay / 2 + rnd);
+ }
}
return NULL;
@@ -516,18 +891,72 @@ int dir_start_announce(ECPSocket *sock) {
return ECP_OK;
}
-int dir_init(ECPContext *ctx) {
- ECPConnHandler *handler;
+void dir_init_switch(ECPSocket *sock, int init_ann) {
+ int i;
+
+ dir_announce_allow();
+ for (i=0; i<init_ann; i++) {
+ dir_announce(sock, 0);
+ sleep(1);
+ }
+ dir_announce_block();
+ LOG(LOG_DEBUG, "init switch sleeping for %ds...\n", init_ann);
+ sleep(init_ann);
+ dir_online_switch(sock, 0);
+ dir_announce_allow();
+ LOG(LOG_DEBUG, "init switch sleeping for %ds...\n", init_ann);
+ sleep(init_ann);
+}
+
+int dir_init_ann(ECPSocket *sock, ECPNode *node) {
+ ECPConnection *conn;
+ int i, is_open;
int rv;
+ ssize_t _rv;
- srv_config = srv_get_config();
+ conn = malloc(sizeof(ECPConnection));
+ if (conn == NULL) return ECP_ERR_ALLOC;
- handler = malloc(sizeof(ECPConnHandler));
- if (handler == NULL) return ECP_ERR_ALLOC;
+ ecp_conn_init(conn, sock, CTYPE_DIR);
+ ecp_conn_set_flags(conn, ECP_CONN_FLAG_VBOX);
+ rv = ecp_conn_open(conn, node);
+ if (rv) {
+ LOG(LOG_ERR, "dir_init_ann: conn open err:%d\n", rv);
+ return rv;
+ }
- ecp_conn_handler_init(handler, dir_handle_msg, dir_handle_open, NULL, NULL);
- rv = ecp_ctx_set_handler(ctx, ECP_CTYPE_DIR, handler);
- if (rv) return rv;
+ is_open = 0;
+ for (i=0; i<ECP_SEND_TRIES; i++) {
+ usleep(ECP_SEND_TIMEOUT * 1000);
+ is_open = ecp_conn_is_open(conn);
+ if (is_open) break;
+ }
+
+ if (!is_open) {
+ LOG(LOG_ERR, "dir_init_ann: conn open timeout\n");
+ ecp_conn_close(conn);
+ return ECP_ERR_TIMEOUT;
+ }
+
+ _rv = dir_send_ann(conn);
+ if (_rv < 0) {
+ LOG(LOG_ERR, "dir_init_ann: send ann err:%ld\n", _rv);
+ return _rv;
+ }
+
+ return ECP_OK;
+}
+
+int dir_init(ECPSocket *sock) {
+ ECPContext *ctx = sock->ctx;
+ ECPConnHandler *handler, *c_handler;
+ int is_dir;
+ int rv;
+
+ /* dir_process_enable and dir_online.serial will be set from timer */
+ dir_vkey_req = 1;
+ srv_config = srv_get_config();
+ is_dir = srv_config->capabilities & ECP_DIR_CAP_DIR;
rv = pthread_rwlock_init(&dir_shadow_rwlock, NULL);
if (rv) {
@@ -540,12 +969,64 @@ int dir_init(ECPContext *ctx) {
return ECP_ERR;
}
+ rv = pthread_rwlock_init(&dir_timer_rwlock, NULL);
+ if (rv) {
+ pthread_rwlock_destroy(&dir_online_rwlock);
+ pthread_rwlock_destroy(&dir_shadow_rwlock);
+ return ECP_ERR;
+ }
+
dir_shadow = ecp_ht_create_keys();
if (dir_shadow == NULL) {
+ pthread_rwlock_destroy(&dir_timer_rwlock);
+ pthread_rwlock_destroy(&dir_online_rwlock);
pthread_rwlock_destroy(&dir_shadow_rwlock);
+ return ECP_ERR_ALLOC;
+ }
+
+ handler = malloc(sizeof(ECPConnHandler));
+ if (handler == NULL) {
+ ecp_ht_destroy(dir_shadow);
+ pthread_rwlock_destroy(&dir_timer_rwlock);
pthread_rwlock_destroy(&dir_online_rwlock);
+ pthread_rwlock_destroy(&dir_shadow_rwlock);
return ECP_ERR_ALLOC;
}
+ ecp_conn_handler_init(handler, dir_handle_msg, NULL, NULL, NULL);
+
+ if (is_dir) {
+ c_handler = malloc(sizeof(ECPConnHandler));
+ dir_online = malloc(sizeof(DIROnline));
+ if ((c_handler == NULL) || (dir_online == NULL)) {
+ free(handler);
+ ecp_ht_destroy(dir_shadow);
+ pthread_rwlock_destroy(&dir_timer_rwlock);
+ pthread_rwlock_destroy(&dir_online_rwlock);
+ pthread_rwlock_destroy(&dir_shadow_rwlock);
+ if (dir_online) free(dir_online);
+ if (c_handler) free(c_handler);
+ return ECP_ERR_ALLOC;
+ }
+ ecp_conn_handler_init(c_handler, dir_handle_client_msg, NULL, NULL, NULL);
+ memset(dir_online, 0, sizeof(DIROnline));
+ }
+
+ rv = timer_init(sock);
+ if (rv) {
+ free(handler);
+ if (is_dir) {
+ free(dir_online);
+ free(c_handler);
+ }
+ ecp_ht_destroy(dir_shadow);
+ pthread_rwlock_destroy(&dir_timer_rwlock);
+ pthread_rwlock_destroy(&dir_online_rwlock);
+ pthread_rwlock_destroy(&dir_shadow_rwlock);
+ return rv;
+ }
+
+ ecp_ctx_set_handler(ctx, CTYPE_DIR, handler);
+ if (is_dir) ecp_ctx_set_handler(ctx, ECP_CTYPE_DIR, c_handler);
return ECP_OK;
}
diff --git a/ecp/server/dir.h b/ecp/server/dir.h
index cbc0c87..24af3bc 100644
--- a/ecp/server/dir.h
+++ b/ecp/server/dir.h
@@ -1,35 +1,80 @@
-#define MAX_DIR_ITEM 30
-#define MAX_EXPIRE_CNT 100
-#define MAX_ANNOUNCE_CNT 100
+#include <ecp/dir/dir.h>
-#define NODE_EXP_TIME 86400
+#define MAX_DIR_ITEM_IN_MSG ((ECP_MAX_PLD - (4 + sizeof(uint16_t))) / ECP_SIZE_DIR_ITEM)
-typedef struct DirNode {
+#define MAX_DIR_MSG 10
+#define MAX_DIR_ITEM (MAX_DIR_MSG * MAX_DIR_ITEM_IN_MSG)
+#define MAX_DIR_ITEM_DIR 100
+#define MAX_REGION 10
+
+#define MAX_VKEY ((MAX_DIR_ITEM_DIR / 2) + 1)
+#define MIN_VKEY_REQ 2 /* minimum number of vkeys required for non directory server */
+
+#define MAX_NODE_ANNOUNCE 100
+#define MAX_NODE_REMOVE 100
+
+#define CTYPE_DIR 0x00
+
+#define MTYPE_DIR_ANN 0x00
+#define MTYPE_DIR_SHADOW 0x01
+#define MTYPE_DIR_ORIGIN_REQ 0x02
+#define MTYPE_DIR_ORIGIN_REP 0x03
+
+#define PROC_BLOCK_ALL 0
+#define PROC_BLOCK_ANN 1
+#define PROC_ALLOW_ALL 2
+
+#define ANN_PERIOD 600 /* announce priod (s); can't exceed 1h */
+#define CONN_EXPIRE_TO 60
+
+#define DIR_UFLAG_RECONNECT 0x80
+
+typedef struct DIRNode {
ECPDirItem dir_item;
- int refcount;
int zombie;
- ecp_sts_t access_ts;
+ int verified;
+ int ann_local;
+ int is_new;
+ ecp_ecdh_public_t vkey[MAX_VKEY];
+ unsigned short vkey_cnt;
+ ECPConnection *conn;
pthread_mutex_t mutex;
-} DirNode;
+} DIRNode;
+
+typedef struct DIRList {
+ struct {
+ uint8_t count;
+ unsigned char buffer[MAX_DIR_ITEM_IN_MSG * ECP_SIZE_DIR_ITEM];
+ } msg[MAX_DIR_MSG];
+ uint8_t msg_count;
+} DIRList;
-typedef struct DirList {
- unsigned char msg[ECP_MAX_PLD];
- uint16_t count;
-} DirList;
+typedef struct DIROnline {
+ DIRList list[MAX_REGION];
+ uint16_t serial;
+} DIROnline;
+
+ssize_t dir_send_online(ECPConnection *conn, uint8_t region);
+ssize_t dir_handle_client_msg(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, size_t msg_size, struct ECP2Buffer *b);
ssize_t dir_send_ann(ECPConnection *conn);
ssize_t dir_send_shadow(ECPConnection *conn);
-ssize_t dir_send_online(ECPConnection *conn);
ssize_t dir_handle_ann(ECPConnection *conn, unsigned char *msg, size_t msg_size);
-ssize_t dir_handle_req(ECPConnection *conn, unsigned char *msg, size_t msg_size);
ssize_t dir_handle_shadow(ECPConnection *conn, unsigned char *msg, size_t msg_size);
ssize_t dir_handle_msg(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, size_t msg_size, struct ECP2Buffer *b);
-int dir_handle_open(ECPConnection *conn, ECP2Buffer *bufs);
-int dir_process_item(ECPDirItem *dir_item, ecp_sts_t access_ts);
-DirNode *dir_create_node(ECPDirItem *dir_item, ecp_sts_t access_ts);
-void dir_destroy_node(DirNode *node);
-int dir_open_conn(ECPSocket *sock, ECPNode *node);
-DirNode *dir_search_conn(ECPConnection *conn);
-void dir_announce(ECPSocket *sock);
+
+void dir_process_item(ECPDirItem *dir_item, ECPSocket *sock, ecp_ecdh_public_t *s_public);
+int dir_open_conn(DIRNode *node, ECPSocket *sock);
+int dir_create_node(ECPDirItem *dir_item, ECPSocket *sock, DIRNode **node);
+void dir_destroy_node(DIRNode *node);
+
+void dir_online_switch(ECPSocket *sock, int inc_serial);
+void dir_remove_nodes(DIRNode *remove_node[], int remove_cnt);
+void dir_announce_allow(void);
+void dir_announce_block(void);
+void dit_init_serial(uint16_t serial);
+void dir_announce(ECPSocket *sock, int ann_period);
int dir_start_announce(ECPSocket *sock);
-int dir_init(ECPContext *ctx);
+void dir_init_switch(ECPSocket *sock, int init_ann);
+int dir_init_ann(ECPSocket *sock, ECPNode *node);
+int dir_init(ECPSocket *sock);
diff --git a/ecp/server/ht.c b/ecp/server/ht.c
index 0daf8d5..dc67d30 100644
--- a/ecp/server/ht.c
+++ b/ecp/server/ht.c
@@ -1,34 +1,25 @@
#include <ecp/core.h>
-#include <ecp/dir/dir.h>
#include <ecp/ht.h>
#include "dir.h"
#include "ht.h"
-int ht_insert_node(ecp_ht_table_t *table, DirNode *node) {
- return ecp_ht_insert(table, &node->dir_item.node.key_perma.public, node);
+int ht_insert_node(ecp_ht_table_t *table, DIRNode *node) {
+ return ecp_ht_insert_uniq(table, &node->dir_item.node.key_perma.public, node);
}
-void ht_remove_node(ecp_ht_table_t *table, DirNode *node) {
+void ht_remove_node(ecp_ht_table_t *table, DIRNode *node) {
ecp_ht_remove(table, &node->dir_item.node.key_perma.public);
}
-void *ht_search_node(ecp_ht_table_t *table, DirNode *node) {
- return ecp_ht_search(table, &node->dir_item.node.key_perma.public);
+void *ht_search_item(ecp_ht_table_t *table, ECPDirItem *dir_item) {
+ return ecp_ht_search(table, &dir_item->node.key_perma.public);
}
int ht_insert_conn(ecp_ht_table_t *table, ECPConnection *conn) {
- return ecp_ht_insert(table, &conn->remote.key_perma.public, conn);
+ return ecp_ht_insert_uniq(table, &conn->remote.key_perma.public, conn);
}
void ht_remove_conn(ecp_ht_table_t *table, ECPConnection *conn) {
ecp_ht_remove(table, &conn->remote.key_perma.public);
}
-
-void *ht_search_conn(ecp_ht_table_t *table, ECPConnection *conn) {
- return ecp_ht_search(table, &conn->remote.key_perma.public);
-}
-
-void *ht_search_item(ecp_ht_table_t *table, ECPDirItem *dir_item) {
- return ecp_ht_search(table, &dir_item->node.key_perma.public);
-}
diff --git a/ecp/server/ht.h b/ecp/server/ht.h
index cba4691..a9cb02f 100644
--- a/ecp/server/ht.h
+++ b/ecp/server/ht.h
@@ -1,7 +1,5 @@
-int ht_insert_node(ecp_ht_table_t *table, DirNode *node);
-void ht_remove_node(ecp_ht_table_t *table, DirNode *node);
-void *ht_search_node(ecp_ht_table_t *table, DirNode *node);
+int ht_insert_node(ecp_ht_table_t *table, DIRNode *node);
+void ht_remove_node(ecp_ht_table_t *table, DIRNode *node);
+void *ht_search_item(ecp_ht_table_t *table, ECPDirItem *dir_item);
int ht_insert_conn(ecp_ht_table_t *table, ECPConnection *conn);
-void ht_remove_conn(ecp_ht_table_t *table, ECPConnection *conn);
-void *ht_search_conn(ecp_ht_table_t *table, ECPConnection *conn);
-void *ht_search_item(ecp_ht_table_t *table, ECPDirItem *dir_item); \ No newline at end of file
+void ht_remove_conn(ecp_ht_table_t *table, ECPConnection *conn); \ No newline at end of file
diff --git a/ecp/server/server.c b/ecp/server/server.c
index d34fb19..890d468 100644
--- a/ecp/server/server.c
+++ b/ecp/server/server.c
@@ -1,41 +1,81 @@
#include <stdlib.h>
-#include <fcntl.h>
+#include <stdarg.h>
+#include <stdio.h>
+#include <string.h>
#include <unistd.h>
+#include <time.h>
+#include <fcntl.h>
#include <sys/stat.h>
#include <ecp/core.h>
-#include <ecp/dir/dir.h>
#include <ecp/vconn/vconn.h>
+#include <util.h>
+
#include "dir.h"
#include "vlink.h"
#include "ht.h"
+#include "acl.h"
#include "server.h"
static SRVConfig srv_config;
+static const char *srv_llevel_str[] = {
+ "DEBUG",
+ "INFO",
+ "ERROR"
+};
SRVConfig *srv_get_config(void) {
return &srv_config;
}
static void usage(char *arg) {
- fprintf(stderr, "Usage: %s <capabilities> <priv> <addr> [ <pub> <addr> ]\n", arg);
+ fprintf(stderr, "Usage: %s <region> <capabilities> <private key> <addr> [ <dir acl> <vconn acl> ] [ <public key> <addr> ]\n", arg);
+ exit(1);
+}
+
+static void fail(char *format, ...) {
+ va_list args;
+
+ va_start(args, format);
+ vfprintf(stderr, format, args);
+ va_end(args);
exit(1);
}
static void handle_err(ECPConnection *conn, unsigned char mtype, int err) {
- if (conn->type == ECP_CTYPE_VLINK) vlink_handle_err(conn, mtype, err);
- LOG(LOG_ERR, "handle error ctype:%d mtype:%d err:%d\n", conn->type, mtype, err);
+ LOG(LOG_ERR, "handle_err: ctype:%d mtype:%d err:%d\n", conn->type, mtype, err);
}
-static ECPConnection *conn_new(ECPSocket *sock, unsigned char type) {
+static ECPConnection *conn_new(ECPSocket *sock, ECPConnection *parent, unsigned char ctype) {
ECPConnection *conn = NULL;
- switch (type) {
+ switch (ctype) {
+ case CTYPE_DIR: {
+ if (!(srv_config.capabilities & ECP_DIR_CAP_DIR)) return NULL;
+
+ conn = malloc(sizeof(ECPConnection));
+ if (conn) {
+ ecp_conn_init(conn, sock, ctype);
+ ecp_conn_set_flags(conn, ECP_CONN_FLAG_VBOX);
+ }
+ break;
+ }
+
+ case ECP_CTYPE_DIR: {
+ if (!(srv_config.capabilities & ECP_DIR_CAP_DIR)) return NULL;
+
+ conn = malloc(sizeof(ECPConnection));
+ if (conn) ecp_conn_init(conn, sock, ctype);
+ break;
+ }
+
case ECP_CTYPE_VCONN: {
ECPVConnInb *_conn;
+ if (!(srv_config.capabilities & ECP_DIR_CAP_VCONN)) return NULL;
+
_conn = malloc(sizeof(ECPVConnInb));
if (_conn) {
ecp_vconn_init_inb(_conn, sock);
@@ -45,29 +85,61 @@ static ECPConnection *conn_new(ECPSocket *sock, unsigned char type) {
}
case ECP_CTYPE_VLINK: {
- conn = malloc(sizeof(ECPConnection));
- if (conn) ecp_vlink_init(conn, sock);
- break;
- }
+ if (!(srv_config.capabilities & ECP_DIR_CAP_VCONN)) return NULL;
- default: {
conn = malloc(sizeof(ECPConnection));
- if (conn) ecp_conn_init(conn, sock, type);
+ if (conn) ecp_vlink_init(conn, sock);
break;
}
}
+ if (conn) ecp_conn_set_flags(conn, ECP_CONN_FLAG_GC);
return conn;
}
+static int key_check(ECPSocket *sock, ECPConnection *parent, unsigned char ctype, ecp_ecdh_public_t *public) {
+ switch (ctype) {
+ case CTYPE_DIR: {
+ if (public == NULL) return 0;
+ return acl_inlist(public);
+ }
+
+ case ECP_CTYPE_VLINK: {
+ if (public == NULL) return 0;
+ if (parent == NULL) return acl_inlist(public);
+ return 1;
+ }
+
+ default:
+ return 1;
+ }
+}
+
static void conn_free(ECPConnection *conn) {
free(conn);
}
+void log_print(int level, char *format, ...) {
+ va_list args;
+ time_t t;
+ char buf[26];
+
+ if (level >= (sizeof(srv_llevel_str) / sizeof(char *))) return;
+
+ t = time(NULL);
+ ctime_r(&t, buf);
+ buf[24] = '\0';
+ fprintf(stderr, "%s [%s]: ", buf, srv_llevel_str[level]);
+
+ va_start(args, format);
+ vfprintf(stderr, format, args);
+ va_end(args);
+}
+
int ecp_init(ECPContext *ctx, ECPConnHandler *vconn_handler, ECPConnHandler *vlink_handler) {
int rv;
- rv = ecp_ctx_init(ctx, handle_err, conn_new, conn_free, NULL);
+ rv = ecp_ctx_init(ctx, handle_err, conn_new, conn_free, key_check);
if (rv) return rv;
rv = ecp_vconn_handler_init(ctx, vconn_handler);
@@ -86,118 +158,120 @@ int main(int argc, char *argv[]) {
ECPConnHandler vconn_handler;
ECPConnHandler vlink_handler;
char *endptr;
- int fd;
+ char *debug_init_str;
+ int _argc, fd;
int rv;
- if ((argc < 4) || (argc > 6)) usage(argv[0]);
+ memset(&srv_config, 0, sizeof(srv_config));
- srv_config.capabilities = (uint16_t)strtol(argv[1], &endptr, 16);
- if (endptr[0] != '\0') {
- fprintf(stderr, "Bad capabilities\n");
- exit(1);
- }
+ if (argc < 3) usage(argv[0]);
- if ((fd = open(argv[2], O_RDONLY)) < 0) {
- fprintf(stderr, "Unable to open %s\n", argv[2]);
- exit(1);
- }
- if (read(fd, &srv_config.key_perma.public, sizeof(ecp_ecdh_public_t)) != sizeof(ecp_ecdh_public_t)) {
- close(fd);
- fprintf(stderr, "Unable to read public key from %s\n", argv[2]);
- exit(1);
- }
- if (read(fd, &srv_config.key_perma.private, sizeof(ecp_ecdh_private_t)) != sizeof(ecp_ecdh_private_t)) {
- close(fd);
- fprintf(stderr, "Unable to read private key from %s\n", argv[2]);
- exit(1);
+ _argc = 1;
+ srv_config.region = (uint8_t)strtol(argv[_argc], &endptr, 16);
+ if (endptr[0] != '\0') fail("Bad region\n");
+ if (srv_config.region >= MAX_REGION) fail("Bad region\n");
+ _argc++;
+
+ srv_config.capabilities = (uint8_t)strtol(argv[_argc], &endptr, 16);
+ if (endptr[0] != '\0') fail("Bad capabilities\n");
+ _argc++;
+
+ if (srv_config.capabilities & ECP_DIR_CAP_DIR) {
+ if (argc < 7) usage(argv[0]);
+ } else {
+ if (argc < 5) usage(argv[0]);
}
+
+ fd = open(argv[_argc], O_RDONLY);
+ if (fd < 0) fail("Unable to open %s\n", argv[_argc]);
+
+ rv = ecp_util_read_key(fd, &srv_config.key_perma.public, &srv_config.key_perma.private);
close(fd);
+ if (rv) fail("Unable to read key from %s\n", argv[_argc]);
srv_config.key_perma.valid = 1;
+ _argc++;
rv = ecp_init(&ctx, &vconn_handler, &vlink_handler);
- if (rv) {
- fprintf(stderr, "ecp_init RV:%d\n", rv);
- exit(1);
- }
+ if (rv) fail("ecp_init err:%d\n", rv);
+
rv = ecp_sock_create(&sock, &ctx, &srv_config.key_perma);
- if (rv) {
- fprintf(stderr, "ecp_sock_create RV:%d\n", rv);
- exit(1);
- }
+ if (rv) fail("ecp_sock_create err:%d\n", rv);
- rv = ecp_addr_init(&addr, argv[3]);
- if (rv) {
- fprintf(stderr, "ecp_addr_init RV:%d\n", rv);
- exit(1);
- }
+ rv = ecp_vconn_sock_create(&sock);
+ if (rv) fail("ecp_vconn_sock_create err:%d\n", rv);
+
+ rv = ecp_addr_init(&addr, argv[_argc]);
+ if (rv) fail("ecp_addr_init err:%d\n", rv);
+ _argc++;
rv = ecp_sock_open(&sock, &addr);
- if (rv) {
- fprintf(stderr, "ecp_sock_open RV:%d\n", rv);
- exit(1);
- }
+ if (rv) fail("ecp_sock_open err:%d\n", rv);
+ srv_config.my_addr = addr;
- rv = dir_init(&ctx);
- if (rv) {
- fprintf(stderr, "dir_init RV:%d\n", rv);
- exit(1);
- }
+ rv = acl_init();
+ if (rv) fail("acl_init err:%d\n", rv);
- rv = vlink_init(&ctx);
- if (rv) {
- fprintf(stderr, "vlink_init RV:%d\n", rv);
- exit(1);
- }
+ if (srv_config.capabilities & ECP_DIR_CAP_DIR) {
+ srv_config.acl_fn_dir = strdup(argv[_argc]);
+ _argc++;
+ srv_config.acl_fn = strdup(argv[_argc]);
+ _argc++;
- rv = dir_start_announce(&sock);
- if (rv) {
- fprintf(stderr, "dir_start_announce RV:%d\n", rv);
- exit(1);
+ rv = acl_load();
+ if (rv) fail("acl_load err:%d\n", rv);
}
- rv = vlink_start_open(&sock);
- if (rv) {
- fprintf(stderr, "vlink_start_open RV:%d\n", rv);
- exit(1);
- }
+ rv = dir_init(&sock);
+ if (rv) fail("dir_init err:%d\n", rv);
- rv = vlink_start_keyx();
- if (rv) {
- fprintf(stderr, "vlink_start_keyx RV:%d\n", rv);
- exit(1);
- }
+ rv = vlink_init(&sock);
+ if (rv) fail("vlink_init err:%d\n", rv);
- if (argc == 6) {
+ rv = ecp_start_receiver(&sock);
+ if (rv) fail("ecp_start_receiver err:%d\n", rv);
+
+ if (argc == _argc + 2) {
ECPNode node;
ecp_ecdh_public_t node_pub;
ecp_tr_addr_t node_addr;
- if ((fd = open(argv[4], O_RDONLY)) < 0) {
- fprintf(stderr, "Unable to open %s\n", argv[4]);
- exit(1);
- }
- if (read(fd, &node_pub, sizeof(ecp_ecdh_public_t)) != sizeof(ecp_ecdh_public_t)) {
- close(fd);
- fprintf(stderr, "Unable to read public key from %s\n", argv[4]);
- exit(1);
- }
- close(fd);
+ fd = open(argv[_argc], O_RDONLY);
+ if (fd < 0) fail("Unable to open %s\n", argv[_argc]);
- int rv;
+ rv = ecp_util_read_key(fd, &node_pub, NULL);
+ close(fd);
+ if (rv) fail("Unable to read public key from %s\n", argv[_argc]);
+ _argc++;
ecp_node_init(&node, &node_pub, NULL);
- rv = ecp_node_set_addr(&node, argv[5]);
- if (rv) {
- fprintf(stderr, "ecp_node_set_addr RV:%d\n", rv);
- exit(1);
- }
- rv = dir_open_conn(&sock, &node);
- if (rv) {
- fprintf(stderr, "dir_open_conn RV:%d\n", rv);
- exit(1);
+ rv = ecp_node_set_addr(&node, argv[_argc]);
+ if (rv) fail("ecp_node_set_addr err:%d\n", rv);
+ _argc++;
+
+ rv = dir_init_ann(&sock, &node);
+ if (rv) fail("dir_init_ann err:%d\n", rv);
+ }
+
+ if (_argc != argc) usage(argv[0]);
+
+ debug_init_str = getenv("ECP_DBG_INIT");
+ if (debug_init_str) {
+ int init_ann;
+
+ init_ann = (int)strtol(debug_init_str, &endptr, 10);
+ if (endptr[0] == '\0') {
+ LOG(LOG_DEBUG, "init switch start - number of announces:%d\n", init_ann);
+ dir_init_switch(&sock, init_ann);
+ LOG(LOG_DEBUG, "init switch done\n");
}
}
- ecp_receiver(&sock);
-} \ No newline at end of file
+ rv = dir_start_announce(&sock);
+ if (rv) fail("dir_start_announce err:%d\n", rv);
+
+ rv = vlink_start_keyx(&sock);
+ if (rv) fail("vlink_start_keyx err:%d\n", rv);
+
+ while(1) pause();
+}
diff --git a/ecp/server/server.h b/ecp/server/server.h
index ff8e444..21f2129 100644
--- a/ecp/server/server.h
+++ b/ecp/server/server.h
@@ -1,15 +1,20 @@
#include <stdio.h>
-#define LOG_DEBUG 1
-#define LOG_INFO 2
-#define LOG_ERR 3
+#define LOG_DEBUG 0
+#define LOG_INFO 1
+#define LOG_ERR 2
#define LOG_LEVEL LOG_DEBUG
-#define LOG(l, ...) (l >= LOG_LEVEL ? fprintf(stderr, __VA_ARGS__) : 0 )
+#define LOG(l, ...) log_print(l, __VA_ARGS__);
typedef struct SRVConfig {
ECPDHKey key_perma;
- uint16_t capabilities;
+ char *acl_fn;
+ char *acl_fn_dir;
+ ecp_tr_addr_t my_addr;
+ uint8_t region;
+ uint8_t capabilities;
} SRVConfig;
SRVConfig *srv_get_config(void);
+void log_print(int level, char *format, ...);
diff --git a/ecp/server/timer.c b/ecp/server/timer.c
new file mode 100644
index 0000000..ab30633
--- /dev/null
+++ b/ecp/server/timer.c
@@ -0,0 +1,98 @@
+#include <stdlib.h>
+#include <unistd.h>
+#include <time.h>
+
+#include <ecp/core.h>
+
+#include "dir.h"
+
+#include "server.h"
+#include "timer.h"
+
+static struct timespec timer_ts_mono;
+static timer_t timer_ann_block_id;
+static timer_t timer_online_switch_id;
+
+int timer_set_next(time_t tv_now) {
+ struct itimerspec its = { 0 };
+ time_t tv_midnight;
+ int rv = 0;
+
+ /* next midnight */
+ tv_midnight = (tv_now / ONLINE_SWITCH_PERIOD) * ONLINE_SWITCH_PERIOD + ONLINE_SWITCH_PERIOD;
+
+ if (tv_now < (tv_midnight - ANN_BLOCK_TIME)) {
+ dir_announce_allow();
+
+ its.it_value.tv_sec = tv_midnight - ANN_BLOCK_TIME;
+ rv = timer_settime(timer_ann_block_id, TIMER_ABSTIME, &its, NULL);
+ if (rv) return ECP_ERR;
+ } else {
+ dir_announce_block();
+ }
+
+ its.it_value.tv_sec = tv_midnight;
+ rv = timer_settime(timer_online_switch_id, TIMER_ABSTIME, &its, NULL);
+ if (rv) return ECP_ERR;
+
+ return ECP_OK;
+}
+
+void timer_ann_block(union sigval timer_data) {
+ dir_announce_block();
+}
+
+void timer_online_switch(union sigval timer_data) {
+ ECPSocket *sock = timer_data.sival_ptr;
+ struct timespec ts_prev;
+ int rv;
+
+ ts_prev = timer_ts_mono;
+ clock_gettime(CLOCK_MONOTONIC, &timer_ts_mono);
+
+ /* exit if someone is messing with realtime clock */
+ if ((timer_ts_mono.tv_sec - ts_prev.tv_sec) < (ONLINE_SWITCH_PERIOD / 2)) goto online_switch_fin;
+
+ dir_online_switch(sock, 1);
+
+online_switch_fin:
+ rv = timer_set_next(time(NULL));
+ if (rv) LOG(LOG_ERR, "timer_online_switch: set next timer err:%d\n", rv);
+}
+
+int timer_init(ECPSocket *sock) {
+ struct sigevent timer_ann_block_evt = { 0 };
+ struct sigevent timer_online_switch_evt = { 0 };
+ time_t tv_now;
+ int rv = 0;
+
+ timer_ann_block_evt.sigev_notify = SIGEV_THREAD;
+ timer_ann_block_evt.sigev_notify_function = timer_ann_block;
+ timer_ann_block_evt.sigev_value.sival_ptr = NULL;
+ rv = timer_create(CLOCK_REALTIME, &timer_ann_block_evt, &timer_ann_block_id);
+ if (rv) return ECP_ERR;
+
+ timer_online_switch_evt.sigev_notify = SIGEV_THREAD;
+ timer_online_switch_evt.sigev_notify_function = timer_online_switch;
+ timer_online_switch_evt.sigev_value.sival_ptr = sock;
+ rv = timer_create(CLOCK_REALTIME, &timer_online_switch_evt, &timer_online_switch_id);
+ if (rv) {
+ timer_delete(timer_ann_block_id);
+ return ECP_ERR;
+ }
+
+ /* ensure that first dir_online_switch is executed */
+ clock_gettime(CLOCK_MONOTONIC, &timer_ts_mono);
+ timer_ts_mono.tv_sec -= ONLINE_SWITCH_PERIOD / 2;
+
+ tv_now = time(NULL);
+ dit_init_serial(tv_now / ONLINE_SWITCH_PERIOD);
+ rv = timer_set_next(tv_now);
+ if (rv) {
+ timer_delete(timer_ann_block_id);
+ timer_delete(timer_online_switch_id);
+ return rv;
+ }
+
+ return ECP_OK;
+}
diff --git a/ecp/server/timer.h b/ecp/server/timer.h
new file mode 100644
index 0000000..526f0b8
--- /dev/null
+++ b/ecp/server/timer.h
@@ -0,0 +1,9 @@
+#include <signal.h>
+
+#define ANN_BLOCK_TIME 7200 /* time to block announce before online switch (s) */
+#define ONLINE_SWITCH_PERIOD 86400 /* online switch period (s) */
+
+int timer_set_next(time_t tv_now);
+void timer_ann_block(union sigval timer_data);
+void timer_online_switch(union sigval timer_data);
+int timer_init(ECPSocket *sock);
diff --git a/ecp/server/vlink.c b/ecp/server/vlink.c
index e98dacc..d0c865b 100644
--- a/ecp/server/vlink.c
+++ b/ecp/server/vlink.c
@@ -3,318 +3,238 @@
#include <string.h>
#include <ecp/core.h>
-#include <ecp/dir/dir.h>
#include <ecp/vconn/vconn.h>
#include <ecp/ht.h>
#include <ecp/tm.h>
-#include "server.h"
#include "dir.h"
+#include "vlink.h"
#include "ht.h"
-#include "vlink.h"
+#include "server.h"
static ecp_ht_table_t *vlink_conn = NULL;
-static ecp_ht_table_t *vlink_node = NULL;
-static pthread_mutex_t vlink_conn_mutex;
-static pthread_mutex_t vlink_node_mutex;
-static pthread_t vlink_open_thd;
+static pthread_rwlock_t vlink_conn_rwlock;
static pthread_t vlink_keyx_thd;
static SRVConfig *srv_config;
-void vlink_handle_err(ECPConnection *conn, unsigned char mtype, int err) {
- switch (mtype) {
- case ECP_MTYPE_OPEN_REP: {
- if (err == ECP_ERR_TIMEOUT) {
- int rv;
-
- rv = vlink_insert_node(conn);
- if (rv) LOG(LOG_ERR, "vlink insert node err:%d\n", rv);
- }
- ecp_conn_close(conn);
- break;
- }
- }
-}
-
-int vlink_handle_open(ECPConnection *conn, ECP2Buffer *bufs) {
- int rv;
-
- rv = ecp_vlink_handle_open(conn, bufs);
- if (rv) return rv;
-
- if (ecp_conn_is_inb(conn)) return ECP_OK;
-
- pthread_mutex_lock(&vlink_conn_mutex);
- rv = ht_insert_conn(vlink_conn, conn);
- pthread_mutex_unlock(&vlink_conn_mutex);
-
- if (rv) {
- ecp_vlink_handle_close(conn);
- return rv;
- }
-
- return ECP_OK;
-}
-
-void vlink_handle_close(ECPConnection *conn) {
- int rv;
-
- ecp_vlink_handle_close(conn);
+#define MAX(X, Y) (((X) > (Y)) ? (X) : (Y))
- if (ecp_conn_is_inb(conn)) return;
-
- rv = vlink_insert_node(conn);
- if (rv) LOG(LOG_ERR, "vlink insert node err:%d\n", rv);
-}
-
-int vlink_open_conn(ECPSocket *sock, ECPNode *node) {
+int vlink_open_conn(ECPSocket *sock, ECPNode *node, ECPConnection **_conn) {
ECPConnection *conn;
int rv;
+ *_conn = NULL;
conn = malloc(sizeof(ECPConnection));
if (conn == NULL) return ECP_ERR_ALLOC;
ecp_vlink_init(conn, sock);
rv = ecp_conn_open(conn, node);
- return rv;
+ if (rv) return rv;
+
+ *_conn = conn;
+ return ECP_OK;
}
void vlink_new_node(ECPSocket *sock, ECPDirItem *dir_item) {
- if ((dir_item->capabilities & ECP_DIR_CAP_VCONN) && (memcmp(&dir_item->node.key_perma.public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) < 0)) {
- int rv;
+ ECPConnection *conn;
+ int rv;
- LOG(LOG_DEBUG, "vlink open connection\n");
- rv = vlink_open_conn(sock, &dir_item->node);
- if (rv) LOG(LOG_ERR, "vlink open connection err:%d\n", rv);
- }
-}
+ if (!(dir_item->capabilities & ECP_DIR_CAP_VCONN) || (memcmp(&dir_item->node.key_perma.public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) == 0)) return;
-int vlink_insert_node(ECPConnection *conn) {
- DirNode *node;
- int rv = ECP_OK;
+ pthread_rwlock_rdlock(&vlink_conn_rwlock);
+ conn = ecp_ht_search(vlink_conn, &dir_item->node.key_perma.public);
+ pthread_rwlock_unlock(&vlink_conn_rwlock);
- node = dir_search_conn(conn);
- if (node) {
- pthread_mutex_lock(&vlink_node_mutex);
- rv = ht_insert_node(vlink_node, node);
- pthread_mutex_unlock(&vlink_node_mutex);
+ if (conn) return;
- if (rv) {
- pthread_mutex_lock(&node->mutex);
- node->refcount--;
- pthread_mutex_unlock(&node->mutex);
- }
+ rv = vlink_open_conn(sock, &dir_item->node, &conn);
+ if (rv) {
+ LOG(LOG_ERR, "vlink_new_node: open conn err:%d\n", rv);
+ return;
}
- return rv;
-}
-
-void vlink_open(ECPSocket *sock) {
- struct hashtable_itr itr;
- DirNode *node;
- DirNode *node_next;
- DirNode *open_node[MAX_OPEN_CNT];
- int open_cnt;
- int i;
- int rv;
-
- node_next = NULL;
- do {
- open_cnt = 0;
- pthread_mutex_lock(&vlink_node_mutex);
-
- if (ecp_ht_count(vlink_node) > 0) {
- ecp_ht_itr_create(&itr, vlink_node);
- if (node_next) {
- rv = ecp_ht_itr_search(&itr, node_next);
- if (rv) {
- LOG(LOG_ERR, "vlink open itr search err:%d\n", rv);
- break;
- }
- node_next = NULL;
- }
- do {
- node = ecp_ht_itr_value(&itr);
- rv = ecp_ht_itr_remove(&itr);
-
- open_node[open_cnt] = node;
- open_cnt++;
- if (open_cnt == MAX_OPEN_CNT) {
- if (!rv) {
- node_next = ecp_ht_itr_key(&itr);
- } else {
- node_next = NULL;
- }
- }
- } while (rv == ECP_OK);
- }
+ pthread_rwlock_wrlock(&vlink_conn_rwlock);
+ rv = ht_insert_conn(vlink_conn, conn);
+ pthread_rwlock_unlock(&vlink_conn_rwlock);
- pthread_mutex_unlock(&vlink_node_mutex);
+ if (rv) {
+ LOG(LOG_ERR, "vlink_new_node: ins conn err:%d\n", rv);
+ ecp_conn_close(conn);
+ return;
+ }
- for (i=0; i<open_cnt; i++) {
- pthread_mutex_lock(&open_node[i]->mutex);
+ LOG(LOG_DEBUG, "vlink_new_node: new connection\n");
+}
- if (open_node[i]->zombie) {
- open_node[i]->refcount--;
+void vlink_del_node(ECPDirItem *dir_item) {
+ ECPConnection *conn;
- pthread_mutex_unlock(&open_node[i]->mutex);
- } else {
- pthread_mutex_unlock(&open_node[i]->mutex);
+ if (!(dir_item->capabilities & ECP_DIR_CAP_VCONN) || (memcmp(&dir_item->node.key_perma.public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) == 0)) return;
- LOG(LOG_DEBUG, "vlink open connection\n");
- rv = vlink_open_conn(sock, &open_node[i]->dir_item.node);
- if (rv) LOG(LOG_ERR, "vlink open connection err:%d\n", rv);
- }
- }
- } while (node_next);
+ pthread_rwlock_rdlock(&vlink_conn_rwlock);
+ conn = ecp_ht_search(vlink_conn, &dir_item->node.key_perma.public);
+ if (conn) ecp_conn_set_uflags(conn, VLINK_UFLAG_DISCONNECT);
+ pthread_rwlock_unlock(&vlink_conn_rwlock);
}
-void vlink_keyx(void) {
+void vlink_keyx(ECPSocket *sock, int keyx_period) {
struct hashtable_itr itr;
- ecp_sts_t now;
ECPConnection *conn;
- ECPConnection *keyx_next;
- ECPConnection *keyx_conn[MAX_KEYX_CNT];
- int keyx_conn_z[MAX_KEYX_CNT];
+ ecp_ecdh_public_t *conn_next;
+ ECPConnection *keyx_conn[MAX_NODE_ANNOUNCE];
+ unsigned int ht_count;
int keyx_cnt;
- int is_zombie;
int i;
int rv;
- now = ecp_tm_get_s();
- keyx_next = NULL;
+ conn_next = NULL;
do {
+ uint32_t delay;
+ uint32_t rnd;
+
keyx_cnt = 0;
- pthread_mutex_lock(&vlink_conn_mutex);
+ pthread_rwlock_rdlock(&vlink_conn_rwlock);
- if (ecp_ht_count(vlink_conn) > 0) {
+ ht_count = ecp_ht_count(vlink_conn);
+ if (ht_count > 0) {
ecp_ht_itr_create(&itr, vlink_conn);
- if (keyx_next) {
- rv = ecp_ht_itr_search(&itr, keyx_next);
+ if (conn_next) {
+ rv = ecp_ht_itr_search(&itr, conn_next);
if (rv) {
- LOG(LOG_ERR, "vlink keyx itr search err:%d\n", rv);
+ LOG(LOG_ERR, "vlink_keyx: itr search err:%d\n", rv);
break;
}
- keyx_next = NULL;
+ conn_next = NULL;
}
do {
conn = ecp_ht_itr_value(&itr);
- is_zombie = ecp_conn_is_zombie(conn, now, CONN_EXP_TIME);
- if (is_zombie) {
- rv = ecp_ht_itr_remove(&itr);
+ if (keyx_cnt < MAX_NODE_ANNOUNCE) {
+ keyx_conn[keyx_cnt] = conn;
+ keyx_cnt++;
} else {
- rv = ecp_ht_itr_advance(&itr);
+ conn_next = ecp_ht_itr_key(&itr);
+ break;
}
- keyx_conn[keyx_cnt] = conn;
- keyx_conn_z[keyx_cnt] = is_zombie;
- keyx_cnt++;
-
- if (keyx_cnt == MAX_KEYX_CNT) {
- if (!rv) {
- keyx_next = ecp_ht_itr_key(&itr);
- } else {
- keyx_next = NULL;
- }
- }
+ rv = ecp_ht_itr_advance(&itr);
} while (rv == ECP_OK);
}
- pthread_mutex_unlock(&vlink_conn_mutex);
+ /* no need to copy conn_next key, since this is the only thread that can remove connection */
+ pthread_rwlock_unlock(&vlink_conn_rwlock);
+ /* we are counting delay for total hashtable entries; delay for absolute max ht_count (256 msgs with max load, keyx period of 600s) is ~70ms */
+ delay = keyx_period * 1000000 / MAX(ht_count, 100);
for (i=0; i<keyx_cnt; i++) {
- if (keyx_conn_z[i]) {
- LOG(LOG_DEBUG, "vlink close connection\n");
- ecp_conn_close(keyx_conn[i]);
- } else {
+ int is_d, is_reg, is_open, is_z;
+ ecp_sts_t now;
+
+ conn = keyx_conn[i];
+
+ is_d = is_reg = is_open = is_z = 0;
+ now = ecp_tm_get_s();
+
+ ecp_conn_lock(conn);
+ is_d = _ecp_conn_test_uflags(conn, VLINK_UFLAG_DISCONNECT);
+ is_reg = _ecp_conn_is_reg(conn);
+ if (is_reg) is_open = _ecp_conn_is_open(conn);
+ if (is_open) is_z = _ecp_conn_is_zombie(conn, now, KEYX_PERIOD * 3);
+ ecp_conn_unlock(conn);
+
+ if (is_d) {
+ LOG(LOG_DEBUG, "vlink_keyx: disconnect\n");
+ // close all inbound connections;
+ pthread_rwlock_wrlock(&vlink_conn_rwlock);
+ ht_remove_conn(vlink_conn, conn);
+ pthread_rwlock_unlock(&vlink_conn_rwlock);
+
+ ecp_conn_close(conn);
+ } else if (!is_reg || is_z) {
+ ECPConnection *_conn = NULL;
+
+ LOG(LOG_DEBUG, "vlink_keyx: reconnect\n");
+ rv = vlink_open_conn(conn->sock, &conn->remote, &_conn);
+ if (rv) {
+ LOG(LOG_ERR, "vlink_keyx: conn open err:%d\n", rv);
+ continue;
+ }
+
+ pthread_rwlock_wrlock(&vlink_conn_rwlock);
+ ht_remove_conn(vlink_conn, conn);
+ rv = ht_insert_conn(vlink_conn, _conn);
+ pthread_rwlock_unlock(&vlink_conn_rwlock);
+
+ ecp_conn_close(conn);
+ if (rv) {
+ LOG(LOG_ERR, "vlink_keyx: conn insert err:%d\n", rv);
+ ecp_conn_close(_conn);
+ }
+ } else if (is_open) {
ssize_t _rv;
- LOG(LOG_DEBUG, "vlink send keyx\n");
- _rv = ecp_send_keyx_req(keyx_conn[i], 1);
- if (_rv < 0) LOG(LOG_ERR, "vlink send keyx err:%ld\n", _rv);
+ _rv = ecp_send_keyx_req(conn, 1);
+ if (_rv < 0) LOG(LOG_ERR, "vlink_keyx: send keyx req err:%ld\n", _rv);
+ }
+
+ if (delay) {
+ /* randomize over delay / 2 and delay + delay / 2 */
+ rnd = arc4random_uniform(delay + 1);
+ usleep(delay / 2 + rnd);
}
}
- } while (keyx_next);
+
+ } while (conn_next);
+
+ LOG(LOG_DEBUG, "vlink_keyx: connection count:%u\n", ht_count);
}
-static void *_vlink_open(void *arg) {
+static void *_vlink_keyx(void *arg) {
ECPSocket *sock = arg;
+ uint32_t rnd;
+ time_t tv_sec;
+ int delay;
while (1) {
- LOG(LOG_DEBUG, "vlink open...\n");
- vlink_open(sock);
- sleep(10);
- }
+ tv_sec = time(NULL);
- return NULL;
-}
+ vlink_keyx(sock, KEYX_PERIOD);
-static void *_vlink_keyx(void *arg) {
- while (1) {
- LOG(LOG_DEBUG, "vlink keyx...\n");
- vlink_keyx();
- sleep(10);
+ /* resolution is 1s */
+ delay = KEYX_PERIOD - (time(NULL) - tv_sec);
+ if (delay > 0) {
+ rnd = arc4random_uniform(delay + 1);
+ sleep(delay / 2 + rnd);
+ }
}
return NULL;
}
-int vlink_start_open(ECPSocket *sock) {
- int rv;
-
- rv = pthread_create(&vlink_open_thd, NULL, _vlink_open, sock);
- if (rv) return ECP_ERR;
- return ECP_OK;
-}
-int vlink_start_keyx(void) {
+int vlink_start_keyx(ECPSocket *sock) {
int rv;
- rv = pthread_create(&vlink_keyx_thd, NULL, _vlink_keyx, NULL);
+ rv = pthread_create(&vlink_keyx_thd, NULL, _vlink_keyx, sock);
if (rv) return ECP_ERR;
return ECP_OK;
}
-int vlink_init(ECPContext *ctx) {
- ECPConnHandler *handler;
+int vlink_init(ECPSocket *sock) {
+ ECPContext *ctx = sock->ctx;
int rv;
srv_config = srv_get_config();
- handler = ecp_ctx_get_handler(ctx, ECP_CTYPE_VLINK);
- if (handler == NULL) return ECP_ERR;
-
- handler->handle_open = vlink_handle_open;
- handler->handle_close = vlink_handle_close;
-
- rv = pthread_mutex_init(&vlink_conn_mutex, NULL);
- if (rv) {
- return ECP_ERR;
- }
-
- rv = pthread_mutex_init(&vlink_node_mutex, NULL);
- if (rv) {
- pthread_mutex_destroy(&vlink_conn_mutex);
- return ECP_ERR;
- }
+ rv = pthread_rwlock_init(&vlink_conn_rwlock, NULL);
+ if (rv) return ECP_ERR;
- rv = ECP_OK;
vlink_conn = ecp_ht_create_keys();
- if (vlink_conn == NULL) rv = ECP_ERR_ALLOC;
- if (!rv) {
- vlink_node = ecp_ht_create_keys();
- if (vlink_node == NULL) rv = ECP_ERR_ALLOC;
- }
-
- if (rv) {
- pthread_mutex_destroy(&vlink_node_mutex);
- pthread_mutex_destroy(&vlink_conn_mutex);
- if (vlink_node) ecp_ht_destroy(vlink_node);
- if (vlink_conn) ecp_ht_destroy(vlink_conn);
- return rv;
+ if (vlink_conn == NULL) {
+ pthread_rwlock_destroy(&vlink_conn_rwlock);
+ return ECP_ERR_ALLOC;
}
return ECP_OK;
diff --git a/ecp/server/vlink.h b/ecp/server/vlink.h
index 78af6b1..3e8022f 100644
--- a/ecp/server/vlink.h
+++ b/ecp/server/vlink.h
@@ -1,17 +1,12 @@
-#define MAX_KEYX_CNT 100
-#define MAX_OPEN_CNT 100
+#define MAX_KEYX_CNT 100
-#define CONN_EXP_TIME 22
+#define KEYX_PERIOD 600 /* key exchange priod (s); can't exceed 1h */
+#define VLINK_UFLAG_DISCONNECT 0x80
-void vlink_handle_err(ECPConnection *conn, unsigned char mtype, int err);
-int vlink_handle_open(ECPConnection *conn, ECP2Buffer *bufs);
-void vlink_handle_close(ECPConnection *conn);
-int vlink_open_conn(ECPSocket *sock, ECPNode *node);
+int vlink_open_conn(ECPSocket *sock, ECPNode *node, ECPConnection **_conn);
void vlink_new_node(ECPSocket *sock, ECPDirItem *item);
-int vlink_insert_node(ECPConnection *conn);
+void vlink_del_node(ECPDirItem *dir_item);
-void vlink_keyx(void);
-void vlink_open(ECPSocket *sock);
-int vlink_start_open(ECPSocket *sock);
-int vlink_start_keyx(void);
-int vlink_init(ECPContext *ctx);
+void vlink_keyx(ECPSocket *sock, int keyx_period);
+int vlink_start_keyx(ECPSocket *sock);
+int vlink_init(ECPSocket *sock);