Python y APIs de Threat Intelligence: VirusTotal, Shodan, OTX y AbuseIPDB
Guía completa para integrar Python con las principales APIs de threat intelligence: VirusTotal v3, Shodan, AbuseIPDB y OTX AlienVault. Incluye un script de enrichment multi-fuente con caché, rate limiting y exportación a CSV/JSON.
APIs de CTI: la materia prima del analista
Un indicador de compromiso aislado dice poco. Una IP sospechosa puede ser un servidor legítimo mal configurado o el C2 de un APT activo. La diferencia la marca el contexto, y ese contexto viene de las APIs de threat intelligence.
Las cuatro APIs que cubre este artículo son las más utilizadas en SOCs de todo el mundo:
- VirusTotal: análisis de malware, reputación de URLs, dominios e IPs. La referencia universal.
- Shodan: búsqueda de dispositivos conectados a Internet. Perfil de puertos, servicios y vulnerabilidades.
- AbuseIPDB: base de datos colaborativa de IPs reportadas como maliciosas.
- OTX AlienVault: plataforma de threat intelligence comunitaria con pulsos de IOCs contextualizados.
Cada una aporta una perspectiva diferente. Combinarlas en un script unificado de enrichment multiplica el valor de la inteligencia disponible.
Fundamentos de APIs REST para SOC
Antes de entrar en código, hay conceptos de APIs que todo analista debe dominar.
Autenticación
Todas las APIs de CTI requieren una API key. Se envía como header HTTP:
# VirusTotal
headers = {"x-apikey": "tu_api_key"}
# Shodan
params = {"key": "tu_api_key"}
# AbuseIPDB
headers = {"Key": "tu_api_key", "Accept": "application/json"}
Rate limiting
Cada API tiene límites de consultas por tiempo. Excederlos devuelve HTTP 429 (Too Many Requests) y puede resultar en bloqueo temporal:
| API | Plan gratuito | Plan de pago |
|---|---|---|
| VirusTotal | 4/min, 500/día | 30/min (Premium) |
| Shodan | 1/seg | 10/seg (Enterprise) |
| AbuseIPDB | 1.000/día | 5.000/día (Premium) |
| OTX | Sin límite estricto | N/A (gratuito) |
Formato de respuesta
Todas devuelven JSON. La estructura varía, pero el patrón es similar:
response = requests.get(url, headers=headers)
if response.status_code == 200:
data = response.json()
elif response.status_code == 404:
# IOC no encontrado
pass
elif response.status_code == 429:
# Rate limit alcanzado, esperar
pass
VirusTotal API v3
VirusTotal es la plataforma más completa para análisis de malware y reputación de IOCs. La API v3 usa endpoints RESTful con autenticación por header.
Consultar un hash de archivo
import os
import requests
from dotenv import load_dotenv
load_dotenv()
VT_API_KEY = os.getenv("VT_API_KEY")
def vt_file_lookup(file_hash: str) -> dict:
"""Consulta un hash (MD5/SHA1/SHA256) en VirusTotal."""
url = f"https://www.virustotal.com/api/v3/files/{file_hash}"
headers = {"x-apikey": VT_API_KEY}
response = requests.get(url, headers=headers, timeout=30)
if response.status_code == 404:
return {"found": False, "hash": file_hash}
response.raise_for_status()
attrs = response.json()["data"]["attributes"]
stats = attrs.get("last_analysis_stats", {})
return {
"found": True,
"hash": file_hash,
"malicious": stats.get("malicious", 0),
"suspicious": stats.get("suspicious", 0),
"undetected": stats.get("undetected", 0),
"total": sum(stats.values()),
"name": attrs.get("meaningful_name"),
"type": attrs.get("type_description"),
"size": attrs.get("size"),
"first_seen": attrs.get("first_submission_date"),
"last_seen": attrs.get("last_analysis_date"),
"tags": attrs.get("tags", []),
"popular_threat_names": attrs.get("popular_threat_classification", {})
.get("suggested_threat_label"),
}
Consultar una URL
import base64
def vt_url_lookup(url_to_check: str) -> dict:
"""Consulta la reputación de una URL en VirusTotal."""
# VT v3 requiere el URL ID: base64 del URL sin padding
url_id = base64.urlsafe_b64encode(url_to_check.encode()).decode().strip("=")
endpoint = f"https://www.virustotal.com/api/v3/urls/{url_id}"
headers = {"x-apikey": VT_API_KEY}
response = requests.get(endpoint, headers=headers, timeout=30)
if response.status_code == 404:
return {"found": False, "url": url_to_check}
response.raise_for_status()
attrs = response.json()["data"]["attributes"]
stats = attrs.get("last_analysis_stats", {})
return {
"found": True,
"url": url_to_check,
"malicious": stats.get("malicious", 0),
"total": sum(stats.values()),
"final_url": attrs.get("last_final_url"),
"title": attrs.get("title"),
"categories": attrs.get("categories", {}),
}
Consultar una IP
def vt_ip_lookup(ip: str) -> dict:
"""Consulta información de una IP en VirusTotal."""
url = f"https://www.virustotal.com/api/v3/ip_addresses/{ip}"
headers = {"x-apikey": VT_API_KEY}
response = requests.get(url, headers=headers, timeout=30)
if response.status_code == 404:
return {"found": False, "ip": ip}
response.raise_for_status()
attrs = response.json()["data"]["attributes"]
stats = attrs.get("last_analysis_stats", {})
return {
"found": True,
"ip": ip,
"malicious": stats.get("malicious", 0),
"total": sum(stats.values()),
"country": attrs.get("country"),
"as_owner": attrs.get("as_owner"),
"asn": attrs.get("asn"),
"network": attrs.get("network"),
"reputation": attrs.get("reputation", 0),
}
Shodan API
Shodan escanea Internet y cataloga puertos abiertos, servicios, banners, certificados SSL y vulnerabilidades detectadas. Es fundamental para evaluar la exposición de una IP.
Consultar información de un host
SHODAN_API_KEY = os.getenv("SHODAN_API_KEY")
def shodan_host_lookup(ip: str) -> dict:
"""Consulta información de un host en Shodan."""
url = f"https://api.shodan.io/shodan/host/{ip}"
params = {"key": SHODAN_API_KEY}
response = requests.get(url, params=params, timeout=30)
if response.status_code == 404:
return {"found": False, "ip": ip}
response.raise_for_status()
data = response.json()
return {
"found": True,
"ip": ip,
"hostnames": data.get("hostnames", []),
"country": data.get("country_name"),
"city": data.get("city"),
"org": data.get("org"),
"isp": data.get("isp"),
"os": data.get("os"),
"ports": data.get("ports", []),
"vulns": data.get("vulns", []),
"last_update": data.get("last_update"),
"services": [
{
"port": s.get("port"),
"protocol": s.get("transport"),
"product": s.get("product"),
"version": s.get("version"),
}
for s in data.get("data", [])[:10] # Limitar a 10 servicios
],
}
Búsqueda en Shodan
def shodan_search(query: str, page: int = 1) -> dict:
"""Ejecuta una búsqueda en Shodan."""
url = "https://api.shodan.io/shodan/host/search"
params = {
"key": SHODAN_API_KEY,
"query": query,
"page": page,
}
response = requests.get(url, params=params, timeout=30)
response.raise_for_status()
data = response.json()
return {
"total": data.get("total", 0),
"results": [
{
"ip": r.get("ip_str"),
"port": r.get("port"),
"org": r.get("org"),
"country": r.get("location", {}).get("country_name"),
"product": r.get("product"),
"banner": r.get("data", "")[:200], # Primeros 200 chars
}
for r in data.get("matches", [])
]
}
Búsquedas útiles para investigación:
# Servidores Cobalt Strike
shodan_search('product:"Cobalt Strike Beacon"')
# RDP expuesto en un rango
shodan_search('port:3389 net:203.0.113.0/24')
# Paneles de C2 conocidos
shodan_search('http.title:"Havoc C2"')
AbuseIPDB API
AbuseIPDB es una base de datos colaborativa de IPs maliciosas. Los administradores de red reportan IPs que atacan sus sistemas, creando un historial de reputación.
Verificar reputación de una IP
ABUSE_API_KEY = os.getenv("ABUSEIPDB_API_KEY")
def abuseipdb_check(ip: str, max_age_days: int = 90) -> dict:
"""Consulta la reputación de una IP en AbuseIPDB."""
url = "https://api.abuseipdb.com/api/v2/check"
headers = {
"Key": ABUSE_API_KEY,
"Accept": "application/json"
}
params = {
"ipAddress": ip,
"maxAgeInDays": max_age_days,
"verbose": ""
}
response = requests.get(url, headers=headers, params=params, timeout=30)
response.raise_for_status()
data = response.json()["data"]
return {
"found": True,
"ip": data["ipAddress"],
"abuse_score": data["abuseConfidenceScore"],
"country": data.get("countryCode"),
"isp": data.get("isp"),
"domain": data.get("domain"),
"usage_type": data.get("usageType"),
"total_reports": data.get("totalReports", 0),
"num_distinct_users": data.get("numDistinctUsers", 0),
"last_reported": data.get("lastReportedAt"),
"is_tor": data.get("isTor", False),
"is_whitelisted": data.get("isWhitelisted", False),
"categories": [
r.get("categories", [])
for r in data.get("reports", [])[:5]
],
}
Reportar una IP
def abuseipdb_report(ip: str, categories: list[int], comment: str) -> dict:
"""Reporta una IP maliciosa a AbuseIPDB."""
url = "https://api.abuseipdb.com/api/v2/report"
headers = {
"Key": ABUSE_API_KEY,
"Accept": "application/json"
}
data = {
"ip": ip,
"categories": ",".join(str(c) for c in categories),
"comment": comment,
}
response = requests.post(url, headers=headers, data=data, timeout=30)
response.raise_for_status()
return response.json()["data"]
Categorías de AbuseIPDB más relevantes:
| Código | Categoría |
|---|---|
| 3 | Fraud Orders |
| 4 | DDoS Attack |
| 5 | FTP Brute-Force |
| 11 | Email Spam |
| 14 | Port Scan |
| 18 | Brute-Force |
| 19 | Bad Web Bot |
| 21 | Web App Attack |
| 22 | SSH |
| 23 | IoT Targeted |
OTX AlienVault API
OTX (Open Threat Exchange) es una plataforma comunitaria donde investigadores publican "pulsos" con IOCs contextualizados. A diferencia de las otras APIs, OTX ofrece contexto narrativo sobre amenazas.
Buscar pulsos
OTX_API_KEY = os.getenv("OTX_API_KEY")
def otx_search_pulses(query: str, limit: int = 10) -> list[dict]:
"""Busca pulsos en OTX relacionados con un término."""
url = "https://otx.alienvault.com/api/v1/search/pulses"
headers = {"X-OTX-API-KEY": OTX_API_KEY}
params = {"q": query, "limit": limit}
response = requests.get(url, headers=headers, params=params, timeout=30)
response.raise_for_status()
results = response.json().get("results", [])
return [
{
"id": p["id"],
"name": p["name"],
"description": p.get("description", "")[:300],
"author": p.get("author_name"),
"created": p.get("created"),
"modified": p.get("modified"),
"tags": p.get("tags", []),
"tlp": p.get("tlp", "white"),
"adversary": p.get("adversary"),
"malware_families": p.get("malware_families", []),
"ioc_count": len(p.get("indicators", [])),
}
for p in results
]
Consultar IOCs de un indicador
def otx_indicator_lookup(ioc_type: str, ioc_value: str) -> dict:
"""
Consulta un indicador en OTX.
ioc_type: 'IPv4', 'domain', 'hostname', 'url', 'file' (hash)
"""
type_map = {
"ipv4": "IPv4",
"domain": "domain",
"hostname": "hostname",
"url": "url",
"md5": "file",
"sha1": "file",
"sha256": "file",
}
otx_type = type_map.get(ioc_type.lower(), ioc_type)
section = "general"
url = f"https://otx.alienvault.com/api/v1/indicators/{otx_type}/{ioc_value}/{section}"
headers = {"X-OTX-API-KEY": OTX_API_KEY}
response = requests.get(url, headers=headers, timeout=30)
if response.status_code == 404:
return {"found": False, "type": ioc_type, "value": ioc_value}
response.raise_for_status()
data = response.json()
return {
"found": True,
"type": ioc_type,
"value": ioc_value,
"pulse_count": data.get("pulse_info", {}).get("count", 0),
"pulses": [
{
"name": p.get("name"),
"tags": p.get("tags", []),
"adversary": p.get("adversary"),
"malware_families": p.get("malware_families", []),
}
for p in data.get("pulse_info", {}).get("pulses", [])[:5]
],
"country": data.get("country_name"),
"asn": data.get("asn"),
"reputation": data.get("reputation", 0),
"validation": data.get("validation", []),
}
Script de enrichment multi-fuente
El script completo que unifica las cuatro APIs. Recibe una lista de IOCs, los enriquece consultando todas las fuentes disponibles, y genera un informe consolidado.
Detección automática de tipo de IOC
import re
import ipaddress
def detect_ioc_type(value: str) -> str:
"""Detecta automáticamente el tipo de un IOC."""
value = value.strip()
# Hash SHA256
if re.match(r'^[a-fA-F0-9]{64}$', value):
return "sha256"
# Hash SHA1
if re.match(r'^[a-fA-F0-9]{40}$', value):
return "sha1"
# Hash MD5
if re.match(r'^[a-fA-F0-9]{32}$', value):
return "md5"
# URL
if value.startswith(("http://", "https://")):
return "url"
# IPv4
try:
ip = ipaddress.ip_address(value)
if ip.version == 4:
return "ipv4"
return "ipv6"
except ValueError:
pass
# Dominio
if re.match(r'^[a-zA-Z0-9]([a-zA-Z0-9\-]*[a-zA-Z0-9])?(\.[a-zA-Z]{2,})+$', value):
return "domain"
return "unknown"
Caché local de resultados
Para evitar consultas repetidas y respetar rate limits:
import json
import os
import hashlib
from datetime import datetime, timedelta
class IOCCache:
"""Caché local de resultados de enrichment."""
def __init__(self, cache_dir: str = ".ioc_cache", ttl_hours: int = 24):
self.cache_dir = cache_dir
self.ttl = timedelta(hours=ttl_hours)
os.makedirs(cache_dir, exist_ok=True)
def _key(self, ioc_value: str, source: str) -> str:
raw = f"{ioc_value}:{source}"
return hashlib.md5(raw.encode()).hexdigest()
def get(self, ioc_value: str, source: str) -> dict | None:
path = os.path.join(self.cache_dir, f"{self._key(ioc_value, source)}.json")
if not os.path.exists(path):
return None
with open(path) as f:
cached = json.load(f)
# Verificar TTL
cached_time = datetime.fromisoformat(cached["_cached_at"])
if datetime.now() - cached_time > self.ttl:
os.remove(path)
return None
return cached["data"]
def set(self, ioc_value: str, source: str, data: dict) -> None:
path = os.path.join(self.cache_dir, f"{self._key(ioc_value, source)}.json")
with open(path, "w") as f:
json.dump({
"_cached_at": datetime.now().isoformat(),
"data": data
}, f)
Rate limiter simple
import time
from collections import defaultdict
class RateLimiter:
"""Rate limiter por API con delays configurables."""
def __init__(self):
self.delays = {
"virustotal": 16, # 4/min = 1 cada 15s + margen
"shodan": 1.1, # 1/seg
"abuseipdb": 1.0, # 1.000/día, generoso
"otx": 0.5, # Sin límite estricto
}
self.last_call = defaultdict(float)
def wait(self, api: str) -> None:
"""Espera el tiempo necesario antes de la siguiente consulta."""
delay = self.delays.get(api, 1.0)
elapsed = time.time() - self.last_call[api]
if elapsed < delay:
time.sleep(delay - elapsed)
self.last_call[api] = time.time()
El enrichment engine
import csv
class IOCEnrichmentEngine:
"""Motor de enrichment multi-fuente para IOCs."""
def __init__(self):
self.cache = IOCCache()
self.limiter = RateLimiter()
def enrich(self, ioc_value: str, ioc_type: str = None) -> dict:
"""Enriquece un IOC consultando todas las fuentes disponibles."""
if ioc_type is None:
ioc_type = detect_ioc_type(ioc_value)
result = {
"value": ioc_value,
"type": ioc_type,
"enriched_at": datetime.now().isoformat(),
"sources": {}
}
# VirusTotal (soporta todos los tipos)
vt_data = self._query_with_cache(ioc_value, "virustotal",
self._vt_lookup, ioc_value, ioc_type)
if vt_data:
result["sources"]["virustotal"] = vt_data
# Shodan (solo IPs)
if ioc_type == "ipv4":
shodan_data = self._query_with_cache(ioc_value, "shodan",
shodan_host_lookup, ioc_value)
if shodan_data:
result["sources"]["shodan"] = shodan_data
# AbuseIPDB (solo IPs)
if ioc_type == "ipv4":
abuse_data = self._query_with_cache(ioc_value, "abuseipdb",
abuseipdb_check, ioc_value)
if abuse_data:
result["sources"]["abuseipdb"] = abuse_data
# OTX (soporta IPs, dominios, hashes)
if ioc_type in ("ipv4", "domain", "md5", "sha1", "sha256"):
otx_data = self._query_with_cache(ioc_value, "otx",
otx_indicator_lookup, ioc_type, ioc_value)
if otx_data:
result["sources"]["otx"] = otx_data
# Calcular veredicto consolidado
result["verdict"] = self._calculate_verdict(result["sources"])
return result
def _query_with_cache(self, ioc_value, source, func, *args) -> dict | None:
"""Consulta con caché y rate limiting."""
# Intentar caché primero
cached = self.cache.get(ioc_value, source)
if cached:
return cached
# Consultar API con rate limiting
try:
self.limiter.wait(source)
data = func(*args)
self.cache.set(ioc_value, source, data)
return data
except Exception as e:
return {"error": str(e)}
def _vt_lookup(self, ioc_value: str, ioc_type: str) -> dict:
"""Selecciona el lookup de VT según el tipo de IOC."""
if ioc_type in ("md5", "sha1", "sha256"):
return vt_file_lookup(ioc_value)
elif ioc_type == "ipv4":
return vt_ip_lookup(ioc_value)
elif ioc_type == "url":
return vt_url_lookup(ioc_value)
elif ioc_type == "domain":
return vt_ip_lookup(ioc_value) # VT usa el mismo endpoint
return {"error": f"Tipo no soportado en VT: {ioc_type}"}
def _calculate_verdict(self, sources: dict) -> dict:
"""Calcula un veredicto consolidado basado en todas las fuentes."""
score = 0
factors = []
# VirusTotal
vt = sources.get("virustotal", {})
if vt.get("found"):
malicious = vt.get("malicious", 0)
if malicious > 10:
score += 40
factors.append(f"VT: {malicious} detecciones")
elif malicious > 3:
score += 20
factors.append(f"VT: {malicious} detecciones")
elif malicious > 0:
score += 10
factors.append(f"VT: {malicious} detecciones")
# AbuseIPDB
abuse = sources.get("abuseipdb", {})
if abuse.get("found"):
abuse_score = abuse.get("abuse_score", 0)
if abuse_score > 75:
score += 30
factors.append(f"AbuseIPDB: {abuse_score}%")
elif abuse_score > 25:
score += 15
factors.append(f"AbuseIPDB: {abuse_score}%")
# Shodan (puertos peligrosos abiertos)
shodan = sources.get("shodan", {})
if shodan.get("found"):
vulns = shodan.get("vulns", [])
if vulns:
score += 20
factors.append(f"Shodan: {len(vulns)} CVEs")
# OTX (presencia en pulsos)
otx = sources.get("otx", {})
if otx.get("found"):
pulse_count = otx.get("pulse_count", 0)
if pulse_count > 5:
score += 15
factors.append(f"OTX: {pulse_count} pulsos")
elif pulse_count > 0:
score += 5
factors.append(f"OTX: {pulse_count} pulsos")
# Clasificar
if score >= 60:
level = "MALICIOSO"
elif score >= 30:
level = "SOSPECHOSO"
elif score > 0:
level = "BAJO_RIESGO"
else:
level = "LIMPIO"
return {
"level": level,
"score": min(score, 100),
"factors": factors,
}
def enrich_batch(self, iocs: list[str]) -> list[dict]:
"""Enriquece una lista de IOCs."""
results = []
total = len(iocs)
for i, ioc in enumerate(iocs, 1):
ioc = ioc.strip()
if not ioc:
continue
print(f" [{i}/{total}] {ioc}...", end=" ", flush=True)
result = self.enrich(ioc)
verdict = result["verdict"]
print(f"{verdict['level']} (score: {verdict['score']})")
results.append(result)
return results
Exportar resultados
def export_to_csv(results: list[dict], output: str = "enrichment_results.csv"):
"""Exporta los resultados de enrichment a CSV."""
fields = [
"value", "type", "verdict_level", "verdict_score",
"vt_malicious", "abuse_score", "shodan_ports",
"otx_pulses", "factors"
]
with open(output, "w", newline="") as f:
writer = csv.DictWriter(f, fieldnames=fields)
writer.writeheader()
for r in results:
row = {
"value": r["value"],
"type": r["type"],
"verdict_level": r["verdict"]["level"],
"verdict_score": r["verdict"]["score"],
"vt_malicious": r["sources"].get("virustotal", {}).get("malicious", "N/A"),
"abuse_score": r["sources"].get("abuseipdb", {}).get("abuse_score", "N/A"),
"shodan_ports": str(r["sources"].get("shodan", {}).get("ports", [])),
"otx_pulses": r["sources"].get("otx", {}).get("pulse_count", "N/A"),
"factors": " | ".join(r["verdict"]["factors"]),
}
writer.writerow(row)
def export_to_json(results: list[dict], output: str = "enrichment_results.json"):
"""Exporta los resultados completos a JSON."""
with open(output, "w") as f:
json.dump(results, f, indent=2, default=str)
Uso completo
if __name__ == "__main__":
engine = IOCEnrichmentEngine()
# IOC individual
result = engine.enrich("203.0.113.42")
print(json.dumps(result, indent=2, default=str))
# Lista desde archivo
with open("iocs_to_check.txt") as f:
iocs = [line.strip() for line in f if line.strip()]
results = engine.enrich_batch(iocs)
export_to_csv(results)
export_to_json(results)
print(f"\nProcesados: {len(results)} IOCs")
print(f"Maliciosos: {sum(1 for r in results if r['verdict']['level'] == 'MALICIOSO')}")
print(f"Sospechosos: {sum(1 for r in results if r['verdict']['level'] == 'SOSPECHOSO')}")
Manejo robusto de errores
En producción, las APIs fallan. Timeouts, rate limits, errores de servidor, respuestas malformadas. El código defensivo marca la diferencia:
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_session() -> requests.Session:
"""Crea una sesión HTTP con reintentos automáticos."""
session = requests.Session()
retry = Retry(
total=3,
backoff_factor=1, # 1s, 2s, 4s
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["GET"],
)
adapter = HTTPAdapter(max_retries=retry)
session.mount("https://", adapter)
session.mount("http://", adapter)
return session
Usa esta sesión en lugar de requests.get() directo para que los reintentos sean automáticos.
Próximos pasos
Con el enrichment multi-fuente funcionando, el siguiente paso natural es automatizar flujos completos: desde la recepción de una alerta hasta la generación del informe. El próximo artículo introduce SOAR (Security Orchestration, Automation and Response), la arquitectura de playbooks, y cómo estos scripts se integran en flujos de automatización más amplios.
Recursos
- VirusTotal API v3 Reference. Documentación oficial completa.
- Shodan API Documentation. Referencia de endpoints y filtros de búsqueda.
- AbuseIPDB API Docs. Guía con ejemplos y categorías de reporte.
- OTX DirectConnect API. Documentación de la API de AlienVault OTX.
- MalwareIntel IOC Search. Plataforma de búsqueda de IOCs con datos agregados de múltiples fuentes.
- Requests Advanced Usage. Sesiones, reintentos y adaptadores HTTP.
Preguntas frecuentes
Libros recomendados
Artículos relacionados
Python para SOC Analysts: Fundamentos y Primeros Scripts de Seguridad
Python para Análisis de Logs: Parsear, Filtrar y Correlacionar Eventos
SOAR: Qué Es, Arquitectura y Diseño de Playbooks de Automatización
Este contenido tiene fines exclusivamente educativos y de investigación en ciberseguridad defensiva. No se proporcionan binarios maliciosos ni payloads ejecutables. El uso indebido de esta información es responsabilidad exclusiva del usuario. Leer disclaimer completo.