Feature Extraction de Binarios PE para Machine Learning
Técnicas avanzadas de extracción de features de binarios PE para modelos de Machine Learning: headers, imports, secciones, entropía, histogramas de bytes, strings, recursos y metadata. Cómo convertir un ejecutable en un vector numérico que un clasificador pueda procesar.
El binario como señal: de bytes a vectores
Un ejecutable PE de Windows es una estructura de datos rica. Contiene metadatos sobre cómo se compiló, qué librerías usa, cómo se organizan sus secciones en memoria, qué strings contiene y cómo se distribuyen sus bytes. Cada uno de estos elementos es una señal potencial para distinguir malware de software legítimo.
El reto de la extracción de features es doble: capturar suficiente información para que el modelo discrimine, sin generar tanto ruido que el modelo memorice en lugar de generalizar. Un buen extractor de features es más importante que un buen algoritmo de ML. Con features mediocres, ni el mejor modelo del mundo funcionará.
Anatomía del formato PE
Antes de extraer features necesitas entender la estructura que estás parseando:
┌─────────────────────┐
│ DOS Header │ MZ signature + e_lfanew (offset al PE header)
├─────────────────────┤
│ DOS Stub │ "This program cannot be run in DOS mode"
├─────────────────────┤
│ PE Signature │ "PE\0\0"
├─────────────────────┤
│ COFF Header │ Machine, NumberOfSections, TimeDateStamp
├─────────────────────┤
│ Optional Header │ Magic, EntryPoint, ImageBase, Subsystem
│ + Data Dirs │ Import/Export/Resource/Debug/TLS directories
├─────────────────────┤
│ Section Headers │ .text, .data, .rdata, .rsrc, .reloc...
├─────────────────────┤
│ Section Data │ Código, datos, recursos, relocaciones
└─────────────────────┘
Cada bloque contiene features útiles. Las anomalías en cualquier parte de esta estructura pueden indicar manipulación: timestamps falsos, entry points en secciones inusuales, secciones con permisos de escritura y ejecución simultáneos, imports resueltas dinámicamente para evadir análisis.
Features del COFF File Header
El COFF header contiene metadatos fundamentales sobre el binario:
import pefile
from datetime import datetime, timezone
def extract_coff_features(pe: pefile.PE) -> dict:
fh = pe.FILE_HEADER
# Timestamp: muchos builders de malware usan timestamps falsos
ts = fh.TimeDateStamp
compile_date = datetime.fromtimestamp(ts, tz=timezone.utc)
now = datetime.now(timezone.utc)
return {
# Tipo de máquina (x86=0x14c, x64=0x8664)
"machine_type": fh.Machine,
"is_x64": int(fh.Machine == 0x8664),
# Número de secciones (malware suele tener pocas o muchas inusuales)
"num_sections": fh.NumberOfSections,
# Timestamp de compilación
"timestamp": ts,
"timestamp_year": compile_date.year,
"timestamp_future": int(compile_date > now),
"timestamp_ancient": int(compile_date.year < 2000),
"timestamp_zero": int(ts == 0),
# Pointer to symbol table (suele ser 0 en PE modernos)
"has_symbols": int(fh.PointerToSymbolTable != 0),
"num_symbols": fh.NumberOfSymbols,
# Characteristics flags
"is_dll": int(bool(fh.Characteristics & 0x2000)),
"is_executable": int(bool(fh.Characteristics & 0x0002)),
"is_large_address_aware": int(bool(fh.Characteristics & 0x0020)),
"relocs_stripped": int(bool(fh.Characteristics & 0x0001)),
"debug_stripped": int(bool(fh.Characteristics & 0x0200)),
}
Señales de alerta en el timestamp: valores de cero, fechas anteriores a la existencia de Windows, fechas futuras. Estos patrones son comunes en malware compilado con herramientas que no preservan el timestamp real.
Features del Optional Header
El Optional Header (que pese a su nombre es obligatorio) contiene información sobre el layout en memoria:
def extract_optional_header_features(pe: pefile.PE) -> dict:
oh = pe.OPTIONAL_HEADER
features = {
# Magic number: PE32 (0x10b) o PE32+ (0x20b)
"magic": oh.Magic,
"is_pe32_plus": int(oh.Magic == 0x20b),
# Tamaños de código y datos
"size_of_code": oh.SizeOfCode,
"size_of_initialized_data": oh.SizeOfInitializedData,
"size_of_uninitialized_data": oh.SizeOfUninitializedData,
# Entry point
"entry_point": oh.AddressOfEntryPoint,
"base_of_code": oh.BaseOfCode,
# Image layout
"image_base": oh.ImageBase,
"section_alignment": oh.SectionAlignment,
"file_alignment": oh.FileAlignment,
"size_of_image": oh.SizeOfImage,
"size_of_headers": oh.SizeOfHeaders,
# Subsystem
"subsystem": oh.Subsystem,
"is_gui": int(oh.Subsystem == 2),
"is_console": int(oh.Subsystem == 3),
# DLL Characteristics (security features)
"dll_characteristics": oh.DllCharacteristics,
"has_aslr": int(bool(oh.DllCharacteristics & 0x0040)),
"has_dep": int(bool(oh.DllCharacteristics & 0x0100)),
"has_seh": int(not bool(oh.DllCharacteristics & 0x0400)),
"has_cfg": int(bool(oh.DllCharacteristics & 0x4000)),
# Stack y heap sizes
"size_of_stack_reserve": oh.SizeOfStackReserve,
"size_of_heap_reserve": oh.SizeOfHeapReserve,
# Número de data directories
"num_data_directories": oh.NumberOfRvaAndSizes,
}
# Ratio code/data: malware empacado suele tener mucho más dato que código
total_size = oh.SizeOfCode + oh.SizeOfInitializedData
if total_size > 0:
features["code_to_data_ratio"] = oh.SizeOfCode / total_size
else:
features["code_to_data_ratio"] = 0
return features
Feature clave: seguridad del binario. Software legítimo moderno casi siempre tiene ASLR, DEP y CFG habilitados. Malware compilado con herramientas personalizadas frecuentemente no los tiene. La ausencia de estas protecciones es una señal estadística fuerte.
Features de secciones
Las secciones son la columna vertebral del análisis estático. Cada sección tiene un nombre, permisos, tamaño en disco vs. tamaño en memoria y entropía que revelan mucho sobre el binario:
import math
from collections import Counter
def calculate_entropy(data: bytes) -> float:
"""Calcula la entropía de Shannon de una secuencia de bytes."""
if not data:
return 0.0
counts = Counter(data)
total = len(data)
entropy = 0.0
for count in counts.values():
p = count / total
entropy -= p * math.log2(p)
return entropy
def extract_section_features(pe: pefile.PE) -> dict:
features = {}
entropies = []
raw_sizes = []
virtual_sizes = []
section_names = []
# Nombres de secciones estándar de compiladores legítimos
standard_names = {
b".text", b".data", b".rdata", b".bss", b".rsrc",
b".reloc", b".edata", b".idata", b".pdata", b".tls",
b".debug", b"CODE", b"DATA", b".CRT",
}
writable_executable = 0
high_entropy_sections = 0
zero_raw_size = 0
non_standard_names = 0
for section in pe.sections:
name = section.Name.rstrip(b"\x00")
section_names.append(name)
# Entropía de la sección
data = section.get_data()
entropy = calculate_entropy(data)
entropies.append(entropy)
raw_size = section.SizeOfRawData
virtual_size = section.Misc_VirtualSize
raw_sizes.append(raw_size)
virtual_sizes.append(virtual_size)
# Flags de permisos
chars = section.Characteristics
is_writable = bool(chars & 0x80000000)
is_executable = bool(chars & 0x20000000)
if is_writable and is_executable:
writable_executable += 1
if entropy > 7.0:
high_entropy_sections += 1
if raw_size == 0 and virtual_size > 0:
zero_raw_size += 1
if name not in standard_names:
non_standard_names += 1
# Estadísticas agregadas de entropía
if entropies:
features["section_entropy_max"] = max(entropies)
features["section_entropy_min"] = min(entropies)
features["section_entropy_mean"] = sum(entropies) / len(entropies)
features["section_entropy_std"] = (
sum((e - features["section_entropy_mean"])**2 for e in entropies)
/ len(entropies)
) ** 0.5
else:
features["section_entropy_max"] = 0
features["section_entropy_min"] = 0
features["section_entropy_mean"] = 0
features["section_entropy_std"] = 0
# Estadísticas de tamaños
if raw_sizes:
features["section_raw_size_max"] = max(raw_sizes)
features["section_raw_size_min"] = min(raw_sizes)
features["section_raw_size_total"] = sum(raw_sizes)
else:
features["section_raw_size_max"] = 0
features["section_raw_size_min"] = 0
features["section_raw_size_total"] = 0
# Ratios virtual/raw (indicadores de packing)
vr_ratios = []
for v, r in zip(virtual_sizes, raw_sizes):
if r > 0:
vr_ratios.append(v / r)
if vr_ratios:
features["vr_ratio_max"] = max(vr_ratios)
features["vr_ratio_mean"] = sum(vr_ratios) / len(vr_ratios)
else:
features["vr_ratio_max"] = 0
features["vr_ratio_mean"] = 0
# Contadores de anomalías
features["num_sections"] = len(pe.sections)
features["writable_executable_sections"] = writable_executable
features["high_entropy_sections"] = high_entropy_sections
features["zero_raw_size_sections"] = zero_raw_size
features["non_standard_section_names"] = non_standard_names
# Entry point en sección inusual
ep = pe.OPTIONAL_HEADER.AddressOfEntryPoint
ep_section = None
for section in pe.sections:
if (section.VirtualAddress <= ep <
section.VirtualAddress + section.Misc_VirtualSize):
ep_section = section.Name.rstrip(b"\x00")
break
features["ep_in_text"] = int(ep_section == b".text")
features["ep_in_code"] = int(ep_section in (b".text", b"CODE"))
features["ep_in_non_standard"] = int(
ep_section is not None and ep_section not in standard_names
)
return features
Entropía como detector de packing
La entropía de Shannon mide la aleatoriedad de los datos. Para secciones de código y datos de un PE:
| Rango de entropía | Significado |
|---|---|
| 0.0 a 1.0 | Datos muy repetitivos (padding, null bytes) |
| 1.0 a 5.0 | Código compilado normal, datos estructurados |
| 5.0 a 6.5 | Código optimizado, datos comprimidos parcialmente |
| 6.5 a 7.5 | Posible compresión o cifrado ligero |
| 7.5 a 8.0 | Muy probablemente cifrado o comprimido (packing) |
El malware empacado con UPX, Themida, VMProtect u otros packers muestra entropía cercana a 8.0 en la sección que contiene el payload cifrado.
Features de imports
Las imports son posiblemente la fuente de features más informativa. Cada DLL y cada función importada revela intenciones del binario:
def extract_import_features(pe: pefile.PE) -> dict:
features = {
"num_imports": 0,
"num_import_dlls": 0,
}
# Categorías de DLLs
dll_categories = {
"system": ["kernel32", "ntdll", "kernelbase"],
"network": ["ws2_32", "wininet", "winhttp", "urlmon", "dnsapi"],
"crypto": ["crypt32", "bcrypt", "ncrypt", "advapi32"],
"ui": ["user32", "gdi32", "shell32", "ole32", "oleaut32"],
"process": ["psapi", "tlhelp32", "dbghelp"],
"registry": ["advapi32"],
"com": ["ole32", "oleaut32", "comctl32"],
}
# APIs sospechosas agrupadas por comportamiento
suspicious_groups = {
"injection": [
"VirtualAllocEx", "WriteProcessMemory", "CreateRemoteThread",
"NtCreateThreadEx", "RtlCreateUserThread",
"QueueUserAPC", "NtQueueApcThread",
"SetThreadContext", "NtSetContextThread",
],
"hooking": [
"SetWindowsHookEx", "SetWinEventHook",
"GetAsyncKeyState", "GetKeyState", "GetKeyboardState",
],
"evasion": [
"IsDebuggerPresent", "CheckRemoteDebuggerPresent",
"NtQueryInformationProcess", "OutputDebugString",
"GetTickCount", "QueryPerformanceCounter",
"Sleep", "SleepEx",
],
"persistence": [
"RegSetValueEx", "RegCreateKeyEx",
"CreateService", "StartService",
"SHGetFolderPath", "SHGetKnownFolderPath",
],
"download": [
"URLDownloadToFile", "InternetOpen", "InternetOpenUrl",
"HttpSendRequest", "InternetReadFile",
"WinHttpOpen", "WinHttpConnect",
],
"crypto_ops": [
"CryptAcquireContext", "CryptEncrypt", "CryptDecrypt",
"CryptGenKey", "CryptImportKey", "CryptExportKey",
"BCryptEncrypt", "BCryptDecrypt",
],
"file_ops": [
"CreateFile", "WriteFile", "ReadFile", "DeleteFile",
"MoveFile", "CopyFile", "FindFirstFile",
],
}
# Inicializar contadores
for category in dll_categories:
features[f"dll_cat_{category}"] = 0
for group in suspicious_groups:
features[f"suspicious_{group}"] = 0
if not hasattr(pe, "DIRECTORY_ENTRY_IMPORT"):
return features
entries = pe.DIRECTORY_ENTRY_IMPORT
features["num_import_dlls"] = len(entries)
total_imports = 0
for entry in entries:
dll_name = entry.dll.decode("utf-8", errors="ignore").lower()
dll_base = dll_name.replace(".dll", "")
# Categorizar DLL
for category, dlls in dll_categories.items():
if dll_base in dlls:
features[f"dll_cat_{category}"] += 1
# Analizar cada import
for imp in entry.imports:
total_imports += 1
if imp.name:
api_name = imp.name.decode("utf-8", errors="ignore")
for group, apis in suspicious_groups.items():
if api_name in apis:
features[f"suspicious_{group}"] += 1
features["num_imports"] = total_imports
# Ratio de imports sospechosas sobre total
total_suspicious = sum(
features[f"suspicious_{g}"] for g in suspicious_groups
)
features["suspicious_ratio"] = (
total_suspicious / total_imports if total_imports > 0 else 0
)
return features
Imports que delatan al malware
| Categoría | APIs clave | Lo que indica |
|---|---|---|
| Inyección | VirtualAllocEx, WriteProcessMemory, CreateRemoteThread | Process injection (T1055) |
| Keylogging | SetWindowsHookEx, GetAsyncKeyState | Captura de teclado (T1056) |
| Anti-análisis | IsDebuggerPresent, CheckRemoteDebuggerPresent | Evasión de sandbox (T1497) |
| Descarga | URLDownloadToFile, InternetOpen | Dropper, downloader (T1105) |
| Cifrado | CryptEncrypt, BCryptEncrypt | Ransomware, exfiltración cifrada |
| Persistencia | RegSetValueEx, CreateService | Supervivencia post-reboot (T1547) |
Histograma de bytes
El histograma de bytes es una feature de bajo nivel sorprendentemente potente. Representa la frecuencia de cada valor de byte (0x00 a 0xFF) en el fichero completo:
import numpy as np
def extract_byte_histogram(data: bytes, normalize: bool = True) -> np.ndarray:
"""
Genera un histograma de 256 bins con la frecuencia de cada byte.
Normalizado produce una distribución de probabilidad.
"""
histogram = np.zeros(256, dtype=np.float64)
for byte in data:
histogram[byte] += 1
if normalize and len(data) > 0:
histogram /= len(data)
return histogram
def extract_byte_features(data: bytes) -> dict:
"""Features derivadas del histograma de bytes."""
hist = extract_byte_histogram(data, normalize=True)
features = {}
# El histograma completo como 256 features
for i in range(256):
features[f"byte_{i:02x}"] = hist[i]
# Estadísticas agregadas
features["byte_mean"] = float(np.mean(hist))
features["byte_std"] = float(np.std(hist))
features["byte_max"] = float(np.max(hist))
features["byte_min"] = float(np.min(hist[hist > 0])) if np.any(hist > 0) else 0
# Bytes únicos presentes
features["byte_unique"] = int(np.count_nonzero(hist))
# Ratio de bytes imprimibles (ASCII 32-126)
printable = float(np.sum(hist[32:127]))
features["printable_ratio"] = printable
# Ratio de null bytes
features["null_ratio"] = float(hist[0])
# Entropía del histograma (equivalente a entropía del fichero)
non_zero = hist[hist > 0]
features["file_entropy"] = float(-np.sum(non_zero * np.log2(non_zero)))
return features
¿Por qué funciona? Un ejecutable compilado normalmente tiene un perfil de bytes predecible: muchos null bytes, alta frecuencia de ciertos opcodes x86, distribución no uniforme. El malware cifrado o comprimido tiene una distribución casi uniforme (entropía alta). Cada packer produce un perfil de bytes diferente, lo que permite al modelo aprender a identificar packers específicos.
Histograma de entropía por ventana
Una mejora sobre la entropía global es calcular la entropía en ventanas deslizantes a lo largo del fichero. Esto captura la variabilidad interna:
def extract_entropy_histogram(
data: bytes,
window_size: int = 1024,
step: int = 256,
num_bins: int = 64
) -> dict:
"""
Calcula entropía en ventanas deslizantes y genera un histograma
de la distribución de entropías.
"""
if len(data) < window_size:
return {f"entropy_bin_{i}": 0.0 for i in range(num_bins)}
entropies = []
for start in range(0, len(data) - window_size, step):
window = data[start:start + window_size]
entropy = calculate_entropy(window)
entropies.append(entropy)
# Histograma de entropías (bins de 0 a 8)
hist, _ = np.histogram(entropies, bins=num_bins, range=(0, 8))
hist = hist.astype(np.float64)
if hist.sum() > 0:
hist /= hist.sum()
features = {}
for i in range(num_bins):
features[f"entropy_bin_{i}"] = float(hist[i])
# Estadísticas
arr = np.array(entropies)
features["entropy_window_mean"] = float(np.mean(arr))
features["entropy_window_std"] = float(np.std(arr))
features["entropy_window_max"] = float(np.max(arr))
features["entropy_window_min"] = float(np.min(arr))
# Porcentaje de ventanas con alta entropía (posible cifrado)
features["high_entropy_window_ratio"] = float(np.mean(arr > 7.0))
return features
Este enfoque revela la estructura interna del binario. Un ejecutable normal tiene zonas de código (entropía media), zonas de datos (entropía baja) y quizás recursos comprimidos (entropía alta). Malware empacado muestra entropía uniformemente alta en casi todas las ventanas.
Features de strings
Las strings embebidas en un binario revelan URLs, IPs, rutas del sistema, comandos y otros indicadores:
import re
def extract_string_features(data: bytes, min_length: int = 6) -> dict:
"""Extrae features basadas en strings ASCII y Unicode del binario."""
# Extraer strings ASCII
ascii_strings = re.findall(
rb"[\x20-\x7e]{" + str(min_length).encode() + rb",}", data
)
ascii_strings = [s.decode("ascii", errors="ignore") for s in ascii_strings]
# Extraer strings Unicode (UTF-16LE)
unicode_strings = re.findall(
rb"(?:[\x20-\x7e]\x00){" + str(min_length).encode() + rb",}", data
)
unicode_strings = [
s.decode("utf-16-le", errors="ignore") for s in unicode_strings
]
all_strings = ascii_strings + unicode_strings
features = {
"num_strings_ascii": len(ascii_strings),
"num_strings_unicode": len(unicode_strings),
"num_strings_total": len(all_strings),
}
if all_strings:
lengths = [len(s) for s in all_strings]
features["string_avg_length"] = sum(lengths) / len(lengths)
features["string_max_length"] = max(lengths)
else:
features["string_avg_length"] = 0
features["string_max_length"] = 0
# Patrones sospechosos
combined = " ".join(all_strings).lower()
# URLs y dominios
url_pattern = r"https?://[^\s]+"
features["has_urls"] = int(bool(re.search(url_pattern, combined)))
features["num_urls"] = len(re.findall(url_pattern, combined))
# IPs
ip_pattern = r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b"
features["has_ips"] = int(bool(re.search(ip_pattern, combined)))
features["num_ips"] = len(re.findall(ip_pattern, combined))
# Rutas del sistema
path_patterns = [
r"c:\\windows", r"c:\\users", r"\\appdata\\",
r"\\temp\\", r"\\system32\\", r"hkey_",
]
features["has_system_paths"] = int(
any(re.search(p, combined) for p in path_patterns)
)
# Comandos de shell
shell_keywords = [
"cmd.exe", "powershell", "wscript", "cscript",
"regsvr32", "rundll32", "mshta", "certutil",
]
features["has_shell_commands"] = int(
any(kw in combined for kw in shell_keywords)
)
# Indicadores de anti-análisis
antidebug_strings = [
"isdebuggerpresent", "ollydbg", "wireshark",
"vmware", "virtualbox", "sandboxie", "fiddler",
]
features["has_antidebug_strings"] = int(
any(s in combined for s in antidebug_strings)
)
return features
Features de recursos y exports
Dos fuentes de features adicionales que aportan valor:
def extract_resource_features(pe: pefile.PE) -> dict:
features = {
"has_resources": 0,
"num_resources": 0,
"resource_total_size": 0,
"resource_max_entropy": 0,
"resource_types": 0,
}
if not hasattr(pe, "DIRECTORY_ENTRY_RESOURCE"):
return features
features["has_resources"] = 1
resource_entropies = []
total_size = 0
resource_types = set()
count = 0
def walk_resources(entry, depth=0):
nonlocal count, total_size
if hasattr(entry, "directory"):
for child in entry.directory.entries:
if depth == 0 and hasattr(child, "id"):
resource_types.add(child.id)
walk_resources(child, depth + 1)
elif hasattr(entry, "data"):
count += 1
rva = entry.data.struct.OffsetToData
size = entry.data.struct.Size
total_size += size
try:
data = pe.get_data(rva, size)
resource_entropies.append(calculate_entropy(data))
except Exception:
pass
for entry in pe.DIRECTORY_ENTRY_RESOURCE.entries:
walk_resources(entry)
features["num_resources"] = count
features["resource_total_size"] = total_size
features["resource_types"] = len(resource_types)
if resource_entropies:
features["resource_max_entropy"] = max(resource_entropies)
features["resource_mean_entropy"] = (
sum(resource_entropies) / len(resource_entropies)
)
return features
def extract_export_features(pe: pefile.PE) -> dict:
features = {
"has_exports": 0,
"num_exports": 0,
"num_named_exports": 0,
}
if not hasattr(pe, "DIRECTORY_ENTRY_EXPORT"):
return features
features["has_exports"] = 1
exports = pe.DIRECTORY_ENTRY_EXPORT.symbols
features["num_exports"] = len(exports)
features["num_named_exports"] = sum(
1 for exp in exports if exp.name is not None
)
return features
El extractor completo
Combinando todos los módulos anteriores en un pipeline unificado:
class CompletePEExtractor:
"""Pipeline completo de extracción de features de PE."""
FEATURE_GROUPS = [
"coff_header",
"optional_header",
"sections",
"imports",
"byte_histogram",
"entropy_histogram",
"strings",
"resources",
"exports",
]
def __init__(self, filepath: str):
self.filepath = filepath
with open(filepath, "rb") as f:
self.raw_bytes = f.read()
self.pe = pefile.PE(filepath, fast_load=False)
def extract(self, groups: list[str] | None = None) -> dict:
"""Extrae features de los grupos especificados (o todos)."""
if groups is None:
groups = self.FEATURE_GROUPS
features = {}
extractors = {
"coff_header": lambda: extract_coff_features(self.pe),
"optional_header": lambda: extract_optional_header_features(self.pe),
"sections": lambda: extract_section_features(self.pe),
"imports": lambda: extract_import_features(self.pe),
"byte_histogram": lambda: extract_byte_features(self.raw_bytes),
"entropy_histogram": lambda: extract_entropy_histogram(self.raw_bytes),
"strings": lambda: extract_string_features(self.raw_bytes),
"resources": lambda: extract_resource_features(self.pe),
"exports": lambda: extract_export_features(self.pe),
}
for group in groups:
if group in extractors:
try:
group_features = extractors[group]()
features.update(group_features)
except Exception as e:
# Log pero no falla: features faltantes se rellenan
print(f"Error extracting {group}: {e}")
return features
def to_vector(self, feature_order: list[str]) -> np.ndarray:
"""Convierte features a vector numérico con orden fijo."""
features = self.extract()
return np.array(
[features.get(name, 0) for name in feature_order],
dtype=np.float64
)
Selección de features y reducción de dimensionalidad
Con cientos o miles de features, no todas son útiles. Algunas son redundantes, otras añaden ruido:
from sklearn.feature_selection import mutual_info_classif
from sklearn.decomposition import PCA
def select_features(
X_train: np.ndarray,
y_train: np.ndarray,
feature_names: list[str],
top_k: int = 100
) -> list[str]:
"""Selecciona las top_k features por mutual information."""
mi_scores = mutual_info_classif(
X_train, y_train, random_state=42, n_jobs=-1
)
# Ranking por mutual information
ranked = sorted(
zip(feature_names, mi_scores),
key=lambda x: x[1],
reverse=True
)
selected = [name for name, score in ranked[:top_k]]
print(f"Top 10 features por mutual information:")
for name, score in ranked[:10]:
print(f" {name}: {score:.4f}")
return selected
Tradeoff importante: más features capturan más información pero aumentan el riesgo de overfitting y el tiempo de extracción. En producción, donde cada milisegundo cuenta, un extractor con 100 features bien seleccionadas suele superar a uno con 2.000 features redundantes.
Rendimiento y producción
Para sistemas que procesan miles de ficheros por hora, la velocidad de extracción importa:
import time
def benchmark_extractor(filepath: str, iterations: int = 100):
"""Mide el tiempo de extracción por fichero."""
times = []
for _ in range(iterations):
start = time.perf_counter()
extractor = CompletePEExtractor(filepath)
_ = extractor.extract()
elapsed = time.perf_counter() - start
times.append(elapsed)
avg = sum(times) / len(times)
print(f"Tiempo medio de extracción: {avg*1000:.1f}ms")
print(f"Throughput: {1/avg:.0f} ficheros/segundo")
En un servidor moderno, la extracción completa de features de un PE tarda entre 10ms y 50ms. Esto permite procesar entre 20 y 100 ficheros por segundo con un solo core. Para volúmenes mayores, la extracción se paraleliza trivialmente con multiprocessing.
El cuello de botella en producción no suele ser la extracción sino la inferencia del modelo. Con un Random Forest de 200 árboles, la predicción tarda menos de 1ms por muestra. Con un modelo de deep learning, puede subir a 10ms o más.
Conclusión: las features son el fundamento
La calidad de un clasificador de malware depende más de la calidad de las features que del algoritmo. Un Random Forest con buenas features supera consistentemente a un deep learning con features pobres.
El extractor presentado aquí cubre las nueve categorías de features más relevantes para análisis estático de PE. Es extensible: puedes añadir features de debug directories, rich header, certificados digitales, overlay data y más. Cada nueva categoría mejora marginalmente la capacidad discriminativa del modelo, con rendimientos decrecientes.
El siguiente paso natural es aplicar deep learning directamente sobre los bytes del binario, eliminando la necesidad de extracción manual de features. Eso lo cubrimos en el siguiente artículo de la serie.
Preguntas frecuentes
Artículos relacionados
Este contenido tiene fines exclusivamente educativos y de investigación en ciberseguridad defensiva. No se proporcionan binarios maliciosos ni payloads ejecutables. El uso indebido de esta información es responsabilidad exclusiva del usuario. Leer disclaimer completo.