import random

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.data as Data
def generate_dataset(sample_num, class_num, X_shape):
    """Synthetic data: a sample of class y is drawn elementwise from N(y, 0.2)."""
    Sample_list = []
    Label_list = []
    for _ in range(sample_num):
        y = np.random.randint(0, class_num)
        Label_list.append(y)
        Sample_list.append(np.random.normal(y, 0.2, X_shape))
    return Sample_list, Label_list
def Sample_dataset(numpy_dataset, batch_size):
    """Draw a random mini-batch (without replacement) as (float32, int64) tensors."""
    index_list = random.sample(range(len(numpy_dataset[0])), batch_size)
    data_list = [numpy_dataset[0][index] for index in index_list]
    label_list = [numpy_dataset[1][index] for index in index_list]
    # Stack with np.array first: building a tensor from a list of ndarrays is slow.
    return (torch.tensor(np.array(data_list), dtype=torch.float32),
            torch.tensor(np.array(label_list), dtype=torch.int64))
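# Example (assuming the shapes set up in __main__ below): drawing 4 of the 100
# synthetic (3, 44, 44) samples yields tensors of shape [4, 3, 44, 44] and [4].
# MAP uses this helper to draw the fresh batch B' for its second gradient step.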
class Normal_Dataset(Data.Dataset):
    def __init__(self, Numpy_Dataset):
        super(Normal_Dataset, self).__init__()
        self.data_tensor = torch.tensor(np.array(Numpy_Dataset[0]), dtype=torch.float32)
        self.target_tensor = torch.tensor(Numpy_Dataset[1], dtype=torch.int64)
    def __getitem__(self, index):
        return self.data_tensor[index], self.target_tensor[index]
    def __len__(self):
        return self.data_tensor.size(0)
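# Normal_Dataset only wraps the (samples, labels) pair as tensors so a standard
# torch.utils.data.DataLoader can batch and shuffle it (see __main__ below).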
class Classifier(nn.Module):
    def __init__(self):
        super(Classifier, self).__init__()
        # input: 3 x 44 x 44
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=10, kernel_size=9)    # -> 10 x 36 x 36
        self.conv2 = nn.Conv2d(in_channels=10, out_channels=20, kernel_size=17)  # -> 20 x 20 x 20
        self.fc1 = nn.Linear(20 * 20 * 20, 512)
        self.fc2 = nn.Linear(512, 7)
    def forward(self, x):
        in_size = x.size(0)
        out = F.relu(self.conv1(x))
        out = F.relu(self.conv2(out))
        out = out.view(in_size, -1)
        out = F.relu(self.fc1(out))
        # Return raw logits: nn.CrossEntropyLoss applies log-softmax internally,
        # so an extra F.softmax here would flatten the gradients.
        return self.fc2(out)
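# Shape sanity check (a hypothetical helper, assuming the 3 x 44 x 44 inputs
# used in __main__): 44 - 9 + 1 = 36 after conv1, 36 - 17 + 1 = 20 after conv2,
# so fc1 takes 20 * 20 * 20 = 8000 features.
def check_classifier_shapes():
    logits = Classifier()(torch.zeros(2, 3, 44, 44))
    assert logits.shape == (2, 7)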
def MAP(DataLoader, alpha, beta, model, loss_fn, PI_epsilon, input_shape, epochs, numpy_dataset):
    # PI_epsilon is accepted but never used in this listing.
    v = torch.zeros(input_shape)  # e.g. [3, 44, 44]
    for epoch in range(epochs):
        for batch_x, batch_y in DataLoader:
            v.requires_grad = True
            # Evaluate nabla_v L(f_theta) on batch B with v
            outputs = model(batch_x + v)  # [B, 3, 44, 44] + [3, 44, 44] broadcasts
            loss = loss_fn(outputs, batch_y)
            model.zero_grad()
            loss.backward()
            # Look-ahead step: v' = v + alpha * nabla_v L
            v_prime = (v + alpha * v.grad).detach()
            v_prime.requires_grad = True
            # Sample a fresh batch B'
            batch_x_prime, batch_y_prime = Sample_dataset(numpy_dataset, 2)
            # Evaluate nabla_{v'} L(f_theta) on B' with v'
            outputs_prime = model(batch_x_prime + v_prime)
            loss_prime = loss_fn(outputs_prime, batch_y_prime)
            model.zero_grad()
            loss_prime.backward()
            # Update: v <- v + beta * nabla_{v'} L
            v = (v + beta * v_prime.grad).detach()
    return v
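# The loop above appears to implement a two-step (look-ahead) update of a
# universal perturbation v shared across all inputs:
#     v' = v + alpha * grad_v  L(f_theta(x  + v ), y )   on batch B
#     v  = v + beta  * grad_v' L(f_theta(x' + v'), y')   on a fresh batch B'
# i.e. v moves along the gradient evaluated at the looked-ahead point v'.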
if __name__ == '__main__':
    input_shape = (3, 44, 44)
    numpy_dataset = generate_dataset(100, 7, input_shape)
    Dataset = Normal_Dataset(numpy_dataset)
    DataLoader = Data.DataLoader(
        dataset=Dataset,
        batch_size=2,
        shuffle=True,
        num_workers=0,
    )
    model = Classifier()
    loss_fn = nn.CrossEntropyLoss()
    alpha = 0.01
    beta = 0.01
    epochs = 1
    v = MAP(DataLoader, alpha, beta, model, loss_fn, 'PI', input_shape, epochs, numpy_dataset)
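    # Hypothetical follow-up (a sketch, not part of the original listing):
    # inspect the learned perturbation and apply it to a freshly drawn batch.
    print('perturbation norm:', v.norm().item())
    batch_x, batch_y = Sample_dataset(numpy_dataset, 2)
    with torch.no_grad():
        preds = model(batch_x + v).argmax(dim=1)
    print('labels:', batch_y.tolist(), 'predictions on x + v:', preds.tolist())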