[REQUEST] - Want to use my own UI to call kotaemon, but did not find a good way to call it via an API
Reference Issues
I want to use my own UI to call kotaemon, but I did not find a good way to call it via an API.
Summary
I want to use my own UI to call kotaemon, but I did not find a good way to call it via an API.
Basic Example
I want to use my own UI to call kotaemon, but I did not find a good way to call it via an API.
Drawbacks
I want to use my own UI to call kotaemon, but I did not find a good way to call it via an API.
Additional information
No response
@kksasa For this enhancement request, would you mind elaborating more on the details?
Gradio itself (which Kotaemon is built upon) provides an automated API wrapper. See https://www.gradio.app/guides/getting-started-with-the-python-client.
Here is a quick code snippet that you can try (with Kotaemon running on port 7860).
import os.path
import sqlite3
import time
from functools import lru_cache
from typing import List

from gradio_client import Client

# Path to Kotaemon's user-data SQLite database
db_path = "ktem_app_data/user_data/sql.db"


@lru_cache(1)
def get_all_file_ids(db_path: str, indices: List[int] | None = None):
    """Collect the ids of all files stored in Kotaemon's file indices."""
    assert os.path.isfile(db_path)
    file_ids: List[str] = []
    conn = sqlite3.connect(db_path)
    try:
        c = conn.cursor()
        if indices is None:
            # Get all fileIndex
            c.execute("SELECT * FROM ktem__index")
            indices = [each[0] for each in c.fetchall()]
        # For each index get all file_ids
        for i in indices:
            table_name = f"index__{i}__source"
            c.execute(
                f"SELECT * FROM {table_name}",
            )
            file_ids += [each[0] for each in c.fetchall()]
    finally:
        conn.close()
    return file_ids


file_ids = get_all_file_ids(db_path)
print(f"File_ids: {file_ids}")

client = Client("http://localhost:7860/")
job = client.submit(
    # Chat history: a list of [user message, bot reply] pairs
    [
        [
            "What is the summary of this file",
            None,
        ]
    ],
    "select",
    file_ids,
    api_name="/chat_fn",
)

# Poll until the job finishes, then print the final output
while not job.done():
    time.sleep(0.1)
print(job.outputs()[-1])
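If the endpoint name or argument order differs in your setup, gradio_client itself can list what the running app exposes; this is generic Gradio client functionality rather than anything Kotaemon-specific:

from gradio_client import Client

client = Client("http://localhost:7860/")
# Print every named endpoint (such as /chat_fn) together with the
# parameters and return values it expects.
client.view_api()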
Thanks for the help, I will give it a try. Have a nice day.
Tried it, but it does not seem to support streaming.
Also, there are many responses (print(len(job.outputs())) = 200), so I don't know which one is the chat/LLM response.
To use streaming mode, https://www.gradio.app/guides/getting-started-with-the-python-client#generator-endpoints may help. We have not tested this API in detail, so you would have to experiment with it yourself.
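As a rough, untested sketch of what the generator-endpoint usage would look like (the output indexing below is an assumption and should be verified against your instance):

from gradio_client import Client

# file ids collected with get_all_file_ids() from the earlier snippet
file_ids: list[str] = []

client = Client("http://localhost:7860/")
job = client.submit(
    [["What is the summary of this file", None]],
    "select",
    file_ids,
    api_name="/chat_fn",
)

# For a generator endpoint, iterating over the job yields each partial
# output as soon as it is produced instead of waiting for completion.
last_answer = None
for output in job:
    # Assumption: output[0] is the chat history and its last [user, bot]
    # pair holds the partially generated answer so far.
    last_answer = output[0][-1][1]

print(last_answer)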
No, it is not a real stream; it just prints all the outputs from a list.
Thanks for the help.
Like below, a workaround (WA) to achieve streaming mode in Streamlit:
import streamlit as st
from streamlit_extras.bottom_container import bottom
import os.path
import sqlite3
from functools import lru_cache
from typing import List
from gradio_client import Client
import html2text


def kotaemon_app(prompt):
    db_path = "ktem_app_data/user_data/sql.db"

    @lru_cache(1)
    def get_all_file_ids(db_path: str, indices: List[int] | None = None):
        assert os.path.isfile(db_path)
        file_ids: List[str] = []
        conn = sqlite3.connect(db_path)
        try:
            c = conn.cursor()
            if indices is None:
                # Get all fileIndex
                c.execute("SELECT * FROM ktem__index")
                indices = [each[0] for each in c.fetchall()]
            # For each index get all file_ids
            for i in indices:
                table_name = f"index__{i}__source"
                c.execute(
                    f"SELECT * FROM {table_name}",
                )
                file_ids += [each[0] for each in c.fetchall()]
        finally:
            conn.close()
        return file_ids

    file_ids = get_all_file_ids(db_path)
    print(f"File_ids: {file_ids}")
    client = Client("http://xxx:7860/")
    job = client.submit(
        [
            [
                prompt,
                None,
            ]
        ],
        "select",
        file_ids,
        api_name="/chat_fn",
    )

    from difflib import SequenceMatcher

    def find_differences(str1, str2):
        # Return only the part of str2 that is not already present in str1,
        # so each yield below carries just the newly generated text.
        matcher = SequenceMatcher(None, str1, str2)
        differences = []
        for tag, i1, i2, j1, j2 in matcher.get_opcodes():
            if tag != 'equal':
                differences.append(str2[j1:j2])
        return "".join(differences)

    resps = ['']
    content = ""
    for o in job:
        if 'Thinking' not in o[0][0][1]:
            if len(resps[-1]) != len(o[0][0][1]):
                content = find_differences(resps[-1], o[0][0][1])
                resps.append(o[0][0][1])
                yield content
                # print(content, end="", flush=True)
        else:
            st.session_state.refers.append(o[-2])
            # print(o[-2])
            break


def chat_message_bot_default(prompt):
    st.chat_message("user", avatar="🙂").markdown(prompt)
    st.session_state.messages.append({"role": "user", "content": prompt})
    full_response = ""
    with st.spinner("bot is thinking..."):
        try:
            with st.chat_message("assistant", avatar="🤖"):
                message_placeholder = st.empty()
                for res in kotaemon_app(prompt):
                    full_response += res
                    message_placeholder.markdown(full_response)
                st.session_state.messages.append({"role": "assistant", "content": full_response})
        except Exception as e:
            message_placeholder = st.empty()
            print(f"e =--->>> {e}")
            st.chat_message("assistant", avatar="🤖").markdown(':red[API Bad Server]')
            st.session_state.messages.append({"role": "assistant", "content": 'API Bad Server'})


st.set_page_config(
    page_title="RAG_Bot",
    page_icon="./cebot.ico",
    layout='centered',
)


def Load_QA():
    # Initialize chat history
    if "messages" not in st.session_state:
        st.session_state.messages = []
    if "refers" not in st.session_state:
        st.session_state.refers = []
    for _message in st.session_state.messages:
        if _message["role"] == 'user':
            with st.chat_message(_message["role"], avatar="🙂"):
                st.markdown(_message["content"])
        else:
            with st.chat_message(_message["role"], avatar="🤖"):
                st.markdown(_message["content"].replace('\\_', '_').replace('\\n', '\n').replace('\\\\ ', '\\'))


with bottom():
    prompt = st.chat_input("What is your question?")

if prompt:
    chat_message_bot_default(prompt)

Load_QA()

with st.expander('refers', expanded=False):
    if st.session_state.refers:
        markdown = html2text.html2text(st.session_state.refers[0])
        st.markdown(markdown)
        st.session_state.refers = []
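Usage note: assuming the script above is saved as, say, app.py (the filename is arbitrary), it can be started with streamlit run app.py. It needs streamlit, streamlit-extras, html2text, and the gradio_client package installed, and it expects both the ktem_app_data/user_data/sql.db path and the Kotaemon server (the http://xxx:7860/ URL) to be reachable from where it runs.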