Ssul's Blog

[추천시스템#6] FM(Factorization Machines) 본문

AI & ML

[추천시스템#6] FM(Factorization Machines)

Ssul 2024. 1. 3. 00:27

0. MF와 FM의 차이

- MF는 user, item 두개의 변수를 K차원의 Latent factor로 만들어서 평점을 예측

- user, item 이외에 다른 변수들이 평점에 영향을 준다면? 그러면 FM을 사용

 

1. FM을 이해하기

- FM은 위 그림과 같이 하나의 평점에 대해서, 그에 영향을 준다고 판단되는 입력변수를 각각 임베딩하여 표현

- 이렇게 하면, 데이터로 가지고 있는 모든 평점에 대해서, 각각의 입력변수의 고유한 임베딩 벡터의 조합으로 표현 가능

- 평점을 예측하는 pred 모델식은 다음과 같음

Blue:bias score, Red: latent score

- 모델의 업데이트 룰(feat. 교수님 강의록)

 

2. 코드로 구현하기

import numpy as np
import pandas as pd
from sklearn.utils import shuffle

# Load the u.data file into a dataframe
r_cols = ['user_id', 'movie_id', 'rating', 'timestamp']
ratings = pd.read_csv('./u.data', sep='\t', names=r_cols, encoding='latin-1')
ratings = ratings[['user_id', 'movie_id', 'rating']]

# 각 데이터들 1개 행으로 나열할 준비
# User encoding
user_dict = {}
# user 수만큼
for i in set(ratings['user_id']):
    # 0: user_id, 1: user_id
    user_dict[i] = len(user_dict)
n_user = len(user_dict)
# Item encoding
item_dict = {}
start_point = n_user
# item수만큼
for i in set(ratings['movie_id']):
    item_dict[i] = start_point + len(item_dict)
n_item = len(item_dict)
start_point += n_item
num_x = start_point               # Total number of x
x = shuffle(ratings, random_state=12)

# Generate X data
data = []
y = []
w0 = np.mean(x['rating'])
for i in range(len(ratings)):
    # rating의 i번째 행 [userid, movieid, rating)
    case = x.iloc[i]
    x_index = []
    x_value = []
    # x행의 0열(user_id)의 값 > user_id > FM의 열
    x_index.append(user_dict[case['user_id']])     # User id encoding
    x_value.append(1)
    # x행의 1열의 값 > movie_id> FM의 열
    x_index.append(item_dict[case['movie_id']])    # Movie id encoding
    x_value.append(1)
    # N행의 값 > data = [ [1,108, 109,..], [1,1,0.5,....] ]
    data.append([x_index, x_value])
    # 평균을 뺀 y값
    y.append(case['rating'] - w0)
    if (i % 10000) == 0:
        print('Encoding ', i, ' cases...')

def RMSE(y_true, y_pred):
    return np.sqrt(np.mean((np.array(y_true) - np.array(y_pred))**2))

class FM():
    def __init__(self, N, K, data, y, alpha, beta, train_ratio=0.75, iterations=100, tolerance=0.005, l2_reg=True, verbose=True):
        self.K = K          # Number of latent factors
        # 열의 갯수, 여기서는 user_id갯수 + movie_id
        self.N = N          # Number of x (variables)
        # 10만개
        self.n_cases = len(data)            # N of observations
        self.alpha = alpha
        self.beta = beta
        self.iterations = iterations
        self.l2_reg = l2_reg
        self.tolerance = tolerance
        self.verbose = verbose
        # w 초기화
        # FM의 열갯수 > 여기서는 user_id + movie_id
        self.w = np.random.normal(scale=1./self.N, size=(self.N))
        # v 초기화
        # N*K = v
        self.v = np.random.normal(scale=1./self.K, size=(self.N, self.K))
        # Train/Test 분리
        cutoff = int(train_ratio * len(data))
        self.train_x = data[:cutoff]
        self.test_x = data[cutoff:]
        self.train_y = y[:cutoff]
        self.test_y = y[cutoff:]

    def test(self):                                     # Training 하면서 RMSE 계산 
        # SGD를 iterations 숫자만큼 수행
        best_RMSE = 10000
        best_iteration = 0
        training_process = []
        for i in range(self.iterations):
            rmse1 = self.sgd(self.train_x, self.train_y)        # SGD & Train RMSE 계산
            rmse2 = self.test_rmse(self.test_x, self.test_y)    # Test RMSE 계산     
            training_process.append((i, rmse1, rmse2))
            if self.verbose:
                if (i+1) % 10 == 0:
                    print("Iteration: %d ; Train RMSE = %.6f ; Test RMSE = %.6f" % (i+1, rmse1, rmse2))
            if best_RMSE > rmse2:                       # New best record
                best_RMSE = rmse2
                best_iteration = i
            elif (rmse2 - best_RMSE) > self.tolerance:  # RMSE is increasing over tolerance
                break
        print(best_iteration, best_RMSE)
        return training_process
        
    # w, v 업데이트를 위한 Stochastic gradient descent 
    def sgd(self, x_data, y_data):
        y_pred = []
        # [x_index, x_value], [rating]
        for data, y in zip(x_data, y_data):
            # [1,190]
            x_idx = data[0]
            # [1,1]
            x_0 = np.array(data[1])     # xi axis=0 [1, 2, 3]
            # [[1],[1]]
            x_1 = x_0.reshape(-1, 1)    # xi axis=1 [[1], [2], [3]]
    
            # biases
            # 학습하는 w*x값 > w = 1행*(user_id갯수+movie_id갯수)> w[1, 190]*[1,1]
            bias_score = np.sum(self.w[x_idx] * x_0)

            # v = (user_id+movie_id) x K(latent
            # v[x_idx]: user_id가 1인 행, movie_id가 190인 행 > 2개행*350(latent)
            # x_1: 2행1열 > [[1],[1]]
            # score 계산, vx: row는 x가 0이 아닌것의 갯수, col은 k
            # v[x_idx]: 2*K, x_1: 2*1 = [[1],[1]]
            vx = self.v[x_idx] * (x_1)          # v matrix * x
            # 2행 k열 > 1행 K
            sum_vx = np.sum(vx, axis=0)         # sigma(vx)
            # 1행 K
            sum_vx_2 = np.sum(vx * vx, axis=0)  # ( v matrix * x )의 제곱
            # (1행K열)^2 - (1행K열) > sum(1행K열) > 1
            latent_score = 0.5 * np.sum(np.square(sum_vx) - sum_vx_2)

            # 예측값 계산
            y_hat = bias_score + latent_score
            y_pred.append(y_hat)
            error = y - y_hat
            # w, v 업데이트
            if self.l2_reg:     # regularization이 있는 경우
                self.w[x_idx] += error * self.alpha * (x_0 - self.beta * self.w[x_idx])
                self.v[x_idx] += error * self.alpha * ((x_1) * sum(vx) - (vx * x_1) - self.beta * self.v[x_idx])
            else:               # regularization이 없는 경우
                self.w[x_idx] += error * self.alpha * x_0
                self.v[x_idx] += error * self.alpha * ((x_1) * sum(vx) - (vx * x_1))
        return RMSE(y_data, y_pred)

    def test_rmse(self, x_data, y_data):
        y_pred = []
        for data , y in zip(x_data, y_data):
            y_hat = self.predict(data[0], data[1])
            y_pred.append(y_hat)
        return RMSE(y_data, y_pred)

    def predict(self, idx, x):
        # 업데이트 없이 계산(test data)
        x_0 = np.array(x)
        x_1 = x_0.reshape(-1, 1)

        # biases
        bias_score = np.sum(self.w[idx] * x_0)

        # score 계산
        vx = self.v[idx] * (x_1)
        sum_vx = np.sum(vx, axis=0)
        sum_vx_2 = np.sum(vx * vx, axis=0)
        latent_score = 0.5 * np.sum(np.square(sum_vx) - sum_vx_2)

        # 예측값 계산
        y_hat = bias_score + latent_score
        return y_hat

    def predict_one(self, user_id, movie_id):
        x_idx = np.array([user_dict[user_id], item_dict[movie_id]])
        x_data = np.array([1, 1])
        return self.predict(x_idx, x_data) + w0

K = 350
# tolerance 허용오차. 최고 test rmse대비 tolerance이상 나빠지면 멈추기
fm1 = FM(num_x, K, data, y, alpha=0.002, beta=0.01, train_ratio=0.75, iterations=600, tolerance=0.0005, l2_reg=True, verbose=True)
result = fm1.test()

 

 

3. FM의 특성

- 입력변수의 특성, 변수간의 상호작용도 학습한다.

(나중에 해볼 DeepFM은 입력변수의 특성, 변수간의 상호작용, 그리고 변수간 비정형적 상호작용까지 학습)

- sparse data도 Factorization을 사용하여 잘 학습됨

- 다양한 입력변수를 넣기 쉽다

- latent factor 차원이 MF보다는 상대적으로 작은 것이 성능 좋음