From 9c8ae2d28f0c3e75371fbae6e9f688798b44255c Mon Sep 17 00:00:00 2001 From: Uros Majstorovic Date: Thu, 23 May 2024 00:42:25 +0200 Subject: added multiple server receiver threads; added server dir query --- ecp/server/dir.c | 15 ++- ecp/server/dir.h | 1 + ecp/server/server.c | 290 +++++++++++++++++++++++++++++++++------------------- ecp/server/server.h | 12 ++- 4 files changed, 210 insertions(+), 108 deletions(-) (limited to 'ecp/server') diff --git a/ecp/server/dir.c b/ecp/server/dir.c index 95db4b5..c045599 100644 --- a/ecp/server/dir.c +++ b/ecp/server/dir.c @@ -428,7 +428,7 @@ void dir_process_item(ECPDirItem *dir_item, ECPSocket *sock, ecp_ecdh_public_t * pthread_mutex_lock(&node->mutex); pthread_rwlock_unlock(&dir_shadow_rwlock); - if (node->zombie) goto process_item_fin; + if (node->zombie || (s_public == NULL)) goto process_item_fin; if (!node->verified) { int key_ex = 0; @@ -754,6 +754,19 @@ void dit_init_serial(uint16_t serial) { } } +int dir_init_dir_cnt(unsigned int dir_cnt, uint16_t serial) { + int rv = ECP_OK; + + if (dir_online) { + pthread_rwlock_wrlock(&dir_online_rwlock); + if (dir_online->serial != serial) rv = ECP_ERR; + if (!rv) dir_vkey_req = dir_cnt / 2 + 1; + pthread_rwlock_unlock(&dir_online_rwlock); + } + + return rv; +} + void dir_announce(ECPSocket *sock, int ann_period) { struct hashtable_itr itr; DIRNode *node; diff --git a/ecp/server/dir.h b/ecp/server/dir.h index fee3a5e..dbcbc2e 100644 --- a/ecp/server/dir.h +++ b/ecp/server/dir.h @@ -69,6 +69,7 @@ void dir_remove_nodes(DIRNode *remove_node[], int remove_cnt); void dir_announce_allow(void); void dir_announce_block(void); void dit_init_serial(uint16_t serial); +int dir_init_dir_cnt(unsigned int dir_cnt, uint16_t serial); void dir_announce(ECPSocket *sock, int ann_period); int dir_start_announce(ECPSocket *sock); void dir_init_switch(ECPSocket *sock, int init_ann); diff --git a/ecp/server/server.c b/ecp/server/server.c index 3fc0e7c..a7140a1 100644 --- a/ecp/server/server.c +++ b/ecp/server/server.c @@ -10,6 +10,7 @@ #include #include #include +#include #include @@ -23,8 +24,12 @@ static SRVConfig srv_config; static int proc_forked = 0; -static FILE *log_file = NULL; +static pthread_t rcvr_thd[MAX_THREADS]; +static pthread_mutex_t dir_sync_mutex; +static pthread_cond_t dir_sync_cond; +static int dir_sync_res = 0; +static FILE *log_file = NULL; int log_level; static const char *log_level_str[] = { "ERROR", @@ -40,6 +45,29 @@ static void handle_err(ECPConnection *conn, unsigned char mtype, int err) { LOG(LOG_ERR, "handle_err: ctype:%d mtype:%d err:%d\n", conn->type, mtype, err); } +static ssize_t conn_auth(ECPSocket *sock, ECPConnection *parent, unsigned char ctype, ecp_ecdh_public_t *public, unsigned char *msg, size_t msg_size) { + switch (ctype) { + case CTYPE_DIR: { + if (public == NULL) return ECP_ERR_AUTH; + if (!acl_inlist(public)) return ECP_ERR_AUTH; + return 0; + } + + case ECP_CTYPE_VLINK: { + if (public == NULL) return ECP_ERR_AUTH; + if ((parent == NULL) && !acl_inlist(public)) return ECP_ERR_AUTH; + return 0; + } + + case ECP_CTYPE_DIR: + case ECP_CTYPE_VCONN: + return 0; + + default: + return ECP_ERR_AUTH; + } +} + static ECPConnection *conn_new(ECPSocket *sock, ECPConnection *parent, unsigned char ctype) { ECPConnection *conn = NULL; @@ -93,35 +121,11 @@ static void conn_free(ECPConnection *conn) { free(conn); } -static ssize_t conn_auth(ECPSocket *sock, ECPConnection *parent, unsigned char ctype, ecp_ecdh_public_t *public, unsigned char *msg, size_t msg_size) { - switch (ctype) { - case CTYPE_DIR: { - if (public == NULL) return ECP_ERR_AUTH; - if (!acl_inlist(public)) return ECP_ERR_AUTH; - return 0; - } - - case ECP_CTYPE_VLINK: { - if (public == NULL) return ECP_ERR_AUTH; - if ((parent == NULL) && !acl_inlist(public)) return ECP_ERR_AUTH; - return 0; - } - - case ECP_CTYPE_DIR: - case ECP_CTYPE_VCONN: - return 0; - - default: - return ECP_ERR_AUTH; - } -} - static int core_logger(const char *format, ...) { va_list ap; - FILE *file = log_file ? log_file : stderr; va_start(ap, format); - log_vfprintf(LOG_ERR, file, format, ap); + log_vprintf(LOG_ERR, format, ap); va_end(ap); return 0; @@ -142,50 +146,56 @@ int ecp_init(ECPContext *ctx, ECPConnHandler *vconn_handler, ECPConnHandler *vli return ECP_OK; } -void log_vfprintf(int level, FILE *file, const char *format, va_list ap) { - time_t t; - char t_buf[26]; - char s_buf[256]; +void log_vprintf(int level, const char *format, va_list ap) { + FILE *file; if (level > log_level) return; if (level >= (sizeof(log_level_str) / sizeof(char *))) return; - t = time(NULL); - ctime_r(&t, t_buf); - /* terminate before newline */ - t_buf[24] = '\0'; - - vsnprintf(s_buf, sizeof(s_buf), format, ap); - - fprintf(file, "%s [%s]: %s", t_buf, log_level_str[level], s_buf); + file = srv_config.log_file ? srv_config.log_file : stderr; + if (proc_forked) { + time_t t; + char t_buf[26]; + char s_buf[256]; + int rv; + + t = time(NULL); + ctime_r(&t, t_buf); + /* terminate string before newline */ + t_buf[24] = '\0'; + + rv = vsnprintf(s_buf, sizeof(s_buf), format, ap); + /* always print newline at the and of string */ + if (rv <= 0) return; + if (rv >= (sizeof(s_buf) - 1)) { + s_buf[sizeof(s_buf) - 2] = '\n'; + } else if (s_buf[rv - 1] != '\n') { + s_buf[rv] = '\n'; + s_buf[rv + 1] = '\0'; + } + fprintf(file, "%s [%s]: %s", t_buf, log_level_str[level], s_buf); + } else { + vfprintf(file, format, ap); + } fflush(file); } void log_printf(int level, const char *format, ...) { va_list ap; - FILE *file = log_file ? log_file : stderr; if (level > log_level) return; va_start(ap, format); - log_vfprintf(level, file, format, ap); + log_vprintf(level, format, ap); va_end(ap); } static void fail(const char *format, ...) { va_list ap; - FILE *file; va_start(ap, format); - if (proc_forked) { - file = log_file ? log_file : stderr; - log_vfprintf(LOG_ERR, file, format, ap); - } else { - file = stderr; - vfprintf(file, format, ap); - } + log_vprintf(LOG_ERR, format, ap); va_end(ap); - fflush(file); exit(1); } @@ -196,6 +206,9 @@ static void usage(char *arg) { fprintf(stderr, "\t-d\n"); fprintf(stderr, "\t\tdetach\n"); + fprintf(stderr, "\t-t \n"); + fprintf(stderr, "\t\tstart receiver threads (maximum: %d)\n", MAX_THREADS); + fprintf(stderr, "\t-l \n"); fprintf(stderr, "\t\tset log level: 0 - error, 1 - info, 2 - debug\n"); @@ -221,7 +234,7 @@ static void daemonize(void) { proc_forked = 1; - pid = setsid() + pid = setsid(); if (pid < 0) fail("setsid failed\n"); /* second fork */ @@ -229,14 +242,20 @@ static void daemonize(void) { if (pid < 0) fail("fork2 failed\n"); if (pid > 0) exit(0); - /* redirect stdio to /dev/null */ + /* redirect /dev/null to stdin */ null_rd = open("/dev/null", O_RDONLY); if (null_rd < 0) fail("open (r) /dev/null failed\n"); - null_wr = open("/dev/null", O_WRONLY); - if (null_wr < 0) fail("open (w) /dev/null failed\n"); - rv = dup2(null_rd, STDIN_FILENO); if (rv < 0) fail("dup2 (stdin) failed\n"); + close(null_rd); + + /* redirect stdout and stderr to srv_config.log_file or /dev/null */ + if (srv_config.log_file) { + null_wr = fileno(srv_config.log_file); + } else { + null_wr = open("/dev/null", O_WRONLY); + if (null_wr < 0) fail("open (w) /dev/null failed\n"); + } rv = dup2(null_wr, STDOUT_FILENO); if (rv < 0) fail("dup2 (stdout) failed\n"); @@ -244,44 +263,89 @@ static void daemonize(void) { rv = dup2(null_wr, STDERR_FILENO); if (rv < 0) fail("dup2 (stderr) failed\n"); - close(null_rd); - close(null_wr); + if (srv_config.log_file == NULL) close(null_wr); +} + +static void *_ecp_receiver(void *arg) { + ecp_receiver((ECPSocket *)arg); + return NULL; +} + +static int start_receivers(ECPSocket *sock) { + int i, rv; + + for (i=0; icount; i++) { + if (list->items[i].roles & ECP_ROLE_DIR) dir_cnt++; + } + rv = dir_init_dir_cnt(dir_cnt, list->serial); + if (rv) fail("Bad timing\n"); + + for (i=0; icount; i++) { + dir_process_item(&list->items[i], sock, NULL); + } + ecp_dir_list_destroy(list); + + pthread_mutex_lock(&dir_sync_mutex); + dir_sync_res = 1; + pthread_cond_signal(&dir_sync_cond); + pthread_mutex_unlock(&dir_sync_mutex); } int main(int argc, char *argv[]) { char *endptr; - char *log_fn; - uid_t uid; - gid_t gid; - int detach; + unsigned int rcvr_num; char *init_switch; - ecp_tr_addr_t my_addr; ECPNode node; ECPContext ctx; ECPSocket sock; + ECPConnection dir_conn; ECPConnHandler vconn_handler; ECPConnHandler vlink_handler; + ECPConnHandler dir_handler; int _argi, _argc, fd; int rv; memset(&srv_config, 0, sizeof(srv_config)); memset(&node, 0, sizeof(node)); - if (argc < 1) fail("Bad arguments\n"); + if (argc < 1) fail("Bad argument count\n"); _argc = argc - 1; _argi = 1; - log_fn = NULL; - detach = 0; - uid = 0; - gid = 0; + srv_config.rcvr_thd_num = 1; while (_argc && (argv[_argi][0] == '-')) { switch (argv[_argi][1]) { case 'd': { _argi++; _argc--; - detach = 1; + srv_config.detach = 1; + break; + } + + case 't': { + _argi++; + _argc--; + if (_argc == 0) usage(argv[0]); + srv_config.rcvr_thd_num = (unsigned int)strtol(argv[_argi], &endptr, 10); + if (endptr[0] != '\0') fail("Bad number of receiver threads: %s\n", argv[_argi]); + if (srv_config.rcvr_thd_num > MAX_THREADS) fail ("Maximum number of threads allowed: %d\n", MAX_THREADS); + _argi++; + _argc--; break; } @@ -289,9 +353,9 @@ int main(int argc, char *argv[]) { _argi++; _argc--; if (_argc == 0) usage(argv[0]); - log_fn = strdup(argv[_argi]); log_level = (uint8_t)strtol(argv[_argi], &endptr, 10); - if ((endptr[0] != '\0') || log_level > LOG_MAX_LEVEL) fail("Bad log level\n"); + if (endptr[0] != '\0') fail("Bad log level: %s\n", argv[_argi]); + if (log_level > LOG_MAX_LEVEL) fail("Maximum log level allowed: %d\n", LOG_MAX_LEVEL); _argi++; _argc--; break; @@ -301,7 +365,7 @@ int main(int argc, char *argv[]) { _argi++; _argc--; if (_argc == 0) usage(argv[0]); - log_fn = strdup(argv[_argi]); + srv_config.log_fn = strdup(argv[_argi]); _argi++; _argc--; break; @@ -311,8 +375,8 @@ int main(int argc, char *argv[]) { _argi++; _argc--; if (_argc == 0) usage(argv[0]); - uid = (uid_t)strtol(argv[_argi], &endptr, 10); - if (endptr[0] != '\0') fail("Bad uid\n"); + srv_config.uid = (uid_t)strtol(argv[_argi], &endptr, 10); + if (endptr[0] != '\0') fail("Bad uid: %s\n", argv[_argi]); _argi++; _argc--; break; @@ -322,8 +386,8 @@ int main(int argc, char *argv[]) { _argi++; _argc--; if (_argc == 0) usage(argv[0]); - gid = (gid_t)strtol(argv[_argi], &endptr, 10); - if (endptr[0] != '\0') fail("Bad gid\n"); + srv_config.gid = (gid_t)strtol(argv[_argi], &endptr, 10); + if (endptr[0] != '\0') fail("Bad gid: %s\n", argv[_argi]); _argi++; _argc--; break; @@ -337,28 +401,28 @@ int main(int argc, char *argv[]) { if (_argc < 4) usage(argv[0]); srv_config.region = (uint8_t)strtol(argv[_argi], &endptr, 16); - if (endptr[0] != '\0') fail("Bad region\n"); - if (srv_config.region >= MAX_REGION) fail("Bad region\n"); + if (endptr[0] != '\0') fail("Bad region: %s\n", argv[_argi]); + if (srv_config.region >= MAX_REGION) fail("Bad region: %d\n", srv_config.region); _argi++; _argc--; srv_config.roles = (uint8_t)strtol(argv[_argi], &endptr, 16); - if (endptr[0] != '\0') fail("Bad roles\n"); + if (endptr[0] != '\0') fail("Bad roles: %s\n", argv[_argi]); _argi++; _argc--; fd = open(argv[_argi], O_RDONLY); - if (fd < 0) fail("Unable to open %s\n", argv[_argi]); + if (fd < 0) fail("Unable to open key file: %s\n", argv[_argi]); - rv = ecp_util_read_key(fd, &srv_config.key_perma.public, &srv_config.key_perma.private); + rv = ecp_util_read_key(fd, &srv_config.key_perma.public, &srv_config.key_perma.private, NULL); close(fd); - if (rv) fail("Unable to read key from %s\n", argv[_argi]); + if (rv) fail("Bad key file: %s\n", argv[_argi]); srv_config.key_perma.valid = 1; _argi++; _argc--; - rv = ecp_addr_init(&my_addr, argv[_argi]); - if (rv) fail("ecp_addr_init err:%d\n", rv); + rv = ecp_addr_init(&srv_config.my_addr, argv[_argi]); + if (rv) fail("Bad local address: %s\n", argv[_argi]); _argi++; _argc--; @@ -376,25 +440,25 @@ int main(int argc, char *argv[]) { _argc--; rv = acl_load(); - if (rv) fail("acl_load err:%d\n", rv); + if (rv) fail("ACL load failed\n"); } if (_argc == 2) { ecp_ecdh_public_t node_pub; fd = open(argv[_argi], O_RDONLY); - if (fd < 0) fail("Unable to open %s\n", argv[_argi]); + if (fd < 0) fail("Unable to open public key file: %s\n", argv[_argi]); - rv = ecp_util_read_key(fd, &node_pub, NULL); + rv = ecp_util_read_key(fd, &node_pub, NULL, NULL); close(fd); - if (rv) fail("Unable to read public key from %s\n", argv[_argi]); + if (rv) fail("Bad public key file: %s\n", argv[_argi]); _argi++; _argc--; ecp_node_init(&node, &node_pub, NULL); rv = ecp_node_set_addr(&node, argv[_argi]); - if (rv) fail("ecp_node_set_addr err:%d\n", rv); + if (rv) fail("Bad remote address: %s\n", argv[_argi]); _argi++; _argc--; } else if (_argc) { @@ -402,12 +466,12 @@ int main(int argc, char *argv[]) { } umask(077); /* rw for owner only */ - if (log_fn) { - log_file = fopen(log_fn, "a"); - if (log_file == NULL) fail("Unable to open log file: %s\n", log_fn); + if (srv_config.log_fn) { + srv_config.log_file = fopen(srv_config.log_fn, "a"); + if (srv_config.log_file == NULL) fail("Unable to open log file: %s\n", srv_config.log_fn); } - if (detach) daemonize(); + if (srv_config.detach) daemonize(); rv = sig_init(); if (rv) fail("sig_init err:%d\n", rv); @@ -424,17 +488,16 @@ int main(int argc, char *argv[]) { rv = ecp_vconn_sock_create(&sock); if (rv) fail("ecp_vconn_sock_create err:%d\n", rv); - rv = ecp_sock_open(&sock, &my_addr); + rv = ecp_sock_open(&sock, &srv_config.my_addr); if (rv) fail("ecp_sock_open err:%d\n", rv); - srv_config.my_addr = my_addr; - if (uid || gid) { - if (gid) { - rv = setgid(gid); + if (srv_config.uid || srv_config.gid) { + if (srv_config.gid) { + rv = setgid(srv_config.gid); if (rv) fail("Unable to set group id\n"); } - if (uid) { - rv = setuid(uid); + if (srv_config.uid) { + rv = setuid(srv_config.uid); if (rv) fail("Unable to set user id\n"); } } @@ -445,24 +508,39 @@ int main(int argc, char *argv[]) { rv = vlink_init(&sock); if (rv) fail("vlink_init err:%d\n", rv); - rv = ecp_start_receiver(&sock); - if (rv) fail("ecp_start_receiver err:%d\n", rv); + rv = start_receivers(&sock); + if (rv) fail("start_receivers err:%d\n", rv); + init_switch = getenv("ECP_INITSW"); if (node.key_perma.valid) { - rv = dir_init_ann(&sock, &node); - if (rv) fail("dir_init_ann err:%d\n", rv); + if (init_switch == NULL) { + rv = ecp_dir_set_handler(&ctx, &dir_handler, dir_result); + if (rv) fail("ecp_dir_set_handler err:%d\n", rv); + + rv = ecp_dir_get(&dir_conn, &sock, &node, 0); + if (rv) fail("ecp_dir_get err:%d\n", rv); + + pthread_mutex_lock(&dir_sync_mutex); + while (dir_sync_res == 0) { + pthread_cond_wait(&dir_sync_cond, &dir_sync_mutex); + } + pthread_mutex_unlock(&dir_sync_mutex); + LOG(LOG_DEBUG, "ecp_dir_get: done\n"); + } else { + rv = dir_init_ann(&sock, &node); + if (rv) fail("dir_init_ann err:%d\n", rv); + } } - init_switch = getenv("ECP_INITSW"); if (init_switch) { int init_ann; init_ann = (int)strtol(init_switch, &endptr, 10); - if (endptr[0] != '\0') fail("Bad environment"); + if (endptr[0] != '\0') fail("Bad environment: ECP_INITSW:%s\n", init_switch); - LOG(LOG_DEBUG, "init switch start - number of announces:%d\n", init_ann); + LOG(LOG_DEBUG, "dir_init_switch: start - number of announces:%d\n", init_ann); dir_init_switch(&sock, init_ann); - LOG(LOG_DEBUG, "init switch done\n"); + LOG(LOG_DEBUG, "dir_init_switch: done\n"); } rv = dir_start_announce(&sock); diff --git a/ecp/server/server.h b/ecp/server/server.h index 3c720ba..c17fa62 100644 --- a/ecp/server/server.h +++ b/ecp/server/server.h @@ -5,6 +5,8 @@ #define LOG_MAX_LEVEL 2 #define LOG(l, ...) { log_printf(l, __VA_ARGS__); } +#define MAX_THREADS 128 + typedef struct SRVConfig { ECPDHKey key_perma; char *acl_fn; @@ -12,8 +14,16 @@ typedef struct SRVConfig { ecp_tr_addr_t my_addr; uint8_t region; uint8_t roles; + int detach; + pthread_t rcvr_thd[MAX_THREADS]; + unsigned int rcvr_thd_num; + uid_t uid; + gid_t gid; + int log_level; + char *log_fn; + FILE *log_file; } SRVConfig; SRVConfig *srv_get_config(void); -void log_vfprintf(int level, FILE *file, const char *format, va_list ap); +void log_vprintf(int level, const char *format, va_list ap); void log_printf(int level, const char *format, ...); -- cgit v1.2.3