/*
 * vlink bookkeeping: keeps a table of vlink connections (vlink_conn) and a
 * table of directory nodes waiting for a connection attempt (vlink_node),
 * each guarded by its own mutex.
 *
 * NOTE(review): this copy of the file is damaged. Text that followed a
 * less-than sign appears to have been dropped up to a later greater-than
 * sign. Known casualties:
 *   - the header names of the first eight include directives below;
 *   - the header of the per-node loop in vlink_open();
 *   - everything from the middle of vlink_keyx() up to the body of an
 *     initialisation routine near the end of the file.
 * The code is reproduced exactly as found; restore it from version control
 * before building.
 */
/* NOTE(review): header names missing on the next eight directives. */
#include
#include
#include
#include
#include
#include
#include
#include
#include "server.h"
#include "dir.h"
#include "ht.h"
#include "vlink.h"

/* Connections inserted by vlink_handle_open() and walked by vlink_keyx(). */
static ecp_ht_table_t *vlink_conn = NULL;
/* Directory nodes queued by vlink_insert_node() and drained by vlink_open(). */
static ecp_ht_table_t *vlink_node = NULL;
/* Guards vlink_conn. */
static pthread_mutex_t vlink_conn_mutex;
/* Guards vlink_node. */
static pthread_mutex_t vlink_node_mutex;
/* Thread handles; not referenced by any code that survives in this copy. */
static pthread_t vlink_open_thd;
static pthread_t vlink_keyx_thd;
/*
 * Read by vlink_new_node(). No assignment is visible in this copy --
 * presumably set in the lost part of the initialisation routine; confirm.
 */
static SRVConfig *srv_config;

/*
 * Error callback for a vlink connection.
 *
 * Only ECP_MTYPE_OPEN_REP is handled: when the error is ECP_ERR_TIMEOUT the
 * connection's directory node is queued again through vlink_insert_node()
 * (a failure there is only logged), and in every case the connection is then
 * closed. All other message types are ignored.
 */
void vlink_handle_err(ECPConnection *conn, unsigned char mtype, int err) {
    switch (mtype) {
        case ECP_MTYPE_OPEN_REP: {
            if (err == ECP_ERR_TIMEOUT) {
                int rv;
                rv = vlink_insert_node(conn);
                if (rv) LOG(LOG_ERR, "vlink insert node err:%d\n", rv);
            }
            ecp_conn_close(conn);
            break;
        }
    }
}

/*
 * Open callback for a vlink connection.
 *
 * Runs ecp_vlink_handle_open() first and returns its error unchanged.
 * Connections for which ecp_conn_is_inb() is true are not tracked and return
 * ECP_OK immediately. Any other connection is inserted into vlink_conn under
 * vlink_conn_mutex; if the insert fails, ecp_vlink_handle_close() is called
 * to undo the open and the insert error is returned.
 */
int vlink_handle_open(ECPConnection *conn, ECP2Buffer *bufs) {
    int rv;
    rv = ecp_vlink_handle_open(conn, bufs);
    if (rv) return rv;
    if (ecp_conn_is_inb(conn)) return ECP_OK;
    pthread_mutex_lock(&vlink_conn_mutex);
    rv = ht_insert_conn(vlink_conn, conn);
    pthread_mutex_unlock(&vlink_conn_mutex);
    if (rv) {
        ecp_vlink_handle_close(conn);
        return rv;
    }
    return ECP_OK;
}

/*
 * Close callback for a vlink connection.
 *
 * Runs ecp_vlink_handle_close(); for connections where ecp_conn_is_inb() is
 * false it then queues the connection's directory node again through
 * vlink_insert_node(), logging any failure.
 *
 * NOTE(review): nothing here removes conn from vlink_conn. The only removal
 * visible in this copy is in vlink_keyx(), for connections reported as
 * zombie -- confirm that is the intended lifetime.
 */
void vlink_handle_close(ECPConnection *conn) {
    int rv;
    ecp_vlink_handle_close(conn);
    if (ecp_conn_is_inb(conn)) return;
    rv = vlink_insert_node(conn);
    if (rv) LOG(LOG_ERR, "vlink insert node err:%d\n", rv);
}

/*
 * Allocates a connection object, initialises it with ecp_vlink_init() and
 * starts opening it towards `node`.
 *
 * Returns ECP_ERR_ALLOC when malloc fails, otherwise the result of
 * ecp_conn_open().
 *
 * NOTE(review): conn is not freed on this path when ecp_conn_open() fails.
 * Whether the library releases it is not visible here -- confirm, otherwise
 * this leaks one ECPConnection per failed attempt.
 */
int vlink_open_conn(ECPSocket *sock, ECPNode *node) {
    ECPConnection *conn;
    int rv;
    conn = malloc(sizeof(ECPConnection));
    if (conn == NULL) return ECP_ERR_ALLOC;
    ecp_vlink_init(conn, sock);
    rv = ecp_conn_open(conn, node);
    return rv;
}

/*
 * Called with a directory item; opens a vlink connection to it when both
 * conditions hold:
 *   - the item has the ECP_DIR_CAP_VCONN capability bit set, and
 *   - its permanent public key compares lower (memcmp) than this server's.
 * Presumably the key comparison makes exactly one side of each server pair
 * initiate -- confirm. A failed open is only logged.
 */
void vlink_new_node(ECPSocket *sock, ECPDirItem *dir_item) {
    if ((dir_item->capabilities & ECP_DIR_CAP_VCONN) && (memcmp(&dir_item->node.key_perma.public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) < 0)) {
        int rv;
        LOG(LOG_DEBUG, "vlink open connection\n");
        rv = vlink_open_conn(sock, &dir_item->node);
        if (rv) LOG(LOG_ERR, "vlink open connection err:%d\n", rv);
    }
}

/*
 * Queues the directory node that belongs to `conn` for a later connection
 * attempt by vlink_open().
 *
 * Returns ECP_OK when dir_search_conn() finds no node, otherwise the result
 * of ht_insert_node().
 */
int vlink_insert_node(ECPConnection *conn) {
    DirNode *node;
    int rv = ECP_OK;
    node = dir_search_conn(conn);
    if (node) {
        pthread_mutex_lock(&vlink_node_mutex);
        rv = ht_insert_node(vlink_node, node);
        pthread_mutex_unlock(&vlink_node_mutex);
        if (rv) {
            /*
             * Insert failed: give back one reference. This suggests
             * dir_search_conn() returns the node with its refcount already
             * raised -- confirm against dir.c.
             */
            pthread_mutex_lock(&node->mutex);
            node->refcount--;
            pthread_mutex_unlock(&node->mutex);
        }
    }
    return rv;
}

/*
 * Drains vlink_node and tries to open a connection to every queued node.
 *
 * Each pass of the outer loop removes entries from the table while holding
 * vlink_node_mutex, collects them in open_node[], releases the mutex and
 * then processes the collected nodes. node_next carries the key at which the
 * next pass should resume.
 */
void vlink_open(ECPSocket *sock) {
    struct hashtable_itr itr;
    DirNode *node;
    DirNode *node_next;
    DirNode *open_node[MAX_OPEN_CNT];
    int open_cnt;
    int i;
    int rv;
    node_next = NULL;
    do {
        open_cnt = 0;
        pthread_mutex_lock(&vlink_node_mutex);
        if (ecp_ht_count(vlink_node) > 0) {
            ecp_ht_itr_create(&itr, vlink_node);
            if (node_next) {
                /* Resume at the key remembered by the previous pass. */
                rv = ecp_ht_itr_search(&itr, node_next);
                if (rv) {
                    LOG(LOG_ERR, "vlink open itr search err:%d\n", rv);
                    /*
                     * NOTE(review): this break leaves the outer do/while
                     * with vlink_node_mutex still locked; the unlock below
                     * is skipped.
                     */
                    break;
                }
                node_next = NULL;
            }
            do {
                /* Take the current entry and remove it from the table. */
                node = ecp_ht_itr_value(&itr);
                rv = ecp_ht_itr_remove(&itr);
                open_node[open_cnt] = node;
                open_cnt++;
                if (open_cnt == MAX_OPEN_CNT) {
                    /*
                     * Batch is full: remember where to resume, if the
                     * remove reported that another entry is available.
                     */
                    if (!rv) {
                        node_next = ecp_ht_itr_key(&itr);
                    } else {
                        node_next = NULL;
                    }
                }
                /*
                 * NOTE(review): the loop condition only tests rv. When the
                 * batch is full and rv is still ECP_OK the loop runs again
                 * and the next store lands at open_node[MAX_OPEN_CNT], one
                 * past the end of the array.
                 */
            } while (rv == ECP_OK);
        }
        pthread_mutex_unlock(&vlink_node_mutex);
        /*
         * NOTE(review): the next line is damaged. It presumably was a loop
         * over i from 0 to open_cnt whose first statement locks
         * open_node[i]->mutex -- restore from version control.
         */
        for (i=0; imutex);
            if (open_node[i]->zombie) {
                /* Node is marked zombie: drop one reference instead of connecting. */
                open_node[i]->refcount--;
                pthread_mutex_unlock(&open_node[i]->mutex);
            } else {
                pthread_mutex_unlock(&open_node[i]->mutex);
                LOG(LOG_DEBUG, "vlink open connection\n");
                rv = vlink_open_conn(sock, &open_node[i]->dir_item.node);
                if (rv) LOG(LOG_ERR, "vlink open connection err:%d\n", rv);
            }
        }
    } while (node_next);
}

/*
 * Walks vlink_conn in batches. Under vlink_conn_mutex each entry is tested
 * with ecp_conn_is_zombie() against the timestamp taken at entry and
 * CONN_EXP_TIME; zombie entries are removed from the table, the others are
 * skipped over. Every visited connection and its zombie flag are recorded
 * in keyx_conn[] / keyx_conn_z[] for processing after the mutex is released.
 *
 * NOTE(review): the code that processes the recorded batch, and the end of
 * this function, are lost in this copy (see the damaged line below).
 */
void vlink_keyx(void) {
    struct hashtable_itr itr;
    ecp_sts_t now;
    ECPConnection *conn;
    ECPConnection *keyx_next;
    ECPConnection *keyx_conn[MAX_KEYX_CNT];
    int keyx_conn_z[MAX_KEYX_CNT];
    int keyx_cnt;
    int is_zombie;
    int i;
    int rv;
    now = ecp_tm_get_s();
    keyx_next = NULL;
    do {
        keyx_cnt = 0;
        pthread_mutex_lock(&vlink_conn_mutex);
        if (ecp_ht_count(vlink_conn) > 0) {
            ecp_ht_itr_create(&itr, vlink_conn);
            if (keyx_next) {
                /* Resume at the key remembered by the previous pass. */
                rv = ecp_ht_itr_search(&itr, keyx_next);
                if (rv) {
                    LOG(LOG_ERR, "vlink keyx itr search err:%d\n", rv);
                    /*
                     * NOTE(review): this break leaves the outer do/while
                     * with vlink_conn_mutex still locked.
                     */
                    break;
                }
                keyx_next = NULL;
            }
            do {
                conn = ecp_ht_itr_value(&itr);
                is_zombie = ecp_conn_is_zombie(conn, now, CONN_EXP_TIME);
                if (is_zombie) {
                    rv = 
                    ecp_ht_itr_remove(&itr);
                } else {
                    rv = ecp_ht_itr_advance(&itr);
                }
                keyx_conn[keyx_cnt] = conn;
                keyx_conn_z[keyx_cnt] = is_zombie;
                keyx_cnt++;
                if (keyx_cnt == MAX_KEYX_CNT) {
                    /* Batch is full: remember where to resume, if anything is left. */
                    if (!rv) {
                        keyx_next = ecp_ht_itr_key(&itr);
                    } else {
                        keyx_next = NULL;
                    }
                }
                /*
                 * NOTE(review): as in vlink_open(), the loop does not stop
                 * when the batch is full, so the next pass writes one
                 * element past keyx_conn[] and keyx_conn_z[].
                 */
            } while (rv == ECP_OK);
        }
        pthread_mutex_unlock(&vlink_conn_mutex);
        /*
         * NOTE(review): the next line is damaged. The rest of vlink_keyx(),
         * any definitions that followed it, and the signature and opening
         * statements of an initialisation routine are missing. What survives
         * below is the tail of that routine: it installs the open/close
         * callbacks on `handler`, initialises both mutexes and creates both
         * tables, undoing everything on failure.
         */
        for (i=0; ihandle_open = vlink_handle_open;
    handler->handle_close = vlink_handle_close;
    rv = pthread_mutex_init(&vlink_conn_mutex, NULL);
    if (rv) {
        return ECP_ERR;
    }
    rv = pthread_mutex_init(&vlink_node_mutex, NULL);
    if (rv) {
        /* Second mutex failed: release the first before reporting the error. */
        pthread_mutex_destroy(&vlink_conn_mutex);
        return ECP_ERR;
    }
    rv = ECP_OK;
    vlink_conn = ecp_ht_create_keys();
    if (vlink_conn == NULL) rv = ECP_ERR_ALLOC;
    if (!rv) {
        vlink_node = ecp_ht_create_keys();
        if (vlink_node == NULL) rv = ECP_ERR_ALLOC;
    }
    if (rv) {
        /*
         * Table creation failed: tear down both mutexes and whichever table
         * was created.
         * NOTE(review): the static pointers are not reset to NULL after
         * ecp_ht_destroy(), so they keep pointing at destroyed tables.
         */
        pthread_mutex_destroy(&vlink_node_mutex);
        pthread_mutex_destroy(&vlink_conn_mutex);
        if (vlink_node) ecp_ht_destroy(vlink_node);
        if (vlink_conn) ecp_ht_destroy(vlink_conn);
        return rv;
    }
    return ECP_OK;
}