summaryrefslogtreecommitdiff
path: root/ecp/server
diff options
context:
space:
mode:
authorUros Majstorovic <majstor@majstor.org>2022-08-09 21:54:45 +0200
committerUros Majstorovic <majstor@majstor.org>2022-08-09 21:54:45 +0200
commit810dde21ee65653c15606917b19566cfbaaf165e (patch)
tree4cd84b109e06660a9c59f2487822905e5672681e /ecp/server
parentaee853a208d6abec53ec81dc4ef110b63e13342f (diff)
ecp server added
Diffstat (limited to 'ecp/server')
-rw-r--r--ecp/server/Makefile16
-rw-r--r--ecp/server/dir.c551
-rw-r--r--ecp/server/dir.h35
-rw-r--r--ecp/server/ht.c34
-rw-r--r--ecp/server/ht.h7
-rw-r--r--ecp/server/server.c204
-rw-r--r--ecp/server/server.h15
-rw-r--r--ecp/server/vlink.c321
-rw-r--r--ecp/server/vlink.h17
9 files changed, 1200 insertions, 0 deletions
diff --git a/ecp/server/Makefile b/ecp/server/Makefile
new file mode 100644
index 0000000..2105e9f
--- /dev/null
+++ b/ecp/server/Makefile
@@ -0,0 +1,16 @@
+src_dir = ../src
+include $(src_dir)/ecp/common.mk
+CFLAGS += -Wno-int-to-void-pointer-cast
+
+obj = server.o dir.o vlink.o ht.o
+dep = ../build-posix/*.a
+
+%.o: %.c
+ $(CC) $(CFLAGS) -c $<
+
+ecp_server: $(obj)
+ $(CC) -o $@ $(obj) $(dep) $(LDFLAGS)
+
+clean:
+ rm -f *.o
+ rm -f ecp_server
diff --git a/ecp/server/dir.c b/ecp/server/dir.c
new file mode 100644
index 0000000..0066bac
--- /dev/null
+++ b/ecp/server/dir.c
@@ -0,0 +1,551 @@
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+
+#include <ecp/core.h>
+#include <ecp/dir/dir.h>
+#include <ecp/ht.h>
+#include <ecp/tm.h>
+
+#include "server.h"
+#include "vlink.h"
+#include "dir.h"
+#include "ht.h"
+
+static ecp_ht_table_t *dir_shadow = NULL;
+static pthread_rwlock_t dir_shadow_rwlock;
+static pthread_rwlock_t dir_online_rwlock;
+static pthread_t dir_announce_thd;
+
+static DirList dir_online;
+
+static SRVConfig *srv_config;
+
+ssize_t dir_send_ann(ECPConnection *conn) {
+ ECPBuffer packet;
+ ECPBuffer payload;
+ unsigned char pkt_buf[ECP_SIZE_PKT_BUF(sizeof(uint16_t), ECP_MTYPE_DIR_ANN, conn)];
+ unsigned char pld_buf[ECP_SIZE_PLD_BUF(sizeof(uint16_t), ECP_MTYPE_DIR_ANN, conn)];
+ unsigned char *msg;
+
+ packet.buffer = pkt_buf;
+ packet.size = sizeof(pkt_buf);
+ payload.buffer = pld_buf;
+ payload.size = sizeof(pld_buf);
+
+ ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_DIR_ANN);
+ msg = ecp_pld_get_msg(payload.buffer, payload.size);
+ msg[0] = srv_config->capabilities >> 8;
+ msg[1] = srv_config->capabilities;
+
+ return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(sizeof(uint16_t), ECP_MTYPE_DIR_ANN), 0);
+}
+
+ssize_t dir_send_shadow(ECPConnection *conn) {
+ ECPBuffer packet;
+ ECPBuffer payload;
+ unsigned char pkt_buf[ECP_MAX_PKT];
+ unsigned char pld_buf[ECP_MAX_PLD];
+ unsigned char *msg;
+
+ struct hashtable_itr itr;
+ DirNode *node;
+ uint16_t count;
+ ecp_sts_t access_ts;
+ size_t msg_size;
+
+ packet.buffer = pkt_buf;
+ packet.size = ECP_MAX_PKT;
+ payload.buffer = pld_buf;
+ payload.size = ECP_MAX_PLD;
+
+ ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_DIR_SHADOW);
+ msg = ecp_pld_get_msg(payload.buffer, payload.size);
+ msg_size = 0;
+
+ pthread_rwlock_rdlock(&dir_shadow_rwlock);
+
+ count = ecp_ht_count(dir_shadow);
+ msg[0] = count >> 8;
+ msg[1] = count;
+ msg += sizeof(uint16_t);
+ msg_size += sizeof(uint16_t);
+
+ if (count > 0) {
+ size_t rv;
+ int _rv;
+
+ ecp_ht_itr_create(&itr, dir_shadow);
+ do {
+ node = ecp_ht_itr_value(&itr);
+
+ pthread_mutex_lock(&node->mutex);
+ access_ts = node->access_ts;
+ pthread_mutex_unlock(&node->mutex);
+
+ rv = ecp_dir_item_serialize(&node->dir_item, msg);
+ msg += rv;
+ msg_size += rv;
+
+ msg[0] = access_ts >> 24;
+ msg[1] = access_ts >> 16;
+ msg[2] = access_ts >> 8;
+ msg[3] = access_ts;
+ msg += sizeof(uint32_t);
+ msg_size += sizeof(uint32_t);
+
+ _rv = ecp_ht_itr_advance(&itr);
+ } while (_rv == ECP_OK);
+ }
+
+ pthread_rwlock_unlock(&dir_shadow_rwlock);
+
+ return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(msg_size, ECP_MTYPE_DIR_SHADOW), 0);
+}
+
+ssize_t dir_send_online(ECPConnection *conn) {
+ ECPBuffer packet;
+ ECPBuffer payload;
+ unsigned char pkt_buf[ECP_MAX_PKT];
+ unsigned char pld_buf[ECP_MAX_PLD];
+ unsigned char *msg;
+ size_t msg_size;
+
+ packet.buffer = pkt_buf;
+ packet.size = ECP_MAX_PKT;
+ payload.buffer = pld_buf;
+ payload.size = ECP_MAX_PLD;
+
+ ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_DIR_REP);
+ msg = ecp_pld_get_msg(payload.buffer, payload.size);
+
+ pthread_rwlock_rdlock(&dir_online_rwlock);
+
+ msg[0] = dir_online.count >> 8;
+ msg[1] = dir_online.count;
+ msg += sizeof(uint16_t);
+ msg_size = sizeof(uint16_t) + dir_online.count * ECP_SIZE_DIR_ITEM;
+ memcpy(msg, dir_online.msg, dir_online.count * ECP_SIZE_DIR_ITEM);
+
+ pthread_rwlock_unlock(&dir_online_rwlock);
+
+ return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(msg_size, ECP_MTYPE_DIR_REP), 0);
+}
+
+ssize_t dir_handle_ann(ECPConnection *conn, unsigned char *msg, size_t msg_size) {
+ ECPDirItem dir_item;
+ int is_new;
+ uint16_t capabilities;
+ ecp_sts_t now;
+ ssize_t rsize;
+ ssize_t rv;
+
+ rsize = sizeof(uint16_t);
+ if (msg_size < rsize) return ECP_ERR_SIZE;
+
+ capabilities = \
+ ((uint16_t)msg[0] << 8) | \
+ ((uint16_t)msg[1]);
+
+ memset(&dir_item, 0, sizeof(ECPDirItem));
+ dir_item.node = conn->remote;
+ dir_item.capabilities = capabilities;
+
+ now = ecp_tm_get_s();
+ is_new = dir_process_item(&dir_item, now);
+ if (is_new) vlink_new_node(conn->sock, &dir_item);
+
+ rv = dir_send_shadow(conn);
+ if (rv < 0) return rv;
+
+ return rsize;
+}
+
+ssize_t dir_handle_req(ECPConnection *conn, unsigned char *msg, size_t msg_size) {
+ ssize_t rv;
+
+ rv = dir_send_online(conn);
+ if (rv < 0) return rv;
+
+ return 0;
+}
+
+ssize_t dir_handle_shadow(ECPConnection *conn, unsigned char *msg, size_t msg_size) {
+ ecp_sts_t now;
+ uint16_t count;
+ size_t rsize;
+ int i;
+
+ if (msg_size < sizeof(uint16_t)) return ECP_ERR_SIZE;
+
+ count = \
+ ((uint16_t)msg[0] << 8) | \
+ ((uint16_t)msg[1]);
+
+ rsize = sizeof(uint16_t) + count * (ECP_SIZE_DIR_ITEM + sizeof(uint32_t));
+ if (msg_size < rsize) return ECP_ERR_SIZE;
+
+ msg += sizeof(uint16_t);
+
+ now = ecp_tm_get_s();
+ for (i=0; i<count; i++) {
+ ECPDirItem dir_item;
+ ecp_sts_t access_ts;
+ size_t rv;
+
+ rv = ecp_dir_item_parse(&dir_item, msg);
+ msg += rv;
+
+ access_ts = \
+ ((uint32_t)msg[0] << 24) | \
+ ((uint32_t)msg[1] << 16) | \
+ ((uint32_t)msg[2] << 8) | \
+ ((uint32_t)msg[3]);
+ msg += sizeof(uint32_t);
+
+ if (ECP_STS_LT(now, access_ts) || (now - access_ts < (NODE_EXP_TIME / 2))) {
+ int is_new;
+
+ is_new = dir_process_item(&dir_item, access_ts);
+ if (is_new) vlink_new_node(conn->sock, &dir_item);
+ }
+ }
+
+ return rsize;
+}
+
+ssize_t dir_handle_msg(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, size_t msg_size, struct ECP2Buffer *b) {
+ switch (mtype) {
+ case ECP_MTYPE_DIR_REQ: {
+ if (ecp_conn_is_outb(conn)) return ECP_ERR;
+ return dir_handle_req(conn, msg, msg_size);
+ }
+
+ case ECP_MTYPE_DIR_ANN: {
+ if (ecp_conn_is_outb(conn)) return ECP_ERR;
+ if (!conn->remote.key_perma.valid) return ECP_ERR_VBOX;
+ return dir_handle_ann(conn, msg, msg_size);
+ }
+
+ case ECP_MTYPE_DIR_SHADOW: {
+ if (ecp_conn_is_inb(conn)) return ECP_ERR;
+ return dir_handle_shadow(conn, msg, msg_size);
+ }
+
+ default:
+ return ECP_ERR_MTYPE;
+ }
+}
+
+int dir_handle_open(ECPConnection *conn, ECP2Buffer *bufs) {
+ ssize_t rv;
+
+ if (ecp_conn_is_inb(conn)) return ECP_OK;
+
+ rv = dir_send_ann(conn);
+ if (rv < 0) return rv;
+
+ return ECP_OK;
+}
+
+void dir_online_switch(void) {
+ struct hashtable_itr itr;
+ DirNode *node;
+ unsigned char *msg;
+
+ pthread_rwlock_rdlock(&dir_shadow_rwlock);
+ pthread_rwlock_wrlock(&dir_online_rwlock);
+
+ msg = dir_online.msg;
+ dir_online.count = ecp_ht_count(dir_shadow);
+
+ if (dir_online.count > 0) {
+ size_t _rv;
+ int rv;
+
+ ecp_ht_itr_create(&itr, dir_shadow);
+ do {
+ node = ecp_ht_itr_value(&itr);
+
+ _rv = ecp_dir_item_serialize(&node->dir_item, msg);
+ msg += _rv;
+
+ rv = ecp_ht_itr_advance(&itr);
+ } while (rv == ECP_OK);
+ }
+
+ pthread_rwlock_unlock(&dir_online_rwlock);
+ pthread_rwlock_unlock(&dir_shadow_rwlock);
+}
+
+int dir_process_item(ECPDirItem *dir_item, ecp_sts_t access_ts) {
+ DirNode *node;
+ int is_new = 0;
+ int rv = ECP_OK;
+
+ pthread_rwlock_rdlock(&dir_shadow_rwlock);
+
+ node = ht_search_item(dir_shadow, dir_item);
+ if (node) {
+ pthread_mutex_lock(&node->mutex);
+ if (!node->zombie) node->access_ts = access_ts;
+ pthread_mutex_unlock(&node->mutex);
+
+ pthread_rwlock_unlock(&dir_shadow_rwlock);
+ } else {
+ pthread_rwlock_unlock(&dir_shadow_rwlock);
+
+ LOG(LOG_DEBUG, "dir new node\n");
+
+ node = dir_create_node(dir_item, access_ts);
+ if (node == NULL) rv = ECP_ERR_ALLOC;
+
+ if (!rv) {
+ pthread_rwlock_wrlock(&dir_shadow_rwlock);
+ if (ecp_ht_count(dir_shadow) > MAX_DIR_ITEM) rv = ECP_ERR_FULL;
+ if (!rv) {
+ if (ht_search_node(dir_shadow, node) == NULL) {
+ rv = ht_insert_node(dir_shadow, node);
+ } else {
+ rv = ECP_ERR_DUP;
+ }
+ }
+ pthread_rwlock_unlock(&dir_shadow_rwlock);
+ }
+
+ if (!rv) {
+ is_new = 1;
+ /* always switch */
+ dir_online_switch();
+ } else {
+ if (node) dir_destroy_node(node);
+ LOG(LOG_ERR, "dir create node err:%d\n", rv);
+ }
+ }
+
+ return is_new;
+}
+
+DirNode *dir_create_node(ECPDirItem *dir_item, ecp_sts_t access_ts) {
+ DirNode *node;
+ int rv;
+
+ node = malloc(sizeof(DirNode));
+ if (node == NULL) return NULL;
+
+ memset(node, 0, sizeof(DirNode));
+
+ rv = pthread_mutex_init(&node->mutex, NULL);
+ if (rv) {
+ free(node);
+ return NULL;
+ }
+
+ node->dir_item = *dir_item;
+ node->access_ts = access_ts;
+
+ return node;
+}
+
+void dir_destroy_node(DirNode *node) {
+ pthread_mutex_destroy(&node->mutex);
+ free(node);
+}
+
+int dir_open_conn(ECPSocket *sock, ECPNode *node) {
+ ECPConnection *conn;
+ int rv;
+
+ conn = malloc(sizeof(ECPConnection));
+ if (conn == NULL) return ECP_ERR_ALLOC;
+
+ ecp_dir_conn_init(conn, sock);
+ ecp_conn_set_flags(conn, ECP_CONN_FLAG_GC | ECP_CONN_FLAG_VBOX);
+ rv = ecp_conn_try_open(conn, node);
+ return rv;
+}
+
+DirNode *dir_search_conn(ECPConnection *conn) {
+ DirNode *node;
+ DirNode *ret = NULL;
+
+ pthread_rwlock_rdlock(&dir_shadow_rwlock);
+
+ node = ht_search_conn(dir_shadow, conn);
+ if (node) {
+ pthread_mutex_lock(&node->mutex);
+ if (!node->zombie) {
+ node->refcount++;
+ ret = node;
+ } else {
+ ret = NULL;
+ }
+ pthread_mutex_unlock(&node->mutex);
+ }
+
+ pthread_rwlock_unlock(&dir_shadow_rwlock);
+
+ return ret;
+}
+
+void dir_announce(ECPSocket *sock) {
+ static DirNode *expire_node[MAX_EXPIRE_CNT];
+ static int expire_node_z[MAX_EXPIRE_CNT];
+ static int expire_cnt;
+
+ struct hashtable_itr itr;
+ ecp_sts_t now;
+ DirNode *node;
+ DirNode *node_next;
+ DirNode *announce_node[MAX_ANNOUNCE_CNT];
+ int announce_cnt;
+ int refcount;
+ int i;
+ int rv;
+
+ now = ecp_tm_get_s();
+ node_next = NULL;
+ do {
+ announce_cnt = 0;
+ pthread_rwlock_rdlock(&dir_shadow_rwlock);
+
+ if (ecp_ht_count(dir_shadow) > 0) {
+ ecp_ht_itr_create(&itr, dir_shadow);
+ if (node_next) {
+ rv = ecp_ht_itr_search(&itr, node_next);
+ if (rv) {
+ LOG(LOG_ERR, "dir ann itr search err:%d\n", rv);
+ break;
+ }
+ node_next = NULL;
+ }
+ do {
+ node = ecp_ht_itr_value(&itr);
+ rv = ecp_ht_itr_advance(&itr);
+
+ pthread_mutex_lock(&node->mutex);
+ if (ECP_STS_LT(node->access_ts, now) && (now - node->access_ts > NODE_EXP_TIME)) {
+ node->zombie = 1;
+ pthread_mutex_unlock(&node->mutex);
+
+ if (expire_cnt < MAX_EXPIRE_CNT) {
+ expire_node[expire_cnt] = node;
+ expire_node_z[expire_cnt] = 1;
+ expire_cnt++;
+ }
+ } else {
+ pthread_mutex_unlock(&node->mutex);
+
+ if ((node->dir_item.capabilities & ECP_DIR_CAP_DIR) && (memcmp(node->dir_item.node.key_perma.public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) != 0)) {
+ announce_node[announce_cnt] = node;
+ announce_cnt++;
+
+ if (announce_cnt == MAX_ANNOUNCE_CNT) {
+ if (!rv) {
+ node_next = ecp_ht_itr_key(&itr);
+ } else {
+ node_next = NULL;
+ }
+ break;
+ }
+ }
+ }
+ } while (rv == ECP_OK);
+ }
+
+ pthread_rwlock_unlock(&dir_shadow_rwlock);
+
+ if (expire_cnt) {
+ pthread_rwlock_wrlock(&dir_shadow_rwlock);
+ for (i=0; i<expire_cnt; i++) {
+ if (expire_node_z[i]) {
+ expire_node_z[i] = 0;
+ ht_remove_node(dir_shadow, expire_node[i]);
+ LOG(LOG_DEBUG, "dir remove node\n");
+ }
+ }
+ pthread_rwlock_unlock(&dir_shadow_rwlock);
+ }
+
+ for (i=0; i<announce_cnt; i++) {
+ rv = dir_open_conn(sock, &announce_node[i]->dir_item.node);
+ if (rv) LOG(LOG_ERR, "dir open connection err:%d\n", rv);
+ }
+
+ i = expire_cnt - 1;
+ while (i >= 0) {
+ node = expire_node[i];
+
+ pthread_mutex_lock(&node->mutex);
+ refcount = node->refcount;
+ pthread_mutex_unlock(&node->mutex);
+
+ if (refcount == 0) {
+ int j = i;
+
+ expire_cnt--;
+ while (j < expire_cnt) {
+ expire_node[j] = expire_node[j+1];
+ j++;
+ }
+ expire_node[j] = NULL;
+ dir_destroy_node(node);
+ }
+ i--;
+ }
+ } while (node_next);
+}
+
+static void *_dir_announce(void *arg) {
+ ECPSocket *sock = arg;
+
+ while (1) {
+ LOG(LOG_DEBUG, "dir announce...\n");
+ dir_announce(sock);
+ sleep(10);
+ }
+
+ return NULL;
+}
+
+int dir_start_announce(ECPSocket *sock) {
+ int rv;
+
+ rv = pthread_create(&dir_announce_thd, NULL, _dir_announce, sock);
+ if (rv) return ECP_ERR;
+ return ECP_OK;
+}
+
+int dir_init(ECPContext *ctx) {
+ ECPConnHandler *handler;
+ int rv;
+
+ srv_config = srv_get_config();
+
+ handler = malloc(sizeof(ECPConnHandler));
+ if (handler == NULL) return ECP_ERR_ALLOC;
+
+ ecp_conn_handler_init(handler, dir_handle_msg, dir_handle_open, NULL, NULL);
+ rv = ecp_ctx_set_handler(ctx, ECP_CTYPE_DIR, handler);
+ if (rv) return rv;
+
+ rv = pthread_rwlock_init(&dir_shadow_rwlock, NULL);
+ if (rv) {
+ return ECP_ERR;
+ }
+
+ rv = pthread_rwlock_init(&dir_online_rwlock, NULL);
+ if (rv) {
+ pthread_rwlock_destroy(&dir_shadow_rwlock);
+ return ECP_ERR;
+ }
+
+ dir_shadow = ecp_ht_create_keys();
+ if (dir_shadow == NULL) {
+ pthread_rwlock_destroy(&dir_shadow_rwlock);
+ pthread_rwlock_destroy(&dir_online_rwlock);
+ return ECP_ERR_ALLOC;
+ }
+
+ return ECP_OK;
+}
diff --git a/ecp/server/dir.h b/ecp/server/dir.h
new file mode 100644
index 0000000..cbc0c87
--- /dev/null
+++ b/ecp/server/dir.h
@@ -0,0 +1,35 @@
+#define MAX_DIR_ITEM 30
+#define MAX_EXPIRE_CNT 100
+#define MAX_ANNOUNCE_CNT 100
+
+#define NODE_EXP_TIME 86400
+
+typedef struct DirNode {
+ ECPDirItem dir_item;
+ int refcount;
+ int zombie;
+ ecp_sts_t access_ts;
+ pthread_mutex_t mutex;
+} DirNode;
+
+typedef struct DirList {
+ unsigned char msg[ECP_MAX_PLD];
+ uint16_t count;
+} DirList;
+
+ssize_t dir_send_ann(ECPConnection *conn);
+ssize_t dir_send_shadow(ECPConnection *conn);
+ssize_t dir_send_online(ECPConnection *conn);
+ssize_t dir_handle_ann(ECPConnection *conn, unsigned char *msg, size_t msg_size);
+ssize_t dir_handle_req(ECPConnection *conn, unsigned char *msg, size_t msg_size);
+ssize_t dir_handle_shadow(ECPConnection *conn, unsigned char *msg, size_t msg_size);
+ssize_t dir_handle_msg(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, size_t msg_size, struct ECP2Buffer *b);
+int dir_handle_open(ECPConnection *conn, ECP2Buffer *bufs);
+int dir_process_item(ECPDirItem *dir_item, ecp_sts_t access_ts);
+DirNode *dir_create_node(ECPDirItem *dir_item, ecp_sts_t access_ts);
+void dir_destroy_node(DirNode *node);
+int dir_open_conn(ECPSocket *sock, ECPNode *node);
+DirNode *dir_search_conn(ECPConnection *conn);
+void dir_announce(ECPSocket *sock);
+int dir_start_announce(ECPSocket *sock);
+int dir_init(ECPContext *ctx);
diff --git a/ecp/server/ht.c b/ecp/server/ht.c
new file mode 100644
index 0000000..0daf8d5
--- /dev/null
+++ b/ecp/server/ht.c
@@ -0,0 +1,34 @@
+#include <ecp/core.h>
+#include <ecp/dir/dir.h>
+#include <ecp/ht.h>
+
+#include "dir.h"
+#include "ht.h"
+
+int ht_insert_node(ecp_ht_table_t *table, DirNode *node) {
+ return ecp_ht_insert(table, &node->dir_item.node.key_perma.public, node);
+}
+
+void ht_remove_node(ecp_ht_table_t *table, DirNode *node) {
+ ecp_ht_remove(table, &node->dir_item.node.key_perma.public);
+}
+
+void *ht_search_node(ecp_ht_table_t *table, DirNode *node) {
+ return ecp_ht_search(table, &node->dir_item.node.key_perma.public);
+}
+
+int ht_insert_conn(ecp_ht_table_t *table, ECPConnection *conn) {
+ return ecp_ht_insert(table, &conn->remote.key_perma.public, conn);
+}
+
+void ht_remove_conn(ecp_ht_table_t *table, ECPConnection *conn) {
+ ecp_ht_remove(table, &conn->remote.key_perma.public);
+}
+
+void *ht_search_conn(ecp_ht_table_t *table, ECPConnection *conn) {
+ return ecp_ht_search(table, &conn->remote.key_perma.public);
+}
+
+void *ht_search_item(ecp_ht_table_t *table, ECPDirItem *dir_item) {
+ return ecp_ht_search(table, &dir_item->node.key_perma.public);
+}
diff --git a/ecp/server/ht.h b/ecp/server/ht.h
new file mode 100644
index 0000000..cba4691
--- /dev/null
+++ b/ecp/server/ht.h
@@ -0,0 +1,7 @@
+int ht_insert_node(ecp_ht_table_t *table, DirNode *node);
+void ht_remove_node(ecp_ht_table_t *table, DirNode *node);
+void *ht_search_node(ecp_ht_table_t *table, DirNode *node);
+int ht_insert_conn(ecp_ht_table_t *table, ECPConnection *conn);
+void ht_remove_conn(ecp_ht_table_t *table, ECPConnection *conn);
+void *ht_search_conn(ecp_ht_table_t *table, ECPConnection *conn);
+void *ht_search_item(ecp_ht_table_t *table, ECPDirItem *dir_item); \ No newline at end of file
diff --git a/ecp/server/server.c b/ecp/server/server.c
new file mode 100644
index 0000000..bbe7c4e
--- /dev/null
+++ b/ecp/server/server.c
@@ -0,0 +1,204 @@
+#include <stdlib.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <sys/stat.h>
+
+#include <ecp/core.h>
+#include <ecp/dir/dir.h>
+#include <ecp/vconn/vconn.h>
+
+#include "dir.h"
+#include "vlink.h"
+#include "ht.h"
+
+#include "server.h"
+
+static SRVConfig srv_config;
+
+SRVConfig *srv_get_config(void) {
+ return &srv_config;
+}
+
+static void usage(char *arg) {
+ fprintf(stderr, "Usage: %s <capabilities> <priv> <addr> [ <pub> <addr> ]\n", arg);
+ exit(1);
+}
+
+static void handle_err(ECPConnection *conn, unsigned char mtype, int err) {
+ if (conn->type == ECP_CTYPE_VLINK) vlink_handle_err(conn, mtype, err);
+ LOG(LOG_ERR, "handle error ctype:%d mtype:%d err:%d\n", conn->type, mtype, err);
+}
+
+static ECPConnection *conn_new(ECPSocket *sock, unsigned char type) {
+ ECPConnection *conn = NULL;
+
+ switch (type) {
+ case ECP_CTYPE_VCONN: {
+ ECPVConnInb *_conn;
+
+ _conn = malloc(sizeof(ECPVConnInb));
+ if (_conn) {
+ ecp_vconn_init_inb(_conn, sock);
+ conn = &_conn->b;
+ }
+ break;
+ }
+
+ case ECP_CTYPE_VLINK: {
+ conn = malloc(sizeof(ECPConnection));
+ if (conn) ecp_vlink_init(conn, sock);
+ break;
+ }
+
+ default: {
+ conn = malloc(sizeof(ECPConnection));
+ if (conn) ecp_conn_init(conn, sock, type);
+ break;
+ }
+ }
+
+ return conn;
+}
+
+static void conn_free(ECPConnection *conn) {
+ free(conn);
+}
+
+int ecp_init(ECPContext *ctx, ECPConnHandler *vconn_handler, ECPConnHandler *vlink_handler) {
+ int rv;
+
+ rv = ecp_ctx_init(ctx, handle_err, conn_new, conn_free);
+ if (rv) return rv;
+
+ rv = ecp_vconn_handler_init(ctx, vconn_handler);
+ if (rv) return rv;
+
+ rv = ecp_vlink_handler_init(ctx, vlink_handler);
+ if (rv) return rv;
+
+ return ECP_OK;
+}
+
+int main(int argc, char *argv[]) {
+ ecp_tr_addr_t addr;
+ ECPContext ctx;
+ ECPSocket sock;
+ ECPConnHandler vconn_handler;
+ ECPConnHandler vlink_handler;
+ char *endptr;
+ int fd;
+ int rv;
+
+ if ((argc < 4) || (argc > 6)) usage(argv[0]);
+
+ srv_config.capabilities = (uint16_t)strtol(argv[1], &endptr, 16);
+ if (endptr[0] != '\0') {
+ fprintf(stderr, "Bad capabilities\n");
+ exit(1);
+ }
+
+ if ((fd = open(argv[2], O_RDONLY)) < 0) {
+ fprintf(stderr, "Unable to open %s\n", argv[2]);
+ exit(1);
+ }
+ if (read(fd, &srv_config.key_perma.public, sizeof(ecp_ecdh_public_t)) != sizeof(ecp_ecdh_public_t)) {
+ close(fd);
+ fprintf(stderr, "Unable to read public key from %s\n", argv[2]);
+ exit(1);
+ }
+ if (read(fd, &srv_config.key_perma.private, sizeof(ecp_ecdh_private_t)) != sizeof(ecp_ecdh_private_t)) {
+ close(fd);
+ fprintf(stderr, "Unable to read private key from %s\n", argv[2]);
+ exit(1);
+ }
+ close(fd);
+ srv_config.key_perma.valid = 1;
+
+ rv = ecp_init(&ctx, &vconn_handler, &vlink_handler);
+ if (rv) {
+ fprintf(stderr, "ecp_init RV:%d\n", rv);
+ exit(1);
+ }
+ rv = ecp_sock_create(&sock, &ctx, &srv_config.key_perma);
+ if (rv) {
+ fprintf(stderr, "ecp_sock_create RV:%d\n", rv);
+ exit(1);
+ }
+
+
+ rv = ecp_addr_init(&addr, argv[3]);
+ if (rv) {
+ fprintf(stderr, "ecp_addr_init RV:%d\n", rv);
+ exit(1);
+ }
+
+ rv = ecp_sock_open(&sock, &addr);
+ if (rv) {
+ fprintf(stderr, "ecp_sock_open RV:%d\n", rv);
+ exit(1);
+ }
+
+ rv = dir_init(&ctx);
+ if (rv) {
+ fprintf(stderr, "dir_init RV:%d\n", rv);
+ exit(1);
+ }
+
+ rv = vlink_init(&ctx);
+ if (rv) {
+ fprintf(stderr, "vlink_init RV:%d\n", rv);
+ exit(1);
+ }
+
+ rv = dir_start_announce(&sock);
+ if (rv) {
+ fprintf(stderr, "dir_start_announce RV:%d\n", rv);
+ exit(1);
+ }
+
+ rv = vlink_start_open(&sock);
+ if (rv) {
+ fprintf(stderr, "vlink_start_open RV:%d\n", rv);
+ exit(1);
+ }
+
+ rv = vlink_start_keyx();
+ if (rv) {
+ fprintf(stderr, "vlink_start_keyx RV:%d\n", rv);
+ exit(1);
+ }
+
+ if (argc == 6) {
+ ECPNode node;
+ ecp_ecdh_public_t node_pub;
+ ecp_tr_addr_t node_addr;
+
+ if ((fd = open(argv[4], O_RDONLY)) < 0) {
+ fprintf(stderr, "Unable to open %s\n", argv[4]);
+ exit(1);
+ }
+ if (read(fd, &node_pub, sizeof(ecp_ecdh_public_t)) != sizeof(ecp_ecdh_public_t)) {
+ close(fd);
+ fprintf(stderr, "Unable to read public key from %s\n", argv[4]);
+ exit(1);
+ }
+ close(fd);
+
+ int rv;
+
+ ecp_node_init(&node, &node_pub, NULL);
+ rv = ecp_node_set_addr(&node, argv[5]);
+ if (rv) {
+ fprintf(stderr, "ecp_node_set_addr RV:%d\n", rv);
+ exit(1);
+ }
+
+ rv = dir_open_conn(&sock, &node);
+ if (rv) {
+ fprintf(stderr, "dir_open_conn RV:%d\n", rv);
+ exit(1);
+ }
+ }
+
+ ecp_receiver(&sock);
+} \ No newline at end of file
diff --git a/ecp/server/server.h b/ecp/server/server.h
new file mode 100644
index 0000000..ff8e444
--- /dev/null
+++ b/ecp/server/server.h
@@ -0,0 +1,15 @@
+#include <stdio.h>
+
+#define LOG_DEBUG 1
+#define LOG_INFO 2
+#define LOG_ERR 3
+
+#define LOG_LEVEL LOG_DEBUG
+#define LOG(l, ...) (l >= LOG_LEVEL ? fprintf(stderr, __VA_ARGS__) : 0 )
+
+typedef struct SRVConfig {
+ ECPDHKey key_perma;
+ uint16_t capabilities;
+} SRVConfig;
+
+SRVConfig *srv_get_config(void);
diff --git a/ecp/server/vlink.c b/ecp/server/vlink.c
new file mode 100644
index 0000000..e98dacc
--- /dev/null
+++ b/ecp/server/vlink.c
@@ -0,0 +1,321 @@
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+
+#include <ecp/core.h>
+#include <ecp/dir/dir.h>
+#include <ecp/vconn/vconn.h>
+#include <ecp/ht.h>
+#include <ecp/tm.h>
+
+#include "server.h"
+#include "dir.h"
+#include "ht.h"
+
+#include "vlink.h"
+
+static ecp_ht_table_t *vlink_conn = NULL;
+static ecp_ht_table_t *vlink_node = NULL;
+static pthread_mutex_t vlink_conn_mutex;
+static pthread_mutex_t vlink_node_mutex;
+static pthread_t vlink_open_thd;
+static pthread_t vlink_keyx_thd;
+
+static SRVConfig *srv_config;
+
+void vlink_handle_err(ECPConnection *conn, unsigned char mtype, int err) {
+ switch (mtype) {
+ case ECP_MTYPE_OPEN_REP: {
+ if (err == ECP_ERR_TIMEOUT) {
+ int rv;
+
+ rv = vlink_insert_node(conn);
+ if (rv) LOG(LOG_ERR, "vlink insert node err:%d\n", rv);
+ }
+ ecp_conn_close(conn);
+ break;
+ }
+ }
+}
+
+int vlink_handle_open(ECPConnection *conn, ECP2Buffer *bufs) {
+ int rv;
+
+ rv = ecp_vlink_handle_open(conn, bufs);
+ if (rv) return rv;
+
+ if (ecp_conn_is_inb(conn)) return ECP_OK;
+
+ pthread_mutex_lock(&vlink_conn_mutex);
+ rv = ht_insert_conn(vlink_conn, conn);
+ pthread_mutex_unlock(&vlink_conn_mutex);
+
+ if (rv) {
+ ecp_vlink_handle_close(conn);
+ return rv;
+ }
+
+ return ECP_OK;
+}
+
+void vlink_handle_close(ECPConnection *conn) {
+ int rv;
+
+ ecp_vlink_handle_close(conn);
+
+ if (ecp_conn_is_inb(conn)) return;
+
+ rv = vlink_insert_node(conn);
+ if (rv) LOG(LOG_ERR, "vlink insert node err:%d\n", rv);
+}
+
+int vlink_open_conn(ECPSocket *sock, ECPNode *node) {
+ ECPConnection *conn;
+ int rv;
+
+ conn = malloc(sizeof(ECPConnection));
+ if (conn == NULL) return ECP_ERR_ALLOC;
+
+ ecp_vlink_init(conn, sock);
+ rv = ecp_conn_open(conn, node);
+ return rv;
+}
+
+void vlink_new_node(ECPSocket *sock, ECPDirItem *dir_item) {
+ if ((dir_item->capabilities & ECP_DIR_CAP_VCONN) && (memcmp(&dir_item->node.key_perma.public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) < 0)) {
+ int rv;
+
+ LOG(LOG_DEBUG, "vlink open connection\n");
+ rv = vlink_open_conn(sock, &dir_item->node);
+ if (rv) LOG(LOG_ERR, "vlink open connection err:%d\n", rv);
+ }
+}
+
+int vlink_insert_node(ECPConnection *conn) {
+ DirNode *node;
+ int rv = ECP_OK;
+
+ node = dir_search_conn(conn);
+ if (node) {
+ pthread_mutex_lock(&vlink_node_mutex);
+ rv = ht_insert_node(vlink_node, node);
+ pthread_mutex_unlock(&vlink_node_mutex);
+
+ if (rv) {
+ pthread_mutex_lock(&node->mutex);
+ node->refcount--;
+ pthread_mutex_unlock(&node->mutex);
+ }
+ }
+
+ return rv;
+}
+
+void vlink_open(ECPSocket *sock) {
+ struct hashtable_itr itr;
+ DirNode *node;
+ DirNode *node_next;
+ DirNode *open_node[MAX_OPEN_CNT];
+ int open_cnt;
+ int i;
+ int rv;
+
+ node_next = NULL;
+ do {
+ open_cnt = 0;
+ pthread_mutex_lock(&vlink_node_mutex);
+
+ if (ecp_ht_count(vlink_node) > 0) {
+ ecp_ht_itr_create(&itr, vlink_node);
+ if (node_next) {
+ rv = ecp_ht_itr_search(&itr, node_next);
+ if (rv) {
+ LOG(LOG_ERR, "vlink open itr search err:%d\n", rv);
+ break;
+ }
+ node_next = NULL;
+ }
+ do {
+ node = ecp_ht_itr_value(&itr);
+ rv = ecp_ht_itr_remove(&itr);
+
+ open_node[open_cnt] = node;
+ open_cnt++;
+ if (open_cnt == MAX_OPEN_CNT) {
+ if (!rv) {
+ node_next = ecp_ht_itr_key(&itr);
+ } else {
+ node_next = NULL;
+ }
+ }
+ } while (rv == ECP_OK);
+ }
+
+ pthread_mutex_unlock(&vlink_node_mutex);
+
+ for (i=0; i<open_cnt; i++) {
+ pthread_mutex_lock(&open_node[i]->mutex);
+
+ if (open_node[i]->zombie) {
+ open_node[i]->refcount--;
+
+ pthread_mutex_unlock(&open_node[i]->mutex);
+ } else {
+ pthread_mutex_unlock(&open_node[i]->mutex);
+
+ LOG(LOG_DEBUG, "vlink open connection\n");
+ rv = vlink_open_conn(sock, &open_node[i]->dir_item.node);
+ if (rv) LOG(LOG_ERR, "vlink open connection err:%d\n", rv);
+ }
+ }
+ } while (node_next);
+}
+
+void vlink_keyx(void) {
+ struct hashtable_itr itr;
+ ecp_sts_t now;
+ ECPConnection *conn;
+ ECPConnection *keyx_next;
+ ECPConnection *keyx_conn[MAX_KEYX_CNT];
+ int keyx_conn_z[MAX_KEYX_CNT];
+ int keyx_cnt;
+ int is_zombie;
+ int i;
+ int rv;
+
+ now = ecp_tm_get_s();
+ keyx_next = NULL;
+ do {
+ keyx_cnt = 0;
+ pthread_mutex_lock(&vlink_conn_mutex);
+
+ if (ecp_ht_count(vlink_conn) > 0) {
+ ecp_ht_itr_create(&itr, vlink_conn);
+ if (keyx_next) {
+ rv = ecp_ht_itr_search(&itr, keyx_next);
+ if (rv) {
+ LOG(LOG_ERR, "vlink keyx itr search err:%d\n", rv);
+ break;
+ }
+ keyx_next = NULL;
+ }
+ do {
+ conn = ecp_ht_itr_value(&itr);
+
+ is_zombie = ecp_conn_is_zombie(conn, now, CONN_EXP_TIME);
+ if (is_zombie) {
+ rv = ecp_ht_itr_remove(&itr);
+ } else {
+ rv = ecp_ht_itr_advance(&itr);
+ }
+
+ keyx_conn[keyx_cnt] = conn;
+ keyx_conn_z[keyx_cnt] = is_zombie;
+ keyx_cnt++;
+
+ if (keyx_cnt == MAX_KEYX_CNT) {
+ if (!rv) {
+ keyx_next = ecp_ht_itr_key(&itr);
+ } else {
+ keyx_next = NULL;
+ }
+ }
+ } while (rv == ECP_OK);
+ }
+
+ pthread_mutex_unlock(&vlink_conn_mutex);
+
+ for (i=0; i<keyx_cnt; i++) {
+ if (keyx_conn_z[i]) {
+ LOG(LOG_DEBUG, "vlink close connection\n");
+ ecp_conn_close(keyx_conn[i]);
+ } else {
+ ssize_t _rv;
+
+ LOG(LOG_DEBUG, "vlink send keyx\n");
+ _rv = ecp_send_keyx_req(keyx_conn[i], 1);
+ if (_rv < 0) LOG(LOG_ERR, "vlink send keyx err:%ld\n", _rv);
+ }
+ }
+ } while (keyx_next);
+}
+
+static void *_vlink_open(void *arg) {
+ ECPSocket *sock = arg;
+
+ while (1) {
+ LOG(LOG_DEBUG, "vlink open...\n");
+ vlink_open(sock);
+ sleep(10);
+ }
+
+ return NULL;
+}
+
+static void *_vlink_keyx(void *arg) {
+ while (1) {
+ LOG(LOG_DEBUG, "vlink keyx...\n");
+ vlink_keyx();
+ sleep(10);
+ }
+
+ return NULL;
+}
+
+int vlink_start_open(ECPSocket *sock) {
+ int rv;
+
+ rv = pthread_create(&vlink_open_thd, NULL, _vlink_open, sock);
+ if (rv) return ECP_ERR;
+ return ECP_OK;
+}
+
+int vlink_start_keyx(void) {
+ int rv;
+
+ rv = pthread_create(&vlink_keyx_thd, NULL, _vlink_keyx, NULL);
+ if (rv) return ECP_ERR;
+ return ECP_OK;
+}
+
+int vlink_init(ECPContext *ctx) {
+ ECPConnHandler *handler;
+ int rv;
+
+ srv_config = srv_get_config();
+
+ handler = ecp_ctx_get_handler(ctx, ECP_CTYPE_VLINK);
+ if (handler == NULL) return ECP_ERR;
+
+ handler->handle_open = vlink_handle_open;
+ handler->handle_close = vlink_handle_close;
+
+ rv = pthread_mutex_init(&vlink_conn_mutex, NULL);
+ if (rv) {
+ return ECP_ERR;
+ }
+
+ rv = pthread_mutex_init(&vlink_node_mutex, NULL);
+ if (rv) {
+ pthread_mutex_destroy(&vlink_conn_mutex);
+ return ECP_ERR;
+ }
+
+ rv = ECP_OK;
+ vlink_conn = ecp_ht_create_keys();
+ if (vlink_conn == NULL) rv = ECP_ERR_ALLOC;
+ if (!rv) {
+ vlink_node = ecp_ht_create_keys();
+ if (vlink_node == NULL) rv = ECP_ERR_ALLOC;
+ }
+
+ if (rv) {
+ pthread_mutex_destroy(&vlink_node_mutex);
+ pthread_mutex_destroy(&vlink_conn_mutex);
+ if (vlink_node) ecp_ht_destroy(vlink_node);
+ if (vlink_conn) ecp_ht_destroy(vlink_conn);
+ return rv;
+ }
+
+ return ECP_OK;
+}
diff --git a/ecp/server/vlink.h b/ecp/server/vlink.h
new file mode 100644
index 0000000..78af6b1
--- /dev/null
+++ b/ecp/server/vlink.h
@@ -0,0 +1,17 @@
+#define MAX_KEYX_CNT 100
+#define MAX_OPEN_CNT 100
+
+#define CONN_EXP_TIME 22
+
+void vlink_handle_err(ECPConnection *conn, unsigned char mtype, int err);
+int vlink_handle_open(ECPConnection *conn, ECP2Buffer *bufs);
+void vlink_handle_close(ECPConnection *conn);
+int vlink_open_conn(ECPSocket *sock, ECPNode *node);
+void vlink_new_node(ECPSocket *sock, ECPDirItem *item);
+int vlink_insert_node(ECPConnection *conn);
+
+void vlink_keyx(void);
+void vlink_open(ECPSocket *sock);
+int vlink_start_open(ECPSocket *sock);
+int vlink_start_keyx(void);
+int vlink_init(ECPContext *ctx);