aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorYunchih Chen <yunchih.cat@gmail.com>2018-11-23 14:02:21 +0800
committerYunchih Chen <yunchih.cat@gmail.com>2018-11-23 14:02:21 +0800
commitf7074d5b66ab1872f3735eda1736e65c5cf40bc3 (patch)
treef8c8b69dbe44fa25800fe639209a925e7f235e8d
parentc5b6d181707cade0785f84bbd2df17f8268f5a4a (diff)
downloadnfcollect-f7074d5b66ab1872f3735eda1736e65c5cf40bc3.tar
nfcollect-f7074d5b66ab1872f3735eda1736e65c5cf40bc3.tar.gz
nfcollect-f7074d5b66ab1872f3735eda1736e65c5cf40bc3.tar.bz2
nfcollect-f7074d5b66ab1872f3735eda1736e65c5cf40bc3.tar.lz
nfcollect-f7074d5b66ab1872f3735eda1736e65c5cf40bc3.tar.xz
nfcollect-f7074d5b66ab1872f3735eda1736e65c5cf40bc3.tar.zst
nfcollect-f7074d5b66ab1872f3735eda1736e65c5cf40bc3.zip
Old unfinished modifications
-rw-r--r--bin/nfcollect.c33
-rw-r--r--include/commit.h2
-rw-r--r--include/main.h9
-rw-r--r--lib/collect.c6
-rw-r--r--lib/commit.c41
5 files changed, 60 insertions, 31 deletions
diff --git a/bin/nfcollect.c b/bin/nfcollect.c
index 160a21a..834819f 100644
--- a/bin/nfcollect.c
+++ b/bin/nfcollect.c
@@ -54,7 +54,7 @@ const char *help_text =
" -v --version print version information\n"
"\n";
-static uint32_t calculate_starting_trunk(const char *storage_dir);
+static void traverse_storage_dir(const char *storage_dir, uint32_t *starting_trunk, uint32_t *storage_size);
static nfl_nl_t netlink_fd;
static void sig_handler(int signo) {
@@ -150,6 +150,10 @@ int main(int argc, char *argv[]) {
g.nfl_commit_queue = malloc(sizeof(sem_t));
sem_init(g.nfl_commit_queue, 0, max_commit_worker);
+ // Calculate storage consumed
+ pthread_mutex_init(&g.nfl_storage_consumed_lock, NULL);
+ g.nfl_storage_consumed = 0;
+
// Set up nflog receiver worker
nfl_state_t **trunks = (nfl_state_t **)calloc(trunk_cnt, sizeof(void *));
@@ -158,13 +162,13 @@ int main(int argc, char *argv[]) {
info(PACKAGE ": workers started, entries per trunk = %d, #trunks = %d",
entries_max, trunk_cnt);
+ calculate_starting_trunk(storage_dir, &cur_trunk, &g.nfl_storage_consumed);
if (truncate_trunks) {
cur_trunk = 0;
info(PACKAGE ": requested to truncate (overwrite) trunks in %s",
storage_dir);
} else {
- int calculated_trunk = calculate_starting_trunk(storage_dir);
- cur_trunk = calculated_trunk < 0 ? 0: NEXT(calculated_trunk, trunk_cnt);
+ cur_trunk = cur_trunk < 0 ? 0: NEXT(cur_trunk, trunk_cnt);
const char *fn = nfl_get_filename(storage_dir, cur_trunk);
info(PACKAGE ": will start writing to trunk %s and onward", fn);
free((char *)fn);
@@ -186,20 +190,24 @@ int main(int argc, char *argv[]) {
// We don't actually free trunks or the semaphore at all
sem_destroy(g.nfl_commit_queue);
nfl_close_netlink_fd(&netlink_fd);
- exit(0);
+ xit(0);
+ uint32_t start_trunk;
}
/*
- * Need to find a trunk to start with after a restart
- * We choose the one with newest modification time.
- * If no existing trunk is found, returns -1
+ * traverse_storage_dir does 2 things:
+ * 1. Find starting trunk
+ * Find the trunk to start with after a restart
+ * We choose the one with newest modification time.
+ * If no existing trunk is found, set to -1
+ * 2. Sum storage size consumed by adding up stored sizes.
*/
-static uint32_t calculate_starting_trunk(const char *storage_dir) {
+static void traverse_storage_dir(const char *storage_dir, uint32_t *starting_trunk, uint32_t *storage_size) {
DIR *dp;
struct stat stat;
struct dirent *ep;
time_t newest = (time_t)0;
- uint32_t newest_index = -1;
+ uint32_t newest_index = -1, _storage_size;
int index;
char cwd[100];
@@ -215,12 +223,15 @@ static uint32_t calculate_starting_trunk(const char *storage_dir) {
ERR(lstat(fn, &stat) < 0, fn);
if (difftime(stat.st_mtime, newest) > 0) {
newest = stat.st_mtime;
- newest_index = (uint32_t)index;
+ _storage_size = (uint32_t)index;
}
+
+ *storage_size += stat.st_size
}
}
closedir(dp);
ERR(chdir(cwd) < 0, "chdir");
- return newest_index;
+ *starting_trunk = newest_index;
+ *storage_size = _storage_size;
}
diff --git a/include/commit.h b/include/commit.h
index 7013bb1..2e16571 100644
--- a/include/commit.h
+++ b/include/commit.h
@@ -3,7 +3,7 @@
#include "common.h"
void nfl_commit_init();
-void nfl_commit_worker(nfl_header_t *header, nfl_entry_t *store,
+int nfl_commit_worker(nfl_header_t *header, nfl_entry_t *store,
enum nfl_compression_t compression_opt,
bool truncate,
const char *filename);
diff --git a/include/main.h b/include/main.h
index 64ed558..417c6bc 100644
--- a/include/main.h
+++ b/include/main.h
@@ -133,9 +133,18 @@ typedef struct __attribute__((packed)) _nfl_entry_t {
/* size: 24, cachelines: 1, members: 8 */
} nfl_entry_t;
+typedef struct _store_manager_t {
+ uint32_t *trunk_size_map;
+
+} nfl_store_manager_t;
+
typedef struct _nfl_global_t {
sem_t *nfl_commit_queue;
uint16_t nfl_group_id;
+
+ uint32_t nfl_storage_consumed;
+ pthread_mutex_t nfl_storage_consumed_lock;
+
const char *storage_dir;
enum nfl_compression_t compression_opt;
} nfl_global_t;
diff --git a/lib/collect.c b/lib/collect.c
index c69c9a9..bc32a93 100644
--- a/lib/collect.c
+++ b/lib/collect.c
@@ -221,12 +221,12 @@ static void *nfl_start_commit_worker(void *targs) {
nfl_state_t *nf = (nfl_state_t *)targs;
const char *filename = nfl_get_filename(g.storage_dir, nf->header->id);
debug("Comm worker #%u: thread started.", nf->header->id);
- /* FIXME */
+ /* truncate ? */
bool truncate = true;
sem_wait(g.nfl_commit_queue);
debug("Comm worker #%u: commit started.", nf->header->id);
- nfl_commit_worker(nf->header, nf->store, g.compression_opt, truncate, filename);
+ int ret = nfl_commit_worker(nf->header, nf->store, g.compression_opt, truncate, filename);
debug("Comm worker #%u: commit done.", nf->header->id);
sem_post(g.nfl_commit_queue);
@@ -238,7 +238,7 @@ static void *nfl_start_commit_worker(void *targs) {
pthread_cond_signal(&nf->has_finished_recv_cond);
pthread_mutex_unlock(&nf->has_finished_recv_lock);
- pthread_exit(NULL);
+ pthread_exit(ret);
}
/*
diff --git a/lib/commit.c b/lib/commit.c
index 53dc493..22b92ad 100644
--- a/lib/commit.c
+++ b/lib/commit.c
@@ -3,43 +3,51 @@
#include <string.h>
#include <zstd.h>
-static void nfl_commit_default(FILE *f, nfl_header_t *header, nfl_entry_t *store,
+static int nfl_commit_default(FILE *f, nfl_header_t *header, nfl_entry_t *store,
uint32_t store_size) {
uint32_t written;
header->raw_size = store_size;
// Write header
written = fwrite(header, 1, sizeof(nfl_header_t), f);
- ERR(written != sizeof(nfl_header_t), strerror(errno));
+ WARN_RETURN(written != sizeof(nfl_header_t), "commit header: %s", strerror(errno));
// Write store
written = fwrite(store, 1, store_size, f);
- ERR(written != store_size, strerror(errno));
+ WARN_RETURN(written != store_size, "commit store: %s", strerror(errno));
+
+ return sizeof(nfl_header_t) + store_size;
}
-static void nfl_commit_lz4(FILE *f, nfl_header_t *header, nfl_entry_t *store,
+static int nfl_commit_lz4(FILE *f, nfl_header_t *header, nfl_entry_t *store,
uint32_t store_size) {
/* TODO */
+ return -1;
}
-static void nfl_commit_zstd(FILE *f, nfl_header_t *header, nfl_entry_t *store,
+static int nfl_commit_zstd(FILE *f, nfl_header_t *header, nfl_entry_t *store,
uint32_t store_size) {
size_t const bufsize = ZSTD_compressBound(store_size);
void *buf;
- ERR(!(buf = malloc(bufsize)), "zstd: cannot malloc");
+ WARN_RETURN(!(buf = malloc(bufsize)), "zstd: cannot malloc");
size_t const csize = ZSTD_compress(buf, bufsize, store, store_size, 1);
- if (ZSTD_isError(csize))
- FATAL("zstd: %s \n", ZSTD_getErrorName(csize));
+ if (ZSTD_isError(csize)) {
+ WARN(1, "zstd: %s \n", ZSTD_getErrorName(csize));
+ free(buf);
+ return -1;
+ }
- nfl_commit_default(f, header, buf, csize);
+ int ret = nfl_commit_default(f, header, buf, csize);
free(buf);
+ return ret;
}
-void nfl_commit_worker(nfl_header_t *header, nfl_entry_t *store,
+int nfl_commit_worker(nfl_header_t *header, nfl_entry_t *store,
enum nfl_compression_t compression_opt,
bool truncate,
const char *filename) {
+ int ret;
FILE *f;
const char *mode = truncate ? "wb" : "ab";
@@ -50,16 +58,16 @@ void nfl_commit_worker(nfl_header_t *header, nfl_entry_t *store,
uint32_t store_size = sizeof(nfl_entry_t) * header->max_n_entries;
switch (compression_opt) {
case COMPRESS_NONE:
- debug("Comm worker #%u: commit without compression\n", header->id)
- nfl_commit_default(f, header, store, store_size);
+ debug("Comm worker #%u: commit without compression\n", header->id);
+ ret = nfl_commit_default(f, header, store, store_size);
break;
case COMPRESS_LZ4:
- debug("Comm worker #%u: commit with compression algorithm: lz4",
- header->id) nfl_commit_lz4(f, header, store, store_size);
+ debug("Comm worker #%u: commit with compression algorithm: lz4", header->id);
+ ret = nfl_commit_lz4(f, header, store, store_size);
break;
case COMPRESS_ZSTD:
- debug("Comm worker #%u: commit with compression algorithm: zstd",
- header->id) nfl_commit_zstd(f, header, store, store_size);
+ debug("Comm worker #%u: commit with compression algorithm: zstd", header->id);
+ ret = nfl_commit_zstd(f, header, store, store_size);
break;
// Must not reach here ...
default:
@@ -68,4 +76,5 @@ void nfl_commit_worker(nfl_header_t *header, nfl_entry_t *store,
// Do fsync ?
fclose(f);
+ return ret;
}