Bayesian Optimization

Used to tune the training of a model and obtain the best hyperparameters.
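Before the full training script, here is a minimal sketch of the core idea (the toy objective and bounds below are illustrative only, not part of the project): `gp_minimize` fits a Gaussian-process surrogate to the points evaluated so far and uses it to choose the next candidate, trading off exploration against exploitation.

from skopt import gp_minimize
from skopt.space import Real

# Toy 1-D example: find the minimum of (x - 2)^2 over [-5, 5]
toy_result = gp_minimize(
    func=lambda x: (x[0] - 2.0) ** 2,    # objective to minimize (x is a list of parameter values)
    dimensions=[Real(-5.0, 5.0, name='x')],
    n_calls=15,                          # total number of evaluations
    random_state=0,
)
print(toy_result.x, toy_result.fun)      # best point found and its objective value

The same pattern is applied below to the fusion CNN-LSTM model, just with a larger search space and a training run inside the objective.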

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import numpy as np
from skopt import gp_minimize
from skopt.space import Real, Integer, Categorical
from skopt.utils import use_named_args
import time
import copy
from model import *  # user-defined module providing Fusion_CNN2D_LSTM_Model and MyDataset



# Training function
def train_model(model, train_loader, val_loader, criterion, optimizer, device, num_epochs=5):
    model = model.to(device)
    best_val_acc = 0.0
    best_model = None
    
    for epoch in range(int(num_epochs)):
        model.train()
        running_loss = 0.0
        
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            
            optimizer.zero_grad()
            outputs = model(inputs, inputs)  # the fusion model takes two inputs; the same batch is fed to both
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            
            running_loss += loss.item()
        
        # Validate on the held-out set
        val_acc = evaluate_model(model, val_loader, device)
        print(f'Epoch {epoch+1}/{num_epochs}, Loss: {running_loss/len(train_loader):.4f}, Val Acc: {val_acc:.4f}')
        
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            best_model = copy.deepcopy(model)
    
    return best_model, best_val_acc

# Evaluation function
def evaluate_model(model, data_loader, device):
    model.eval()
    correct = 0
    total = 0
    
    with torch.no_grad():
        for inputs, labels in data_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs, inputs)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    
    return correct / total

# Define the hyperparameter search space
dimensions = [
    # CNN parameters
    Integer(16, 64, name='conv1_channels'),
    Integer(32, 128, name='conv2_channels'),
    Categorical([3, 5], name='kernel_size'),
    Categorical([True, False], name='use_leaky_relu'),
    Real(0.2, 0.7, name='dropout_rate'),
    
    # LSTM parameters
    Integer(64, 256, name='lstm_hidden'),
    Integer(1, 3, name='lstm_layers'),
    
    # Fusion-model parameters
    Integer(128, 512, name='fc1_units'),
    
    # Training parameters
    Real(1e-4, 1e-2, prior='log-uniform', name='learning_rate'),
    Integer(16, 128, name='batch_size'),
    Categorical(['adam', 'sgd'], name='optimizer'),
    Real(0, 0.001, name='weight_decay')
]
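
# Note (added for clarity): @use_named_args(dimensions) converts the plain list
# of values that gp_minimize passes to the objective (ordered like `dimensions`)
# into keyword arguments, so the objective below can be written with **params
# instead of indexing a positional list.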

# Objective function - Bayesian optimization will try to minimize it
@use_named_args(dimensions)
def objective(**params):
    start_time = time.time()
    
    # Print the hyperparameter combination being evaluated
    print(f"Testing hyperparameters: {params}")
    
    # Build the model; cast all integer-valued parameters to native Python int
    model = Fusion_CNN2D_LSTM_Model(
        input_channel=5,
        conv1_channels=int(params['conv1_channels']),
        conv2_channels=int(params['conv2_channels']),
        kernel_size=int(params['kernel_size']),
        use_leaky_relu=params['use_leaky_relu'],
        dropout_rate=params['dropout_rate'],
        lstm_hidden=int(params['lstm_hidden']),
        lstm_layers=int(params['lstm_layers']),
        fc1_units=int(params['fc1_units'])
    )
    
    # Device selection
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    
    # Placeholder dataset and data loaders (random tensors stand in for real data)
    train_dataset = MyDataset(torch.randn(1000, 5, 64, 64), torch.randint(0, 4, (1000,)))
    val_dataset = MyDataset(torch.randn(200, 5, 64, 64), torch.randint(0, 4, (200,)))
    train_loader = DataLoader(train_dataset, batch_size=int(params['batch_size']), shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=int(params['batch_size']))
    
    # Define the loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    
    if params['optimizer'] == 'adam':
        optimizer = optim.Adam(model.parameters(), 
                             lr=params['learning_rate'],
                             weight_decay=params['weight_decay'])
    else:
        optimizer = optim.SGD(model.parameters(), 
                            lr=params['learning_rate'],
                            momentum=0.9,
                            weight_decay=params['weight_decay'])
    
    # Train the model
    _, val_acc = train_model(model, train_loader, val_loader, criterion, optimizer, device, num_epochs=3)
    
    # Elapsed time
    elapsed_time = time.time() - start_time
    print(f"Validation accuracy: {val_acc:.4f}, Elapsed time: {elapsed_time:.2f}s")
    
    # gp_minimize minimizes the objective, so return 1 - accuracy
    return 1 - val_acc

# Run the Bayesian optimization
def bayesian_optimization(n_calls=10):
    print("Starting Bayesian optimization...")
    result = gp_minimize(
        func=objective,
        dimensions=dimensions,
        n_calls=n_calls,
        random_state=42,
        verbose=True
    )
    
    print("\n贝叶斯优化完成!")
    print(f"最佳验证准确率: {1 - result.fun:.4f}")
    print("最佳超参数:")
    # 修改这里,正确获取参数名称
    for i, dimension in enumerate(result.space.dimensions):
        print(f"{dimension.name}: {result.x[i]}")
    
    return result

# Run the optimization
if __name__ == "__main__":
    bayesian_optimization(n_calls=15)
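
The plotting sample below reads the result back from a pickle file that the script above never writes. A minimal, hypothetical way to persist the skopt result so the loader can find it (the `checkpoints_<dataset_name>` directory and file name are assumptions chosen to match the loader below):

import os
import pickle

def save_result(result, dataset_name="aaa"):
    """Persist the skopt result for the plotting sample below (illustrative)."""
    model_dir = f"checkpoints_{dataset_name}"
    os.makedirs(model_dir, exist_ok=True)
    out_file = os.path.join(model_dir, "bayesian_optimization_result.pkl")
    with open(out_file, "wb") as f:
        pickle.dump(result, f)
    return out_file

# Example usage: save_result(bayesian_optimization(n_calls=15))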
Visualizing the optimization results

import pickle
from skopt.plots import plot_convergence, plot_objective, plot_evaluations
import matplotlib.pyplot as plt
import os

# NOTE: this is a sample for illustration; it assumes a saved optimization result file and cannot be run directly

dataset_name = "aaa"

model_dir = f"checkpoints_{dataset_name}"
best_result_file = os.path.join(model_dir, "bayesian_optimization_result.pkl")
# Load the saved optimization result
with open(best_result_file, 'rb') as f:
    result = pickle.load(f)

# 1. Plot the convergence curve
plt.figure(figsize=(10, 6))
plot_convergence(result)
plt.title('Optimization convergence curve')
plt.grid(True)
plt.tight_layout()
plt.savefig('convergence_curve.png', dpi=300)

# 2. Plot the parameter importance (partial dependence) plots
plt.figure(figsize=(12, 10))
plot_objective(result, n_points=40)  # more points gives smoother plots
plt.suptitle('Parameter importance (partial dependence)')
plt.tight_layout(rect=[0, 0.03, 1, 0.95])  # leave room for the suptitle
plt.savefig('parameter_importance.png', dpi=300)

# 3. Plot the distribution of evaluated parameter values
plt.figure(figsize=(15, 10))
plot_evaluations(result, bins=20)  # bins controls histogram granularity
plt.suptitle('Parameter evaluation distribution')
plt.tight_layout(rect=[0, 0.03, 1, 0.95])
plt.savefig('parameter_evaluations.png', dpi=300)

# 4. Print the best parameters
print("Best objective value:", result.fun)
print("Best parameter combination:")
for i, dim in enumerate(result.space.dimensions):
    print(f"  {dim.name}: {result.x[i]}")

plt.show()
CNN-LSTM sample code (hyperopt version)
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from hyperopt import fmin, tpe, hp, Trials, STATUS_OK
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
import pandas as pd

# Set random seeds for reproducibility
torch.manual_seed(42)
np.random.seed(42)

# -------------------
# 1. Data generation and preprocessing
# -------------------
def generate_sample_data(n_samples=1000, seq_len=10, n_features=5):
    """生成示例时间序列数据"""
    # 生成随机时间序列
    X = np.random.randn(n_samples, seq_len, n_features)
    
    # 生成目标值 (简单非线性关系)
    y = np.sum(X[:, -3:, 0] ** 2, axis=1) + np.sin(X[:, -1, 1])
    y = y.reshape(-1, 1)
    
    # Normalize the data
    scaler_X = MinMaxScaler()
    scaler_y = MinMaxScaler()
    
    # Reshape so the scalers see 2-D data
    X_reshaped = X.reshape(-1, n_features)
    X_scaled = scaler_X.fit_transform(X_reshaped)
    X = X_scaled.reshape(n_samples, seq_len, n_features)
    
    y = scaler_y.fit_transform(y)
    
    return X, y, scaler_y

# -------------------
# 2. Define the CNN-LSTM model
# -------------------
class CNNLSTM(nn.Module):
    def __init__(self, seq_len, n_features, cnn_filters, kernel_size, lstm_units, dropout):
        super(CNNLSTM, self).__init__()
        
        # CNN block - feature extraction
        self.cnn = nn.Sequential(
            nn.Conv1d(in_channels=n_features, 
                      out_channels=cnn_filters, 
                      kernel_size=kernel_size),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2),
            nn.Dropout(dropout)
        )
        
        # Sequence length after Conv1d (no padding) and MaxPool1d(2)
        cnn_out_seq_len = (seq_len - kernel_size + 1) // 2
        
        # LSTM layer - processes the CNN feature sequence.
        # nn.LSTM's dropout only acts between stacked layers, and this LSTM
        # has a single layer (num_layers defaults to 1), so it is omitted here.
        self.lstm = nn.LSTM(
            input_size=cnn_filters,
            hidden_size=lstm_units,
            batch_first=True
        )
        
        # Fully connected output layer
        self.fc = nn.Linear(lstm_units, 1)
        
    def forward(self, x):
        # Input shape: (batch, seq_len, n_features)
        # Rearrange to the shape Conv1d expects: (batch, n_features, seq_len)
        x = x.permute(0, 2, 1)
        
        # Pass through the CNN
        x = self.cnn(x)
        
        # Rearrange back to the shape the LSTM expects: (batch, seq_len, features)
        x = x.permute(0, 2, 1)
        
        # Pass through the LSTM
        lstm_out, _ = self.lstm(x)
        
        # Use the output of the last time step
        x = lstm_out[:, -1, :]
        
        # Final prediction through the fully connected layer
        output = self.fc(x)
        
        return output
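
# Quick shape sanity check (added illustration, not part of the original sample).
# With seq_len=20, n_features=10 and kernel_size=3, the Conv1d output length is
# 20 - 3 + 1 = 18 and MaxPool1d(2) halves it to 9, so the LSTM receives
# (batch, 9, cnn_filters) and the model returns (batch, 1):
#
#   m = CNNLSTM(seq_len=20, n_features=10, cnn_filters=16,
#               kernel_size=3, lstm_units=32, dropout=0.2)
#   print(m(torch.randn(4, 20, 10)).shape)  # torch.Size([4, 1])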

# -------------------
# 3. Training and evaluation functions
# -------------------
def train_and_evaluate_model(params, X_train, y_train, X_val, y_val):
    """根据给定参数训练并评估模型"""
    # 设置设备
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    
    # 提取参数
    learning_rate = params['learning_rate']
    batch_size = int(params['batch_size'])
    cnn_filters = int(params['cnn_filters'])
    kernel_size = int(params['kernel_size'])
    lstm_units = int(params['lstm_units'])
    dropout = params['dropout']
    epochs = int(params['epochs'])
    
    # Build data loaders
    train_dataset = TensorDataset(
        torch.FloatTensor(X_train), 
        torch.FloatTensor(y_train)
    )
    val_dataset = TensorDataset(
        torch.FloatTensor(X_val), 
        torch.FloatTensor(y_val)
    )
    
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size)
    
    # Initialize the model
    model = CNNLSTM(
        seq_len=X_train.shape[1],
        n_features=X_train.shape[2],
        cnn_filters=cnn_filters,
        kernel_size=kernel_size,
        lstm_units=lstm_units,
        dropout=dropout
    ).to(device)
    
    # Define the loss function and optimizer
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    
    # Training loop
    best_val_loss = float('inf')
    for epoch in range(epochs):
        model.train()
        train_loss = 0
        for X_batch, y_batch in train_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            
            # Forward pass
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            
            # Backward pass and optimization
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            train_loss += loss.item()
        
        # Validation
        model.eval()
        val_loss = 0
        with torch.no_grad():
            for X_batch, y_batch in val_loader:
                X_batch, y_batch = X_batch.to(device), y_batch.to(device)
                outputs = model(X_batch)
                val_loss += criterion(outputs, y_batch).item()
        
        # Track the best (lowest) validation loss
        if val_loss < best_val_loss:
            best_val_loss = val_loss
    
    return best_val_loss

# -------------------
# 4. Bayesian optimization objective function
# -------------------
def objective(params):
    """贝叶斯优化的目标函数"""
    print(f"评估参数: {params}")
    
    # 训练并评估模型
    val_loss = train_and_evaluate_model(params, X_train, y_train, X_val, y_val)
    
    print(f"验证集损失: {val_loss:.6f}")
    
    # 返回优化结果
    return {
        'loss': val_loss,
        'status': STATUS_OK,
        'params': params
    }

# -------------------
# 5. Main
# -------------------
if __name__ == "__main__":
    # Generate data
    X, y, scaler = generate_sample_data(n_samples=2000, seq_len=20, n_features=10)
    
    # Split into training and validation sets
    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)
    
    # Define the hyperparameter search space (hyperopt)
    space = {
        'learning_rate': hp.loguniform('learning_rate', np.log(0.0001), np.log(0.01)),
        'batch_size': hp.quniform('batch_size', 16, 128, 16),
        'cnn_filters': hp.quniform('cnn_filters', 16, 64, 8),
        'kernel_size': hp.quniform('kernel_size', 2, 5, 1),
        'lstm_units': hp.quniform('lstm_units', 32, 128, 16),
        'dropout': hp.uniform('dropout', 0.1, 0.5),
        'epochs': hp.quniform('epochs', 10, 50, 5)
    }
    
    # Record the optimization history
    trials = Trials()
    
    # Create a random generator compatible with newer NumPy versions
    from numpy.random import default_rng
    rstate = default_rng(42)
    
    # Run the Bayesian optimization (TPE)
    best = fmin(
        fn=objective,
        space=space,
        algo=tpe.suggest,
        max_evals=30,  # number of optimization iterations
        trials=trials,
        rstate=rstate  # use the compatible random generator
    )
    
    # Print the best parameters
    print("\nBest parameters:")
    for param, value in best.items():
        if param in ['batch_size', 'cnn_filters', 'kernel_size', 'lstm_units', 'epochs']:
            print(f"{param}: {int(value)}")
        else:
            print(f"{param}: {value}")
    
    
    # Collect all trial results into a DataFrame
    results = pd.DataFrame({
        'iteration': range(len(trials.losses())),
        'loss': trials.losses(),
        'learning_rate': [t['misc']['vals']['learning_rate'][0] for t in trials.trials],
        'batch_size': [int(t['misc']['vals']['batch_size'][0]) for t in trials.trials],
        'cnn_filters': [int(t['misc']['vals']['cnn_filters'][0]) for t in trials.trials],
        'kernel_size': [int(t['misc']['vals']['kernel_size'][0]) for t in trials.trials],
        'lstm_units': [int(t['misc']['vals']['lstm_units'][0]) for t in trials.trials],
        'dropout': [t['misc']['vals']['dropout'][0] for t in trials.trials],
        'epochs': [int(t['misc']['vals']['epochs'][0]) for t in trials.trials]
    })
    
    # Save the results
    results.to_csv('bayesian_optimization_results.csv', index=False)
    
    # Visualize the optimization process
    plt.figure(figsize=(10, 6))
    plt.plot(results['iteration'], results['loss'], 'o-')
    plt.xlabel('Iteration')
    plt.ylabel('Validation Loss (MSE)')
    plt.title('Bayesian Optimization Process')
    plt.grid(True)
    plt.savefig('optimization_process.png')
    plt.show()
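
    # (Added illustration, not in the original sample.) With the search finished,
    # one might re-run training with the chosen parameters by reusing
    # train_and_evaluate_model. fmin returns the raw quniform values as floats,
    # so the integer-valued ones are cast back to int first.
    int_keys = ['batch_size', 'cnn_filters', 'kernel_size', 'lstm_units', 'epochs']
    best_params = {k: (int(v) if k in int_keys else float(v)) for k, v in best.items()}
    final_val_loss = train_and_evaluate_model(best_params, X_train, y_train, X_val, y_val)
    print(f"Validation loss with the best parameters: {final_val_loss:.6f}")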

Sample output (one run):

Validation loss: 0.041993
70%|████████████████████████████████████████████████| 21/30 [02:00<00:00, 4.03s/trial, best loss: 0.04199310950934887]

Best parameters:
batch_size: 128
cnn_filters: 56
dropout: 0.10191651697060819
epochs: 30
kernel_size: 2
learning_rate: 0.007169589625095676
lstm_units: 48

Sample Bayesian hyperparameter tuning process
