aboutsummaryrefslogtreecommitdiffstats
path: root/collect.c
diff options
context:
space:
mode:
Diffstat (limited to 'collect.c')
-rw-r--r--collect.c97
1 files changed, 53 insertions, 44 deletions
diff --git a/collect.c b/collect.c
index 323100b..cfdb071 100644
--- a/collect.c
+++ b/collect.c
@@ -28,9 +28,10 @@
#include <sys/types.h> // u_int32_t for libnetfilter_log
#include <libnetfilter_log/libnetfilter_log.h>
#include <pthread.h>
+#include <string.h>
#include <time.h>
-nflog_global_t *g;
+nflog_global_t g;
static void nfl_cleanup(void *nf);
static void nfl_init(nflog_state_t *nf);
@@ -92,11 +93,11 @@ static int handle_packet(struct nflog_g_handle *gh, struct nfgenmsg *nfmsg,
nf->header->n_entries++;
debug("Recv packet info entry #%d: "
- "timestamp:\t%ld\t"
- "daddr:\t%ld\t"
- "transfer:\t%s\t"
- "uid:\t%d\t"
- "sport:\t%d\t"
+ "timestamp:\t%ld,\t"
+ "daddr:\t%ld,\t"
+ "transfer:\t%s,\t"
+ "uid:\t%d,\t"
+ "sport:\t%d,\t"
"dport:\t%d",
nf->header->n_entries,
entry->timestamp, (unsigned long)entry->daddr.s_addr,
@@ -125,19 +126,7 @@ static void nfl_init(nflog_state_t *nf) {
nflog_callback_register(nf->nfl_group_fd, &handle_packet, nf);
debug("Registering nflog callback");
- g = nf->global;
-}
-
-static void nfl_cleanup(void *args) {
- nflog_state_t *nf = (nflog_state_t *)args;
-
- // write end time
- time(&nf->header->end_time);
- nflog_unbind_group(nf->nfl_group_fd);
- nflog_close(nf->nfl_fd);
-
- // commit to file
- if(g->commit_when_died) nfl_commit(nf);
+ memcpy(&g, nf->global, sizeof(nflog_global_t));
}
void *nfl_collect_worker(void *targs) {
@@ -150,8 +139,6 @@ void *nfl_collect_worker(void *targs) {
debug("Recv worker #%u: main loop starts", nf->header->id);
time(&nf->header->start_time);
- pthread_cleanup_push(nfl_cleanup, (void*)nf);
-
int rv; char buf[4096];
while (*p_cnt_now < cnt_max) {
@@ -163,8 +150,15 @@ void *nfl_collect_worker(void *targs) {
}
}
- debug("Recv worker #%u: finish recv", nf->header->id);
- pthread_cleanup_pop(1);
+ debug("Recv worker #%u: finish recv, received packets: %u", nf->header->id, cnt_max);
+
+ // write end time
+ time(&nf->header->end_time);
+ nflog_unbind_group(nf->nfl_group_fd);
+ nflog_close(nf->nfl_fd);
+
+ // spawn commit thread
+ nfl_commit(nf);
pthread_exit(NULL);
}
@@ -180,19 +174,23 @@ static void nfl_commit(nflog_state_t *nf) {
static void *nfl_start_commit_worker(void *targs) {
nflog_state_t* nf = (nflog_state_t*) targs;
- const char *filename = nfl_get_filename(g->storage_dir, nf->header->id);
- debug("Comm worker #%u: thread started", nf->header->id);
+ const char *filename = nfl_get_filename(g.storage_dir, nf->header->id);
+ debug("Comm worker #%u: thread started.", nf->header->id);
- sem_wait(g->nfl_commit_queue);
- debug("Comm worker #%u: commit started", nf->header->id);
+ sem_wait(g.nfl_commit_queue);
+ debug("Comm worker #%u: commit started.", nf->header->id);
nfl_commit_worker(nf->header, nf->store, filename);
- debug("Comm worker #%u: commit done", nf->header->id);
- sem_post(g->nfl_commit_queue);
+ debug("Comm worker #%u: commit done.", nf->header->id);
+ sem_post(g.nfl_commit_queue);
- // Commit finished
nfl_state_free(nf);
free((char*)filename);
- pthread_mutex_unlock(&(nf->lock));
+
+ pthread_mutex_lock(&nf->has_finished_lock);
+ nf->has_finished = true;
+ pthread_cond_signal(&nf->has_finished_cond);
+ pthread_mutex_unlock(&nf->has_finished_lock);
+
pthread_exit(NULL);
}
@@ -200,29 +198,40 @@ static void *nfl_start_commit_worker(void *targs) {
* State managers
*/
-void nfl_state_update_or_create(nflog_state_t **nf,
+void nfl_state_init(nflog_state_t **nf,
uint32_t id, uint32_t entries_max,
nflog_global_t *g) {
- if(*nf == NULL) {
+ assert(nf);
+ if(unlikely(*nf == NULL)) {
*nf = (nflog_state_t *)malloc(sizeof(nflog_state_t));
- pthread_mutex_init(&((*nf)->lock), NULL);
(*nf)->global = g;
+ (*nf)->header = (nflog_header_t *)malloc(sizeof(nflog_header_t));
+ (*nf)->header->id = id;
+ (*nf)->header->max_n_entries = entries_max;
+ (*nf)->header->n_entries = 0;
+
+ (*nf)->has_finished = true;
+ pthread_mutex_init(&(*nf)->has_finished_lock, NULL);
+ pthread_cond_init(&(*nf)->has_finished_cond, NULL);
}
- // Don't use calloc here, as it will consume physical memory
- // before we fill the buffer. Instead, fill entries with 0
- // on the fly, to squeeze more space for compression.
+ // Ensure trunk with same id in previous run has finished to prevent reusing a trunk
+ // which it's still being used. Furthermore, this hopefully alleviate us from
+ // bursty network traffic.
+ pthread_mutex_lock(&(*nf)->has_finished_lock);
+ while(!(*nf)->has_finished) pthread_cond_wait(&(*nf)->has_finished_cond, &(*nf)->has_finished_lock);
+ (*nf)->has_finished = false;
+ pthread_mutex_unlock(&(*nf)->has_finished_lock);
+
+ // Don't use calloc here, as it will cause page fault and
+ // consume physical memory before we fill the buffer.
+ // Instead, fill entries with 0 on the fly, to squeeze
+ // more space for compression.
(*nf)->store = (nflog_entry_t *)malloc(sizeof(nflog_entry_t) *
entries_max);
- (*nf)->header = (nflog_header_t *)malloc(sizeof(nflog_header_t));
- (*nf)->header->id = id;
- (*nf)->header->max_n_entries = entries_max;
- (*nf)->header->n_entries = 0;
}
void nfl_state_free(nflog_state_t *nf) {
- // Free header and store only
- // Leave the rest intact
- free((void*)nf->header);
+ // Free only and leave the rest intact
free((void*)nf->store);
}