AvanzadoDeep LearningCNNLSTMdetección de malwareredes neuronales

Deep Learning para Malware: CNN sobre Imágenes Binarias y LSTM para Secuencias

Aplicación de redes neuronales profundas a la detección de malware: CNN para clasificación de binarios como imágenes en escala de grises, LSTM para secuencias de API calls dinámicas, y arquitecturas híbridas que combinan análisis estático y dinámico.

MalwareIntel Research··13 min lectura
Serie: AI/ML para Malware — Parte 11

Más allá del feature engineering manual

Los modelos clásicos de ML (Random Forest, Gradient Boosting) requieren que un experto diseñe features manualmente. Es un proceso efectivo pero con un techo: solo puedes extraer las features que sabes que existen. El deep learning elimina esa limitación. Dado suficiente cantidad de datos, una red neuronal puede descubrir patrones en los datos crudos que ningún humano habría diseñado como feature.

Para la detección de malware, hay dos paradigmas dominantes en deep learning:

  1. CNN (Convolutional Neural Networks): tratan el binario como una imagen o señal espacial. Aprenden patrones locales (texturas, estructuras) que se repiten en familias de malware.
  2. LSTM (Long Short-Term Memory): tratan las ejecuciones como secuencias temporales. Aprenden patrones en el orden de API calls, syscalls o instrucciones.

Cada uno captura un aspecto diferente del malware. Las arquitecturas más potentes combinan ambos.

Malware como imagen: la intuición visual

En 2011, Nataraj et al. publicaron un paper que cambió el campo: "Malware Images: Visualization and Automatic Classification". La idea es elegantemente simple.

Toma cada byte del binario. Interprétalo como un pixel en escala de grises (0 = negro, 255 = blanco). Organiza los bytes en filas de ancho fijo. El resultado es una imagen donde las diferentes secciones del PE producen texturas visuales distintas:

  • Secciones de código (.text): textura granular con patrones repetitivos de opcodes.
  • Secciones de datos (.data, .rdata): zonas más uniformes, posiblemente con strings visibles.
  • Padding (null bytes): bandas negras.
  • Secciones cifradas/empacadas: ruido uniforme gris, sin patrones visibles.
  • Recursos (iconos, bitmaps): patrones estructurados reconocibles.

Lo notable es que variantes de la misma familia producen imágenes visualmente similares, incluso cuando los hashes son completamente diferentes.

import numpy as np
from PIL import Image

def binary_to_image(
    filepath: str,
    width: int = 256,
    max_size: int = 1_000_000
) -> np.ndarray:
    """Convierte un binario en una imagen en escala de grises."""
    with open(filepath, "rb") as f:
        raw = f.read(max_size)
    
    # Convertir bytes a array de uint8
    byte_array = np.frombuffer(raw, dtype=np.uint8)
    
    # Calcular alto para el ancho dado
    height = len(byte_array) // width
    if height == 0:
        height = 1
    
    # Recortar al tamaño exacto
    byte_array = byte_array[:width * height]
    
    # Reshape a imagen 2D
    image = byte_array.reshape(height, width)
    
    return image

def resize_binary_image(
    image: np.ndarray, 
    target_size: tuple[int, int] = (256, 256)
) -> np.ndarray:
    """Redimensiona la imagen binaria a tamaño fijo para CNN."""
    pil_image = Image.fromarray(image, mode="L")
    resized = pil_image.resize(target_size, Image.BILINEAR)
    return np.array(resized)

CNN para clasificación de imágenes de malware

Con el binario convertido en imagen, podemos aplicar arquitecturas CNN estándar. No necesitamos ResNet o EfficientNet; una arquitectura más simple funciona bien porque las texturas de malware son menos complejas que las de imágenes naturales.

import torch
import torch.nn as nn
import torch.nn.functional as F

class MalwareImageCNN(nn.Module):
    """CNN para clasificación de malware a partir de imágenes binarias."""
    
    def __init__(
        self, 
        num_classes: int = 2,
        input_size: int = 256,
        dropout: float = 0.3
    ):
        super().__init__()
        
        # Bloque convolucional 1
        self.conv1 = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=5, stride=2, padding=2),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, 2),
        )
        
        # Bloque convolucional 2
        self.conv2 = nn.Sequential(
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, 2),
        )
        
        # Bloque convolucional 3
        self.conv3 = nn.Sequential(
            nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            nn.AdaptiveAvgPool2d((4, 4)),
        )
        
        # Clasificador fully connected
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(128 * 4 * 4, 256),
            nn.ReLU(inplace=True),
            nn.Dropout(dropout),
            nn.Linear(256, 64),
            nn.ReLU(inplace=True),
            nn.Dropout(dropout),
            nn.Linear(64, num_classes),
        )
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.classifier(x)
        return x

# Uso
model = MalwareImageCNN(num_classes=25)  # 25 familias de malware
print(f"Parámetros: {sum(p.numel() for p in model.parameters()):,}")

Dataset y DataLoader para imágenes de malware

from torch.utils.data import Dataset, DataLoader
from pathlib import Path

class MalwareImageDataset(Dataset):
    """Dataset de imágenes binarias de malware."""
    
    def __init__(
        self,
        file_paths: list[str],
        labels: list[int],
        image_size: int = 256,
        max_file_size: int = 1_000_000,
        augment: bool = False,
    ):
        self.file_paths = file_paths
        self.labels = labels
        self.image_size = image_size
        self.max_file_size = max_file_size
        self.augment = augment
    
    def __len__(self) -> int:
        return len(self.file_paths)
    
    def __getitem__(self, idx: int) -> tuple[torch.Tensor, int]:
        filepath = self.file_paths[idx]
        label = self.labels[idx]
        
        # Convertir binario a imagen
        image = binary_to_image(filepath, width=self.image_size)
        image = resize_binary_image(image, (self.image_size, self.image_size))
        
        # Normalizar a [0, 1]
        image = image.astype(np.float32) / 255.0
        
        # Añadir canal (grayscale)
        tensor = torch.from_numpy(image).unsqueeze(0)  # (1, H, W)
        
        return tensor, label

# DataLoader con workers paralelos
train_loader = DataLoader(
    MalwareImageDataset(train_files, train_labels, augment=True),
    batch_size=64,
    shuffle=True,
    num_workers=4,
    pin_memory=True,
)

Entrenamiento

def train_epoch(
    model: nn.Module,
    loader: DataLoader,
    optimizer: torch.optim.Optimizer,
    criterion: nn.Module,
    device: torch.device,
) -> tuple[float, float]:
    """Entrena una época y retorna loss y accuracy."""
    model.train()
    total_loss = 0.0
    correct = 0
    total = 0
    
    for images, labels in loader:
        images = images.to(device)
        labels = labels.to(device)
        
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item() * images.size(0)
        _, predicted = outputs.max(1)
        correct += predicted.eq(labels).sum().item()
        total += labels.size(0)
    
    return total_loss / total, correct / total

# Configuración de entrenamiento
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = MalwareImageCNN(num_classes=25).to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3, weight_decay=1e-4)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=50)
criterion = nn.CrossEntropyLoss(label_smoothing=0.1)

for epoch in range(50):
    train_loss, train_acc = train_epoch(
        model, train_loader, optimizer, criterion, device
    )
    scheduler.step()
    
    if (epoch + 1) % 5 == 0:
        print(f"Epoch {epoch+1}: loss={train_loss:.4f}, acc={train_acc:.4f}")

MalConv: CNN directamente sobre bytes crudos

La técnica de imágenes tiene una limitación: la redimensión a tamaño fijo pierde información. MalConv (Raff et al., 2018) propuso una alternativa: procesar los bytes crudos directamente con una CNN 1D, sin conversión a imagen.

class MalConv(nn.Module):
    """
    MalConv: CNN 1D sobre bytes crudos del binario.
    Paper: Raff et al., 'Malware Detection by Eating a Whole EXE' (2018)
    """
    
    def __init__(
        self,
        max_input_size: int = 2_000_000,
        embedding_dim: int = 8,
        num_filters: int = 128,
        filter_size: int = 500,
        num_classes: int = 2,
    ):
        super().__init__()
        
        self.max_input_size = max_input_size
        
        # Embedding: cada byte (0-255) se mapea a un vector
        self.embedding = nn.Embedding(257, embedding_dim, padding_idx=256)
        
        # Convolución temporal con gating
        self.conv_signal = nn.Conv1d(
            embedding_dim, num_filters, filter_size, stride=filter_size
        )
        self.conv_gate = nn.Conv1d(
            embedding_dim, num_filters, filter_size, stride=filter_size
        )
        
        # Clasificador
        self.fc = nn.Sequential(
            nn.Linear(num_filters, 64),
            nn.ReLU(inplace=True),
            nn.Linear(64, num_classes),
        )
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # x: (batch, seq_len) de bytes como integers
        embedded = self.embedding(x)  # (batch, seq_len, emb_dim)
        embedded = embedded.transpose(1, 2)  # (batch, emb_dim, seq_len)
        
        # Gated convolution
        signal = self.conv_signal(embedded)
        gate = torch.sigmoid(self.conv_gate(embedded))
        gated = signal * gate  # (batch, num_filters, reduced_len)
        
        # Global max pooling
        pooled = F.adaptive_max_pool1d(gated, 1).squeeze(-1)
        
        # Clasificación
        output = self.fc(pooled)
        return output

Ventaja de MalConv: procesa hasta 2MB de bytes crudos sin necesidad de ingeniería de features ni redimensión. Aprende automáticamente qué regiones del binario son relevantes a través del mecanismo de gating y max pooling global.

Limitación: requiere mucha memoria GPU y es lento de entrenar. Cada muestra puede tener hasta 2 millones de tokens de entrada.

LSTM para secuencias de API calls

El análisis dinámico captura el comportamiento del malware durante su ejecución en sandbox. Las secuencias de API calls son el dato más valioso: revelan qué hace realmente el malware, no solo qué podría hacer.

class MalwareLSTM(nn.Module):
    """LSTM para clasificación de malware basada en secuencias de API calls."""
    
    def __init__(
        self,
        vocab_size: int = 500,      # Número de APIs únicas
        embedding_dim: int = 64,
        hidden_size: int = 128,
        num_layers: int = 2,
        num_classes: int = 2,
        bidirectional: bool = True,
        dropout: float = 0.3,
    ):
        super().__init__()
        
        self.hidden_size = hidden_size
        self.num_directions = 2 if bidirectional else 1
        
        # Embedding de API calls
        self.embedding = nn.Embedding(
            vocab_size + 1, embedding_dim, padding_idx=0
        )
        
        # LSTM bidireccional
        self.lstm = nn.LSTM(
            embedding_dim,
            hidden_size,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=bidirectional,
            dropout=dropout if num_layers > 1 else 0,
        )
        
        # Attention para ponderar la importancia de cada API call
        self.attention = nn.Sequential(
            nn.Linear(hidden_size * self.num_directions, 64),
            nn.Tanh(),
            nn.Linear(64, 1),
        )
        
        # Clasificador
        self.classifier = nn.Sequential(
            nn.Linear(hidden_size * self.num_directions, 128),
            nn.ReLU(inplace=True),
            nn.Dropout(dropout),
            nn.Linear(128, num_classes),
        )
    
    def forward(
        self, 
        x: torch.Tensor,
        lengths: torch.Tensor | None = None
    ) -> torch.Tensor:
        # x: (batch, seq_len) de API call indices
        embedded = self.embedding(x)  # (batch, seq_len, emb_dim)
        
        # Pack para manejar secuencias de largo variable
        if lengths is not None:
            packed = nn.utils.rnn.pack_padded_sequence(
                embedded, lengths.cpu(), batch_first=True, enforce_sorted=False
            )
            lstm_out, _ = self.lstm(packed)
            lstm_out, _ = nn.utils.rnn.pad_packed_sequence(
                lstm_out, batch_first=True
            )
        else:
            lstm_out, _ = self.lstm(embedded)
        
        # Attention weights
        attn_weights = self.attention(lstm_out)  # (batch, seq_len, 1)
        attn_weights = F.softmax(attn_weights, dim=1)
        
        # Weighted sum
        context = (lstm_out * attn_weights).sum(dim=1)  # (batch, hidden*2)
        
        # Clasificación
        output = self.classifier(context)
        return output

Preparando datos de API calls

class APICallVocabulary:
    """Vocabulario de API calls para tokenización."""
    
    def __init__(self, max_vocab: int = 500):
        self.max_vocab = max_vocab
        self.api_to_idx: dict[str, int] = {"<PAD>": 0, "<UNK>": 1}
        self.idx_to_api: dict[int, str] = {0: "<PAD>", 1: "<UNK>"}
        self.counter: Counter = Counter()
    
    def fit(self, sequences: list[list[str]]) -> "APICallVocabulary":
        """Construye vocabulario a partir de secuencias de entrenamiento."""
        for seq in sequences:
            self.counter.update(seq)
        
        # Top APIs más frecuentes
        for api, _ in self.counter.most_common(self.max_vocab - 2):
            idx = len(self.api_to_idx)
            self.api_to_idx[api] = idx
            self.idx_to_api[idx] = api
        
        return self
    
    def encode(
        self, 
        sequence: list[str], 
        max_length: int = 2000
    ) -> list[int]:
        """Convierte secuencia de nombres de API a índices."""
        encoded = []
        for api in sequence[:max_length]:
            encoded.append(self.api_to_idx.get(api, 1))  # 1 = UNK
        
        # Padding
        while len(encoded) < max_length:
            encoded.append(0)  # 0 = PAD
        
        return encoded

# Ejemplo de uso con datos de Cuckoo/CAPE sandbox
api_sequences = [
    ["NtCreateFile", "NtWriteFile", "NtClose", "RegSetValueExW",
     "InternetOpenA", "HttpSendRequestA", "InternetReadFile"],
    ["VirtualAllocEx", "WriteProcessMemory", "CreateRemoteThread",
     "NtResumeThread", "GetProcAddress", "LoadLibraryA"],
]

vocab = APICallVocabulary(max_vocab=500)
vocab.fit(api_sequences)

for seq in api_sequences:
    encoded = vocab.encode(seq, max_length=100)
    print(f"Original: {len(seq)} calls -> Encoded: {len(encoded)} tokens")

Arquitectura híbrida: combinando estático y dinámico

Los mejores resultados se obtienen combinando features estáticas (CNN sobre bytes) con features dinámicas (LSTM sobre API calls):

class HybridMalwareModel(nn.Module):
    """
    Modelo híbrido que combina análisis estático (CNN) y dinámico (LSTM).
    """
    
    def __init__(
        self,
        # Parámetros CNN
        image_size: int = 256,
        cnn_features: int = 128,
        # Parámetros LSTM
        api_vocab_size: int = 500,
        lstm_hidden: int = 128,
        # Clasificador
        num_classes: int = 2,
        dropout: float = 0.3,
    ):
        super().__init__()
        
        # Rama estática: CNN sobre imagen binaria
        self.static_branch = nn.Sequential(
            nn.Conv2d(1, 32, 5, stride=2, padding=2),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2),
            nn.Conv2d(32, 64, 3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.AdaptiveAvgPool2d((4, 4)),
            nn.Flatten(),
            nn.Linear(64 * 16, cnn_features),
            nn.ReLU(inplace=True),
        )
        
        # Rama dinámica: LSTM sobre API calls
        self.api_embedding = nn.Embedding(api_vocab_size + 1, 64, padding_idx=0)
        self.dynamic_branch = nn.LSTM(
            64, lstm_hidden, num_layers=2,
            batch_first=True, bidirectional=True, dropout=dropout,
        )
        self.dynamic_pool = nn.AdaptiveMaxPool1d(1)
        
        # Fusión y clasificador
        combined_size = cnn_features + lstm_hidden * 2
        self.classifier = nn.Sequential(
            nn.Linear(combined_size, 128),
            nn.ReLU(inplace=True),
            nn.Dropout(dropout),
            nn.Linear(128, 64),
            nn.ReLU(inplace=True),
            nn.Dropout(dropout),
            nn.Linear(64, num_classes),
        )
    
    def forward(
        self,
        image: torch.Tensor,       # (batch, 1, H, W)
        api_calls: torch.Tensor,    # (batch, seq_len)
    ) -> torch.Tensor:
        # Rama estática
        static_features = self.static_branch(image)
        
        # Rama dinámica
        api_embedded = self.api_embedding(api_calls)
        lstm_out, _ = self.dynamic_branch(api_embedded)
        dynamic_features = self.dynamic_pool(
            lstm_out.transpose(1, 2)
        ).squeeze(-1)
        
        # Fusión por concatenación
        combined = torch.cat([static_features, dynamic_features], dim=1)
        
        # Clasificación
        output = self.classifier(combined)
        return output

Transfer Learning con modelos pre-entrenados

Para datasets pequeños, el transfer learning evita entrenar desde cero:

import torchvision.models as models

class MalwareResNet(nn.Module):
    """ResNet18 adaptado para imágenes binarias de malware."""
    
    def __init__(self, num_classes: int = 25, pretrained: bool = True):
        super().__init__()
        
        # Cargar ResNet18 pre-entrenado en ImageNet
        resnet = models.resnet18(
            weights=models.ResNet18_Weights.DEFAULT if pretrained else None
        )
        
        # Modificar primera capa: de 3 canales RGB a 1 canal grayscale
        # Promediamos los pesos de los 3 canales
        original_conv = resnet.conv1
        self.conv1 = nn.Conv2d(
            1, 64, kernel_size=7, stride=2, padding=3, bias=False
        )
        if pretrained:
            self.conv1.weight.data = original_conv.weight.data.mean(
                dim=1, keepdim=True
            )
        
        # Copiar el resto de capas
        self.bn1 = resnet.bn1
        self.relu = resnet.relu
        self.maxpool = resnet.maxpool
        self.layer1 = resnet.layer1
        self.layer2 = resnet.layer2
        self.layer3 = resnet.layer3
        self.layer4 = resnet.layer4
        self.avgpool = resnet.avgpool
        
        # Nuevo clasificador
        self.fc = nn.Sequential(
            nn.Dropout(0.3),
            nn.Linear(512, num_classes),
        )
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return x

Resultados comparativos

Los benchmarks publicados muestran las fortalezas y debilidades de cada enfoque:

ModeloDatasetAUCFPR al 1% FNRTiempo inferencia
Random Forest + features manualesEMBER0.99800.15%0.5ms
LightGBM + EMBER featuresEMBER0.99920.08%0.3ms
CNN imagen 256x256Malimg0.98702.1%5ms
MalConv (bytes crudos)EMBER0.99850.12%15ms
LSTM API callsCuckoo traces0.97503.5%8ms
Híbrido CNN+LSTMCustom0.99600.35%25ms

Observaciones clave:

  • LightGBM con features manuales de EMBER sigue siendo difícil de superar en datasets estáticos.
  • MalConv iguala a los modelos clásicos sin necesidad de feature engineering.
  • LSTM tiene menor AUC porque depende de la calidad de la ejecución en sandbox (muchas muestras no se ejecutan completamente).
  • El modelo híbrido no siempre gana: su ventaja aparece en muestras donde el análisis estático solo o el dinámico solo fallan.

Limitaciones y consideraciones prácticas

Coste computacional

Los modelos de deep learning requieren GPU para entrenamiento e inferencia rápida. MalConv con secuencias de 2M bytes consume gigabytes de memoria GPU. En producción, el balance entre precisión y latencia es crítico.

Concept drift

El deep learning es más susceptible al concept drift que los modelos clásicos. Cuando aparecen familias nuevas con patrones genuinamente distintos, el modelo necesita reentrenamiento. Los modelos clásicos con features interpretables permiten ajustes quirúrgicos; los modelos profundos requieren reentrenar completamente.

Interpretabilidad

Un Random Forest te dice qué features son importantes. Una CNN te muestra mapas de activación. Pero ninguno de los dos explica por qué una muestra específica fue clasificada como malware de una forma que un analista SOC pueda actuar directamente. La explainability es un problema abierto que tratamos más adelante en la serie.

Evasión adversarial

Los modelos de deep learning son vulnerables a perturbaciones adversariales. Cambios mínimos en los bytes del binario (que no alteran su funcionalidad) pueden cambiar la predicción del modelo. MalConv es particularmente vulnerable a ataques de append (añadir bytes benignos al final del binario). Esto lo cubrimos en detalle en el artículo de Adversarial ML.

Conclusión: herramienta potente con tradeoffs claros

El deep learning no reemplaza al ML clásico en detección de malware. Lo complementa. En entornos con recursos computacionales suficientes y datos abundantes, los modelos profundos capturan patrones que los modelos clásicos pierden. Pero el coste en computación, interpretabilidad y robustez adversarial es real.

La recomendación práctica: empieza con LightGBM + features manuales (rápido, interpretable, efectivo). Cuando alcances su techo, añade MalConv o un modelo de imágenes como segunda capa. Usa LSTM solo si tienes acceso a datos de sandbox de calidad. Y siempre evalúa la robustez adversarial antes de desplegar en producción.

Preguntas frecuentes

Artículos relacionados

Este contenido tiene fines exclusivamente educativos y de investigación en ciberseguridad defensiva. No se proporcionan binarios maliciosos ni payloads ejecutables. El uso indebido de esta información es responsabilidad exclusiva del usuario. Leer disclaimer completo.