diff options
-rw-r--r-- | Makefile.am | 4 | ||||
-rw-r--r-- | bin/nfcollect.c | 165 | ||||
-rw-r--r-- | bin/nfextract.c | 141 | ||||
-rw-r--r-- | configure.ac | 9 | ||||
-rw-r--r-- | include/collect.h | 17 | ||||
-rw-r--r-- | include/commit.h | 13 | ||||
-rw-r--r-- | include/common.h | 14 | ||||
-rw-r--r-- | include/extract.h | 9 | ||||
-rw-r--r-- | include/main.h | 154 | ||||
-rw-r--r-- | include/sql.h | 18 | ||||
-rw-r--r-- | include/util.h | 8 | ||||
-rw-r--r-- | lib/collect.c | 188 | ||||
-rw-r--r-- | lib/commit.c | 102 | ||||
-rw-r--r-- | lib/common.c | 128 | ||||
-rw-r--r-- | lib/extract.c | 130 | ||||
-rw-r--r-- | lib/sql.c | 276 | ||||
-rw-r--r-- | lib/util.c | 34 |
17 files changed, 680 insertions, 730 deletions
diff --git a/Makefile.am b/Makefile.am index cf6dd98..cd4cb4f 100644 --- a/Makefile.am +++ b/Makefile.am @@ -4,5 +4,5 @@ AM_CFLAGS = \ -I$(top_srcdir)/include \ -Werror -Wall -Wno-address-of-packed-member -nfcollect_SOURCES = lib/common.c lib/commit.c lib/collect.c bin/nfcollect.c -nfextract_SOURCES = lib/common.c lib/extract.c bin/nfextract.c +nfcollect_SOURCES = lib/util.c lib/sql.c lib/extract.c lib/commit.c lib/collect.c bin/nfcollect.c +nfextract_SOURCES = lib/util.c lib/sql.c lib/extract.c lib/commit.c lib/collect.c bin/nfextract.c diff --git a/bin/nfcollect.c b/bin/nfcollect.c index 834819f..35d1c34 100644 --- a/bin/nfcollect.c +++ b/bin/nfcollect.c @@ -24,8 +24,7 @@ // SOFTWARE. #include "collect.h" -#include "commit.h" -#include "common.h" +#include "util.h" #include <dirent.h> #include <fcntl.h> #include <getopt.h> @@ -33,9 +32,9 @@ #include <signal.h> #include <stdint.h> #include <stdio.h> +#include <string.h> #include <sys/stat.h> #include <sys/types.h> -#include <string.h> #include <unistd.h> const char *help_text = @@ -44,43 +43,32 @@ const char *help_text = "Options:\n" " -c --compression=<algo> compression algorithm to use (default: no " "compression)\n" - " -d --storage_dir=<dirname> log files storage directory\n" + " -d --storage_file=<filename> sqlite database storage file\n" " -h --help print this help\n" " -g --nflog-group=<id> the group id to collect\n" - " -p --parallelism=<num> max number of committer thread\n" - " -t --truncate whether or not to truncate existing trunks" - " (default: no)\n" " -s --storage_size=<dirsize> log files maximum total size in MiB\n" " -v --version print version information\n" "\n"; -static void traverse_storage_dir(const char *storage_dir, uint32_t *starting_trunk, uint32_t *storage_size); -static nfl_nl_t netlink_fd; - +static Netlink netlink_fd; static void sig_handler(int signo) { if (signo == SIGHUP) { puts("Terminated due to SIGHUP ..."); - nfl_close_netlink_fd(&netlink_fd); + collect_close_netlink(&netlink_fd); } } int main(int argc, char *argv[]) { - uint32_t max_commit_worker = 0, storage_size = 0; - uint32_t trunk_cnt = 0, trunk_size = 0; - uint32_t entries_max, cur_trunk; - bool truncate_trunks = false; - - nfl_global_t g; - int nfl_group_id = -1; - char *compression_flag = NULL, *storage_dir = NULL; + uint32_t storage_size = 0; + Global g; + int nflog_group_id = -1; + char *compression_flag = NULL, *storage = NULL; struct option longopts[] = {/* name, has_args, flag, val */ - {"nflog-group", required_argument, NULL, 'g'}, - {"storage_dir", required_argument, NULL, 'd'}, + {"nflog_group", required_argument, NULL, 'g'}, + {"storage", required_argument, NULL, 'd'}, {"storage_size", required_argument, NULL, 's'}, {"compression", optional_argument, NULL, 'z'}, - {"parallelism", optional_argument, NULL, 'p'}, - {"truncate", no_argument, NULL, 't'}, {"help", no_argument, NULL, 'h'}, {"version", no_argument, NULL, 'v'}, {0, 0, 0, 0}}; @@ -97,20 +85,14 @@ int main(int argc, char *argv[]) { printf("%s %s", PACKAGE, VERSION); exit(0); break; - case 't': - truncate_trunks = true; - break; case 'c': compression_flag = optarg; break; case 'd': - storage_dir = strdup(optarg); + storage = strdup(optarg); break; case 'g': - nfl_group_id = atoi(optarg); - break; - case 'p': - max_commit_worker = atoi(optarg); + nflog_group_id = atoi(optarg); break; case 's': storage_size = atoi(optarg); @@ -122,116 +104,39 @@ int main(int argc, char *argv[]) { } // verify arguments - ASSERT(nfl_group_id != -1, + ASSERT(nflog_group_id != -1, "You must provide a nflog group (see --help)!\n"); - ASSERT(storage_dir != NULL, - "You must provide a storage directory (see --help)\n"); + ASSERT(storage != NULL, "You must provide a storage file (see --help)\n"); ASSERT(storage_size != 0, "You must provide the desired size of log file " "(in MiB) (see --help)\n"); - ERR(nfl_check_dir(storage_dir) < 0, "storage directory not exist"); - - // max number of commit worker defaults to #processor - 1 - if (max_commit_worker == 0) { - max_commit_worker = sysconf(_SC_NPROCESSORS_ONLN) - 1; - max_commit_worker = max_commit_worker > 0 ? max_commit_worker : 1; - } - - g.storage_dir = storage_dir; + g.compression_type = get_compression(compression_flag); + if (check_basedir_exist(storage) < 0) + FATAL("Storage directory does not exist"); // register signal handler - ERR(signal(SIGHUP, sig_handler) == SIG_ERR, "Could not set SIGHUP handler"); + if (signal(SIGHUP, sig_handler) == SIG_ERR) + ERROR("Could not set SIGHUP handler"); - nfl_cal_trunk(storage_size, &trunk_cnt, &trunk_size); - nfl_cal_entries(trunk_size, &entries_max); - nfl_setup_compression(compression_flag, &g.compression_opt); + pthread_mutex_init(&g.storage_consumed_lock, NULL); + g.storage_budget = storage_size * 1024 * 1024; // MB + g.storage_consumed = 0; + g.storage_file = (const char *)storage; + g.max_nr_entries = g_max_nr_entries_default; - // Set up commit worker - g.nfl_commit_queue = malloc(sizeof(sem_t)); - sem_init(g.nfl_commit_queue, 0, max_commit_worker); + collect_open_netlink(&netlink_fd, nflog_group_id); - // Calculate storage consumed - pthread_mutex_init(&g.nfl_storage_consumed_lock, NULL); - g.nfl_storage_consumed = 0; - - // Set up nflog receiver worker - nfl_state_t **trunks = (nfl_state_t **)calloc(trunk_cnt, sizeof(void *)); - - info(PACKAGE ": storing in directory '%s', capped by %d MiB", storage_dir, + pthread_t worker; + State *state; + INFO(PACKAGE ": storing in file '%s', capped by %d MiB", g.storage_file, storage_size); - info(PACKAGE ": workers started, entries per trunk = %d, #trunks = %d", - entries_max, trunk_cnt); - - calculate_starting_trunk(storage_dir, &cur_trunk, &g.nfl_storage_consumed); - if (truncate_trunks) { - cur_trunk = 0; - info(PACKAGE ": requested to truncate (overwrite) trunks in %s", - storage_dir); - } else { - cur_trunk = cur_trunk < 0 ? 0: NEXT(cur_trunk, trunk_cnt); - const char *fn = nfl_get_filename(storage_dir, cur_trunk); - info(PACKAGE ": will start writing to trunk %s and onward", fn); - free((char *)fn); - } + INFO(PACKAGE ": workers started, entries per block = %d", g.max_nr_entries); - nfl_open_netlink_fd(&netlink_fd, nfl_group_id); - for (;; cur_trunk = NEXT(cur_trunk, trunk_cnt)) { - debug("Running receiver worker: id = %d", cur_trunk); - nfl_state_init(&(trunks[cur_trunk]), cur_trunk, entries_max, &g); - trunks[cur_trunk]->netlink_fd = &netlink_fd; - - pthread_create(&(trunks[cur_trunk]->thread), NULL, nfl_collect_worker, - (void *)trunks[cur_trunk]); - // wait for current receiver worker - pthread_join(trunks[cur_trunk]->thread, NULL); - } - - // Won't reach here - // We don't actually free trunks or the semaphore at all - sem_destroy(g.nfl_commit_queue); - nfl_close_netlink_fd(&netlink_fd); - xit(0); - uint32_t start_trunk; -} - -/* - * traverse_storage_dir does 2 things: - * 1. Find starting trunk - * Find the trunk to start with after a restart - * We choose the one with newest modification time. - * If no existing trunk is found, set to -1 - * 2. Sum storage size consumed by adding up stored sizes. - */ -static void traverse_storage_dir(const char *storage_dir, uint32_t *starting_trunk, uint32_t *storage_size) { - DIR *dp; - struct stat stat; - struct dirent *ep; - time_t newest = (time_t)0; - uint32_t newest_index = -1, _storage_size; - int index; - char cwd[100]; - - ERR(!(dp = opendir(storage_dir)), "Can't open the storage directory"); - - ERR(!getcwd(cwd, sizeof(cwd)), "getcwd"); - ERR(chdir(storage_dir) < 0, "chdir"); - - while ((ep = readdir(dp))) { - const char *fn = ep->d_name; - index = nfl_storage_match_index(fn); - if (index >= 0 && index < MAX_TRUNK_ID) { - ERR(lstat(fn, &stat) < 0, fn); - if (difftime(stat.st_mtime, newest) > 0) { - newest = stat.st_mtime; - _storage_size = (uint32_t)index; - } - - *storage_size += stat.st_size - } + while (true) { + state_init(&state, &netlink_fd, &g); + pthread_create(&worker, NULL, collect_worker, (void *)state); + pthread_join(worker, NULL); } - closedir(dp); - ERR(chdir(cwd) < 0, "chdir"); - *starting_trunk = newest_index; - *storage_size = _storage_size; + collect_close_netlink(&netlink_fd); } diff --git a/bin/nfextract.c b/bin/nfextract.c index fac52ae..23a3ea9 100644 --- a/bin/nfextract.c +++ b/bin/nfextract.c @@ -1,7 +1,7 @@ // The MIT License (MIT) -// Copyright (c) 2017 Yun-Chih Chen +// Copyright (c) 2018 Yun-Chih Chen // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -22,8 +22,11 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE // SOFTWARE. -#include "common.h" #include "extract.h" +#include "main.h" +#include "sql.h" +#include "util.h" + #include <dirent.h> #include <fcntl.h> #include <getopt.h> @@ -34,7 +37,6 @@ #include <string.h> #include <sys/stat.h> #include <sys/types.h> -#include <string.h> #include <time.h> #include <unistd.h> @@ -51,11 +53,13 @@ const char *help_text = "Usage: " PROG " [OPTION]\n" "\n" "Options:\n" - " -d --storage_dir=<dirname> log files storage directory\n" + " -d --storage=<dirname> sqlite storage file\n" " -h --help print this help\n" " -v --version print version information\n" - " -s --since start showing entries on or newer than the specified date (format: " DATE_FORMAT_HUMAN ")\n" - " -u --until stop showing entries on or older than the specified date (format: " DATE_FORMAT_HUMAN ")\n" + " -s --since start showing entries on or newer than the " + "specified date (format: " DATE_FORMAT_HUMAN ")\n" + " -u --until stop showing entries on or older than the " + "specified date (format: " DATE_FORMAT_HUMAN ")\n" "\n"; void sig_handler(int signo) { @@ -63,91 +67,76 @@ void sig_handler(int signo) { puts("Terminated due to SIGHUP ..."); } -static void extract_each(const char *storage_dir, const char *filename, const time_range_t *range) { - nfl_state_t trunk; +static inline void format_entry(char *output, Entry *e) { + sprintf(output, + " " + "t=%ld\t" + "daddr=%s\t" + "proto=%s\t" + "uid=%d\t" + "sport=%d\t" + "dport=%d", + e->timestamp, inet_ntoa(e->daddr), + e->protocol == IPPROTO_TCP ? "TCP" : "UDP", e->uid, e->sport, + e->dport); +} - // Build full path - char *fullpath = malloc(strlen(storage_dir) + strlen(filename) + 2); - sprintf(fullpath, "%s/%s", storage_dir, filename); +static void callback(const State *s, const Timerange *range) { + int nr_entries = s->header->nr_entries; - debug("Extracting storage file: %s", fullpath); - int entries = nfl_extract_worker(fullpath, &trunk, range); - free(fullpath); + DEBUG("callback: extracting %d entries", nr_entries); int i = 0; - while(i < entries && trunk.store[i].timestamp < range->from) + while (i < nr_entries && s->store[i].timestamp < range->from) i++; char output[1024]; - while(i < entries && trunk.store[i].timestamp < range->until) { - nfl_format_output(output, &trunk.store[i]); + while (i < nr_entries && s->store[i].timestamp < range->until) { + format_entry(output, &s->store[i]); puts((char *)output); ++i; } } -static void extract_all(const char *storage_dir, const time_range_t *range) { - DIR *dp; - struct dirent *ep; - int i, index, max_index = -1; - char *trunk_files[MAX_TRUNK_ID]; - memset(trunk_files, 0, sizeof(trunk_files)); - - ERR(!(dp = opendir(storage_dir)), "Can't open the storage directory"); - while ((ep = readdir(dp))) { - index = nfl_storage_match_index(ep->d_name); - if (index >= 0) { - debug("Storage file %s matches with index %d", ep->d_name, index); - if (index >= MAX_TRUNK_ID) { - WARN(1, "Storage trunk file index " - "out of predefined range: %s", - ep->d_name); - return; - } else { - trunk_files[index] = strdup(ep->d_name); - if (index > max_index) - max_index = index; - } - } - } - - closedir(dp); - - for (i = 0; i <= max_index; ++i) { - if (trunk_files[i]) - extract_each(storage_dir, trunk_files[i], range); - free(trunk_files[i]); - } +static void extract_all(const char *storage, const Timerange *range) { + sqlite3 *db = NULL; + db_open(&db, storage); + db_read_data_by_timerange(db, range, callback); + db_close(db); } static time_t parse_date_string(time_t default_t, const char *date) { struct tm parsed; char *ret; - if(!date) return default_t; + if (!date) + return default_t; -#define PARSE(FORMAT) \ - ret = strptime(date, FORMAT, &parsed); \ - if(ret && !*ret) return mktime(&parsed); \ +#define PARSE(FORMAT) \ + ret = strptime(date, FORMAT, &parsed); \ + if (ret && !*ret) \ + return mktime(&parsed); PARSE(DATE_FORMAT); PARSE(DATE_FORMAT_FULL); PARSE(DATE_FORMAT_FULL2); - FATAL("Wrong date format: expected: \"" DATE_FORMAT_HUMAN "\", got: \"%s\"", date); + FATAL("Wrong date format: expected: \"" DATE_FORMAT_HUMAN "\", got: \"%s\"", + date); return -1; } -static void populate_date_range(time_range_t *range, const char *since, const char *until) { - range->from = parse_date_string(0, since); + +static void populate_date_range(Timerange *range, const char *since, + const char *until) { + range->from = parse_date_string(0, since); range->until = parse_date_string(time(NULL), until); } int main(int argc, char *argv[]) { - char *storage_dir = NULL; + char *storage = NULL; char *date_since_str = NULL, *date_until_str = NULL; - time_range_t date_range; + Timerange date_range; - struct option longopts[] = {/* name, has_args, flag, val */ - {"storage_dir", required_argument, NULL, 'd'}, + struct option longopts[] = {{"storage_file", required_argument, NULL, 'd'}, {"since", optional_argument, NULL, 's'}, {"until", optional_argument, NULL, 'u'}, {"help", no_argument, NULL, 'h'}, @@ -155,7 +144,7 @@ int main(int argc, char *argv[]) { {0, 0, 0, 0}}; int opt; - while ((opt = getopt_long(argc, argv, "d:hv", longopts, NULL)) != -1) { + while ((opt = getopt_long(argc, argv, "d:s:u:hv", longopts, NULL)) != -1) { switch (opt) { case 'h': printf("%s", help_text); @@ -166,37 +155,41 @@ int main(int argc, char *argv[]) { exit(0); break; case 'd': - if(!optarg) FATAL("Expected: --storage_dir=[PATH]"); - storage_dir = strdup(optarg); + if (!optarg) + FATAL("Expected: --storage_file=[PATH]"); + storage = strdup(optarg); break; case 's': - if(!optarg) FATAL("Expected: --since=\"" DATE_FORMAT_HUMAN "\""); + if (!optarg) + FATAL("Expected: --since=\"" DATE_FORMAT_HUMAN "\""); date_since_str = strdup(optarg); break; case 'u': - if(!optarg) FATAL("Expected: --until=\"" DATE_FORMAT_HUMAN "\""); + if (!optarg) + FATAL("Expected: --until=\"" DATE_FORMAT_HUMAN "\""); date_until_str = strdup(optarg); break; case '?': - fprintf(stderr, "Unknown argument, see --help"); - exit(1); + FATAL("Unknown argument, see --help"); } } // verify arguments - ASSERT(storage_dir != NULL, + ASSERT(storage != NULL, "You must provide a storage directory (see --help)"); - ERR(nfl_check_dir(storage_dir) < 0, "storage directory not exist"); + if (check_file_exist(storage) < 0) + ERROR("storage file not exist"); - // register signal handler - ERR(signal(SIGHUP, sig_handler) == SIG_ERR, "Could not set SIGHUP handler"); + if (signal(SIGHUP, sig_handler) == SIG_ERR) + ERROR("Could not set SIGHUP handler"); populate_date_range(&date_range, date_since_str, date_until_str); - free(date_since_str); free(date_until_str); + free(date_since_str); + free(date_until_str); - extract_all(storage_dir, &date_range); - free(storage_dir); + extract_all(storage, &date_range); + free(storage); return 0; } diff --git a/configure.ac b/configure.ac index ce27d08..bf6aba8 100644 --- a/configure.ac +++ b/configure.ac @@ -12,7 +12,7 @@ AC_CONFIG_AUX_DIR([build-aux]) # ChangeLog, COPYING, AUTHORS, INSTALL, README etc. files. AM_INIT_AUTOMAKE([-Wall -Werror foreign dist-xz subdir-objects]) -AM_SILENT_RULES([yes]) +#AM_SILENT_RULES([yes]) AC_CONFIG_SRCDIR([bin/nfcollect.c]) AC_PROG_CC([clang]) @@ -28,7 +28,9 @@ esac],[debug=false]) if test x"$debug" = x"true"; then AC_DEFINE(DEBUG, 1, [debug]) fi -AC_DEFINE(_XOPEN_SOURCE, 700) + +AC_DEFINE(_XOPEN_SOURCE, 700) # strptime +AC_DEFINE(_POSIX_C_SOURCE, 200809) # strdup AC_CHECK_HEADERS(libnetfilter_log/libnetfilter_log.h) AC_SEARCH_LIBS(nflog_open, netfilter_log) @@ -36,6 +38,9 @@ AC_SEARCH_LIBS(nflog_open, netfilter_log) AC_CHECK_HEADERS(pthread.h) AC_SEARCH_LIBS(pthread_create, pthread) +AC_CHECK_HEADERS(sqlite3.h) +AC_SEARCH_LIBS(sqlite3_exec, sqlite3) + AC_CHECK_HEADERS(zstd.h) AC_SEARCH_LIBS(ZSTD_compress, zstd) diff --git a/include/collect.h b/include/collect.h index 1af0506..3e438cf 100644 --- a/include/collect.h +++ b/include/collect.h @@ -1,9 +1,12 @@ -#pragma once + +#ifndef _COLLECT_H +#define _COLLECT_H #include "main.h" -void *nfl_collect_worker(void *targs); -void nfl_state_init(nfl_state_t **nf, uint32_t id, uint32_t entries_max, - nfl_global_t *g); -void nfl_state_free(nfl_state_t *nf); -void nfl_open_netlink_fd(nfl_nl_t *nf, uint16_t group_id); -void nfl_close_netlink_fd(nfl_nl_t *nf); +void collect_open_netlink(Netlink *nl, uint16_t group_id); +void collect_close_netlink(Netlink *nl); +void *collect_worker(void *targs); +void state_init(State **s, Netlink *nl, Global *g); +void state_free(State *s); + +#endif // _COLLECT_H diff --git a/include/commit.h b/include/commit.h index 2e16571..47a3099 100644 --- a/include/commit.h +++ b/include/commit.h @@ -1,11 +1,6 @@ -#ifndef _COMMIT_H -#define _COMMIT_H +#ifndef COMMIT_H +#define COMMIT_H -#include "common.h" -void nfl_commit_init(); -int nfl_commit_worker(nfl_header_t *header, nfl_entry_t *store, - enum nfl_compression_t compression_opt, - bool truncate, - const char *filename); +void *commit(void *targs); -#endif +#endif // COMMIT_H diff --git a/include/common.h b/include/common.h deleted file mode 100644 index 893bd22..0000000 --- a/include/common.h +++ /dev/null @@ -1,14 +0,0 @@ -#pragma once - -#include "main.h" -int nfl_check_file(FILE *f); -int nfl_check_dir(const char *storage_dir); -int nfl_storage_match_index(const char *fn); -const char *nfl_get_filename(const char *dir, int id); -uint32_t nfl_get_filesize(FILE *f); -uint32_t nfl_header_cksum(nfl_header_t *header); -void nfl_cal_trunk(uint32_t total_size, uint32_t *trunk_cnt, - uint32_t *trunk_size); -void nfl_cal_entries(uint32_t trunk_size, uint32_t *entries_cnt); -void nfl_format_output(char *output, nfl_entry_t *entry); -int nfl_setup_compression(const char *flag, enum nfl_compression_t *opt); diff --git a/include/extract.h b/include/extract.h index da7c7bf..afc0b64 100644 --- a/include/extract.h +++ b/include/extract.h @@ -1,4 +1,7 @@ -#pragma once +#ifndef _EXTRACT_H +#define _EXTRACT_H -#include "common.h" -int nfl_extract_worker(const char *filename, nfl_state_t *state, const time_range_t *range); +#include "main.h" +bool extract(State *s, const void *src); + +#endif // _EXTRACT_H diff --git a/include/main.h b/include/main.h index 417c6bc..4a860d4 100644 --- a/include/main.h +++ b/include/main.h @@ -23,21 +23,30 @@ #ifndef _MAIN_H #define _MAIN_H +#include <arpa/inet.h> #include <assert.h> +#include <linux/tcp.h> #include <netinet/in.h> #include <netinet/ip.h> -#include <linux/tcp.h> #include <netinet/udp.h> +#include <pthread.h> #include <semaphore.h> #include <stdbool.h> #include <stdio.h> #include <stdlib.h> -#include <time.h> - -#include <pthread.h> +#include <sys/socket.h> #include <sys/types.h> +#include <time.h> -#ifdef DEBUG +// Global variables +#define g_sqlite_table_header "nfcollect_v1_header" +#define g_sqlite_table_data "nfcollect_v1_data" +#define g_sqlite_nr_fail_retry 8 +#define g_gc_rate 16 +// Default number of packets stored in a block +//#define g_max_nr_entries_default (256*1024) +#define g_max_nr_entries_default (1 * 128) +#ifdef DEBUG_OUTPUT #define DEBUG_ON 1 #else #define DEBUG_ON 0 @@ -49,22 +58,16 @@ exit(1); \ } -#define ERR(command, error_msg) \ - if (command) { \ - perror((error_msg)); \ - exit(1); \ - } +#define ERROR(format, ...) \ + fprintf(stdout, "[ERROR] " format "\n", ##__VA_ARGS__); #define FATAL(format, ...) \ do { \ - fprintf(stdout, "[ERROR] " format "\n", ##__VA_ARGS__); \ + fprintf(stdout, "[FATAL] " format "\n", ##__VA_ARGS__); \ exit(1); \ } while (0) -#define WARN(command, format, ...) \ - if (command) { \ - fprintf(stdout, "[WARN] " format "\n", ##__VA_ARGS__); \ - } +#define WARN(format, ...) fprintf(stdout, "[WARN] " format "\n", ##__VA_ARGS__); #define WARN_RETURN(command, format, ...) \ if (command) { \ @@ -72,106 +75,81 @@ return -1; \ } -#define debug(format, ...) \ +#define DEBUG(format, ...) \ if (DEBUG_ON) { \ fprintf(stdout, "[DEBUG] " format "\n", ##__VA_ARGS__); \ } -#define info(format, ...) fprintf(stdout, "[INFO] " format "\n", ##__VA_ARGS__); +#define INFO(format, ...) fprintf(stdout, "[INFO] " format "\n", ##__VA_ARGS__); #define likely(x) __builtin_expect((x), 1) #define unlikely(x) __builtin_expect((x), 0) -#define CEIL_DIV(a, b) (((a) + (b)-1) / (b)) -#define NEXT(i, l) ((i + 1) % l) -#define PREV(i, l) ((i - 1) % l) -#define TRUNK_SIZE_BY_PAGE (150) // 150 pages -#define MAX_SEGMENT_PER_TRUNK (1024) -#define MAX_TRUNK_ID (80) -#define STORAGE_PREFIX "nflog_storage" - -enum nfl_compression_t { COMPRESS_NONE, COMPRESS_LZ4, COMPRESS_ZSTD }; -typedef struct __attribute__((packed)) _nfl_header_t { - uint32_t id; /* 0 4 */ - uint32_t n_entries; /* 4 4 */ - uint32_t max_n_entries; /* 8 4 */ - uint32_t cksum; /* 12 4 */ - uint32_t raw_size; /* 16 4 */ - enum nfl_compression_t compression_opt; /* 20 4 */ - time_t start_time; /* 24 8 */ - time_t end_time; /* 32 8 */ - - /* size: 40, cachelines: 1, members: 8 */ - /* last cacheline: 40 bytes */ -} nfl_header_t; - -typedef struct __attribute__((packed)) _nfl_entry_t { - // current timestamp since UNIX epoch - time_t timestamp; /* 0 8 */ +#ifdef __GNUC__ +#define UNUSED_FUNCTION(x) __attribute__((__unused__)) UNUSED_ ## x +#else +#define UNUSED_FUNCTION(x) UNUSED_ ## x +#endif - // dest address - struct in_addr daddr; /* 8 4 */ +enum CompressionType { COMPRESS_NONE, COMPRESS_LZ4, COMPRESS_ZSTD }; - // uid - uint32_t uid; /* 12 4 */ +typedef struct _Header { + uint32_t nr_entries; + uint32_t raw_size; + enum CompressionType compression_type; + time_t start_time; + time_t end_time; +} Header; +typedef struct __attribute__((packed)) _Entry { + // current timestamp since UNIX epoch + time_t timestamp; + // dest address + struct in_addr daddr; + // uid + uint32_t uid; // unused space, just for padding - uint8_t __unused1; /* 16 1 */ - + uint8_t __unused1; // IP protocol (UDP or TCP) - uint8_t protocol; /* 17 1 */ - + uint8_t protocol; // unused space, just for padding - uint16_t __unused2; /* 18 2 */ - + uint16_t __unused2; // source port - uint16_t sport; /* 20 2 */ - + uint16_t sport; // destination port - uint16_t dport; /* 22 2 */ + uint16_t dport; /* size: 24, cachelines: 1, members: 8 */ -} nfl_entry_t; - -typedef struct _store_manager_t { - uint32_t *trunk_size_map; - -} nfl_store_manager_t; - -typedef struct _nfl_global_t { - sem_t *nfl_commit_queue; - uint16_t nfl_group_id; - - uint32_t nfl_storage_consumed; - pthread_mutex_t nfl_storage_consumed_lock; - - const char *storage_dir; - enum nfl_compression_t compression_opt; -} nfl_global_t; +} Entry; typedef struct _nfl_nl_t { struct nflog_handle *fd; struct nflog_g_handle *group_fd; -} nfl_nl_t; +} Netlink; + +typedef struct _Global { + uint16_t nl_group_id; -typedef struct _nfl_state_t { - nfl_global_t *global; - nfl_header_t *header; - nfl_entry_t *store; - nfl_nl_t *netlink_fd; + uint32_t storage_budget; + uint32_t storage_consumed; + pthread_mutex_t storage_consumed_lock; - bool has_finished_recv; - pthread_cond_t has_finished_recv_cond; - pthread_mutex_t has_finished_recv_lock; + uint32_t max_nr_entries; + const char *storage_file; + enum CompressionType compression_type; +} Global; - pthread_t thread; -} nfl_state_t; +typedef struct _State { + Header *header; + Entry *store; + Netlink *netlink_fd; + Global *global; +} State; -typedef struct _time_range_t { +typedef struct _Timerange { time_t from, until; -} time_range_t; +} Timerange; -// only copy size of ipv4 header + tcp header -static const int nfl_recv_size = sizeof(struct iphdr) + sizeof(struct tcphdr); +typedef void (*StateCallback)(const State *s, const Timerange *t); #endif // _MAIN_H diff --git a/include/sql.h b/include/sql.h new file mode 100644 index 0000000..8b88f10 --- /dev/null +++ b/include/sql.h @@ -0,0 +1,18 @@ +#ifndef SQL_H +#define SQL_H + +#include "main.h" +#include <sqlite3.h> + +int db_set_pragma(sqlite3 *db); +int db_vacuum(sqlite3 *db); +int db_create_table(sqlite3 *db); +int db_open(sqlite3 **db, const char *dbname); +int db_close(sqlite3 *db); +int db_insert(sqlite3 *db, const Header *header, const Entry *entries); +int db_get_space_consumed(sqlite3 *db); +int db_delete_oldest_bytes(sqlite3 *db, int64_t bytes); +int db_read_data_by_timerange(sqlite3 *db, const Timerange *t, + StateCallback cb); + +#endif // SQL_H diff --git a/include/util.h b/include/util.h new file mode 100644 index 0000000..f1fd965 --- /dev/null +++ b/include/util.h @@ -0,0 +1,8 @@ +#ifndef UTIL_H +#define UTIL_H +#include "main.h" +int check_basedir_exist(const char *storage); +int check_file_exist(const char *storage); +enum CompressionType get_compression(const char *flag); + +#endif // UTIL_H diff --git a/lib/collect.c b/lib/collect.c index bc32a93..21ad415 100644 --- a/lib/collect.c +++ b/lib/collect.c @@ -23,10 +23,11 @@ // SOFTWARE. #include "commit.h" -#include "common.h" +#include "main.h" #include <libnetfilter_log/libnetfilter_log.h> #include <pthread.h> #include <stddef.h> // size_t for libnetfilter_log +#include <stdint.h> #include <string.h> #include <sys/types.h> // u_int32_t for libnetfilter_log #include <time.h> @@ -36,17 +37,13 @@ // kernel and transmits them as one netlink multipart message to userspace. #define NF_NFLOG_QTHRESH 64 -nfl_global_t g; +Global g; -static void *nfl_start_commit_worker(void *targs); -static void nfl_commit(nfl_state_t *nf); -static void nfl_state_free(nfl_state_t *nf); - -static int handle_packet(struct nflog_g_handle *gh, struct nfgenmsg *nfmsg, - struct nflog_data *nfa, void *_nf) { +static int handle_packet(__attribute__((unused)) struct nflog_g_handle *gh, __attribute__((unused)) struct nfgenmsg *nfmsg, + struct nflog_data *nfa, void *_s) { #define HASH_ENTRY(e) (e->sport ^ e->timestamp) register const struct iphdr *iph; - register nfl_entry_t *entry; + register Entry *entry; const struct tcphdr *tcph; const struct udphdr *udph; char *payload; @@ -57,19 +54,19 @@ static int handle_packet(struct nflog_g_handle *gh, struct nfgenmsg *nfmsg, static uint64_t prev_entry_hash; int payload_len = nflog_get_payload(nfa, &payload); - nfl_state_t *nf = (nfl_state_t *)_nf; + State *s = (State *)_s; // only process ipv4 packet if (unlikely(payload_len < 0) || ((payload[0] & 0xf0) != 0x40)) { - debug("Ignore non-IPv4 packet"); + DEBUG("Ignore non-IPv4 packet"); return 1; } - if (unlikely(nf->header->n_entries >= nf->header->max_n_entries)) + if (unlikely(s->header->nr_entries >= g.max_nr_entries)) return 1; iph = (struct iphdr *)payload; - entry = &(nf->store[nf->header->n_entries]); + entry = &(s->store[s->header->nr_entries]); inner_hdr = (uint32_t *)iph + iph->ihl; // Only accept TCP / UDP packets @@ -86,7 +83,7 @@ static int handle_packet(struct nflog_g_handle *gh, struct nfgenmsg *nfmsg, entry->sport = ntohs(udph->source); entry->dport = ntohs(udph->dest); } else { - debug("Ignore non-TCP/UDP packet"); + DEBUG("Ignore non-TCP/UDP packet"); return 1; // Ignore other types of packet } @@ -114,16 +111,16 @@ static int handle_packet(struct nflog_g_handle *gh, struct nfgenmsg *nfmsg, entry->uid = uid; // Advance to next entry - nf->header->n_entries++; + s->header->nr_entries++; - debug("Recv packet info entry #%d: " + DEBUG("Recv packet info entry #%d: " "timestamp:\t%ld,\t" "daddr:\t%ld,\t" "transfer:\t%s,\t" "uid:\t%d,\t" "sport:\t%d,\t" "dport:\t%d", - nf->header->n_entries, entry->timestamp, + s->header->nr_entries, entry->timestamp, (unsigned long)entry->daddr.s_addr, iph->protocol == IPPROTO_TCP ? "TCP" : "UDP", entry->uid, entry->sport, entry->dport); @@ -132,155 +129,94 @@ static int handle_packet(struct nflog_g_handle *gh, struct nfgenmsg *nfmsg, return 0; } -void nfl_open_netlink_fd(nfl_nl_t *nl, uint16_t group_id) { +void collect_open_netlink(Netlink *nl, uint16_t group_id) { // open nflog - ERR((nl->fd = nflog_open()) == NULL, "nflog_open") - debug("Opening nflog communication file descriptor"); + if ((nl->fd = nflog_open()) == NULL) { + FATAL("nflog_open failed"); + } + + DEBUG("Opening nflog communication file descriptor"); // monitor IPv4 packets only - ERR(nflog_bind_pf(nl->fd, AF_INET) < 0, "nflog_bind_pf"); + if (nflog_bind_pf(nl->fd, AF_INET) < 0) { + FATAL("nflog_bind_pf failed"); + } // bind to group nl->group_fd = nflog_bind_group(nl->fd, group_id); // If the returned group_fd is NULL, it's likely // that another process (like ulogd) has already // bound to the same NFLOD group. - if(!nl->group_fd) - FATAL("Cannot bind to NFLOG group %d, is it used by another process?", group_id); + if (!nl->group_fd) + FATAL("Cannot bind to NFLOG group %d, is it used by another process?", + group_id); - ERR(nflog_set_mode(nl->group_fd, NFULNL_COPY_PACKET, nfl_recv_size) < 0, - "Could not set copy mode"); + // only copy size of ipv4 header + tcp header + int recv_size = sizeof(struct iphdr) + sizeof(struct tcphdr); + if (nflog_set_mode(nl->group_fd, NFULNL_COPY_PACKET, recv_size) < 0) + FATAL("Could not set copy mode"); // Batch send 128 packets from kernel to userspace - ERR(nflog_set_qthresh(nl->group_fd, NF_NFLOG_QTHRESH), - "Could not set qthresh"); + if (nflog_set_qthresh(nl->group_fd, NF_NFLOG_QTHRESH)) + FATAL("Could not set qthresh"); } -void nfl_close_netlink_fd(nfl_nl_t *nl) { +void collect_close_netlink(Netlink *nl) { nflog_unbind_group(nl->group_fd); nflog_close(nl->fd); } -void *nfl_collect_worker(void *targs) { - nfl_state_t *nf = (nfl_state_t *)targs; - memcpy(&g, nf->global, sizeof(nfl_global_t)); +void *collect_worker(void *targs) { + State *s = (State *)targs; + memcpy(&g, s->global, sizeof(Global)); - nflog_callback_register(nf->netlink_fd->group_fd, &handle_packet, nf); - debug("Registering nflog callback"); + nflog_callback_register(s->netlink_fd->group_fd, &handle_packet, s); + DEBUG("Registering nflog callback"); - int fd = nflog_fd(nf->netlink_fd->fd); - debug("Recv worker #%u: main loop starts", nf->header->id); + int fd = nflog_fd(s->netlink_fd->fd); + DEBUG("Recv worker #%lu: main loop starts", pthread_self()); // Write start time - time(&nf->header->start_time); + time(&s->header->start_time); int rv; // Must have at least 128 for each packet to account for // sizeof(struct iphdr) + sizeof(struct tcphdr) plus the // size of meta data needed by the library's data structure. char buf[128 * NF_NFLOG_QTHRESH + 1]; - while (nf->header->n_entries < nf->header->max_n_entries) { + while (s->header->nr_entries < g.max_nr_entries) { if ((rv = recv(fd, buf, sizeof(buf), 0)) && rv > 0) { - debug("Recv worker #%u: nflog packet received " + DEBUG("Recv worker #%lu: packet received " "(len=%u, #entries=%u)", - nf->header->id, rv, nf->header->n_entries); - nflog_handle_packet(nf->netlink_fd->fd, buf, rv); + pthread_self(), rv, s->header->nr_entries); + nflog_handle_packet(s->netlink_fd->fd, buf, rv); } } - debug("Recv worker #%u: finished, received packets: %u", - nf->header->id, - nf->header->max_n_entries); - // write end time - time(&nf->header->end_time); - - // write checksum - nf->header->cksum = nfl_header_cksum(nf->header); - debug("Recv worker #%u: calculated checksum: %x", - nf->header->id, - nf->header->cksum); - - // spawn commit thread - nfl_commit(nf); - - pthread_exit(NULL); -} + time(&s->header->end_time); + s->header->raw_size = s->header->nr_entries * sizeof(Entry); -/* - * Committer - */ - -static void nfl_commit(nfl_state_t *nf) { pthread_t tid; - pthread_create(&tid, NULL, nfl_start_commit_worker, (void *)nf); + pthread_create(&tid, NULL, commit, (void *)s); pthread_detach(tid); + return NULL; } -static void *nfl_start_commit_worker(void *targs) { - nfl_state_t *nf = (nfl_state_t *)targs; - const char *filename = nfl_get_filename(g.storage_dir, nf->header->id); - debug("Comm worker #%u: thread started.", nf->header->id); - /* truncate ? */ - bool truncate = true; - - sem_wait(g.nfl_commit_queue); - debug("Comm worker #%u: commit started.", nf->header->id); - int ret = nfl_commit_worker(nf->header, nf->store, g.compression_opt, truncate, filename); - debug("Comm worker #%u: commit done.", nf->header->id); - sem_post(g.nfl_commit_queue); - - nfl_state_free(nf); - free((char *)filename); - - pthread_mutex_lock(&nf->has_finished_recv_lock); - nf->has_finished_recv = true; - pthread_cond_signal(&nf->has_finished_recv_cond); - pthread_mutex_unlock(&nf->has_finished_recv_lock); - - pthread_exit(ret); -} - -/* - * State managers - */ - -void nfl_state_init(nfl_state_t **nf, uint32_t id, uint32_t entries_max, - nfl_global_t *g) { - assert(nf); - - // Check if nf has been allocated - if (unlikely(*nf == NULL)) { - *nf = (nfl_state_t *)malloc(sizeof(nfl_state_t)); - (*nf)->global = g; - (*nf)->header = (nfl_header_t *)malloc(sizeof(nfl_header_t)); - (*nf)->header->id = id; - (*nf)->header->max_n_entries = entries_max; - (*nf)->header->compression_opt = g->compression_opt; - - (*nf)->has_finished_recv = true; - pthread_mutex_init(&(*nf)->has_finished_recv_lock, NULL); - pthread_cond_init(&(*nf)->has_finished_recv_cond, NULL); - } +void state_init(State **s, Netlink *nl, Global *g) { + assert(s); + *s = (State *)malloc(sizeof(State)); + (*s)->global = g; + (*s)->netlink_fd = nl; + (*s)->header = (Header *)calloc(sizeof(Header), 1); + (*s)->header->compression_type = g->compression_type; - // Ensure trunk with same id in previous run has finished to prevent reusing - // a trunk which it's still being used. Furthermore, this hopefully - // alleviate us from bursty network traffic. - pthread_mutex_lock(&(*nf)->has_finished_recv_lock); - while (!(*nf)->has_finished_recv) - pthread_cond_wait(&(*nf)->has_finished_recv_cond, &(*nf)->has_finished_recv_lock); - (*nf)->has_finished_recv = false; - pthread_mutex_unlock(&(*nf)->has_finished_recv_lock); - - // Don't use calloc here, as it will cause page fault and - // consume physical memory before we fill the buffer. - // Instead, fill entries with 0 on the fly, to squeeze - // more space for compression. - (*nf)->store = (nfl_entry_t *)malloc(sizeof(nfl_entry_t) * entries_max); - (*nf)->header->n_entries = 0; + (*s)->store = (Entry *)malloc(sizeof(Entry) * g->max_nr_entries); + (*s)->header->nr_entries = 0; } -static void nfl_state_free(nfl_state_t *nf) { - // Free only packet store and leave the rest intact - free((void *)nf->store); +void state_free(State *s) { + free(s->store); + free(s->header); + free(s); } diff --git a/lib/commit.c b/lib/commit.c index 22b92ad..45d4d73 100644 --- a/lib/commit.c +++ b/lib/commit.c @@ -1,80 +1,80 @@ -#include "commit.h" -#include <errno.h> -#include <string.h> -#include <zstd.h> - -static int nfl_commit_default(FILE *f, nfl_header_t *header, nfl_entry_t *store, - uint32_t store_size) { - uint32_t written; - header->raw_size = store_size; +#include "collect.h" +#include "main.h" +#include "sql.h" - // Write header - written = fwrite(header, 1, sizeof(nfl_header_t), f); - WARN_RETURN(written != sizeof(nfl_header_t), "commit header: %s", strerror(errno)); +#include <zstd.h> - // Write store - written = fwrite(store, 1, store_size, f); - WARN_RETURN(written != store_size, "commit store: %s", strerror(errno)); +static void do_gc(sqlite3 *db, State *s) { + uint32_t cur_size = s->header->raw_size; + pthread_mutex_lock(&s->global->storage_consumed_lock); + uint32_t remain_size = + s->global->storage_budget - s->global->storage_consumed - cur_size; + uint32_t gc_size = -remain_size + cur_size * g_gc_rate; + if (gc_size >= s->global->storage_consumed) + gc_size = s->global->storage_consumed; + pthread_mutex_unlock(&s->global->storage_consumed_lock); - return sizeof(nfl_header_t) + store_size; + if (remain_size <= 0) + db_delete_oldest_bytes(db, gc_size); } -static int nfl_commit_lz4(FILE *f, nfl_header_t *header, nfl_entry_t *store, - uint32_t store_size) { +static int commit_lz4(State *s, void **buf) { /* TODO */ + (void)s; + (void)buf; return -1; } -static int nfl_commit_zstd(FILE *f, nfl_header_t *header, nfl_entry_t *store, - uint32_t store_size) { - size_t const bufsize = ZSTD_compressBound(store_size); - void *buf; +static int commit_zstd(State *s, void **buf) { + size_t const bufsize = ZSTD_compressBound(s->header->raw_size); + + if (!(*buf = malloc(bufsize))) + ERROR("zstd: cannot malloc"); - WARN_RETURN(!(buf = malloc(bufsize)), "zstd: cannot malloc"); - size_t const csize = ZSTD_compress(buf, bufsize, store, store_size, 1); + size_t const csize = + ZSTD_compress(*buf, bufsize, s->store, s->header->raw_size, 0); if (ZSTD_isError(csize)) { - WARN(1, "zstd: %s \n", ZSTD_getErrorName(csize)); - free(buf); + ERROR("zstd: %s \n", ZSTD_getErrorName(csize)); + free(*buf); return -1; } - int ret = nfl_commit_default(f, header, buf, csize); - free(buf); - return ret; + s->header->raw_size = csize; + return 0; } -int nfl_commit_worker(nfl_header_t *header, nfl_entry_t *store, - enum nfl_compression_t compression_opt, - bool truncate, - const char *filename) { - int ret; - FILE *f; - const char *mode = truncate ? "wb" : "ab"; +void *commit(void *targs) { + sqlite3 *db = NULL; + State *s = (State *)targs; + uint32_t size = s->header->raw_size; + DEBUG("Committing #%d packets", s->header->nr_entries); - debug("Comm worker #%u: commit to file %s\n", header->id, filename); - ERR((f = fopen(filename, mode)) == NULL, strerror(errno)); + db_open(&db, s->global->storage_file); + db_create_table(db); - // commit store - uint32_t store_size = sizeof(nfl_entry_t) * header->max_n_entries; - switch (compression_opt) { + void *buf = NULL; + switch (s->global->compression_type) { case COMPRESS_NONE: - debug("Comm worker #%u: commit without compression\n", header->id); - ret = nfl_commit_default(f, header, store, store_size); break; case COMPRESS_LZ4: - debug("Comm worker #%u: commit with compression algorithm: lz4", header->id); - ret = nfl_commit_lz4(f, header, store, store_size); + commit_lz4(s, &buf); break; case COMPRESS_ZSTD: - debug("Comm worker #%u: commit with compression algorithm: zstd", header->id); - ret = nfl_commit_zstd(f, header, store, store_size); + commit_zstd(s, &buf); break; - // Must not reach here ... default: FATAL("Unknown compression option detected"); } - // Do fsync ? - fclose(f); - return ret; + do_gc(db, s); + db_insert(db, s->header, buf ? buf : s->store); + db_close(db); + + DEBUG("Committed #%d packets, compressed size: %u/%u", + s->header->nr_entries, s->header->raw_size, size); + if (buf) + free(buf); + state_free(s); + + return NULL; } diff --git a/lib/common.c b/lib/common.c deleted file mode 100644 index 860ef90..0000000 --- a/lib/common.c +++ /dev/null @@ -1,128 +0,0 @@ -#include "common.h" -#include <arpa/inet.h> -#include <assert.h> -#include <errno.h> -#include <limits.h> -#include <regex.h> -#include <string.h> -#include <sys/stat.h> -#include <unistd.h> - -int nfl_check_file(FILE *f) { - struct stat s; - assert(f); - if (fstat(fileno(f), &s) < 0) - return -errno; - - // Ignore file already unlinked - if (s.st_nlink <= 0) - return -EIDRM; - - return 0; -} - -int nfl_check_dir(const char *storage_dir) { - struct stat _d; - if (stat(storage_dir, &_d) != 0 || !S_ISDIR(_d.st_mode)) { - return -1; - } - return 0; -} - -int nfl_storage_match_index(const char *fn) { - static regex_t regex; - static bool compiled = false; - regmatch_t match[2]; - int ret; - - if (unlikely(!strcmp(fn, ".") || !strcmp(fn, ".."))) - return -1; - - if (!compiled) { - ERR(regcomp(®ex, "^" STORAGE_PREFIX "_([0-9]+)", REG_EXTENDED), - "Could not compile regex"); - compiled = true; - } - - ret = regexec(®ex, fn, 2, match, 0); - if (!ret) { - assert(match[1].rm_so != (size_t)-1); - return strtol(fn + match[1].rm_so, NULL, 10); - } else { - char buf[100]; - regerror(ret, ®ex, buf, sizeof(buf)); - WARN(1, "Regex match failed: %s", buf) - } - - return -1; -} -const char *nfl_get_filename(const char *dir, int id) { - char out[1024]; - sprintf(out, "%s/" STORAGE_PREFIX "_%d", dir, id); - return strdup(out); -} - -uint32_t nfl_get_filesize(FILE *f) { - uint32_t size, prepos; - prepos = ftell(f); - fseek(f, 0, SEEK_END); - size = ftell(f); - fseek(f, prepos, SEEK_SET); - return size; -} - -uint32_t nfl_header_cksum(nfl_header_t *header) { - /* simply use a magic number for integrity check */ - return 0x9e37a9b9; -} - -void nfl_cal_trunk(uint32_t total_size, uint32_t *trunk_cnt, - uint32_t *trunk_size) { - uint32_t pgsize = sysconf(_SC_PAGE_SIZE); - total_size *= 1024 * 1024; // MiB - - assert(trunk_cnt); - assert(total_size); - - *trunk_cnt = CEIL_DIV(total_size, pgsize * TRUNK_SIZE_BY_PAGE); - if (*trunk_cnt > MAX_TRUNK_ID) { - *trunk_cnt = MAX_TRUNK_ID; - *trunk_size = total_size / MAX_TRUNK_ID; - *trunk_size = (*trunk_size / pgsize) * pgsize; // align with pagesize - } else { - *trunk_size = pgsize * TRUNK_SIZE_BY_PAGE; - } -} - -void nfl_cal_entries(uint32_t trunk_size, uint32_t *entries_cnt) { - assert(entries_cnt); - *entries_cnt = (trunk_size - sizeof(nfl_header_t)) / sizeof(nfl_entry_t); -} - -void nfl_format_output(char *output, nfl_entry_t *entry) { - sprintf(output, " " - "t=%ld\t" - "daddr=%s\t" - "proto=%s\t" - "uid=%d\t" - "sport=%d\t" - "dport=%d", - entry->timestamp, inet_ntoa(entry->daddr), - entry->protocol == IPPROTO_TCP ? "TCP" : "UDP", entry->uid, - entry->sport, entry->dport); -} - -int nfl_setup_compression(const char *flag, enum nfl_compression_t *opt) { - if (flag == NULL) { - *opt = COMPRESS_NONE; - } else if (!strcmp(flag, "zstd") || !strcmp(flag, "zstandard")) { - *opt = COMPRESS_ZSTD; - } else if (!strcmp(flag, "lz4")) { - *opt = COMPRESS_LZ4; - } else { - fprintf(stderr, "Unknown compression algorithm: %s\n", flag); - return 0; - } - - return 1; -} diff --git a/lib/extract.c b/lib/extract.c index 8907c26..5687ad2 100644 --- a/lib/extract.c +++ b/lib/extract.c @@ -1,128 +1,66 @@ - -#include "extract.h" +#include "main.h" #include <errno.h> +#include <stdlib.h> #include <string.h> #include <time.h> #define ZSTD_STATIC_LINKING_ONLY // ZSTD_findDecompressedSize #include <zstd.h> -static int nfl_extract_default(FILE *f, nfl_state_t *state); -static int nfl_extract_zstd(FILE *f, nfl_state_t *state); -static int nfl_extract_lz4(FILE *f, nfl_state_t *state); - -static int nfl_verify_header(nfl_header_t *header) { - if (header->cksum != nfl_header_cksum(header)) { - debug("Header checksum mismatch: expected: 0x%x, got: 0x%x", - header->cksum, nfl_header_cksum(header)); - return -1; - } - - if (header->id > MAX_TRUNK_ID) - return -1; - - if (header->max_n_entries < header->n_entries) - return -1; - - time_t now = time(NULL); - if ((time_t)header->start_time >= now || (time_t)header->end_time >= now || - header->start_time > header->end_time) - return -1; - - return 0; +static bool extract_default(State *s, const void *src) { + s->store = malloc(s->header->raw_size); + memcpy(s->store, src, s->header->raw_size); + return true; } -static int nfl_extract_default(FILE *f, nfl_state_t *state) { - int entries = - fread(state->store, sizeof(nfl_entry_t), state->header->n_entries, f); - WARN_RETURN(ferror(f), "%s", strerror(errno)); - return entries; -} - -static int nfl_extract_zstd(FILE *f, nfl_state_t *state) { - char *buf; - size_t const compressed_size = nfl_get_filesize(f) - sizeof(nfl_header_t), - expected_decom_size = - state->header->n_entries * sizeof(nfl_entry_t); - - // It's possible that data or header is not written due to broken commit - WARN_RETURN(compressed_size <= 0, "%s", "zstd: no data in this trunk"); +static bool extract_zstd(State *s, const void *src) { + assert(src); + size_t const expected_decom_size = s->header->nr_entries * sizeof(Entry); - WARN_RETURN(!(buf = malloc(compressed_size)), "zstd: cannot malloc"); - WARN_RETURN(!fread(buf, compressed_size, 1, f), "zstd: broken data section"); - WARN_RETURN(ferror(f), "%s", strerror(errno)); - - size_t const estimate_decom_size = - ZSTD_findDecompressedSize(buf, compressed_size); - if (estimate_decom_size == ZSTD_CONTENTSIZE_ERROR) + size_t const r = ZSTD_findDecompressedSize(src, s->header->raw_size); + if (r == ZSTD_CONTENTSIZE_ERROR) FATAL("zstd: file was not compressed by zstd.\n"); - else if (estimate_decom_size == ZSTD_CONTENTSIZE_UNKNOWN) + else if (r == ZSTD_CONTENTSIZE_UNKNOWN) FATAL( "zstd: original size unknown. Use streaming decompression instead"); + if (r != expected_decom_size) { + WARN("zstd: expected decompressed size: %ld, got: %ld, skipping " + "decompression", + expected_decom_size, r); + return false; + } + + s->store = malloc(expected_decom_size); size_t const actual_decom_size = ZSTD_decompress( - state->store, expected_decom_size, buf, compressed_size); + s->store, expected_decom_size, src, s->header->raw_size); if (actual_decom_size != expected_decom_size) { FATAL("zstd: error decoding current file: %s \n", ZSTD_getErrorName(actual_decom_size)); } - free(buf); - return actual_decom_size / sizeof(nfl_entry_t); + return true; } -static int nfl_extract_lz4(FILE *f, nfl_state_t *state) { - /* TODO */ - return 0; +static bool extract_lz4(State *s, const void *src) { + // TODO + (void)s; (void)src; + return true; } -int nfl_extract_worker(const char *filename, nfl_state_t *state, const time_range_t *range) { - FILE *f; - int got = 0, ret = 0; - nfl_header_t *h; - - debug("Extracting from file %s", filename); - ERR((f = fopen(filename, "rb")) == NULL, "extract worker"); - ERR(nfl_check_file(f) < 0, "extract worker"); - - // Read header - ERR(!(state->header = malloc(sizeof(nfl_header_t))), - "extract malloc header"); - got = fread(state->header, sizeof(nfl_header_t), 1, f); - h = state->header; - - if(h->end_time < range->from || h->start_time > range->until) - return 0; - - // Check header validity - WARN_RETURN(ferror(f), "%s", strerror(errno)); - WARN_RETURN(got == 0 || nfl_verify_header(h) < 0, - "File %s has corrupted header.", filename); - - // Read body - ERR(!(state->store = malloc(sizeof(nfl_entry_t) * h->n_entries)), - "extract malloc store"); - switch (h->compression_opt) { +bool extract(State *s, const void *src) { + switch (s->header->compression_type) { case COMPRESS_NONE: - debug("Extract worker #%u: extract without compression\n", h->id); - ret = nfl_extract_default(f, state); - break; + DEBUG("extract: extract without compression\n"); + return extract_default(s, src); case COMPRESS_LZ4: - debug("Extract worker #%u: extract with compression algorithm: lz4", - h->id); - ret = nfl_extract_lz4(f, state); - break; + DEBUG("extract: extract with compression algorithm: lz4"); + return extract_lz4(s, src); case COMPRESS_ZSTD: - debug("Extract worker #%u: extract with compression algorithm: zstd", - h->id); - ret = nfl_extract_zstd(f, state); - break; + DEBUG("extract: extract with compression algorithm: zstd"); + return extract_zstd(s, src); // Must not reach here ... default: FATAL("Unknown compression option detected"); } - - fclose(f); - - return ret; } diff --git a/lib/sql.c b/lib/sql.c new file mode 100644 index 0000000..8ac4dbd --- /dev/null +++ b/lib/sql.c @@ -0,0 +1,276 @@ +#include "sql.h" +#include "collect.h" +#include "extract.h" +#include "util.h" +#include <stdlib.h> +#include <string.h> +#include <unistd.h> + +static inline int _db_handle_result(sqlite3 *db, int rc, const char *errmsg, + const char *err, bool fatal) { + if (SQLITE_OK != rc) { + ERROR("sqlite3: %s(%i): %s\n", errmsg ? errmsg : "", rc, + err ? err : sqlite3_errmsg(db)); + if (fatal) { + sqlite3_close(db); + exit(1); + } + } + return rc; +} + +static inline int _db_exec(sqlite3 *db, const char *cmd, const char *errmsg, + bool fatal) { + char *err = NULL; + int rc = sqlite3_exec(db, cmd, NULL, NULL, &err); + return _db_handle_result(db, rc, errmsg, err, fatal); +} + +static inline int db_exec_fatal(sqlite3 *db, const char *cmd, + const char *errmsg) { + return _db_exec(db, cmd, errmsg, true); +} + +static inline int db_exec(sqlite3 *db, const char *cmd, const char *errmsg) { + return _db_exec(db, cmd, errmsg, false); +} + +static inline int db_prepare(sqlite3 *db, const char *cmd, const char *errmsg, + sqlite3_stmt **stmt) { + int rc = sqlite3_prepare_v2(db, cmd, -1, stmt, 0); + return _db_handle_result(db, rc, errmsg, NULL, true); +} + +int db_set_pragma(sqlite3 *db) { + return db_exec_fatal(db, + "PRAGMA journal_mode=WAL;" + "PRAGMA foreign_keys = ON;", + "Can't set Sqlite3 PRAGMA"); +} + +int db_vacuum(sqlite3 *db) { + return db_exec(db, "VACUUM", "Can't vacuum database"); +} + +int db_create_table(sqlite3 *db) { + const char *create_sql = + "CREATE TABLE IF NOT EXISTS " g_sqlite_table_data " (" + "id INTEGER PRIMARY KEY," + "data BLOB" + ");" + "CREATE TABLE IF NOT EXISTS " g_sqlite_table_header " (" + "id INTEGER PRIMARY KEY," + "nr_entries INTEGER," + "size INTEGER," + "compression_type INTEGER," + "start_time INTEGER," + "end_time INTEGER," + "data_id INTEGER," + "FOREIGN KEY(data_id) REFERENCES " g_sqlite_table_data + "(id) ON DELETE SET NULL" + ");"; + int rc = 0, retry = g_sqlite_nr_fail_retry; + while (retry--) { + rc = db_exec(db, create_sql, "Can't create table"); + if (SQLITE_LOCKED != rc && SQLITE_BUSY != rc) + return rc; + sleep(1); + } + + ERROR("Can't create table, reach max retry, bailed out!"); + exit(1); +} + +int db_open(sqlite3 **db, const char *dbname) { + int rc; + rc = sqlite3_open(dbname, db); + if (SQLITE_OK != rc) { + ERROR("Can't open database %s (%i): %s\n", dbname, rc, + sqlite3_errmsg(*db)); + exit(1); + } + + return db_set_pragma(*db); +} + +int db_close(sqlite3 *db) { + sqlite3_close(db); + return 0; +} + +int db_insert(sqlite3 *db, const Header *header, const Entry *entries) { + int rc; + sqlite3_stmt *stmt[2] = {0}; + const char *insert_sql[] = { + "INSERT INTO " g_sqlite_table_data " (data) VALUES(?)", + "INSERT INTO " g_sqlite_table_header " " + "(nr_entries, size, compression_type, start_time, end_time, data_id) " + "VALUES(?, ?, ?, ?, ?, ?)"}; + + db_exec_fatal(db, "BEGIN TRANSACTION", "db_insert: Can't begin txn"); + for (int i = 0; i < 2;) { + rc = db_prepare(db, insert_sql[i], "Can't insert data", &stmt[i]); + if (i == 0) { + printf("Inserting raw data: %02X:%02X", ((char *)entries)[0], + ((char *)entries)[1]); + sqlite3_bind_blob(stmt[i], 1, entries, header->raw_size, + SQLITE_STATIC); + } else { + sqlite3_int64 data_id = sqlite3_last_insert_rowid(db); + sqlite3_bind_int(stmt[i], 1, header->nr_entries); + sqlite3_bind_int(stmt[i], 2, header->raw_size); + sqlite3_bind_int(stmt[i], 3, header->compression_type); + sqlite3_bind_int64(stmt[i], 4, header->start_time); + sqlite3_bind_int64(stmt[i], 5, header->end_time); + sqlite3_bind_int64(stmt[i], 6, data_id); + } + + rc = sqlite3_step(stmt[i]); + if (rc != SQLITE_DONE) + WARN("sqlite3: Insert data step fail: %d\n", rc); + + if (SQLITE_SCHEMA == sqlite3_finalize(stmt[i])) + continue; + i++; + } + + DEBUG("Inserted #%d of compressed size %d", header->nr_entries, + header->raw_size); + db_exec_fatal(db, "END TRANSACTION", "db_insert: Can't end txn"); + return rc; +} + +int db_read_data_by_timerange(sqlite3 *db, const Timerange *t, + StateCallback cb) { + const char *_select_sql = + "SELECT * FROM " g_sqlite_table_header + " INNER JOIN " g_sqlite_table_data " ON " g_sqlite_table_header + ".data_id = " g_sqlite_table_data ".id" + " WHERE " g_sqlite_table_header + ".end_time > %ld AND " g_sqlite_table_header ".start_time < %ld"; + char select_sql[strlen(_select_sql) + 25]; + sprintf(select_sql, _select_sql, t->from, t->until); + + sqlite3_stmt *stmt; + db_exec_fatal(db, "BEGIN TRANSACTION", "db_delete: Can't begin txn"); + int rc = sqlite3_prepare_v2(db, select_sql, -1, &stmt, 0); + if (rc != SQLITE_OK) { + ERROR("Can't select (%i): %s\n", rc, sqlite3_errmsg(db)); + sqlite3_close(db); + exit(1); + } + + int count = 0; + for (;; count++) { + rc = sqlite3_step(stmt); + if (rc == SQLITE_DONE) + break; + assert(rc == SQLITE_ROW); + + State *s = malloc(sizeof(State)); + s->header = malloc(sizeof(Header)); + s->header->nr_entries = sqlite3_column_int(stmt, 1); + s->header->raw_size = sqlite3_column_int(stmt, 2); + s->header->compression_type = sqlite3_column_int(stmt, 3); + + size_t size = sqlite3_column_bytes(stmt, 8); + DEBUG("extract: nr_entries: %d " + "raw_size: %d " + "compression_type: %d " + "size: %ld", + s->header->nr_entries, s->header->raw_size, + s->header->compression_type, size); + if (size != (size_t)s->header->raw_size) + FATAL("extract: header data size and actual size not match: " + "expected: %u, got: %ld", + s->header->raw_size, size); + + bool ok = extract(s, sqlite3_column_blob(stmt, 8)); + if (ok) + cb(s, t); + state_free(s); + } + + assert(SQLITE_SCHEMA != sqlite3_finalize(stmt)); + db_exec_fatal(db, "END TRANSACTION", "db_begin: Can't end txn"); + + return count; +} + +int db_get_space_consumed(sqlite3 *db) { + const char *select_data_sql = "SELECT SUM(size) FROM " g_sqlite_table_data; + const char *select_header_sql = + "SELECT COUNT(*) FROM " g_sqlite_table_header; + + int size; + sqlite3_stmt *stmt = NULL; + db_prepare(db, select_data_sql, "Can't query data", &stmt); + size = sqlite3_column_int64(stmt, 1); + + db_prepare(db, select_header_sql, "Can't query data", &stmt); + size += sizeof(Header) * sqlite3_column_int64(stmt, 1); + return size; +} + +int db_delete_oldest_bytes(sqlite3 *db, int64_t bytes) { + int rc; + sqlite3_stmt *stmt; + const char *select_sql = + "SELECT size, end_time, data_id " + "FROM " g_sqlite_table_header " WHERE data_id IS NOT NULL " + "ORDER BY end_time"; + if (!bytes) + return 0; + + db_exec_fatal(db, "BEGIN TRANSACTION", "db_delete: Can't begin txn"); + rc = sqlite3_prepare_v2(db, select_sql, -1, &stmt, 0); + if (rc != SQLITE_OK) { + ERROR("Can't select (%i): %s\n", rc, sqlite3_errmsg(db)); + sqlite3_close(db); + exit(1); + } + + int count = 0; + size_t bufsize = 1024; + char *buf = malloc(bufsize); + + while (bytes >= 0) { + rc = sqlite3_step(stmt); + if (rc == SQLITE_DONE) + break; + assert(rc == SQLITE_ROW); + sqlite3_int64 index = sqlite3_column_int64(stmt, 2); + int size = sqlite3_column_int(stmt, 0); + + char _buf[22]; + sprintf(_buf, count ? "%lld" : ",%lld", index); + while (strlen(_buf) + strlen(buf) + 2 >= bufsize) { + bufsize *= 2; + char *__buf = malloc(bufsize); + memcpy(__buf, buf, strlen(buf) + 1); + free(buf); + buf = __buf; + } + + strcat(buf, _buf); + bytes -= size; + count++; + } + + if (!*buf) + return 0; + + const char *_delete_sql = + "DELETE FROM " g_sqlite_table_data " WHERE id in (%s);"; + char *delete_sql = malloc(strlen(_delete_sql) + strlen(buf) + 1); + sprintf(delete_sql, _delete_sql, buf); + db_exec_fatal(db, delete_sql, "Can't delete"); + DEBUG("Deleted old data, SQL: %s", delete_sql); + free(delete_sql); + free(buf); + + assert(SQLITE_SCHEMA != sqlite3_finalize(stmt)); + db_exec_fatal(db, "END TRANSACTION", "db_begin: Can't end txn"); + + return count; +} diff --git a/lib/util.c b/lib/util.c new file mode 100644 index 0000000..e80954b --- /dev/null +++ b/lib/util.c @@ -0,0 +1,34 @@ + +#include "main.h" +#include <libgen.h> +#include <string.h> +#include <sys/stat.h> +#include <unistd.h> +int check_file_exist(const char *storage) { + return access(storage, F_OK) != -1; +} + +int check_basedir_exist(const char *storage) { + char *_storage = strdup(storage); + char *basedir = dirname(_storage); + free(_storage); + + struct stat d; + if (stat(basedir, &d) != 0 || !S_ISDIR(d.st_mode)) { + return -1; + } + return 0; +} + +enum CompressionType get_compression(const char *flag) { + if (flag == NULL) { + return COMPRESS_NONE; + } else if (!strcmp(flag, "zstd") || !strcmp(flag, "zstandard")) { + return COMPRESS_ZSTD; + } else if (!strcmp(flag, "lz4")) { + return COMPRESS_LZ4; + } else { + fprintf(stderr, "Unknown compression algorithm: %s\n", flag); + return 0; + } +} |