728x90

모델 정의

  • 파이토치에서 모델 정의를 위해서는 module을 상속한 클래스를 사용
  • 계층(layer): 모듈 또는 모듈을 구성하는 한 개의 계층으로 합성곱층(convolutional layer), 선형 계층(linear layer) 등이 있음
  • 모듈(module): 한 개 이상의 레이어가 모여 구성된 것
  • 모델(model): 최종적으로 원하는 네트워크로, 한 개의 모듈이 모델이 될 수도 있음

MLP 란 여러 개의 퍼셉트론 뉴런을 여러 층으로 쌓은 다층신경망 구조. 입력층과 출력층 사이에 하나 이상의 은닉층을 가지고 있는 신경망이다.

MNIST DATASET
download_root = './data/MNIST_DATASET'  
  
train_dataset = MNIST(download_root, transform=mnist_transform, train=True, download=True)  
valid_dataset = MNIST(download_root, transform=mnist_transform, train=False, download=True)  
test_dataset = MNIST(download_root, transform=mnist_transform, train=False, download=True)

nn.Module()을 상속하여 정의하는 방법
class MLP(nn.Module):
    def __init__(self, inputs):
        super(MLP, self).__init__()
        self.layer = Linear(inputs, 1) # 계층 정의
        self.activation = Sigmoid() # 활성화 함수
        
    def forward(self, X):
        X = self.layer(X)
        X = self.activation(X)
        return X
  • init()에서는 모델에서 사용할 모듈(nn.Linear, nn.Conv2d), 활성화 함수 등을 정의
  • forward() 에서는 모델에서 실행해야 하는 연산 정의

nn.Model : 복잡한 구조
nn.Sequential : 간단한 구조

nn.Sequential을 사용하면 init에서 네트워크 모델을 정의해주고, forward에서도 연산을 순차적으로 실행되게하는데 가독성이 뛰어나게 작성 가능

MLP

class MLP(nn.Module):
    def __init__(self):
        super(MLP, self).__init__()
        
        # 선형 함수
        self.layer1 = nn.Sequential(
            # 특징 값 찾을 때 convolutional layer 사용
            nn.Conv2d(in_channels=3, out_channels=64, kernel_size=5),
            nn.ReLU(inplace=True),
            # pooling 층
            nn.MaxPool2d(2)
            )
        
        self.layer2 = nn.Sequential(
            nn.Conv2d(in_channels=64, out_channels=30, kernel_size=5),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2)
            )
        
        self.layer3 = nn.Sequential(
            nn.Linear(in_features=30*5*5, out_features=10, bias=True),
            nn.ReLU(inplace=True)
            )
        
    
    def forward(self, x):
        x = self.layer1(x)
        x = self.layer2(x)
        x = x.view(x.shape[0], -1)
        x = self.layer3(x)
        return x
    
  • convolutional layer: 이미지의 특징을 학습시키는것
  • layer1, layer2에서 이미지의 특징을 학습함
  • in_channels = 3은 RGB
  • out_channels = 64은 convolutional 처리 후 출력되는 채널 수(filter의 개수)
  • kernel_size = 5는 출력 필터의 사이즈로 정사각형 5x5 필터를 통해 특징을 추출
  • ReLU()는 활성화 함수로 여기서 나온 값을 pooling층으로 넘김
  • ReLU()의 inplace는 기존의 변수에 덮어쓰기할 지 여부
model = MLP()

print(list(model.children()))
print(list(model.modules()))

# result

[Sequential(
  (0): Conv2d(3, 64, kernel_size=(5, 5), stride=(1, 1))
  (1): ReLU(inplace=True)
  (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
), Sequential(
  (0): Conv2d(64, 30, kernel_size=(5, 5), stride=(1, 1))
  (1): ReLU(inplace=True)
  (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
), Sequential(
  (0): Linear(in_features=750, out_features=10, bias=True)
  (1): ReLU(inplace=True)
)]

[MLP(
  (layer1): Sequential(
    (0): Conv2d(3, 64, kernel_size=(5, 5), stride=(1, 1))
    (1): ReLU(inplace=True)
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (layer2): Sequential(
    (0): Conv2d(64, 30, kernel_size=(5, 5), stride=(1, 1))
    (1): ReLU(inplace=True)
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (layer3): Sequential(
    (0): Linear(in_features=750, out_features=10, bias=True)
    (1): ReLU(inplace=True)
  )
), Sequential(
  (0): Conv2d(3, 64, kernel_size=(5, 5), stride=(1, 1))
  (1): ReLU(inplace=True)
  (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
), Conv2d(3, 64, kernel_size=(5, 5), stride=(1, 1)), ReLU(inplace=True), MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False), Sequential(
  (0): Conv2d(64, 30, kernel_size=(5, 5), stride=(1, 1))
  (1): ReLU(inplace=True)
  (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
), Conv2d(64, 30, kernel_size=(5, 5), stride=(1, 1)), ReLU(inplace=True), MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False), Sequential(
  (0): Linear(in_features=750, out_features=10, bias=True)
  (1): ReLU(inplace=True)
), Linear(in_features=750, out_features=10, bias=True), ReLU(inplace=True)]

모델의 파라미터 정의

손실함수(loss function)

  • wx + b를 계산한 값과 실제 값인 y의 오차를 구해서 모델의 정확성을 측정

옵티마이저(optimizer)

  • 데이터와 손실 함수를 바탕으로 모델의 업데이트 방법을 결정
  • step() 으로 weight과 bias(파라미터)를 업데이트
  • 모델의 파라미터별로 다른 학습률을 적용할 수 있음
  • torch.optim.Optimizer(params, defaults) 클래스 사용
  • zero_grad(): 옵티마이저에 사용된 파라미터들의 기울기(gradient)를 0으로 만듦
  • torch.optim.lr_scheduler는 epoch에 따라 학습률을 조정
  • ex) optim.Adadelta, optim.Adagrad, optim.Adam, optim.SparseAdam, optim,Adamax
  • optim.ASGD, optim.LBFGS
  • optim.RMSProp, optim.Rprop, optim.SGD

모델 훈련

loss.backward() 로 기울기를 자동 계산
배치 반복 시에 오차가 중첩적으로 쌓이게 되면 zero_grad()를 사용하여 미분 값을 0으로 초기화

모델 평가

metric = torchmetrics.Accuracy(task='multiclass', num_classes=5)

n_batches = 10
for i in range(n_batches):
    preds = torch.randn(10, 5).softmax(dim=-1)
    print(f"preds: {preds}")
    target = torch.randint(5, (10,))
    print(f"target: {target}")
    
    acc = metric(preds, target)
    print(f"Accuracy on batch {i}: {acc}")

acc = metric.compute()
print(f"Accuracy on all data: {acc}")
  • randint(high, size, dtype=None, layout)
훈련과정 모니터링
writer = SummaryWriter("tensorboard")  
  
for epoch in range(num_epochs):  
    model.train() # 학습 모드로 전환  
    batch_loss = 0.0  
    for i, (x, y) in enumerate(dataloader):  
        x, y = x.to(device).float(), y.to(device).float()  
        outputs = model(x)  
        loss = criterion(outputs, y)  
        writer.add_scalar("loss", loss, epoch) # 스칼라 값(오차)를 기록  
        optimizer.zero_grad()  
        loss.backward()  
        optimizer.step()  
          
  
writer.close()

tensorboard

import torch  
import torch.nn as nn  
from torch.utils.data import DataLoader  
from torch.utils.tensorboard import SummaryWriter  
  
# 가상의 데이터셋과 모델 생성 (예시)  
class MyDataset(torch.utils.data.Dataset):  
    def __init__(self):  
        self.data = torch.randn(100, 10)  # 예시 데이터  
        self.targets = torch.randint(2, (100,))  # 예시 타겟  
  
    def __len__(self):  
        return len(self.data)  
  
    def __getitem__(self, idx):  
        return self.data[idx], self.targets[idx]  
  
# 모델 정의 (예시)  
class MyModel(nn.Module):  
    def __init__(self):  
        super(MyModel, self).__init__()  
        self.fc = nn.Linear(10, 1)  # 입력 차원 10, 출력 차원 1  
    def forward(self, x):  
        return self.fc(x)  
  
# 하이퍼파라미터 설정  
num_epochs = 10  
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  
  
# 데이터 로더 준비 (예시)  
dataset = MyDataset()  
dataloader = DataLoader(dataset, batch_size=16, shuffle=True)  
  
# 모델 및 손실 함수, 최적화 알고리즘 설정 (예시)  
model = MyModel().to(device)  
criterion = nn.MSELoss()  
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)  
  
# TensorBoard SummaryWriter 설정  
writer = SummaryWriter('./summary/tensorboard')  
  
# 훈련 루프  
for epoch in range(num_epochs):  
    model.train()  
    batch_loss = 0.0  
    for i, (x, y) in enumerate(dataloader):  
        x, y = x.to(device).float(), y.to(device).float()  
        outputs = model(x)  
        loss = criterion(outputs, y)  
        writer.add_scalar("LOSS", loss, epoch * len(dataloader) + i)  # 배치마다 손실 값 기록  
        optimizer.zero_grad()  
        loss.backward()  
        optimizer.step()  
  
# SummaryWriter 닫기  
writer.close()

monitoring

import torchvision.transforms as transforms  
from torchvision.datasets import MNIST  
import torch.nn as nn  
import requests  
import torch  
import torchmetrics  
import torch.utils.tensorboard import SummaryWriter  
  
  
model.eval()  
with torch.no_grad():  
    valid_loss = 0  
  
    for x, y in valid_dataloader:  
        outpus = model(x)  
        loss = F.cross_entropy(outputs, y.long().squeeze()) # 차원축소  
        valid_loss += float(loss)  
        y_hat += [outputs]  
          
valid_loss = valid_loss / len(valid_loader)

car evaluation

price(자동차 가격)
maint(자동차 유지 비용)
doors(자동차 문 개수)
persons(수용 인원)
lug_capacity(수하물 용량)
safety(안전성)
output(차 상태): 이 데이터는 unacc(허용 불가능한 수준) 및 acc(허용 가능한 수준),
양호(good) 및 매우 좋은(very good, vgood) 중 하나의 값을 갖음

import numpy as np  
import pandas as pd  
import matplotlib.pyplot as plt  
import seaborn as sns  
import torch  
from torch import nn  
  
dataset = pd.read_csv('data/car_evaluation.csv')  
print(dataset.shape)  
print(dataset['output'].value_counts())  
  
  
fig_size = plt.rcParams["figure.figsize"]  
fig_size[0] = 8  
fig_size[1] = 6  
plt.rcParams["figure.figsize"] = fig_size  
dataset.output.value_counts().plot(kind='pie', autopct='%0.05f%%',  
                                   colors=['lightblue','lightgreen','orange','pink'],  
                                   explode = (0.05, 0.05, 0.05, 0.05)  
                                   )  
plt.show()  
  
  
  
dataset['output'].value_counts().plot(kind='bar', color=['lightblue', 'lightgreen', 'orange', 'pink'], legend=False)  
plt.title("bar")  
plt.show()  
  
  
categorical_columns = ['price', 'maint', 'doors', 'persons', 'lug_capacity', 'safety']  
categorical_columns = list(dataset.columns)[:-1]  
  
for category in categorical_columns:  
    dataset[category] = dataset[category].astype('category')
import torch  
import torch.nn as nn  
import numpy as np  
import pandas as pd  
import matplotlib.pyplot as plt  
import seaborn as sns  
  
  
fig_size = plt.rcParams["figure.figsize"]  
fig_size[0] = 8  
fig_size[1] = 6  
plt.rcParams["figure.figsize"] = fig_size  
dataset.output.value_counts().plot(kind='pie', autopct='%0.05f%%', colors=['lightblue', 'lightgreen', 'orange', 'pink'], explode=(0.05, 0.05, 0.05, 0.05))  
  
categorical_columns = ['price', 'maint', 'doors', 'persons', 'lug_capacity', 'safety']  
  
for category in categorical_columns:  
    dataset[category] = dataset[category].astype('category')  
  
price = dataset['price'].cat.codes.values  
maint = dataset['maint'].cat.codes.values  
doors = dataset['doors'].cat.codes.values  
persons = dataset['persons'].cat.codes.values  
lug_capacity = dataset['lug_capacity'].cat.codes.values  
safety = dataset['safety'].cat.codes.values  
  
categorical_data = np.stack([price, maint, doors, persons, lug_capacity, safety], 1)  
categorical_data[:10]  
  
categorical_data = torch.tensor(categorical_data, dtype=torch.int64)  
categorical_data[:10]  
  
outputs1 = pd.get_dummies(dataset.output)  
outputs2 = outputs1.values  
outputs3 = torch.tensor(outputs2).flatten()  
  
print(categorical_data.shape)  
print(outputs3.shape)  
  
categorical_column_sizes = [len(dataset[column].cat.categories) for column in categorical_columns]  
categorical_embedding_sizes = [(col_size, min(50, (col_size+1)//2)) for col_size in categorical_column_sizes]  
print(categorical_embedding_sizes)  
  
total_records = 1728  
test_records = int(total_records * .2)# ------ 전체 데이터 중 20%를 테스트 용도로 사용  
  
categorical_train_data = categorical_data[:total_records - test_records]  
categorical_test_data = categorical_data[total_records - test_records:total_records]  
train_outputs = outputs3[:total_records - test_records]  
test_outputs = outputs3[total_records - test_records:total_records]  
  
print(len(categorical_train_data))  
print(len(train_outputs))  
print(len(categorical_test_data))  
print(len(test_outputs))
class Model(nn.Module):  
    def __init__(self, embedding_size, output_size, layers, p=0.4):  
        super().__init__()  
        self.all_embeddings = nn.ModuleList([nn.Embedding(ni, nf) for ni, nf in embedding_size])  
        self.embedding_dropout = nn.Dropout(p)  
          
        all_layers = []  
        num_categorical_cols = sum((nf for ni, nf in embedding_size))  
        input_size = num_categorical_cols   
  
        for i in layers:  
            all_layers.append(nn.Linear(input_size, i))  
            all_layers.append(nn.ReLU(inplace=True))  
            all_layers.append(nn.BatchNorm1d(i))  
            all_layers.append(nn.Dropout(p))  
            input_size = i  
  
        all_layers.append(nn.Linear(layers[-1], output_size))  
        self.layers = nn.Sequential(*all_layers)  
  
    def forward(self, x_categorical):  
        embeddings = []  
        for i,e in enumerate(self.all_embeddings):  
            embeddings.append(e(x_categorical[:,i]))  
        x = torch.cat(embeddings, 1)  
        x = self.embedding_dropout(x)  
        x = self.layers(x)  
        return x  
      
model = Model(categorical_embedding_sizes, 4, [200,100,50], p=0.4)  
print(model)  
  
loss_function = nn.CrossEntropyLoss()  
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)  
  
if torch.cuda.is_available():  
    device = torch.device('cuda')  
else :  
    device = torch.device('cpu')  
      
epochs = 500  
aggregated_losses = []  
train_outputs = train_outputs.to(device=device, dtype=torch.int64)  
for i in range(epochs):  
    i += 1  
    y_pred = model(categorical_train_data).to(device)  
    single_loss = loss_function(y_pred, train_outputs)  
    aggregated_losses.append(single_loss)  
  
    if i%25 == 1:  
        print(f'epoch: {i:3} loss: {single_loss.item():10.8f}')  
  
    optimizer.zero_grad()  
    single_loss.backward()  
    optimizer.step()  
  
print(f'epoch: {i:3} loss: {single_loss.item():10.10f}')  
  
test_outputs = test_outputs.to(device=device, dtype=torch.int64)  
with torch.no_grad():  
    y_val = model(categorical_test_data)  
    loss = loss_function(y_val, test_outputs)  
  
print(f"loss: {loss:.8f}")
반응형