AvanzadoFeature ExtractionPE Analysisanálisis estáticoMachine Learningpefile

Feature Extraction de Binarios PE para Machine Learning

Técnicas avanzadas de extracción de features de binarios PE para modelos de Machine Learning: headers, imports, secciones, entropía, histogramas de bytes, strings, recursos y metadata. Cómo convertir un ejecutable en un vector numérico que un clasificador pueda procesar.

MalwareIntel Research··15 min lectura
Serie: AI/ML para Malware — Parte 10

El binario como señal: de bytes a vectores

Un ejecutable PE de Windows es una estructura de datos rica. Contiene metadatos sobre cómo se compiló, qué librerías usa, cómo se organizan sus secciones en memoria, qué strings contiene y cómo se distribuyen sus bytes. Cada uno de estos elementos es una señal potencial para distinguir malware de software legítimo.

El reto de la extracción de features es doble: capturar suficiente información para que el modelo discrimine, sin generar tanto ruido que el modelo memorice en lugar de generalizar. Un buen extractor de features es más importante que un buen algoritmo de ML. Con features mediocres, ni el mejor modelo del mundo funcionará.

Anatomía del formato PE

Antes de extraer features necesitas entender la estructura que estás parseando:

┌─────────────────────┐
│    DOS Header       │  MZ signature + e_lfanew (offset al PE header)
├─────────────────────┤
│    DOS Stub         │  "This program cannot be run in DOS mode"
├─────────────────────┤
│    PE Signature     │  "PE\0\0"
├─────────────────────┤
│    COFF Header      │  Machine, NumberOfSections, TimeDateStamp
├─────────────────────┤
│  Optional Header    │  Magic, EntryPoint, ImageBase, Subsystem
│   + Data Dirs       │  Import/Export/Resource/Debug/TLS directories
├─────────────────────┤
│  Section Headers    │  .text, .data, .rdata, .rsrc, .reloc...
├─────────────────────┤
│  Section Data       │  Código, datos, recursos, relocaciones
└─────────────────────┘

Cada bloque contiene features útiles. Las anomalías en cualquier parte de esta estructura pueden indicar manipulación: timestamps falsos, entry points en secciones inusuales, secciones con permisos de escritura y ejecución simultáneos, imports resueltas dinámicamente para evadir análisis.

Features del COFF File Header

El COFF header contiene metadatos fundamentales sobre el binario:

import pefile
from datetime import datetime, timezone

def extract_coff_features(pe: pefile.PE) -> dict:
    fh = pe.FILE_HEADER
    
    # Timestamp: muchos builders de malware usan timestamps falsos
    ts = fh.TimeDateStamp
    compile_date = datetime.fromtimestamp(ts, tz=timezone.utc)
    now = datetime.now(timezone.utc)
    
    return {
        # Tipo de máquina (x86=0x14c, x64=0x8664)
        "machine_type": fh.Machine,
        "is_x64": int(fh.Machine == 0x8664),
        
        # Número de secciones (malware suele tener pocas o muchas inusuales)
        "num_sections": fh.NumberOfSections,
        
        # Timestamp de compilación
        "timestamp": ts,
        "timestamp_year": compile_date.year,
        "timestamp_future": int(compile_date > now),
        "timestamp_ancient": int(compile_date.year < 2000),
        "timestamp_zero": int(ts == 0),
        
        # Pointer to symbol table (suele ser 0 en PE modernos)
        "has_symbols": int(fh.PointerToSymbolTable != 0),
        "num_symbols": fh.NumberOfSymbols,
        
        # Characteristics flags
        "is_dll": int(bool(fh.Characteristics & 0x2000)),
        "is_executable": int(bool(fh.Characteristics & 0x0002)),
        "is_large_address_aware": int(bool(fh.Characteristics & 0x0020)),
        "relocs_stripped": int(bool(fh.Characteristics & 0x0001)),
        "debug_stripped": int(bool(fh.Characteristics & 0x0200)),
    }

Señales de alerta en el timestamp: valores de cero, fechas anteriores a la existencia de Windows, fechas futuras. Estos patrones son comunes en malware compilado con herramientas que no preservan el timestamp real.

Features del Optional Header

El Optional Header (que pese a su nombre es obligatorio) contiene información sobre el layout en memoria:

def extract_optional_header_features(pe: pefile.PE) -> dict:
    oh = pe.OPTIONAL_HEADER
    
    features = {
        # Magic number: PE32 (0x10b) o PE32+ (0x20b)
        "magic": oh.Magic,
        "is_pe32_plus": int(oh.Magic == 0x20b),
        
        # Tamaños de código y datos
        "size_of_code": oh.SizeOfCode,
        "size_of_initialized_data": oh.SizeOfInitializedData,
        "size_of_uninitialized_data": oh.SizeOfUninitializedData,
        
        # Entry point
        "entry_point": oh.AddressOfEntryPoint,
        "base_of_code": oh.BaseOfCode,
        
        # Image layout
        "image_base": oh.ImageBase,
        "section_alignment": oh.SectionAlignment,
        "file_alignment": oh.FileAlignment,
        "size_of_image": oh.SizeOfImage,
        "size_of_headers": oh.SizeOfHeaders,
        
        # Subsystem
        "subsystem": oh.Subsystem,
        "is_gui": int(oh.Subsystem == 2),
        "is_console": int(oh.Subsystem == 3),
        
        # DLL Characteristics (security features)
        "dll_characteristics": oh.DllCharacteristics,
        "has_aslr": int(bool(oh.DllCharacteristics & 0x0040)),
        "has_dep": int(bool(oh.DllCharacteristics & 0x0100)),
        "has_seh": int(not bool(oh.DllCharacteristics & 0x0400)),
        "has_cfg": int(bool(oh.DllCharacteristics & 0x4000)),
        
        # Stack y heap sizes
        "size_of_stack_reserve": oh.SizeOfStackReserve,
        "size_of_heap_reserve": oh.SizeOfHeapReserve,
        
        # Número de data directories
        "num_data_directories": oh.NumberOfRvaAndSizes,
    }
    
    # Ratio code/data: malware empacado suele tener mucho más dato que código
    total_size = oh.SizeOfCode + oh.SizeOfInitializedData
    if total_size > 0:
        features["code_to_data_ratio"] = oh.SizeOfCode / total_size
    else:
        features["code_to_data_ratio"] = 0
    
    return features

Feature clave: seguridad del binario. Software legítimo moderno casi siempre tiene ASLR, DEP y CFG habilitados. Malware compilado con herramientas personalizadas frecuentemente no los tiene. La ausencia de estas protecciones es una señal estadística fuerte.

Features de secciones

Las secciones son la columna vertebral del análisis estático. Cada sección tiene un nombre, permisos, tamaño en disco vs. tamaño en memoria y entropía que revelan mucho sobre el binario:

import math
from collections import Counter

def calculate_entropy(data: bytes) -> float:
    """Calcula la entropía de Shannon de una secuencia de bytes."""
    if not data:
        return 0.0
    counts = Counter(data)
    total = len(data)
    entropy = 0.0
    for count in counts.values():
        p = count / total
        entropy -= p * math.log2(p)
    return entropy

def extract_section_features(pe: pefile.PE) -> dict:
    features = {}
    
    entropies = []
    raw_sizes = []
    virtual_sizes = []
    section_names = []
    
    # Nombres de secciones estándar de compiladores legítimos
    standard_names = {
        b".text", b".data", b".rdata", b".bss", b".rsrc",
        b".reloc", b".edata", b".idata", b".pdata", b".tls",
        b".debug", b"CODE", b"DATA", b".CRT",
    }
    
    writable_executable = 0
    high_entropy_sections = 0
    zero_raw_size = 0
    non_standard_names = 0
    
    for section in pe.sections:
        name = section.Name.rstrip(b"\x00")
        section_names.append(name)
        
        # Entropía de la sección
        data = section.get_data()
        entropy = calculate_entropy(data)
        entropies.append(entropy)
        
        raw_size = section.SizeOfRawData
        virtual_size = section.Misc_VirtualSize
        raw_sizes.append(raw_size)
        virtual_sizes.append(virtual_size)
        
        # Flags de permisos
        chars = section.Characteristics
        is_writable = bool(chars & 0x80000000)
        is_executable = bool(chars & 0x20000000)
        
        if is_writable and is_executable:
            writable_executable += 1
        
        if entropy > 7.0:
            high_entropy_sections += 1
        
        if raw_size == 0 and virtual_size > 0:
            zero_raw_size += 1
        
        if name not in standard_names:
            non_standard_names += 1
    
    # Estadísticas agregadas de entropía
    if entropies:
        features["section_entropy_max"] = max(entropies)
        features["section_entropy_min"] = min(entropies)
        features["section_entropy_mean"] = sum(entropies) / len(entropies)
        features["section_entropy_std"] = (
            sum((e - features["section_entropy_mean"])**2 for e in entropies)
            / len(entropies)
        ) ** 0.5
    else:
        features["section_entropy_max"] = 0
        features["section_entropy_min"] = 0
        features["section_entropy_mean"] = 0
        features["section_entropy_std"] = 0
    
    # Estadísticas de tamaños
    if raw_sizes:
        features["section_raw_size_max"] = max(raw_sizes)
        features["section_raw_size_min"] = min(raw_sizes)
        features["section_raw_size_total"] = sum(raw_sizes)
    else:
        features["section_raw_size_max"] = 0
        features["section_raw_size_min"] = 0
        features["section_raw_size_total"] = 0
    
    # Ratios virtual/raw (indicadores de packing)
    vr_ratios = []
    for v, r in zip(virtual_sizes, raw_sizes):
        if r > 0:
            vr_ratios.append(v / r)
    
    if vr_ratios:
        features["vr_ratio_max"] = max(vr_ratios)
        features["vr_ratio_mean"] = sum(vr_ratios) / len(vr_ratios)
    else:
        features["vr_ratio_max"] = 0
        features["vr_ratio_mean"] = 0
    
    # Contadores de anomalías
    features["num_sections"] = len(pe.sections)
    features["writable_executable_sections"] = writable_executable
    features["high_entropy_sections"] = high_entropy_sections
    features["zero_raw_size_sections"] = zero_raw_size
    features["non_standard_section_names"] = non_standard_names
    
    # Entry point en sección inusual
    ep = pe.OPTIONAL_HEADER.AddressOfEntryPoint
    ep_section = None
    for section in pe.sections:
        if (section.VirtualAddress <= ep < 
            section.VirtualAddress + section.Misc_VirtualSize):
            ep_section = section.Name.rstrip(b"\x00")
            break
    
    features["ep_in_text"] = int(ep_section == b".text")
    features["ep_in_code"] = int(ep_section in (b".text", b"CODE"))
    features["ep_in_non_standard"] = int(
        ep_section is not None and ep_section not in standard_names
    )
    
    return features

Entropía como detector de packing

La entropía de Shannon mide la aleatoriedad de los datos. Para secciones de código y datos de un PE:

Rango de entropíaSignificado
0.0 a 1.0Datos muy repetitivos (padding, null bytes)
1.0 a 5.0Código compilado normal, datos estructurados
5.0 a 6.5Código optimizado, datos comprimidos parcialmente
6.5 a 7.5Posible compresión o cifrado ligero
7.5 a 8.0Muy probablemente cifrado o comprimido (packing)

El malware empacado con UPX, Themida, VMProtect u otros packers muestra entropía cercana a 8.0 en la sección que contiene el payload cifrado.

Features de imports

Las imports son posiblemente la fuente de features más informativa. Cada DLL y cada función importada revela intenciones del binario:

def extract_import_features(pe: pefile.PE) -> dict:
    features = {
        "num_imports": 0,
        "num_import_dlls": 0,
    }
    
    # Categorías de DLLs
    dll_categories = {
        "system": ["kernel32", "ntdll", "kernelbase"],
        "network": ["ws2_32", "wininet", "winhttp", "urlmon", "dnsapi"],
        "crypto": ["crypt32", "bcrypt", "ncrypt", "advapi32"],
        "ui": ["user32", "gdi32", "shell32", "ole32", "oleaut32"],
        "process": ["psapi", "tlhelp32", "dbghelp"],
        "registry": ["advapi32"],
        "com": ["ole32", "oleaut32", "comctl32"],
    }
    
    # APIs sospechosas agrupadas por comportamiento
    suspicious_groups = {
        "injection": [
            "VirtualAllocEx", "WriteProcessMemory", "CreateRemoteThread",
            "NtCreateThreadEx", "RtlCreateUserThread",
            "QueueUserAPC", "NtQueueApcThread",
            "SetThreadContext", "NtSetContextThread",
        ],
        "hooking": [
            "SetWindowsHookEx", "SetWinEventHook",
            "GetAsyncKeyState", "GetKeyState", "GetKeyboardState",
        ],
        "evasion": [
            "IsDebuggerPresent", "CheckRemoteDebuggerPresent",
            "NtQueryInformationProcess", "OutputDebugString",
            "GetTickCount", "QueryPerformanceCounter",
            "Sleep", "SleepEx",
        ],
        "persistence": [
            "RegSetValueEx", "RegCreateKeyEx",
            "CreateService", "StartService",
            "SHGetFolderPath", "SHGetKnownFolderPath",
        ],
        "download": [
            "URLDownloadToFile", "InternetOpen", "InternetOpenUrl",
            "HttpSendRequest", "InternetReadFile",
            "WinHttpOpen", "WinHttpConnect",
        ],
        "crypto_ops": [
            "CryptAcquireContext", "CryptEncrypt", "CryptDecrypt",
            "CryptGenKey", "CryptImportKey", "CryptExportKey",
            "BCryptEncrypt", "BCryptDecrypt",
        ],
        "file_ops": [
            "CreateFile", "WriteFile", "ReadFile", "DeleteFile",
            "MoveFile", "CopyFile", "FindFirstFile",
        ],
    }
    
    # Inicializar contadores
    for category in dll_categories:
        features[f"dll_cat_{category}"] = 0
    for group in suspicious_groups:
        features[f"suspicious_{group}"] = 0
    
    if not hasattr(pe, "DIRECTORY_ENTRY_IMPORT"):
        return features
    
    entries = pe.DIRECTORY_ENTRY_IMPORT
    features["num_import_dlls"] = len(entries)
    
    total_imports = 0
    for entry in entries:
        dll_name = entry.dll.decode("utf-8", errors="ignore").lower()
        dll_base = dll_name.replace(".dll", "")
        
        # Categorizar DLL
        for category, dlls in dll_categories.items():
            if dll_base in dlls:
                features[f"dll_cat_{category}"] += 1
        
        # Analizar cada import
        for imp in entry.imports:
            total_imports += 1
            if imp.name:
                api_name = imp.name.decode("utf-8", errors="ignore")
                for group, apis in suspicious_groups.items():
                    if api_name in apis:
                        features[f"suspicious_{group}"] += 1
    
    features["num_imports"] = total_imports
    
    # Ratio de imports sospechosas sobre total
    total_suspicious = sum(
        features[f"suspicious_{g}"] for g in suspicious_groups
    )
    features["suspicious_ratio"] = (
        total_suspicious / total_imports if total_imports > 0 else 0
    )
    
    return features

Imports que delatan al malware

CategoríaAPIs claveLo que indica
InyecciónVirtualAllocEx, WriteProcessMemory, CreateRemoteThreadProcess injection (T1055)
KeyloggingSetWindowsHookEx, GetAsyncKeyStateCaptura de teclado (T1056)
Anti-análisisIsDebuggerPresent, CheckRemoteDebuggerPresentEvasión de sandbox (T1497)
DescargaURLDownloadToFile, InternetOpenDropper, downloader (T1105)
CifradoCryptEncrypt, BCryptEncryptRansomware, exfiltración cifrada
PersistenciaRegSetValueEx, CreateServiceSupervivencia post-reboot (T1547)

Histograma de bytes

El histograma de bytes es una feature de bajo nivel sorprendentemente potente. Representa la frecuencia de cada valor de byte (0x00 a 0xFF) en el fichero completo:

import numpy as np

def extract_byte_histogram(data: bytes, normalize: bool = True) -> np.ndarray:
    """
    Genera un histograma de 256 bins con la frecuencia de cada byte.
    Normalizado produce una distribución de probabilidad.
    """
    histogram = np.zeros(256, dtype=np.float64)
    for byte in data:
        histogram[byte] += 1
    
    if normalize and len(data) > 0:
        histogram /= len(data)
    
    return histogram

def extract_byte_features(data: bytes) -> dict:
    """Features derivadas del histograma de bytes."""
    hist = extract_byte_histogram(data, normalize=True)
    
    features = {}
    
    # El histograma completo como 256 features
    for i in range(256):
        features[f"byte_{i:02x}"] = hist[i]
    
    # Estadísticas agregadas
    features["byte_mean"] = float(np.mean(hist))
    features["byte_std"] = float(np.std(hist))
    features["byte_max"] = float(np.max(hist))
    features["byte_min"] = float(np.min(hist[hist > 0])) if np.any(hist > 0) else 0
    
    # Bytes únicos presentes
    features["byte_unique"] = int(np.count_nonzero(hist))
    
    # Ratio de bytes imprimibles (ASCII 32-126)
    printable = float(np.sum(hist[32:127]))
    features["printable_ratio"] = printable
    
    # Ratio de null bytes
    features["null_ratio"] = float(hist[0])
    
    # Entropía del histograma (equivalente a entropía del fichero)
    non_zero = hist[hist > 0]
    features["file_entropy"] = float(-np.sum(non_zero * np.log2(non_zero)))
    
    return features

¿Por qué funciona? Un ejecutable compilado normalmente tiene un perfil de bytes predecible: muchos null bytes, alta frecuencia de ciertos opcodes x86, distribución no uniforme. El malware cifrado o comprimido tiene una distribución casi uniforme (entropía alta). Cada packer produce un perfil de bytes diferente, lo que permite al modelo aprender a identificar packers específicos.

Histograma de entropía por ventana

Una mejora sobre la entropía global es calcular la entropía en ventanas deslizantes a lo largo del fichero. Esto captura la variabilidad interna:

def extract_entropy_histogram(
    data: bytes, 
    window_size: int = 1024, 
    step: int = 256,
    num_bins: int = 64
) -> dict:
    """
    Calcula entropía en ventanas deslizantes y genera un histograma
    de la distribución de entropías.
    """
    if len(data) < window_size:
        return {f"entropy_bin_{i}": 0.0 for i in range(num_bins)}
    
    entropies = []
    for start in range(0, len(data) - window_size, step):
        window = data[start:start + window_size]
        entropy = calculate_entropy(window)
        entropies.append(entropy)
    
    # Histograma de entropías (bins de 0 a 8)
    hist, _ = np.histogram(entropies, bins=num_bins, range=(0, 8))
    hist = hist.astype(np.float64)
    if hist.sum() > 0:
        hist /= hist.sum()
    
    features = {}
    for i in range(num_bins):
        features[f"entropy_bin_{i}"] = float(hist[i])
    
    # Estadísticas
    arr = np.array(entropies)
    features["entropy_window_mean"] = float(np.mean(arr))
    features["entropy_window_std"] = float(np.std(arr))
    features["entropy_window_max"] = float(np.max(arr))
    features["entropy_window_min"] = float(np.min(arr))
    
    # Porcentaje de ventanas con alta entropía (posible cifrado)
    features["high_entropy_window_ratio"] = float(np.mean(arr > 7.0))
    
    return features

Este enfoque revela la estructura interna del binario. Un ejecutable normal tiene zonas de código (entropía media), zonas de datos (entropía baja) y quizás recursos comprimidos (entropía alta). Malware empacado muestra entropía uniformemente alta en casi todas las ventanas.

Features de strings

Las strings embebidas en un binario revelan URLs, IPs, rutas del sistema, comandos y otros indicadores:

import re

def extract_string_features(data: bytes, min_length: int = 6) -> dict:
    """Extrae features basadas en strings ASCII y Unicode del binario."""
    
    # Extraer strings ASCII
    ascii_strings = re.findall(
        rb"[\x20-\x7e]{" + str(min_length).encode() + rb",}", data
    )
    ascii_strings = [s.decode("ascii", errors="ignore") for s in ascii_strings]
    
    # Extraer strings Unicode (UTF-16LE)
    unicode_strings = re.findall(
        rb"(?:[\x20-\x7e]\x00){" + str(min_length).encode() + rb",}", data
    )
    unicode_strings = [
        s.decode("utf-16-le", errors="ignore") for s in unicode_strings
    ]
    
    all_strings = ascii_strings + unicode_strings
    
    features = {
        "num_strings_ascii": len(ascii_strings),
        "num_strings_unicode": len(unicode_strings),
        "num_strings_total": len(all_strings),
    }
    
    if all_strings:
        lengths = [len(s) for s in all_strings]
        features["string_avg_length"] = sum(lengths) / len(lengths)
        features["string_max_length"] = max(lengths)
    else:
        features["string_avg_length"] = 0
        features["string_max_length"] = 0
    
    # Patrones sospechosos
    combined = " ".join(all_strings).lower()
    
    # URLs y dominios
    url_pattern = r"https?://[^\s]+"
    features["has_urls"] = int(bool(re.search(url_pattern, combined)))
    features["num_urls"] = len(re.findall(url_pattern, combined))
    
    # IPs
    ip_pattern = r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b"
    features["has_ips"] = int(bool(re.search(ip_pattern, combined)))
    features["num_ips"] = len(re.findall(ip_pattern, combined))
    
    # Rutas del sistema
    path_patterns = [
        r"c:\\windows", r"c:\\users", r"\\appdata\\",
        r"\\temp\\", r"\\system32\\", r"hkey_",
    ]
    features["has_system_paths"] = int(
        any(re.search(p, combined) for p in path_patterns)
    )
    
    # Comandos de shell
    shell_keywords = [
        "cmd.exe", "powershell", "wscript", "cscript",
        "regsvr32", "rundll32", "mshta", "certutil",
    ]
    features["has_shell_commands"] = int(
        any(kw in combined for kw in shell_keywords)
    )
    
    # Indicadores de anti-análisis
    antidebug_strings = [
        "isdebuggerpresent", "ollydbg", "wireshark",
        "vmware", "virtualbox", "sandboxie", "fiddler",
    ]
    features["has_antidebug_strings"] = int(
        any(s in combined for s in antidebug_strings)
    )
    
    return features

Features de recursos y exports

Dos fuentes de features adicionales que aportan valor:

def extract_resource_features(pe: pefile.PE) -> dict:
    features = {
        "has_resources": 0,
        "num_resources": 0,
        "resource_total_size": 0,
        "resource_max_entropy": 0,
        "resource_types": 0,
    }
    
    if not hasattr(pe, "DIRECTORY_ENTRY_RESOURCE"):
        return features
    
    features["has_resources"] = 1
    
    resource_entropies = []
    total_size = 0
    resource_types = set()
    count = 0
    
    def walk_resources(entry, depth=0):
        nonlocal count, total_size
        if hasattr(entry, "directory"):
            for child in entry.directory.entries:
                if depth == 0 and hasattr(child, "id"):
                    resource_types.add(child.id)
                walk_resources(child, depth + 1)
        elif hasattr(entry, "data"):
            count += 1
            rva = entry.data.struct.OffsetToData
            size = entry.data.struct.Size
            total_size += size
            try:
                data = pe.get_data(rva, size)
                resource_entropies.append(calculate_entropy(data))
            except Exception:
                pass
    
    for entry in pe.DIRECTORY_ENTRY_RESOURCE.entries:
        walk_resources(entry)
    
    features["num_resources"] = count
    features["resource_total_size"] = total_size
    features["resource_types"] = len(resource_types)
    
    if resource_entropies:
        features["resource_max_entropy"] = max(resource_entropies)
        features["resource_mean_entropy"] = (
            sum(resource_entropies) / len(resource_entropies)
        )
    
    return features

def extract_export_features(pe: pefile.PE) -> dict:
    features = {
        "has_exports": 0,
        "num_exports": 0,
        "num_named_exports": 0,
    }
    
    if not hasattr(pe, "DIRECTORY_ENTRY_EXPORT"):
        return features
    
    features["has_exports"] = 1
    exports = pe.DIRECTORY_ENTRY_EXPORT.symbols
    features["num_exports"] = len(exports)
    features["num_named_exports"] = sum(
        1 for exp in exports if exp.name is not None
    )
    
    return features

El extractor completo

Combinando todos los módulos anteriores en un pipeline unificado:

class CompletePEExtractor:
    """Pipeline completo de extracción de features de PE."""
    
    FEATURE_GROUPS = [
        "coff_header",
        "optional_header", 
        "sections",
        "imports",
        "byte_histogram",
        "entropy_histogram",
        "strings",
        "resources",
        "exports",
    ]
    
    def __init__(self, filepath: str):
        self.filepath = filepath
        with open(filepath, "rb") as f:
            self.raw_bytes = f.read()
        self.pe = pefile.PE(filepath, fast_load=False)
    
    def extract(self, groups: list[str] | None = None) -> dict:
        """Extrae features de los grupos especificados (o todos)."""
        if groups is None:
            groups = self.FEATURE_GROUPS
        
        features = {}
        
        extractors = {
            "coff_header": lambda: extract_coff_features(self.pe),
            "optional_header": lambda: extract_optional_header_features(self.pe),
            "sections": lambda: extract_section_features(self.pe),
            "imports": lambda: extract_import_features(self.pe),
            "byte_histogram": lambda: extract_byte_features(self.raw_bytes),
            "entropy_histogram": lambda: extract_entropy_histogram(self.raw_bytes),
            "strings": lambda: extract_string_features(self.raw_bytes),
            "resources": lambda: extract_resource_features(self.pe),
            "exports": lambda: extract_export_features(self.pe),
        }
        
        for group in groups:
            if group in extractors:
                try:
                    group_features = extractors[group]()
                    features.update(group_features)
                except Exception as e:
                    # Log pero no falla: features faltantes se rellenan
                    print(f"Error extracting {group}: {e}")
        
        return features
    
    def to_vector(self, feature_order: list[str]) -> np.ndarray:
        """Convierte features a vector numérico con orden fijo."""
        features = self.extract()
        return np.array(
            [features.get(name, 0) for name in feature_order],
            dtype=np.float64
        )

Selección de features y reducción de dimensionalidad

Con cientos o miles de features, no todas son útiles. Algunas son redundantes, otras añaden ruido:

from sklearn.feature_selection import mutual_info_classif
from sklearn.decomposition import PCA

def select_features(
    X_train: np.ndarray,
    y_train: np.ndarray,
    feature_names: list[str],
    top_k: int = 100
) -> list[str]:
    """Selecciona las top_k features por mutual information."""
    mi_scores = mutual_info_classif(
        X_train, y_train, random_state=42, n_jobs=-1
    )
    
    # Ranking por mutual information
    ranked = sorted(
        zip(feature_names, mi_scores),
        key=lambda x: x[1],
        reverse=True
    )
    
    selected = [name for name, score in ranked[:top_k]]
    
    print(f"Top 10 features por mutual information:")
    for name, score in ranked[:10]:
        print(f"  {name}: {score:.4f}")
    
    return selected

Tradeoff importante: más features capturan más información pero aumentan el riesgo de overfitting y el tiempo de extracción. En producción, donde cada milisegundo cuenta, un extractor con 100 features bien seleccionadas suele superar a uno con 2.000 features redundantes.

Rendimiento y producción

Para sistemas que procesan miles de ficheros por hora, la velocidad de extracción importa:

import time

def benchmark_extractor(filepath: str, iterations: int = 100):
    """Mide el tiempo de extracción por fichero."""
    times = []
    for _ in range(iterations):
        start = time.perf_counter()
        extractor = CompletePEExtractor(filepath)
        _ = extractor.extract()
        elapsed = time.perf_counter() - start
        times.append(elapsed)
    
    avg = sum(times) / len(times)
    print(f"Tiempo medio de extracción: {avg*1000:.1f}ms")
    print(f"Throughput: {1/avg:.0f} ficheros/segundo")

En un servidor moderno, la extracción completa de features de un PE tarda entre 10ms y 50ms. Esto permite procesar entre 20 y 100 ficheros por segundo con un solo core. Para volúmenes mayores, la extracción se paraleliza trivialmente con multiprocessing.

El cuello de botella en producción no suele ser la extracción sino la inferencia del modelo. Con un Random Forest de 200 árboles, la predicción tarda menos de 1ms por muestra. Con un modelo de deep learning, puede subir a 10ms o más.

Conclusión: las features son el fundamento

La calidad de un clasificador de malware depende más de la calidad de las features que del algoritmo. Un Random Forest con buenas features supera consistentemente a un deep learning con features pobres.

El extractor presentado aquí cubre las nueve categorías de features más relevantes para análisis estático de PE. Es extensible: puedes añadir features de debug directories, rich header, certificados digitales, overlay data y más. Cada nueva categoría mejora marginalmente la capacidad discriminativa del modelo, con rendimientos decrecientes.

El siguiente paso natural es aplicar deep learning directamente sobre los bytes del binario, eliminando la necesidad de extracción manual de features. Eso lo cubrimos en el siguiente artículo de la serie.

Preguntas frecuentes

Artículos relacionados

Este contenido tiene fines exclusivamente educativos y de investigación en ciberseguridad defensiva. No se proporcionan binarios maliciosos ni payloads ejecutables. El uso indebido de esta información es responsabilidad exclusiva del usuario. Leer disclaimer completo.