aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorYunchih Chen <yunchih.cat@gmail.com>2017-12-01 11:03:45 +0800
committerYunchih Chen <yunchih.cat@gmail.com>2017-12-01 11:03:45 +0800
commitecd23b3efbbf338ef54447cad1c24cfcce8b785b (patch)
treeec8e5e5aa17924866e2d120992f5eb16fb7fcc5a
parent73b99a356db73745e8a70c1442ed3fbdbc0c7317 (diff)
downloadnfcollect-ecd23b3efbbf338ef54447cad1c24cfcce8b785b.tar
nfcollect-ecd23b3efbbf338ef54447cad1c24cfcce8b785b.tar.gz
nfcollect-ecd23b3efbbf338ef54447cad1c24cfcce8b785b.tar.bz2
nfcollect-ecd23b3efbbf338ef54447cad1c24cfcce8b785b.tar.lz
nfcollect-ecd23b3efbbf338ef54447cad1c24cfcce8b785b.tar.xz
nfcollect-ecd23b3efbbf338ef54447cad1c24cfcce8b785b.tar.zst
nfcollect-ecd23b3efbbf338ef54447cad1c24cfcce8b785b.zip
Revise nflog_state_t lifecycle handler function
-rw-r--r--main.c22
-rw-r--r--main.h2
-rw-r--r--nflog.c57
-rw-r--r--nflog.h2
4 files changed, 54 insertions, 29 deletions
diff --git a/main.c b/main.c
index b759e62..f07e047 100644
--- a/main.c
+++ b/main.c
@@ -56,19 +56,6 @@ void sig_handler(int signo) {
/* TODO */
}
}
-nflog_state_t *get_nflog_state(uint32_t id, uint32_t entries_max) {
- nflog_state_t *state =
- (nflog_state_t *)malloc(sizeof(nflog_state_t));
- pthread_mutex_init(&(state->lock), NULL);
- state->store = (nflog_entry_t *)malloc(sizeof(nflog_entry_t) *
- entries_max);
- state->header.id = id;
- state->header.max_n_entries = entries_max;
- state->header.n_entries = 0;
- return state;
-}
-
-void free_nflog_state(nflog_state_t **state) { *state = NULL; }
int main(int argc, char *argv[]) {
@@ -153,14 +140,17 @@ int main(int argc, char *argv[]) {
nfl_commit_init(trunk_cnt);
for (i = 0;; i = (i + 1) % trunk_cnt) {
- trunks[i] =
- trunks[i] != NULL ? trunks[i] : get_nflog_state(i, entries_max);
+ nfl_state_update_or_create(&(trunks[i]), i, entries_max);
+ // will be unlocked when #i has finished receiving & committing
pthread_mutex_lock(&(trunks[i]->lock));
pthread_create(&(trunks[i]->thread), NULL, nflog_worker,
(void *)trunks[i]);
+ // wait for current receiver worker
pthread_join(trunks[i]->thread, NULL);
}
- sem_destroy(&nfl_commit_queue);
+ // Won't reach here
+ // We don't actually free trunks or the semaphore at all
+ // sem_destroy(&nfl_commit_queue);
exit(0);
}
diff --git a/main.h b/main.h
index 96bc65a..f94c2f6 100644
--- a/main.h
+++ b/main.h
@@ -97,7 +97,7 @@ typedef struct __attribute__((packed)) _nflog_entry_t {
typedef struct _nflog_state_t {
- nflog_header_t header;
+ nflog_header_t* header;
nflog_entry_t* store;
struct nflog_handle *nfl_fd;
diff --git a/nflog.c b/nflog.c
index bd32b83..455ab50 100644
--- a/nflog.c
+++ b/nflog.c
@@ -48,7 +48,7 @@ static int handle_packet(struct nflog_g_handle *gh, struct nfgenmsg *nfmsg,
// only process ipv4 packet
if (payload_len >= 0 && ((payload[0] & 0xf0) == 0x40)) {
struct iphdr *iph = (struct iphdr *)payload;
- nflog_entry_t *entry = &(nf->store[nf->header.n_entries]);
+ nflog_entry_t *entry = &(nf->store[nf->header->n_entries]);
void *inner_hdr = iph + iph->ihl;
// Only accept TCP / UDP packets
@@ -75,7 +75,7 @@ static int handle_packet(struct nflog_g_handle *gh, struct nfgenmsg *nfmsg,
// get current timestamp
time(&entry->timestamp);
- nf->header.n_entries++;
+ nf->header->n_entries++;
}
// Ignore IPv6 packet for now Q_Q
@@ -109,21 +109,21 @@ void *nflog_worker(void *targs) {
nfl_init(nf);
int fd = nflog_fd(nf->nfl_fd);
- uint32_t *p_cnt_now = &(nf->header.n_entries);
- uint32_t cnt_max = nf->header.max_n_entries;
+ uint32_t *p_cnt_now = &(nf->header->n_entries);
+ uint32_t cnt_max = nf->header->max_n_entries;
- debug("Recv worker #%u: main loop starts\n", nf->header.id);
- nf->header.start_time = time(NULL);
+ debug("Recv worker #%u: main loop starts\n", nf->header->id);
+ nf->header->start_time = time(NULL);
while (*p_cnt_now < cnt_max) {
int rv; char buf[4096];
if ((rv = recv(fd, buf, sizeof(buf), 0)) && rv >= 0) {
- debug("Recv worker #%u: nflog packet received (len=%u)\n", nf->header.id,
+ debug("Recv worker #%u: nflog packet received (len=%u)\n", nf->header->id,
rv);
nflog_handle_packet(nf->nfl_fd, buf, rv);
}
}
- nf->header.end_time = time(NULL);
+ nf->header->end_time = time(NULL);
nfl_cleanup(nf);
nfl_commit(nf);
@@ -131,6 +131,10 @@ void *nflog_worker(void *targs) {
pthread_exit(NULL);
}
+/*
+ * Committer
+ */
+
void nfl_commit(nflog_state_t *nf) {
pthread_t tid;
pthread_create(&tid, NULL, _nfl_commit_worker, (void *)nf);
@@ -139,13 +143,42 @@ void nfl_commit(nflog_state_t *nf) {
void *_nfl_commit_worker(void *targs) {
nflog_state_t* nf = (nflog_state_t*) targs;
- debug("Comm worker #%u: thread started\n", nf->header.id);
+ debug("Comm worker #%u: thread started\n", nf->header->id);
sem_wait(&nfl_commit_queue);
- debug("Comm worker #%u: commit started\n", nf->header.id);
- nfl_commit_worker(&(nf->header), nf->store);
- debug("Comm worker #%u: commit done\n", nf->header.id);
+ debug("Comm worker #%u: commit started\n", nf->header->id);
+ nfl_commit_worker(nf->header, nf->store);
+ debug("Comm worker #%u: commit done\n", nf->header->id);
sem_post(&nfl_commit_queue);
+ // Commit finished
+ nfl_state_free(nf);
pthread_mutex_unlock(&(nf->lock));
}
+
+/*
+ * State managers
+ */
+
+void nfl_state_update_or_create(nflog_state_t **nf, uint32_t id, uint32_t entries_max) {
+ if(*nf == NULL) {
+ *nf = (nflog_state_t *)malloc(sizeof(nflog_state_t));
+ pthread_mutex_init(&((*nf)->lock), NULL);
+ }
+
+ // Don't use calloc here, as it will consume physical memory
+ // before we fill the buffer. Instead, fill entries with 0
+ // on the fly, to squeeze more space for compression.
+ (*nf)->store = (nflog_entry_t *)malloc(sizeof(nflog_entry_t) *
+ entries_max);
+ (*nf)->header->id = id;
+ (*nf)->header->max_n_entries = entries_max;
+ (*nf)->header->n_entries = 0;
+}
+
+void nfl_state_free(nflog_state_t *nf) {
+ // Free header and store only
+ // Leave the rest intact
+ free(nf->header);
+ free(nf->store);
+}
diff --git a/nflog.h b/nflog.h
index a677cc0..3ca8544 100644
--- a/nflog.h
+++ b/nflog.h
@@ -1,3 +1,5 @@
#pragma once
void* nflog_worker(void *targs);
+void nfl_state_update_or_create(nflog_state_t **nf, uint32_t id, uint32_t entries_max);
+void nfl_state_free(nflog_state_t *nf);