PrincipiantePythonSOCscriptingautomatizaciónVirusTotalAbuseIPDB

Python para SOC Analysts: Fundamentos y Primeros Scripts de Seguridad

Guía práctica de Python para analistas SOC: configuración del entorno, librerías esenciales, y tres scripts funcionales para verificar hashes en VirusTotal, parsear logs de firewall y consultar reputación de IPs en AbuseIPDB.

MalwareIntel Research··14 min lectura

Por qué Python es el lenguaje del SOC

Cada analista SOC tiene una rutina que se repite docenas de veces al día: recibir una alerta, copiar un hash o una IP, abrir tres pestañas del navegador, buscar en VirusTotal, consultar AbuseIPDB, revisar OTX, y documentar los resultados en un ticket. Son 5 a 10 minutos por alerta. Con 50 alertas diarias, eso supone entre 4 y 8 horas de trabajo mecánico.

Python permite reducir esos 10 minutos a 10 segundos. Un script que reciba un hash, consulte VirusTotal y AbuseIPDB en paralelo, y genere un resumen formateado transforma completamente el flujo de trabajo. No se trata de reemplazar al analista, sino de eliminar las tareas que no requieren criterio humano.

Las razones concretas por las que Python domina en los SOC:

Ecosistema de seguridad. Librerías como requests (HTTP), ipaddress (validación de IPs), hashlib (hashing), python-evtx (logs Windows), yara-python (reglas YARA) y SDKs oficiales de plataformas CTI (pymisp, OTXv2, vt-py) cubren el 90% de las necesidades.

Multiplataforma. El mismo script funciona en la estación Windows del analista, en el servidor Linux del SIEM, y en el contenedor Docker del pipeline de ingesta.

Curva de aprendizaje. Un analista sin experiencia previa en programación puede escribir scripts útiles en 2 a 3 semanas. Python prioriza la legibilidad sobre la concisión críptica.

Integración con SOAR. Plataformas como Shuffle, Cortex XSOAR, y Tines permiten ejecutar scripts Python como acciones dentro de playbooks automatizados.


Configurar el entorno de desarrollo

Antes de escribir el primer script, necesitas un entorno limpio y reproducible. La tentación de instalar paquetes con pip install directamente en el sistema operativo es fuerte, pero lleva a conflictos de versiones en pocas semanas.

Python y entorno virtual

Verifica que tienes Python 3.10 o superior:

python3 --version

Crea un directorio de trabajo y un entorno virtual aislado:

mkdir ~/soc-scripts && cd ~/soc-scripts
python3 -m venv .venv
source .venv/bin/activate  # Linux/macOS
# .venv\Scripts\activate   # Windows PowerShell

Instala las librerías que usaremos en esta guía:

pip install requests python-dotenv ipaddress

Editor: VS Code con extensiones de seguridad

VS Code es el editor más popular entre analistas SOC por su balance entre simplicidad y potencia. Extensiones recomendadas:

  • Python (Microsoft): autocompletado, depuración, linting.
  • Jupyter (Microsoft): para exploración interactiva de datos.
  • REST Client (Huachao Mao): para probar APIs directamente desde el editor.
  • YARA (Infocyte): syntax highlighting para reglas YARA.

Configura el intérprete de Python para que use el entorno virtual: Ctrl+Shift+P > "Python: Select Interpreter" > selecciona .venv.

Jupyter Notebook para exploración

Para tareas exploratorias (analizar un log nuevo, probar una API desconocida), Jupyter es más cómodo que un script:

pip install jupyter
jupyter notebook

Jupyter permite ejecutar código celda por celda, ver resultados inmediatos e iterar rápido. Una vez que el código funciona, lo mueves a un script .py para automatizarlo.

Gestión de API keys con dotenv

Nunca escribas API keys directamente en el código. Crea un archivo .env en la raíz del proyecto:

# .env (NUNCA lo subas a git)
VT_API_KEY=tu_api_key_de_virustotal
ABUSEIPDB_API_KEY=tu_api_key_de_abuseipdb

Añade .env al .gitignore:

echo ".env" >> .gitignore

En tus scripts, carga las variables así:

import os
from dotenv import load_dotenv

load_dotenv()
VT_API_KEY = os.getenv("VT_API_KEY")

Librerías esenciales para el analista SOC

Antes de construir los scripts, conviene entender las herramientas básicas. Todas forman parte de la librería estándar de Python o se instalan con un solo comando.

requests: el cliente HTTP universal

La librería requests simplifica las llamadas HTTP. Es la base para interactuar con cualquier API de threat intelligence:

import requests

response = requests.get("https://api.example.com/data")
data = response.json()
print(response.status_code)  # 200, 404, 429...

json: parsear respuestas de APIs

Las APIs devuelven datos en JSON. Python lo maneja de forma nativa:

import json

# Parsear un string JSON
data = json.loads('{"malware": "Emotet", "confidence": 95}')
print(data["malware"])  # Emotet

# Escribir JSON a archivo
with open("results.json", "w") as f:
    json.dump(data, f, indent=2)

csv: leer y escribir informes

Muchos feeds CTI y exports de SIEM usan formato CSV:

import csv

# Leer un CSV de IOCs
with open("iocs.csv") as f:
    reader = csv.DictReader(f)
    for row in reader:
        print(f"IOC: {row['value']} | Tipo: {row['type']}")

# Escribir resultados a CSV
with open("results.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["hash", "detections", "verdict"])
    writer.writerow(["abc123...", 45, "malicious"])

ipaddress: validar y clasificar IPs

La librería ipaddress (estándar) permite validar IPs, detectar rangos privados y trabajar con subredes:

import ipaddress

ip = ipaddress.ip_address("192.168.1.1")
print(ip.is_private)    # True
print(ip.is_global)     # False

# Verificar si una IP está en un rango
network = ipaddress.ip_network("10.0.0.0/8")
print(ipaddress.ip_address("10.5.3.2") in network)  # True

re: expresiones regulares para extraer IOCs

Las expresiones regulares son fundamentales para extraer IOCs de texto no estructurado:

import re

log_line = "Connection from 203.0.113.45 to 198.51.100.23 on port 443"
ips = re.findall(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b', log_line)
print(ips)  # ['203.0.113.45', '198.51.100.23']

hashlib: calcular hashes de archivos

Para verificar la integridad de archivos o generar hashes para consultar en VirusTotal:

import hashlib

def file_hash(filepath, algo="sha256"):
    h = hashlib.new(algo)
    with open(filepath, "rb") as f:
        for chunk in iter(lambda: f.read(8192), b""):
            h.update(chunk)
    return h.hexdigest()

print(file_hash("/tmp/suspicious.exe"))

Script 1: IOC Checker con VirusTotal API

El primer script resuelve la tarea más común del analista N1: verificar si un hash es conocido como malicioso. En lugar de abrir el navegador, copiar el hash, y navegar por la interfaz de VirusTotal, el script hace todo en una llamada.

Obtener una API key de VirusTotal

  1. Crea una cuenta gratuita en virustotal.com.
  2. Ve a tu perfil > API key.
  3. La cuenta gratuita permite 4 consultas por minuto y 500 al día.
  4. Añade la key a tu archivo .env.

El script completo

#!/usr/bin/env python3
"""
ioc_checker.py - Verificar hashes contra VirusTotal API v3
Uso: python ioc_checker.py <hash_sha256>
"""

import sys
import os
import requests
from dotenv import load_dotenv

load_dotenv()
VT_API_KEY = os.getenv("VT_API_KEY")
VT_BASE_URL = "https://www.virustotal.com/api/v3"


def check_hash(file_hash: str) -> dict:
    """Consulta un hash en VirusTotal y devuelve el resumen."""
    headers = {"x-apikey": VT_API_KEY}
    url = f"{VT_BASE_URL}/files/{file_hash}"

    response = requests.get(url, headers=headers, timeout=30)

    if response.status_code == 404:
        return {"hash": file_hash, "found": False}
    
    response.raise_for_status()
    data = response.json()["data"]["attributes"]
    stats = data.get("last_analysis_stats", {})

    return {
        "hash": file_hash,
        "found": True,
        "malicious": stats.get("malicious", 0),
        "undetected": stats.get("undetected", 0),
        "total_engines": sum(stats.values()),
        "suggested_name": data.get("meaningful_name", "N/A"),
        "file_type": data.get("type_description", "N/A"),
        "first_seen": data.get("first_submission_date", "N/A"),
        "tags": data.get("tags", []),
    }


def print_result(result: dict) -> None:
    """Muestra el resultado formateado en terminal."""
    print(f"\n{'='*60}")
    print(f"  Hash: {result['hash']}")
    print(f"{'='*60}")

    if not result["found"]:
        print("  Estado: NO ENCONTRADO en VirusTotal")
        print("  Nota: Hash no analizado previamente")
        return

    detections = result["malicious"]
    total = result["total_engines"]
    ratio = f"{detections}/{total}"

    if detections == 0:
        verdict = "LIMPIO"
    elif detections <= 5:
        verdict = "SOSPECHOSO"
    else:
        verdict = "MALICIOSO"

    print(f"  Veredicto:  {verdict} ({ratio} detecciones)")
    print(f"  Nombre:     {result['suggested_name']}")
    print(f"  Tipo:       {result['file_type']}")
    print(f"  Tags:       {', '.join(result['tags'][:5])}")
    print(f"{'='*60}\n")


if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Uso: python ioc_checker.py <hash>")
        sys.exit(1)

    if not VT_API_KEY:
        print("Error: VT_API_KEY no configurada en .env")
        sys.exit(1)

    result = check_hash(sys.argv[1])
    print_result(result)

Cómo funciona

  1. El script recibe un hash (MD5, SHA1 o SHA256) como argumento.
  2. Consulta la API v3 de VirusTotal con ese hash.
  3. Si el hash no existe, informa que no fue analizado previamente.
  4. Si existe, extrae las estadísticas de detección, nombre sugerido, tipo de archivo y tags.
  5. Clasifica como LIMPIO (0 detecciones), SOSPECHOSO (1-5) o MALICIOSO (mas de 5).

La función check_hash devuelve un diccionario, lo que permite reutilizarla en otros scripts o pipelines sin depender de la salida por terminal.


Script 2: Log Parser para firewalls

El segundo escenario habitual: tienes un archivo de log de firewall y necesitas extraer todas las IPs, contar cuántas conexiones tiene cada una, e identificar las que se repiten con frecuencia anormal.

El script completo

#!/usr/bin/env python3
"""
log_parser.py - Extraer y analizar IPs de logs de firewall
Uso: python log_parser.py <archivo_log> [--threshold 100]
"""

import re
import sys
import ipaddress
from collections import Counter
from datetime import datetime


def extract_ips(log_file: str) -> list[str]:
    """Extrae todas las IPs válidas de un archivo de log."""
    ip_pattern = re.compile(r'\b(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\b')
    ips = []

    with open(log_file, "r", encoding="utf-8", errors="ignore") as f:
        for line in f:
            matches = ip_pattern.findall(line)
            for ip_str in matches:
                try:
                    ip = ipaddress.ip_address(ip_str)
                    if ip.is_global:  # Ignorar IPs privadas
                        ips.append(ip_str)
                except ValueError:
                    continue

    return ips


def analyze_ips(ips: list[str], threshold: int = 100) -> dict:
    """Analiza las IPs extraídas y genera estadísticas."""
    counter = Counter(ips)
    total_unique = len(counter)
    total_connections = len(ips)

    # IPs que superan el umbral de conexiones
    suspicious = {
        ip: count for ip, count in counter.most_common()
        if count >= threshold
    }

    return {
        "total_connections": total_connections,
        "unique_ips": total_unique,
        "top_10": counter.most_common(10),
        "suspicious": suspicious,
        "threshold": threshold,
    }


def save_results(analysis: dict, output_file: str) -> None:
    """Guarda los resultados en un archivo de texto."""
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    with open(output_file, "w") as f:
        f.write(f"Análisis de logs - {timestamp}\n")
        f.write(f"{'='*50}\n\n")
        f.write(f"Total conexiones: {analysis['total_connections']}\n")
        f.write(f"IPs únicas:       {analysis['unique_ips']}\n")
        f.write(f"Umbral:           {analysis['threshold']} conexiones\n\n")

        f.write("Top 10 IPs por frecuencia:\n")
        f.write("-" * 35 + "\n")
        for ip, count in analysis["top_10"]:
            f.write(f"  {ip:<20} {count:>6} conexiones\n")

        if analysis["suspicious"]:
            f.write(f"\nIPs sospechosas (umbral superado):\n")
            f.write("-" * 35 + "\n")
            for ip, count in analysis["suspicious"].items():
                f.write(f"  {ip:<20} {count:>6} conexiones\n")
        else:
            f.write("\nNinguna IP supera el umbral de conexiones.\n")


def print_summary(analysis: dict) -> None:
    """Muestra un resumen en terminal."""
    print(f"\nTotal conexiones:  {analysis['total_connections']}")
    print(f"IPs únicas:        {analysis['unique_ips']}")
    print(f"\nTop 10 IPs:")
    for ip, count in analysis["top_10"]:
        bar = "#" * min(count // 10, 40)
        print(f"  {ip:<20} {count:>6}  {bar}")

    if analysis["suspicious"]:
        print(f"\nIPs sospechosas ({len(analysis['suspicious'])}):")
        for ip, count in analysis["suspicious"].items():
            print(f"  {ip:<20} {count:>6} conexiones")


if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Uso: python log_parser.py <archivo_log> [--threshold 100]")
        sys.exit(1)

    log_file = sys.argv[1]
    threshold = 100

    if "--threshold" in sys.argv:
        idx = sys.argv.index("--threshold")
        threshold = int(sys.argv[idx + 1])

    print(f"Analizando: {log_file}")
    ips = extract_ips(log_file)
    analysis = analyze_ips(ips, threshold)
    print_summary(analysis)
    save_results(analysis, "log_analysis_report.txt")
    print(f"\nInforme guardado en: log_analysis_report.txt")

Qué hace el script

  1. Lee un archivo de log línea por línea.
  2. Extrae todas las direcciones IPv4 con una expresión regular.
  3. Filtra las IPs privadas (RFC 1918), quedándose solo con IPs públicas.
  4. Cuenta la frecuencia de cada IP.
  5. Identifica IPs que superan un umbral configurable de conexiones.
  6. Genera un informe en archivo de texto y un resumen en terminal.

El umbral por defecto es 100 conexiones. Un servidor legítimo (CDN, DNS público) puede tener miles de conexiones, pero una IP desconocida con 500 conexiones en una hora merece investigación.


Script 3: IP Reputation Checker con AbuseIPDB

El tercer script automatiza la consulta de reputación de IPs. AbuseIPDB es una base de datos colaborativa donde los administradores reportan IPs maliciosas. La API gratuita permite 1.000 consultas al día.

Configurar AbuseIPDB

  1. Crea una cuenta en abuseipdb.com.
  2. Ve a la sección API > Create Key.
  3. Añade la key al archivo .env.

El script completo

#!/usr/bin/env python3
"""
ip_reputation.py - Verificar reputación de IPs en AbuseIPDB
Uso: python ip_reputation.py <ip_address>
     python ip_reputation.py --file ips.txt
"""

import sys
import os
import csv
import requests
from dotenv import load_dotenv
from datetime import datetime

load_dotenv()
ABUSE_API_KEY = os.getenv("ABUSEIPDB_API_KEY")
ABUSE_BASE_URL = "https://api.abuseipdb.com/api/v2"


def check_ip(ip: str, max_age_days: int = 90) -> dict:
    """Consulta la reputación de una IP en AbuseIPDB."""
    headers = {
        "Key": ABUSE_API_KEY,
        "Accept": "application/json"
    }
    params = {
        "ipAddress": ip,
        "maxAgeInDays": max_age_days,
        "verbose": ""
    }

    response = requests.get(
        f"{ABUSE_BASE_URL}/check",
        headers=headers,
        params=params,
        timeout=30
    )
    response.raise_for_status()
    data = response.json()["data"]

    return {
        "ip": data["ipAddress"],
        "abuse_score": data["abuseConfidenceScore"],
        "country": data.get("countryCode", "N/A"),
        "isp": data.get("isp", "N/A"),
        "domain": data.get("domain", "N/A"),
        "total_reports": data.get("totalReports", 0),
        "last_reported": data.get("lastReportedAt", "Never"),
        "is_tor": data.get("isTor", False),
        "is_whitelisted": data.get("isWhitelisted", False),
        "usage_type": data.get("usageType", "N/A"),
    }


def classify_risk(result: dict) -> str:
    """Clasifica el nivel de riesgo basándose en el abuse score."""
    score = result["abuse_score"]
    if score == 0:
        return "LIMPIO"
    elif score <= 25:
        return "BAJO"
    elif score <= 50:
        return "MEDIO"
    elif score <= 75:
        return "ALTO"
    else:
        return "CRITICO"


def print_result(result: dict) -> None:
    """Muestra el resultado formateado."""
    risk = classify_risk(result)
    score = result["abuse_score"]

    print(f"\n{'='*55}")
    print(f"  IP: {result['ip']}  |  Riesgo: {risk} ({score}%)")
    print(f"{'='*55}")
    print(f"  País:         {result['country']}")
    print(f"  ISP:          {result['isp']}")
    print(f"  Dominio:      {result['domain']}")
    print(f"  Uso:          {result['usage_type']}")
    print(f"  Reports:      {result['total_reports']}")
    print(f"  Último:       {result['last_reported']}")
    print(f"  Tor:          {'Sí' if result['is_tor'] else 'No'}")
    print(f"  Whitelist:    {'Sí' if result['is_whitelisted'] else 'No'}")
    print(f"{'='*55}\n")


def check_file(filepath: str) -> list[dict]:
    """Verifica una lista de IPs desde un archivo (una por línea)."""
    results = []
    with open(filepath, "r") as f:
        ips = [line.strip() for line in f if line.strip()]

    print(f"Verificando {len(ips)} IPs...")
    for i, ip in enumerate(ips, 1):
        try:
            result = check_ip(ip)
            risk = classify_risk(result)
            print(f"  [{i}/{len(ips)}] {ip:<20} Score: {result['abuse_score']:>3}%  {risk}")
            results.append(result)
        except Exception as e:
            print(f"  [{i}/{len(ips)}] {ip:<20} ERROR: {e}")
            results.append({"ip": ip, "error": str(e)})

    return results


def export_csv(results: list[dict], output: str = "ip_reputation.csv") -> None:
    """Exporta resultados a CSV."""
    fields = ["ip", "abuse_score", "country", "isp", "total_reports",
              "last_reported", "is_tor", "usage_type"]

    with open(output, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fields, extrasaction="ignore")
        writer.writeheader()
        for r in results:
            if "error" not in r:
                writer.writerow(r)

    print(f"Resultados exportados a: {output}")


if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Uso: python ip_reputation.py <ip>")
        print("     python ip_reputation.py --file ips.txt")
        sys.exit(1)

    if not ABUSE_API_KEY:
        print("Error: ABUSEIPDB_API_KEY no configurada en .env")
        sys.exit(1)

    if sys.argv[1] == "--file":
        results = check_file(sys.argv[2])
        export_csv(results)
    else:
        result = check_ip(sys.argv[1])
        print_result(result)

Funcionalidades

El script soporta dos modos de uso:

Consulta individual. Pasa una IP como argumento y obtiene el abuse score, país, ISP, número de reportes y clasificación de riesgo (LIMPIO, BAJO, MEDIO, ALTO, CRITICO).

Consulta masiva. Con --file ips.txt, procesa una lista de IPs (una por línea), muestra el progreso en tiempo real y exporta los resultados a CSV.

La clasificación de riesgo usa umbrales del abuse confidence score: 0% es limpio, 1-25% es bajo, 26-50% es medio, 51-75% es alto, y por encima del 75% es crítico. Estos umbrales son configurables según las necesidades del SOC.


Buenas prácticas para scripts SOC

Con los tres scripts funcionando, estas son las prácticas que separan un script de laboratorio de una herramienta operativa:

1. Nunca hardcodear secretos

Las API keys van en variables de entorno o en un gestor de secretos. Nunca en el código, ni siquiera en comentarios. Un git push accidental con una API key de VirusTotal Enterprise puede costar miles de euros.

2. Manejar errores de red

Las APIs fallan: timeouts, rate limits (HTTP 429), errores del servidor (5xx). Siempre usa timeout en requests y captura excepciones:

try:
    response = requests.get(url, headers=headers, timeout=30)
    response.raise_for_status()
except requests.exceptions.Timeout:
    print("Error: timeout en la consulta")
except requests.exceptions.HTTPError as e:
    if e.response.status_code == 429:
        print("Rate limit alcanzado. Espera 60 segundos.")
    else:
        print(f"Error HTTP: {e.response.status_code}")

3. Respetar rate limits

VirusTotal gratuito permite 4 consultas por minuto. Si procesas una lista de 100 hashes, añade pausas:

import time

for hash_value in hashes:
    result = check_hash(hash_value)
    time.sleep(16)  # 4 por minuto = 1 cada 15 segundos + margen

4. Logging en lugar de print

Para scripts que se ejecutan en cron o en pipelines, sustituye print() por logging:

import logging

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("soc_scripts.log"),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)
logger.info(f"Consultando hash: {hash_value}")

5. Validar inputs siempre

Nunca confíes en que el input es correcto. Valida antes de consultar:

import re

def is_valid_sha256(value: str) -> bool:
    return bool(re.match(r'^[a-fA-F0-9]{64}$', value))

def is_valid_ipv4(value: str) -> bool:
    try:
        ip = ipaddress.ip_address(value)
        return ip.version == 4
    except ValueError:
        return False

6. Documentar con docstrings

Cada función debe tener un docstring que explique qué hace, qué recibe y qué devuelve. El analista que use tu script dentro de seis meses (posiblemente tú mismo) te lo agradecerá.


Próximos pasos

Con estos tres scripts tienes las bases para automatizar el trabajo repetitivo del SOC. El siguiente artículo de la serie profundiza en el análisis de logs con Python: cómo parsear formatos complejos (Syslog, EVTX, JSON estructurado), usar Pandas para filtrar y correlacionar eventos masivos, y generar visualizaciones que revelen patrones ocultos en los datos.


Recursos

Preguntas frecuentes

Artículos relacionados

Este contenido tiene fines exclusivamente educativos y de investigación en ciberseguridad defensiva. No se proporcionan binarios maliciosos ni payloads ejecutables. El uso indebido de esta información es responsabilidad exclusiva del usuario. Leer disclaimer completo.