Ssul's Blog

[추천시스템#5] MF(Matrix Factorization) 본문

AI & ML

[추천시스템#5] MF(Matrix Factorization)

Ssul 2023. 12. 29. 14:30

0. Memory-based vs Model-based

- 지금까지 봤던 CF는 Memory-based의 추천시스템. 추천을 위해서 기존의 rating정보를 연산하여, 추천리스트를 생산

- Memory-based방식은 pred하는데 rating정보 모두가 매번 계산이 진행됨

- 이는 대량의 데이터에는 현실적으로 가능하지 않음. 매번 추천때마다 유사도 측정하고, 추천하는게... 연산이 너무 많음

- 오늘 이야기할 MF는 Model-based모델. 이 모델은 학습을 통해서 모델을 만들고, 이후 예측할때는 학습을 마친 모델로 바로 결과값을 내보냄

 

1. MF의 개념

- user를 특정한 latent factor로 임베딩

- Item역시 특정한 latent factor로 임베딩

- 이는 user와 item을 latent factor차원의 공간으로 배치하는 구조

- 가까운 공간에 위치하는 user*item의 값은 높을 것이고, 먼 공간에 위치하는 user*item의 값은 낮게 됨

- user*item의 값을 rating(user의 item에 대한 평점)으로 학습시키면, 높은 평점의 user-영화는 같은공간에, 낮은 평점의 user-영화는 서로 다른 공간에 위치하게 됨

- 이렇게 학습이 된 user, item의 latent factor는 자연스럽게 user와 영화의 성향을 반영하여 공간에 배치됨.

- 그리고 user*item은 평점을 예측하는 점수가 나오는 구조

- 학습은 user*item의 값이 pred, 원래 rating이 label > (label-pred)이 최소화 되는 방향으로 user, item의 latent-factor 학습

- 최종적으로 학습된 latent-factor는 user*item을 하면, 평점을 예측해주는 모델이 완성됨

원래 rating - 예측된값(user_i*item_j) = error

 

 

2. MF구현하기(w code)

import numpy as np
import pandas as pd

r_cols = ['user_id', 'movie_id', 'rating', 'timestamp']
ratings = pd.read_csv('./u.data', names=r_cols,  sep='\t',encoding='latin-1')
ratings = ratings[['user_id', 'movie_id', 'rating']].astype(int)            # timestamp 제거
ratings = ratings.pivot(index = 'user_id', columns ='movie_id', values = 'rating').fillna(0)
# 943명이 평가, 1682개의 영화(943*1682)
type(ratings)

- 행: user, 열: movies, 요소값: rating

 

class MF():
    # Initializing the object
    def __init__(self, ratings, K, alpha, beta, iterations, verbose=True):
        self.R = np.array(ratings)
        # user_id, movie_id를 R의 index와 매칭하기 위한 dictionary 생성
        movie_id_index = []
        index_movie_id = []
        # i는 0, one_id는 movie_id
        for i, one_id in enumerate(ratings):
            # [movie_id, 0부터]
            movie_id_index.append([one_id, i])
            # [0부터, movie_id]
            index_movie_id.append([i, one_id])
        # {1부터, 0부터}
        self.movie_id_index = dict(movie_id_index)
        # {0부터, 1부터}
        self.index_movie_id = dict(index_movie_id)        
        user_id_index = []
        index_user_id = []
        # i: 0부터, one_id: user_id
        for i, one_id in enumerate(ratings.T):
            user_id_index.append([one_id, i])
            index_user_id.append([i, one_id])
        # {user_id 1부터, 0부터}
        self.user_id_index = dict(user_id_index)
        # {0부터, user_id 1부터}
        self.index_user_id = dict(index_user_id)
        # 다른 변수 초기화(943,1682)
        self.num_users, self.num_items = np.shape(self.R)
        self.K = K
        self.alpha = alpha
        self.beta = beta
        self.iterations = iterations
        self.verbose = verbose

    def train(self):                             # Training 하면서 test set의 정확도를 계산하는 메소드 
        # Initializing user-feature and movie-feature matrix
        # 학습P,Q 초기화
        # normal함수(정규분포에서 샘플생성) 인자: loc(평균) = 0, scale(표준편차)=1./self.K, size(크기)
        # P = user수 * K, Q = K * movie수
        self.P = np.random.normal(scale=1./self.K, size=(self.num_users, self.K))
        self.Q = np.random.normal(scale=1./self.K, size=(self.num_items, self.K))

        # Initializing the bias terms
        self.b_u = np.zeros(self.num_users) #943
        self.b_d = np.zeros(self.num_items) #1682
        # 모든 평점의 평균: 0이 아닌것 추출 > 전체 평균
        self.b = np.mean(self.R[self.R.nonzero()])

        # List of training samples
        # self.R에서 0이 아닌 모든 평점의 위치와 값을 추출하여 (행 인덱스, 열 인덱스, 평점) 형태의 튜플 리스트로 저장
        rows, columns = self.R.nonzero()
        # 샘플1개 = (유저, 영화, 평점), samples = [(유저, 영화, 평점),(유저, 영화, 평점)..]
        self.samples = [(i,j, self.R[i,j]) for i, j in zip(rows, columns)]

        # Stochastic gradient descent for given number of iterations
        training_process = []
        for i in range(self.iterations):
            # samples수정됨
            np.random.shuffle(self.samples)
            self.sgd() # epoch 실행
            rmse = self.rmse()
            training_process.append((i, rmse))
            if self.verbose:
                if (i+1) % 2 == 0:
                    print("Iteration: %d ; Train RMSE = %.6f " % (i+1, rmse))
        return training_process

    # Stochastic gradient descent to get optimized P and Q matrix
    def sgd(self):
        for i, j, r in self.samples:
            # pred해서 오차 구하고
            prediction = self.get_prediction(i, j)
            error = (r - prediction)

            # bias업데이트
            self.b_u[i] += self.alpha * (error - self.beta * self.b_u[i])
            self.b_d[j] += self.alpha * (error - self.beta * self.b_d[j])

            # P, Q update
            self.Q[j, :] += self.alpha * (error * self.P[i, :] - self.beta * self.Q[j,:])
            self.P[i, :] += self.alpha * (error * self.Q[j, :] - self.beta * self.P[i,:])

    # Computing mean squared error
    def rmse(self):
        # 0이 아닌 모든 평점값의 user_id, movie_id
        rows, columns = self.R.nonzero()
        self.predictions = []
        self.errors = []
        for x, y in zip(rows, columns):
            prediction = self.get_prediction(x, y)
            self.predictions.append(prediction)
            self.errors.append(self.R[x, y] - prediction)
        self.predictions = np.array(self.predictions)
        self.errors = np.array(self.errors)
        return np.sqrt(np.mean(self.errors**2))

    # Ratings for user i and moive j
    def get_prediction(self, i, j):
        # P*K에서 user i정보, K*Q에서 movie j정보
        prediction = self.b + self.b_u[i] + self.b_d[j] + self.P[i, :].dot(self.Q[j, :].T)
        return prediction

    # Ratings for user_id and moive_id
    def get_one_prediction(self, user_id, movie_id):
        return self.get_prediction(self.user_id_index[user_id], self.movie_id_index[movie_id])

 __init__: user, movie의 index를 가져오기 위한 dict생성 > label평점 가져올때 사용

- user들의 초기 latent-factor 매트릭스, movie의 초기 latent-factor 매트릭스 셋팅

- 기타 하이퍼 파라미터 설정: K=latent factor, iterations=학습횟수

 

- get_prediction: userid, movieid를 입력받아, MF를 통한 평첨 예측하는 함수

- prediction = self.b + self.b_u[i] + self.b_d[j] + self.P[i, :].dot(self.Q[j, :].T): latent매트릭스에서 i,j에 해당하는 것 추출해서 P*Q를 통해 해당 user-movie에 대한 평점 예측

 

sgd: 모든 샘플(모든 rating데이터)에 대해서, 학습하여 lactent 매트릭스를 업데이트 하는 함수

prediction = self.get_prediction(i, j)
error = (r - prediction)

- user i, 영화 j에 대한 rating예측값 구하기

- 실제 평점r(label)과 예측값 prediction을 통해 error 구함

 

# bias업데이트
self.b_u[i] += self.alpha * (error - self.beta * self.b_u[i])
self.b_d[j] += self.alpha * (error - self.beta * self.b_d[j])

# P, Q update
self.Q[j, :] += self.alpha * (error * self.P[i, :] - self.beta * self.Q[j,:])
self.P[i, :] += self.alpha * (error * self.Q[j, :] - self.beta * self.P[i,:])

- 공식에 따라, latent 매트릭스 업데이트

 

 

train: 설정된 iterations에 따라 학습을 반복해서, 모델을 완성

# 학습P,Q 초기화
# normal함수(정규분포에서 샘플생성) 인자: loc(평균) = 0, scale(표준편차)=1./self.K, size(크기)
# P = user수 * K, Q = K * movie수
self.P = np.random.normal(scale=1./self.K, size=(self.num_users, self.K))
self.Q = np.random.normal(scale=1./self.K, size=(self.num_items, self.K))

# Initializing the bias terms
self.b_u = np.zeros(self.num_users) #943
self.b_d = np.zeros(self.num_items) #1682
# 모든 평점의 평균: 0이 아닌것 추출 > 전체 평균
self.b = np.mean(self.R[self.R.nonzero()])

# List of training samples
# self.R에서 0이 아닌 모든 평점의 위치와 값을 추출하여 (행 인덱스, 열 인덱스, 평점) 형태의 튜플 리스트로 저장
rows, columns = self.R.nonzero()
# 샘플1개 = (유저, 영화, 평점), samples = [(유저, 영화, 평점),(유저, 영화, 평점)..]
self.samples = [(i,j, self.R[i,j]) for i, j in zip(rows, columns)]

- latent매트릭스 P,Q 정규화

- 학습할 samples 데이터 셋 구성

 

training_process = []
for i in range(self.iterations):
    # samples수정됨
    np.random.shuffle(self.samples)
    self.sgd() # epoch 실행
    rmse = self.rmse()
    training_process.append((i, rmse))
    if self.verbose:
        if (i+1) % 2 == 0:
            print("Iteration: %d ; Train RMSE = %.6f " % (i+1, rmse))
return training_process

- 설정된 iterations에 따라 반복

- 학습할 데이터셋 섞고,

- sgd함수를 통하여, P,Q 매트릭스 업데이트

- 최종적으로 학습을 마친 P, Q는 모든 user에 대한 latent벡터, movie에 대한 벡터 보유

- 평점을 예측하려면, 해당 매트릭스에서 user i에 해당하는 벡터 p와 movie j에 해당하는 벡터 q를 추출, p*q를 계산하면, 예측 평점이 생성됨

 

 

3. 정리

- 기존의 CF와는 다르게 모든 user 또는 movie간의 유사도를 계산하고, 추천하지 않음.

- 학습을 마친 P, Q라는 latent매트릭스를 보유(Model based)

- user/movie 정보가 들어오면, 이미 학습된 매트릭스에서 해당 벡터만 추출하여 연산. 최종 평점을 예측(빠른 속도)