capa
capa copied to clipboard
Extract indicators (HBI/NBI) around capability detections
Capabilities often have associated host-based and network-based indicators (HBIs and NBIs). Especially these examples (by rule namespaces) come to mind:
- communication/http: IPs, domains
- host-interaction/file-system: file names
- host-interaction/registry: registry keys and values
We often encounter an HBI or NBI as a string used close around a capability, e.g. as argument to an API call.
It would be worth exploring if we can automatically:
- extract strings potentially related to capabilities
- perform some sanity checks on them (e.g., is it an IP? or is it a file path?)
- output them with the capabilities (e.g. as part of our existing rendering or in a new section highlighting potential indicators)
I suspect this could work very well with the dynamic analysis flavor, and static extraction on basic samples could also work quite well.
I agree that it would be interesting to incorporate these things into capa! I'll have a closer look at some capa code and sandbox data and make a proposal for how we can implement some of these features!
The Practical Malware Analysis book lab 03-02.dll may be a good test case here.
Hi @mr-tz here are my current ideas about the web domain extractors. Hope to have it finished up soon - please let me know if you have any questions!
import dnspython
from pathlib import Path
from typing import Generator, Iterator, Dict
from capa.capa import ida, ghidra
from capa.features.address import Address
from capa.features.extractors.base_extractor import FeatureExtractor, FunctionHandle
from capa.features.extractors import pefile, elffile, viv, cape, dotnetfile
import viv_utils
from capa.helpers import get_auto_format
from capa.features.common import (
FORMAT_IDA,
FORMAT_GHIDRA,
FORMAT_PE,
FORMAT_ELF,
FORMAT_VIV,
FORMAT_CAPE,
FORMAT_DOTNET,
)
import re
def default_extract_domain_names(file: Path) -> Iterator[str]:
    """Yield every string extracted from `file` that looks like a web domain name.

    A string qualifies only when it matches the pattern as a whole: dot-separated
    labels made of lowercase letters, digits and inner hyphens, followed by an
    alphabetic (or punycode `xn--`) final label, and fewer than 256 characters.
    """
    # The parts of this regex are discussed in http://stackoverflow.com/a/7933253/433790
    # Open question from the author: should the pattern become a module-level constant?
    looks_like_domain = re.compile(
        r"^(?!.{256})(?:[a-z0-9](?:[a-z0-9-]{0,61}[a-z0-9])?\.)+(?:[a-z]{1,63}|xn--[a-z0-9]{1,59})$"
    ).search
    yield from (candidate for candidate in get_strings(file) if looks_like_domain(candidate))
def get_strings(file: Path) -> Iterator[str]:
    """Return the strings of `file` via the extractor matching its detected format.

    The different extractors implement `extract_file_strings` in slightly
    different ways: the IDA and Ghidra helpers are called without arguments,
    while the PE, ELF, viv and CAPE extractors receive the raw file bytes.

    Args:
        file: path of the sample; it is a `Path`, so its bytes are read directly.

    Returns:
        The first element of whatever the selected `extract_file_strings` returns.

    Raises:
        ValueError: if the detected format has no string extractor wired up here
            (previously this surfaced as an UnboundLocalError on `strings`).
    """
    # NOTE(review): the `strings, _ = ...` unpacking assumes every extractor returns
    # a 2-tuple whose first element is the iterable of strings - TODO confirm against
    # the extractor implementations.
    format_ = get_auto_format(file)
    if format_ == FORMAT_IDA:
        strings, _ = ida.helpers.extract_file_strings()
    elif format_ == FORMAT_GHIDRA:
        strings, _ = ghidra.helpers.extract_file_strings()
    elif format_ == FORMAT_PE:
        strings, _ = pefile.extract_file_strings(file.read_bytes())
    elif format_ == FORMAT_ELF:
        strings, _ = elffile.extract_file_strings(file.read_bytes())
    elif format_ == FORMAT_VIV:
        strings, _ = viv.file.extract_file_strings(file.read_bytes())
    elif format_ == FORMAT_CAPE:
        strings, _ = cape.file.extract_file_strings(file.read_bytes())
    else:
        # fail loudly instead of falling through to `return strings` with no binding
        raise ValueError(f"unsupported format for string extraction: {format_}")
    return strings
def verbose_extract_domain_names(extractor: FeatureExtractor, file: Path) -> Generator[str, None, None]:
    """Yield one formatted, multi-line report per distinct domain name found in `file`.

    Every string returned by `get_strings(file)` is checked against the domain
    pattern; matches are counted, and each distinct domain is then rendered by
    `formatted_verbose` together with its number of occurrences.

    Args:
        extractor: feature extractor forwarded to `formatted_verbose`.
        file: path of the sample whose strings are scanned.
    """
    # The parts of this regex are discussed in http://stackoverflow.com/a/7933253/433790
    # Open question from the author: should the pattern become a module-level constant?
    domain_pattern = re.compile(
        r"^(?!.{256})(?:[a-z0-9](?:[a-z0-9-]{0,61}[a-z0-9])?\.)+(?:[a-z]{1,63}|xn--[a-z0-9]{1,59})$"
    )
    domain_counter_dict: Dict[str, int] = {}
    for string in get_strings(file):
        if domain_pattern.search(string):
            domain_counter_dict[string] = domain_counter_dict.get(string, 0) + 1

    # .items() is required here: iterating the dict itself yields only the keys,
    # which would try to unpack each domain string into two names
    for string, total_occurrances in domain_counter_dict.items():
        yield formatted_verbose(extractor, file, string, total_occurrances)
def formatted_verbose(extractor: FeatureExtractor, file: Path, string: str, total_occurrances: int) -> str:
    """Render the verbose report for a single domain.

    The report consists of four newline-terminated parts, in this order:

        <domain>
         |---- <result of ip_address_statement(domain)>
         |---- <result of network_protocol_statement(extractor, file, domain)>
         |---- <total_occurrances> occurrances
    """
    ip_part = ip_address_statement(string)
    protocol_part = network_protocol_statement(extractor, file, string)
    pieces = (
        f"{string}\n",
        f" |---- {ip_part}\n",
        f" |---- {protocol_part}\n",
        f" |---- {total_occurrances} occurrances\n",
    )
    return "".join(pieces)
def ip_address_statement(string: str) -> str:
    """Resolve the A records of the domain `string` and describe them as text.

    Args:
        string: the domain name to resolve.

    Returns:
        `"IP address: <ip>"` when exactly one address is returned; otherwise an
        `"IP addresses:"` header followed by one `| |----<ip>` line per address.
        At most five addresses are listed; if there are more, a final line states
        how many are not shown. The result has no trailing newline because
        `formatted_verbose` appends one after this statement.

    Raises:
        Whatever the dnspython resolver raises when the lookup fails (for example
        for a name that does not exist); nothing is caught here.
    """
    # dnspython is imported as `dns`; done locally because the block refers to
    # `dns.resolver` and no module-level binding for `dns` exists
    import dns.resolver

    max_shown = 5
    resolver = dns.resolver.Resolver()
    # `resolve` supersedes the deprecated `query` in dnspython 2.x; fall back for 1.x
    resolve = getattr(resolver, "resolve", None) or resolver.query
    answer = resolve(string, "A")

    # answers are rdata objects, so convert before building text
    addresses = [str(rdata) for rdata in answer]
    if len(addresses) == 1:
        return f"IP address: {addresses[0]}"

    # str is immutable: collect lines and join once instead of calling
    # `statement.join(...)` and discarding its result
    lines = ["IP addresses:"]
    lines.extend(f"| |----{address}" for address in addresses[:max_shown])
    hidden = len(addresses) - max_shown
    if hidden > 0:
        lines.append(f"| |----{hidden} IP addresses not shown")
    return "\n".join(lines)
def network_protocol_statement(extractor: FeatureExtractor, file: Path, string: str) -> str:
    """Describe which protocols are used to communicate with the domain `string`.

    The protocol counts come from `get_protocols`, which supports the following
    protocols: Ftp, Https, Http.

    Args:
        extractor: feature extractor forwarded to `get_protocols`.
        file: path of the analysed sample, forwarded to `get_protocols`.
        string: the domain name.

    Returns:
        `"Protocol used to communicate with <domain>: <protocol> (<count>)"` when
        exactly one protocol was found; otherwise a `"Protocols used to communicate
        with <domain>:"` header followed by one `| |----<protocol> (<count>)` line
        per protocol. The result has no trailing newline because
        `formatted_verbose` appends one after this statement.
    """
    protocols = get_protocols(extractor, file, string)
    # .items() is required: iterating the dict itself yields only the protocol names
    entries = [f"{protocol} ({count})" for protocol, count in protocols.items()]
    if len(entries) == 1:
        return f"Protocol used to communicate with {string}: {entries[0]}"

    # str is immutable: collect lines and join once instead of calling
    # `statement.join(...)` and discarding its result
    lines = [f"Protocols used to communicate with {string}:"]
    lines.extend(f"| |----{entry}" for entry in entries)
    return "\n".join(lines)
def get_protocols(extractor: FeatureExtractor, file: Path, domain: str) -> Dict[str, int]:
    """Count the network protocols used to interact with `domain`.

    For every occurrence of `domain` in the file's strings, one caller name is
    taken from the static yielder (or from the dynamic yielder when the static
    one raises NotImplementedError) and classified by the protocol marker found
    in that name.

    Args:
        extractor: feature extractor searched for references to `domain`.
        file: path of the analysed sample.
        domain: the domain name to look up.

    Returns:
        Dict mapping "FTP", "HTTPS" or "HTTP" to the number of callers carrying
        that marker. A caller name with no known marker is counted under the
        name itself.
    """
    from itertools import islice

    domain_protocols: Dict[str, int] = {}
    occurrances = domain_occurrances_in_file(file, domain)
    if occurrances <= 0:
        return domain_protocols

    # Both yielders are generators, so a NotImplementedError from the extractor only
    # surfaces while they are being consumed; the consumption therefore has to happen
    # inside the try. One pass also avoids restarting the search at the beginning for
    # every occurrence, which returned the same first caller each time.
    # NOTE(review): 0 is passed as start position as before; the yielders compare it
    # with feature addresses - confirm an int is comparable with those.
    try:
        caller_funcs = list(islice(yielded_caller_func_static(extractor, domain, file, 0), occurrances))
    except NotImplementedError:
        # the static extractor methods are not implemented: use the dynamic yielder
        caller_funcs = list(islice(yielded_caller_func_dynamic(extractor, domain, file, 0), occurrances))

    for caller_func in caller_funcs:
        # the substring checks below need text, whatever the yielder handed back
        caller_name = str(caller_func)
        if "Ftp" in caller_name:
            increment_protocol(domain_protocols, "FTP")
        elif "Https" in caller_name:
            # checked before "Http" because every "Https" name also contains "Http"
            increment_protocol(domain_protocols, "HTTPS")
        elif "Http" in caller_name:
            increment_protocol(domain_protocols, "HTTP")
        else:
            # network protocol not found: count under the caller name itself
            increment_protocol(domain_protocols, caller_name)
    return domain_protocols
def domain_occurrances_in_file(file, domain) -> int:
    """Return how many of the strings extracted from `file` are exactly equal to `domain`."""
    return sum(1 for extracted in get_strings(file) if extracted == domain)
def increment_protocol(protocols_dict: dict, protocol: str) -> Dict[str, int]:
    """Add one to the count stored for `protocol`, starting at 1 for a new key.

    The dictionary is updated in place and the same object is returned.
    """
    protocols_dict[protocol] = protocols_dict.get(protocol, 0) + 1
    return protocols_dict
def yielded_caller_func_static(extractor: FeatureExtractor, target_string: str, file: Path, start_position: Address):
    """Yield names of functions that reference `target_string`, resolved to a protocol-bearing name.

    Every function of the extractor is scanned for features whose value equals
    `target_string` at or after `start_position`. If the name of the containing
    function carries a protocol marker ("Http", "Https", "Ftp") that name is
    yielded. Otherwise the search continues recursively for references to that
    function's name, and a fixed "not found" message is yielded when the
    recursive search produces nothing.

    Args:
        extractor: feature extractor providing `get_functions` and
            `extract_function_features`.
        target_string: feature value to look for (a domain, or a function name
            during recursion).
        file: path of the sample, used by `get_function_name` to pick a format.
        start_position: features located before this address are ignored.

    Yields:
        Function names (str), or the "not found" message.
    """
    # NOTE(review): the author was unsure whether function-level features carry the
    # strings/names searched for here - confirm which feature scope holds them.
    protocol_markers = ("Http", "Https", "Ftp")
    for func in extractor.get_functions():
        # features are requested from the extractor, as the dynamic yielder does
        # with extract_call_features; the handle itself is only passed along
        for feature, addr in extractor.extract_function_features(func):
            if addr < start_position:
                continue
            if feature.value != target_string:
                continue

            function_name = get_function_name(func, file)
            # `any([...]) in name` evaluated `True in name`; test each marker instead
            if any(marker in function_name for marker in protocol_markers):
                yield function_name
                continue
            if function_name == target_string:
                # a function referencing its own name would recurse without end
                continue

            # A finished generator does not raise StopIteration out of `yield from`,
            # so emptiness is detected explicitly. The recursion searches for the
            # function's name, not for the handle object.
            found = False
            for resolved in yielded_caller_func_static(extractor, function_name, file, addr):
                found = True
                yield resolved
            if not found:
                yield "Network protocol not found - please open an issue on GitHub!"
def get_function_name(func: FunctionHandle, file: Path) -> str:
    """Look up the name of `func` using the module that matches the format of `file`.

    The format is detected with `get_auto_format(file)`; for viv, PE, .NET and ELF
    the corresponding module's `get_function_name` is called with `func.address`.
    Any other format yields the placeholder string "Problema".

    NOTE(review): whether each of these modules really exposes
    `get_function_name(address)` is not visible from here - confirm.
    """
    detected = get_auto_format(file)
    if detected == FORMAT_VIV:
        return viv_utils.get_function_name(func.address)
    if detected == FORMAT_PE:
        return pefile.get_function_name(func.address)
    if detected == FORMAT_DOTNET:
        return dotnetfile.get_function_name(func.address)
    if detected == FORMAT_ELF:
        return elffile.get_function_name(func.address)
    return "Problema"
def yielded_caller_func_dynamic(extractor: FeatureExtractor, target_string: str, file: Path, start_position: Address):
    """Yield names of API calls that operate on `target_string`, resolved to a protocol-bearing name.

    We look into an extractor to see which APIs operate on a web domain: we loop
    through processes/threads/calls looking for features whose value equals
    `target_string`. If we find one, we check whether the API of that call
    contains a network protocol marker (e.g., "Http"). Most Windows API network
    management functions contain their protocol in their name
    (e.g., "HttpOpenRequestA").

    If the API name does not contain a marker, this function recurses, looking
    for references to the API name, to see if a network management function
    operates on it (e.g., like how "HttpOpenRequestA" operates on a handle
    returned by "InternetConnect", which receives a web domain). A fixed
    "not found" message is yielded when the recursive search produces nothing.

    Args:
        extractor: dynamic feature extractor providing `get_processes`,
            `get_threads`, `get_calls` and `extract_call_features`.
        target_string: feature value to look for (a domain, or an API name
            during recursion).
        file: path of the sample; only forwarded to recursive calls.
        start_position: features located before this address are ignored, so
            references that occur before the web domain are skipped when recursing.

    Yields:
        API names (str), or the "not found" message.
    """
    # capa is already a dependency of this module; imported here so the API feature
    # of a call can be identified by type rather than by position
    from capa.features.insn import API

    protocol_markers = ("Http", "Https", "Ftp")
    for ph in extractor.get_processes():
        for th in extractor.get_threads(ph):
            for ch in extractor.get_calls(ph, th):
                # materialise once: the features are needed for two passes, and the
                # former `extract_call_features(...)[0][0]` indexed the call result directly
                call_features = list(extractor.extract_call_features(ph, th, ch))
                # NOTE(review): assumes each call emits an API feature whose value is the
                # API name - confirm for the extractor in use.
                api_name = next(
                    (str(feature.value) for feature, _ in call_features if isinstance(feature, API)),
                    None,
                )
                if api_name is None or api_name == target_string:
                    # without an API name there is nothing to classify; a call to the
                    # target itself would match its own API feature and recurse forever
                    continue

                for feature, addr in call_features:
                    if addr < start_position:
                        continue
                    if feature.value != target_string:
                        continue

                    # `any([...]) in name` evaluated `True in name`; test each marker instead
                    if any(marker in api_name for marker in protocol_markers):
                        yield api_name
                        continue

                    # A finished generator does not raise StopIteration out of
                    # `yield from`, so emptiness is detected explicitly.
                    found = False
                    for resolved in yielded_caller_func_dynamic(extractor, api_name, file, addr):
                        found = True
                        yield resolved
                    if not found:
                        yield "Network protocol not found - please open a GitHub issue!"
Is there a commit/branch/PR I can comment on inline? It would also be helpful to see some example output.
@mr-tz hopefully by tomorrow! I opened a PR yesterday but deleted it and decided to restructure a couple parts. I'll make sure to include some example output!