Clustering de Familias de Malware con Aprendizaje No Supervisado
Aplicación de algoritmos de clustering (DBSCAN, HDBSCAN, K-Means) para agrupar muestras de malware en familias sin etiquetas previas. Reducción de dimensionalidad con t-SNE y UMAP, métricas de evaluación de clusters y aplicaciones en threat hunting.
Cuando las etiquetas no son suficientes
La clasificación supervisada asume que tienes etiquetas correctas para entrenar. En malware, esa suposición es más frágil de lo que parece.
VirusTotal agrega resultados de más de 70 motores antivirus. Para una misma muestra, es común ver etiquetas como "Trojan.Gen", "Win32/Emotet", "Backdoor.Generic" y "Malware.AI" simultáneamente. Cuatro motores, cuatro nombres diferentes. ¿Cuál es la familia real?
Además, cuando aparece malware completamente nuevo, no tiene etiquetas. Puede pasar semanas o meses antes de que los investigadores lo clasifiquen. Durante ese tiempo, el clustering es la única herramienta que puede agrupar muestras relacionadas y revelar que "estas 200 muestras sin clasificar probablemente pertenecen a la misma familia nueva".
Preparando los datos
El clustering opera sobre el mismo espacio de features que la clasificación supervisada, pero sin usar las etiquetas durante el agrupamiento:
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
def prepare_for_clustering(
feature_matrix: np.ndarray,
pca_components: int = 50
) -> np.ndarray:
"""Prepara la matriz de features para clustering."""
# 1. Manejar valores infinitos y NaN
feature_matrix = np.nan_to_num(
feature_matrix, nan=0.0, posinf=1e6, neginf=-1e6
)
# 2. Estandarizar (media=0, std=1)
scaler = StandardScaler()
scaled = scaler.fit_transform(feature_matrix)
# 3. Reducir dimensionalidad con PCA
# El clustering funciona mejor en dimensiones reducidas
pca = PCA(n_components=pca_components, random_state=42)
reduced = pca.fit_transform(scaled)
explained = pca.explained_variance_ratio_.sum()
print(f"PCA: {pca_components} componentes, "
f"{explained:.1%} varianza explicada")
return reduced
¿Por qué PCA antes de clustering? En espacios de alta dimensionalidad (cientos o miles de features), las distancias entre puntos tienden a igualarse (la "maldición de la dimensionalidad"). PCA reduce el espacio a las dimensiones más informativas, mejorando la separación entre clusters reales.
DBSCAN: clustering basado en densidad
DBSCAN (Density-Based Spatial Clustering of Applications with Noise) es ideal para malware porque:
- No requiere especificar el número de clusters.
- Encuentra clusters de forma arbitraria.
- Identifica outliers (muestras que no pertenecen a ningún cluster).
from sklearn.cluster import DBSCAN
from sklearn.metrics import silhouette_score
def cluster_with_dbscan(
features: np.ndarray,
eps: float = 2.5,
min_samples: int = 5
) -> np.ndarray:
"""Aplica DBSCAN y retorna labels de cluster."""
db = DBSCAN(
eps=eps,
min_samples=min_samples,
metric="euclidean",
n_jobs=-1,
)
labels = db.fit_predict(features)
n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
n_noise = (labels == -1).sum()
print(f"Clusters encontrados: {n_clusters}")
print(f"Muestras como ruido: {n_noise} ({n_noise/len(labels):.1%})")
# Silhouette score (excluyendo ruido)
if n_clusters > 1:
mask = labels != -1
sil = silhouette_score(features[mask], labels[mask])
print(f"Silhouette Score: {sil:.3f}")
return labels
Ajuste de hiperparámetros para DBSCAN
El parámetro eps (radio de vecindad) es crítico. Demasiado pequeño fragmenta familias en múltiples clusters. Demasiado grande fusiona familias distintas. El método del k-distance graph ayuda a encontrar el valor óptimo:
from sklearn.neighbors import NearestNeighbors
import matplotlib.pyplot as plt
def find_optimal_eps(features: np.ndarray, k: int = 5):
"""Método del k-distance graph para encontrar eps óptimo."""
nn = NearestNeighbors(n_neighbors=k)
nn.fit(features)
distances, _ = nn.kneighbors(features)
# Distancia al k-ésimo vecino, ordenada
k_distances = np.sort(distances[:, k-1])
plt.figure(figsize=(10, 6))
plt.plot(k_distances)
plt.xlabel("Puntos ordenados por distancia")
plt.ylabel(f"Distancia al {k}-ésimo vecino")
plt.title("K-Distance Graph para selección de eps")
plt.grid(True, alpha=0.3)
# El "codo" de la curva indica el eps óptimo
# Detectar programáticamente el punto de máxima curvatura
diffs = np.diff(k_distances)
second_diffs = np.diff(diffs)
elbow_idx = np.argmax(second_diffs) + 2
optimal_eps = k_distances[elbow_idx]
plt.axhline(y=optimal_eps, color="red", linestyle="--",
label=f"eps sugerido: {optimal_eps:.2f}")
plt.legend()
plt.tight_layout()
plt.savefig("k_distance_graph.png", dpi=150)
print(f"eps sugerido: {optimal_eps:.2f}")
return optimal_eps
HDBSCAN: la evolución de DBSCAN
HDBSCAN (Hierarchical DBSCAN) resuelve la principal limitación de DBSCAN: la sensibilidad al parámetro eps. En lugar de un radio fijo, HDBSCAN construye una jerarquía de clusters a múltiples escalas y selecciona los más estables:
import hdbscan
def cluster_with_hdbscan(
features: np.ndarray,
min_cluster_size: int = 10,
min_samples: int = 5,
) -> tuple[np.ndarray, np.ndarray]:
"""Aplica HDBSCAN y retorna labels + probabilidades."""
clusterer = hdbscan.HDBSCAN(
min_cluster_size=min_cluster_size,
min_samples=min_samples,
metric="euclidean",
cluster_selection_method="eom", # Excess of Mass
prediction_data=True,
)
labels = clusterer.fit_predict(features)
probabilities = clusterer.probabilities_
n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
n_noise = (labels == -1).sum()
print(f"Clusters: {n_clusters}")
print(f"Ruido: {n_noise} ({n_noise/len(labels):.1%})")
print(f"Probabilidad media: {probabilities[labels != -1].mean():.3f}")
# Información por cluster
for cluster_id in range(n_clusters):
mask = labels == cluster_id
size = mask.sum()
avg_prob = probabilities[mask].mean()
print(f" Cluster {cluster_id}: {size} muestras, "
f"prob media: {avg_prob:.3f}")
return labels, probabilities
# El min_cluster_size controla el tamaño mínimo de familia
# Para malware: 10-50 es un rango razonable
# Familias con menos muestras se clasifican como outliers
labels, probs = cluster_with_hdbscan(
features, min_cluster_size=15, min_samples=5
)
Ventaja clave de HDBSCAN: las probabilidades de membresía. Cada muestra tiene un score que indica cuán firmemente pertenece a su cluster. Muestras en los bordes (baja probabilidad) merecen revisión manual. Esto es exactamente lo que un analista CTI necesita: certeza cuantificada, no binaria.
Visualización con UMAP y t-SNE
La reducción a 2D permite visualizar los clusters y validar visualmente la calidad del agrupamiento:
import umap
from sklearn.manifold import TSNE
def visualize_clusters(
features: np.ndarray,
labels: np.ndarray,
method: str = "umap",
title: str = "Clustering de Malware",
):
"""Visualiza clusters en 2D con UMAP o t-SNE."""
if method == "umap":
reducer = umap.UMAP(
n_components=2,
n_neighbors=30,
min_dist=0.1,
metric="euclidean",
random_state=42,
)
embedding = reducer.fit_transform(features)
else:
reducer = TSNE(
n_components=2,
perplexity=30,
learning_rate="auto",
init="pca",
random_state=42,
)
embedding = reducer.fit_transform(features)
# Visualización
plt.figure(figsize=(14, 10))
unique_labels = set(labels)
colors = plt.cm.tab20(np.linspace(0, 1, len(unique_labels)))
for label, color in zip(sorted(unique_labels), colors):
if label == -1:
# Ruido en gris
mask = labels == label
plt.scatter(
embedding[mask, 0], embedding[mask, 1],
c="gray", alpha=0.2, s=5, label="Noise"
)
else:
mask = labels == label
plt.scatter(
embedding[mask, 0], embedding[mask, 1],
c=[color], alpha=0.6, s=15,
label=f"Cluster {label} ({mask.sum()})"
)
plt.title(title)
plt.legend(bbox_to_anchor=(1.05, 1), loc="upper left", fontsize=8)
plt.tight_layout()
plt.savefig("malware_clusters.png", dpi=200, bbox_inches="tight")
plt.show()
UMAP vs t-SNE: UMAP es más rápido, preserva mejor la estructura global (distancias entre clusters) y es determinista con semilla fija. t-SNE produce visualizaciones más "dramáticas" pero distorsiona distancias entre clusters. Para análisis de malware, UMAP es la elección preferida.
Métricas de evaluación
Métricas intrínsecas (sin etiquetas)
from sklearn.metrics import (
silhouette_score,
davies_bouldin_score,
calinski_harabasz_score,
)
def evaluate_clustering(
features: np.ndarray,
labels: np.ndarray
) -> dict:
"""Evalúa la calidad del clustering con métricas intrínsecas."""
# Excluir ruido para métricas
mask = labels != -1
if mask.sum() < 2 or len(set(labels[mask])) < 2:
return {"error": "Insuficientes clusters para evaluar"}
features_clean = features[mask]
labels_clean = labels[mask]
metrics = {
# Silhouette: -1 (malo) a 1 (perfecto)
"silhouette": silhouette_score(features_clean, labels_clean),
# Davies-Bouldin: menor es mejor (0 es perfecto)
"davies_bouldin": davies_bouldin_score(features_clean, labels_clean),
# Calinski-Harabasz: mayor es mejor
"calinski_harabasz": calinski_harabasz_score(
features_clean, labels_clean
),
# Estadísticas de clusters
"num_clusters": len(set(labels_clean)),
"noise_ratio": 1 - mask.mean(),
"min_cluster_size": min(
(labels_clean == l).sum()
for l in set(labels_clean)
),
"max_cluster_size": max(
(labels_clean == l).sum()
for l in set(labels_clean)
),
}
return metrics
Métricas extrínsecas (con etiquetas parciales)
Si tienes etiquetas de VirusTotal u otra fuente para un subconjunto de muestras:
from sklearn.metrics import (
adjusted_rand_score,
normalized_mutual_info_score,
homogeneity_completeness_v_measure,
)
def evaluate_with_labels(
predicted_labels: np.ndarray,
true_labels: np.ndarray,
) -> dict:
"""Evalúa clustering contra etiquetas conocidas."""
# Solo evaluar muestras con etiqueta conocida
mask = true_labels != -1 # -1 = sin etiqueta
pred = predicted_labels[mask]
true = true_labels[mask]
homogeneity, completeness, v_measure = (
homogeneity_completeness_v_measure(true, pred)
)
return {
"adjusted_rand_index": adjusted_rand_score(true, pred),
"normalized_mutual_info": normalized_mutual_info_score(true, pred),
"homogeneity": homogeneity,
"completeness": completeness,
"v_measure": v_measure,
}
Aplicaciones en threat hunting
Descubrimiento de familias nuevas
def discover_new_families(
features: np.ndarray,
labels: np.ndarray,
known_family_labels: dict[int, str],
) -> list[dict]:
"""Identifica clusters que no corresponden a familias conocidas."""
new_families = []
for cluster_id in set(labels):
if cluster_id == -1:
continue
if cluster_id not in known_family_labels:
mask = labels == cluster_id
cluster_size = mask.sum()
new_families.append({
"cluster_id": cluster_id,
"size": cluster_size,
"description": f"Posible familia nueva con {cluster_size} muestras",
"priority": "HIGH" if cluster_size > 50 else "MEDIUM",
})
return sorted(new_families, key=lambda x: x["size"], reverse=True)
Detección de variantes
def detect_variants(
features: np.ndarray,
labels: np.ndarray,
probabilities: np.ndarray,
threshold: float = 0.5,
) -> list[dict]:
"""Identifica muestras borderline que podrían ser variantes."""
variants = []
for i in range(len(features)):
if labels[i] != -1 and probabilities[i] < threshold:
variants.append({
"sample_idx": i,
"cluster": labels[i],
"probability": probabilities[i],
"note": "Baja probabilidad de membresía: posible variante "
"o transición entre familias",
})
return sorted(variants, key=lambda x: x["probability"])
Pipeline completo de clustering de malware
class MalwareClusteringPipeline:
"""Pipeline end-to-end para clustering de familias de malware."""
def __init__(
self,
pca_components: int = 50,
min_cluster_size: int = 15,
min_samples: int = 5,
):
self.pca_components = pca_components
self.min_cluster_size = min_cluster_size
self.min_samples = min_samples
def run(
self,
feature_matrix: np.ndarray,
sample_ids: list[str] | None = None,
) -> dict:
"""Ejecuta el pipeline completo."""
# 1. Preprocesar
print("Preprocesando features...")
processed = prepare_for_clustering(
feature_matrix, self.pca_components
)
# 2. Clustering con HDBSCAN
print("Ejecutando HDBSCAN...")
labels, probs = cluster_with_hdbscan(
processed, self.min_cluster_size, self.min_samples
)
# 3. Evaluar
print("Evaluando calidad...")
metrics = evaluate_clustering(processed, labels)
# 4. Visualizar
print("Generando visualización...")
visualize_clusters(processed, labels, method="umap")
return {
"labels": labels,
"probabilities": probs,
"metrics": metrics,
"n_clusters": len(set(labels)) - (1 if -1 in labels else 0),
}
Conclusión
El clustering es una herramienta fundamental para cualquier equipo de CTI que procese volúmenes significativos de malware. No reemplaza la clasificación supervisada: la complementa. Descubre lo que la clasificación no puede ver (familias nuevas), valida lo que la clasificación dice (clusters coherentes con etiquetas) y prioriza el trabajo del analista (muestras de alto impacto en clusters grandes).
HDBSCAN con UMAP para visualización es la combinación más robusta para el dominio de malware. Requiere ajuste mínimo, maneja outliers de forma natural y proporciona probabilidades de membresía que permiten triage automatizado. La clave está en la calidad de las features de entrada: el clustering solo puede encontrar estructura que realmente existe en los datos.
Preguntas frecuentes
Artículos relacionados
Este contenido tiene fines exclusivamente educativos y de investigación en ciberseguridad defensiva. No se proporcionan binarios maliciosos ni payloads ejecutables. El uso indebido de esta información es responsabilidad exclusiva del usuario. Leer disclaimer completo.