Automatizacion de Memory Forensics: Scripts, Pipelines y YARA en Memoria
Automatizacion del analisis forense de memoria con scripts Python, Volatility 3 como libreria, VolShell interactivo, pipelines CI/CD de analisis automatico, YARA scanning de volcados de memoria e integracion de memory forensics en el workflow SOC.
Por que automatizar el analisis de memoria
El analisis manual con Volatility 3, como hemos visto en los articulos anteriores, sigue un patron repetitivo: ejecutar pslist, pstree, netscan, cmdline, malfind, dlllist, handles, printkey. Para cada volcado. Cada vez. El analista repite los mismos plugins, busca los mismos patrones de anomalias, y aplica los mismos criterios de deteccion.
Esto grita automatizacion. Un script puede ejecutar todos los plugins en paralelo, aplicar reglas de deteccion de anomalias, generar un informe de triaje, y presentar al analista solo los hallazgos que requieren atencion humana. En un SOC que procesa multiples volcados de memoria por semana, la diferencia entre triaje manual (2 a 4 horas) y triaje automatizado (5 a 15 minutos) es la diferencia entre un equipo saturado y uno funcional.
Volatility 3 como libreria Python
Volatility 3 puede usarse como libreria Python, no solo como herramienta de linea de comandos. Esto permite construir scripts personalizados que ejecutan plugins, procesan resultados y generan informes programaticamente.
Estructura basica de un script
import volatility3
from volatility3.framework import contexts, automagic
from volatility3.framework.configuration import requirements
from volatility3.plugins.windows import pslist, pstree, netscan, malfind
def analyze_dump(dump_path):
"""Ejecuta analisis basico de un volcado de memoria."""
# Crear el contexto
ctx = contexts.Context()
# Configurar la ruta del dump
ctx.config["automagic.LayerStacker.single_location"] = (
"file://" + dump_path
)
# Ejecutar automagic (detectar OS, cargar simbolos)
automagics = automagic.available(ctx)
automagic.run(automagics, ctx, pslist.PsList)
# Ejecutar pslist
plugin = pslist.PsList(ctx, "plugins.PsList")
results = list(plugin.run())
return results
Ejecucion de multiples plugins
def full_triage(dump_path):
"""Triaje completo: procesos, red, inyeccion."""
ctx = setup_context(dump_path)
results = dict(
processes=run_plugin(ctx, pslist.PsList),
tree=run_plugin(ctx, pstree.PsTree),
network=run_plugin(ctx, netscan.NetScan),
injections=run_plugin(ctx, malfind.Malfind),
)
return results
Procesamiento de resultados
Los resultados de los plugins son generadores de tuplas. Cada tupla contiene los campos del plugin:
def find_suspicious_processes(processes):
"""Detecta procesos con anomalias conocidas."""
suspicious = []
# Procesos que solo deben tener una instancia
single_instance = ["lsass.exe", "services.exe", "wininit.exe"]
# Contar instancias
counts = dict()
for proc in processes:
name = proc.ImageFileName
if name not in counts:
counts[name] = 0
counts[name] += 1
# Detectar duplicados
for name in single_instance:
if counts.get(name, 0) > 1:
suspicious.append(
dict(
type="DUPLICATE_CRITICAL_PROCESS",
process=name,
count=counts[name],
severity="CRITICAL",
)
)
return suspicious
VolShell: analisis interactivo
VolShell es el shell interactivo de Volatility. Permite explorar la memoria con comandos Python en un entorno REPL:
vol -f memory.raw windows.volshell
Dentro de VolShell:
# Listar procesos
for proc in self.list_processes():
print(f"PID={proc.UniqueProcessId} Name={proc.ImageFileName}")
# Leer memoria de un proceso
proc = self.process_from_pid(5700)
data = proc.read(0x00400000, 256)
# Desensamblar
self.disassemble(0x00400000, count=20)
# Buscar strings
self.context.layers["memory_layer"].scan(
scanner=volatility3.framework.scanners.RegExScanner(
b"https?://[a-zA-Z0-9./]+"
)
)
VolShell es ideal para:
- Investigaciones ad-hoc que no siguen un patron predecible.
- Verificar hipotesis rapidas sobre una region de memoria.
- Aprender como Volatility accede a las estructuras internas.
- Prototyping de logica que luego se integrara en un script.
YARA scanning de volcados de memoria
Plugin yarascan
vol -f memory.raw yarascan.YaraScan --yara-file /path/to/rules.yar
yarascan aplica reglas YARA contra el contenido de la memoria, mapeando los hallazgos a procesos especificos:
Rule: CobaltStrike_Beacon
Owner: rundll32.exe (PID 5800)
Address: 0x000001f0a500
Reglas YARA para memoria
Las reglas YARA disenadas para analisis de memoria difieren de las de disco. En memoria, el malware esta desempaquetado y ejecutandose, lo que expone strings y patrones que en disco estan ofuscados:
rule Cobalt_Strike_Beacon_Memory
{
meta:
description = "Detecta Cobalt Strike beacon en memoria"
author = "MalwareIntel Research"
strings:
$config1 = "%.4x%.4x%.4x%.4x%.4x%.4x%.4x%.4x"
$config2 = "%s (admin)" wide
$config3 = "beacon.dll" ascii
$pipe = "\\\\.\\pipe\\" ascii
$sleep = "sleeptime" ascii
condition:
3 of them
}
rule Generic_Shellcode_x64
{
meta:
description = "Shellcode x64 generico en memoria"
strings:
$prologue1 = { fc 48 83 e4 f0 }
$prologue2 = { 48 31 c9 65 48 8b }
$api_hash = { 41 51 41 50 52 51 56 48 31 d2 65 }
condition:
any of them
}
Repositorios de reglas YARA para memoria
- YARA-Rules: https://github.com/Yara-Rules/rules. Coleccion comunitaria amplia.
- Elastic YARA Rules: reglas de deteccion del equipo de Elastic Security.
- Malpedia YARA: reglas asociadas a familias documentadas en Malpedia.
- Florian Roth's signature-base: https://github.com/Neo23x0/signature-base. Una de las colecciones mas completas.
- CAPE Community: reglas extraidas del sandbox CAPE.
Script de triaje automatizado
Un script de triaje que ejecuta los analisis mas importantes y genera un informe:
#!/usr/bin/env python3
"""
memory_triage.py - Triaje automatizado de volcados de memoria
Ejecuta plugins clave de Volatility 3 y genera informe de hallazgos
"""
import sys
import json
import hashlib
import subprocess
from pathlib import Path
from datetime import datetime
def run_volatility(dump_path, plugin, extra_args=None):
"""Ejecuta un plugin de Volatility y retorna la salida."""
cmd = ["vol", "-f", str(dump_path), "-r", "json", plugin]
if extra_args:
cmd.extend(extra_args)
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=600,
)
if result.returncode == 0:
try:
return json.loads(result.stdout)
except json.JSONDecodeError:
return None
return None
def check_process_anomalies(processes):
"""Detecta anomalias en la lista de procesos."""
findings = []
# Procesos criticos y sus padres esperados
expected_parents = dict(
services_exe="wininit.exe",
lsass_exe="wininit.exe",
svchost_exe="services.exe",
)
# Procesos que solo deben existir una vez
single_instance = [
"lsass.exe", "services.exe",
"wininit.exe", "csrss.exe",
]
name_counts = dict()
for proc in processes:
name = proc.get("ImageFileName", "")
name_counts[name] = name_counts.get(name, 0) + 1
# Detectar duplicados criticos
if name_counts.get("lsass.exe", 0) > 1:
findings.append(dict(
severity="CRITICAL",
type="DUPLICATE_LSASS",
detail=f"lsass.exe aparece {name_counts['lsass.exe']} veces",
))
return findings
def check_network_anomalies(connections):
"""Detecta conexiones de red sospechosas."""
findings = []
suspicious_processes = [
"powershell.exe", "cmd.exe", "rundll32.exe",
"regsvr32.exe", "mshta.exe", "wscript.exe",
"cscript.exe", "certutil.exe",
]
for conn in connections:
owner = conn.get("Owner", "")
state = conn.get("State", "")
foreign = conn.get("ForeignAddr", "")
# Proceso sospechoso con conexion externa
if (owner.lower() in suspicious_processes
and state == "ESTABLISHED"
and not foreign.startswith(("10.", "192.168.", "172."))):
findings.append(dict(
severity="HIGH",
type="SUSPICIOUS_OUTBOUND",
detail=f"{owner} conectado a {foreign}",
pid=conn.get("PID"),
))
return findings
def generate_report(dump_path, all_findings):
"""Genera informe de triaje."""
report = dict(
timestamp=datetime.utcnow().isoformat(),
dump_file=str(dump_path),
dump_sha256=hashlib.sha256(
Path(dump_path).read_bytes()
).hexdigest() if Path(dump_path).stat().st_size < 1e9 else "TOO_LARGE",
total_findings=len(all_findings),
critical=len([f for f in all_findings if f["severity"] == "CRITICAL"]),
high=len([f for f in all_findings if f["severity"] == "HIGH"]),
findings=all_findings,
)
return report
def main():
dump_path = sys.argv[1]
print(f"[*] Triaje de: {dump_path}")
print("[*] Ejecutando plugins...")
# Ejecutar plugins en secuencia
processes = run_volatility(dump_path, "windows.pslist")
network = run_volatility(dump_path, "windows.netscan")
injections = run_volatility(dump_path, "windows.malfind")
# Analizar resultados
findings = []
if processes:
findings.extend(check_process_anomalies(processes))
if network:
findings.extend(check_network_anomalies(network))
if injections:
for inj in injections:
findings.append(dict(
severity="HIGH",
type="CODE_INJECTION",
detail=f"Codigo inyectado en {inj.get('Process')} (PID {inj.get('PID')})",
pid=inj.get("PID"),
))
# Generar informe
report = generate_report(dump_path, findings)
# Salida
output_file = f"triage_{Path(dump_path).stem}.json"
with open(output_file, "w") as f:
json.dump(report, f, indent=2)
print(f"[*] Informe generado: {output_file}")
print(f"[*] Hallazgos: {report['critical']} CRITICAL, {report['high']} HIGH")
return 1 if report["critical"] > 0 else 0
if __name__ == "__main__":
sys.exit(main())
Pipeline CI/CD de analisis
En un SOC maduro, los volcados de memoria pueden procesarse automaticamente cuando se depositan en un almacenamiento compartido:
Arquitectura del pipeline
[Adquisicion]
|
v
[Storage] -- volcado depositado en bucket S3/MinIO
|
v
[Trigger] -- watcher detecta nuevo fichero
|
v
[Triaje automatico]
|-- volatility plugins basicos
|-- YARA scanning
|-- deteccion de anomalias
|
v
[Informe de triaje] -- JSON con hallazgos
|
v
[Notificacion] -- alerta al analista si hay hallazgos CRITICAL/HIGH
|
v
[Analisis manual] -- analista profundiza en hallazgos
Implementacion con scripts
Un watcher simple que monitoriza un directorio:
#!/usr/bin/env python3
"""
watch_dumps.py - Monitoriza directorio de volcados y lanza triaje
"""
import time
import subprocess
from pathlib import Path
WATCH_DIR = Path("/data/memory-dumps/incoming")
PROCESSED_DIR = Path("/data/memory-dumps/processed")
REPORTS_DIR = Path("/data/memory-dumps/reports")
def process_dump(dump_path):
"""Ejecuta el triaje automatizado."""
print(f"[+] Procesando: {dump_path.name}")
result = subprocess.run(
["python3", "memory_triage.py", str(dump_path)],
capture_output=True,
text=True,
timeout=3600,
)
if result.returncode != 0:
send_alert(
f"CRITICAL findings in {dump_path.name}"
)
# Mover a procesado
dump_path.rename(PROCESSED_DIR / dump_path.name)
def send_alert(message):
"""Envia alerta via webhook."""
# Integrar con Slack, Teams, Telegram, etc.
print(f"[ALERT] {message}")
def main():
WATCH_DIR.mkdir(parents=True, exist_ok=True)
PROCESSED_DIR.mkdir(parents=True, exist_ok=True)
processed = set()
while True:
for dump_file in WATCH_DIR.glob("*.raw"):
if dump_file.name not in processed:
process_dump(dump_file)
processed.add(dump_file.name)
for dump_file in WATCH_DIR.glob("*.lime"):
if dump_file.name not in processed:
process_dump(dump_file)
processed.add(dump_file.name)
time.sleep(30)
if __name__ == "__main__":
main()
Integracion con el workflow SOC
Nivel 1: triaje automatico
El pipeline automatizado ejecuta el triaje y genera un informe con hallazgos clasificados por severidad. El analista SOC N1 recibe la notificacion y revisa el informe. Si hay hallazgos CRITICAL o HIGH, escala a N2.
Nivel 2: analisis manual guiado
El analista N2 usa el informe de triaje como punto de partida. En lugar de ejecutar todos los plugins manualmente, se concentra en los hallazgos que el triaje marco como sospechosos. Profundiza con plugins adicionales, extrae artefactos y construye la timeline.
Nivel 3: analisis profundo
El analista N3 / forensic analyst realiza analisis de rootkits, reverse engineering de codigo extraido, analisis de configuracion de C2, y correlacion con inteligencia de amenazas. Produce el informe forense completo con IOCs y recomendaciones.
Feedback loop
Los IOCs extraidos del analisis de memoria alimentan:
- Las reglas YARA del pipeline (para detectar la misma amenaza en futuros volcados).
- El SIEM (para buscar retroactivamente en logs historicos).
- El EDR (para buscar en otros endpoints).
- La plataforma de threat intelligence (para compartir con la comunidad via STIX/TAXII).
Herramientas complementarias
Orochi
Orochi es una plataforma web colaborativa para analisis de memoria basada en Volatility 3. Permite:
- Subir volcados de memoria y ejecutar plugins desde la interfaz web.
- Compartir resultados entre analistas.
- Mantener un historial de analisis.
- Ejecutar YARA scans desde la interfaz.
URL: https://github.com/LDO-CERT/orochi
VolWeb
Interfaz web para Volatility con visualizaciones interactivas del arbol de procesos, conexiones de red y timeline.
MemProcFS
MemProcFS (Memory Process File System) monta un volcado de memoria como un sistema de ficheros virtual. Los procesos aparecen como carpetas, las DLLs como ficheros, y la informacion de red como ficheros de texto. Esto permite usar herramientas estandar de analisis de ficheros sobre la memoria.
./memprocfs -device memory.raw -mount /mnt/memfs
ls /mnt/memfs/pid/5700/
# cmdline.txt dlls/ handles/ maps/ mem/ threads/
cat /mnt/memfs/pid/5700/cmdline.txt
AVML + Volatility en Docker
Para estandarizar el entorno de analisis:
FROM python:3.12-slim
RUN pip install volatility3 yara-python pycryptodome
RUN mkdir /symbols /dumps /reports
COPY rules/ /rules/
COPY scripts/ /scripts/
ENTRYPOINT ["python3", "/scripts/memory_triage.py"]
docker run -v /data/dumps:/dumps -v /data/reports:/reports \
memory-triage /dumps/WKSTN-FIN-042.raw
Consideraciones de rendimiento
Paralelizacion de plugins
Los plugins de Volatility son independientes entre si. Pueden ejecutarse en paralelo:
from concurrent.futures import ProcessPoolExecutor
plugins_to_run = [
"windows.pslist",
"windows.netscan",
"windows.malfind",
"windows.cmdline",
"windows.dlllist",
]
with ProcessPoolExecutor(max_workers=4) as executor:
futures = dict()
for plugin in plugins_to_run:
future = executor.submit(
run_volatility, dump_path, plugin
)
futures[plugin] = future
results = dict()
for plugin, future in futures.items():
results[plugin] = future.result()
Almacenamiento
Los volcados de memoria son ficheros grandes. Consideraciones:
- Almacenamiento SSD para el dump activo (acelera el analisis significativamente).
- Compresion (zstd, lz4) para almacenamiento a largo plazo.
- Politica de retencion: cuanto tiempo conservar los dumps despues del analisis.
Cache de simbolos
Las tablas de simbolos de Windows se descargan la primera vez. Cachear en un directorio compartido evita descargas repetidas:
export VOLATILITY_SYMBOLS=/shared/volatility-symbols/
Conclusion de la serie
A lo largo de 12 articulos hemos cubierto el ciclo completo del analisis forense de memoria:
- Conceptos fundamentales y por que la memoria es evidencia critica.
- Herramientas y tecnicas de adquisicion para Windows y Linux.
- Instalacion y configuracion de Volatility 3.
- Analisis de procesos y deteccion de anomalias.
- DLLs, handles y tecnicas de inyeccion.
- Artefactos de red y deteccion de comunicaciones C2.
- malfind y deteccion de codigo inyectado.
- Registro de Windows en memoria.
- Deteccion de rootkits de kernel.
- Analisis de memoria Linux.
- Caso de estudio completo.
- Automatizacion y pipelines.
La memoria RAM es el campo de batalla donde el malware vive. Dominar su analisis es una de las habilidades mas valiosas que un profesional de seguridad puede desarrollar. Las herramientas son open source y los volcados de practica estan disponibles en CTFs y labs como MemLabs. La inversion es tiempo y practica.
Preguntas frecuentes
Libros recomendados
Artículos relacionados
Volatility 3: Instalacion, Configuracion y Primeros Pasos
Caso de Estudio: Analisis Forense de Memoria de un Sistema Infectado
Malfind y Deteccion de Code Injection en Memoria con Volatility 3
Adquisicion de Imagen Forense de Disco: dd, dc3dd, FTK Imager y Formato E01
Browser Forensics: Analisis Forense de Chrome y Firefox
Cadena de Custodia Digital: Preservacion de Evidencia Electronica
Este contenido tiene fines exclusivamente educativos y de investigación en ciberseguridad defensiva. No se proporcionan binarios maliciosos ni payloads ejecutables. El uso indebido de esta información es responsabilidad exclusiva del usuario. Leer disclaimer completo.