aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorYunchih Chen <yunchih.cat@gmail.com>2018-04-28 10:45:41 +0800
committerYunchih Chen <yunchih.cat@gmail.com>2018-04-28 10:45:41 +0800
commitaf4bf7c93f3390a8013de18679f12b336d2a314b (patch)
tree9ceae4ffd03ec8ea116591fc153e4f79e2b1921f
parent5300aeef42f0d79dbed5703457620784d8e848a6 (diff)
downloadnfcollect-af4bf7c93f3390a8013de18679f12b336d2a314b.tar
nfcollect-af4bf7c93f3390a8013de18679f12b336d2a314b.tar.gz
nfcollect-af4bf7c93f3390a8013de18679f12b336d2a314b.tar.bz2
nfcollect-af4bf7c93f3390a8013de18679f12b336d2a314b.tar.lz
nfcollect-af4bf7c93f3390a8013de18679f12b336d2a314b.tar.xz
nfcollect-af4bf7c93f3390a8013de18679f12b336d2a314b.tar.zst
nfcollect-af4bf7c93f3390a8013de18679f12b336d2a314b.zip
Move netlink socket initialization into separate function
-rw-r--r--bin/nfcollect.c13
-rw-r--r--include/collect.h2
-rw-r--r--include/main.h14
-rw-r--r--lib/collect.c77
4 files changed, 61 insertions, 45 deletions
diff --git a/bin/nfcollect.c b/bin/nfcollect.c
index e168595..f59d68f 100644
--- a/bin/nfcollect.c
+++ b/bin/nfcollect.c
@@ -54,10 +54,13 @@ const char *help_text =
"\n";
static uint32_t calculate_starting_trunk(const char *storage_dir);
+static nfl_nl_t netlink_fd;
static void sig_handler(int signo) {
- if (signo == SIGHUP)
+ if (signo == SIGHUP) {
puts("Terminated due to SIGHUP ...");
+ nfl_close_netlink_fd(&netlink_fd);
+ }
}
int main(int argc, char *argv[]) {
@@ -133,7 +136,6 @@ int main(int argc, char *argv[]) {
max_commit_worker = max_commit_worker > 0 ? max_commit_worker : 1;
}
- g.nfl_group_id = nfl_group_id;
g.storage_dir = storage_dir;
// register signal handler
@@ -149,7 +151,6 @@ int main(int argc, char *argv[]) {
// Set up nflog receiver worker
nfl_state_t **trunks = (nfl_state_t **)calloc(trunk_cnt, sizeof(void *));
- nfl_commit_init(trunk_cnt);
info(PACKAGE ": storing in directory '%s', capped by %d MiB", storage_dir,
storage_size);
@@ -168,9 +169,12 @@ int main(int argc, char *argv[]) {
free((char *)fn);
}
+ nfl_open_netlink_fd(&netlink_fd, nfl_group_id);
for (;; cur_trunk = NEXT(cur_trunk, trunk_cnt)) {
debug("Running receiver worker: id = %d", cur_trunk);
nfl_state_init(&(trunks[cur_trunk]), cur_trunk, entries_max, &g);
+ trunks[cur_trunk]->netlink_fd = &netlink_fd;
+
pthread_create(&(trunks[cur_trunk]->thread), NULL, nfl_collect_worker,
(void *)trunks[cur_trunk]);
// wait for current receiver worker
@@ -179,7 +183,8 @@ int main(int argc, char *argv[]) {
// Won't reach here
// We don't actually free trunks or the semaphore at all
- // sem_destroy(&nfl_commit_queue);
+ sem_destroy(g.nfl_commit_queue);
+ nfl_close_netlink_fd(&netlink_fd);
exit(0);
}
diff --git a/include/collect.h b/include/collect.h
index 2f75f4a..1af0506 100644
--- a/include/collect.h
+++ b/include/collect.h
@@ -5,3 +5,5 @@ void *nfl_collect_worker(void *targs);
void nfl_state_init(nfl_state_t **nf, uint32_t id, uint32_t entries_max,
nfl_global_t *g);
void nfl_state_free(nfl_state_t *nf);
+void nfl_open_netlink_fd(nfl_nl_t *nf, uint16_t group_id);
+void nfl_close_netlink_fd(nfl_nl_t *nf);
diff --git a/include/main.h b/include/main.h
index d33d499..cbe0f28 100644
--- a/include/main.h
+++ b/include/main.h
@@ -134,17 +134,21 @@ typedef struct _nfl_global_t {
enum nfl_compression_t compression_opt;
} nfl_global_t;
+typedef struct _nfl_nl_t {
+ struct nflog_handle *fd;
+ struct nflog_g_handle *group_fd;
+} nfl_nl_t;
+
typedef struct _nfl_state_t {
nfl_global_t *global;
nfl_header_t *header;
nfl_entry_t *store;
+ nfl_nl_t *netlink_fd;
- struct nflog_handle *nfl_fd;
- struct nflog_g_handle *nfl_group_fd;
+ bool has_finished_recv;
+ pthread_cond_t has_finished_recv_cond;
+ pthread_mutex_t has_finished_recv_lock;
- bool has_finished;
- pthread_cond_t has_finished_cond;
- pthread_mutex_t has_finished_lock;
pthread_t thread;
} nfl_state_t;
diff --git a/lib/collect.c b/lib/collect.c
index 4b3dd3e..9bffd11 100644
--- a/lib/collect.c
+++ b/lib/collect.c
@@ -38,7 +38,6 @@
nfl_global_t g;
-static void nfl_init(nfl_state_t *nf);
static void *nfl_start_commit_worker(void *targs);
static void nfl_commit(nfl_state_t *nf);
static void nfl_state_free(nfl_state_t *nf);
@@ -133,42 +132,46 @@ static int handle_packet(struct nflog_g_handle *gh, struct nfgenmsg *nfmsg,
return 0;
}
-static void nfl_init(nfl_state_t *nf) {
+void nfl_open_netlink_fd(nfl_nl_t *nl, uint16_t group_id) {
// open nflog
- ERR((nf->nfl_fd = nflog_open()) == NULL, "nflog_open")
+ ERR((nl->fd = nflog_open()) == NULL, "nflog_open")
debug("Opening nflog communication file descriptor");
// monitor IPv4 packets only
- ERR(nflog_bind_pf(nf->nfl_fd, AF_INET) < 0, "nflog_bind_pf");
+ ERR(nflog_bind_pf(nl->fd, AF_INET) < 0, "nflog_bind_pf");
// bind to group
- nf->nfl_group_fd = nflog_bind_group(nf->nfl_fd, nf->global->nfl_group_id);
+ nl->group_fd = nflog_bind_group(nl->fd, group_id);
// If the returned group_fd is NULL, it's likely
// that another process (like ulogd) has already
// bound to the same NFLOD group.
- if(!nf->nfl_group_fd)
- FATAL("Cannot bind to NFLOG group %d, is it used by another process?",
- nf->global->nfl_group_id);
+ if(!nl->group_fd)
+ FATAL("Cannot bind to NFLOG group %d, is it used by another process?", group_id);
- ERR(nflog_set_mode(nf->nfl_group_fd, NFULNL_COPY_PACKET, nfl_recv_size) < 0,
+ ERR(nflog_set_mode(nl->group_fd, NFULNL_COPY_PACKET, nfl_recv_size) < 0,
"Could not set copy mode");
// Batch send 128 packets from kernel to userspace
- ERR(nflog_set_qthresh(nf->nfl_group_fd, NF_NFLOG_QTHRESH),
+ ERR(nflog_set_qthresh(nl->group_fd, NF_NFLOG_QTHRESH),
"Could not set qthresh");
+}
- nflog_callback_register(nf->nfl_group_fd, &handle_packet, nf);
- debug("Registering nflog callback");
-
- memcpy(&g, nf->global, sizeof(nfl_global_t));
+void nfl_close_netlink_fd(nfl_nl_t *nl) {
+ nflog_unbind_group(nl->group_fd);
+ nflog_close(nl->fd);
}
void *nfl_collect_worker(void *targs) {
nfl_state_t *nf = (nfl_state_t *)targs;
- nfl_init(nf);
+ memcpy(&g, nf->global, sizeof(nfl_global_t));
- int fd = nflog_fd(nf->nfl_fd);
+ nflog_callback_register(nf->netlink_fd->group_fd, &handle_packet, nf);
+ debug("Registering nflog callback");
+
+ int fd = nflog_fd(nf->netlink_fd->fd);
debug("Recv worker #%u: main loop starts", nf->header->id);
+
+ // Write start time
time(&nf->header->start_time);
int rv;
@@ -181,23 +184,23 @@ void *nfl_collect_worker(void *targs) {
debug("Recv worker #%u: nflog packet received "
"(len=%u, #entries=%u)",
nf->header->id, rv, nf->header->n_entries);
- nflog_handle_packet(nf->nfl_fd, buf, rv);
+ nflog_handle_packet(nf->netlink_fd->fd, buf, rv);
}
}
- debug("Recv worker #%u: finish recv, received packets: %u", nf->header->id,
+ debug("Recv worker #%u: finished, received packets: %u",
+ nf->header->id,
nf->header->max_n_entries);
// write end time
time(&nf->header->end_time);
- nflog_unbind_group(nf->nfl_group_fd);
- nflog_close(nf->nfl_fd);
// write checksum
nf->header->cksum = nfl_header_cksum(nf->header);
// spawn commit thread
nfl_commit(nf);
+
pthread_exit(NULL);
}
@@ -215,20 +218,21 @@ static void *nfl_start_commit_worker(void *targs) {
nfl_state_t *nf = (nfl_state_t *)targs;
const char *filename = nfl_get_filename(g.storage_dir, nf->header->id);
debug("Comm worker #%u: thread started.", nf->header->id);
+ bool truncate = true;
sem_wait(g.nfl_commit_queue);
debug("Comm worker #%u: commit started.", nf->header->id);
- nfl_commit_worker(nf->header, nf->store, g.compression_opt, filename);
+ nfl_commit_worker(nf->header, nf->store, g.compression_opt, truncate, filename);
debug("Comm worker #%u: commit done.", nf->header->id);
sem_post(g.nfl_commit_queue);
nfl_state_free(nf);
free((char *)filename);
- pthread_mutex_lock(&nf->has_finished_lock);
- nf->has_finished = true;
- pthread_cond_signal(&nf->has_finished_cond);
- pthread_mutex_unlock(&nf->has_finished_lock);
+ pthread_mutex_lock(&nf->has_finished_recv_lock);
+ nf->has_finished_recv = true;
+ pthread_cond_signal(&nf->has_finished_recv_cond);
+ pthread_mutex_unlock(&nf->has_finished_recv_lock);
pthread_exit(NULL);
}
@@ -240,35 +244,36 @@ static void *nfl_start_commit_worker(void *targs) {
void nfl_state_init(nfl_state_t **nf, uint32_t id, uint32_t entries_max,
nfl_global_t *g) {
assert(nf);
+
+ // Check if nf has been allocated
if (unlikely(*nf == NULL)) {
*nf = (nfl_state_t *)malloc(sizeof(nfl_state_t));
(*nf)->global = g;
(*nf)->header = (nfl_header_t *)malloc(sizeof(nfl_header_t));
(*nf)->header->id = id;
- (*nf)->header->n_entries = 0;
(*nf)->header->max_n_entries = entries_max;
(*nf)->header->compression_opt = g->compression_opt;
- (*nf)->has_finished = true;
- pthread_mutex_init(&(*nf)->has_finished_lock, NULL);
- pthread_cond_init(&(*nf)->has_finished_cond, NULL);
+ (*nf)->has_finished_recv = true;
+ pthread_mutex_init(&(*nf)->has_finished_recv_lock, NULL);
+ pthread_cond_init(&(*nf)->has_finished_recv_cond, NULL);
}
// Ensure trunk with same id in previous run has finished to prevent reusing
// a trunk which it's still being used. Furthermore, this hopefully
- // alleviate us
- // from bursty network traffic.
- pthread_mutex_lock(&(*nf)->has_finished_lock);
- while (!(*nf)->has_finished)
- pthread_cond_wait(&(*nf)->has_finished_cond, &(*nf)->has_finished_lock);
- (*nf)->has_finished = false;
- pthread_mutex_unlock(&(*nf)->has_finished_lock);
+ // alleviate us from bursty network traffic.
+ pthread_mutex_lock(&(*nf)->has_finished_recv_lock);
+ while (!(*nf)->has_finished_recv)
+ pthread_cond_wait(&(*nf)->has_finished_recv_cond, &(*nf)->has_finished_recv_lock);
+ (*nf)->has_finished_recv = false;
+ pthread_mutex_unlock(&(*nf)->has_finished_recv_lock);
// Don't use calloc here, as it will cause page fault and
// consume physical memory before we fill the buffer.
// Instead, fill entries with 0 on the fly, to squeeze
// more space for compression.
(*nf)->store = (nfl_entry_t *)malloc(sizeof(nfl_entry_t) * entries_max);
+ (*nf)->header->n_entries = 0;
}
static void nfl_state_free(nfl_state_t *nf) {