diff options
-rw-r--r-- | bin/nfcollect.c | 84 | ||||
-rw-r--r-- | include/main.h | 2 |
2 files changed, 74 insertions, 12 deletions
diff --git a/bin/nfcollect.c b/bin/nfcollect.c index 3afd05c..2c88ad1 100644 --- a/bin/nfcollect.c +++ b/bin/nfcollect.c @@ -26,6 +26,7 @@ #include "collect.h" #include "commit.h" #include "common.h" +#include <dirent.h> #include <fcntl.h> #include <getopt.h> #include <pthread.h> @@ -33,6 +34,7 @@ #include <stdint.h> #include <stdio.h> #include <sys/stat.h> +#include <sys/types.h> #include <unistd.h> const char *help_text = @@ -45,20 +47,25 @@ const char *help_text = " -h --help print this help\n" " -g --nflog-group=<id> the group id to collect\n" " -p --parallelism=<num> max number of committer thread\n" + " -t --truncate whether or not to truncate existing trunks" + " (default: no)\n" " -s --storage_size=<dirsize> log files maximum total size in MiB\n" " -v --version print version information\n" "\n"; -void sig_handler(int signo) { +static uint32_t calculate_starting_trunk(const char *storage_dir); + +static void sig_handler(int signo) { if (signo == SIGHUP) puts("Terminated due to SIGHUP ..."); } int main(int argc, char *argv[]) { - - uint32_t i, max_commit_worker = 0, storage_size = 0; + uint32_t max_commit_worker = 0, storage_size = 0; uint32_t trunk_cnt = 0, trunk_size = 0; - uint32_t entries_max; + uint32_t entries_max, cur_trunk; + bool truncate_trunks = false; + nfl_global_t g; int nfl_group_id = -1; char *compression_flag = NULL, *storage_dir = NULL; @@ -69,6 +76,7 @@ int main(int argc, char *argv[]) { {"storage_size", required_argument, NULL, 's'}, {"compression", optional_argument, NULL, 'z'}, {"parallelism", optional_argument, NULL, 'p'}, + {"truncate", no_argument, NULL, 't'}, {"help", no_argument, NULL, 'h'}, {"version", no_argument, NULL, 'v'}, {0, 0, 0, 0}}; @@ -85,6 +93,9 @@ int main(int argc, char *argv[]) { printf("%s %s", PACKAGE, VERSION); exit(0); break; + case 't': + truncate_trunks = true; + break; case 'c': compression_flag = optarg; break; @@ -140,15 +151,29 @@ int main(int argc, char *argv[]) { nfl_state_t **trunks = (nfl_state_t **)calloc(trunk_cnt, sizeof(void *)); nfl_commit_init(trunk_cnt); - debug("Worker started, entries_max = %d, trunk_cnt = %d", entries_max, - trunk_cnt); - for (i = 0;; i = NEXT(i, trunk_cnt)) { - debug("Running receiver worker: id = %d", i); - nfl_state_init(&(trunks[i]), i, entries_max, &g); - pthread_create(&(trunks[i]->thread), NULL, nfl_collect_worker, - (void *)trunks[i]); + info(PACKAGE ": storing in directory '%s', capped by %d MiB", storage_dir, + storage_size); + info(PACKAGE ": workers started, entries per trunk = %d, #trunks = %d", + entries_max, trunk_cnt); + + if (truncate_trunks) { + cur_trunk = 0; + info(PACKAGE ": requested to truncate (overwrite) trunks in %s", + storage_dir); + } else { + cur_trunk = NEXT(calculate_starting_trunk(storage_dir), trunk_cnt); + const char *fn = nfl_get_filename(storage_dir, cur_trunk); + info(PACKAGE ": will start writing to trunk %s and onward", fn); + free((char *)fn); + } + + for (;; cur_trunk = NEXT(cur_trunk, trunk_cnt)) { + debug("Running receiver worker: id = %d", cur_trunk); + nfl_state_init(&(trunks[cur_trunk]), cur_trunk, entries_max, &g); + pthread_create(&(trunks[cur_trunk]->thread), NULL, nfl_collect_worker, + (void *)trunks[cur_trunk]); // wait for current receiver worker - pthread_join(trunks[i]->thread, NULL); + pthread_join(trunks[cur_trunk]->thread, NULL); } // Won't reach here @@ -156,3 +181,38 @@ int main(int argc, char *argv[]) { // sem_destroy(&nfl_commit_queue); exit(0); } + +/* + * Need to find a trunk to start with after a restart + * We choose the one with newest modification time. + */ +static uint32_t calculate_starting_trunk(const char *storage_dir) { + DIR *dp; + struct stat stat; + struct dirent *ep; + time_t newest = (time_t)0; + uint32_t newest_index = 0; + int index; + char cwd[100]; + + ERR(!(dp = opendir(storage_dir)), "Can't open the storage directory"); + + getcwd(cwd, sizeof(cwd)); + chdir(storage_dir); + + while ((ep = readdir(dp))) { + const char *fn = ep->d_name; + index = nfl_storage_match_index(fn); + if (index >= 0 && index < MAX_TRUNK_ID) { + ERR(lstat(fn, &stat) < 0, fn); + if (difftime(stat.st_mtime, newest) > 0) { + newest = stat.st_mtime; + newest_index = (uint32_t)index; + } + } + } + + closedir(dp); + chdir(cwd); + return newest_index; +} diff --git a/include/main.h b/include/main.h index 575271d..d33d499 100644 --- a/include/main.h +++ b/include/main.h @@ -73,6 +73,8 @@ fprintf(stdout, "[DEBUG] " format "\n", ##__VA_ARGS__); \ } +#define info(format, ...) fprintf(stdout, "[INFO] " format "\n", ##__VA_ARGS__); + #define likely(x) __builtin_expect((x), 1) #define unlikely(x) __builtin_expect((x), 0) |