IntermedioMachine LearningPythonscikit-learnclasificaciónmalware detection

Python para Clasificación de Malware: Primer Modelo con scikit-learn

Guía práctica para construir tu primer clasificador de malware con Python y scikit-learn. Desde la carga de datasets hasta la evaluación del modelo con métricas reales de detección, usando Random Forest y features estáticas de binarios PE.

MalwareIntel Research··13 min lectura
Serie: AI/ML para Malware — Parte 9

Por qué Python domina el ML aplicado a malware

Python no es el lenguaje más rápido. Ni el más seguro para manejar binarios maliciosos. Pero tiene algo que ningún otro lenguaje iguala en el contexto de ML para ciberseguridad: un ecosistema de librerías maduro, documentado y con una comunidad activa que publica modelos, datasets y papers con código reproducible.

scikit-learn, pandas, numpy, pefile, LIEF, angr, capstone. Cada pieza del pipeline de clasificación de malware tiene una librería Python probada en producción por miles de investigadores. No necesitas reinventar nada para construir tu primer modelo.

Este artículo te lleva desde cero hasta un clasificador funcional. Sin atajos teóricos: código real, datos reales, métricas que importan.

El pipeline completo de clasificación

Un clasificador de malware no es un script que entrenas una vez. Es un pipeline con cinco fases, cada una con sus propios retos:

Recolección → Extracción de features → Preprocesamiento → Entrenamiento → Evaluación
     ↑                                                                         |
     └─────────────────── Retroalimentación ──────────────────────────────────┘

Fase 1: Recolección de datos. Necesitas binarios etiquetados como benignos o maliciosos. Los datasets públicos como EMBER, SOREL-20M y BODMAS resuelven este paso sin que toques un solo binario real.

Fase 2: Extracción de features. Convertir un binario PE en un vector numérico que el modelo pueda procesar. Headers, imports, secciones, entropía, strings, tamaños. Cada feature captura un aspecto del binario.

Fase 3: Preprocesamiento. Normalización, manejo de valores faltantes, encoding de features categóricas, split train/test con estratificación.

Fase 4: Entrenamiento. Selección de algoritmo, tuning de hiperparámetros, validación cruzada.

Fase 5: Evaluación. Métricas específicas de detección de malware, no solo accuracy genérica.

Preparando el entorno

# requirements.txt para clasificación de malware
scikit-learn>=1.4.0
pandas>=2.1.0
numpy>=1.26.0
pefile>=2023.2.7
lief>=0.14.0
matplotlib>=3.8.0
seaborn>=0.13.0
joblib>=1.3.0

La instalación es directa:

python -m venv malware-ml
source malware-ml/bin/activate
pip install -r requirements.txt

Nota de seguridad: si trabajas con binarios reales (no datasets preprocesados), hazlo siempre en una máquina virtual aislada sin conexión a red. Nunca en tu máquina de trabajo.

Cargando el dataset EMBER

EMBER (Endgame Malware BEnchmark for Research) es el dataset de referencia para clasificación de malware con ML. Contiene features preprocesadas de 1.1 millones de binarios PE, etiquetados como benignos, maliciosos o sin etiquetar.

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

# EMBER proporciona features ya extraídas en formato vectorizado
# Descarga desde: https://github.com/elastic/ember
X_train = np.load("ember2018/X_train.npy")
y_train = np.load("ember2018/y_train.npy")
X_test = np.load("ember2018/X_test.npy")
y_test = np.load("ember2018/y_test.npy")

# Filtrar muestras sin etiquetar (label == -1)
train_mask = y_train != -1
X_train = X_train[train_mask]
y_train = y_train[train_mask]

test_mask = y_test != -1
X_test = X_test[test_mask]
y_test = y_test[test_mask]

print(f"Train: {X_train.shape[0]} muestras, {X_train.shape[1]} features")
print(f"Test: {X_test.shape[0]} muestras")
print(f"Distribución train - Benigno: {(y_train==0).sum()}, Malicioso: {(y_train==1).sum()}")

EMBER incluye 2.381 features agrupadas en nueve categorías: información general del PE, headers, imports, exports, secciones, histograma de bytes, histograma de entropía, strings y datos del propio fichero. No necesitas extraerlas tú mismo para empezar.

Extracción manual de features (para datasets propios)

Si trabajas con tus propios binarios, necesitas un extractor. Aquí está la versión simplificada que captura las features más discriminativas:

import pefile
import math
import hashlib
from collections import Counter

class PEFeatureExtractor:
    """Extrae features estáticas de un binario PE para clasificación."""
    
    def __init__(self, filepath: str):
        self.filepath = filepath
        with open(filepath, "rb") as f:
            self.raw_bytes = f.read()
        self.pe = pefile.PE(filepath)
    
    def extract_all(self) -> dict:
        """Retorna diccionario con todas las features extraídas."""
        features = {}
        features.update(self._general_info())
        features.update(self._header_features())
        features.update(self._section_features())
        features.update(self._import_features())
        features.update(self._entropy_features())
        return features
    
    def _general_info(self) -> dict:
        return {
            "file_size": len(self.raw_bytes),
            "has_debug": int(hasattr(self.pe, "DIRECTORY_ENTRY_DEBUG")),
            "has_tls": int(hasattr(self.pe, "DIRECTORY_ENTRY_TLS")),
            "has_resources": int(hasattr(self.pe, "DIRECTORY_ENTRY_RESOURCE")),
            "has_relocations": int(hasattr(self.pe, "DIRECTORY_ENTRY_BASERELOC")),
            "has_signature": int(
                self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[4].Size > 0
            ),
        }
    
    def _header_features(self) -> dict:
        return {
            "machine": self.pe.FILE_HEADER.Machine,
            "num_sections": self.pe.FILE_HEADER.NumberOfSections,
            "timestamp": self.pe.FILE_HEADER.TimeDateStamp,
            "characteristics": self.pe.FILE_HEADER.Characteristics,
            "optional_magic": self.pe.OPTIONAL_HEADER.Magic,
            "size_of_code": self.pe.OPTIONAL_HEADER.SizeOfCode,
            "size_of_initialized_data": self.pe.OPTIONAL_HEADER.SizeOfInitializedData,
            "entry_point": self.pe.OPTIONAL_HEADER.AddressOfEntryPoint,
            "image_base": self.pe.OPTIONAL_HEADER.ImageBase,
            "subsystem": self.pe.OPTIONAL_HEADER.Subsystem,
            "dll_characteristics": self.pe.OPTIONAL_HEADER.DllCharacteristics,
            "size_of_image": self.pe.OPTIONAL_HEADER.SizeOfImage,
            "size_of_headers": self.pe.OPTIONAL_HEADER.SizeOfHeaders,
        }
    
    def _section_features(self) -> dict:
        features = {}
        entropies = []
        raw_sizes = []
        virtual_sizes = []
        
        for i, section in enumerate(self.pe.sections):
            entropy = section.get_entropy()
            entropies.append(entropy)
            raw_sizes.append(section.SizeOfRawData)
            virtual_sizes.append(section.Misc_VirtualSize)
        
        features["num_sections"] = len(self.pe.sections)
        features["max_entropy"] = max(entropies) if entropies else 0
        features["min_entropy"] = min(entropies) if entropies else 0
        features["mean_entropy"] = sum(entropies) / len(entropies) if entropies else 0
        features["max_raw_size"] = max(raw_sizes) if raw_sizes else 0
        features["min_raw_size"] = min(raw_sizes) if raw_sizes else 0
        
        # Ratio entre tamaño virtual y raw (indicador de packing)
        ratios = []
        for v, r in zip(virtual_sizes, raw_sizes):
            if r > 0:
                ratios.append(v / r)
        features["max_vr_ratio"] = max(ratios) if ratios else 0
        features["mean_vr_ratio"] = sum(ratios) / len(ratios) if ratios else 0
        
        return features
    
    def _import_features(self) -> dict:
        features = {
            "num_imports": 0,
            "num_import_dlls": 0,
            "has_kernel32": 0,
            "has_ntdll": 0,
            "has_ws2_32": 0,      # Networking
            "has_wininet": 0,     # HTTP
            "has_advapi32": 0,    # Registry, crypto
            "has_crypt32": 0,     # Cryptography
        }
        
        suspicious_apis = [
            "VirtualAlloc", "VirtualProtect", "WriteProcessMemory",
            "CreateRemoteThread", "NtUnmapViewOfSection",
            "SetWindowsHookEx", "GetAsyncKeyState",
            "InternetOpen", "URLDownloadToFile",
            "CryptEncrypt", "CryptDecrypt",
        ]
        
        if hasattr(self.pe, "DIRECTORY_ENTRY_IMPORT"):
            dlls = self.pe.DIRECTORY_ENTRY_IMPORT
            features["num_import_dlls"] = len(dlls)
            
            total_imports = 0
            suspicious_count = 0
            
            for entry in dlls:
                dll_name = entry.dll.decode("utf-8", errors="ignore").lower()
                
                if "kernel32" in dll_name:
                    features["has_kernel32"] = 1
                elif "ntdll" in dll_name:
                    features["has_ntdll"] = 1
                elif "ws2_32" in dll_name:
                    features["has_ws2_32"] = 1
                elif "wininet" in dll_name:
                    features["has_wininet"] = 1
                elif "advapi32" in dll_name:
                    features["has_advapi32"] = 1
                elif "crypt32" in dll_name:
                    features["has_crypt32"] = 1
                
                for imp in entry.imports:
                    total_imports += 1
                    if imp.name:
                        name = imp.name.decode("utf-8", errors="ignore")
                        if name in suspicious_apis:
                            suspicious_count += 1
            
            features["num_imports"] = total_imports
            features["suspicious_api_count"] = suspicious_count
        
        return features
    
    def _entropy_features(self) -> dict:
        """Entropía global del fichero y distribución de bytes."""
        byte_counts = Counter(self.raw_bytes)
        total = len(self.raw_bytes)
        
        entropy = 0.0
        for count in byte_counts.values():
            p = count / total
            if p > 0:
                entropy -= p * math.log2(p)
        
        return {
            "file_entropy": entropy,
            "byte_unique_count": len(byte_counts),
            "byte_zero_ratio": byte_counts.get(0, 0) / total,
            "byte_printable_ratio": sum(
                byte_counts.get(b, 0) for b in range(32, 127)
            ) / total,
        }

Este extractor captura las features más relevantes para distinguir malware de software legítimo: entropía (packing), imports sospechosas (inyección de proceso, keylogging, comunicación de red), anomalías en secciones y metadatos del header.

Entrenando el modelo con Random Forest

Random Forest es el algoritmo recomendado para empezar. Funciona bien con features heterogéneas, no requiere normalización estricta y ofrece interpretabilidad vía feature importance.

from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
import joblib
import time

# Pipeline con escalado (opcional para RF, útil si añades otros modelos)
pipeline = Pipeline([
    ("scaler", StandardScaler()),
    ("classifier", RandomForestClassifier(
        n_estimators=200,
        max_depth=None,
        min_samples_split=5,
        min_samples_leaf=2,
        max_features="sqrt",
        n_jobs=-1,
        random_state=42,
        class_weight="balanced",  # Compensar desbalance si existe
    ))
])

# Entrenamiento
print("Entrenando Random Forest...")
start = time.time()
pipeline.fit(X_train, y_train)
elapsed = time.time() - start
print(f"Entrenamiento completado en {elapsed:.1f}s")

# Validación cruzada en train set
cv_scores = cross_val_score(
    pipeline, X_train, y_train, cv=5, scoring="roc_auc", n_jobs=-1
)
print(f"CV AUC: {cv_scores.mean():.4f} (+/- {cv_scores.std():.4f})")

# Guardar modelo
joblib.dump(pipeline, "malware_rf_model.pkl")

¿Por qué class_weight="balanced"? En muchos datasets de malware, la proporción benigno/malicioso no es 50/50. Este parámetro ajusta el peso de cada clase inversamente proporcional a su frecuencia, evitando que el modelo se sesgue hacia la clase mayoritaria.

Evaluación con métricas de seguridad

La accuracy no es suficiente para evaluar un detector de malware. Un modelo con 99% de accuracy puede tener una tasa de falsos negativos del 5%, dejando pasar malware real. Las métricas que importan son otras.

from sklearn.metrics import (
    classification_report,
    confusion_matrix,
    roc_auc_score,
    precision_recall_curve,
    roc_curve,
    f1_score,
)
import matplotlib.pyplot as plt
import seaborn as sns

# Predicciones
y_pred = pipeline.predict(X_test)
y_proba = pipeline.predict_proba(X_test)[:, 1]

# Reporte de clasificación
print(classification_report(
    y_test, y_pred,
    target_names=["Benigno", "Malicioso"],
    digits=4
))

# Métricas clave
auc = roc_auc_score(y_test, y_proba)
print(f"\nROC AUC: {auc:.4f}")

# Matriz de confusión
cm = confusion_matrix(y_test, y_pred)
print(f"\nMatriz de confusión:")
print(f"  VP (malware detectado):     {cm[1][1]}")
print(f"  VN (benigno correcto):      {cm[0][0]}")
print(f"  FP (benigno como malware):  {cm[0][1]}")
print(f"  FN (malware como benigno):  {cm[1][0]}")

# Tasa de falsos negativos (lo más crítico en seguridad)
fnr = cm[1][0] / (cm[1][0] + cm[1][1])
print(f"\nFalse Negative Rate: {fnr:.4f} ({fnr*100:.2f}%)")

# Tasa de falsos positivos
fpr = cm[0][1] / (cm[0][1] + cm[0][0])
print(f"False Positive Rate: {fpr:.4f} ({fpr*100:.2f}%)")

Métricas que importan en detección de malware

MétricaQué midePor qué importa
AUC-ROCCapacidad discriminativa globalResume el rendimiento en todos los umbrales
False Negative RateMalware que se escapaLo más crítico: malware no detectado infecta
False Positive RateSoftware legítimo bloqueadoImpacta productividad, alert fatigue
PrecisionDe los detectados, cuántos son realesCalidad de las alertas
RecallDe todo el malware, cuánto detectamosCobertura de detección
F1 a 0.1% FPRF1 con falsos positivos controladosMétrica realista para producción

La curva Precision-Recall es más informativa que ROC

precision, recall, thresholds = precision_recall_curve(y_test, y_proba)

plt.figure(figsize=(10, 6))
plt.plot(recall, precision, "b-", linewidth=2)
plt.xlabel("Recall (Malware detectado)")
plt.ylabel("Precision (Calidad de detección)")
plt.title("Curva Precision-Recall del Clasificador de Malware")
plt.grid(True, alpha=0.3)

# Marcar punto de operación al 99% recall
idx_99 = np.argmin(np.abs(recall - 0.99))
plt.scatter(recall[idx_99], precision[idx_99], color="red", s=100, zorder=5)
plt.annotate(
    f"99% Recall\nPrecision: {precision[idx_99]:.3f}",
    (recall[idx_99], precision[idx_99]),
    textcoords="offset points", xytext=(15, -15),
)
plt.tight_layout()
plt.savefig("pr_curve_malware.png", dpi=150)
plt.show()

En seguridad, preferimos operar en un punto con alto recall (detectar la mayor cantidad de malware posible) aceptando algo menos de precision (más falsos positivos). Un malware que se escapa es mucho peor que una alerta falsa.

Feature importance: qué aprende el modelo

Una de las ventajas de Random Forest es la transparencia. Puedes inspeccionar qué features son más discriminativas:

# Extraer feature importance del modelo
rf = pipeline.named_steps["classifier"]
importances = rf.feature_importances_

# Si tienes nombres de features (EMBER los proporciona)
feature_names = [f"feature_{i}" for i in range(X_train.shape[1])]

# Top 20 features más importantes
indices = np.argsort(importances)[::-1][:20]

plt.figure(figsize=(12, 8))
plt.barh(
    range(20),
    importances[indices][::-1],
    color="#6366f1"
)
plt.yticks(range(20), [feature_names[i] for i in indices][::-1])
plt.xlabel("Importancia (Gini)")
plt.title("Top 20 Features más Discriminativas")
plt.tight_layout()
plt.savefig("feature_importance.png", dpi=150)

En la práctica, las features más discriminativas suelen ser:

  1. Entropía de secciones. El malware empacado tiene entropía cercana a 8.0 (máximo).
  2. Número y tipo de imports. APIs de inyección de proceso, networking, criptografía.
  3. Ratio tamaño virtual/raw. Secciones con mucha más memoria virtual que datos en disco indican desempacado en runtime.
  4. Timestamp del header. Muchos compiladores de malware dejan timestamps falsos.
  5. Presencia de firma digital. El software legítimo suele estar firmado.

Comparando algoritmos

Random Forest no es el único candidato. Vale la pena comparar con otros algoritmos para tener una referencia:

from sklearn.ensemble import GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import LinearSVC
from sklearn.metrics import roc_auc_score
import lightgbm as lgb

models = {
    "Random Forest": RandomForestClassifier(
        n_estimators=200, n_jobs=-1, random_state=42
    ),
    "Gradient Boosting": GradientBoostingClassifier(
        n_estimators=200, max_depth=6, random_state=42
    ),
    "LightGBM": lgb.LGBMClassifier(
        n_estimators=200, max_depth=8, num_leaves=64,
        random_state=42, n_jobs=-1, verbose=-1
    ),
    "Logistic Regression": LogisticRegression(
        max_iter=1000, random_state=42, n_jobs=-1
    ),
}

results = {}
for name, model in models.items():
    print(f"Entrenando {name}...")
    model.fit(X_train, y_train)
    
    if hasattr(model, "predict_proba"):
        y_proba = model.predict_proba(X_test)[:, 1]
    else:
        y_proba = model.decision_function(X_test)
    
    auc = roc_auc_score(y_test, y_proba)
    results[name] = auc
    print(f"  AUC: {auc:.4f}")

# Resultados típicos con EMBER:
# LightGBM:            0.9990
# Gradient Boosting:   0.9985
# Random Forest:       0.9980
# Logistic Regression: 0.9720

LightGBM suele ganar en datasets de malware. Es más rápido de entrenar que Random Forest, escala mejor a datasets grandes y alcanza AUC marginalmente superior. Para producción, es la elección habitual.

Ajuste de umbral para producción

El umbral por defecto de 0.5 rara vez es óptimo para detección de malware. En producción necesitas ajustarlo según el contexto operacional:

from sklearn.metrics import f1_score

# Evaluar múltiples umbrales
thresholds = np.arange(0.1, 0.95, 0.05)
results = []

for thresh in thresholds:
    y_pred_t = (y_proba >= thresh).astype(int)
    cm_t = confusion_matrix(y_test, y_pred_t)
    
    tp = cm_t[1][1]
    fp = cm_t[0][1]
    fn = cm_t[1][0]
    tn = cm_t[0][0]
    
    fpr_t = fp / (fp + tn) if (fp + tn) > 0 else 0
    fnr_t = fn / (fn + tp) if (fn + tp) > 0 else 0
    f1 = f1_score(y_test, y_pred_t)
    
    results.append({
        "threshold": thresh,
        "FPR": fpr_t,
        "FNR": fnr_t,
        "F1": f1,
        "detected": tp,
        "missed": fn,
    })

df_results = pd.DataFrame(results)
print(df_results.to_string(index=False))

# Seleccionar umbral con FPR aceptable para el SOC
# Ejemplo: máximo 0.1% de falsos positivos
target_fpr = 0.001
optimal = df_results[df_results["FPR"] <= target_fpr].iloc[-1]
print(f"\nUmbral óptimo para FPR <= {target_fpr}: {optimal['threshold']:.2f}")
print(f"  FNR: {optimal['FNR']:.4f}")
print(f"  F1: {optimal['F1']:.4f}")

Regla práctica: en un SOC que procesa 100.000 ficheros al día, un FPR del 1% genera 1.000 falsos positivos diarios. A 0.1% son 100. A 0.01% son 10. El umbral depende de cuántas alertas puede absorber tu equipo.

Errores comunes y cómo evitarlos

1. Data leakage temporal

El error más grave y más frecuente. Si entrenas con malware de 2024 y testeas con malware de 2023, el modelo ve el futuro. EMBER resuelve esto con un split temporal: train con datos anteriores a una fecha, test con datos posteriores.

# MAL: split aleatorio ignora la evolución temporal del malware
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)

# BIEN: split temporal respeta la cronología
cutoff_date = "2024-01-01"
train_mask = dates < cutoff_date
test_mask = dates >= cutoff_date

2. Concept drift

El malware evoluciona. Un modelo entrenado con datos de 2023 perderá precisión en 2025. Necesitas reentrenar periódicamente con muestras recientes.

3. Evaluar solo con accuracy

Un dataset con 95% malware y 5% benigno da 95% accuracy sin aprender nada. Usa AUC, precision-recall y especialmente la tasa de falsos negativos.

4. Ignorar el coste asimétrico de los errores

Un falso negativo (malware no detectado) puede significar un ransomware ejecutándose. Un falso positivo (software legítimo bloqueado) es una molestia. Los costes no son iguales, y el modelo debe reflejarlo.

Siguiente paso: del prototipo a la realidad

Este modelo es un punto de partida funcional. Para avanzar hacia un sistema de producción, los siguientes pasos son:

  1. Feature engineering avanzado. Extraer features más sofisticadas de los binarios PE (artículo siguiente de esta serie).
  2. Deep Learning. CNN sobre imágenes binarias, LSTM sobre secuencias de API calls.
  3. Adversarial robustness. Evaluar cómo el malware puede evadir el modelo.
  4. MLOps. Pipeline de reentrenamiento continuo con datos frescos.

El clasificador Random Forest con features estáticas de PE es sorprendentemente efectivo. Con el dataset EMBER, alcanza un AUC superior a 0.998. Para muchos casos de uso defensivos, es más que suficiente como primera capa de detección. La clave está en entender sus limitaciones y complementarlo con otras capas.

Recursos para profundizar

  • EMBER dataset: github.com/elastic/ember. Dataset de referencia con features preprocesadas.
  • LIEF: lief-project.github.io. Librería para parseo de PE, ELF, Mach-O. Alternativa moderna a pefile.
  • Practical Malware Analysis (Sikorski & Honig): El libro clásico de análisis de malware. Los capítulos de análisis estático son la base teórica de la extracción de features.
  • Anderson & Roth (2018): "EMBER: An Open Dataset for Training Static PE Malware Machine Learning Models". El paper que define el estándar de benchmarking.

Preguntas frecuentes

Artículos relacionados

Este contenido tiene fines exclusivamente educativos y de investigación en ciberseguridad defensiva. No se proporcionan binarios maliciosos ni payloads ejecutables. El uso indebido de esta información es responsabilidad exclusiva del usuario. Leer disclaimer completo.