diff options
author | Yunchih Chen <yunchih.cat@gmail.com> | 2017-12-01 11:03:45 +0800 |
---|---|---|
committer | Yunchih Chen <yunchih.cat@gmail.com> | 2017-12-01 11:03:45 +0800 |
commit | ecd23b3efbbf338ef54447cad1c24cfcce8b785b (patch) | |
tree | ec8e5e5aa17924866e2d120992f5eb16fb7fcc5a /nflog.c | |
parent | 73b99a356db73745e8a70c1442ed3fbdbc0c7317 (diff) | |
download | nfcollect-ecd23b3efbbf338ef54447cad1c24cfcce8b785b.tar nfcollect-ecd23b3efbbf338ef54447cad1c24cfcce8b785b.tar.gz nfcollect-ecd23b3efbbf338ef54447cad1c24cfcce8b785b.tar.bz2 nfcollect-ecd23b3efbbf338ef54447cad1c24cfcce8b785b.tar.lz nfcollect-ecd23b3efbbf338ef54447cad1c24cfcce8b785b.tar.xz nfcollect-ecd23b3efbbf338ef54447cad1c24cfcce8b785b.tar.zst nfcollect-ecd23b3efbbf338ef54447cad1c24cfcce8b785b.zip |
Revise nflog_state_t lifecycle handler function
Diffstat (limited to 'nflog.c')
-rw-r--r-- | nflog.c | 57 |
1 files changed, 45 insertions, 12 deletions
@@ -48,7 +48,7 @@ static int handle_packet(struct nflog_g_handle *gh, struct nfgenmsg *nfmsg, // only process ipv4 packet if (payload_len >= 0 && ((payload[0] & 0xf0) == 0x40)) { struct iphdr *iph = (struct iphdr *)payload; - nflog_entry_t *entry = &(nf->store[nf->header.n_entries]); + nflog_entry_t *entry = &(nf->store[nf->header->n_entries]); void *inner_hdr = iph + iph->ihl; // Only accept TCP / UDP packets @@ -75,7 +75,7 @@ static int handle_packet(struct nflog_g_handle *gh, struct nfgenmsg *nfmsg, // get current timestamp time(&entry->timestamp); - nf->header.n_entries++; + nf->header->n_entries++; } // Ignore IPv6 packet for now Q_Q @@ -109,21 +109,21 @@ void *nflog_worker(void *targs) { nfl_init(nf); int fd = nflog_fd(nf->nfl_fd); - uint32_t *p_cnt_now = &(nf->header.n_entries); - uint32_t cnt_max = nf->header.max_n_entries; + uint32_t *p_cnt_now = &(nf->header->n_entries); + uint32_t cnt_max = nf->header->max_n_entries; - debug("Recv worker #%u: main loop starts\n", nf->header.id); - nf->header.start_time = time(NULL); + debug("Recv worker #%u: main loop starts\n", nf->header->id); + nf->header->start_time = time(NULL); while (*p_cnt_now < cnt_max) { int rv; char buf[4096]; if ((rv = recv(fd, buf, sizeof(buf), 0)) && rv >= 0) { - debug("Recv worker #%u: nflog packet received (len=%u)\n", nf->header.id, + debug("Recv worker #%u: nflog packet received (len=%u)\n", nf->header->id, rv); nflog_handle_packet(nf->nfl_fd, buf, rv); } } - nf->header.end_time = time(NULL); + nf->header->end_time = time(NULL); nfl_cleanup(nf); nfl_commit(nf); @@ -131,6 +131,10 @@ void *nflog_worker(void *targs) { pthread_exit(NULL); } +/* + * Committer + */ + void nfl_commit(nflog_state_t *nf) { pthread_t tid; pthread_create(&tid, NULL, _nfl_commit_worker, (void *)nf); @@ -139,13 +143,42 @@ void nfl_commit(nflog_state_t *nf) { void *_nfl_commit_worker(void *targs) { nflog_state_t* nf = (nflog_state_t*) targs; - debug("Comm worker #%u: thread started\n", nf->header.id); + debug("Comm worker #%u: thread started\n", nf->header->id); sem_wait(&nfl_commit_queue); - debug("Comm worker #%u: commit started\n", nf->header.id); - nfl_commit_worker(&(nf->header), nf->store); - debug("Comm worker #%u: commit done\n", nf->header.id); + debug("Comm worker #%u: commit started\n", nf->header->id); + nfl_commit_worker(nf->header, nf->store); + debug("Comm worker #%u: commit done\n", nf->header->id); sem_post(&nfl_commit_queue); + // Commit finished + nfl_state_free(nf); pthread_mutex_unlock(&(nf->lock)); } + +/* + * State managers + */ + +void nfl_state_update_or_create(nflog_state_t **nf, uint32_t id, uint32_t entries_max) { + if(*nf == NULL) { + *nf = (nflog_state_t *)malloc(sizeof(nflog_state_t)); + pthread_mutex_init(&((*nf)->lock), NULL); + } + + // Don't use calloc here, as it will consume physical memory + // before we fill the buffer. Instead, fill entries with 0 + // on the fly, to squeeze more space for compression. + (*nf)->store = (nflog_entry_t *)malloc(sizeof(nflog_entry_t) * + entries_max); + (*nf)->header->id = id; + (*nf)->header->max_n_entries = entries_max; + (*nf)->header->n_entries = 0; +} + +void nfl_state_free(nflog_state_t *nf) { + // Free header and store only + // Leave the rest intact + free(nf->header); + free(nf->store); +} |