aboutsummaryrefslogblamecommitdiffstats
path: root/lib/collect.c
blob: 3e3f7a38d2540c7005fc7975c675f830a225c4af (plain) (tree)
























                                                                                
                   

                                              
                                                  
                   
                                                        

                 
               
 
                                      
                                                  

                                            


                                                                           
                                     
                                

                              
                  


                    
                                                       
                                         

                               


                                                                     












                                                     
                                     



















                                                 
                                        




                              
                       



                                                                   




                                     
                                       
                 

                                                          

                                
                                                                 

                    
                                                                              
 

                                                                             
                                                                                

                                   

                                                                  
 
                                                 

 
                                       
                                           


                                  

                                                   
 

                                                               
 
           



                                                                        
                                  
                                                             

                                                                    



                                                     

                                                                               





                                         


                                                     

                          


                       



            
                                         
                  
                                                                    


                        
                                                   
                                           

                                                                           
 

                                                              
                                                                          

                                                           
 
                       
                           





                                                 
                       
 




                 

                                                                        
               
                                
                                                         
                          
                                                                     
                               
                                     

                                                            



                                                            

     
                                                                                

                                                                        
                                   
                                                  

                                                                                






                                                             
                                                                            

 
                                             
                                                       
                            
 

// The MIT License (MIT)

// Copyright (c) 2017 Yun-Chih Chen

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:

// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.

// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

#include "commit.h"
#include "common.h"
#include <errno.h>
#include <libnetfilter_log/libnetfilter_log.h>
#include <pthread.h>
#include <stddef.h> // size_t for libnetfilter_log
#include <stdint.h>
#include <string.h>
#include <sys/types.h> // u_int32_t for libnetfilter_log
#include <time.h>

// File-scope copy of the global configuration. nfl_init() refreshes it from
// nf->global, and the commit worker reads g.storage_dir, g.nfl_commit_queue
// and g.compression_opt from it. It has external linkage, so other
// translation units may refer to it -- NOTE(review): confirm before making it
// static.
nfl_global_t g;

// Forward declarations of the file-local helpers defined below.
static void nfl_init(nfl_state_t *nf);
static void *nfl_start_commit_worker(void *targs);
static void nfl_commit(nfl_state_t *nf);
static void nfl_state_free(nfl_state_t *nf);

static int handle_packet(struct nflog_g_handle *gh, struct nfgenmsg *nfmsg,
                         struct nflog_data *nfa, void *_nf) {
    register const struct iphdr *iph;
    register nfl_entry_t *entry;
    const struct tcphdr *tcph;
    const struct udphdr *udph;
    char *payload;
    void *inner_hdr;
    uint32_t uid;

    int payload_len = nflog_get_payload(nfa, &payload);
    nfl_state_t *nf = (nfl_state_t *)_nf;

    // only process ipv4 packet
    if (unlikely(payload_len < 0) || ((payload[0] & 0xf0) != 0x40))
        return 1;
    if (unlikely(nf->header->n_entries >= nf->header->max_n_entries))
        return 1;

    iph = (struct iphdr *)payload;
    entry = &(nf->store[nf->header->n_entries]);

    inner_hdr = (uint32_t *)iph + iph->ihl;
    // Only accept TCP / UDP packets
    if (iph->protocol == IPPROTO_TCP) {
        tcph = (struct tcphdr *)inner_hdr;
        entry->sport = ntohs(tcph->source);
        entry->dport = ntohs(tcph->dest);

        // only process SYNC and PSH packet, drop ACK
        if (!tcph->syn && !tcph->psh)
            return 1;
    } else if (iph->protocol == IPPROTO_UDP) {
        udph = (struct udphdr *)inner_hdr;
        entry->sport = ntohs(udph->source);
        entry->dport = ntohs(udph->dest);
    } else
        return 1; // Ignore other types of packet

    entry->daddr.s_addr = iph->daddr;
    entry->protocol = iph->protocol;

    // get sender uid
    if (nflog_get_uid(nfa, &uid) != 0)
        return 1;
    entry->uid = uid;

    // get current timestamp
    time(&entry->timestamp);
    nf->header->n_entries++;

    debug("Recv packet info entry #%d: "
          "timestamp:\t%ld,\t"
          "daddr:\t%ld,\t"
          "transfer:\t%s,\t"
          "uid:\t%d,\t"
          "sport:\t%d,\t"
          "dport:\t%d",
          nf->header->n_entries, entry->timestamp,
          (unsigned long)entry->daddr.s_addr,
          iph->protocol == IPPROTO_TCP ? "TCP" : "UDP", entry->uid,
          entry->sport, entry->dport);

    // Ignore IPv6 packet for now Q_Q
    return 0;
}

/*
 * Open an nflog handle for this trunk and register handle_packet() as the
 * callback for group nf->global->nfl_group_id. Packets are copied in
 * NFULNL_COPY_PACKET mode, limited to nfl_recv_size bytes each.
 *
 * Every failing libnetfilter_log call is reported through ERR. Without these
 * checks, a NULL group handle (e.g. the group cannot be bound) would be
 * passed straight into nflog_set_mode().
 *
 * Finally refreshes the file-scope copy g from nf->global.
 */
static void nfl_init(nfl_state_t *nf) {
    // open nflog
    ERR((nf->nfl_fd = nflog_open()) == NULL, "nflog_open");
    debug("Opening nflog communication file descriptor");

    // monitor IPv4 packets only
    ERR(nflog_bind_pf(nf->nfl_fd, AF_INET) < 0, "nflog_bind_pf");

    // bind to group
    nf->nfl_group_fd = nflog_bind_group(nf->nfl_fd, nf->global->nfl_group_id);
    ERR(nf->nfl_group_fd == NULL, "nflog_bind_group");

    // copy at most nfl_recv_size bytes of each logged packet
    ERR(nflog_set_mode(nf->nfl_group_fd, NFULNL_COPY_PACKET, nfl_recv_size) < 0,
        "Could not set copy mode");

    ERR(nflog_callback_register(nf->nfl_group_fd, &handle_packet, nf) != 0,
        "nflog_callback_register");
    debug("Registering nflog callback");

    memcpy(&g, nf->global, sizeof(nfl_global_t));
}

/*
 * Collector thread body for one trunk.
 *
 * Steps:
 *   1. Open and bind an nflog handle via nfl_init().
 *   2. Receive netlink messages and pass them to nflog_handle_packet(), whose
 *      callback handle_packet() appends entries. Repeat until the trunk holds
 *      max_n_entries entries, or recv() fails with an error that retrying
 *      will not fix.
 *   3. Record the end time and release the nflog handle.
 *   4. Compute the header checksum and hand the trunk to a committer.
 *
 * @targs: the nfl_state_t prepared by nfl_state_init()
 * Does not return: terminates the calling thread via pthread_exit().
 */
void *nfl_collect_worker(void *targs) {
    nfl_state_t *nf = (nfl_state_t *)targs;
    nfl_init(nf);

    int fd = nflog_fd(nf->nfl_fd);
    uint32_t *p_cnt_now = &(nf->header->n_entries);
    uint32_t cnt_max = nf->header->max_n_entries;

    debug("Recv worker #%u: main loop starts", nf->header->id);
    time(&nf->header->start_time);

    // Must have at least 128 to account for sizeof(struct iphdr) +
    // sizeof(struct tcphdr)
    // plus the size of meta data needed by the library's data structure
    char buf[128];
    while (*p_cnt_now < cnt_max) {
        ssize_t rv = recv(fd, buf, sizeof(buf), 0);
        if (rv > 0) {
            debug("Recv worker #%u: nflog packet received (len=%zd)",
                  nf->header->id, rv);
            nflog_handle_packet(nf->nfl_fd, buf, (int)rv);
        } else if (rv < 0 && errno != EINTR && errno != EAGAIN &&
                   errno != ENOBUFS) {
            // EINTR/EAGAIN are transient. ENOBUFS means the kernel dropped
            // messages because we fell behind; keep receiving. Other errors
            // (e.g. EBADF) will not clear by retrying, and retrying would spin
            // this loop forever. Stop and commit what was collected so far.
            debug("Recv worker #%u: recv failed (errno=%d), stop receiving",
                  nf->header->id, errno);
            break;
        }
    }

    debug("Recv worker #%u: finish recv, received packets: %u", nf->header->id,
          *p_cnt_now);

    // write end time
    time(&nf->header->end_time);
    nflog_unbind_group(nf->nfl_group_fd);
    nflog_close(nf->nfl_fd);

    // write checksum
    nf->header->cksum = nfl_header_cksum(nf->header);

    // spawn commit thread
    nfl_commit(nf);
    pthread_exit(NULL);
}

/*
 * Committer
 */

/*
 * Hand the trunk over to a detached committer thread running
 * nfl_start_commit_worker().
 *
 * If the thread cannot be created, the commit runs in the calling thread
 * instead. Dropping it would leak the store and leave has_finished false
 * forever, so the next nfl_state_init() for this trunk would block.
 *
 * NOTE: nfl_start_commit_worker() ends in pthread_exit(), so on the fallback
 * path the calling thread terminates inside it. The only caller in this file,
 * nfl_collect_worker(), exits its thread right after nfl_commit() anyway.
 */
static void nfl_commit(nfl_state_t *nf) {
    pthread_t tid;
    int rc = pthread_create(&tid, NULL, nfl_start_commit_worker, (void *)nf);
    if (unlikely(rc != 0)) {
        debug("Comm worker #%u: pthread_create failed (%d), committing inline",
              nf->header->id, rc);
        nfl_start_commit_worker(nf);
        return;
    }
    pthread_detach(tid);
}

static void *nfl_start_commit_worker(void *targs) {
    nfl_state_t *nf = (nfl_state_t *)targs;
    const char *filename = nfl_get_filename(g.storage_dir, nf->header->id);
    debug("Comm worker #%u: thread started.", nf->header->id);

    sem_wait(g.nfl_commit_queue);
    debug("Comm worker #%u: commit started.", nf->header->id);
    nfl_commit_worker(nf->header, nf->store, g.compression_opt, filename);
    debug("Comm worker #%u: commit done.", nf->header->id);
    sem_post(g.nfl_commit_queue);

    nfl_state_free(nf);
    free((char *)filename);

    pthread_mutex_lock(&nf->has_finished_lock);
    nf->has_finished = true;
    pthread_cond_signal(&nf->has_finished_cond);
    pthread_mutex_unlock(&nf->has_finished_lock);

    pthread_exit(NULL);
}

/*
 * State managers
 */

/*
 * Prepare trunk *nf for a new collection round.
 *
 * On first use (*nf == NULL), the state and its header are allocated and the
 * has_finished mutex/condvar are initialised. On later calls the existing
 * state is reused; @id and @g are only consulted on that first allocation.
 *
 * Blocks until the previous round of this trunk has signalled has_finished.
 * Then it resets the per-round header fields and allocates a fresh entry
 * store of @entries_max entries.
 *
 * Allocation failures are reported through ERR.
 *
 * @nf:          in/out pointer to the trunk state; must not be NULL
 * @id:          trunk id (used on first allocation only)
 * @entries_max: capacity of this round's entry store
 * @g:           global configuration (used on first allocation only)
 */
void nfl_state_init(nfl_state_t **nf, uint32_t id, uint32_t entries_max,
                    nfl_global_t *g) {
    assert(nf);
    if (unlikely(*nf == NULL)) {
        nfl_state_t *fresh = (nfl_state_t *)malloc(sizeof(*fresh));
        ERR(fresh == NULL, "malloc nfl_state_t");
        fresh->global = g;
        fresh->header = (nfl_header_t *)malloc(sizeof(*fresh->header));
        ERR(fresh->header == NULL, "malloc nfl_header_t");
        fresh->header->id = id;
        fresh->header->compression_opt = g->compression_opt;

        fresh->has_finished = true;
        pthread_mutex_init(&fresh->has_finished_lock, NULL);
        pthread_cond_init(&fresh->has_finished_cond, NULL);
        *nf = fresh;
    }
    nfl_state_t *s = *nf;

    // Ensure the trunk with the same id from the previous run has finished,
    // so we never reuse a trunk that is still in use. This hopefully also
    // smooths out bursty network traffic.
    pthread_mutex_lock(&s->has_finished_lock);
    while (!s->has_finished)
        pthread_cond_wait(&s->has_finished_cond, &s->has_finished_lock);
    s->has_finished = false;
    pthread_mutex_unlock(&s->has_finished_lock);

    // Reset per-round header fields now that the previous committer is done
    // with the header.
    //   - n_entries used to be zeroed only on first allocation, so a reused
    //     trunk started "full": the collector exited at once and committed
    //     an uninitialised store.
    //   - max_n_entries must match the size of the store allocated below,
    //     because handle_packet() uses it as the write bound.
    s->header->n_entries = 0;
    s->header->max_n_entries = entries_max;

    // Don't use calloc here, as it will cause page fault and
    // consume physical memory before we fill the buffer.
    // Instead, fill entries with 0 on the fly, to squeeze
    // more space for compression.
    ERR(entries_max > SIZE_MAX / sizeof(nfl_entry_t),
        "nfl_state_init: entries_max too large");
    s->store = (nfl_entry_t *)malloc(sizeof(nfl_entry_t) * entries_max);
    ERR(s->store == NULL && entries_max != 0, "malloc nfl_entry_t store");
}

/*
 * Release the entry store of @nf after its trunk has been committed.
 *
 * Only the packet store is freed. The state, header and synchronisation
 * primitives stay intact so nfl_state_init() can reuse the trunk.
 *
 * store is reset to NULL, so a repeated call is a harmless free(NULL) and a
 * late access dereferences NULL instead of freed memory.
 */
static void nfl_state_free(nfl_state_t *nf) {
    free(nf->store);
    nf->store = NULL;
}