IntermedioPythonSOClogsSIEMPandasEVTX

Python para Análisis de Logs: Parsear, Filtrar y Correlacionar Eventos

Técnicas avanzadas de análisis de logs con Python: parsing de Syslog, JSON y EVTX, filtrado con Pandas, correlación de eventos entre múltiples fuentes, visualización con matplotlib y automatización de revisiones diarias.

MalwareIntel Research··12 min lectura

El volumen de logs como desafío y oportunidad

Un SOC de tamaño medio genera entre 10 y 50 GB de logs al día. Firewalls, proxies, endpoints, servidores de correo, Active Directory, aplicaciones web. Cada fuente tiene su formato, su nivel de verbosidad y su propia manera de registrar eventos. El SIEM ingesta y correlaciona a escala, pero hay escenarios donde Python es más efectivo: investigaciones forenses puntuales, análisis de logs históricos exportados, y creación de herramientas personalizadas que el SIEM no soporta de serie.

Este artículo cubre las técnicas esenciales: parsear los formatos más comunes (Syslog, JSON, CSV, EVTX), filtrar con Pandas para encontrar anomalías, correlacionar eventos de múltiples fuentes, y generar visualizaciones que revelen patrones.


Instalar las dependencias

pip install pandas matplotlib python-evtx openpyxl
  • pandas: manipulación y análisis de datos tabulares.
  • matplotlib: gráficos y visualizaciones.
  • python-evtx: parsing de archivos Windows Event Log (.evtx).
  • openpyxl: exportar resultados a Excel.

Formatos de log: anatomía y parsing

Syslog (RFC 5424)

El formato más extendido en dispositivos de red, firewalls y servidores Linux. Una línea típica:

Jun 05 14:23:45 fw-prod-01 kernel: [UFW BLOCK] IN=eth0 OUT= MAC=00:1a:2b... SRC=203.0.113.42 DST=10.0.1.5 PROTO=TCP SPT=44521 DPT=22

El parseo con regex necesita capturar los campos relevantes:

import re
from datetime import datetime

SYSLOG_PATTERN = re.compile(
    r'(?P<timestamp>\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})\s+'
    r'(?P<hostname>[\w\-\.]+)\s+'
    r'(?P<process>[\w\-\[\]]+):\s+'
    r'(?P<message>.*)'
)

def parse_syslog_line(line: str) -> dict | None:
    """Parsea una línea de syslog y extrae campos clave."""
    match = SYSLOG_PATTERN.match(line)
    if not match:
        return None

    result = match.groupdict()

    # Extraer IPs del mensaje (si existen)
    ips = re.findall(r'(?:SRC|DST)=([\d\.]+)', result["message"])
    if len(ips) >= 2:
        result["src_ip"] = ips[0]
        result["dst_ip"] = ips[1]

    # Extraer puertos
    ports = re.findall(r'(?:SPT|DPT)=(\d+)', result["message"])
    if len(ports) >= 2:
        result["src_port"] = int(ports[0])
        result["dst_port"] = int(ports[1])

    # Extraer protocolo
    proto = re.search(r'PROTO=(\w+)', result["message"])
    if proto:
        result["protocol"] = proto.group(1)

    return result


def parse_syslog_file(filepath: str) -> list[dict]:
    """Parsea un archivo de syslog completo."""
    events = []
    with open(filepath, "r", encoding="utf-8", errors="ignore") as f:
        for line_num, line in enumerate(f, 1):
            parsed = parse_syslog_line(line.strip())
            if parsed:
                parsed["line_number"] = line_num
                events.append(parsed)
    return events

JSON estructurado

Los SIEMs modernos y muchas aplicaciones emiten logs en JSON. Este formato es el más cómodo de trabajar:

import json

def parse_json_logs(filepath: str) -> list[dict]:
    """Parsea un archivo con una línea JSON por evento."""
    events = []
    with open(filepath, "r") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                event = json.loads(line)
                events.append(event)
            except json.JSONDecodeError:
                continue  # Líneas malformadas se ignoran
    return events

Para archivos JSON donde todo el contenido es un array:

def parse_json_array(filepath: str) -> list[dict]:
    """Parsea un archivo JSON que contiene un array de eventos."""
    with open(filepath, "r") as f:
        return json.load(f)

CSV

Exports de SIEM, feeds de IOCs y reportes suelen venir en CSV:

import csv

def parse_csv_logs(filepath: str) -> list[dict]:
    """Parsea un archivo CSV de logs."""
    events = []
    with open(filepath, "r", encoding="utf-8-sig") as f:
        reader = csv.DictReader(f)
        for row in reader:
            events.append(dict(row))
    return events

Windows EVTX

Los archivos .evtx contienen los Event Logs de Windows. La librería python-evtx permite parsearlos sin necesidad de un sistema Windows:

import Evtx.Evtx as evtx
import Evtx.Views as evtx_views
import xml.etree.ElementTree as ET


def parse_evtx(filepath: str, event_ids: list[int] = None) -> list[dict]:
    """
    Parsea un archivo EVTX y extrae eventos.
    Si event_ids se especifica, filtra solo esos Event IDs.
    """
    events = []

    with evtx.Evtx(filepath) as log:
        for record in log.records():
            try:
                xml_str = record.xml()
                root = ET.fromstring(xml_str)

                # Namespace de Windows Event Log
                ns = {"ev": "http://schemas.microsoft.com/win/2004/08/events/event"}

                system = root.find("ev:System", ns)
                event_id = int(system.find("ev:EventID", ns).text)

                # Filtrar por Event ID si se especifica
                if event_ids and event_id not in event_ids:
                    continue

                event = {
                    "event_id": event_id,
                    "timestamp": system.find("ev:TimeCreated", ns).get("SystemTime"),
                    "computer": system.find("ev:Computer", ns).text,
                    "channel": system.find("ev:Channel", ns).text,
                    "provider": system.find("ev:Provider", ns).get("Name"),
                }

                # Extraer datos del EventData
                event_data = root.find("ev:EventData", ns)
                if event_data is not None:
                    for data in event_data.findall("ev:Data", ns):
                        name = data.get("Name", "unknown")
                        event[name] = data.text

                events.append(event)

            except Exception:
                continue

    return events

Event IDs críticos para seguridad en Windows:

Event IDDescripciónRelevancia
4624Logon exitosoDetectar logons sospechosos
4625Logon fallidoFuerza bruta, password spraying
4648Logon con credenciales explícitasLateral movement
4688Nuevo proceso creadoEjecución de herramientas
4720Cuenta de usuario creadaPersistencia
4732Miembro añadido a grupo localEscalada de privilegios
7045Servicio instaladoPersistencia, malware
1102Log de seguridad borradoAnti-forensics

Análisis con Pandas

Una vez parseados los logs, Pandas permite filtrar, agrupar y analizar millones de eventos con pocas líneas de código.

Cargar logs en un DataFrame

import pandas as pd

# Desde una lista de diccionarios (resultado del parsing)
events = parse_syslog_file("/var/log/firewall.log")
df = pd.DataFrame(events)

# Desde un CSV directamente
df = pd.read_csv("siem_export.csv", parse_dates=["timestamp"])

# Desde JSON lines
df = pd.read_json("events.jsonl", lines=True)

Filtrar por severidad, fuente y ventana temporal

# Convertir timestamp a datetime
df["timestamp"] = pd.to_datetime(df["timestamp"])

# Filtrar eventos de las últimas 24 horas
from datetime import datetime, timedelta
cutoff = datetime.now() - timedelta(hours=24)
recent = df[df["timestamp"] >= cutoff]

# Filtrar por severidad
critical = df[df["severity"].isin(["CRITICAL", "HIGH"])]

# Filtrar por IP de origen
suspect = df[df["src_ip"] == "203.0.113.42"]

# Combinar filtros
brute_force = df[
    (df["dst_port"] == 22) &
    (df["message"].str.contains("BLOCK", na=False)) &
    (df["timestamp"] >= cutoff)
]

Agrupar y contar

# Top 10 IPs origen con más conexiones bloqueadas
top_src = df[df["message"].str.contains("BLOCK", na=False)] \
    .groupby("src_ip") \
    .size() \
    .sort_values(ascending=False) \
    .head(10)
print(top_src)

# Eventos por hora (detectar picos)
df["hour"] = df["timestamp"].dt.hour
hourly = df.groupby("hour").size()

# Puertos destino más atacados
top_ports = df.groupby("dst_port").size().sort_values(ascending=False).head(10)

Detectar anomalías estadísticas

# Calcular baseline: conexiones por IP por hora
hourly_by_ip = df.groupby([df["timestamp"].dt.hour, "src_ip"]).size()
mean = hourly_by_ip.mean()
std = hourly_by_ip.std()

# IPs con actividad superior a 3 desviaciones estándar
threshold = mean + 3 * std
anomalies = hourly_by_ip[hourly_by_ip > threshold]
print(f"IPs anómalas: {anomalies.index.get_level_values('src_ip').unique().tolist()}")

Correlación entre múltiples fuentes

La correlación es donde Python supera a muchos SIEMs en flexibilidad. Combinar logs de firewall, proxy, endpoint y Active Directory revela patrones que ninguna fuente aislada muestra.

Ejemplo: detectar exfiltración de datos

La hipótesis: un endpoint interno se comunica con una IP externa que nunca había sido contactada, usando volúmenes de datos inusuales.

def correlate_exfiltration(
    firewall_df: pd.DataFrame,
    proxy_df: pd.DataFrame,
    baseline_days: int = 30
) -> pd.DataFrame:
    """
    Correlaciona logs de firewall y proxy para detectar
    posible exfiltración de datos.
    """
    # 1. IPs externas contactadas en las últimas 24h
    cutoff_24h = datetime.now() - timedelta(hours=24)
    recent_external = firewall_df[
        (firewall_df["timestamp"] >= cutoff_24h) &
        (firewall_df["dst_ip"].apply(lambda ip: not is_private(ip)))
    ]["dst_ip"].unique()

    # 2. Baseline: IPs contactadas en los últimos 30 días
    cutoff_baseline = datetime.now() - timedelta(days=baseline_days)
    baseline_ips = firewall_df[
        (firewall_df["timestamp"] >= cutoff_baseline) &
        (firewall_df["timestamp"] < cutoff_24h)
    ]["dst_ip"].unique()

    # 3. IPs nuevas (no vistas en el baseline)
    new_ips = set(recent_external) - set(baseline_ips)

    # 4. Filtrar por volumen alto (proxy logs)
    suspicious = proxy_df[
        (proxy_df["dst_ip"].isin(new_ips)) &
        (proxy_df["bytes_sent"] > 10_000_000)  # Mas de 10 MB enviados
    ]

    return suspicious[["timestamp", "src_ip", "dst_ip",
                        "bytes_sent", "url", "user_agent"]]


def is_private(ip_str: str) -> bool:
    """Verifica si una IP es privada."""
    import ipaddress
    try:
        return ipaddress.ip_address(ip_str).is_private
    except ValueError:
        return False

Ejemplo: correlacionar logon y ejecución de procesos

Detectar movimiento lateral: un logon seguido de ejecución de herramientas sospechosas en el mismo equipo.

def correlate_logon_execution(
    logon_df: pd.DataFrame,
    process_df: pd.DataFrame,
    window_minutes: int = 5
) -> pd.DataFrame:
    """
    Correlaciona logons (4624) con creación de procesos (4688)
    dentro de una ventana temporal.
    """
    suspicious_tools = [
        "powershell.exe", "cmd.exe", "wmic.exe", "psexec.exe",
        "net.exe", "nltest.exe", "mimikatz.exe", "procdump.exe",
        "certutil.exe", "bitsadmin.exe", "mshta.exe", "rundll32.exe"
    ]

    results = []

    for _, logon in logon_df.iterrows():
        # Buscar procesos sospechosos en la ventana temporal
        window_start = logon["timestamp"]
        window_end = window_start + timedelta(minutes=window_minutes)

        matches = process_df[
            (process_df["computer"] == logon["computer"]) &
            (process_df["timestamp"] >= window_start) &
            (process_df["timestamp"] <= window_end) &
            (process_df["process_name"].str.lower().isin(suspicious_tools))
        ]

        for _, proc in matches.iterrows():
            results.append({
                "logon_time": logon["timestamp"],
                "logon_type": logon.get("LogonType", "N/A"),
                "username": logon.get("TargetUserName", "N/A"),
                "computer": logon["computer"],
                "src_ip": logon.get("IpAddress", "N/A"),
                "process_time": proc["timestamp"],
                "process": proc["process_name"],
                "command_line": proc.get("CommandLine", "N/A"),
                "delta_seconds": (proc["timestamp"] - logon["timestamp"]).total_seconds()
            })

    return pd.DataFrame(results)

Visualización con matplotlib

Los gráficos transforman tablas de datos en patrones visibles. Un pico de actividad a las 3 AM, un puerto que recibe 10 veces más tráfico de lo normal, o una IP que aparece en múltiples fuentes son más evidentes en un gráfico que en una tabla.

Eventos por hora (detectar picos fuera de horario)

import matplotlib.pyplot as plt
import matplotlib.dates as mdates

def plot_events_timeline(df: pd.DataFrame, title: str = "Eventos por hora"):
    """Gráfico de líneas con eventos agrupados por hora."""
    hourly = df.set_index("timestamp").resample("1h").size()

    fig, ax = plt.subplots(figsize=(14, 5))
    ax.plot(hourly.index, hourly.values, color="#38bdf8", linewidth=1.5)
    ax.fill_between(hourly.index, hourly.values, alpha=0.15, color="#38bdf8")

    # Marcar horario laboral
    ax.axvspan(
        hourly.index[0].replace(hour=8),
        hourly.index[0].replace(hour=18),
        alpha=0.05, color="green", label="Horario laboral"
    )

    ax.set_title(title, fontsize=14, fontweight="bold")
    ax.set_xlabel("Hora")
    ax.set_ylabel("Cantidad de eventos")
    ax.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M"))
    ax.legend()
    plt.tight_layout()
    plt.savefig("events_timeline.png", dpi=150)
    plt.show()

Top IPs por volumen (barras horizontales)

def plot_top_ips(df: pd.DataFrame, column: str = "src_ip", top_n: int = 15):
    """Gráfico de barras horizontales con las IPs más frecuentes."""
    top = df[column].value_counts().head(top_n)

    fig, ax = plt.subplots(figsize=(10, 6))
    colors = ["#fb7185" if v > top.mean() + 2 * top.std()
              else "#38bdf8" for v in top.values]

    bars = ax.barh(range(len(top)), top.values, color=colors)
    ax.set_yticks(range(len(top)))
    ax.set_yticklabels(top.index, fontsize=9)
    ax.set_xlabel("Número de eventos")
    ax.set_title(f"Top {top_n} IPs por frecuencia", fontsize=14, fontweight="bold")
    ax.invert_yaxis()

    # Anotar valores
    for bar, val in zip(bars, top.values):
        ax.text(val + max(top.values) * 0.01, bar.get_y() + bar.get_height() / 2,
                str(val), va="center", fontsize=8)

    plt.tight_layout()
    plt.savefig("top_ips.png", dpi=150)
    plt.show()

Heatmap de actividad (día x hora)

import numpy as np

def plot_activity_heatmap(df: pd.DataFrame):
    """Heatmap de actividad por día de la semana y hora."""
    df["day"] = df["timestamp"].dt.day_name()
    df["hour"] = df["timestamp"].dt.hour

    days_order = ["Monday", "Tuesday", "Wednesday", "Thursday",
                  "Friday", "Saturday", "Sunday"]
    pivot = df.groupby(["day", "hour"]).size().unstack(fill_value=0)
    pivot = pivot.reindex(days_order)

    fig, ax = plt.subplots(figsize=(14, 5))
    im = ax.imshow(pivot.values, aspect="auto", cmap="YlOrRd")

    ax.set_yticks(range(len(days_order)))
    ax.set_yticklabels(days_order)
    ax.set_xticks(range(24))
    ax.set_xticklabels([f"{h:02d}" for h in range(24)])
    ax.set_xlabel("Hora")
    ax.set_title("Mapa de calor: actividad por día y hora", fontsize=14, fontweight="bold")

    plt.colorbar(im, ax=ax, label="Eventos")
    plt.tight_layout()
    plt.savefig("activity_heatmap.png", dpi=150)
    plt.show()

Parseo de EVTX para Windows forensics

Con la función parse_evtx definida antes, podemos crear análisis específicos para investigaciones en entornos Windows.

Análisis de logons fallidos (brute force detection)

def analyze_failed_logons(evtx_path: str) -> pd.DataFrame:
    """
    Analiza Event ID 4625 (logon fallido) para detectar
    intentos de fuerza bruta o password spraying.
    """
    events = parse_evtx(evtx_path, event_ids=[4625])
    df = pd.DataFrame(events)

    if df.empty:
        print("No se encontraron logons fallidos.")
        return df

    df["timestamp"] = pd.to_datetime(df["timestamp"])

    # Agrupar por IP de origen y cuenta objetivo
    by_source = df.groupby("IpAddress").agg(
        attempts=("event_id", "count"),
        unique_accounts=("TargetUserName", "nunique"),
        accounts=("TargetUserName", lambda x: list(x.unique())),
        first_attempt=("timestamp", "min"),
        last_attempt=("timestamp", "max"),
    ).sort_values("attempts", ascending=False)

    # Clasificar el tipo de ataque
    def classify_attack(row):
        if row["unique_accounts"] > 5 and row["attempts"] > 20:
            return "PASSWORD_SPRAYING"
        elif row["unique_accounts"] <= 2 and row["attempts"] > 10:
            return "BRUTE_FORCE"
        else:
            return "LOW_VOLUME"

    by_source["attack_type"] = by_source.apply(classify_attack, axis=1)

    return by_source

Automatizar la revisión diaria de logs

El script final combina todas las técnicas en una revisión automatizada que se ejecuta cada mañana:

#!/usr/bin/env python3
"""
daily_log_review.py - Revisión diaria automatizada de logs
Ejecutar con cron: 0 7 * * * /path/to/.venv/bin/python daily_log_review.py
"""

import pandas as pd
from datetime import datetime, timedelta
import json


def daily_review(
    firewall_log: str,
    proxy_log: str = None,
    output_dir: str = "./reports"
) -> dict:
    """Genera un informe diario de seguridad."""
    report = {
        "date": datetime.now().strftime("%Y-%m-%d"),
        "generated_at": datetime.now().isoformat(),
        "findings": []
    }

    # 1. Parsear logs del firewall
    events = parse_syslog_file(firewall_log)
    df = pd.DataFrame(events)
    df["timestamp"] = pd.to_datetime(df["timestamp"], format="mixed")

    # Filtrar últimas 24h
    cutoff = datetime.now() - timedelta(hours=24)
    df_24h = df[df["timestamp"] >= cutoff]

    report["total_events_24h"] = len(df_24h)

    # 2. Top IPs bloqueadas
    if "src_ip" in df_24h.columns:
        blocked = df_24h[df_24h["message"].str.contains("BLOCK", na=False)]
        top_blocked = blocked["src_ip"].value_counts().head(10).to_dict()
        report["top_blocked_ips"] = top_blocked

        if len(blocked) > 0:
            report["findings"].append({
                "type": "blocked_connections",
                "severity": "INFO",
                "detail": f"{len(blocked)} conexiones bloqueadas de "
                          f"{blocked['src_ip'].nunique()} IPs únicas"
            })

    # 3. Conexiones a puertos sensibles
    sensitive_ports = [22, 23, 3389, 445, 1433, 3306, 5432]
    if "dst_port" in df_24h.columns:
        sensitive = df_24h[df_24h["dst_port"].isin(sensitive_ports)]
        if len(sensitive) > 0:
            report["findings"].append({
                "type": "sensitive_port_access",
                "severity": "MEDIUM",
                "detail": f"{len(sensitive)} conexiones a puertos sensibles",
                "ports": sensitive["dst_port"].value_counts().to_dict()
            })

    # 4. Actividad fuera de horario (22:00 - 06:00)
    if "timestamp" in df_24h.columns:
        off_hours = df_24h[
            (df_24h["timestamp"].dt.hour >= 22) |
            (df_24h["timestamp"].dt.hour < 6)
        ]
        if len(off_hours) > len(df_24h) * 0.3:  # Mas del 30%
            report["findings"].append({
                "type": "off_hours_activity",
                "severity": "HIGH",
                "detail": f"{len(off_hours)} eventos fuera de horario "
                          f"({len(off_hours)/len(df_24h)*100:.1f}% del total)"
            })

    # 5. Guardar informe
    report_path = f"{output_dir}/daily_review_{report['date']}.json"
    with open(report_path, "w") as f:
        json.dump(report, f, indent=2, default=str)

    return report


if __name__ == "__main__":
    report = daily_review("/var/log/firewall.log")
    print(f"Informe generado: {report['date']}")
    print(f"Total eventos 24h: {report['total_events_24h']}")
    print(f"Hallazgos: {len(report['findings'])}")
    for f in report["findings"]:
        print(f"  [{f['severity']}] {f['detail']}")

Configura un cron job para ejecutarlo cada mañana a las 7:00:

crontab -e
# Añadir:
0 7 * * * /home/analyst/soc-scripts/.venv/bin/python /home/analyst/soc-scripts/daily_log_review.py

Próximos pasos

Con las técnicas de parsing, filtrado, correlación y visualización dominadas, el siguiente artículo conecta Python con las APIs de threat intelligence más importantes: VirusTotal v3, Shodan, AbuseIPDB y OTX. Aprenderás a construir un script de enrichment multi-fuente que recibe una lista de IOCs y devuelve un informe consolidado con datos de reputación, geolocalización, y contexto de amenaza.


Recursos

Preguntas frecuentes

Artículos relacionados

Este contenido tiene fines exclusivamente educativos y de investigación en ciberseguridad defensiva. No se proporcionan binarios maliciosos ni payloads ejecutables. El uso indebido de esta información es responsabilidad exclusiva del usuario. Leer disclaimer completo.