- 签名
- cuckoosandbox/community/modules/signatures">cuckoosandbox/community/modules/signatures
- android">android
- cross">cross
- darwin">darwin
- extractor">extractor
- linux">linux
- network">network
- network_bind.py:一套API调用">network_bind.py:一套API调用
- suricata.py">suricata.py
- windows">windows
- Anti
- Anti-AV
- antiav_avast_libs.py:通过模块名和参数名,确定调用的模块">antiav_avast_libs.py:通过模块名和参数名,确定调用的模块
- Anti-DBG
- 🙋♀️🙋♀️🙋♀️Anti-SandBox🙋♀️🙋♀️🙋♀️
- Anti-Virus
- Anti-VM
- Anti-AV
- APT
- apt_carbunak.py:互斥体字符串+正则">apt_carbunak.py:互斥体字符串+正则
- apt_cloudatlas.py:文件名字符串+正则">apt_cloudatlas.py:文件名字符串+正则
- apt_flame.py:互斥体名+注册表路径+正则">apt_flame.py:互斥体名+注册表路径+正则
- apt_inception.py:文件名+正则">apt_inception.py:文件名+正则
- apt_putter_panda.py:互斥体名+正则">apt_putter_panda.py:互斥体名+正则
- apt_sandworm_ip.py:固定IP(IoCs)">apt_sandworm_ip.py:固定IP(IoCs)
- apt_sandworm_url.py:URL+正则">apt_sandworm_url.py:URL+正则
- apt_turlacarbon.py">apt_turlacarbon.py
- apt_uroburos_file.py">apt_uroburos_file.py
- apt_uroburos_mutex.py">apt_uroburos_mutex.py
- BackDoor
- Banker
- BitCoin
- Boot
- Bot
- Browser
- ByPass
- Cloud
- Create
- Crypto
- DDOS
- Detect
- Disable
- DNS
- Exec
- Exploit
- IM
- InfoStealer
- Injection
- Keylogger
- Locate
- Locker
- MemDump
- Modify
- NetWork
- Office
- Packer
- Persistence
- POS
- PowerShell
- Process
- Ransom
- RAT
- Recon
- RootKit
- SMTP
- Stealth
- Trojan
- Virus
- Windows
- Worm
- 其他
- Anti
签名
cuckoosandbox/cuckoo/common/abstracts.py
# Copyright (C) 2012-2013 Claudio Guarnieri.
# Copyright (C) 2014-2019 Cuckoo Foundation.
# This file is part of Cuckoo Sandbox - http://www.cuckoosandbox.org
# See the file 'docs/LICENSE' for copying permission.
import logging
import os
import re
import time
import xml.etree.ElementTree as ET
from cuckoo.common.config import config
from cuckoo.common.exceptions import CuckooCriticalError
from cuckoo.common.exceptions import CuckooDependencyError
from cuckoo.common.exceptions import CuckooMachineError
from cuckoo.common.exceptions import CuckooOperationalError
from cuckoo.common.exceptions import CuckooReportError
from cuckoo.common.files import Folders
from cuckoo.common.objects import Dictionary
from cuckoo.core.database import Database
from cuckoo.misc import cwd, make_list
try:
import libvirt
HAVE_LIBVIRT = True
except ImportError:
HAVE_LIBVIRT = False
log = logging.getLogger(__name__)
class Configuration(object):
skip = (
"family", "extra",
)
# Single entry values.
keywords1 = (
"type", "version", "magic", "campaign",
)
# Multiple entry values.
keywords2 = (
"cnc", "url", "mutex", "user_agent", "referrer",
)
# Encryption key values.
keywords3 = (
"des3key", "rc4key", "xorkey", "pubkey", "privkey", "iv",
)
# Normalize keys.
mapping = {
"cncs": "cnc",
"urls": "url",
"user-agent": "user_agent",
}
def __init__(self):
self.entries = []
self.order = []
self.families = {}
def add(self, entry):
self.entries.append(entry)
if entry["family"] not in self.families:
self.families[entry["family"]] = {
"family": entry["family"],
}
self.order.append(entry["family"])
family = self.families[entry["family"]]
for key, value in entry.items():
if key in self.skip or not value:
continue
key = self.mapping.get(key, key)
if key in self.keywords1:
if family.get(key) and family[key] != value:
log.error(
"Duplicate value for %s => %r vs %r",
key, family[key], value
)
continue
family[key] = value
elif key in self.keywords2:
if key not in family:
family[key] = []
for value in make_list(value):
if value and value not in family[key]:
family[key].append(value)
elif key in self.keywords3:
if "key" not in family:
family["key"] = {}
if key not in family["key"]:
family["key"][key] = []
if value not in family["key"][key]:
family["key"][key].append(value)
elif key not in family.get("extra", {}):
if "extra" not in family:
family["extra"] = {}
family["extra"][key] = [value]
elif value not in family["extra"][key]:
family["extra"][key].append(value)
def get(self, family, *keys):
r = self.families.get(family, {})
for key in keys:
r = r.get(key, {})
return r or None
def family(self, name):
return self.families.get(name) or {}
def results(self):
ret = []
for family in self.order:
ret.append(self.families[family])
return ret
class Auxiliary(object):
"""Base abstract class for auxiliary modules."""
def __init__(self):
self.task = None
self.machine = None
self.guest_manager = None
self.options = None
@classmethod
def init_once(cls):
pass
def set_task(self, task):
self.task = task
def set_machine(self, machine):
self.machine = machine
def set_guest_manager(self, guest_manager):
self.guest_manager = guest_manager
def set_options(self, options):
self.options = Dictionary(options)
def start(self):
raise NotImplementedError
def stop(self):
raise NotImplementedError
class Machinery(object):
"""Base abstract class for machinery modules."""
# Default label used in machinery configuration file to supply virtual
# machine name/label/vmx path. Override it if you dubbed it in another
# way.
LABEL = "label"
def __init__(self):
self.options = None
self.db = Database()
self.remote_control = False
# Machine table is cleaned to be filled from configuration file
# at each start.
self.db.clean_machines()
@classmethod
def init_once(cls):
pass
def pcap_path(self, task_id):
"""Return the .pcap path for this task id."""
return cwd("storage", "analyses", "%s" % task_id, "dump.pcap")
def set_options(self, options):
"""Set machine manager options.
@param options: machine manager options dict.
"""
self.options = options
def initialize(self, module_name):
"""Read, load, and verify machines configuration.
@param module_name: module name.
"""
# Load.
self._initialize(module_name)
# Run initialization checks.
self._initialize_check()
def _initialize(self, module_name):
"""Read configuration.
@param module_name: module name.
"""
machinery = self.options.get(module_name)
for vmname in machinery["machines"]:
options = self.options.get(vmname)
# If configured, use specific network interface for this
# machine, else use the default value.
if options.get("interface"):
interface = options["interface"]
else:
interface = machinery.get("interface")
if options.get("resultserver_ip"):
ip = options["resultserver_ip"]
else:
ip = config("cuckoo:resultserver:ip")
if options.get("resultserver_port"):
port = options["resultserver_port"]
else:
# The ResultServer port might have been dynamically changed,
# get it from the ResultServer singleton. Also avoid import
# recursion issues by importing ResultServer here.
from cuckoo.core.resultserver import ResultServer
port = ResultServer().port
self.db.add_machine(
name=vmname,
label=options[self.LABEL],
ip=options.ip,
platform=options.platform,
options=options.get("options", ""),
tags=options.tags,
interface=interface,
snapshot=options.snapshot,
resultserver_ip=ip,
resultserver_port=port
)
def _initialize_check(self):
"""Run checks against virtualization software when a machine manager
is initialized.
@note: in machine manager modules you may override or superclass
his method.
@raise CuckooMachineError: if a misconfiguration or a unkown vm state
is found.
"""
try:
configured_vms = self._list()
except NotImplementedError:
return
for machine in self.machines():
# If this machine is already in the "correct" state, then we
# go on to the next machine.
if machine.label in configured_vms and \
self._status(machine.label) in [self.POWEROFF, self.ABORTED]:
continue
# This machine is currently not in its correct state, we're going
# to try to shut it down. If that works, then the machine is fine.
try:
self.stop(machine.label)
except CuckooMachineError as e:
raise CuckooCriticalError(
"Please update your configuration. Unable to shut '%s' "
"down or find the machine in its proper state: %s" %
(machine.label, e)
)
if not config("cuckoo:timeouts:vm_state"):
raise CuckooCriticalError(
"Virtual machine state change timeout has not been set "
"properly, please update it to be non-null."
)
def machines(self):
"""List virtual machines.
@return: virtual machines list
"""
return self.db.list_machines()
def availables(self):
"""Return how many machines are free.
@return: free machines count.
"""
return self.db.count_machines_available()
def acquire(self, machine_id=None, platform=None, tags=None):
"""Acquire a machine to start analysis.
@param machine_id: machine ID.
@param platform: machine platform.
@param tags: machine tags
@return: machine or None.
"""
if machine_id:
return self.db.lock_machine(label=machine_id)
elif platform:
return self.db.lock_machine(platform=platform, tags=tags)
else:
return self.db.lock_machine(tags=tags)
def release(self, label=None):
"""Release a machine.
@param label: machine name.
"""
self.db.unlock_machine(label)
def running(self):
"""Return running virtual machines.
@return: running virtual machines list.
"""
return self.db.list_machines(locked=True)
def shutdown(self):
"""Shutdown the machine manager and kill all alive machines.
@raise CuckooMachineError: if unable to stop machine.
"""
if len(self.running()) > 0:
log.info("Still %s guests alive. Shutting down...",
len(self.running()))
for machine in self.running():
try:
self.stop(machine.label)
except CuckooMachineError as e:
log.warning("Unable to shutdown machine %s, please check "
"manually. Error: %s", machine.label, e)
def set_status(self, label, status):
"""Set status for a virtual machine.
@param label: virtual machine label
@param status: new virtual machine status
"""
self.db.set_machine_status(label, status)
def start(self, label, task):
"""Start a machine.
@param label: machine name.
@param task: task object.
@raise NotImplementedError: this method is abstract.
"""
raise NotImplementedError
def stop(self, label=None):
"""Stop a machine.
@param label: machine name.
@raise NotImplementedError: this method is abstract.
"""
raise NotImplementedError
def _list(self):
"""List virtual machines configured.
@raise NotImplementedError: this method is abstract.
"""
raise NotImplementedError
def dump_memory(self, label, path):
"""Take a memory dump of a machine.
@param path: path to where to store the memory dump.
"""
raise NotImplementedError
def enable_remote_control(self, label):
"""Enable remote control interface (RDP/VNC/SSH).
@param label: machine name.
@return: None
"""
raise NotImplementedError
def disable_remote_control(self, label):
"""Disable remote control interface (RDP/VNC/SSH).
@param label: machine name.
@return: None
"""
raise NotImplementedError
def get_remote_control_params(self, label):
"""Return connection details for remote control.
@param label: machine name.
@return: dict with keys: protocol, host, port
"""
raise NotImplementedError
def _wait_status(self, label, *states):
"""Wait for a vm status.
@param label: virtual machine name.
@param state: virtual machine status, accepts multiple states as list.
@raise CuckooMachineError: if default waiting timeout expire.
"""
# This block was originally suggested by Loic Jaquemet.
waitme = 0
try:
current = self._status(label)
except NameError:
return
while current not in states:
log.debug("Waiting %i cuckooseconds for machine %s to switch "
"to status %s", waitme, label, states)
if waitme > config("cuckoo:timeouts:vm_state"):
raise CuckooMachineError(
"Timeout hit while for machine %s to change status" % label
)
time.sleep(1)
waitme += 1
current = self._status(label)
@staticmethod
def version():
"""Return the version of the virtualization software"""
return None
class LibVirtMachinery(Machinery):
"""Libvirt based machine manager.
If you want to write a custom module for a virtualization software
supported by libvirt you have just to inherit this machine manager and
change the connection string.
"""
# VM states.
RUNNING = "running"
PAUSED = "paused"
POWEROFF = "poweroff"
ERROR = "machete"
ABORTED = "abort"
def __init__(self):
if not HAVE_LIBVIRT:
raise CuckooDependencyError(
"The libvirt package has not been installed "
"(`pip install libvirt-python`)"
)
super(LibVirtMachinery, self).__init__()
def initialize(self, module):
"""Initialize machine manager module. Override default to set proper
connection string.
@param module: machine manager module
"""
super(LibVirtMachinery, self).initialize(module)
def _initialize_check(self):
"""Run all checks when a machine manager is initialized.
@raise CuckooMachineError: if libvirt version is not supported.
"""
# Version checks.
if not self._version_check():
raise CuckooMachineError("Libvirt version is not supported, "
"please get an updated version")
# Preload VMs
self.vms = self._fetch_machines()
# Base checks. Also attempts to shutdown any machines which are
# currently still active.
super(LibVirtMachinery, self)._initialize_check()
def start(self, label, task):
"""Start a virtual machine.
@param label: virtual machine name.
@param task: task object.
@raise CuckooMachineError: if unable to start virtual machine.
"""
log.debug("Starting machine %s", label)
if self._status(label) != self.POWEROFF:
msg = "Trying to start a virtual machine that has not " \
"been turned off {0}".format(label)
raise CuckooMachineError(msg)
conn = self._connect()
vm_info = self.db.view_machine_by_label(label)
snapshot_list = self.vms[label].snapshotListNames(flags=0)
# If a snapshot is configured try to use it.
if vm_info.snapshot and vm_info.snapshot in snapshot_list:
# Revert to desired snapshot, if it exists.
log.debug("Using snapshot {0} for virtual machine "
"{1}".format(vm_info.snapshot, label))
try:
vm = self.vms[label]
snapshot = vm.snapshotLookupByName(vm_info.snapshot, flags=0)
self.vms[label].revertToSnapshot(snapshot, flags=0)
except libvirt.libvirtError:
msg = "Unable to restore snapshot {0} on " \
"virtual machine {1}".format(vm_info.snapshot, label)
raise CuckooMachineError(msg)
finally:
self._disconnect(conn)
elif self._get_snapshot(label):
snapshot = self._get_snapshot(label)
log.debug("Using snapshot {0} for virtual machine "
"{1}".format(snapshot.getName(), label))
try:
self.vms[label].revertToSnapshot(snapshot, flags=0)
except libvirt.libvirtError:
raise CuckooMachineError("Unable to restore snapshot on "
"virtual machine {0}".format(label))
finally:
self._disconnect(conn)
else:
self._disconnect(conn)
raise CuckooMachineError("No snapshot found for virtual machine "
"{0}".format(label))
# Check state.
self._wait_status(label, self.RUNNING)
def stop(self, label):
"""Stop a virtual machine. Kill them all.
@param label: virtual machine name.
@raise CuckooMachineError: if unable to stop virtual machine.
"""
log.debug("Stopping machine %s", label)
if self._status(label) == self.POWEROFF:
raise CuckooMachineError("Trying to stop an already stopped "
"machine {0}".format(label))
# Force virtual machine shutdown.
conn = self._connect()
try:
if not self.vms[label].isActive():
log.debug("Trying to stop an already stopped machine %s. "
"Skip", label)
else:
self.vms[label].destroy() # Machete's way!
except libvirt.libvirtError as e:
raise CuckooMachineError("Error stopping virtual machine "
"{0}: {1}".format(label, e))
finally:
self._disconnect(conn)
# Check state.
self._wait_status(label, self.POWEROFF)
def shutdown(self):
"""Override shutdown to free libvirt handlers - they print errors."""
super(LibVirtMachinery, self).shutdown()
# Free handlers.
self.vms = None
def dump_memory(self, label, path):
"""Take a memory dump.
@param path: path to where to store the memory dump.
"""
log.debug("Dumping memory for machine %s", label)
conn = self._connect()
try:
# Resolve permission issue as libvirt creates the file as
# root/root in mode 0600, preventing us from reading it. This
# supposedly still doesn't allow us to remove it, though..
open(path, "wb").close()
self.vms[label].coreDump(path, flags=libvirt.VIR_DUMP_MEMORY_ONLY)
except libvirt.libvirtError as e:
raise CuckooMachineError("Error dumping memory virtual machine "
"{0}: {1}".format(label, e))
finally:
self._disconnect(conn)
def _status(self, label):
"""Get current status of a vm.
@param label: virtual machine name.
@return: status string.
"""
log.debug("Getting status for %s", label)
# Stetes mapping of python-libvirt.
# virDomainState
# VIR_DOMAIN_NOSTATE = 0
# VIR_DOMAIN_RUNNING = 1
# VIR_DOMAIN_BLOCKED = 2
# VIR_DOMAIN_PAUSED = 3
# VIR_DOMAIN_SHUTDOWN = 4
# VIR_DOMAIN_SHUTOFF = 5
# VIR_DOMAIN_CRASHED = 6
# VIR_DOMAIN_PMSUSPENDED = 7
conn = self._connect()
try:
state = self.vms[label].state(flags=0)
except libvirt.libvirtError as e:
raise CuckooMachineError("Error getting status for virtual "
"machine {0}: {1}".format(label, e))
finally:
self._disconnect(conn)
if state:
if state[0] == 1:
status = self.RUNNING
elif state[0] == 3:
status = self.PAUSED
elif state[0] == 4 or state[0] == 5:
status = self.POWEROFF
else:
status = self.ERROR
# Report back status.
if status:
self.set_status(label, status)
return status
else:
raise CuckooMachineError("Unable to get status for "
"{0}".format(label))
def _connect(self):
"""Connect to libvirt subsystem.
@raise CuckooMachineError: when unable to connect to libvirt.
"""
# Check if a connection string is available.
if not self.dsn:
raise CuckooMachineError("You must provide a proper "
"connection string")
try:
return libvirt.open(self.dsn)
except libvirt.libvirtError:
raise CuckooMachineError("Cannot connect to libvirt")
def _disconnect(self, conn):
"""Disconnect from libvirt subsystem.
@raise CuckooMachineError: if cannot disconnect from libvirt.
"""
try:
conn.close()
except libvirt.libvirtError:
raise CuckooMachineError("Cannot disconnect from libvirt")
def _fetch_machines(self):
"""Fetch machines handlers.
@return: dict with machine label as key and handle as value.
"""
vms = {}
for vm in self.machines():
vms[vm.label] = self._lookup(vm.label)
return vms
def _lookup(self, label):
"""Search for a virtual machine.
@param conn: libvirt connection handle.
@param label: virtual machine name.
@raise CuckooMachineError: if virtual machine is not found.
"""
conn = self._connect()
try:
vm = conn.lookupByName(label)
except libvirt.libvirtError:
raise CuckooMachineError("Cannot find machine "
"{0}".format(label))
finally:
self._disconnect(conn)
return vm
def _list(self):
"""List available virtual machines.
@raise CuckooMachineError: if unable to list virtual machines.
"""
conn = self._connect()
try:
names = conn.listDefinedDomains()
except libvirt.libvirtError:
raise CuckooMachineError("Cannot list domains")
finally:
self._disconnect(conn)
return names
def _version_check(self):
"""Check if libvirt release supports snapshots.
@return: True or false.
"""
if libvirt.getVersion() >= 8000:
return True
else:
return False
def _get_snapshot(self, label):
"""Get current snapshot for virtual machine
@param label: virtual machine name
@return None or current snapshot
@raise CuckooMachineError: if cannot find current snapshot or
when there are too many snapshots available
"""
def _extract_creation_time(node):
"""Extracts creation time from a KVM vm config file.
@param node: config file node
@return: extracted creation time
"""
xml = ET.fromstring(node.getXMLDesc(flags=0))
return xml.findtext("./creationTime")
snapshot = None
conn = self._connect()
try:
vm = self.vms[label]
# Try to get the currrent snapshot, otherwise fallback on the latest
# from config file.
if vm.hasCurrentSnapshot(flags=0):
snapshot = vm.snapshotCurrent(flags=0)
else:
log.debug("No current snapshot, using latest snapshot")
# No current snapshot, try to get the last one from config file.
snapshot = sorted(vm.listAllSnapshots(flags=0),
key=_extract_creation_time,
reverse=True)[0]
except libvirt.libvirtError:
raise CuckooMachineError("Unable to get snapshot for "
"virtual machine {0}".format(label))
finally:
self._disconnect(conn)
return snapshot
def enable_remote_control(self, label):
# TODO: we can't dynamically enable/disable this right now
pass
def disable_remote_control(self, label):
pass
def get_remote_control_params(self, label):
conn = self._connect()
try:
vm = conn.lookupByName(label)
if not vm:
log.warning("No such VM: %s", label)
return {}
port = 0
desc = ET.fromstring(vm.XMLDesc())
for elem in desc.findall("./devices/graphics"):
if elem.attrib.get("type") == "vnc":
# Future work: passwd, listen, socket (addr:port)
port = elem.attrib.get("port")
if port:
port = int(port)
break
finally:
self._disconnect(conn)
if port <= 0:
log.error("VM %s does not have a valid VNC port", label)
return {}
# TODO The Cuckoo Web Interface may be running at a different host
# than the actual Cuckoo daemon (and as such, the VMs).
return {
"protocol": "vnc",
"host": "127.0.0.1",
"port": port,
}
class Processing(object):
"""Base abstract class for processing module."""
order = 1
enabled = True
def __init__(self):
self.analysis_path = ""
self.baseline_path = ""
self.logs_path = ""
self.task = None
self.machine = None
self.options = None
self.results = {}
@classmethod
def init_once(cls):
pass
def set_options(self, options):
"""Set processing options.
@param options: processing options dict.
"""
self.options = Dictionary(options)
def set_task(self, task):
"""Add task information.
@param task: task dictionary.
"""
self.task = task
def set_machine(self, machine):
"""Add machine information."""
self.machine = machine
def set_baseline(self, baseline_path):
"""Set the path to the baseline directory."""
self.baseline_path = baseline_path
def set_path(self, analysis_path):
"""Set paths.
@param analysis_path: analysis folder path.
"""
self.analysis_path = analysis_path
self.log_path = os.path.join(self.analysis_path, "analysis.log")
self.cuckoolog_path = os.path.join(self.analysis_path, "cuckoo.log")
self.file_path = os.path.realpath(os.path.join(self.analysis_path,
"binary"))
self.dropped_path = os.path.join(self.analysis_path, "files")
self.dropped_meta_path = os.path.join(self.analysis_path, "files.json")
self.extracted_path = os.path.join(self.analysis_path, "extracted")
self.package_files = os.path.join(self.analysis_path, "package_files")
self.buffer_path = os.path.join(self.analysis_path, "buffer")
self.logs_path = os.path.join(self.analysis_path, "logs")
self.shots_path = os.path.join(self.analysis_path, "shots")
self.pcap_path = os.path.join(self.analysis_path, "dump.pcap")
self.pmemory_path = os.path.join(self.analysis_path, "memory")
self.memory_path = os.path.join(self.analysis_path, "memory.dmp")
self.mitmout_path = os.path.join(self.analysis_path, "mitm.log")
self.mitmerr_path = os.path.join(self.analysis_path, "mitm.err")
self.tlsmaster_path = os.path.join(self.analysis_path, "tlsmaster.txt")
self.suricata_path = os.path.join(self.analysis_path, "suricata")
self.network_path = os.path.join(self.analysis_path, "network")
self.taskinfo_path = os.path.join(self.analysis_path, "task.json")
def set_results(self, results):
"""Set the results - the fat dictionary."""
self.results = results
def run(self):
"""Start processing.
@raise NotImplementedError: this method is abstract.
"""
raise NotImplementedError
class Signature(object):
"""Base class for Cuckoo signatures."""
name = ""
description = ""
severity = 1
order = 1
categories = []
families = []
authors = []
references = []
platform = None
alert = False
enabled = True
minimum = None
maximum = None
ttp = []
# Maximum amount of marks to record.
markcount = 50
# Basic filters to reduce the amount of events sent to this signature.
filter_apinames = []
filter_categories = []
def __init__(self, caller):
"""
@param caller: calling object. Stores results in caller.results
"""
self.marks = []
self.matched = False
self._caller = caller
# These are set by the caller, they represent the process identifier
# and call index respectively.
self.pid = None
self.cid = None
self.call = None
@classmethod
def init_once(cls):
pass
def _check_value(self, pattern, subject, regex=False, all=False):
"""Check a pattern against a given subject.
@param pattern: string or expression to check for.
@param subject: target of the check.
@param regex: boolean representing if the pattern is a regular
expression or not and therefore should be compiled.
@return: boolean with the result of the check.
"""
ret = set()
if regex:
exp = re.compile(pattern, re.IGNORECASE)
if isinstance(subject, list):
for item in subject:
if exp.match(item):
ret.add(item)
else:
if exp.match(subject):
ret.add(subject)
else:
if isinstance(subject, list):
for item in subject:
if item.lower() == pattern.lower():
ret.add(item)
else:
if subject == pattern:
ret.add(subject)
# Return all elements.
if all:
return list(ret)
# Return only the first element, if available. Otherwise return None.
elif ret:
return ret.pop()
def get_results(self, key=None, default=None):
if key:
return self._caller.results.get(key, default)
return self._caller.results
def get_processes(self, name=None):
"""Get a list of processes.
@param name: If set only return processes with that name.
@return: List of processes or empty list
"""
for item in self.get_results("behavior", {}).get("processes", []):
if name is None or item["process_name"] == name:
yield item
def get_process_by_pid(self, pid=None):
"""Get a process by its process identifier.
@param pid: pid to search for.
@return: process.
"""
for item in self.get_results("behavior", {}).get("processes", []):
if item["pid"] == pid:
return item
def get_summary(self, key=None, default=[]):
"""Get one or all values related to the global summary."""
summary = self.get_results("behavior", {}).get("summary", {})
return summary.get(key, default) if key else summary
def get_summary_generic(self, pid, actions):
"""Get generic info from summary.
@param pid: pid of the process. None for all
@param actions: A list of actions to get
"""
ret = []
for process in self.get_results("behavior", {}).get("generic", []):
if pid is not None and process["pid"] != pid:
continue
for action in actions:
if action in process["summary"]:
ret += process["summary"][action]
return ret
def get_files(self, pid=None, actions=None):
"""Get files read, queried, or written to optionally by a
specific process.
@param pid: the process or None for all
@param actions: actions to search for. None is all
@return: yields files
"""
if actions is None:
actions = [
"file_opened", "file_written",
"file_read", "file_deleted",
"file_exists", "file_failed",
]
return self.get_summary_generic(pid, actions)
def get_dll_loaded(self, pid=None):
"""Get DLLs loaded by a specific process.
@param pid: the process or None for all
@return: yields DLLs loaded
"""
return self.get_summary_generic(pid, ["dll_loaded"])
def get_keys(self, pid=None, actions=None):
"""Get registry keys.
@param pid: The pid to look in or None for all.
@param actions: the actions as a list.
@return: yields registry keys
"""
if actions is None:
actions = [
"regkey_opened", "regkey_written",
"regkey_read", "regkey_deleted",
]
return self.get_summary_generic(pid, actions)
def check_file(self, pattern, regex=False, actions=None, pid=None,
all=False):
"""Check for a file being opened.
@param pattern: string or expression to check for.
@param regex: boolean representing if the pattern is a regular
expression or not and therefore should be compiled.
@param actions: a list of key actions to use.
@param pid: The process id to check. If it is set to None, all
processes will be checked.
@return: boolean with the result of the check.
"""
if actions is None:
actions = [
"file_opened", "file_written",
"file_read", "file_deleted",
"file_exists", "file_failed",
]
return self._check_value(pattern=pattern,
subject=self.get_files(pid, actions),
regex=regex,
all=all)
def check_dll_loaded(self, pattern, regex=False, actions=None, pid=None,
all=False):
"""Check for DLLs being loaded.
@param pattern: string or expression to check for.
@param regex: boolean representing if the pattern is a regular
expression or not and therefore should be compiled.
@param pid: The process id to check. If it is set to None, all
processes will be checked.
@return: boolean with the result of the check.
"""
return self._check_value(pattern=pattern,
subject=self.get_dll_loaded(pid),
regex=regex,
all=all)
def check_command_line(self, pattern, regex=False, all=False):
"""Check for a command line being opened.
@param pattern: string or expression to check for.
@param regex: boolean representing if the pattern is a regular
expression or not and therefore should be compiled.
@return: boolean with the result of the check.
"""
return self._check_value(pattern=pattern,
subject=self.get_summary("command_line"),
regex=regex,
all=all)
def check_key(self, pattern, regex=False, actions=None, pid=None,
all=False):
"""Check for a registry key being accessed.
@param pattern: string or expression to check for.
@param regex: boolean representing if the pattern is a regular
expression or not and therefore should be compiled.
@param actions: a list of key actions to use.
@param pid: The process id to check. If it is set to None, all
processes will be checked.
@return: boolean with the result of the check.
"""
if actions is None:
actions = [
"regkey_written", "regkey_opened",
"regkey_read", "regkey_deleted",
]
return self._check_value(pattern=pattern,
subject=self.get_keys(pid, actions),
regex=regex,
all=all)
def get_mutexes(self, pid=None):
"""
@param pid: Pid to filter for
@return:List of mutexes
"""
return self.get_summary_generic(pid, ["mutex"])
def check_mutex(self, pattern, regex=False, all=False):
"""Check for a mutex being opened.
@param pattern: string or expression to check for.
@param regex: boolean representing if the pattern is a regular
expression or not and therefore should be compiled.
@return: boolean with the result of the check.
"""
return self._check_value(pattern=pattern,
subject=self.get_mutexes(),
regex=regex,
all=all)
def get_command_lines(self):
"""Retrieve all command lines used."""
return self.get_summary("command_line")
def get_wmi_queries(self):
"""Retrieve all executed WMI queries."""
return self.get_summary("wmi_query")
def get_net_generic(self, subtype):
"""Generic getting network data.
@param subtype: subtype string to search for.
"""
return self.get_results("network", {}).get(subtype, [])
def get_net_hosts(self):
"""Return a list of all hosts."""
return self.get_net_generic("hosts")
def get_net_domains(self):
"""Return a list of all domains."""
return self.get_net_generic("domains")
def get_net_http(self):
"""Return a list of all http data."""
return self.get_net_generic("http")
def get_net_http_ex(self):
"""Return a list of all http data."""
return \
self.get_net_generic("http_ex") + self.get_net_generic("https_ex")
def get_net_udp(self):
"""Return a list of all udp data."""
return self.get_net_generic("udp")
def get_net_icmp(self):
"""Return a list of all icmp data."""
return self.get_net_generic("icmp")
def get_net_irc(self):
"""Return a list of all irc data."""
return self.get_net_generic("irc")
def get_net_smtp(self):
"""Return a list of all smtp data."""
return self.get_net_generic("smtp")
def get_net_smtp_ex(self):
""""Return a list of all smtp data"""
return self.get_net_generic("smtp_ex")
def get_virustotal(self):
"""Return the information retrieved from virustotal."""
return self.get_results("virustotal", {})
def get_volatility(self, module=None):
"""Return the data that belongs to the given module."""
volatility = self.get_results("memory", {})
return volatility if module is None else volatility.get(module, {})
def get_apkinfo(self, section=None, default={}):
"""Return the apkinfo results for this analysis."""
apkinfo = self.get_results("apkinfo", {})
return apkinfo if section is None else apkinfo.get(section, default)
def get_droidmon(self, section=None, default={}):
"""Return the droidmon results for this analysis."""
droidmon = self.get_results("droidmon", {})
return droidmon if section is None else droidmon.get(section, default)
def get_googleplay(self, section=None, default={}):
"""Return the Google Play results for this analysis."""
googleplay = self.get_results("googleplay", {})
return googleplay if section is None else googleplay.get(section, default)
def check_ip(self, pattern, regex=False, all=False):
"""Check for an IP address being contacted.
@param pattern: string or expression to check for.
@param regex: boolean representing if the pattern is a regular
expression or not and therefore should be compiled.
@return: boolean with the result of the check.
"""
return self._check_value(pattern=pattern,
subject=self.get_net_hosts(),
regex=regex,
all=all)
def check_domain(self, pattern, regex=False, all=False):
"""Check for a domain being contacted.
@param pattern: string or expression to check for.
@param regex: boolean representing if the pattern is a regular
expression or not and therefore should be compiled.
@return: boolean with the result of the check.
"""
domains = set()
for item in self.get_net_domains():
domains.add(item["domain"])
return self._check_value(pattern=pattern,
subject=list(domains),
regex=regex,
all=all)
def check_url(self, pattern, regex=False, all=False):
"""Check for a URL being contacted.
@param pattern: string or expression to check for.
@param regex: boolean representing if the pattern is a regular
expression or not and therefore should be compiled.
@return: boolean with the result of the check.
"""
urls = set()
for item in self.get_net_http():
urls.add(item["uri"])
return self._check_value(pattern=pattern,
subject=list(urls),
regex=regex,
all=all)
def check_suricata_alerts(self, pattern):
"""Check for pattern in Suricata alert signature
@param pattern: string or expression to check for.
@return: True/False
"""
for alert in self.get_results("suricata", {}).get("alerts", []):
if re.findall(pattern, alert.get("signature", ""), re.I):
return True
return False
def init(self):
"""Allow signatures to initialize themselves."""
def mark_call(self, *args, **kwargs):
"""Mark the current call as explanation as to why this signature
matched."""
mark = {
"type": "call",
"pid": self.pid,
"cid": self.cid,
"call": self.call,
}
if args or kwargs:
log.warning(
"You have provided extra arguments to the mark_call() method "
"which no longer supports doing so. Please report explicit "
"IOCs through mark_ioc()."
)
self.marks.append(mark)
def mark_ioc(self, category, ioc, description=None):
"""Mark an IOC as explanation as to why the current signature
matched."""
mark = {
"type": "ioc",
"category": category,
"ioc": ioc,
"description": description,
}
# Prevent duplicates.
if mark not in self.marks:
self.marks.append(mark)
def mark_vol(self, plugin, **kwargs):
"""Mark output of a Volatility plugin as explanation as to why the
current signature matched."""
mark = {
"type": "volatility",
"plugin": plugin,
}
mark.update(kwargs)
self.marks.append(mark)
def mark_config(self, config):
"""Mark configuration from this malware family."""
if not isinstance(config, dict) or "family" not in config:
raise CuckooCriticalError("Invalid call to mark_config().")
self.marks.append({
"type": "config",
"config": config,
})
def mark(self, **kwargs):
"""Mark arbitrary data."""
mark = {
"type": "generic",
}
mark.update(kwargs)
self.marks.append(mark)
def has_marks(self, count=None):
"""Return true if this signature has one or more marks."""
if count is not None:
return len(self.marks) >= count
return not not self.marks
def on_call(self, call, process):
"""Notify signature about API call. Return value determines
if this signature is done or could still match.
Only called if signature is "active".
@param call: logged API call.
@param process: proc object.
"""
raise NotImplementedError
def on_signature(self, signature):
"""Event yielded when another signatures has matched. Some signatures
only take effect when one or more other signatures have matched as
well.
@param signature: The signature that just matched
"""
def on_process(self, process):
"""Called on process change.
Can be used for cleanup of flags, re-activation of the signature, etc.
@param process: dictionary describing this process
"""
def on_yara(self, category, filepath, match):
"""Called on YARA match.
@param category: yara match category
@param filepath: path to the file that matched
@param match: yara match information
The Yara match category can be one of the following.
extracted: an extracted PE image from a process memory dump
procmem: a process memory dump
dropped: a dropped file
"""
def on_extract(self, match):
"""Called on an Extracted match.
@param match: extracted match information
"""
def on_complete(self):
"""Signature is notified when all API calls have been processed."""
def extend_ttp(self):
"""Find the short and long descriptions for the TTPs of a signature"""
d = {}
for t in self.ttp:
d[t] = self._caller.ttp_descriptions.get(t)
return d
def results(self):
"""Turn this signature into actionable results."""
return dict(name=self.name,
ttp=self.extend_ttp(),
description=self.description,
severity=self.severity,
families=self.families,
references=self.references,
marks=self.marks[:self.markcount],
markcount=len(self.marks))
@property
def cfgextr(self):
return self._caller.c
class Report(object):
"""Base abstract class for reporting module."""
order = 1
def __init__(self):
self.analysis_path = ""
self.reports_path = ""
self.task = None
self.options = None
@classmethod
def init_once(cls):
pass
def _get_analysis_path(self, subpath):
return os.path.join(self.analysis_path, subpath)
def set_path(self, analysis_path):
"""Set analysis folder path.
@param analysis_path: analysis folder path.
"""
self.analysis_path = analysis_path
self.file_path = os.path.realpath(self._get_analysis_path("binary"))
self.reports_path = self._get_analysis_path("reports")
self.shots_path = self._get_analysis_path("shots")
self.pcap_path = self._get_analysis_path("dump.pcap")
try:
Folders.create(self.reports_path)
except CuckooOperationalError as e:
raise CuckooReportError(e)
def set_options(self, options):
"""Set report options.
@param options: report options dict.
"""
self.options = Dictionary(options)
def set_task(self, task):
"""Add task information.
@param task: task dictionary.
"""
self.task = task
def run(self, results):
"""Start report processing.
@raise NotImplementedError: this method is abstract.
"""
raise NotImplementedError
class BehaviorHandler(object):
"""Base class for behavior handlers inside of BehaviorAnalysis."""
key = "undefined"
# Behavior event types this handler is interested in.
event_types = []
def __init__(self, behavior_analysis):
self.analysis = behavior_analysis
def handles_path(self, logpath):
"""Needs to return True for the log files this handler wants to
process."""
return False
def parse(self, logpath):
"""Called after handles_path succeeded, should generate behavior
events."""
raise NotImplementedError
def handle_event(self, event):
"""Handle an event that gets passed down the stack."""
raise NotImplementedError
def run(self):
"""Return the handler specific structure, gets placed into
behavior[self.key]."""
raise NotImplementedError
class ProtocolHandler(object):
"""Abstract class for protocol handlers coming out of the analysis."""
def __init__(self, task_id, ctx, version=None):
self.task_id = task_id
self.handler = ctx
self.fd = None
self.version = version
def __enter__(self):
self.init()
def __exit__(self, type, value, traceback):
self.close()
def close(self):
if self.fd:
self.fd.close()
self.fd = None
def handle(self):
raise NotImplementedError
class Extractor(object):
"""One piece in a series of recursive extractors & unpackers."""
yara_rules = []
# Minimum and maximum supported version in Cuckoo.
minimum = None
maximum = None
@classmethod
def init_once(cls):
pass
def __init__(self, parent):
self.parent = parent
def handle_yara(self, filepath, match):
raise NotImplementedError
def push_command_line(self, cmdline, process=None):
self.parent.push_command_line(cmdline, process)
def push_script(self, process, command):
self.parent.push_script(process, command)
def push_script_recursive(self, command):
self.parent.push_script_recursive(command)
def push_shellcode(self, sc):
self.parent.push_shellcode(sc)
def push_blob(self, blob, category, externals, info=None):
self.parent.push_blob(blob, category, externals, info)
def push_blob_noyara(self, blob, category, info=None):
self.parent.push_blob_noyara(blob, category, info)
def push_config(self, config):
self.parent.push_config(config)
def enhance(self, filepath, key, value):
self.parent.enhance(filepath, key, value)
创建您的新签名
为了让您更好地了解创建签名的过程,我们将一起创建一个非常简单的签名,并逐步介绍这些步骤和可用选项。为此,我们将创建一个签名来检查所分析的恶意软件是否打开了名为“i_am_a_malware”的互斥锁。
首先要做的是导入依赖项,创建骨架并定义一些初始属性。这些是您当前可以设置的:
name
:签名的标识符。description
:对签名所代表的内容的简要描述。severity
:标识匹配事件严重性的数字(通常在 1 到 3 之间)。categories
:描述匹配事件类型的类别列表(例如“ banker ”、“ injection ”或“ anti-vm ”)。families
:恶意软件家族名称的列表,以防签名与已知的特定匹配。authors
:创作签名的人员列表。references
:为签名提供上下文的引用列表(URL)。enable
:如果设置为 False,则将跳过签名。alert
:如果设置为 True 可用于指定应报告签名(可能由专用报告模块)。minimum
:成功运行此签名所需的最低 Cuckoo 版本。maximum
:成功运行此签名所需的最大 Cuckoo 版本。
在我们的示例中,我们将创建以下框架:
from cuckoo.common.abstracts import Signature
class BadBadMalware(Signature): # We initialize the class inheriting Signature.
name = "badbadmalware" # We define the name of the signature
description = "Creates a mutex known to be associated with Win32.BadBadMalware" # We provide a description
severity = 3 # We set the severity to maximum
categories = ["trojan"] # We add a category
families = ["badbadmalware"] # We add the name of our fictional malware family
authors = ["Me"] # We specify the author
minimum = "2.0" # We specify that in order to run the signature, the user will simply need Cuckoo 2.0
def on_complete(self):
return
这是一个完全有效的签名。它还没有真正做任何事情,所以现在我们需要定义签名匹配的条件。
正如我们所说,我们想要匹配一个特定的互斥锁名称,所以我们进行如下操作:
from cuckoo.common.abstracts import Signature
class BadBadMalware(Signature):
name = "badbadmalware"
description = "Creates a mutex known to be associated with Win32.BadBadMalware"
severity = 3
categories = ["trojan"]
families = ["badbadmalware"]
authors = ["Me"]
minimum = "2.0"
def on_complete(self):
return self.check_mutex("i_am_a_malware")
就这么简单,现在我们的签名将返回True
是否观察到分析的恶意软件打开了指定的互斥锁。
如果您想更明确并直接访问全局容器,可以通过以下方式转换先前的签名:
from cuckoo.common.abstracts import Signature
class BadBadMalware(Signature):
name = "badbadmalware"
description = "Creates a mutex known to be associated with Win32.BadBadMalware"
severity = 3
categories = ["trojan"]
families = ["badbadmalware"]
authors = ["Me"]
minimum = "2.0"
def on_complete(self):
for process in self.get_processes_by_pid():
if "summary" in process and "mutexes" in process["summary"]:
for mutex in process["summary"]["mutexes"]:
if mutex == "i_am_a_malware":
return True
return False
事件签名
从 1.0 版本开始,Cuckoo 提供了一种编写更高性能签名的方法。过去,每个签名都需要遍历分析期间收集的整个 API 调用集合。当此类集合规模较大时,这会不必要地导致性能问题。
从 1.2 开始,Cuckoo 只支持所谓的“事件签名”。基于该run
功能的旧签名可以移植到使用 on_complete
。
主要区别在于,使用这种新格式,所有签名将并行执行,并且on_call()
将通过 API 调用集合在一个循环内为每个签名调用一个回调函数。
使用此技术的示例签名如下:
from cuckoo.common.abstracts import Signature
class SystemMetrics(Signature):
name = "generic_metrics"
description = "Uses GetSystemMetrics"
severity = 2
categories = ["generic"]
authors = ["Cuckoo Developers"]
minimum = "2.0"
# Evented signatures can specify filters that reduce the amount of
# API calls that are streamed in. One can filter Process name, API
# name/identifier and category. These should be sets for faster lookup.
filter_processnames = set()
filter_apinames = set(["GetSystemMetrics"])
filter_categories = set()
# This is a signature template. It should be used as a skeleton for
# creating custom signatures, therefore is disabled by default.
# on_call函数用于event签名。
# 这些使用更有效的方式处理记录的API调用。
enabled = False
def on_complete(self):
# In the on_complete method one can implement any cleanup code and
# decide one last time if this signature matches or not.
# Return True in case it matches.
return False
# This method will be called for every logged API call by the loop
# in the RunSignatures plugin. The return value determines the "state"
# of this signature. True means the signature matched and False it did not this time.
# Use self.deactivate() to stop streaming in API calls.
def on_call(self, call, pid, tid):
# This check would in reality not be needed as we already make use
# of filter_apinames above.
if call["api"] == "GetSystemMetrics":
# Signature matched, return True.
return True
# continue
return None
内联注释已经不言自明。
当签名匹配时触发另一个事件:
required = ["creates_exe", "badmalware"]
def on_signature(self, matched_sig):
if matched_sig in self.required:
self.required.remove(matched_sig)
if not self.required:
return True
return False
这种签名可用于将识别异常的多个签名组合成一个对样本进行分类的签名(恶意软件警报)。
cuckoosandbox/community/modules/signatures
android
android_antivirus_virustotal.py
android_dangerous_permissions.py
android_dynamic_code.py
android_embedded_apk.py
android_google_play_diff.py
android_native_code.py
android_reflection_code.py
Application
application_aborted_broadcast_receiver.py
application_deleted_app.py
application_executed_shell_command.py
application_installed_app.py
application_queried_account_info.py
application_queried_installed_apps.py
application_queried_phone_number.py
application_queried_private_information.py
application_recording_audio.py
application_registered_receiver_runtime.py
application_sent_sms_messages.py
application_stopped_processes.py
application_uses_location.py
application_using_the_camera.py
cross
buffer.py
buffer2.py
html_flash.py
js_eval.py
js_iframe.py
js_suspicious.py
keys.py
static_pdf.py
darwin
code_injection.py
task_for_pid.py
extractor
dde.py
filetypes.py
ole.py
powerfun.py
unicorn.py
linux
network
dead_host.py
dns_cnc.py
dns_tld.py
network_bind.py:一套API调用
from lib.cuckoo.common.abstracts import Signature
class NetworkBIND(Signature):
name = "network_bind"
description = "Starts servers listening"
severity = 2
categories = ["bind"]
authors = ["nex", "Accuvant"]
minimum = "2.0"
filter_apinames = "bind", "listen", "accept"
def init(self):
self.mask = 0
def on_call(self, call, process):
if call["api"] == "bind":
self.mark_call()
self.mask |= 1
if call["api"] == "listen":
self.mark_call()
self.mask |= 2
if call["api"] == "accept":
self.mark_call()
self.mask |= 4
def on_complete(self):
return self.mask == 7
network_cnc_http.py
network_dyndns.py
network_http.py
network_icmp.py
network_irc.py
network_smtp.py
network_torgateway.py
network_wscript.py
nolookup_communication.py
p2p_cnc.py
snort.py
suricata.py
import re
from lib.cuckoo.common.abstracts import Signature
mapping = {
"lokibot": "loki",
}
# Obviously needs some more work.
protocols = {
80: "http",
443: "https",
}
class SuricataAlert(Signature):
name = "suricata_alert"
description = "Raised Suricata alerts"
severity = 3
categories = ["network"]
authors = ["Cuckoo Technologies"]
minimum = "2.0"
et_trojan = "ET TROJAN", "ETPRO TROJAN"
blocklist = (
"executable", "potential", "likely", "rogue", "supicious", "generic",
"possible", "known", "common", "troj", "trojan", "team", "probably",
"w2km", "http", "abuse", "win32", "unknown", "single", "filename",
"worm", "fake", "malicious", "observed", "windows",
)
family_next = (
"win32", "win64", "w32", "ransomware",
)
def extract_family(self, signature):
words = re.findall("[A-Za-z0-9]+", signature)
if len(words) < 3:
return
family = words[2].lower()
if family in self.family_next and len(words) > 3:
family = words[3].lower()
if family in self.blocklist or len(family) < 4:
return
# If it exists in our mapping, normalize the name.
return mapping.get(family, family)
def on_complete(self):
alerts = []
for alert in self.get_results("suricata", {}).get("alerts", []):
if alert["signature"] in alerts:
continue
if not alert["signature"].startswith(self.et_trojan):
continue
# Extract text between parentheses.
reg_type = re.search("\\(([A-Za-z0-9_]+)\\)", alert["signature"])
reg_type = reg_type.group(1) if reg_type else None
family = self.extract_family(alert["signature"])
if not family:
continue
self.mark_config({
"family": family.title(),
"cnc": "%s://%s:%s" % (
protocols.get(alert["dst_port"], "tcp"),
alert["dst_ip"], alert["dst_port"]
),
"type": reg_type,
})
self.mark_ioc("suricata", alert["signature"])
alerts.append(alert["signature"])
return self.has_marks()
windows
Anti
Anti-AV
antiav_avast_libs.py:通过模块名和参数名,确定调用的模块
from lib.cuckoo.common.abstracts import Signature
class AvastDetectLibs(Signature):
name = "antiav_avast_libs"
description = "Detects Avast Antivirus through the presence of a library"
severity = 3
categories = ["anti-av"]
authors = ["Optiv"]
minimum = "2.0"
ttp = ["T1063"]
filter_apinames = set(["LdrLoadDll", "LdrGetDllHandle"])
def on_call(self, call, process):
dllname = call["arguments"]["module_name"]
if "snxhk" in dllname.lower():
self.mark_call()
def on_complete(self):
return self.has_marks()
antiav_bitdefender_libs.py
antiav_detectfile.py
antiav_detectreg.py
antiav_servicestop.py
antiav_srp.py
Anti-DBG
antidbg_debuggercheck.py
antidbg_devices.py
antidbg_windows.py
🙋♀️🙋♀️🙋♀️Anti-SandBox🙋♀️🙋♀️🙋♀️
antisandbox_clipboard.py
antisandbox_cuckoo_files.py
antisandbox_file.py
antisandbox_forehwnd.py
antisandbox_fortinet_files.py
antisandbox_idletime.py
antisandbox_joe_anubis_files.py
antisandbox_mouse_hook.py
antisandbox_restart.py
antisandbox_sleep.py
antisandbox_sunbelt.py
antisandbox_sunbelt_files.py
antisandbox_threattrack_files.py
antisandbox_unhook.py
Anti-Virus
antivirus_detection_cn.py
antivirus_irma.py
antivirus_virustotal.py
Anti-VM
antivm_bochs_keys.py
antivm_computername.py
antivm_disksize.py
antivm_generic_bios.py
antivm_generic_cpu.py
antivm_generic_disk.py
antivm_generic_firmware.py
antivm_generic_ide.py
antivm_generic_scsi.py
antivm_generic_services.py
antivm_hyperv_keys.py
antivm_memory_available.py
antivm_network_adapter.py
antivm_parallels_keys.py
antivm_parallels_window.py
antivm_psuedo_device.py
antivm_sandboxie.py
antivm_vbox_acpi.py
antivm_vbox_devices.py
antivm_vbox_files.py
antivm_vbox_keys.py
antivm_vbox_provname.py
antivm_vbox_window.py
antivm_virtualpc.py
antivm_virtualpc_magic.py
antivm_virtualpc_window.py
antivm_vmware_files.py
antivm_vmware_in_insn.py
antivm_vmware_keys.py
antivm_vmware_window.py
antivm_vpc_keys.py
antivm_xen_keys.py
APT
apt_carbunak.py:互斥体字符串+正则
# Copyright (C) 2010-2015 Cuckoo Foundation.
# This file is part of Cuckoo Sandbox - http://www.cuckoosandbox.org
# This signature was contributed by RedSocks - http://redsocks.nl
# See the file 'docs/LICENSE' for copying permission.
from lib.cuckoo.common.abstracts import Signature
class APT_Carbunak(Signature):
name = "apt_carbunak"
description = "Creates known Carbunak/Anunak APT files, registry keys and/or mutexes"
severity = 3
categories = ["apt"]
families = ["carbunak"]
authors = ["RedSocks"]
minimum = "2.0"
mutexes_re = [
".*Uw1HDFMKPAlFRFYZ.*",
".*VVpHVlcJbQtFQVNP.*",
".*BAgXCgAIbQtFFwRP.*",
".*WApFWgRebQtFRFZP.*",
".*VlxDV1lSbQtFTVdP.*",
".*AwxMCwcIbQtFTQBP.*",
".*UghMW1BbbQtFRlNP.*",
".*Vl5FCABfbQtFRQNP.*",
".*Vg9GC1FbbQtFQlRP.*",
".*BQ9EXgBbbQtFF1RP.*",
]
def on_complete(self):
for indicator in self.mutexes_re:
for mutex in self.check_mutex(pattern=indicator, regex=True, all=True):
self.mark_ioc("mutex", mutex)
return self.has_marks()
apt_cloudatlas.py:文件名字符串+正则
# Copyright (C) 2010-2015 Cuckoo Foundation.
# This file is part of Cuckoo Sandbox - http://www.cuckoosandbox.org
# See the file 'docs/LICENSE' for copying permission.
from lib.cuckoo.common.abstracts import Signature
class APT_CloudAtlas(Signature):
name = "apt_cloudatlas"
description = "Creates known CloudAtlas APT files, registry keys and/or mutexes"
severity = 3
categories = ["apt"]
families = ["cloudatlas"]
authors = ["RedSocks"]
minimum = "2.0"
files_re = [
".*steinheimman",
".*papersaving",
".*previliges",
".*fundamentive",
".*bicorporate",
".*miditiming",
".*damnatorily",
".*munnopsis",
".*arzner",
".*redtailed",
".*roodgoose",
".*acholias",
".*salefians",
".*wartworts",
".*frequencyuse",
".*nonmagyar",
".*shebir",
".*getgoing",
]
def on_complete(self):
for indicator in self.files_re:
for filepath in self.check_file(pattern=indicator, regex=True, all=True):
self.mark_ioc("file", filepath)
return self.has_marks()
apt_flame.py:互斥体名+注册表路径+正则
# Copyright (C) 2012 Claudio "nex" Guarnieri (@botherder)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from lib.cuckoo.common.abstracts import Signature
class Flame(Signature):
name = "targeted_flame"
description = "Shows some indicators associated with the Flame malware"
severity = 3
categories = ["targeted"]
families = ["flame", "skywiper"]
authors = ["nex"]
minimum = "2.0"
references = [
"http://www.crysys.hu/skywiper/skywiper.pdf",
"http://www.securelist.com/en/blog/208193522/The_Flame_Questions_and_Answers",
"http://www.certcc.ir/index.php?name=news&file=article&sid=1894",
]
mutexes_re = [
".*__fajb",
".*DVAAccessGuard",
".*mssecuritymgr"
]
regkeys_re = [
".*\\\\Microsoft\\ Shared\\\\MSSecurityMgr\\\\.*",
".*\\\\Ef_trace\\.log$"
]
def on_complete(self):
for indicator in self.mutexes_re:
mutex = self.check_mutex(pattern=indicator, regex=True)
if mutex:
self.mark_ioc("mutex", mutex)
for indicator in self.regkeys_re:
filepath = self.check_file(pattern=indicator, regex=True)
if filepath:
self.mark_ioc("file", filepath)
return self.has_marks()
apt_inception.py:文件名+正则
# Copyright (C) 2010-2015 Cuckoo Foundation.
# This file is part of Cuckoo Sandbox - http://www.cuckoosandbox.org
# This signature was contributed by RedSocks - http://redsocks.nl
# See the file 'docs/LICENSE' for copying permission.
from lib.cuckoo.common.abstracts import Signature
class InceptionAPT(Signature):
name = "apt_inception"
description = "Creates known Inception APT files, registry keys and/or mutexes"
severity = 3
categories = ["apt"]
families = ["inception"]
authors = ["RedSocks"]
references = [
"https://www.bluecoat.com/security-blog/2014-12-09/blue-coat-exposes-%E2%80%9C-inception-framework%E2%80%9D-very-sophisticated-layered-malware",
]
minimum = "2.0"
files_re = [
".*polymorphed.*dll",
]
def on_complete(self):
for indicator in self.files_re:
for filepath in self.check_file(pattern=indicator, regex=True, all=True):
self.mark_ioc("file", filepath)
return self.has_marks()
apt_putter_panda.py:互斥体名+正则
# Copyright (C) 2010-2015 Cuckoo Foundation.
# This file is part of Cuckoo Sandbox - http://www.cuckoosandbox.org
# This signature was contributed by RedSocks - http://redsocks.nl
# See the file 'docs/LICENSE' for copying permission.
from lib.cuckoo.common.abstracts import Signature
class PutterpandaMutexes(Signature):
name = "putterpanda_mutexes"
description = "Creates known Putter Panda APT mutexes"
severity = 3
categories = ["rat"]
families = ["panda"]
authors = ["RedSocks"]
minimum = "2.0"
mutexes_re = [
".*__PDH_PLA_MUTEX__",
]
def on_complete(self):
for indicator in self.mutexes_re:
if self.check_mutex(pattern=indicator, regex=True):
return True
apt_sandworm_ip.py:固定IP(IoCs)
# Copyright (C) 2010-2015 Cuckoo Foundation.
# This file is part of Cuckoo Sandbox - http://www.cuckoosandbox.org
# This signature was contributed by RedSocks - http://redsocks.nl
# See the file 'docs/LICENSE' for copying permission.
from lib.cuckoo.common.abstracts import Signature
class apt_sandworm_ip(Signature):
name = "apt_sandworm_ip"
description = "Connects to Known Sandworm APT IP address"
severity = 2
categories = ["apt"]
authors = ["RedSocks"]
minimum = "2.0"
ipaddrs = [
"95.143.193.131",
"46.165.222.6",
"78.46.40.239",
"144.76.119.48",
"37.220.34.56",
"46.4.28.218",
"95.143.193.182",
"5.61.38.31",
"94.185.80.66",
"95.211.122.36"
]
def on_complete(self):
for ipaddr in self.ipaddrs:
if self.check_ip(pattern=ipaddr):
self.mark_ioc("ipaddr", ipaddr)
return self.has_marks()
apt_sandworm_url.py:URL+正则
# Copyright (C) 2010-2015 Cuckoo Foundation.
# This file is part of Cuckoo Sandbox - http://www.cuckoosandbox.org
# This signature was contributed by RedSocks - http://redsocks.nl
# See the file 'docs/LICENSE' for copying permission.
from lib.cuckoo.common.abstracts import Signature
class apt_sandworm_url(Signature):
name = "apt_sandworm_url"
description = "Uses Known Sandworm APT URL Indicator"
severity = 2
categories = ["apt"]
authors = ["RedSocks"]
minimum = "2.0"
urls_re = [
".*YXJyYWtpczAy.*",
".*aG91c2VhdHJlaWRlczk0.*",
".*\/loadvers\/paramctrl\.php",
".*\/dirconf\/check\.php",
".*\/siteproperties\/viewframes\/dialog\.php",
]
def on_complete(self):
for url in self.urls_re:
if self.check_url(pattern=url, regex=True):
self.mark_ioc("url", url)
return self.has_marks()
apt_turlacarbon.py
# Copyright (C) 2015 Robby Zeitfuchs (@robbyFux)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from lib.cuckoo.common.abstracts import Signature
class TurlaCarbon(Signature):
name = "apt_turlacarbon"
description = "Appears to be the targeted Turla Carbon malware"
severity = 3
alert = True
categories = ["apt"]
families = ["turla", "uroburos", "snake"]
authors = ["Robby Zeitfuchs", "@robbyFux"]
minimum = "2.0"
references = [
"https://blog.gdatasoftware.com/blog/article/analysis-of-project-cobra.html",
"https://malwr.com/analysis/MTI2M2RjYTAyZmNmNDE4ZTk5MDBkZjA4MDA5ZTFjMDc/",
]
filter_apinames = "NtWriteFile",
regkey_indicator = ".*\\\\ActiveComputerName$"
buffer_indicators = [
"[NAME]",
"[TIME]",
"iproc",
"user_winmin",
"user_winmax",
"object_id",
]
def init(self):
self.wrote = False
def on_call(self, call, process):
# Check whether each buffer indicator is in this buffer write.//检查每个缓冲区指示器是否在这个缓冲区写。
for indicator in self.buffer_indicators:
if indicator not in call["arguments"]["buffer"]:
break
else:
self.wrote = True
self.mark_call()
def on_complete(self):
if not self.check_key(self.regkey_indicator, regex=True):
return
if self.wrote:
return True
apt_uroburos_file.py
# Copyright (C) 2010-2015 Cuckoo Foundation.
# This file is part of Cuckoo Sandbox - http://www.cuckoosandbox.org
# This signature was contributed by RedSocks - http://redsocks.nl
# See the file 'docs/LICENSE' for copying permission.
from lib.cuckoo.common.abstracts import Signature
class UroburosFile(Signature):
name = "uroburos_file"
description = "Creates known Turla/Uroburos APT files"
severity = 3
categories = ["rat"]
families = ["uroburos"]
authors = ["RedSocks"]
minimum = "2.0"
mutexes_re = [
".*turla10",
".*msdata\\\\.*",
".*1396695624",
]
def on_complete(self):
for mutex in self.mutexes_re:
if self.check_mutex(pattern=mutex, regex=True):
return True
apt_uroburos_mutex.py
# Copyright (C) 2010-2015 Cuckoo Foundation.
# This file is part of Cuckoo Sandbox - http://www.cuckoosandbox.org
# This signature was contributed by RedSocks - http://redsocks.nl
# See the file 'docs/LICENSE' for copying permission.
from lib.cuckoo.common.abstracts import Signature
class UroburosMutexes(Signature):
name = "uroburos_mutexes"
description = "Creates known Turla/Uroburos APT mutexes"
severity = 3
categories = ["rat"]
families = ["uroburos"]
authors = ["RedSocks"]
minimum = "2.0"
files_re = [
".*\\\\drivers\\\\wo2ifsl.sys",
".*\\\\drivers\\\\acpied.sys",
".*\\\\drivers\\\\atmarpd.sys",
".*\\\\temp\\\\msmsgsmon.exe",
".*\\\\temp\\\\msdattst.ocx",
]
def on_complete(self):
for indicator in self.files_re:
if self.check_mutex(pattern=indicator, regex=True):
return True
if self.check_file(pattern=indicator, regex=True):
return True
BackDoor
backdoor_lolbot.py
backdoor_sdbot.py
backdoor_tdss.py
backdoor_vanbot.py
backdoor_whimoo.py
Banker
banker_bancos.py
banker_cridex.py
banker_prinimalka.py
banker_spyeye_mutex.py
banker_spyeye_url.py
banker_tinba_mutex.py
banker_zeus_mutex.py
banker_zeus_p2p.py
banker_zeus_url.py
banking_mutex.py
BitCoin
Boot
bootconfig_modify.py
bootkit.py
Bot
bot_athena_url.py
bot_athenahttp.py
bot_betabot_url.py
bot_dirtjumper.py
bot_drive.py
bot_drive2.py
bot_kelihos.py
bot_kovter.py
bot_madness.py
bot_pony_url.py
bot_russkill.py
bot_solar_url.py
bot_vnloader.py
bot_warbot_url.py
Browser
browser_bho.py
browser_security.py
browser_startpage.py
ByPass
Cloud
cloud_dropbox.py
cloud_google.py
cloud_mediafire.py
cloud_mega.py
cloud_rapidshare.py
cloud_wetransfer.py
cloudflare.py
Create
creates_doc.py
creates_exe.py
creates_hidden_file.py
creates_largekey.py
creates_null_reg_entry.py
creates_service.py
creates_shortcut.py
Crypto
DDOS
ddos_blackrev_mutex.py
ddos_darkddos_mutex.py
ddos_eclipse_mutex.py
ddos_ipkiller_mutex.py
Detect
detect_putty.py
detect_winscp.py
Disable
disables_app.py
disables_browserwarn.py
disables_security.py
disables_sysrestore.py
disables_wer.py
disables_windowsupdate.py
DNS
dns_dyndns_provider.py
dns_exp3322.py
dns_freehosting_domain.py
Exec
exec_bitsadmin.py
exec_crash.py
exec_waitfor.py
Exploit
exploit_blackhole_url.py
exploit_mutex.py
exploit_sweetorange_mutex.py
exploitation.py
IM
im_bittorrent_bleep.py
im_qq.py
InfoStealer
infostealer_bitcoin.py
infostealer_browser.py
infostealer_browser_modifications.py
infostealer_clipboard.py
infostealer_derusbi_file.py
infostealer_ftp.py
infostealer_im.py
infostealer_keylogger.py
infostealer_mail.py
Injection
injection_explorer.py
injection_memorymodify.py
injection_network_traffic.py
injection_runpe.py
injection_thread.py
injection_writememory.py
Keylogger
keylogger_ardamax_mutex.py
keylogger_jintor_mutex.py
Locate
locates_browser.py
locates_sniffer.py
Locker
locker_cmd.py
locker_regedit.py
locker_taskmgr.py
MemDump
memdump_urls.py
memdump_yara.py
Modify
modifies_certs.py
modifies_proxies.py
modifies_seccenter.py
modifies_uac_notify.py
modifies_wallpaper.py
modifies_zoneid.py
NetWork
network_rdp_mutex.py
network_service_mirc.py
network_tor.py
network_tor_service.py
network_urlshort_cn.py
network_vnc_mutex.py
Office
office.py
# Copyright (C) 2016 Cuckoo Foundation.
# This file is part of Cuckoo Sandbox - http://www.cuckoosandbox.org
# See the file 'docs/LICENSE' for copying permission.
import ntpath
import re
from lib.cuckoo.common.abstracts import Signature
network_objects = [
"microsoft.xmlhttp",
"msxml2.serverxmlhttp",
"msxml2.xmlhttp",
"msxml2.serverxmlhttp.6.0",
"winhttp.winhttprequest.5.1",
]
class OfficeCreateObject(Signature):
name = "office_create_object"
description = "Creates suspicious VBA object"
severity = 3
categories = ["vba"]
authors = ["Cuckoo Technologies"]
minimum = "2.0"
ttp = ["T1203"]
filter_apinames = "vbe6_CreateObject", "vbe6_GetObject"
objects = {
"adodb.stream": "file",
"scripting.filesystemobject": "file",
"shell.application": "process",
"wscript.shell": "process",
}
# Include all globally defined network objects.
objects.update(dict((_, "network") for _ in network_objects))
descriptions = {
"network": "May attempt to connect to the outside world",
"file": "May attempt to write one or more files to the harddisk",
"process": "May attempt to create new processes",
}
def on_call(self, call, process):
objname = call["arguments"]["object_name"]
if objname.lower() not in self.objects:
return
description = self.descriptions[self.objects[objname.lower()]]
self.mark_ioc("com_class", objname, description)
return True
class OfficeCheckProjectName(Signature):
name = "office_check_project_name"
description = "Office checks VB project name"
severity = 1
categories = ["vba"]
authors = ["FDD", "Cuckoo Sandbox"]
minimum = "2.0"
filter_apinames = "vbe6_Invoke",
def on_call(self, call, process):
if call["arguments"]["funcname"] != "macroname":
return
self.mark_call()
return True
class OfficeCountDirectories(Signature):
name = "office_count_dirs"
description = "Office document invokes CountDirectories (possible anti-sandbox)"
severity = 2
categories = ["vba"]
authors = ["FDD @ Cuckoo Technologies"]
minimum = "2.0"
ttp = ["T1203"]
filter_apinames = "vbe6_Invoke",
def on_call(self, call, process):
if call["arguments"]["funcname"] != "CountDirectories":
return
self.mark_call()
return True
class OfficeCheckVersion(Signature):
name = "office_appinfo_version"
description = "Office document checks Office version (possible anti-sandbox)"
severity = 2
categories = ["vba"]
authors = ["FDD", "Cuckoo Technologies"]
minimum = "2.0"
filter_apinames = "vbe6_Invoke",
def on_call(self, call, process):
if "args" not in call["arguments"]:
return
if call["arguments"]["funcname"] != "AppInfo":
return
if call["arguments"]["args"][0] != 2:
return
self.mark_call()
return True
class OfficeCheckWindow(Signature):
name = "office_check_window"
description = "Office document checks Office window size (possible anti-sandbox)"
severity = 2
categories = ["vba"]
authors = ["FDD @ Cuckoo Technologies"]
minimum = "2.0"
filter_apinames = "vbe6_Invoke",
def on_call(self, call, process):
if "args" not in call["arguments"]:
return
if call["arguments"]["funcname"] != "AppInfo":
return
if call["arguments"]["args"][0] != 7:
return
self.mark_call()
return True
class OfficeHttpRequest(Signature):
name = "office_http_request"
description = "Office document performs HTTP request (possibly to download malware)"
severity = 5
categories = ["vba"]
authors = ["Cuckoo Technologies"]
minimum = "2.0"
ttp = ["T1203", "T1071"]
filter_apinames = "vbe6_Invoke",
def on_call(self, call, process):
# This checks if this instance method invocation belongs to
# a known network class (e.g., "MSXML2.XMLHTTP").
if call["flags"].get("this", "").lower() not in network_objects:
return
# The .Open method specifies the URL.
if call["arguments"]["funcname"] != "Open":
return
# Usually ["GET", "url", False].
if len(call["arguments"]["args"]) == 3:
self.mark_ioc("payload_url", call["arguments"]["args"][1])
return True
class OfficeRecentFiles(Signature):
name = "office_recent_files"
description = "Uses RecentFiles to determine whether it is running in a sandbox"
severity = 4
categories = ["vba"]
authors = ["Cuckoo Technologies"]
minimum = "2.0"
filter_apinames = "vbe6_Invoke",
def on_call(self, call, process):
if call["arguments"]["funcname"] == "RecentFiles":
self.mark_call()
return True
class HasOfficeEps(Signature):
name = "has_office_eps"
description = "Located potentially malicious Encapsulated Post Script (EPS) file"
severity = 3
categories = ["office"]
authors = ["Cuckoo Technologies"]
minimum = "2.0"
def on_complete(self):
office = self.get_results("static", {}).get("office", {})
if office.get("eps", []):
return True
class OfficeIndirectCall(Signature):
name = "office_indirect_call"
description = "Office document has indirect calls"
severity = 1
categories = ["office"]
authors = ["FDD @ Cuckoo Technologies"]
minimum = "2.0"
ttp = ["T1203"]
patterns = [
"CallByName[^\r\n;']*",
]
def on_complete(self):
office = self.get_results("static", {}).get("office", {})
if "macros" in office:
for macro in office["macros"]:
for pattern in self.patterns:
matches = re.findall(pattern, macro["deobf"])
for match in matches:
self.mark_ioc("Statement", match)
return self.has_marks()
class OfficeCheckName(Signature):
name = "office_check_doc_name"
description = "Office document checks it's own name"
severity = 2
categories = ["office"]
authors = ["FDD", "Cuckoo Technologies"]
minimum = "2.0"
patterns = [
"[^\n\r;']*Me.Name[^\n\r;']*",
]
def on_complete(self):
office = self.get_results("static", {}).get("office", {})
if "macros" in office:
for macro in office["macros"]:
for pattern in self.patterns:
matches = re.findall(pattern, macro["deobf"])
for match in matches:
self.mark_ioc("Statement", match)
return self.has_marks()
class OfficePlatformDetect(Signature):
name = "office_platform_detect"
description = "Office document tries to detect platform"
severity = 2
categories = ["office"]
authors = ["FDD @ Cuckoo Technologies"]
minimum = "2.0"
patterns = [
"#If\s+(?:Not\s+)?Win32",
"#If\s+Mac\s*=\s(?:1|0)"
]
def on_complete(self):
office = self.get_results("static", {}).get("office", {})
if "macros" in office:
for macro in office["macros"]:
for pattern in self.patterns:
matches = re.findall(pattern, macro["deobf"])
for match in matches:
self.mark_ioc("Statement", match)
return self.has_marks()
class DocumentClose(Signature):
name = "document_close"
description = "Word document hooks document close"
severity = 2
categories = ["office"]
authors = ["FDD", "Cuckoo Technologies"]
minimum = "2.0"
ttp = ["T1179"]
def on_complete(self):
office = self.get_results("static", {}).get("office", {})
if "macros" in office:
for macro in office["macros"]:
if "Sub Document_Close()" in macro["deobf"]:
return True
class DocumentOpen(Signature):
name = "document_open"
description = "Word document hooks document open"
severity = 2
categories = ["office"]
authors = ["FDD", "Cuckoo Technologies"]
minimum = "2.0"
ttp = ["T1179"]
def on_complete(self):
office = self.get_results("static", {}).get("office", {})
if "macros" in office:
for macro in office["macros"]:
if "Sub Document_Open()" in macro["deobf"]:
return True
class OfficeEpsStrings(Signature):
name = "office_eps_strings"
description = "Suspicious keywords embedded in an Encapsulated Post Script (EPS) file"
severity = 3
categories = ["office"]
authors = ["Cuckoo Technologies"]
minimum = "2.0"
keywords = [
"longjmp", "NtCreateEvent", "NtProtectVirtualMemory",
]
def on_complete(self):
office = self.get_results("static", {}).get("office", {})
for s in office.get("eps", []):
if s.strip() in self.keywords:
self.mark_ioc("eps_string", s)
return self.has_marks()
class OfficeVulnerableGuid(Signature):
name = "office_vuln_guid"
description = "GUIDs known to be associated with a CVE were requested (may be False Positive)"
severity = 3
categories = ["office"]
authors = ["Niels Warnars @ Cuckoo Technologies"]
minimum = "2.0"
ttp = ["T1203"]
bad_guids = {
"BDD1F04B-858B-11D1-B16A-00C0F0283628": "CVE-2012-0158",
"996BF5E0-8044-4650-ADEB-0B013914E99C": "CVE-2012-0158",
"C74190B6-8589-11d1-B16A-00C0F0283628": "CVE-2012-0158",
"9181DC5F-E07D-418A-ACA6-8EEA1ECB8E9E": "CVE-2012-0158",
"1EFB6596-857C-11D1-B16A-00C0F0283628": "CVE-2012-1856",
"66833FE6-8583-11D1-B16A-00C0F0283628": "CVE-2012-1856",
"1EFB6596-857C-11D1-B16A-00C0F0283628": "CVE-2013-3906",
"DD9DA666-8594-11D1-B16A-00C0F0283628": "CVE-2014-1761",
"00000535-0000-0010-8000-00AA006D2EA4": "CVE-2015-0097",
"0E59F1D5-1FBE-11D0-8FF2-00A0D10038BC": "CVE-2015-0097",
"05741520-C4EB-440A-AC3F-9643BBC9F847": "CVE-2015-1641",
"A08A033D-1A75-4AB6-A166-EAD02F547959": "CVE-2015-1641",
"F4754C9B-64F5-4B40-8AF4-679732AC0607": "CVE-2015-1641",
"4C599241-6926-101B-9992-00000B65C6F9": "CVE-2015-2424",
"44F9A03B-A3EC-4F3B-9364-08E0007F21DF": "CVE-2015-2424",
}
def on_complete(self):
summary = self.get_results("behavior", {}).get("summary", {})
for guid in summary.get("guid", []):
if guid.upper() in self.bad_guids:
self.mark_ioc("cve", self.bad_guids[guid.upper()])
return self.has_marks()
class OfficeVulnModules(Signature):
name = "office_vuln_modules"
description = "Libraries known to be associated with a CVE were requested (may be False Positive)"
severity = 3
categories = ["office"]
authors = ["Niels Warnars @ Cuckoo Technologies"]
minimum = "2.0"
ttp = ["T1203"]
bad_modules = {
"ogl.dll": "CVE-2013-3906",
"oart.dll": "CVE-2013-3906",
"packager.dll": "CVE-2014-4114/6352",
"olkloadr.dll": "CVE-2015-1641",
"epsimp32.flt": "CVE-2015-2545",
}
def on_complete(self):
summary = self.get_results("behavior", {}).get("summary", {})
for module in summary.get("dll_loaded", []):
module = ntpath.split(module)[1]
if module.lower() in self.bad_modules:
self.mark_ioc("cve", self.bad_modules[module.lower()])
return self.has_marks()
office_packager.py
office_rtf.py
# Copyright (C) 2018 Kevin Ross
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from lib.cuckoo.common.abstracts import Signature
class RTFUnknownVersion(Signature):
name = "rtf_unknown_version"
description = "RTF file has an unknown version"
severity = 2
categories = ["office"]
authors = ["Kevin Ross"]
minimum = "2.0"
def on_complete(self):
target = self.get_results("target", {})
filetype = target.get("file", {}).get("type") or ""
name = target.get("file", {}).get("name")
if "Rich Text Format data" in filetype and "unknown version" in filetype:
self.mark(
filename=name,
filetype_details=filetype,
)
for dropped in self.get_results("dropped", []):
if "filepath" in dropped:
droppedtype = dropped["type"]
droppedname = dropped["name"]
if "Rich Text Format data" in droppedtype and "unknown version" in droppedtype:
self.mark(
dropped_filename=droppedname,
dropped_filetype_details=filetype,
)
return self.has_marks()
class RTFCharacterSet(Signature):
name = "rtf_unknown_character_set"
description = "RTF file has an unknown character set"
severity = 2
categories = ["office"]
authors = ["Kevin Ross"]
minimum = "2.0"
def on_complete(self):
target = self.get_results("target", {})
filetype = target.get("file", {}).get("type") or ""
name = target.get("file", {}).get("name")
if "Rich Text Format data" in filetype and "unknown character set" in filetype:
self.mark(
filename=name,
filetype_details=filetype,
)
for dropped in self.get_results("dropped", []):
if "filepath" in dropped:
droppedtype = dropped["type"]
droppedname = dropped["name"]
if "Rich Text Format data" in droppedtype and "unknown character set" in droppedtype:
self.mark(
dropped_filename=droppedname,
dropped_filetype_details=filetype,
)
return self.has_marks()
Packer
packer_entropy.py
packer_polymorphic.py
packer_upx.py
packer_vmprotect.py
Persistence
persistence_ads.py
persistence_autorun.py
persistence_bootexecute.py
persistence_registry_fileless.py
POS
pos_alina_file.py
pos_alina_url.py
pos_blackpos_url.py
pos_decebal_mutex.py
pos_dexter.py
pos_jackpos_file.py
pos_jackpos_url.py
pos_poscardstealer_url.py
PowerShell
powerfun.py
powershell.py
powershell_reg.py
powerworm.py
Process
process_interest.py
process_needed.py
Ransom
ransom_mutex.py
ransomware_bcdedit.py
ransomware_fileextensions.py
ransomware_filemodications.py
ransomware_files.py
ransomware_message.py
ransomware_recyclebin.py
ransomware_shadowcopy.py
ransomware_viruscoder.py
ransomware_wbadmin.py
RAT
rat_adzok.py
rat_bandook.py
rat_beastdoor.py
rat_beebus_mutex.py
rat_bifrose.py
rat_blackhole.py
rat_blackice.py
rat_blackshades.py
rat_bladabindi.py
rat_bottilda.py
rat_bozok.py
rat_buzus.py
rat_comRAT.py
rat_cybergate.py
rat_darkcloud.py
rat_darkshell.py
rat_delf.py
rat_dibik.py
rat_evilbot.py
rat_farfli.py
rat_fexel_ip.py
rat_flystudio.py
rat_fynloski.py
rat_ghostbot.py
rat_hesperbot.py
rat_hikit.py
rat_hupigon.py
rat_icepoint.py
rat_jewdo.py
rat_jorik.py
rat_karakum.py
rat_koutodoor.py
rat_kuluoz.py
rat_likseput.py
rat_madness.py
rat_madness_url.py
rat_magania_mutex.py
rat_minerbot.py
rat_mybot.py
rat_naid_ip.py
rat_nakbot.py
rat_netobserve.py
rat_netshadow.py
rat_netwire.py
rat_nitol.py
rat_njrat.py
rat_pasta.py
rat_pcclient.py
rat_plugx.py
rat_poebot.py
rat_poisonivy.py
rat_qakbot.py
rat_rbot.py
rat_renos.py
rat_sadbot.py
rat_senna.py
rat_shadowbot.py
rat_siggen.py
rat_spynet.py
rat_spyrecorder.py
rat_staser.py
rat_swrort.py
rat_teamviewer.py
rat_travnet.py
rat_trogbot.py
rat_turkojan.py
rat_urlspy.py
rat_urxbot.py
rat_vertexnet.py
rat_vertexnet_url.py
rat_wakbot.py
rat_xtreme.py
rat_zegost.py
Recon
recon_beacon.py
recon_checkip.py
recon_fingerprint.py
recon_programs.py
recon_systeminfo.py
RootKit
# Copyright (C) 2010-2015 Cuckoo Foundation.
# This file is part of Cuckoo Sandbox - http://www.cuckoosandbox.org
# This signature was contributed by RedSocks - http://redsocks.nl
# See the file 'docs/LICENSE' for copying permission.
from lib.cuckoo.common.abstracts import Signature
class BlackEnergyMutexes(Signature):
name = "blackenergy_mutexes"
description = "Creates known BlackEnergy Rootkit mutexes"
severity = 3
categories = ["rootkit"]
families = ["blackenergy"]
authors = ["RedSocks"]
minimum = "2.0"
mutexes_re = [
".*\\{CD56173D-1A7D-4E99-8109-A71BB04263DF\\}",
]
def on_complete(self):
for indicator in self.mutexes_re:
match = self.check_mutex(pattern=indicator, regex=True)
if match:
self.mark_ioc("mutex", match)
return self.has_marks()
SMTP
smtp_gmail.py
smtp_live.py
smtp_mailru.py
smtp_yahoo.py
Stealth
stealth_childproc.py
stealth_hiddenextension.py
stealth_hiddenfile.py
stealth_hiddenicons.py
stealth_hidenotifications.py
stealth_systemprocname.py
stealth_window.py
Trojan
trojan_bublik.py
trojan_ceatrg.py
trojan_coinminer.py
trojan_dapato.py
trojan_emotet.py
trojan_jorik.py
trojan_kilim.py
trojan_lethic.py
trojan_lockscreen.py
trojan_mrblack.py
trojan_obfus_mutex.py
trojan_pincav.py
trojan_redosru.py
trojan_rovnix.py
trojan_sysn.py
trojan_tnega_mutex.py
trojan_vbinject.py
trojan_yoddos.py
trojandl_begseabug_mutex.py
trojandl_upatre_mutex.py
Virus
vir_andromeda.py
vir_bagle.py
vir_banload.py
vir_btc.py
vir_c24_url.py
vir_cryptolocker.py
vir_ddos556.py
vir_decay.py
vir_dofoil.py
vir_dyreza.py
vir_expiro.py
vir_fakeav2_mutex.py
vir_fakeav_mutex.py
vir_gaelicum.py
vir_infinity.py
vir_ircbrute.py
vir_isrstealer.py
vir_istealer_url.py
vir_karagany.py
vir_katusha.py
vir_killdisk.py
vir_koobface.py
vir_luder.py
vir_napolar.py
vir_nebuler.py
vir_oldrea.py
vir_perflogger.py
vir_pidief.py
vir_ponfoy.py
vir_pykse.py
vir_ragebot.py
vir_ramnit.py
vir_sharpstealer.py
vir_shiz.py
vir_shylock.py
vir_ufr3.py
vir_upatre.py
vir_virut.py
virus_jeefo_mutex.py
virus_tufik_mutex.py
Windows
windows_console.py
windows_utilities.py
Worm
worm_allaple.py
worm_fesber_mutex.py
worm_kolabc.py
worm_krepper_mutex.py
worm_palevo.py
worm_phorpiex.py
worm_psyokym.py
worm_puce_mutex.py
worm_renocide.py
worm_rungbu.py
worm_runouce_mutex.py
worm_winsxsbot.py
worm_xworm.py
其他
antiemu_wine.py
allocates_rwx.py
antianalysis_detectfile.py
appinit.py
applocker_bypass.py
bad_certs.py
carberp_mutex.py
clears_logs.py
clickfraud.py
credential_dump.py
dde.py
deepfreeze_mutex.py
deletes_executed.py
# Copyright (C) 2014 Optiv Inc. (brad.spengler@optiv.com), Converted 2016 for Cuckoo 2.0
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from lib.cuckoo.common.abstracts import Signature
class DeletesExecutedFiles(Signature):
name = "deletes_executed_files"
description = "Deletes executed files from disk"
severity = 3
categories = ["persistence", "stealth"]
authors = ["Optiv", "Kevin Ross"]
minimum = "2.0"
ttp = ["T1070"]
evented = True
def on_complete(self):
processes = []
for process in self.get_results("behavior", {}).get("generic", []):
for cmdline in process.get("summary", {}).get("command_line", []):
processes.append(cmdline)
if processes:
for deletedfile in self.get_files(actions=["file_deleted"]):
if deletedfile in processes[0]:
self.mark_ioc("file", deletedfile)
return self.has_marks()
downloader_cabby.py
dridex_apis.py
driver_load.py
dropper.py
# Copyright (C) 2014 Optiv Inc. (brad.spengler@optiv.com), Updated 2016 for Cuckoo 2.0
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from lib.cuckoo.common.abstracts import Signature
class Dropper(Signature):
name = "dropper"
description = "Drops a binary and executes it"
severity = 2
categories = ["dropper"]
authors = ["Optiv"]
minimum = "2.0"
ttp = ["T1129"]
def __init__(self, *args, **kwargs):
Signature.__init__(self, *args, **kwargs)
self.executed = []
self.exe = False
if self.get_results("target", {}).get("category") == "file":
f = self.get_results("target", {}).get("file", {})
if "PE32 executable" in f.get("type", ""):
self.exe = True
filter_apinames = "CreateProcessInternalW", "ShellExecuteExW"
def on_call(self, call, process):
filepath = call["arguments"]["filepath"]
if filepath not in self.executed:
self.executed.append(filepath)
def on_complete(self):
for executed in self.executed:
for dropped in self.get_results("dropped", []):
if "filepath" in dropped:
filepath = dropped["filepath"]
if executed == filepath:
self.mark_ioc("file", executed)
if not self.exe:
self.severity = 3
return self.has_marks()
class ExeAppData(Signature):
name = "exe_appdata"
description = "Drops an executable to the user AppData folder"
severity = 2
categories = ["dropper", "persistence"]
authors = ["Kevin Ross"]
minimum = "2.0"
ttp = ["T1129"]
def on_complete(self):
for dropped in self.get_results("dropped", []):
if "filepath" in dropped and dropped["type"].startswith("PE32 executable"):
filepath = dropped["filepath"]
if "\\Users\\" in filepath and "\\AppData\\" in filepath:
self.mark_ioc("file", filepath)
return self.has_marks()
emotet_apis.py
emoves_zoneid_ads.py
excel_datalink_files.py
fraud_fakerean.py
hacktool_pwdump_file.py
has_authenticode.py
has_pdb.py
javascript_commandline.py
maldoc.py
martians.py
mining.py
moves_self.py
multiple_ua.py
nymaim_apis.py
origin_langid.py
payload_download.py
pe_features.py
privileges.py
protection_rx.py
raises_exception.py
reads_user_agent.py
self_delete_bat.py
sharing_rghost.py
shellcode.py
sipstun.py
sniffer_winpcap.py
spreading_autoruninf.py
stops_service.py
suspicious_process.py
tapi_mutex.py
terminates_process.py
volatility_sig.py
wmi.py
url_file.py