aboutsummaryrefslogtreecommitdiffstats
path: root/nflog.c
diff options
context:
space:
mode:
authorYunchih Chen <yunchih.cat@gmail.com>2017-12-01 11:03:45 +0800
committerYunchih Chen <yunchih.cat@gmail.com>2017-12-01 11:03:45 +0800
commitecd23b3efbbf338ef54447cad1c24cfcce8b785b (patch)
treeec8e5e5aa17924866e2d120992f5eb16fb7fcc5a /nflog.c
parent73b99a356db73745e8a70c1442ed3fbdbc0c7317 (diff)
downloadnfcollect-ecd23b3efbbf338ef54447cad1c24cfcce8b785b.tar
nfcollect-ecd23b3efbbf338ef54447cad1c24cfcce8b785b.tar.gz
nfcollect-ecd23b3efbbf338ef54447cad1c24cfcce8b785b.tar.bz2
nfcollect-ecd23b3efbbf338ef54447cad1c24cfcce8b785b.tar.lz
nfcollect-ecd23b3efbbf338ef54447cad1c24cfcce8b785b.tar.xz
nfcollect-ecd23b3efbbf338ef54447cad1c24cfcce8b785b.tar.zst
nfcollect-ecd23b3efbbf338ef54447cad1c24cfcce8b785b.zip
Revise nflog_state_t lifecycle handler function
Diffstat (limited to 'nflog.c')
-rw-r--r--nflog.c57
1 files changed, 45 insertions, 12 deletions
diff --git a/nflog.c b/nflog.c
index bd32b83..455ab50 100644
--- a/nflog.c
+++ b/nflog.c
@@ -48,7 +48,7 @@ static int handle_packet(struct nflog_g_handle *gh, struct nfgenmsg *nfmsg,
// only process ipv4 packet
if (payload_len >= 0 && ((payload[0] & 0xf0) == 0x40)) {
struct iphdr *iph = (struct iphdr *)payload;
- nflog_entry_t *entry = &(nf->store[nf->header.n_entries]);
+ nflog_entry_t *entry = &(nf->store[nf->header->n_entries]);
void *inner_hdr = iph + iph->ihl;
// Only accept TCP / UDP packets
@@ -75,7 +75,7 @@ static int handle_packet(struct nflog_g_handle *gh, struct nfgenmsg *nfmsg,
// get current timestamp
time(&entry->timestamp);
- nf->header.n_entries++;
+ nf->header->n_entries++;
}
// Ignore IPv6 packet for now Q_Q
@@ -109,21 +109,21 @@ void *nflog_worker(void *targs) {
nfl_init(nf);
int fd = nflog_fd(nf->nfl_fd);
- uint32_t *p_cnt_now = &(nf->header.n_entries);
- uint32_t cnt_max = nf->header.max_n_entries;
+ uint32_t *p_cnt_now = &(nf->header->n_entries);
+ uint32_t cnt_max = nf->header->max_n_entries;
- debug("Recv worker #%u: main loop starts\n", nf->header.id);
- nf->header.start_time = time(NULL);
+ debug("Recv worker #%u: main loop starts\n", nf->header->id);
+ nf->header->start_time = time(NULL);
while (*p_cnt_now < cnt_max) {
int rv; char buf[4096];
if ((rv = recv(fd, buf, sizeof(buf), 0)) && rv >= 0) {
- debug("Recv worker #%u: nflog packet received (len=%u)\n", nf->header.id,
+ debug("Recv worker #%u: nflog packet received (len=%u)\n", nf->header->id,
rv);
nflog_handle_packet(nf->nfl_fd, buf, rv);
}
}
- nf->header.end_time = time(NULL);
+ nf->header->end_time = time(NULL);
nfl_cleanup(nf);
nfl_commit(nf);
@@ -131,6 +131,10 @@ void *nflog_worker(void *targs) {
pthread_exit(NULL);
}
+/*
+ * Committer
+ */
+
void nfl_commit(nflog_state_t *nf) {
pthread_t tid;
pthread_create(&tid, NULL, _nfl_commit_worker, (void *)nf);
@@ -139,13 +143,42 @@ void nfl_commit(nflog_state_t *nf) {
void *_nfl_commit_worker(void *targs) {
nflog_state_t* nf = (nflog_state_t*) targs;
- debug("Comm worker #%u: thread started\n", nf->header.id);
+ debug("Comm worker #%u: thread started\n", nf->header->id);
sem_wait(&nfl_commit_queue);
- debug("Comm worker #%u: commit started\n", nf->header.id);
- nfl_commit_worker(&(nf->header), nf->store);
- debug("Comm worker #%u: commit done\n", nf->header.id);
+ debug("Comm worker #%u: commit started\n", nf->header->id);
+ nfl_commit_worker(nf->header, nf->store);
+ debug("Comm worker #%u: commit done\n", nf->header->id);
sem_post(&nfl_commit_queue);
+ // Commit finished
+ nfl_state_free(nf);
pthread_mutex_unlock(&(nf->lock));
}
+
+/*
+ * State managers
+ */
+
+void nfl_state_update_or_create(nflog_state_t **nf, uint32_t id, uint32_t entries_max) {
+ if(*nf == NULL) {
+ *nf = (nflog_state_t *)malloc(sizeof(nflog_state_t));
+ pthread_mutex_init(&((*nf)->lock), NULL);
+ }
+
+ // Don't use calloc here, as it will consume physical memory
+ // before we fill the buffer. Instead, fill entries with 0
+ // on the fly, to squeeze more space for compression.
+ (*nf)->store = (nflog_entry_t *)malloc(sizeof(nflog_entry_t) *
+ entries_max);
+ (*nf)->header->id = id;
+ (*nf)->header->max_n_entries = entries_max;
+ (*nf)->header->n_entries = 0;
+}
+
+void nfl_state_free(nflog_state_t *nf) {
+ // Free header and store only
+ // Leave the rest intact
+ free(nf->header);
+ free(nf->store);
+}