summaryrefslogtreecommitdiff
path: root/ecp/server/server.c
diff options
context:
space:
mode:
authorUros Majstorovic <majstor@majstor.org>2024-05-23 00:42:25 +0200
committerUros Majstorovic <majstor@majstor.org>2024-05-23 00:42:25 +0200
commit9c8ae2d28f0c3e75371fbae6e9f688798b44255c (patch)
tree12d62040b51ac9fbd86fd0259bf4e5f46b2ad801 /ecp/server/server.c
parentea2bfa175c9fcc4c0d4aa708fea6ce497b2c73d6 (diff)
added multiple server receiver threads; added server dir query
Diffstat (limited to 'ecp/server/server.c')
-rw-r--r--ecp/server/server.c290
1 files changed, 184 insertions, 106 deletions
diff --git a/ecp/server/server.c b/ecp/server/server.c
index 3fc0e7c..a7140a1 100644
--- a/ecp/server/server.c
+++ b/ecp/server/server.c
@@ -10,6 +10,7 @@
#include <ecp/core.h>
#include <ecp/vconn/vconn.h>
#include <ecp/dir/dir.h>
+#include <ecp/dir/dir_client.h>
#include <util.h>
@@ -23,8 +24,12 @@
static SRVConfig srv_config;
static int proc_forked = 0;
-static FILE *log_file = NULL;
+static pthread_t rcvr_thd[MAX_THREADS];
+static pthread_mutex_t dir_sync_mutex;
+static pthread_cond_t dir_sync_cond;
+static int dir_sync_res = 0;
+static FILE *log_file = NULL;
int log_level;
static const char *log_level_str[] = {
"ERROR",
@@ -40,6 +45,29 @@ static void handle_err(ECPConnection *conn, unsigned char mtype, int err) {
LOG(LOG_ERR, "handle_err: ctype:%d mtype:%d err:%d\n", conn->type, mtype, err);
}
+static ssize_t conn_auth(ECPSocket *sock, ECPConnection *parent, unsigned char ctype, ecp_ecdh_public_t *public, unsigned char *msg, size_t msg_size) {
+ switch (ctype) {
+ case CTYPE_DIR: {
+ if (public == NULL) return ECP_ERR_AUTH;
+ if (!acl_inlist(public)) return ECP_ERR_AUTH;
+ return 0;
+ }
+
+ case ECP_CTYPE_VLINK: {
+ if (public == NULL) return ECP_ERR_AUTH;
+ if ((parent == NULL) && !acl_inlist(public)) return ECP_ERR_AUTH;
+ return 0;
+ }
+
+ case ECP_CTYPE_DIR:
+ case ECP_CTYPE_VCONN:
+ return 0;
+
+ default:
+ return ECP_ERR_AUTH;
+ }
+}
+
static ECPConnection *conn_new(ECPSocket *sock, ECPConnection *parent, unsigned char ctype) {
ECPConnection *conn = NULL;
@@ -93,35 +121,11 @@ static void conn_free(ECPConnection *conn) {
free(conn);
}
-static ssize_t conn_auth(ECPSocket *sock, ECPConnection *parent, unsigned char ctype, ecp_ecdh_public_t *public, unsigned char *msg, size_t msg_size) {
- switch (ctype) {
- case CTYPE_DIR: {
- if (public == NULL) return ECP_ERR_AUTH;
- if (!acl_inlist(public)) return ECP_ERR_AUTH;
- return 0;
- }
-
- case ECP_CTYPE_VLINK: {
- if (public == NULL) return ECP_ERR_AUTH;
- if ((parent == NULL) && !acl_inlist(public)) return ECP_ERR_AUTH;
- return 0;
- }
-
- case ECP_CTYPE_DIR:
- case ECP_CTYPE_VCONN:
- return 0;
-
- default:
- return ECP_ERR_AUTH;
- }
-}
-
static int core_logger(const char *format, ...) {
va_list ap;
- FILE *file = log_file ? log_file : stderr;
va_start(ap, format);
- log_vfprintf(LOG_ERR, file, format, ap);
+ log_vprintf(LOG_ERR, format, ap);
va_end(ap);
return 0;
@@ -142,50 +146,56 @@ int ecp_init(ECPContext *ctx, ECPConnHandler *vconn_handler, ECPConnHandler *vli
return ECP_OK;
}
-void log_vfprintf(int level, FILE *file, const char *format, va_list ap) {
- time_t t;
- char t_buf[26];
- char s_buf[256];
+void log_vprintf(int level, const char *format, va_list ap) {
+ FILE *file;
if (level > log_level) return;
if (level >= (sizeof(log_level_str) / sizeof(char *))) return;
- t = time(NULL);
- ctime_r(&t, t_buf);
- /* terminate before newline */
- t_buf[24] = '\0';
-
- vsnprintf(s_buf, sizeof(s_buf), format, ap);
-
- fprintf(file, "%s [%s]: %s", t_buf, log_level_str[level], s_buf);
+ file = srv_config.log_file ? srv_config.log_file : stderr;
+ if (proc_forked) {
+ time_t t;
+ char t_buf[26];
+ char s_buf[256];
+ int rv;
+
+ t = time(NULL);
+ ctime_r(&t, t_buf);
+ /* terminate string before newline */
+ t_buf[24] = '\0';
+
+ rv = vsnprintf(s_buf, sizeof(s_buf), format, ap);
+ /* always print newline at the and of string */
+ if (rv <= 0) return;
+ if (rv >= (sizeof(s_buf) - 1)) {
+ s_buf[sizeof(s_buf) - 2] = '\n';
+ } else if (s_buf[rv - 1] != '\n') {
+ s_buf[rv] = '\n';
+ s_buf[rv + 1] = '\0';
+ }
+ fprintf(file, "%s [%s]: %s", t_buf, log_level_str[level], s_buf);
+ } else {
+ vfprintf(file, format, ap);
+ }
fflush(file);
}
void log_printf(int level, const char *format, ...) {
va_list ap;
- FILE *file = log_file ? log_file : stderr;
if (level > log_level) return;
va_start(ap, format);
- log_vfprintf(level, file, format, ap);
+ log_vprintf(level, format, ap);
va_end(ap);
}
static void fail(const char *format, ...) {
va_list ap;
- FILE *file;
va_start(ap, format);
- if (proc_forked) {
- file = log_file ? log_file : stderr;
- log_vfprintf(LOG_ERR, file, format, ap);
- } else {
- file = stderr;
- vfprintf(file, format, ap);
- }
+ log_vprintf(LOG_ERR, format, ap);
va_end(ap);
- fflush(file);
exit(1);
}
@@ -196,6 +206,9 @@ static void usage(char *arg) {
fprintf(stderr, "\t-d\n");
fprintf(stderr, "\t\tdetach\n");
+ fprintf(stderr, "\t-t <number>\n");
+ fprintf(stderr, "\t\tstart <number> receiver threads (maximum: %d)\n", MAX_THREADS);
+
fprintf(stderr, "\t-l <level>\n");
fprintf(stderr, "\t\tset log level: 0 - error, 1 - info, 2 - debug\n");
@@ -221,7 +234,7 @@ static void daemonize(void) {
proc_forked = 1;
- pid = setsid()
+ pid = setsid();
if (pid < 0) fail("setsid failed\n");
/* second fork */
@@ -229,14 +242,20 @@ static void daemonize(void) {
if (pid < 0) fail("fork2 failed\n");
if (pid > 0) exit(0);
- /* redirect stdio to /dev/null */
+ /* redirect /dev/null to stdin */
null_rd = open("/dev/null", O_RDONLY);
if (null_rd < 0) fail("open (r) /dev/null failed\n");
- null_wr = open("/dev/null", O_WRONLY);
- if (null_wr < 0) fail("open (w) /dev/null failed\n");
-
rv = dup2(null_rd, STDIN_FILENO);
if (rv < 0) fail("dup2 (stdin) failed\n");
+ close(null_rd);
+
+ /* redirect stdout and stderr to srv_config.log_file or /dev/null */
+ if (srv_config.log_file) {
+ null_wr = fileno(srv_config.log_file);
+ } else {
+ null_wr = open("/dev/null", O_WRONLY);
+ if (null_wr < 0) fail("open (w) /dev/null failed\n");
+ }
rv = dup2(null_wr, STDOUT_FILENO);
if (rv < 0) fail("dup2 (stdout) failed\n");
@@ -244,44 +263,89 @@ static void daemonize(void) {
rv = dup2(null_wr, STDERR_FILENO);
if (rv < 0) fail("dup2 (stderr) failed\n");
- close(null_rd);
- close(null_wr);
+ if (srv_config.log_file == NULL) close(null_wr);
+}
+
+static void *_ecp_receiver(void *arg) {
+ ecp_receiver((ECPSocket *)arg);
+ return NULL;
+}
+
+static int start_receivers(ECPSocket *sock) {
+ int i, rv;
+
+ for (i=0; i<srv_config.rcvr_thd_num; i++) {
+ rv = pthread_create(&rcvr_thd[i], NULL, _ecp_receiver, sock);
+ if (rv) return ECP_ERR;
+ }
+ return ECP_OK;
+}
+
+static void dir_result(ECPSocket *sock, ECPDirList *list, int rv) {
+ unsigned int dir_cnt;
+ int i;
+
+ if (rv) fail("Fetching directory listing failed err:%d\n", rv);
+
+ dir_cnt = 0;
+ for (i=0; i<list->count; i++) {
+ if (list->items[i].roles & ECP_ROLE_DIR) dir_cnt++;
+ }
+ rv = dir_init_dir_cnt(dir_cnt, list->serial);
+ if (rv) fail("Bad timing\n");
+
+ for (i=0; i<list->count; i++) {
+ dir_process_item(&list->items[i], sock, NULL);
+ }
+ ecp_dir_list_destroy(list);
+
+ pthread_mutex_lock(&dir_sync_mutex);
+ dir_sync_res = 1;
+ pthread_cond_signal(&dir_sync_cond);
+ pthread_mutex_unlock(&dir_sync_mutex);
}
int main(int argc, char *argv[]) {
char *endptr;
- char *log_fn;
- uid_t uid;
- gid_t gid;
- int detach;
+ unsigned int rcvr_num;
char *init_switch;
- ecp_tr_addr_t my_addr;
ECPNode node;
ECPContext ctx;
ECPSocket sock;
+ ECPConnection dir_conn;
ECPConnHandler vconn_handler;
ECPConnHandler vlink_handler;
+ ECPConnHandler dir_handler;
int _argi, _argc, fd;
int rv;
memset(&srv_config, 0, sizeof(srv_config));
memset(&node, 0, sizeof(node));
- if (argc < 1) fail("Bad arguments\n");
+ if (argc < 1) fail("Bad argument count\n");
_argc = argc - 1;
_argi = 1;
- log_fn = NULL;
- detach = 0;
- uid = 0;
- gid = 0;
+ srv_config.rcvr_thd_num = 1;
while (_argc && (argv[_argi][0] == '-')) {
switch (argv[_argi][1]) {
case 'd': {
_argi++;
_argc--;
- detach = 1;
+ srv_config.detach = 1;
+ break;
+ }
+
+ case 't': {
+ _argi++;
+ _argc--;
+ if (_argc == 0) usage(argv[0]);
+ srv_config.rcvr_thd_num = (unsigned int)strtol(argv[_argi], &endptr, 10);
+ if (endptr[0] != '\0') fail("Bad number of receiver threads: %s\n", argv[_argi]);
+ if (srv_config.rcvr_thd_num > MAX_THREADS) fail ("Maximum number of threads allowed: %d\n", MAX_THREADS);
+ _argi++;
+ _argc--;
break;
}
@@ -289,9 +353,9 @@ int main(int argc, char *argv[]) {
_argi++;
_argc--;
if (_argc == 0) usage(argv[0]);
- log_fn = strdup(argv[_argi]);
log_level = (uint8_t)strtol(argv[_argi], &endptr, 10);
- if ((endptr[0] != '\0') || log_level > LOG_MAX_LEVEL) fail("Bad log level\n");
+ if (endptr[0] != '\0') fail("Bad log level: %s\n", argv[_argi]);
+ if (log_level > LOG_MAX_LEVEL) fail("Maximum log level allowed: %d\n", LOG_MAX_LEVEL);
_argi++;
_argc--;
break;
@@ -301,7 +365,7 @@ int main(int argc, char *argv[]) {
_argi++;
_argc--;
if (_argc == 0) usage(argv[0]);
- log_fn = strdup(argv[_argi]);
+ srv_config.log_fn = strdup(argv[_argi]);
_argi++;
_argc--;
break;
@@ -311,8 +375,8 @@ int main(int argc, char *argv[]) {
_argi++;
_argc--;
if (_argc == 0) usage(argv[0]);
- uid = (uid_t)strtol(argv[_argi], &endptr, 10);
- if (endptr[0] != '\0') fail("Bad uid\n");
+ srv_config.uid = (uid_t)strtol(argv[_argi], &endptr, 10);
+ if (endptr[0] != '\0') fail("Bad uid: %s\n", argv[_argi]);
_argi++;
_argc--;
break;
@@ -322,8 +386,8 @@ int main(int argc, char *argv[]) {
_argi++;
_argc--;
if (_argc == 0) usage(argv[0]);
- gid = (gid_t)strtol(argv[_argi], &endptr, 10);
- if (endptr[0] != '\0') fail("Bad gid\n");
+ srv_config.gid = (gid_t)strtol(argv[_argi], &endptr, 10);
+ if (endptr[0] != '\0') fail("Bad gid: %s\n", argv[_argi]);
_argi++;
_argc--;
break;
@@ -337,28 +401,28 @@ int main(int argc, char *argv[]) {
if (_argc < 4) usage(argv[0]);
srv_config.region = (uint8_t)strtol(argv[_argi], &endptr, 16);
- if (endptr[0] != '\0') fail("Bad region\n");
- if (srv_config.region >= MAX_REGION) fail("Bad region\n");
+ if (endptr[0] != '\0') fail("Bad region: %s\n", argv[_argi]);
+ if (srv_config.region >= MAX_REGION) fail("Bad region: %d\n", srv_config.region);
_argi++;
_argc--;
srv_config.roles = (uint8_t)strtol(argv[_argi], &endptr, 16);
- if (endptr[0] != '\0') fail("Bad roles\n");
+ if (endptr[0] != '\0') fail("Bad roles: %s\n", argv[_argi]);
_argi++;
_argc--;
fd = open(argv[_argi], O_RDONLY);
- if (fd < 0) fail("Unable to open %s\n", argv[_argi]);
+ if (fd < 0) fail("Unable to open key file: %s\n", argv[_argi]);
- rv = ecp_util_read_key(fd, &srv_config.key_perma.public, &srv_config.key_perma.private);
+ rv = ecp_util_read_key(fd, &srv_config.key_perma.public, &srv_config.key_perma.private, NULL);
close(fd);
- if (rv) fail("Unable to read key from %s\n", argv[_argi]);
+ if (rv) fail("Bad key file: %s\n", argv[_argi]);
srv_config.key_perma.valid = 1;
_argi++;
_argc--;
- rv = ecp_addr_init(&my_addr, argv[_argi]);
- if (rv) fail("ecp_addr_init err:%d\n", rv);
+ rv = ecp_addr_init(&srv_config.my_addr, argv[_argi]);
+ if (rv) fail("Bad local address: %s\n", argv[_argi]);
_argi++;
_argc--;
@@ -376,25 +440,25 @@ int main(int argc, char *argv[]) {
_argc--;
rv = acl_load();
- if (rv) fail("acl_load err:%d\n", rv);
+ if (rv) fail("ACL load failed\n");
}
if (_argc == 2) {
ecp_ecdh_public_t node_pub;
fd = open(argv[_argi], O_RDONLY);
- if (fd < 0) fail("Unable to open %s\n", argv[_argi]);
+ if (fd < 0) fail("Unable to open public key file: %s\n", argv[_argi]);
- rv = ecp_util_read_key(fd, &node_pub, NULL);
+ rv = ecp_util_read_key(fd, &node_pub, NULL, NULL);
close(fd);
- if (rv) fail("Unable to read public key from %s\n", argv[_argi]);
+ if (rv) fail("Bad public key file: %s\n", argv[_argi]);
_argi++;
_argc--;
ecp_node_init(&node, &node_pub, NULL);
rv = ecp_node_set_addr(&node, argv[_argi]);
- if (rv) fail("ecp_node_set_addr err:%d\n", rv);
+ if (rv) fail("Bad remote address: %s\n", argv[_argi]);
_argi++;
_argc--;
} else if (_argc) {
@@ -402,12 +466,12 @@ int main(int argc, char *argv[]) {
}
umask(077); /* rw for owner only */
- if (log_fn) {
- log_file = fopen(log_fn, "a");
- if (log_file == NULL) fail("Unable to open log file: %s\n", log_fn);
+ if (srv_config.log_fn) {
+ srv_config.log_file = fopen(srv_config.log_fn, "a");
+ if (srv_config.log_file == NULL) fail("Unable to open log file: %s\n", srv_config.log_fn);
}
- if (detach) daemonize();
+ if (srv_config.detach) daemonize();
rv = sig_init();
if (rv) fail("sig_init err:%d\n", rv);
@@ -424,17 +488,16 @@ int main(int argc, char *argv[]) {
rv = ecp_vconn_sock_create(&sock);
if (rv) fail("ecp_vconn_sock_create err:%d\n", rv);
- rv = ecp_sock_open(&sock, &my_addr);
+ rv = ecp_sock_open(&sock, &srv_config.my_addr);
if (rv) fail("ecp_sock_open err:%d\n", rv);
- srv_config.my_addr = my_addr;
- if (uid || gid) {
- if (gid) {
- rv = setgid(gid);
+ if (srv_config.uid || srv_config.gid) {
+ if (srv_config.gid) {
+ rv = setgid(srv_config.gid);
if (rv) fail("Unable to set group id\n");
}
- if (uid) {
- rv = setuid(uid);
+ if (srv_config.uid) {
+ rv = setuid(srv_config.uid);
if (rv) fail("Unable to set user id\n");
}
}
@@ -445,24 +508,39 @@ int main(int argc, char *argv[]) {
rv = vlink_init(&sock);
if (rv) fail("vlink_init err:%d\n", rv);
- rv = ecp_start_receiver(&sock);
- if (rv) fail("ecp_start_receiver err:%d\n", rv);
+ rv = start_receivers(&sock);
+ if (rv) fail("start_receivers err:%d\n", rv);
+ init_switch = getenv("ECP_INITSW");
if (node.key_perma.valid) {
- rv = dir_init_ann(&sock, &node);
- if (rv) fail("dir_init_ann err:%d\n", rv);
+ if (init_switch == NULL) {
+ rv = ecp_dir_set_handler(&ctx, &dir_handler, dir_result);
+ if (rv) fail("ecp_dir_set_handler err:%d\n", rv);
+
+ rv = ecp_dir_get(&dir_conn, &sock, &node, 0);
+ if (rv) fail("ecp_dir_get err:%d\n", rv);
+
+ pthread_mutex_lock(&dir_sync_mutex);
+ while (dir_sync_res == 0) {
+ pthread_cond_wait(&dir_sync_cond, &dir_sync_mutex);
+ }
+ pthread_mutex_unlock(&dir_sync_mutex);
+ LOG(LOG_DEBUG, "ecp_dir_get: done\n");
+ } else {
+ rv = dir_init_ann(&sock, &node);
+ if (rv) fail("dir_init_ann err:%d\n", rv);
+ }
}
- init_switch = getenv("ECP_INITSW");
if (init_switch) {
int init_ann;
init_ann = (int)strtol(init_switch, &endptr, 10);
- if (endptr[0] != '\0') fail("Bad environment");
+ if (endptr[0] != '\0') fail("Bad environment: ECP_INITSW:%s\n", init_switch);
- LOG(LOG_DEBUG, "init switch start - number of announces:%d\n", init_ann);
+ LOG(LOG_DEBUG, "dir_init_switch: start - number of announces:%d\n", init_ann);
dir_init_switch(&sock, init_ann);
- LOG(LOG_DEBUG, "init switch done\n");
+ LOG(LOG_DEBUG, "dir_init_switch: done\n");
}
rv = dir_start_announce(&sock);