// The MIT License (MIT)

// Copyright (c) 2017 Yun-Chih Chen

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:

// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.

// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

// NOTE: the system header names below were reconstructed; the original
// filenames were lost and only the two inline comments survive as hints.
#include "commit.h"
#include "common.h"
#include <libnetfilter_log/libnetfilter_log.h>
#include <pthread.h>
#include <stddef.h>    // size_t for libnetfilter_log
#include <string.h>
#include <sys/types.h> // u_int32_t for libnetfilter_log
#include <time.h>

nflog_global_t g;

static void nfl_init(nflog_state_t *nf);
static void *nfl_start_commit_worker(void *targs);
static void nfl_commit(nflog_state_t *nf);
static void nfl_state_free(nflog_state_t *nf);

static int handle_packet(struct nflog_g_handle *gh, struct nfgenmsg *nfmsg,
                         struct nflog_data *nfa, void *_nf) {
    register const struct iphdr *iph;
    register nflog_entry_t *entry;
    const struct tcphdr *tcph;
    const struct udphdr *udph;
    char *payload;
    void *inner_hdr;
    uint32_t uid;
    int payload_len = nflog_get_payload(nfa, &payload);
    nflog_state_t *nf = (nflog_state_t *)_nf;

    pthread_testcancel(); /* cancellation point */

    // only process IPv4 packets
    if (unlikely(payload_len < 0) || ((payload[0] & 0xf0) != 0x40))
        return 1;

    if (unlikely(nf->header->n_entries >= nf->header->max_n_entries))
        return 1;

    iph = (struct iphdr *)payload;
    entry = &(nf->store[nf->header->n_entries]);

    inner_hdr = (uint32_t *)iph + iph->ihl;
    // Only accept TCP / UDP packets
    if (iph->protocol == IPPROTO_TCP) {
        tcph = (struct tcphdr *)inner_hdr;
        entry->sport = ntohs(tcph->source);
        entry->dport = ntohs(tcph->dest);

        // only process SYN and PSH packets, drop pure ACKs
        if (!tcph->syn && !tcph->psh)
            return 1;
    } else if (iph->protocol == IPPROTO_UDP) {
        udph = (struct udphdr *)inner_hdr;
        entry->sport = ntohs(udph->source);
        entry->dport = ntohs(udph->dest);
    } else
        return 1; // Ignore other types of packet

    entry->daddr.s_addr = iph->daddr;
    entry->protocol = iph->protocol;

    // get sender uid
    if (nflog_get_uid(nfa, &uid) != 0)
        return 1;
    entry->uid = uid;

    // get current timestamp
    time(&entry->timestamp);

    nf->header->n_entries++;

    debug("Recv packet info entry #%d: "
          "timestamp:\t%ld,\t"
          "daddr:\t%ld,\t"
          "transfer:\t%s,\t"
          "uid:\t%d,\t"
          "sport:\t%d,\t"
          "dport:\t%d",
          nf->header->n_entries, entry->timestamp,
          (unsigned long)entry->daddr.s_addr,
          iph->protocol == IPPROTO_TCP ? "TCP" : "UDP", entry->uid,
          entry->sport, entry->dport);

    // Ignore IPv6 packets for now Q_Q
    return 0;
}

static void nfl_init(nflog_state_t *nf) {
    // open nflog
    ERR((nf->nfl_fd = nflog_open()) == NULL, "nflog_open")
    debug("Opening nflog communication file descriptor");

    // monitor IPv4 packets only
    ERR(nflog_bind_pf(nf->nfl_fd, AF_INET) < 0, "nflog_bind_pf");

    // bind to group
    nf->nfl_group_fd = nflog_bind_group(nf->nfl_fd, nf->global->nfl_group_id);

    /* ERR(nflog_set_mode(nf->nfl_group_fd, NFULNL_COPY_PACKET,
     *     sizeof(struct iphdr) + 4) < 0, */
    ERR(nflog_set_mode(nf->nfl_group_fd, NFULNL_COPY_PACKET, nflog_recv_size) < 0,
        "Could not set copy mode");

    nflog_callback_register(nf->nfl_group_fd, &handle_packet, nf);
    debug("Registering nflog callback");

    memcpy(&g, nf->global, sizeof(nflog_global_t));
}

void *nfl_collect_worker(void *targs) {
    nflog_state_t *nf = (nflog_state_t *)targs;
    nfl_init(nf);

    int fd = nflog_fd(nf->nfl_fd);
    uint32_t *p_cnt_now = &(nf->header->n_entries);
    uint32_t cnt_max = nf->header->max_n_entries;

    debug("Recv worker #%u: main loop starts", nf->header->id);
    time(&nf->header->start_time);

    int rv;
    char buf[4096];
    while (*p_cnt_now < cnt_max) {
        pthread_testcancel(); /* cancellation point */
        if ((rv = recv(fd, buf, sizeof(buf), 0)) && rv > 0) {
            debug("Recv worker #%u: nflog packet received (len=%u)",
                  nf->header->id, rv);
            nflog_handle_packet(nf->nfl_fd, buf, rv);
        }
    }

    debug("Recv worker #%u: finish recv, received packets: %u",
          nf->header->id, cnt_max);

    // write end time
    time(&nf->header->end_time);
    nflog_unbind_group(nf->nfl_group_fd);
    nflog_close(nf->nfl_fd);

    // write checksum
    nf->header->cksum = nfl_header_cksum(nf->header);

    // spawn commit thread
    nfl_commit(nf);
    pthread_exit(NULL);
}

/*
 * Committer
 */

static void nfl_commit(nflog_state_t *nf) {
    pthread_t tid;
    pthread_create(&tid, NULL, nfl_start_commit_worker, (void *)nf);
    pthread_detach(tid);
}

static void *nfl_start_commit_worker(void *targs) {
    nflog_state_t *nf = (nflog_state_t *)targs;
    const char *filename = nfl_get_filename(g.storage_dir, nf->header->id);
    debug("Comm worker #%u: thread started.", nf->header->id);

    sem_wait(g.nfl_commit_queue);
    debug("Comm worker #%u: commit started.", nf->header->id);
    nfl_commit_worker(nf->header, nf->store, g.compression_opt, filename);
    debug("Comm worker #%u: commit done.", nf->header->id);
    sem_post(g.nfl_commit_queue);

    nfl_state_free(nf);
    free((char *)filename);

    pthread_mutex_lock(&nf->has_finished_lock);
    nf->has_finished = true;
    pthread_cond_signal(&nf->has_finished_cond);
    pthread_mutex_unlock(&nf->has_finished_lock);

    pthread_exit(NULL);
}

/*
 * State managers
 */

void nfl_state_init(nflog_state_t **nf, uint32_t id, uint32_t entries_max,
                    nflog_global_t *g) {
    assert(nf);
    if (unlikely(*nf == NULL)) {
        *nf = (nflog_state_t *)malloc(sizeof(nflog_state_t));
        (*nf)->global = g;

        (*nf)->header = (nflog_header_t *)malloc(sizeof(nflog_header_t));
        (*nf)->header->id = id;
        (*nf)->header->n_entries = 0;
        (*nf)->header->max_n_entries = entries_max;
        (*nf)->header->compression_opt = g->compression_opt;

        (*nf)->has_finished = true;
        pthread_mutex_init(&(*nf)->has_finished_lock, NULL);
        pthread_cond_init(&(*nf)->has_finished_cond, NULL);
    }

    // Ensure the trunk with the same id from the previous run has finished,
    // to prevent reusing a trunk that is still in use.  This also helps us
    // absorb bursty network traffic.
    pthread_mutex_lock(&(*nf)->has_finished_lock);
    while (!(*nf)->has_finished)
        pthread_cond_wait(&(*nf)->has_finished_cond, &(*nf)->has_finished_lock);
    (*nf)->has_finished = false;
    pthread_mutex_unlock(&(*nf)->has_finished_lock);

    // Don't use calloc here, as it will cause page faults and
    // consume physical memory before we fill the buffer.
    // Instead, fill entries with 0 on the fly, to squeeze
    // more space for compression.
    (*nf)->store = (nflog_entry_t *)malloc(sizeof(nflog_entry_t) * entries_max);
}

static void nfl_state_free(nflog_state_t *nf) {
    // Free only the packet store and leave the rest intact
    free((void *)nf->store);
}
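
/*
 * Sketch (not part of the original file): one way a caller might drive a
 * single trunk through nfl_state_init() and nfl_collect_worker().  The
 * function name, the trunk id of 0, the entry count of 1024, and the
 * assumption that `global` is already populated (nflog group id, storage
 * dir, commit-queue semaphore, compression option) are all hypothetical;
 * the real driver lives elsewhere in this project.  Guarded by an #ifdef
 * so it does not affect a normal build.
 */
#ifdef NFL_COLLECT_EXAMPLE
int nfl_example_run_one_trunk(nflog_global_t *global) {
    nflog_state_t *state = NULL;
    pthread_t tid;

    // Allocate the trunk state and its packet store (id 0, 1024 entries).
    nfl_state_init(&state, 0, 1024, global);

    // The collect worker blocks in recv() until the store is full, then
    // hands the trunk off to a detached commit thread and exits.
    if (pthread_create(&tid, NULL, nfl_collect_worker, (void *)state) != 0)
        return -1;
    return pthread_join(tid, NULL);
}
#endif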