자연어처리

LSTM을 활용한 영문 트위터 감성분석 모델

포켓몬빵 2021. 2. 6. 03:07

안녕하세요 이번 포스팅에서는 LSTM을 활용한 영문 트위터 감성분석 모델에 대해 진행하겠습니다.

 

1. 감성분석이란?

감정 분석은 정서적 상태와 주관적인 정보를 체계적으로 식별, 추출, 정량화 및 연구하기 위해 자연어 처리, 텍스트 분석, 전산 언어학 및 생체 인식을 사용하는 것을 말합니다

 

2. LSTM

 

3. 데이터셋

해당 모델 제작을 위해 사용된 데이터셋은 Kaggle에서 개최한 Tweet Sentiment Extraction 내의 영문 트위터 데이터를 활용하였습니다. 데이터셋의 경우 아래의 링크에서 다운 및 확인 하실수 있습니다.

www.kaggle.com/c/tweet-sentiment-extraction

 

Tweet Sentiment Extraction

Extract support phrases for sentiment labels

www.kaggle.com

데이터셋를 다운로드 받으시면, 다음과 같은 데이터를 확인하실수 있습니다. 트윗내용과 해당 트윗에대한 감성이 분류되어있는 데이터임을 확인할수 있습니다.

f87dea47db Last session of the day http://twitpic.com/67ezh neutral
96d74cb729 Shanghai is also really exciting (precisely -- skyscrapers galore). Good tweeps in China: (SH) (BJ). positive
eee518ae67 Recession hit Veronique Branquinho, she has to quit her company, such a shame! negative
01082688c6 happy bday! positive

4. 딥러닝 프레임워크

해당모델은 pytorch를 사용하였으며 pytorch에 대한 자세한 내용은 아래 링크에서 확인하실수 있습니다.

pytorch.org/

 

PyTorch

An open source deep learning platform that provides a seamless path from research prototyping to production deployment.

pytorch.org

 

5. 데이터 전처리 및 test/train 데이터셋 분리

# -*- coding: utf-8 -*-
import pickle
import string
import torch
import random
import torchtext
import torch
import codecs
import random
import torch.utils.data as Data
import pandas as pd
import numpy as np
# input: a sequence of tokens, and a token_to_index dictionary
# output: a LongTensor variable to encode the sequence of idxs
def prepare_sequence(seq, to_ix, cuda=False):
    var = torch.LongTensor([to_ix[w] for w in seq.split(' ')])
    return var

def prepare_label(label,label_to_ix, cuda=False):
    var = torch.LongTensor([label_to_ix[label]])
    return var

def build_token_to_ix(sentences):
    token_to_ix = dict()
    for sent in sentences:
        if sent != sent:
            continue
        for token in sent.split(' '):
            if token not in token_to_ix:
                token_to_ix[token] = len(token_to_ix)
    token_to_ix['<pad>'] = len(token_to_ix)
    return token_to_ix

def clean_str(data):
    for example in data.examples:
        text = [x.lower() for x in vars(example)['text']]  # 소문자
        text = [x.replace("<br", "") for x in text]  # <br 제거
        text = [''.join(c for c in s if c not in string.punctuation) for s in text]  # 문장부호
        text = [s for s in text if s and not s == " " and not s == "  "]  # 공란제거
        vars(example)['text'] = text
    return data


def load_data():
    label_to_ix = {'negative': 0, 'neutral': 1, 'positive': 2}

    # already tokenized and there is no standard split
    # the size follow the Mou et al. 2016 instead

    test_data = []
    test = pd.read_csv("test.csv")
    # for idx in range(len(test)):
    #     data.append(torchtext.data.Example.fromlist([test[idx], labels[idx]], datafields))
    # data = torchtext.data.Dataset(data, datafields)
    TEXT = torchtext.data.Field(tokenize='spacy')
    LABEL = torchtext.data.LabelField()

    datafields = [('text', TEXT), ('label', LABEL)]
    temp = []
    for data in test.values:
        if data[1] != data[1]:
            continue

        test_data.append(torchtext.data.Example.fromlist([data[1], label_to_ix[data[2]]],datafields))

    train_data = []
    train = pd.read_csv("train.csv")
    for data in train.values:
        if data[2] != data[2]:
            continue
        train_data.append(torchtext.data.Example.fromlist([data[2], label_to_ix[data[3]]], datafields))
    train_data = torchtext.data.Dataset(train_data, datafields)
    test_data = torchtext.data.Dataset(test_data, datafields)

    train_data = clean_str(train_data)
    test_data = clean_str(test_data)
    train_data, valid_data = train_data.split(random_state=random.seed(0), split_ratio=0.8)

    TEXT.build_vocab(train_data, test_data,valid_data, max_size=50000)
    LABEL.build_vocab(train_data,test_data,valid_data)
    # word_to_ix = build_token_to_ix([s for s, _ in test_data + train_data])

    pickle.dump(TEXT, open("text.pkl", "wb"))
    pickle.dump(LABEL, open("label.pkl", "wb"))

    print('vocab size:',len(TEXT.vocab),'label size:',len(label_to_ix))
    print('loading data done!')
    return TEXT,LABEL,train_data,valid_data,test_data,label_to_ix

 

6. 모델제작

import torch
import torch.autograd as autograd
import torch.nn as nn


class Sentiment(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim, output_dim, n_layers, dropout):
        super().__init__()


        self.embed = nn.Embedding(vocab_size, embed_dim)
        self.lstm = nn.LSTM(embed_dim, hidden_dim, n_layers, dropout=dropout)
        self.fc = nn.Linear(hidden_dim * n_layers, output_dim)
        self.drop = nn.Dropout(dropout)

    def forward(self, x):

        emb = self.drop(self.embed(x))

        out, (h, c) = self.lstm(emb)
        h = self.drop(torch.cat((h[-2, :, :], h[-1, :, :]), dim=1))

        return self.fc(h.squeeze(0))

7. 모델 학습

# -*- coding: utf-8 -*-

import sys
sys.path.insert(0,r'C:\Users\Administrator\Desktop\sentimental')

import torch
import torch.autograd as autograd
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import data_loader
import os
import random
import torchtext
from tqdm import tqdm
import time
from model import Sentiment
torch.set_num_threads(8)
torch.manual_seed(1)
random.seed(1)


def get_accuracy(truth, pred):
    assert len(truth) == len(pred)
    right = 0
    for i in range(len(truth)):
        if truth[i] == pred[i]:
            right += 1.0
    return right / len(truth)


def categorical_accuracy(preds, y):
    """
    Returns accuracy per batch, i.e. if you get 8/10 right, this returns 0.8, NOT 8
    """
    max_preds = preds.argmax(dim = 1, keepdim = True) # get the index of the max probability
    correct = max_preds.squeeze(1).eq(y)
    return correct.sum() / torch.FloatTensor([y.shape[0]])


def evaluate(model, iterator, criterion, device):
    epoch_loss = 0
    epoch_acc = 0

    # evaluation mode
    model.eval()
    with torch.no_grad():
        for batch in iterator:
            batch.text = batch.text.to(device)
            batch.label = batch.label.to(device)
            predictions = model(batch.text)

            loss = criterion(predictions, batch.label)
            acc = categorical_accuracy(predictions, batch.label)

            epoch_loss += loss.item()
            epoch_acc += acc.item()

    return epoch_loss / len(iterator), epoch_acc / len(iterator)

def train(model, iterator, optimizer, criterion,device):
    epoch_loss = 0
    epoch_acc = 0

    model.train()  # train_mode
    for batch in iterator:
        # initializing
        optimizer.zero_grad()
        # forward pass
        batch.text = batch.text.to(device)
        batch.label = batch.label.to(device)
        predictions = model(batch.text)
        loss = criterion(predictions, batch.label)


        acc = categorical_accuracy(predictions, batch.label)

        # backward pass
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
        epoch_acc += acc.item()

    return epoch_loss / len(iterator), epoch_acc / len(iterator)


def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs


device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# device = 'cpu'
batch_size = 128

TEXT, LABEL, train_data, valid_data, test_data, label_to_ix = data_loader.load_data()
EMBEDDING_DIM = 400
HIDDEN_DIM = 400
EPOCH = 20
best_dev_acc = 0.0
OUTPUT_DIM = len(label_to_ix)
train_iterator, valid_iterator, test_iterator = torchtext.data.BucketIterator.splits(
    (train_data, valid_data, test_data),
    batch_size=batch_size,
    device=device, sort=False)

model = Sentiment(len(TEXT.vocab), EMBEDDING_DIM, HIDDEN_DIM, OUTPUT_DIM, 2, 0.5)
model.to(device)
loss_function = nn.CrossEntropyLoss().to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-3)


def write_embeddings(path, embeddings, vocab):
    with open(path, 'w') as f:
        for i, embedding in enumerate(tqdm(embeddings)):
            word = vocab.itos[i]
            # skip words with unicode symbols
            if len(word) != len(word.encode()):
                continue
            vector = ' '.join([str(i) for i in embedding.tolist()])
            f.write(f'{word} {vector}\n')

best_valid_loss = float('inf')

for epoch in range(EPOCH):
    start_time = time.time()
    train_loss, train_acc = train(model, train_iterator, optimizer, loss_function, device)
    valid_loss, valid_acc = evaluate(model, valid_iterator, loss_function, device)

    end_time = time.time()

    epoch_mins, epoch_secs = epoch_time(start_time, end_time)
    print(f'Epoch: {epoch+1:02} | Epoch Time: {epoch_mins}m {epoch_secs}')
    print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
    print(f'\t Val. Loss: {valid_loss:.3f} |  Val. Acc: {valid_acc*100:.2f}%')
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'best_model.pt')

model.load_state_dict(torch.load('best_model.pt'))

test_loss, test_acc= evaluate(model, test_iterator, loss_function, device)
print(f'Test Loss: {test_loss:.3f} | Test Acc: {test_acc*100:.2f}%')

8. 모델 사용

import torch
import pickle
import spacy
from model import Sentiment

nlp = spacy.load('en')
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

def predict_class(model, TEXT,sentence, min_len = 4):
    model.eval()
    tokenized = [tok.text for tok in nlp.tokenizer(sentence)]
    if len(tokenized) < min_len:
        tokenized += ['<pad>'] * (min_len - len(tokenized))
    indexed = [TEXT.vocab.stoi[t] for t in tokenized]
    tensor = torch.LongTensor(indexed).to(device)
    tensor = tensor.unsqueeze(1)
    preds = model(tensor)
    print(preds)
    max_preds = preds.argmax(dim = 0)

    return max_preds.item()



EMBEDDING_DIM = 400
HIDDEN_DIM = 400
EPOCH = 20
OUTPUT_DIM = 3

TEXT = pickle.load(open("text.pkl", "rb"))
LABEL = pickle.load(open("label.pkl", "rb"))
ix_to_label = {0:'negative', 1:'neutral',  2:'positive'}


model = Sentiment(len(TEXT.vocab), EMBEDDING_DIM, HIDDEN_DIM, OUTPUT_DIM, 2, 0.5)
model.to(device)
model.load_state_dict(torch.load('best_model.pt'))

pred_class = predict_class(model,TEXT, "I love you")



print(f'Predicted class is:  {ix_to_label[pred_class]}')