aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorYunchih Chen <yunchih.cat@gmail.com>2018-04-28 10:53:16 +0800
committerYunchih Chen <yunchih.cat@gmail.com>2018-04-28 10:53:16 +0800
commit618a3b10e22e7eeded59d463483297f999ce9316 (patch)
tree28a2d7ba71a466c5b0c599dac0de224470e00d39
parentaf4bf7c93f3390a8013de18679f12b336d2a314b (diff)
downloadnfcollect-618a3b10e22e7eeded59d463483297f999ce9316.tar
nfcollect-618a3b10e22e7eeded59d463483297f999ce9316.tar.gz
nfcollect-618a3b10e22e7eeded59d463483297f999ce9316.tar.bz2
nfcollect-618a3b10e22e7eeded59d463483297f999ce9316.tar.lz
nfcollect-618a3b10e22e7eeded59d463483297f999ce9316.tar.xz
nfcollect-618a3b10e22e7eeded59d463483297f999ce9316.tar.zst
nfcollect-618a3b10e22e7eeded59d463483297f999ce9316.zip
Add raw_size field into header
-rw-r--r--include/commit.h1
-rw-r--r--include/main.h22
-rw-r--r--lib/collect.c1
-rw-r--r--lib/commit.c40
-rw-r--r--lib/common.c2
5 files changed, 35 insertions, 31 deletions
diff --git a/include/commit.h b/include/commit.h
index e01d020..7013bb1 100644
--- a/include/commit.h
+++ b/include/commit.h
@@ -5,6 +5,7 @@
void nfl_commit_init();
void nfl_commit_worker(nfl_header_t *header, nfl_entry_t *store,
enum nfl_compression_t compression_opt,
+ bool truncate,
const char *filename);
#endif
diff --git a/include/main.h b/include/main.h
index cbe0f28..88a7c8c 100644
--- a/include/main.h
+++ b/include/main.h
@@ -82,21 +82,23 @@
#define NEXT(i, l) ((i + 1) % l)
#define PREV(i, l) ((i - 1) % l)
#define TRUNK_SIZE_BY_PAGE (150) // 150 pages
+#define MAX_SEGMENT_PER_TRUNK (1024)
#define MAX_TRUNK_ID (80)
#define STORAGE_PREFIX "nflog_storage"
enum nfl_compression_t { COMPRESS_NONE, COMPRESS_LZ4, COMPRESS_ZSTD };
typedef struct __attribute__((packed)) _nfl_header_t {
- uint32_t id; /* 0 4 */
- uint32_t n_entries; /* 4 4 */
- uint32_t max_n_entries; /* 8 4 */
- uint32_t cksum; /* 12 4 */
- enum nfl_compression_t compression_opt; /* 16 4 */
- time_t start_time; /* 20 8 */
- time_t end_time; /* 28 8 */
-
- /* size: 36, cachelines: 1, members: 7 */
- /* last cacheline: 36 bytes */
+ uint32_t id; /* 0 4 */
+ uint32_t n_entries; /* 4 4 */
+ uint32_t max_n_entries; /* 8 4 */
+ uint32_t cksum; /* 12 4 */
+ uint32_t raw_size; /* 16 4 */
+ enum nfl_compression_t compression_opt; /* 20 4 */
+ time_t start_time; /* 24 8 */
+ time_t end_time; /* 32 8 */
+
+ /* size: 40, cachelines: 1, members: 8 */
+ /* last cacheline: 40 bytes */
} nfl_header_t;
typedef struct __attribute__((packed)) _nfl_entry_t {
diff --git a/lib/collect.c b/lib/collect.c
index 9bffd11..ee37510 100644
--- a/lib/collect.c
+++ b/lib/collect.c
@@ -218,6 +218,7 @@ static void *nfl_start_commit_worker(void *targs) {
nfl_state_t *nf = (nfl_state_t *)targs;
const char *filename = nfl_get_filename(g.storage_dir, nf->header->id);
debug("Comm worker #%u: thread started.", nf->header->id);
+ /* FIXME */
bool truncate = true;
sem_wait(g.nfl_commit_queue);
diff --git a/lib/commit.c b/lib/commit.c
index f2fbc06..53dc493 100644
--- a/lib/commit.c
+++ b/lib/commit.c
@@ -3,26 +3,27 @@
#include <string.h>
#include <zstd.h>
-static void nfl_commit_default(FILE *f, nfl_entry_t *store,
- uint32_t store_size);
-static void nfl_commit_lz4(FILE *f, nfl_entry_t *store, uint32_t store_size);
-static void nfl_commit_zstd(FILE *f, nfl_entry_t *store, uint32_t store_size);
-
-void nfl_commit_init() { /* TODO */
-}
-
-static void nfl_commit_default(FILE *f, nfl_entry_t *store,
+static void nfl_commit_default(FILE *f, nfl_header_t *header, nfl_entry_t *store,
uint32_t store_size) {
uint32_t written;
+ header->raw_size = store_size;
+
+ // Write header
+ written = fwrite(header, 1, sizeof(nfl_header_t), f);
+ ERR(written != sizeof(nfl_header_t), strerror(errno));
+
+ // Write store
written = fwrite(store, 1, store_size, f);
ERR(written != store_size, strerror(errno));
}
-static void nfl_commit_lz4(FILE *f, nfl_entry_t *store, uint32_t store_size) {
+static void nfl_commit_lz4(FILE *f, nfl_header_t *header, nfl_entry_t *store,
+ uint32_t store_size) {
/* TODO */
}
-static void nfl_commit_zstd(FILE *f, nfl_entry_t *store, uint32_t store_size) {
+static void nfl_commit_zstd(FILE *f, nfl_header_t *header, nfl_entry_t *store,
+ uint32_t store_size) {
size_t const bufsize = ZSTD_compressBound(store_size);
void *buf;
@@ -31,37 +32,34 @@ static void nfl_commit_zstd(FILE *f, nfl_entry_t *store, uint32_t store_size) {
if (ZSTD_isError(csize))
FATAL("zstd: %s \n", ZSTD_getErrorName(csize));
- nfl_commit_default(f, buf, csize);
+ nfl_commit_default(f, header, buf, csize);
free(buf);
}
void nfl_commit_worker(nfl_header_t *header, nfl_entry_t *store,
enum nfl_compression_t compression_opt,
+ bool truncate,
const char *filename) {
FILE *f;
- uint32_t written;
+ const char *mode = truncate ? "wb" : "ab";
debug("Comm worker #%u: commit to file %s\n", header->id, filename);
- ERR((f = fopen(filename, "wb")) == NULL, strerror(errno));
-
- // commit header
- written = fwrite(header, 1, sizeof(nfl_header_t), f);
- ERR(written != sizeof(nfl_header_t), strerror(errno));
+ ERR((f = fopen(filename, mode)) == NULL, strerror(errno));
// commit store
uint32_t store_size = sizeof(nfl_entry_t) * header->max_n_entries;
switch (compression_opt) {
case COMPRESS_NONE:
debug("Comm worker #%u: commit without compression\n", header->id)
- nfl_commit_default(f, store, store_size);
+ nfl_commit_default(f, header, store, store_size);
break;
case COMPRESS_LZ4:
debug("Comm worker #%u: commit with compression algorithm: lz4",
- header->id) nfl_commit_lz4(f, store, store_size);
+ header->id) nfl_commit_lz4(f, header, store, store_size);
break;
case COMPRESS_ZSTD:
debug("Comm worker #%u: commit with compression algorithm: zstd",
- header->id) nfl_commit_zstd(f, store, store_size);
+ header->id) nfl_commit_zstd(f, header, store, store_size);
break;
// Must not reach here ...
default:
diff --git a/lib/common.c b/lib/common.c
index cbd7e0f..4e64c9e 100644
--- a/lib/common.c
+++ b/lib/common.c
@@ -77,6 +77,8 @@ uint32_t nfl_header_cksum(nfl_header_t *header) {
s ^= H(header->id);
s ^= H(header->max_n_entries);
s ^= H(header->n_entries);
+ s ^= H(header->raw_size);
+ s ^= H(header->compression_opt);
s ^= H(header->start_time);
s ^= H(header->end_time);
return s & UINT_MAX;