Post-process collection results using Fluent Bit .
Fluent Bit will read the JSONL-formattted results file for each completed client collection, which can then be modified, filtered, and forwarded to any of it’s large number of supported outputs . The specific outputs used here are just for demonstration purposes, and you can easily change the Fluent Bit pipeline config to use a different one.
The processed data is not returned to Velociraptor - it is shipped to external destinations. Although it is possible to read back the data from stdout it’s pointless to do so, unless perhaps you’re testing something new or troubleshooting some issue.
Results files are processed individually, per artifact or artifact/namedsource.
ClientArtifactsToWatch
will be processed.ClientArtifactsToWatch
will be processed.Enrichments and transformations can be added to Fluent Bit’s pipeline via its Processors and Filters, or alternatively (and probably more easily) such things can be done on the receiving end.
This artifact configures Fluent Bit via a YAML config file. This config file contains the pipeline definition, parser definitions, and other config options. This is easier to work with rather than specifying everything on the command line, especially if you want to define more complex pipelines.
Velociraptor’s execve()
plugin runs programs in an isolated environment,
into which some of env variables are injected via the env
argument. The
data in these variables augments the flow data: ClientId
, FlowId
, and
ArtifactName
are added to the processed records for downstream tracking
purposes, but you may choose not to do so. The artifact injects these into
the execve environment and the Fluent config specifies the var => field
mapping to retrieve them.
To avoid unnecessarily exposing credentials for the external systems
(outputs) we store these in environment variables so that they aren’t
written to any logs. In the Fluent Bit config we specify which env vars to
retrieve the values from using the ${VARIABLE}
notation.
These sensitive variables are defined in text files, to which access will be
restricted. A utility program called envdir
is used to read and populate
the environment with these variables. This program is available in the
official repos for most Linux distributions, and can be installed as
follows:
sudo apt install daemontools
The files containing the sensitive env vars can be created as follows (as root):
mkdir /etc/opensearch_creds
echo "192.168.1.104" > /etc/opensearch_creds/OPENSEARCH_HOST
echo "9200" > /etc/opensearch_creds/OPENSEARCH_PORT
echo "admin" > /etc/opensearch_creds/OPENSEARCH_USER
echo "ONEtwo333$$$" > /etc/opensearch_creds/OPENSEARCH_PASSWD
If your Velociraptor is running as a service then only the velociraptor
service account needs access to the credential files, and in that case set
permissions on them so that only that specific user has access:
chown -v velociraptor:velociraptor /etc/opensearch_creds/*
chmod -v 600 /etc/opensearch_creds/*
Extra fields - ClientId
, FlowId
, and ArtifactName
- are added to the
processed records for downstream tracking purposes, but you may choose not
to do so, or to add other fields. The artifact injects these into the execve
environment and the Fluent config specifies the var -> field mapping to
retrieve them.
We don’t parse any timestamp from the data because Velociraptor doesn’t have a primary timestamp field, nor even require any timestamps in results.
/opt/fluent-bit/bin/fluent-bit
.envdir
utility via the daemontools
package, as described
above.Other log forwarders such as Logstash and Filebeats work similarly, in principle, to Fluent Bit. So this artifact could be used as the basis for other forwarders or any other command line applications that consume JSONL format. Any other CLI apps that read and process JSONL data could be used in the same way using the basic processing logic contained in this artifact. However, fast startup time is a critical aspect because the application needs to be launched for each targeted flow, so bloated apps such as Logstash are likely to be impractical when used this way. Fluent Bit is lightweight and fast!
This artifact could also be easily modified to watch for server collection completions instead of client collections.
Tags: #post-processing #Elasticsearch #OpenSearch #Splunk #Graylog #BigQuery #Chronicle #CloudWatch #Amazon #S3 #Azure #Datadog #Dynatrace #InfluxDB #Kafka #LogDNA #Loki #Oracle #PostgreSQL
name: Server.PostProcess.FluentBit
author: |
@predictiple - 2025-07-13
description: |
Post-process collection results using [Fluent Bit](https://fluentbit.io/).
Fluent Bit will read the JSONL-formattted results file for each completed
client collection, which can then be modified, filtered, and forwarded to
any of it's large number of supported
[outputs](https://docs.fluentbit.io/manual/pipeline/outputs).
The specific outputs used here are just for demonstration purposes, and you
can easily change the Fluent Bit pipeline config to use a different one.
The processed data is _not_ returned to Velociraptor - it is shipped to
external destinations. Although it is possible to read back the data from
stdout it's pointless to do so, unless perhaps you're testing something new
or troubleshooting some issue.
#### How it works
Results files are processed individually, per artifact or
artifact/namedsource.
- If a collection consists of multiple artifacts, only the ones specified in
`ClientArtifactsToWatch` will be processed.
- If an artifact contains multiple named sources, then only the ones
designated in `ClientArtifactsToWatch` will be processed.
Enrichments and transformations can be added to Fluent Bit's pipeline via
its Processors and Filters, or alternatively (and probably more easily) such
things can be done on the receiving end.
This artifact configures Fluent Bit via a YAML config file. This config file
contains the pipeline definition, parser definitions, and other config
options. This is easier to work with rather than specifying everything on
the command line, especially if you want to define more complex pipelines.
Velociraptor's `execve()` plugin runs programs in an isolated environment,
into which some of env variables are injected via the `env` argument. The
data in these variables augments the flow data: `ClientId`, `FlowId`, and
`ArtifactName` are added to the processed records for downstream tracking
purposes, but you may choose not to do so. The artifact injects these into
the execve environment and the Fluent config specifies the var => field
mapping to retrieve them.
To avoid unnecessarily exposing credentials for the external systems
(outputs) we store these in environment variables so that they aren't
written to any logs. In the Fluent Bit config we specify which env vars to
retrieve the values from using the `${VARIABLE}` notation.
These sensitive variables are defined in text files, to which access will be
restricted. A utility program called `envdir` is used to read and populate
the environment with these variables. This program is available in the
official repos for most Linux distributions, and can be installed as
follows:
```sh
sudo apt install daemontools
```
The files containing the sensitive env vars can be created as follows (as
root):
```sh
mkdir /etc/opensearch_creds
echo "192.168.1.104" > /etc/opensearch_creds/OPENSEARCH_HOST
echo "9200" > /etc/opensearch_creds/OPENSEARCH_PORT
echo "admin" > /etc/opensearch_creds/OPENSEARCH_USER
echo "ONEtwo333$$$" > /etc/opensearch_creds/OPENSEARCH_PASSWD
```
If your Velociraptor is running as a service then only the `velociraptor`
service account needs access to the credential files, and in that case set
permissions on them so that only that specific user has access:
```sh
chown -v velociraptor:velociraptor /etc/opensearch_creds/*
chmod -v 600 /etc/opensearch_creds/*
```
Extra fields - `ClientId`, `FlowId`, and `ArtifactName` - are added to the
processed records for downstream tracking purposes, but you may choose not
to do so, or to add other fields. The artifact injects these into the execve
environment and the Fluent config specifies the var -> field mapping to
retrieve them.
We don't parse any timestamp from the data because Velociraptor doesn't have
a primary timestamp field, nor even require any timestamps in results.
#### Server preparation checklist
1. Fluent Bit should be installed on the server. It's best to install it
[from the official repos](https://docs.fluentbit.io/manual/installation/linux)
in which case the binary should be located at
`/opt/fluent-bit/bin/fluent-bit`.
2. Install the `envdir` utility via the `daemontools` package, as described
above.
3. Create and secure the sensitive environment variable files, as described
above.
#### Other command line data processors
Other log forwarders such as Logstash and Filebeats work similarly, in
principle, to Fluent Bit. So this artifact could be used as the basis for
other forwarders or any other command line applications that consume JSONL
format. Any other CLI apps that read and process JSONL data could be used in
the same way using the basic processing logic contained in this artifact.
However, fast startup time is a critical aspect because the application
needs to be launched for each targeted flow, so bloated apps such as
Logstash are likely to be impractical when used this way. Fluent Bit is
lightweight and fast!
This artifact could also be easily modified to watch for server collection
completions instead of client collections.
#### Tested with
- Fluent Bit v4.0.3
- Velociraptor v0.74.5
- Opensearch v3.1.0
---
Tags: #post-processing #Elasticsearch #OpenSearch #Splunk #Graylog #BigQuery
#Chronicle #CloudWatch #Amazon #S3 #Azure #Datadog #Dynatrace #InfluxDB
#Kafka #LogDNA #Loki #Oracle #PostgreSQL
reference:
- https://docs.fluentbit.io/manual
- https://fluentbit.io/
- https://manpages.ubuntu.com/manpages/jammy/man8/envdir.8.html
type: SERVER_EVENT
precondition: SELECT OS From info() where OS = "linux"
required_permissions:
- EXECVE
parameters:
- name: ClientArtifactsToWatch
description: Select the client artifacts to be watched for completions
type: artifactset
artifact_type: CLIENT
sources: TRUE
default: |
Artifact
Windows.Registry.AppCompatCache
Windows.Forensics.SRUM/Execution Stats
- name: FluentBinary
description: The Fluent Bit binary
type: string
default: /opt/fluent-bit/bin/fluent-bit
- name: FluentConfig
description: The Fluent configuration in YAML format
type: string
default: |
parsers:
- name: json
format: json
service:
flush: 1
pipeline:
inputs:
- name: tail
path: ${RESULTSPATH}
parser: json
tag: velociraptor
read_from_head: true
threaded: false
exit_on_eof: true
inotify_watcher: false
mem_buf_limit: 10mb
buffer_chunk_size: 128kb
buffer_max_size: 128kb
storage.type: memory
processors:
logs:
- name: content_modifier
action: insert
key: client_id
value: ${CLIENTID}
- name: content_modifier
action: insert
key: flow_id
value: ${FLOWID}
- name: content_modifier
action: insert
key: artifact_name
value: ${ARTIFACTNAME}
outputs:
# we generate this is for the Velociraptor monitoring GUI
- name: counter
match: velociraptor
# uncomment for testing
# - name: stdout
# match: velociraptor
# format: json_lines
# uncomment if you want to write to a file (for testing)
# - name: file
# match: velociraptor
# path: /tmp/fluent/
# file: ${FLOWID}_${INDEX}.json
# format: plain
# mkdir: true
# send the data to Opensearch
- name: opensearch
match: velociraptor
host: ${OPENSEARCH_HOST}
port: ${OPENSEARCH_PORT}
http_user: ${OPENSEARCH_USER}
http_passwd: ${OPENSEARCH_PASSWD}
logstash_format: on
logstash_prefix: ${INDEX}
trace_error: on
trace_output: off
tls.verify: off
tls: on
suppress_type_name: on
compress: gzip
buffer_size: 1mb
retry_limit: 20
workers: 1
sources:
- query: |
-- We write to a temp file and reuse it as long as the monitoring task is active.
LET FluentConfigFile <= tempfile(data=FluentConfig, extension=".yaml")
LET _ <= log(message="Config file created at %v", args=FluentConfigFile)
LET Completions = SELECT *
FROM watch_monitoring(artifact='System.Flow.Completion')
-- Try to match ANY artifact in flow against ANY artifact in ClientArtifactsToWatch.
-- Returns flows where there is at least one match in the completed flow.
WHERE Flow.artifacts_with_results =~ join(array=ClientArtifactsToWatch.Artifact, sep="|")
-- Run Fluent Bit for each results file
LET PostProcess(ClientId, FlowId, ArtifactName, ResultsPath) =
SELECT *
FROM execve(argv=[ "envdir", "/etc/opensearch_creds/", FluentBinary, "-q" "-c", FluentConfigFile ],
-- Inject these vars into the execve environment.
-- Fluent Bit will retrieve them and add them to the data.
env=dict(CLIENTID=ClientId,
FLOWID=FlowId,
INDEX=lowcase(string=regex_replace(source=ArtifactName, re='''[/ ]''',replace="_")),
ARTIFACTNAME=ArtifactName,
RESULTSPATH=ResultsPath
),
-- Under normal circumstances length and line breaks are
-- irrelevant since we aren't reading the data back.
--sep="\n",
length=99999999)
LET GetResultsFile(ClientId, FlowId, ArtifactName) =
SELECT
file_store(path=Data.VFSPath) AS ResultsPath
FROM enumerate_flow(client_id=ClientId, flow_id=FlowId)
WHERE Type = "Result"
-- Match either named or unnamed sources
AND ( Data.VFSPath =~ ArtifactName
OR pathspec(parse=Data.VFSPath).Components[-3] + "/" +
pathspec(parse=Data.VFSPath).Components[-1] =~ ArtifactName )
SELECT *,
PostProcess(ClientId=ClientId,
FlowId=FlowId,
ArtifactName=ArtifactName,
ResultsPath=ResultsPath)[0].Stdout AS RecordsProcessed
FROM foreach(row=Completions,
query={ SELECT *,
GetResultsFile(ClientId=ClientId,
FlowId=FlowId,
ArtifactName=ArtifactName)[0].ResultsPath AS ResultsPath
-- We need to check each result within each artifact to see if
-- it is associated with the flow match that previously occurred.
FROM foreach(row=Flow.artifacts_with_results,
query={ SELECT _value AS ArtifactName, ClientId, FlowId
FROM scope()
WHERE ArtifactName IN ClientArtifactsToWatch.Artifact
}
)
},
workers=10
)