summaryrefslogtreecommitdiff
path: root/ecp/server/vlink.c
diff options
context:
space:
mode:
Diffstat (limited to 'ecp/server/vlink.c')
-rw-r--r--ecp/server/vlink.c321
1 files changed, 321 insertions, 0 deletions
diff --git a/ecp/server/vlink.c b/ecp/server/vlink.c
new file mode 100644
index 0000000..e98dacc
--- /dev/null
+++ b/ecp/server/vlink.c
@@ -0,0 +1,321 @@
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+
+#include <ecp/core.h>
+#include <ecp/dir/dir.h>
+#include <ecp/vconn/vconn.h>
+#include <ecp/ht.h>
+#include <ecp/tm.h>
+
+#include "server.h"
+#include "dir.h"
+#include "ht.h"
+
+#include "vlink.h"
+
+static ecp_ht_table_t *vlink_conn = NULL;
+static ecp_ht_table_t *vlink_node = NULL;
+static pthread_mutex_t vlink_conn_mutex;
+static pthread_mutex_t vlink_node_mutex;
+static pthread_t vlink_open_thd;
+static pthread_t vlink_keyx_thd;
+
+static SRVConfig *srv_config;
+
+void vlink_handle_err(ECPConnection *conn, unsigned char mtype, int err) {
+ switch (mtype) {
+ case ECP_MTYPE_OPEN_REP: {
+ if (err == ECP_ERR_TIMEOUT) {
+ int rv;
+
+ rv = vlink_insert_node(conn);
+ if (rv) LOG(LOG_ERR, "vlink insert node err:%d\n", rv);
+ }
+ ecp_conn_close(conn);
+ break;
+ }
+ }
+}
+
+int vlink_handle_open(ECPConnection *conn, ECP2Buffer *bufs) {
+ int rv;
+
+ rv = ecp_vlink_handle_open(conn, bufs);
+ if (rv) return rv;
+
+ if (ecp_conn_is_inb(conn)) return ECP_OK;
+
+ pthread_mutex_lock(&vlink_conn_mutex);
+ rv = ht_insert_conn(vlink_conn, conn);
+ pthread_mutex_unlock(&vlink_conn_mutex);
+
+ if (rv) {
+ ecp_vlink_handle_close(conn);
+ return rv;
+ }
+
+ return ECP_OK;
+}
+
+void vlink_handle_close(ECPConnection *conn) {
+ int rv;
+
+ ecp_vlink_handle_close(conn);
+
+ if (ecp_conn_is_inb(conn)) return;
+
+ rv = vlink_insert_node(conn);
+ if (rv) LOG(LOG_ERR, "vlink insert node err:%d\n", rv);
+}
+
+int vlink_open_conn(ECPSocket *sock, ECPNode *node) {
+ ECPConnection *conn;
+ int rv;
+
+ conn = malloc(sizeof(ECPConnection));
+ if (conn == NULL) return ECP_ERR_ALLOC;
+
+ ecp_vlink_init(conn, sock);
+ rv = ecp_conn_open(conn, node);
+ return rv;
+}
+
+void vlink_new_node(ECPSocket *sock, ECPDirItem *dir_item) {
+ if ((dir_item->capabilities & ECP_DIR_CAP_VCONN) && (memcmp(&dir_item->node.key_perma.public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) < 0)) {
+ int rv;
+
+ LOG(LOG_DEBUG, "vlink open connection\n");
+ rv = vlink_open_conn(sock, &dir_item->node);
+ if (rv) LOG(LOG_ERR, "vlink open connection err:%d\n", rv);
+ }
+}
+
+int vlink_insert_node(ECPConnection *conn) {
+ DirNode *node;
+ int rv = ECP_OK;
+
+ node = dir_search_conn(conn);
+ if (node) {
+ pthread_mutex_lock(&vlink_node_mutex);
+ rv = ht_insert_node(vlink_node, node);
+ pthread_mutex_unlock(&vlink_node_mutex);
+
+ if (rv) {
+ pthread_mutex_lock(&node->mutex);
+ node->refcount--;
+ pthread_mutex_unlock(&node->mutex);
+ }
+ }
+
+ return rv;
+}
+
+void vlink_open(ECPSocket *sock) {
+ struct hashtable_itr itr;
+ DirNode *node;
+ DirNode *node_next;
+ DirNode *open_node[MAX_OPEN_CNT];
+ int open_cnt;
+ int i;
+ int rv;
+
+ node_next = NULL;
+ do {
+ open_cnt = 0;
+ pthread_mutex_lock(&vlink_node_mutex);
+
+ if (ecp_ht_count(vlink_node) > 0) {
+ ecp_ht_itr_create(&itr, vlink_node);
+ if (node_next) {
+ rv = ecp_ht_itr_search(&itr, node_next);
+ if (rv) {
+ LOG(LOG_ERR, "vlink open itr search err:%d\n", rv);
+ break;
+ }
+ node_next = NULL;
+ }
+ do {
+ node = ecp_ht_itr_value(&itr);
+ rv = ecp_ht_itr_remove(&itr);
+
+ open_node[open_cnt] = node;
+ open_cnt++;
+ if (open_cnt == MAX_OPEN_CNT) {
+ if (!rv) {
+ node_next = ecp_ht_itr_key(&itr);
+ } else {
+ node_next = NULL;
+ }
+ }
+ } while (rv == ECP_OK);
+ }
+
+ pthread_mutex_unlock(&vlink_node_mutex);
+
+ for (i=0; i<open_cnt; i++) {
+ pthread_mutex_lock(&open_node[i]->mutex);
+
+ if (open_node[i]->zombie) {
+ open_node[i]->refcount--;
+
+ pthread_mutex_unlock(&open_node[i]->mutex);
+ } else {
+ pthread_mutex_unlock(&open_node[i]->mutex);
+
+ LOG(LOG_DEBUG, "vlink open connection\n");
+ rv = vlink_open_conn(sock, &open_node[i]->dir_item.node);
+ if (rv) LOG(LOG_ERR, "vlink open connection err:%d\n", rv);
+ }
+ }
+ } while (node_next);
+}
+
+void vlink_keyx(void) {
+ struct hashtable_itr itr;
+ ecp_sts_t now;
+ ECPConnection *conn;
+ ECPConnection *keyx_next;
+ ECPConnection *keyx_conn[MAX_KEYX_CNT];
+ int keyx_conn_z[MAX_KEYX_CNT];
+ int keyx_cnt;
+ int is_zombie;
+ int i;
+ int rv;
+
+ now = ecp_tm_get_s();
+ keyx_next = NULL;
+ do {
+ keyx_cnt = 0;
+ pthread_mutex_lock(&vlink_conn_mutex);
+
+ if (ecp_ht_count(vlink_conn) > 0) {
+ ecp_ht_itr_create(&itr, vlink_conn);
+ if (keyx_next) {
+ rv = ecp_ht_itr_search(&itr, keyx_next);
+ if (rv) {
+ LOG(LOG_ERR, "vlink keyx itr search err:%d\n", rv);
+ break;
+ }
+ keyx_next = NULL;
+ }
+ do {
+ conn = ecp_ht_itr_value(&itr);
+
+ is_zombie = ecp_conn_is_zombie(conn, now, CONN_EXP_TIME);
+ if (is_zombie) {
+ rv = ecp_ht_itr_remove(&itr);
+ } else {
+ rv = ecp_ht_itr_advance(&itr);
+ }
+
+ keyx_conn[keyx_cnt] = conn;
+ keyx_conn_z[keyx_cnt] = is_zombie;
+ keyx_cnt++;
+
+ if (keyx_cnt == MAX_KEYX_CNT) {
+ if (!rv) {
+ keyx_next = ecp_ht_itr_key(&itr);
+ } else {
+ keyx_next = NULL;
+ }
+ }
+ } while (rv == ECP_OK);
+ }
+
+ pthread_mutex_unlock(&vlink_conn_mutex);
+
+ for (i=0; i<keyx_cnt; i++) {
+ if (keyx_conn_z[i]) {
+ LOG(LOG_DEBUG, "vlink close connection\n");
+ ecp_conn_close(keyx_conn[i]);
+ } else {
+ ssize_t _rv;
+
+ LOG(LOG_DEBUG, "vlink send keyx\n");
+ _rv = ecp_send_keyx_req(keyx_conn[i], 1);
+ if (_rv < 0) LOG(LOG_ERR, "vlink send keyx err:%ld\n", _rv);
+ }
+ }
+ } while (keyx_next);
+}
+
+static void *_vlink_open(void *arg) {
+ ECPSocket *sock = arg;
+
+ while (1) {
+ LOG(LOG_DEBUG, "vlink open...\n");
+ vlink_open(sock);
+ sleep(10);
+ }
+
+ return NULL;
+}
+
+static void *_vlink_keyx(void *arg) {
+ while (1) {
+ LOG(LOG_DEBUG, "vlink keyx...\n");
+ vlink_keyx();
+ sleep(10);
+ }
+
+ return NULL;
+}
+
+int vlink_start_open(ECPSocket *sock) {
+ int rv;
+
+ rv = pthread_create(&vlink_open_thd, NULL, _vlink_open, sock);
+ if (rv) return ECP_ERR;
+ return ECP_OK;
+}
+
+int vlink_start_keyx(void) {
+ int rv;
+
+ rv = pthread_create(&vlink_keyx_thd, NULL, _vlink_keyx, NULL);
+ if (rv) return ECP_ERR;
+ return ECP_OK;
+}
+
+int vlink_init(ECPContext *ctx) {
+ ECPConnHandler *handler;
+ int rv;
+
+ srv_config = srv_get_config();
+
+ handler = ecp_ctx_get_handler(ctx, ECP_CTYPE_VLINK);
+ if (handler == NULL) return ECP_ERR;
+
+ handler->handle_open = vlink_handle_open;
+ handler->handle_close = vlink_handle_close;
+
+ rv = pthread_mutex_init(&vlink_conn_mutex, NULL);
+ if (rv) {
+ return ECP_ERR;
+ }
+
+ rv = pthread_mutex_init(&vlink_node_mutex, NULL);
+ if (rv) {
+ pthread_mutex_destroy(&vlink_conn_mutex);
+ return ECP_ERR;
+ }
+
+ rv = ECP_OK;
+ vlink_conn = ecp_ht_create_keys();
+ if (vlink_conn == NULL) rv = ECP_ERR_ALLOC;
+ if (!rv) {
+ vlink_node = ecp_ht_create_keys();
+ if (vlink_node == NULL) rv = ECP_ERR_ALLOC;
+ }
+
+ if (rv) {
+ pthread_mutex_destroy(&vlink_node_mutex);
+ pthread_mutex_destroy(&vlink_conn_mutex);
+ if (vlink_node) ecp_ht_destroy(vlink_node);
+ if (vlink_conn) ecp_ht_destroy(vlink_conn);
+ return rv;
+ }
+
+ return ECP_OK;
+}