aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorYunchih Chen <yunchih.cat@gmail.com>2018-03-01 20:17:22 +0800
committerYunchih Chen <yunchih.cat@gmail.com>2018-03-01 20:17:22 +0800
commitcb02fbda5ca090b6e69288967b831d9f8f3184ef (patch)
treefad07a9fbdc29ab7bb26fecddf402de93cc5ecc6
parentbfe4f8470c744766bda97ec19f71ac1ae1acebea (diff)
downloadnfcollect-cb02fbda5ca090b6e69288967b831d9f8f3184ef.tar
nfcollect-cb02fbda5ca090b6e69288967b831d9f8f3184ef.tar.gz
nfcollect-cb02fbda5ca090b6e69288967b831d9f8f3184ef.tar.bz2
nfcollect-cb02fbda5ca090b6e69288967b831d9f8f3184ef.tar.lz
nfcollect-cb02fbda5ca090b6e69288967b831d9f8f3184ef.tar.xz
nfcollect-cb02fbda5ca090b6e69288967b831d9f8f3184ef.tar.zst
nfcollect-cb02fbda5ca090b6e69288967b831d9f8f3184ef.zip
Snapshot after long development stall
-rw-r--r--.gitignore1
-rw-r--r--Makefile.am5
-rw-r--r--collect.c (renamed from nflog.c)69
-rw-r--r--collect.h (renamed from nflog.h)4
-rw-r--r--commit.c9
-rw-r--r--commit.h9
-rw-r--r--common.c83
-rw-r--r--common.h9
-rw-r--r--configure.ac2
-rw-r--r--extract.c42
-rw-r--r--extract.h3
-rw-r--r--main.h51
-rw-r--r--nfcollect.c (renamed from main.c)61
-rw-r--r--nfextract.c117
14 files changed, 381 insertions, 84 deletions
diff --git a/.gitignore b/.gitignore
index c2458e8..5491918 100644
--- a/.gitignore
+++ b/.gitignore
@@ -7,3 +7,4 @@ build-aux
autom4te.cache
aclocal.*
Makefile.in
+Makefile
diff --git a/Makefile.am b/Makefile.am
index 2d8def4..f2b5310 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -1,2 +1,3 @@
-bin_PROGRAMS = nfcollect
-nfcollect_SOURCES = nflog.c commit.c main.c
+bin_PROGRAMS = nfcollect nfextract
+nfcollect_SOURCES = commit.c collect.c common.c nfcollect.c
+nfextract_SOURCES = common.c extract.c nfextract.c
diff --git a/nflog.c b/collect.c
index ff9480b..323100b 100644
--- a/nflog.c
+++ b/collect.c
@@ -23,21 +23,20 @@
// SOFTWARE.
#include "commit.h"
-#include "nflog.h"
-#include "main.h"
+#include "common.h"
#include <stddef.h> // size_t for libnetfilter_log
#include <sys/types.h> // u_int32_t for libnetfilter_log
#include <libnetfilter_log/libnetfilter_log.h>
#include <pthread.h>
#include <time.h>
-extern sem_t nfl_commit_queue;
-extern uint16_t nfl_group_id;
+nflog_global_t *g;
-static void nfl_cleanup(nflog_state_t *nf);
+static void nfl_cleanup(void *nf);
static void nfl_init(nflog_state_t *nf);
-static void *_nfl_commit_worker(void *targs);
+static void *nfl_start_commit_worker(void *targs);
static void nfl_commit(nflog_state_t *nf);
+static void nfl_state_free(nflog_state_t *nf);
static int handle_packet(struct nflog_g_handle *gh, struct nfgenmsg *nfmsg,
struct nflog_data *nfa, void *_nf) {
@@ -52,6 +51,8 @@ static int handle_packet(struct nflog_g_handle *gh, struct nfgenmsg *nfmsg,
int payload_len = nflog_get_payload(nfa, &payload);
nflog_state_t *nf = (nflog_state_t *)_nf;
+ pthread_testcancel(); /* cancellation point */
+
// only process ipv4 packet
if (unlikely(payload_len < 0) || ((payload[0] & 0xf0) != 0x40))
return 1;
@@ -92,13 +93,13 @@ static int handle_packet(struct nflog_g_handle *gh, struct nfgenmsg *nfmsg,
debug("Recv packet info entry #%d: "
"timestamp:\t%ld\t"
- "daddr:\t%d\t"
+ "daddr:\t%ld\t"
"transfer:\t%s\t"
"uid:\t%d\t"
"sport:\t%d\t"
"dport:\t%d",
nf->header->n_entries,
- entry->timestamp, entry->daddr,
+ entry->timestamp, (unsigned long)entry->daddr.s_addr,
iph->protocol == IPPROTO_TCP ? "TCP" : "UDP",
entry->uid, entry->sport, entry->dport);
@@ -115,7 +116,7 @@ static void nfl_init(nflog_state_t *nf) {
ERR(nflog_bind_pf(nf->nfl_fd, AF_INET) < 0, "nflog_bind_pf");
// bind to group
- nf->nfl_group_fd = nflog_bind_group(nf->nfl_fd, nfl_group_id);
+ nf->nfl_group_fd = nflog_bind_group(nf->nfl_fd, nf->global->nfl_group_id);
/* ERR(nflog_set_mode(nf->nfl_group_fd, NFULNL_COPY_PACKET, sizeof(struct iphdr) + 4) < 0, */
ERR(nflog_set_mode(nf->nfl_group_fd, NFULNL_COPY_PACKET, nflog_recv_size) < 0,
@@ -123,14 +124,23 @@ static void nfl_init(nflog_state_t *nf) {
nflog_callback_register(nf->nfl_group_fd, &handle_packet, nf);
debug("Registering nflog callback");
+
+ g = nf->global;
}
-static void nfl_cleanup(nflog_state_t *nf) {
+static void nfl_cleanup(void *args) {
+ nflog_state_t *nf = (nflog_state_t *)args;
+
+ // write end time
+ time(&nf->header->end_time);
nflog_unbind_group(nf->nfl_group_fd);
nflog_close(nf->nfl_fd);
+
+ // commit to file
+ if(g->commit_when_died) nfl_commit(nf);
}
-void *nflog_worker(void *targs) {
+void *nfl_collect_worker(void *targs) {
nflog_state_t *nf = (nflog_state_t *)targs;
nfl_init(nf);
@@ -140,10 +150,13 @@ void *nflog_worker(void *targs) {
debug("Recv worker #%u: main loop starts", nf->header->id);
time(&nf->header->start_time);
+ pthread_cleanup_push(nfl_cleanup, (void*)nf);
+
+ int rv; char buf[4096];
while (*p_cnt_now < cnt_max) {
- int rv; char buf[4096];
- if ((rv = recv(fd, buf, sizeof(buf), 0)) && rv >= 0) {
+ pthread_testcancel(); /* cancellation point */
+ if ((rv = recv(fd, buf, sizeof(buf), 0)) && rv > 0) {
debug("Recv worker #%u: nflog packet received (len=%u)", nf->header->id,
rv);
nflog_handle_packet(nf->nfl_fd, buf, rv);
@@ -151,11 +164,7 @@ void *nflog_worker(void *targs) {
}
debug("Recv worker #%u: finish recv", nf->header->id);
- time(&nf->header->end_time);
- nfl_cleanup(nf);
- nfl_commit(nf);
-
- /* TODO: can return exit status */
+ pthread_cleanup_pop(1);
pthread_exit(NULL);
}
@@ -163,35 +172,41 @@ void *nflog_worker(void *targs) {
* Committer
*/
-void nfl_commit(nflog_state_t *nf) {
+static void nfl_commit(nflog_state_t *nf) {
pthread_t tid;
- pthread_create(&tid, NULL, _nfl_commit_worker, (void *)nf);
+ pthread_create(&tid, NULL, nfl_start_commit_worker, (void *)nf);
pthread_detach(tid);
}
-void *_nfl_commit_worker(void *targs) {
+static void *nfl_start_commit_worker(void *targs) {
nflog_state_t* nf = (nflog_state_t*) targs;
+ const char *filename = nfl_get_filename(g->storage_dir, nf->header->id);
debug("Comm worker #%u: thread started", nf->header->id);
- sem_wait(&nfl_commit_queue);
+ sem_wait(g->nfl_commit_queue);
debug("Comm worker #%u: commit started", nf->header->id);
- nfl_commit_worker(nf->header, nf->store);
+ nfl_commit_worker(nf->header, nf->store, filename);
debug("Comm worker #%u: commit done", nf->header->id);
- sem_post(&nfl_commit_queue);
+ sem_post(g->nfl_commit_queue);
// Commit finished
nfl_state_free(nf);
+ free((char*)filename);
pthread_mutex_unlock(&(nf->lock));
+ pthread_exit(NULL);
}
/*
* State managers
*/
-void nfl_state_update_or_create(nflog_state_t **nf, uint32_t id, uint32_t entries_max) {
+void nfl_state_update_or_create(nflog_state_t **nf,
+ uint32_t id, uint32_t entries_max,
+ nflog_global_t *g) {
if(*nf == NULL) {
*nf = (nflog_state_t *)malloc(sizeof(nflog_state_t));
pthread_mutex_init(&((*nf)->lock), NULL);
+ (*nf)->global = g;
}
// Don't use calloc here, as it will consume physical memory
@@ -208,6 +223,6 @@ void nfl_state_update_or_create(nflog_state_t **nf, uint32_t id, uint32_t entrie
void nfl_state_free(nflog_state_t *nf) {
// Free header and store only
// Leave the rest intact
- free(nf->header);
- free(nf->store);
+ free((void*)nf->header);
+ free((void*)nf->store);
}
diff --git a/nflog.h b/collect.h
index 3ca8544..0f525e3 100644
--- a/nflog.h
+++ b/collect.h
@@ -1,5 +1,5 @@
#pragma once
-void* nflog_worker(void *targs);
-void nfl_state_update_or_create(nflog_state_t **nf, uint32_t id, uint32_t entries_max);
+void *nfl_collect_worker(void *targs);
+void nfl_state_update_or_create(nflog_state_t **nf, uint32_t id, uint32_t entries_max, nflog_global_t *g);
void nfl_state_free(nflog_state_t *nf);
diff --git a/commit.c b/commit.c
index 0db4163..076f100 100644
--- a/commit.c
+++ b/commit.c
@@ -2,19 +2,14 @@
#include <string.h>
#include "commit.h"
-extern char *storage_dir;
-extern char *storage_prefix;
-
void nfl_commit_init() {
}
-void nfl_commit_worker(nflog_header_t* header, nflog_entry_t* store) {
+void nfl_commit_worker(nflog_header_t* header, nflog_entry_t* store, const char* filename) {
FILE* f;
- char filename[1024];
- uint32_t written, id = header->id;
+ uint32_t written;
- sprintf(filename, "%s/%s_%d", storage_dir, storage_prefix, id);
debug("Comm worker #%u: commit to file %s\n", header->id, filename);
ERR((f = fopen(filename, "wb")) == NULL, strerror(errno));
diff --git a/commit.h b/commit.h
index d424867..315cf8e 100644
--- a/commit.h
+++ b/commit.h
@@ -1,5 +1,8 @@
-#pragma once
-#include "main.h"
+#ifndef _COMMIT_H
+#define _COMMIT_H
+#include "main.h"
void nfl_commit_init();
-void nfl_commit_worker(nflog_header_t* header, nflog_entry_t* store);
+void nfl_commit_worker(nflog_header_t* header, nflog_entry_t* store, const char* filename);
+
+#endif
diff --git a/common.c b/common.c
new file mode 100644
index 0000000..7f44e6c
--- /dev/null
+++ b/common.c
@@ -0,0 +1,83 @@
+#include "common.h"
+#include <assert.h>
+#include <errno.h>
+#include <limits.h>
+#include <string.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+int nfl_check_file(FILE *f) {
+ struct stat s;
+ assert(f);
+ if (fstat(fileno(f), &s) < 0)
+ return -errno;
+
+ // Ignore file already unlinked
+ if (s.st_nlink <= 0)
+ return -EIDRM;
+
+ return 0;
+}
+
+int nfl_check_dir(const char *storage_dir) {
+ struct stat _d;
+ if(stat(storage_dir, &_d) != 0 || !S_ISDIR(_d.st_mode)){
+ return -1;
+ }
+ return 0;
+}
+
+const char *nfl_get_filename(const char *dir, int id) {
+ char out[1024];
+ sprintf(out, "%s/" STORAGE_PREFIX "_%d", dir, id);
+ return strdup(out);
+}
+
+uint32_t nfl_header_cksum(nflog_header_t *header) {
+ register uint64_t s = 3784672181;
+ s += header->id;
+ s ^= header->max_n_entries;
+ s += header->n_entries;
+ s ^= header->start_time;
+ s += header->end_time;
+ s &= ULONG_MAX;
+ return s;
+}
+
+void nfl_cal_trunk(uint32_t total_size, uint32_t *trunk_cnt, uint32_t *trunk_size) {
+ uint32_t pgsize = getpagesize();
+ total_size *= 1024 * 1024; // MiB
+
+ assert(trunk_cnt);
+ assert(total_size);
+
+ *trunk_cnt = CEIL_DIV(total_size, pgsize * TRUNK_SIZE_BY_PAGE);
+ if(*trunk_cnt > MAX_TRUNK_ID) {
+ *trunk_size = total_size / MAX_TRUNK_ID;
+ *trunk_size = (*trunk_size / pgsize) * pgsize; // align with pagesize
+ }
+ else {
+ *trunk_size = TRUNK_SIZE_BY_PAGE;
+ }
+}
+
+void nfl_cal_entries(uint32_t trunk_size, uint32_t *entries_cnt) {
+ assert(entries_cnt);
+ *entries_cnt = (trunk_size - sizeof(nflog_header_t)) / sizeof(nflog_entry_t);
+}
+
+const char *nfl_format_output(nflog_entry_t *entry) {
+ char out[1024], dest_ip[16];
+ snprintf(dest_ip, 16, "%pI4", &entry->daddr);
+ sprintf(out,
+ "t=%ld\t"
+ "daddr=%s\t"
+ "proto=%s\t"
+ "uid=%d\t"
+ "sport=%d\t"
+ "dport=%d",
+ entry->timestamp, dest_ip,
+ entry->protocol == IPPROTO_TCP ? "TCP" : "UDP",
+ entry->uid, entry->sport, entry->dport);
+ return strdup(out);
+}
diff --git a/common.h b/common.h
new file mode 100644
index 0000000..941ed1e
--- /dev/null
+++ b/common.h
@@ -0,0 +1,9 @@
+#pragma once
+
+#include "main.h"
+int nfl_check_file(FILE *f);
+int nfl_check_dir(const char *storage_dir);
+const char *nfl_get_filename(const char *dir, int id);
+void nfl_cal_trunk(uint32_t total_size, uint32_t *trunk_cnt, uint32_t *trunk_size);
+void nfl_cal_entries(uint32_t trunk_size, uint32_t *entries_cnt);
+const char *nfl_format_output(nflog_entry_t *entry);
diff --git a/configure.ac b/configure.ac
index 343a8c9..5601c83 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1,7 +1,7 @@
AC_INIT([nfcollect], [1.0])
# Safety checks in case user overwritten --srcdir
-AC_CONFIG_SRCDIR([main.c])
+AC_CONFIG_SRCDIR([nfcollect.c])
# Store the auxiliary build tools (e.g., install-sh, config.sub, config.guess)
# in this dir (build-aux)
diff --git a/extract.c b/extract.c
new file mode 100644
index 0000000..4d0b402
--- /dev/null
+++ b/extract.c
@@ -0,0 +1,42 @@
+
+#include "common.h"
+#include <errno.h>
+#include <string.h>
+#include <time.h>
+
+static int nfl_verify_header(nflog_header_t *header) {
+ if(header->id > MAX_TRUNK_ID)
+ return -1;
+
+ if(header->max_n_entries < header->n_entries)
+ return -1;
+
+ time_t now = time(NULL);
+ if((time_t) header->start_time >= now ||
+ (time_t) header->end_time >= now ||
+ header->start_time > header->end_time)
+ return -1;
+ return 0;
+}
+
+int nfl_extract_worker(nflog_header_t *header, nflog_entry_t *store, const char *filename) {
+ FILE* f;
+ uint32_t got;
+ int i, failed = 0;
+
+ debug("Extracting from file %s", filename);
+ ERR((f = fopen(filename, "rb")) == NULL, "extract worker");
+ ERR(nfl_check_file(f) < 0, "extract worker");
+
+ // Read header
+ got = fread(header, 1, sizeof(nflog_header_t), f);
+
+ // Check header validity
+ WARN_RETURN(ferror(f), "%s", strerror(errno));
+ WARN_RETURN(nfl_verify_header(header) < 0, "File %s has corrupted header.", filename);
+
+ // Read body
+ fread(store, header->n_entries, sizeof(nflog_entry_t), f);
+ WARN_RETURN(ferror(f), "%s", strerror(errno));
+ fclose(f);
+}
diff --git a/extract.h b/extract.h
new file mode 100644
index 0000000..1273b77
--- /dev/null
+++ b/extract.h
@@ -0,0 +1,3 @@
+#pragma once
+
+int nfl_extract_worker(nflog_header_t *header, nflog_entry_t *store, const char *filename);
diff --git a/main.h b/main.h
index ff975fe..7e47403 100644
--- a/main.h
+++ b/main.h
@@ -22,7 +22,8 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
-#pragma once
+#ifndef _MAIN_H
+#define _MAIN_H
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
@@ -42,11 +43,24 @@
fputs((error_msg), stderr); \
exit(1); \
}
+
#define ERR(command, error_msg) \
if (command) { \
perror((error_msg)); \
exit(1); \
}
+
+#define WARN(command, format, ...) \
+ if (command) { \
+ fprintf(stdout, format "\n", ##__VA_ARGS__); \
+ }
+
+#define WARN_RETURN(command, format, ...) \
+ if (command) { \
+ fprintf(stdout, format "\n", ##__VA_ARGS__); \
+ return -1; \
+ }
+
#define debug(format, ...) \
if (DEBUG_ON) { \
fprintf(stdout, format "\n", ##__VA_ARGS__); \
@@ -55,16 +69,25 @@
#define likely(x) __builtin_expect((x),1)
#define unlikely(x) __builtin_expect((x),0)
-#define CEILING(a,b) ((a)%(b) == 0 ? ((a)/(b)) : ((a)/(b)+1))
-#define TRUNK_SIZE (4096 * 150)
+#define CEIL_DIV(a,b) (((a)+(b) - 1)/(b))
+#define TRUNK_SIZE_BY_PAGE (150) // 150 pages
+#define MAX_TRUNK_ID (80)
+#define STORAGE_PREFIX "nflog_storage"
+
+enum nflog_flag_t {
+ COMPRESS_NONE = 1,
+ COMPRESS_LZ4 = 2,
+ COMPRESS_ZSTD = 4
+};
typedef struct __attribute__((packed)) _nflog_header_t {
- uint32_t id; /* 0 4 */
- uint32_t n_entries; /* 4 4 */
- uint32_t max_n_entries; /* 8 4 */
- uint32_t _unused; /* 12 4 */
- uint64_t start_time; /* 16 8 */
- uint64_t end_time; /* 24 8 */
+ uint16_t cksum; /* 0 4 */
+ enum nflog_flag_t flag; /* 0 4 */
+ uint32_t id; /* 4 4 */
+ uint32_t n_entries; /* 8 4 */
+ uint32_t max_n_entries; /* 12 4 */
+ time_t start_time; /* 16 8 */
+ time_t end_time; /* 24 8 */
/* size: 32, cachelines: 1, members: 6 */
} nflog_header_t;
@@ -99,7 +122,15 @@ typedef struct __attribute__((packed)) _nflog_entry_t {
} nflog_entry_t;
+typedef struct _nflog_global_t {
+ sem_t* nfl_commit_queue;
+ uint16_t nfl_group_id;
+ uint8_t commit_when_died;
+ const char* storage_dir;
+} nflog_global_t;
+
typedef struct _nflog_state_t {
+ nflog_global_t* global;
nflog_header_t* header;
nflog_entry_t* store;
@@ -112,3 +143,5 @@ typedef struct _nflog_state_t {
// only copy size of ipv4 header + tcp header
static const int nflog_recv_size = sizeof(struct iphdr) + sizeof(struct tcphdr);
+
+#endif // _MAIN_H
diff --git a/main.c b/nfcollect.c
index 91183d3..b701f5a 100644
--- a/main.c
+++ b/nfcollect.c
@@ -25,7 +25,7 @@
#include "commit.h"
#include "main.h"
-#include "nflog.h"
+#include "collect.h"
#include <fcntl.h>
#include <getopt.h>
#include <pthread.h>
@@ -35,11 +35,6 @@
#include <sys/stat.h>
#include <unistd.h>
-sem_t nfl_commit_queue;
-uint16_t nfl_group_id;
-char *storage_dir = NULL;
-const char *storage_prefix = "nflog_storage";
-
const char *help_text =
"Usage: " PACKAGE " [OPTION]\n"
"\n"
@@ -60,19 +55,28 @@ void sig_handler(int signo) {
int main(int argc, char *argv[]) {
uint32_t i, max_commit_worker = 0, storage_size = 0;
- int nflog_group_id;
+ uint32_t trunk_cnt = 0, trunk_size = 0;
+ uint32_t entries_max;
+ nflog_global_t g;
+ int nfl_group_id;
+ char *storage_dir = NULL;
+ uint8_t commit_when_died = 0;
struct option longopts[] = {/* name, has_args, flag, val */
{"nflog-group", required_argument, NULL, 'g'},
{"storage_dir", required_argument, NULL, 'd'},
{"storage_size", required_argument, NULL, 's'},
{"help", no_argument, NULL, 'h'},
+ {"commit_when_died", no_argument, NULL, 'c'},
{"version", no_argument, NULL, 'v'},
{0, 0, 0, 0}};
int opt;
while ((opt = getopt_long(argc, argv, "g:d:s:hv", longopts, NULL)) != -1) {
switch (opt) {
+ case 'c':
+ commit_when_died = 1;
+ break;
case 'h':
printf("%s", help_text);
exit(0);
@@ -85,7 +89,7 @@ int main(int argc, char *argv[]) {
storage_dir = optarg;
break;
case 'g':
- nflog_group_id = atoi(optarg);
+ nfl_group_id = atoi(optarg);
break;
case 's':
storage_size = atoi(optarg);
@@ -97,16 +101,14 @@ int main(int argc, char *argv[]) {
}
// verify arguments
- ASSERT(nflog_group_id != -1,
+ ASSERT(nfl_group_id != -1,
"You must provide a nflog group (see --help)!\n");
ASSERT(storage_dir != NULL,
"You must provide a storage directory (see --help)\n");
ASSERT(storage_size != 0, "You must provide the desired size of log file "
"(in MiB) (see --help)\n");
- struct stat _d;
- if(stat(storage_dir, &_d) != 0 || !S_ISDIR(_d.st_mode)){
- fprintf(stderr, "storage directory '%s' not exist", storage_dir);
- }
+
+ ERR(nfl_check_dir(storage_dir) < 0, "storage directory not exist");
// max number of commit worker defaults to #processor - 1
if (max_commit_worker == 0) {
@@ -114,40 +116,33 @@ int main(int argc, char *argv[]) {
max_commit_worker = max_commit_worker > 0 ? max_commit_worker : 1;
}
- nfl_group_id = nflog_group_id;
+ g.nfl_group_id = nfl_group_id;
+ g.commit_when_died = commit_when_died;
+ g.storage_dir = storage_dir;
// register signal handler
ERR(signal(SIGHUP, sig_handler) == SIG_ERR, "Could not set SIGHUP handler");
- uint32_t pgsize = getpagesize();
- storage_size *= 1024 * 1024; // MiB
- uint32_t trunk_size_byte = storage_size / TRUNK_SIZE ;
- trunk_size_byte = (trunk_size_byte < TRUNK_SIZE) ? TRUNK_SIZE : trunk_size_byte;
- trunk_size_byte = (trunk_size_byte / pgsize) * pgsize; // align with pagesize
-
- uint32_t trunk_cnt = CEILING(storage_size, trunk_size_byte);
- uint32_t entries_max = (trunk_size_byte - sizeof(nflog_header_t)) /
- sizeof(nflog_entry_t);
+ nfl_cal_trunk(storage_size, &trunk_cnt, &trunk_size);
+ nfl_cal_entries(trunk_size, &entries_max);
// Set up commit worker
- sem_init(&nfl_commit_queue, 0, max_commit_worker);
+ g.nfl_commit_queue = malloc(sizeof(sem_t));
+ sem_init(g.nfl_commit_queue, 0, max_commit_worker);
// Set up nflog receiver worker
nflog_state_t **trunks = (nflog_state_t **)calloc(trunk_cnt, sizeof(void*));
- for (i = 0; i < trunk_cnt; ++i) {
- trunks[i] = NULL;
- }
-
nfl_commit_init(trunk_cnt);
- debug("Worker started, entries_max = %d, trunk_cnt = %d, trunk_size_byte = %d",
- entries_max, trunk_cnt, trunk_size_byte);
+ debug("Worker started, entries_max = %d, trunk_cnt = %d", entries_max, trunk_cnt);
for (i = 0;; i = (i + 1) % trunk_cnt) {
+ // will be unlocked when #i has finished receiving & committing
if(trunks[i])
pthread_mutex_lock(&(trunks[i]->lock));
- nfl_state_update_or_create(&(trunks[i]), i, entries_max);
- // will be unlocked when #i has finished receiving & committing
- pthread_create(&(trunks[i]->thread), NULL, nflog_worker,
+
+ nfl_state_update_or_create(&(trunks[i]), i, entries_max, &g);
+
+ pthread_create(&(trunks[i]->thread), NULL, nfl_collect_worker,
(void *)trunks[i]);
// wait for current receiver worker
pthread_join(trunks[i]->thread, NULL);
diff --git a/nfextract.c b/nfextract.c
new file mode 100644
index 0000000..cf7e51f
--- /dev/null
+++ b/nfextract.c
@@ -0,0 +1,117 @@
+
+// The MIT License (MIT)
+
+// Copyright (c) 2017 Yun-Chih Chen
+
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+
+// The above copyright notice and this permission notice shall be included in
+// all
+// copies or substantial portions of the Software.
+
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+// SOFTWARE.
+
+#include "commit.h"
+#include "common.h"
+#include <fcntl.h>
+#include <getopt.h>
+#include <pthread.h>
+#include <signal.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+#define PROG "nfextract"
+
+sem_t nfl_commit_queue;
+uint16_t nfl_group_id;
+char *storage_dir = NULL;
+
+const char *help_text =
+ "Usage: " PROG " [OPTION]\n"
+ "\n"
+ "Options:\n"
+ " -d --storage_dir=<dirname> log files storage directory\n"
+ " -h --help print this help\n"
+ " -v --version print version information\n"
+ "\n";
+
+void sig_handler(int signo) {
+ if (signo == SIGHUP) {
+ /* TODO */
+ }
+}
+
+int main(int argc, char *argv[]) {
+
+ uint32_t i, max_commit_worker = 0, storage_size = 0;
+ uint32_t trunk_cnt, trunk_size, entries_max;
+ int nflog_group_id;
+
+ struct option longopts[] = {/* name, has_args, flag, val */
+ {"storage_dir", required_argument, NULL, 'd'},
+ {"help", no_argument, NULL, 'h'},
+ {"version", no_argument, NULL, 'v'},
+ {0, 0, 0, 0}};
+
+ int opt;
+ while ((opt = getopt_long(argc, argv, "d:hv", longopts, NULL)) != -1) {
+ switch (opt) {
+ case 'h':
+ printf("%s", help_text);
+ exit(0);
+ break;
+ case 'v':
+ printf("%s %s", PROG, VERSION);
+ exit(0);
+ break;
+ case 'd':
+ storage_dir = optarg;
+ break;
+ case '?':
+ fprintf(stderr, "Unknown argument, see --help");
+ exit(1);
+ }
+ }
+
+ // verify arguments
+ ASSERT(storage_dir != NULL,
+ "You must provide a storage directory (see --help)");
+
+ ERR(nfl_check_dir(storage_dir) < 0, "storage directory not exist");
+
+ // register signal handler
+ ERR(signal(SIGHUP, sig_handler) == SIG_ERR, "Could not set SIGHUP handler");
+
+ nfl_cal_trunk(storage_size, &trunk_cnt, &trunk_size);
+ nfl_cal_entries(trunk_size, &entries_max);
+
+ nflog_state_t trunk;
+ const char *filename, *output;
+ for (int i = 0; i < trunk_cnt; ++i) {
+ filename = nfl_get_filename(storage_dir, i);
+ nfl_extract_worker(trunk.header, trunk.store, filename);
+
+ for(int entry = 0; entry < trunk.header->n_entries; ++entry){
+ output = nfl_format_output(trunk.store);
+ puts((char*)output);
+ free((char*)output);
+ }
+
+ free((char*)filename);
+ }
+
+ return 0;
+}