// The MIT License (MIT)
// Copyright (c) 2017 Yun-Chih Chen
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
// The above copyright notice and this permission notice shall be included in
// all
// copies or substantial portions of the Software.
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
#include "commit.h"
#include "common.h"
#include <stddef.h> // size_t for libnetfilter_log
#include <sys/types.h> // u_int32_t for libnetfilter_log
#include <libnetfilter_log/libnetfilter_log.h>
#include <pthread.h>
#include <time.h>
nflog_global_t *g;
static void nfl_cleanup(void *nf);
static void nfl_init(nflog_state_t *nf);
static void *nfl_start_commit_worker(void *targs);
static void nfl_commit(nflog_state_t *nf);
static void nfl_state_free(nflog_state_t *nf);
static int handle_packet(struct nflog_g_handle *gh, struct nfgenmsg *nfmsg,
struct nflog_data *nfa, void *_nf) {
register const struct iphdr *iph;
register nflog_entry_t *entry;
const struct tcphdr *tcph;
const struct udphdr *udph;
char *payload;
void *inner_hdr;
uint32_t uid;
int payload_len = nflog_get_payload(nfa, &payload);
nflog_state_t *nf = (nflog_state_t *)_nf;
pthread_testcancel(); /* cancellation point */
// only process ipv4 packet
if (unlikely(payload_len < 0) || ((payload[0] & 0xf0) != 0x40))
return 1;
if (unlikely(nf->header->n_entries >= nf->header->max_n_entries))
return 1;
iph = (struct iphdr *)payload;
entry = &(nf->store[nf->header->n_entries]);
inner_hdr = (uint32_t *)iph + iph->ihl;
// Only accept TCP / UDP packets
if (iph->protocol == IPPROTO_TCP) {
tcph = (struct tcphdr *)inner_hdr;
entry->sport = ntohs(tcph->source);
entry->dport = ntohs(tcph->dest);
// only process SYNC and PSH packet, drop ACK
if(!tcph->syn && !tcph->psh)
return 1;
} else if (iph->protocol == IPPROTO_UDP) {
udph = (struct udphdr *)inner_hdr;
entry->sport = ntohs(udph->source);
entry->dport = ntohs(udph->dest);
} else
return 1; // Ignore other types of packet
entry->daddr.s_addr = iph->daddr;
entry->protocol = iph->protocol;
// get sender uid
if (nflog_get_uid(nfa, &uid) != 0)
return 1;
entry->uid = uid;
// get current timestamp
time(&entry->timestamp);
nf->header->n_entries++;
debug("Recv packet info entry #%d: "
"timestamp:\t%ld\t"
"daddr:\t%ld\t"
"transfer:\t%s\t"
"uid:\t%d\t"
"sport:\t%d\t"
"dport:\t%d",
nf->header->n_entries,
entry->timestamp, (unsigned long)entry->daddr.s_addr,
iph->protocol == IPPROTO_TCP ? "TCP" : "UDP",
entry->uid, entry->sport, entry->dport);
// Ignore IPv6 packet for now Q_Q
return 0;
}
static void nfl_init(nflog_state_t *nf) {
// open nflog
ERR((nf->nfl_fd = nflog_open()) == NULL, "nflog_open")
debug("Opening nflog communication file descriptor");
// monitor IPv4 packets only
ERR(nflog_bind_pf(nf->nfl_fd, AF_INET) < 0, "nflog_bind_pf");
// bind to group
nf->nfl_group_fd = nflog_bind_group(nf->nfl_fd, nf->global->nfl_group_id);
/* ERR(nflog_set_mode(nf->nfl_group_fd, NFULNL_COPY_PACKET, sizeof(struct iphdr) + 4) < 0, */
ERR(nflog_set_mode(nf->nfl_group_fd, NFULNL_COPY_PACKET, nflog_recv_size) < 0,
"Could not set copy mode");
nflog_callback_register(nf->nfl_group_fd, &handle_packet, nf);
debug("Registering nflog callback");
g = nf->global;
}
static void nfl_cleanup(void *args) {
nflog_state_t *nf = (nflog_state_t *)args;
// write end time
time(&nf->header->end_time);
nflog_unbind_group(nf->nfl_group_fd);
nflog_close(nf->nfl_fd);
// commit to file
if(g->commit_when_died) nfl_commit(nf);
}
void *nfl_collect_worker(void *targs) {
nflog_state_t *nf = (nflog_state_t *)targs;
nfl_init(nf);
int fd = nflog_fd(nf->nfl_fd);
uint32_t *p_cnt_now = &(nf->header->n_entries);
uint32_t cnt_max = nf->header->max_n_entries;
debug("Recv worker #%u: main loop starts", nf->header->id);
time(&nf->header->start_time);
pthread_cleanup_push(nfl_cleanup, (void*)nf);
int rv; char buf[4096];
while (*p_cnt_now < cnt_max) {
pthread_testcancel(); /* cancellation point */
if ((rv = recv(fd, buf, sizeof(buf), 0)) && rv > 0) {
debug("Recv worker #%u: nflog packet received (len=%u)", nf->header->id,
rv);
nflog_handle_packet(nf->nfl_fd, buf, rv);
}
}
debug("Recv worker #%u: finish recv", nf->header->id);
pthread_cleanup_pop(1);
pthread_exit(NULL);
}
/*
* Committer
*/
static void nfl_commit(nflog_state_t *nf) {
pthread_t tid;
pthread_create(&tid, NULL, nfl_start_commit_worker, (void *)nf);
pthread_detach(tid);
}
static void *nfl_start_commit_worker(void *targs) {
nflog_state_t* nf = (nflog_state_t*) targs;
const char *filename = nfl_get_filename(g->storage_dir, nf->header->id);
debug("Comm worker #%u: thread started", nf->header->id);
sem_wait(g->nfl_commit_queue);
debug("Comm worker #%u: commit started", nf->header->id);
nfl_commit_worker(nf->header, nf->store, filename);
debug("Comm worker #%u: commit done", nf->header->id);
sem_post(g->nfl_commit_queue);
// Commit finished
nfl_state_free(nf);
free((char*)filename);
pthread_mutex_unlock(&(nf->lock));
pthread_exit(NULL);
}
/*
* State managers
*/
void nfl_state_update_or_create(nflog_state_t **nf,
uint32_t id, uint32_t entries_max,
nflog_global_t *g) {
if(*nf == NULL) {
*nf = (nflog_state_t *)malloc(sizeof(nflog_state_t));
pthread_mutex_init(&((*nf)->lock), NULL);
(*nf)->global = g;
}
// Don't use calloc here, as it will consume physical memory
// before we fill the buffer. Instead, fill entries with 0
// on the fly, to squeeze more space for compression.
(*nf)->store = (nflog_entry_t *)malloc(sizeof(nflog_entry_t) *
entries_max);
(*nf)->header = (nflog_header_t *)malloc(sizeof(nflog_header_t));
(*nf)->header->id = id;
(*nf)->header->max_n_entries = entries_max;
(*nf)->header->n_entries = 0;
}
void nfl_state_free(nflog_state_t *nf) {
// Free header and store only
// Leave the rest intact
free((void*)nf->header);
free((void*)nf->store);
}