Ssul's Blog

[AI, NLP] LSTM으로 스팸분류기 만들기 본문

AI & ML/학습하기

[AI, NLP] LSTM으로 스팸분류기 만들기

Ssul 2024. 3. 18. 23:23

문장이 입력되면 스팸문자인지, 정상문자인지 확인하는 모델을 만들어보자.

앞전에는 PLM에 LoRA를 붙여서, 파인튜닝 후 스팸문자분류 모델 만들었음

 

#0. 성능비교

우선 결과부터 이야기해보면,

[Gemma-2B+LoRA파인 튜닝으로 만든모델(GPU의 한계로 0.8epoch모델)]

- test data에 대한 정확도 68%

- 내가 직접 생성한 문자에 대한 구분... 체감상 잘함

 

[LSTM, CNN으로 만든모델(각각 5epoch)]

- test data에 대한 정확도 99%

- 내가 직접 생성한 문자에 대한 구분... 체감상 못함. 조금만 치팅을 넣어서 문자를 만들면 바로 스팸으로 분류

(예: 일상 대화문자 맨 앞에 '[Web발신]'을 넣으면 바로 스팸으로 분류.... 특정 부분에 과도하게 집중하고 있는 느낌)

 

여튼 GPU만 감당할 수 있다면, PLM+LoRA모델이 더 매력적으로 느껴짐. epoch더 돌리면 잘할것 같고, 다음달 코랩프로+ 채워지면 다시 학습해볼 예정

 

[아래 학습코드는 https://wikidocs.net/217687 의 코드를 참고하여, 학습하였습니다]

#1. 데이터 Preprocessing

 

1-1. 데이터 가져오기

우선 데이터를 가져옵니다. 저는 허깅페이스에 올려놓은 스팸데이터를 가져 왔습니다.

from datasets import load_dataset

dataset_name = "허길페이스 주소/spam_data"
dataset = load_dataset(dataset_name)

 

저는 개인적으로 pd로 다루는게 편해서, 각각의 데이터를 pd로 변환합니다.

train_data = pd.DataFrame(dataset["train"])
test_data = pd.DataFrame(dataset["test"])

 

 

1-2. 데이터 정제하기

데이터는 이렇게 생겼습니다.

train_data[:5] # 상위 5개 출력

 

중복된 문자가 있다면 삭제합니다

# sms 열과 label 열의 중복을 제외한 값의 개수
train_data['sms'].nunique(), train_data['spam'].nunique()

# sms 열과 label 열의 중복을 제외한 값의 개수
test_data['sms'].nunique(), test_data['spam'].nunique()

# sms 열의 중복 제거
train_data.drop_duplicates(subset=['sms'], inplace=True)
# sms 열의 중복 제거
test_data.drop_duplicates(subset=['sms'], inplace=True)

 

데이터들의 분포도 한번 확인합니다

train_data['spam'].value_counts().plot(kind = 'bar')
test_data['spam'].value_counts().plot(kind = 'bar')

 

정규표현식을 통해서, 지저분한 데이터를 정리합니다.

import re
# 대괄호와 그 안의 내용을 삭제하는 함수
def remove_brackets(text):
    return re.sub(r'\[.*?\]', '', text)

# 'sms' 컬럼에 대해 대괄호와 안의 내용 삭제
train_data['sms'] = train_data['sms'].apply(remove_brackets)

# 한글과 공백을 제외하고 모두 제거
train_data['sms'] = train_data['sms'].str.replace("[^ㄱ-ㅎㅏ-ㅣ가-힣 ]","")

# 정규표현식을 통해서, null이 된 데이터 삭제하기
train_data['sms'] = train_data['sms'].str.replace('^ +', "") # white space 데이터를 empty value로 변경
train_data['sms'].replace('', np.nan, inplace=True)
train_data = train_data.dropna(how = 'any')

 

test데이터도 동일하게 진행합니다

test_data.drop_duplicates(subset = ['sms'], inplace=True) # document 열에서 중복인 내용이 있다면 중복 제거
# 'sms' 컬럼에 대해 대괄호와 안의 내용 삭제
test_data['sms'] = test_data['sms'].apply(remove_brackets)
test_data['sms'] = test_data['sms'].str.replace("[^ㄱ-ㅎㅏ-ㅣ가-힣 ]","") # 정규 표현식 수행
test_data['sms'] = test_data['sms'].str.replace('^ +', "") # 공백은 empty 값으로 변경
test_data['sms'].replace('', np.nan, inplace=True) # 공백은 Null 값으로 변경
test_data = test_data.dropna(how='any') # Null 값 제거

 

 

 

#2. 사전=vocab을 만들기

2-1. 형태소 분석기로 토큰화

우선 불용어를 선언합니다.

(*불용어: 자연어 처리(NLP)에서 문장의 의미를 분석하는 데 있어 상대적으로 중요하지 않은 단어를 의미합니다. 이러한 단어들은 텍스트 데이터에서 매우 자주 등장하지만, 텍스트의 실질적인 의미나 컨텍스트를 파악하는 데 크게 기여하지 않습니다. 예를 들어, "the", "is", "at", "which", "on" 같은 관사, 전치사, 접속사 등이 불용어에 해당)

한글 스팸 문자이니, 한글 조사들

stopwords = ['도', '는', '다', '의', '가', '이', '은', '한', '에', '하', '고', '을', '를', '인', '듯', '과', '와', '네', '들', '듯', '지', '임', '게']

 

사용할 형태소 분석기를 선택하여, 테스트

from konlpy.tag import Mecab
mecab = Mecab()
mecab.morphs('[국제발신]ifg@주말까지 기다리기 힘들어금요일에 만나요NO.* RKGR**.COM')

 

사용될 sms데이터를 토큰화 합니다

X_train = []
for sentence in tqdm(train_data['sms']):
    tokenized_sentence = mecab.morphs(sentence) # 토큰화
    stopwords_removed_sentence = [word for word in tokenized_sentence if not word in stopwords] # 불용어 제거
    X_train.append(stopwords_removed_sentence)

X_test = []
for sentence in tqdm(test_data['sms']):
    tokenized_sentence = mecab.morphs(sentence) # 토큰화
    stopwords_removed_sentence = [word for word in tokenized_sentence if not word in stopwords] # 불용어 제거
    X_test.append(stopwords_removed_sentence)

 

스팸 여부의 데이터를 더해서, 최종적인 학습, 검증, 테스트 데이터를 완성합니다

y_train = np.array(train_data['spam'])
y_test = np.array(test_data['spam'])

X_train, X_valid, y_train, y_valid = train_test_split(X_train, y_train, test_size=0.2, random_state=0, stratify=y_train)

 

2-2. 사전=vocab 생성하기

학습데이터를 순회하며, 분리된 형태로를 사전에 기록합니다.

#vocab 만들기
word_list = []
for sent in X_train:
    for word in sent:
      word_list.append(word)

word_counts = Counter(word_list)
print('총 단어수 :', len(word_counts))

vocab = sorted(word_counts, key=word_counts.get, reverse=True)
print('등장 빈도수 상위 10개 단어')
print(vocab[:10])

 

2번 이하 등장하는 단어의 영향력을 확인합니다

threshold = 3
total_cnt = len(word_counts) # 단어의 수
rare_cnt = 0 # 등장 빈도수가 threshold보다 작은 단어의 개수를 카운트
total_freq = 0 # 훈련 데이터의 전체 단어 빈도수 총 합
rare_freq = 0 # 등장 빈도수가 threshold보다 작은 단어의 등장 빈도수의 총 합

# 단어와 빈도수의 쌍(pair)을 key와 value로 받는다.
for key, value in word_counts.items():
    total_freq = total_freq + value

    # 단어의 등장 빈도수가 threshold보다 작으면
    if(value < threshold):
        rare_cnt = rare_cnt + 1
        rare_freq = rare_freq + value

print('단어 집합(vocabulary)의 크기 :',total_cnt)
print('등장 빈도가 %s번 이하인 희귀 단어의 수: %s'%(threshold - 1, rare_cnt))
print("단어 집합에서 희귀 단어의 비율:", (rare_cnt / total_cnt)*100)
print("전체 등장 빈도에서 희귀 단어 등장 빈도 비율:", (rare_freq / total_freq)*100)

단어 사전 중, 2번 이하로 등장하는 희귀단어가 사전에서 50%를 차지하지만, 실제 문자에서는 0.9%밖에 등장하지 않습니다.

사전에서 제거 합니다.

# 전체 단어 개수 중 빈도수 2이하인 단어는 제거.
vocab_size = total_cnt - rare_cnt
vocab = vocab[:vocab_size]
print('단어 집합의 크기 :', len(vocab))

 

 

2-3. pad와 unk토큰을 넣기

pad토큰은 문장의 크기를 일정하게 하고, 비어있는 공간 채우는 토큰

unk는 사전에 없는 단어 표시하는 토큰

word_to_index = {}
word_to_index['<PAD>'] = 0
word_to_index['<UNK>'] = 1

for index, word in enumerate(vocab) :
  word_to_index[word] = index + 2

print('단어 <PAD>와 맵핑되는 정수 :', word_to_index['<PAD>'])
print('단어 <UNK>와 맵핑되는 정수 :', word_to_index['<UNK>'])
print('단어 부자와 맵핑되는 정수 :', word_to_index['부자'])

 

 

#3. 학습을 위한 데이터 셋팅(임베딩, 최대 문장길이 결정)

3-1. 완성된 사전으로 문자데이터 임베딩

문장이 들어오면 사전의 번호로 임베딩해주는 함수

# 임베딩
def texts_to_sequences(tokenized_X_data, word_to_index):
  encoded_X_data = []
  for sent in tokenized_X_data:
    index_sequences = []
    for word in sent:
      try:
          index_sequences.append(word_to_index[word])
      except KeyError:
          index_sequences.append(word_to_index['<UNK>'])
    encoded_X_data.append(index_sequences)
  return encoded_X_data

 

각 데이터셋을 임베딩 진행

encoded_X_train = texts_to_sequences(X_train, word_to_index)
encoded_X_valid = texts_to_sequences(X_valid, word_to_index)
encoded_X_test = texts_to_sequences(X_test, word_to_index)

# 상위 샘플 2개 출력
for sent in encoded_X_train[:2]:
  print(sent)

 

임베딩작동 확인하기

index_to_word = {}
for key, value in word_to_index.items():
    index_to_word[value] = key

decoded_sample = [index_to_word[word] for word in encoded_X_train[100]]
print('기존의 첫번째 샘플 :', X_train[100])
print('복원된 첫번째 샘플 :', decoded_sample)

유모차는 데이터셋에서 2번 이하로 등장한 단어이기 때문에 UNK인듯

 

3-2. 최대문장길이 설정하여 짜르고, pad채워넣기

데이터들의 문장길이 분포를 확인합니다.

print('문자의 최대 길이 :',max(len(sms) for sms in encoded_X_train))
print('문자의 평균 길이 :',sum(map(len, encoded_X_train))/len(encoded_X_train))
plt.hist([len(sms) for sms in encoded_X_train], bins=50)
plt.xlabel('length of samples')
plt.ylabel('number of samples')
plt.show()

대부분 50이내이다

 

def below_threshold_len(max_len, nested_list):
  count = 0
  for sentence in nested_list:
    if(len(sentence) <= max_len):
        count = count + 1
  print('전체 샘플 중 길이가 %s 이하인 샘플의 비율: %s'%(max_len, (count / len(nested_list))*100))

max_len = 50
below_threshold_len(max_len, X_train)

#전체 샘플 중 길이가 50 이하인 샘플의 비율: 95.98465883990843

 

최대 길이 기준으로 짜르고, 그것보다 작은 문장은 pad토큰으로 채워넣습니다

def pad_sequences(sentences, max_len):
  features = np.zeros((len(sentences), max_len), dtype=int)
  for index, sentence in enumerate(sentences):
    if len(sentence) != 0:
      features[index, :len(sentence)] = np.array(sentence)[:max_len]
  return features

padded_X_train = pad_sequences(encoded_X_train, max_len=max_len)
padded_X_valid = pad_sequences(encoded_X_valid, max_len=max_len)
padded_X_test = pad_sequences(encoded_X_test, max_len=max_len)

print('훈련 데이터의 크기 :', padded_X_train.shape)
print('검증 데이터의 크기 :', padded_X_valid.shape)
print('테스트 데이터의 크기 :', padded_X_test.shape)

 

데이터가 완성되었습니다.

확인해 봅시다

print('첫번째 샘플의 길이 :', len(padded_X_train[0]))
print('첫번째 샘플 :', padded_X_train[0])

문장은 50으로, 뒤에는 0으로 채워넣음

 

 

#4. 모델설계

각 데이터를 텐서로 변환하여, 학습을 할수 있게 dataloader로 구성

train_label_tensor = torch.tensor(np.array(y_train))
valid_label_tensor = torch.tensor(np.array(y_valid))
test_label_tensor = torch.tensor(np.array(y_test))


encoded_train = torch.tensor(padded_X_train).to(torch.int64)
train_dataset = torch.utils.data.TensorDataset(encoded_train, train_label_tensor)
train_dataloader = torch.utils.data.DataLoader(train_dataset, shuffle=True, batch_size=32)

encoded_test = torch.tensor(padded_X_test).to(torch.int64)
test_dataset = torch.utils.data.TensorDataset(encoded_test, test_label_tensor)
test_dataloader = torch.utils.data.DataLoader(test_dataset, shuffle=True, batch_size=1)

encoded_valid = torch.tensor(padded_X_valid).to(torch.int64)
valid_dataset = torch.utils.data.TensorDataset(encoded_valid, valid_label_tensor)
valid_dataloader = torch.utils.data.DataLoader(valid_dataset, shuffle=True, batch_size=1)

total_batch = len(train_dataloader)
print('총 배치의 수 : {}'.format(total_batch))

 

LSTM모델을 설계합니다.

class TextClassifier(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim):
        super(TextClassifier, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        # self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True, num_layers=5)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        # x: (batch_size, seq_length)
        embedded = self.embedding(x)  # (batch_size, seq_length, embedding_dim)

        # LSTM은 (hidden state, cell state)의 튜플을 반환합니다
        lstm_out, (hidden, cell) = self.lstm(embedded)  # lstm_out: (batch_size, seq_length, hidden_dim), hidden: (1, batch_size, hidden_dim)

        # last_hidden = hidden.squeeze(0)  # (batch_size, hidden_dim)
        last_hidden = hidden[-1]
        logits = self.fc(last_hidden)  # (batch_size, output_dim)
        return logits

 

 

하이퍼 파라미터를 설정합니다

embedding_dim = 100
hidden_dim = 128
output_dim = 2
learning_rate = 0.01
num_epochs = 10

model = TextClassifier(vocab_size, embedding_dim, hidden_dim, output_dim)
model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

 

정확도 측정함수, 평가함수를 선언합니다

def calculate_accuracy(logits, labels):
    # _, predicted = torch.max(logits, 1)
    predicted = torch.argmax(logits, dim=1)
    correct = (predicted == labels).sum().item()
    total = labels.size(0)
    accuracy = correct / total
    return accuracy

def evaluate(model, valid_dataloader, criterion, device):
    val_loss = 0
    val_correct = 0
    val_total = 0

    model.eval()
    with torch.no_grad():
        # 데이터로더로부터 배치 크기만큼의 데이터를 연속으로 로드
        for batch_X, batch_y in valid_dataloader:
            batch_X, batch_y = batch_X.to(device), batch_y.to(device)

            # 모델의 예측값
            logits = model(batch_X)

            # 손실을 계산
            loss = criterion(logits, batch_y)

            # 정확도와 손실을 계산함
            val_loss += loss.item()
            val_correct += calculate_accuracy(logits, batch_y) * batch_y.size(0)
            val_total += batch_y.size(0)

    val_accuracy = val_correct / val_total
    val_loss /= len(valid_dataloader)

    return val_loss, val_accuracy

 

 

#5. 학습 및 평가

5epoch 학습합니다.

num_epochs = 5

# Training loop
best_val_loss = float('inf')

# Training loop
for epoch in range(num_epochs):
    # Training
    train_loss = 0
    train_correct = 0
    train_total = 0
    model.train()
    for batch_X, batch_y in train_dataloader:
        # Forward pass
        batch_X, batch_y = batch_X.to(device), batch_y.to(device)
        # batch_X.shape == (batch_size, max_len)
        logits = model(batch_X)

        # Compute loss
        loss = criterion(logits, batch_y)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Calculate training accuracy and loss
        train_loss += loss.item()
        train_correct += calculate_accuracy(logits, batch_y) * batch_y.size(0)
        train_total += batch_y.size(0)

    train_accuracy = train_correct / train_total
    train_loss /= len(train_dataloader)

    # Validation
    val_loss, val_accuracy = evaluate(model, valid_dataloader, criterion, device)

    print(f'Epoch {epoch+1}/{num_epochs}:')
    print(f'Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}')
    print(f'Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}')

    # 검증 손실이 최소일 때 체크포인트 저장
    if val_loss < best_val_loss:
        print(f'Validation loss improved from {best_val_loss:.4f} to {val_loss:.4f}. 체크포인트를 저장합니다.')
        best_val_loss = val_loss
        torch.save(model.state_dict(), 'best_model_checkpoint.pth')

99% acc

 

테스트 데이터로 성능을 확인합니다.

# 모델 로드
model.load_state_dict(torch.load('best_model_checkpoint.pth'))

# 모델을 device에 올립니다.
model.to(device)

# 테스트 데이터에 대한 정확도와 손실 계산
test_loss, test_accuracy = evaluate(model, test_dataloader, criterion, device)

print(f'Best model test loss: {test_loss:.4f}')
print(f'Best model test accuracy: {test_accuracy:.4f}')

테스트 데이터도 99%

 

정말 잘되는지, 휴먼(?)테스트 진행

test_input = "[web발신]등산얘기한지 하루 하고도 15시간이 지났어요. 뭐하고 계셨어요?"
predict(test_input, model, word_to_index, index_to_tag)

#'스팸'

test_input = "야야 뭐하냐?"
predict(test_input, model, word_to_index, index_to_tag)

#'스팸'

뭥미.... 99% 정확도인데..... 내가 작성한 정상문자를 이렇게 ㅜㅜ

 

우선 데이터셋의 문제가 가장 큰것 같다. 시중에 있는 spam데이터는 확보했지만,

일반 문자데이터가 없어서, ai허브에 있는 일상 대화셋을 ham데이터로 활용.

결과가 만족스럽지 못하다...

 

LSTM모델을 완성하고, NLP모델 학습의 전체 프로세스를 확인한 것에 만족하자 :)