To detect and respond to threats effectively, security analysts need the relevant data in real time. Security teams deploy network monitoring tools to collect this data from the network. One such tool is the open-source Zeek, which observes network traffic and interprets it to produce logs rich in information useful to security analysts. By default, these logs are stored on disk in TSV format. For real-time threat detection, this data should be promptly available to other analytics services such as a SIEM. One way to achieve this is to use a streaming platform such as Kafka or RabbitMQ; which one to choose depends on the specific use case. In this blog post, we will look at how to send Zeek logs to RabbitMQ by writing our own Zeek plugin.
Zeek Logging Architecture
Zeek provides an extensible logging framework that is built upon 3 concepts: Stream, Filter, and Writer.
A stream corresponds to one log file which consists of several fields. One such example is conn.log which records several NetFlow features of all the connections Zeek has seen. To see more information about the fields in conn.log, take a look at Conn::Info.
We can attach filters to a stream to control what data is written to the logs and how. By default, every stream starts with one filter attached, which logs all the data to disk. See Log::Filter for all the options.
A Writer controls the format of the logs and where the logs are written. Zeek provides 3 writers: AsciiWriter, SQLiteWriter, and NoneWriter. AsciiWriter is the default and writes the logs to disk in TSV format. We can also configure this writer to change its behavior, for example to log in JSON format instead of the default TSV format.
When Zeek logs a stream, it uses the log manager, which manages the log writers and filters. There is one Writer associated with each filter. For each filter, the log manager instantiates a WriterFrontend, which in turn creates the correct WriterBackend. All three Writers are subclasses of WriterBackend. This design makes it easy to extend WriterBackend to create new writers. The writers already present in Zeek are themselves written as Zeek plugins; for example, take a look at src/logging/writers/ascii/Plugin.cc. We are also going to extend the WriterBackend class to write a Zeek plugin that sends Zeek logs to RabbitMQ.
Installing Dependencies
Before we start writing the Zeek plugin, we need to install some dependencies:
- Zeek
- Rabbitmq-c
- RabbitMQ
Keep in mind that to build the plugin, we need to build Zeek locally from source. To build Zeek, we can follow the instructions here. We can look at the build instructions for Rabbitmq-c here. To get RabbitMQ running quickly, we can start it with Docker:
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9-management
Initializing the Plugin
Zeek provides an easy way to initialize the plugin using one of its auxiliary tools: init-plugin. It takes the directory name where the plugin will be initialized, namespace, and plugin name. The namespace is used to avoid naming conflicts in the plugin names. We initialize the Zeek plugin like this:
path/to/zeek/auxil/zeek-aux/plugin-support/init-plugin ./zeek-amqp Trapmine AMQPWriter
This will create a directory named zeek-amqp with the following structure:
.
├── CHANGES
├── CMakeLists.txt
├── configure
├── configure.plugin
├── COPYING.edit-me
├── Makefile
├── README
├── scripts
│ ├── __load__.zeek
│ ├── __preload__.zeek
│ ├── Trapmine
│ │ └── AMQPWriter
│ │   └── __load__.zeek
│ └── types.zeek
├── src
│ ├── amqpwriter.bif
│ ├── Plugin.cc
│ └── Plugin.h
├── tests
│ ├── amqpwriter
│ │ └── show-plugin.zeek
│ ├── btest.cfg
│ ├── Makefile
│ ├── random.seed
│ └── Scripts
│   ├── diff-remove-timestamps
│   └── get-zeek-env
├── tree_structure.txt
├── VERSION
└── zkg.met
The src folder contains the source code for the plugin, and the scripts folder contains the Zeek scripts for the plugin. We can start by updating the description of the plugin in src/Plugin.cc like this:
config.description = "Sends logs to AMQP Exchange";
Now we can try to compile and install the plugin using the following commands:
make
sudo make install
Running zeek -N will show our plugin like this:
Trapmine::AMQPWriter - Sends logs to AMQP Exchange (dynamic, version 0.1.0)
Extending the WriterBackend
We make a class named AMQP which extends WriterBackend. We then implement its virtual methods. These methods include:
- DoInit: This method is called once when the writer is initialized. Here we create an AMQP socket, open the connection, log in using the credentials, create a channel, and declare a queue.
- DoFinish: This method is called once we are finished writing logs; Zeek can then assume it is safe to terminate the thread.
- DoRotate: This method allows the implementation of log rotation which we do not need in this case. The writer must call FinishedRotation() in its implementation.
- DoSetBuf: Implementation of change of file buffering state. We do not need to implement this method in this case.
- DoFlush: Implementation of the flushing of logs is not required in this case.
- DoHeartbeat: Implementation of the heartbeat mechanism, which is triggered by heartbeat messages from Zeek’s main thread. We do not need to implement this method either.
- DoWrite: Implementation of writing of one log entry to the destination. We publish the message in a queue.
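The lifecycle described in the list above can be sketched without Zeek or rabbitmq-c. This is a minimal illustration, not the plugin's actual code: MockWriterBackend and MockAMQPWriter are hypothetical stand-ins, the method signatures are simplified compared to Zeek's real WriterBackend, and "publishing" just appends to an in-memory vector instead of talking to a broker.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical, simplified stand-in for Zeek's WriterBackend. The real class
// has different signatures and more hooks (rotation, buffering, heartbeat).
class MockWriterBackend {
public:
    virtual ~MockWriterBackend() = default;
    bool Init() { return DoInit(); }
    bool Write(const std::string& entry) { return DoWrite(entry); }
    bool Finish() { return DoFinish(); }

protected:
    virtual bool DoInit() = 0;
    virtual bool DoWrite(const std::string& entry) = 0;
    virtual bool DoFinish() = 0;
};

// Hypothetical writer: records messages in memory instead of publishing
// them to a RabbitMQ queue.
class MockAMQPWriter : public MockWriterBackend {
public:
    bool Connected() const { return connected_; }
    const std::vector<std::string>& Published() const { return queue_; }

protected:
    // Real plugin: open socket, log in, open channel, declare queue.
    bool DoInit() override {
        connected_ = true;
        return true;
    }

    // Real plugin: publish one log entry to the queue.
    bool DoWrite(const std::string& entry) override {
        if (!connected_)
            return false;  // refuse to write before initialization
        queue_.push_back(entry);
        return true;
    }

    // Real plugin: close channel and connection.
    bool DoFinish() override {
        connected_ = false;
        return true;
    }

private:
    bool connected_ = false;
    std::vector<std::string> queue_;
};

// Usage: drive the writer through init, one write, and shutdown.
inline void demo_lifecycle() {
    MockAMQPWriter writer;
    bool ok = writer.Init() && writer.Write("{\"conn\": {}}") && writer.Finish();
    assert(ok && writer.Published().size() == 1);
    (void)ok;
}
```

Keeping the connection state inside the writer means a DoWrite call that arrives before a successful DoInit fails cleanly instead of publishing to a broker that is not there.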
We use Zeek’s JSON formatter to publish the Zeek logs in JSON format. However, the consumer on the other end of the queue needs to know which log produced each message. We therefore extend the JSON formatter with a TaggedJSON class, which tags each JSON log message with the log name. For example, if conn.log produced the following message:
{"ts":1633952241.658421,"uid":"CX9WWiDerGCStuc77","id.orig_h":"172.16.114.129","id.orig_p":52028,"id.resp_h":"34.107.221.82","id.resp_p":80,"proto":"tcp","duration":0.00017595291137695313,"orig_bytes":1,"resp_bytes":0,"conn_state":"OTH","missed_bytes":0,"history":"Aa","orig_pkts":1,"orig_ip_bytes":40,"resp_pkts":1,"resp_ip_bytes":40}
We tag it with the key name “conn”:
{"conn": {"ts":1633952241.658421,"uid":"CX9WWiDerGCStuc77","id.orig_h":"172.16.114.129","id.orig_p":52028,"id.resp_h":"34.107.221.82","id.resp_p":80,"proto":"tcp","duration":0.00017595291137695313,"orig_bytes":1,"resp_bytes":0,"conn_state":"OTH","missed_bytes":0,"history":"Aa","orig_pkts":1,"orig_ip_bytes":40,"resp_pkts":1,"resp_ip_bytes":40}}
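The tagging step itself is small. The sketch below is a hypothetical helper, not the plugin's TaggedJSON class: tag_json is an illustrative name, and it assumes the log entry has already been serialized to a JSON object and that the log name contains no characters that need JSON escaping.

```cpp
#include <cassert>
#include <string>

// Hypothetical helper: wrap an already-serialized JSON object under a key
// named after the log stream that produced it.
// Assumes log_name needs no JSON escaping (e.g. "conn", "dns", "http").
std::string tag_json(const std::string& log_name, const std::string& json_object) {
    assert(!log_name.empty() && "log name must not be empty");
    return "{\"" + log_name + "\": " + json_object + "}";
}
```

For instance, tag_json("conn", "{\"proto\":\"tcp\"}") yields {"conn": {"proto":"tcp"}}, so a consumer can dispatch on the single top-level key.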
Zeek global and per-filter configuration options
There are usually 2 ways to pass some values to a Zeek plugin.
- We can redefine the global options for a writer.
- We can define the configuration options in the config table when creating a filter.
To be able to achieve this with our plugin, we define the configuration options in src/amqpwriter.bif. These options include:
- hostname
- amqp_port
- vhost
- username
- password
- queue_name
- exchange
- routing_key
We can define the default values for these options in scripts/init_default.zeek file like this:
export {
Zeek loads all the scripts listed in scripts/__load__.zeek, so we add the following line there to load scripts/init_default.zeek:
@load ./init_default.zeek
Now we can redefine these options globally in a Zeek script like this:
redef LogAMQP::amqp_port = 5761;
We can load per-filter options in the constructor of AMQP class by iterating over WriterInfo::config_map which is a map of string -> string values. By convention, per-filter options should override the global options. So, we load global options in our constructor before loading per-filter options.
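The precedence rule can be illustrated with plain standard-library maps. This is only a sketch under assumptions: resolve_options and require_option are hypothetical helper names, and both option sets are modeled as std::map<std::string, std::string> rather than Zeek's own types.

```cpp
#include <cassert>
#include <map>
#include <string>

using Options = std::map<std::string, std::string>;

// Hypothetical helper: start from the global options, then apply the
// per-filter options on top so that per-filter values win.
Options resolve_options(const Options& global, const Options& per_filter) {
    Options resolved = global;
    for (const auto& kv : per_filter)
        resolved[kv.first] = kv.second;
    return resolved;
}

// Hypothetical helper: look up an option that must be present.
const std::string& require_option(const Options& opts, const std::string& key) {
    auto it = opts.find(key);
    assert(it != opts.end() && "missing required option");
    return it->second;
}
```

With global options {hostname: "127.0.0.1", amqp_port: "5672"} and a per-filter table {amqp_port: "5673"} (illustrative values), the resolved hostname stays "127.0.0.1" while amqp_port becomes "5673".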
Registering the Plugin component with Zeek
The final step in writing our AMQP plugin is to register the plugin component with Zeek. We can do this by using AddComponent in Plugin’s Configure method in the src/Plugin.cc file. We pass a new component with a reference to the AMQP Instantiate method like this:
AddComponent(new zeek::logging::Component("AMQP", zeek::logging::writer::AMQP::Instantiate));
Compiling and Linking
Before we compile our plugin, we have to provide the rabbitmq-c library and headers correctly to the compiler and linker. It is quite simple to achieve this using CMake.
We first add an option to the configure.plugin file for passing the rabbitmq-c path. We then write a CMake script (cCMakeFindLIB_RABBITMQ_C.cmake) to find the correct paths for the rabbitmq-c library. We locate the root path of rabbitmq-c and its include directory using the find_path function, and the rabbitmq-c library files using the find_library function. We then write a find_package handler for rabbitmq-c. The final CMake script should look like this:
find_path(LIB_RABBITMQ_C_ROOT_DIR
Finally, in CMakeLists.txt, we find the rabbitmq-c package using find_package() and add its include directory using include_directories before starting the compilation. We call zeek_plugin_cc for all the .cc files in the src directory and finally link the rabbitmq-c library using zeek_plugin_link_library.
We compile and install the plugin using the following commands.
./configure --zeek-dist=/path/to/zeek --with-rabbitmq-c=/path/to/rabbitmq-c
Verify the installation by running zeek -NN.
Sending Zeek logs to RabbitMQ
We write a simple Zeek script (logs_to_rabbitmq.zeek) to send conn.log to RabbitMQ by adding a filter with WRITER_AMQP.
event zeek_init() &priority=1 {
Run Zeek with our script while it is listening on a network interface like this:
zeek -i ens33 -C logs_to_rabbitmq.zeek
After some time, we can see the logs being published in “test_queue” in the RabbitMQ management interface.
We can see that these log lines correspond to the conn.log that Zeek produced.
#separator \x09
You can see the full plugin source code here at GitLab.
Muhammad Saboor – R&D Engineer / TRAPMINE