Construir un Framework de Automatización SOC: De Scripts a Plataforma
Guía para construir un framework de automatización SOC desde cero: evolución de scripts ad-hoc a plataforma integrada, arquitectura con colas de mensajes y workers, integración con SIEM, EDR y ticketing, patrones de orquestación, gestión de errores, métricas y decisión build vs buy.
La evolución natural de la automatización SOC
Todo SOC que automatiza pasa por las mismas fases:
Fase 1: Scripts ad-hoc. Un analista escribe un script Python para enriquecer IOCs. Otro escribe uno para parsear logs. Los scripts viven en carpetas personales o en un repositorio desordenado. Funcionan, pero solo quien los escribió sabe cómo usarlos.
Fase 2: Biblioteca compartida. El equipo empieza a compartir scripts. Alguien crea funciones reutilizables para autenticación con APIs, manejo de errores y logging. Los scripts importan estas funciones. Hay un README, pero no documentación formal.
Fase 3: Framework interno. Las funciones compartidas se organizan en un paquete Python con estructura definida. Hay una CLI, configuración centralizada, tests y CI/CD. Nuevos scripts se escriben usando el framework como base.
Fase 4: Plataforma. El framework se convierte en un servicio: tiene una API REST, una cola de tareas, workers distribuidos, una interfaz web y monitorización. Los playbooks se definen como configuración (YAML) y se ejecutan a través de la plataforma.
No todos los SOCs necesitan llegar a la Fase 4. Muchos operan eficientemente en la Fase 3. Lo importante es reconocer en qué fase estás y decidir conscientemente si avanzar.
Arquitectura del framework
Un framework de automatización SOC maduro tiene estos componentes:
┌──────────────┐
│ Frontend │
│ (Dashboard) │
└──────┬───────┘
│
┌──────▼───────┐
│ API Gateway │
│ (FastAPI) │
└──────┬───────┘
│
┌────────────▼────────────┐
│ Message Queue │
│ (Redis / RabbitMQ) │
└─────┬──────┬──────┬────┘
│ │ │
┌─────▼──┐ ┌▼────┐ ┌▼─────┐
│Worker 1│ │W. 2 │ │W. N │
│(Enrich)│ │(IR) │ │(Hunt)│
└─────┬──┘ └┬────┘ └┬─────┘
│ │ │
┌──────────▼──────▼──────▼──────────┐
│ Integration Layer │
│ SIEM │ EDR │ Ticketing │ CTI │ FW │
└───────────────────────────────────-┘
API Gateway
El punto de entrada para todas las interacciones: webhooks del SIEM, llamadas desde la CLI, triggers desde la interfaz web.
"""
api/main.py — API Gateway del framework de automatizacion
"""
from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel
from enum import Enum
import uuid
from datetime import datetime
app = FastAPI(
title="SOC Automation Framework",
version="1.0.0",
)
class TaskPriority(str, Enum):
CRITICAL = "critical"
HIGH = "high"
NORMAL = "normal"
LOW = "low"
class TaskRequest(BaseModel):
"""Solicitud de tarea de automatizacion."""
task_type: str # enrich_ioc, triage_alert, run_playbook
priority: TaskPriority = TaskPriority.NORMAL
parameters: dict = {}
requester: str = "system"
playbook_id: str = "" # Solo para run_playbook
class TaskResponse(BaseModel):
task_id: str
status: str
submitted_at: str
class TaskStatus(BaseModel):
task_id: str
status: str # queued, running, completed, failed
result: dict = {}
submitted_at: str
started_at: str = ""
completed_at: str = ""
duration_seconds: float = 0
error: str = ""
# Almacen en memoria (en produccion: Redis o PostgreSQL)
tasks_store: dict[str, TaskStatus] = {}
@app.post("/api/v1/tasks", response_model=TaskResponse)
async def submit_task(
task: TaskRequest, background_tasks: BackgroundTasks
):
"""Enviar una tarea a la cola de ejecucion."""
task_id = str(uuid.uuid4())
now = datetime.utcnow().isoformat()
# Registrar la tarea
tasks_store[task_id] = TaskStatus(
task_id=task_id,
status="queued",
submitted_at=now,
)
# Encolar la tarea (en produccion: Celery/Redis)
background_tasks.add_task(
execute_task, task_id, task
)
return TaskResponse(
task_id=task_id,
status="queued",
submitted_at=now,
)
@app.get("/api/v1/tasks/{task_id}", response_model=TaskStatus)
async def get_task_status(task_id: str):
"""Consultar el estado de una tarea."""
if task_id not in tasks_store:
raise HTTPException(status_code=404, detail="Task not found")
return tasks_store[task_id]
@app.post("/api/v1/webhooks/siem")
async def siem_webhook(
payload: dict, background_tasks: BackgroundTasks
):
"""Webhook para recibir alertas del SIEM."""
task = TaskRequest(
task_type="triage_alert",
priority=TaskPriority.HIGH,
parameters=payload,
requester="siem",
)
return await submit_task(task, background_tasks)
Cola de mensajes y workers
La cola de mensajes desacopla la recepción de tareas de su ejecución. Esto permite procesar múltiples tareas en paralelo, reintentar las que fallan y priorizar las urgentes.
"""
workers/base.py — Worker base con Celery
"""
from celery import Celery
from celery.utils.log import get_task_logger
import time
app = Celery(
"soc_automation",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/1",
)
app.conf.update(
task_serializer="json",
result_serializer="json",
accept_content=["json"],
timezone="UTC",
task_acks_late=True, # No perder tareas si el worker muere
worker_prefetch_multiplier=1, # Un task a la vez por worker
task_reject_on_worker_lost=True,
task_time_limit=600, # Max 10 minutos por tarea
task_soft_time_limit=540, # Warning a los 9 minutos
)
# Prioridades (0 = mas alta, 9 = mas baja)
app.conf.task_queue_max_priority = 10
app.conf.task_default_priority = 5
logger = get_task_logger(__name__)
@app.task(
bind=True,
max_retries=3,
default_retry_delay=30,
acks_late=True,
)
def enrich_ioc_task(self, ioc_value: str, ioc_type: str) -> dict:
"""Worker para enriquecimiento de IOCs."""
logger.info(f"Enriching {ioc_type}: {ioc_value}")
try:
result = {}
# Consultar VirusTotal
vt_result = query_virustotal(ioc_value, ioc_type)
result["virustotal"] = vt_result
# Consultar AbuseIPDB (solo para IPs)
if ioc_type == "ip":
abuse_result = query_abuseipdb(ioc_value)
result["abuseipdb"] = abuse_result
# Consultar URLhaus (para URLs y dominios)
if ioc_type in ("url", "domain"):
urlhaus_result = query_urlhaus(ioc_value)
result["urlhaus"] = urlhaus_result
# Calcular score consolidado
result["risk_score"] = calculate_risk_score(result)
return result
except Exception as exc:
logger.error(f"Error enriching {ioc_value}: {exc}")
raise self.retry(exc=exc)
@app.task(
bind=True,
max_retries=2,
acks_late=True,
)
def triage_alert_task(self, alert_data: dict) -> dict:
"""Worker para triaje automatizado de alertas."""
logger.info(
f"Triaging alert: {alert_data.get('alert_id', 'unknown')}"
)
try:
# 1. Extraer IOCs de la alerta
iocs = extract_iocs_from_alert(alert_data)
# 2. Enriquecer cada IOC
enrichments = {}
for ioc in iocs:
result = enrich_ioc_task.delay(
ioc["value"], ioc["type"]
)
enrichments[ioc["value"]] = result.get(timeout=60)
# 3. Determinar severidad
max_score = max(
(e.get("risk_score", 0) for e in enrichments.values()),
default=0,
)
# 4. Generar veredicto
if max_score >= 70:
verdict = "escalate"
action = "Create incident ticket, notify N2 analyst"
elif max_score >= 30:
verdict = "investigate"
action = "Manual review required"
else:
verdict = "close"
action = "Auto-close as false positive"
return {
"alert_id": alert_data.get("alert_id"),
"verdict": verdict,
"risk_score": max_score,
"action": action,
"enrichments": enrichments,
}
except Exception as exc:
logger.error(f"Error triaging alert: {exc}")
raise self.retry(exc=exc)
Capa de integración
La capa de integración abstrae las diferencias entre las APIs de cada herramienta. Un worker no llama directamente a la API de Splunk o CrowdStrike: llama a un conector que normaliza la interfaz.
"""
integrations/base.py — Interfaz base para integraciones
"""
from abc import ABC, abstractmethod
from dataclasses import dataclass
@dataclass
class IntegrationConfig:
name: str
base_url: str
api_key: str = ""
username: str = ""
password: str = ""
timeout: int = 30
max_retries: int = 3
class BaseIntegration(ABC):
"""Interfaz base que toda integracion debe implementar."""
def __init__(self, config: IntegrationConfig):
self.config = config
self._session = None
@abstractmethod
def health_check(self) -> bool:
"""Verificar conectividad con el servicio."""
pass
@abstractmethod
def get_name(self) -> str:
"""Nombre de la integracion."""
pass
class SIEMIntegration(BaseIntegration):
"""Interfaz para integraciones con SIEM."""
@abstractmethod
def search(
self, query: str, timerange: str, max_results: int
) -> list[dict]:
pass
@abstractmethod
def get_alert(self, alert_id: str) -> dict:
pass
@abstractmethod
def update_alert(
self, alert_id: str, status: str, comment: str
) -> bool:
pass
class EDRIntegration(BaseIntegration):
"""Interfaz para integraciones con EDR."""
@abstractmethod
def get_host(self, host_id: str) -> dict:
pass
@abstractmethod
def isolate_host(self, host_id: str, reason: str) -> bool:
pass
@abstractmethod
def get_detections(
self, host_id: str, timerange: str
) -> list[dict]:
pass
class TicketingIntegration(BaseIntegration):
"""Interfaz para integraciones con ticketing."""
@abstractmethod
def create_ticket(
self, title: str, description: str,
priority: str, labels: list[str],
) -> str:
pass
@abstractmethod
def update_ticket(
self, ticket_id: str, status: str, comment: str
) -> bool:
pass
@abstractmethod
def get_ticket(self, ticket_id: str) -> dict:
pass
class CTIIntegration(BaseIntegration):
"""Interfaz para integraciones con feeds de CTI."""
@abstractmethod
def lookup_ioc(
self, value: str, ioc_type: str
) -> dict:
pass
Implementación concreta: CrowdStrike Falcon
"""
integrations/crowdstrike.py — Conector para CrowdStrike Falcon
"""
import requests
class CrowdStrikeIntegration(EDRIntegration):
"""Integracion con CrowdStrike Falcon via OAuth2."""
def __init__(self, config: IntegrationConfig):
super().__init__(config)
self._token = None
def _authenticate(self):
resp = requests.post(
f"{self.config.base_url}/oauth2/token",
data={
"client_id": self.config.username,
"client_secret": self.config.password,
},
timeout=self.config.timeout,
)
resp.raise_for_status()
self._token = resp.json()["access_token"]
def _headers(self):
if not self._token:
self._authenticate()
return {"Authorization": f"Bearer {self._token}"}
def get_name(self) -> str:
return "CrowdStrike Falcon"
def health_check(self) -> bool:
try:
self._authenticate()
return True
except Exception:
return False
def get_host(self, host_id: str) -> dict:
resp = requests.get(
f"{self.config.base_url}/devices/entities/devices/v2",
headers=self._headers(),
params={"ids": host_id},
timeout=self.config.timeout,
)
resp.raise_for_status()
resources = resp.json().get("resources", [])
return resources[0] if resources else {}
def isolate_host(self, host_id: str, reason: str) -> bool:
resp = requests.post(
f"{self.config.base_url}"
f"/devices/entities/devices-actions/v2",
headers=self._headers(),
params={"action_name": "contain"},
json={"ids": [host_id]},
timeout=self.config.timeout,
)
return resp.status_code == 202
def get_detections(
self, host_id: str, timerange: str
) -> list[dict]:
resp = requests.get(
f"{self.config.base_url}"
f"/detects/queries/detects/v1",
headers=self._headers(),
params={
"filter": f"device.device_id:'{host_id}'",
"limit": 100,
},
timeout=self.config.timeout,
)
resp.raise_for_status()
return resp.json().get("resources", [])
Patrones de orquestación
La orquestación define cómo se coordinan múltiples tareas. Los tres patrones principales:
Secuencial
Cada paso espera al anterior. Usado cuando los pasos dependen de los resultados del paso previo.
async def sequential_playbook(alert: dict) -> dict:
"""Ejecucion secuencial: cada paso depende del anterior."""
# Paso 1: Obtener detalles
host_info = await edr.get_host(alert["host_id"])
# Paso 2: Enriquecer IOCs (depende de host_info)
iocs = extract_iocs(host_info)
enrichments = []
for ioc in iocs:
result = await cti.lookup_ioc(ioc["value"], ioc["type"])
enrichments.append(result)
# Paso 3: Decidir accion (depende de enrichment)
if max(e["risk_score"] for e in enrichments) > 70:
await edr.isolate_host(alert["host_id"], "Auto-isolation")
return {"host": host_info, "enrichments": enrichments}
Paralelo
Múltiples tareas se ejecutan simultáneamente. Usado cuando las tareas son independientes.
import asyncio
async def parallel_enrichment(iocs: list[dict]) -> list[dict]:
"""Enriquecer multiples IOCs en paralelo."""
tasks = [
enrich_single_ioc(ioc["value"], ioc["type"])
for ioc in iocs
]
# Ejecutar todos en paralelo, con timeout por tarea
results = await asyncio.gather(
*tasks, return_exceptions=True
)
# Filtrar errores
valid_results = [
r for r in results if not isinstance(r, Exception)
]
return valid_results
Condicional
El flujo se ramifica según los resultados de un paso anterior.
async def conditional_response(triage_result: dict) -> dict:
"""Respuesta condicional basada en el veredicto del triaje."""
verdict = triage_result["verdict"]
score = triage_result["risk_score"]
if verdict == "malicious" and score >= 90:
# Automatico: aislar + ticket + notificar
await edr.isolate_host(triage_result["host_id"], "Auto")
ticket = await ticketing.create_ticket(
title=f"[CRITICAL] {triage_result['alert_type']}",
description=format_ticket(triage_result),
priority="Highest",
labels=["auto-response", "critical"],
)
await notify_team(
"CRITICAL", triage_result, ticket_id=ticket
)
return {"action": "auto_contained", "ticket": ticket}
elif verdict == "suspicious":
# Semi-automatico: ticket + esperar decision
ticket = await ticketing.create_ticket(
title=f"[SUSPICIOUS] {triage_result['alert_type']}",
description=format_ticket(triage_result),
priority="High",
labels=["needs-review"],
)
await notify_analyst(triage_result, ticket)
return {"action": "pending_review", "ticket": ticket}
else:
# Automatico: cerrar como FP
await siem.update_alert(
triage_result["alert_id"],
status="closed",
comment="Auto-closed: score below threshold",
)
return {"action": "auto_closed"}
Gestión de errores y retry
Un framework de automatización SOC no puede fallar silenciosamente. Los errores deben manejarse explícitamente con circuit breakers, reintentos y fallbacks.
"""
core/resilience.py — Patrones de resiliencia
"""
import time
import functools
from dataclasses import dataclass
@dataclass
class CircuitBreakerState:
failures: int = 0
last_failure: float = 0
is_open: bool = False
threshold: int = 5
recovery_time: int = 60 # segundos
class CircuitBreaker:
"""Circuit breaker para integraciones externas."""
def __init__(
self, name: str, threshold: int = 5,
recovery_time: int = 60,
):
self.name = name
self.state = CircuitBreakerState(
threshold=threshold,
recovery_time=recovery_time,
)
def __call__(self, func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
# Verificar si el circuito esta abierto
if self.state.is_open:
elapsed = time.time() - self.state.last_failure
if elapsed < self.state.recovery_time:
raise CircuitOpenError(
f"Circuit '{self.name}' is open. "
f"Retry in "
f"{self.state.recovery_time - elapsed:.0f}s"
)
# Intentar recovery (half-open)
self.state.is_open = False
try:
result = func(*args, **kwargs)
# Exito: resetear contador
self.state.failures = 0
return result
except Exception as e:
self.state.failures += 1
self.state.last_failure = time.time()
if self.state.failures >= self.state.threshold:
self.state.is_open = True
raise
return wrapper
class CircuitOpenError(Exception):
pass
def with_retry(max_retries=3, delay=5, backoff=2):
"""Decorator para reintentos con backoff exponencial."""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
current_delay = delay
for attempt in range(max_retries + 1):
try:
return func(*args, **kwargs)
except CircuitOpenError:
raise # No reintentar si el circuito esta abierto
except Exception as e:
if attempt == max_retries:
raise
time.sleep(current_delay)
current_delay *= backoff
return wrapper
return decorator
# Uso
@CircuitBreaker("virustotal", threshold=3, recovery_time=120)
@with_retry(max_retries=2, delay=5)
def query_virustotal(ioc: str, ioc_type: str) -> dict:
"""Consultar VirusTotal con circuit breaker y retry."""
# ... implementacion
pass
Métricas y monitorización
Sin métricas, no sabes si el framework funciona. Las métricas clave:
"""
core/metrics.py — Metricas del framework de automatizacion
"""
from prometheus_client import Counter, Histogram, Gauge
import time
# Contadores
tasks_total = Counter(
"soc_tasks_total",
"Total de tareas procesadas",
["task_type", "status"], # enrich_ioc/completed
)
alerts_triaged = Counter(
"soc_alerts_triaged_total",
"Total de alertas triageadas",
["verdict"], # malicious/suspicious/clean
)
integration_calls = Counter(
"soc_integration_calls_total",
"Llamadas a integraciones externas",
["integration", "status"], # virustotal/success
)
# Histogramas (latencia)
task_duration = Histogram(
"soc_task_duration_seconds",
"Duracion de tareas en segundos",
["task_type"],
buckets=[1, 5, 10, 30, 60, 120, 300, 600],
)
enrichment_duration = Histogram(
"soc_enrichment_duration_seconds",
"Duracion de enriquecimiento de IOC",
["source"],
buckets=[0.5, 1, 2, 5, 10, 30],
)
# Gauges
active_workers = Gauge(
"soc_active_workers",
"Workers activos",
)
queue_depth = Gauge(
"soc_queue_depth",
"Tareas en cola pendientes",
["priority"],
)
circuit_breaker_state = Gauge(
"soc_circuit_breaker_open",
"Estado del circuit breaker (1=open, 0=closed)",
["integration"],
)
Dashboard mínimo
Las métricas se exportan a Prometheus y se visualizan en Grafana. Los paneles esenciales:
- Tasks por minuto: tasa de procesamiento general
- Latencia p50/p95/p99: cuánto tarda cada tipo de tarea
- Tasa de error: porcentaje de tareas fallidas
- Queue depth: profundidad de la cola (si crece, los workers no dan abasto)
- Circuit breakers: qué integraciones están degradadas
- MTTR por tipo de alerta: métrica de negocio (mejora real del SOC)
- Alertas auto-cerradas vs escaladas: ratio de automatización efectiva
Competencias del equipo
Construir y mantener un framework de automatización SOC requiere un mix de habilidades que no es habitual en un SOC tradicional:
| Habilidad | Nivel necesario | Quién la aporta |
|---|---|---|
| Python | Intermedio-avanzado | Ingeniero de automatización |
| APIs REST | Intermedio | Analista N2/N3 con interés en desarrollo |
| Docker / contenedores | Básico-intermedio | DevOps o ingeniero de automatización |
| Colas de mensajes (Redis, RabbitMQ) | Básico | Ingeniero de automatización |
| Operaciones SOC | Avanzado | Analistas N2/N3 (definen los playbooks) |
| Administración SIEM | Intermedio | Ingeniero SIEM |
| CI/CD (Git, pipelines) | Básico | Cualquiera del equipo |
El rol de "Security Automation Engineer" o "SOAR Engineer" combina conocimientos de desarrollo de software con operaciones de seguridad. Es el perfil más crítico y el más difícil de encontrar.
Build vs buy
La decisión más importante: ¿construir tu propio framework o comprar un SOAR comercial?
| Factor | Build | Buy |
|---|---|---|
| Coste inicial | Bajo (tiempo del equipo) | Alto (50K-200K EUR/año) |
| Coste de mantenimiento | Medio-alto (20-30% de un FTE) | Bajo (soporte del vendor) |
| Time to value | 3-6 meses | 1-3 meses |
| Personalización | Total | Limitada por el producto |
| Integraciones | Las que construyas | Cientos out-of-the-box |
| Dependencia de vendor | Ninguna | Alta |
| Escalabilidad | La que diseñes | Enterprise-grade |
| Soporte | Tu equipo | 24/7 del vendor |
Build cuando: tu SOC tiene menos de 20 analistas, tienes al menos un ingeniero de automatización, tu stack de seguridad es heterogéneo (muchas herramientas diferentes), necesitas integraciones muy específicas, o el presupuesto no permite un SOAR enterprise.
Buy cuando: tu SOC tiene más de 30 analistas, necesitas time-to-value rápido, el equipo no tiene capacidad de desarrollo, las integraciones out-of-the-box cubren el 80% de tus necesidades, o el compliance requiere un producto certificado.
Híbrido (lo más común): usa una herramienta como n8n o Shuffle para la orquestación visual, y scripts Python propios para la lógica compleja que la herramienta no cubre. Obtienes interfaz visual + flexibilidad de código sin el coste de un SOAR enterprise ni el esfuerzo de construir todo desde cero.
Consideraciones de escalabilidad
A medida que el SOC crece, el framework necesita escalar:
Workers horizontales: la arquitectura con cola de mensajes permite añadir workers simplemente lanzando más instancias del proceso worker. Cada worker consume tareas de la cola de forma independiente.
Priorización: las alertas críticas no pueden esperar detrás de un batch de enriquecimiento. La cola de mensajes debe soportar prioridades para que las tareas urgentes se procesen primero.
Rate limiting por integración: cada API externa tiene sus límites. El framework debe implementar rate limiting per-integration para no saturar a VirusTotal o CrowdStrike. Un semáforo por integración limita las llamadas concurrentes.
Persistencia de resultados: los resultados de enriquecimiento se cachean para evitar llamadas redundantes. Si tres alertas contienen la misma IP, solo se enriquece una vez. Un TTL de 1 hora para IOCs y 24 horas para información de host es razonable.
Observabilidad: cada componente (API, workers, integraciones) exporta métricas a Prometheus. Las alertas operativas (cola saturada, worker caído, circuit breaker abierto) son tan importantes como las alertas de seguridad.
Recursos
- Designing Data-Intensive Applications (Martin Kleppmann, 2017): el libro de referencia para arquitectura de sistemas distribuidos. Los patrones de colas de mensajes, workers y resiliencia que aplicamos en el framework están explicados en profundidad.
- Celery Documentation: documentación oficial de Celery, el sistema de colas de tareas más usado en Python. Cubre workers, scheduling, retry y monitorización.
- FastAPI Documentation: documentación del framework web usado para el API Gateway. Incluye ejemplos de background tasks, webhooks y autenticación.
- Shuffle SOAR: plataforma SOAR open source con enfoque en automatización de seguridad. Buena referencia de arquitectura y patrones de orquestación. GitHub: shuffle/shuffle.
- OASIS CACAO (Collaborative Automated Course of Action Operations): estándar para definir playbooks de seguridad en formato JSON interoperable. Referencia para el formato YAML de nuestros playbooks.
- Google SRE Book (cap. Handling Overload): patrones de resiliencia (circuit breakers, load shedding, graceful degradation) aplicables a la capa de integración del framework.
- Prometheus + Grafana: stack de monitorización open source para las métricas del framework. Prometheus scrapes las métricas expuestas por los workers y FastAPI.
Preguntas frecuentes
Libros recomendados
Artículos relacionados
Este contenido tiene fines exclusivamente educativos y de investigación en ciberseguridad defensiva. No se proporcionan binarios maliciosos ni payloads ejecutables. El uso indebido de esta información es responsabilidad exclusiva del usuario. Leer disclaimer completo.