diff options
author | Yunchih Chen <yunchih.cat@gmail.com> | 2018-03-17 11:47:16 +0800 |
---|---|---|
committer | Yunchih Chen <yunchih.cat@gmail.com> | 2018-03-17 11:48:45 +0800 |
commit | 0b41d9d0b4cbc761ffbcd2a8f56b1749b1f42ed4 (patch) | |
tree | eac7f5f97626b080668c68db688332fac4bf376f | |
parent | 79b15e9010c247c571ea012f5363d098d711496b (diff) | |
download | nfcollect-0b41d9d0b4cbc761ffbcd2a8f56b1749b1f42ed4.tar nfcollect-0b41d9d0b4cbc761ffbcd2a8f56b1749b1f42ed4.tar.gz nfcollect-0b41d9d0b4cbc761ffbcd2a8f56b1749b1f42ed4.tar.bz2 nfcollect-0b41d9d0b4cbc761ffbcd2a8f56b1749b1f42ed4.tar.lz nfcollect-0b41d9d0b4cbc761ffbcd2a8f56b1749b1f42ed4.tar.xz nfcollect-0b41d9d0b4cbc761ffbcd2a8f56b1749b1f42ed4.tar.zst nfcollect-0b41d9d0b4cbc761ffbcd2a8f56b1749b1f42ed4.zip |
Fix committer bug
-rw-r--r-- | include/collect.h | 1 | ||||
-rw-r--r-- | include/commit.h | 1 | ||||
-rw-r--r-- | include/main.h | 26 | ||||
-rw-r--r-- | lib/collect.c | 13 | ||||
-rw-r--r-- | lib/commit.c | 34 | ||||
-rw-r--r-- | lib/common.c | 2 |
6 files changed, 46 insertions, 31 deletions
diff --git a/include/collect.h b/include/collect.h index d146845..15e65e8 100644 --- a/include/collect.h +++ b/include/collect.h @@ -1,5 +1,6 @@ #pragma once +#include "main.h" void *nfl_collect_worker(void *targs); void nfl_state_init(nflog_state_t **nf, uint32_t id, uint32_t entries_max, nflog_global_t *g); diff --git a/include/commit.h b/include/commit.h index add18c7..8181f89 100644 --- a/include/commit.h +++ b/include/commit.h @@ -4,6 +4,7 @@ #include "common.h" void nfl_commit_init(); void nfl_commit_worker(nflog_header_t *header, nflog_entry_t *store, + enum nflog_compression_t compression_opt, const char *filename); #endif diff --git a/include/main.h b/include/main.h index bd4d38c..4f0aa20 100644 --- a/include/main.h +++ b/include/main.h @@ -51,6 +51,12 @@ exit(1); \ } +#define FATAL(format, ...) \ + do { \ + fprintf(stdout, "Error: " format "\n", ##__VA_ARGS__); \ + exit(1); \ + } while(0) + #define WARN(command, format, ...) \ if (command) { \ fprintf(stdout, format "\n", ##__VA_ARGS__); \ @@ -78,17 +84,17 @@ #define STORAGE_PREFIX "nflog_storage" enum nflog_compression_t { COMPRESS_NONE, COMPRESS_LZ4, COMPRESS_ZSTD }; - typedef struct __attribute__((packed)) _nflog_header_t { - uint16_t cksum; /* 0 4 */ - enum nflog_compression_t compression_opt; /* 0 4 */ - uint32_t id; /* 4 4 */ - uint32_t n_entries; /* 8 4 */ - uint32_t max_n_entries; /* 12 4 */ - time_t start_time; /* 16 8 */ - time_t end_time; /* 24 8 */ - - /* size: 32, cachelines: 1, members: 6 */ + uint32_t id; /* 0 4 */ + uint32_t n_entries; /* 4 4 */ + uint32_t max_n_entries; /* 8 4 */ + uint32_t cksum; /* 12 4 */ + enum nflog_compression_t compression_opt; /* 16 4 */ + time_t start_time; /* 20 8 */ + time_t end_time; /* 28 8 */ + + /* size: 36, cachelines: 1, members: 7 */ + /* last cacheline: 36 bytes */ } nflog_header_t; typedef struct __attribute__((packed)) _nflog_entry_t { diff --git a/lib/collect.c b/lib/collect.c index 0cba852..b7cf430 100644 --- a/lib/collect.c +++ b/lib/collect.c @@ -182,7 +182,7 @@ static void *nfl_start_commit_worker(void *targs) { sem_wait(g.nfl_commit_queue); debug("Comm worker #%u: commit started.", nf->header->id); - nfl_commit_worker(nf->header, nf->store, filename); + nfl_commit_worker(nf->header, nf->store, g.compression_opt, filename); debug("Comm worker #%u: commit done.", nf->header->id); sem_post(g.nfl_commit_queue); @@ -209,8 +209,9 @@ void nfl_state_init(nflog_state_t **nf, uint32_t id, uint32_t entries_max, (*nf)->global = g; (*nf)->header = (nflog_header_t *)malloc(sizeof(nflog_header_t)); (*nf)->header->id = id; - (*nf)->header->max_n_entries = entries_max; (*nf)->header->n_entries = 0; + (*nf)->header->max_n_entries = entries_max; + (*nf)->header->compression_opt = g->compression_opt; (*nf)->has_finished = true; pthread_mutex_init(&(*nf)->has_finished_lock, NULL); @@ -218,10 +219,8 @@ void nfl_state_init(nflog_state_t **nf, uint32_t id, uint32_t entries_max, } // Ensure trunk with same id in previous run has finished to prevent reusing - // a trunk - // which it's still being used. Furthermore, this hopefully alleviate us - // from - // bursty network traffic. + // a trunk which it's still being used. Furthermore, this hopefully alleviate us + // from bursty network traffic. pthread_mutex_lock(&(*nf)->has_finished_lock); while (!(*nf)->has_finished) pthread_cond_wait(&(*nf)->has_finished_cond, &(*nf)->has_finished_lock); @@ -236,6 +235,6 @@ void nfl_state_init(nflog_state_t **nf, uint32_t id, uint32_t entries_max, } static void nfl_state_free(nflog_state_t *nf) { - // Free only and leave the rest intact + // Free only packet store and leave the rest intact free((void *)nf->store); } diff --git a/lib/commit.c b/lib/commit.c index 202a138..13fd747 100644 --- a/lib/commit.c +++ b/lib/commit.c @@ -8,11 +8,6 @@ static void nfl_commit_default(FILE *f, nflog_entry_t *store, static void nfl_commit_lz4(FILE *f, nflog_entry_t *store, uint32_t store_size); static void nfl_commit_zstd(FILE *f, nflog_entry_t *store, uint32_t store_size); -typedef void (*nflog_commit_run_table_t)(FILE *f, nflog_entry_t *store, - uint32_t size); -static const nflog_commit_run_table_t commit_run_table[] = { - nfl_commit_default, nfl_commit_lz4, nfl_commit_zstd}; - void nfl_commit_init() { /* TODO */ } static void nfl_commit_default(FILE *f, nflog_entry_t *store, @@ -31,19 +26,17 @@ static void nfl_commit_zstd(FILE *f, nflog_entry_t *store, size_t const bufsize = ZSTD_compressBound(store_size); void *buf; - ERR((buf = malloc(bufsize)), NULL); - + ERR(!(buf = malloc(bufsize)), "zstd: cannot malloc"); size_t const csize = ZSTD_compress(buf, bufsize, store, store_size, 1); - if (ZSTD_isError(csize)) { - fprintf(stderr, "zstd error: %s \n", ZSTD_getErrorName(csize)); - exit(8); - } + if (ZSTD_isError(csize)) + FATAL("zstd: %s \n", ZSTD_getErrorName(csize)); - nfl_commit_default(f, buf, bufsize); + nfl_commit_default(f, buf, csize); free(buf); } void nfl_commit_worker(nflog_header_t *header, nflog_entry_t *store, + enum nflog_compression_t compression_opt, const char *filename) { FILE *f; uint32_t written; @@ -57,7 +50,22 @@ void nfl_commit_worker(nflog_header_t *header, nflog_entry_t *store, // commit store uint32_t store_size = sizeof(nflog_entry_t) * header->max_n_entries; - commit_run_table[header->compression_opt](f, store, store_size); + switch(compression_opt) { + case COMPRESS_NONE: + debug("Comm worker #%u: commit without compression\n", header->id) + nfl_commit_default(f, store, store_size); + break; + case COMPRESS_LZ4: + debug("Comm worker #%u: commit with compression algorithm: lz4", header->id) + nfl_commit_lz4(f, store, store_size); + break; + case COMPRESS_ZSTD: + debug("Comm worker #%u: commit with compression algorithm: zstd", header->id) + nfl_commit_zstd(f, store, store_size); + break; + // Must not reach here ... + default: FATAL("Unknown compression option detected"); + } // Do fsync ? fclose(f); diff --git a/lib/common.c b/lib/common.c index 1f44e78..bf0b67a 100644 --- a/lib/common.c +++ b/lib/common.c @@ -65,7 +65,7 @@ uint32_t nfl_header_cksum(nflog_header_t *header) { s += header->n_entries; s ^= header->start_time; s += header->end_time; - s &= ULONG_MAX; + s &= UINT_MAX; return s; } |