Quarantine a Linux host using iptables rules.
NOTE: This is still a work in progress and may not work exactly as expected. Only use this artifact in a TEST environment/lab. It has been tested against Ubuntu hosts with iptables enabled.
NOTE: There is now a built-in Linux.Remediation.Quarantine artifact, which works a bit better.
# Artifact identity and authorship.
name: Linux.Remediation.Quarantine.IPTables
author: Wes Lambert -- @therealwlambert
type: CLIENT

# Human-readable summary shown alongside the artifact.
description: |
  Quarantine a Linux host using iptables rules.
  NOTE: This is still a work in progress and may not work exactly as expected. Only use this artifact in a TEST environment/lab. It has been tested against Ubuntu hosts with iptables enabled.
  NOTE: There is now a built in Linux.Remediation.Quarantine which works a bit better

# Inputs consumed by the query in `sources`.
parameters:
  # When set, the query restores the saved rules instead of applying the
  # quarantine policy.
  - name: RemovePolicy
    type: bool
    description: Tickbox to remove policy.

  # When non-empty, the query attempts to show this text to logged in users
  # before the quarantine rules are applied.
  - name: NotificationMessage
    description: |
      Optional notification to send to logged in users.
sources:
  - query: |
      // Quarantine (or un-quarantine) the host with iptables.
      //
      // Inputs:
      //   RemovePolicy        - when true, restore the saved rule set and
      //                         delete the backup instead of quarantining.
      //   NotificationMessage - optional text shown to logged in users
      //                         before the quarantine rules are applied.
      //
      // Output: a single row whose `Quarantine` column holds the result of
      // whichever branch was taken.

      // Get domain, port, and Frontends for VR, like we do in the Windows Quarantine artifact (H/T @mgreen27)
      LET get_domain(URL) = parse_string_with_regex(
          string=URL,
          regex='^https?://(?P<Domain>[^:/]+)').Domain

      // URLs without an explicit port fall back to the scheme default
      // (443 for https, 80 for http); otherwise the port is parsed out of
      // the URL.
      LET get_port(URL) = if(
          condition=URL =~ "https://[^:]+/",
          then="443",
          else=if(
              condition=URL =~ "http://[^:]+/",
              then="80",
              else=parse_string_with_regex(
                  string=URL,
                  regex='^https?://[^:/]+(:(?P<Port>[0-9]*))?/').Port))

      // One row (VRAddr, VRPort) per entry in config.server_urls.
      // Materialized with `<=` so the list is computed once.
      LET Frontends <= SELECT VRAddr, VRPort
        FROM foreach(row=config.server_urls, query={
          SELECT
            get_domain(URL=_value) AS VRAddr,
            get_port(URL=_value) AS VRPort
          FROM scope()
        })

      // ---- Rule backup / restore -------------------------------------

      // Run rm directly. The previous form passed "rm", "-f" and the path
      // as separate arguments after `bash -c`; bash treats only the first
      // of those as the command string and the rest as positional
      // parameters, so rm ran with no operands and removed nothing.
      LET RemoveOldSavedRules = SELECT *
        FROM execve(argv=["rm", "-f", "/root/original-rules"])

      // Restore the saved rule set, then delete the backup file.
      // NOTE(review): the backup is removed even when iptables-restore
      // fails - confirm whether that is intended.
      LET RestoreOldRules = SELECT * FROM chain(
          a={SELECT log(message="Removing quarantine policy...") FROM scope()},
          b={SELECT * FROM execve(argv=["iptables-restore", "/root/original-rules"])},
          c={SELECT * FROM execve(argv=["rm", "-f", "/root/original-rules"])}
        )

      // Non-empty only when `ls /usr/sbin/iptables` exits with status 0.
      LET IptablesExists <= SELECT ReturnCode
        FROM execve(argv=["ls", "/usr/sbin/iptables"])
        WHERE ReturnCode = 0

      // Non-empty only when `ls` on the backup path exits with status 2,
      // which this artifact treats as "no backup has been saved yet".
      LET RuleBackupDoesntExist = SELECT ReturnCode
        FROM execve(argv=["ls", "/root/original-rules"])
        WHERE ReturnCode = 2

      LET SaveCurrentRules = SELECT *
        FROM execve(argv=["iptables-save", "-f", "/root/original-rules"])

      // Save the current rules only if no backup exists, so an earlier
      // backup is not overwritten.
      LET RuleBackup = if(
          condition=RuleBackupDoesntExist,
          then=SaveCurrentRules,
          else=log(message="Rule backup already exists!"))

      // ---- User notification -----------------------------------------

      LET ZenityExists = SELECT ReturnCode
        FROM execve(argv=["ls", "/usr/bin/zenity"])
        WHERE ReturnCode = 0
      LET ZenityCommand = SELECT *
        FROM execve(argv=["zenity", "--info", "--title", "ALERT", "--text", NotificationMessage])

      LET WallExists = SELECT ReturnCode
        FROM execve(argv=["ls", "/usr/bin/wall"])
        WHERE ReturnCode = 0
      LET WallCommand = SELECT *
        FROM execve(argv=["wall", "-n", NotificationMessage])

      LET XMessageExists = SELECT ReturnCode
        FROM execve(argv=["ls", "/usr/bin/xmessage"])
        WHERE ReturnCode = 0
      LET XMessageCommand = SELECT *
        FROM execve(argv=["xmessage", NotificationMessage])

      // Non-empty when `xhost` exits with status 0.
      // NOTE(review): presumably this indicates a reachable X display -
      // confirm.
      LET Display = SELECT ReturnCode
        FROM execve(argv=["xhost"])
        WHERE ReturnCode = 0

      // With a display: prefer zenity, then xmessage.
      // Without a display: use wall, or log that nothing is available.
      LET NotifyCommand = SELECT * FROM if(
          condition=Display,
          then={
            SELECT * FROM if(
              condition=ZenityExists,
              then=ZenityCommand,
              else=if(
                condition=XMessageExists,
                then=XMessageCommand))
          },
          else={
            SELECT * FROM if(
              condition=WallExists,
              then=WallCommand,
              else={
                SELECT log(message="Unable to perform notification, as no suitable applications were found.")
                FROM scope()
              })
          })

      // Only notify when a message was supplied.
      LET NotifyUsers = SELECT *
        FROM if(condition=NotificationMessage, then=NotifyCommand)

      // ---- Quarantine rules ------------------------------------------

      // Restore results, with the current time added as a `Time` column.
      LET RemoveQuarantine = SELECT *, timestamp(epoch=now()) AS Time
        FROM RestoreOldRules

      // Append ACCEPT rules for every frontend: inbound and forwarded
      // traffic by source address, outbound TCP by destination address
      // and port.
      LET InputAllow = SELECT * FROM foreach(row=Frontends, query={
          SELECT * FROM execve(argv=['iptables', '-A', 'INPUT', '-s', VRAddr, '-j', 'ACCEPT'])
        })
      LET ForwardAllow = SELECT * FROM foreach(row=Frontends, query={
          SELECT * FROM execve(argv=['iptables', '-A', 'FORWARD', '-s', VRAddr, '-j', 'ACCEPT'])
        })
      LET OutputAllow = SELECT * FROM foreach(row=Frontends, query={
          SELECT * FROM execve(argv=['iptables', '-A', 'OUTPUT', '-p', 'tcp', '-d', VRAddr, '--dport', VRPort, '-j', 'ACCEPT'])
        })

      // Switch the default policy of each built-in chain to DROP.
      LET InputDrop = SELECT *
        FROM execve(argv=['iptables', '-P', 'INPUT', 'DROP'])
      LET ForwardDrop = SELECT *
        FROM execve(argv=['iptables', '-P', 'FORWARD', 'DROP'])
      LET OutputDrop = SELECT *
        FROM execve(argv=['iptables', '-P', 'OUTPUT', 'DROP'])

      // If a DOCKER-USER chain is present, insert a DROP rule into it.
      LET DockerChainExists = SELECT ReturnCode
        FROM execve(argv=['iptables', '-nL', 'DOCKER-USER'])
        WHERE ReturnCode = 0
      LET DockerDrop = SELECT if(
          condition=DockerChainExists,
          then={
            SELECT * FROM execve(argv=['iptables', '-I', 'DOCKER-USER', '-j', 'DROP'])
          })
        FROM scope()

      // ---- Entry point -----------------------------------------------

      // The frontend ACCEPT rules (c-e) are added before the policies are
      // switched to DROP (f-i), so the exemptions are already in place
      // when the DROP policies take effect.
      SELECT if(
          condition=IptablesExists,
          then=if(
            condition=RemovePolicy,
            then=RemoveQuarantine,
            else={
              SELECT * FROM chain(
                a=NotifyUsers,
                b=RuleBackup,
                c=InputAllow,
                d=ForwardAllow,
                e=OutputAllow,
                f=InputDrop,
                g=DockerDrop,
                h=ForwardDrop,
                i=OutputDrop
              )
            }),
          else=log(message="Iptables not found. Only Iptables-based quarantine is supported for Linux hosts at this time.")
        ) AS Quarantine
      FROM scope()