# ChatWithYourData_v2.py
import streamlit as st
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains import ConversationalRetrievalChain
from langchain.chat_models import ChatOpenAI
from langchain.document_loaders import UnstructuredFileLoader, ImageCaptionLoader
from langchain.docstore.document import Document
import os
import pytube
import openai
# Chat UI title
st.header("Upload your own file and ask questions like ChatGPT")
st.subheader('File types supported: PDF/DOCX/TXT/JPG/PNG/YouTube :city_sunrise:')
# File uploader in the sidebar on the left
with st.sidebar:
    # Input for OpenAI API Key
    openai_api_key = st.text_input("OpenAI API Key", type="password")

    # Check if OpenAI API Key is provided
    if not openai_api_key:
        st.info("Please add your OpenAI API key to continue.")
        st.stop()

    # Set OPENAI_API_KEY as an environment variable
    os.environ["OPENAI_API_KEY"] = openai_api_key

# Initialize ChatOpenAI model; max_tokens is capped well below the 16k context
# window so the retrieved chunks and chat history still fit in the prompt
# (max_tokens=16000 would leave almost no room for input and make requests fail)
llm = ChatOpenAI(temperature=0, max_tokens=2000, model_name="gpt-3.5-turbo-16k", streaming=True)
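# Note: streaming=True only streams tokens to registered callback handlers
# (e.g. a StreamingStdOutCallbackHandler); without one, the response is still
# returned in full once generation finishes.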
# Load version history from the text file
def load_version_history():
    with open("version_history.txt", "r") as file:
        return file.read()
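# This assumes a version_history.txt sits next to the script; if the file is
# missing, the sidebar expander below will raise a FileNotFoundError.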
# Sidebar section for uploading files and providing a YouTube URL
with st.sidebar:
    uploaded_files = st.file_uploader("Please upload your files", accept_multiple_files=True, type=None)
    youtube_url = st.text_input("YouTube URL")

    # Create an expander for the version history in the sidebar
    with st.sidebar.expander("**Version History**", expanded=False):
        st.write(load_version_history())

    st.info("Please refresh the browser if you decide to upload more files to reset the session", icon="🚨")
# Check if files are uploaded or a YouTube URL is provided
if uploaded_files or youtube_url:
    # Show the number of uploaded files in the app
    st.write(f"Number of files uploaded: {len(uploaded_files)}")

    # Load the data and perform preprocessing only if it hasn't been loaded before
    if "processed_data" not in st.session_state:
        # Load the data from uploaded files
        documents = []

        if uploaded_files:
            for uploaded_file in uploaded_files:
                # Get the full file path of the uploaded file
                file_path = os.path.join(os.getcwd(), uploaded_file.name)

                # Save the uploaded file to disk
                with open(file_path, "wb") as f:
                    f.write(uploaded_file.getvalue())

                # Check if the file is an image
                if file_path.endswith((".png", ".jpg", ".jpeg")):
                    # Use ImageCaptionLoader to load the image file
                    image_loader = ImageCaptionLoader(path_images=[file_path])

                    # Load image captions
                    image_documents = image_loader.load()

                    # Append the Langchain documents to the documents list
                    documents.extend(image_documents)

                elif file_path.endswith((".pdf", ".docx", ".txt")):
                    # Use UnstructuredFileLoader to load the PDF/DOCX/TXT file
                    loader = UnstructuredFileLoader(file_path)
                    loaded_documents = loader.load()

                    # Extend the main documents list with the loaded documents
                    documents.extend(loaded_documents)
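        # Note: ImageCaptionLoader generates captions with a BLIP model from
        # Hugging Face (downloaded on first use), and UnstructuredFileLoader
        # relies on the `unstructured` package for parsing.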
        # Load the YouTube audio stream if a URL is provided
        if youtube_url:
            youtube_video = pytube.YouTube(youtube_url)
            streams = youtube_video.streams.filter(only_audio=True)
            stream = streams.first()
            stream.download(filename="youtube_audio.mp4")

            # Set the API key for Whisper
            openai.api_key = openai_api_key

            # Transcribe the downloaded audio with Whisper
            with open("youtube_audio.mp4", "rb") as audio_file:
                transcript = openai.Audio.transcribe("whisper-1", audio_file)
            youtube_text = transcript["text"]

            # Create a Langchain document instance for the transcribed text
            youtube_document = Document(page_content=youtube_text, metadata={})
            documents.append(youtube_document)
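        # Note: the Whisper API rejects uploads larger than 25 MB, so very
        # long videos would need to be split into smaller audio segments.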
        # Chunk the data, create embeddings, and save in vectorstore
        text_splitter = RecursiveCharacterTextSplitter(chunk_size=1500, chunk_overlap=150)
        document_chunks = text_splitter.split_documents(documents)

        embeddings = OpenAIEmbeddings()
        vectorstore = Chroma.from_documents(document_chunks, embeddings)

        # Store the processed data in session state for reuse
        st.session_state.processed_data = {
            "document_chunks": document_chunks,
            "vectorstore": vectorstore,
        }
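        # The index above is in-memory and lives only for this session. To keep
        # it across runs, Chroma also accepts a persistence directory, e.g.
        # (hypothetical path):
        # vectorstore = Chroma.from_documents(document_chunks, embeddings, persist_directory="./chroma_db")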
    else:
        # If the processed data is already available, retrieve it from session state
        document_chunks = st.session_state.processed_data["document_chunks"]
        vectorstore = st.session_state.processed_data["vectorstore"]

    # Initialize Langchain's QA Chain with the vectorstore
    qa = ConversationalRetrievalChain.from_llm(llm, vectorstore.as_retriever())
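    # The retriever defaults to plain similarity search; the number of chunks
    # pulled into each prompt could be tuned via search_kwargs, e.g.:
    # qa = ConversationalRetrievalChain.from_llm(llm, vectorstore.as_retriever(search_kwargs={"k": 4}))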
    # Initialize chat history
    if "messages" not in st.session_state:
        st.session_state.messages = []

    # Display chat messages from history on app rerun
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.markdown(message["content"])

    # Accept user input
    if prompt := st.chat_input("Ask your questions"):
        st.session_state.messages.append({"role": "user", "content": prompt})
        with st.chat_message("user"):
            st.markdown(prompt)
        # Query the assistant using the latest chat history;
        # ConversationalRetrievalChain expects (question, answer) tuples, so
        # pair each stored user turn with the assistant reply that followed it
        history = [
            (st.session_state.messages[i]["content"], st.session_state.messages[i + 1]["content"])
            for i in range(0, len(st.session_state.messages) - 1, 2)
        ]
        result = qa({
            "question": prompt,
            "chat_history": history
        })
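        # Besides "answer", the chain can also return the retrieved chunks when
        # built with return_source_documents=True (as result["source_documents"]).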
        # Display assistant response in chat message container
        with st.chat_message("assistant"):
            message_placeholder = st.empty()
            full_response = result["answer"]
            message_placeholder.markdown(full_response)

        st.session_state.messages.append({"role": "assistant", "content": full_response})
else:
    st.write("Please upload your files or provide a YouTube URL for transcription.")