#include #include #include #include #include #include #include #include "server.h" #include "vlink.h" #include "dir.h" #include "ht.h" static ecp_ht_table_t *dir_shadow = NULL; static pthread_rwlock_t dir_shadow_rwlock; static pthread_rwlock_t dir_online_rwlock; static pthread_t dir_announce_thd; static DirList dir_online; static SRVConfig *srv_config; ssize_t dir_send_ann(ECPConnection *conn) { ECPBuffer packet; ECPBuffer payload; unsigned char pkt_buf[ECP_SIZE_PKT_BUF(sizeof(uint16_t), ECP_MTYPE_DIR_ANN, conn)]; unsigned char pld_buf[ECP_SIZE_PLD_BUF(sizeof(uint16_t), ECP_MTYPE_DIR_ANN, conn)]; unsigned char *msg; packet.buffer = pkt_buf; packet.size = sizeof(pkt_buf); payload.buffer = pld_buf; payload.size = sizeof(pld_buf); ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_DIR_ANN); msg = ecp_pld_get_msg(payload.buffer, payload.size); msg[0] = srv_config->capabilities >> 8; msg[1] = srv_config->capabilities; return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(sizeof(uint16_t), ECP_MTYPE_DIR_ANN), 0); } ssize_t dir_send_shadow(ECPConnection *conn) { ECPBuffer packet; ECPBuffer payload; unsigned char pkt_buf[ECP_MAX_PKT]; unsigned char pld_buf[ECP_MAX_PLD]; unsigned char *msg; struct hashtable_itr itr; DirNode *node; uint16_t count; ecp_sts_t access_ts; size_t msg_size; packet.buffer = pkt_buf; packet.size = ECP_MAX_PKT; payload.buffer = pld_buf; payload.size = ECP_MAX_PLD; ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_DIR_SHADOW); msg = ecp_pld_get_msg(payload.buffer, payload.size); msg_size = 0; pthread_rwlock_rdlock(&dir_shadow_rwlock); count = ecp_ht_count(dir_shadow); msg[0] = count >> 8; msg[1] = count; msg += sizeof(uint16_t); msg_size += sizeof(uint16_t); if (count > 0) { size_t rv; int _rv; ecp_ht_itr_create(&itr, dir_shadow); do { node = ecp_ht_itr_value(&itr); pthread_mutex_lock(&node->mutex); access_ts = node->access_ts; pthread_mutex_unlock(&node->mutex); rv = ecp_dir_item_serialize(&node->dir_item, msg); msg += rv; msg_size += rv; msg[0] = access_ts >> 24; msg[1] = access_ts >> 16; msg[2] = access_ts >> 8; msg[3] = access_ts; msg += sizeof(uint32_t); msg_size += sizeof(uint32_t); _rv = ecp_ht_itr_advance(&itr); } while (_rv == ECP_OK); } pthread_rwlock_unlock(&dir_shadow_rwlock); return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(msg_size, ECP_MTYPE_DIR_SHADOW), 0); } ssize_t dir_send_online(ECPConnection *conn) { ECPBuffer packet; ECPBuffer payload; unsigned char pkt_buf[ECP_MAX_PKT]; unsigned char pld_buf[ECP_MAX_PLD]; unsigned char *msg; size_t msg_size; packet.buffer = pkt_buf; packet.size = ECP_MAX_PKT; payload.buffer = pld_buf; payload.size = ECP_MAX_PLD; ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_DIR_REP); msg = ecp_pld_get_msg(payload.buffer, payload.size); pthread_rwlock_rdlock(&dir_online_rwlock); msg[0] = dir_online.count >> 8; msg[1] = dir_online.count; msg += sizeof(uint16_t); msg_size = sizeof(uint16_t) + dir_online.count * ECP_SIZE_DIR_ITEM; memcpy(msg, dir_online.msg, dir_online.count * ECP_SIZE_DIR_ITEM); pthread_rwlock_unlock(&dir_online_rwlock); return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(msg_size, ECP_MTYPE_DIR_REP), 0); } ssize_t dir_handle_ann(ECPConnection *conn, unsigned char *msg, size_t msg_size) { ECPDirItem dir_item; int is_new; uint16_t capabilities; ecp_sts_t now; ssize_t rsize; ssize_t rv; rsize = sizeof(uint16_t); if (msg_size < rsize) return ECP_ERR_SIZE; capabilities = \ ((uint16_t)msg[0] << 8) | \ ((uint16_t)msg[1]); memset(&dir_item, 0, sizeof(ECPDirItem)); dir_item.node = conn->remote; dir_item.capabilities = capabilities; now = ecp_tm_get_s(); is_new = dir_process_item(&dir_item, now); if (is_new) vlink_new_node(conn->sock, &dir_item); rv = dir_send_shadow(conn); if (rv < 0) return rv; return rsize; } ssize_t dir_handle_req(ECPConnection *conn, unsigned char *msg, size_t msg_size) { ssize_t rv; rv = dir_send_online(conn); if (rv < 0) return rv; return 0; } ssize_t dir_handle_shadow(ECPConnection *conn, unsigned char *msg, size_t msg_size) { ecp_sts_t now; uint16_t count; size_t rsize; int i; if (msg_size < sizeof(uint16_t)) return ECP_ERR_SIZE; count = \ ((uint16_t)msg[0] << 8) | \ ((uint16_t)msg[1]); rsize = sizeof(uint16_t) + count * (ECP_SIZE_DIR_ITEM + sizeof(uint32_t)); if (msg_size < rsize) return ECP_ERR_SIZE; msg += sizeof(uint16_t); now = ecp_tm_get_s(); for (i=0; isock, &dir_item); } } return rsize; } ssize_t dir_handle_msg(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, size_t msg_size, struct ECP2Buffer *b) { switch (mtype) { case ECP_MTYPE_DIR_REQ: { if (ecp_conn_is_outb(conn)) return ECP_ERR; return dir_handle_req(conn, msg, msg_size); } case ECP_MTYPE_DIR_ANN: { if (ecp_conn_is_outb(conn)) return ECP_ERR; if (!conn->remote.key_perma.valid) return ECP_ERR_VBOX; return dir_handle_ann(conn, msg, msg_size); } case ECP_MTYPE_DIR_SHADOW: { if (ecp_conn_is_inb(conn)) return ECP_ERR; return dir_handle_shadow(conn, msg, msg_size); } default: return ECP_ERR_MTYPE; } } int dir_handle_open(ECPConnection *conn, ECP2Buffer *bufs) { ssize_t rv; if (ecp_conn_is_inb(conn)) return ECP_OK; rv = dir_send_ann(conn); if (rv < 0) return rv; return ECP_OK; } void dir_online_switch(void) { struct hashtable_itr itr; DirNode *node; unsigned char *msg; pthread_rwlock_rdlock(&dir_shadow_rwlock); pthread_rwlock_wrlock(&dir_online_rwlock); msg = dir_online.msg; dir_online.count = ecp_ht_count(dir_shadow); if (dir_online.count > 0) { size_t _rv; int rv; ecp_ht_itr_create(&itr, dir_shadow); do { node = ecp_ht_itr_value(&itr); _rv = ecp_dir_item_serialize(&node->dir_item, msg); msg += _rv; rv = ecp_ht_itr_advance(&itr); } while (rv == ECP_OK); } pthread_rwlock_unlock(&dir_online_rwlock); pthread_rwlock_unlock(&dir_shadow_rwlock); } int dir_process_item(ECPDirItem *dir_item, ecp_sts_t access_ts) { DirNode *node; int is_new = 0; int rv = ECP_OK; pthread_rwlock_rdlock(&dir_shadow_rwlock); node = ht_search_item(dir_shadow, dir_item); if (node) { pthread_mutex_lock(&node->mutex); if (!node->zombie) node->access_ts = access_ts; pthread_mutex_unlock(&node->mutex); pthread_rwlock_unlock(&dir_shadow_rwlock); } else { pthread_rwlock_unlock(&dir_shadow_rwlock); LOG(LOG_DEBUG, "dir new node\n"); node = dir_create_node(dir_item, access_ts); if (node == NULL) rv = ECP_ERR_ALLOC; if (!rv) { pthread_rwlock_wrlock(&dir_shadow_rwlock); if (ecp_ht_count(dir_shadow) > MAX_DIR_ITEM) rv = ECP_ERR_FULL; if (!rv) { if (ht_search_node(dir_shadow, node) == NULL) { rv = ht_insert_node(dir_shadow, node); } else { rv = ECP_ERR_DUP; } } pthread_rwlock_unlock(&dir_shadow_rwlock); } if (!rv) { is_new = 1; /* always switch */ dir_online_switch(); } else { if (node) dir_destroy_node(node); LOG(LOG_ERR, "dir create node err:%d\n", rv); } } return is_new; } DirNode *dir_create_node(ECPDirItem *dir_item, ecp_sts_t access_ts) { DirNode *node; int rv; node = malloc(sizeof(DirNode)); if (node == NULL) return NULL; memset(node, 0, sizeof(DirNode)); rv = pthread_mutex_init(&node->mutex, NULL); if (rv) { free(node); return NULL; } node->dir_item = *dir_item; node->access_ts = access_ts; return node; } void dir_destroy_node(DirNode *node) { pthread_mutex_destroy(&node->mutex); free(node); } int dir_open_conn(ECPSocket *sock, ECPNode *node) { ECPConnection *conn; int rv; conn = malloc(sizeof(ECPConnection)); if (conn == NULL) return ECP_ERR_ALLOC; ecp_dir_conn_init(conn, sock); ecp_conn_set_flags(conn, ECP_CONN_FLAG_GC | ECP_CONN_FLAG_VBOX); rv = ecp_conn_try_open(conn, node); return rv; } DirNode *dir_search_conn(ECPConnection *conn) { DirNode *node; DirNode *ret = NULL; pthread_rwlock_rdlock(&dir_shadow_rwlock); node = ht_search_conn(dir_shadow, conn); if (node) { pthread_mutex_lock(&node->mutex); if (!node->zombie) { node->refcount++; ret = node; } else { ret = NULL; } pthread_mutex_unlock(&node->mutex); } pthread_rwlock_unlock(&dir_shadow_rwlock); return ret; } void dir_announce(ECPSocket *sock) { static DirNode *expire_node[MAX_EXPIRE_CNT]; static int expire_node_z[MAX_EXPIRE_CNT]; static int expire_cnt; struct hashtable_itr itr; ecp_sts_t now; DirNode *node; DirNode *node_next; DirNode *announce_node[MAX_ANNOUNCE_CNT]; int announce_cnt; int refcount; int i; int rv; now = ecp_tm_get_s(); node_next = NULL; do { announce_cnt = 0; pthread_rwlock_rdlock(&dir_shadow_rwlock); if (ecp_ht_count(dir_shadow) > 0) { ecp_ht_itr_create(&itr, dir_shadow); if (node_next) { rv = ecp_ht_itr_search(&itr, node_next); if (rv) { LOG(LOG_ERR, "dir ann itr search err:%d\n", rv); break; } node_next = NULL; } do { node = ecp_ht_itr_value(&itr); rv = ecp_ht_itr_advance(&itr); pthread_mutex_lock(&node->mutex); if (ECP_STS_LT(node->access_ts, now) && (now - node->access_ts > NODE_EXP_TIME)) { node->zombie = 1; pthread_mutex_unlock(&node->mutex); if (expire_cnt < MAX_EXPIRE_CNT) { expire_node[expire_cnt] = node; expire_node_z[expire_cnt] = 1; expire_cnt++; } } else { pthread_mutex_unlock(&node->mutex); if ((node->dir_item.capabilities & ECP_DIR_CAP_DIR) && (memcmp(node->dir_item.node.key_perma.public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) != 0)) { announce_node[announce_cnt] = node; announce_cnt++; if (announce_cnt == MAX_ANNOUNCE_CNT) { if (!rv) { node_next = ecp_ht_itr_key(&itr); } else { node_next = NULL; } break; } } } } while (rv == ECP_OK); } pthread_rwlock_unlock(&dir_shadow_rwlock); if (expire_cnt) { pthread_rwlock_wrlock(&dir_shadow_rwlock); for (i=0; idir_item.node); if (rv) LOG(LOG_ERR, "dir open connection err:%d\n", rv); } i = expire_cnt - 1; while (i >= 0) { node = expire_node[i]; pthread_mutex_lock(&node->mutex); refcount = node->refcount; pthread_mutex_unlock(&node->mutex); if (refcount == 0) { int j = i; expire_cnt--; while (j < expire_cnt) { expire_node[j] = expire_node[j+1]; j++; } expire_node[j] = NULL; dir_destroy_node(node); } i--; } } while (node_next); } static void *_dir_announce(void *arg) { ECPSocket *sock = arg; while (1) { LOG(LOG_DEBUG, "dir announce...\n"); dir_announce(sock); sleep(10); } return NULL; } int dir_start_announce(ECPSocket *sock) { int rv; rv = pthread_create(&dir_announce_thd, NULL, _dir_announce, sock); if (rv) return ECP_ERR; return ECP_OK; } int dir_init(ECPContext *ctx) { ECPConnHandler *handler; int rv; srv_config = srv_get_config(); handler = malloc(sizeof(ECPConnHandler)); if (handler == NULL) return ECP_ERR_ALLOC; ecp_conn_handler_init(handler, dir_handle_msg, dir_handle_open, NULL, NULL); rv = ecp_ctx_set_handler(ctx, ECP_CTYPE_DIR, handler); if (rv) return rv; rv = pthread_rwlock_init(&dir_shadow_rwlock, NULL); if (rv) { return ECP_ERR; } rv = pthread_rwlock_init(&dir_online_rwlock, NULL); if (rv) { pthread_rwlock_destroy(&dir_shadow_rwlock); return ECP_ERR; } dir_shadow = ecp_ht_create_keys(); if (dir_shadow == NULL) { pthread_rwlock_destroy(&dir_shadow_rwlock); pthread_rwlock_destroy(&dir_online_rwlock); return ECP_ERR_ALLOC; } return ECP_OK; }