From 692a091becf285adbc606758125a735c10c8287d Mon Sep 17 00:00:00 2001 From: Yunchih Chen Date: Mon, 5 Mar 2018 09:41:11 +0800 Subject: Add more extractor implementation --- collect.c | 3 +-- common.c | 32 +++++++++++++++++++++++--- common.h | 4 +++- extract.c | 50 ++++++++++++++++++++++++++++++++++------- extract.h | 3 ++- nfcollect.c | 11 ++++++--- nfextract.c | 75 ++++++++++++++++++++++++++++++++++++++++--------------------- 7 files changed, 135 insertions(+), 43 deletions(-) diff --git a/collect.c b/collect.c index cfdb071..faef7b8 100644 --- a/collect.c +++ b/collect.c @@ -33,7 +33,6 @@ nflog_global_t g; -static void nfl_cleanup(void *nf); static void nfl_init(nflog_state_t *nf); static void *nfl_start_commit_worker(void *targs); static void nfl_commit(nflog_state_t *nf); @@ -231,7 +230,7 @@ void nfl_state_init(nflog_state_t **nf, entries_max); } -void nfl_state_free(nflog_state_t *nf) { +static void nfl_state_free(nflog_state_t *nf) { // Free only and leave the rest intact free((void*)nf->store); } diff --git a/common.c b/common.c index b0f59b0..e544556 100644 --- a/common.c +++ b/common.c @@ -2,6 +2,7 @@ #include #include #include +#include #include #include #include @@ -27,6 +28,31 @@ int nfl_check_dir(const char *storage_dir) { return 0; } +int nfl_storage_match_index(const char *fn) { + static regex_t regex; + static bool compiled = false; + regmatch_t match[1]; + int ret; + + if(!compiled) { + ERR(regcomp(&regex, "^" STORAGE_PREFIX "_[0-9]+", 0), + "Could not compile regex"); + compiled = true; + } + + ret = regexec(&regex, fn, 1, match, 0); + if (!ret) { + assert(match[0].rm_so != (size_t)-1); + return strtol(fn + match[0].rm_so, NULL, 10); + } + else if (ret != REG_NOMATCH) { + char buf[100]; + regerror(ret, &regex, buf, sizeof(buf)); + WARN(1, "Regex match failed: %s", buf) + } + + return -1; +} const char *nfl_get_filename(const char *dir, int id) { char out[1024]; sprintf(out, "%s/" STORAGE_PREFIX "_%d", dir, id); @@ -67,10 +93,10 @@ void 
nfl_cal_entries(uint32_t trunk_size, uint32_t *entries_cnt) { *entries_cnt = (trunk_size - sizeof(nflog_header_t)) / sizeof(nflog_entry_t); } -const char *nfl_format_output(nflog_entry_t *entry) { - char out[1024], dest_ip[16]; +void nfl_format_output(char* output, nflog_entry_t *entry) { + char dest_ip[16]; snprintf(dest_ip, 16, "%pI4", &entry->daddr); - sprintf(out, + sprintf(output, "t=%ld\t" "daddr=%s\t" "proto=%s\t" diff --git a/common.h b/common.h index 941ed1e..6030780 100644 --- a/common.h +++ b/common.h @@ -3,7 +3,9 @@ #include "main.h" int nfl_check_file(FILE *f); int nfl_check_dir(const char *storage_dir); +int nfl_storage_match_index(const char *fn); const char *nfl_get_filename(const char *dir, int id); void nfl_cal_trunk(uint32_t total_size, uint32_t *trunk_cnt, uint32_t *trunk_size); void nfl_cal_entries(uint32_t trunk_size, uint32_t *entries_cnt); -const char *nfl_format_output(nflog_entry_t *entry); +void nfl_format_output(char* output, nflog_entry_t *entry); +int nfl_setup_compression(const char *flag, enum nflog_compression_t *opt); diff --git a/extract.c b/extract.c index 4d0b402..b33d183 100644 --- a/extract.c +++ b/extract.c @@ -1,9 +1,20 @@ -#include "common.h" +#include "extract.h" #include #include #include +static int nfl_extract_default(FILE *f, nflog_state_t *state); +static int nfl_extract_zstd(FILE *f, nflog_state_t *state); +static int nfl_extract_lz4(FILE *f, nflog_state_t *state); + +typedef int (*nflog_extract_run_table_t)(FILE* f, nflog_state_t* state); +static const nflog_extract_run_table_t extract_run_table[] = { + nfl_extract_default, + nfl_extract_lz4, + nfl_extract_zstd +}; + static int nfl_verify_header(nflog_header_t *header) { if(header->id > MAX_TRUNK_ID) return -1; @@ -19,24 +30,47 @@ static int nfl_verify_header(nflog_header_t *header) { return 0; } -int nfl_extract_worker(nflog_header_t *header, nflog_entry_t *store, const char *filename) { +static int nfl_extract_default(FILE *f, nflog_state_t *state) { + 
fread(state->store, state->header->n_entries, sizeof(nflog_entry_t), f); + WARN_RETURN(ferror(f), "%s", strerror(errno)); + return 0; +} + +static int nfl_extract_zstd(FILE *f, nflog_state_t *state) { + /* TODO */ + return 0; +} + +static int nfl_extract_lz4(FILE *f, nflog_state_t *state) { + /* TODO */ + return 0; +} + +int nfl_extract_worker(const char *filename, nflog_state_t *state) { FILE* f; - uint32_t got; - int i, failed = 0; + int got = 0, ret = 0; + nflog_header_t **header = &state->header; + nflog_entry_t **store = &state->store; debug("Extracting from file %s", filename); ERR((f = fopen(filename, "rb")) == NULL, "extract worker"); ERR(nfl_check_file(f) < 0, "extract worker"); // Read header - got = fread(header, 1, sizeof(nflog_header_t), f); + ERR((*header = malloc(sizeof(nflog_header_t))), NULL); + got = fread(*header, 1, sizeof(nflog_header_t), f); // Check header validity WARN_RETURN(ferror(f), "%s", strerror(errno)); - WARN_RETURN(nfl_verify_header(header) < 0, "File %s has corrupted header.", filename); + WARN_RETURN(got != sizeof(nflog_header_t) || nfl_verify_header(*header) < 0, + "File %s has corrupted header.", filename); // Read body - fread(store, header->n_entries, sizeof(nflog_entry_t), f); - WARN_RETURN(ferror(f), "%s", strerror(errno)); + WARN_RETURN((*header)->compression_opt > sizeof(extract_run_table)/sizeof(extract_run_table[0]), + "Unknown compression in %s", filename); + ERR((*store = malloc(sizeof(nflog_entry_t) * (*header)->n_entries)), NULL); + ret = extract_run_table[(*header)->compression_opt](f, state); fclose(f); + + return ret; } diff --git a/extract.h b/extract.h index 1273b77..233f557 100644 --- a/extract.h +++ b/extract.h @@ -1,3 +1,4 @@ #pragma once -int nfl_extract_worker(nflog_header_t *header, nflog_entry_t *store, const char *filename); +#include "common.h" +int nfl_extract_worker(const char *filename, nflog_state_t *state); diff --git a/nfcollect.c b/nfcollect.c index 5d7aaf1..10ad512 100644 --- a/nfcollect.c +++ 
b/nfcollect.c @@ -58,19 +58,20 @@ int main(int argc, char *argv[]) { uint32_t trunk_cnt = 0, trunk_size = 0; uint32_t entries_max; nflog_global_t g; - int nfl_group_id; - char *storage_dir = NULL; + int nfl_group_id = -1; + char *compression_flag = NULL, *storage_dir = NULL; struct option longopts[] = {/* name, has_args, flag, val */ {"nflog-group", required_argument, NULL, 'g'}, {"storage_dir", required_argument, NULL, 'd'}, {"storage_size", required_argument, NULL, 's'}, + {"compression", optional_argument, NULL, 'z'}, {"help", no_argument, NULL, 'h'}, {"version", no_argument, NULL, 'v'}, {0, 0, 0, 0}}; int opt; - while ((opt = getopt_long(argc, argv, "g:d:s:hv", longopts, NULL)) != -1) { + while ((opt = getopt_long(argc, argv, "c:g:d:s:hv", longopts, NULL)) != -1) { switch (opt) { case 'h': printf("%s", help_text); @@ -80,6 +81,9 @@ int main(int argc, char *argv[]) { printf("%s %s", PACKAGE, VERSION); exit(0); break; + case 'c': + compression_flag = optarg; + break; case 'd': storage_dir = optarg; break; @@ -119,6 +123,7 @@ int main(int argc, char *argv[]) { nfl_cal_trunk(storage_size, &trunk_cnt, &trunk_size); nfl_cal_entries(trunk_size, &entries_max); + nfl_setup_compression(compression_flag, &g.compression_opt); // Set up commit worker g.nfl_commit_queue = malloc(sizeof(sem_t)); diff --git a/nfextract.c b/nfextract.c index cf7e51f..884665a 100644 --- a/nfextract.c +++ b/nfextract.c @@ -22,22 +22,24 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE // SOFTWARE. 
-#include "commit.h" +#include "extract.h" #include "common.h" +#include #include #include #include #include #include #include +#include #include +#include #include #define PROG "nfextract" sem_t nfl_commit_queue; uint16_t nfl_group_id; -char *storage_dir = NULL; const char *help_text = "Usage: " PROG " [OPTION]\n" @@ -48,18 +50,58 @@ const char *help_text = " -v --version print version information\n" "\n"; -void sig_handler(int signo) { +static void sig_handler(int signo) { if (signo == SIGHUP) { /* TODO */ } } -int main(int argc, char *argv[]) { +static void extract_each(const char *filename) { + nflog_state_t trunk; + if(nfl_extract_worker(filename, &trunk) < 0) + return; + + char output[1024]; + for(int entry = 0; entry < trunk.header->n_entries; ++entry){ + nfl_format_output(output, trunk.store); + puts((char*)output); + free((char*)output); + } - uint32_t i, max_commit_worker = 0, storage_size = 0; - uint32_t trunk_cnt, trunk_size, entries_max; - int nflog_group_id; + free((char*)filename); +} +static void extract_all(const char *storage_dir) { + DIR *dp; + struct dirent *ep; + int i, index, max_index = -1; + char *trunk_files[MAX_TRUNK_ID]; + memset(trunk_files, MAX_TRUNK_ID, 0); + + ERR(!(dp = opendir(storage_dir)), + "Can't open the storage directory"); + while ((ep = readdir(dp))) { + index = nfl_storage_match_index(ep->d_name); + if(index >= 0) { + if(index >= MAX_TRUNK_ID) { + WARN(1, "Storage trunk file index " + "out of predefined range: %s", ep->d_name); + } else { + trunk_files[index] = strdup(ep->d_name); + if(index > max_index) max_index = index; + } + } + } + + closedir (dp); + + for(i = 0; i < max_index; ++i) + if(trunk_files[i]) + extract_each(trunk_files[i]); +} + +int main(int argc, char *argv[]) { + char *storage_dir = NULL; struct option longopts[] = {/* name, has_args, flag, val */ {"storage_dir", required_argument, NULL, 'd'}, {"help", no_argument, NULL, 'h'}, @@ -95,23 +137,6 @@ int main(int argc, char *argv[]) { // register signal 
handler ERR(signal(SIGHUP, sig_handler) == SIG_ERR, "Could not set SIGHUP handler"); - nfl_cal_trunk(storage_size, &trunk_cnt, &trunk_size); - nfl_cal_entries(trunk_size, &entries_max); - - nflog_state_t trunk; - const char *filename, *output; - for (int i = 0; i < trunk_cnt; ++i) { - filename = nfl_get_filename(storage_dir, i); - nfl_extract_worker(trunk.header, trunk.store, filename); - - for(int entry = 0; entry < trunk.header->n_entries; ++entry){ - output = nfl_format_output(trunk.store); - puts((char*)output); - free((char*)output); - } - - free((char*)filename); - } - + extract_all(storage_dir); return 0; } -- cgit v1.2.3