aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorYunchih Chen <yunchih.cat@gmail.com>2018-03-05 09:41:11 +0800
committerYunchih Chen <yunchih.cat@gmail.com>2018-03-05 09:41:44 +0800
commit692a091becf285adbc606758125a735c10c8287d (patch)
tree8cc9435ac891e92ddb76c0279cc725c03af1352e
parent71f6b20e99e03c16bf9bf8c5d509ade84b6d5db4 (diff)
downloadnfcollect-692a091becf285adbc606758125a735c10c8287d.tar
nfcollect-692a091becf285adbc606758125a735c10c8287d.tar.gz
nfcollect-692a091becf285adbc606758125a735c10c8287d.tar.bz2
nfcollect-692a091becf285adbc606758125a735c10c8287d.tar.lz
nfcollect-692a091becf285adbc606758125a735c10c8287d.tar.xz
nfcollect-692a091becf285adbc606758125a735c10c8287d.tar.zst
nfcollect-692a091becf285adbc606758125a735c10c8287d.zip
Add more extractor implementation
-rw-r--r--collect.c3
-rw-r--r--common.c32
-rw-r--r--common.h4
-rw-r--r--extract.c50
-rw-r--r--extract.h3
-rw-r--r--nfcollect.c11
-rw-r--r--nfextract.c75
7 files changed, 135 insertions, 43 deletions
diff --git a/collect.c b/collect.c
index cfdb071..faef7b8 100644
--- a/collect.c
+++ b/collect.c
@@ -33,7 +33,6 @@
nflog_global_t g;
-static void nfl_cleanup(void *nf);
static void nfl_init(nflog_state_t *nf);
static void *nfl_start_commit_worker(void *targs);
static void nfl_commit(nflog_state_t *nf);
@@ -231,7 +230,7 @@ void nfl_state_init(nflog_state_t **nf,
entries_max);
}
-void nfl_state_free(nflog_state_t *nf) {
+static void nfl_state_free(nflog_state_t *nf) {
// Free only and leave the rest intact
free((void*)nf->store);
}
diff --git a/common.c b/common.c
index b0f59b0..e544556 100644
--- a/common.c
+++ b/common.c
@@ -2,6 +2,7 @@
#include <assert.h>
#include <errno.h>
#include <limits.h>
+#include <regex.h>
#include <string.h>
#include <sys/stat.h>
#include <unistd.h>
@@ -27,6 +28,31 @@ int nfl_check_dir(const char *storage_dir) {
return 0;
}
+int nfl_storage_match_index(const char *fn) {
+ static regex_t regex;
+ static bool compiled = false;
+ regmatch_t match[1];
+ int ret;
+
+ if(!compiled) {
+ ERR(regcomp(&regex, "^" STORAGE_PREFIX "_[0-9]+", 0),
+ "Could not compile regex");
+ compiled = true;
+ }
+
+ ret = regexec(&regex, fn, 1, match, 0);
+ if (!ret) {
+ assert(match[0].rm_so != (size_t)-1);
+ return strtol(fn + match[0].rm_so, NULL, 10);
+ }
+ else if (ret != REG_NOMATCH) {
+ char buf[100];
+ regerror(ret, &regex, buf, sizeof(buf));
+ WARN(1, "Regex match failed: %s", buf)
+ }
+
+ return -1;
+}
const char *nfl_get_filename(const char *dir, int id) {
char out[1024];
sprintf(out, "%s/" STORAGE_PREFIX "_%d", dir, id);
@@ -67,10 +93,10 @@ void nfl_cal_entries(uint32_t trunk_size, uint32_t *entries_cnt) {
*entries_cnt = (trunk_size - sizeof(nflog_header_t)) / sizeof(nflog_entry_t);
}
-const char *nfl_format_output(nflog_entry_t *entry) {
- char out[1024], dest_ip[16];
+void nfl_format_output(char* output, nflog_entry_t *entry) {
+ char dest_ip[16];
snprintf(dest_ip, 16, "%pI4", &entry->daddr);
- sprintf(out,
+ sprintf(output,
"t=%ld\t"
"daddr=%s\t"
"proto=%s\t"
diff --git a/common.h b/common.h
index 941ed1e..6030780 100644
--- a/common.h
+++ b/common.h
@@ -3,7 +3,9 @@
#include "main.h"
int nfl_check_file(FILE *f);
int nfl_check_dir(const char *storage_dir);
+int nfl_storage_match_index(const char *fn);
const char *nfl_get_filename(const char *dir, int id);
void nfl_cal_trunk(uint32_t total_size, uint32_t *trunk_cnt, uint32_t *trunk_size);
void nfl_cal_entries(uint32_t trunk_size, uint32_t *entries_cnt);
-const char *nfl_format_output(nflog_entry_t *entry);
+void nfl_format_output(char* output, nflog_entry_t *entry);
+int nfl_setup_compression(const char *flag, enum nflog_compression_t *opt);
diff --git a/extract.c b/extract.c
index 4d0b402..b33d183 100644
--- a/extract.c
+++ b/extract.c
@@ -1,9 +1,20 @@
-#include "common.h"
+#include "extract.h"
#include <errno.h>
#include <string.h>
#include <time.h>
+static int nfl_extract_default(FILE *f, nflog_state_t *state);
+static int nfl_extract_zstd(FILE *f, nflog_state_t *state);
+static int nfl_extract_lz4(FILE *f, nflog_state_t *state);
+
+typedef int (*nflog_extract_run_table_t)(FILE* f, nflog_state_t* state);
+static const nflog_extract_run_table_t extract_run_table[] = {
+ nfl_extract_default,
+ nfl_extract_lz4,
+ nfl_extract_zstd
+};
+
static int nfl_verify_header(nflog_header_t *header) {
if(header->id > MAX_TRUNK_ID)
return -1;
@@ -19,24 +30,47 @@ static int nfl_verify_header(nflog_header_t *header) {
return 0;
}
-int nfl_extract_worker(nflog_header_t *header, nflog_entry_t *store, const char *filename) {
+static int nfl_extract_default(FILE *f, nflog_state_t *state) {
+ fread(state->store, state->header->n_entries, sizeof(nflog_entry_t), f);
+ WARN_RETURN(ferror(f), "%s", strerror(errno));
+ return 0;
+}
+
+static int nfl_extract_zstd(FILE *f, nflog_state_t *state) {
+ /* TODO */
+ return 0;
+}
+
+static int nfl_extract_lz4(FILE *f, nflog_state_t *state) {
+ /* TODO */
+ return 0;
+}
+
+int nfl_extract_worker(const char *filename, nflog_state_t *state) {
FILE* f;
- uint32_t got;
- int i, failed = 0;
+ int got = 0, ret = 0;
+ nflog_header_t **header = &state->header;
+ nflog_entry_t **store = &state->store;
debug("Extracting from file %s", filename);
ERR((f = fopen(filename, "rb")) == NULL, "extract worker");
ERR(nfl_check_file(f) < 0, "extract worker");
// Read header
- got = fread(header, 1, sizeof(nflog_header_t), f);
+ ERR((*header = malloc(sizeof(nflog_header_t))), NULL);
+ got = fread(*header, 1, sizeof(nflog_header_t), f);
// Check header validity
WARN_RETURN(ferror(f), "%s", strerror(errno));
- WARN_RETURN(nfl_verify_header(header) < 0, "File %s has corrupted header.", filename);
+ WARN_RETURN(got != sizeof(nflog_header_t) || nfl_verify_header(*header) < 0,
+ "File %s has corrupted header.", filename);
// Read body
- fread(store, header->n_entries, sizeof(nflog_entry_t), f);
- WARN_RETURN(ferror(f), "%s", strerror(errno));
+ WARN_RETURN((*header)->compression_opt > sizeof(extract_run_table)/sizeof(extract_run_table[0]),
+ "Unknown compression in %s", filename);
+ ERR((*store = malloc(sizeof(nflog_entry_t) * (*header)->n_entries)), NULL);
+ ret = extract_run_table[(*header)->compression_opt](f, state);
fclose(f);
+
+ return ret;
}
diff --git a/extract.h b/extract.h
index 1273b77..233f557 100644
--- a/extract.h
+++ b/extract.h
@@ -1,3 +1,4 @@
#pragma once
-int nfl_extract_worker(nflog_header_t *header, nflog_entry_t *store, const char *filename);
+#include "common.h"
+int nfl_extract_worker(const char *filename, nflog_state_t *state);
diff --git a/nfcollect.c b/nfcollect.c
index 5d7aaf1..10ad512 100644
--- a/nfcollect.c
+++ b/nfcollect.c
@@ -58,19 +58,20 @@ int main(int argc, char *argv[]) {
uint32_t trunk_cnt = 0, trunk_size = 0;
uint32_t entries_max;
nflog_global_t g;
- int nfl_group_id;
- char *storage_dir = NULL;
+ int nfl_group_id = -1;
+ char *compression_flag = NULL, *storage_dir = NULL;
struct option longopts[] = {/* name, has_args, flag, val */
{"nflog-group", required_argument, NULL, 'g'},
{"storage_dir", required_argument, NULL, 'd'},
{"storage_size", required_argument, NULL, 's'},
+ {"compression", optional_argument, NULL, 'z'},
{"help", no_argument, NULL, 'h'},
{"version", no_argument, NULL, 'v'},
{0, 0, 0, 0}};
int opt;
- while ((opt = getopt_long(argc, argv, "g:d:s:hv", longopts, NULL)) != -1) {
+ while ((opt = getopt_long(argc, argv, "c:g:d:s:hv", longopts, NULL)) != -1) {
switch (opt) {
case 'h':
printf("%s", help_text);
@@ -80,6 +81,9 @@ int main(int argc, char *argv[]) {
printf("%s %s", PACKAGE, VERSION);
exit(0);
break;
+ case 'c':
+ compression_flag = optarg;
+ break;
case 'd':
storage_dir = optarg;
break;
@@ -119,6 +123,7 @@ int main(int argc, char *argv[]) {
nfl_cal_trunk(storage_size, &trunk_cnt, &trunk_size);
nfl_cal_entries(trunk_size, &entries_max);
+ nfl_setup_compression(compression_flag, &g.compression_opt);
// Set up commit worker
g.nfl_commit_queue = malloc(sizeof(sem_t));
diff --git a/nfextract.c b/nfextract.c
index cf7e51f..884665a 100644
--- a/nfextract.c
+++ b/nfextract.c
@@ -22,22 +22,24 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
-#include "commit.h"
+#include "extract.h"
#include "common.h"
+#include <dirent.h>
#include <fcntl.h>
#include <getopt.h>
#include <pthread.h>
#include <signal.h>
#include <stdint.h>
#include <stdio.h>
+#include <string.h>
#include <sys/stat.h>
+#include <sys/types.h>
#include <unistd.h>
#define PROG "nfextract"
sem_t nfl_commit_queue;
uint16_t nfl_group_id;
-char *storage_dir = NULL;
const char *help_text =
"Usage: " PROG " [OPTION]\n"
@@ -48,18 +50,58 @@ const char *help_text =
" -v --version print version information\n"
"\n";
-void sig_handler(int signo) {
+static void sig_handler(int signo) {
if (signo == SIGHUP) {
/* TODO */
}
}
-int main(int argc, char *argv[]) {
+static void extract_each(const char *filename) {
+ nflog_state_t trunk;
+ if(nfl_extract_worker(filename, &trunk) < 0)
+ return;
+
+ char output[1024];
+ for(int entry = 0; entry < trunk.header->n_entries; ++entry){
+ nfl_format_output(output, trunk.store);
+ puts((char*)output);
+ free((char*)output);
+ }
- uint32_t i, max_commit_worker = 0, storage_size = 0;
- uint32_t trunk_cnt, trunk_size, entries_max;
- int nflog_group_id;
+ free((char*)filename);
+}
+static void extract_all(const char *storage_dir) {
+ DIR *dp;
+ struct dirent *ep;
+ int i, index, max_index = -1;
+ char *trunk_files[MAX_TRUNK_ID];
+ memset(trunk_files, MAX_TRUNK_ID, 0);
+
+ ERR(!(dp = opendir(storage_dir)),
+ "Can't open the storage directory");
+ while ((ep = readdir(dp))) {
+ index = nfl_storage_match_index(ep->d_name);
+ if(index >= 0) {
+ if(index >= MAX_TRUNK_ID) {
+ WARN(1, "Storage trunk file index "
+ "out of predefined range: %s", ep->d_name);
+ } else {
+ trunk_files[index] = strdup(ep->d_name);
+ if(index > max_index) max_index = index;
+ }
+ }
+ }
+
+ closedir (dp);
+
+ for(i = 0; i < max_index; ++i)
+ if(trunk_files[i])
+ extract_each(trunk_files[i]);
+}
+
+int main(int argc, char *argv[]) {
+ char *storage_dir = NULL;
struct option longopts[] = {/* name, has_args, flag, val */
{"storage_dir", required_argument, NULL, 'd'},
{"help", no_argument, NULL, 'h'},
@@ -95,23 +137,6 @@ int main(int argc, char *argv[]) {
// register signal handler
ERR(signal(SIGHUP, sig_handler) == SIG_ERR, "Could not set SIGHUP handler");
- nfl_cal_trunk(storage_size, &trunk_cnt, &trunk_size);
- nfl_cal_entries(trunk_size, &entries_max);
-
- nflog_state_t trunk;
- const char *filename, *output;
- for (int i = 0; i < trunk_cnt; ++i) {
- filename = nfl_get_filename(storage_dir, i);
- nfl_extract_worker(trunk.header, trunk.store, filename);
-
- for(int entry = 0; entry < trunk.header->n_entries; ++entry){
- output = nfl_format_output(trunk.store);
- puts((char*)output);
- free((char*)output);
- }
-
- free((char*)filename);
- }
-
+ extract_all(storage_dir);
return 0;
}