Server.PostProcess.FluentBit

Post-process collection results using Fluent Bit.

Since v0.75 this artifact no longer works, because it reads files directly from the datastore and these files are now compressed using zlib by default.

It is possible to disable compression globally using the Datastore.compression config setting, but then you lose the space-saving benefits of the new compression feature, which are likely to be significant in most deployments.
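If you do decide to disable it, the setting goes in the server config. A hedged sketch, assuming the dotted name maps onto the usual YAML nesting and takes a boolean value - check the v0.75 release notes for the authoritative form:

Datastore:
  compression: false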

Fluent Bit will read the JSONL-formatted results file for each completed client collection; the data can then be modified, filtered, and forwarded to any of its large number of supported outputs. The specific outputs used here are for demonstration purposes only, and you can easily change the Fluent Bit pipeline config to use a different one.

The processed data is not returned to Velociraptor - it is shipped to external destinations. Although it is possible to read the data back from stdout, there is little point in doing so unless you’re testing something new or troubleshooting an issue.

How it works

Results files are processed individually, per artifact or artifact/named source.

  • If a collection consists of multiple artifacts, only the ones specified in ClientArtifactsToWatch will be processed.
  • If an artifact contains multiple named sources, then only the ones designated in ClientArtifactsToWatch will be processed (see the example below).
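The ClientArtifactsToWatch parameter is a one-column table of artifact names. The artifact’s default value illustrates both cases - a whole artifact and a single named source (artifact/source):

Artifact
Windows.Registry.AppCompatCache
Windows.Forensics.SRUM/Execution Stats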

Enrichments and transformations can be added to Fluent Bit’s pipeline via its Processors and Filters, or alternatively (and probably more easily) such things can be done on the receiving end.

This artifact configures Fluent Bit via a YAML config file. This config file contains the pipeline definition, parser definitions, and other config options. This is easier to work with than specifying everything on the command line, especially if you want to define more complex pipelines.

Velociraptor’s execve() plugin runs programs in an isolated environment, into which selected environment variables are injected via the env argument. The data in these variables augments the flow data, and the Fluent config specifies the var => field mapping used to retrieve it (the extra fields are described further below).
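For example, in the bundled FluentConfig this mapping is done with content_modifier processors, one per injected variable - this excerpt from the pipeline config below inserts the client ID into every record:

- name: content_modifier
  action: insert
  key: client_id
  value: ${CLIENTID}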

To avoid unnecessarily exposing credentials for the external systems (outputs) we store these in environment variables so that they aren’t written to any logs. In the Fluent Bit config we specify which env vars to retrieve the values from using the ${VARIABLE} notation.
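The bundled OpenSearch output, for instance, takes all of its connection settings from the environment:

- name: opensearch
  match: velociraptor
  host: ${OPENSEARCH_HOST}
  port: ${OPENSEARCH_PORT}
  http_user: ${OPENSEARCH_USER}
  http_passwd: ${OPENSEARCH_PASSWD}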

These sensitive variables are defined in text files, to which access will be restricted. A utility program called envdir is used to read and populate the environment with these variables. This program is available in the official repos for most Linux distributions, and can be installed as follows:

sudo apt install daemontools

The files containing the sensitive env vars can be created as follows (as root). Note the single quotes around the password value, which prevent the shell from expanding the $ characters:

mkdir /etc/opensearch_creds
echo "192.168.1.104" > /etc/opensearch_creds/OPENSEARCH_HOST
echo "9200" > /etc/opensearch_creds/OPENSEARCH_PORT
echo "admin" > /etc/opensearch_creds/OPENSEARCH_USER
echo "ONEtwo333$$$" > /etc/opensearch_creds/OPENSEARCH_PASSWD

If your Velociraptor is running as a service, then only the velociraptor service account needs access to the credential files. In that case, set permissions on them so that only that specific user has access:

chown -v velociraptor:velociraptor /etc/opensearch_creds/*
chmod -v 600 /etc/opensearch_creds/*
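To confirm that envdir exposes the variables correctly, you can run a quick check - a minimal sketch, assuming the service account is named velociraptor as above (note that this prints the secrets to your terminal):

sudo -u velociraptor envdir /etc/opensearch_creds/ env | grep OPENSEARCH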

Extra fields - ClientId, FlowId, and ArtifactName - are added to the processed records for downstream tracking purposes, but you may choose not to do so, or to add other fields. The artifact injects these into the execve environment and the Fluent config specifies the var => field mapping to retrieve them.

We don’t parse any timestamp from the data because Velociraptor doesn’t have a primary timestamp field, nor does it even require timestamps in results.

Server preparation checklist

  1. Fluent Bit should be installed on the server. It’s best to install it from the official repos, in which case the binary should be located at /opt/fluent-bit/bin/fluent-bit (see the sketch after this list).
  2. Install the envdir utility via the daemontools package, as described above.
  3. Create and secure the sensitive environment variable files, as described above.
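On a Debian/Ubuntu server the preparation might look like this - a hedged sketch, assuming the official Fluent Bit install script works for your distribution and that apt provides the daemontools package:

curl https://raw.githubusercontent.com/fluent/fluent-bit/master/install.sh | sh
sudo apt install daemontools
/opt/fluent-bit/bin/fluent-bit --version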

Other command line data processors

Other log forwarders such as Logstash and Filebeat work similarly, in principle, to Fluent Bit, so this artifact could be used as the basis for other forwarders, or indeed for any other command line application that consumes JSONL data, reusing the basic processing logic contained here. However, fast startup time is critical because the application is launched for each targeted flow, so bloated apps such as Logstash are likely to be impractical when used this way. Fluent Bit is lightweight and fast!
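When adapting the pipeline it helps to run it by hand first, outside of Velociraptor. A minimal sketch, assuming you’ve saved the FluentConfig parameter to /tmp/fluent.yaml and have a sample JSONL results file at /tmp/results.json - the variable names match the ones the artifact injects, and exit_on_eof makes the process exit once the file has been read:

CLIENTID=C.test FLOWID=F.test ARTIFACTNAME=Test.Artifact INDEX=test_artifact \
RESULTSPATH=/tmp/results.json \
envdir /etc/opensearch_creds/ /opt/fluent-bit/bin/fluent-bit -c /tmp/fluent.yaml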

This artifact could also be easily modified to watch for server collection completions instead of client collections.

Tested with

  • Fluent Bit v4.0.3
  • Velociraptor v0.74.5
  • OpenSearch v3.1.0

Tags: #post-processing #Elasticsearch #OpenSearch #Splunk #Graylog #BigQuery #Chronicle #CloudWatch #Amazon #S3 #Azure #Datadog #Dynatrace #InfluxDB #Kafka #LogDNA #Loki #Oracle #PostgreSQL


name: Server.PostProcess.FluentBit

author: |
    @predictiple - 2025-07-13

description: |
    Post-process collection results using [Fluent Bit](https://fluentbit.io/).

    
    Since v0.75 this artifact no longer works, because it reads files directly
    from the datastore and these files are now
    [compressed using zlib by default](https://docs.velociraptor.app/blog/2025/2025-08-30-release-notes-0.75/#storing-compressed-data).

    It is possible to disable compression globally using the
    `Datastore.compression` config setting, but then you lose the
    space-saving benefits of the new compression feature, which are likely to
    be significant in most deployments.

    Fluent Bit will read the JSONL-formatted results file for each completed
    client collection; the data can then be modified, filtered, and forwarded
    to any of its large number of supported
    [outputs](https://docs.fluentbit.io/manual/pipeline/outputs). The specific
    outputs used here are for demonstration purposes only, and you can easily
    change the Fluent Bit pipeline config to use a different one.

    The processed data is _not_ returned to Velociraptor - it is shipped to
    external destinations. Although it is possible to read the data back from
    stdout, there is little point in doing so unless you're testing something
    new or troubleshooting an issue.

    #### How it works

    Results files are processed individually, per artifact or
    artifact/named source.

    - If a collection consists of multiple artifacts, only the ones specified
      in `ClientArtifactsToWatch` will be processed.
    - If an artifact contains multiple named sources, then only the ones
      designated in `ClientArtifactsToWatch` will be processed.

    Enrichments and transformations can be added to Fluent Bit's pipeline via
    its Processors and Filters, or alternatively (and probably more easily)
    such things can be done on the receiving end.

    This artifact configures Fluent Bit via a YAML config file. This config
    file contains the pipeline definition, parser definitions, and other
    config options. This is easier to work with than specifying everything on
    the command line, especially if you want to define more complex pipelines.

    Velociraptor's `execve()` plugin runs programs in an isolated environment,
    into which selected environment variables are injected via the `env`
    argument. The data in these variables augments the flow data, and the
    Fluent config specifies the var => field mapping used to retrieve it
    (the extra fields are described further below).

    To avoid unnecessarily exposing credentials for the external systems
    (outputs) we store these in environment variables so that they aren't
    written to any logs. In the Fluent Bit config we specify which env vars
    to retrieve the values from using the `${VARIABLE}` notation.

    These sensitive variables are defined in text files, to which access will
    be restricted. A utility program called `envdir` is used to read and
    populate the environment with these variables. This program is available
    in the official repos for most Linux distributions, and can be installed
    as follows:

    ```sh
    sudo apt install daemontools
    ```

    The files containing the sensitive env vars can be created as follows (as
    root). Note the single quotes around the password value, which prevent
    the shell from expanding the `$` characters:

    ```sh
    mkdir /etc/opensearch_creds
    echo "192.168.1.104" > /etc/opensearch_creds/OPENSEARCH_HOST
    echo "9200" > /etc/opensearch_creds/OPENSEARCH_PORT
    echo "admin" > /etc/opensearch_creds/OPENSEARCH_USER
    echo 'ONEtwo333$$$' > /etc/opensearch_creds/OPENSEARCH_PASSWD
    ```

    If your Velociraptor is running as a service, then only the `velociraptor`
    service account needs access to the credential files. In that case, set
    permissions on them so that only that specific user has access:

    ```sh
    chown -v velociraptor:velociraptor /etc/opensearch_creds/*
    chmod -v 600 /etc/opensearch_creds/*
    ```

    Extra fields - `ClientId`, `FlowId`, and `ArtifactName` - are added to
    the processed records for downstream tracking purposes, but you may
    choose not to do so, or to add other fields. The artifact injects these
    into the execve environment and the Fluent config specifies the
    var => field mapping to retrieve them.

    We don't parse any timestamp from the data because Velociraptor doesn't
    have a primary timestamp field, nor does it even require timestamps in
    results.

    #### Server preparation checklist

    1. Fluent Bit should be installed on the server. It's best to install it
       [from the official repos](https://docs.fluentbit.io/manual/installation/linux),
       in which case the binary should be located at
       `/opt/fluent-bit/bin/fluent-bit`.
    2. Install the `envdir` utility via the `daemontools` package, as
       described above.
    3. Create and secure the sensitive environment variable files, as
       described above.

    #### Other command line data processors

    Other log forwarders such as Logstash and Filebeat work similarly, in
    principle, to Fluent Bit, so this artifact could be used as the basis for
    other forwarders, or indeed for any other command line application that
    consumes JSONL data, reusing the basic processing logic contained here.
    However, fast startup time is critical because the application is
    launched for each targeted flow, so bloated apps such as Logstash are
    likely to be impractical when used this way. Fluent Bit is lightweight
    and fast!

    This artifact could also be easily modified to watch for server
    collection completions instead of client collections.

    #### Tested with

    - Fluent Bit v4.0.3
    - Velociraptor v0.74.5
    - OpenSearch v3.1.0

    ---

    Tags: #post-processing #Elasticsearch #OpenSearch #Splunk #Graylog
    #BigQuery #Chronicle #CloudWatch #Amazon #S3 #Azure #Datadog #Dynatrace
    #InfluxDB #Kafka #LogDNA #Loki #Oracle #PostgreSQL

reference:
  - https://docs.fluentbit.io/manual
  - https://fluentbit.io/
  - https://manpages.ubuntu.com/manpages/jammy/man8/envdir.8.html

type: SERVER_EVENT

precondition: SELECT OS From info() where OS = "linux"

required_permissions:
  - EXECVE

parameters:
  - name: ClientArtifactsToWatch
    description: Select the client artifacts to be watched for completions
    type: artifactset
    artifact_type: CLIENT
    sources: TRUE
    default: |
      Artifact
      Windows.Registry.AppCompatCache
      Windows.Forensics.SRUM/Execution Stats

  - name: FluentBinary
    description: The Fluent Bit binary
    type: string
    default: /opt/fluent-bit/bin/fluent-bit

  - name: FluentConfig
    description: The Fluent configuration in YAML format
    type: string
    default: |
      parsers:
        - name: json
          format: json

      service:
        flush: 1

      pipeline:
        inputs:
          - name: tail
            path: ${RESULTSPATH}
            parser: json
            tag: velociraptor
            read_from_head: true
            threaded: false
            exit_on_eof: true
            inotify_watcher: false
            mem_buf_limit: 10mb
            buffer_chunk_size: 128kb
            buffer_max_size: 128kb
            storage.type: memory
            processors:
              logs:
                - name: content_modifier
                  action: insert
                  key: client_id
                  value: ${CLIENTID}
                - name: content_modifier
                  action: insert
                  key: flow_id
                  value: ${FLOWID}
                - name: content_modifier
                  action: insert
                  key: artifact_name
                  value: ${ARTIFACTNAME}

        outputs:
          # we generate this for the Velociraptor monitoring GUI
          - name: counter
            match: velociraptor

          # uncomment for testing
          # - name: stdout
          #   match: velociraptor
          #   format: json_lines

          # uncomment if you want to write to a file (for testing)
          # - name: file
          #   match: velociraptor
          #   path: /tmp/fluent/
          #   file: ${FLOWID}_${INDEX}.json
          #   format: plain
          #   mkdir: true

          # send the data to OpenSearch
          - name: opensearch
            match: velociraptor
            host: ${OPENSEARCH_HOST}
            port: ${OPENSEARCH_PORT}
            http_user: ${OPENSEARCH_USER}
            http_passwd: ${OPENSEARCH_PASSWD}
            logstash_format: on
            logstash_prefix: ${INDEX}
            trace_error: on
            trace_output: off
            tls.verify: off
            tls: on
            suppress_type_name: on
            compress: gzip
            buffer_size: 1mb
            retry_limit: 20
            workers: 1

sources:
  - query: |
      -- We write to a temp file and reuse it as long as the monitoring
      -- task is active.
      LET FluentConfigFile <= tempfile(data=FluentConfig, extension=".yaml")

      LET _ <= log(message="Config file created at %v", args=FluentConfigFile)

      LET Completions = SELECT *
        FROM watch_monitoring(artifact='System.Flow.Completion')
        -- Try to match ANY artifact in flow against ANY artifact in
        -- ClientArtifactsToWatch. Returns flows where there is at least
        -- one match in the completed flow.
        WHERE Flow.artifacts_with_results =~
              join(array=ClientArtifactsToWatch.Artifact, sep="|")

      -- Run Fluent Bit for each results file
      LET PostProcess(ClientId, FlowId, ArtifactName, ResultsPath) =
        SELECT * FROM execve(
          argv=["envdir", "/etc/opensearch_creds/",
                FluentBinary,
                "-q",
                "-c", FluentConfigFile],
          -- Inject these vars into the execve environment.
          -- Fluent Bit will retrieve them and add them to the data.
          env=dict(CLIENTID=ClientId,
                   FLOWID=FlowId,
                   INDEX=lowcase(string=regex_replace(source=ArtifactName,
                                                      re='''[/ ]''',
                                                      replace="_")),
                   ARTIFACTNAME=ArtifactName,
                   RESULTSPATH=ResultsPath),
          -- Under normal circumstances length and line breaks are
          -- irrelevant since we aren't reading the data back.
          --sep="\n",
          length=99999999)

      LET GetResultsFile(ClientId, FlowId, ArtifactName) =
        SELECT file_store(path=Data.VFSPath) AS ResultsPath
        FROM enumerate_flow(client_id=ClientId, flow_id=FlowId)
        WHERE Type = "Result"
          -- Match either named or unnamed sources
          AND ( Data.VFSPath =~ ArtifactName
                OR pathspec(parse=Data.VFSPath).Components[-3] + "/" +
                   pathspec(parse=Data.VFSPath).Components[-1] =~ ArtifactName )

      SELECT *,
             PostProcess(ClientId=ClientId,
                         FlowId=FlowId,
                         ArtifactName=ArtifactName,
                         ResultsPath=ResultsPath)[0].Stdout AS RecordsProcessed
      FROM foreach(
        row=Completions,
        query={
          SELECT *,
                 GetResultsFile(ClientId=ClientId,
                                FlowId=FlowId,
                                ArtifactName=ArtifactName)[0].ResultsPath
                   AS ResultsPath
          -- We need to check each result within each artifact to see if
          -- it is associated with the flow match that previously occurred.
          FROM foreach(row=Flow.artifacts_with_results,
                       query={
                         SELECT _value AS ArtifactName, ClientId, FlowId
                         FROM scope()
                         WHERE ArtifactName IN ClientArtifactsToWatch.Artifact
                       })
        },
        workers=10)