RAG가 적용된 나만의 챗봇 만들기(langchain, FastAPI, streamlit)
1. 챗봇의 주요기능 및 구조
①RAG에 사용할 참고도서, 문헌, 웹자료를 적절히 쪼개서 임베딩 합니다.
- RAG에 사용할 자료를 적절히 쪼개고
- 임베딩 모델을 결정(필자는 openai의 "text-embedding-3-small"모델 사용)하여, 해당 모델로 쪼갠 자료를 임베딩 진행(N차원 공간에 1개의 점들로)
- 해당 임베딩 정보를 csv파일 또는 chromadb, faiss로 저장
②채팅창에서 사용자의 채팅 입력을 받습니다.
- 화면에 채팅입력창을 출력하고, 사용자의 채팅을 입력받습니다.
- 입력받은 채팅을 백엔드 api.py로 보냅니다.
③사용자의 입력내용을 RAG진행
- 사용자의 입력내용을 임베딩(N차원의 공간에 1개의 점으로)
- 이미 임베딩 해놓은 파일/db에서, 가장 유사한(근처에 있는) 데이터3개 가져옴
- 사용자의 입력과 가장 관련있는 내용을 3개 뽑아 반환합니다.
④RAG내용이 포함된 시스템 메세지 완성
- 관리자가 지정한 system message와 RAG로 뽑은 3개의 참고자료를 결합하여 완성된 시스템 메세지를 만듭니다.
- llm모델을 생성하고 해당 시스템 메세지(관리자메세지+RAG결과 Top3자료) 입력합니다.
⑤모델에게 유저가 입력한 채팅을 User_message로 전달하고, 답을 받습니다.
- 채팅 내용을 csv 또는 db로 저장합니다.
⑥AI의 응답을 채팅창에 출력합니다.
etc. 저장된 채팅내용을 확인할수 있는 chatlog.py
2. Embedding - 가지고 있는 자료 RAG를 위해서 임베딩하기
RAG임베딩 작업할때 고려해야 할 것이 몇개 있다.
- 어떤 임베딩 모델을 사용할 것인가?
- 자료 리더기는 무엇을 쓸 것인가?
- 어떻게 자료를 쪼갤 것인가? 뭘로 쪼갤 것인가?
- 어떻게 저장할 것인가?
여기에는 정답은 없는 것 같다. 나와 같은 초보라면 고수들이 사용한다는 것을 따라 쓰기를 추천
이번 챗봇 만들기에서는
- 임베딩 모델: Openai의 "text-embedding-3-small"모델 사용(유료지만 부담없는 가격)
- 자료리더기: pdfplumber
- 텍스트스플리터: RecursiveCharacterTextSplitter(청크 250, overlap=50...)
- 임베딩 자료 저장: csv, faiss 두개다 해보았다
2-1. pdf자료들을 임베딩하여, csv파일에 저장(소스코드에 주석으로 설명)
[embedding.py]
import ast
from pathlib import Path
import openai
import pdfplumber
from langchain.text_splitter import RecursiveCharacterTextSplitter
from openai import OpenAI
import os
import pandas as pd
from dotenv import load_dotenv
load_dotenv()
openai.api_key = os.getenv('OPENAI_API_KEY')
client = OpenAI()
# openai embedding함수
def get_embedding(text, model="text-embedding-3-small"):
text = text.replace("\n", " ")
return client.embeddings.create(input=[text], model=model).data[0].embedding
# 파일명 입력 함수
def get_filename():
filename = input("저장할 파일명을 입력하세요 (예: embedding.csv): ")
return filename
# textspliter 설정
text_splitter = RecursiveCharacterTextSplitter(
# 청크 크기 설정(약 500글자)
chunk_size=250,
# 청크 간의 중복되는 문자 수를 설정
chunk_overlap=50,
# 문자열 길이를 계산하는 함수를 지정합니다.
length_function=len,
# 구분자로 정규식을 사용할지 여부를 설정합니다.
is_separator_regex=False,
)
folder_path = './database'
images_path = os.path.join(folder_path, 'images')
tables_path = os.path.join(folder_path, 'tables')
Path(folder_path).mkdir(parents=True, exist_ok=True)
Path(images_path).mkdir(parents=True, exist_ok=True)
Path(tables_path).mkdir(parents=True, exist_ok=True)
embedding_file = get_filename()
embedding_file_path = os.path.join(folder_path, embedding_file)
# embedding.csv가 존재하면, 데이터프레임 df로 로드
if os.path.exists(embedding_file_path):
print(f"{embedding_file} is exist")
df = pd.read_csv(embedding_file_path)
# string으로 저장된 embedding을 list로 변환
df['embedding'] = df['embedding'].apply(ast.literal_eval)
# pdf파일을 읽어서 embedding하여 csv파일로 저장
else:
dataset_path = './bookdata/second'
pdf_files = [file for file in os.listdir(dataset_path) if file.endswith('.pdf')]
data = []
# 모든 PDF 파일을 순회하며 embedding
for file in pdf_files:
pdf_file_path = os.path.join(dataset_path, file)
reader = pdfplumber.open(pdf_file_path)
# 각 pdf의 페이지별로 embedding
for i, page in enumerate(reader.pages):
# 이미지 추출 및 경로 기록
image_paths = ';'.join([os.path.join(images_path, f"{Path(file).stem}_page_{i}_image_{image_index}.png")
for image_index, image in enumerate(page.images)])
# 이미지 저장
for image_index, image_dict in enumerate(page.images):
boxpoint = (image_dict['x0'], max(image_dict['top'],0), image_dict['x1'], image_dict['bottom'])
copy_crop = page.crop(boxpoint)
image_data = copy_crop.to_image(resolution=400)
image_filename = f"{Path(file).stem}_page_{i}_image_{image_index}.png"
image_file_path = os.path.join(images_path, image_filename)
image_data.save(image_file_path)
# 테이블 추출 및 경로 기록
table_paths = ';'.join([os.path.join(tables_path, f"{Path(file).stem}_page_{i}_table_{table_index}.csv")
for table_index, _ in enumerate(page.extract_tables())])
# 테이블 저장
for table_index, table in enumerate(page.extract_tables()):
table_filename = f"{Path(file).stem}_page_{i}_table_{table_index}.csv"
table_file_path = os.path.join(tables_path, table_filename)
pd.DataFrame(table).to_csv(table_file_path, index=False, header=False)
# 텍스트 추출
text = page.extract_text() if page.extract_text() else '' # 있을 경우 추출, 없으면 빈문자열
# 텍스트 분할
chunks = text_splitter.split_text(text)
for chunk in chunks:
data.append([file, i, chunk, image_paths, table_paths]) # 파일명, 페이지번호, 텍스트
# 데이터프레임으로 생성
# df = pd.DataFrame(data, columns=['filename', 'page', 'text'])
df = pd.DataFrame(data, columns=['filename', 'page', 'text', 'image_paths', 'table_paths'])
# embedding
df['embedding'] = df['text'].apply(lambda x: get_embedding(x, model="text-embedding-3-small"))
# csv파일로 저장
df.to_csv(embedding_file_path, index=False, encoding='utf-8')
2-2. pdf자료들을 임베딩하여, FAISS 로 저장(소스코드에 주석으로 설명)
- 물론 임베딩을 한번더 실행해도 되지만, 비용이 들기 때문에
- 2-1에서 이미 진행한 임베딩 자료(csv파일)를 읽어서, 해당 내용을 FAISS db에 저장
[embedding_faiss.py]
import ast
import os
import numpy as np
import openai
import pandas as pd
import faiss
from dotenv import load_dotenv
load_dotenv()
openai.api_key = os.getenv('OPENAI_API_KEY')
def create_and_save_faiss_index(embeddings, index_path):
# 첫 번째 임베딩을 기반으로 인덱스의 차원을 설정
d = len(embeddings[0])
index = faiss.IndexFlatL2(d)
# 임베딩을 순회하면서 인덱스에 추가
for embedding in embeddings:
# 각 임베딩을 넘파이 배열로 변환하고 차원을 (1, d)로 재조정
np_embedding = np.array(embedding, dtype='float32').reshape(1, -1)
index.add(np_embedding)
# 인덱스를 파일로 저장
faiss.write_index(index, index_path)
print(f"FAISS 인덱스가 {index_path}에 성공적으로 저장되었습니다.")
# 디렉토리 설정
CUR_DIR = os.path.dirname(os.path.abspath(__file__))
FAISS_INDEX_DIR = os.path.join(CUR_DIR, 'faiss_index')
# FAISS_INDEX_NAME = "ai-kwon-book.index"
FAISS_INDEX_NAME = "gyeonggi-do-book.index"
# FAISS 인덱스를 저장할 디렉토리가 없다면 생성
if not os.path.exists(FAISS_INDEX_DIR):
os.makedirs(FAISS_INDEX_DIR)
# 인덱스 파일 경로
index_path = os.path.join(FAISS_INDEX_DIR, FAISS_INDEX_NAME)
# embedding.csv 파일 경로
folder_path = './database'
# embedding_file = 'embedding.csv'
embedding_file = 'second_embedding.csv'
embedding_file_path = os.path.join(folder_path, embedding_file)
# embedding.csv가 존재하면, 데이터프레임 df로 로드
if os.path.exists(embedding_file_path):
print(f"{embedding_file} is exist")
df = pd.read_csv(embedding_file_path)
# string으로 저장된 embedding을 list로 변환
df['embedding'] = df['embedding'].apply(ast.literal_eval)
# embedding을 numpy array로 변환
embeddings = df['embedding'].to_list()
# faiss 인덱스 생성
create_and_save_faiss_index(embeddings, index_path)
else:
print(f"{embedding_file} is not exist")
3. Frontend - 챗봇, 채팅log
3-1. 챗봇
- streamlit을 사용하여 개발
- 사용자의 채팅을 입력받습니다.
- 백엔드 서버, API_URL/chat으로 user_message를 보냅니다.
- 백엔드 서버에서 응답받은 내용을 채팅창에 출력합니다.
[app.py]
import os
import time
import requests
import streamlit as st
from dotenv import load_dotenv
load_dotenv()
API_URL = os.getenv('API_URL')
def request_chat_api(user_message: str) -> str:
url = f"{API_URL}/chat"
resp = requests.post(
url,
json={
"user_message": user_message,
},
)
# 파이선 dict로 역직렬화
resp = resp.json()
resp = resp["answer"]
print(resp)
return resp
def init_streamlit():
st.title("AI Kwon chatbot")
# initial chat history
if "messages" not in st.session_state:
st.session_state.messages = []
# 채팅 메세지 보여주기
for message in st.session_state.messages:
with st.chat_message(message['role']):
st.markdown(message['content'])
def chat_main():
# chat_input을 message에 할당하되, 빈 문자열일때만(엔터만 눌렀을때) 조건문
if message := st.chat_input(""):
# 사용자 메세지를 chat history에 추가
st.session_state.messages.append({"role": "user", "content": message})
# 유저 메세지 보여주기
with st.chat_message("user"):
st.markdown(message)
# api대답 받아서 보여주기 > chat api 호출
assistant_response = request_chat_api(message)
# 챗봇 메세지를 chat history에 추가
with st.chat_message("assistant"):
message_placeholder = st.empty()
full_response = ""
for lines in assistant_response.split("\n"):
for chunkl in lines.split():
full_response += chunkl + " "
time.sleep(0.05)
# add a blinking cursor
message_placeholder.markdown(full_response)
full_response += "\n"
message_placeholder.markdown(full_response)
st.session_state.messages.append(
{"role": "assistant", "content": full_response}
)
if __name__ == "__main__":
init_streamlit()
chat_main()
3-2. 채팅 로그
- 백엔드에서 사용자의 채팅과 AI응답을 매턴마다 csv파일로 저장합니다(원래는 DB에 저장해야 함)
- 해당 DB(csv파일)을 참고해서, 이때까지 있었던 채팅 로그를 확인합니다.
- 각 응답에서 최종적으로 참고했던 Top3 자료의 내용, 문헌제목, 페이지수 등을 보여줍니다(AI가 어떻게 응답했는지 체크할 수 있습니다. 그리고 RAG가 효과적으로 이뤄지고 있는지도 체크가능)
[chatlog.py]
import os
import pandas as pd
import requests
import streamlit as st
from dotenv import load_dotenv
load_dotenv()
API_URL = os.getenv('API_URL')
def init_streamlit():
st.title("상담 로그 분석기")
def show_chatlog():
st.write("채팅 로그")
url = f"{API_URL}/chatlog"
resp = requests.get(url)
resp = resp.json()
# # 데이터를 테이블로 나타냄
# st.table(resp)
# JSON 응답을 DataFrame으로 변환
df = pd.DataFrame(resp)
# # 개행 문자 처리를 위해 문자열 컬럼을 HTML로 변환 (예시)
for col in df.select_dtypes(include=['object']).columns:
df[col] = df[col].str.replace('\n', '<br>', regex=False)
# # Streamlit으로 DataFrame 렌더링
# st.write(df.to_html(escape=False), unsafe_allow_html=True)
# 열 순서 재정렬
cols = ['User Input', 'RAG References', 'References Source', 'References Text Page', 'GPT Response']
df = df[cols]
# 새로운 열 생성 및 포맷팅
html_rows = []
for idx, row in df.iterrows():
html_row = f'<tr><td>{idx}</td><td>'
html_row += f'<table><tr><td><b>User Input:</b></td><td>{row["User Input"]}</td></tr>'
html_row += f'<tr><td><b>RAG References:</b></td><td>{row["RAG References"]}</td></tr>'
html_row += f'<tr><td><b>References Source:</b></td><td>{row["References Source"]}</td></tr>'
html_row += f'<tr><td><b>References Text Page:</b></td><td>{row["References Text Page"]}</td></tr>'
html_row += f'<tr><td><b>GPT Response:</b></td><td>{row["GPT Response"]}</td></tr></table></td></tr>'
html_rows.append(html_row)
html_table = '<table>' + ''.join(html_rows) + '</table>'
# Streamlit으로 HTML 테이블 렌더링
st.write(html_table, unsafe_allow_html=True)
if __name__ == "__main__":
init_streamlit()
show_chatlog()
4. Backend - RAG를 통한 AI응답
4-1. RAG작업(사용자 입력 채팅과 가장 관련 높은 참고자료 가져오기)
- 사용자의 채팅을 동일한 임베딩 모델("text-embedding-3-small")로 임베딩 하여, N차원 공간의 1개 점으로 구성(예: "안녕상담가능?" -> [132, 2, 3, 100, 200, ..............])
- 임베딩된 사용자의 채팅과 2에서 임베딩 해놓은 데이터(csv, faiss)테이블에서 cosine유사도로 가장 유사도가 높은 3개의 자료를 찾음
- 찾은 자료의 내용, 자료이름, 페이지 등을 담아서 반환
[rag.py]
import ast
import os
import numpy as np
import openai
import pandas as pd
from dotenv import load_dotenv
from numpy import dot
from numpy.linalg import norm
from openai import OpenAI
load_dotenv()
openai.api_key = os.getenv('OPENAI_API_KEY')
client = OpenAI()
# openai embedding함수
def get_embedding(text, model="text-embedding-3-small"):
text = text.replace("\n", " ")
return client.embeddings.create(input=[text], model=model).data[0].embedding
def cos_sim(A, B):
return dot(A, B)/(norm(A)*norm(B))
def query_csv(query: str, use_retriever: bool = False):
# 입력된 문장 임베딩
query_embedding = get_embedding(
query,
model="text-embedding-3-small"
)
if use_retriever:
# csv파일을 읽어서 임베딩값과 가장 가까운 3개 문장을 반환
# K교수 저서
df = pd.read_csv('./database/embedding.csv')
# 경기도 상담메뉴얼
# df = pd.read_csv('./database/second_embedding.csv')
# 문자열로 저장된 embedding을 실제 숫자 배열로 변환
df['embedding'] = df['embedding'].apply(ast.literal_eval)
df['similarity'] = df.embedding.apply(lambda x: cos_sim(np.array(x), np.array(query_embedding)))
top_docs = df.sort_values('similarity', ascending=False).head(3)
else:
# TODO: 다른 경우 처리
top_docs = []
return top_docs
4-2. 최종 시스템 메세지 완성
- AI챗봇의 페르소나(예: 상담사, 친구 등등)를 system메세지로 줍니다.
- rag를 통해서, 사용자의 입력과 가장 관련이 높은 3개의 자료를 system메세지에 더해줍니다.
- 이렇게 완성된 시스템 메세지를 반환합니다.
[chain.py]
import os
from ver4.rag import query_csv
def read_prompt_template(file_path: str) -> str:
with open(file_path, "r") as f:
prompt_template = f.read()
return prompt_template
def create_prompt(query):
# 질문과 가장 관련있는 본문 3개 가져옴
result = query_csv(query, use_retriever=True)
print(result)
print(result.iloc[0]['filename'])
references_source = '\n\n'.join([result.iloc[0]['filename'],
result.iloc[2]['filename'],
result.iloc[1]['filename']])
references_text = '\n\n'.join([result.iloc[0]['text'],
result.iloc[2]['text'],
result.iloc[1]['text']])
# 'page' 열의 값을 문자열로 변환
result['page'] = result['page'].astype(str)
references_text_page = '\n\n'.join([result.iloc[0]['page'],
result.iloc[2]['page'],
result.iloc[1]['page']])
CUR_DIR = os.path.dirname(os.path.abspath(__file__))
SYSTEM_MESSAGE_TEMPLATE = os.path.join(CUR_DIR, "prompt_templates", "system_message2.txt")
system_message = read_prompt_template(SYSTEM_MESSAGE_TEMPLATE)
filled_system_message = system_message.format(document1=references_text.split('\n\n')[0],
document2=references_text.split('\n\n')[1],
document3=references_text.split('\n\n')[2])
return filled_system_message, references_source, references_text, references_text_page
4-3. 최종 시스템 메세지로 완성된 AI모델이 응답하기
- 페르소나, rag참고자료가 포함된 시스템 메세지를 주면서 AI생성
- 해당 AI에 사용자의 채팅 물음
- AI의 응답을 frontend로 전달
- 매턴의 채팅내용을 csv파일 또는 db에 저장
- langchain의 RunnableWithMessageHistory를 활용하여 이전 대화내용 기억하기(현재는 시스템 램을 사용하지만, 상용서비스 시 redis사용해야 함)
[api.py]
import os
from typing import Dict
import pandas as pd
from dotenv import load_dotenv
from fastapi import FastAPI
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import ConfigurableFieldSpec
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_openai import ChatOpenAI
from pydantic import BaseModel
from ver4.chain import read_prompt_template, create_prompt
# 1. 시스템 템플릿 설정
# CUR_DIR = os.path.dirname(os.path.abspath(__file__))
# SYSTEM_MESSAGE_TEMPLATE = os.path.join(CUR_DIR, "prompt_templates", "system_message2.txt")
# system_message = read_prompt_template(SYSTEM_MESSAGE_TEMPLATE)
# 2. 모델 생성
load_dotenv()
api_key = os.getenv("OPENAI_API_KEY")
model = ChatOpenAI(model="gpt-3.5-turbo", api_key=api_key)
prompt = ChatPromptTemplate.from_messages(
[
# ("system", system_message),
("system", "{system_message}"),
# 대화 기록을 변수로 사용, history 가 MessageHistory 의 key 가 됨
MessagesPlaceholder(variable_name="history"),
("human", "{user_message}"), # 사용자 입력을 변수로 사용
]
)
runnable = prompt | model # 프롬프트와 모델을 연결하여 runnable 객체 생성
# 3. 세션 기록을 저장할 딕셔너리 생성한 모델생성
store = {} # 빈 딕셔너리를 초기화합니다.
def get_session_history(user_id: str, conversation_id: str) -> BaseChatMessageHistory:
# 주어진 user_id와 conversation_id에 해당하는 세션 기록을 반환합니다.
if (user_id, conversation_id) not in store:
# 해당 키가 store에 없으면 새로운 ChatMessageHistory를 생성하여 저장합니다.
store[(user_id, conversation_id)] = ChatMessageHistory()
return store[(user_id, conversation_id)]
with_message_history = RunnableWithMessageHistory(
runnable,
get_session_history,
input_messages_key="user_message",
history_messages_key="history",
history_factory_config=[ # 기존의 "session_id" 설정을 대체하게 됩니다.
ConfigurableFieldSpec(
id="user_id", # get_session_history 함수의 첫 번째 인자로 사용됩니다.
annotation=str,
name="User ID",
description="사용자의 고유 식별자입니다.",
default="",
is_shared=True,
),
ConfigurableFieldSpec(
id="conversation_id", # get_session_history 함수의 두 번째 인자로 사용됩니다.
annotation=str,
name="Conversation ID",
description="대화의 고유 식별자입니다.",
default="",
is_shared=True,
),
],
)
# 4. 대화 실행
app = FastAPI()
class UserRequest(BaseModel):
user_message: str
def create_chat_log_csv(user_input: str, use_rag: bool, rag_references: str, references_source: str, references_text_page: str, gpt_response: str):
# CSV 파일 경로를 설정합니다.
# K교수 저서 참고 대화내용 저장
csv_file_path = os.path.join('./chat_log', 'chat_logs.csv')
# 경기도 상담메뉴얼 대화내용 저장
# csv_file_path = os.path.join('./chat_log', 'chat_logs_second.csv')
# 새로운 로그 데이터를 DataFrame으로 생성합니다.
new_log = pd.DataFrame({
'User Input': [user_input],
'Use RAG': [use_rag],
'RAG References': [rag_references],
'References Source': [references_source],
'References Text Page': [references_text_page],
'GPT Response': [gpt_response]
})
# 파일이 존재하지 않거나 비어 있는 경우, 헤더를 포함하여 새로운 데이터를 저장합니다.
# 그렇지 않으면, 헤더 없이 데이터만 추가합니다.
if not os.path.isfile(csv_file_path) or os.stat(csv_file_path).st_size == 0:
# 파일이 존재하지 않거나 비어 있으면, 새 파일을 생성하고 헤더를 포함하여 데이터를 저장합니다.
new_log.to_csv(csv_file_path, mode='w', index=False, encoding='utf-8-sig', sep=';', header=True)
else:
# 파일이 이미 존재하고 비어 있지 않으면, 기존 데이터에 새 로그를 추가합니다(헤더 없음).
new_log.to_csv(csv_file_path, mode='a', index=False, encoding='utf-8-sig', sep=';', header=False)
@app.post("/chat")
def generate_answer(req: UserRequest) -> Dict[str, str]:
context = req.dict()
user_input = context["user_message"]
# RAG를 한 시스템 메세지 생성
use_rag = True
system_message, references_source, references_text, references_text_page = create_prompt(user_input)
# 사용자 입력을 받아서 대화를 실행합니다.
answer = with_message_history.invoke(
{
"user_message": user_input,
"system_message": system_message,
},
config={"configurable": {"user_id": "test1", "conversation_id": "test_conv1"}},
)
# 대화내용을 저장합니다
create_chat_log_csv(
user_input,
use_rag,
# [message for message in messages if message['role'] == 'system'],
references_text,
references_source,
references_text_page,
answer.content)
return {"answer": answer.content}
# AI Kwon 상담내용
@app.get("/chatlog")
async def read_csv_data():
csv_file_path = './chat_log/chat_logs.csv'
data = pd.read_csv(csv_file_path, sep=';')
return data.to_dict(orient='records')
4-3. 그동안의 채팅내용을 테이블로 보여주기
- csv파일 또는 db에 저장된 사용자와 ai의 대화를 턴별로 보여줍니다.
- 해당 응답시 사용되었던, Top3참고문헌 내용, 제목, 페이지 정보도 함께 볼수 있습니다
- 이를 통하여 RAG 성능의 정성적인 평가가 가능합니다.
- front의 chatlog.py를 실행하여, 백엔드서버의 API_URL/chatlog에 접속
[api.py 하단부에 삽입]
# AI Kwon 상담내용
@app.get("/chatlog")
async def read_csv_data():
csv_file_path = './chat_log/chat_logs.csv'
data = pd.read_csv(csv_file_path, sep=';')
return data.to_dict(orient='records')
5. 서비스 실행방법
5-1. 백앤드 서버
uvicorn api:app --reload
5-2. 프론트 서버
streamlit run app.py
streamlit run chatlog.py