summaryrefslogtreecommitdiff
path: root/ecp/server/dir.c
diff options
context:
space:
mode:
Diffstat (limited to 'ecp/server/dir.c')
-rw-r--r--ecp/server/dir.c551
1 files changed, 551 insertions, 0 deletions
diff --git a/ecp/server/dir.c b/ecp/server/dir.c
new file mode 100644
index 0000000..0066bac
--- /dev/null
+++ b/ecp/server/dir.c
@@ -0,0 +1,551 @@
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+
+#include <ecp/core.h>
+#include <ecp/dir/dir.h>
+#include <ecp/ht.h>
+#include <ecp/tm.h>
+
+#include "server.h"
+#include "vlink.h"
+#include "dir.h"
+#include "ht.h"
+
+static ecp_ht_table_t *dir_shadow = NULL;
+static pthread_rwlock_t dir_shadow_rwlock;
+static pthread_rwlock_t dir_online_rwlock;
+static pthread_t dir_announce_thd;
+
+static DirList dir_online;
+
+static SRVConfig *srv_config;
+
+ssize_t dir_send_ann(ECPConnection *conn) {
+ ECPBuffer packet;
+ ECPBuffer payload;
+ unsigned char pkt_buf[ECP_SIZE_PKT_BUF(sizeof(uint16_t), ECP_MTYPE_DIR_ANN, conn)];
+ unsigned char pld_buf[ECP_SIZE_PLD_BUF(sizeof(uint16_t), ECP_MTYPE_DIR_ANN, conn)];
+ unsigned char *msg;
+
+ packet.buffer = pkt_buf;
+ packet.size = sizeof(pkt_buf);
+ payload.buffer = pld_buf;
+ payload.size = sizeof(pld_buf);
+
+ ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_DIR_ANN);
+ msg = ecp_pld_get_msg(payload.buffer, payload.size);
+ msg[0] = srv_config->capabilities >> 8;
+ msg[1] = srv_config->capabilities;
+
+ return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(sizeof(uint16_t), ECP_MTYPE_DIR_ANN), 0);
+}
+
+ssize_t dir_send_shadow(ECPConnection *conn) {
+ ECPBuffer packet;
+ ECPBuffer payload;
+ unsigned char pkt_buf[ECP_MAX_PKT];
+ unsigned char pld_buf[ECP_MAX_PLD];
+ unsigned char *msg;
+
+ struct hashtable_itr itr;
+ DirNode *node;
+ uint16_t count;
+ ecp_sts_t access_ts;
+ size_t msg_size;
+
+ packet.buffer = pkt_buf;
+ packet.size = ECP_MAX_PKT;
+ payload.buffer = pld_buf;
+ payload.size = ECP_MAX_PLD;
+
+ ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_DIR_SHADOW);
+ msg = ecp_pld_get_msg(payload.buffer, payload.size);
+ msg_size = 0;
+
+ pthread_rwlock_rdlock(&dir_shadow_rwlock);
+
+ count = ecp_ht_count(dir_shadow);
+ msg[0] = count >> 8;
+ msg[1] = count;
+ msg += sizeof(uint16_t);
+ msg_size += sizeof(uint16_t);
+
+ if (count > 0) {
+ size_t rv;
+ int _rv;
+
+ ecp_ht_itr_create(&itr, dir_shadow);
+ do {
+ node = ecp_ht_itr_value(&itr);
+
+ pthread_mutex_lock(&node->mutex);
+ access_ts = node->access_ts;
+ pthread_mutex_unlock(&node->mutex);
+
+ rv = ecp_dir_item_serialize(&node->dir_item, msg);
+ msg += rv;
+ msg_size += rv;
+
+ msg[0] = access_ts >> 24;
+ msg[1] = access_ts >> 16;
+ msg[2] = access_ts >> 8;
+ msg[3] = access_ts;
+ msg += sizeof(uint32_t);
+ msg_size += sizeof(uint32_t);
+
+ _rv = ecp_ht_itr_advance(&itr);
+ } while (_rv == ECP_OK);
+ }
+
+ pthread_rwlock_unlock(&dir_shadow_rwlock);
+
+ return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(msg_size, ECP_MTYPE_DIR_SHADOW), 0);
+}
+
+ssize_t dir_send_online(ECPConnection *conn) {
+ ECPBuffer packet;
+ ECPBuffer payload;
+ unsigned char pkt_buf[ECP_MAX_PKT];
+ unsigned char pld_buf[ECP_MAX_PLD];
+ unsigned char *msg;
+ size_t msg_size;
+
+ packet.buffer = pkt_buf;
+ packet.size = ECP_MAX_PKT;
+ payload.buffer = pld_buf;
+ payload.size = ECP_MAX_PLD;
+
+ ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_DIR_REP);
+ msg = ecp_pld_get_msg(payload.buffer, payload.size);
+
+ pthread_rwlock_rdlock(&dir_online_rwlock);
+
+ msg[0] = dir_online.count >> 8;
+ msg[1] = dir_online.count;
+ msg += sizeof(uint16_t);
+ msg_size = sizeof(uint16_t) + dir_online.count * ECP_SIZE_DIR_ITEM;
+ memcpy(msg, dir_online.msg, dir_online.count * ECP_SIZE_DIR_ITEM);
+
+ pthread_rwlock_unlock(&dir_online_rwlock);
+
+ return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(msg_size, ECP_MTYPE_DIR_REP), 0);
+}
+
+ssize_t dir_handle_ann(ECPConnection *conn, unsigned char *msg, size_t msg_size) {
+ ECPDirItem dir_item;
+ int is_new;
+ uint16_t capabilities;
+ ecp_sts_t now;
+ ssize_t rsize;
+ ssize_t rv;
+
+ rsize = sizeof(uint16_t);
+ if (msg_size < rsize) return ECP_ERR_SIZE;
+
+ capabilities = \
+ ((uint16_t)msg[0] << 8) | \
+ ((uint16_t)msg[1]);
+
+ memset(&dir_item, 0, sizeof(ECPDirItem));
+ dir_item.node = conn->remote;
+ dir_item.capabilities = capabilities;
+
+ now = ecp_tm_get_s();
+ is_new = dir_process_item(&dir_item, now);
+ if (is_new) vlink_new_node(conn->sock, &dir_item);
+
+ rv = dir_send_shadow(conn);
+ if (rv < 0) return rv;
+
+ return rsize;
+}
+
+ssize_t dir_handle_req(ECPConnection *conn, unsigned char *msg, size_t msg_size) {
+ ssize_t rv;
+
+ rv = dir_send_online(conn);
+ if (rv < 0) return rv;
+
+ return 0;
+}
+
+ssize_t dir_handle_shadow(ECPConnection *conn, unsigned char *msg, size_t msg_size) {
+ ecp_sts_t now;
+ uint16_t count;
+ size_t rsize;
+ int i;
+
+ if (msg_size < sizeof(uint16_t)) return ECP_ERR_SIZE;
+
+ count = \
+ ((uint16_t)msg[0] << 8) | \
+ ((uint16_t)msg[1]);
+
+ rsize = sizeof(uint16_t) + count * (ECP_SIZE_DIR_ITEM + sizeof(uint32_t));
+ if (msg_size < rsize) return ECP_ERR_SIZE;
+
+ msg += sizeof(uint16_t);
+
+ now = ecp_tm_get_s();
+ for (i=0; i<count; i++) {
+ ECPDirItem dir_item;
+ ecp_sts_t access_ts;
+ size_t rv;
+
+ rv = ecp_dir_item_parse(&dir_item, msg);
+ msg += rv;
+
+ access_ts = \
+ ((uint32_t)msg[0] << 24) | \
+ ((uint32_t)msg[1] << 16) | \
+ ((uint32_t)msg[2] << 8) | \
+ ((uint32_t)msg[3]);
+ msg += sizeof(uint32_t);
+
+ if (ECP_STS_LT(now, access_ts) || (now - access_ts < (NODE_EXP_TIME / 2))) {
+ int is_new;
+
+ is_new = dir_process_item(&dir_item, access_ts);
+ if (is_new) vlink_new_node(conn->sock, &dir_item);
+ }
+ }
+
+ return rsize;
+}
+
+ssize_t dir_handle_msg(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, size_t msg_size, struct ECP2Buffer *b) {
+ switch (mtype) {
+ case ECP_MTYPE_DIR_REQ: {
+ if (ecp_conn_is_outb(conn)) return ECP_ERR;
+ return dir_handle_req(conn, msg, msg_size);
+ }
+
+ case ECP_MTYPE_DIR_ANN: {
+ if (ecp_conn_is_outb(conn)) return ECP_ERR;
+ if (!conn->remote.key_perma.valid) return ECP_ERR_VBOX;
+ return dir_handle_ann(conn, msg, msg_size);
+ }
+
+ case ECP_MTYPE_DIR_SHADOW: {
+ if (ecp_conn_is_inb(conn)) return ECP_ERR;
+ return dir_handle_shadow(conn, msg, msg_size);
+ }
+
+ default:
+ return ECP_ERR_MTYPE;
+ }
+}
+
+int dir_handle_open(ECPConnection *conn, ECP2Buffer *bufs) {
+ ssize_t rv;
+
+ if (ecp_conn_is_inb(conn)) return ECP_OK;
+
+ rv = dir_send_ann(conn);
+ if (rv < 0) return rv;
+
+ return ECP_OK;
+}
+
+void dir_online_switch(void) {
+ struct hashtable_itr itr;
+ DirNode *node;
+ unsigned char *msg;
+
+ pthread_rwlock_rdlock(&dir_shadow_rwlock);
+ pthread_rwlock_wrlock(&dir_online_rwlock);
+
+ msg = dir_online.msg;
+ dir_online.count = ecp_ht_count(dir_shadow);
+
+ if (dir_online.count > 0) {
+ size_t _rv;
+ int rv;
+
+ ecp_ht_itr_create(&itr, dir_shadow);
+ do {
+ node = ecp_ht_itr_value(&itr);
+
+ _rv = ecp_dir_item_serialize(&node->dir_item, msg);
+ msg += _rv;
+
+ rv = ecp_ht_itr_advance(&itr);
+ } while (rv == ECP_OK);
+ }
+
+ pthread_rwlock_unlock(&dir_online_rwlock);
+ pthread_rwlock_unlock(&dir_shadow_rwlock);
+}
+
+int dir_process_item(ECPDirItem *dir_item, ecp_sts_t access_ts) {
+ DirNode *node;
+ int is_new = 0;
+ int rv = ECP_OK;
+
+ pthread_rwlock_rdlock(&dir_shadow_rwlock);
+
+ node = ht_search_item(dir_shadow, dir_item);
+ if (node) {
+ pthread_mutex_lock(&node->mutex);
+ if (!node->zombie) node->access_ts = access_ts;
+ pthread_mutex_unlock(&node->mutex);
+
+ pthread_rwlock_unlock(&dir_shadow_rwlock);
+ } else {
+ pthread_rwlock_unlock(&dir_shadow_rwlock);
+
+ LOG(LOG_DEBUG, "dir new node\n");
+
+ node = dir_create_node(dir_item, access_ts);
+ if (node == NULL) rv = ECP_ERR_ALLOC;
+
+ if (!rv) {
+ pthread_rwlock_wrlock(&dir_shadow_rwlock);
+ if (ecp_ht_count(dir_shadow) > MAX_DIR_ITEM) rv = ECP_ERR_FULL;
+ if (!rv) {
+ if (ht_search_node(dir_shadow, node) == NULL) {
+ rv = ht_insert_node(dir_shadow, node);
+ } else {
+ rv = ECP_ERR_DUP;
+ }
+ }
+ pthread_rwlock_unlock(&dir_shadow_rwlock);
+ }
+
+ if (!rv) {
+ is_new = 1;
+ /* always switch */
+ dir_online_switch();
+ } else {
+ if (node) dir_destroy_node(node);
+ LOG(LOG_ERR, "dir create node err:%d\n", rv);
+ }
+ }
+
+ return is_new;
+}
+
+DirNode *dir_create_node(ECPDirItem *dir_item, ecp_sts_t access_ts) {
+ DirNode *node;
+ int rv;
+
+ node = malloc(sizeof(DirNode));
+ if (node == NULL) return NULL;
+
+ memset(node, 0, sizeof(DirNode));
+
+ rv = pthread_mutex_init(&node->mutex, NULL);
+ if (rv) {
+ free(node);
+ return NULL;
+ }
+
+ node->dir_item = *dir_item;
+ node->access_ts = access_ts;
+
+ return node;
+}
+
+void dir_destroy_node(DirNode *node) {
+ pthread_mutex_destroy(&node->mutex);
+ free(node);
+}
+
+int dir_open_conn(ECPSocket *sock, ECPNode *node) {
+ ECPConnection *conn;
+ int rv;
+
+ conn = malloc(sizeof(ECPConnection));
+ if (conn == NULL) return ECP_ERR_ALLOC;
+
+ ecp_dir_conn_init(conn, sock);
+ ecp_conn_set_flags(conn, ECP_CONN_FLAG_GC | ECP_CONN_FLAG_VBOX);
+ rv = ecp_conn_try_open(conn, node);
+ return rv;
+}
+
+DirNode *dir_search_conn(ECPConnection *conn) {
+ DirNode *node;
+ DirNode *ret = NULL;
+
+ pthread_rwlock_rdlock(&dir_shadow_rwlock);
+
+ node = ht_search_conn(dir_shadow, conn);
+ if (node) {
+ pthread_mutex_lock(&node->mutex);
+ if (!node->zombie) {
+ node->refcount++;
+ ret = node;
+ } else {
+ ret = NULL;
+ }
+ pthread_mutex_unlock(&node->mutex);
+ }
+
+ pthread_rwlock_unlock(&dir_shadow_rwlock);
+
+ return ret;
+}
+
+void dir_announce(ECPSocket *sock) {
+ static DirNode *expire_node[MAX_EXPIRE_CNT];
+ static int expire_node_z[MAX_EXPIRE_CNT];
+ static int expire_cnt;
+
+ struct hashtable_itr itr;
+ ecp_sts_t now;
+ DirNode *node;
+ DirNode *node_next;
+ DirNode *announce_node[MAX_ANNOUNCE_CNT];
+ int announce_cnt;
+ int refcount;
+ int i;
+ int rv;
+
+ now = ecp_tm_get_s();
+ node_next = NULL;
+ do {
+ announce_cnt = 0;
+ pthread_rwlock_rdlock(&dir_shadow_rwlock);
+
+ if (ecp_ht_count(dir_shadow) > 0) {
+ ecp_ht_itr_create(&itr, dir_shadow);
+ if (node_next) {
+ rv = ecp_ht_itr_search(&itr, node_next);
+ if (rv) {
+ LOG(LOG_ERR, "dir ann itr search err:%d\n", rv);
+ break;
+ }
+ node_next = NULL;
+ }
+ do {
+ node = ecp_ht_itr_value(&itr);
+ rv = ecp_ht_itr_advance(&itr);
+
+ pthread_mutex_lock(&node->mutex);
+ if (ECP_STS_LT(node->access_ts, now) && (now - node->access_ts > NODE_EXP_TIME)) {
+ node->zombie = 1;
+ pthread_mutex_unlock(&node->mutex);
+
+ if (expire_cnt < MAX_EXPIRE_CNT) {
+ expire_node[expire_cnt] = node;
+ expire_node_z[expire_cnt] = 1;
+ expire_cnt++;
+ }
+ } else {
+ pthread_mutex_unlock(&node->mutex);
+
+ if ((node->dir_item.capabilities & ECP_DIR_CAP_DIR) && (memcmp(node->dir_item.node.key_perma.public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) != 0)) {
+ announce_node[announce_cnt] = node;
+ announce_cnt++;
+
+ if (announce_cnt == MAX_ANNOUNCE_CNT) {
+ if (!rv) {
+ node_next = ecp_ht_itr_key(&itr);
+ } else {
+ node_next = NULL;
+ }
+ break;
+ }
+ }
+ }
+ } while (rv == ECP_OK);
+ }
+
+ pthread_rwlock_unlock(&dir_shadow_rwlock);
+
+ if (expire_cnt) {
+ pthread_rwlock_wrlock(&dir_shadow_rwlock);
+ for (i=0; i<expire_cnt; i++) {
+ if (expire_node_z[i]) {
+ expire_node_z[i] = 0;
+ ht_remove_node(dir_shadow, expire_node[i]);
+ LOG(LOG_DEBUG, "dir remove node\n");
+ }
+ }
+ pthread_rwlock_unlock(&dir_shadow_rwlock);
+ }
+
+ for (i=0; i<announce_cnt; i++) {
+ rv = dir_open_conn(sock, &announce_node[i]->dir_item.node);
+ if (rv) LOG(LOG_ERR, "dir open connection err:%d\n", rv);
+ }
+
+ i = expire_cnt - 1;
+ while (i >= 0) {
+ node = expire_node[i];
+
+ pthread_mutex_lock(&node->mutex);
+ refcount = node->refcount;
+ pthread_mutex_unlock(&node->mutex);
+
+ if (refcount == 0) {
+ int j = i;
+
+ expire_cnt--;
+ while (j < expire_cnt) {
+ expire_node[j] = expire_node[j+1];
+ j++;
+ }
+ expire_node[j] = NULL;
+ dir_destroy_node(node);
+ }
+ i--;
+ }
+ } while (node_next);
+}
+
+static void *_dir_announce(void *arg) {
+ ECPSocket *sock = arg;
+
+ while (1) {
+ LOG(LOG_DEBUG, "dir announce...\n");
+ dir_announce(sock);
+ sleep(10);
+ }
+
+ return NULL;
+}
+
+int dir_start_announce(ECPSocket *sock) {
+ int rv;
+
+ rv = pthread_create(&dir_announce_thd, NULL, _dir_announce, sock);
+ if (rv) return ECP_ERR;
+ return ECP_OK;
+}
+
+int dir_init(ECPContext *ctx) {
+ ECPConnHandler *handler;
+ int rv;
+
+ srv_config = srv_get_config();
+
+ handler = malloc(sizeof(ECPConnHandler));
+ if (handler == NULL) return ECP_ERR_ALLOC;
+
+ ecp_conn_handler_init(handler, dir_handle_msg, dir_handle_open, NULL, NULL);
+ rv = ecp_ctx_set_handler(ctx, ECP_CTYPE_DIR, handler);
+ if (rv) return rv;
+
+ rv = pthread_rwlock_init(&dir_shadow_rwlock, NULL);
+ if (rv) {
+ return ECP_ERR;
+ }
+
+ rv = pthread_rwlock_init(&dir_online_rwlock, NULL);
+ if (rv) {
+ pthread_rwlock_destroy(&dir_shadow_rwlock);
+ return ECP_ERR;
+ }
+
+ dir_shadow = ecp_ht_create_keys();
+ if (dir_shadow == NULL) {
+ pthread_rwlock_destroy(&dir_shadow_rwlock);
+ pthread_rwlock_destroy(&dir_online_rwlock);
+ return ECP_ERR_ALLOC;
+ }
+
+ return ECP_OK;
+}