summaryrefslogtreecommitdiff
path: root/ecp/server
diff options
context:
space:
mode:
authorUros Majstorovic <majstor@majstor.org>2024-06-06 22:33:46 +0200
committerUros Majstorovic <majstor@majstor.org>2024-06-06 22:33:46 +0200
commit4a6d383192ac59195cfe927f5a0b1eb104da5550 (patch)
tree67a787120f0332f619ee333b1b9f65393acfc563 /ecp/server
parent109f39e09630409a30a9f4e8183f539c499f07ba (diff)
open / keyx sync implemented; vconn close fixed; rendezvous hashing fixed
Diffstat (limited to 'ecp/server')
-rw-r--r--ecp/server/dir.c8
-rw-r--r--ecp/server/dir.h4
-rw-r--r--ecp/server/server.c173
-rw-r--r--ecp/server/server.h10
-rw-r--r--ecp/server/sig.c4
-rw-r--r--ecp/server/vlink.h1
6 files changed, 127 insertions, 73 deletions
diff --git a/ecp/server/dir.c b/ecp/server/dir.c
index 8064d0c..a43ff91 100644
--- a/ecp/server/dir.c
+++ b/ecp/server/dir.c
@@ -408,7 +408,7 @@ void dir_process_item(ECPDirItem *dir_item, ECPSocket *sock, ecp_ecdh_public_t *
if (node == NULL) {
pthread_rwlock_unlock(&dir_shadow_rwlock);
- rv = dir_create_node(dir_item, sock, &node);
+ rv = dir_create_node(dir_item, &node);
if (!rv) {
pthread_rwlock_wrlock(&dir_shadow_rwlock);
if (ecp_ht_count(dir_shadow) > MAX_DIR_ITEM) rv = ECP_ERR_FULL;
@@ -512,7 +512,7 @@ int dir_open_conn(DIRNode *node, ECPSocket *sock) {
return rv;
}
-int dir_create_node(ECPDirItem *dir_item, ECPSocket *sock, DIRNode **node) {
+int dir_create_node(ECPDirItem *dir_item, DIRNode **node) {
DIRNode *_node;
int rv;
@@ -543,7 +543,7 @@ void dir_destroy_node(DIRNode *node) {
static int online_switch_expired(ECPConnection *conn, ecp_sts_t now) {
if (conn->type == CTYPE_DIR) return 1;
- return _ecp_conn_is_zombie(conn, now, CONN_EXPIRE_TO);
+ return conn_expired(conn, now);
}
static void remove_nodes(DIRNode *remove_node[], int remove_cnt) {
@@ -737,6 +737,8 @@ void dir_announce_allow(void) {
pthread_rwlock_wrlock(&dir_timer_rwlock);
dir_process_enable = PROC_ALLOW_ALL;
pthread_rwlock_unlock(&dir_timer_rwlock);
+
+ LOG(LOG_DEBUG, "dir_announce_allow\n");
}
void dir_announce_block(void) {
diff --git a/ecp/server/dir.h b/ecp/server/dir.h
index e5eba75..879f1eb 100644
--- a/ecp/server/dir.h
+++ b/ecp/server/dir.h
@@ -21,7 +21,7 @@
#define PROC_ALLOW_ALL 2
#define ANN_PERIOD 600 /* announce priod (s); can't exceed 1h */
-#define CONN_EXPIRE_TO 60
+#define DIR_EXPIRE_TO 60
#define DIR_UFLAG_RECONNECT 0x80
@@ -61,7 +61,7 @@ ssize_t dir_handle_msg(struct ECPConnection *conn, ecp_seq_t seq, unsigned char
void dir_process_item(ECPDirItem *dir_item, ECPSocket *sock, ecp_ecdh_public_t *s_public);
int dir_open_conn(DIRNode *node, ECPSocket *sock);
-int dir_create_node(ECPDirItem *dir_item, ECPSocket *sock, DIRNode **node);
+int dir_create_node(ECPDirItem *dir_item, DIRNode **node);
void dir_destroy_node(DIRNode *node);
void dir_online_switch(ECPSocket *sock, int inc_serial);
diff --git a/ecp/server/server.c b/ecp/server/server.c
index fd8b317..91287e2 100644
--- a/ecp/server/server.c
+++ b/ecp/server/server.c
@@ -24,13 +24,12 @@
static SRVConfig srv_config;
static int proc_forked = 0;
-static pthread_t rcvr_thd[MAX_THREADS];
+
+/* dir client result sync */
static pthread_mutex_t dir_sync_mutex;
static pthread_cond_t dir_sync_cond;
static int dir_sync_res = 0;
-static FILE *log_file = NULL;
-int log_level;
static const char *log_level_str[] = {
"ERROR",
"INFO",
@@ -131,7 +130,7 @@ static int core_logger(const char *format, ...) {
return 0;
}
-int ecp_init(ECPContext *ctx, ECPConnHandler *vconn_handler, ECPConnHandler *vlink_handler) {
+static int ecp_init(ECPContext *ctx, ECPConnHandler *vconn_handler, ECPConnHandler *vlink_handler) {
int rv;
rv = ecp_ctx_init(ctx, conn_auth, conn_new, conn_free, handle_err, core_logger);
@@ -146,10 +145,29 @@ int ecp_init(ECPContext *ctx, ECPConnHandler *vconn_handler, ECPConnHandler *vli
return ECP_OK;
}
+int conn_expired(ECPConnection *conn, ecp_sts_t now) {
+ switch (conn->type) {
+ case CTYPE_DIR:
+ return _ecp_conn_is_zombie(conn, now, ANN_PERIOD * 3);
+ case ECP_CTYPE_VLINK:
+ if (conn->parent) {
+ return _ecp_conn_is_zombie(conn, now, VCONN_EXPIRE_TO);
+ } else {
+ return _ecp_conn_is_zombie(conn, now, KEYX_PERIOD * 3);
+ }
+ return _ecp_conn_is_zombie(conn, now, KEYX_PERIOD * 3);
+ case ECP_CTYPE_VCONN:
+ return _ecp_conn_is_zombie(conn, now, VCONN_EXPIRE_TO);
+ case ECP_CTYPE_DIR:
+ return _ecp_conn_is_zombie(conn, now, DIR_EXPIRE_TO);
+ }
+ return 0;
+}
+
void log_vprintf(int level, const char *format, va_list ap) {
FILE *file;
- if (level > log_level) return;
+ if (level > srv_config.log_level) return;
if (level >= (sizeof(log_level_str) / sizeof(char *))) return;
file = srv_config.log_file ? srv_config.log_file : stderr;
@@ -183,7 +201,7 @@ void log_vprintf(int level, const char *format, va_list ap) {
void log_printf(int level, const char *format, ...) {
va_list ap;
- if (level > log_level) return;
+ if (level > srv_config.log_level) return;
va_start(ap, format);
log_vprintf(level, format, ap);
@@ -275,29 +293,30 @@ static int start_receivers(ECPSocket *sock) {
int i, rv;
for (i=0; i<srv_config.rcvr_thd_num; i++) {
- rv = pthread_create(&rcvr_thd[i], NULL, _ecp_receiver, sock);
+ rv = pthread_create(&srv_config.rcvr_thd[i], NULL, _ecp_receiver, sock);
if (rv) return ECP_ERR;
}
return ECP_OK;
}
-static void dir_result(ECPSocket *sock, ECPDirList *list, int rv) {
+static void dir_result(ECPConnection *conn, void *list, int rv) {
+ ECPDirList *dir_list = list;
unsigned int dir_cnt;
int i;
if (rv) fail("Fetching directory listing failed err:%d\n", rv);
dir_cnt = 0;
- for (i=0; i<list->count; i++) {
- if (list->items[i].roles & ECP_ROLE_DIR) dir_cnt++;
+ for (i=0; i<dir_list->count; i++) {
+ if (dir_list->items[i].roles & ECP_ROLE_DIR) dir_cnt++;
}
- rv = dir_init_dir_cnt(dir_cnt, list->serial);
+ rv = dir_init_dir_cnt(dir_cnt, dir_list->serial);
if (rv) fail("Bad timing\n");
- for (i=0; i<list->count; i++) {
- dir_process_item(&list->items[i], sock, NULL);
+ for (i=0; i<dir_list->count; i++) {
+ dir_process_item(&dir_list->items[i], srv_config.sock, NULL);
}
- ecp_dir_list_destroy(list);
+ ecp_dir_list_destroy(dir_list);
pthread_mutex_lock(&dir_sync_mutex);
dir_sync_res = 1;
@@ -306,16 +325,16 @@ static void dir_result(ECPSocket *sock, ECPDirList *list, int rv) {
}
int main(int argc, char *argv[]) {
- char *endptr;
- char *init_switch;
ecp_ecdh_public_t remote_pub;
ecp_tr_addr_t remote_addr;
ECPContext ctx;
ECPSocket sock;
- ECPConnection dir_conn;
ECPConnHandler vconn_handler;
ECPConnHandler vlink_handler;
- ECPConnHandler dir_handler;
+ char opt;
+ char *opt_arg;
+ char *endptr;
+ char *init_switch;
int _argi, _argc, remote, fd;
int rv;
@@ -328,72 +347,74 @@ int main(int argc, char *argv[]) {
srv_config.rcvr_thd_num = 1;
while (_argc && (argv[_argi][0] == '-')) {
- switch (argv[_argi][1]) {
+ if (strlen(argv[_argi]) < 2) usage(argv[0]);
+ opt = argv[_argi][1];
+ switch (opt) {
case 'd': {
_argi++;
_argc--;
+ break;
+ }
+
+ case 't':
+ case 'l':
+ case 'f':
+ case 'u':
+ case 'g': {
+ if (strlen(argv[_argi]) > 2) {
+ opt_arg = argv[_argi] + 2;
+ _argi++;
+ _argc--;
+ } else {
+ _argi++;
+ _argc--;
+ if (_argc == 0) usage(argv[0]);
+ opt_arg = argv[_argi];
+ _argi++;
+ _argc--;
+ }
+ break;
+ }
+
+ default:
+ usage(argv[0]);
+ }
+ switch (opt) {
+ case 'd': {
srv_config.detach = 1;
break;
}
case 't': {
- _argi++;
- _argc--;
- if (_argc == 0) usage(argv[0]);
- srv_config.rcvr_thd_num = (unsigned int)strtol(argv[_argi], &endptr, 10);
- if (endptr[0] != '\0') fail("Bad number of receiver threads: %s\n", argv[_argi]);
+ srv_config.rcvr_thd_num = (unsigned int)strtol(opt_arg, &endptr, 10);
+ if (endptr[0] != '\0') fail("Bad number of receiver threads: %s\n", opt_arg);
if (srv_config.rcvr_thd_num > MAX_THREADS) fail ("Maximum number of threads allowed: %d\n", MAX_THREADS);
- _argi++;
- _argc--;
break;
}
case 'l': {
- _argi++;
- _argc--;
- if (_argc == 0) usage(argv[0]);
- log_level = (uint8_t)strtol(argv[_argi], &endptr, 10);
- if (endptr[0] != '\0') fail("Bad log level: %s\n", argv[_argi]);
- if (log_level > LOG_MAX_LEVEL) fail("Maximum log level allowed: %d\n", LOG_MAX_LEVEL);
- _argi++;
- _argc--;
+ srv_config.log_level = (unsigned int)strtol(opt_arg, &endptr, 10);
+ if (endptr[0] != '\0') fail("Bad log level: %s\n", opt_arg);
+ if (srv_config.log_level > LOG_MAX_LEVEL) fail("Maximum log level allowed: %d\n", LOG_MAX_LEVEL);
break;
}
case 'f': {
- _argi++;
- _argc--;
- if (_argc == 0) usage(argv[0]);
- srv_config.log_fn = strdup(argv[_argi]);
- _argi++;
- _argc--;
+ srv_config.log_fn = strdup(opt_arg);
break;
}
case 'u': {
- _argi++;
- _argc--;
- if (_argc == 0) usage(argv[0]);
- srv_config.uid = (uid_t)strtol(argv[_argi], &endptr, 10);
- if (endptr[0] != '\0') fail("Bad uid: %s\n", argv[_argi]);
- _argi++;
- _argc--;
+ srv_config.uid = (uid_t)strtol(opt_arg, &endptr, 10);
+ if (endptr[0] != '\0') fail("Bad uid: %s\n", opt_arg);
break;
}
case 'g': {
- _argi++;
- _argc--;
- if (_argc == 0) usage(argv[0]);
- srv_config.gid = (gid_t)strtol(argv[_argi], &endptr, 10);
- if (endptr[0] != '\0') fail("Bad gid: %s\n", argv[_argi]);
- _argi++;
- _argc--;
+ srv_config.gid = (gid_t)strtol(opt_arg, &endptr, 10);
+ if (endptr[0] != '\0') fail("Bad gid: %s\n", opt_arg);
break;
}
-
- default:
- usage(argv[0]);
}
}
@@ -487,6 +508,7 @@ int main(int argc, char *argv[]) {
rv = ecp_sock_open(&sock, &srv_config.my_addr);
if (rv) fail("ecp_sock_open err:%d\n", rv);
+ srv_config.sock = &sock;
if (srv_config.uid || srv_config.gid) {
if (srv_config.gid) {
@@ -511,11 +533,37 @@ int main(int argc, char *argv[]) {
init_switch = getenv("ECP_INITSW");
if (remote) {
if (init_switch == NULL) {
+ ECPContext ctx;
+ ECPSocket sock;
+ ECPConnHandler dir_handler;
+ ECPConnection *dir_conn;
+
+ rv = pthread_mutex_init(&dir_sync_mutex, NULL);
+ if (rv) fail("pthread_mutex_init (client) failed\n");
+
+ rv = pthread_cond_init(&dir_sync_cond, NULL);
+ if (rv) fail("pthread_cond_init (client) failed\n");
+
+ rv = ecp_ctx_init(&ctx, NULL, NULL, (ecp_conn_free_t)free, handle_err, core_logger);
+ if (rv) fail("ecp_ctx_init (client) err:%d\n", rv);
+
rv = ecp_dir_set_handler(&ctx, &dir_handler, dir_result);
- if (rv) fail("ecp_dir_set_handler err:%d\n", rv);
+ if (rv) fail("ecp_dir_set_handler (client) err:%d\n", rv);
+
+ rv = ecp_sock_create(&sock, &ctx, NULL);
+ if (rv) fail("ecp_sock_create (client) err:%d\n", rv);
+
+ rv = ecp_sock_open(&sock, NULL);
+ if (rv) fail("ecp_sock_open (client) err:%d\n", rv);
+
+ rv = ecp_start_receiver(&sock);
+ if (rv) fail("ecp_start_receiver (client) err:%d\n", rv);
- rv = ecp_dir_get(&dir_conn, &sock, &remote_pub, &remote_addr, 0);
- if (rv) fail("ecp_dir_get err:%d\n", rv);
+ dir_conn = malloc(sizeof(ECPConnection));
+ if (dir_conn == NULL) fail("dir_conn malloc (client) failed\n");
+
+ rv = ecp_dir_get(dir_conn, &sock, &remote_pub, &remote_addr, 0);
+ if (rv) fail("ecp_dir_get (client) err:%d\n", rv);
pthread_mutex_lock(&dir_sync_mutex);
while (dir_sync_res == 0) {
@@ -523,6 +571,11 @@ int main(int argc, char *argv[]) {
}
pthread_mutex_unlock(&dir_sync_mutex);
LOG(LOG_DEBUG, "ecp_dir_get: done\n");
+
+ rv = ecp_stop_receiver(&sock);
+ if (rv) fail("ecp_stop_receiver (client) err:%d\n", rv);
+
+ ecp_sock_close(&sock);
} else {
rv = dir_init_ann(&sock, &remote_pub, &remote_addr);
if (rv) fail("dir_init_ann err:%d\n", rv);
diff --git a/ecp/server/server.h b/ecp/server/server.h
index c17fa62..e8705a7 100644
--- a/ecp/server/server.h
+++ b/ecp/server/server.h
@@ -9,21 +9,23 @@
typedef struct SRVConfig {
ECPDHKey key_perma;
+ ECPSocket *sock;
char *acl_fn;
char *acl_fn_dir;
ecp_tr_addr_t my_addr;
uint8_t region;
uint8_t roles;
int detach;
- pthread_t rcvr_thd[MAX_THREADS];
unsigned int rcvr_thd_num;
- uid_t uid;
- gid_t gid;
- int log_level;
+ pthread_t rcvr_thd[MAX_THREADS];
+ unsigned int log_level;
char *log_fn;
FILE *log_file;
+ uid_t uid;
+ gid_t gid;
} SRVConfig;
SRVConfig *srv_get_config(void);
+int conn_expired(ECPConnection *conn, ecp_sts_t now);
void log_vprintf(int level, const char *format, va_list ap);
void log_printf(int level, const char *format, ...);
diff --git a/ecp/server/sig.c b/ecp/server/sig.c
index 6128838..397fef1 100644
--- a/ecp/server/sig.c
+++ b/ecp/server/sig.c
@@ -15,10 +15,6 @@
static pthread_t sig_handler_thd;
static sigset_t sig_set;
-static int conn_expired(ECPConnection *conn, ecp_sts_t now) {
- return _ecp_conn_is_zombie(conn, now, CONN_EXPIRE_TO);
-}
-
static void * _sig_handler(void *arg) {
ECPSocket *sock = arg;
int rv, sig;
diff --git a/ecp/server/vlink.h b/ecp/server/vlink.h
index b591d11..4affada 100644
--- a/ecp/server/vlink.h
+++ b/ecp/server/vlink.h
@@ -1,6 +1,7 @@
#define MAX_KEYX_CNT 100
#define KEYX_PERIOD 600 /* key exchange priod (s); can't exceed 1h */
+#define VCONN_EXPIRE_TO 7200
#define VLINK_UFLAG_DISCONNECT 0x80