IntermedioPythonphishingemail analysisSOARTheHiveautomatización

Python para Análisis de Email: Automatizar el Triaje de Phishing

Automatización del análisis de correos de phishing con Python: parsing de archivos EML y MSG, extracción de headers, URLs y adjuntos, validación SPF/DKIM/DMARC, construcción de un bot de triaje e integración con TheHive y plataformas SOAR.

MalwareIntel Research··15 min lectura

El desafío del análisis de email en el SOC

Los emails de phishing representan el vector de ataque inicial más común. El 90% de los incidentes de seguridad comienzan con un correo electrónico malicioso. Un SOC típico recibe decenas o cientos de reportes diarios de usuarios que dicen "este email me parece sospechoso". El analista debe abrir cada uno, revisar headers, verificar URLs, comprobar adjuntos y decidir si es phishing real o un falso positivo. A mano, cada email tarda entre 5 y 15 minutos.

Con Python, el mismo análisis se completa en segundos. Este artículo construye un sistema completo de triaje automatizado de phishing: desde el parsing del archivo .eml hasta la integración con TheHive para gestión de casos.

Parsing de archivos EML y MSG

Python incluye la biblioteca email en su librería estándar, capaz de parsear archivos en formato EML (RFC 5322). Para archivos MSG de Outlook, usamos la biblioteca extract-msg.

"""
email_parser.py — Parser unificado para EML y MSG
"""
import email
from email import policy
from pathlib import Path
from dataclasses import dataclass, field
from datetime import datetime


@dataclass
class ParsedEmail:
    """Representación normalizada de un email analizado."""
    subject: str = ""
    sender: str = ""
    sender_ip: str = ""
    reply_to: str = ""
    to: list[str] = field(default_factory=list)
    date: str = ""
    message_id: str = ""
    headers: dict[str, str] = field(default_factory=dict)
    body_text: str = ""
    body_html: str = ""
    urls: list[str] = field(default_factory=list)
    attachments: list[dict] = field(default_factory=list)
    authentication_results: str = ""
    received_chain: list[str] = field(default_factory=list)


def parse_eml(file_path: str) -> ParsedEmail:
    """Parsear un archivo .eml y extraer todos los campos relevantes."""
    path = Path(file_path)
    with open(path, "rb") as f:
        msg = email.message_from_binary_file(f, policy=policy.default)

    parsed = ParsedEmail()
    parsed.subject = msg.get("Subject", "")
    parsed.sender = msg.get("From", "")
    parsed.reply_to = msg.get("Reply-To", "")
    parsed.to = [addr.strip() for addr in msg.get("To", "").split(",")]
    parsed.date = msg.get("Date", "")
    parsed.message_id = msg.get("Message-ID", "")
    parsed.authentication_results = msg.get(
        "Authentication-Results", ""
    )

    # Cadena de Received headers (orden cronologico inverso)
    parsed.received_chain = msg.get_all("Received", [])

    # Extraer IP del primer salto
    if parsed.received_chain:
        first_hop = parsed.received_chain[-1]  # Ultimo = mas antiguo
        parsed.sender_ip = _extract_ip(first_hop)

    # Todos los headers como diccionario
    parsed.headers = {k: v for k, v in msg.items()}

    # Body
    if msg.is_multipart():
        for part in msg.walk():
            content_type = part.get_content_type()
            if content_type == "text/plain":
                parsed.body_text = part.get_content()
            elif content_type == "text/html":
                parsed.body_html = part.get_content()
    else:
        content_type = msg.get_content_type()
        content = msg.get_content()
        if content_type == "text/plain":
            parsed.body_text = content
        else:
            parsed.body_html = content

    # Adjuntos
    for part in msg.iter_attachments():
        attachment = {
            "filename": part.get_filename() or "unknown",
            "content_type": part.get_content_type(),
            "size": len(part.get_content()),
            "data": part.get_content(),  # bytes
        }
        parsed.attachments.append(attachment)

    return parsed


def _extract_ip(received_header: str) -> str:
    """Extraer la primera IP de un header Received."""
    import re
    ip_pattern = r'\[(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\]'
    match = re.search(ip_pattern, received_header)
    return match.group(1) if match else ""

Para archivos MSG de Microsoft Outlook:

def parse_msg(file_path: str) -> ParsedEmail:
    """Parsear un archivo .msg de Outlook."""
    import extract_msg

    msg = extract_msg.Message(file_path)
    parsed = ParsedEmail()
    parsed.subject = msg.subject or ""
    parsed.sender = msg.sender or ""
    parsed.to = [msg.to or ""]
    parsed.date = str(msg.date or "")
    parsed.body_text = msg.body or ""
    parsed.body_html = msg.htmlBody or ""

    for attachment in msg.attachments:
        parsed.attachments.append({
            "filename": attachment.longFilename or attachment.shortFilename,
            "content_type": attachment.mimetype or "application/octet-stream",
            "size": len(attachment.data) if attachment.data else 0,
            "data": attachment.data,
        })

    msg.close()
    return parsed

Extracción y análisis de headers

Los headers de email son la huella dactilar del mensaje. Un email legítimo de tu banco tiene headers consistentes. Un phishing, no.

"""
header_analyzer.py — Análisis de headers para detección de phishing
"""
import re
from dataclasses import dataclass


@dataclass
class HeaderAnalysis:
    """Resultado del análisis de headers."""
    sender_domain: str = ""
    envelope_from: str = ""
    reply_to_domain: str = ""
    sender_ip: str = ""
    ip_country: str = ""
    hop_count: int = 0
    spf_result: str = ""
    dkim_result: str = ""
    dmarc_result: str = ""
    anomalies: list[str] = None

    def __post_init__(self):
        if self.anomalies is None:
            self.anomalies = []


def analyze_headers(parsed: "ParsedEmail") -> HeaderAnalysis:
    """Analizar headers para detectar anomalias de phishing."""
    analysis = HeaderAnalysis()

    # Extraer dominio del remitente
    sender_match = re.search(r'@([\w.-]+)', parsed.sender)
    analysis.sender_domain = sender_match.group(1) if sender_match else ""

    # Reply-To diferente del From (tecnica clasica de phishing)
    if parsed.reply_to:
        reply_match = re.search(r'@([\w.-]+)', parsed.reply_to)
        analysis.reply_to_domain = reply_match.group(1) if reply_match else ""
        if (analysis.reply_to_domain and
                analysis.reply_to_domain != analysis.sender_domain):
            analysis.anomalies.append(
                f"Reply-To domain ({analysis.reply_to_domain}) "
                f"differs from sender ({analysis.sender_domain})"
            )

    # Numero de saltos (Received headers)
    analysis.hop_count = len(parsed.received_chain)
    if analysis.hop_count > 10:
        analysis.anomalies.append(
            f"Unusually high hop count: {analysis.hop_count}"
        )

    # Parsear Authentication-Results
    auth_results = parsed.authentication_results.lower()
    for protocol in ["spf", "dkim", "dmarc"]:
        pattern = rf'{protocol}=(\w+)'
        match = re.search(pattern, auth_results)
        if match:
            result = match.group(1)
            setattr(analysis, f"{protocol}_result", result)
            if result in ("fail", "softfail", "none"):
                analysis.anomalies.append(
                    f"{protocol.upper()} validation: {result}"
                )

    # Sender IP
    analysis.sender_ip = parsed.sender_ip

    # Display name spoofing
    display_name = parsed.sender.split("<")[0].strip().strip('"')
    if "@" in display_name:
        analysis.anomalies.append(
            f"Email address in display name (spoofing attempt): "
            f"{display_name}"
        )

    return analysis

Las anomalias que detectamos incluyen: dominios de Reply-To diferentes al From (el atacante quiere que la respuesta llegue a su buzón), fallos de SPF/DKIM/DMARC (el dominio no autoriza al servidor que envió el email), número excesivo de saltos (puede indicar uso de relays abiertos) y direcciones de email dentro del display name (spoofing visual).

Extracción de URLs y verificación de reputación

Las URLs son el payload más común en phishing. El objetivo es extraerlas todas, desofuscarlas y verificar su reputación.

"""
url_analyzer.py — Extracción y verificación de URLs
"""
import re
import hashlib
import urllib.parse
from dataclasses import dataclass

import requests


@dataclass
class URLAnalysis:
    url: str
    domain: str
    is_shortened: bool = False
    final_url: str = ""
    vt_positives: int = 0
    urlhaus_status: str = ""
    risk_score: int = 0


SHORTENERS = {
    "bit.ly", "t.co", "tinyurl.com", "goo.gl",
    "ow.ly", "is.gd", "buff.ly", "rb.gy",
    "cutt.ly", "shorturl.at",
}


def extract_urls(text: str, html: str = "") -> list[str]:
    """Extraer todas las URLs del body del email."""
    combined = f"{text} {html}"

    # Patron para URLs en texto plano
    url_pattern = r'https?://[^\s<>"\')\]}{,]+'
    urls = re.findall(url_pattern, combined)

    # URLs en atributos href de HTML
    href_pattern = r'href=["\']?(https?://[^"\'>\s]+)'
    urls.extend(re.findall(href_pattern, html))

    # Deduplicar manteniendo orden
    seen = set()
    unique = []
    for url in urls:
        url = url.rstrip(".")  # Limpiar puntos finales
        if url not in seen:
            seen.add(url)
            unique.append(url)

    return unique


def analyze_url(url: str, vt_api_key: str = "") -> URLAnalysis:
    """Analizar una URL individual: resolver, verificar reputacion."""
    parsed = urllib.parse.urlparse(url)
    analysis = URLAnalysis(
        url=url,
        domain=parsed.netloc,
    )

    # Detectar acortadores
    domain_base = parsed.netloc.lower().lstrip("www.")
    if domain_base in SHORTENERS:
        analysis.is_shortened = True
        try:
            resp = requests.head(
                url, allow_redirects=True, timeout=5
            )
            analysis.final_url = resp.url
        except requests.RequestException:
            analysis.final_url = "ERROR: Could not resolve"

    # Verificar en URLhaus (abuse.ch, gratis, sin API key)
    try:
        resp = requests.post(
            "https://urlhaus-api.abuse.ch/v1/url/",
            data={"url": url},
            timeout=10,
        )
        if resp.status_code == 200:
            data = resp.json()
            analysis.urlhaus_status = data.get(
                "query_status", "unknown"
            )
            if data.get("query_status") == "listed":
                analysis.risk_score += 50
    except requests.RequestException:
        pass

    # Verificar en VirusTotal (requiere API key)
    if vt_api_key:
        url_id = hashlib.sha256(url.encode()).hexdigest()
        try:
            resp = requests.get(
                f"https://www.virustotal.com/api/v3/urls/{url_id}",
                headers={"x-apikey": vt_api_key},
                timeout=10,
            )
            if resp.status_code == 200:
                stats = resp.json().get(
                    "data", {}
                ).get("attributes", {}).get(
                    "last_analysis_stats", {}
                )
                analysis.vt_positives = stats.get("malicious", 0)
                if analysis.vt_positives > 0:
                    analysis.risk_score += min(
                        analysis.vt_positives * 10, 50
                    )
        except requests.RequestException:
            pass

    # Heuristicas adicionales
    if analysis.is_shortened:
        analysis.risk_score += 10

    # Dominio con muchos subdominios (signin.bank.attacker.com)
    subdomain_count = parsed.netloc.count(".")
    if subdomain_count > 3:
        analysis.risk_score += 15

    # Path con keywords sospechosos
    suspicious_paths = [
        "login", "signin", "verify", "update",
        "secure", "account", "confirm", "password",
    ]
    path_lower = parsed.path.lower()
    for kw in suspicious_paths:
        if kw in path_lower:
            analysis.risk_score += 5
            break

    return analysis

El scoring combina señales de múltiples fuentes: reputación en URLhaus y VirusTotal, uso de acortadores, subdominios excesivos y keywords sospechosos en el path. Ninguna señal individual es definitiva, pero la combinación proporciona un score fiable.

Extracción de adjuntos y verificación de hash

Los adjuntos maliciosos (documentos Office con macros, PDFs con exploits, ejecutables renombrados) son el segundo vector de phishing más común.

"""
attachment_analyzer.py — Hash y verificación de adjuntos
"""
import hashlib
from dataclasses import dataclass

import requests


DANGEROUS_EXTENSIONS = {
    ".exe", ".scr", ".bat", ".cmd", ".ps1", ".vbs", ".js",
    ".wsf", ".hta", ".msi", ".dll", ".com", ".pif",
    ".docm", ".xlsm", ".pptm",  # Office con macros
    ".iso", ".img", ".vhd",     # Contenedores de disco
    ".lnk",                      # Shortcuts maliciosos
}

SUSPICIOUS_EXTENSIONS = {
    ".doc", ".xls", ".ppt",     # Office legacy (pueden tener macros)
    ".pdf", ".rtf",             # Pueden contener exploits
    ".zip", ".rar", ".7z",     # Archivos comprimidos
    ".html", ".htm",            # HTML smuggling
}


@dataclass
class AttachmentAnalysis:
    filename: str
    content_type: str
    size: int
    md5: str
    sha256: str
    extension: str
    is_dangerous: bool = False
    is_suspicious: bool = False
    mb_status: str = ""    # MalwareBazaar
    vt_positives: int = 0
    risk_score: int = 0


def analyze_attachment(
    filename: str,
    data: bytes,
    content_type: str,
    vt_api_key: str = "",
) -> AttachmentAnalysis:
    """Analizar un adjunto: hash, extension, reputacion."""
    ext = "." + filename.rsplit(".", 1)[-1].lower() if "." in filename else ""

    analysis = AttachmentAnalysis(
        filename=filename,
        content_type=content_type,
        size=len(data),
        md5=hashlib.md5(data).hexdigest(),
        sha256=hashlib.sha256(data).hexdigest(),
        extension=ext,
        is_dangerous=ext in DANGEROUS_EXTENSIONS,
        is_suspicious=ext in SUSPICIOUS_EXTENSIONS,
    )

    if analysis.is_dangerous:
        analysis.risk_score += 40
    elif analysis.is_suspicious:
        analysis.risk_score += 15

    # Doble extension (documento.pdf.exe)
    parts = filename.rsplit(".", 2)
    if len(parts) > 2:
        analysis.risk_score += 25
        analysis.is_dangerous = True

    # Verificar hash en MalwareBazaar (gratis)
    try:
        resp = requests.post(
            "https://mb-api.abuse.ch/api/v1/",
            data={
                "query": "get_info",
                "hash": analysis.sha256,
            },
            timeout=10,
        )
        if resp.status_code == 200:
            result = resp.json()
            analysis.mb_status = result.get(
                "query_status", "unknown"
            )
            if result.get("query_status") == "hash_found":
                analysis.risk_score += 50
    except requests.RequestException:
        pass

    # Verificar en VirusTotal
    if vt_api_key:
        try:
            resp = requests.get(
                f"https://www.virustotal.com/api/v3/files/{analysis.sha256}",
                headers={"x-apikey": vt_api_key},
                timeout=10,
            )
            if resp.status_code == 200:
                stats = resp.json().get(
                    "data", {}
                ).get("attributes", {}).get(
                    "last_analysis_stats", {}
                )
                analysis.vt_positives = stats.get("malicious", 0)
                if analysis.vt_positives > 0:
                    analysis.risk_score += min(
                        analysis.vt_positives * 5, 50
                    )
        except requests.RequestException:
            pass

    return analysis

La doble extension es una técnica clásica: el archivo se llama factura.pdf.exe y Windows oculta la extension real mostrando solo factura.pdf. En nuestro análisis, cualquier archivo con más de una extension recibe una penalización alta.

Validación SPF, DKIM y DMARC

SPF, DKIM y DMARC son los tres pilares de la autenticación de email. Si fallan, el email probablemente no fue enviado por quien dice ser.

"""
auth_validator.py — Validación de autenticación de email
"""
import dns.resolver
import re
from dataclasses import dataclass


@dataclass
class AuthValidation:
    spf_record: str = ""
    spf_pass: bool = False
    dkim_selector: str = ""
    dkim_valid: bool = False
    dmarc_record: str = ""
    dmarc_policy: str = ""
    summary: str = ""
    risk_score: int = 0


def validate_email_auth(
    sender_domain: str,
    sender_ip: str = "",
    auth_results_header: str = "",
) -> AuthValidation:
    """Validar SPF, DKIM y DMARC para un dominio remitente."""
    validation = AuthValidation()

    # SPF: consultar registro TXT
    try:
        answers = dns.resolver.resolve(sender_domain, "TXT")
        for rdata in answers:
            txt = rdata.to_text().strip('"')
            if txt.startswith("v=spf1"):
                validation.spf_record = txt
                # Verificar si la IP esta autorizada
                if sender_ip and sender_ip in txt:
                    validation.spf_pass = True
                elif "~all" in txt or "-all" in txt:
                    validation.spf_pass = False
                break
    except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer,
            dns.resolver.NoNameservers):
        validation.risk_score += 15

    # DMARC: consultar _dmarc.dominio
    try:
        answers = dns.resolver.resolve(
            f"_dmarc.{sender_domain}", "TXT"
        )
        for rdata in answers:
            txt = rdata.to_text().strip('"')
            if txt.startswith("v=DMARC1"):
                validation.dmarc_record = txt
                policy_match = re.search(r'p=(\w+)', txt)
                if policy_match:
                    validation.dmarc_policy = policy_match.group(1)
                break
    except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer,
            dns.resolver.NoNameservers):
        validation.risk_score += 10

    # Parsear Authentication-Results del header (ya evaluado por MTA)
    if auth_results_header:
        ar_lower = auth_results_header.lower()
        if "dkim=pass" in ar_lower:
            validation.dkim_valid = True
        if "spf=pass" in ar_lower:
            validation.spf_pass = True

    # Scoring basado en resultados
    if not validation.spf_pass:
        validation.risk_score += 20
    if not validation.dkim_valid:
        validation.risk_score += 15
    if validation.dmarc_policy == "none":
        validation.risk_score += 5
    elif not validation.dmarc_record:
        validation.risk_score += 10

    # Resumen
    spf_status = "PASS" if validation.spf_pass else "FAIL"
    dkim_status = "PASS" if validation.dkim_valid else "FAIL"
    dmarc_status = validation.dmarc_policy or "NONE"
    validation.summary = (
        f"SPF={spf_status} DKIM={dkim_status} DMARC={dmarc_status}"
    )

    return validation

Un email con SPF=FAIL, DKIM=FAIL y sin DMARC es casi con certeza falsificado. Pero cuidado con los falsos positivos: muchas empresas legítimas tienen configuraciones de email incorrectas. El score de autenticación debe ser un factor más, no el único criterio.

Construir un bot de triaje de phishing

Ahora combinamos todos los módulos en un pipeline completo que procesa un email y genera un veredicto con score.

"""
phishing_triage.py — Pipeline completo de triaje de phishing
"""
from dataclasses import dataclass
from enum import Enum


class Verdict(Enum):
    CLEAN = "clean"
    SUSPICIOUS = "suspicious"
    MALICIOUS = "malicious"


@dataclass
class TriageResult:
    email_subject: str
    sender: str
    verdict: Verdict
    total_score: int
    header_analysis: "HeaderAnalysis"
    url_analyses: list["URLAnalysis"]
    attachment_analyses: list["AttachmentAnalysis"]
    auth_validation: "AuthValidation"
    summary: str = ""


# Umbrales configurables
THRESHOLD_CLEAN = 20
THRESHOLD_MALICIOUS = 60


def triage_email(
    file_path: str,
    vt_api_key: str = "",
) -> TriageResult:
    """Ejecutar pipeline completo de triaje sobre un email."""

    # 1. Parsear el email
    if file_path.endswith(".msg"):
        parsed = parse_msg(file_path)
    else:
        parsed = parse_eml(file_path)

    # 2. Analizar headers
    header_analysis = analyze_headers(parsed)

    # 3. Extraer y analizar URLs
    urls = extract_urls(parsed.body_text, parsed.body_html)
    url_analyses = [
        analyze_url(url, vt_api_key) for url in urls[:20]
    ]  # Limitar a 20 URLs

    # 4. Analizar adjuntos
    attachment_analyses = [
        analyze_attachment(
            att["filename"], att["data"],
            att["content_type"], vt_api_key,
        )
        for att in parsed.attachments
    ]

    # 5. Validar autenticacion
    auth_validation = validate_email_auth(
        header_analysis.sender_domain,
        header_analysis.sender_ip,
        parsed.authentication_results,
    )

    # 6. Calcular score total
    total_score = 0
    total_score += len(header_analysis.anomalies) * 10
    total_score += auth_validation.risk_score

    if url_analyses:
        max_url_score = max(u.risk_score for u in url_analyses)
        total_score += max_url_score

    if attachment_analyses:
        max_att_score = max(
            a.risk_score for a in attachment_analyses
        )
        total_score += max_att_score

    # Cap at 100
    total_score = min(total_score, 100)

    # 7. Determinar veredicto
    if total_score >= THRESHOLD_MALICIOUS:
        verdict = Verdict.MALICIOUS
    elif total_score >= THRESHOLD_CLEAN:
        verdict = Verdict.SUSPICIOUS
    else:
        verdict = Verdict.CLEAN

    # 8. Generar resumen
    summary_lines = [
        f"Subject: {parsed.subject}",
        f"From: {parsed.sender}",
        f"Score: {total_score}/100 -> {verdict.value}",
        f"Auth: {auth_validation.summary}",
        f"URLs: {len(urls)} found, "
        f"max risk: {max((u.risk_score for u in url_analyses), default=0)}",
        f"Attachments: {len(parsed.attachments)} found",
        f"Anomalies: {', '.join(header_analysis.anomalies) or 'None'}",
    ]

    return TriageResult(
        email_subject=parsed.subject,
        sender=parsed.sender,
        verdict=verdict,
        total_score=total_score,
        header_analysis=header_analysis,
        url_analyses=url_analyses,
        attachment_analyses=attachment_analyses,
        auth_validation=auth_validation,
        summary="\n".join(summary_lines),
    )

Uso desde línea de comandos

if __name__ == "__main__":
    import sys
    import json

    if len(sys.argv) != 2:
        print("Uso: python phishing_triage.py email.eml")
        sys.exit(1)

    result = triage_email(
        sys.argv[1],
        vt_api_key="YOUR_VT_API_KEY",
    )

    print(f"\n{'='*50}")
    print(f"PHISHING TRIAGE REPORT")
    print(f"{'='*50}")
    print(result.summary)
    print(f"{'='*50}")

    if result.verdict == Verdict.MALICIOUS:
        print("[!] ACCION: Escalar a analista N2. Bloquear remitente.")
    elif result.verdict == Verdict.SUSPICIOUS:
        print("[?] ACCION: Revision manual recomendada.")
    else:
        print("[OK] ACCION: Cerrar como benigno.")

Integración con TheHive y SOAR

El resultado del triaje debe alimentar el sistema de gestión de casos del SOC. TheHive es la plataforma open source más utilizada para esto.

"""
thehive_integration.py — Crear casos en TheHive desde el triaje
"""
import requests
from datetime import datetime


class TheHiveClient:
    """Cliente simplificado para TheHive 5 API."""

    def __init__(self, url: str, api_key: str):
        self.url = url.rstrip("/")
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        }

    def create_case(self, triage_result: "TriageResult") -> dict:
        """Crear un caso en TheHive a partir del resultado de triaje."""
        severity_map = {
            "clean": 1,        # Low
            "suspicious": 2,   # Medium
            "malicious": 3,    # High
        }

        case_data = {
            "title": (
                f"[Phishing] {triage_result.email_subject[:80]}"
            ),
            "description": triage_result.summary,
            "severity": severity_map.get(
                triage_result.verdict.value, 2
            ),
            "tlp": 2,   # TLP:AMBER
            "pap": 2,   # PAP:AMBER
            "tags": ["phishing", "email", "automated-triage"],
            "flag": triage_result.verdict == Verdict.MALICIOUS,
            "customFields": {
                "phishing-score": {
                    "integer": triage_result.total_score,
                },
                "sender-domain": {
                    "string": triage_result.header_analysis.sender_domain,
                },
                "auth-result": {
                    "string": triage_result.auth_validation.summary,
                },
            },
        }

        resp = requests.post(
            f"{self.url}/api/v1/case",
            json=case_data,
            headers=self.headers,
            timeout=15,
        )
        resp.raise_for_status()
        return resp.json()

    def add_observables(
        self, case_id: str, triage_result: "TriageResult"
    ) -> list[dict]:
        """Añadir IOCs como observables al caso."""
        observables = []

        # URLs maliciosas
        for url_analysis in triage_result.url_analyses:
            if url_analysis.risk_score > 30:
                observables.append({
                    "dataType": "url",
                    "data": url_analysis.url,
                    "message": (
                        f"Risk score: {url_analysis.risk_score}"
                    ),
                    "tlp": 2,
                    "ioc": True,
                    "tags": ["phishing-url"],
                })

        # Hashes de adjuntos
        for att in triage_result.attachment_analyses:
            if att.risk_score > 20:
                observables.append({
                    "dataType": "hash",
                    "data": att.sha256,
                    "message": (
                        f"File: {att.filename} "
                        f"(risk: {att.risk_score})"
                    ),
                    "tlp": 2,
                    "ioc": True,
                    "tags": ["phishing-attachment"],
                })

        # Sender IP
        if triage_result.header_analysis.sender_ip:
            observables.append({
                "dataType": "ip",
                "data": triage_result.header_analysis.sender_ip,
                "message": "Sender IP from Received headers",
                "tlp": 2,
                "ioc": False,
                "tags": ["sender-ip"],
            })

        # Crear observables en TheHive
        results = []
        for obs in observables:
            resp = requests.post(
                f"{self.url}/api/v1/case/{case_id}/observable",
                json=obs,
                headers=self.headers,
                timeout=10,
            )
            if resp.status_code == 201:
                results.append(resp.json())

        return results

Gestión de falsos positivos

El mayor riesgo de la automatización de triaje es el falso positivo. Un email legítimo clasificado como phishing genera trabajo innecesario y erosiona la confianza del equipo en la herramienta.

Estrategias para reducir falsos positivos:

Listas blancas de dominios: mantén una lista de dominios internos y partners de confianza. Si el email pasa SPF/DKIM y el dominio está en la lista blanca, reduce el score significativamente.

TRUSTED_DOMAINS = {
    "empresa.com", "partner.com",
    "microsoft.com", "google.com",
}

def apply_whitelist(analysis: HeaderAnalysis, score: int) -> int:
    """Reducir score si el dominio esta en la lista de confianza."""
    if (analysis.sender_domain in TRUSTED_DOMAINS
            and analysis.spf_result == "pass"
            and analysis.dkim_result == "pass"):
        return max(score - 30, 0)
    return score

Feedback loop: cuando un analista marca un email como falso positivo, registra el patrón para ajustar los umbrales. Con el tiempo, el sistema aprende qué señales son ruido en tu entorno específico.

Tres zonas de decisión: en lugar de un umbral binario, define tres zonas. Score menor de 20: cerrar automáticamente. Score mayor de 60: escalar automáticamente. Entre 20 y 60: cola de revisión manual. Ajusta los umbrales según la tasa de falsos positivos observada.

Recursos

  • Python email library (docs oficiales): documentación de la librería estándar de Python para parsing de emails según RFC 5322. Cubre todos los métodos usados en este artículo.
  • extract-msg (PyPI): biblioteca para parsear archivos .msg de Microsoft Outlook. Alternativa: olefile para acceso de bajo nivel al formato OLE2.
  • dnspython: biblioteca Python para consultas DNS, necesaria para la validación de SPF/DKIM/DMARC consultando registros TXT y CNAME.
  • URLhaus API (abuse.ch): API gratuita para verificar URLs maliciosas. Sin rate limit agresivo, ideal para automatización. Documentación en urlhaus.abuse.ch.
  • MalwareBazaar API (abuse.ch): API gratuita para verificar hashes de archivos contra la base de datos de malware de abuse.ch. Complementaria a VirusTotal.
  • TheHive Project: plataforma open source de gestión de incidentes. La API REST permite crear casos, observables y tareas programáticamente. Documentación en docs.thehive-project.org.
  • RFC 7208 (SPF): estándar que define Sender Policy Framework. Referencia para entender cómo funciona la validación SPF que implementamos.
  • MITRE ATT&CK T1566 (Phishing): referencia de técnicas y subtécnicas de phishing documentadas por MITRE, incluyendo spearphishing attachment (T1566.001) y spearphishing link (T1566.002).

Preguntas frecuentes

Artículos relacionados

Este contenido tiene fines exclusivamente educativos y de investigación en ciberseguridad defensiva. No se proporcionan binarios maliciosos ni payloads ejecutables. El uso indebido de esta información es responsabilidad exclusiva del usuario. Leer disclaimer completo.