Server.PostProcess.FluentBit

Post-process collection results using Fluent Bit.

Fluent Bit will read the JSONL-formatted results file for each completed client collection, which can then be modified, filtered, and forwarded to any of its large number of supported outputs. The specific outputs used here are just for demonstration purposes, and you can easily change the Fluent Bit pipeline config to use a different one.

The processed data is not returned to Velociraptor - it is shipped to external destinations. Although it is possible to read back the data from stdout it’s pointless to do so, unless perhaps you’re testing something new or troubleshooting some issue.

How it works

Results files are processed individually, per artifact or artifact/namedsource.

  • If a collection consists of multiple artifacts, only the ones specified in ClientArtifactsToWatch will be processed.
  • If an artifact contains multiple named sources, then only the ones designated in ClientArtifactsToWatch will be processed.

Enrichments and transformations can be added to Fluent Bit’s pipeline via its Processors and Filters, or alternatively (and probably more easily) such things can be done on the receiving end.

This artifact configures Fluent Bit via a YAML config file. This config file contains the pipeline definition, parser definitions, and other config options. This is easier to work with than specifying everything on the command line, especially if you want to define more complex pipelines.

Velociraptor’s execve() plugin runs programs in an isolated environment, into which selected environment variables are injected via the env argument. This artifact uses that mechanism to pass per-flow values, such as the client ID, flow ID, and artifact name, to Fluent Bit.

To avoid unnecessarily exposing credentials for the external systems (outputs) we store these in environment variables so that they aren’t written to any logs. In the Fluent Bit config we specify which env vars to retrieve the values from using the ${VARIABLE} notation.

These sensitive variables are defined in text files, to which access will be restricted. A utility program called envdir is used to read and populate the environment with these variables. This program is available in the official repos for most Linux distributions; on Debian or Ubuntu it can be installed as follows:

sudo apt install daemontools
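
If you haven’t used envdir before: it treats each file in a directory as one variable, with the file name as the variable name and the first line of the file as its value, and then runs the given command with those variables set. The sketch below is a rough pure-shell approximation of that behaviour, which can be handy for checking a credentials directory on a machine where daemontools isn’t installed. The load_envdir function is hypothetical and is not part of envdir or of this artifact.

```sh
#!/bin/sh
# Hypothetical helper: approximates envdir for illustration only.
# For each regular file in directory $1, export a variable named after the
# file, whose value is the file's first line minus trailing whitespace.
load_envdir() {
    for f in "$1"/*; do
        [ -f "$f" ] || continue
        name=$(basename "$f")
        # envdir removes the variable when the file is completely empty.
        [ -s "$f" ] || { unset "$name"; continue; }
        # Read only the first line; "|| true" tolerates a missing final newline.
        IFS= read -r value < "$f" || true
        value=$(printf '%s' "$value" | sed 's/[[:space:]]*$//')
        export "$name=$value"
    done
}

# Demo against a throwaway directory (not the real credentials).
demo=$(mktemp -d)
printf '%s\n' '9200' > "$demo/OPENSEARCH_PORT"
load_envdir "$demo"
echo "OPENSEARCH_PORT=$OPENSEARCH_PORT"
rm -rf "$demo"
```

Unlike the real envdir, this exports into the current shell rather than only into a child process, so treat it as a quick check rather than a replacement.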

The files containing the sensitive env vars can be created as follows (as root):

mkdir /etc/opensearch_creds
echo "192.168.1.104" > /etc/opensearch_creds/OPENSEARCH_HOST
echo "9200" > /etc/opensearch_creds/OPENSEARCH_PORT
echo "admin" > /etc/opensearch_creds/OPENSEARCH_USER
echo 'ONEtwo333$$$' > /etc/opensearch_creds/OPENSEARCH_PASSWD

If your Velociraptor is running as a service then only the velociraptor service account needs access to the credential files, and in that case set permissions on them so that only that specific user has access:

chown -v velociraptor:velociraptor /etc/opensearch_creds/*
chmod -v 600 /etc/opensearch_creds/*
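
To confirm that the permissions ended up as intended, something like the following could be run. It is only a sketch: check_creds_mode is a hypothetical helper name, and it assumes GNU stat, which is what Linux distributions normally ship.

```sh
#!/bin/sh
# Hypothetical helper: report credential files whose mode is not 600.
# Uses GNU stat's -c option (standard on Linux).
check_creds_mode() {
    for f in "$1"/*; do
        [ -f "$f" ] || continue
        mode=$(stat -c '%a' "$f")
        [ "$mode" = "600" ] || echo "$f has mode $mode"
    done
}

# Prints nothing when every file in the directory is already mode 600.
check_creds_mode /etc/opensearch_creds
```

No output means every file is readable and writable by its owner only.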

Extra fields - ClientId, FlowId, and ArtifactName - are added to the processed records for downstream tracking purposes, but you may choose not to do so, or to add other fields. The artifact injects these into the execve environment and the Fluent config specifies the var -> field mapping to retrieve them.
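
Besides these three fields, the query also passes an INDEX variable, which the sample config uses as the logstash_prefix for the OpenSearch output. It is derived from the artifact name by replacing "/" and spaces with "_" and lowercasing the result. The shell function below mirrors that expression so you can predict the prefix for a given artifact or named source; index_name is a hypothetical name used only for this illustration.

```sh
#!/bin/sh
# Hypothetical helper mirroring the query's INDEX expression:
# replace "/" and spaces with "_", then lowercase everything.
index_name() {
    printf '%s\n' "$1" | tr '/ ' '__' | tr '[:upper:]' '[:lower:]'
}

index_name "Windows.Forensics.SRUM/Execution Stats"
# prints: windows.forensics.srum_execution_stats
```

Because the sample config sets logstash_format to on, Fluent Bit should append a date suffix to this prefix when it writes to OpenSearch.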

We don’t parse any timestamp from the data because Velociraptor doesn’t have a primary timestamp field, nor does it require any timestamps in results.

Server preparation checklist

  1. Fluent Bit should be installed on the server. It’s best to install it from the official repos in which case the binary should be located at /opt/fluent-bit/bin/fluent-bit.
  2. Install the envdir utility via the daemontools package, as described above.
  3. Create and secure the sensitive environment variable files, as described above.
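
The checklist above could be verified with a small preflight script along these lines. This is a sketch only: the preflight function is hypothetical, the paths are the defaults used in this artifact, and it should be run as the same user that runs Velociraptor so that the readability checks are meaningful.

```sh
#!/bin/sh
# Hypothetical preflight check for the three checklist items.
# $1 = path to the Fluent Bit binary, $2 = credentials directory.
preflight() {
    bin=$1
    creds=$2
    rc=0
    [ -x "$bin" ] || { echo "missing or not executable: $bin"; rc=1; }
    command -v envdir >/dev/null 2>&1 || { echo "envdir not found in PATH"; rc=1; }
    for v in OPENSEARCH_HOST OPENSEARCH_PORT OPENSEARCH_USER OPENSEARCH_PASSWD; do
        [ -r "$creds/$v" ] || { echo "missing or unreadable: $creds/$v"; rc=1; }
    done
    return $rc
}

preflight /opt/fluent-bit/bin/fluent-bit /etc/opensearch_creds \
    || echo "preflight failed, see messages above"
```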

Other command line data processors

Other log forwarders such as Logstash and Filebeat work similarly, in principle, to Fluent Bit, so this artifact could serve as the basis for other forwarders or for any other command line application that reads and processes JSONL data. However, fast startup time is critical because the application needs to be launched for each targeted flow, so bloated apps such as Logstash are likely to be impractical when used this way. Fluent Bit is lightweight and fast!

This artifact could also be easily modified to watch for server collection completions instead of client collections.

Tested with

  • Fluent Bit v4.0.3
  • Velociraptor v0.74.5
  • OpenSearch v3.1.0

Tags: #post-processing #Elasticsearch #OpenSearch #Splunk #Graylog #BigQuery #Chronicle #CloudWatch #Amazon #S3 #Azure #Datadog #Dynatrace #InfluxDB #Kafka #LogDNA #Loki #Oracle #PostgreSQL


name: Server.PostProcess.FluentBit

author: |
    @predictiple - 2025-07-13

description: |
    Post-process collection results using [Fluent Bit](https://fluentbit.io/).

    Fluent Bit will read the JSONL-formatted results file for each completed
    client collection, which can then be modified, filtered, and forwarded to
    any of its large number of supported
    [outputs](https://docs.fluentbit.io/manual/pipeline/outputs).
    The specific outputs used here are just for demonstration purposes, and you
    can easily change the Fluent Bit pipeline config to use a different one.

    The processed data is _not_ returned to Velociraptor - it is shipped to
    external destinations. Although it is possible to read back the data from
    stdout it's pointless to do so, unless perhaps you're testing something new
    or troubleshooting some issue.

    #### How it works

    Results files are processed individually, per artifact or
    artifact/namedsource.

    - If a collection consists of multiple artifacts, only the ones specified in
      `ClientArtifactsToWatch` will be processed.
    - If an artifact contains multiple named sources, then only the ones
      designated in `ClientArtifactsToWatch` will be processed.

    Enrichments and transformations can be added to Fluent Bit's pipeline via
    its Processors and Filters, or alternatively (and probably more easily) such
    things can be done on the receiving end.

    This artifact configures Fluent Bit via a YAML config file. This config file
    contains the pipeline definition, parser definitions, and other config
    options. This is easier to work with than specifying everything on the
    command line, especially if you want to define more complex pipelines.

    Velociraptor's `execve()` plugin runs programs in an isolated environment,
    into which selected environment variables are injected via the `env`
    argument. This artifact uses that mechanism to pass per-flow values, such
    as the client ID, flow ID, and artifact name, to Fluent Bit.

    To avoid unnecessarily exposing credentials for the external systems
    (outputs) we store these in environment variables so that they aren't
    written to any logs. In the Fluent Bit config we specify which env vars to
    retrieve the values from using the `${VARIABLE}` notation.

    These sensitive variables are defined in text files, to which access will be
    restricted. A utility program called `envdir` is used to read and populate
    the environment with these variables. This program is available in the
    official repos for most Linux distributions; on Debian or Ubuntu it can be
    installed as follows:

    ```sh
    sudo apt install daemontools
    ```

    The files containing the sensitive env vars can be created as follows (as
    root):

    ```sh
    mkdir /etc/opensearch_creds
    echo "192.168.1.104" > /etc/opensearch_creds/OPENSEARCH_HOST
    echo "9200" > /etc/opensearch_creds/OPENSEARCH_PORT
    echo "admin" > /etc/opensearch_creds/OPENSEARCH_USER
    echo 'ONEtwo333$$$' > /etc/opensearch_creds/OPENSEARCH_PASSWD
    ```

    If your Velociraptor is running as a service then only the `velociraptor`
    service account needs access to the credential files, and in that case set
    permissions on them so that only that specific user has access:

    ```sh
    chown -v velociraptor:velociraptor /etc/opensearch_creds/*
    chmod -v 600 /etc/opensearch_creds/*
    ```

    Extra fields - `ClientId`, `FlowId`, and `ArtifactName` - are added to the
    processed records for downstream tracking purposes, but you may choose not
    to do so, or to add other fields. The artifact injects these into the execve
    environment and the Fluent config specifies the var -> field mapping to
    retrieve them.

    We don't parse any timestamp from the data because Velociraptor doesn't have
    a primary timestamp field, nor does it require any timestamps in results.

    #### Server preparation checklist

    1. Fluent Bit should be installed on the server. It's best to install it
       [from the official repos](https://docs.fluentbit.io/manual/installation/linux)
       in which case the binary should be located at
       `/opt/fluent-bit/bin/fluent-bit`.
    2. Install the `envdir` utility via the `daemontools` package, as described
       above.
    3. Create and secure the sensitive environment variable files, as described
       above.

    #### Other command line data processors

    Other log forwarders such as Logstash and Filebeat work similarly, in
    principle, to Fluent Bit, so this artifact could serve as the basis for
    other forwarders or for any other command line application that reads and
    processes JSONL data. However, fast startup time is critical because the
    application needs to be launched for each targeted flow, so bloated apps
    such as Logstash are likely to be impractical when used this way. Fluent
    Bit is lightweight and fast!

    This artifact could also be easily modified to watch for server collection
    completions instead of client collections.

    #### Tested with

    - Fluent Bit v4.0.3
    - Velociraptor v0.74.5
    - OpenSearch v3.1.0

    ---

    Tags: #post-processing #Elasticsearch #OpenSearch #Splunk #Graylog #BigQuery
    #Chronicle #CloudWatch #Amazon #S3 #Azure #Datadog #Dynatrace #InfluxDB
    #Kafka #LogDNA #Loki #Oracle #PostgreSQL

reference:
  - https://docs.fluentbit.io/manual
  - https://fluentbit.io/
  - https://manpages.ubuntu.com/manpages/jammy/man8/envdir.8.html

type: SERVER_EVENT

precondition: SELECT OS From info() where OS = "linux"

required_permissions:
  - EXECVE

parameters:

  - name: ClientArtifactsToWatch
    description: Select the client artifacts to be watched for completions
    type: artifactset
    artifact_type: CLIENT
    sources: TRUE
    default: |
      Artifact
      Windows.Registry.AppCompatCache
      Windows.Forensics.SRUM/Execution Stats

  - name: FluentBinary
    description: The Fluent Bit binary
    type: string
    default: /opt/fluent-bit/bin/fluent-bit

  - name: FluentConfig
    description: The Fluent configuration in YAML format
    type: string
    default: |
        parsers:
          - name: json
            format: json
        service:
            flush: 1
        pipeline:
          inputs:
            - name: tail
              path: ${RESULTSPATH}
              parser: json
              tag: velociraptor
              read_from_head: true
              threaded: false
              exit_on_eof: true
              inotify_watcher: false
              mem_buf_limit: 10mb
              buffer_chunk_size: 128kb
              buffer_max_size: 128kb
              storage.type: memory
              processors:
                logs:
                  - name: content_modifier
                    action: insert
                    key: client_id
                    value: ${CLIENTID}
                  - name: content_modifier
                    action: insert
                    key: flow_id
                    value: ${FLOWID}
                  - name: content_modifier
                    action: insert
                    key: artifact_name
                    value: ${ARTIFACTNAME}
          
          outputs:
            # we generate this for the Velociraptor monitoring GUI
            - name: counter
              match: velociraptor

            # uncomment for testing
            # - name: stdout
            #   match: velociraptor
            #   format: json_lines

            # uncomment if you want to write to a file (for testing)
            # - name: file
            #   match: velociraptor
            #   path: /tmp/fluent/
            #   file: ${FLOWID}_${INDEX}.json
            #   format: plain
            #   mkdir: true
              
            # send the data to OpenSearch
            - name: opensearch
              match: velociraptor
              host: ${OPENSEARCH_HOST}
              port: ${OPENSEARCH_PORT}
              http_user: ${OPENSEARCH_USER}
              http_passwd: ${OPENSEARCH_PASSWD}
              logstash_format: on
              logstash_prefix: ${INDEX}
              trace_error: on
              trace_output: off
              tls.verify: off
              tls: on
              suppress_type_name: on
              compress: gzip
              buffer_size: 1mb
              retry_limit: 20
              workers: 1


sources:
  - query: |

       -- We write to a temp file and reuse it as long as the monitoring task is active.
       LET FluentConfigFile <= tempfile(data=FluentConfig, extension=".yaml")
       LET _ <= log(message="Config file created at %v", args=FluentConfigFile)

       LET Completions = SELECT *
         FROM watch_monitoring(artifact='System.Flow.Completion')
         -- Try to match ANY artifact in flow against ANY artifact in ClientArtifactsToWatch.
         -- Returns flows where there is at least one match in the completed flow.
         WHERE Flow.artifacts_with_results =~ join(array=ClientArtifactsToWatch.Artifact, sep="|")

       -- Run Fluent Bit for each results file
       LET PostProcess(ClientId, FlowId, ArtifactName, ResultsPath) =
         SELECT *
         FROM execve(argv=[ "envdir", "/etc/opensearch_creds/", FluentBinary, "-q", "-c", FluentConfigFile ],

                     -- Inject these vars into the execve environment.
                     -- Fluent Bit will retrieve them and add them to the data.
                     env=dict(CLIENTID=ClientId,
                              FLOWID=FlowId,
                              INDEX=lowcase(string=regex_replace(source=ArtifactName, re='''[/ ]''',replace="_")),
                              ARTIFACTNAME=ArtifactName,
                              RESULTSPATH=ResultsPath
                              ),

                     -- Under normal circumstances length and line breaks are
                     -- irrelevant since we aren't reading the data back.
                     --sep="\n",
                     length=99999999)


       LET GetResultsFile(ClientId, FlowId, ArtifactName) =
         SELECT
           file_store(path=Data.VFSPath) AS ResultsPath
         FROM enumerate_flow(client_id=ClientId, flow_id=FlowId)
         WHERE Type = "Result"
         -- Match either named or unnamed sources
         AND ( Data.VFSPath =~ ArtifactName
           OR pathspec(parse=Data.VFSPath).Components[-3] + "/" +
              pathspec(parse=Data.VFSPath).Components[-1] =~ ArtifactName )

       SELECT *,
              PostProcess(ClientId=ClientId,
                          FlowId=FlowId,
                          ArtifactName=ArtifactName,
                          ResultsPath=ResultsPath)[0].Stdout AS RecordsProcessed
       FROM foreach(row=Completions,
                    query={ SELECT *,
                              GetResultsFile(ClientId=ClientId,
                                             FlowId=FlowId,
                                             ArtifactName=ArtifactName)[0].ResultsPath AS ResultsPath
                            -- We need to check each result within each artifact to see if
                            -- it is associated with the flow match that previously occurred.
                            FROM foreach(row=Flow.artifacts_with_results,
                                         query={ SELECT _value AS ArtifactName, ClientId, FlowId
                                                 FROM scope()
                                                 WHERE ArtifactName IN ClientArtifactsToWatch.Artifact
                                               }
                                        )
                          },
                    workers=10
                   )