diff options
Diffstat (limited to 'collect.c')
-rw-r--r-- | collect.c | 97 |
1 files changed, 53 insertions, 44 deletions
@@ -28,9 +28,10 @@ #include <sys/types.h> // u_int32_t for libnetfilter_log #include <libnetfilter_log/libnetfilter_log.h> #include <pthread.h> +#include <string.h> #include <time.h> -nflog_global_t *g; +nflog_global_t g; static void nfl_cleanup(void *nf); static void nfl_init(nflog_state_t *nf); @@ -92,11 +93,11 @@ static int handle_packet(struct nflog_g_handle *gh, struct nfgenmsg *nfmsg, nf->header->n_entries++; debug("Recv packet info entry #%d: " - "timestamp:\t%ld\t" - "daddr:\t%ld\t" - "transfer:\t%s\t" - "uid:\t%d\t" - "sport:\t%d\t" + "timestamp:\t%ld,\t" + "daddr:\t%ld,\t" + "transfer:\t%s,\t" + "uid:\t%d,\t" + "sport:\t%d,\t" "dport:\t%d", nf->header->n_entries, entry->timestamp, (unsigned long)entry->daddr.s_addr, @@ -125,19 +126,7 @@ static void nfl_init(nflog_state_t *nf) { nflog_callback_register(nf->nfl_group_fd, &handle_packet, nf); debug("Registering nflog callback"); - g = nf->global; -} - -static void nfl_cleanup(void *args) { - nflog_state_t *nf = (nflog_state_t *)args; - - // write end time - time(&nf->header->end_time); - nflog_unbind_group(nf->nfl_group_fd); - nflog_close(nf->nfl_fd); - - // commit to file - if(g->commit_when_died) nfl_commit(nf); + memcpy(&g, nf->global, sizeof(nflog_global_t)); } void *nfl_collect_worker(void *targs) { @@ -150,8 +139,6 @@ void *nfl_collect_worker(void *targs) { debug("Recv worker #%u: main loop starts", nf->header->id); time(&nf->header->start_time); - pthread_cleanup_push(nfl_cleanup, (void*)nf); - int rv; char buf[4096]; while (*p_cnt_now < cnt_max) { @@ -163,8 +150,15 @@ void *nfl_collect_worker(void *targs) { } } - debug("Recv worker #%u: finish recv", nf->header->id); - pthread_cleanup_pop(1); + debug("Recv worker #%u: finish recv, received packets: %u", nf->header->id, cnt_max); + + // write end time + time(&nf->header->end_time); + nflog_unbind_group(nf->nfl_group_fd); + nflog_close(nf->nfl_fd); + + // spawn commit thread + nfl_commit(nf); pthread_exit(NULL); } @@ -180,19 +174,23 @@ static void nfl_commit(nflog_state_t *nf) { static void *nfl_start_commit_worker(void *targs) { nflog_state_t* nf = (nflog_state_t*) targs; - const char *filename = nfl_get_filename(g->storage_dir, nf->header->id); - debug("Comm worker #%u: thread started", nf->header->id); + const char *filename = nfl_get_filename(g.storage_dir, nf->header->id); + debug("Comm worker #%u: thread started.", nf->header->id); - sem_wait(g->nfl_commit_queue); - debug("Comm worker #%u: commit started", nf->header->id); + sem_wait(g.nfl_commit_queue); + debug("Comm worker #%u: commit started.", nf->header->id); nfl_commit_worker(nf->header, nf->store, filename); - debug("Comm worker #%u: commit done", nf->header->id); - sem_post(g->nfl_commit_queue); + debug("Comm worker #%u: commit done.", nf->header->id); + sem_post(g.nfl_commit_queue); - // Commit finished nfl_state_free(nf); free((char*)filename); - pthread_mutex_unlock(&(nf->lock)); + + pthread_mutex_lock(&nf->has_finished_lock); + nf->has_finished = true; + pthread_cond_signal(&nf->has_finished_cond); + pthread_mutex_unlock(&nf->has_finished_lock); + pthread_exit(NULL); } @@ -200,29 +198,40 @@ static void *nfl_start_commit_worker(void *targs) { * State managers */ -void nfl_state_update_or_create(nflog_state_t **nf, +void nfl_state_init(nflog_state_t **nf, uint32_t id, uint32_t entries_max, nflog_global_t *g) { - if(*nf == NULL) { + assert(nf); + if(unlikely(*nf == NULL)) { *nf = (nflog_state_t *)malloc(sizeof(nflog_state_t)); - pthread_mutex_init(&((*nf)->lock), NULL); (*nf)->global = g; + (*nf)->header = (nflog_header_t *)malloc(sizeof(nflog_header_t)); + (*nf)->header->id = id; + (*nf)->header->max_n_entries = entries_max; + (*nf)->header->n_entries = 0; + + (*nf)->has_finished = true; + pthread_mutex_init(&(*nf)->has_finished_lock, NULL); + pthread_cond_init(&(*nf)->has_finished_cond, NULL); } - // Don't use calloc here, as it will consume physical memory - // before we fill the buffer. Instead, fill entries with 0 - // on the fly, to squeeze more space for compression. + // Ensure trunk with same id in previous run has finished to prevent reusing a trunk + // which it's still being used. Furthermore, this hopefully alleviate us from + // bursty network traffic. + pthread_mutex_lock(&(*nf)->has_finished_lock); + while(!(*nf)->has_finished) pthread_cond_wait(&(*nf)->has_finished_cond, &(*nf)->has_finished_lock); + (*nf)->has_finished = false; + pthread_mutex_unlock(&(*nf)->has_finished_lock); + + // Don't use calloc here, as it will cause page fault and + // consume physical memory before we fill the buffer. + // Instead, fill entries with 0 on the fly, to squeeze + // more space for compression. (*nf)->store = (nflog_entry_t *)malloc(sizeof(nflog_entry_t) * entries_max); - (*nf)->header = (nflog_header_t *)malloc(sizeof(nflog_header_t)); - (*nf)->header->id = id; - (*nf)->header->max_n_entries = entries_max; - (*nf)->header->n_entries = 0; } void nfl_state_free(nflog_state_t *nf) { - // Free header and store only - // Leave the rest intact - free((void*)nf->header); + // Free only and leave the rest intact free((void*)nf->store); } |