본문 바로가기
공부/AI

[PyTorch] 긍정 리뷰, 부정 리뷰 분류하기 (3) - 모델 변경 (GRU)

by 웅대 2023. 9. 19.
728x90
반응형

https://growth-coder.tistory.com/248

 

[PyTorch] 긍정 리뷰, 부정 리뷰 분류하기 (2) - 구현 (임베딩, RNN)

https://growth-coder.tistory.com/247 [PyTorch] 긍정 리뷰, 부정 리뷰 분류하기 (1) - 개념 리뷰의 내용이 긍정적인 내용인지 부정적인 내용인지 분류를 해보려고 한다. 본격적인 구현에 앞서 이번 포스팅에

growth-coder.tistory.com

이전 포스팅에서 모델을 만들어 직접 학습을 해보았다.

 

그러나 정말 단순한 모델을 사용했기 때문에 정확도는 50% 정도밖에 되지 않았다.

 

이번 포스팅에서는 모델의 성능을 높이는 여러 방법들에 대해서 알아보자.

 

우선 이전 포스팅에서 사용했던 모델은 다음과 같다.

class Model(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim, n_classes, batch_first=True):
        super(Model, self).__init__()
        # 임베딩
        self.embedding_layer = nn.Embedding(
            num_embeddings=vocab_size,
            embedding_dim=embed_dim,
            padding_idx=1
        )
        # RNN
        self.rnn_layer = nn.RNN(
            embed_dim,
            hidden_dim, # 입력 차원, 은닉 상태의 크기 정의
            batch_first=batch_first
            )
        # 마지막은 클래스의 개수로 변환
        # 그렇게 해야 클래스 분류를 진행할 수 있음\
        self.linear = nn.Linear(hidden_dim, n_classes) # 출력은 원-핫 벡터의 크기를 가져야함. 또는 단어 집합의 크기만큼 가져야함.

    def forward(self, x):
        # (배치 크기, 시퀀스 길이) => (배치 크기, 시퀀스 길이, 임베딩 차원)
        x = self.embedding_layer(x)
        
        # (배치 크기, 시퀀스 길이, 임베딩 차원) => output (배치 크기, 시퀀스 길이, 은닉 상태 크기), hidden (1, 배치 크기, 은닉층 크기)
        output, hidden = self.rnn_layer(x)
        output = output[:, -1, :]
        
        # (배치 크기, 시퀀스 길이, 은닉 상태 크기) => (배치 크기, 시퀀스 길이, 클래스 크기)
        output = self.linear(output)
        
        # (배치 크기, 시퀀스 길이, 클래스 크기) => (배치 크기*시퀀스 길이, 클래스 크기)
        return output

모델을 변경해보자.

 

GRU (Gated Recurrent Unit)

위에서 사용한 모델은 기본적인 RNN 구조이다.

 

위 모델의 구조는 아래와 같다.

우선 기본적인 RNN 셀의 문제점을 알아보자.

 

RNN은 현재 시점의 입력이전 시점의 은닉 상태를 입력을 받는다.

 

그런데 기본적인 RNN은 시퀀스의 길이가 길어질수록 초기 시점의 입력값에 대한 정보가 옅어진다.

이러한 문제를 장기 의존성 문제라고 한다.

 

이를 해결하기 위해 RNN을 보완한 것이 바로 LSTM(Long Short-Term Memory)이다.

 

LSTM은 입력 게이트, 삭제 게이트, 출력 게이트를 사용하여 불필요한 기억을 지우고 중요한 정보를 기억한다.

 

즉 시퀀스의 길이가 길다면 RNN 보다는 LSTM을 사용하는 것이 좋다.

 

GRU는 LSTM과 성능은 비슷하나 업데이트 게이트와 리셋 게이트, 총 두 가지 게이트만 사용하여 학습 속도가 빠르다.

 

torch.nn에서는 RNN, GRU, LSTM을 모두 제공하고 사용법은 동일하다.

 

nn.RNN(input_dim, hidden_size, batch_fisrt=True)
nn.GRU(input_dim, hidden_size, batch_fisrt=True)
nn.LSTM(input_dim, hidden_size, batch_fisrt=True)

이 중에서 GRU를 사용해보자.

 

RNN 셀을 GRU로 변경하고 층의 개수도 추가해보자.

 

기존에는 RNN 층의 개수를 default 값인 1로 사용을 하였다.

 

만약 층의 개수를 늘리면 다수의 은닉층을 갖는 깊은 RNN이 되는 것이다.

 

마찬가지로 GRU에서도 다수의 은닉층을 사용할 수 있다.

 

사용법은 단순히 nn.GRU의 num_layers 인자를 사용하면 된다.

self.gru = nn.GRU(embed_dim, hidden_dim, num_layers=2, batch_first=batch_first)

이렇게 다수의 은닉층을 사용하면 GRU의 결과 값의 shape도 달라진다.

 

기존 RNN 층을 지나면 두 개의 결과 값이 나온다.

 

첫 번째는 모든 시점의 은닉 상태(배치 크기, 시점의 개수, 은닉층 크기)이고 두 번째는 마지막 시점의 은닉 상태(배치 크기, 1, 은닉층 크기)이다.

 

그런데 은닉층의 개수가 늘어나면 shape이 바뀐다.

 

은닉층이 두 개라고 가정하면 결과 값은 다음과 같다.

 

첫 번째는 마지막 층의 모든 시점의 은닉 상태(배치 크기, 시점의 개수, 은닉층 크기)이고 두 번째는 마지막 시점의 은닉 상태(배치 크기*층의 개수, 1, 은닉층 크기)이다.

 

첫 번째 값은 가장 마지막 층의 모든 시점의 은닉 상태이기 때문에 은닉층의 개수와 관계 없다.

 

그러나 두 번째 값은 마지막 시점의 은닉 상태이기 때문에 마지막 시점에서 모든 은닉 층에서의 은닉 상태이기 때문에 은닉 층의 개수에 따라 shape이 달라진다.

 

첫 번째 값은 "마지막 층"의 모든 시점이기 때문에 은닉 층의 개수가 달라진다고 변하지는 않지만 두 번째 값은 "모든 층의 마지막 은닉 상태"이기 때문에 은닉 층의 개수에 따라 달라진다.

 

이제 은닉 층의 개수를 추가하고 초기 은닉 상태의 값도 정의해보자.

 

RNN은 기본적으로 이전 시점의 은닉 상태의 값과 현재 시점의 입력 값을 받는다.

 

그런데 가장 처음 시점에서는 이전 시점이 없기 때문에 이 은닉 상태를 0으로 초기화 해줘야 한다.

 

이제 우리가 할 것은 세 가지이다.

반응형
1. RNN을 GRU로 바꾼다.
2. 은닉층의 개수를 2개로 늘린다.
3. 초기 은닉 상태를 0으로 만든다.

모델의 재사용성을 높이기 위해 은닉 층의 개수를 인자로 받자.

class Model(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim, n_classes, n_layers, batch_first=True):
        super(Model, self).__init__()
        # 임베딩
        self.embedding_layer = nn.Embedding(
            num_embeddings=vocab_size,
            embedding_dim=embed_dim,
            padding_idx=1
        )
        # GRU
        self.gru_layer = nn.GRU(
            embed_dim,
            hidden_dim,  # 입력 차원, 은닉 상태의 크기 정의
            num_layers=n_layers,
            batch_first=batch_first
        )
        # 마지막은 클래스의 개수로 변환
        # 그렇게 해야 클래스 분류를 진행할 수 있음\
        self.linear = nn.Linear(hidden_dim, n_classes)  # 출력은 원-핫 벡터의 크기를 가져야함. 또는 단어 집합의 크기만큼 가져야함.

    def forward(self, x):
        # (배치 크기, 시퀀스 길이) => (배치 크기, 시퀀스 길이, 임베딩 차원)
        x = self.embedding_layer(x)

        # 초기 은닉 상태를 0으로 초기화
        h_0 = self._init_state(batch_size=x.size(0))

        # (배치 크기, 시퀀스 길이, 임베딩 차원) => output (배치 크기, 시퀀스 길이, 은닉 상태 크기), hidden (은닉 층의 개수, 배치 크기, 은닉 상태 크기)
        output, hidden = self.gru_layer(x, h_0)

        # (배치 크기, 시퀀스 길이, 은닉 상태 크기) => (배치 크기, 은닉 상태 크기)
        output = output[:, -1, :]

        # (배치 크기, 은닉 상태 크기) => (배치 크기, 클래스 크기)
        output = self.linear(output)
        return output

    def _init_state(self, batch_size=1):
        weight = next(self.parameters()).data
        return weight.new(2, batch_size, hidden_dim).zero_()

인자가 추가되었으므로 모델 인스턴스를 생성할 때 n_layers 값을 넣어줘야 한다.

 

Dropout

dropout은 인공 신경망의 과대적합을 방지하기 위해 사용한다.

 

신경망 구조에서 확률적으로 뉴런을 제거하는 것이다.

 

https://ko.d2l.ai/chapter_deep-learning-basics/dropout.html

드롭아웃 확률을 인자로 받고 적용해보자.

class Model(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim, n_classes, n_layers, dropout_p, batch_first=True):
        super(Model, self).__init__()
        # 임베딩
        self.embedding_layer = nn.Embedding(
            num_embeddings=vocab_size,
            embedding_dim=embed_dim,
            padding_idx=1
        )
        # GRU
        self.gru_layer = nn.GRU(
            embed_dim,
            hidden_dim,  # 입력 차원, 은닉 상태의 크기 정의
            num_layers=n_layers,
            batch_first=batch_first
        )
        # drop out
        self.dropout = nn.Dropout(dropout_p)
        # 마지막은 클래스의 개수로 변환
        # 그렇게 해야 클래스 분류를 진행할 수 있음\
        self.linear = nn.Linear(hidden_dim, n_classes)  # 출력은 원-핫 벡터의 크기를 가져야함. 또는 단어 집합의 크기만큼 가져야함.

    def forward(self, x):
        # (배치 크기, 시퀀스 길이) => (배치 크기, 시퀀스 길이, 임베딩 차원)
        x = self.embedding_layer(x)

        # 초기 은닉 상태를 0으로 초기화
        h_0 = self._init_state(batch_size=x.size(0))

        # (배치 크기, 시퀀스 길이, 임베딩 차원) => output (배치 크기, 시퀀스 길이, 은닉 상태 크기), hidden (은닉 층의 개수, 배치 크기, 은닉 상태 크기)
        output, hidden = self.gru_layer(x, h_0)

        # (배치 크기, 시퀀스 길이, 은닉 상태 크기) => (배치 크기, 은닉 상태 크기)
        output = output[:, -1, :]

        # 드롭아웃 적용
        self.dropout(output)

        # (배치 크기, 은닉 상태 크기) => (배치 크기, 클래스 크기)
        output = self.linear(output)
        return output

    def _init_state(self, batch_size=1):
        weight = next(self.parameters()).data
        return weight.new(2, batch_size, hidden_dim).zero_()

이제 이 모델을 가지고 학습과 평가를 해보자.

728x90

 

최종 코드

from torch.utils.data import Dataset, DataLoader
from torchtext.vocab import build_vocab_from_iterator
from torch.nn.utils.rnn import pad_sequence
import torch.nn as nn
import torch.nn.functional as F
from konlpy.tag import Okt
import pandas as pd
import torch


class Model(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim, n_classes, n_layers, dropout_p, batch_first=True):
        super(Model, self).__init__()
        # 임베딩
        self.embedding_layer = nn.Embedding(
            num_embeddings=vocab_size,
            embedding_dim=embed_dim,
            padding_idx=1
        )
        # GRU
        self.gru_layer = nn.GRU(
            embed_dim,
            hidden_dim,  # 입력 차원, 은닉 상태의 크기 정의
            num_layers=n_layers,
            batch_first=batch_first
        )
        # drop out
        self.dropout = nn.Dropout(dropout_p)
        # 마지막은 클래스의 개수로 변환
        # 그렇게 해야 클래스 분류를 진행할 수 있음\
        self.linear = nn.Linear(hidden_dim, n_classes)  # 출력은 원-핫 벡터의 크기를 가져야함. 또는 단어 집합의 크기만큼 가져야함.

    def forward(self, x):
        # (배치 크기, 시퀀스 길이) => (배치 크기, 시퀀스 길이, 임베딩 차원)
        x = self.embedding_layer(x)

        # 초기 은닉 상태를 0으로 초기화
        h_0 = self._init_state(batch_size=x.size(0))

        # (배치 크기, 시퀀스 길이, 임베딩 차원) => output (배치 크기, 시퀀스 길이, 은닉 상태 크기), hidden (은닉 층의 개수, 배치 크기, 은닉 상태 크기)
        output, hidden = self.gru_layer(x, h_0)

        # (배치 크기, 시퀀스 길이, 은닉 상태 크기) => (배치 크기, 은닉 상태 크기)
        output = output[:, -1, :]

        # 드롭아웃 적용
        self.dropout(output)

        # (배치 크기, 은닉 상태 크기) => (배치 크기, 클래스 크기)
        output = self.linear(output)
        return output

    def _init_state(self, batch_size=1):
        weight = next(self.parameters()).data
        return weight.new(2, batch_size, hidden_dim).zero_()

class CustomDataset(Dataset):
    def __init__(self, filename):
        data_df = pd.read_table(filename).fillna('')
        x_data = data_df['document'].values
        self.x_data = x_data

        self.y_data = data_df['label'].values

    def __len__(self):
        return len(self.x_data)

    def __getitem__(self, idx):
        x = self.x_data[idx]
        y = self.y_data[idx]
        return y, x


# tokenizer = Mecab(dicpath="C:/mecab/mecab-ko-dic")
tokenizer = Okt()


def yield_tokens(data_iter):
    for _, x in data_iter:
        yield tokenizer.morphs(x)


train_iter = CustomDataset("ratings_train.txt")
eval_iter = CustomDataset("ratings_test.txt")

# 단어 사전 생성
# unknown 토큰과 padding 토큰 추가
vocab = build_vocab_from_iterator(yield_tokens(train_iter), specials=["<unk>", "<pad>"])
# 단어 사전이 없으면 <unk> 토큰 사용
vocab.set_default_index(vocab["<unk>"])
# 텍스트 처리 파이프라인
text_pipeline = lambda x: vocab(tokenizer.morphs(x))
label_pipeline = lambda x: int(x)

device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(device)


def collate_batch(batch):
    label_list, text_list = [], []
    for (label, text) in batch:
        # label 파이프라인
        label_list.append(label_pipeline(label))

        # 토큰화 진행 후 정수 인덱스로 변경
        processed_text = torch.tensor(text_pipeline(text), dtype=torch.int64)
        text_list.append(processed_text)

    # label list 텐서로 변경
    label_list = torch.tensor(label_list, dtype=torch.int64)

    # 문장의 길이가 전부 다르기 때문에 padding을 추가하여 전부 동일하게 맞춤
    text_list = pad_sequence(text_list, batch_first=True, padding_value=1)
    return label_list, text_list


# 하이퍼 파라미터
vocab_size = len(vocab)
n_classes = 2
learning_late = 0.001
batch_size = 64
epochs = 10
hidden_dim = 256
embed_dim = 128
n_layers = 2
dropout_p = 0.2

def train(model, optimizer, train_iter):
    model.train()
    for i, (y, x) in enumerate(train_iter):
        y, x = y.to(device), x.to(device),
        optimizer.zero_grad()
        hypothesis = model(x)
        loss = F.cross_entropy(hypothesis, y)
        loss.backward()
        optimizer.step()
        if (i + 1) % 100 == 0:
            print(f'{i + 1}번 반복. loss : {loss.sum().item()}')


def evaluate(model, val_iter):
    model.eval()
    corrects, total_loss = 0, 0
    for y, x in val_iter:
        y, x = y.to(device), x.to(device),
        hypothesis = model(x)
        loss = F.cross_entropy(hypothesis, y)
        total_loss += loss.item()
        corrects += (hypothesis.max(dim=1)[1] == y).sum()
    size = len(val_iter)
    avg_loss = total_loss / size
    acc = (corrects / (size * batch_size)) * 100
    return avg_loss, acc


# 모델 생성
model = Model(vocab_size, embed_dim, hidden_dim, n_classes, n_layers, dropout_p, batch_first=True).to(device)

# 옵티마이저 생성
optimizer = torch.optim.Adam(model.parameters(), lr=learning_late)

# 데이터 로더 생성
train_dataloader = DataLoader(train_iter, batch_size=batch_size, shuffle=True, collate_fn=collate_batch, drop_last=True)
eval_dataloader = DataLoader(eval_iter, batch_size=batch_size, shuffle=True, collate_fn=collate_batch, drop_last=True)
# 학습 시작
for i in range(1, epochs + 1):
    train(model, optimizer, train_dataloader)
    # 평가 시작
    avg_loss, acc = evaluate(model, eval_dataloader)
    print(f'[{i} / {epochs}] 평균 loss : {avg_loss} 정학도 : {acc}')

 

85% 정도의 정확도를 보여준다.

728x90
반응형

댓글