aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--bin/nfcollect.c84
-rw-r--r--include/main.h2
2 files changed, 74 insertions, 12 deletions
diff --git a/bin/nfcollect.c b/bin/nfcollect.c
index 3afd05c..2c88ad1 100644
--- a/bin/nfcollect.c
+++ b/bin/nfcollect.c
@@ -26,6 +26,7 @@
#include "collect.h"
#include "commit.h"
#include "common.h"
+#include <dirent.h>
#include <fcntl.h>
#include <getopt.h>
#include <pthread.h>
@@ -33,6 +34,7 @@
#include <stdint.h>
#include <stdio.h>
#include <sys/stat.h>
+#include <sys/types.h>
#include <unistd.h>
const char *help_text =
@@ -45,20 +47,25 @@ const char *help_text =
" -h --help print this help\n"
" -g --nflog-group=<id> the group id to collect\n"
" -p --parallelism=<num> max number of committer thread\n"
+ " -t --truncate whether or not to truncate existing trunks"
+ " (default: no)\n"
" -s --storage_size=<dirsize> log files maximum total size in MiB\n"
" -v --version print version information\n"
"\n";
-void sig_handler(int signo) {
+static uint32_t calculate_starting_trunk(const char *storage_dir);
+
+static void sig_handler(int signo) {
if (signo == SIGHUP)
puts("Terminated due to SIGHUP ...");
}
int main(int argc, char *argv[]) {
-
- uint32_t i, max_commit_worker = 0, storage_size = 0;
+ uint32_t max_commit_worker = 0, storage_size = 0;
uint32_t trunk_cnt = 0, trunk_size = 0;
- uint32_t entries_max;
+ uint32_t entries_max, cur_trunk;
+ bool truncate_trunks = false;
+
nfl_global_t g;
int nfl_group_id = -1;
char *compression_flag = NULL, *storage_dir = NULL;
@@ -69,6 +76,7 @@ int main(int argc, char *argv[]) {
{"storage_size", required_argument, NULL, 's'},
{"compression", optional_argument, NULL, 'z'},
{"parallelism", optional_argument, NULL, 'p'},
+ {"truncate", no_argument, NULL, 't'},
{"help", no_argument, NULL, 'h'},
{"version", no_argument, NULL, 'v'},
{0, 0, 0, 0}};
@@ -85,6 +93,9 @@ int main(int argc, char *argv[]) {
printf("%s %s", PACKAGE, VERSION);
exit(0);
break;
+ case 't':
+ truncate_trunks = true;
+ break;
case 'c':
compression_flag = optarg;
break;
@@ -140,15 +151,29 @@ int main(int argc, char *argv[]) {
nfl_state_t **trunks = (nfl_state_t **)calloc(trunk_cnt, sizeof(void *));
nfl_commit_init(trunk_cnt);
- debug("Worker started, entries_max = %d, trunk_cnt = %d", entries_max,
- trunk_cnt);
- for (i = 0;; i = NEXT(i, trunk_cnt)) {
- debug("Running receiver worker: id = %d", i);
- nfl_state_init(&(trunks[i]), i, entries_max, &g);
- pthread_create(&(trunks[i]->thread), NULL, nfl_collect_worker,
- (void *)trunks[i]);
+ info(PACKAGE ": storing in directory '%s', capped by %d MiB", storage_dir,
+ storage_size);
+ info(PACKAGE ": workers started, entries per trunk = %d, #trunks = %d",
+ entries_max, trunk_cnt);
+
+ if (truncate_trunks) {
+ cur_trunk = 0;
+ info(PACKAGE ": requested to truncate (overwrite) trunks in %s",
+ storage_dir);
+ } else {
+ cur_trunk = NEXT(calculate_starting_trunk(storage_dir), trunk_cnt);
+ const char *fn = nfl_get_filename(storage_dir, cur_trunk);
+ info(PACKAGE ": will start writing to trunk %s and onward", fn);
+ free((char *)fn);
+ }
+
+ for (;; cur_trunk = NEXT(cur_trunk, trunk_cnt)) {
+ debug("Running receiver worker: id = %d", cur_trunk);
+ nfl_state_init(&(trunks[cur_trunk]), cur_trunk, entries_max, &g);
+ pthread_create(&(trunks[cur_trunk]->thread), NULL, nfl_collect_worker,
+ (void *)trunks[cur_trunk]);
// wait for current receiver worker
- pthread_join(trunks[i]->thread, NULL);
+ pthread_join(trunks[cur_trunk]->thread, NULL);
}
// Won't reach here
@@ -156,3 +181,38 @@ int main(int argc, char *argv[]) {
// sem_destroy(&nfl_commit_queue);
exit(0);
}
+
+/*
+ * Need to find a trunk to start with after a restart
+ * We choose the one with newest modification time.
+ */
+static uint32_t calculate_starting_trunk(const char *storage_dir) {
+ DIR *dp;
+ struct stat stat;
+ struct dirent *ep;
+ time_t newest = (time_t)0;
+ uint32_t newest_index = 0;
+ int index;
+ char cwd[100];
+
+ ERR(!(dp = opendir(storage_dir)), "Can't open the storage directory");
+
+ getcwd(cwd, sizeof(cwd));
+ chdir(storage_dir);
+
+ while ((ep = readdir(dp))) {
+ const char *fn = ep->d_name;
+ index = nfl_storage_match_index(fn);
+ if (index >= 0 && index < MAX_TRUNK_ID) {
+ ERR(lstat(fn, &stat) < 0, fn);
+ if (difftime(stat.st_mtime, newest) > 0) {
+ newest = stat.st_mtime;
+ newest_index = (uint32_t)index;
+ }
+ }
+ }
+
+ closedir(dp);
+ chdir(cwd);
+ return newest_index;
+}
diff --git a/include/main.h b/include/main.h
index 575271d..d33d499 100644
--- a/include/main.h
+++ b/include/main.h
@@ -73,6 +73,8 @@
fprintf(stdout, "[DEBUG] " format "\n", ##__VA_ARGS__); \
}
+#define info(format, ...) fprintf(stdout, "[INFO] " format "\n", ##__VA_ARGS__);
+
#define likely(x) __builtin_expect((x), 1)
#define unlikely(x) __builtin_expect((x), 0)