TRAPMINE | Prevent Threats. Detect Unknown. Respond to Attacks.

How to Stream Zeek Logs to RabbitMQ in Real-Time

To make threat detection and response as effective as possible, security analysts need the relevant data in real-time. Security teams deploy network monitoring tools to obtain the data from the network. One such tool is open-source Zeek which observes the network and interprets the network traffic to create logs rich with useful information for security analysts. These logs are stored on disk in TSV format by default. For real-time threat detection, this data should be available promptly to other analytics services such as SIEM. One way to achieve this is by using a streaming platform such as Kafka or RabbitMQ. Choosing a streaming platform can depend on the specific use case. In this blog post, we will look at how we can send Zeek logs to RabbitMQ by writing our own Zeek plugin.

Zeek Logging Architecture

Zeek provides an extensible logging framework that is built upon 3 concepts: Stream, Filter, and Writer.

A stream corresponds to one log file which consists of several fields. One such example is conn.log which records several NetFlow features of all the connections Zeek has seen. To see more information about the fields in conn.log, take a look at Conn::Info.

We can attach filters to a stream to control what and how the data is written to logs. A default filter is attached to all the streams at the beginning which logs all the data to disk. See Log::FIlter for all the options.

A Writer controls the format of the logs and where the logs are written. Zeek provides 3 writers: AsciiWriter, SQLiteWriter, and NoneWriter. AsciiWriter is used as a default which writes the logs in TSV format to the disk. We can also configure this writer to change its behavior such as logging in JSON format instead of the default TSV format.

When Zeek logs a stream, it uses the log manager which manages the log writers and filters. There is one Writer associated with each filter. For each filter, the log manager instantiates WriterFrontend which in turn creates the correct WriterBackend. All three Writers are subclasses of WriterBackend. This design gives the opportunity to easily extend the WriterBackend to create new writers. When we take a look at the writers already present in Zeek, we can see that they are written as Zeek plugins. For example, take a look at src/logging/writers/ascii/ We are also going to extend WriterBackend class to write a Zeek plugin that will send Zeek logs to RabbitMQ

Installing Dependencies

Before we start writing the Zeek plugin, we need to install some dependencies:

  1. Zeek
  2. Rabbitmq-c
  3. RabbitMQ

To build the plugin, keep in mind that we need to build the Zeek locally. To build the Zeek, we can follow the instructions here. We can look at the build instructions for Rabbitmq-c here. To quickly run RabbitMQ, we can run it using docker. 

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9-management

Initializing the Plugin

Zeek provides an easy way to initialize the plugin using one of its auxiliary tools: init-plugin. It takes the directory name where the plugin will be initialized, namespace, and plugin name. The namespace is used to avoid naming conflicts in the plugin names. We initialize the Zeek plugin like this:

path/to/zeek/auxil/zeek-aux/plugin-support/init-plugin ./zeek-amqp Trapmine AMQPWriter

This will create a directory named zeek-amqp with the following structure:

├── CMakeLists.txt
├── configure
├── configure.plugin
├── COPYING.edit-me
├── Makefile
├── scripts
│   ├── __load__.zeek
│   ├── __preload__.zeek
│   ├── Trapmine
│   │   └── AMQPWriter
│   │   └── __load__.zeek
│   └── types.zeek
├── src
│   ├── amqpwriter.bif
│   ├──
│   └── Plugin.h
├── tests
│   ├── amqpwriter
│   │   └── show-plugin.zeek
│   ├── btest.cfg
│   ├── Makefile
│   ├── random.seed
│   └── Scripts
│   ├── diff-remove-timestamps
│   └── get-zeek-env
├── tree_structure.txt
└── zkg.met

The src folder contains the source code for the plugin and the scripts folder contains the Zeek scripts for the plugin. We can start by updating the description of the Plugin in the src/ like this:

config.description = "Sends logs to AMQP Exchange"; 

Now we can try to compile and install the plugin using the following commands

sudo make install

Running zeek -N will show our plugin like this:

Trapmine::AMQPWriter - Sends logs to AMQP Exchange (dynamic, version 0.1.0)

Extending the WriterBackend

We make a class named AMQP which extends WriterBackend. We then implement its virtual methods. These methods include:

  1. DoInit: This method is called once on the initialization of the writer. We create an amqp socket, connect to the socket, log in using the credentials, create a channel, and declare a queue.
  2. DoFInish: This method is called once we are finished with writing logs and Zeek can then assume it is safe to terminate the thread.
  3. DoRotate: This method allows the implementation of log rotation which we do not need in this case. The writer must call FinishedRotation() in its implementation.
  4. DoSetBuf: Implementation of change of file buffering state. We do not need to implement this method in this case.
  5. DoFlush: Implementation of the flushing of logs is not required in this case.
  6. DoHeartbeat: Implementation of heartbeat mechanism which is triggered by heartbeat messages from Zeek’s main thread. We do not need to implement this method as well.
  7. DoWrite: Implementation of writing of one log entry to the destination. We publish the message in a queue.

We use Zeek’s JSON formatter to publish the Zeek logs in JSON format. However, we need to know on the other end of the queue the log name which produced this message. We extend the JSON formatter by TaggedJSON class. This formatter tags the JSON log message with the log name. For example, if conn.log produced following message:


We tag it with the key name “conn”:

{"conn": {"ts":1633952241.658421,"uid":"CX9WWiDerGCStuc77","id.orig_h":"","id.orig_p":52028,"id.resp_h":"","id.resp_p":80,"proto":"tcp","duration":0.00017595291137695313,"orig_bytes":1,"resp_bytes":0,"conn_state":"OTH","missed_bytes":0,"history":"Aa","orig_pkts":1,"orig_ip_bytes":40,"resp_pkts":1,"resp_ip_bytes":40}}

Zeek global and per-filter configuration options

There are usually 2 ways to pass some values to a Zeek plugin.

  1. We can redefine the global options for a writer.
  2. We can define the configuration options in the config table when creating a filter.

To be able to achieve this with our plugin, we define the configuration options in src/amqpwriter.bif. These options include:

  1. hostname
  2. amqp_port
  3. vhost
  4. username
  5. password
  6. queue_name
  7. exchange
  8. routing_key

We can define the default values for these options in scripts/init_default.zeek file like this: 

export {
const hostname = "localhost" &redef;
const amqp_port = 5672 &redef;
const vhost = "/" &redef;
const username = "guest" &redef;
const password = "guest" &redef;
const queue_name = "test_queue" &redef;
const exchange = "" &redef;
const routing_key = "test_queue" &redef;

Zeek loads all the scripts loaded in scripts/__load__.zeek. So we add the following line to load script/init_default.zeek in scripts/__load__.zeek.

@load ./init_default.zeek 

Now we can redefine these options globally in zeek scriptland like this:

redef LogAMQP::amqp_port = 5761;

We can load per-filter options in the constructor of AMQP class by iterating over WriterInfo::config_map which is a map of string -> string values. By convention, per-filter options should override the global options. So, we load global options in our constructor before loading per-filter options.

Registering the Plugin component with Zeek

The final step in writing our AMQP plugin is to register the plugin component with Zeek. We can do this by using AddComponent in Plugin’s Configure method in the src/ file. We pass a new component with a reference to the AMQP Instantiate method like this:

AddComponent(new zeek::logging::Component("AMQP",zeek::logging::writer::AMQP::Instantiate));

Compiling and Linking

Before we compile our plugin, we have to provide the rabbitmq-c library and headers correctly to the compiler and linker. It is quite simple to achieve this using CMake.

We first add an option in the configure.plugin file to pass rabbitmq-c path. We write a CMakescript (cCMakeFindLIB_RABBITMQ_C.cmake) to find the correct paths for rabbitmq-c library. We find the root path of rabbitmq-c and the path of its include directory by using the find_path function and find the rabbitmq-c library files using the find_library function. We then write a find_package handler for rabbitmq-c. FInal CMake script should look like this:

    NAMES include/rabbitmq-c/amqp.h

    NAMES rabbitmq
    HINTS ${LIB_RABBITMQ_C_ROOT_DIR}/build/librabbitmq

    NAMES include/rabbitmq-c/amqp.h

find_package_handle_standard_args(LIB_RABBITMQ_C DEFAULT_MSG


Finally, in CMakeLists.txt, we find the rabbitmq-c package using find_package() and then include the rabbitmq-c library using include_directories before starting the compilation. We call zeek_plugin_cc for all the .cc files in the src directory and finally link the rabbitmq-c library using zeek_plugin_link_library.

We compile and install the plugin using the following commands.

./configure --zeek-dist=/path/to/zeek --with-rabbitmq-c=/path/to/rabbitmq-c
sudo make install

Verify the installation by running zeek -NN.

Sending Zeek logs to RabbitMQ

We write a simple Zeek script (logs_to_rabbitmq.zeek) to send conn.log to RabbitMQ by adding a filter with WRITER_AMQP.

event zeek_init() &priority=1 {
    local filter: Log::Filter = [
  ["hostname"] = "localhost",
  ["amqp_port"] = "5672",
  ["username"] = "guest",
  ["password"] = "guest",
  ["vhost"] = "/",
  ["queue_name"] = "test_queue",
  ["exchange"] = "",
  ["routing_key"] = "test_queue"
    Log::add_filter(Conn::LOG, filter);

Run the Zeek with our script while it is listening on a network interface like this:

zeek -i ens33 -C logs_to_rabbitmq.zeek

After some time, we can see the logs being published in “test_queue” in the RabbitMQ management interface.

We can see that these log lines correspond to the conn.log that Zeek produced.

#separator \x09
#set_separator    ,
#empty_field    (empty)
#unset_field    -
#path    conn
#open    2021-10-11-04-37-28
#fields    ts    uid    id.orig_h    id.orig_p    id.resp_h    id.resp_p    proto    service    duration    orig_bytes    resp_bytes    conn_state    local_orig    local_resp    missed_bytes    history    orig_pkts    orig_ip_bytes    resp_pkts    resp_ip_bytes    tunnel_parents
#types    time    string    addr    port    addr    port    enum    string    interval    count    count    string    bool    bool    count    string    count    count    count    count    set[string]
1633952241.658421    CX9WWiDerGCStuc77    52028    REDACTED_IP    80    tcp    -    0.000176    1    0    OTH    -    -0    Aa    1    40    1    40    -
1633952241.658289    CAUqadsHWswaiWq07    52030   REDACTED_IP    80    tcp    -    0.000287    1    0    OTH    -    -0    Aa    1    40    1    40    -
#close    2021-10-11-04-37-28

You can see the full plugin source code here at GitLab.

Muhammad Saboor – R&D Engineer / TRAPMINE


More Posts