diff options
author | Yunchih Chen <yunchih.cat@gmail.com> | 2018-03-02 18:08:47 +0800 |
---|---|---|
committer | Yunchih Chen <yunchih.cat@gmail.com> | 2018-03-02 18:08:47 +0800 |
commit | 87e6bdc58160b85664c43ebc731a07ae8bceed79 (patch) | |
tree | fb571dde267521c9ea8aa18a6ffb84e7c4e3799a | |
parent | cb02fbda5ca090b6e69288967b831d9f8f3184ef (diff) | |
download | nfcollect-87e6bdc58160b85664c43ebc731a07ae8bceed79.tar nfcollect-87e6bdc58160b85664c43ebc731a07ae8bceed79.tar.gz nfcollect-87e6bdc58160b85664c43ebc731a07ae8bceed79.tar.bz2 nfcollect-87e6bdc58160b85664c43ebc731a07ae8bceed79.tar.lz nfcollect-87e6bdc58160b85664c43ebc731a07ae8bceed79.tar.xz nfcollect-87e6bdc58160b85664c43ebc731a07ae8bceed79.tar.zst nfcollect-87e6bdc58160b85664c43ebc731a07ae8bceed79.zip |
Implement receive thread synchronization
-rw-r--r-- | collect.c | 97 | ||||
-rw-r--r-- | collect.h | 2 | ||||
-rw-r--r-- | commit.c | 3 | ||||
-rw-r--r-- | common.c | 5 | ||||
-rw-r--r-- | configure.ac | 2 | ||||
-rw-r--r-- | main.h | 11 | ||||
-rw-r--r-- | nfcollect.c | 20 |
7 files changed, 73 insertions, 67 deletions
@@ -28,9 +28,10 @@ #include <sys/types.h> // u_int32_t for libnetfilter_log #include <libnetfilter_log/libnetfilter_log.h> #include <pthread.h> +#include <string.h> #include <time.h> -nflog_global_t *g; +nflog_global_t g; static void nfl_cleanup(void *nf); static void nfl_init(nflog_state_t *nf); @@ -92,11 +93,11 @@ static int handle_packet(struct nflog_g_handle *gh, struct nfgenmsg *nfmsg, nf->header->n_entries++; debug("Recv packet info entry #%d: " - "timestamp:\t%ld\t" - "daddr:\t%ld\t" - "transfer:\t%s\t" - "uid:\t%d\t" - "sport:\t%d\t" + "timestamp:\t%ld,\t" + "daddr:\t%ld,\t" + "transfer:\t%s,\t" + "uid:\t%d,\t" + "sport:\t%d,\t" "dport:\t%d", nf->header->n_entries, entry->timestamp, (unsigned long)entry->daddr.s_addr, @@ -125,19 +126,7 @@ static void nfl_init(nflog_state_t *nf) { nflog_callback_register(nf->nfl_group_fd, &handle_packet, nf); debug("Registering nflog callback"); - g = nf->global; -} - -static void nfl_cleanup(void *args) { - nflog_state_t *nf = (nflog_state_t *)args; - - // write end time - time(&nf->header->end_time); - nflog_unbind_group(nf->nfl_group_fd); - nflog_close(nf->nfl_fd); - - // commit to file - if(g->commit_when_died) nfl_commit(nf); + memcpy(&g, nf->global, sizeof(nflog_global_t)); } void *nfl_collect_worker(void *targs) { @@ -150,8 +139,6 @@ void *nfl_collect_worker(void *targs) { debug("Recv worker #%u: main loop starts", nf->header->id); time(&nf->header->start_time); - pthread_cleanup_push(nfl_cleanup, (void*)nf); - int rv; char buf[4096]; while (*p_cnt_now < cnt_max) { @@ -163,8 +150,15 @@ void *nfl_collect_worker(void *targs) { } } - debug("Recv worker #%u: finish recv", nf->header->id); - pthread_cleanup_pop(1); + debug("Recv worker #%u: finish recv, received packets: %u", nf->header->id, cnt_max); + + // write end time + time(&nf->header->end_time); + nflog_unbind_group(nf->nfl_group_fd); + nflog_close(nf->nfl_fd); + + // spawn commit thread + nfl_commit(nf); pthread_exit(NULL); } @@ -180,19 +174,23 @@ static void nfl_commit(nflog_state_t *nf) { static void *nfl_start_commit_worker(void *targs) { nflog_state_t* nf = (nflog_state_t*) targs; - const char *filename = nfl_get_filename(g->storage_dir, nf->header->id); - debug("Comm worker #%u: thread started", nf->header->id); + const char *filename = nfl_get_filename(g.storage_dir, nf->header->id); + debug("Comm worker #%u: thread started.", nf->header->id); - sem_wait(g->nfl_commit_queue); - debug("Comm worker #%u: commit started", nf->header->id); + sem_wait(g.nfl_commit_queue); + debug("Comm worker #%u: commit started.", nf->header->id); nfl_commit_worker(nf->header, nf->store, filename); - debug("Comm worker #%u: commit done", nf->header->id); - sem_post(g->nfl_commit_queue); + debug("Comm worker #%u: commit done.", nf->header->id); + sem_post(g.nfl_commit_queue); - // Commit finished nfl_state_free(nf); free((char*)filename); - pthread_mutex_unlock(&(nf->lock)); + + pthread_mutex_lock(&nf->has_finished_lock); + nf->has_finished = true; + pthread_cond_signal(&nf->has_finished_cond); + pthread_mutex_unlock(&nf->has_finished_lock); + pthread_exit(NULL); } @@ -200,29 +198,40 @@ static void *nfl_start_commit_worker(void *targs) { * State managers */ -void nfl_state_update_or_create(nflog_state_t **nf, +void nfl_state_init(nflog_state_t **nf, uint32_t id, uint32_t entries_max, nflog_global_t *g) { - if(*nf == NULL) { + assert(nf); + if(unlikely(*nf == NULL)) { *nf = (nflog_state_t *)malloc(sizeof(nflog_state_t)); - pthread_mutex_init(&((*nf)->lock), NULL); (*nf)->global = g; + (*nf)->header = (nflog_header_t *)malloc(sizeof(nflog_header_t)); + (*nf)->header->id = id; + (*nf)->header->max_n_entries = entries_max; + (*nf)->header->n_entries = 0; + + (*nf)->has_finished = true; + pthread_mutex_init(&(*nf)->has_finished_lock, NULL); + pthread_cond_init(&(*nf)->has_finished_cond, NULL); } - // Don't use calloc here, as it will consume physical memory - // before we fill the buffer. Instead, fill entries with 0 - // on the fly, to squeeze more space for compression. + // Ensure trunk with same id in previous run has finished to prevent reusing a trunk + // which it's still being used. Furthermore, this hopefully alleviate us from + // bursty network traffic. + pthread_mutex_lock(&(*nf)->has_finished_lock); + while(!(*nf)->has_finished) pthread_cond_wait(&(*nf)->has_finished_cond, &(*nf)->has_finished_lock); + (*nf)->has_finished = false; + pthread_mutex_unlock(&(*nf)->has_finished_lock); + + // Don't use calloc here, as it will cause page fault and + // consume physical memory before we fill the buffer. + // Instead, fill entries with 0 on the fly, to squeeze + // more space for compression. (*nf)->store = (nflog_entry_t *)malloc(sizeof(nflog_entry_t) * entries_max); - (*nf)->header = (nflog_header_t *)malloc(sizeof(nflog_header_t)); - (*nf)->header->id = id; - (*nf)->header->max_n_entries = entries_max; - (*nf)->header->n_entries = 0; } void nfl_state_free(nflog_state_t *nf) { - // Free header and store only - // Leave the rest intact - free((void*)nf->header); + // Free only and leave the rest intact free((void*)nf->store); } @@ -1,5 +1,5 @@ #pragma once void *nfl_collect_worker(void *targs); -void nfl_state_update_or_create(nflog_state_t **nf, uint32_t id, uint32_t entries_max, nflog_global_t *g); +void nfl_state_init(nflog_state_t **nf, uint32_t id, uint32_t entries_max, nflog_global_t *g); void nfl_state_free(nflog_state_t *nf); @@ -12,7 +12,7 @@ void nfl_commit_worker(nflog_header_t* header, nflog_entry_t* store, const char* debug("Comm worker #%u: commit to file %s\n", header->id, filename); ERR((f = fopen(filename, "wb")) == NULL, strerror(errno)); - + // commit header written = fwrite(header, 1, sizeof(nflog_header_t), f); ERR(written != sizeof(nflog_header_t), strerror(errno)); @@ -25,4 +25,3 @@ void nfl_commit_worker(nflog_header_t* header, nflog_entry_t* store, const char* // Do fsync ? fclose(f); } - @@ -51,13 +51,14 @@ void nfl_cal_trunk(uint32_t total_size, uint32_t *trunk_cnt, uint32_t *trunk_siz assert(trunk_cnt); assert(total_size); - *trunk_cnt = CEIL_DIV(total_size, pgsize * TRUNK_SIZE_BY_PAGE); + *trunk_cnt = CEIL_DIV(total_size, pgsize*TRUNK_SIZE_BY_PAGE); if(*trunk_cnt > MAX_TRUNK_ID) { + *trunk_cnt = MAX_TRUNK_ID; *trunk_size = total_size / MAX_TRUNK_ID; *trunk_size = (*trunk_size / pgsize) * pgsize; // align with pagesize } else { - *trunk_size = TRUNK_SIZE_BY_PAGE; + *trunk_size = pgsize*TRUNK_SIZE_BY_PAGE; } } diff --git a/configure.ac b/configure.ac index 5601c83..805cb4f 100644 --- a/configure.ac +++ b/configure.ac @@ -14,6 +14,8 @@ AM_INIT_AUTOMAKE([-Wall -Werror foreign dist-xz]) AC_PROG_CC +AC_DEFINE(DEBUG) + AC_CHECK_HEADERS(libnetfilter_log/libnetfilter_log.h) AC_SEARCH_LIBS(nflog_open, netfilter_log) @@ -24,7 +24,9 @@ #ifndef _MAIN_H #define _MAIN_H +#include <assert.h> #include <semaphore.h> +#include <stdbool.h> #include <stdio.h> #include <stdlib.h> #include <netinet/in.h> @@ -63,13 +65,15 @@ #define debug(format, ...) \ if (DEBUG_ON) { \ - fprintf(stdout, format "\n", ##__VA_ARGS__); \ + fprintf(stdout, "[DEBUG] " format "\n", ##__VA_ARGS__); \ } #define likely(x) __builtin_expect((x),1) #define unlikely(x) __builtin_expect((x),0) #define CEIL_DIV(a,b) (((a)+(b) - 1)/(b)) +#define NEXT(i, l) ((i+1) % l) +#define PREV(i, l) ((i-1) % l) #define TRUNK_SIZE_BY_PAGE (150) // 150 pages #define MAX_TRUNK_ID (80) #define STORAGE_PREFIX "nflog_storage" @@ -125,7 +129,6 @@ typedef struct __attribute__((packed)) _nflog_entry_t { typedef struct _nflog_global_t { sem_t* nfl_commit_queue; uint16_t nfl_group_id; - uint8_t commit_when_died; const char* storage_dir; } nflog_global_t; @@ -137,7 +140,9 @@ typedef struct _nflog_state_t { struct nflog_handle *nfl_fd; struct nflog_g_handle *nfl_group_fd; - pthread_mutex_t lock; + bool has_finished; + pthread_cond_t has_finished_cond; + pthread_mutex_t has_finished_lock; pthread_t thread; } nflog_state_t; diff --git a/nfcollect.c b/nfcollect.c index b701f5a..5d7aaf1 100644 --- a/nfcollect.c +++ b/nfcollect.c @@ -24,7 +24,7 @@ // SOFTWARE. #include "commit.h" -#include "main.h" +#include "common.h" #include "collect.h" #include <fcntl.h> #include <getopt.h> @@ -60,23 +60,18 @@ int main(int argc, char *argv[]) { nflog_global_t g; int nfl_group_id; char *storage_dir = NULL; - uint8_t commit_when_died = 0; struct option longopts[] = {/* name, has_args, flag, val */ {"nflog-group", required_argument, NULL, 'g'}, {"storage_dir", required_argument, NULL, 'd'}, {"storage_size", required_argument, NULL, 's'}, {"help", no_argument, NULL, 'h'}, - {"commit_when_died", no_argument, NULL, 'c'}, {"version", no_argument, NULL, 'v'}, {0, 0, 0, 0}}; int opt; while ((opt = getopt_long(argc, argv, "g:d:s:hv", longopts, NULL)) != -1) { switch (opt) { - case 'c': - commit_when_died = 1; - break; case 'h': printf("%s", help_text); exit(0); @@ -117,7 +112,6 @@ int main(int argc, char *argv[]) { } g.nfl_group_id = nfl_group_id; - g.commit_when_died = commit_when_died; g.storage_dir = storage_dir; // register signal handler @@ -135,19 +129,15 @@ int main(int argc, char *argv[]) { nfl_commit_init(trunk_cnt); debug("Worker started, entries_max = %d, trunk_cnt = %d", entries_max, trunk_cnt); - for (i = 0;; i = (i + 1) % trunk_cnt) { - // will be unlocked when #i has finished receiving & committing - if(trunks[i]) - pthread_mutex_lock(&(trunks[i]->lock)); - - nfl_state_update_or_create(&(trunks[i]), i, entries_max, &g); - + for (i = 0;; i = NEXT(i, trunk_cnt)) { + debug("Running receiver worker: id = %d", i); + nfl_state_init(&(trunks[i]), i, entries_max, &g); pthread_create(&(trunks[i]->thread), NULL, nfl_collect_worker, (void *)trunks[i]); // wait for current receiver worker pthread_join(trunks[i]->thread, NULL); } - + // Won't reach here // We don't actually free trunks or the semaphore at all // sem_destroy(&nfl_commit_queue); |