diff options
| author | Uros Majstorovic <majstor@majstor.org> | 2024-05-06 02:08:31 +0200 | 
|---|---|---|
| committer | Uros Majstorovic <majstor@majstor.org> | 2024-05-06 02:08:31 +0200 | 
| commit | 5f55d9d4d14635678e7f582215e3642de2e232a4 (patch) | |
| tree | 3322f643e0fbc16984e8eebfca4de7bd4cf63391 | |
| parent | 1060b5e4712db12b52944bdcf7f2588cea23382b (diff) | |
new ecp directory and vconn server
| -rw-r--r-- | ecp/server/Makefile | 7 | ||||
| -rw-r--r-- | ecp/server/acl.c | 275 | ||||
| -rw-r--r-- | ecp/server/acl.h | 21 | ||||
| -rw-r--r-- | ecp/server/dir.c | 1053 | ||||
| -rw-r--r-- | ecp/server/dir.h | 89 | ||||
| -rw-r--r-- | ecp/server/ht.c | 21 | ||||
| -rw-r--r-- | ecp/server/ht.h | 10 | ||||
| -rw-r--r-- | ecp/server/server.c | 270 | ||||
| -rw-r--r-- | ecp/server/server.h | 15 | ||||
| -rw-r--r-- | ecp/server/timer.c | 98 | ||||
| -rw-r--r-- | ecp/server/timer.h | 9 | ||||
| -rw-r--r-- | ecp/server/vlink.c | 360 | ||||
| -rw-r--r-- | ecp/server/vlink.h | 21 | ||||
| -rw-r--r-- | ecp/src/ecp/dir/dir.c | 24 | ||||
| -rw-r--r-- | ecp/src/ecp/dir/dir.h | 11 | 
15 files changed, 1602 insertions, 682 deletions
diff --git a/ecp/server/Makefile b/ecp/server/Makefile index 2105e9f..84070bf 100644 --- a/ecp/server/Makefile +++ b/ecp/server/Makefile @@ -1,9 +1,10 @@  src_dir = ../src  include $(src_dir)/ecp/common.mk -CFLAGS += -Wno-int-to-void-pointer-cast +CFLAGS += -I../util -Wno-int-to-void-pointer-cast +LDFLAGS += -lrt -obj = server.o dir.o vlink.o ht.o -dep = ../build-posix/*.a +obj = server.o dir.o vlink.o ht.o acl.o timer.o +dep = ../build-posix/*.a ../util/libecputil.a  %.o: %.c  	$(CC) $(CFLAGS) -c $< diff --git a/ecp/server/acl.c b/ecp/server/acl.c new file mode 100644 index 0000000..d8cdc6e --- /dev/null +++ b/ecp/server/acl.c @@ -0,0 +1,275 @@ +#include <stdlib.h> +#include <string.h> +#include <fcntl.h> +#include <unistd.h> +#include <sys/stat.h> + +#include <ecp/core.h> +#include <ecp/cr.h> +#include <ecp/ht.h> +#include <ecp/dir/dir.h> + +#include <util.h> + +#include "server.h" +#include "acl.h" + +static SRVConfig *srv_config; + +static ecp_ht_table_t *acl_keys = NULL; +static ecp_ht_table_t *acl_keys_dir = NULL; +static pthread_mutex_t acl_li_mutex; +static pthread_rwlock_t acl_ht_rwlock; +static ACLItem *acl_head = NULL; +static ACLItem *acl_head_dir = NULL; +static int acl_mark = 0; + +ACLItem *acl_create_item(void) { +    ACLItem *ret = NULL; + +    ret = malloc(sizeof(ACLItem)); +    if (ret == NULL) return ret; +    memset(&ret->key, 0, sizeof(ret->key)); +    ret->key_cnt = 0; +    ret->next = NULL; + +    return ret; +} + +void acl_destroy_item(ACLItem *item) { +    free(item); +} + +void acl_destroy_list(ACLItem *head) { +    ACLItem *acl_next; + +    while (head) { +        acl_next = head->next; +        acl_destroy_item(head); +        head = acl_next; +    } +} + +static int _add_key(ecp_ecdh_public_t *public, uint8_t capabilities) { +    int rv; + +    if ((acl_keys == NULL) || (acl_keys_dir == NULL)) return ECP_ERR; + +    if ((srv_config->capabilities & ECP_DIR_CAP_DIR) || (srv_config->capabilities & capabilities & ECP_DIR_CAP_VCONN)) { +        /* directory server accepts all connections +           vconn server accepts connections only from other vconn servers */ +        rv = ecp_ht_insert_uniq(acl_keys, public, &acl_mark); +        if (rv && (rv != ECP_ERR_DUP)) return rv; +    } +    if (srv_config->capabilities & capabilities & ECP_DIR_CAP_DIR) { +        rv = ecp_ht_insert_uniq(acl_keys_dir, public, &acl_mark); +        if (rv && (rv != ECP_ERR_DUP)) return rv; +    } + +    return ECP_OK; +} + +static int _read_file(int fd, ACLItem *head) { +    ecp_ecdh_public_t public; +    int rv; + +    if (head == NULL) return ECP_ERR_ALLOC; + +    while (head->next) { +        head = head->next; +    } + +    while(ecp_util_read_key(fd, &public, NULL) == ECP_OK) { +        if (head->key_cnt == ACL_MAX_KEY) { +            head->next = acl_create_item(); +            if (head->next == NULL) return ECP_ERR_ALLOC; +            head = head->next; +        } +        memcpy(&head->key[head->key_cnt], &public, sizeof(head->key[head->key_cnt])); +        head->key_cnt++; +    } + +    return ECP_OK; +} + +static int _li2ht(ACLItem *head, int is_dir) { +    int i; +    int rv; + +    while (head) { +        for (i=0; i<head->key_cnt; i++) { +            rv = _add_key(&head->key[i], is_dir ? ECP_DIR_CAP_DIR : 0); +            if (rv) return rv; +        } +        head = head->next; +    } + +    return ECP_OK; +} + +int acl_add_key(ECPDirItem *dir_item) { +    int rv; + +    pthread_rwlock_wrlock(&acl_ht_rwlock); +    rv = _add_key(&dir_item->node.key_perma.public, dir_item->capabilities); +    pthread_rwlock_unlock(&acl_ht_rwlock); + +    return rv; +} + +int acl_inlist(ecp_ecdh_public_t *public) { +    void *item = NULL; + +    pthread_rwlock_rdlock(&acl_ht_rwlock); +    if (acl_keys) item = ecp_ht_search(acl_keys, public); +    pthread_rwlock_unlock(&acl_ht_rwlock); + +    return (item != NULL); +} + +int acl_dir_inlist(ecp_ecdh_public_t *public) { +    void *item = NULL; + +    pthread_rwlock_rdlock(&acl_ht_rwlock); +    if (acl_keys_dir) item = ecp_ht_search(acl_keys_dir, public); +    pthread_rwlock_unlock(&acl_ht_rwlock); + +    return (item != NULL); +} + +int acl_reset_ht(void) { +    int rv = ECP_OK; + +    pthread_rwlock_wrlock(&acl_ht_rwlock); +    if (acl_keys) ecp_ht_destroy(acl_keys); +    if (acl_keys_dir) ecp_ht_destroy(acl_keys_dir); +    acl_keys = ecp_ht_create_keys(); +    acl_keys_dir = ecp_ht_create_keys(); +    if ((acl_keys == NULL) || (acl_keys_dir == NULL)) rv = ECP_ERR_ALLOC; +    pthread_rwlock_unlock(&acl_ht_rwlock); + +    return rv; +} + +int acl_load_ht(void) { +    int rv = ECP_OK; +    int _rv; + +    pthread_mutex_lock(&acl_li_mutex); +    pthread_rwlock_wrlock(&acl_ht_rwlock); + +    _rv = _li2ht(acl_head, 0); +    if (_rv) rv = ECP_ERR; +    _rv = _li2ht(acl_head_dir, 1); +    if (_rv) rv = ECP_ERR; + +    pthread_rwlock_unlock(&acl_ht_rwlock); +    pthread_mutex_unlock(&acl_li_mutex); + +    return rv; +} + +int acl_load(void) { +    ACLItem *acl_new = NULL; +    ACLItem *acl_dir_new = NULL; +    int fd = -1; +    int fd_dir = -1; +    int rv = ECP_OK; +    int _rv; + +    if ((srv_config->acl_fn == NULL) && (srv_config->acl_fn_dir == NULL)) return ECP_OK; + +    if (srv_config->acl_fn) { +        fd = open(srv_config->acl_fn, O_RDONLY); +        if (fd < 0) { +            LOG(LOG_ERR, "acl_load: unable to open: %s\n", srv_config->acl_fn); +            return ECP_ERR; +        } +    } + +    if (srv_config->acl_fn_dir) { +        fd_dir = open(srv_config->acl_fn_dir, O_RDONLY); +        if (fd_dir < 0) { +            LOG(LOG_ERR, "acl_load: unable to open: %s\n", srv_config->acl_fn_dir); +            close(fd); +            return ECP_ERR; +        } +    } + +    pthread_mutex_lock(&acl_li_mutex); +    if (fd >= 0) { +        acl_new = acl_create_item(); +        rv = _read_file(fd, acl_new); +        if (rv) { +            LOG(LOG_ERR, "acl_load: read from file: %s err:%d\n", srv_config->acl_fn, rv); +            acl_destroy_list(acl_new); +            goto load_fin; +        } +    } + +    if (fd_dir >= 0) { +        acl_dir_new = acl_create_item(); +        rv = _read_file(fd_dir, acl_dir_new); +        if (rv) { +            LOG(LOG_ERR, "acl_load: read from file: %s err:%d\n", srv_config->acl_fn_dir, rv); +            acl_destroy_list(acl_dir_new); +            acl_destroy_list(acl_new); +            goto load_fin; +        } +    } +    if (acl_new) { +        acl_destroy_list(acl_head); +        acl_head = acl_new; +    } +    if (acl_dir_new) { +        acl_destroy_list(acl_head_dir); +        acl_head_dir = acl_dir_new; +    } + +    pthread_rwlock_wrlock(&acl_ht_rwlock); +    if (acl_new) { +        _rv = _li2ht(acl_new, 0); +        if (_rv) rv = ECP_ERR; +    } +    if (acl_dir_new) { +        _rv = _li2ht(acl_dir_new, 1); +        if (_rv) rv = ECP_ERR; +    } + +    pthread_rwlock_unlock(&acl_ht_rwlock); + +load_fin: +    pthread_mutex_unlock(&acl_li_mutex); + +    close(fd_dir); +    close(fd); + +    return rv; +} + +int acl_init(void) { +    int rv; + +    srv_config = srv_get_config(); + +    rv = pthread_mutex_init(&acl_li_mutex, NULL); +    if (rv) return ECP_ERR; + +    rv = pthread_rwlock_init(&acl_ht_rwlock, NULL); +    if (rv) { +        pthread_mutex_destroy(&acl_li_mutex); +        return ECP_ERR; +    } + +    acl_keys = ecp_ht_create_keys(); +    acl_keys_dir = ecp_ht_create_keys(); +    if ((acl_keys == NULL) || (acl_keys_dir == NULL)) { +        pthread_rwlock_destroy(&acl_ht_rwlock); +        pthread_mutex_destroy(&acl_li_mutex); +        if (acl_keys_dir) ecp_ht_destroy(acl_keys_dir); +        if (acl_keys) ecp_ht_destroy(acl_keys); +        return ECP_ERR_ALLOC; +    } + +    return ECP_OK; +} diff --git a/ecp/server/acl.h b/ecp/server/acl.h new file mode 100644 index 0000000..5e3f83c --- /dev/null +++ b/ecp/server/acl.h @@ -0,0 +1,21 @@ +#define ACL_MAX_KEY         50 + +typedef struct ACLItem { +    ecp_ecdh_public_t key[ACL_MAX_KEY]; +    unsigned short key_cnt; +    struct ACLItem *next; +} ACLItem; + +ACLItem *acl_create_item(void); +void acl_destroy_item(ACLItem *acl_item); +void acl_destroy_list(ACLItem *head); + +int acl_add_key(ECPDirItem *dir_item); +int acl_inlist(ecp_ecdh_public_t *public); +int acl_dir_inlist(ecp_ecdh_public_t *public); + +int acl_reset_ht(void); +int acl_load_ht(void); + +int acl_load(void); +int acl_init(void); diff --git a/ecp/server/dir.c b/ecp/server/dir.c index 0066bac..7244fd9 100644 --- a/ecp/server/dir.c +++ b/ecp/server/dir.c @@ -1,31 +1,122 @@  #include <stdlib.h>  #include <unistd.h>  #include <string.h> +#include <time.h>  #include <ecp/core.h> -#include <ecp/dir/dir.h> +#include <ecp/cr.h>  #include <ecp/ht.h>  #include <ecp/tm.h> -#include "server.h" -#include "vlink.h"  #include "dir.h" +#include "vlink.h"  #include "ht.h" +#include "acl.h" + +#include "timer.h" +#include "server.h"  static ecp_ht_table_t *dir_shadow = NULL;  static pthread_rwlock_t dir_shadow_rwlock;  static pthread_rwlock_t dir_online_rwlock; +static pthread_rwlock_t dir_timer_rwlock;  static pthread_t dir_announce_thd; -static DirList dir_online; +static DIROnline *dir_online = NULL; +static unsigned int dir_vkey_req; +static int dir_process_enable;  static SRVConfig *srv_config; +#define MAX(X, Y)   (((X) > (Y)) ? (X) : (Y)) + +/* messages from clients */ +ssize_t dir_send_online(ECPConnection *conn, uint8_t region) { +    DIRList *list; +    ECPBuffer packet; +    ECPBuffer payload; +    unsigned char pkt_buf[ECP_MAX_PKT]; +    unsigned char pld_buf[ECP_MAX_PLD]; +    unsigned char *msg; +    size_t msg_size; +    ssize_t rv; +    int i; + +    packet.buffer = pkt_buf; +    packet.size = ECP_MAX_PKT; +    payload.buffer = pld_buf; +    payload.size = ECP_MAX_PLD; + +    if (dir_online == NULL) return ECP_ERR; + +    rv = 0; +    pthread_rwlock_rdlock(&dir_online_rwlock); +    list = &dir_online->list[region]; + +    if (list->msg_count == 0) goto send_online_fin; + +    for (i=0; i<list->msg_count; i++) { +        ssize_t rv_snd; + +        ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_DIR_REP); +        msg = ecp_pld_get_msg(payload.buffer, payload.size); +        msg_size = 0; + +        msg[0] = i;                         // frag_cnt +        msg[1] = list->msg_count;           // frag_tot +        msg[2] = list->msg[i].count;        // item_cnt +        msg[3] = region;                    // region +        msg[4] = dir_online->serial >> 8;   // serial +        msg[5] = dir_online->serial; + +        msg += 4 + sizeof(uint16_t); +        msg_size += 4 + sizeof(uint16_t); + +        memcpy(msg, list->msg[i].buffer, list->msg[i].count * ECP_SIZE_DIR_ITEM); +        msg_size += list->msg[i].count * ECP_SIZE_DIR_ITEM; + +        rv_snd = ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(msg_size, ECP_MTYPE_DIR_REP), 0); +        if (rv_snd < 0) { +            rv = rv_snd; +            break; +        } + +        rv += rv_snd; +    } + +send_online_fin: +    pthread_rwlock_unlock(&dir_online_rwlock); +    return rv; +} + +ssize_t dir_handle_client_msg(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, size_t msg_size, struct ECP2Buffer *b) { +    switch (mtype) { +        case ECP_MTYPE_DIR_REQ: { +            uint8_t region; +            ssize_t rv; + +            if (msg_size < 1) return ECP_ERR_SIZE; + +            region = *msg; +            if (region >= MAX_REGION) return ECP_ERR; + +            rv = dir_send_online(conn, region); +            if (rv < 0) return rv; + +            return 1; +        } + +        default: +            return ECP_ERR_MTYPE; +    } +} + +/* messages from other servers */  ssize_t dir_send_ann(ECPConnection *conn) {      ECPBuffer packet;      ECPBuffer payload; -    unsigned char pkt_buf[ECP_SIZE_PKT_BUF(sizeof(uint16_t), ECP_MTYPE_DIR_ANN, conn)]; -    unsigned char pld_buf[ECP_SIZE_PLD_BUF(sizeof(uint16_t), ECP_MTYPE_DIR_ANN, conn)]; +    unsigned char pkt_buf[ECP_SIZE_PKT_BUF(2, MTYPE_DIR_ANN, conn)]; +    unsigned char pld_buf[ECP_SIZE_PLD_BUF(2, MTYPE_DIR_ANN, conn)];      unsigned char *msg;      packet.buffer = pkt_buf; @@ -33,12 +124,12 @@ ssize_t dir_send_ann(ECPConnection *conn) {      payload.buffer = pld_buf;      payload.size = sizeof(pld_buf); -    ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_DIR_ANN); +    ecp_pld_set_type(payload.buffer, payload.size, MTYPE_DIR_ANN);      msg = ecp_pld_get_msg(payload.buffer, payload.size); -    msg[0] = srv_config->capabilities >> 8; +    msg[0] = srv_config->region;      msg[1] = srv_config->capabilities; -    return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(sizeof(uint16_t), ECP_MTYPE_DIR_ANN), 0); +    return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(2, MTYPE_DIR_ANN), 0);  }  ssize_t dir_send_shadow(ECPConnection *conn) { @@ -49,111 +140,170 @@ ssize_t dir_send_shadow(ECPConnection *conn) {      unsigned char *msg;      struct hashtable_itr itr; -    DirNode *node; -    uint16_t count; -    ecp_sts_t access_ts; +    DIRNode *node; +    ecp_ecdh_public_t *node_next; +    uint8_t count;      size_t msg_size; +    ssize_t rv;      packet.buffer = pkt_buf;      packet.size = ECP_MAX_PKT;      payload.buffer = pld_buf;      payload.size = ECP_MAX_PLD; -    ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_DIR_SHADOW); -    msg = ecp_pld_get_msg(payload.buffer, payload.size); -    msg_size = 0; +    rv = 0; +    node_next = NULL; +    do { +        ssize_t rv_snd; -    pthread_rwlock_rdlock(&dir_shadow_rwlock); +        ecp_pld_set_type(payload.buffer, payload.size, MTYPE_DIR_SHADOW); +        msg = ecp_pld_get_msg(payload.buffer, payload.size); -    count = ecp_ht_count(dir_shadow); -    msg[0] = count >> 8; -    msg[1] = count; -    msg += sizeof(uint16_t); -    msg_size += sizeof(uint16_t); +        memset(msg, 0, 4 + sizeof(uint16_t));   // frag_cnt, frag_tot, item_cnt, region, serial -    if (count > 0) { -        size_t rv; -        int _rv; +        pthread_rwlock_rdlock(&dir_shadow_rwlock); -        ecp_ht_itr_create(&itr, dir_shadow); -        do { -            node = ecp_ht_itr_value(&itr); +        if (ecp_ht_count(dir_shadow) > 0) { +            unsigned char *_msg; +            size_t _msg_size; +            int _rv; -            pthread_mutex_lock(&node->mutex); -            access_ts = node->access_ts; -            pthread_mutex_unlock(&node->mutex); +            _msg = msg + 4 + sizeof(uint16_t); +            _msg_size = 0; +            ecp_ht_itr_create(&itr, dir_shadow); +            if (node_next) { +                _rv = ecp_ht_itr_search(&itr, node_next); +                if (_rv) { +                    LOG(LOG_ERR, "dir_send_shadow: itr search err:%d\n", _rv); +                    break; +                } +                node_next = NULL; +            } -            rv = ecp_dir_item_serialize(&node->dir_item, msg); -            msg += rv; -            msg_size += rv; +            count = 0; +            do { +                node = ecp_ht_itr_value(&itr); -            msg[0] = access_ts >> 24; -            msg[1] = access_ts >> 16; -            msg[2] = access_ts >> 8; -            msg[3] = access_ts; -            msg += sizeof(uint32_t); -            msg_size += sizeof(uint32_t); +                pthread_mutex_lock(&node->mutex); +                if (node->verified || node->ann_local) { +                    if (count < MAX_DIR_ITEM_IN_MSG) { +                        size_t rv_ser; + +                        rv_ser = ecp_dir_item_serialize(&node->dir_item, _msg); +                        _msg += rv_ser; +                        _msg_size += rv_ser; +                        count++; +                    } else { +                        node_next = ecp_ht_itr_key(&itr); +                        break; +                    } +                } +                pthread_mutex_unlock(&node->mutex); -            _rv = ecp_ht_itr_advance(&itr); -        } while (_rv == ECP_OK); -    } +                _rv = ecp_ht_itr_advance(&itr); +            } while (_rv == ECP_OK); +            msg[2] = count; +            msg_size = 4 + sizeof(uint16_t) + _msg_size; +        } -    pthread_rwlock_unlock(&dir_shadow_rwlock); +        /* no need to copy node_next key, since announce is verified nodes will not be removed during online switch node removal */ +        pthread_rwlock_unlock(&dir_shadow_rwlock); + +        rv_snd = ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(msg_size, MTYPE_DIR_SHADOW), 0); +        if (rv_snd < 0) { +            rv = rv_snd; +            break; +        } + +        rv += rv_snd; +    } while (node_next); -    return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(msg_size, ECP_MTYPE_DIR_SHADOW), 0); +    return rv;  } -ssize_t dir_send_online(ECPConnection *conn) { +ssize_t dir_send_origin_rep(ECPConnection *conn, ecp_ecdh_public_t *public) {      ECPBuffer packet;      ECPBuffer payload;      unsigned char pkt_buf[ECP_MAX_PKT];      unsigned char pld_buf[ECP_MAX_PLD];      unsigned char *msg; -    size_t msg_size; +    unsigned short vkey_max; +    size_t msg_size, msg_size_max, hdr_size; +    DIRNode *node; +    int i;      packet.buffer = pkt_buf;      packet.size = ECP_MAX_PKT;      payload.buffer = pld_buf;      payload.size = ECP_MAX_PLD; -    ecp_pld_set_type(payload.buffer, payload.size, ECP_MTYPE_DIR_REP); +    ecp_pld_set_type(payload.buffer, payload.size, MTYPE_DIR_ORIGIN_REP);      msg = ecp_pld_get_msg(payload.buffer, payload.size); +    hdr_size = msg - payload.buffer; +    vkey_max = (payload.size - hdr_size - 1) / sizeof(ecp_ecdh_public_t); +    msg_size_max = 1 + vkey_max * sizeof(ecp_ecdh_public_t); -    pthread_rwlock_rdlock(&dir_online_rwlock); +    pthread_rwlock_rdlock(&dir_shadow_rwlock); -    msg[0] = dir_online.count >> 8; -    msg[1] = dir_online.count; -    msg += sizeof(uint16_t); -    msg_size = sizeof(uint16_t) + dir_online.count * ECP_SIZE_DIR_ITEM; -    memcpy(msg, dir_online.msg, dir_online.count * ECP_SIZE_DIR_ITEM); +    node = ecp_ht_search(dir_shadow, public); +    if (node) pthread_mutex_lock(&node->mutex); -    pthread_rwlock_unlock(&dir_online_rwlock); +    pthread_rwlock_unlock(&dir_shadow_rwlock); + +    if (node == NULL) return ECP_ERR; -    return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(msg_size, ECP_MTYPE_DIR_REP), 0); +    msg_size = 0; + +    *msg = node->vkey_cnt; +    msg++; +    msg_size++; + +    for (i=0; i<node->vkey_cnt; i++) { +        memcpy(msg, &node->vkey[i], sizeof(ecp_ecdh_public_t)); +        msg += sizeof(ecp_ecdh_public_t); +        msg_size += sizeof(ecp_ecdh_public_t); +        if (msg_size == msg_size_max) break; +    } +    pthread_mutex_unlock(&node->mutex); + +    return ecp_pld_send(conn, &packet, &payload, ECP_SIZE_PLD(msg_size, MTYPE_DIR_ORIGIN_REP), 0);  }  ssize_t dir_handle_ann(ECPConnection *conn, unsigned char *msg, size_t msg_size) {      ECPDirItem dir_item; -    int is_new; -    uint16_t capabilities; -    ecp_sts_t now; -    ssize_t rsize; +    uint8_t region; +    uint8_t capabilities; +    size_t rsize;      ssize_t rv; +    int ann_enable; -    rsize = sizeof(uint16_t); +    rsize = 2;      if (msg_size < rsize) return ECP_ERR_SIZE; -    capabilities = \ -        ((uint16_t)msg[0] << 8) | \ -        ((uint16_t)msg[1]); +    pthread_rwlock_rdlock(&dir_timer_rwlock); +    ann_enable = (dir_process_enable > PROC_BLOCK_ANN); +    pthread_rwlock_unlock(&dir_timer_rwlock); + +    if (!ann_enable) return rsize; + +    region = msg[0]; +    capabilities = msg[1]; +    if ((capabilities & ECP_DIR_CAP_DIR) && !acl_dir_inlist(&conn->remote.key_perma.public)) { +        LOG(LOG_ERR, "dir_handle_ann: not a directory server\n"); +        return ECP_ERR; +    }      memset(&dir_item, 0, sizeof(ECPDirItem));      dir_item.node = conn->remote; +    dir_item.region = region;      dir_item.capabilities = capabilities; -    now = ecp_tm_get_s(); -    is_new = dir_process_item(&dir_item, now); -    if (is_new) vlink_new_node(conn->sock, &dir_item); +    if (dir_item.region >= MAX_REGION) { +        LOG(LOG_ERR, "dir_handle_ann: bad region\n"); +        return ECP_ERR; +    } + +    dir_process_item(&dir_item, conn->sock, &srv_config->key_perma.public);      rv = dir_send_shadow(conn);      if (rv < 0) return rv; @@ -161,348 +311,573 @@ ssize_t dir_handle_ann(ECPConnection *conn, unsigned char *msg, size_t msg_size)      return rsize;  } -ssize_t dir_handle_req(ECPConnection *conn, unsigned char *msg, size_t msg_size) { -    ssize_t rv; - -    rv = dir_send_online(conn); -    if (rv < 0) return rv; - -    return 0; -} -  ssize_t dir_handle_shadow(ECPConnection *conn, unsigned char *msg, size_t msg_size) { -    ecp_sts_t now; -    uint16_t count; +    uint8_t count;      size_t rsize; -    int i; +    int i, ann_enable; -    if (msg_size < sizeof(uint16_t)) return ECP_ERR_SIZE; +    if (msg_size < 4 + sizeof(uint16_t)) return ECP_ERR_SIZE; -    count = \ -        ((uint16_t)msg[0] << 8) | \ -        ((uint16_t)msg[1]); +    count = msg[2]; +    msg += 4 + sizeof(uint16_t);    // frag_cnt, frag_tot, item_cnt, region, serial -    rsize = sizeof(uint16_t) + count * (ECP_SIZE_DIR_ITEM + sizeof(uint32_t)); +    rsize = 4 + sizeof(uint16_t) + count * ECP_SIZE_DIR_ITEM;      if (msg_size < rsize) return ECP_ERR_SIZE; -    msg += sizeof(uint16_t); +    pthread_rwlock_rdlock(&dir_timer_rwlock); +    ann_enable = (dir_process_enable > PROC_BLOCK_ANN); +    pthread_rwlock_unlock(&dir_timer_rwlock); + +    if (!ann_enable) return rsize; -    now = ecp_tm_get_s();      for (i=0; i<count; i++) {          ECPDirItem dir_item; -        ecp_sts_t access_ts;          size_t rv;          rv = ecp_dir_item_parse(&dir_item, msg);          msg += rv; -        access_ts = \ -            ((uint32_t)msg[0] << 24) | \ -            ((uint32_t)msg[1] << 16) | \ -            ((uint32_t)msg[2] << 8)  | \ -            ((uint32_t)msg[3]); -        msg += sizeof(uint32_t); - -        if (ECP_STS_LT(now, access_ts) || (now - access_ts < (NODE_EXP_TIME / 2))) { -            int is_new; - -            is_new = dir_process_item(&dir_item, access_ts); -            if (is_new) vlink_new_node(conn->sock, &dir_item); +        if (dir_item.region >= MAX_REGION) { +            LOG(LOG_ERR, "dir_handle_shadow: bad region\n"); +            return ECP_ERR;          } + +        dir_process_item(&dir_item, conn->sock, &conn->remote.key_perma.public);      }      return rsize;  } +ssize_t dir_handle_origin_req(ECPConnection *conn, unsigned char *msg, size_t msg_size) { +    size_t rsize; +    ssize_t rv; + +    rsize = ECP_SIZE_ECDH_PUB; +    if (msg_size < rsize) return ECP_ERR_SIZE; + +    rv = dir_send_origin_rep(conn, (ecp_ecdh_public_t *)msg); +    if (rv < 0) return rv; + +    return rsize; +} +  ssize_t dir_handle_msg(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, size_t msg_size, struct ECP2Buffer *b) {      switch (mtype) { -        case ECP_MTYPE_DIR_REQ: { -            if (ecp_conn_is_outb(conn)) return ECP_ERR; -            return dir_handle_req(conn, msg, msg_size); -        } +        case MTYPE_DIR_ANN: { +            int is_dir; + +            is_dir = srv_config->capabilities & ECP_DIR_CAP_DIR; +            if (!is_dir || ecp_conn_is_outb(conn)) return ECP_ERR; -        case ECP_MTYPE_DIR_ANN: { -            if (ecp_conn_is_outb(conn)) return ECP_ERR; -            if (!conn->remote.key_perma.valid) return ECP_ERR_VBOX;              return dir_handle_ann(conn, msg, msg_size);          } -        case ECP_MTYPE_DIR_SHADOW: { +        case MTYPE_DIR_SHADOW: {              if (ecp_conn_is_inb(conn)) return ECP_ERR; +              return dir_handle_shadow(conn, msg, msg_size);          } +        case MTYPE_DIR_ORIGIN_REQ: { +            int is_dir; + +            is_dir = srv_config->capabilities & ECP_DIR_CAP_DIR; +            if (!is_dir || ecp_conn_is_outb(conn)) return ECP_ERR; + +            return dir_handle_origin_req(conn, msg, msg_size); +        } +          default:              return ECP_ERR_MTYPE;      }  } -int dir_handle_open(ECPConnection *conn, ECP2Buffer *bufs) { -    ssize_t rv; - -    if (ecp_conn_is_inb(conn)) return ECP_OK; - -    rv = dir_send_ann(conn); -    if (rv < 0) return rv; - -    return ECP_OK; -} +void dir_process_item(ECPDirItem *dir_item, ECPSocket *sock, ecp_ecdh_public_t *s_public) { +    DIRNode *node; +    unsigned int vkey_req_dir; +    int is_verified = 0; +    int rv = ECP_OK; -void dir_online_switch(void) { -    struct hashtable_itr itr; -    DirNode *node; -    unsigned char *msg; +    pthread_rwlock_rdlock(&dir_online_rwlock); +    vkey_req_dir = dir_vkey_req; +    pthread_rwlock_unlock(&dir_online_rwlock);      pthread_rwlock_rdlock(&dir_shadow_rwlock); -    pthread_rwlock_wrlock(&dir_online_rwlock); - -    msg = dir_online.msg; -    dir_online.count = ecp_ht_count(dir_shadow); - -    if (dir_online.count > 0) { -        size_t _rv; -        int rv; -        ecp_ht_itr_create(&itr, dir_shadow); -        do { -            node = ecp_ht_itr_value(&itr); +    node = ht_search_item(dir_shadow, dir_item); +    if (node == NULL) { +        pthread_rwlock_unlock(&dir_shadow_rwlock); -            _rv = ecp_dir_item_serialize(&node->dir_item, msg); -            msg += _rv; +        rv = dir_create_node(dir_item, sock, &node); +        if (!rv) { +            pthread_rwlock_wrlock(&dir_shadow_rwlock); +            if (ecp_ht_count(dir_shadow) > MAX_DIR_ITEM) rv = ECP_ERR_FULL; +            if (!rv) rv = ht_insert_node(dir_shadow, node); +            if (rv) pthread_rwlock_unlock(&dir_shadow_rwlock); +        } +        if (rv) { +            if (node) { +                dir_destroy_node(node); +                node = NULL; +            } +            LOG(LOG_ERR, "dir_process_item: err:%d\n", rv); +            return; +        } -            rv = ecp_ht_itr_advance(&itr); -        } while (rv == ECP_OK); +        LOG(LOG_DEBUG, "dir_process_item: new node\n");      } -    pthread_rwlock_unlock(&dir_online_rwlock); +    pthread_mutex_lock(&node->mutex);      pthread_rwlock_unlock(&dir_shadow_rwlock); -} -int dir_process_item(ECPDirItem *dir_item, ecp_sts_t access_ts) { -    DirNode *node; -    int is_new = 0; -    int rv = ECP_OK; +    if (node->zombie) goto process_item_fin; -    pthread_rwlock_rdlock(&dir_shadow_rwlock); - -    node = ht_search_item(dir_shadow, dir_item); -    if (node) { -        pthread_mutex_lock(&node->mutex); -        if (!node->zombie) node->access_ts = access_ts; -        pthread_mutex_unlock(&node->mutex); +    if (!node->verified) { +        int key_ex = 0; +        int i; -        pthread_rwlock_unlock(&dir_shadow_rwlock); -    } else { -        pthread_rwlock_unlock(&dir_shadow_rwlock); +        if (memcmp(s_public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) == 0) { +            node->ann_local = 1; +        } -        LOG(LOG_DEBUG, "dir new node\n"); +        for (i=0; i<node->vkey_cnt; i++) { +            if (memcmp(s_public, &node->vkey[i], sizeof(node->vkey[i])) == 0) { +                key_ex = 1; +                break; +            } +        } +        if (!key_ex) { +            unsigned int vkey_req; -        node = dir_create_node(dir_item, access_ts); -        if (node == NULL) rv = ECP_ERR_ALLOC; +            if (node->dir_item.capabilities & ECP_DIR_CAP_DIR) { +                vkey_req = vkey_req_dir; +            } else { +                vkey_req = MIN_VKEY_REQ; +            } -        if (!rv) { -            pthread_rwlock_wrlock(&dir_shadow_rwlock); -            if (ecp_ht_count(dir_shadow) > MAX_DIR_ITEM) rv = ECP_ERR_FULL; -            if (!rv) { -                if (ht_search_node(dir_shadow, node) == NULL) { -                    rv = ht_insert_node(dir_shadow, node); -                } else { -                    rv = ECP_ERR_DUP; +            if (node->vkey_cnt < vkey_req) { +                /* vkey list is used to track origin of particular server */ +                memcpy(&node->vkey[node->vkey_cnt], s_public, sizeof(node->vkey[node->vkey_cnt])); +                node->vkey_cnt++; +                if (node->vkey_cnt == vkey_req) { +                    node->verified = 1; +                    node->is_new = 0; +                    is_verified = 1;                  }              } -            pthread_rwlock_unlock(&dir_shadow_rwlock); -        } -        if (!rv) { -            is_new = 1; -            /* always switch */ -            dir_online_switch(); -        } else { -            if (node) dir_destroy_node(node); -            LOG(LOG_ERR, "dir create node err:%d\n", rv); +            /* protocol does not support server properties change */ +            if (!ecp_dir_item_eq(dir_item, &node->dir_item)) { +                char buffer[ECP_SIZE_ECDH_KEY_BUF]; + +                ecp_key2str(buffer, (uint8_t *)&node->dir_item.node.key_perma.public); +                LOG(LOG_ERR, "dir_process_item: dir entry changed from:%s\n", buffer); +            }          }      } -    return is_new; +process_item_fin: +    pthread_mutex_unlock(&node->mutex); + +    if (is_verified) { +        LOG(LOG_DEBUG, "dir_process_item: node verified\n"); + +        rv = acl_add_key(&node->dir_item); +        if (rv) LOG(LOG_ERR, "dir_process_item: acl add key err:%d\n", rv); + +        vlink_new_node(sock, &node->dir_item); +    }  } -DirNode *dir_create_node(ECPDirItem *dir_item, ecp_sts_t access_ts) { -    DirNode *node; +int dir_open_conn(DIRNode *node, ECPSocket *sock) {      int rv; -    node = malloc(sizeof(DirNode)); -    if (node == NULL) return NULL; +    node->conn = malloc(sizeof(ECPConnection)); +    if (node->conn == NULL) return ECP_ERR_ALLOC; + +    ecp_conn_init(node->conn, sock, CTYPE_DIR); +    ecp_conn_set_flags(node->conn, ECP_CONN_FLAG_VBOX); + +    rv = ecp_conn_open(node->conn, &node->dir_item.node); +    if (rv) node->conn = NULL; -    memset(node, 0, sizeof(DirNode)); +    return rv; +} -    rv = pthread_mutex_init(&node->mutex, NULL); +int dir_create_node(ECPDirItem *dir_item, ECPSocket *sock, DIRNode **node) { +    DIRNode *_node; +    int rv; + +    *node = NULL; +    _node = malloc(sizeof(DIRNode)); +    if (_node == NULL) return ECP_ERR_ALLOC; + +    memset(_node, 0, sizeof(DIRNode)); +    rv = pthread_mutex_init(&_node->mutex, NULL);      if (rv) { -        free(node); -        return NULL; +        free(_node); +        return ECP_ERR;      } -    node->dir_item = *dir_item; -    node->access_ts = access_ts; +    _node->dir_item = *dir_item; +    _node->is_new = 1; + +    /* open connection to other directory servers */ +    if ((dir_item->capabilities & ECP_DIR_CAP_DIR) && (memcmp(&dir_item->node.key_perma.public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) != 0)) { +        rv = dir_open_conn(_node, sock); +        if (rv) { +            pthread_mutex_destroy(&_node->mutex); +            free(_node); +            return rv; +        } +    } -    return node; +    *node = _node; +    return ECP_OK;  } -void dir_destroy_node(DirNode *node) { +void dir_destroy_node(DIRNode *node) { +    if (node->conn) ecp_conn_close(node->conn);      pthread_mutex_destroy(&node->mutex);      free(node);  } -int dir_open_conn(ECPSocket *sock, ECPNode *node) { -    ECPConnection *conn; -    int rv; +static int online_switch_expired(ECPConnection *conn, ecp_sts_t now) { +    if (conn->type == CTYPE_DIR) return 1; +    return _ecp_conn_is_zombie(conn, now, CONN_EXPIRE_TO); +} -    conn = malloc(sizeof(ECPConnection)); -    if (conn == NULL) return ECP_ERR_ALLOC; +static void remove_nodes(DIRNode *remove_node[], int remove_cnt) { +    int i; -    ecp_dir_conn_init(conn, sock); -    ecp_conn_set_flags(conn, ECP_CONN_FLAG_GC | ECP_CONN_FLAG_VBOX); -    rv = ecp_conn_try_open(conn, node); -    return rv; +    pthread_rwlock_wrlock(&dir_shadow_rwlock); +    for (i=0; i<remove_cnt; i++) { +        ht_remove_node(dir_shadow, remove_node[i]); +    } +    pthread_rwlock_unlock(&dir_shadow_rwlock); + +    for (i=0; i<remove_cnt; i++) { +        vlink_del_node(&remove_node[i]->dir_item); +        dir_destroy_node(remove_node[i]); +    }  } -DirNode *dir_search_conn(ECPConnection *conn) { -    DirNode *node; -    DirNode *ret = NULL; +void dir_online_switch(ECPSocket *sock, int inc_serial) { +    struct hashtable_itr itr; +    DIRNode *node; +    uint8_t count[MAX_REGION]; +    uint8_t msg_count[MAX_REGION]; +    unsigned char *msg[MAX_REGION]; +    ecp_ecdh_public_t *node_next; +    DIRNode *remove_node[MAX_NODE_REMOVE]; +    unsigned int dir_cnt, node_cnt; +    int i, remove_cnt; +    int rv;      pthread_rwlock_rdlock(&dir_shadow_rwlock); +    pthread_rwlock_wrlock(&dir_online_rwlock); -    node = ht_search_conn(dir_shadow, conn); -    if (node) { -        pthread_mutex_lock(&node->mutex); -        if (!node->zombie) { -            node->refcount++; -            ret = node; -        } else { -            ret = NULL; +    if (dir_online) { +        for (i=0; i<MAX_REGION; i++) { +            dir_online->list[i].msg_count = 0; +            count[i] = 0; +            msg_count[i] = 0; +            msg[i] = dir_online->list[i].msg[0].buffer; +        } +    } + +    dir_cnt = 0; +    node_cnt = 0; +    remove_cnt = 0; + +    if (ecp_ht_count(dir_shadow) > 0) { +        uint8_t region; +        int verified; + +        ecp_ht_itr_create(&itr, dir_shadow); + +        node_next = NULL; +        do { +            node = ecp_ht_itr_value(&itr); + +            pthread_mutex_lock(&node->mutex); +            region = node->dir_item.region; +            verified = node->verified; + +            if (verified) { +                if (dir_online) { +                    size_t rv_ser; + +                    rv_ser = ecp_dir_item_serialize(&node->dir_item, msg[0]); +                    msg[0] += rv_ser; +                    count[0]++; +                    if (region) { +                        rv_ser = ecp_dir_item_serialize(&node->dir_item, msg[region]); +                        msg[region] += rv_ser; +                        count[region]++; +                    } +                } +                if (node->dir_item.capabilities & ECP_DIR_CAP_DIR) { +                    dir_cnt++; +                } +                node_cnt++; +            } else { +                node->zombie = 1; +                if (node_next == NULL) { +                    if (remove_cnt < MAX_NODE_REMOVE) { +                        remove_node[remove_cnt] = node; +                        remove_cnt++; +                    } else { +                        node_next = ecp_ht_itr_key(&itr); +                        break; +                    } +                } +            } +            node->verified = 0; +            node->ann_local = 0; +            memset(&node->vkey, 0, sizeof(node->vkey)); +            node->vkey_cnt = 0; +            pthread_mutex_unlock(&node->mutex); + +            /* let's reconnect all directory servers +               node->conn is currently immutable since announce is disabled well before online switch */ +            if (node->conn) ecp_conn_set_uflags(node->conn, DIR_UFLAG_RECONNECT); + +            if (dir_online && verified) { +                if (count[0] == MAX_DIR_ITEM_IN_MSG) { +                    dir_online->list[0].msg[msg_count[0]].count = count[0]; +                    msg_count[0]++; +                    msg[0] = dir_online->list[0].msg[msg_count[0]].buffer; +                    count[0] = 0; +                } +                if (region && count[region] == MAX_DIR_ITEM_IN_MSG) { +                    dir_online->list[region].msg[msg_count[region]].count = count[region]; +                    msg_count[region]++; +                    msg[region] = dir_online->list[region].msg[msg_count[region]].buffer; +                    count[region] = 0; +                } +            } + +            rv = ecp_ht_itr_advance(&itr); +        } while (rv == ECP_OK); + +        if (dir_online) { +            for (i=0; i<MAX_REGION; i++) { +                if (count[i]) { +                    dir_online->list[i].msg[msg_count[i]].count = count[i]; +                    dir_online->list[i].msg_count = msg_count[i] + 1; +                } else { +                    dir_online->list[i].msg_count = msg_count[i]; +                } +            }          } -        pthread_mutex_unlock(&node->mutex);      } +    dir_vkey_req = dir_cnt / 2 + 1; +    if (dir_vkey_req > MAX_VKEY) { +        LOG(LOG_ERR, "dir_online_switch: number of dir vkeys required > MAX_VKEY (%d)\n", MAX_VKEY); +        dir_vkey_req = MAX_VKEY; +    } + +    if (dir_online && inc_serial) dir_online->serial++; + +    pthread_rwlock_unlock(&dir_online_rwlock);      pthread_rwlock_unlock(&dir_shadow_rwlock); -    return ret; +    rv = acl_reset_ht(); +    if (rv) LOG(LOG_ERR, "dir_online_switch: acl reset err:%d\n", rv); + +    ecp_sock_expire(sock, online_switch_expired); + +    if (remove_cnt) { +        remove_nodes(remove_node, remove_cnt); +        while (node_next) { +            remove_cnt = 0; +            pthread_rwlock_rdlock(&dir_shadow_rwlock); + +            ecp_ht_itr_create(&itr, dir_shadow); +            if (node_next) { +                rv = ecp_ht_itr_search(&itr, node_next); +                if (rv) { +                    LOG(LOG_ERR, "dir_online_switch: itr search err:%d\n", rv); +                    break; +                } +                node_next = NULL; +            } +            do { +                node = ecp_ht_itr_value(&itr); + +                pthread_mutex_lock(&node->mutex); +                if (node->zombie) { +                    if (remove_cnt < MAX_NODE_REMOVE) { +                        remove_node[remove_cnt] = node; +                        remove_cnt++; +                    } else { +                        node_next = ecp_ht_itr_key(&itr); +                        break; +                    } +                } +                pthread_mutex_unlock(&node->mutex); + +                rv = ecp_ht_itr_advance(&itr); +            } while (rv == ECP_OK); + +            /* no need to copy node_next key, since this is the only thread that can destroy node */ +            pthread_rwlock_unlock(&dir_shadow_rwlock); + +            if (remove_cnt) remove_nodes(remove_node, remove_cnt); +        } +    } + +    if (srv_config->capabilities & ECP_DIR_CAP_DIR) acl_load_ht(); + +    LOG(LOG_DEBUG, "dir_online_switch: node:%d dir:%d serial:%d\n", node_cnt, dir_cnt, dir_online ? dir_online->serial : 0); +} + +void dir_announce_allow(void) { +    pthread_rwlock_wrlock(&dir_timer_rwlock); +    dir_process_enable = PROC_ALLOW_ALL; +    pthread_rwlock_unlock(&dir_timer_rwlock); +} + +void dir_announce_block(void) { +    pthread_rwlock_wrlock(&dir_timer_rwlock); +    dir_process_enable = PROC_BLOCK_ANN; +    pthread_rwlock_unlock(&dir_timer_rwlock); + +    LOG(LOG_DEBUG, "dir_announce_block\n");  } -void dir_announce(ECPSocket *sock) { -    static DirNode *expire_node[MAX_EXPIRE_CNT]; -    static int expire_node_z[MAX_EXPIRE_CNT]; -    static int expire_cnt; +void dit_init_serial(uint16_t serial) { +    if (dir_online) { +        pthread_rwlock_wrlock(&dir_online_rwlock); +        dir_online->serial = serial; +        pthread_rwlock_unlock(&dir_online_rwlock); +    } +} +void dir_announce(ECPSocket *sock, int ann_period) {      struct hashtable_itr itr; -    ecp_sts_t now; -    DirNode *node; -    DirNode *node_next; -    DirNode *announce_node[MAX_ANNOUNCE_CNT]; +    DIRNode *node; +    ecp_ecdh_public_t *node_next; +    DIRNode *announce_node[MAX_NODE_ANNOUNCE]; +    unsigned int ht_count;      int announce_cnt; -    int refcount;      int i;      int rv; -    now = ecp_tm_get_s();      node_next = NULL;      do { +        uint32_t delay; +        uint32_t rnd; +          announce_cnt = 0;          pthread_rwlock_rdlock(&dir_shadow_rwlock); -        if (ecp_ht_count(dir_shadow) > 0) { +        ht_count = ecp_ht_count(dir_shadow); +        if (ht_count > 0) {              ecp_ht_itr_create(&itr, dir_shadow);              if (node_next) {                  rv = ecp_ht_itr_search(&itr, node_next);                  if (rv) { -                    LOG(LOG_ERR, "dir ann itr search err:%d\n", rv); +                    LOG(LOG_ERR, "dir_announce: itr search err:%d\n", rv);                      break;                  }                  node_next = NULL;              }              do {                  node = ecp_ht_itr_value(&itr); -                rv = ecp_ht_itr_advance(&itr); -                pthread_mutex_lock(&node->mutex); -                if (ECP_STS_LT(node->access_ts, now) && (now - node->access_ts > NODE_EXP_TIME)) { -                    node->zombie = 1; -                    pthread_mutex_unlock(&node->mutex); - -                    if (expire_cnt < MAX_EXPIRE_CNT) { -                        expire_node[expire_cnt] = node; -                        expire_node_z[expire_cnt] = 1; -                        expire_cnt++; -                    } -                } else { -                    pthread_mutex_unlock(&node->mutex); - -                    if ((node->dir_item.capabilities & ECP_DIR_CAP_DIR) && (memcmp(node->dir_item.node.key_perma.public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) != 0)) { +                if ((node->dir_item.capabilities & ECP_DIR_CAP_DIR) && (memcmp(&node->dir_item.node.key_perma.public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) != 0)) { +                    if (announce_cnt < MAX_NODE_ANNOUNCE) {                          announce_node[announce_cnt] = node;                          announce_cnt++; - -                        if (announce_cnt == MAX_ANNOUNCE_CNT) { -                            if (!rv) { -                                node_next = ecp_ht_itr_key(&itr); -                            } else { -                                node_next = NULL; -                            } -                            break; -                        } +                    } else { +                        node_next = ecp_ht_itr_key(&itr); +                        break;                      }                  } + +                rv = ecp_ht_itr_advance(&itr);              } while (rv == ECP_OK);          } +        /* no need to copy node_next key, since announce is disabled during online switch node removal */          pthread_rwlock_unlock(&dir_shadow_rwlock); -        if (expire_cnt) { -            pthread_rwlock_wrlock(&dir_shadow_rwlock); -            for (i=0; i<expire_cnt; i++) { -                if (expire_node_z[i]) { -                    expire_node_z[i] = 0; -                    ht_remove_node(dir_shadow, expire_node[i]); -                    LOG(LOG_DEBUG, "dir remove node\n"); -                } -            } -            pthread_rwlock_unlock(&dir_shadow_rwlock); -        } - +        /* we are counting delay for total hashtable entries; delay for absolute max ht_count (256 msgs with max load, announce period of 600s) is ~70ms */ +        delay = ann_period * 1000000 / MAX(ht_count, 100);          for (i=0; i<announce_cnt; i++) { -            rv = dir_open_conn(sock, &announce_node[i]->dir_item.node); -            if (rv) LOG(LOG_ERR, "dir open connection err:%d\n", rv); -        } - -        i = expire_cnt - 1; -        while (i >= 0) { -            node = expire_node[i]; - -            pthread_mutex_lock(&node->mutex); -            refcount = node->refcount; -            pthread_mutex_unlock(&node->mutex); +            ECPConnection *conn; +            int is_reg, is_open, is_z; +            ecp_sts_t now; + +            conn = announce_node[i]->conn; +            if (conn == NULL) { +                rv = dir_open_conn(announce_node[i], sock); +                if (rv) LOG(LOG_ERR, "dir_announce: conn open err:%d\n", rv); +                continue; +            } -            if (refcount == 0) { -                int j = i; +            is_reg = is_open = is_z = 0; +            now = ecp_tm_get_s(); + +            ecp_conn_lock(conn); +            is_reg = _ecp_conn_is_reg(conn); +            if (is_reg) is_open = _ecp_conn_is_open(conn); +            if (is_open) is_z = _ecp_conn_is_zombie(conn, now, ANN_PERIOD * 3) || _ecp_conn_test_uflags(conn, DIR_UFLAG_RECONNECT); +            ecp_conn_unlock(conn); + +            if (!is_reg || is_z) { +                LOG(LOG_DEBUG, "dir_announce: reconnect\n"); +                ecp_conn_close(conn); +                rv = dir_open_conn(announce_node[i], sock); +                if (rv) LOG(LOG_ERR, "dir_announce: conn open err:%d\n", rv); +            } else if (is_open) { +                ssize_t _rv; + +                _rv = dir_send_ann(conn); +                if (_rv < 0) LOG(LOG_ERR, "dir_announce: send ann err:%ld\n", _rv); +            } -                expire_cnt--; -                while (j < expire_cnt) { -                    expire_node[j] = expire_node[j+1]; -                    j++; -                } -                expire_node[j] = NULL; -                dir_destroy_node(node); +            if (delay) { +                /* randomize over delay / 2 and delay + delay / 2 */ +                rnd = arc4random_uniform(delay + 1); +                usleep(delay / 2 + rnd);              } -            i--;          } +      } while (node_next); + +    LOG(LOG_DEBUG, "dir_announce: node count:%u\n", ht_count);  }  static void *_dir_announce(void *arg) {      ECPSocket *sock = arg; +    uint32_t rnd; +    time_t tv_sec; +    int delay; +    int ann_enable;      while (1) { -        LOG(LOG_DEBUG, "dir announce...\n"); -        dir_announce(sock); -        sleep(10); +        tv_sec = time(NULL); + +        pthread_rwlock_rdlock(&dir_timer_rwlock); +        ann_enable = (dir_process_enable > PROC_BLOCK_ANN); +        pthread_rwlock_unlock(&dir_timer_rwlock); + +        if (ann_enable) { +            dir_announce(sock, ANN_PERIOD); +        } else { +            sleep(ANN_BLOCK_TIME + ANN_PERIOD); +        } + +        /* resolution is 1s */ +        delay = ANN_PERIOD - (time(NULL) - tv_sec); +        if (delay > 0) { +            rnd = arc4random_uniform(delay + 1); +            sleep(delay / 2 + rnd); +        }      }      return NULL; @@ -516,18 +891,72 @@ int dir_start_announce(ECPSocket *sock) {      return ECP_OK;  } -int dir_init(ECPContext *ctx) { -    ECPConnHandler *handler; +void dir_init_switch(ECPSocket *sock, int init_ann) { +    int i; + +    dir_announce_allow(); +    for (i=0; i<init_ann; i++) { +        dir_announce(sock, 0); +        sleep(1); +    } +    dir_announce_block(); +    LOG(LOG_DEBUG, "init switch sleeping for %ds...\n", init_ann); +    sleep(init_ann); +    dir_online_switch(sock, 0); +    dir_announce_allow(); +    LOG(LOG_DEBUG, "init switch sleeping for %ds...\n", init_ann); +    sleep(init_ann); +} + +int dir_init_ann(ECPSocket *sock, ECPNode *node) { +    ECPConnection *conn; +    int i, is_open;      int rv; +    ssize_t _rv; -    srv_config = srv_get_config(); +    conn = malloc(sizeof(ECPConnection)); +    if (conn == NULL) return ECP_ERR_ALLOC; -    handler = malloc(sizeof(ECPConnHandler)); -    if (handler == NULL) return ECP_ERR_ALLOC; +    ecp_conn_init(conn, sock, CTYPE_DIR); +    ecp_conn_set_flags(conn, ECP_CONN_FLAG_VBOX); +    rv = ecp_conn_open(conn, node); +    if (rv) { +        LOG(LOG_ERR, "dir_init_ann: conn open err:%d\n", rv); +        return rv; +    } -    ecp_conn_handler_init(handler, dir_handle_msg, dir_handle_open, NULL, NULL); -    rv = ecp_ctx_set_handler(ctx, ECP_CTYPE_DIR, handler); -    if (rv) return rv; +    is_open = 0; +    for (i=0; i<ECP_SEND_TRIES; i++) { +        usleep(ECP_SEND_TIMEOUT * 1000); +        is_open = ecp_conn_is_open(conn); +        if (is_open) break; +    } + +    if (!is_open) { +        LOG(LOG_ERR, "dir_init_ann: conn open timeout\n"); +        ecp_conn_close(conn); +        return ECP_ERR_TIMEOUT; +    } + +    _rv = dir_send_ann(conn); +    if (_rv < 0) { +        LOG(LOG_ERR, "dir_init_ann: send ann err:%ld\n", _rv); +        return _rv; +    } + +    return ECP_OK; +} + +int dir_init(ECPSocket *sock) { +    ECPContext *ctx = sock->ctx; +    ECPConnHandler *handler, *c_handler; +    int is_dir; +    int rv; + +    /* dir_process_enable and dir_online.serial will be set from timer */ +    dir_vkey_req = 1; +    srv_config = srv_get_config(); +    is_dir = srv_config->capabilities & ECP_DIR_CAP_DIR;      rv = pthread_rwlock_init(&dir_shadow_rwlock, NULL);      if (rv) { @@ -540,12 +969,64 @@ int dir_init(ECPContext *ctx) {          return ECP_ERR;      } +    rv = pthread_rwlock_init(&dir_timer_rwlock, NULL); +    if (rv) { +        pthread_rwlock_destroy(&dir_online_rwlock); +        pthread_rwlock_destroy(&dir_shadow_rwlock); +        return ECP_ERR; +    } +      dir_shadow = ecp_ht_create_keys();      if (dir_shadow == NULL) { +        pthread_rwlock_destroy(&dir_timer_rwlock); +        pthread_rwlock_destroy(&dir_online_rwlock);          pthread_rwlock_destroy(&dir_shadow_rwlock); +        return ECP_ERR_ALLOC; +    } + +    handler = malloc(sizeof(ECPConnHandler)); +    if (handler == NULL) { +        ecp_ht_destroy(dir_shadow); +        pthread_rwlock_destroy(&dir_timer_rwlock);          pthread_rwlock_destroy(&dir_online_rwlock); +        pthread_rwlock_destroy(&dir_shadow_rwlock);          return ECP_ERR_ALLOC;      } +    ecp_conn_handler_init(handler, dir_handle_msg, NULL, NULL, NULL); + +    if (is_dir) { +        c_handler = malloc(sizeof(ECPConnHandler)); +        dir_online = malloc(sizeof(DIROnline)); +        if ((c_handler == NULL) || (dir_online == NULL)) { +            free(handler); +            ecp_ht_destroy(dir_shadow); +            pthread_rwlock_destroy(&dir_timer_rwlock); +            pthread_rwlock_destroy(&dir_online_rwlock); +            pthread_rwlock_destroy(&dir_shadow_rwlock); +            if (dir_online) free(dir_online); +            if (c_handler) free(c_handler); +            return ECP_ERR_ALLOC; +        } +        ecp_conn_handler_init(c_handler, dir_handle_client_msg, NULL, NULL, NULL); +        memset(dir_online, 0, sizeof(DIROnline)); +    } + +    rv = timer_init(sock); +    if (rv) { +        free(handler); +        if (is_dir) { +            free(dir_online); +            free(c_handler); +        } +        ecp_ht_destroy(dir_shadow); +        pthread_rwlock_destroy(&dir_timer_rwlock); +        pthread_rwlock_destroy(&dir_online_rwlock); +        pthread_rwlock_destroy(&dir_shadow_rwlock); +        return rv; +    } + +    ecp_ctx_set_handler(ctx, CTYPE_DIR, handler); +    if (is_dir) ecp_ctx_set_handler(ctx, ECP_CTYPE_DIR, c_handler);      return ECP_OK;  } diff --git a/ecp/server/dir.h b/ecp/server/dir.h index cbc0c87..24af3bc 100644 --- a/ecp/server/dir.h +++ b/ecp/server/dir.h @@ -1,35 +1,80 @@ -#define MAX_DIR_ITEM        30 -#define MAX_EXPIRE_CNT      100 -#define MAX_ANNOUNCE_CNT    100 +#include <ecp/dir/dir.h> -#define NODE_EXP_TIME       86400 +#define MAX_DIR_ITEM_IN_MSG     ((ECP_MAX_PLD - (4 + sizeof(uint16_t))) / ECP_SIZE_DIR_ITEM) -typedef struct DirNode { +#define MAX_DIR_MSG             10 +#define MAX_DIR_ITEM            (MAX_DIR_MSG * MAX_DIR_ITEM_IN_MSG) +#define MAX_DIR_ITEM_DIR        100 +#define MAX_REGION              10 + +#define MAX_VKEY                ((MAX_DIR_ITEM_DIR / 2) + 1) +#define MIN_VKEY_REQ            2       /* minimum number of vkeys required for non directory server */ + +#define MAX_NODE_ANNOUNCE       100 +#define MAX_NODE_REMOVE         100 + +#define CTYPE_DIR               0x00 + +#define MTYPE_DIR_ANN           0x00 +#define MTYPE_DIR_SHADOW        0x01 +#define MTYPE_DIR_ORIGIN_REQ    0x02 +#define MTYPE_DIR_ORIGIN_REP    0x03 + +#define PROC_BLOCK_ALL          0 +#define PROC_BLOCK_ANN          1 +#define PROC_ALLOW_ALL          2 + +#define ANN_PERIOD              600     /* announce priod (s); can't exceed 1h */ +#define CONN_EXPIRE_TO          60 + +#define DIR_UFLAG_RECONNECT     0x80 + +typedef struct DIRNode {      ECPDirItem dir_item; -    int refcount;      int zombie; -    ecp_sts_t access_ts; +    int verified; +    int ann_local; +    int is_new; +    ecp_ecdh_public_t vkey[MAX_VKEY]; +    unsigned short vkey_cnt; +    ECPConnection *conn;      pthread_mutex_t mutex; -} DirNode; +} DIRNode; + +typedef struct DIRList { +    struct { +        uint8_t count; +        unsigned char buffer[MAX_DIR_ITEM_IN_MSG * ECP_SIZE_DIR_ITEM]; +    } msg[MAX_DIR_MSG]; +    uint8_t msg_count; +} DIRList; -typedef struct DirList { -    unsigned char msg[ECP_MAX_PLD]; -    uint16_t count; -} DirList; +typedef struct DIROnline { +    DIRList list[MAX_REGION]; +    uint16_t serial; +} DIROnline; + +ssize_t dir_send_online(ECPConnection *conn, uint8_t region); +ssize_t dir_handle_client_msg(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, size_t msg_size, struct ECP2Buffer *b);  ssize_t dir_send_ann(ECPConnection *conn);  ssize_t dir_send_shadow(ECPConnection *conn); -ssize_t dir_send_online(ECPConnection *conn);  ssize_t dir_handle_ann(ECPConnection *conn, unsigned char *msg, size_t msg_size); -ssize_t dir_handle_req(ECPConnection *conn, unsigned char *msg, size_t msg_size);  ssize_t dir_handle_shadow(ECPConnection *conn, unsigned char *msg, size_t msg_size);  ssize_t dir_handle_msg(struct ECPConnection *conn, ecp_seq_t seq, unsigned char mtype, unsigned char *msg, size_t msg_size, struct ECP2Buffer *b); -int dir_handle_open(ECPConnection *conn, ECP2Buffer *bufs); -int dir_process_item(ECPDirItem *dir_item, ecp_sts_t access_ts); -DirNode *dir_create_node(ECPDirItem *dir_item, ecp_sts_t access_ts); -void dir_destroy_node(DirNode *node); -int dir_open_conn(ECPSocket *sock, ECPNode *node); -DirNode *dir_search_conn(ECPConnection *conn); -void dir_announce(ECPSocket *sock); + +void dir_process_item(ECPDirItem *dir_item, ECPSocket *sock, ecp_ecdh_public_t *s_public); +int dir_open_conn(DIRNode *node, ECPSocket *sock); +int dir_create_node(ECPDirItem *dir_item, ECPSocket *sock, DIRNode **node); +void dir_destroy_node(DIRNode *node); + +void dir_online_switch(ECPSocket *sock, int inc_serial); +void dir_remove_nodes(DIRNode *remove_node[], int remove_cnt); +void dir_announce_allow(void); +void dir_announce_block(void); +void dit_init_serial(uint16_t serial); +void dir_announce(ECPSocket *sock, int ann_period);  int dir_start_announce(ECPSocket *sock); -int dir_init(ECPContext *ctx); +void dir_init_switch(ECPSocket *sock, int init_ann); +int dir_init_ann(ECPSocket *sock, ECPNode *node); +int dir_init(ECPSocket *sock); diff --git a/ecp/server/ht.c b/ecp/server/ht.c index 0daf8d5..dc67d30 100644 --- a/ecp/server/ht.c +++ b/ecp/server/ht.c @@ -1,34 +1,25 @@  #include <ecp/core.h> -#include <ecp/dir/dir.h>  #include <ecp/ht.h>  #include "dir.h"  #include "ht.h" -int ht_insert_node(ecp_ht_table_t *table, DirNode *node) { -    return ecp_ht_insert(table, &node->dir_item.node.key_perma.public, node); +int ht_insert_node(ecp_ht_table_t *table, DIRNode *node) { +    return ecp_ht_insert_uniq(table, &node->dir_item.node.key_perma.public, node);  } -void ht_remove_node(ecp_ht_table_t *table, DirNode *node) { +void ht_remove_node(ecp_ht_table_t *table, DIRNode *node) {      ecp_ht_remove(table, &node->dir_item.node.key_perma.public);  } -void *ht_search_node(ecp_ht_table_t *table, DirNode *node) { -    return ecp_ht_search(table, &node->dir_item.node.key_perma.public); +void *ht_search_item(ecp_ht_table_t *table, ECPDirItem *dir_item) { +    return ecp_ht_search(table, &dir_item->node.key_perma.public);  }  int ht_insert_conn(ecp_ht_table_t *table, ECPConnection *conn) { -    return ecp_ht_insert(table, &conn->remote.key_perma.public, conn); +    return ecp_ht_insert_uniq(table, &conn->remote.key_perma.public, conn);  }  void ht_remove_conn(ecp_ht_table_t *table, ECPConnection *conn) {      ecp_ht_remove(table, &conn->remote.key_perma.public);  } - -void *ht_search_conn(ecp_ht_table_t *table, ECPConnection *conn) { -    return ecp_ht_search(table, &conn->remote.key_perma.public); -} - -void *ht_search_item(ecp_ht_table_t *table, ECPDirItem *dir_item) { -    return ecp_ht_search(table, &dir_item->node.key_perma.public); -} diff --git a/ecp/server/ht.h b/ecp/server/ht.h index cba4691..a9cb02f 100644 --- a/ecp/server/ht.h +++ b/ecp/server/ht.h @@ -1,7 +1,5 @@ -int ht_insert_node(ecp_ht_table_t *table, DirNode *node); -void ht_remove_node(ecp_ht_table_t *table, DirNode *node); -void *ht_search_node(ecp_ht_table_t *table, DirNode *node); +int ht_insert_node(ecp_ht_table_t *table, DIRNode *node); +void ht_remove_node(ecp_ht_table_t *table, DIRNode *node); +void *ht_search_item(ecp_ht_table_t *table, ECPDirItem *dir_item);  int ht_insert_conn(ecp_ht_table_t *table, ECPConnection *conn); -void ht_remove_conn(ecp_ht_table_t *table, ECPConnection *conn); -void *ht_search_conn(ecp_ht_table_t *table, ECPConnection *conn); -void *ht_search_item(ecp_ht_table_t *table, ECPDirItem *dir_item);
\ No newline at end of file +void ht_remove_conn(ecp_ht_table_t *table, ECPConnection *conn);
\ No newline at end of file diff --git a/ecp/server/server.c b/ecp/server/server.c index d34fb19..890d468 100644 --- a/ecp/server/server.c +++ b/ecp/server/server.c @@ -1,41 +1,81 @@  #include <stdlib.h> -#include <fcntl.h> +#include <stdarg.h> +#include <stdio.h> +#include <string.h>  #include <unistd.h> +#include <time.h> +#include <fcntl.h>  #include <sys/stat.h>  #include <ecp/core.h> -#include <ecp/dir/dir.h>  #include <ecp/vconn/vconn.h> +#include <util.h> +  #include "dir.h"  #include "vlink.h"  #include "ht.h" +#include "acl.h"  #include "server.h"  static SRVConfig srv_config; +static const char *srv_llevel_str[] = { +    "DEBUG", +    "INFO", +    "ERROR" +};  SRVConfig *srv_get_config(void) {      return &srv_config;  }  static void usage(char *arg) { -    fprintf(stderr, "Usage: %s <capabilities> <priv> <addr> [ <pub> <addr> ]\n", arg); +    fprintf(stderr, "Usage: %s <region> <capabilities> <private key> <addr> [ <dir acl> <vconn acl> ] [ <public key> <addr> ]\n", arg); +    exit(1); +} + +static void fail(char *format, ...) { +    va_list args; + +    va_start(args, format); +    vfprintf(stderr, format, args); +    va_end(args);      exit(1);  }  static void handle_err(ECPConnection *conn, unsigned char mtype, int err) { -    if (conn->type == ECP_CTYPE_VLINK) vlink_handle_err(conn, mtype, err); -    LOG(LOG_ERR, "handle error ctype:%d mtype:%d err:%d\n", conn->type, mtype, err); +    LOG(LOG_ERR, "handle_err: ctype:%d mtype:%d err:%d\n", conn->type, mtype, err);  } -static ECPConnection *conn_new(ECPSocket *sock, unsigned char type) { +static ECPConnection *conn_new(ECPSocket *sock, ECPConnection *parent, unsigned char ctype) {      ECPConnection *conn = NULL; -    switch (type) { +    switch (ctype) { +        case CTYPE_DIR: { +            if (!(srv_config.capabilities & ECP_DIR_CAP_DIR)) return NULL; + +            conn = malloc(sizeof(ECPConnection)); +            if (conn) { +                ecp_conn_init(conn, sock, ctype); +                ecp_conn_set_flags(conn, ECP_CONN_FLAG_VBOX); +            } +            break; +        } + +        case ECP_CTYPE_DIR: { +            if (!(srv_config.capabilities & ECP_DIR_CAP_DIR)) return NULL; + +            conn = malloc(sizeof(ECPConnection)); +            if (conn) ecp_conn_init(conn, sock, ctype); +            break; +        } +          case ECP_CTYPE_VCONN: {              ECPVConnInb *_conn; +            if (!(srv_config.capabilities & ECP_DIR_CAP_VCONN)) return NULL; +              _conn = malloc(sizeof(ECPVConnInb));              if (_conn) {                  ecp_vconn_init_inb(_conn, sock); @@ -45,29 +85,61 @@ static ECPConnection *conn_new(ECPSocket *sock, unsigned char type) {          }          case ECP_CTYPE_VLINK: { -            conn = malloc(sizeof(ECPConnection)); -            if (conn) ecp_vlink_init(conn, sock); -            break; -        } +            if (!(srv_config.capabilities & ECP_DIR_CAP_VCONN)) return NULL; -        default: {              conn = malloc(sizeof(ECPConnection)); -            if (conn) ecp_conn_init(conn, sock, type); +            if (conn) ecp_vlink_init(conn, sock);              break;          }      } +    if (conn) ecp_conn_set_flags(conn, ECP_CONN_FLAG_GC);      return conn;  } +static int key_check(ECPSocket *sock, ECPConnection *parent, unsigned char ctype, ecp_ecdh_public_t *public) { +    switch (ctype) { +        case CTYPE_DIR: { +            if (public == NULL) return 0; +            return acl_inlist(public); +        } + +        case ECP_CTYPE_VLINK: { +            if (public == NULL) return 0; +            if (parent == NULL) return acl_inlist(public); +            return 1; +        } + +        default: +            return 1; +    } +} +  static void conn_free(ECPConnection *conn) {      free(conn);  } +void log_print(int level, char *format, ...) { +    va_list args; +    time_t t; +    char buf[26]; + +    if (level >= (sizeof(srv_llevel_str) / sizeof(char *))) return; + +    t = time(NULL); +    ctime_r(&t, buf); +    buf[24] = '\0'; +    fprintf(stderr, "%s [%s]: ", buf, srv_llevel_str[level]); + +    va_start(args, format); +    vfprintf(stderr, format, args); +    va_end(args); +} +  int ecp_init(ECPContext *ctx, ECPConnHandler *vconn_handler, ECPConnHandler *vlink_handler) {      int rv; -    rv = ecp_ctx_init(ctx, handle_err, conn_new, conn_free, NULL); +    rv = ecp_ctx_init(ctx, handle_err, conn_new, conn_free, key_check);      if (rv) return rv;      rv = ecp_vconn_handler_init(ctx, vconn_handler); @@ -86,118 +158,120 @@ int main(int argc, char *argv[]) {      ECPConnHandler vconn_handler;      ECPConnHandler vlink_handler;      char *endptr; -    int fd; +    char *debug_init_str; +    int _argc, fd;      int rv; -    if ((argc < 4) || (argc > 6)) usage(argv[0]); +    memset(&srv_config, 0, sizeof(srv_config)); -    srv_config.capabilities = (uint16_t)strtol(argv[1], &endptr, 16); -    if (endptr[0] != '\0') { -        fprintf(stderr, "Bad capabilities\n"); -        exit(1); -    } +    if (argc < 3) usage(argv[0]); -    if ((fd = open(argv[2], O_RDONLY)) < 0) { -        fprintf(stderr, "Unable to open %s\n", argv[2]); -        exit(1); -    } -    if (read(fd, &srv_config.key_perma.public, sizeof(ecp_ecdh_public_t)) != sizeof(ecp_ecdh_public_t)) { -        close(fd); -        fprintf(stderr, "Unable to read public key from %s\n", argv[2]); -        exit(1); -    } -    if (read(fd, &srv_config.key_perma.private, sizeof(ecp_ecdh_private_t)) != sizeof(ecp_ecdh_private_t)) { -        close(fd); -        fprintf(stderr, "Unable to read private key from %s\n", argv[2]); -        exit(1); +    _argc = 1; +    srv_config.region = (uint8_t)strtol(argv[_argc], &endptr, 16); +    if (endptr[0] != '\0') fail("Bad region\n"); +    if (srv_config.region >= MAX_REGION) fail("Bad region\n"); +    _argc++; + +    srv_config.capabilities = (uint8_t)strtol(argv[_argc], &endptr, 16); +    if (endptr[0] != '\0') fail("Bad capabilities\n"); +    _argc++; + +    if (srv_config.capabilities & ECP_DIR_CAP_DIR) { +        if (argc < 7) usage(argv[0]); +    } else { +        if (argc < 5) usage(argv[0]);      } + +    fd = open(argv[_argc], O_RDONLY); +    if (fd < 0) fail("Unable to open %s\n", argv[_argc]); + +    rv = ecp_util_read_key(fd, &srv_config.key_perma.public, &srv_config.key_perma.private);      close(fd); +    if (rv) fail("Unable to read key from %s\n", argv[_argc]);      srv_config.key_perma.valid = 1; +    _argc++;      rv = ecp_init(&ctx, &vconn_handler, &vlink_handler); -    if (rv) { -        fprintf(stderr, "ecp_init RV:%d\n", rv); -        exit(1); -    } +    if (rv) fail("ecp_init err:%d\n", rv); +      rv = ecp_sock_create(&sock, &ctx, &srv_config.key_perma); -    if (rv) { -        fprintf(stderr, "ecp_sock_create RV:%d\n", rv); -        exit(1); -    } +    if (rv) fail("ecp_sock_create err:%d\n", rv); -    rv = ecp_addr_init(&addr, argv[3]); -    if (rv) { -        fprintf(stderr, "ecp_addr_init RV:%d\n", rv); -        exit(1); -    } +    rv = ecp_vconn_sock_create(&sock); +    if (rv) fail("ecp_vconn_sock_create err:%d\n", rv); + +    rv = ecp_addr_init(&addr, argv[_argc]); +    if (rv) fail("ecp_addr_init err:%d\n", rv); +    _argc++;      rv = ecp_sock_open(&sock, &addr); -    if (rv) { -        fprintf(stderr, "ecp_sock_open RV:%d\n", rv); -        exit(1); -    } +    if (rv) fail("ecp_sock_open err:%d\n", rv); +    srv_config.my_addr = addr; -    rv = dir_init(&ctx); -    if (rv) { -        fprintf(stderr, "dir_init RV:%d\n", rv); -        exit(1); -    } +    rv = acl_init(); +    if (rv) fail("acl_init err:%d\n", rv); -    rv = vlink_init(&ctx); -    if (rv) { -        fprintf(stderr, "vlink_init RV:%d\n", rv); -        exit(1); -    } +    if (srv_config.capabilities & ECP_DIR_CAP_DIR) { +        srv_config.acl_fn_dir = strdup(argv[_argc]); +        _argc++; +        srv_config.acl_fn = strdup(argv[_argc]); +        _argc++; -    rv = dir_start_announce(&sock); -    if (rv) { -        fprintf(stderr, "dir_start_announce RV:%d\n", rv); -        exit(1); +        rv = acl_load(); +        if (rv) fail("acl_load err:%d\n", rv);      } -    rv = vlink_start_open(&sock); -    if (rv) { -        fprintf(stderr, "vlink_start_open RV:%d\n", rv); -        exit(1); -    } +    rv = dir_init(&sock); +    if (rv) fail("dir_init err:%d\n", rv); -    rv = vlink_start_keyx(); -    if (rv) { -        fprintf(stderr, "vlink_start_keyx RV:%d\n", rv); -        exit(1); -    } +    rv = vlink_init(&sock); +    if (rv) fail("vlink_init err:%d\n", rv); -    if (argc == 6) { +    rv = ecp_start_receiver(&sock); +    if (rv) fail("ecp_start_receiver err:%d\n", rv); + +    if (argc == _argc + 2) {          ECPNode node;          ecp_ecdh_public_t node_pub;          ecp_tr_addr_t node_addr; -        if ((fd = open(argv[4], O_RDONLY)) < 0) { -            fprintf(stderr, "Unable to open %s\n", argv[4]); -            exit(1); -        } -        if (read(fd, &node_pub, sizeof(ecp_ecdh_public_t))  != sizeof(ecp_ecdh_public_t)) { -            close(fd); -            fprintf(stderr, "Unable to read public key from %s\n", argv[4]); -            exit(1); -        } -        close(fd); +        fd = open(argv[_argc], O_RDONLY); +        if (fd < 0) fail("Unable to open %s\n", argv[_argc]); -        int rv; +        rv = ecp_util_read_key(fd, &node_pub, NULL); +        close(fd); +        if (rv) fail("Unable to read public key from %s\n", argv[_argc]); +        _argc++;          ecp_node_init(&node, &node_pub, NULL); -        rv = ecp_node_set_addr(&node, argv[5]); -        if (rv) { -            fprintf(stderr, "ecp_node_set_addr RV:%d\n", rv); -            exit(1); -        } -        rv = dir_open_conn(&sock, &node); -        if (rv) { -            fprintf(stderr, "dir_open_conn RV:%d\n", rv); -            exit(1); +        rv = ecp_node_set_addr(&node, argv[_argc]); +        if (rv) fail("ecp_node_set_addr err:%d\n", rv); +        _argc++; + +        rv = dir_init_ann(&sock, &node); +        if (rv) fail("dir_init_ann err:%d\n", rv); +    } + +    if (_argc != argc) usage(argv[0]); + +    debug_init_str = getenv("ECP_DBG_INIT"); +    if (debug_init_str) { +        int init_ann; + +        init_ann = (int)strtol(debug_init_str, &endptr, 10); +        if (endptr[0] == '\0') { +            LOG(LOG_DEBUG, "init switch start - number of announces:%d\n", init_ann); +            dir_init_switch(&sock, init_ann); +            LOG(LOG_DEBUG, "init switch done\n");          }      } -    ecp_receiver(&sock); -}
\ No newline at end of file +    rv = dir_start_announce(&sock); +    if (rv) fail("dir_start_announce err:%d\n", rv); + +    rv = vlink_start_keyx(&sock); +    if (rv) fail("vlink_start_keyx err:%d\n", rv); + +    while(1) pause(); +} diff --git a/ecp/server/server.h b/ecp/server/server.h index ff8e444..21f2129 100644 --- a/ecp/server/server.h +++ b/ecp/server/server.h @@ -1,15 +1,20 @@  #include <stdio.h> -#define LOG_DEBUG           1 -#define LOG_INFO            2 -#define LOG_ERR             3 +#define LOG_DEBUG           0 +#define LOG_INFO            1 +#define LOG_ERR             2  #define LOG_LEVEL           LOG_DEBUG -#define LOG(l, ...)         (l >= LOG_LEVEL ? fprintf(stderr, __VA_ARGS__) : 0 ) +#define LOG(l, ...)         log_print(l, __VA_ARGS__);  typedef struct SRVConfig {      ECPDHKey key_perma; -    uint16_t capabilities; +    char *acl_fn; +    char *acl_fn_dir; +    ecp_tr_addr_t my_addr; +    uint8_t region; +    uint8_t capabilities;  } SRVConfig;  SRVConfig *srv_get_config(void); +void log_print(int level, char *format, ...); diff --git a/ecp/server/timer.c b/ecp/server/timer.c new file mode 100644 index 0000000..ab30633 --- /dev/null +++ b/ecp/server/timer.c @@ -0,0 +1,98 @@ +#include <stdlib.h> +#include <unistd.h> +#include <time.h> + +#include <ecp/core.h> + +#include "dir.h" + +#include "server.h" +#include "timer.h" + +static struct timespec timer_ts_mono; +static timer_t timer_ann_block_id; +static timer_t timer_online_switch_id; + +int timer_set_next(time_t tv_now) { +    struct itimerspec its = { 0 }; +    time_t tv_midnight; +    int rv = 0; + +    /* next midnight */ +    tv_midnight = (tv_now / ONLINE_SWITCH_PERIOD) * ONLINE_SWITCH_PERIOD + ONLINE_SWITCH_PERIOD; + +    if (tv_now < (tv_midnight - ANN_BLOCK_TIME)) { +        dir_announce_allow(); + +        its.it_value.tv_sec = tv_midnight - ANN_BLOCK_TIME; +        rv = timer_settime(timer_ann_block_id, TIMER_ABSTIME, &its, NULL); +        if (rv) return ECP_ERR; +    } else { +        dir_announce_block(); +    } + +    its.it_value.tv_sec = tv_midnight; +    rv = timer_settime(timer_online_switch_id, TIMER_ABSTIME, &its, NULL); +    if (rv) return ECP_ERR; + +    return ECP_OK; +} + +void timer_ann_block(union sigval timer_data) { +    dir_announce_block(); +} + +void timer_online_switch(union sigval timer_data) { +    ECPSocket *sock = timer_data.sival_ptr; +    struct timespec ts_prev; +    int rv; + +    ts_prev = timer_ts_mono; +    clock_gettime(CLOCK_MONOTONIC, &timer_ts_mono); + +    /* exit if someone is messing with realtime clock */ +    if ((timer_ts_mono.tv_sec - ts_prev.tv_sec) < (ONLINE_SWITCH_PERIOD / 2)) goto online_switch_fin; + +    dir_online_switch(sock, 1); + +online_switch_fin: +    rv = timer_set_next(time(NULL)); +    if (rv) LOG(LOG_ERR, "timer_online_switch: set next timer err:%d\n", rv); +} + +int timer_init(ECPSocket *sock) { +    struct sigevent timer_ann_block_evt = { 0 }; +    struct sigevent timer_online_switch_evt = { 0 }; +    time_t tv_now; +    int rv = 0; + +    timer_ann_block_evt.sigev_notify = SIGEV_THREAD; +    timer_ann_block_evt.sigev_notify_function = timer_ann_block; +    timer_ann_block_evt.sigev_value.sival_ptr = NULL; +    rv = timer_create(CLOCK_REALTIME, &timer_ann_block_evt, &timer_ann_block_id); +    if (rv) return ECP_ERR; + +    timer_online_switch_evt.sigev_notify = SIGEV_THREAD; +    timer_online_switch_evt.sigev_notify_function = timer_online_switch; +    timer_online_switch_evt.sigev_value.sival_ptr = sock; +    rv = timer_create(CLOCK_REALTIME, &timer_online_switch_evt, &timer_online_switch_id); +    if (rv) { +        timer_delete(timer_ann_block_id); +        return ECP_ERR; +    } + +    /* ensure that first dir_online_switch is executed */ +    clock_gettime(CLOCK_MONOTONIC, &timer_ts_mono); +    timer_ts_mono.tv_sec -= ONLINE_SWITCH_PERIOD / 2; + +    tv_now = time(NULL); +    dit_init_serial(tv_now / ONLINE_SWITCH_PERIOD); +    rv = timer_set_next(tv_now); +    if (rv) { +        timer_delete(timer_ann_block_id); +        timer_delete(timer_online_switch_id); +        return rv; +    } + +    return ECP_OK; +} diff --git a/ecp/server/timer.h b/ecp/server/timer.h new file mode 100644 index 0000000..526f0b8 --- /dev/null +++ b/ecp/server/timer.h @@ -0,0 +1,9 @@ +#include <signal.h> + +#define ANN_BLOCK_TIME         7200    /* time to block announce before online switch (s) */ +#define ONLINE_SWITCH_PERIOD   86400   /* online switch period (s) */ + +int timer_set_next(time_t tv_now); +void timer_ann_block(union sigval timer_data); +void timer_online_switch(union sigval timer_data); +int timer_init(ECPSocket *sock); diff --git a/ecp/server/vlink.c b/ecp/server/vlink.c index e98dacc..d0c865b 100644 --- a/ecp/server/vlink.c +++ b/ecp/server/vlink.c @@ -3,318 +3,238 @@  #include <string.h>  #include <ecp/core.h> -#include <ecp/dir/dir.h>  #include <ecp/vconn/vconn.h>  #include <ecp/ht.h>  #include <ecp/tm.h> -#include "server.h"  #include "dir.h" +#include "vlink.h"  #include "ht.h" -#include "vlink.h" +#include "server.h"  static ecp_ht_table_t *vlink_conn = NULL; -static ecp_ht_table_t *vlink_node = NULL; -static pthread_mutex_t vlink_conn_mutex; -static pthread_mutex_t vlink_node_mutex; -static pthread_t vlink_open_thd; +static pthread_rwlock_t vlink_conn_rwlock;  static pthread_t vlink_keyx_thd;  static SRVConfig *srv_config; -void vlink_handle_err(ECPConnection *conn, unsigned char mtype, int err) { -    switch (mtype) { -        case ECP_MTYPE_OPEN_REP: { -            if (err == ECP_ERR_TIMEOUT) { -                int rv; - -                rv = vlink_insert_node(conn); -                if (rv) LOG(LOG_ERR, "vlink insert node err:%d\n", rv); -            } -            ecp_conn_close(conn); -            break; -        } -    } -} - -int vlink_handle_open(ECPConnection *conn, ECP2Buffer *bufs) { -    int rv; - -    rv = ecp_vlink_handle_open(conn, bufs); -    if (rv) return rv; - -    if (ecp_conn_is_inb(conn)) return ECP_OK; - -    pthread_mutex_lock(&vlink_conn_mutex); -    rv = ht_insert_conn(vlink_conn, conn); -    pthread_mutex_unlock(&vlink_conn_mutex); - -    if (rv) { -        ecp_vlink_handle_close(conn); -        return rv; -    } - -    return ECP_OK; -} - -void vlink_handle_close(ECPConnection *conn) { -    int rv; - -    ecp_vlink_handle_close(conn); +#define MAX(X, Y)   (((X) > (Y)) ? (X) : (Y)) -    if (ecp_conn_is_inb(conn)) return; - -    rv = vlink_insert_node(conn); -    if (rv) LOG(LOG_ERR, "vlink insert node err:%d\n", rv); -} - -int vlink_open_conn(ECPSocket *sock, ECPNode *node) { +int vlink_open_conn(ECPSocket *sock, ECPNode *node, ECPConnection **_conn) {      ECPConnection *conn;      int rv; +    *_conn = NULL;      conn = malloc(sizeof(ECPConnection));      if (conn == NULL) return ECP_ERR_ALLOC;      ecp_vlink_init(conn, sock);      rv = ecp_conn_open(conn, node); -    return rv; +    if (rv) return rv; + +    *_conn = conn; +    return ECP_OK;  }  void vlink_new_node(ECPSocket *sock, ECPDirItem *dir_item) { -    if ((dir_item->capabilities & ECP_DIR_CAP_VCONN) && (memcmp(&dir_item->node.key_perma.public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) < 0)) { -        int rv; +    ECPConnection *conn; +    int rv; -        LOG(LOG_DEBUG, "vlink open connection\n"); -        rv = vlink_open_conn(sock, &dir_item->node); -        if (rv) LOG(LOG_ERR, "vlink open connection err:%d\n", rv); -    } -} +    if (!(dir_item->capabilities & ECP_DIR_CAP_VCONN) || (memcmp(&dir_item->node.key_perma.public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) == 0)) return; -int vlink_insert_node(ECPConnection *conn) { -    DirNode *node; -    int rv = ECP_OK; +    pthread_rwlock_rdlock(&vlink_conn_rwlock); +    conn = ecp_ht_search(vlink_conn, &dir_item->node.key_perma.public); +    pthread_rwlock_unlock(&vlink_conn_rwlock); -    node = dir_search_conn(conn); -    if (node) { -        pthread_mutex_lock(&vlink_node_mutex); -        rv = ht_insert_node(vlink_node, node); -        pthread_mutex_unlock(&vlink_node_mutex); +    if (conn) return; -        if (rv) { -            pthread_mutex_lock(&node->mutex); -            node->refcount--; -            pthread_mutex_unlock(&node->mutex); -        } +    rv = vlink_open_conn(sock, &dir_item->node, &conn); +    if (rv) { +        LOG(LOG_ERR, "vlink_new_node: open conn err:%d\n", rv); +        return;      } -    return rv; -} - -void vlink_open(ECPSocket *sock) { -    struct hashtable_itr itr; -    DirNode *node; -    DirNode *node_next; -    DirNode *open_node[MAX_OPEN_CNT]; -    int open_cnt; -    int i; -    int rv; - -    node_next = NULL; -    do { -        open_cnt = 0; -        pthread_mutex_lock(&vlink_node_mutex); - -        if (ecp_ht_count(vlink_node) > 0) { -            ecp_ht_itr_create(&itr, vlink_node); -            if (node_next) { -                rv = ecp_ht_itr_search(&itr, node_next); -                if (rv) { -                    LOG(LOG_ERR, "vlink open itr search err:%d\n", rv); -                    break; -                } -                node_next = NULL; -            } -            do { -                node = ecp_ht_itr_value(&itr); -                rv = ecp_ht_itr_remove(&itr); - -                open_node[open_cnt] = node; -                open_cnt++; -                if (open_cnt == MAX_OPEN_CNT) { -                    if (!rv) { -                        node_next = ecp_ht_itr_key(&itr); -                    } else { -                        node_next = NULL; -                    } -                } -            } while (rv == ECP_OK); -        } +    pthread_rwlock_wrlock(&vlink_conn_rwlock); +    rv = ht_insert_conn(vlink_conn, conn); +    pthread_rwlock_unlock(&vlink_conn_rwlock); -        pthread_mutex_unlock(&vlink_node_mutex); +    if (rv) { +        LOG(LOG_ERR, "vlink_new_node: ins conn err:%d\n", rv); +        ecp_conn_close(conn); +        return; +    } -        for (i=0; i<open_cnt; i++) { -            pthread_mutex_lock(&open_node[i]->mutex); +    LOG(LOG_DEBUG, "vlink_new_node: new connection\n"); +} -            if (open_node[i]->zombie) { -                open_node[i]->refcount--; +void vlink_del_node(ECPDirItem *dir_item) { +    ECPConnection *conn; -                pthread_mutex_unlock(&open_node[i]->mutex); -            } else { -                pthread_mutex_unlock(&open_node[i]->mutex); +    if (!(dir_item->capabilities & ECP_DIR_CAP_VCONN) || (memcmp(&dir_item->node.key_perma.public, &srv_config->key_perma.public, sizeof(srv_config->key_perma.public)) == 0)) return; -                LOG(LOG_DEBUG, "vlink open connection\n"); -                rv = vlink_open_conn(sock, &open_node[i]->dir_item.node); -                if (rv) LOG(LOG_ERR, "vlink open connection err:%d\n", rv); -            } -        } -    } while (node_next); +    pthread_rwlock_rdlock(&vlink_conn_rwlock); +    conn = ecp_ht_search(vlink_conn, &dir_item->node.key_perma.public); +    if (conn) ecp_conn_set_uflags(conn, VLINK_UFLAG_DISCONNECT); +    pthread_rwlock_unlock(&vlink_conn_rwlock);  } -void vlink_keyx(void) { +void vlink_keyx(ECPSocket *sock, int keyx_period) {      struct hashtable_itr itr; -    ecp_sts_t now;      ECPConnection *conn; -    ECPConnection *keyx_next; -    ECPConnection *keyx_conn[MAX_KEYX_CNT]; -    int keyx_conn_z[MAX_KEYX_CNT]; +    ecp_ecdh_public_t *conn_next; +    ECPConnection *keyx_conn[MAX_NODE_ANNOUNCE]; +    unsigned int ht_count;      int keyx_cnt; -    int is_zombie;      int i;      int rv; -    now = ecp_tm_get_s(); -    keyx_next = NULL; +    conn_next = NULL;      do { +        uint32_t delay; +        uint32_t rnd; +          keyx_cnt = 0; -        pthread_mutex_lock(&vlink_conn_mutex); +        pthread_rwlock_rdlock(&vlink_conn_rwlock); -        if (ecp_ht_count(vlink_conn) > 0) { +        ht_count = ecp_ht_count(vlink_conn); +        if (ht_count > 0) {              ecp_ht_itr_create(&itr, vlink_conn); -            if (keyx_next) { -                rv = ecp_ht_itr_search(&itr, keyx_next); +            if (conn_next) { +                rv = ecp_ht_itr_search(&itr, conn_next);                  if (rv) { -                    LOG(LOG_ERR, "vlink keyx itr search err:%d\n", rv); +                    LOG(LOG_ERR, "vlink_keyx: itr search err:%d\n", rv);                      break;                  } -                keyx_next = NULL; +                conn_next = NULL;              }              do {                  conn = ecp_ht_itr_value(&itr); -                is_zombie = ecp_conn_is_zombie(conn, now, CONN_EXP_TIME); -                if (is_zombie) { -                    rv = ecp_ht_itr_remove(&itr); +                if (keyx_cnt < MAX_NODE_ANNOUNCE) { +                    keyx_conn[keyx_cnt] = conn; +                    keyx_cnt++;                  } else { -                    rv = ecp_ht_itr_advance(&itr); +                    conn_next = ecp_ht_itr_key(&itr); +                    break;                  } -                keyx_conn[keyx_cnt] = conn; -                keyx_conn_z[keyx_cnt] = is_zombie; -                keyx_cnt++; - -                if (keyx_cnt == MAX_KEYX_CNT) { -                    if (!rv) { -                        keyx_next = ecp_ht_itr_key(&itr); -                    } else { -                        keyx_next = NULL; -                    } -                } +                rv = ecp_ht_itr_advance(&itr);              } while (rv == ECP_OK);          } -        pthread_mutex_unlock(&vlink_conn_mutex); +        /* no need to copy conn_next key, since this is the only thread that can remove connection */ +        pthread_rwlock_unlock(&vlink_conn_rwlock); +        /* we are counting delay for total hashtable entries; delay for absolute max ht_count (256 msgs with max load, keyx period of 600s) is ~70ms */ +        delay = keyx_period * 1000000 / MAX(ht_count, 100);          for (i=0; i<keyx_cnt; i++) { -            if (keyx_conn_z[i]) { -                LOG(LOG_DEBUG, "vlink close connection\n"); -                ecp_conn_close(keyx_conn[i]); -            } else { +            int is_d, is_reg, is_open, is_z; +            ecp_sts_t now; + +            conn = keyx_conn[i]; + +            is_d = is_reg = is_open = is_z = 0; +            now = ecp_tm_get_s(); + +            ecp_conn_lock(conn); +            is_d = _ecp_conn_test_uflags(conn, VLINK_UFLAG_DISCONNECT); +            is_reg = _ecp_conn_is_reg(conn); +            if (is_reg) is_open = _ecp_conn_is_open(conn); +            if (is_open) is_z = _ecp_conn_is_zombie(conn, now, KEYX_PERIOD * 3); +            ecp_conn_unlock(conn); + +            if (is_d) { +                LOG(LOG_DEBUG, "vlink_keyx: disconnect\n"); +                // close all inbound connections; +                pthread_rwlock_wrlock(&vlink_conn_rwlock); +                ht_remove_conn(vlink_conn, conn); +                pthread_rwlock_unlock(&vlink_conn_rwlock); + +                ecp_conn_close(conn); +            } else if (!is_reg || is_z) { +                ECPConnection *_conn = NULL; + +                LOG(LOG_DEBUG, "vlink_keyx: reconnect\n"); +                rv = vlink_open_conn(conn->sock, &conn->remote, &_conn); +                if (rv) { +                    LOG(LOG_ERR, "vlink_keyx: conn open err:%d\n", rv); +                    continue; +                } + +                pthread_rwlock_wrlock(&vlink_conn_rwlock); +                ht_remove_conn(vlink_conn, conn); +                rv = ht_insert_conn(vlink_conn, _conn); +                pthread_rwlock_unlock(&vlink_conn_rwlock); + +                ecp_conn_close(conn); +                if (rv) { +                    LOG(LOG_ERR, "vlink_keyx: conn insert err:%d\n", rv); +                    ecp_conn_close(_conn); +                } +            } else if (is_open) {                  ssize_t _rv; -                LOG(LOG_DEBUG, "vlink send keyx\n"); -                _rv = ecp_send_keyx_req(keyx_conn[i], 1); -                if (_rv < 0) LOG(LOG_ERR, "vlink send keyx err:%ld\n", _rv); +                _rv = ecp_send_keyx_req(conn, 1); +                if (_rv < 0) LOG(LOG_ERR, "vlink_keyx: send keyx req err:%ld\n", _rv); +            } + +            if (delay) { +                /* randomize over delay / 2 and delay + delay / 2 */ +                rnd = arc4random_uniform(delay + 1); +                usleep(delay / 2 + rnd);              }          } -    } while (keyx_next); + +    } while (conn_next); + +    LOG(LOG_DEBUG, "vlink_keyx: connection count:%u\n", ht_count);  } -static void *_vlink_open(void *arg) { +static void *_vlink_keyx(void *arg) {      ECPSocket *sock = arg; +    uint32_t rnd; +    time_t tv_sec; +    int delay;      while (1) { -        LOG(LOG_DEBUG, "vlink open...\n"); -        vlink_open(sock); -        sleep(10); -    } +        tv_sec = time(NULL); -    return NULL; -} +        vlink_keyx(sock, KEYX_PERIOD); -static void *_vlink_keyx(void *arg) { -    while (1) { -        LOG(LOG_DEBUG, "vlink keyx...\n"); -        vlink_keyx(); -        sleep(10); +        /* resolution is 1s */ +        delay = KEYX_PERIOD - (time(NULL) - tv_sec); +        if (delay > 0) { +            rnd = arc4random_uniform(delay + 1); +            sleep(delay / 2 + rnd); +        }      }      return NULL;  } -int vlink_start_open(ECPSocket *sock) { -    int rv; - -    rv = pthread_create(&vlink_open_thd, NULL, _vlink_open, sock); -    if (rv) return ECP_ERR; -    return ECP_OK; -} -int vlink_start_keyx(void) { +int vlink_start_keyx(ECPSocket *sock) {      int rv; -    rv = pthread_create(&vlink_keyx_thd, NULL, _vlink_keyx, NULL); +    rv = pthread_create(&vlink_keyx_thd, NULL, _vlink_keyx, sock);      if (rv) return ECP_ERR;      return ECP_OK;  } -int vlink_init(ECPContext *ctx) { -    ECPConnHandler *handler; +int vlink_init(ECPSocket *sock) { +    ECPContext *ctx = sock->ctx;      int rv;      srv_config = srv_get_config(); -    handler = ecp_ctx_get_handler(ctx, ECP_CTYPE_VLINK); -    if (handler == NULL) return ECP_ERR; - -    handler->handle_open = vlink_handle_open; -    handler->handle_close = vlink_handle_close; - -    rv = pthread_mutex_init(&vlink_conn_mutex, NULL); -    if (rv) { -        return ECP_ERR; -    } - -    rv = pthread_mutex_init(&vlink_node_mutex, NULL); -    if (rv) { -        pthread_mutex_destroy(&vlink_conn_mutex); -        return ECP_ERR; -    } +    rv = pthread_rwlock_init(&vlink_conn_rwlock, NULL); +    if (rv) return ECP_ERR; -    rv = ECP_OK;      vlink_conn = ecp_ht_create_keys(); -    if (vlink_conn == NULL) rv = ECP_ERR_ALLOC; -    if (!rv) { -        vlink_node = ecp_ht_create_keys(); -        if (vlink_node == NULL) rv = ECP_ERR_ALLOC; -    } - -    if (rv) { -        pthread_mutex_destroy(&vlink_node_mutex); -        pthread_mutex_destroy(&vlink_conn_mutex); -        if (vlink_node) ecp_ht_destroy(vlink_node); -        if (vlink_conn) ecp_ht_destroy(vlink_conn); -        return rv; +    if (vlink_conn == NULL) { +        pthread_rwlock_destroy(&vlink_conn_rwlock); +        return ECP_ERR_ALLOC;      }      return ECP_OK; diff --git a/ecp/server/vlink.h b/ecp/server/vlink.h index 78af6b1..3e8022f 100644 --- a/ecp/server/vlink.h +++ b/ecp/server/vlink.h @@ -1,17 +1,12 @@ -#define MAX_KEYX_CNT        100 -#define MAX_OPEN_CNT        100 +#define MAX_KEYX_CNT            100 -#define CONN_EXP_TIME       22 +#define KEYX_PERIOD             600     /* key exchange priod (s); can't exceed 1h */ +#define VLINK_UFLAG_DISCONNECT  0x80 -void vlink_handle_err(ECPConnection *conn, unsigned char mtype, int err); -int vlink_handle_open(ECPConnection *conn, ECP2Buffer *bufs); -void vlink_handle_close(ECPConnection *conn); -int vlink_open_conn(ECPSocket *sock, ECPNode *node); +int vlink_open_conn(ECPSocket *sock, ECPNode *node, ECPConnection **_conn);  void vlink_new_node(ECPSocket *sock, ECPDirItem *item); -int vlink_insert_node(ECPConnection *conn); +void vlink_del_node(ECPDirItem *dir_item); -void vlink_keyx(void); -void vlink_open(ECPSocket *sock); -int vlink_start_open(ECPSocket *sock); -int vlink_start_keyx(void); -int vlink_init(ECPContext *ctx); +void vlink_keyx(ECPSocket *sock, int keyx_period); +int vlink_start_keyx(ECPSocket *sock); +int vlink_init(ECPSocket *sock); diff --git a/ecp/src/ecp/dir/dir.c b/ecp/src/ecp/dir/dir.c index d826e24..6258e0e 100644 --- a/ecp/src/ecp/dir/dir.c +++ b/ecp/src/ecp/dir/dir.c @@ -2,10 +2,19 @@  #include <string.h>  #include <ecp/core.h> +#include <ecp/tr.h>  #include <ecp/cr.h>  #include "dir.h" +int ecp_dir_item_eq(ECPDirItem *item1, ECPDirItem *item2) { +    if (item1->region != item2->region) return 0; +    if (item1->capabilities != item2->capabilities) return 0; +    if (!item1->node.key_perma.valid || !item1->node.key_perma.valid) return 0; +    if (memcmp(&item1->node.key_perma.public, &item2->node.key_perma.public, sizeof(item1->node.key_perma.public)) != 0) return 0; +    return ecp_tr_addr_eq(&item1->node.addr, &item2->node.addr); +} +  size_t ecp_dir_item_parse(ECPDirItem *item, unsigned char *buf) {      ECPDHPub *key;      ecp_tr_addr_t *addr; @@ -31,11 +40,10 @@ size_t ecp_dir_item_parse(ECPDirItem *item, unsigned char *buf) {      buf += sizeof(uint16_t);      rsize += sizeof(uint16_t); -    item->capabilities =    \ -        ((uint16_t)buf[0] << 8) |     \ -        ((uint16_t)buf[1]); -    buf += sizeof(uint16_t); -    rsize += sizeof(uint16_t); +    item->region = buf[0]; +    item->capabilities = buf[1]; +    buf += 2; +    rsize += 2;      return rsize;  } @@ -61,10 +69,10 @@ size_t ecp_dir_item_serialize(ECPDirItem *item, unsigned char *buf) {      buf += sizeof(uint16_t);      rsize += sizeof(uint16_t); -    buf[0] = item->capabilities >> 8; +    buf[0] = item->region;      buf[1] = item->capabilities; -    buf += sizeof(uint16_t); -    rsize += sizeof(uint16_t); +    buf += 2; +    rsize += 2;      return rsize;  } diff --git a/ecp/src/ecp/dir/dir.h b/ecp/src/ecp/dir/dir.h index b28f801..83b97df 100644 --- a/ecp/src/ecp/dir/dir.h +++ b/ecp/src/ecp/dir/dir.h @@ -1,10 +1,7 @@  #define ECP_SIZE_DIR_ITEM       40 -#define ECP_MTYPE_DIR_UPD       0x00 -#define ECP_MTYPE_DIR_REQ       0x01 -#define ECP_MTYPE_DIR_ANN       0x02 -#define ECP_MTYPE_DIR_REP       0x03 -#define ECP_MTYPE_DIR_SHADOW    0x04 +#define ECP_MTYPE_DIR_REQ       0x00 +#define ECP_MTYPE_DIR_REP       0x01  #define ECP_CTYPE_DIR           (0x00 | ECP_CTYPE_FLAG_SYS) @@ -13,9 +10,11 @@  typedef struct ECPDirItem {      ECPNode node; -    uint16_t capabilities; +    uint8_t region; +    uint8_t capabilities;  } ECPDirItem; +int ecp_dir_item_eq(ECPDirItem *item1, ECPDirItem *item2);  size_t ecp_dir_item_parse(ECPDirItem *item, unsigned char *buf);  size_t ecp_dir_item_serialize(ECPDirItem *item, unsigned char *buf);  | 
