aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorYunchih Chen <yunchih.cat@gmail.com>2018-03-02 18:08:47 +0800
committerYunchih Chen <yunchih.cat@gmail.com>2018-03-02 18:08:47 +0800
commit87e6bdc58160b85664c43ebc731a07ae8bceed79 (patch)
treefb571dde267521c9ea8aa18a6ffb84e7c4e3799a
parentcb02fbda5ca090b6e69288967b831d9f8f3184ef (diff)
downloadnfcollect-87e6bdc58160b85664c43ebc731a07ae8bceed79.tar
nfcollect-87e6bdc58160b85664c43ebc731a07ae8bceed79.tar.gz
nfcollect-87e6bdc58160b85664c43ebc731a07ae8bceed79.tar.bz2
nfcollect-87e6bdc58160b85664c43ebc731a07ae8bceed79.tar.lz
nfcollect-87e6bdc58160b85664c43ebc731a07ae8bceed79.tar.xz
nfcollect-87e6bdc58160b85664c43ebc731a07ae8bceed79.tar.zst
nfcollect-87e6bdc58160b85664c43ebc731a07ae8bceed79.zip
Implement receive thread synchronization
-rw-r--r--collect.c97
-rw-r--r--collect.h2
-rw-r--r--commit.c3
-rw-r--r--common.c5
-rw-r--r--configure.ac2
-rw-r--r--main.h11
-rw-r--r--nfcollect.c20
7 files changed, 73 insertions, 67 deletions
diff --git a/collect.c b/collect.c
index 323100b..cfdb071 100644
--- a/collect.c
+++ b/collect.c
@@ -28,9 +28,10 @@
#include <sys/types.h> // u_int32_t for libnetfilter_log
#include <libnetfilter_log/libnetfilter_log.h>
#include <pthread.h>
+#include <string.h>
#include <time.h>
-nflog_global_t *g;
+nflog_global_t g;
static void nfl_cleanup(void *nf);
static void nfl_init(nflog_state_t *nf);
@@ -92,11 +93,11 @@ static int handle_packet(struct nflog_g_handle *gh, struct nfgenmsg *nfmsg,
nf->header->n_entries++;
debug("Recv packet info entry #%d: "
- "timestamp:\t%ld\t"
- "daddr:\t%ld\t"
- "transfer:\t%s\t"
- "uid:\t%d\t"
- "sport:\t%d\t"
+ "timestamp:\t%ld,\t"
+ "daddr:\t%ld,\t"
+ "transfer:\t%s,\t"
+ "uid:\t%d,\t"
+ "sport:\t%d,\t"
"dport:\t%d",
nf->header->n_entries,
entry->timestamp, (unsigned long)entry->daddr.s_addr,
@@ -125,19 +126,7 @@ static void nfl_init(nflog_state_t *nf) {
nflog_callback_register(nf->nfl_group_fd, &handle_packet, nf);
debug("Registering nflog callback");
- g = nf->global;
-}
-
-static void nfl_cleanup(void *args) {
- nflog_state_t *nf = (nflog_state_t *)args;
-
- // write end time
- time(&nf->header->end_time);
- nflog_unbind_group(nf->nfl_group_fd);
- nflog_close(nf->nfl_fd);
-
- // commit to file
- if(g->commit_when_died) nfl_commit(nf);
+ memcpy(&g, nf->global, sizeof(nflog_global_t));
}
void *nfl_collect_worker(void *targs) {
@@ -150,8 +139,6 @@ void *nfl_collect_worker(void *targs) {
debug("Recv worker #%u: main loop starts", nf->header->id);
time(&nf->header->start_time);
- pthread_cleanup_push(nfl_cleanup, (void*)nf);
-
int rv; char buf[4096];
while (*p_cnt_now < cnt_max) {
@@ -163,8 +150,15 @@ void *nfl_collect_worker(void *targs) {
}
}
- debug("Recv worker #%u: finish recv", nf->header->id);
- pthread_cleanup_pop(1);
+ debug("Recv worker #%u: finish recv, received packets: %u", nf->header->id, cnt_max);
+
+ // write end time
+ time(&nf->header->end_time);
+ nflog_unbind_group(nf->nfl_group_fd);
+ nflog_close(nf->nfl_fd);
+
+ // spawn commit thread
+ nfl_commit(nf);
pthread_exit(NULL);
}
@@ -180,19 +174,23 @@ static void nfl_commit(nflog_state_t *nf) {
static void *nfl_start_commit_worker(void *targs) {
nflog_state_t* nf = (nflog_state_t*) targs;
- const char *filename = nfl_get_filename(g->storage_dir, nf->header->id);
- debug("Comm worker #%u: thread started", nf->header->id);
+ const char *filename = nfl_get_filename(g.storage_dir, nf->header->id);
+ debug("Comm worker #%u: thread started.", nf->header->id);
- sem_wait(g->nfl_commit_queue);
- debug("Comm worker #%u: commit started", nf->header->id);
+ sem_wait(g.nfl_commit_queue);
+ debug("Comm worker #%u: commit started.", nf->header->id);
nfl_commit_worker(nf->header, nf->store, filename);
- debug("Comm worker #%u: commit done", nf->header->id);
- sem_post(g->nfl_commit_queue);
+ debug("Comm worker #%u: commit done.", nf->header->id);
+ sem_post(g.nfl_commit_queue);
- // Commit finished
nfl_state_free(nf);
free((char*)filename);
- pthread_mutex_unlock(&(nf->lock));
+
+ pthread_mutex_lock(&nf->has_finished_lock);
+ nf->has_finished = true;
+ pthread_cond_signal(&nf->has_finished_cond);
+ pthread_mutex_unlock(&nf->has_finished_lock);
+
pthread_exit(NULL);
}
@@ -200,29 +198,40 @@ static void *nfl_start_commit_worker(void *targs) {
* State managers
*/
-void nfl_state_update_or_create(nflog_state_t **nf,
+void nfl_state_init(nflog_state_t **nf,
uint32_t id, uint32_t entries_max,
nflog_global_t *g) {
- if(*nf == NULL) {
+ assert(nf);
+ if(unlikely(*nf == NULL)) {
*nf = (nflog_state_t *)malloc(sizeof(nflog_state_t));
- pthread_mutex_init(&((*nf)->lock), NULL);
(*nf)->global = g;
+ (*nf)->header = (nflog_header_t *)malloc(sizeof(nflog_header_t));
+ (*nf)->header->id = id;
+ (*nf)->header->max_n_entries = entries_max;
+ (*nf)->header->n_entries = 0;
+
+ (*nf)->has_finished = true;
+ pthread_mutex_init(&(*nf)->has_finished_lock, NULL);
+ pthread_cond_init(&(*nf)->has_finished_cond, NULL);
}
- // Don't use calloc here, as it will consume physical memory
- // before we fill the buffer. Instead, fill entries with 0
- // on the fly, to squeeze more space for compression.
+ // Ensure trunk with same id in previous run has finished to prevent reusing a trunk
+ // which it's still being used. Furthermore, this hopefully alleviate us from
+ // bursty network traffic.
+ pthread_mutex_lock(&(*nf)->has_finished_lock);
+ while(!(*nf)->has_finished) pthread_cond_wait(&(*nf)->has_finished_cond, &(*nf)->has_finished_lock);
+ (*nf)->has_finished = false;
+ pthread_mutex_unlock(&(*nf)->has_finished_lock);
+
+ // Don't use calloc here, as it will cause page fault and
+ // consume physical memory before we fill the buffer.
+ // Instead, fill entries with 0 on the fly, to squeeze
+ // more space for compression.
(*nf)->store = (nflog_entry_t *)malloc(sizeof(nflog_entry_t) *
entries_max);
- (*nf)->header = (nflog_header_t *)malloc(sizeof(nflog_header_t));
- (*nf)->header->id = id;
- (*nf)->header->max_n_entries = entries_max;
- (*nf)->header->n_entries = 0;
}
void nfl_state_free(nflog_state_t *nf) {
- // Free header and store only
- // Leave the rest intact
- free((void*)nf->header);
+ // Free only and leave the rest intact
free((void*)nf->store);
}
diff --git a/collect.h b/collect.h
index 0f525e3..98eee4d 100644
--- a/collect.h
+++ b/collect.h
@@ -1,5 +1,5 @@
#pragma once
void *nfl_collect_worker(void *targs);
-void nfl_state_update_or_create(nflog_state_t **nf, uint32_t id, uint32_t entries_max, nflog_global_t *g);
+void nfl_state_init(nflog_state_t **nf, uint32_t id, uint32_t entries_max, nflog_global_t *g);
void nfl_state_free(nflog_state_t *nf);
diff --git a/commit.c b/commit.c
index 076f100..a19aefe 100644
--- a/commit.c
+++ b/commit.c
@@ -12,7 +12,7 @@ void nfl_commit_worker(nflog_header_t* header, nflog_entry_t* store, const char*
debug("Comm worker #%u: commit to file %s\n", header->id, filename);
ERR((f = fopen(filename, "wb")) == NULL, strerror(errno));
-
+
// commit header
written = fwrite(header, 1, sizeof(nflog_header_t), f);
ERR(written != sizeof(nflog_header_t), strerror(errno));
@@ -25,4 +25,3 @@ void nfl_commit_worker(nflog_header_t* header, nflog_entry_t* store, const char*
// Do fsync ?
fclose(f);
}
-
diff --git a/common.c b/common.c
index 7f44e6c..3e3b9ad 100644
--- a/common.c
+++ b/common.c
@@ -51,13 +51,14 @@ void nfl_cal_trunk(uint32_t total_size, uint32_t *trunk_cnt, uint32_t *trunk_siz
assert(trunk_cnt);
assert(total_size);
- *trunk_cnt = CEIL_DIV(total_size, pgsize * TRUNK_SIZE_BY_PAGE);
+ *trunk_cnt = CEIL_DIV(total_size, pgsize*TRUNK_SIZE_BY_PAGE);
if(*trunk_cnt > MAX_TRUNK_ID) {
+ *trunk_cnt = MAX_TRUNK_ID;
*trunk_size = total_size / MAX_TRUNK_ID;
*trunk_size = (*trunk_size / pgsize) * pgsize; // align with pagesize
}
else {
- *trunk_size = TRUNK_SIZE_BY_PAGE;
+ *trunk_size = pgsize*TRUNK_SIZE_BY_PAGE;
}
}
diff --git a/configure.ac b/configure.ac
index 5601c83..805cb4f 100644
--- a/configure.ac
+++ b/configure.ac
@@ -14,6 +14,8 @@ AM_INIT_AUTOMAKE([-Wall -Werror foreign dist-xz])
AC_PROG_CC
+AC_DEFINE(DEBUG)
+
AC_CHECK_HEADERS(libnetfilter_log/libnetfilter_log.h)
AC_SEARCH_LIBS(nflog_open, netfilter_log)
diff --git a/main.h b/main.h
index 7e47403..f4c4a38 100644
--- a/main.h
+++ b/main.h
@@ -24,7 +24,9 @@
#ifndef _MAIN_H
#define _MAIN_H
+#include <assert.h>
#include <semaphore.h>
+#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <netinet/in.h>
@@ -63,13 +65,15 @@
#define debug(format, ...) \
if (DEBUG_ON) { \
- fprintf(stdout, format "\n", ##__VA_ARGS__); \
+ fprintf(stdout, "[DEBUG] " format "\n", ##__VA_ARGS__); \
}
#define likely(x) __builtin_expect((x),1)
#define unlikely(x) __builtin_expect((x),0)
#define CEIL_DIV(a,b) (((a)+(b) - 1)/(b))
+#define NEXT(i, l) ((i+1) % l)
+#define PREV(i, l) ((i-1) % l)
#define TRUNK_SIZE_BY_PAGE (150) // 150 pages
#define MAX_TRUNK_ID (80)
#define STORAGE_PREFIX "nflog_storage"
@@ -125,7 +129,6 @@ typedef struct __attribute__((packed)) _nflog_entry_t {
typedef struct _nflog_global_t {
sem_t* nfl_commit_queue;
uint16_t nfl_group_id;
- uint8_t commit_when_died;
const char* storage_dir;
} nflog_global_t;
@@ -137,7 +140,9 @@ typedef struct _nflog_state_t {
struct nflog_handle *nfl_fd;
struct nflog_g_handle *nfl_group_fd;
- pthread_mutex_t lock;
+ bool has_finished;
+ pthread_cond_t has_finished_cond;
+ pthread_mutex_t has_finished_lock;
pthread_t thread;
} nflog_state_t;
diff --git a/nfcollect.c b/nfcollect.c
index b701f5a..5d7aaf1 100644
--- a/nfcollect.c
+++ b/nfcollect.c
@@ -24,7 +24,7 @@
// SOFTWARE.
#include "commit.h"
-#include "main.h"
+#include "common.h"
#include "collect.h"
#include <fcntl.h>
#include <getopt.h>
@@ -60,23 +60,18 @@ int main(int argc, char *argv[]) {
nflog_global_t g;
int nfl_group_id;
char *storage_dir = NULL;
- uint8_t commit_when_died = 0;
struct option longopts[] = {/* name, has_args, flag, val */
{"nflog-group", required_argument, NULL, 'g'},
{"storage_dir", required_argument, NULL, 'd'},
{"storage_size", required_argument, NULL, 's'},
{"help", no_argument, NULL, 'h'},
- {"commit_when_died", no_argument, NULL, 'c'},
{"version", no_argument, NULL, 'v'},
{0, 0, 0, 0}};
int opt;
while ((opt = getopt_long(argc, argv, "g:d:s:hv", longopts, NULL)) != -1) {
switch (opt) {
- case 'c':
- commit_when_died = 1;
- break;
case 'h':
printf("%s", help_text);
exit(0);
@@ -117,7 +112,6 @@ int main(int argc, char *argv[]) {
}
g.nfl_group_id = nfl_group_id;
- g.commit_when_died = commit_when_died;
g.storage_dir = storage_dir;
// register signal handler
@@ -135,19 +129,15 @@ int main(int argc, char *argv[]) {
nfl_commit_init(trunk_cnt);
debug("Worker started, entries_max = %d, trunk_cnt = %d", entries_max, trunk_cnt);
- for (i = 0;; i = (i + 1) % trunk_cnt) {
- // will be unlocked when #i has finished receiving & committing
- if(trunks[i])
- pthread_mutex_lock(&(trunks[i]->lock));
-
- nfl_state_update_or_create(&(trunks[i]), i, entries_max, &g);
-
+ for (i = 0;; i = NEXT(i, trunk_cnt)) {
+ debug("Running receiver worker: id = %d", i);
+ nfl_state_init(&(trunks[i]), i, entries_max, &g);
pthread_create(&(trunks[i]->thread), NULL, nfl_collect_worker,
(void *)trunks[i]);
// wait for current receiver worker
pthread_join(trunks[i]->thread, NULL);
}
-
+
// Won't reach here
// We don't actually free trunks or the semaphore at all
// sem_destroy(&nfl_commit_queue);