Python para Análisis de Email: Automatizar el Triaje de Phishing
Automatización del análisis de correos de phishing con Python: parsing de archivos EML y MSG, extracción de headers, URLs y adjuntos, validación SPF/DKIM/DMARC, construcción de un bot de triaje e integración con TheHive y plataformas SOAR.
El desafío del análisis de email en el SOC
Los emails de phishing representan el vector de ataque inicial más común. El 90% de los incidentes de seguridad comienzan con un correo electrónico malicioso. Un SOC típico recibe decenas o cientos de reportes diarios de usuarios que dicen "este email me parece sospechoso". El analista debe abrir cada uno, revisar headers, verificar URLs, comprobar adjuntos y decidir si es phishing real o un falso positivo. A mano, cada email tarda entre 5 y 15 minutos.
Con Python, el mismo análisis se completa en segundos. Este artículo construye un sistema completo de triaje automatizado de phishing: desde el parsing del archivo .eml hasta la integración con TheHive para gestión de casos.
Parsing de archivos EML y MSG
Python incluye la biblioteca email en su librería estándar, capaz de parsear archivos en formato EML (RFC 5322). Para archivos MSG de Outlook, usamos la biblioteca extract-msg.
"""
email_parser.py — Parser unificado para EML y MSG
"""
import email
from email import policy
from pathlib import Path
from dataclasses import dataclass, field
from datetime import datetime
@dataclass
class ParsedEmail:
"""Representación normalizada de un email analizado."""
subject: str = ""
sender: str = ""
sender_ip: str = ""
reply_to: str = ""
to: list[str] = field(default_factory=list)
date: str = ""
message_id: str = ""
headers: dict[str, str] = field(default_factory=dict)
body_text: str = ""
body_html: str = ""
urls: list[str] = field(default_factory=list)
attachments: list[dict] = field(default_factory=list)
authentication_results: str = ""
received_chain: list[str] = field(default_factory=list)
def parse_eml(file_path: str) -> ParsedEmail:
"""Parsear un archivo .eml y extraer todos los campos relevantes."""
path = Path(file_path)
with open(path, "rb") as f:
msg = email.message_from_binary_file(f, policy=policy.default)
parsed = ParsedEmail()
parsed.subject = msg.get("Subject", "")
parsed.sender = msg.get("From", "")
parsed.reply_to = msg.get("Reply-To", "")
parsed.to = [addr.strip() for addr in msg.get("To", "").split(",")]
parsed.date = msg.get("Date", "")
parsed.message_id = msg.get("Message-ID", "")
parsed.authentication_results = msg.get(
"Authentication-Results", ""
)
# Cadena de Received headers (orden cronologico inverso)
parsed.received_chain = msg.get_all("Received", [])
# Extraer IP del primer salto
if parsed.received_chain:
first_hop = parsed.received_chain[-1] # Ultimo = mas antiguo
parsed.sender_ip = _extract_ip(first_hop)
# Todos los headers como diccionario
parsed.headers = {k: v for k, v in msg.items()}
# Body
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
if content_type == "text/plain":
parsed.body_text = part.get_content()
elif content_type == "text/html":
parsed.body_html = part.get_content()
else:
content_type = msg.get_content_type()
content = msg.get_content()
if content_type == "text/plain":
parsed.body_text = content
else:
parsed.body_html = content
# Adjuntos
for part in msg.iter_attachments():
attachment = {
"filename": part.get_filename() or "unknown",
"content_type": part.get_content_type(),
"size": len(part.get_content()),
"data": part.get_content(), # bytes
}
parsed.attachments.append(attachment)
return parsed
def _extract_ip(received_header: str) -> str:
"""Extraer la primera IP de un header Received."""
import re
ip_pattern = r'\[(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\]'
match = re.search(ip_pattern, received_header)
return match.group(1) if match else ""
Para archivos MSG de Microsoft Outlook:
def parse_msg(file_path: str) -> ParsedEmail:
"""Parsear un archivo .msg de Outlook."""
import extract_msg
msg = extract_msg.Message(file_path)
parsed = ParsedEmail()
parsed.subject = msg.subject or ""
parsed.sender = msg.sender or ""
parsed.to = [msg.to or ""]
parsed.date = str(msg.date or "")
parsed.body_text = msg.body or ""
parsed.body_html = msg.htmlBody or ""
for attachment in msg.attachments:
parsed.attachments.append({
"filename": attachment.longFilename or attachment.shortFilename,
"content_type": attachment.mimetype or "application/octet-stream",
"size": len(attachment.data) if attachment.data else 0,
"data": attachment.data,
})
msg.close()
return parsed
Extracción y análisis de headers
Los headers de email son la huella dactilar del mensaje. Un email legítimo de tu banco tiene headers consistentes. Un phishing, no.
"""
header_analyzer.py — Análisis de headers para detección de phishing
"""
import re
from dataclasses import dataclass
@dataclass
class HeaderAnalysis:
"""Resultado del análisis de headers."""
sender_domain: str = ""
envelope_from: str = ""
reply_to_domain: str = ""
sender_ip: str = ""
ip_country: str = ""
hop_count: int = 0
spf_result: str = ""
dkim_result: str = ""
dmarc_result: str = ""
anomalies: list[str] = None
def __post_init__(self):
if self.anomalies is None:
self.anomalies = []
def analyze_headers(parsed: "ParsedEmail") -> HeaderAnalysis:
"""Analizar headers para detectar anomalias de phishing."""
analysis = HeaderAnalysis()
# Extraer dominio del remitente
sender_match = re.search(r'@([\w.-]+)', parsed.sender)
analysis.sender_domain = sender_match.group(1) if sender_match else ""
# Reply-To diferente del From (tecnica clasica de phishing)
if parsed.reply_to:
reply_match = re.search(r'@([\w.-]+)', parsed.reply_to)
analysis.reply_to_domain = reply_match.group(1) if reply_match else ""
if (analysis.reply_to_domain and
analysis.reply_to_domain != analysis.sender_domain):
analysis.anomalies.append(
f"Reply-To domain ({analysis.reply_to_domain}) "
f"differs from sender ({analysis.sender_domain})"
)
# Numero de saltos (Received headers)
analysis.hop_count = len(parsed.received_chain)
if analysis.hop_count > 10:
analysis.anomalies.append(
f"Unusually high hop count: {analysis.hop_count}"
)
# Parsear Authentication-Results
auth_results = parsed.authentication_results.lower()
for protocol in ["spf", "dkim", "dmarc"]:
pattern = rf'{protocol}=(\w+)'
match = re.search(pattern, auth_results)
if match:
result = match.group(1)
setattr(analysis, f"{protocol}_result", result)
if result in ("fail", "softfail", "none"):
analysis.anomalies.append(
f"{protocol.upper()} validation: {result}"
)
# Sender IP
analysis.sender_ip = parsed.sender_ip
# Display name spoofing
display_name = parsed.sender.split("<")[0].strip().strip('"')
if "@" in display_name:
analysis.anomalies.append(
f"Email address in display name (spoofing attempt): "
f"{display_name}"
)
return analysis
Las anomalias que detectamos incluyen: dominios de Reply-To diferentes al From (el atacante quiere que la respuesta llegue a su buzón), fallos de SPF/DKIM/DMARC (el dominio no autoriza al servidor que envió el email), número excesivo de saltos (puede indicar uso de relays abiertos) y direcciones de email dentro del display name (spoofing visual).
Extracción de URLs y verificación de reputación
Las URLs son el payload más común en phishing. El objetivo es extraerlas todas, desofuscarlas y verificar su reputación.
"""
url_analyzer.py — Extracción y verificación de URLs
"""
import re
import hashlib
import urllib.parse
from dataclasses import dataclass
import requests
@dataclass
class URLAnalysis:
url: str
domain: str
is_shortened: bool = False
final_url: str = ""
vt_positives: int = 0
urlhaus_status: str = ""
risk_score: int = 0
SHORTENERS = {
"bit.ly", "t.co", "tinyurl.com", "goo.gl",
"ow.ly", "is.gd", "buff.ly", "rb.gy",
"cutt.ly", "shorturl.at",
}
def extract_urls(text: str, html: str = "") -> list[str]:
"""Extraer todas las URLs del body del email."""
combined = f"{text} {html}"
# Patron para URLs en texto plano
url_pattern = r'https?://[^\s<>"\')\]}{,]+'
urls = re.findall(url_pattern, combined)
# URLs en atributos href de HTML
href_pattern = r'href=["\']?(https?://[^"\'>\s]+)'
urls.extend(re.findall(href_pattern, html))
# Deduplicar manteniendo orden
seen = set()
unique = []
for url in urls:
url = url.rstrip(".") # Limpiar puntos finales
if url not in seen:
seen.add(url)
unique.append(url)
return unique
def analyze_url(url: str, vt_api_key: str = "") -> URLAnalysis:
"""Analizar una URL individual: resolver, verificar reputacion."""
parsed = urllib.parse.urlparse(url)
analysis = URLAnalysis(
url=url,
domain=parsed.netloc,
)
# Detectar acortadores
domain_base = parsed.netloc.lower().lstrip("www.")
if domain_base in SHORTENERS:
analysis.is_shortened = True
try:
resp = requests.head(
url, allow_redirects=True, timeout=5
)
analysis.final_url = resp.url
except requests.RequestException:
analysis.final_url = "ERROR: Could not resolve"
# Verificar en URLhaus (abuse.ch, gratis, sin API key)
try:
resp = requests.post(
"https://urlhaus-api.abuse.ch/v1/url/",
data={"url": url},
timeout=10,
)
if resp.status_code == 200:
data = resp.json()
analysis.urlhaus_status = data.get(
"query_status", "unknown"
)
if data.get("query_status") == "listed":
analysis.risk_score += 50
except requests.RequestException:
pass
# Verificar en VirusTotal (requiere API key)
if vt_api_key:
url_id = hashlib.sha256(url.encode()).hexdigest()
try:
resp = requests.get(
f"https://www.virustotal.com/api/v3/urls/{url_id}",
headers={"x-apikey": vt_api_key},
timeout=10,
)
if resp.status_code == 200:
stats = resp.json().get(
"data", {}
).get("attributes", {}).get(
"last_analysis_stats", {}
)
analysis.vt_positives = stats.get("malicious", 0)
if analysis.vt_positives > 0:
analysis.risk_score += min(
analysis.vt_positives * 10, 50
)
except requests.RequestException:
pass
# Heuristicas adicionales
if analysis.is_shortened:
analysis.risk_score += 10
# Dominio con muchos subdominios (signin.bank.attacker.com)
subdomain_count = parsed.netloc.count(".")
if subdomain_count > 3:
analysis.risk_score += 15
# Path con keywords sospechosos
suspicious_paths = [
"login", "signin", "verify", "update",
"secure", "account", "confirm", "password",
]
path_lower = parsed.path.lower()
for kw in suspicious_paths:
if kw in path_lower:
analysis.risk_score += 5
break
return analysis
El scoring combina señales de múltiples fuentes: reputación en URLhaus y VirusTotal, uso de acortadores, subdominios excesivos y keywords sospechosos en el path. Ninguna señal individual es definitiva, pero la combinación proporciona un score fiable.
Extracción de adjuntos y verificación de hash
Los adjuntos maliciosos (documentos Office con macros, PDFs con exploits, ejecutables renombrados) son el segundo vector de phishing más común.
"""
attachment_analyzer.py — Hash y verificación de adjuntos
"""
import hashlib
from dataclasses import dataclass
import requests
DANGEROUS_EXTENSIONS = {
".exe", ".scr", ".bat", ".cmd", ".ps1", ".vbs", ".js",
".wsf", ".hta", ".msi", ".dll", ".com", ".pif",
".docm", ".xlsm", ".pptm", # Office con macros
".iso", ".img", ".vhd", # Contenedores de disco
".lnk", # Shortcuts maliciosos
}
SUSPICIOUS_EXTENSIONS = {
".doc", ".xls", ".ppt", # Office legacy (pueden tener macros)
".pdf", ".rtf", # Pueden contener exploits
".zip", ".rar", ".7z", # Archivos comprimidos
".html", ".htm", # HTML smuggling
}
@dataclass
class AttachmentAnalysis:
filename: str
content_type: str
size: int
md5: str
sha256: str
extension: str
is_dangerous: bool = False
is_suspicious: bool = False
mb_status: str = "" # MalwareBazaar
vt_positives: int = 0
risk_score: int = 0
def analyze_attachment(
filename: str,
data: bytes,
content_type: str,
vt_api_key: str = "",
) -> AttachmentAnalysis:
"""Analizar un adjunto: hash, extension, reputacion."""
ext = "." + filename.rsplit(".", 1)[-1].lower() if "." in filename else ""
analysis = AttachmentAnalysis(
filename=filename,
content_type=content_type,
size=len(data),
md5=hashlib.md5(data).hexdigest(),
sha256=hashlib.sha256(data).hexdigest(),
extension=ext,
is_dangerous=ext in DANGEROUS_EXTENSIONS,
is_suspicious=ext in SUSPICIOUS_EXTENSIONS,
)
if analysis.is_dangerous:
analysis.risk_score += 40
elif analysis.is_suspicious:
analysis.risk_score += 15
# Doble extension (documento.pdf.exe)
parts = filename.rsplit(".", 2)
if len(parts) > 2:
analysis.risk_score += 25
analysis.is_dangerous = True
# Verificar hash en MalwareBazaar (gratis)
try:
resp = requests.post(
"https://mb-api.abuse.ch/api/v1/",
data={
"query": "get_info",
"hash": analysis.sha256,
},
timeout=10,
)
if resp.status_code == 200:
result = resp.json()
analysis.mb_status = result.get(
"query_status", "unknown"
)
if result.get("query_status") == "hash_found":
analysis.risk_score += 50
except requests.RequestException:
pass
# Verificar en VirusTotal
if vt_api_key:
try:
resp = requests.get(
f"https://www.virustotal.com/api/v3/files/{analysis.sha256}",
headers={"x-apikey": vt_api_key},
timeout=10,
)
if resp.status_code == 200:
stats = resp.json().get(
"data", {}
).get("attributes", {}).get(
"last_analysis_stats", {}
)
analysis.vt_positives = stats.get("malicious", 0)
if analysis.vt_positives > 0:
analysis.risk_score += min(
analysis.vt_positives * 5, 50
)
except requests.RequestException:
pass
return analysis
La doble extension es una técnica clásica: el archivo se llama factura.pdf.exe y Windows oculta la extension real mostrando solo factura.pdf. En nuestro análisis, cualquier archivo con más de una extension recibe una penalización alta.
Validación SPF, DKIM y DMARC
SPF, DKIM y DMARC son los tres pilares de la autenticación de email. Si fallan, el email probablemente no fue enviado por quien dice ser.
"""
auth_validator.py — Validación de autenticación de email
"""
import dns.resolver
import re
from dataclasses import dataclass
@dataclass
class AuthValidation:
spf_record: str = ""
spf_pass: bool = False
dkim_selector: str = ""
dkim_valid: bool = False
dmarc_record: str = ""
dmarc_policy: str = ""
summary: str = ""
risk_score: int = 0
def validate_email_auth(
sender_domain: str,
sender_ip: str = "",
auth_results_header: str = "",
) -> AuthValidation:
"""Validar SPF, DKIM y DMARC para un dominio remitente."""
validation = AuthValidation()
# SPF: consultar registro TXT
try:
answers = dns.resolver.resolve(sender_domain, "TXT")
for rdata in answers:
txt = rdata.to_text().strip('"')
if txt.startswith("v=spf1"):
validation.spf_record = txt
# Verificar si la IP esta autorizada
if sender_ip and sender_ip in txt:
validation.spf_pass = True
elif "~all" in txt or "-all" in txt:
validation.spf_pass = False
break
except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer,
dns.resolver.NoNameservers):
validation.risk_score += 15
# DMARC: consultar _dmarc.dominio
try:
answers = dns.resolver.resolve(
f"_dmarc.{sender_domain}", "TXT"
)
for rdata in answers:
txt = rdata.to_text().strip('"')
if txt.startswith("v=DMARC1"):
validation.dmarc_record = txt
policy_match = re.search(r'p=(\w+)', txt)
if policy_match:
validation.dmarc_policy = policy_match.group(1)
break
except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer,
dns.resolver.NoNameservers):
validation.risk_score += 10
# Parsear Authentication-Results del header (ya evaluado por MTA)
if auth_results_header:
ar_lower = auth_results_header.lower()
if "dkim=pass" in ar_lower:
validation.dkim_valid = True
if "spf=pass" in ar_lower:
validation.spf_pass = True
# Scoring basado en resultados
if not validation.spf_pass:
validation.risk_score += 20
if not validation.dkim_valid:
validation.risk_score += 15
if validation.dmarc_policy == "none":
validation.risk_score += 5
elif not validation.dmarc_record:
validation.risk_score += 10
# Resumen
spf_status = "PASS" if validation.spf_pass else "FAIL"
dkim_status = "PASS" if validation.dkim_valid else "FAIL"
dmarc_status = validation.dmarc_policy or "NONE"
validation.summary = (
f"SPF={spf_status} DKIM={dkim_status} DMARC={dmarc_status}"
)
return validation
Un email con SPF=FAIL, DKIM=FAIL y sin DMARC es casi con certeza falsificado. Pero cuidado con los falsos positivos: muchas empresas legítimas tienen configuraciones de email incorrectas. El score de autenticación debe ser un factor más, no el único criterio.
Construir un bot de triaje de phishing
Ahora combinamos todos los módulos en un pipeline completo que procesa un email y genera un veredicto con score.
"""
phishing_triage.py — Pipeline completo de triaje de phishing
"""
from dataclasses import dataclass
from enum import Enum
class Verdict(Enum):
CLEAN = "clean"
SUSPICIOUS = "suspicious"
MALICIOUS = "malicious"
@dataclass
class TriageResult:
email_subject: str
sender: str
verdict: Verdict
total_score: int
header_analysis: "HeaderAnalysis"
url_analyses: list["URLAnalysis"]
attachment_analyses: list["AttachmentAnalysis"]
auth_validation: "AuthValidation"
summary: str = ""
# Umbrales configurables
THRESHOLD_CLEAN = 20
THRESHOLD_MALICIOUS = 60
def triage_email(
file_path: str,
vt_api_key: str = "",
) -> TriageResult:
"""Ejecutar pipeline completo de triaje sobre un email."""
# 1. Parsear el email
if file_path.endswith(".msg"):
parsed = parse_msg(file_path)
else:
parsed = parse_eml(file_path)
# 2. Analizar headers
header_analysis = analyze_headers(parsed)
# 3. Extraer y analizar URLs
urls = extract_urls(parsed.body_text, parsed.body_html)
url_analyses = [
analyze_url(url, vt_api_key) for url in urls[:20]
] # Limitar a 20 URLs
# 4. Analizar adjuntos
attachment_analyses = [
analyze_attachment(
att["filename"], att["data"],
att["content_type"], vt_api_key,
)
for att in parsed.attachments
]
# 5. Validar autenticacion
auth_validation = validate_email_auth(
header_analysis.sender_domain,
header_analysis.sender_ip,
parsed.authentication_results,
)
# 6. Calcular score total
total_score = 0
total_score += len(header_analysis.anomalies) * 10
total_score += auth_validation.risk_score
if url_analyses:
max_url_score = max(u.risk_score for u in url_analyses)
total_score += max_url_score
if attachment_analyses:
max_att_score = max(
a.risk_score for a in attachment_analyses
)
total_score += max_att_score
# Cap at 100
total_score = min(total_score, 100)
# 7. Determinar veredicto
if total_score >= THRESHOLD_MALICIOUS:
verdict = Verdict.MALICIOUS
elif total_score >= THRESHOLD_CLEAN:
verdict = Verdict.SUSPICIOUS
else:
verdict = Verdict.CLEAN
# 8. Generar resumen
summary_lines = [
f"Subject: {parsed.subject}",
f"From: {parsed.sender}",
f"Score: {total_score}/100 -> {verdict.value}",
f"Auth: {auth_validation.summary}",
f"URLs: {len(urls)} found, "
f"max risk: {max((u.risk_score for u in url_analyses), default=0)}",
f"Attachments: {len(parsed.attachments)} found",
f"Anomalies: {', '.join(header_analysis.anomalies) or 'None'}",
]
return TriageResult(
email_subject=parsed.subject,
sender=parsed.sender,
verdict=verdict,
total_score=total_score,
header_analysis=header_analysis,
url_analyses=url_analyses,
attachment_analyses=attachment_analyses,
auth_validation=auth_validation,
summary="\n".join(summary_lines),
)
Uso desde línea de comandos
if __name__ == "__main__":
import sys
import json
if len(sys.argv) != 2:
print("Uso: python phishing_triage.py email.eml")
sys.exit(1)
result = triage_email(
sys.argv[1],
vt_api_key="YOUR_VT_API_KEY",
)
print(f"\n{'='*50}")
print(f"PHISHING TRIAGE REPORT")
print(f"{'='*50}")
print(result.summary)
print(f"{'='*50}")
if result.verdict == Verdict.MALICIOUS:
print("[!] ACCION: Escalar a analista N2. Bloquear remitente.")
elif result.verdict == Verdict.SUSPICIOUS:
print("[?] ACCION: Revision manual recomendada.")
else:
print("[OK] ACCION: Cerrar como benigno.")
Integración con TheHive y SOAR
El resultado del triaje debe alimentar el sistema de gestión de casos del SOC. TheHive es la plataforma open source más utilizada para esto.
"""
thehive_integration.py — Crear casos en TheHive desde el triaje
"""
import requests
from datetime import datetime
class TheHiveClient:
"""Cliente simplificado para TheHive 5 API."""
def __init__(self, url: str, api_key: str):
self.url = url.rstrip("/")
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
def create_case(self, triage_result: "TriageResult") -> dict:
"""Crear un caso en TheHive a partir del resultado de triaje."""
severity_map = {
"clean": 1, # Low
"suspicious": 2, # Medium
"malicious": 3, # High
}
case_data = {
"title": (
f"[Phishing] {triage_result.email_subject[:80]}"
),
"description": triage_result.summary,
"severity": severity_map.get(
triage_result.verdict.value, 2
),
"tlp": 2, # TLP:AMBER
"pap": 2, # PAP:AMBER
"tags": ["phishing", "email", "automated-triage"],
"flag": triage_result.verdict == Verdict.MALICIOUS,
"customFields": {
"phishing-score": {
"integer": triage_result.total_score,
},
"sender-domain": {
"string": triage_result.header_analysis.sender_domain,
},
"auth-result": {
"string": triage_result.auth_validation.summary,
},
},
}
resp = requests.post(
f"{self.url}/api/v1/case",
json=case_data,
headers=self.headers,
timeout=15,
)
resp.raise_for_status()
return resp.json()
def add_observables(
self, case_id: str, triage_result: "TriageResult"
) -> list[dict]:
"""Añadir IOCs como observables al caso."""
observables = []
# URLs maliciosas
for url_analysis in triage_result.url_analyses:
if url_analysis.risk_score > 30:
observables.append({
"dataType": "url",
"data": url_analysis.url,
"message": (
f"Risk score: {url_analysis.risk_score}"
),
"tlp": 2,
"ioc": True,
"tags": ["phishing-url"],
})
# Hashes de adjuntos
for att in triage_result.attachment_analyses:
if att.risk_score > 20:
observables.append({
"dataType": "hash",
"data": att.sha256,
"message": (
f"File: {att.filename} "
f"(risk: {att.risk_score})"
),
"tlp": 2,
"ioc": True,
"tags": ["phishing-attachment"],
})
# Sender IP
if triage_result.header_analysis.sender_ip:
observables.append({
"dataType": "ip",
"data": triage_result.header_analysis.sender_ip,
"message": "Sender IP from Received headers",
"tlp": 2,
"ioc": False,
"tags": ["sender-ip"],
})
# Crear observables en TheHive
results = []
for obs in observables:
resp = requests.post(
f"{self.url}/api/v1/case/{case_id}/observable",
json=obs,
headers=self.headers,
timeout=10,
)
if resp.status_code == 201:
results.append(resp.json())
return results
Gestión de falsos positivos
El mayor riesgo de la automatización de triaje es el falso positivo. Un email legítimo clasificado como phishing genera trabajo innecesario y erosiona la confianza del equipo en la herramienta.
Estrategias para reducir falsos positivos:
Listas blancas de dominios: mantén una lista de dominios internos y partners de confianza. Si el email pasa SPF/DKIM y el dominio está en la lista blanca, reduce el score significativamente.
TRUSTED_DOMAINS = {
"empresa.com", "partner.com",
"microsoft.com", "google.com",
}
def apply_whitelist(analysis: HeaderAnalysis, score: int) -> int:
"""Reducir score si el dominio esta en la lista de confianza."""
if (analysis.sender_domain in TRUSTED_DOMAINS
and analysis.spf_result == "pass"
and analysis.dkim_result == "pass"):
return max(score - 30, 0)
return score
Feedback loop: cuando un analista marca un email como falso positivo, registra el patrón para ajustar los umbrales. Con el tiempo, el sistema aprende qué señales son ruido en tu entorno específico.
Tres zonas de decisión: en lugar de un umbral binario, define tres zonas. Score menor de 20: cerrar automáticamente. Score mayor de 60: escalar automáticamente. Entre 20 y 60: cola de revisión manual. Ajusta los umbrales según la tasa de falsos positivos observada.
Recursos
- Python email library (docs oficiales): documentación de la librería estándar de Python para parsing de emails según RFC 5322. Cubre todos los métodos usados en este artículo.
- extract-msg (PyPI): biblioteca para parsear archivos .msg de Microsoft Outlook. Alternativa:
olefilepara acceso de bajo nivel al formato OLE2. - dnspython: biblioteca Python para consultas DNS, necesaria para la validación de SPF/DKIM/DMARC consultando registros TXT y CNAME.
- URLhaus API (abuse.ch): API gratuita para verificar URLs maliciosas. Sin rate limit agresivo, ideal para automatización. Documentación en urlhaus.abuse.ch.
- MalwareBazaar API (abuse.ch): API gratuita para verificar hashes de archivos contra la base de datos de malware de abuse.ch. Complementaria a VirusTotal.
- TheHive Project: plataforma open source de gestión de incidentes. La API REST permite crear casos, observables y tareas programáticamente. Documentación en docs.thehive-project.org.
- RFC 7208 (SPF): estándar que define Sender Policy Framework. Referencia para entender cómo funciona la validación SPF que implementamos.
- MITRE ATT&CK T1566 (Phishing): referencia de técnicas y subtécnicas de phishing documentadas por MITRE, incluyendo spearphishing attachment (T1566.001) y spearphishing link (T1566.002).
Preguntas frecuentes
Libros recomendados
Artículos relacionados
Este contenido tiene fines exclusivamente educativos y de investigación en ciberseguridad defensiva. No se proporcionan binarios maliciosos ni payloads ejecutables. El uso indebido de esta información es responsabilidad exclusiva del usuario. Leer disclaimer completo.