Python para Análisis de Logs: Parsear, Filtrar y Correlacionar Eventos
Técnicas avanzadas de análisis de logs con Python: parsing de Syslog, JSON y EVTX, filtrado con Pandas, correlación de eventos entre múltiples fuentes, visualización con matplotlib y automatización de revisiones diarias.
El volumen de logs como desafío y oportunidad
Un SOC de tamaño medio genera entre 10 y 50 GB de logs al día. Firewalls, proxies, endpoints, servidores de correo, Active Directory, aplicaciones web. Cada fuente tiene su formato, su nivel de verbosidad y su propia manera de registrar eventos. El SIEM ingesta y correlaciona a escala, pero hay escenarios donde Python es más efectivo: investigaciones forenses puntuales, análisis de logs históricos exportados, y creación de herramientas personalizadas que el SIEM no soporta de serie.
Este artículo cubre las técnicas esenciales: parsear los formatos más comunes (Syslog, JSON, CSV, EVTX), filtrar con Pandas para encontrar anomalías, correlacionar eventos de múltiples fuentes, y generar visualizaciones que revelen patrones.
Instalar las dependencias
pip install pandas matplotlib python-evtx openpyxl
- pandas: manipulación y análisis de datos tabulares.
- matplotlib: gráficos y visualizaciones.
- python-evtx: parsing de archivos Windows Event Log (.evtx).
- openpyxl: exportar resultados a Excel.
Formatos de log: anatomía y parsing
Syslog (RFC 5424)
El formato más extendido en dispositivos de red, firewalls y servidores Linux. Una línea típica:
Jun 05 14:23:45 fw-prod-01 kernel: [UFW BLOCK] IN=eth0 OUT= MAC=00:1a:2b... SRC=203.0.113.42 DST=10.0.1.5 PROTO=TCP SPT=44521 DPT=22
El parseo con regex necesita capturar los campos relevantes:
import re
from datetime import datetime
SYSLOG_PATTERN = re.compile(
r'(?P<timestamp>\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})\s+'
r'(?P<hostname>[\w\-\.]+)\s+'
r'(?P<process>[\w\-\[\]]+):\s+'
r'(?P<message>.*)'
)
def parse_syslog_line(line: str) -> dict | None:
"""Parsea una línea de syslog y extrae campos clave."""
match = SYSLOG_PATTERN.match(line)
if not match:
return None
result = match.groupdict()
# Extraer IPs del mensaje (si existen)
ips = re.findall(r'(?:SRC|DST)=([\d\.]+)', result["message"])
if len(ips) >= 2:
result["src_ip"] = ips[0]
result["dst_ip"] = ips[1]
# Extraer puertos
ports = re.findall(r'(?:SPT|DPT)=(\d+)', result["message"])
if len(ports) >= 2:
result["src_port"] = int(ports[0])
result["dst_port"] = int(ports[1])
# Extraer protocolo
proto = re.search(r'PROTO=(\w+)', result["message"])
if proto:
result["protocol"] = proto.group(1)
return result
def parse_syslog_file(filepath: str) -> list[dict]:
"""Parsea un archivo de syslog completo."""
events = []
with open(filepath, "r", encoding="utf-8", errors="ignore") as f:
for line_num, line in enumerate(f, 1):
parsed = parse_syslog_line(line.strip())
if parsed:
parsed["line_number"] = line_num
events.append(parsed)
return events
JSON estructurado
Los SIEMs modernos y muchas aplicaciones emiten logs en JSON. Este formato es el más cómodo de trabajar:
import json
def parse_json_logs(filepath: str) -> list[dict]:
"""Parsea un archivo con una línea JSON por evento."""
events = []
with open(filepath, "r") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
event = json.loads(line)
events.append(event)
except json.JSONDecodeError:
continue # Líneas malformadas se ignoran
return events
Para archivos JSON donde todo el contenido es un array:
def parse_json_array(filepath: str) -> list[dict]:
"""Parsea un archivo JSON que contiene un array de eventos."""
with open(filepath, "r") as f:
return json.load(f)
CSV
Exports de SIEM, feeds de IOCs y reportes suelen venir en CSV:
import csv
def parse_csv_logs(filepath: str) -> list[dict]:
"""Parsea un archivo CSV de logs."""
events = []
with open(filepath, "r", encoding="utf-8-sig") as f:
reader = csv.DictReader(f)
for row in reader:
events.append(dict(row))
return events
Windows EVTX
Los archivos .evtx contienen los Event Logs de Windows. La librería python-evtx permite parsearlos sin necesidad de un sistema Windows:
import Evtx.Evtx as evtx
import Evtx.Views as evtx_views
import xml.etree.ElementTree as ET
def parse_evtx(filepath: str, event_ids: list[int] = None) -> list[dict]:
"""
Parsea un archivo EVTX y extrae eventos.
Si event_ids se especifica, filtra solo esos Event IDs.
"""
events = []
with evtx.Evtx(filepath) as log:
for record in log.records():
try:
xml_str = record.xml()
root = ET.fromstring(xml_str)
# Namespace de Windows Event Log
ns = {"ev": "http://schemas.microsoft.com/win/2004/08/events/event"}
system = root.find("ev:System", ns)
event_id = int(system.find("ev:EventID", ns).text)
# Filtrar por Event ID si se especifica
if event_ids and event_id not in event_ids:
continue
event = {
"event_id": event_id,
"timestamp": system.find("ev:TimeCreated", ns).get("SystemTime"),
"computer": system.find("ev:Computer", ns).text,
"channel": system.find("ev:Channel", ns).text,
"provider": system.find("ev:Provider", ns).get("Name"),
}
# Extraer datos del EventData
event_data = root.find("ev:EventData", ns)
if event_data is not None:
for data in event_data.findall("ev:Data", ns):
name = data.get("Name", "unknown")
event[name] = data.text
events.append(event)
except Exception:
continue
return events
Event IDs críticos para seguridad en Windows:
| Event ID | Descripción | Relevancia |
|---|---|---|
| 4624 | Logon exitoso | Detectar logons sospechosos |
| 4625 | Logon fallido | Fuerza bruta, password spraying |
| 4648 | Logon con credenciales explícitas | Lateral movement |
| 4688 | Nuevo proceso creado | Ejecución de herramientas |
| 4720 | Cuenta de usuario creada | Persistencia |
| 4732 | Miembro añadido a grupo local | Escalada de privilegios |
| 7045 | Servicio instalado | Persistencia, malware |
| 1102 | Log de seguridad borrado | Anti-forensics |
Análisis con Pandas
Una vez parseados los logs, Pandas permite filtrar, agrupar y analizar millones de eventos con pocas líneas de código.
Cargar logs en un DataFrame
import pandas as pd
# Desde una lista de diccionarios (resultado del parsing)
events = parse_syslog_file("/var/log/firewall.log")
df = pd.DataFrame(events)
# Desde un CSV directamente
df = pd.read_csv("siem_export.csv", parse_dates=["timestamp"])
# Desde JSON lines
df = pd.read_json("events.jsonl", lines=True)
Filtrar por severidad, fuente y ventana temporal
# Convertir timestamp a datetime
df["timestamp"] = pd.to_datetime(df["timestamp"])
# Filtrar eventos de las últimas 24 horas
from datetime import datetime, timedelta
cutoff = datetime.now() - timedelta(hours=24)
recent = df[df["timestamp"] >= cutoff]
# Filtrar por severidad
critical = df[df["severity"].isin(["CRITICAL", "HIGH"])]
# Filtrar por IP de origen
suspect = df[df["src_ip"] == "203.0.113.42"]
# Combinar filtros
brute_force = df[
(df["dst_port"] == 22) &
(df["message"].str.contains("BLOCK", na=False)) &
(df["timestamp"] >= cutoff)
]
Agrupar y contar
# Top 10 IPs origen con más conexiones bloqueadas
top_src = df[df["message"].str.contains("BLOCK", na=False)] \
.groupby("src_ip") \
.size() \
.sort_values(ascending=False) \
.head(10)
print(top_src)
# Eventos por hora (detectar picos)
df["hour"] = df["timestamp"].dt.hour
hourly = df.groupby("hour").size()
# Puertos destino más atacados
top_ports = df.groupby("dst_port").size().sort_values(ascending=False).head(10)
Detectar anomalías estadísticas
# Calcular baseline: conexiones por IP por hora
hourly_by_ip = df.groupby([df["timestamp"].dt.hour, "src_ip"]).size()
mean = hourly_by_ip.mean()
std = hourly_by_ip.std()
# IPs con actividad superior a 3 desviaciones estándar
threshold = mean + 3 * std
anomalies = hourly_by_ip[hourly_by_ip > threshold]
print(f"IPs anómalas: {anomalies.index.get_level_values('src_ip').unique().tolist()}")
Correlación entre múltiples fuentes
La correlación es donde Python supera a muchos SIEMs en flexibilidad. Combinar logs de firewall, proxy, endpoint y Active Directory revela patrones que ninguna fuente aislada muestra.
Ejemplo: detectar exfiltración de datos
La hipótesis: un endpoint interno se comunica con una IP externa que nunca había sido contactada, usando volúmenes de datos inusuales.
def correlate_exfiltration(
firewall_df: pd.DataFrame,
proxy_df: pd.DataFrame,
baseline_days: int = 30
) -> pd.DataFrame:
"""
Correlaciona logs de firewall y proxy para detectar
posible exfiltración de datos.
"""
# 1. IPs externas contactadas en las últimas 24h
cutoff_24h = datetime.now() - timedelta(hours=24)
recent_external = firewall_df[
(firewall_df["timestamp"] >= cutoff_24h) &
(firewall_df["dst_ip"].apply(lambda ip: not is_private(ip)))
]["dst_ip"].unique()
# 2. Baseline: IPs contactadas en los últimos 30 días
cutoff_baseline = datetime.now() - timedelta(days=baseline_days)
baseline_ips = firewall_df[
(firewall_df["timestamp"] >= cutoff_baseline) &
(firewall_df["timestamp"] < cutoff_24h)
]["dst_ip"].unique()
# 3. IPs nuevas (no vistas en el baseline)
new_ips = set(recent_external) - set(baseline_ips)
# 4. Filtrar por volumen alto (proxy logs)
suspicious = proxy_df[
(proxy_df["dst_ip"].isin(new_ips)) &
(proxy_df["bytes_sent"] > 10_000_000) # Mas de 10 MB enviados
]
return suspicious[["timestamp", "src_ip", "dst_ip",
"bytes_sent", "url", "user_agent"]]
def is_private(ip_str: str) -> bool:
"""Verifica si una IP es privada."""
import ipaddress
try:
return ipaddress.ip_address(ip_str).is_private
except ValueError:
return False
Ejemplo: correlacionar logon y ejecución de procesos
Detectar movimiento lateral: un logon seguido de ejecución de herramientas sospechosas en el mismo equipo.
def correlate_logon_execution(
logon_df: pd.DataFrame,
process_df: pd.DataFrame,
window_minutes: int = 5
) -> pd.DataFrame:
"""
Correlaciona logons (4624) con creación de procesos (4688)
dentro de una ventana temporal.
"""
suspicious_tools = [
"powershell.exe", "cmd.exe", "wmic.exe", "psexec.exe",
"net.exe", "nltest.exe", "mimikatz.exe", "procdump.exe",
"certutil.exe", "bitsadmin.exe", "mshta.exe", "rundll32.exe"
]
results = []
for _, logon in logon_df.iterrows():
# Buscar procesos sospechosos en la ventana temporal
window_start = logon["timestamp"]
window_end = window_start + timedelta(minutes=window_minutes)
matches = process_df[
(process_df["computer"] == logon["computer"]) &
(process_df["timestamp"] >= window_start) &
(process_df["timestamp"] <= window_end) &
(process_df["process_name"].str.lower().isin(suspicious_tools))
]
for _, proc in matches.iterrows():
results.append({
"logon_time": logon["timestamp"],
"logon_type": logon.get("LogonType", "N/A"),
"username": logon.get("TargetUserName", "N/A"),
"computer": logon["computer"],
"src_ip": logon.get("IpAddress", "N/A"),
"process_time": proc["timestamp"],
"process": proc["process_name"],
"command_line": proc.get("CommandLine", "N/A"),
"delta_seconds": (proc["timestamp"] - logon["timestamp"]).total_seconds()
})
return pd.DataFrame(results)
Visualización con matplotlib
Los gráficos transforman tablas de datos en patrones visibles. Un pico de actividad a las 3 AM, un puerto que recibe 10 veces más tráfico de lo normal, o una IP que aparece en múltiples fuentes son más evidentes en un gráfico que en una tabla.
Eventos por hora (detectar picos fuera de horario)
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
def plot_events_timeline(df: pd.DataFrame, title: str = "Eventos por hora"):
"""Gráfico de líneas con eventos agrupados por hora."""
hourly = df.set_index("timestamp").resample("1h").size()
fig, ax = plt.subplots(figsize=(14, 5))
ax.plot(hourly.index, hourly.values, color="#38bdf8", linewidth=1.5)
ax.fill_between(hourly.index, hourly.values, alpha=0.15, color="#38bdf8")
# Marcar horario laboral
ax.axvspan(
hourly.index[0].replace(hour=8),
hourly.index[0].replace(hour=18),
alpha=0.05, color="green", label="Horario laboral"
)
ax.set_title(title, fontsize=14, fontweight="bold")
ax.set_xlabel("Hora")
ax.set_ylabel("Cantidad de eventos")
ax.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M"))
ax.legend()
plt.tight_layout()
plt.savefig("events_timeline.png", dpi=150)
plt.show()
Top IPs por volumen (barras horizontales)
def plot_top_ips(df: pd.DataFrame, column: str = "src_ip", top_n: int = 15):
"""Gráfico de barras horizontales con las IPs más frecuentes."""
top = df[column].value_counts().head(top_n)
fig, ax = plt.subplots(figsize=(10, 6))
colors = ["#fb7185" if v > top.mean() + 2 * top.std()
else "#38bdf8" for v in top.values]
bars = ax.barh(range(len(top)), top.values, color=colors)
ax.set_yticks(range(len(top)))
ax.set_yticklabels(top.index, fontsize=9)
ax.set_xlabel("Número de eventos")
ax.set_title(f"Top {top_n} IPs por frecuencia", fontsize=14, fontweight="bold")
ax.invert_yaxis()
# Anotar valores
for bar, val in zip(bars, top.values):
ax.text(val + max(top.values) * 0.01, bar.get_y() + bar.get_height() / 2,
str(val), va="center", fontsize=8)
plt.tight_layout()
plt.savefig("top_ips.png", dpi=150)
plt.show()
Heatmap de actividad (día x hora)
import numpy as np
def plot_activity_heatmap(df: pd.DataFrame):
"""Heatmap de actividad por día de la semana y hora."""
df["day"] = df["timestamp"].dt.day_name()
df["hour"] = df["timestamp"].dt.hour
days_order = ["Monday", "Tuesday", "Wednesday", "Thursday",
"Friday", "Saturday", "Sunday"]
pivot = df.groupby(["day", "hour"]).size().unstack(fill_value=0)
pivot = pivot.reindex(days_order)
fig, ax = plt.subplots(figsize=(14, 5))
im = ax.imshow(pivot.values, aspect="auto", cmap="YlOrRd")
ax.set_yticks(range(len(days_order)))
ax.set_yticklabels(days_order)
ax.set_xticks(range(24))
ax.set_xticklabels([f"{h:02d}" for h in range(24)])
ax.set_xlabel("Hora")
ax.set_title("Mapa de calor: actividad por día y hora", fontsize=14, fontweight="bold")
plt.colorbar(im, ax=ax, label="Eventos")
plt.tight_layout()
plt.savefig("activity_heatmap.png", dpi=150)
plt.show()
Parseo de EVTX para Windows forensics
Con la función parse_evtx definida antes, podemos crear análisis específicos para investigaciones en entornos Windows.
Análisis de logons fallidos (brute force detection)
def analyze_failed_logons(evtx_path: str) -> pd.DataFrame:
"""
Analiza Event ID 4625 (logon fallido) para detectar
intentos de fuerza bruta o password spraying.
"""
events = parse_evtx(evtx_path, event_ids=[4625])
df = pd.DataFrame(events)
if df.empty:
print("No se encontraron logons fallidos.")
return df
df["timestamp"] = pd.to_datetime(df["timestamp"])
# Agrupar por IP de origen y cuenta objetivo
by_source = df.groupby("IpAddress").agg(
attempts=("event_id", "count"),
unique_accounts=("TargetUserName", "nunique"),
accounts=("TargetUserName", lambda x: list(x.unique())),
first_attempt=("timestamp", "min"),
last_attempt=("timestamp", "max"),
).sort_values("attempts", ascending=False)
# Clasificar el tipo de ataque
def classify_attack(row):
if row["unique_accounts"] > 5 and row["attempts"] > 20:
return "PASSWORD_SPRAYING"
elif row["unique_accounts"] <= 2 and row["attempts"] > 10:
return "BRUTE_FORCE"
else:
return "LOW_VOLUME"
by_source["attack_type"] = by_source.apply(classify_attack, axis=1)
return by_source
Automatizar la revisión diaria de logs
El script final combina todas las técnicas en una revisión automatizada que se ejecuta cada mañana:
#!/usr/bin/env python3
"""
daily_log_review.py - Revisión diaria automatizada de logs
Ejecutar con cron: 0 7 * * * /path/to/.venv/bin/python daily_log_review.py
"""
import pandas as pd
from datetime import datetime, timedelta
import json
def daily_review(
firewall_log: str,
proxy_log: str = None,
output_dir: str = "./reports"
) -> dict:
"""Genera un informe diario de seguridad."""
report = {
"date": datetime.now().strftime("%Y-%m-%d"),
"generated_at": datetime.now().isoformat(),
"findings": []
}
# 1. Parsear logs del firewall
events = parse_syslog_file(firewall_log)
df = pd.DataFrame(events)
df["timestamp"] = pd.to_datetime(df["timestamp"], format="mixed")
# Filtrar últimas 24h
cutoff = datetime.now() - timedelta(hours=24)
df_24h = df[df["timestamp"] >= cutoff]
report["total_events_24h"] = len(df_24h)
# 2. Top IPs bloqueadas
if "src_ip" in df_24h.columns:
blocked = df_24h[df_24h["message"].str.contains("BLOCK", na=False)]
top_blocked = blocked["src_ip"].value_counts().head(10).to_dict()
report["top_blocked_ips"] = top_blocked
if len(blocked) > 0:
report["findings"].append({
"type": "blocked_connections",
"severity": "INFO",
"detail": f"{len(blocked)} conexiones bloqueadas de "
f"{blocked['src_ip'].nunique()} IPs únicas"
})
# 3. Conexiones a puertos sensibles
sensitive_ports = [22, 23, 3389, 445, 1433, 3306, 5432]
if "dst_port" in df_24h.columns:
sensitive = df_24h[df_24h["dst_port"].isin(sensitive_ports)]
if len(sensitive) > 0:
report["findings"].append({
"type": "sensitive_port_access",
"severity": "MEDIUM",
"detail": f"{len(sensitive)} conexiones a puertos sensibles",
"ports": sensitive["dst_port"].value_counts().to_dict()
})
# 4. Actividad fuera de horario (22:00 - 06:00)
if "timestamp" in df_24h.columns:
off_hours = df_24h[
(df_24h["timestamp"].dt.hour >= 22) |
(df_24h["timestamp"].dt.hour < 6)
]
if len(off_hours) > len(df_24h) * 0.3: # Mas del 30%
report["findings"].append({
"type": "off_hours_activity",
"severity": "HIGH",
"detail": f"{len(off_hours)} eventos fuera de horario "
f"({len(off_hours)/len(df_24h)*100:.1f}% del total)"
})
# 5. Guardar informe
report_path = f"{output_dir}/daily_review_{report['date']}.json"
with open(report_path, "w") as f:
json.dump(report, f, indent=2, default=str)
return report
if __name__ == "__main__":
report = daily_review("/var/log/firewall.log")
print(f"Informe generado: {report['date']}")
print(f"Total eventos 24h: {report['total_events_24h']}")
print(f"Hallazgos: {len(report['findings'])}")
for f in report["findings"]:
print(f" [{f['severity']}] {f['detail']}")
Configura un cron job para ejecutarlo cada mañana a las 7:00:
crontab -e
# Añadir:
0 7 * * * /home/analyst/soc-scripts/.venv/bin/python /home/analyst/soc-scripts/daily_log_review.py
Próximos pasos
Con las técnicas de parsing, filtrado, correlación y visualización dominadas, el siguiente artículo conecta Python con las APIs de threat intelligence más importantes: VirusTotal v3, Shodan, AbuseIPDB y OTX. Aprenderás a construir un script de enrichment multi-fuente que recibe una lista de IOCs y devuelve un informe consolidado con datos de reputación, geolocalización, y contexto de amenaza.
Recursos
- Pandas Documentation. Referencia completa de la librería.
- python-evtx en GitHub. Librería para parsear Windows Event Logs.
- Matplotlib Gallery. Ejemplos de gráficos listos para adaptar.
- Windows Security Event IDs (SANS). Catálogo de Event IDs relevantes para seguridad.
- Sigma Rules Repository. Reglas de detección que mapean directamente a Event IDs de Windows.
- Polars Documentation. Alternativa a Pandas con mejor rendimiento en datasets grandes.
Preguntas frecuentes
Libros recomendados
Artículos relacionados
Python para SOC Analysts: Fundamentos y Primeros Scripts de Seguridad
Python y APIs de Threat Intelligence: VirusTotal, Shodan, OTX y AbuseIPDB
SOAR: Qué Es, Arquitectura y Diseño de Playbooks de Automatización
Este contenido tiene fines exclusivamente educativos y de investigación en ciberseguridad defensiva. No se proporcionan binarios maliciosos ni payloads ejecutables. El uso indebido de esta información es responsabilidad exclusiva del usuario. Leer disclaimer completo.