AvanzadoNLPThreat IntelligenceIOC extractionNamed Entity RecognitionCTI automation

NLP para Threat Intelligence: Extracción Automática de IOCs y TTPs

Cómo aplicar procesamiento de lenguaje natural (NLP) para extraer IOCs, TTPs y entidades de amenaza de informes CTI en texto libre. Named Entity Recognition, clasificación de texto y extracción de relaciones aplicados a Threat Intelligence.

MalwareIntel Research··11 min lectura
Serie: AI/ML para Malware — Parte 12

El cuello de botella humano en CTI

Cada día se publican decenas de informes de threat intelligence: papers de Mandiant, blogs de CrowdStrike, advisories de CISA, reports de Recorded Future, análisis de Kaspersky, hilos de Twitter de investigadores independientes. Cada informe contiene IOCs (hashes, IPs, dominios), TTPs (técnicas MITRE ATT&CK), nombres de familias de malware, atribuciones a threat actors y relaciones entre todos ellos.

Un analista CTI experimentado tarda entre 30 y 90 minutos en procesar un informe completo: leer, extraer IOCs, mapear TTPs, identificar relaciones, normalizar nomenclatura y alimentar la plataforma de inteligencia. Con el volumen actual de informes, ningún equipo tiene suficientes analistas para procesarlos todos.

El NLP aplicado a CTI automatiza las partes mecánicas de este proceso: extracción de entidades, clasificación de texto, mapeo de TTPs. El analista se centra en lo que requiere juicio humano: evaluación de confianza, contextualización y decisiones operativas.

Extracción de IOCs con regex

Para IOCs con formato fijo (hashes, IPs, CVEs), regex es la herramienta más precisa. No necesitas un modelo de ML para detectar un hash SHA256: siempre son 64 caracteres hexadecimales.

import re
from dataclasses import dataclass

@dataclass
class ExtractedIOC:
    ioc_type: str
    value: str
    context: str  # Texto alrededor del IOC

class IOCRegexExtractor:
    """Extractor de IOCs basado en expresiones regulares."""
    
    PATTERNS = {
        "ipv4": re.compile(
            r"\b(?:(?:25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)\.){3}"
            r"(?:25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)\b"
        ),
        "ipv6": re.compile(
            r"\b(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\b"
        ),
        "md5": re.compile(r"\b[0-9a-fA-F]{32}\b"),
        "sha1": re.compile(r"\b[0-9a-fA-F]{40}\b"),
        "sha256": re.compile(r"\b[0-9a-fA-F]{64}\b"),
        "domain": re.compile(
            r"\b(?:[a-zA-Z0-9](?:[a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?\.)+"
            r"(?:com|net|org|io|xyz|top|info|biz|ru|cn|tk|pw|cc|ws|"
            r"onion|bit|to|me|co|uk|de|fr|es|it|nl|br|jp|kr|in)\b"
        ),
        "url": re.compile(
            r"https?://[^\s<>\"'\)]+|"
            r"hxxps?://[^\s<>\"'\)]+|"  # Defanged URLs
            r"h\[tt\]ps?://[^\s<>\"'\)]+"
        ),
        "email": re.compile(
            r"\b[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}\b"
        ),
        "cve": re.compile(r"\bCVE-\d{4}-\d{4,}\b"),
        "mitre_technique": re.compile(r"\bT\d{4}(?:\.\d{3})?\b"),
    }
    
    # IPs privadas y reservadas a filtrar
    PRIVATE_IP_RANGES = [
        re.compile(r"^10\."),
        re.compile(r"^172\.(1[6-9]|2\d|3[01])\."),
        re.compile(r"^192\.168\."),
        re.compile(r"^127\."),
        re.compile(r"^0\."),
    ]
    
    def extract(self, text: str) -> list[ExtractedIOC]:
        """Extrae todos los IOCs del texto."""
        # Refang URLs defanged
        text_clean = self._refang(text)
        
        results = []
        for ioc_type, pattern in self.PATTERNS.items():
            for match in pattern.finditer(text_clean):
                value = match.group()
                
                # Validar y filtrar
                if not self._validate(ioc_type, value):
                    continue
                
                # Extraer contexto (50 chars antes y después)
                start = max(0, match.start() - 50)
                end = min(len(text_clean), match.end() + 50)
                context = text_clean[start:end].strip()
                
                results.append(ExtractedIOC(
                    ioc_type=ioc_type,
                    value=value,
                    context=context,
                ))
        
        return self._deduplicate(results)
    
    def _refang(self, text: str) -> str:
        """Convierte IOCs defanged a su formato real."""
        text = text.replace("hxxp", "http")
        text = text.replace("[.]", ".")
        text = text.replace("[:]", ":")
        text = text.replace("h[tt]p", "http")
        text = re.sub(r"\[at\]", "@", text, flags=re.IGNORECASE)
        return text
    
    def _validate(self, ioc_type: str, value: str) -> bool:
        """Valida el IOC extraído y filtra falsos positivos."""
        if ioc_type == "ipv4":
            # Filtrar IPs privadas
            for private_range in self.PRIVATE_IP_RANGES:
                if private_range.match(value):
                    return False
            # Filtrar versiones (1.2.3.4 podría ser una versión)
            parts = value.split(".")
            if all(int(p) < 10 for p in parts):
                return False
        
        elif ioc_type in ("md5", "sha1", "sha256"):
            # Filtrar hashes que son solo zeros o repeticiones
            if len(set(value.lower())) < 3:
                return False
        
        elif ioc_type == "domain":
            # Filtrar dominios comunes que no son IOCs
            common = {
                "example.com", "microsoft.com", "google.com",
                "github.com", "wikipedia.org", "twitter.com",
            }
            if value.lower() in common:
                return False
        
        return True
    
    def _deduplicate(self, iocs: list[ExtractedIOC]) -> list[ExtractedIOC]:
        """Elimina duplicados manteniendo el primer contexto."""
        seen = set()
        unique = []
        for ioc in iocs:
            key = (ioc.ioc_type, ioc.value.lower())
            if key not in seen:
                seen.add(key)
                unique.append(ioc)
        return unique

Limitaciones de regex

Regex funciona bien para IOCs con formato estricto. Falla en estos casos:

  1. Dominios mencionados como referencia, no como IOC. "El informe de CrowdStrike publicado en crowdstrike.com describe..." No es un IOC.
  2. Hashes en contextos irrelevantes. Commit hashes de Git, checksums de paquetes legítimos.
  3. IOCs complejos. Mutex names, registry keys, user-agents: no tienen formato fijo.
  4. Relaciones. Regex extrae valores aislados, no entiende que "el hash X se comunica con el dominio Y".

Para superar estas limitaciones necesitas NLP.

Named Entity Recognition para CTI

NER (Named Entity Recognition) es la tarea de identificar y clasificar entidades con nombre en texto. En CTI, las entidades son: malware families, threat actors, herramientas, TTPs, IOCs contextualizados, sectores objetivo y países.

Entrenando un NER con spaCy

import spacy
from spacy.tokens import DocBin
from spacy.training import Example
import json

# Esquema de entidades CTI
CTI_ENTITIES = [
    "MALWARE",        # LockBit, Emotet, Cobalt Strike
    "THREAT_ACTOR",   # APT28, Lazarus, FIN7
    "TOOL",           # Mimikatz, PowerShell, Metasploit
    "TECHNIQUE",      # Spearphishing, process injection
    "IOC_HASH",       # SHA256, MD5 en contexto
    "IOC_IP",         # IPs con contexto de C2
    "IOC_DOMAIN",     # Dominios maliciosos
    "IOC_URL",        # URLs de distribución
    "CVE",            # CVE-2023-34362
    "SECTOR",         # Financial, Healthcare, Government
    "COUNTRY",        # Russia, China, North Korea
]

def prepare_training_data(annotations: list[dict]) -> DocBin:
    """Convierte anotaciones JSON a formato spaCy DocBin."""
    nlp = spacy.blank("en")
    db = DocBin()
    
    for item in annotations:
        text = item["text"]
        entities = item["entities"]  # [(start, end, label), ...]
        
        doc = nlp.make_doc(text)
        ents = []
        for start, end, label in entities:
            span = doc.char_span(start, end, label=label)
            if span is not None:
                ents.append(span)
        
        doc.ents = ents
        db.add(doc)
    
    return db

# Ejemplo de datos anotados
training_data = [
    {
        "text": "APT28 deployed a new variant of Fancy Bear backdoor "
                "communicating with 185.174.136.204 over port 443.",
        "entities": [
            (0, 5, "THREAT_ACTOR"),
            (31, 51, "MALWARE"),
            (73, 89, "IOC_IP"),
        ]
    },
    {
        "text": "The LockBit 3.0 ransomware exploited CVE-2023-4966 "
                "to gain initial access to healthcare organizations.",
        "entities": [
            (4, 16, "MALWARE"),
            (37, 51, "CVE"),
            (79, 89, "SECTOR"),
        ]
    },
]

Usando transformers para NER de seguridad

Para mayor precisión, los modelos basados en transformers superan a los modelos estadísticos de spaCy:

from transformers import (
    AutoTokenizer,
    AutoModelForTokenClassification,
    pipeline,
)

class CTINERModel:
    """NER basado en transformers para extracción de entidades CTI."""
    
    def __init__(self, model_name: str = "jackaduma/SecBERT"):
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForTokenClassification.from_pretrained(
            model_name
        )
        self.ner_pipeline = pipeline(
            "ner",
            model=self.model,
            tokenizer=self.tokenizer,
            aggregation_strategy="simple",
        )
    
    def extract_entities(self, text: str) -> list[dict]:
        """Extrae entidades CTI del texto."""
        # Dividir texto largo en chunks manejables
        chunks = self._split_text(text, max_length=512)
        
        all_entities = []
        offset = 0
        
        for chunk in chunks:
            entities = self.ner_pipeline(chunk)
            for ent in entities:
                ent["start"] += offset
                ent["end"] += offset
                all_entities.append(ent)
            offset += len(chunk) + 1
        
        return all_entities
    
    def _split_text(self, text: str, max_length: int = 512) -> list[str]:
        """Divide texto en chunks respetando límites de oración."""
        sentences = text.split(". ")
        chunks = []
        current_chunk = ""
        
        for sentence in sentences:
            if len(current_chunk) + len(sentence) > max_length:
                if current_chunk:
                    chunks.append(current_chunk.strip())
                current_chunk = sentence
            else:
                current_chunk += ". " + sentence if current_chunk else sentence
        
        if current_chunk:
            chunks.append(current_chunk.strip())
        
        return chunks

Extracción de TTPs y mapeo a MITRE ATT&CK

Mapear texto libre a técnicas MITRE ATT&CK es uno de los problemas más valiosos en NLP para CTI. Un informe puede decir "el actor utilizó spearphishing con adjuntos Office maliciosos" sin mencionar explícitamente T1566.001.

from sentence_transformers import SentenceTransformer
import numpy as np

class TTPMapper:
    """Mapea descripciones textuales a técnicas MITRE ATT&CK."""
    
    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        self.model = SentenceTransformer(model_name)
        self.techniques: list[dict] = []
        self.technique_embeddings: np.ndarray | None = None
    
    def load_attack_techniques(self, attack_json_path: str):
        """Carga técnicas ATT&CK y genera embeddings."""
        import json
        
        with open(attack_json_path) as f:
            attack_data = json.load(f)
        
        self.techniques = []
        descriptions = []
        
        for obj in attack_data["objects"]:
            if obj.get("type") == "attack-pattern":
                tech = {
                    "id": next(
                        (ref["external_id"] 
                         for ref in obj.get("external_references", [])
                         if ref.get("source_name") == "mitre-attack"),
                        None
                    ),
                    "name": obj.get("name", ""),
                    "description": obj.get("description", ""),
                    "tactics": [
                        phase["phase_name"]
                        for phase in obj.get("kill_chain_phases", [])
                    ],
                }
                if tech["id"]:
                    self.techniques.append(tech)
                    descriptions.append(
                        f"{tech['name']}: {tech['description'][:500]}"
                    )
        
        # Generar embeddings de todas las técnicas
        self.technique_embeddings = self.model.encode(
            descriptions, 
            show_progress_bar=True,
            normalize_embeddings=True,
        )
        
        print(f"Cargadas {len(self.techniques)} técnicas ATT&CK")
    
    def map_text_to_ttps(
        self, 
        text: str, 
        top_k: int = 5,
        threshold: float = 0.4
    ) -> list[dict]:
        """Mapea un fragmento de texto a las técnicas ATT&CK más relevantes."""
        text_embedding = self.model.encode(
            [text], normalize_embeddings=True
        )
        
        # Similitud coseno
        similarities = np.dot(
            self.technique_embeddings, text_embedding.T
        ).flatten()
        
        # Top-k por encima del umbral
        top_indices = np.argsort(similarities)[::-1][:top_k]
        
        results = []
        for idx in top_indices:
            score = float(similarities[idx])
            if score >= threshold:
                tech = self.techniques[idx]
                results.append({
                    "technique_id": tech["id"],
                    "technique_name": tech["name"],
                    "tactics": tech["tactics"],
                    "confidence": score,
                })
        
        return results

# Uso
mapper = TTPMapper()
mapper.load_attack_techniques("enterprise-attack.json")

text = """
The attackers sent phishing emails with macro-enabled Word documents 
to employees in the finance department. Upon execution, the macro 
downloaded a second-stage payload using PowerShell and established 
persistence through a scheduled task.
"""

ttps = mapper.map_text_to_ttps(text)
for ttp in ttps:
    print(f"  {ttp['technique_id']} - {ttp['technique_name']} "
          f"(confidence: {ttp['confidence']:.2f})")

Extracción de relaciones entre entidades

Después de identificar entidades (malware, actores, IOCs), el siguiente paso es extraer las relaciones entre ellas:

class RelationExtractor:
    """Extrae relaciones entre entidades CTI."""
    
    RELATION_PATTERNS = {
        "uses": [
            r"{ACTOR}\s+(?:used|deployed|leveraged|utilized)\s+{MALWARE}",
            r"{MALWARE}\s+(?:was|is)\s+(?:used|deployed)\s+by\s+{ACTOR}",
        ],
        "targets": [
            r"{ACTOR}\s+(?:targeted|attacked|compromised)\s+{SECTOR}",
            r"{MALWARE}\s+(?:targets|affects)\s+{SECTOR}",
        ],
        "communicates_with": [
            r"{MALWARE}\s+(?:communicat|connect|beacon)(?:es|ed|ing)\s+"
            r"(?:with|to)\s+{IOC}",
            r"C2\s+(?:server|infrastructure)\s+(?:at|on)\s+{IOC}",
        ],
        "exploits": [
            r"{MALWARE}\s+(?:exploit|leverag)(?:s|ed|ing)\s+{CVE}",
            r"{CVE}\s+(?:was|is)\s+exploited\s+by\s+{MALWARE}",
        ],
        "attributed_to": [
            r"{MALWARE}\s+(?:is\s+)?attributed\s+to\s+{ACTOR}",
            r"{ACTOR}\s+(?:is\s+)?(?:behind|responsible\s+for)\s+{MALWARE}",
        ],
    }
    
    def extract_relations(
        self,
        text: str,
        entities: list[ExtractedIOC],
    ) -> list[dict]:
        """Extrae relaciones entre entidades encontradas en el texto."""
        relations = []
        
        # Agrupar entidades por tipo
        by_type: dict[str, list[str]] = {}
        for ent in entities:
            by_type.setdefault(ent.ioc_type, []).append(ent.value)
        
        # Para cada par de entidades, buscar patrones de relación
        for rel_type, patterns in self.RELATION_PATTERNS.items():
            for pattern in patterns:
                # Sustituir placeholders por regex de entidades
                for source_type, targets_type in self._get_type_pairs(rel_type):
                    sources = by_type.get(source_type, [])
                    targets = by_type.get(targets_type, [])
                    
                    for source in sources:
                        for target in targets:
                            filled = pattern.replace(
                                f"{{{source_type.upper()}}}", 
                                re.escape(source)
                            ).replace(
                                f"{{{targets_type.upper()}}}", 
                                re.escape(target)
                            )
                            if re.search(filled, text, re.IGNORECASE):
                                relations.append({
                                    "type": rel_type,
                                    "source": source,
                                    "source_type": source_type,
                                    "target": target,
                                    "target_type": targets_type,
                                })
        
        return relations
    
    def _get_type_pairs(self, rel_type: str) -> list[tuple[str, str]]:
        """Retorna pares de tipos válidos para cada relación."""
        pairs = {
            "uses": [("THREAT_ACTOR", "MALWARE")],
            "targets": [
                ("THREAT_ACTOR", "SECTOR"),
                ("MALWARE", "SECTOR"),
            ],
            "communicates_with": [("MALWARE", "IOC_IP"), ("MALWARE", "IOC_DOMAIN")],
            "exploits": [("MALWARE", "CVE")],
            "attributed_to": [("MALWARE", "THREAT_ACTOR")],
        }
        return pairs.get(rel_type, [])

Pipeline completo: de informe a inteligencia estructurada

Combinando todos los componentes en un pipeline end-to-end:

@dataclass
class CTIReport:
    """Informe CTI estructurado extraído de texto libre."""
    source_text: str
    iocs: list[ExtractedIOC]
    entities: list[dict]
    ttps: list[dict]
    relations: list[dict]
    confidence: float

class CTIExtractionPipeline:
    """Pipeline completo de extracción de CTI desde texto libre."""
    
    def __init__(self):
        self.ioc_extractor = IOCRegexExtractor()
        self.ttp_mapper = TTPMapper()
        self.relation_extractor = RelationExtractor()
        
        # Cargar modelos
        self.ttp_mapper.load_attack_techniques("enterprise-attack.json")
    
    def process(self, text: str) -> CTIReport:
        """Procesa un informe CTI completo."""
        # Paso 1: Extraer IOCs con regex
        iocs = self.ioc_extractor.extract(text)
        
        # Paso 2: Extraer entidades con NER
        entities = self._extract_entities(text)
        
        # Paso 3: Mapear párrafos a TTPs
        ttps = self._extract_ttps(text)
        
        # Paso 4: Extraer relaciones
        relations = self.relation_extractor.extract_relations(
            text, iocs + [
                ExtractedIOC(e["entity_group"], e["word"], "")
                for e in entities
            ]
        )
        
        # Paso 5: Calcular confianza global
        confidence = self._estimate_confidence(iocs, entities, ttps)
        
        return CTIReport(
            source_text=text,
            iocs=iocs,
            entities=entities,
            ttps=ttps,
            relations=relations,
            confidence=confidence,
        )
    
    def _extract_ttps(self, text: str) -> list[dict]:
        """Extrae TTPs por párrafo para mayor precisión."""
        paragraphs = text.split("\n\n")
        all_ttps = []
        seen = set()
        
        for para in paragraphs:
            if len(para.strip()) < 50:
                continue
            ttps = self.ttp_mapper.map_text_to_ttps(para, top_k=3)
            for ttp in ttps:
                if ttp["technique_id"] not in seen:
                    seen.add(ttp["technique_id"])
                    ttp["source_paragraph"] = para[:200]
                    all_ttps.append(ttp)
        
        return all_ttps
    
    def _extract_entities(self, text: str) -> list[dict]:
        """Placeholder para NER (reemplazar con modelo real)."""
        # En producción, usar CTINERModel
        return []
    
    def _estimate_confidence(
        self, 
        iocs: list, 
        entities: list, 
        ttps: list
    ) -> float:
        """Estima confianza basada en riqueza de extracción."""
        score = 0.0
        if len(iocs) > 0:
            score += 0.3
        if len(entities) > 0:
            score += 0.3
        if len(ttps) > 0:
            score += 0.2
        if len(iocs) > 5:
            score += 0.1
        if len(ttps) > 3:
            score += 0.1
        return min(score, 1.0)

Evaluación del pipeline

Las métricas de evaluación para NLP en CTI son las estándar de NER y extracción de información:

ComponenteMétricaValor típico
IOC extraction (regex)Precision0.95 a 0.99
IOC extraction (regex)Recall0.85 a 0.92
NER malware/actorF10.85 a 0.92
TTP mappingPrecision@50.70 a 0.80
TTP mappingRecall@50.55 a 0.65
Relation extractionF10.60 a 0.75

Las relaciones son lo más difícil de extraer con precisión. La variabilidad del lenguaje natural hace que patrones basados en regex capturen solo una fracción de las relaciones expresadas en texto. Los modelos de extracción de relaciones basados en transformers mejoran significativamente estos números.

Aplicaciones prácticas

Alimentación automática de plataformas CTI

def ingest_report_to_cti_platform(
    report: CTIReport, 
    api_client: CTIPlatformClient
):
    """Ingesta automática de un informe procesado en la plataforma CTI."""
    
    # Crear IOCs con contexto
    for ioc in report.iocs:
        api_client.create_ioc(
            ioc_type=ioc.ioc_type,
            value=ioc.value,
            confidence=int(report.confidence * 100),
            context=ioc.context,
            source="NLP Pipeline",
        )
    
    # Mapear TTPs
    for ttp in report.ttps:
        if ttp["confidence"] > 0.6:
            api_client.tag_ttp(
                technique_id=ttp["technique_id"],
                confidence=int(ttp["confidence"] * 100),
            )
    
    # Crear relaciones
    for rel in report.relations:
        api_client.create_relationship(
            source=rel["source"],
            target=rel["target"],
            relationship_type=rel["type"],
        )

Monitorización de feeds en tiempo real

El pipeline puede ejecutarse continuamente sobre feeds RSS, blogs de seguridad y canales de Telegram, extrayendo inteligencia en tiempo real sin intervención humana.

Conclusión

El NLP transforma texto no estructurado en inteligencia accionable. La combinación de regex para IOCs con formato fijo, NER para entidades con nombre y sentence embeddings para mapeo de TTPs cubre la mayor parte de las necesidades de extracción automática de CTI.

El paso siguiente natural es incorporar LLMs (modelos de lenguaje grande) para tareas que requieren comprensión profunda del contexto: resumir informes, generar hipótesis de atribución y responder preguntas sobre el corpus de inteligencia acumulada. Eso lo cubrimos en el artículo sobre LLMs en operaciones de seguridad.

Preguntas frecuentes

Artículos relacionados

Este contenido tiene fines exclusivamente educativos y de investigación en ciberseguridad defensiva. No se proporcionan binarios maliciosos ni payloads ejecutables. El uso indebido de esta información es responsabilidad exclusiva del usuario. Leer disclaimer completo.