NLP para Threat Intelligence: Extracción Automática de IOCs y TTPs
Cómo aplicar procesamiento de lenguaje natural (NLP) para extraer IOCs, TTPs y entidades de amenaza de informes CTI en texto libre. Named Entity Recognition, clasificación de texto y extracción de relaciones aplicados a Threat Intelligence.
El cuello de botella humano en CTI
Cada día se publican decenas de informes de threat intelligence: papers de Mandiant, blogs de CrowdStrike, advisories de CISA, reports de Recorded Future, análisis de Kaspersky, hilos de Twitter de investigadores independientes. Cada informe contiene IOCs (hashes, IPs, dominios), TTPs (técnicas MITRE ATT&CK), nombres de familias de malware, atribuciones a threat actors y relaciones entre todos ellos.
Un analista CTI experimentado tarda entre 30 y 90 minutos en procesar un informe completo: leer, extraer IOCs, mapear TTPs, identificar relaciones, normalizar nomenclatura y alimentar la plataforma de inteligencia. Con el volumen actual de informes, ningún equipo tiene suficientes analistas para procesarlos todos.
El NLP aplicado a CTI automatiza las partes mecánicas de este proceso: extracción de entidades, clasificación de texto, mapeo de TTPs. El analista se centra en lo que requiere juicio humano: evaluación de confianza, contextualización y decisiones operativas.
Extracción de IOCs con regex
Para IOCs con formato fijo (hashes, IPs, CVEs), regex es la herramienta más precisa. No necesitas un modelo de ML para detectar un hash SHA256: siempre son 64 caracteres hexadecimales.
import re
from dataclasses import dataclass
@dataclass
class ExtractedIOC:
ioc_type: str
value: str
context: str # Texto alrededor del IOC
class IOCRegexExtractor:
"""Extractor de IOCs basado en expresiones regulares."""
PATTERNS = {
"ipv4": re.compile(
r"\b(?:(?:25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)\.){3}"
r"(?:25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)\b"
),
"ipv6": re.compile(
r"\b(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\b"
),
"md5": re.compile(r"\b[0-9a-fA-F]{32}\b"),
"sha1": re.compile(r"\b[0-9a-fA-F]{40}\b"),
"sha256": re.compile(r"\b[0-9a-fA-F]{64}\b"),
"domain": re.compile(
r"\b(?:[a-zA-Z0-9](?:[a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?\.)+"
r"(?:com|net|org|io|xyz|top|info|biz|ru|cn|tk|pw|cc|ws|"
r"onion|bit|to|me|co|uk|de|fr|es|it|nl|br|jp|kr|in)\b"
),
"url": re.compile(
r"https?://[^\s<>\"'\)]+|"
r"hxxps?://[^\s<>\"'\)]+|" # Defanged URLs
r"h\[tt\]ps?://[^\s<>\"'\)]+"
),
"email": re.compile(
r"\b[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}\b"
),
"cve": re.compile(r"\bCVE-\d{4}-\d{4,}\b"),
"mitre_technique": re.compile(r"\bT\d{4}(?:\.\d{3})?\b"),
}
# IPs privadas y reservadas a filtrar
PRIVATE_IP_RANGES = [
re.compile(r"^10\."),
re.compile(r"^172\.(1[6-9]|2\d|3[01])\."),
re.compile(r"^192\.168\."),
re.compile(r"^127\."),
re.compile(r"^0\."),
]
def extract(self, text: str) -> list[ExtractedIOC]:
"""Extrae todos los IOCs del texto."""
# Refang URLs defanged
text_clean = self._refang(text)
results = []
for ioc_type, pattern in self.PATTERNS.items():
for match in pattern.finditer(text_clean):
value = match.group()
# Validar y filtrar
if not self._validate(ioc_type, value):
continue
# Extraer contexto (50 chars antes y después)
start = max(0, match.start() - 50)
end = min(len(text_clean), match.end() + 50)
context = text_clean[start:end].strip()
results.append(ExtractedIOC(
ioc_type=ioc_type,
value=value,
context=context,
))
return self._deduplicate(results)
def _refang(self, text: str) -> str:
"""Convierte IOCs defanged a su formato real."""
text = text.replace("hxxp", "http")
text = text.replace("[.]", ".")
text = text.replace("[:]", ":")
text = text.replace("h[tt]p", "http")
text = re.sub(r"\[at\]", "@", text, flags=re.IGNORECASE)
return text
def _validate(self, ioc_type: str, value: str) -> bool:
"""Valida el IOC extraído y filtra falsos positivos."""
if ioc_type == "ipv4":
# Filtrar IPs privadas
for private_range in self.PRIVATE_IP_RANGES:
if private_range.match(value):
return False
# Filtrar versiones (1.2.3.4 podría ser una versión)
parts = value.split(".")
if all(int(p) < 10 for p in parts):
return False
elif ioc_type in ("md5", "sha1", "sha256"):
# Filtrar hashes que son solo zeros o repeticiones
if len(set(value.lower())) < 3:
return False
elif ioc_type == "domain":
# Filtrar dominios comunes que no son IOCs
common = {
"example.com", "microsoft.com", "google.com",
"github.com", "wikipedia.org", "twitter.com",
}
if value.lower() in common:
return False
return True
def _deduplicate(self, iocs: list[ExtractedIOC]) -> list[ExtractedIOC]:
"""Elimina duplicados manteniendo el primer contexto."""
seen = set()
unique = []
for ioc in iocs:
key = (ioc.ioc_type, ioc.value.lower())
if key not in seen:
seen.add(key)
unique.append(ioc)
return unique
Limitaciones de regex
Regex funciona bien para IOCs con formato estricto. Falla en estos casos:
- Dominios mencionados como referencia, no como IOC. "El informe de CrowdStrike publicado en crowdstrike.com describe..." No es un IOC.
- Hashes en contextos irrelevantes. Commit hashes de Git, checksums de paquetes legítimos.
- IOCs complejos. Mutex names, registry keys, user-agents: no tienen formato fijo.
- Relaciones. Regex extrae valores aislados, no entiende que "el hash X se comunica con el dominio Y".
Para superar estas limitaciones necesitas NLP.
Named Entity Recognition para CTI
NER (Named Entity Recognition) es la tarea de identificar y clasificar entidades con nombre en texto. En CTI, las entidades son: malware families, threat actors, herramientas, TTPs, IOCs contextualizados, sectores objetivo y países.
Entrenando un NER con spaCy
import spacy
from spacy.tokens import DocBin
from spacy.training import Example
import json
# Esquema de entidades CTI
CTI_ENTITIES = [
"MALWARE", # LockBit, Emotet, Cobalt Strike
"THREAT_ACTOR", # APT28, Lazarus, FIN7
"TOOL", # Mimikatz, PowerShell, Metasploit
"TECHNIQUE", # Spearphishing, process injection
"IOC_HASH", # SHA256, MD5 en contexto
"IOC_IP", # IPs con contexto de C2
"IOC_DOMAIN", # Dominios maliciosos
"IOC_URL", # URLs de distribución
"CVE", # CVE-2023-34362
"SECTOR", # Financial, Healthcare, Government
"COUNTRY", # Russia, China, North Korea
]
def prepare_training_data(annotations: list[dict]) -> DocBin:
"""Convierte anotaciones JSON a formato spaCy DocBin."""
nlp = spacy.blank("en")
db = DocBin()
for item in annotations:
text = item["text"]
entities = item["entities"] # [(start, end, label), ...]
doc = nlp.make_doc(text)
ents = []
for start, end, label in entities:
span = doc.char_span(start, end, label=label)
if span is not None:
ents.append(span)
doc.ents = ents
db.add(doc)
return db
# Ejemplo de datos anotados
training_data = [
{
"text": "APT28 deployed a new variant of Fancy Bear backdoor "
"communicating with 185.174.136.204 over port 443.",
"entities": [
(0, 5, "THREAT_ACTOR"),
(31, 51, "MALWARE"),
(73, 89, "IOC_IP"),
]
},
{
"text": "The LockBit 3.0 ransomware exploited CVE-2023-4966 "
"to gain initial access to healthcare organizations.",
"entities": [
(4, 16, "MALWARE"),
(37, 51, "CVE"),
(79, 89, "SECTOR"),
]
},
]
Usando transformers para NER de seguridad
Para mayor precisión, los modelos basados en transformers superan a los modelos estadísticos de spaCy:
from transformers import (
AutoTokenizer,
AutoModelForTokenClassification,
pipeline,
)
class CTINERModel:
"""NER basado en transformers para extracción de entidades CTI."""
def __init__(self, model_name: str = "jackaduma/SecBERT"):
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
self.model = AutoModelForTokenClassification.from_pretrained(
model_name
)
self.ner_pipeline = pipeline(
"ner",
model=self.model,
tokenizer=self.tokenizer,
aggregation_strategy="simple",
)
def extract_entities(self, text: str) -> list[dict]:
"""Extrae entidades CTI del texto."""
# Dividir texto largo en chunks manejables
chunks = self._split_text(text, max_length=512)
all_entities = []
offset = 0
for chunk in chunks:
entities = self.ner_pipeline(chunk)
for ent in entities:
ent["start"] += offset
ent["end"] += offset
all_entities.append(ent)
offset += len(chunk) + 1
return all_entities
def _split_text(self, text: str, max_length: int = 512) -> list[str]:
"""Divide texto en chunks respetando límites de oración."""
sentences = text.split(". ")
chunks = []
current_chunk = ""
for sentence in sentences:
if len(current_chunk) + len(sentence) > max_length:
if current_chunk:
chunks.append(current_chunk.strip())
current_chunk = sentence
else:
current_chunk += ". " + sentence if current_chunk else sentence
if current_chunk:
chunks.append(current_chunk.strip())
return chunks
Extracción de TTPs y mapeo a MITRE ATT&CK
Mapear texto libre a técnicas MITRE ATT&CK es uno de los problemas más valiosos en NLP para CTI. Un informe puede decir "el actor utilizó spearphishing con adjuntos Office maliciosos" sin mencionar explícitamente T1566.001.
from sentence_transformers import SentenceTransformer
import numpy as np
class TTPMapper:
"""Mapea descripciones textuales a técnicas MITRE ATT&CK."""
def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
self.model = SentenceTransformer(model_name)
self.techniques: list[dict] = []
self.technique_embeddings: np.ndarray | None = None
def load_attack_techniques(self, attack_json_path: str):
"""Carga técnicas ATT&CK y genera embeddings."""
import json
with open(attack_json_path) as f:
attack_data = json.load(f)
self.techniques = []
descriptions = []
for obj in attack_data["objects"]:
if obj.get("type") == "attack-pattern":
tech = {
"id": next(
(ref["external_id"]
for ref in obj.get("external_references", [])
if ref.get("source_name") == "mitre-attack"),
None
),
"name": obj.get("name", ""),
"description": obj.get("description", ""),
"tactics": [
phase["phase_name"]
for phase in obj.get("kill_chain_phases", [])
],
}
if tech["id"]:
self.techniques.append(tech)
descriptions.append(
f"{tech['name']}: {tech['description'][:500]}"
)
# Generar embeddings de todas las técnicas
self.technique_embeddings = self.model.encode(
descriptions,
show_progress_bar=True,
normalize_embeddings=True,
)
print(f"Cargadas {len(self.techniques)} técnicas ATT&CK")
def map_text_to_ttps(
self,
text: str,
top_k: int = 5,
threshold: float = 0.4
) -> list[dict]:
"""Mapea un fragmento de texto a las técnicas ATT&CK más relevantes."""
text_embedding = self.model.encode(
[text], normalize_embeddings=True
)
# Similitud coseno
similarities = np.dot(
self.technique_embeddings, text_embedding.T
).flatten()
# Top-k por encima del umbral
top_indices = np.argsort(similarities)[::-1][:top_k]
results = []
for idx in top_indices:
score = float(similarities[idx])
if score >= threshold:
tech = self.techniques[idx]
results.append({
"technique_id": tech["id"],
"technique_name": tech["name"],
"tactics": tech["tactics"],
"confidence": score,
})
return results
# Uso
mapper = TTPMapper()
mapper.load_attack_techniques("enterprise-attack.json")
text = """
The attackers sent phishing emails with macro-enabled Word documents
to employees in the finance department. Upon execution, the macro
downloaded a second-stage payload using PowerShell and established
persistence through a scheduled task.
"""
ttps = mapper.map_text_to_ttps(text)
for ttp in ttps:
print(f" {ttp['technique_id']} - {ttp['technique_name']} "
f"(confidence: {ttp['confidence']:.2f})")
Extracción de relaciones entre entidades
Después de identificar entidades (malware, actores, IOCs), el siguiente paso es extraer las relaciones entre ellas:
class RelationExtractor:
"""Extrae relaciones entre entidades CTI."""
RELATION_PATTERNS = {
"uses": [
r"{ACTOR}\s+(?:used|deployed|leveraged|utilized)\s+{MALWARE}",
r"{MALWARE}\s+(?:was|is)\s+(?:used|deployed)\s+by\s+{ACTOR}",
],
"targets": [
r"{ACTOR}\s+(?:targeted|attacked|compromised)\s+{SECTOR}",
r"{MALWARE}\s+(?:targets|affects)\s+{SECTOR}",
],
"communicates_with": [
r"{MALWARE}\s+(?:communicat|connect|beacon)(?:es|ed|ing)\s+"
r"(?:with|to)\s+{IOC}",
r"C2\s+(?:server|infrastructure)\s+(?:at|on)\s+{IOC}",
],
"exploits": [
r"{MALWARE}\s+(?:exploit|leverag)(?:s|ed|ing)\s+{CVE}",
r"{CVE}\s+(?:was|is)\s+exploited\s+by\s+{MALWARE}",
],
"attributed_to": [
r"{MALWARE}\s+(?:is\s+)?attributed\s+to\s+{ACTOR}",
r"{ACTOR}\s+(?:is\s+)?(?:behind|responsible\s+for)\s+{MALWARE}",
],
}
def extract_relations(
self,
text: str,
entities: list[ExtractedIOC],
) -> list[dict]:
"""Extrae relaciones entre entidades encontradas en el texto."""
relations = []
# Agrupar entidades por tipo
by_type: dict[str, list[str]] = {}
for ent in entities:
by_type.setdefault(ent.ioc_type, []).append(ent.value)
# Para cada par de entidades, buscar patrones de relación
for rel_type, patterns in self.RELATION_PATTERNS.items():
for pattern in patterns:
# Sustituir placeholders por regex de entidades
for source_type, targets_type in self._get_type_pairs(rel_type):
sources = by_type.get(source_type, [])
targets = by_type.get(targets_type, [])
for source in sources:
for target in targets:
filled = pattern.replace(
f"{{{source_type.upper()}}}",
re.escape(source)
).replace(
f"{{{targets_type.upper()}}}",
re.escape(target)
)
if re.search(filled, text, re.IGNORECASE):
relations.append({
"type": rel_type,
"source": source,
"source_type": source_type,
"target": target,
"target_type": targets_type,
})
return relations
def _get_type_pairs(self, rel_type: str) -> list[tuple[str, str]]:
"""Retorna pares de tipos válidos para cada relación."""
pairs = {
"uses": [("THREAT_ACTOR", "MALWARE")],
"targets": [
("THREAT_ACTOR", "SECTOR"),
("MALWARE", "SECTOR"),
],
"communicates_with": [("MALWARE", "IOC_IP"), ("MALWARE", "IOC_DOMAIN")],
"exploits": [("MALWARE", "CVE")],
"attributed_to": [("MALWARE", "THREAT_ACTOR")],
}
return pairs.get(rel_type, [])
Pipeline completo: de informe a inteligencia estructurada
Combinando todos los componentes en un pipeline end-to-end:
@dataclass
class CTIReport:
"""Informe CTI estructurado extraído de texto libre."""
source_text: str
iocs: list[ExtractedIOC]
entities: list[dict]
ttps: list[dict]
relations: list[dict]
confidence: float
class CTIExtractionPipeline:
"""Pipeline completo de extracción de CTI desde texto libre."""
def __init__(self):
self.ioc_extractor = IOCRegexExtractor()
self.ttp_mapper = TTPMapper()
self.relation_extractor = RelationExtractor()
# Cargar modelos
self.ttp_mapper.load_attack_techniques("enterprise-attack.json")
def process(self, text: str) -> CTIReport:
"""Procesa un informe CTI completo."""
# Paso 1: Extraer IOCs con regex
iocs = self.ioc_extractor.extract(text)
# Paso 2: Extraer entidades con NER
entities = self._extract_entities(text)
# Paso 3: Mapear párrafos a TTPs
ttps = self._extract_ttps(text)
# Paso 4: Extraer relaciones
relations = self.relation_extractor.extract_relations(
text, iocs + [
ExtractedIOC(e["entity_group"], e["word"], "")
for e in entities
]
)
# Paso 5: Calcular confianza global
confidence = self._estimate_confidence(iocs, entities, ttps)
return CTIReport(
source_text=text,
iocs=iocs,
entities=entities,
ttps=ttps,
relations=relations,
confidence=confidence,
)
def _extract_ttps(self, text: str) -> list[dict]:
"""Extrae TTPs por párrafo para mayor precisión."""
paragraphs = text.split("\n\n")
all_ttps = []
seen = set()
for para in paragraphs:
if len(para.strip()) < 50:
continue
ttps = self.ttp_mapper.map_text_to_ttps(para, top_k=3)
for ttp in ttps:
if ttp["technique_id"] not in seen:
seen.add(ttp["technique_id"])
ttp["source_paragraph"] = para[:200]
all_ttps.append(ttp)
return all_ttps
def _extract_entities(self, text: str) -> list[dict]:
"""Placeholder para NER (reemplazar con modelo real)."""
# En producción, usar CTINERModel
return []
def _estimate_confidence(
self,
iocs: list,
entities: list,
ttps: list
) -> float:
"""Estima confianza basada en riqueza de extracción."""
score = 0.0
if len(iocs) > 0:
score += 0.3
if len(entities) > 0:
score += 0.3
if len(ttps) > 0:
score += 0.2
if len(iocs) > 5:
score += 0.1
if len(ttps) > 3:
score += 0.1
return min(score, 1.0)
Evaluación del pipeline
Las métricas de evaluación para NLP en CTI son las estándar de NER y extracción de información:
| Componente | Métrica | Valor típico |
|---|---|---|
| IOC extraction (regex) | Precision | 0.95 a 0.99 |
| IOC extraction (regex) | Recall | 0.85 a 0.92 |
| NER malware/actor | F1 | 0.85 a 0.92 |
| TTP mapping | Precision@5 | 0.70 a 0.80 |
| TTP mapping | Recall@5 | 0.55 a 0.65 |
| Relation extraction | F1 | 0.60 a 0.75 |
Las relaciones son lo más difícil de extraer con precisión. La variabilidad del lenguaje natural hace que patrones basados en regex capturen solo una fracción de las relaciones expresadas en texto. Los modelos de extracción de relaciones basados en transformers mejoran significativamente estos números.
Aplicaciones prácticas
Alimentación automática de plataformas CTI
def ingest_report_to_cti_platform(
report: CTIReport,
api_client: CTIPlatformClient
):
"""Ingesta automática de un informe procesado en la plataforma CTI."""
# Crear IOCs con contexto
for ioc in report.iocs:
api_client.create_ioc(
ioc_type=ioc.ioc_type,
value=ioc.value,
confidence=int(report.confidence * 100),
context=ioc.context,
source="NLP Pipeline",
)
# Mapear TTPs
for ttp in report.ttps:
if ttp["confidence"] > 0.6:
api_client.tag_ttp(
technique_id=ttp["technique_id"],
confidence=int(ttp["confidence"] * 100),
)
# Crear relaciones
for rel in report.relations:
api_client.create_relationship(
source=rel["source"],
target=rel["target"],
relationship_type=rel["type"],
)
Monitorización de feeds en tiempo real
El pipeline puede ejecutarse continuamente sobre feeds RSS, blogs de seguridad y canales de Telegram, extrayendo inteligencia en tiempo real sin intervención humana.
Conclusión
El NLP transforma texto no estructurado en inteligencia accionable. La combinación de regex para IOCs con formato fijo, NER para entidades con nombre y sentence embeddings para mapeo de TTPs cubre la mayor parte de las necesidades de extracción automática de CTI.
El paso siguiente natural es incorporar LLMs (modelos de lenguaje grande) para tareas que requieren comprensión profunda del contexto: resumir informes, generar hipótesis de atribución y responder preguntas sobre el corpus de inteligencia acumulada. Eso lo cubrimos en el artículo sobre LLMs en operaciones de seguridad.
Preguntas frecuentes
Artículos relacionados
Este contenido tiene fines exclusivamente educativos y de investigación en ciberseguridad defensiva. No se proporcionan binarios maliciosos ni payloads ejecutables. El uso indebido de esta información es responsabilidad exclusiva del usuario. Leer disclaimer completo.