diff options
author | Yunchih Chen <yunchih.cat@gmail.com> | 2017-12-01 11:03:45 +0800 |
---|---|---|
committer | Yunchih Chen <yunchih.cat@gmail.com> | 2017-12-01 11:03:45 +0800 |
commit | ecd23b3efbbf338ef54447cad1c24cfcce8b785b (patch) | |
tree | ec8e5e5aa17924866e2d120992f5eb16fb7fcc5a | |
parent | 73b99a356db73745e8a70c1442ed3fbdbc0c7317 (diff) | |
download | nfcollect-ecd23b3efbbf338ef54447cad1c24cfcce8b785b.tar nfcollect-ecd23b3efbbf338ef54447cad1c24cfcce8b785b.tar.gz nfcollect-ecd23b3efbbf338ef54447cad1c24cfcce8b785b.tar.bz2 nfcollect-ecd23b3efbbf338ef54447cad1c24cfcce8b785b.tar.lz nfcollect-ecd23b3efbbf338ef54447cad1c24cfcce8b785b.tar.xz nfcollect-ecd23b3efbbf338ef54447cad1c24cfcce8b785b.tar.zst nfcollect-ecd23b3efbbf338ef54447cad1c24cfcce8b785b.zip |
Revise nflog_state_t lifecycle handler function
-rw-r--r-- | main.c | 22 | ||||
-rw-r--r-- | main.h | 2 | ||||
-rw-r--r-- | nflog.c | 57 | ||||
-rw-r--r-- | nflog.h | 2 |
4 files changed, 54 insertions, 29 deletions
@@ -56,19 +56,6 @@ void sig_handler(int signo) { /* TODO */ } } -nflog_state_t *get_nflog_state(uint32_t id, uint32_t entries_max) { - nflog_state_t *state = - (nflog_state_t *)malloc(sizeof(nflog_state_t)); - pthread_mutex_init(&(state->lock), NULL); - state->store = (nflog_entry_t *)malloc(sizeof(nflog_entry_t) * - entries_max); - state->header.id = id; - state->header.max_n_entries = entries_max; - state->header.n_entries = 0; - return state; -} - -void free_nflog_state(nflog_state_t **state) { *state = NULL; } int main(int argc, char *argv[]) { @@ -153,14 +140,17 @@ int main(int argc, char *argv[]) { nfl_commit_init(trunk_cnt); for (i = 0;; i = (i + 1) % trunk_cnt) { - trunks[i] = - trunks[i] != NULL ? trunks[i] : get_nflog_state(i, entries_max); + nfl_state_update_or_create(&(trunks[i]), i, entries_max); + // will be unlocked when #i has finished receiving & committing pthread_mutex_lock(&(trunks[i]->lock)); pthread_create(&(trunks[i]->thread), NULL, nflog_worker, (void *)trunks[i]); + // wait for current receiver worker pthread_join(trunks[i]->thread, NULL); } - sem_destroy(&nfl_commit_queue); + // Won't reach here + // We don't actually free trunks or the semaphore at all + // sem_destroy(&nfl_commit_queue); exit(0); } @@ -97,7 +97,7 @@ typedef struct __attribute__((packed)) _nflog_entry_t { typedef struct _nflog_state_t { - nflog_header_t header; + nflog_header_t* header; nflog_entry_t* store; struct nflog_handle *nfl_fd; @@ -48,7 +48,7 @@ static int handle_packet(struct nflog_g_handle *gh, struct nfgenmsg *nfmsg, // only process ipv4 packet if (payload_len >= 0 && ((payload[0] & 0xf0) == 0x40)) { struct iphdr *iph = (struct iphdr *)payload; - nflog_entry_t *entry = &(nf->store[nf->header.n_entries]); + nflog_entry_t *entry = &(nf->store[nf->header->n_entries]); void *inner_hdr = iph + iph->ihl; // Only accept TCP / UDP packets @@ -75,7 +75,7 @@ static int handle_packet(struct nflog_g_handle *gh, struct nfgenmsg *nfmsg, // get current timestamp time(&entry->timestamp); - nf->header.n_entries++; + nf->header->n_entries++; } // Ignore IPv6 packet for now Q_Q @@ -109,21 +109,21 @@ void *nflog_worker(void *targs) { nfl_init(nf); int fd = nflog_fd(nf->nfl_fd); - uint32_t *p_cnt_now = &(nf->header.n_entries); - uint32_t cnt_max = nf->header.max_n_entries; + uint32_t *p_cnt_now = &(nf->header->n_entries); + uint32_t cnt_max = nf->header->max_n_entries; - debug("Recv worker #%u: main loop starts\n", nf->header.id); - nf->header.start_time = time(NULL); + debug("Recv worker #%u: main loop starts\n", nf->header->id); + nf->header->start_time = time(NULL); while (*p_cnt_now < cnt_max) { int rv; char buf[4096]; if ((rv = recv(fd, buf, sizeof(buf), 0)) && rv >= 0) { - debug("Recv worker #%u: nflog packet received (len=%u)\n", nf->header.id, + debug("Recv worker #%u: nflog packet received (len=%u)\n", nf->header->id, rv); nflog_handle_packet(nf->nfl_fd, buf, rv); } } - nf->header.end_time = time(NULL); + nf->header->end_time = time(NULL); nfl_cleanup(nf); nfl_commit(nf); @@ -131,6 +131,10 @@ void *nflog_worker(void *targs) { pthread_exit(NULL); } +/* + * Committer + */ + void nfl_commit(nflog_state_t *nf) { pthread_t tid; pthread_create(&tid, NULL, _nfl_commit_worker, (void *)nf); @@ -139,13 +143,42 @@ void nfl_commit(nflog_state_t *nf) { void *_nfl_commit_worker(void *targs) { nflog_state_t* nf = (nflog_state_t*) targs; - debug("Comm worker #%u: thread started\n", nf->header.id); + debug("Comm worker #%u: thread started\n", nf->header->id); sem_wait(&nfl_commit_queue); - debug("Comm worker #%u: commit started\n", nf->header.id); - nfl_commit_worker(&(nf->header), nf->store); - debug("Comm worker #%u: commit done\n", nf->header.id); + debug("Comm worker #%u: commit started\n", nf->header->id); + nfl_commit_worker(nf->header, nf->store); + debug("Comm worker #%u: commit done\n", nf->header->id); sem_post(&nfl_commit_queue); + // Commit finished + nfl_state_free(nf); pthread_mutex_unlock(&(nf->lock)); } + +/* + * State managers + */ + +void nfl_state_update_or_create(nflog_state_t **nf, uint32_t id, uint32_t entries_max) { + if(*nf == NULL) { + *nf = (nflog_state_t *)malloc(sizeof(nflog_state_t)); + pthread_mutex_init(&((*nf)->lock), NULL); + } + + // Don't use calloc here, as it will consume physical memory + // before we fill the buffer. Instead, fill entries with 0 + // on the fly, to squeeze more space for compression. + (*nf)->store = (nflog_entry_t *)malloc(sizeof(nflog_entry_t) * + entries_max); + (*nf)->header->id = id; + (*nf)->header->max_n_entries = entries_max; + (*nf)->header->n_entries = 0; +} + +void nfl_state_free(nflog_state_t *nf) { + // Free header and store only + // Leave the rest intact + free(nf->header); + free(nf->store); +} @@ -1,3 +1,5 @@ #pragma once void* nflog_worker(void *targs); +void nfl_state_update_or_create(nflog_state_t **nf, uint32_t id, uint32_t entries_max); +void nfl_state_free(nflog_state_t *nf); |