aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorYunchih Chen <yunchih.cat@gmail.com>2018-12-07 15:45:30 +0800
committerYunchih Chen <yunchih.cat@gmail.com>2019-03-07 15:02:17 +0800
commitb655c8e74ad5d29db18660a677784f181f8e7590 (patch)
treea227b77bdb13610871ec1394794cb55c103b2429
parentaf48bcec8be1f4b0cc55ce47bd6eb7c7d977f4d1 (diff)
downloadnfcollect-b655c8e74ad5d29db18660a677784f181f8e7590.tar
nfcollect-b655c8e74ad5d29db18660a677784f181f8e7590.tar.gz
nfcollect-b655c8e74ad5d29db18660a677784f181f8e7590.tar.bz2
nfcollect-b655c8e74ad5d29db18660a677784f181f8e7590.tar.lz
nfcollect-b655c8e74ad5d29db18660a677784f181f8e7590.tar.xz
nfcollect-b655c8e74ad5d29db18660a677784f181f8e7590.tar.zst
nfcollect-b655c8e74ad5d29db18660a677784f181f8e7590.zip
sqlite3 rewrite
This rewrite intends to simplify the previous design by hosting the storage in an sqlite database instead of relying on individual log files.
-rw-r--r--Makefile.am4
-rw-r--r--bin/nfcollect.c165
-rw-r--r--bin/nfextract.c141
-rw-r--r--configure.ac9
-rw-r--r--include/collect.h17
-rw-r--r--include/commit.h13
-rw-r--r--include/common.h14
-rw-r--r--include/extract.h9
-rw-r--r--include/main.h154
-rw-r--r--include/sql.h18
-rw-r--r--include/util.h8
-rw-r--r--lib/collect.c188
-rw-r--r--lib/commit.c102
-rw-r--r--lib/common.c128
-rw-r--r--lib/extract.c130
-rw-r--r--lib/sql.c276
-rw-r--r--lib/util.c34
17 files changed, 680 insertions, 730 deletions
diff --git a/Makefile.am b/Makefile.am
index cf6dd98..cd4cb4f 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -4,5 +4,5 @@ AM_CFLAGS = \
-I$(top_srcdir)/include \
-Werror -Wall -Wno-address-of-packed-member
-nfcollect_SOURCES = lib/common.c lib/commit.c lib/collect.c bin/nfcollect.c
-nfextract_SOURCES = lib/common.c lib/extract.c bin/nfextract.c
+nfcollect_SOURCES = lib/util.c lib/sql.c lib/extract.c lib/commit.c lib/collect.c bin/nfcollect.c
+nfextract_SOURCES = lib/util.c lib/sql.c lib/extract.c lib/commit.c lib/collect.c bin/nfextract.c
diff --git a/bin/nfcollect.c b/bin/nfcollect.c
index 834819f..35d1c34 100644
--- a/bin/nfcollect.c
+++ b/bin/nfcollect.c
@@ -24,8 +24,7 @@
// SOFTWARE.
#include "collect.h"
-#include "commit.h"
-#include "common.h"
+#include "util.h"
#include <dirent.h>
#include <fcntl.h>
#include <getopt.h>
@@ -33,9 +32,9 @@
#include <signal.h>
#include <stdint.h>
#include <stdio.h>
+#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
-#include <string.h>
#include <unistd.h>
const char *help_text =
@@ -44,43 +43,32 @@ const char *help_text =
"Options:\n"
" -c --compression=<algo> compression algorithm to use (default: no "
"compression)\n"
- " -d --storage_dir=<dirname> log files storage directory\n"
+ " -d --storage_file=<filename> sqlite database storage file\n"
" -h --help print this help\n"
" -g --nflog-group=<id> the group id to collect\n"
- " -p --parallelism=<num> max number of committer thread\n"
- " -t --truncate whether or not to truncate existing trunks"
- " (default: no)\n"
" -s --storage_size=<dirsize> log files maximum total size in MiB\n"
" -v --version print version information\n"
"\n";
-static void traverse_storage_dir(const char *storage_dir, uint32_t *starting_trunk, uint32_t *storage_size);
-static nfl_nl_t netlink_fd;
-
+static Netlink netlink_fd;
static void sig_handler(int signo) {
if (signo == SIGHUP) {
puts("Terminated due to SIGHUP ...");
- nfl_close_netlink_fd(&netlink_fd);
+ collect_close_netlink(&netlink_fd);
}
}
int main(int argc, char *argv[]) {
- uint32_t max_commit_worker = 0, storage_size = 0;
- uint32_t trunk_cnt = 0, trunk_size = 0;
- uint32_t entries_max, cur_trunk;
- bool truncate_trunks = false;
-
- nfl_global_t g;
- int nfl_group_id = -1;
- char *compression_flag = NULL, *storage_dir = NULL;
+ uint32_t storage_size = 0;
+ Global g;
+ int nflog_group_id = -1;
+ char *compression_flag = NULL, *storage = NULL;
struct option longopts[] = {/* name, has_args, flag, val */
- {"nflog-group", required_argument, NULL, 'g'},
- {"storage_dir", required_argument, NULL, 'd'},
+ {"nflog_group", required_argument, NULL, 'g'},
+ {"storage", required_argument, NULL, 'd'},
{"storage_size", required_argument, NULL, 's'},
{"compression", optional_argument, NULL, 'z'},
- {"parallelism", optional_argument, NULL, 'p'},
- {"truncate", no_argument, NULL, 't'},
{"help", no_argument, NULL, 'h'},
{"version", no_argument, NULL, 'v'},
{0, 0, 0, 0}};
@@ -97,20 +85,14 @@ int main(int argc, char *argv[]) {
printf("%s %s", PACKAGE, VERSION);
exit(0);
break;
- case 't':
- truncate_trunks = true;
- break;
case 'c':
compression_flag = optarg;
break;
case 'd':
- storage_dir = strdup(optarg);
+ storage = strdup(optarg);
break;
case 'g':
- nfl_group_id = atoi(optarg);
- break;
- case 'p':
- max_commit_worker = atoi(optarg);
+ nflog_group_id = atoi(optarg);
break;
case 's':
storage_size = atoi(optarg);
@@ -122,116 +104,39 @@ int main(int argc, char *argv[]) {
}
// verify arguments
- ASSERT(nfl_group_id != -1,
+ ASSERT(nflog_group_id != -1,
"You must provide a nflog group (see --help)!\n");
- ASSERT(storage_dir != NULL,
- "You must provide a storage directory (see --help)\n");
+ ASSERT(storage != NULL, "You must provide a storage file (see --help)\n");
ASSERT(storage_size != 0, "You must provide the desired size of log file "
"(in MiB) (see --help)\n");
- ERR(nfl_check_dir(storage_dir) < 0, "storage directory not exist");
-
- // max number of commit worker defaults to #processor - 1
- if (max_commit_worker == 0) {
- max_commit_worker = sysconf(_SC_NPROCESSORS_ONLN) - 1;
- max_commit_worker = max_commit_worker > 0 ? max_commit_worker : 1;
- }
-
- g.storage_dir = storage_dir;
+ g.compression_type = get_compression(compression_flag);
+ if (check_basedir_exist(storage) < 0)
+ FATAL("Storage directory does not exist");
// register signal handler
- ERR(signal(SIGHUP, sig_handler) == SIG_ERR, "Could not set SIGHUP handler");
+ if (signal(SIGHUP, sig_handler) == SIG_ERR)
+ ERROR("Could not set SIGHUP handler");
- nfl_cal_trunk(storage_size, &trunk_cnt, &trunk_size);
- nfl_cal_entries(trunk_size, &entries_max);
- nfl_setup_compression(compression_flag, &g.compression_opt);
+ pthread_mutex_init(&g.storage_consumed_lock, NULL);
+ g.storage_budget = storage_size * 1024 * 1024; // MB
+ g.storage_consumed = 0;
+ g.storage_file = (const char *)storage;
+ g.max_nr_entries = g_max_nr_entries_default;
- // Set up commit worker
- g.nfl_commit_queue = malloc(sizeof(sem_t));
- sem_init(g.nfl_commit_queue, 0, max_commit_worker);
+ collect_open_netlink(&netlink_fd, nflog_group_id);
- // Calculate storage consumed
- pthread_mutex_init(&g.nfl_storage_consumed_lock, NULL);
- g.nfl_storage_consumed = 0;
-
- // Set up nflog receiver worker
- nfl_state_t **trunks = (nfl_state_t **)calloc(trunk_cnt, sizeof(void *));
-
- info(PACKAGE ": storing in directory '%s', capped by %d MiB", storage_dir,
+ pthread_t worker;
+ State *state;
+ INFO(PACKAGE ": storing in file '%s', capped by %d MiB", g.storage_file,
storage_size);
- info(PACKAGE ": workers started, entries per trunk = %d, #trunks = %d",
- entries_max, trunk_cnt);
-
- calculate_starting_trunk(storage_dir, &cur_trunk, &g.nfl_storage_consumed);
- if (truncate_trunks) {
- cur_trunk = 0;
- info(PACKAGE ": requested to truncate (overwrite) trunks in %s",
- storage_dir);
- } else {
- cur_trunk = cur_trunk < 0 ? 0: NEXT(cur_trunk, trunk_cnt);
- const char *fn = nfl_get_filename(storage_dir, cur_trunk);
- info(PACKAGE ": will start writing to trunk %s and onward", fn);
- free((char *)fn);
- }
+ INFO(PACKAGE ": workers started, entries per block = %d", g.max_nr_entries);
- nfl_open_netlink_fd(&netlink_fd, nfl_group_id);
- for (;; cur_trunk = NEXT(cur_trunk, trunk_cnt)) {
- debug("Running receiver worker: id = %d", cur_trunk);
- nfl_state_init(&(trunks[cur_trunk]), cur_trunk, entries_max, &g);
- trunks[cur_trunk]->netlink_fd = &netlink_fd;
-
- pthread_create(&(trunks[cur_trunk]->thread), NULL, nfl_collect_worker,
- (void *)trunks[cur_trunk]);
- // wait for current receiver worker
- pthread_join(trunks[cur_trunk]->thread, NULL);
- }
-
- // Won't reach here
- // We don't actually free trunks or the semaphore at all
- sem_destroy(g.nfl_commit_queue);
- nfl_close_netlink_fd(&netlink_fd);
- xit(0);
- uint32_t start_trunk;
-}
-
-/*
- * traverse_storage_dir does 2 things:
- * 1. Find starting trunk
- * Find the trunk to start with after a restart
- * We choose the one with newest modification time.
- * If no existing trunk is found, set to -1
- * 2. Sum storage size consumed by adding up stored sizes.
- */
-static void traverse_storage_dir(const char *storage_dir, uint32_t *starting_trunk, uint32_t *storage_size) {
- DIR *dp;
- struct stat stat;
- struct dirent *ep;
- time_t newest = (time_t)0;
- uint32_t newest_index = -1, _storage_size;
- int index;
- char cwd[100];
-
- ERR(!(dp = opendir(storage_dir)), "Can't open the storage directory");
-
- ERR(!getcwd(cwd, sizeof(cwd)), "getcwd");
- ERR(chdir(storage_dir) < 0, "chdir");
-
- while ((ep = readdir(dp))) {
- const char *fn = ep->d_name;
- index = nfl_storage_match_index(fn);
- if (index >= 0 && index < MAX_TRUNK_ID) {
- ERR(lstat(fn, &stat) < 0, fn);
- if (difftime(stat.st_mtime, newest) > 0) {
- newest = stat.st_mtime;
- _storage_size = (uint32_t)index;
- }
-
- *storage_size += stat.st_size
- }
+ while (true) {
+ state_init(&state, &netlink_fd, &g);
+ pthread_create(&worker, NULL, collect_worker, (void *)state);
+ pthread_join(worker, NULL);
}
- closedir(dp);
- ERR(chdir(cwd) < 0, "chdir");
- *starting_trunk = newest_index;
- *storage_size = _storage_size;
+ collect_close_netlink(&netlink_fd);
}
diff --git a/bin/nfextract.c b/bin/nfextract.c
index fac52ae..23a3ea9 100644
--- a/bin/nfextract.c
+++ b/bin/nfextract.c
@@ -1,7 +1,7 @@
// The MIT License (MIT)
-// Copyright (c) 2017 Yun-Chih Chen
+// Copyright (c) 2018 Yun-Chih Chen
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
@@ -22,8 +22,11 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
-#include "common.h"
#include "extract.h"
+#include "main.h"
+#include "sql.h"
+#include "util.h"
+
#include <dirent.h>
#include <fcntl.h>
#include <getopt.h>
@@ -34,7 +37,6 @@
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
-#include <string.h>
#include <time.h>
#include <unistd.h>
@@ -51,11 +53,13 @@ const char *help_text =
"Usage: " PROG " [OPTION]\n"
"\n"
"Options:\n"
- " -d --storage_dir=<dirname> log files storage directory\n"
+ " -d --storage=<dirname> sqlite storage file\n"
" -h --help print this help\n"
" -v --version print version information\n"
- " -s --since start showing entries on or newer than the specified date (format: " DATE_FORMAT_HUMAN ")\n"
- " -u --until stop showing entries on or older than the specified date (format: " DATE_FORMAT_HUMAN ")\n"
+ " -s --since start showing entries on or newer than the "
+ "specified date (format: " DATE_FORMAT_HUMAN ")\n"
+ " -u --until stop showing entries on or older than the "
+ "specified date (format: " DATE_FORMAT_HUMAN ")\n"
"\n";
void sig_handler(int signo) {
@@ -63,91 +67,76 @@ void sig_handler(int signo) {
puts("Terminated due to SIGHUP ...");
}
-static void extract_each(const char *storage_dir, const char *filename, const time_range_t *range) {
- nfl_state_t trunk;
+static inline void format_entry(char *output, Entry *e) {
+ sprintf(output,
+ " "
+ "t=%ld\t"
+ "daddr=%s\t"
+ "proto=%s\t"
+ "uid=%d\t"
+ "sport=%d\t"
+ "dport=%d",
+ e->timestamp, inet_ntoa(e->daddr),
+ e->protocol == IPPROTO_TCP ? "TCP" : "UDP", e->uid, e->sport,
+ e->dport);
+}
- // Build full path
- char *fullpath = malloc(strlen(storage_dir) + strlen(filename) + 2);
- sprintf(fullpath, "%s/%s", storage_dir, filename);
+static void callback(const State *s, const Timerange *range) {
+ int nr_entries = s->header->nr_entries;
- debug("Extracting storage file: %s", fullpath);
- int entries = nfl_extract_worker(fullpath, &trunk, range);
- free(fullpath);
+ DEBUG("callback: extracting %d entries", nr_entries);
int i = 0;
- while(i < entries && trunk.store[i].timestamp < range->from)
+ while (i < nr_entries && s->store[i].timestamp < range->from)
i++;
char output[1024];
- while(i < entries && trunk.store[i].timestamp < range->until) {
- nfl_format_output(output, &trunk.store[i]);
+ while (i < nr_entries && s->store[i].timestamp < range->until) {
+ format_entry(output, &s->store[i]);
puts((char *)output);
++i;
}
}
-static void extract_all(const char *storage_dir, const time_range_t *range) {
- DIR *dp;
- struct dirent *ep;
- int i, index, max_index = -1;
- char *trunk_files[MAX_TRUNK_ID];
- memset(trunk_files, 0, sizeof(trunk_files));
-
- ERR(!(dp = opendir(storage_dir)), "Can't open the storage directory");
- while ((ep = readdir(dp))) {
- index = nfl_storage_match_index(ep->d_name);
- if (index >= 0) {
- debug("Storage file %s matches with index %d", ep->d_name, index);
- if (index >= MAX_TRUNK_ID) {
- WARN(1, "Storage trunk file index "
- "out of predefined range: %s",
- ep->d_name);
- return;
- } else {
- trunk_files[index] = strdup(ep->d_name);
- if (index > max_index)
- max_index = index;
- }
- }
- }
-
- closedir(dp);
-
- for (i = 0; i <= max_index; ++i) {
- if (trunk_files[i])
- extract_each(storage_dir, trunk_files[i], range);
- free(trunk_files[i]);
- }
+static void extract_all(const char *storage, const Timerange *range) {
+ sqlite3 *db = NULL;
+ db_open(&db, storage);
+ db_read_data_by_timerange(db, range, callback);
+ db_close(db);
}
static time_t parse_date_string(time_t default_t, const char *date) {
struct tm parsed;
char *ret;
- if(!date) return default_t;
+ if (!date)
+ return default_t;
-#define PARSE(FORMAT) \
- ret = strptime(date, FORMAT, &parsed); \
- if(ret && !*ret) return mktime(&parsed); \
+#define PARSE(FORMAT) \
+ ret = strptime(date, FORMAT, &parsed); \
+ if (ret && !*ret) \
+ return mktime(&parsed);
PARSE(DATE_FORMAT);
PARSE(DATE_FORMAT_FULL);
PARSE(DATE_FORMAT_FULL2);
- FATAL("Wrong date format: expected: \"" DATE_FORMAT_HUMAN "\", got: \"%s\"", date);
+ FATAL("Wrong date format: expected: \"" DATE_FORMAT_HUMAN "\", got: \"%s\"",
+ date);
return -1;
}
-static void populate_date_range(time_range_t *range, const char *since, const char *until) {
- range->from = parse_date_string(0, since);
+
+static void populate_date_range(Timerange *range, const char *since,
+ const char *until) {
+ range->from = parse_date_string(0, since);
range->until = parse_date_string(time(NULL), until);
}
int main(int argc, char *argv[]) {
- char *storage_dir = NULL;
+ char *storage = NULL;
char *date_since_str = NULL, *date_until_str = NULL;
- time_range_t date_range;
+ Timerange date_range;
- struct option longopts[] = {/* name, has_args, flag, val */
- {"storage_dir", required_argument, NULL, 'd'},
+ struct option longopts[] = {{"storage_file", required_argument, NULL, 'd'},
{"since", optional_argument, NULL, 's'},
{"until", optional_argument, NULL, 'u'},
{"help", no_argument, NULL, 'h'},
@@ -155,7 +144,7 @@ int main(int argc, char *argv[]) {
{0, 0, 0, 0}};
int opt;
- while ((opt = getopt_long(argc, argv, "d:hv", longopts, NULL)) != -1) {
+ while ((opt = getopt_long(argc, argv, "d:s:u:hv", longopts, NULL)) != -1) {
switch (opt) {
case 'h':
printf("%s", help_text);
@@ -166,37 +155,41 @@ int main(int argc, char *argv[]) {
exit(0);
break;
case 'd':
- if(!optarg) FATAL("Expected: --storage_dir=[PATH]");
- storage_dir = strdup(optarg);
+ if (!optarg)
+ FATAL("Expected: --storage_file=[PATH]");
+ storage = strdup(optarg);
break;
case 's':
- if(!optarg) FATAL("Expected: --since=\"" DATE_FORMAT_HUMAN "\"");
+ if (!optarg)
+ FATAL("Expected: --since=\"" DATE_FORMAT_HUMAN "\"");
date_since_str = strdup(optarg);
break;
case 'u':
- if(!optarg) FATAL("Expected: --until=\"" DATE_FORMAT_HUMAN "\"");
+ if (!optarg)
+ FATAL("Expected: --until=\"" DATE_FORMAT_HUMAN "\"");
date_until_str = strdup(optarg);
break;
case '?':
- fprintf(stderr, "Unknown argument, see --help");
- exit(1);
+ FATAL("Unknown argument, see --help");
}
}
// verify arguments
- ASSERT(storage_dir != NULL,
+ ASSERT(storage != NULL,
"You must provide a storage directory (see --help)");
- ERR(nfl_check_dir(storage_dir) < 0, "storage directory not exist");
+ if (check_file_exist(storage) < 0)
+ ERROR("storage file not exist");
- // register signal handler
- ERR(signal(SIGHUP, sig_handler) == SIG_ERR, "Could not set SIGHUP handler");
+ if (signal(SIGHUP, sig_handler) == SIG_ERR)
+ ERROR("Could not set SIGHUP handler");
populate_date_range(&date_range, date_since_str, date_until_str);
- free(date_since_str); free(date_until_str);
+ free(date_since_str);
+ free(date_until_str);
- extract_all(storage_dir, &date_range);
- free(storage_dir);
+ extract_all(storage, &date_range);
+ free(storage);
return 0;
}
diff --git a/configure.ac b/configure.ac
index ce27d08..bf6aba8 100644
--- a/configure.ac
+++ b/configure.ac
@@ -12,7 +12,7 @@ AC_CONFIG_AUX_DIR([build-aux])
# ChangeLog, COPYING, AUTHORS, INSTALL, README etc. files.
AM_INIT_AUTOMAKE([-Wall -Werror foreign dist-xz subdir-objects])
-AM_SILENT_RULES([yes])
+#AM_SILENT_RULES([yes])
AC_CONFIG_SRCDIR([bin/nfcollect.c])
AC_PROG_CC([clang])
@@ -28,7 +28,9 @@ esac],[debug=false])
if test x"$debug" = x"true"; then
AC_DEFINE(DEBUG, 1, [debug])
fi
-AC_DEFINE(_XOPEN_SOURCE, 700)
+
+AC_DEFINE(_XOPEN_SOURCE, 700) # strptime
+AC_DEFINE(_POSIX_C_SOURCE, 200809) # strdup
AC_CHECK_HEADERS(libnetfilter_log/libnetfilter_log.h)
AC_SEARCH_LIBS(nflog_open, netfilter_log)
@@ -36,6 +38,9 @@ AC_SEARCH_LIBS(nflog_open, netfilter_log)
AC_CHECK_HEADERS(pthread.h)
AC_SEARCH_LIBS(pthread_create, pthread)
+AC_CHECK_HEADERS(sqlite3.h)
+AC_SEARCH_LIBS(sqlite3_exec, sqlite3)
+
AC_CHECK_HEADERS(zstd.h)
AC_SEARCH_LIBS(ZSTD_compress, zstd)
diff --git a/include/collect.h b/include/collect.h
index 1af0506..3e438cf 100644
--- a/include/collect.h
+++ b/include/collect.h
@@ -1,9 +1,12 @@
-#pragma once
+
+#ifndef _COLLECT_H
+#define _COLLECT_H
#include "main.h"
-void *nfl_collect_worker(void *targs);
-void nfl_state_init(nfl_state_t **nf, uint32_t id, uint32_t entries_max,
- nfl_global_t *g);
-void nfl_state_free(nfl_state_t *nf);
-void nfl_open_netlink_fd(nfl_nl_t *nf, uint16_t group_id);
-void nfl_close_netlink_fd(nfl_nl_t *nf);
+void collect_open_netlink(Netlink *nl, uint16_t group_id);
+void collect_close_netlink(Netlink *nl);
+void *collect_worker(void *targs);
+void state_init(State **s, Netlink *nl, Global *g);
+void state_free(State *s);
+
+#endif // _COLLECT_H
diff --git a/include/commit.h b/include/commit.h
index 2e16571..47a3099 100644
--- a/include/commit.h
+++ b/include/commit.h
@@ -1,11 +1,6 @@
-#ifndef _COMMIT_H
-#define _COMMIT_H
+#ifndef COMMIT_H
+#define COMMIT_H
-#include "common.h"
-void nfl_commit_init();
-int nfl_commit_worker(nfl_header_t *header, nfl_entry_t *store,
- enum nfl_compression_t compression_opt,
- bool truncate,
- const char *filename);
+void *commit(void *targs);
-#endif
+#endif // COMMIT_H
diff --git a/include/common.h b/include/common.h
deleted file mode 100644
index 893bd22..0000000
--- a/include/common.h
+++ /dev/null
@@ -1,14 +0,0 @@
-#pragma once
-
-#include "main.h"
-int nfl_check_file(FILE *f);
-int nfl_check_dir(const char *storage_dir);
-int nfl_storage_match_index(const char *fn);
-const char *nfl_get_filename(const char *dir, int id);
-uint32_t nfl_get_filesize(FILE *f);
-uint32_t nfl_header_cksum(nfl_header_t *header);
-void nfl_cal_trunk(uint32_t total_size, uint32_t *trunk_cnt,
- uint32_t *trunk_size);
-void nfl_cal_entries(uint32_t trunk_size, uint32_t *entries_cnt);
-void nfl_format_output(char *output, nfl_entry_t *entry);
-int nfl_setup_compression(const char *flag, enum nfl_compression_t *opt);
diff --git a/include/extract.h b/include/extract.h
index da7c7bf..afc0b64 100644
--- a/include/extract.h
+++ b/include/extract.h
@@ -1,4 +1,7 @@
-#pragma once
+#ifndef _EXTRACT_H
+#define _EXTRACT_H
-#include "common.h"
-int nfl_extract_worker(const char *filename, nfl_state_t *state, const time_range_t *range);
+#include "main.h"
+bool extract(State *s, const void *src);
+
+#endif // _EXTRACT_H
diff --git a/include/main.h b/include/main.h
index 417c6bc..4a860d4 100644
--- a/include/main.h
+++ b/include/main.h
@@ -23,21 +23,30 @@
#ifndef _MAIN_H
#define _MAIN_H
+#include <arpa/inet.h>
#include <assert.h>
+#include <linux/tcp.h>
#include <netinet/in.h>
#include <netinet/ip.h>
-#include <linux/tcp.h>
#include <netinet/udp.h>
+#include <pthread.h>
#include <semaphore.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
-#include <time.h>
-
-#include <pthread.h>
+#include <sys/socket.h>
#include <sys/types.h>
+#include <time.h>
-#ifdef DEBUG
+// Global variables
+#define g_sqlite_table_header "nfcollect_v1_header"
+#define g_sqlite_table_data "nfcollect_v1_data"
+#define g_sqlite_nr_fail_retry 8
+#define g_gc_rate 16
+// Default number of packets stored in a block
+//#define g_max_nr_entries_default (256*1024)
+#define g_max_nr_entries_default (1 * 128)
+#ifdef DEBUG_OUTPUT
#define DEBUG_ON 1
#else
#define DEBUG_ON 0
@@ -49,22 +58,16 @@
exit(1); \
}
-#define ERR(command, error_msg) \
- if (command) { \
- perror((error_msg)); \
- exit(1); \
- }
+#define ERROR(format, ...) \
+ fprintf(stdout, "[ERROR] " format "\n", ##__VA_ARGS__);
#define FATAL(format, ...) \
do { \
- fprintf(stdout, "[ERROR] " format "\n", ##__VA_ARGS__); \
+ fprintf(stdout, "[FATAL] " format "\n", ##__VA_ARGS__); \
exit(1); \
} while (0)
-#define WARN(command, format, ...) \
- if (command) { \
- fprintf(stdout, "[WARN] " format "\n", ##__VA_ARGS__); \
- }
+#define WARN(format, ...) fprintf(stdout, "[WARN] " format "\n", ##__VA_ARGS__);
#define WARN_RETURN(command, format, ...) \
if (command) { \
@@ -72,106 +75,81 @@
return -1; \
}
-#define debug(format, ...) \
+#define DEBUG(format, ...) \
if (DEBUG_ON) { \
fprintf(stdout, "[DEBUG] " format "\n", ##__VA_ARGS__); \
}
-#define info(format, ...) fprintf(stdout, "[INFO] " format "\n", ##__VA_ARGS__);
+#define INFO(format, ...) fprintf(stdout, "[INFO] " format "\n", ##__VA_ARGS__);
#define likely(x) __builtin_expect((x), 1)
#define unlikely(x) __builtin_expect((x), 0)
-#define CEIL_DIV(a, b) (((a) + (b)-1) / (b))
-#define NEXT(i, l) ((i + 1) % l)
-#define PREV(i, l) ((i - 1) % l)
-#define TRUNK_SIZE_BY_PAGE (150) // 150 pages
-#define MAX_SEGMENT_PER_TRUNK (1024)
-#define MAX_TRUNK_ID (80)
-#define STORAGE_PREFIX "nflog_storage"
-
-enum nfl_compression_t { COMPRESS_NONE, COMPRESS_LZ4, COMPRESS_ZSTD };
-typedef struct __attribute__((packed)) _nfl_header_t {
- uint32_t id; /* 0 4 */
- uint32_t n_entries; /* 4 4 */
- uint32_t max_n_entries; /* 8 4 */
- uint32_t cksum; /* 12 4 */
- uint32_t raw_size; /* 16 4 */
- enum nfl_compression_t compression_opt; /* 20 4 */
- time_t start_time; /* 24 8 */
- time_t end_time; /* 32 8 */
-
- /* size: 40, cachelines: 1, members: 8 */
- /* last cacheline: 40 bytes */
-} nfl_header_t;
-
-typedef struct __attribute__((packed)) _nfl_entry_t {
- // current timestamp since UNIX epoch
- time_t timestamp; /* 0 8 */
+#ifdef __GNUC__
+#define UNUSED_FUNCTION(x) __attribute__((__unused__)) UNUSED_ ## x
+#else
+#define UNUSED_FUNCTION(x) UNUSED_ ## x
+#endif
- // dest address
- struct in_addr daddr; /* 8 4 */
+enum CompressionType { COMPRESS_NONE, COMPRESS_LZ4, COMPRESS_ZSTD };
- // uid
- uint32_t uid; /* 12 4 */
+typedef struct _Header {
+ uint32_t nr_entries;
+ uint32_t raw_size;
+ enum CompressionType compression_type;
+ time_t start_time;
+ time_t end_time;
+} Header;
+typedef struct __attribute__((packed)) _Entry {
+ // current timestamp since UNIX epoch
+ time_t timestamp;
+ // dest address
+ struct in_addr daddr;
+ // uid
+ uint32_t uid;
// unused space, just for padding
- uint8_t __unused1; /* 16 1 */
-
+ uint8_t __unused1;
// IP protocol (UDP or TCP)
- uint8_t protocol; /* 17 1 */
-
+ uint8_t protocol;
// unused space, just for padding
- uint16_t __unused2; /* 18 2 */
-
+ uint16_t __unused2;
// source port
- uint16_t sport; /* 20 2 */
-
+ uint16_t sport;
// destination port
- uint16_t dport; /* 22 2 */
+ uint16_t dport;
/* size: 24, cachelines: 1, members: 8 */
-} nfl_entry_t;
-
-typedef struct _store_manager_t {
- uint32_t *trunk_size_map;
-
-} nfl_store_manager_t;
-
-typedef struct _nfl_global_t {
- sem_t *nfl_commit_queue;
- uint16_t nfl_group_id;
-
- uint32_t nfl_storage_consumed;
- pthread_mutex_t nfl_storage_consumed_lock;
-
- const char *storage_dir;
- enum nfl_compression_t compression_opt;
-} nfl_global_t;
+} Entry;
typedef struct _nfl_nl_t {
struct nflog_handle *fd;
struct nflog_g_handle *group_fd;
-} nfl_nl_t;
+} Netlink;
+
+typedef struct _Global {
+ uint16_t nl_group_id;
-typedef struct _nfl_state_t {
- nfl_global_t *global;
- nfl_header_t *header;
- nfl_entry_t *store;
- nfl_nl_t *netlink_fd;
+ uint32_t storage_budget;
+ uint32_t storage_consumed;
+ pthread_mutex_t storage_consumed_lock;
- bool has_finished_recv;
- pthread_cond_t has_finished_recv_cond;
- pthread_mutex_t has_finished_recv_lock;
+ uint32_t max_nr_entries;
+ const char *storage_file;
+ enum CompressionType compression_type;
+} Global;
- pthread_t thread;
-} nfl_state_t;
+typedef struct _State {
+ Header *header;
+ Entry *store;
+ Netlink *netlink_fd;
+ Global *global;
+} State;
-typedef struct _time_range_t {
+typedef struct _Timerange {
time_t from, until;
-} time_range_t;
+} Timerange;
-// only copy size of ipv4 header + tcp header
-static const int nfl_recv_size = sizeof(struct iphdr) + sizeof(struct tcphdr);
+typedef void (*StateCallback)(const State *s, const Timerange *t);
#endif // _MAIN_H
diff --git a/include/sql.h b/include/sql.h
new file mode 100644
index 0000000..8b88f10
--- /dev/null
+++ b/include/sql.h
@@ -0,0 +1,18 @@
+#ifndef SQL_H
+#define SQL_H
+
+#include "main.h"
+#include <sqlite3.h>
+
+int db_set_pragma(sqlite3 *db);
+int db_vacuum(sqlite3 *db);
+int db_create_table(sqlite3 *db);
+int db_open(sqlite3 **db, const char *dbname);
+int db_close(sqlite3 *db);
+int db_insert(sqlite3 *db, const Header *header, const Entry *entries);
+int db_get_space_consumed(sqlite3 *db);
+int db_delete_oldest_bytes(sqlite3 *db, int64_t bytes);
+int db_read_data_by_timerange(sqlite3 *db, const Timerange *t,
+ StateCallback cb);
+
+#endif // SQL_H
diff --git a/include/util.h b/include/util.h
new file mode 100644
index 0000000..f1fd965
--- /dev/null
+++ b/include/util.h
@@ -0,0 +1,8 @@
+#ifndef UTIL_H
+#define UTIL_H
+#include "main.h"
+int check_basedir_exist(const char *storage);
+int check_file_exist(const char *storage);
+enum CompressionType get_compression(const char *flag);
+
+#endif // UTIL_H
diff --git a/lib/collect.c b/lib/collect.c
index bc32a93..21ad415 100644
--- a/lib/collect.c
+++ b/lib/collect.c
@@ -23,10 +23,11 @@
// SOFTWARE.
#include "commit.h"
-#include "common.h"
+#include "main.h"
#include <libnetfilter_log/libnetfilter_log.h>
#include <pthread.h>
#include <stddef.h> // size_t for libnetfilter_log
+#include <stdint.h>
#include <string.h>
#include <sys/types.h> // u_int32_t for libnetfilter_log
#include <time.h>
@@ -36,17 +37,13 @@
// kernel and transmits them as one netlink multipart message to userspace.
#define NF_NFLOG_QTHRESH 64
-nfl_global_t g;
+Global g;
-static void *nfl_start_commit_worker(void *targs);
-static void nfl_commit(nfl_state_t *nf);
-static void nfl_state_free(nfl_state_t *nf);
-
-static int handle_packet(struct nflog_g_handle *gh, struct nfgenmsg *nfmsg,
- struct nflog_data *nfa, void *_nf) {
+static int handle_packet(__attribute__((unused)) struct nflog_g_handle *gh, __attribute__((unused)) struct nfgenmsg *nfmsg,
+ struct nflog_data *nfa, void *_s) {
#define HASH_ENTRY(e) (e->sport ^ e->timestamp)
register const struct iphdr *iph;
- register nfl_entry_t *entry;
+ register Entry *entry;
const struct tcphdr *tcph;
const struct udphdr *udph;
char *payload;
@@ -57,19 +54,19 @@ static int handle_packet(struct nflog_g_handle *gh, struct nfgenmsg *nfmsg,
static uint64_t prev_entry_hash;
int payload_len = nflog_get_payload(nfa, &payload);
- nfl_state_t *nf = (nfl_state_t *)_nf;
+ State *s = (State *)_s;
// only process ipv4 packet
if (unlikely(payload_len < 0) || ((payload[0] & 0xf0) != 0x40)) {
- debug("Ignore non-IPv4 packet");
+ DEBUG("Ignore non-IPv4 packet");
return 1;
}
- if (unlikely(nf->header->n_entries >= nf->header->max_n_entries))
+ if (unlikely(s->header->nr_entries >= g.max_nr_entries))
return 1;
iph = (struct iphdr *)payload;
- entry = &(nf->store[nf->header->n_entries]);
+ entry = &(s->store[s->header->nr_entries]);
inner_hdr = (uint32_t *)iph + iph->ihl;
// Only accept TCP / UDP packets
@@ -86,7 +83,7 @@ static int handle_packet(struct nflog_g_handle *gh, struct nfgenmsg *nfmsg,
entry->sport = ntohs(udph->source);
entry->dport = ntohs(udph->dest);
} else {
- debug("Ignore non-TCP/UDP packet");
+ DEBUG("Ignore non-TCP/UDP packet");
return 1; // Ignore other types of packet
}
@@ -114,16 +111,16 @@ static int handle_packet(struct nflog_g_handle *gh, struct nfgenmsg *nfmsg,
entry->uid = uid;
// Advance to next entry
- nf->header->n_entries++;
+ s->header->nr_entries++;
- debug("Recv packet info entry #%d: "
+ DEBUG("Recv packet info entry #%d: "
"timestamp:\t%ld,\t"
"daddr:\t%ld,\t"
"transfer:\t%s,\t"
"uid:\t%d,\t"
"sport:\t%d,\t"
"dport:\t%d",
- nf->header->n_entries, entry->timestamp,
+ s->header->nr_entries, entry->timestamp,
(unsigned long)entry->daddr.s_addr,
iph->protocol == IPPROTO_TCP ? "TCP" : "UDP", entry->uid,
entry->sport, entry->dport);
@@ -132,155 +129,94 @@ static int handle_packet(struct nflog_g_handle *gh, struct nfgenmsg *nfmsg,
return 0;
}
-void nfl_open_netlink_fd(nfl_nl_t *nl, uint16_t group_id) {
+void collect_open_netlink(Netlink *nl, uint16_t group_id) {
// open nflog
- ERR((nl->fd = nflog_open()) == NULL, "nflog_open")
- debug("Opening nflog communication file descriptor");
+ if ((nl->fd = nflog_open()) == NULL) {
+ FATAL("nflog_open failed");
+ }
+
+ DEBUG("Opening nflog communication file descriptor");
// monitor IPv4 packets only
- ERR(nflog_bind_pf(nl->fd, AF_INET) < 0, "nflog_bind_pf");
+ if (nflog_bind_pf(nl->fd, AF_INET) < 0) {
+ FATAL("nflog_bind_pf failed");
+ }
// bind to group
nl->group_fd = nflog_bind_group(nl->fd, group_id);
// If the returned group_fd is NULL, it's likely
// that another process (like ulogd) has already
// bound to the same NFLOD group.
- if(!nl->group_fd)
- FATAL("Cannot bind to NFLOG group %d, is it used by another process?", group_id);
+ if (!nl->group_fd)
+ FATAL("Cannot bind to NFLOG group %d, is it used by another process?",
+ group_id);
- ERR(nflog_set_mode(nl->group_fd, NFULNL_COPY_PACKET, nfl_recv_size) < 0,
- "Could not set copy mode");
+ // only copy size of ipv4 header + tcp header
+ int recv_size = sizeof(struct iphdr) + sizeof(struct tcphdr);
+ if (nflog_set_mode(nl->group_fd, NFULNL_COPY_PACKET, recv_size) < 0)
+ FATAL("Could not set copy mode");
// Batch send 128 packets from kernel to userspace
- ERR(nflog_set_qthresh(nl->group_fd, NF_NFLOG_QTHRESH),
- "Could not set qthresh");
+ if (nflog_set_qthresh(nl->group_fd, NF_NFLOG_QTHRESH))
+ FATAL("Could not set qthresh");
}
-void nfl_close_netlink_fd(nfl_nl_t *nl) {
+void collect_close_netlink(Netlink *nl) {
nflog_unbind_group(nl->group_fd);
nflog_close(nl->fd);
}
-void *nfl_collect_worker(void *targs) {
- nfl_state_t *nf = (nfl_state_t *)targs;
- memcpy(&g, nf->global, sizeof(nfl_global_t));
+void *collect_worker(void *targs) {
+ State *s = (State *)targs;
+ memcpy(&g, s->global, sizeof(Global));
- nflog_callback_register(nf->netlink_fd->group_fd, &handle_packet, nf);
- debug("Registering nflog callback");
+ nflog_callback_register(s->netlink_fd->group_fd, &handle_packet, s);
+ DEBUG("Registering nflog callback");
- int fd = nflog_fd(nf->netlink_fd->fd);
- debug("Recv worker #%u: main loop starts", nf->header->id);
+ int fd = nflog_fd(s->netlink_fd->fd);
+ DEBUG("Recv worker #%lu: main loop starts", pthread_self());
// Write start time
- time(&nf->header->start_time);
+ time(&s->header->start_time);
int rv;
// Must have at least 128 for each packet to account for
// sizeof(struct iphdr) + sizeof(struct tcphdr) plus the
// size of meta data needed by the library's data structure.
char buf[128 * NF_NFLOG_QTHRESH + 1];
- while (nf->header->n_entries < nf->header->max_n_entries) {
+ while (s->header->nr_entries < g.max_nr_entries) {
if ((rv = recv(fd, buf, sizeof(buf), 0)) && rv > 0) {
- debug("Recv worker #%u: nflog packet received "
+ DEBUG("Recv worker #%lu: packet received "
"(len=%u, #entries=%u)",
- nf->header->id, rv, nf->header->n_entries);
- nflog_handle_packet(nf->netlink_fd->fd, buf, rv);
+ pthread_self(), rv, s->header->nr_entries);
+ nflog_handle_packet(s->netlink_fd->fd, buf, rv);
}
}
- debug("Recv worker #%u: finished, received packets: %u",
- nf->header->id,
- nf->header->max_n_entries);
-
// write end time
- time(&nf->header->end_time);
-
- // write checksum
- nf->header->cksum = nfl_header_cksum(nf->header);
- debug("Recv worker #%u: calculated checksum: %x",
- nf->header->id,
- nf->header->cksum);
-
- // spawn commit thread
- nfl_commit(nf);
-
- pthread_exit(NULL);
-}
+ time(&s->header->end_time);
+ s->header->raw_size = s->header->nr_entries * sizeof(Entry);
-/*
- * Committer
- */
-
-static void nfl_commit(nfl_state_t *nf) {
pthread_t tid;
- pthread_create(&tid, NULL, nfl_start_commit_worker, (void *)nf);
+ pthread_create(&tid, NULL, commit, (void *)s);
pthread_detach(tid);
+ return NULL;
}
-static void *nfl_start_commit_worker(void *targs) {
- nfl_state_t *nf = (nfl_state_t *)targs;
- const char *filename = nfl_get_filename(g.storage_dir, nf->header->id);
- debug("Comm worker #%u: thread started.", nf->header->id);
- /* truncate ? */
- bool truncate = true;
-
- sem_wait(g.nfl_commit_queue);
- debug("Comm worker #%u: commit started.", nf->header->id);
- int ret = nfl_commit_worker(nf->header, nf->store, g.compression_opt, truncate, filename);
- debug("Comm worker #%u: commit done.", nf->header->id);
- sem_post(g.nfl_commit_queue);
-
- nfl_state_free(nf);
- free((char *)filename);
-
- pthread_mutex_lock(&nf->has_finished_recv_lock);
- nf->has_finished_recv = true;
- pthread_cond_signal(&nf->has_finished_recv_cond);
- pthread_mutex_unlock(&nf->has_finished_recv_lock);
-
- pthread_exit(ret);
-}
-
-/*
- * State managers
- */
-
-void nfl_state_init(nfl_state_t **nf, uint32_t id, uint32_t entries_max,
- nfl_global_t *g) {
- assert(nf);
-
- // Check if nf has been allocated
- if (unlikely(*nf == NULL)) {
- *nf = (nfl_state_t *)malloc(sizeof(nfl_state_t));
- (*nf)->global = g;
- (*nf)->header = (nfl_header_t *)malloc(sizeof(nfl_header_t));
- (*nf)->header->id = id;
- (*nf)->header->max_n_entries = entries_max;
- (*nf)->header->compression_opt = g->compression_opt;
-
- (*nf)->has_finished_recv = true;
- pthread_mutex_init(&(*nf)->has_finished_recv_lock, NULL);
- pthread_cond_init(&(*nf)->has_finished_recv_cond, NULL);
- }
+void state_init(State **s, Netlink *nl, Global *g) {
+ assert(s);
+ *s = (State *)malloc(sizeof(State));
+ (*s)->global = g;
+ (*s)->netlink_fd = nl;
+ (*s)->header = (Header *)calloc(sizeof(Header), 1);
+ (*s)->header->compression_type = g->compression_type;
- // Ensure trunk with same id in previous run has finished to prevent reusing
- // a trunk which it's still being used. Furthermore, this hopefully
- // alleviate us from bursty network traffic.
- pthread_mutex_lock(&(*nf)->has_finished_recv_lock);
- while (!(*nf)->has_finished_recv)
- pthread_cond_wait(&(*nf)->has_finished_recv_cond, &(*nf)->has_finished_recv_lock);
- (*nf)->has_finished_recv = false;
- pthread_mutex_unlock(&(*nf)->has_finished_recv_lock);
-
- // Don't use calloc here, as it will cause page fault and
- // consume physical memory before we fill the buffer.
- // Instead, fill entries with 0 on the fly, to squeeze
- // more space for compression.
- (*nf)->store = (nfl_entry_t *)malloc(sizeof(nfl_entry_t) * entries_max);
- (*nf)->header->n_entries = 0;
+ (*s)->store = (Entry *)malloc(sizeof(Entry) * g->max_nr_entries);
+ (*s)->header->nr_entries = 0;
}
-static void nfl_state_free(nfl_state_t *nf) {
- // Free only packet store and leave the rest intact
- free((void *)nf->store);
+void state_free(State *s) {
+ free(s->store);
+ free(s->header);
+ free(s);
}
diff --git a/lib/commit.c b/lib/commit.c
index 22b92ad..45d4d73 100644
--- a/lib/commit.c
+++ b/lib/commit.c
@@ -1,80 +1,80 @@
-#include "commit.h"
-#include <errno.h>
-#include <string.h>
-#include <zstd.h>
-
-static int nfl_commit_default(FILE *f, nfl_header_t *header, nfl_entry_t *store,
- uint32_t store_size) {
- uint32_t written;
- header->raw_size = store_size;
+#include "collect.h"
+#include "main.h"
+#include "sql.h"
- // Write header
- written = fwrite(header, 1, sizeof(nfl_header_t), f);
- WARN_RETURN(written != sizeof(nfl_header_t), "commit header: %s", strerror(errno));
+#include <zstd.h>
- // Write store
- written = fwrite(store, 1, store_size, f);
- WARN_RETURN(written != store_size, "commit store: %s", strerror(errno));
+static void do_gc(sqlite3 *db, State *s) {
+ uint32_t cur_size = s->header->raw_size;
+ pthread_mutex_lock(&s->global->storage_consumed_lock);
+ uint32_t remain_size =
+ s->global->storage_budget - s->global->storage_consumed - cur_size;
+ uint32_t gc_size = -remain_size + cur_size * g_gc_rate;
+ if (gc_size >= s->global->storage_consumed)
+ gc_size = s->global->storage_consumed;
+ pthread_mutex_unlock(&s->global->storage_consumed_lock);
- return sizeof(nfl_header_t) + store_size;
+ if (remain_size <= 0)
+ db_delete_oldest_bytes(db, gc_size);
}
-static int nfl_commit_lz4(FILE *f, nfl_header_t *header, nfl_entry_t *store,
- uint32_t store_size) {
+static int commit_lz4(State *s, void **buf) {
/* TODO */
+ (void)s;
+ (void)buf;
return -1;
}
-static int nfl_commit_zstd(FILE *f, nfl_header_t *header, nfl_entry_t *store,
- uint32_t store_size) {
- size_t const bufsize = ZSTD_compressBound(store_size);
- void *buf;
+static int commit_zstd(State *s, void **buf) {
+ size_t const bufsize = ZSTD_compressBound(s->header->raw_size);
+
+ if (!(*buf = malloc(bufsize)))
+ ERROR("zstd: cannot malloc");
- WARN_RETURN(!(buf = malloc(bufsize)), "zstd: cannot malloc");
- size_t const csize = ZSTD_compress(buf, bufsize, store, store_size, 1);
+ size_t const csize =
+ ZSTD_compress(*buf, bufsize, s->store, s->header->raw_size, 0);
if (ZSTD_isError(csize)) {
- WARN(1, "zstd: %s \n", ZSTD_getErrorName(csize));
- free(buf);
+ ERROR("zstd: %s \n", ZSTD_getErrorName(csize));
+ free(*buf);
return -1;
}
- int ret = nfl_commit_default(f, header, buf, csize);
- free(buf);
- return ret;
+ s->header->raw_size = csize;
+ return 0;
}
-int nfl_commit_worker(nfl_header_t *header, nfl_entry_t *store,
- enum nfl_compression_t compression_opt,
- bool truncate,
- const char *filename) {
- int ret;
- FILE *f;
- const char *mode = truncate ? "wb" : "ab";
+void *commit(void *targs) {
+ sqlite3 *db = NULL;
+ State *s = (State *)targs;
+ uint32_t size = s->header->raw_size;
+ DEBUG("Committing #%d packets", s->header->nr_entries);
- debug("Comm worker #%u: commit to file %s\n", header->id, filename);
- ERR((f = fopen(filename, mode)) == NULL, strerror(errno));
+ db_open(&db, s->global->storage_file);
+ db_create_table(db);
- // commit store
- uint32_t store_size = sizeof(nfl_entry_t) * header->max_n_entries;
- switch (compression_opt) {
+ void *buf = NULL;
+ switch (s->global->compression_type) {
case COMPRESS_NONE:
- debug("Comm worker #%u: commit without compression\n", header->id);
- ret = nfl_commit_default(f, header, store, store_size);
break;
case COMPRESS_LZ4:
- debug("Comm worker #%u: commit with compression algorithm: lz4", header->id);
- ret = nfl_commit_lz4(f, header, store, store_size);
+ commit_lz4(s, &buf);
break;
case COMPRESS_ZSTD:
- debug("Comm worker #%u: commit with compression algorithm: zstd", header->id);
- ret = nfl_commit_zstd(f, header, store, store_size);
+ commit_zstd(s, &buf);
break;
- // Must not reach here ...
default:
FATAL("Unknown compression option detected");
}
- // Do fsync ?
- fclose(f);
- return ret;
+ do_gc(db, s);
+ db_insert(db, s->header, buf ? buf : s->store);
+ db_close(db);
+
+ DEBUG("Committed #%d packets, compressed size: %u/%u",
+ s->header->nr_entries, s->header->raw_size, size);
+ if (buf)
+ free(buf);
+ state_free(s);
+
+ return NULL;
}
diff --git a/lib/common.c b/lib/common.c
deleted file mode 100644
index 860ef90..0000000
--- a/lib/common.c
+++ /dev/null
@@ -1,128 +0,0 @@
-#include "common.h"
-#include <arpa/inet.h>
-#include <assert.h>
-#include <errno.h>
-#include <limits.h>
-#include <regex.h>
-#include <string.h>
-#include <sys/stat.h>
-#include <unistd.h>
-
-int nfl_check_file(FILE *f) {
- struct stat s;
- assert(f);
- if (fstat(fileno(f), &s) < 0)
- return -errno;
-
- // Ignore file already unlinked
- if (s.st_nlink <= 0)
- return -EIDRM;
-
- return 0;
-}
-
-int nfl_check_dir(const char *storage_dir) {
- struct stat _d;
- if (stat(storage_dir, &_d) != 0 || !S_ISDIR(_d.st_mode)) {
- return -1;
- }
- return 0;
-}
-
-int nfl_storage_match_index(const char *fn) {
- static regex_t regex;
- static bool compiled = false;
- regmatch_t match[2];
- int ret;
-
- if (unlikely(!strcmp(fn, ".") || !strcmp(fn, "..")))
- return -1;
-
- if (!compiled) {
- ERR(regcomp(&regex, "^" STORAGE_PREFIX "_([0-9]+)", REG_EXTENDED),
- "Could not compile regex");
- compiled = true;
- }
-
- ret = regexec(&regex, fn, 2, match, 0);
- if (!ret) {
- assert(match[1].rm_so != (size_t)-1);
- return strtol(fn + match[1].rm_so, NULL, 10);
- } else {
- char buf[100];
- regerror(ret, &regex, buf, sizeof(buf));
- WARN(1, "Regex match failed: %s", buf)
- }
-
- return -1;
-}
-const char *nfl_get_filename(const char *dir, int id) {
- char out[1024];
- sprintf(out, "%s/" STORAGE_PREFIX "_%d", dir, id);
- return strdup(out);
-}
-
-uint32_t nfl_get_filesize(FILE *f) {
- uint32_t size, prepos;
- prepos = ftell(f);
- fseek(f, 0, SEEK_END);
- size = ftell(f);
- fseek(f, prepos, SEEK_SET);
- return size;
-}
-
-uint32_t nfl_header_cksum(nfl_header_t *header) {
- /* simply use a magic number for integrity check */
- return 0x9e37a9b9;
-}
-
-void nfl_cal_trunk(uint32_t total_size, uint32_t *trunk_cnt,
- uint32_t *trunk_size) {
- uint32_t pgsize = sysconf(_SC_PAGE_SIZE);
- total_size *= 1024 * 1024; // MiB
-
- assert(trunk_cnt);
- assert(total_size);
-
- *trunk_cnt = CEIL_DIV(total_size, pgsize * TRUNK_SIZE_BY_PAGE);
- if (*trunk_cnt > MAX_TRUNK_ID) {
- *trunk_cnt = MAX_TRUNK_ID;
- *trunk_size = total_size / MAX_TRUNK_ID;
- *trunk_size = (*trunk_size / pgsize) * pgsize; // align with pagesize
- } else {
- *trunk_size = pgsize * TRUNK_SIZE_BY_PAGE;
- }
-}
-
-void nfl_cal_entries(uint32_t trunk_size, uint32_t *entries_cnt) {
- assert(entries_cnt);
- *entries_cnt = (trunk_size - sizeof(nfl_header_t)) / sizeof(nfl_entry_t);
-}
-
-void nfl_format_output(char *output, nfl_entry_t *entry) {
- sprintf(output, " "
- "t=%ld\t"
- "daddr=%s\t"
- "proto=%s\t"
- "uid=%d\t"
- "sport=%d\t"
- "dport=%d",
- entry->timestamp, inet_ntoa(entry->daddr),
- entry->protocol == IPPROTO_TCP ? "TCP" : "UDP", entry->uid,
- entry->sport, entry->dport);
-}
-
-int nfl_setup_compression(const char *flag, enum nfl_compression_t *opt) {
- if (flag == NULL) {
- *opt = COMPRESS_NONE;
- } else if (!strcmp(flag, "zstd") || !strcmp(flag, "zstandard")) {
- *opt = COMPRESS_ZSTD;
- } else if (!strcmp(flag, "lz4")) {
- *opt = COMPRESS_LZ4;
- } else {
- fprintf(stderr, "Unknown compression algorithm: %s\n", flag);
- return 0;
- }
-
- return 1;
-}
diff --git a/lib/extract.c b/lib/extract.c
index 8907c26..5687ad2 100644
--- a/lib/extract.c
+++ b/lib/extract.c
@@ -1,128 +1,66 @@
-
-#include "extract.h"
+#include "main.h"
#include <errno.h>
+#include <stdlib.h>
#include <string.h>
#include <time.h>
#define ZSTD_STATIC_LINKING_ONLY // ZSTD_findDecompressedSize
#include <zstd.h>
-static int nfl_extract_default(FILE *f, nfl_state_t *state);
-static int nfl_extract_zstd(FILE *f, nfl_state_t *state);
-static int nfl_extract_lz4(FILE *f, nfl_state_t *state);
-
-static int nfl_verify_header(nfl_header_t *header) {
- if (header->cksum != nfl_header_cksum(header)) {
- debug("Header checksum mismatch: expected: 0x%x, got: 0x%x",
- header->cksum, nfl_header_cksum(header));
- return -1;
- }
-
- if (header->id > MAX_TRUNK_ID)
- return -1;
-
- if (header->max_n_entries < header->n_entries)
- return -1;
-
- time_t now = time(NULL);
- if ((time_t)header->start_time >= now || (time_t)header->end_time >= now ||
- header->start_time > header->end_time)
- return -1;
-
- return 0;
+static bool extract_default(State *s, const void *src) {
+ s->store = malloc(s->header->raw_size);
+ memcpy(s->store, src, s->header->raw_size);
+ return true;
}
-static int nfl_extract_default(FILE *f, nfl_state_t *state) {
- int entries =
- fread(state->store, sizeof(nfl_entry_t), state->header->n_entries, f);
- WARN_RETURN(ferror(f), "%s", strerror(errno));
- return entries;
-}
-
-static int nfl_extract_zstd(FILE *f, nfl_state_t *state) {
- char *buf;
- size_t const compressed_size = nfl_get_filesize(f) - sizeof(nfl_header_t),
- expected_decom_size =
- state->header->n_entries * sizeof(nfl_entry_t);
-
- // It's possible that data or header is not written due to broken commit
- WARN_RETURN(compressed_size <= 0, "%s", "zstd: no data in this trunk");
+static bool extract_zstd(State *s, const void *src) {
+ assert(src);
+ size_t const expected_decom_size = s->header->nr_entries * sizeof(Entry);
- WARN_RETURN(!(buf = malloc(compressed_size)), "zstd: cannot malloc");
- WARN_RETURN(!fread(buf, compressed_size, 1, f), "zstd: broken data section");
- WARN_RETURN(ferror(f), "%s", strerror(errno));
-
- size_t const estimate_decom_size =
- ZSTD_findDecompressedSize(buf, compressed_size);
- if (estimate_decom_size == ZSTD_CONTENTSIZE_ERROR)
+ size_t const r = ZSTD_findDecompressedSize(src, s->header->raw_size);
+ if (r == ZSTD_CONTENTSIZE_ERROR)
FATAL("zstd: file was not compressed by zstd.\n");
- else if (estimate_decom_size == ZSTD_CONTENTSIZE_UNKNOWN)
+ else if (r == ZSTD_CONTENTSIZE_UNKNOWN)
FATAL(
"zstd: original size unknown. Use streaming decompression instead");
+ if (r != expected_decom_size) {
+ WARN("zstd: expected decompressed size: %ld, got: %ld, skipping "
+ "decompression",
+ expected_decom_size, r);
+ return false;
+ }
+
+ s->store = malloc(expected_decom_size);
size_t const actual_decom_size = ZSTD_decompress(
- state->store, expected_decom_size, buf, compressed_size);
+ s->store, expected_decom_size, src, s->header->raw_size);
if (actual_decom_size != expected_decom_size) {
FATAL("zstd: error decoding current file: %s \n",
ZSTD_getErrorName(actual_decom_size));
}
- free(buf);
- return actual_decom_size / sizeof(nfl_entry_t);
+ return true;
}
-static int nfl_extract_lz4(FILE *f, nfl_state_t *state) {
- /* TODO */
- return 0;
+static bool extract_lz4(State *s, const void *src) {
+ // TODO
+ (void)s; (void)src;
+ return true;
}
-int nfl_extract_worker(const char *filename, nfl_state_t *state, const time_range_t *range) {
- FILE *f;
- int got = 0, ret = 0;
- nfl_header_t *h;
-
- debug("Extracting from file %s", filename);
- ERR((f = fopen(filename, "rb")) == NULL, "extract worker");
- ERR(nfl_check_file(f) < 0, "extract worker");
-
- // Read header
- ERR(!(state->header = malloc(sizeof(nfl_header_t))),
- "extract malloc header");
- got = fread(state->header, sizeof(nfl_header_t), 1, f);
- h = state->header;
-
- if(h->end_time < range->from || h->start_time > range->until)
- return 0;
-
- // Check header validity
- WARN_RETURN(ferror(f), "%s", strerror(errno));
- WARN_RETURN(got == 0 || nfl_verify_header(h) < 0,
- "File %s has corrupted header.", filename);
-
- // Read body
- ERR(!(state->store = malloc(sizeof(nfl_entry_t) * h->n_entries)),
- "extract malloc store");
- switch (h->compression_opt) {
+bool extract(State *s, const void *src) {
+ switch (s->header->compression_type) {
case COMPRESS_NONE:
- debug("Extract worker #%u: extract without compression\n", h->id);
- ret = nfl_extract_default(f, state);
- break;
+ DEBUG("extract: extract without compression\n");
+ return extract_default(s, src);
case COMPRESS_LZ4:
- debug("Extract worker #%u: extract with compression algorithm: lz4",
- h->id);
- ret = nfl_extract_lz4(f, state);
- break;
+ DEBUG("extract: extract with compression algorithm: lz4");
+ return extract_lz4(s, src);
case COMPRESS_ZSTD:
- debug("Extract worker #%u: extract with compression algorithm: zstd",
- h->id);
- ret = nfl_extract_zstd(f, state);
- break;
+ DEBUG("extract: extract with compression algorithm: zstd");
+ return extract_zstd(s, src);
// Must not reach here ...
default:
FATAL("Unknown compression option detected");
}
-
- fclose(f);
-
- return ret;
}
diff --git a/lib/sql.c b/lib/sql.c
new file mode 100644
index 0000000..8ac4dbd
--- /dev/null
+++ b/lib/sql.c
@@ -0,0 +1,276 @@
+#include "sql.h"
+#include "collect.h"
+#include "extract.h"
+#include "util.h"
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+static inline int _db_handle_result(sqlite3 *db, int rc, const char *errmsg,
+ const char *err, bool fatal) {
+ if (SQLITE_OK != rc) {
+ ERROR("sqlite3: %s(%i): %s\n", errmsg ? errmsg : "", rc,
+ err ? err : sqlite3_errmsg(db));
+ if (fatal) {
+ sqlite3_close(db);
+ exit(1);
+ }
+ }
+ return rc;
+}
+
+static inline int _db_exec(sqlite3 *db, const char *cmd, const char *errmsg,
+ bool fatal) {
+ char *err = NULL;
+ int rc = sqlite3_exec(db, cmd, NULL, NULL, &err);
+ return _db_handle_result(db, rc, errmsg, err, fatal);
+}
+
+static inline int db_exec_fatal(sqlite3 *db, const char *cmd,
+ const char *errmsg) {
+ return _db_exec(db, cmd, errmsg, true);
+}
+
+static inline int db_exec(sqlite3 *db, const char *cmd, const char *errmsg) {
+ return _db_exec(db, cmd, errmsg, false);
+}
+
+static inline int db_prepare(sqlite3 *db, const char *cmd, const char *errmsg,
+ sqlite3_stmt **stmt) {
+ int rc = sqlite3_prepare_v2(db, cmd, -1, stmt, 0);
+ return _db_handle_result(db, rc, errmsg, NULL, true);
+}
+
+int db_set_pragma(sqlite3 *db) {
+ return db_exec_fatal(db,
+ "PRAGMA journal_mode=WAL;"
+ "PRAGMA foreign_keys = ON;",
+ "Can't set Sqlite3 PRAGMA");
+}
+
+int db_vacuum(sqlite3 *db) {
+ return db_exec(db, "VACUUM", "Can't vacuum database");
+}
+
+int db_create_table(sqlite3 *db) {
+ const char *create_sql =
+ "CREATE TABLE IF NOT EXISTS " g_sqlite_table_data " ("
+ "id INTEGER PRIMARY KEY,"
+ "data BLOB"
+ ");"
+ "CREATE TABLE IF NOT EXISTS " g_sqlite_table_header " ("
+ "id INTEGER PRIMARY KEY,"
+ "nr_entries INTEGER,"
+ "size INTEGER,"
+ "compression_type INTEGER,"
+ "start_time INTEGER,"
+ "end_time INTEGER,"
+ "data_id INTEGER,"
+ "FOREIGN KEY(data_id) REFERENCES " g_sqlite_table_data
+ "(id) ON DELETE SET NULL"
+ ");";
+ int rc = 0, retry = g_sqlite_nr_fail_retry;
+ while (retry--) {
+ rc = db_exec(db, create_sql, "Can't create table");
+ if (SQLITE_LOCKED != rc && SQLITE_BUSY != rc)
+ return rc;
+ sleep(1);
+ }
+
+ ERROR("Can't create table, reach max retry, bailed out!");
+ exit(1);
+}
+
+int db_open(sqlite3 **db, const char *dbname) {
+ int rc;
+ rc = sqlite3_open(dbname, db);
+ if (SQLITE_OK != rc) {
+ ERROR("Can't open database %s (%i): %s\n", dbname, rc,
+ sqlite3_errmsg(*db));
+ exit(1);
+ }
+
+ return db_set_pragma(*db);
+}
+
+int db_close(sqlite3 *db) {
+ sqlite3_close(db);
+ return 0;
+}
+
+int db_insert(sqlite3 *db, const Header *header, const Entry *entries) {
+ int rc;
+ sqlite3_stmt *stmt[2] = {0};
+ const char *insert_sql[] = {
+ "INSERT INTO " g_sqlite_table_data " (data) VALUES(?)",
+ "INSERT INTO " g_sqlite_table_header " "
+ "(nr_entries, size, compression_type, start_time, end_time, data_id) "
+ "VALUES(?, ?, ?, ?, ?, ?)"};
+
+ db_exec_fatal(db, "BEGIN TRANSACTION", "db_insert: Can't begin txn");
+ for (int i = 0; i < 2;) {
+ rc = db_prepare(db, insert_sql[i], "Can't insert data", &stmt[i]);
+ if (i == 0) {
+ printf("Inserting raw data: %02X:%02X", ((char *)entries)[0],
+ ((char *)entries)[1]);
+ sqlite3_bind_blob(stmt[i], 1, entries, header->raw_size,
+ SQLITE_STATIC);
+ } else {
+ sqlite3_int64 data_id = sqlite3_last_insert_rowid(db);
+ sqlite3_bind_int(stmt[i], 1, header->nr_entries);
+ sqlite3_bind_int(stmt[i], 2, header->raw_size);
+ sqlite3_bind_int(stmt[i], 3, header->compression_type);
+ sqlite3_bind_int64(stmt[i], 4, header->start_time);
+ sqlite3_bind_int64(stmt[i], 5, header->end_time);
+ sqlite3_bind_int64(stmt[i], 6, data_id);
+ }
+
+ rc = sqlite3_step(stmt[i]);
+ if (rc != SQLITE_DONE)
+ WARN("sqlite3: Insert data step fail: %d\n", rc);
+
+ if (SQLITE_SCHEMA == sqlite3_finalize(stmt[i]))
+ continue;
+ i++;
+ }
+
+ DEBUG("Inserted #%d of compressed size %d", header->nr_entries,
+ header->raw_size);
+ db_exec_fatal(db, "END TRANSACTION", "db_insert: Can't end txn");
+ return rc;
+}
+
+int db_read_data_by_timerange(sqlite3 *db, const Timerange *t,
+ StateCallback cb) {
+ const char *_select_sql =
+ "SELECT * FROM " g_sqlite_table_header
+ " INNER JOIN " g_sqlite_table_data " ON " g_sqlite_table_header
+ ".data_id = " g_sqlite_table_data ".id"
+ " WHERE " g_sqlite_table_header
+ ".end_time > %ld AND " g_sqlite_table_header ".start_time < %ld";
+ char select_sql[strlen(_select_sql) + 25];
+ sprintf(select_sql, _select_sql, t->from, t->until);
+
+ sqlite3_stmt *stmt;
+ db_exec_fatal(db, "BEGIN TRANSACTION", "db_delete: Can't begin txn");
+ int rc = sqlite3_prepare_v2(db, select_sql, -1, &stmt, 0);
+ if (rc != SQLITE_OK) {
+ ERROR("Can't select (%i): %s\n", rc, sqlite3_errmsg(db));
+ sqlite3_close(db);
+ exit(1);
+ }
+
+ int count = 0;
+ for (;; count++) {
+ rc = sqlite3_step(stmt);
+ if (rc == SQLITE_DONE)
+ break;
+ assert(rc == SQLITE_ROW);
+
+ State *s = malloc(sizeof(State));
+ s->header = malloc(sizeof(Header));
+ s->header->nr_entries = sqlite3_column_int(stmt, 1);
+ s->header->raw_size = sqlite3_column_int(stmt, 2);
+ s->header->compression_type = sqlite3_column_int(stmt, 3);
+
+ size_t size = sqlite3_column_bytes(stmt, 8);
+ DEBUG("extract: nr_entries: %d "
+ "raw_size: %d "
+ "compression_type: %d "
+ "size: %ld",
+ s->header->nr_entries, s->header->raw_size,
+ s->header->compression_type, size);
+ if (size != (size_t)s->header->raw_size)
+ FATAL("extract: header data size and actual size not match: "
+ "expected: %u, got: %ld",
+ s->header->raw_size, size);
+
+ bool ok = extract(s, sqlite3_column_blob(stmt, 8));
+ if (ok)
+ cb(s, t);
+ state_free(s);
+ }
+
+ assert(SQLITE_SCHEMA != sqlite3_finalize(stmt));
+ db_exec_fatal(db, "END TRANSACTION", "db_begin: Can't end txn");
+
+ return count;
+}
+
+int db_get_space_consumed(sqlite3 *db) {
+ const char *select_data_sql = "SELECT SUM(size) FROM " g_sqlite_table_data;
+ const char *select_header_sql =
+ "SELECT COUNT(*) FROM " g_sqlite_table_header;
+
+ int size;
+ sqlite3_stmt *stmt = NULL;
+ db_prepare(db, select_data_sql, "Can't query data", &stmt);
+ size = sqlite3_column_int64(stmt, 1);
+
+ db_prepare(db, select_header_sql, "Can't query data", &stmt);
+ size += sizeof(Header) * sqlite3_column_int64(stmt, 1);
+ return size;
+}
+
+int db_delete_oldest_bytes(sqlite3 *db, int64_t bytes) {
+ int rc;
+ sqlite3_stmt *stmt;
+ const char *select_sql =
+ "SELECT size, end_time, data_id "
+ "FROM " g_sqlite_table_header " WHERE data_id IS NOT NULL "
+ "ORDER BY end_time";
+ if (!bytes)
+ return 0;
+
+ db_exec_fatal(db, "BEGIN TRANSACTION", "db_delete: Can't begin txn");
+ rc = sqlite3_prepare_v2(db, select_sql, -1, &stmt, 0);
+ if (rc != SQLITE_OK) {
+ ERROR("Can't select (%i): %s\n", rc, sqlite3_errmsg(db));
+ sqlite3_close(db);
+ exit(1);
+ }
+
+ int count = 0;
+ size_t bufsize = 1024;
+ char *buf = malloc(bufsize);
+
+ while (bytes >= 0) {
+ rc = sqlite3_step(stmt);
+ if (rc == SQLITE_DONE)
+ break;
+ assert(rc == SQLITE_ROW);
+ sqlite3_int64 index = sqlite3_column_int64(stmt, 2);
+ int size = sqlite3_column_int(stmt, 0);
+
+ char _buf[22];
+ sprintf(_buf, count ? "%lld" : ",%lld", index);
+ while (strlen(_buf) + strlen(buf) + 2 >= bufsize) {
+ bufsize *= 2;
+ char *__buf = malloc(bufsize);
+ memcpy(__buf, buf, strlen(buf) + 1);
+ free(buf);
+ buf = __buf;
+ }
+
+ strcat(buf, _buf);
+ bytes -= size;
+ count++;
+ }
+
+ if (!*buf)
+ return 0;
+
+ const char *_delete_sql =
+ "DELETE FROM " g_sqlite_table_data " WHERE id in (%s);";
+ char *delete_sql = malloc(strlen(_delete_sql) + strlen(buf) + 1);
+ sprintf(delete_sql, _delete_sql, buf);
+ db_exec_fatal(db, delete_sql, "Can't delete");
+ DEBUG("Deleted old data, SQL: %s", delete_sql);
+ free(delete_sql);
+ free(buf);
+
+ assert(SQLITE_SCHEMA != sqlite3_finalize(stmt));
+ db_exec_fatal(db, "END TRANSACTION", "db_begin: Can't end txn");
+
+ return count;
+}
diff --git a/lib/util.c b/lib/util.c
new file mode 100644
index 0000000..e80954b
--- /dev/null
+++ b/lib/util.c
@@ -0,0 +1,34 @@
+
+#include "main.h"
+#include <libgen.h>
+#include <string.h>
+#include <sys/stat.h>
+#include <unistd.h>
+int check_file_exist(const char *storage) {
+ return access(storage, F_OK) != -1;
+}
+
+int check_basedir_exist(const char *storage) {
+ char *_storage = strdup(storage);
+ char *basedir = dirname(_storage);
+ free(_storage);
+
+ struct stat d;
+ if (stat(basedir, &d) != 0 || !S_ISDIR(d.st_mode)) {
+ return -1;
+ }
+ return 0;
+}
+
+enum CompressionType get_compression(const char *flag) {
+ if (flag == NULL) {
+ return COMPRESS_NONE;
+ } else if (!strcmp(flag, "zstd") || !strcmp(flag, "zstandard")) {
+ return COMPRESS_ZSTD;
+ } else if (!strcmp(flag, "lz4")) {
+ return COMPRESS_LZ4;
+ } else {
+ fprintf(stderr, "Unknown compression algorithm: %s\n", flag);
+ return 0;
+ }
+}