Ssul's Blog
[추천시스템#5] MF(Matrix Factorization) 본문
0. Memory-based vs Model-based
- 지금까지 봤던 CF는 Memory-based의 추천시스템. 추천을 위해서 기존의 rating정보를 연산하여, 추천리스트를 생산
- Memory-based방식은 pred하는데 rating정보 모두가 매번 계산이 진행됨
- 이는 대량의 데이터에는 현실적으로 가능하지 않음. 매번 추천때마다 유사도 측정하고, 추천하는게... 연산이 너무 많음
- 오늘 이야기할 MF는 Model-based모델. 이 모델은 학습을 통해서 모델을 만들고, 이후 예측할때는 학습을 마친 모델로 바로 결과값을 내보냄
1. MF의 개념
- user를 특정한 latent factor로 임베딩
- Item역시 특정한 latent factor로 임베딩
- 이는 user와 item을 latent factor차원의 공간으로 배치하는 구조
- 가까운 공간에 위치하는 user*item의 값은 높을 것이고, 먼 공간에 위치하는 user*item의 값은 낮게 됨
- user*item의 값을 rating(user의 item에 대한 평점)으로 학습시키면, 높은 평점의 user-영화는 같은공간에, 낮은 평점의 user-영화는 서로 다른 공간에 위치하게 됨
- 이렇게 학습이 된 user, item의 latent factor는 자연스럽게 user와 영화의 성향을 반영하여 공간에 배치됨.
- 그리고 user*item은 평점을 예측하는 점수가 나오는 구조
- 학습은 user*item의 값이 pred, 원래 rating이 label > (label-pred)이 최소화 되는 방향으로 user, item의 latent-factor 학습
- 최종적으로 학습된 latent-factor는 user*item을 하면, 평점을 예측해주는 모델이 완성됨
2. MF구현하기(w code)
import numpy as np
import pandas as pd
r_cols = ['user_id', 'movie_id', 'rating', 'timestamp']
ratings = pd.read_csv('./u.data', names=r_cols, sep='\t',encoding='latin-1')
ratings = ratings[['user_id', 'movie_id', 'rating']].astype(int) # timestamp 제거
ratings = ratings.pivot(index = 'user_id', columns ='movie_id', values = 'rating').fillna(0)
# 943명이 평가, 1682개의 영화(943*1682)
type(ratings)
- 행: user, 열: movies, 요소값: rating
class MF():
# Initializing the object
def __init__(self, ratings, K, alpha, beta, iterations, verbose=True):
self.R = np.array(ratings)
# user_id, movie_id를 R의 index와 매칭하기 위한 dictionary 생성
movie_id_index = []
index_movie_id = []
# i는 0, one_id는 movie_id
for i, one_id in enumerate(ratings):
# [movie_id, 0부터]
movie_id_index.append([one_id, i])
# [0부터, movie_id]
index_movie_id.append([i, one_id])
# {1부터, 0부터}
self.movie_id_index = dict(movie_id_index)
# {0부터, 1부터}
self.index_movie_id = dict(index_movie_id)
user_id_index = []
index_user_id = []
# i: 0부터, one_id: user_id
for i, one_id in enumerate(ratings.T):
user_id_index.append([one_id, i])
index_user_id.append([i, one_id])
# {user_id 1부터, 0부터}
self.user_id_index = dict(user_id_index)
# {0부터, user_id 1부터}
self.index_user_id = dict(index_user_id)
# 다른 변수 초기화(943,1682)
self.num_users, self.num_items = np.shape(self.R)
self.K = K
self.alpha = alpha
self.beta = beta
self.iterations = iterations
self.verbose = verbose
def train(self): # Training 하면서 test set의 정확도를 계산하는 메소드
# Initializing user-feature and movie-feature matrix
# 학습P,Q 초기화
# normal함수(정규분포에서 샘플생성) 인자: loc(평균) = 0, scale(표준편차)=1./self.K, size(크기)
# P = user수 * K, Q = K * movie수
self.P = np.random.normal(scale=1./self.K, size=(self.num_users, self.K))
self.Q = np.random.normal(scale=1./self.K, size=(self.num_items, self.K))
# Initializing the bias terms
self.b_u = np.zeros(self.num_users) #943
self.b_d = np.zeros(self.num_items) #1682
# 모든 평점의 평균: 0이 아닌것 추출 > 전체 평균
self.b = np.mean(self.R[self.R.nonzero()])
# List of training samples
# self.R에서 0이 아닌 모든 평점의 위치와 값을 추출하여 (행 인덱스, 열 인덱스, 평점) 형태의 튜플 리스트로 저장
rows, columns = self.R.nonzero()
# 샘플1개 = (유저, 영화, 평점), samples = [(유저, 영화, 평점),(유저, 영화, 평점)..]
self.samples = [(i,j, self.R[i,j]) for i, j in zip(rows, columns)]
# Stochastic gradient descent for given number of iterations
training_process = []
for i in range(self.iterations):
# samples수정됨
np.random.shuffle(self.samples)
self.sgd() # epoch 실행
rmse = self.rmse()
training_process.append((i, rmse))
if self.verbose:
if (i+1) % 2 == 0:
print("Iteration: %d ; Train RMSE = %.6f " % (i+1, rmse))
return training_process
# Stochastic gradient descent to get optimized P and Q matrix
def sgd(self):
for i, j, r in self.samples:
# pred해서 오차 구하고
prediction = self.get_prediction(i, j)
error = (r - prediction)
# bias업데이트
self.b_u[i] += self.alpha * (error - self.beta * self.b_u[i])
self.b_d[j] += self.alpha * (error - self.beta * self.b_d[j])
# P, Q update
self.Q[j, :] += self.alpha * (error * self.P[i, :] - self.beta * self.Q[j,:])
self.P[i, :] += self.alpha * (error * self.Q[j, :] - self.beta * self.P[i,:])
# Computing mean squared error
def rmse(self):
# 0이 아닌 모든 평점값의 user_id, movie_id
rows, columns = self.R.nonzero()
self.predictions = []
self.errors = []
for x, y in zip(rows, columns):
prediction = self.get_prediction(x, y)
self.predictions.append(prediction)
self.errors.append(self.R[x, y] - prediction)
self.predictions = np.array(self.predictions)
self.errors = np.array(self.errors)
return np.sqrt(np.mean(self.errors**2))
# Ratings for user i and moive j
def get_prediction(self, i, j):
# P*K에서 user i정보, K*Q에서 movie j정보
prediction = self.b + self.b_u[i] + self.b_d[j] + self.P[i, :].dot(self.Q[j, :].T)
return prediction
# Ratings for user_id and moive_id
def get_one_prediction(self, user_id, movie_id):
return self.get_prediction(self.user_id_index[user_id], self.movie_id_index[movie_id])
__init__: user, movie의 index를 가져오기 위한 dict생성 > label평점 가져올때 사용
- user들의 초기 latent-factor 매트릭스, movie의 초기 latent-factor 매트릭스 셋팅
- 기타 하이퍼 파라미터 설정: K=latent factor, iterations=학습횟수
- get_prediction: userid, movieid를 입력받아, MF를 통한 평첨 예측하는 함수
- prediction = self.b + self.b_u[i] + self.b_d[j] + self.P[i, :].dot(self.Q[j, :].T): latent매트릭스에서 i,j에 해당하는 것 추출해서 P*Q를 통해 해당 user-movie에 대한 평점 예측
sgd: 모든 샘플(모든 rating데이터)에 대해서, 학습하여 lactent 매트릭스를 업데이트 하는 함수
prediction = self.get_prediction(i, j)
error = (r - prediction)
- user i, 영화 j에 대한 rating예측값 구하기
- 실제 평점r(label)과 예측값 prediction을 통해 error 구함
# bias업데이트
self.b_u[i] += self.alpha * (error - self.beta * self.b_u[i])
self.b_d[j] += self.alpha * (error - self.beta * self.b_d[j])
# P, Q update
self.Q[j, :] += self.alpha * (error * self.P[i, :] - self.beta * self.Q[j,:])
self.P[i, :] += self.alpha * (error * self.Q[j, :] - self.beta * self.P[i,:])
- 공식에 따라, latent 매트릭스 업데이트
train: 설정된 iterations에 따라 학습을 반복해서, 모델을 완성
# 학습P,Q 초기화
# normal함수(정규분포에서 샘플생성) 인자: loc(평균) = 0, scale(표준편차)=1./self.K, size(크기)
# P = user수 * K, Q = K * movie수
self.P = np.random.normal(scale=1./self.K, size=(self.num_users, self.K))
self.Q = np.random.normal(scale=1./self.K, size=(self.num_items, self.K))
# Initializing the bias terms
self.b_u = np.zeros(self.num_users) #943
self.b_d = np.zeros(self.num_items) #1682
# 모든 평점의 평균: 0이 아닌것 추출 > 전체 평균
self.b = np.mean(self.R[self.R.nonzero()])
# List of training samples
# self.R에서 0이 아닌 모든 평점의 위치와 값을 추출하여 (행 인덱스, 열 인덱스, 평점) 형태의 튜플 리스트로 저장
rows, columns = self.R.nonzero()
# 샘플1개 = (유저, 영화, 평점), samples = [(유저, 영화, 평점),(유저, 영화, 평점)..]
self.samples = [(i,j, self.R[i,j]) for i, j in zip(rows, columns)]
- latent매트릭스 P,Q 정규화
- 학습할 samples 데이터 셋 구성
training_process = []
for i in range(self.iterations):
# samples수정됨
np.random.shuffle(self.samples)
self.sgd() # epoch 실행
rmse = self.rmse()
training_process.append((i, rmse))
if self.verbose:
if (i+1) % 2 == 0:
print("Iteration: %d ; Train RMSE = %.6f " % (i+1, rmse))
return training_process
- 설정된 iterations에 따라 반복
- 학습할 데이터셋 섞고,
- sgd함수를 통하여, P,Q 매트릭스 업데이트
- 최종적으로 학습을 마친 P, Q는 모든 user에 대한 latent벡터, movie에 대한 벡터 보유
- 평점을 예측하려면, 해당 매트릭스에서 user i에 해당하는 벡터 p와 movie j에 해당하는 벡터 q를 추출, p*q를 계산하면, 예측 평점이 생성됨
3. 정리
- 기존의 CF와는 다르게 모든 user 또는 movie간의 유사도를 계산하고, 추천하지 않음.
- 학습을 마친 P, Q라는 latent매트릭스를 보유(Model based)
- user/movie 정보가 들어오면, 이미 학습된 매트릭스에서 해당 벡터만 추출하여 연산. 최종 평점을 예측(빠른 속도)
'AI & ML' 카테고리의 다른 글
[추천시스템#7] 딥러닝 추천 (0) | 2024.01.15 |
---|---|
[추천시스템#6] FM(Factorization Machines) (2) | 2024.01.03 |
[추천시스템#4]CB(콘텐츠 기반 필터링) (0) | 2023.12.22 |
[추천시스템#3]CF-UBCF(유저기반 협업필터링) (0) | 2023.12.21 |
[추천시스템#2]CF-IBCF(아이템기반 협업필터링) (1) | 2023.12.21 |