diff options
Diffstat (limited to 'ecp/server')
-rw-r--r-- | ecp/server/Makefile | 16 | ||||
-rw-r--r-- | ecp/server/dir.c | 551 | ||||
-rw-r--r-- | ecp/server/dir.h | 35 | ||||
-rw-r--r-- | ecp/server/ht.c | 34 | ||||
-rw-r--r-- | ecp/server/ht.h | 7 | ||||
-rw-r--r-- | ecp/server/server.c | 204 | ||||
-rw-r--r-- | ecp/server/server.h | 15 | ||||
-rw-r--r-- | ecp/server/vlink.c | 321 | ||||
-rw-r--r-- | ecp/server/vlink.h | 17 |
9 files changed, 1200 insertions, 0 deletions
diff --git a/ecp/server/Makefile b/ecp/server/Makefile new file mode 100644 index 0000000..2105e9f --- /dev/null +++ b/ecp/server/Makefile @@ -0,0 +1,16 @@ +src_dir = ../src +include $(src_dir)/ecp/common.mk +CFLAGS += -Wno-int-to-void-pointer-cast + +obj = server.o dir.o vlink.o ht.o +dep = ../build-posix/*.a + +%.o: %.c + $(CC) $(CFLAGS) -c $< + +ecp_server: $(obj) + $(CC) -o $@ $(obj) $(dep) $(LDFLAGS) + +clean: + rm -f *.o + rm -f ecp_server diff --git a/ecp/server/dir.c b/ecp/server/dir.c new file mode 100644 index 0000000..0066bac --- /dev/null +++ b/ecp/server/dir.c @@ -0,0 +1,551 @@ +#include <stdlib.h> +#include <unistd.h> +#include <string.h> + +#include <ecp/core.h> +#include <ecp/dir/dir.h> +#include <ecp/ht.h> +#include <ecp/tm.h> + +#include "server.h" +#include "vlink.h" +#include "dir.h" +#include "ht.h" + +static ecp_ht_table_t *dir_shadow = NULL; +static pthread_rwlock_t dir_shadow_rwlock; +static pthread_rwlock_t dir_online_rwlock; +static pthread_t dir_announce_thd; + +static DirList dir_online; + +static SRVConfig *srv_config; + +ssize_t dir_send_ann(ECPConnection *conn) { + ECPBuffer packet; + ECPBuffer payload; + unsigned char pkt_buf[ECP_SIZE_PKT_BUF(sizeof(uint16_t), ECP_MTYPE_DIR_ANN, conn)]; + unsigned char pld_buf[ECP_SIZE_PLD_BUF(sizeof(uint16_t), ECP_MTYPE_DIR_ANN, conn)]; + unsigned char *msg; + + packet.buffer = pkt_buf; + packet.size = sizeof(pkt_buf); + payload.buffer = pld_buf; + payload.size = sizeof(pld_buf); + + ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_DIR_ANN); + msg = ecp_pld_get_msg(payload.buffer, payload.size); + msg[0] = srv_config->capabilities >> 8; + msg[1] = srv_config->capabilities; + + return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(sizeof(uint16_t), ECP_MTYPE_DIR_ANN), 0); +} + +ssize_t dir_send_shadow(ECPConnection *conn) { + ECPBuffer packet; + ECPBuffer payload; + unsigned char pkt_buf[ECP_MAX_PKT]; + unsigned char pld_buf[ECP_MAX_PLD]; + unsigned char *msg; + + struct hashtable_itr itr; + DirNode *node; + uint16_t count; + ecp_sts_t access_ts; + size_t msg_size; + + packet.buffer = pkt_buf; + packet.size = ECP_MAX_PKT; + payload.buffer = pld_buf; + payload.size = ECP_MAX_PLD; + + ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_DIR_SHADOW); + msg = ecp_pld_get_msg(payload.buffer, payload.size); + msg_size = 0; + + pthread_rwlock_rdlock(&dir_shadow_rwlock); + + count = ecp_ht_count(dir_shadow); + msg[0] = count >> 8; + msg[1] = count; + msg += sizeof(uint16_t); + msg_size += sizeof(uint16_t); + + if (count > 0) { + size_t rv; + int _rv; + + ecp_ht_itr_create(&itr, dir_shadow); + do { + node = ecp_ht_itr_value(&itr); + + pthread_mutex_lock(&node->mutex); + access_ts = node->access_ts; + pthread_mutex_unlock(&node->mutex); + + rv = ecp_dir_item_serialize(&node->dir_item, msg); + msg += rv; + msg_size += rv; + + msg[0] = access_ts >> 24; + msg[1] = access_ts >> 16; + msg[2] = access_ts >> 8; + msg[3] = access_ts; + msg += sizeof(uint32_t); + msg_size += sizeof(uint32_t); + + _rv = ecp_ht_itr_advance(&itr); + } while (_rv == ECP_OK); + } + + pthread_rwlock_unlock(&dir_shadow_rwlock); + + return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(msg_size, ECP_MTYPE_DIR_SHADOW), 0); +} + +ssize_t dir_send_online(ECPConnection *conn) { + ECPBuffer packet; + ECPBuffer payload; + unsigned char pkt_buf[ECP_MAX_PKT]; + unsigned char pld_buf[ECP_MAX_PLD]; + unsigned char *msg; + size_t msg_size; + + packet.buffer = pkt_buf; + packet.size = ECP_MAX_PKT; + payload.buffer = pld_buf; + payload.size = ECP_MAX_PLD; + + ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_DIR_REP); + msg = ecp_pld_get_msg(payload.buffer, payload.size); + + pthread_rwlock_rdlock(&dir_online_rwlock); + + msg[0] = dir_online.count >> 8; + msg[1] = dir_online.count; + msg += sizeof(uint16_t); + msg_size = sizeof(uint16_t) + dir_online.count * ECP_SIZE_DIR_ITEM; + memcpy(msg, dir_online.msg, dir_online.count * ECP_SIZE_DIR_ITEM); + + pthread_rwlock_unlock(&dir_online_rwlock); + + return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(msg_size, ECP_MTYPE_DIR_REP), 0); +} + +ssize_t dir_handle_ann(ECPConnection *conn, unsigned char *msg, size_t msg_size) { + ECPDirItem dir_item; + int is_new; + uint16_t capabilities; + ecp_sts_t now; + ssize_t rsize; + ssize_t rv; + + rsize = sizeof(uint16_t); + if (msg_size < rsize) return ECP_ERR_SIZE; + + capabilities = \ + ((uint16_t)msg[0] << 8) | \ + ((uint16_t)msg[1]); + + memset(&dir_item, 0, sizeof(ECPDirItem)); + dir_item.node = conn->remote; + dir_item.capabilities = capabilities; + + now = ecp_tm_get_s(); + is_new = dir_process_item(&dir_item, now); + if (is_new) vlink_new_node(conn->sock, &dir_item); + + rv = dir_send_shadow(conn); + if (rv < 0) return rv; + + return rsize; +} + +ssize_t dir_handle_req(ECPConnection *conn, unsigned char *msg, size_t msg_size) { + ssize_t rv; + + rv = dir_send_online(conn); + if (rv < 0) return rv; + + return 0; +} + +ssize_t dir_handle_shadow(ECPConnection *conn, unsigned char *msg, size_t msg_size) { + ecp_sts_t now; + uint16_t count; + size_t rsize; + int i; + + if (msg_size < sizeof(uint16_t)) return ECP_ERR_SIZE; + + count = \ + ((uint16_t)msg[0] << 8) | \ + ((uint16_t)msg[1]); + + rsize = sizeof(uint16_t) + count * (ECP_SIZE_DIR_ITEM + sizeof(uint32_t)); + if (msg_size < rsize) return ECP_ERR_SIZE; + + msg += sizeof(uint16_t); + + now = ecp_tm_get_s(); + for (i=0; i<count; i++) { + ECPDirItem dir_item; + ecp_sts_t access_ts; + size_t rv; + + rv = ecp_dir_item_parse(&dir_item, msg); + msg += rv; + + access_ts = \ + ((uint32_t)msg[0] << 24) | \ + ((uint32_t)msg[1] << 16) | \ + ((uint32_t)msg[2] << 8) | \ + ((uint32_t)msg[3]); + msg += sizeof(uint32_t); + + if (ECP_STS_LT(now, access_ts) || (now - access_ts < (NODE_EXP_TIME / 2))) { + int is_new; + + is_new = dir_process_item(&dir_item, access_ts); + if (is_new) vlink_new_node(conn->sock, &dir_item); + } + } + + return rsize; +} + +ssize_t dir_handle_msg(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, size_t msg_size, struct ECP2Buffer *b) { + switch (mtype) { + case ECP_MTYPE_DIR_REQ: { + if (ecp_conn_is_outb(conn)) return ECP_ERR; + return dir_handle_req(conn, msg, msg_size); + } + + case ECP_MTYPE_DIR_ANN: { + if (ecp_conn_is_outb(conn)) return ECP_ERR; + if (!conn->remote.key_perma.valid) return ECP_ERR_VBOX; + return dir_handle_ann(conn, msg, msg_size); + } + + case ECP_MTYPE_DIR_SHADOW: { + if (ecp_conn_is_inb(conn)) return ECP_ERR; + return dir_handle_shadow(conn, msg, msg_size); + } + + default: + return ECP_ERR_MTYPE; + } +} + +int dir_handle_open(ECPConnection *conn, ECP2Buffer *bufs) { + ssize_t rv; + + if (ecp_conn_is_inb(conn)) return ECP_OK; + + rv = dir_send_ann(conn); + if (rv < 0) return rv; + + return ECP_OK; +} + +void dir_online_switch(void) { + struct hashtable_itr itr; + DirNode *node; + unsigned char *msg; + + pthread_rwlock_rdlock(&dir_shadow_rwlock); + pthread_rwlock_wrlock(&dir_online_rwlock); + + msg = dir_online.msg; + dir_online.count = ecp_ht_count(dir_shadow); + + if (dir_online.count > 0) { + size_t _rv; + int rv; + + ecp_ht_itr_create(&itr, dir_shadow); + do { + node = ecp_ht_itr_value(&itr); + + _rv = ecp_dir_item_serialize(&node->dir_item, msg); + msg += _rv; + + rv = ecp_ht_itr_advance(&itr); + } while (rv == ECP_OK); + } + + pthread_rwlock_unlock(&dir_online_rwlock); + pthread_rwlock_unlock(&dir_shadow_rwlock); +} + +int dir_process_item(ECPDirItem *dir_item, ecp_sts_t access_ts) { + DirNode *node; + int is_new = 0; + int rv = ECP_OK; + + pthread_rwlock_rdlock(&dir_shadow_rwlock); + + node = ht_search_item(dir_shadow, dir_item); + if (node) { + pthread_mutex_lock(&node->mutex); + if (!node->zombie) node->access_ts = access_ts; + pthread_mutex_unlock(&node->mutex); + + pthread_rwlock_unlock(&dir_shadow_rwlock); + } else { + pthread_rwlock_unlock(&dir_shadow_rwlock); + + LOG(LOG_DEBUG, "dir new node\n"); + + node = dir_create_node(dir_item, access_ts); + if (node == NULL) rv = ECP_ERR_ALLOC; + + if (!rv) { + pthread_rwlock_wrlock(&dir_shadow_rwlock); + if (ecp_ht_count(dir_shadow) > MAX_DIR_ITEM) rv = ECP_ERR_FULL; + if (!rv) { + if (ht_search_node(dir_shadow, node) == NULL) { + rv = ht_insert_node(dir_shadow, node); + } else { + rv = ECP_ERR_DUP; + } + } + pthread_rwlock_unlock(&dir_shadow_rwlock); + } + + if (!rv) { + is_new = 1; + /* always switch */ + dir_online_switch(); + } else { + if (node) dir_destroy_node(node); + LOG(LOG_ERR, "dir create node err:%d\n", rv); + } + } + + return is_new; +} + +DirNode *dir_create_node(ECPDirItem *dir_item, ecp_sts_t access_ts) { + DirNode *node; + int rv; + + node = malloc(sizeof(DirNode)); + if (node == NULL) return NULL; + + memset(node, 0, sizeof(DirNode)); + + rv = pthread_mutex_init(&node->mutex, NULL); + if (rv) { + free(node); + return NULL; + } + + node->dir_item = *dir_item; + node->access_ts = access_ts; + + return node; +} + +void dir_destroy_node(DirNode *node) { + pthread_mutex_destroy(&node->mutex); + free(node); +} + +int dir_open_conn(ECPSocket *sock, ECPNode *node) { + ECPConnection *conn; + int rv; + + conn = malloc(sizeof(ECPConnection)); + if (conn == NULL) return ECP_ERR_ALLOC; + + ecp_dir_conn_init(conn, sock); + ecp_conn_set_flags(conn, ECP_CONN_FLAG_GC | ECP_CONN_FLAG_VBOX); + rv = ecp_conn_try_open(conn, node); + return rv; +} + +DirNode *dir_search_conn(ECPConnection *conn) { + DirNode *node; + DirNode *ret = NULL; + + pthread_rwlock_rdlock(&dir_shadow_rwlock); + + node = ht_search_conn(dir_shadow, conn); + if (node) { + pthread_mutex_lock(&node->mutex); + if (!node->zombie) { + node->refcount++; + ret = node; + } else { + ret = NULL; + } + pthread_mutex_unlock(&node->mutex); + } + + pthread_rwlock_unlock(&dir_shadow_rwlock); + + return ret; +} + +void dir_announce(ECPSocket *sock) { + static DirNode *expire_node[MAX_EXPIRE_CNT]; + static int expire_node_z[MAX_EXPIRE_CNT]; + static int expire_cnt; + + struct hashtable_itr itr; + ecp_sts_t now; + DirNode *node; + DirNode *node_next; + DirNode *announce_node[MAX_ANNOUNCE_CNT]; + int announce_cnt; + int refcount; + int i; + int rv; + + now = ecp_tm_get_s(); + node_next = NULL; + do { + announce_cnt = 0; + pthread_rwlock_rdlock(&dir_shadow_rwlock); + + if (ecp_ht_count(dir_shadow) > 0) { + ecp_ht_itr_create(&itr, dir_shadow); + if (node_next) { + rv = ecp_ht_itr_search(&itr, node_next); + if (rv) { + LOG(LOG_ERR, "dir ann itr search err:%d\n", rv); + break; + } + node_next = NULL; + } + do { + node = ecp_ht_itr_value(&itr); + rv = ecp_ht_itr_advance(&itr); + + pthread_mutex_lock(&node->mutex); + if (ECP_STS_LT(node->access_ts, now) && (now - node->access_ts > NODE_EXP_TIME)) { + node->zombie = 1; + pthread_mutex_unlock(&node->mutex); + + if (expire_cnt < MAX_EXPIRE_CNT) { + expire_node[expire_cnt] = node; + expire_node_z[expire_cnt] = 1; + expire_cnt++; + } + } else { + pthread_mutex_unlock(&node->mutex); + + if ((node->dir_item.capabilities & ECP_DIR_CAP_DIR) && (memcmp(node->dir_item.node.key_perma.public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) != 0)) { + announce_node[announce_cnt] = node; + announce_cnt++; + + if (announce_cnt == MAX_ANNOUNCE_CNT) { + if (!rv) { + node_next = ecp_ht_itr_key(&itr); + } else { + node_next = NULL; + } + break; + } + } + } + } while (rv == ECP_OK); + } + + pthread_rwlock_unlock(&dir_shadow_rwlock); + + if (expire_cnt) { + pthread_rwlock_wrlock(&dir_shadow_rwlock); + for (i=0; i<expire_cnt; i++) { + if (expire_node_z[i]) { + expire_node_z[i] = 0; + ht_remove_node(dir_shadow, expire_node[i]); + LOG(LOG_DEBUG, "dir remove node\n"); + } + } + pthread_rwlock_unlock(&dir_shadow_rwlock); + } + + for (i=0; i<announce_cnt; i++) { + rv = dir_open_conn(sock, &announce_node[i]->dir_item.node); + if (rv) LOG(LOG_ERR, "dir open connection err:%d\n", rv); + } + + i = expire_cnt - 1; + while (i >= 0) { + node = expire_node[i]; + + pthread_mutex_lock(&node->mutex); + refcount = node->refcount; + pthread_mutex_unlock(&node->mutex); + + if (refcount == 0) { + int j = i; + + expire_cnt--; + while (j < expire_cnt) { + expire_node[j] = expire_node[j+1]; + j++; + } + expire_node[j] = NULL; + dir_destroy_node(node); + } + i--; + } + } while (node_next); +} + +static void *_dir_announce(void *arg) { + ECPSocket *sock = arg; + + while (1) { + LOG(LOG_DEBUG, "dir announce...\n"); + dir_announce(sock); + sleep(10); + } + + return NULL; +} + +int dir_start_announce(ECPSocket *sock) { + int rv; + + rv = pthread_create(&dir_announce_thd, NULL, _dir_announce, sock); + if (rv) return ECP_ERR; + return ECP_OK; +} + +int dir_init(ECPContext *ctx) { + ECPConnHandler *handler; + int rv; + + srv_config = srv_get_config(); + + handler = malloc(sizeof(ECPConnHandler)); + if (handler == NULL) return ECP_ERR_ALLOC; + + ecp_conn_handler_init(handler, dir_handle_msg, dir_handle_open, NULL, NULL); + rv = ecp_ctx_set_handler(ctx, ECP_CTYPE_DIR, handler); + if (rv) return rv; + + rv = pthread_rwlock_init(&dir_shadow_rwlock, NULL); + if (rv) { + return ECP_ERR; + } + + rv = pthread_rwlock_init(&dir_online_rwlock, NULL); + if (rv) { + pthread_rwlock_destroy(&dir_shadow_rwlock); + return ECP_ERR; + } + + dir_shadow = ecp_ht_create_keys(); + if (dir_shadow == NULL) { + pthread_rwlock_destroy(&dir_shadow_rwlock); + pthread_rwlock_destroy(&dir_online_rwlock); + return ECP_ERR_ALLOC; + } + + return ECP_OK; +} diff --git a/ecp/server/dir.h b/ecp/server/dir.h new file mode 100644 index 0000000..cbc0c87 --- /dev/null +++ b/ecp/server/dir.h @@ -0,0 +1,35 @@ +#define MAX_DIR_ITEM 30 +#define MAX_EXPIRE_CNT 100 +#define MAX_ANNOUNCE_CNT 100 + +#define NODE_EXP_TIME 86400 + +typedef struct DirNode { + ECPDirItem dir_item; + int refcount; + int zombie; + ecp_sts_t access_ts; + pthread_mutex_t mutex; +} DirNode; + +typedef struct DirList { + unsigned char msg[ECP_MAX_PLD]; + uint16_t count; +} DirList; + +ssize_t dir_send_ann(ECPConnection *conn); +ssize_t dir_send_shadow(ECPConnection *conn); +ssize_t dir_send_online(ECPConnection *conn); +ssize_t dir_handle_ann(ECPConnection *conn, unsigned char *msg, size_t msg_size); +ssize_t dir_handle_req(ECPConnection *conn, unsigned char *msg, size_t msg_size); +ssize_t dir_handle_shadow(ECPConnection *conn, unsigned char *msg, size_t msg_size); +ssize_t dir_handle_msg(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, size_t msg_size, struct ECP2Buffer *b); +int dir_handle_open(ECPConnection *conn, ECP2Buffer *bufs); +int dir_process_item(ECPDirItem *dir_item, ecp_sts_t access_ts); +DirNode *dir_create_node(ECPDirItem *dir_item, ecp_sts_t access_ts); +void dir_destroy_node(DirNode *node); +int dir_open_conn(ECPSocket *sock, ECPNode *node); +DirNode *dir_search_conn(ECPConnection *conn); +void dir_announce(ECPSocket *sock); +int dir_start_announce(ECPSocket *sock); +int dir_init(ECPContext *ctx); diff --git a/ecp/server/ht.c b/ecp/server/ht.c new file mode 100644 index 0000000..0daf8d5 --- /dev/null +++ b/ecp/server/ht.c @@ -0,0 +1,34 @@ +#include <ecp/core.h> +#include <ecp/dir/dir.h> +#include <ecp/ht.h> + +#include "dir.h" +#include "ht.h" + +int ht_insert_node(ecp_ht_table_t *table, DirNode *node) { + return ecp_ht_insert(table, &node->dir_item.node.key_perma.public, node); +} + +void ht_remove_node(ecp_ht_table_t *table, DirNode *node) { + ecp_ht_remove(table, &node->dir_item.node.key_perma.public); +} + +void *ht_search_node(ecp_ht_table_t *table, DirNode *node) { + return ecp_ht_search(table, &node->dir_item.node.key_perma.public); +} + +int ht_insert_conn(ecp_ht_table_t *table, ECPConnection *conn) { + return ecp_ht_insert(table, &conn->remote.key_perma.public, conn); +} + +void ht_remove_conn(ecp_ht_table_t *table, ECPConnection *conn) { + ecp_ht_remove(table, &conn->remote.key_perma.public); +} + +void *ht_search_conn(ecp_ht_table_t *table, ECPConnection *conn) { + return ecp_ht_search(table, &conn->remote.key_perma.public); +} + +void *ht_search_item(ecp_ht_table_t *table, ECPDirItem *dir_item) { + return ecp_ht_search(table, &dir_item->node.key_perma.public); +} diff --git a/ecp/server/ht.h b/ecp/server/ht.h new file mode 100644 index 0000000..cba4691 --- /dev/null +++ b/ecp/server/ht.h @@ -0,0 +1,7 @@ +int ht_insert_node(ecp_ht_table_t *table, DirNode *node); +void ht_remove_node(ecp_ht_table_t *table, DirNode *node); +void *ht_search_node(ecp_ht_table_t *table, DirNode *node); +int ht_insert_conn(ecp_ht_table_t *table, ECPConnection *conn); +void ht_remove_conn(ecp_ht_table_t *table, ECPConnection *conn); +void *ht_search_conn(ecp_ht_table_t *table, ECPConnection *conn); +void *ht_search_item(ecp_ht_table_t *table, ECPDirItem *dir_item);
\ No newline at end of file diff --git a/ecp/server/server.c b/ecp/server/server.c new file mode 100644 index 0000000..bbe7c4e --- /dev/null +++ b/ecp/server/server.c @@ -0,0 +1,204 @@ +#include <stdlib.h> +#include <fcntl.h> +#include <unistd.h> +#include <sys/stat.h> + +#include <ecp/core.h> +#include <ecp/dir/dir.h> +#include <ecp/vconn/vconn.h> + +#include "dir.h" +#include "vlink.h" +#include "ht.h" + +#include "server.h" + +static SRVConfig srv_config; + +SRVConfig *srv_get_config(void) { + return &srv_config; +} + +static void usage(char *arg) { + fprintf(stderr, "Usage: %s <capabilities> <priv> <addr> [ <pub> <addr> ]\n", arg); + exit(1); +} + +static void handle_err(ECPConnection *conn, unsigned char mtype, int err) { + if (conn->type == ECP_CTYPE_VLINK) vlink_handle_err(conn, mtype, err); + LOG(LOG_ERR, "handle error ctype:%d mtype:%d err:%d\n", conn->type, mtype, err); +} + +static ECPConnection *conn_new(ECPSocket *sock, unsigned char type) { + ECPConnection *conn = NULL; + + switch (type) { + case ECP_CTYPE_VCONN: { + ECPVConnInb *_conn; + + _conn = malloc(sizeof(ECPVConnInb)); + if (_conn) { + ecp_vconn_init_inb(_conn, sock); + conn = &_conn->b; + } + break; + } + + case ECP_CTYPE_VLINK: { + conn = malloc(sizeof(ECPConnection)); + if (conn) ecp_vlink_init(conn, sock); + break; + } + + default: { + conn = malloc(sizeof(ECPConnection)); + if (conn) ecp_conn_init(conn, sock, type); + break; + } + } + + return conn; +} + +static void conn_free(ECPConnection *conn) { + free(conn); +} + +int ecp_init(ECPContext *ctx, ECPConnHandler *vconn_handler, ECPConnHandler *vlink_handler) { + int rv; + + rv = ecp_ctx_init(ctx, handle_err, conn_new, conn_free); + if (rv) return rv; + + rv = ecp_vconn_handler_init(ctx, vconn_handler); + if (rv) return rv; + + rv = ecp_vlink_handler_init(ctx, vlink_handler); + if (rv) return rv; + + return ECP_OK; +} + +int main(int argc, char *argv[]) { + ecp_tr_addr_t addr; + ECPContext ctx; + ECPSocket sock; + ECPConnHandler vconn_handler; + ECPConnHandler vlink_handler; + char *endptr; + int fd; + int rv; + + if ((argc < 4) || (argc > 6)) usage(argv[0]); + + srv_config.capabilities = (uint16_t)strtol(argv[1], &endptr, 16); + if (endptr[0] != '\0') { + fprintf(stderr, "Bad capabilities\n"); + exit(1); + } + + if ((fd = open(argv[2], O_RDONLY)) < 0) { + fprintf(stderr, "Unable to open %s\n", argv[2]); + exit(1); + } + if (read(fd, &srv_config.key_perma.public, sizeof(ecp_ecdh_public_t)) != sizeof(ecp_ecdh_public_t)) { + close(fd); + fprintf(stderr, "Unable to read public key from %s\n", argv[2]); + exit(1); + } + if (read(fd, &srv_config.key_perma.private, sizeof(ecp_ecdh_private_t)) != sizeof(ecp_ecdh_private_t)) { + close(fd); + fprintf(stderr, "Unable to read private key from %s\n", argv[2]); + exit(1); + } + close(fd); + srv_config.key_perma.valid = 1; + + rv = ecp_init(&ctx, &vconn_handler, &vlink_handler); + if (rv) { + fprintf(stderr, "ecp_init RV:%d\n", rv); + exit(1); + } + rv = ecp_sock_create(&sock, &ctx, &srv_config.key_perma); + if (rv) { + fprintf(stderr, "ecp_sock_create RV:%d\n", rv); + exit(1); + } + + + rv = ecp_addr_init(&addr, argv[3]); + if (rv) { + fprintf(stderr, "ecp_addr_init RV:%d\n", rv); + exit(1); + } + + rv = ecp_sock_open(&sock, &addr); + if (rv) { + fprintf(stderr, "ecp_sock_open RV:%d\n", rv); + exit(1); + } + + rv = dir_init(&ctx); + if (rv) { + fprintf(stderr, "dir_init RV:%d\n", rv); + exit(1); + } + + rv = vlink_init(&ctx); + if (rv) { + fprintf(stderr, "vlink_init RV:%d\n", rv); + exit(1); + } + + rv = dir_start_announce(&sock); + if (rv) { + fprintf(stderr, "dir_start_announce RV:%d\n", rv); + exit(1); + } + + rv = vlink_start_open(&sock); + if (rv) { + fprintf(stderr, "vlink_start_open RV:%d\n", rv); + exit(1); + } + + rv = vlink_start_keyx(); + if (rv) { + fprintf(stderr, "vlink_start_keyx RV:%d\n", rv); + exit(1); + } + + if (argc == 6) { + ECPNode node; + ecp_ecdh_public_t node_pub; + ecp_tr_addr_t node_addr; + + if ((fd = open(argv[4], O_RDONLY)) < 0) { + fprintf(stderr, "Unable to open %s\n", argv[4]); + exit(1); + } + if (read(fd, &node_pub, sizeof(ecp_ecdh_public_t)) != sizeof(ecp_ecdh_public_t)) { + close(fd); + fprintf(stderr, "Unable to read public key from %s\n", argv[4]); + exit(1); + } + close(fd); + + int rv; + + ecp_node_init(&node, &node_pub, NULL); + rv = ecp_node_set_addr(&node, argv[5]); + if (rv) { + fprintf(stderr, "ecp_node_set_addr RV:%d\n", rv); + exit(1); + } + + rv = dir_open_conn(&sock, &node); + if (rv) { + fprintf(stderr, "dir_open_conn RV:%d\n", rv); + exit(1); + } + } + + ecp_receiver(&sock); +}
\ No newline at end of file diff --git a/ecp/server/server.h b/ecp/server/server.h new file mode 100644 index 0000000..ff8e444 --- /dev/null +++ b/ecp/server/server.h @@ -0,0 +1,15 @@ +#include <stdio.h> + +#define LOG_DEBUG 1 +#define LOG_INFO 2 +#define LOG_ERR 3 + +#define LOG_LEVEL LOG_DEBUG +#define LOG(l, ...) (l >= LOG_LEVEL ? fprintf(stderr, __VA_ARGS__) : 0 ) + +typedef struct SRVConfig { + ECPDHKey key_perma; + uint16_t capabilities; +} SRVConfig; + +SRVConfig *srv_get_config(void); diff --git a/ecp/server/vlink.c b/ecp/server/vlink.c new file mode 100644 index 0000000..e98dacc --- /dev/null +++ b/ecp/server/vlink.c @@ -0,0 +1,321 @@ +#include <stdlib.h> +#include <unistd.h> +#include <string.h> + +#include <ecp/core.h> +#include <ecp/dir/dir.h> +#include <ecp/vconn/vconn.h> +#include <ecp/ht.h> +#include <ecp/tm.h> + +#include "server.h" +#include "dir.h" +#include "ht.h" + +#include "vlink.h" + +static ecp_ht_table_t *vlink_conn = NULL; +static ecp_ht_table_t *vlink_node = NULL; +static pthread_mutex_t vlink_conn_mutex; +static pthread_mutex_t vlink_node_mutex; +static pthread_t vlink_open_thd; +static pthread_t vlink_keyx_thd; + +static SRVConfig *srv_config; + +void vlink_handle_err(ECPConnection *conn, unsigned char mtype, int err) { + switch (mtype) { + case ECP_MTYPE_OPEN_REP: { + if (err == ECP_ERR_TIMEOUT) { + int rv; + + rv = vlink_insert_node(conn); + if (rv) LOG(LOG_ERR, "vlink insert node err:%d\n", rv); + } + ecp_conn_close(conn); + break; + } + } +} + +int vlink_handle_open(ECPConnection *conn, ECP2Buffer *bufs) { + int rv; + + rv = ecp_vlink_handle_open(conn, bufs); + if (rv) return rv; + + if (ecp_conn_is_inb(conn)) return ECP_OK; + + pthread_mutex_lock(&vlink_conn_mutex); + rv = ht_insert_conn(vlink_conn, conn); + pthread_mutex_unlock(&vlink_conn_mutex); + + if (rv) { + ecp_vlink_handle_close(conn); + return rv; + } + + return ECP_OK; +} + +void vlink_handle_close(ECPConnection *conn) { + int rv; + + ecp_vlink_handle_close(conn); + + if (ecp_conn_is_inb(conn)) return; + + rv = vlink_insert_node(conn); + if (rv) LOG(LOG_ERR, "vlink insert node err:%d\n", rv); +} + +int vlink_open_conn(ECPSocket *sock, ECPNode *node) { + ECPConnection *conn; + int rv; + + conn = malloc(sizeof(ECPConnection)); + if (conn == NULL) return ECP_ERR_ALLOC; + + ecp_vlink_init(conn, sock); + rv = ecp_conn_open(conn, node); + return rv; +} + +void vlink_new_node(ECPSocket *sock, ECPDirItem *dir_item) { + if ((dir_item->capabilities & ECP_DIR_CAP_VCONN) && (memcmp(&dir_item->node.key_perma.public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) < 0)) { + int rv; + + LOG(LOG_DEBUG, "vlink open connection\n"); + rv = vlink_open_conn(sock, &dir_item->node); + if (rv) LOG(LOG_ERR, "vlink open connection err:%d\n", rv); + } +} + +int vlink_insert_node(ECPConnection *conn) { + DirNode *node; + int rv = ECP_OK; + + node = dir_search_conn(conn); + if (node) { + pthread_mutex_lock(&vlink_node_mutex); + rv = ht_insert_node(vlink_node, node); + pthread_mutex_unlock(&vlink_node_mutex); + + if (rv) { + pthread_mutex_lock(&node->mutex); + node->refcount--; + pthread_mutex_unlock(&node->mutex); + } + } + + return rv; +} + +void vlink_open(ECPSocket *sock) { + struct hashtable_itr itr; + DirNode *node; + DirNode *node_next; + DirNode *open_node[MAX_OPEN_CNT]; + int open_cnt; + int i; + int rv; + + node_next = NULL; + do { + open_cnt = 0; + pthread_mutex_lock(&vlink_node_mutex); + + if (ecp_ht_count(vlink_node) > 0) { + ecp_ht_itr_create(&itr, vlink_node); + if (node_next) { + rv = ecp_ht_itr_search(&itr, node_next); + if (rv) { + LOG(LOG_ERR, "vlink open itr search err:%d\n", rv); + break; + } + node_next = NULL; + } + do { + node = ecp_ht_itr_value(&itr); + rv = ecp_ht_itr_remove(&itr); + + open_node[open_cnt] = node; + open_cnt++; + if (open_cnt == MAX_OPEN_CNT) { + if (!rv) { + node_next = ecp_ht_itr_key(&itr); + } else { + node_next = NULL; + } + } + } while (rv == ECP_OK); + } + + pthread_mutex_unlock(&vlink_node_mutex); + + for (i=0; i<open_cnt; i++) { + pthread_mutex_lock(&open_node[i]->mutex); + + if (open_node[i]->zombie) { + open_node[i]->refcount--; + + pthread_mutex_unlock(&open_node[i]->mutex); + } else { + pthread_mutex_unlock(&open_node[i]->mutex); + + LOG(LOG_DEBUG, "vlink open connection\n"); + rv = vlink_open_conn(sock, &open_node[i]->dir_item.node); + if (rv) LOG(LOG_ERR, "vlink open connection err:%d\n", rv); + } + } + } while (node_next); +} + +void vlink_keyx(void) { + struct hashtable_itr itr; + ecp_sts_t now; + ECPConnection *conn; + ECPConnection *keyx_next; + ECPConnection *keyx_conn[MAX_KEYX_CNT]; + int keyx_conn_z[MAX_KEYX_CNT]; + int keyx_cnt; + int is_zombie; + int i; + int rv; + + now = ecp_tm_get_s(); + keyx_next = NULL; + do { + keyx_cnt = 0; + pthread_mutex_lock(&vlink_conn_mutex); + + if (ecp_ht_count(vlink_conn) > 0) { + ecp_ht_itr_create(&itr, vlink_conn); + if (keyx_next) { + rv = ecp_ht_itr_search(&itr, keyx_next); + if (rv) { + LOG(LOG_ERR, "vlink keyx itr search err:%d\n", rv); + break; + } + keyx_next = NULL; + } + do { + conn = ecp_ht_itr_value(&itr); + + is_zombie = ecp_conn_is_zombie(conn, now, CONN_EXP_TIME); + if (is_zombie) { + rv = ecp_ht_itr_remove(&itr); + } else { + rv = ecp_ht_itr_advance(&itr); + } + + keyx_conn[keyx_cnt] = conn; + keyx_conn_z[keyx_cnt] = is_zombie; + keyx_cnt++; + + if (keyx_cnt == MAX_KEYX_CNT) { + if (!rv) { + keyx_next = ecp_ht_itr_key(&itr); + } else { + keyx_next = NULL; + } + } + } while (rv == ECP_OK); + } + + pthread_mutex_unlock(&vlink_conn_mutex); + + for (i=0; i<keyx_cnt; i++) { + if (keyx_conn_z[i]) { + LOG(LOG_DEBUG, "vlink close connection\n"); + ecp_conn_close(keyx_conn[i]); + } else { + ssize_t _rv; + + LOG(LOG_DEBUG, "vlink send keyx\n"); + _rv = ecp_send_keyx_req(keyx_conn[i], 1); + if (_rv < 0) LOG(LOG_ERR, "vlink send keyx err:%ld\n", _rv); + } + } + } while (keyx_next); +} + +static void *_vlink_open(void *arg) { + ECPSocket *sock = arg; + + while (1) { + LOG(LOG_DEBUG, "vlink open...\n"); + vlink_open(sock); + sleep(10); + } + + return NULL; +} + +static void *_vlink_keyx(void *arg) { + while (1) { + LOG(LOG_DEBUG, "vlink keyx...\n"); + vlink_keyx(); + sleep(10); + } + + return NULL; +} + +int vlink_start_open(ECPSocket *sock) { + int rv; + + rv = pthread_create(&vlink_open_thd, NULL, _vlink_open, sock); + if (rv) return ECP_ERR; + return ECP_OK; +} + +int vlink_start_keyx(void) { + int rv; + + rv = pthread_create(&vlink_keyx_thd, NULL, _vlink_keyx, NULL); + if (rv) return ECP_ERR; + return ECP_OK; +} + +int vlink_init(ECPContext *ctx) { + ECPConnHandler *handler; + int rv; + + srv_config = srv_get_config(); + + handler = ecp_ctx_get_handler(ctx, ECP_CTYPE_VLINK); + if (handler == NULL) return ECP_ERR; + + handler->handle_open = vlink_handle_open; + handler->handle_close = vlink_handle_close; + + rv = pthread_mutex_init(&vlink_conn_mutex, NULL); + if (rv) { + return ECP_ERR; + } + + rv = pthread_mutex_init(&vlink_node_mutex, NULL); + if (rv) { + pthread_mutex_destroy(&vlink_conn_mutex); + return ECP_ERR; + } + + rv = ECP_OK; + vlink_conn = ecp_ht_create_keys(); + if (vlink_conn == NULL) rv = ECP_ERR_ALLOC; + if (!rv) { + vlink_node = ecp_ht_create_keys(); + if (vlink_node == NULL) rv = ECP_ERR_ALLOC; + } + + if (rv) { + pthread_mutex_destroy(&vlink_node_mutex); + pthread_mutex_destroy(&vlink_conn_mutex); + if (vlink_node) ecp_ht_destroy(vlink_node); + if (vlink_conn) ecp_ht_destroy(vlink_conn); + return rv; + } + + return ECP_OK; +} diff --git a/ecp/server/vlink.h b/ecp/server/vlink.h new file mode 100644 index 0000000..78af6b1 --- /dev/null +++ b/ecp/server/vlink.h @@ -0,0 +1,17 @@ +#define MAX_KEYX_CNT 100 +#define MAX_OPEN_CNT 100 + +#define CONN_EXP_TIME 22 + +void vlink_handle_err(ECPConnection *conn, unsigned char mtype, int err); +int vlink_handle_open(ECPConnection *conn, ECP2Buffer *bufs); +void vlink_handle_close(ECPConnection *conn); +int vlink_open_conn(ECPSocket *sock, ECPNode *node); +void vlink_new_node(ECPSocket *sock, ECPDirItem *item); +int vlink_insert_node(ECPConnection *conn); + +void vlink_keyx(void); +void vlink_open(ECPSocket *sock); +int vlink_start_open(ECPSocket *sock); +int vlink_start_keyx(void); +int vlink_init(ECPContext *ctx); |