aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorYunchih Chen <yunchih.cat@gmail.com>2018-03-17 11:47:16 +0800
committerYunchih Chen <yunchih.cat@gmail.com>2018-03-17 11:48:45 +0800
commit0b41d9d0b4cbc761ffbcd2a8f56b1749b1f42ed4 (patch)
treeeac7f5f97626b080668c68db688332fac4bf376f
parent79b15e9010c247c571ea012f5363d098d711496b (diff)
downloadnfcollect-0b41d9d0b4cbc761ffbcd2a8f56b1749b1f42ed4.tar
nfcollect-0b41d9d0b4cbc761ffbcd2a8f56b1749b1f42ed4.tar.gz
nfcollect-0b41d9d0b4cbc761ffbcd2a8f56b1749b1f42ed4.tar.bz2
nfcollect-0b41d9d0b4cbc761ffbcd2a8f56b1749b1f42ed4.tar.lz
nfcollect-0b41d9d0b4cbc761ffbcd2a8f56b1749b1f42ed4.tar.xz
nfcollect-0b41d9d0b4cbc761ffbcd2a8f56b1749b1f42ed4.tar.zst
nfcollect-0b41d9d0b4cbc761ffbcd2a8f56b1749b1f42ed4.zip
Fix committer bug
-rw-r--r--include/collect.h1
-rw-r--r--include/commit.h1
-rw-r--r--include/main.h26
-rw-r--r--lib/collect.c13
-rw-r--r--lib/commit.c34
-rw-r--r--lib/common.c2
6 files changed, 46 insertions, 31 deletions
diff --git a/include/collect.h b/include/collect.h
index d146845..15e65e8 100644
--- a/include/collect.h
+++ b/include/collect.h
@@ -1,5 +1,6 @@
#pragma once
+#include "main.h"
void *nfl_collect_worker(void *targs);
void nfl_state_init(nflog_state_t **nf, uint32_t id, uint32_t entries_max,
nflog_global_t *g);
diff --git a/include/commit.h b/include/commit.h
index add18c7..8181f89 100644
--- a/include/commit.h
+++ b/include/commit.h
@@ -4,6 +4,7 @@
#include "common.h"
void nfl_commit_init();
void nfl_commit_worker(nflog_header_t *header, nflog_entry_t *store,
+ enum nflog_compression_t compression_opt,
const char *filename);
#endif
diff --git a/include/main.h b/include/main.h
index bd4d38c..4f0aa20 100644
--- a/include/main.h
+++ b/include/main.h
@@ -51,6 +51,12 @@
exit(1); \
}
+#define FATAL(format, ...) \
+ do { \
+ fprintf(stdout, "Error: " format "\n", ##__VA_ARGS__); \
+ exit(1); \
+ } while(0)
+
#define WARN(command, format, ...) \
if (command) { \
fprintf(stdout, format "\n", ##__VA_ARGS__); \
@@ -78,17 +84,17 @@
#define STORAGE_PREFIX "nflog_storage"
enum nflog_compression_t { COMPRESS_NONE, COMPRESS_LZ4, COMPRESS_ZSTD };
-
typedef struct __attribute__((packed)) _nflog_header_t {
- uint16_t cksum; /* 0 4 */
- enum nflog_compression_t compression_opt; /* 0 4 */
- uint32_t id; /* 4 4 */
- uint32_t n_entries; /* 8 4 */
- uint32_t max_n_entries; /* 12 4 */
- time_t start_time; /* 16 8 */
- time_t end_time; /* 24 8 */
-
- /* size: 32, cachelines: 1, members: 6 */
+ uint32_t id; /* 0 4 */
+ uint32_t n_entries; /* 4 4 */
+ uint32_t max_n_entries; /* 8 4 */
+ uint32_t cksum; /* 12 4 */
+ enum nflog_compression_t compression_opt; /* 16 4 */
+ time_t start_time; /* 20 8 */
+ time_t end_time; /* 28 8 */
+
+ /* size: 36, cachelines: 1, members: 7 */
+ /* last cacheline: 36 bytes */
} nflog_header_t;
typedef struct __attribute__((packed)) _nflog_entry_t {
diff --git a/lib/collect.c b/lib/collect.c
index 0cba852..b7cf430 100644
--- a/lib/collect.c
+++ b/lib/collect.c
@@ -182,7 +182,7 @@ static void *nfl_start_commit_worker(void *targs) {
sem_wait(g.nfl_commit_queue);
debug("Comm worker #%u: commit started.", nf->header->id);
- nfl_commit_worker(nf->header, nf->store, filename);
+ nfl_commit_worker(nf->header, nf->store, g.compression_opt, filename);
debug("Comm worker #%u: commit done.", nf->header->id);
sem_post(g.nfl_commit_queue);
@@ -209,8 +209,9 @@ void nfl_state_init(nflog_state_t **nf, uint32_t id, uint32_t entries_max,
(*nf)->global = g;
(*nf)->header = (nflog_header_t *)malloc(sizeof(nflog_header_t));
(*nf)->header->id = id;
- (*nf)->header->max_n_entries = entries_max;
(*nf)->header->n_entries = 0;
+ (*nf)->header->max_n_entries = entries_max;
+ (*nf)->header->compression_opt = g->compression_opt;
(*nf)->has_finished = true;
pthread_mutex_init(&(*nf)->has_finished_lock, NULL);
@@ -218,10 +219,8 @@ void nfl_state_init(nflog_state_t **nf, uint32_t id, uint32_t entries_max,
}
// Ensure trunk with same id in previous run has finished to prevent reusing
- // a trunk
- // which it's still being used. Furthermore, this hopefully alleviate us
- // from
- // bursty network traffic.
+ // a trunk which it's still being used. Furthermore, this hopefully alleviate us
+ // from bursty network traffic.
pthread_mutex_lock(&(*nf)->has_finished_lock);
while (!(*nf)->has_finished)
pthread_cond_wait(&(*nf)->has_finished_cond, &(*nf)->has_finished_lock);
@@ -236,6 +235,6 @@ void nfl_state_init(nflog_state_t **nf, uint32_t id, uint32_t entries_max,
}
static void nfl_state_free(nflog_state_t *nf) {
- // Free only and leave the rest intact
+ // Free only packet store and leave the rest intact
free((void *)nf->store);
}
diff --git a/lib/commit.c b/lib/commit.c
index 202a138..13fd747 100644
--- a/lib/commit.c
+++ b/lib/commit.c
@@ -8,11 +8,6 @@ static void nfl_commit_default(FILE *f, nflog_entry_t *store,
static void nfl_commit_lz4(FILE *f, nflog_entry_t *store, uint32_t store_size);
static void nfl_commit_zstd(FILE *f, nflog_entry_t *store, uint32_t store_size);
-typedef void (*nflog_commit_run_table_t)(FILE *f, nflog_entry_t *store,
- uint32_t size);
-static const nflog_commit_run_table_t commit_run_table[] = {
- nfl_commit_default, nfl_commit_lz4, nfl_commit_zstd};
-
void nfl_commit_init() { /* TODO */ }
static void nfl_commit_default(FILE *f, nflog_entry_t *store,
@@ -31,19 +26,17 @@ static void nfl_commit_zstd(FILE *f, nflog_entry_t *store,
size_t const bufsize = ZSTD_compressBound(store_size);
void *buf;
- ERR((buf = malloc(bufsize)), NULL);
-
+ ERR(!(buf = malloc(bufsize)), "zstd: cannot malloc");
size_t const csize = ZSTD_compress(buf, bufsize, store, store_size, 1);
- if (ZSTD_isError(csize)) {
- fprintf(stderr, "zstd error: %s \n", ZSTD_getErrorName(csize));
- exit(8);
- }
+ if (ZSTD_isError(csize))
+ FATAL("zstd: %s \n", ZSTD_getErrorName(csize));
- nfl_commit_default(f, buf, bufsize);
+ nfl_commit_default(f, buf, csize);
free(buf);
}
void nfl_commit_worker(nflog_header_t *header, nflog_entry_t *store,
+ enum nflog_compression_t compression_opt,
const char *filename) {
FILE *f;
uint32_t written;
@@ -57,7 +50,22 @@ void nfl_commit_worker(nflog_header_t *header, nflog_entry_t *store,
// commit store
uint32_t store_size = sizeof(nflog_entry_t) * header->max_n_entries;
- commit_run_table[header->compression_opt](f, store, store_size);
+ switch(compression_opt) {
+ case COMPRESS_NONE:
+ debug("Comm worker #%u: commit without compression\n", header->id)
+ nfl_commit_default(f, store, store_size);
+ break;
+ case COMPRESS_LZ4:
+ debug("Comm worker #%u: commit with compression algorithm: lz4", header->id)
+ nfl_commit_lz4(f, store, store_size);
+ break;
+ case COMPRESS_ZSTD:
+ debug("Comm worker #%u: commit with compression algorithm: zstd", header->id)
+ nfl_commit_zstd(f, store, store_size);
+ break;
+ // Must not reach here ...
+ default: FATAL("Unknown compression option detected");
+ }
// Do fsync ?
fclose(f);
diff --git a/lib/common.c b/lib/common.c
index 1f44e78..bf0b67a 100644
--- a/lib/common.c
+++ b/lib/common.c
@@ -65,7 +65,7 @@ uint32_t nfl_header_cksum(nflog_header_t *header) {
s += header->n_entries;
s ^= header->start_time;
s += header->end_time;
- s &= ULONG_MAX;
+ s &= UINT_MAX;
return s;
}