AvanzadoClusteringaprendizaje no supervisadoDBSCANUMAPthreat hunting

Clustering de Familias de Malware con Aprendizaje No Supervisado

Aplicación de algoritmos de clustering (DBSCAN, HDBSCAN, K-Means) para agrupar muestras de malware en familias sin etiquetas previas. Reducción de dimensionalidad con t-SNE y UMAP, métricas de evaluación de clusters y aplicaciones en threat hunting.

MalwareIntel Research··8 min lectura
Serie: AI/ML para Malware — Parte 13

Cuando las etiquetas no son suficientes

La clasificación supervisada asume que tienes etiquetas correctas para entrenar. En malware, esa suposición es más frágil de lo que parece.

VirusTotal agrega resultados de más de 70 motores antivirus. Para una misma muestra, es común ver etiquetas como "Trojan.Gen", "Win32/Emotet", "Backdoor.Generic" y "Malware.AI" simultáneamente. Cuatro motores, cuatro nombres diferentes. ¿Cuál es la familia real?

Además, cuando aparece malware completamente nuevo, no tiene etiquetas. Puede pasar semanas o meses antes de que los investigadores lo clasifiquen. Durante ese tiempo, el clustering es la única herramienta que puede agrupar muestras relacionadas y revelar que "estas 200 muestras sin clasificar probablemente pertenecen a la misma familia nueva".

Preparando los datos

El clustering opera sobre el mismo espacio de features que la clasificación supervisada, pero sin usar las etiquetas durante el agrupamiento:

import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA

def prepare_for_clustering(
    feature_matrix: np.ndarray,
    pca_components: int = 50
) -> np.ndarray:
    """Prepara la matriz de features para clustering."""
    
    # 1. Manejar valores infinitos y NaN
    feature_matrix = np.nan_to_num(
        feature_matrix, nan=0.0, posinf=1e6, neginf=-1e6
    )
    
    # 2. Estandarizar (media=0, std=1)
    scaler = StandardScaler()
    scaled = scaler.fit_transform(feature_matrix)
    
    # 3. Reducir dimensionalidad con PCA
    # El clustering funciona mejor en dimensiones reducidas
    pca = PCA(n_components=pca_components, random_state=42)
    reduced = pca.fit_transform(scaled)
    
    explained = pca.explained_variance_ratio_.sum()
    print(f"PCA: {pca_components} componentes, "
          f"{explained:.1%} varianza explicada")
    
    return reduced

¿Por qué PCA antes de clustering? En espacios de alta dimensionalidad (cientos o miles de features), las distancias entre puntos tienden a igualarse (la "maldición de la dimensionalidad"). PCA reduce el espacio a las dimensiones más informativas, mejorando la separación entre clusters reales.

DBSCAN: clustering basado en densidad

DBSCAN (Density-Based Spatial Clustering of Applications with Noise) es ideal para malware porque:

  1. No requiere especificar el número de clusters.
  2. Encuentra clusters de forma arbitraria.
  3. Identifica outliers (muestras que no pertenecen a ningún cluster).
from sklearn.cluster import DBSCAN
from sklearn.metrics import silhouette_score

def cluster_with_dbscan(
    features: np.ndarray,
    eps: float = 2.5,
    min_samples: int = 5
) -> np.ndarray:
    """Aplica DBSCAN y retorna labels de cluster."""
    
    db = DBSCAN(
        eps=eps,
        min_samples=min_samples,
        metric="euclidean",
        n_jobs=-1,
    )
    labels = db.fit_predict(features)
    
    n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
    n_noise = (labels == -1).sum()
    
    print(f"Clusters encontrados: {n_clusters}")
    print(f"Muestras como ruido: {n_noise} ({n_noise/len(labels):.1%})")
    
    # Silhouette score (excluyendo ruido)
    if n_clusters > 1:
        mask = labels != -1
        sil = silhouette_score(features[mask], labels[mask])
        print(f"Silhouette Score: {sil:.3f}")
    
    return labels

Ajuste de hiperparámetros para DBSCAN

El parámetro eps (radio de vecindad) es crítico. Demasiado pequeño fragmenta familias en múltiples clusters. Demasiado grande fusiona familias distintas. El método del k-distance graph ayuda a encontrar el valor óptimo:

from sklearn.neighbors import NearestNeighbors
import matplotlib.pyplot as plt

def find_optimal_eps(features: np.ndarray, k: int = 5):
    """Método del k-distance graph para encontrar eps óptimo."""
    
    nn = NearestNeighbors(n_neighbors=k)
    nn.fit(features)
    distances, _ = nn.kneighbors(features)
    
    # Distancia al k-ésimo vecino, ordenada
    k_distances = np.sort(distances[:, k-1])
    
    plt.figure(figsize=(10, 6))
    plt.plot(k_distances)
    plt.xlabel("Puntos ordenados por distancia")
    plt.ylabel(f"Distancia al {k}-ésimo vecino")
    plt.title("K-Distance Graph para selección de eps")
    plt.grid(True, alpha=0.3)
    
    # El "codo" de la curva indica el eps óptimo
    # Detectar programáticamente el punto de máxima curvatura
    diffs = np.diff(k_distances)
    second_diffs = np.diff(diffs)
    elbow_idx = np.argmax(second_diffs) + 2
    optimal_eps = k_distances[elbow_idx]
    
    plt.axhline(y=optimal_eps, color="red", linestyle="--",
                label=f"eps sugerido: {optimal_eps:.2f}")
    plt.legend()
    plt.tight_layout()
    plt.savefig("k_distance_graph.png", dpi=150)
    
    print(f"eps sugerido: {optimal_eps:.2f}")
    return optimal_eps

HDBSCAN: la evolución de DBSCAN

HDBSCAN (Hierarchical DBSCAN) resuelve la principal limitación de DBSCAN: la sensibilidad al parámetro eps. En lugar de un radio fijo, HDBSCAN construye una jerarquía de clusters a múltiples escalas y selecciona los más estables:

import hdbscan

def cluster_with_hdbscan(
    features: np.ndarray,
    min_cluster_size: int = 10,
    min_samples: int = 5,
) -> tuple[np.ndarray, np.ndarray]:
    """Aplica HDBSCAN y retorna labels + probabilidades."""
    
    clusterer = hdbscan.HDBSCAN(
        min_cluster_size=min_cluster_size,
        min_samples=min_samples,
        metric="euclidean",
        cluster_selection_method="eom",  # Excess of Mass
        prediction_data=True,
    )
    
    labels = clusterer.fit_predict(features)
    probabilities = clusterer.probabilities_
    
    n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
    n_noise = (labels == -1).sum()
    
    print(f"Clusters: {n_clusters}")
    print(f"Ruido: {n_noise} ({n_noise/len(labels):.1%})")
    print(f"Probabilidad media: {probabilities[labels != -1].mean():.3f}")
    
    # Información por cluster
    for cluster_id in range(n_clusters):
        mask = labels == cluster_id
        size = mask.sum()
        avg_prob = probabilities[mask].mean()
        print(f"  Cluster {cluster_id}: {size} muestras, "
              f"prob media: {avg_prob:.3f}")
    
    return labels, probabilities

# El min_cluster_size controla el tamaño mínimo de familia
# Para malware: 10-50 es un rango razonable
# Familias con menos muestras se clasifican como outliers
labels, probs = cluster_with_hdbscan(
    features, min_cluster_size=15, min_samples=5
)

Ventaja clave de HDBSCAN: las probabilidades de membresía. Cada muestra tiene un score que indica cuán firmemente pertenece a su cluster. Muestras en los bordes (baja probabilidad) merecen revisión manual. Esto es exactamente lo que un analista CTI necesita: certeza cuantificada, no binaria.

Visualización con UMAP y t-SNE

La reducción a 2D permite visualizar los clusters y validar visualmente la calidad del agrupamiento:

import umap
from sklearn.manifold import TSNE

def visualize_clusters(
    features: np.ndarray,
    labels: np.ndarray,
    method: str = "umap",
    title: str = "Clustering de Malware",
):
    """Visualiza clusters en 2D con UMAP o t-SNE."""
    
    if method == "umap":
        reducer = umap.UMAP(
            n_components=2,
            n_neighbors=30,
            min_dist=0.1,
            metric="euclidean",
            random_state=42,
        )
        embedding = reducer.fit_transform(features)
    else:
        reducer = TSNE(
            n_components=2,
            perplexity=30,
            learning_rate="auto",
            init="pca",
            random_state=42,
        )
        embedding = reducer.fit_transform(features)
    
    # Visualización
    plt.figure(figsize=(14, 10))
    
    unique_labels = set(labels)
    colors = plt.cm.tab20(np.linspace(0, 1, len(unique_labels)))
    
    for label, color in zip(sorted(unique_labels), colors):
        if label == -1:
            # Ruido en gris
            mask = labels == label
            plt.scatter(
                embedding[mask, 0], embedding[mask, 1],
                c="gray", alpha=0.2, s=5, label="Noise"
            )
        else:
            mask = labels == label
            plt.scatter(
                embedding[mask, 0], embedding[mask, 1],
                c=[color], alpha=0.6, s=15,
                label=f"Cluster {label} ({mask.sum()})"
            )
    
    plt.title(title)
    plt.legend(bbox_to_anchor=(1.05, 1), loc="upper left", fontsize=8)
    plt.tight_layout()
    plt.savefig("malware_clusters.png", dpi=200, bbox_inches="tight")
    plt.show()

UMAP vs t-SNE: UMAP es más rápido, preserva mejor la estructura global (distancias entre clusters) y es determinista con semilla fija. t-SNE produce visualizaciones más "dramáticas" pero distorsiona distancias entre clusters. Para análisis de malware, UMAP es la elección preferida.

Métricas de evaluación

Métricas intrínsecas (sin etiquetas)

from sklearn.metrics import (
    silhouette_score,
    davies_bouldin_score,
    calinski_harabasz_score,
)

def evaluate_clustering(
    features: np.ndarray, 
    labels: np.ndarray
) -> dict:
    """Evalúa la calidad del clustering con métricas intrínsecas."""
    
    # Excluir ruido para métricas
    mask = labels != -1
    if mask.sum() < 2 or len(set(labels[mask])) < 2:
        return {"error": "Insuficientes clusters para evaluar"}
    
    features_clean = features[mask]
    labels_clean = labels[mask]
    
    metrics = {
        # Silhouette: -1 (malo) a 1 (perfecto)
        "silhouette": silhouette_score(features_clean, labels_clean),
        
        # Davies-Bouldin: menor es mejor (0 es perfecto)
        "davies_bouldin": davies_bouldin_score(features_clean, labels_clean),
        
        # Calinski-Harabasz: mayor es mejor
        "calinski_harabasz": calinski_harabasz_score(
            features_clean, labels_clean
        ),
        
        # Estadísticas de clusters
        "num_clusters": len(set(labels_clean)),
        "noise_ratio": 1 - mask.mean(),
        "min_cluster_size": min(
            (labels_clean == l).sum() 
            for l in set(labels_clean)
        ),
        "max_cluster_size": max(
            (labels_clean == l).sum() 
            for l in set(labels_clean)
        ),
    }
    
    return metrics

Métricas extrínsecas (con etiquetas parciales)

Si tienes etiquetas de VirusTotal u otra fuente para un subconjunto de muestras:

from sklearn.metrics import (
    adjusted_rand_score,
    normalized_mutual_info_score,
    homogeneity_completeness_v_measure,
)

def evaluate_with_labels(
    predicted_labels: np.ndarray,
    true_labels: np.ndarray,
) -> dict:
    """Evalúa clustering contra etiquetas conocidas."""
    
    # Solo evaluar muestras con etiqueta conocida
    mask = true_labels != -1  # -1 = sin etiqueta
    pred = predicted_labels[mask]
    true = true_labels[mask]
    
    homogeneity, completeness, v_measure = (
        homogeneity_completeness_v_measure(true, pred)
    )
    
    return {
        "adjusted_rand_index": adjusted_rand_score(true, pred),
        "normalized_mutual_info": normalized_mutual_info_score(true, pred),
        "homogeneity": homogeneity,
        "completeness": completeness,
        "v_measure": v_measure,
    }

Aplicaciones en threat hunting

Descubrimiento de familias nuevas

def discover_new_families(
    features: np.ndarray,
    labels: np.ndarray,
    known_family_labels: dict[int, str],
) -> list[dict]:
    """Identifica clusters que no corresponden a familias conocidas."""
    
    new_families = []
    for cluster_id in set(labels):
        if cluster_id == -1:
            continue
        
        if cluster_id not in known_family_labels:
            mask = labels == cluster_id
            cluster_size = mask.sum()
            
            new_families.append({
                "cluster_id": cluster_id,
                "size": cluster_size,
                "description": f"Posible familia nueva con {cluster_size} muestras",
                "priority": "HIGH" if cluster_size > 50 else "MEDIUM",
            })
    
    return sorted(new_families, key=lambda x: x["size"], reverse=True)

Detección de variantes

def detect_variants(
    features: np.ndarray,
    labels: np.ndarray,
    probabilities: np.ndarray,
    threshold: float = 0.5,
) -> list[dict]:
    """Identifica muestras borderline que podrían ser variantes."""
    
    variants = []
    for i in range(len(features)):
        if labels[i] != -1 and probabilities[i] < threshold:
            variants.append({
                "sample_idx": i,
                "cluster": labels[i],
                "probability": probabilities[i],
                "note": "Baja probabilidad de membresía: posible variante "
                        "o transición entre familias",
            })
    
    return sorted(variants, key=lambda x: x["probability"])

Pipeline completo de clustering de malware

class MalwareClusteringPipeline:
    """Pipeline end-to-end para clustering de familias de malware."""
    
    def __init__(
        self,
        pca_components: int = 50,
        min_cluster_size: int = 15,
        min_samples: int = 5,
    ):
        self.pca_components = pca_components
        self.min_cluster_size = min_cluster_size
        self.min_samples = min_samples
    
    def run(
        self, 
        feature_matrix: np.ndarray,
        sample_ids: list[str] | None = None,
    ) -> dict:
        """Ejecuta el pipeline completo."""
        
        # 1. Preprocesar
        print("Preprocesando features...")
        processed = prepare_for_clustering(
            feature_matrix, self.pca_components
        )
        
        # 2. Clustering con HDBSCAN
        print("Ejecutando HDBSCAN...")
        labels, probs = cluster_with_hdbscan(
            processed, self.min_cluster_size, self.min_samples
        )
        
        # 3. Evaluar
        print("Evaluando calidad...")
        metrics = evaluate_clustering(processed, labels)
        
        # 4. Visualizar
        print("Generando visualización...")
        visualize_clusters(processed, labels, method="umap")
        
        return {
            "labels": labels,
            "probabilities": probs,
            "metrics": metrics,
            "n_clusters": len(set(labels)) - (1 if -1 in labels else 0),
        }

Conclusión

El clustering es una herramienta fundamental para cualquier equipo de CTI que procese volúmenes significativos de malware. No reemplaza la clasificación supervisada: la complementa. Descubre lo que la clasificación no puede ver (familias nuevas), valida lo que la clasificación dice (clusters coherentes con etiquetas) y prioriza el trabajo del analista (muestras de alto impacto en clusters grandes).

HDBSCAN con UMAP para visualización es la combinación más robusta para el dominio de malware. Requiere ajuste mínimo, maneja outliers de forma natural y proporciona probabilidades de membresía que permiten triage automatizado. La clave está en la calidad de las features de entrada: el clustering solo puede encontrar estructura que realmente existe en los datos.

Preguntas frecuentes

Artículos relacionados

Este contenido tiene fines exclusivamente educativos y de investigación en ciberseguridad defensiva. No se proporcionan binarios maliciosos ni payloads ejecutables. El uso indebido de esta información es responsabilidad exclusiva del usuario. Leer disclaimer completo.