Add message streaming

* Add message streaming
* Update README.md
This commit is contained in:
Karim Iskakov
2023-03-15 19:20:43 +03:00
committed by GitHub
parent 2b4d4ad860
commit 1ba09de2b9
6 changed files with 226 additions and 74 deletions
+122 -59
View File
@@ -1,5 +1,6 @@
import os
import logging
import asyncio
import traceback
import html
import json
@@ -23,6 +24,7 @@ from telegram.ext import (
CommandHandler,
MessageHandler,
CallbackQueryHandler,
AIORateLimiter,
filters
)
from telegram.constants import ParseMode, ChatAction
@@ -35,6 +37,7 @@ import openai_utils
# setup
db = database.Database()
logger = logging.getLogger(__name__)
user_semaphores = {}
HELP_MESSAGE = """Commands:
⚪ /retry Regenerate last bot answer
@@ -64,6 +67,9 @@ async def register_user_if_not_exists(update: Update, context: CallbackContext,
if db.get_user_attribute(user.id, "current_dialog_id") is None:
db.start_new_dialog(user.id)
if user.id not in user_semaphores:
user_semaphores[user.id] = asyncio.Semaphore(1)
async def start_handle(update: Update, context: CallbackContext):
await register_user_if_not_exists(update, context, update.message.from_user)
@@ -89,6 +95,8 @@ async def help_handle(update: Update, context: CallbackContext):
async def retry_handle(update: Update, context: CallbackContext):
await register_user_if_not_exists(update, context, update.message.from_user)
if await is_previous_message_not_answered_yet(update, context): return
user_id = update.message.from_user.id
db.set_user_attribute(user_id, "last_interaction", datetime.now())
@@ -110,72 +118,127 @@ async def message_handle(update: Update, context: CallbackContext, message=None,
return
await register_user_if_not_exists(update, context, update.message.from_user)
if await is_previous_message_not_answered_yet(update, context): return
user_id = update.message.from_user.id
chat_mode = db.get_user_attribute(user_id, "current_chat_mode")
async with user_semaphores[user_id]:
# new dialog timeout
if use_new_dialog_timeout:
if (datetime.now() - db.get_user_attribute(user_id, "last_interaction")).seconds > config.new_dialog_timeout and len(db.get_dialog_messages(user_id)) > 0:
db.start_new_dialog(user_id)
await update.message.reply_text(f"Starting new dialog due to timeout (<b>{openai_utils.CHAT_MODES[chat_mode]['name']}</b> mode) ✅", parse_mode=ParseMode.HTML)
db.set_user_attribute(user_id, "last_interaction", datetime.now())
# new dialog timeout
if use_new_dialog_timeout:
if (datetime.now() - db.get_user_attribute(user_id, "last_interaction")).seconds > config.new_dialog_timeout and len(db.get_dialog_messages(user_id)) > 0:
db.start_new_dialog(user_id)
await update.message.reply_text(f"Starting new dialog due to timeout (<b>{openai_utils.CHAT_MODES[chat_mode]['name']}</b> mode) ✅", parse_mode=ParseMode.HTML)
db.set_user_attribute(user_id, "last_interaction", datetime.now())
# send typing action
await update.message.chat.send_action(action="typing")
# send typing action
await update.message.chat.send_action(action="typing")
try:
message = message or update.message.text
dialog_messages = db.get_dialog_messages(user_id, dialog_id=None)
chat_mode = db.get_user_attribute(user_id, "current_chat_mode")
chatgpt_instance = openai_utils.ChatGPT(use_chatgpt_api=config.use_chatgpt_api)
answer, n_used_tokens, n_first_dialog_messages_removed = await chatgpt_instance.send_message(
message,
dialog_messages=dialog_messages,
chat_mode=chat_mode
)
# update user data
new_dialog_message = {"user": message, "bot": answer, "date": datetime.now()}
db.set_dialog_messages(
user_id,
db.get_dialog_messages(user_id, dialog_id=None) + [new_dialog_message],
dialog_id=None
)
db.set_user_attribute(user_id, "n_used_tokens", n_used_tokens + db.get_user_attribute(user_id, "n_used_tokens"))
except Exception as e:
error_text = f"Something went wrong during completion. Reason: {e}"
logger.error(error_text)
await update.message.reply_text(error_text)
return
# send message if some messages were removed from the context
if n_first_dialog_messages_removed > 0:
if n_first_dialog_messages_removed == 1:
text = "✍️ <i>Note:</i> Your current dialog is too long, so your <b>first message</b> was removed from the context.\n Send /new command to start new dialog"
else:
text = f"✍️ <i>Note:</i> Your current dialog is too long, so <b>{n_first_dialog_messages_removed} first messages</b> were removed from the context.\n Send /new command to start new dialog"
await update.message.reply_text(text, parse_mode=ParseMode.HTML)
# split answer into multiple messages due to 4096 character limit
for answer_chunk in split_text_into_chunks(answer, 4000):
try:
message = message or update.message.text
dialog_messages = db.get_dialog_messages(user_id, dialog_id=None)
parse_mode = {
"html": ParseMode.HTML,
"markdown": ParseMode.MARKDOWN
}[openai_utils.CHAT_MODES[chat_mode]["parse_mode"]]
await update.message.reply_text(answer_chunk, parse_mode=parse_mode)
except telegram.error.BadRequest:
# answer has invalid characters, so we send it without parse_mode
await update.message.reply_text(answer_chunk)
chatgpt_instance = openai_utils.ChatGPT(use_chatgpt_api=config.use_chatgpt_api)
if config.enable_message_streaming:
gen = chatgpt_instance.send_message_stream(message, dialog_messages=dialog_messages, chat_mode=chat_mode)
else:
answer, n_used_tokens, n_first_dialog_messages_removed = await chatgpt_instance.send_message(
message,
dialog_messages=dialog_messages,
chat_mode=chat_mode
)
async def fake_gen():
yield "finished", answer, n_used_tokens, n_first_dialog_messages_removed
gen = fake_gen()
# send message to user
prev_answer = ""
i = -1
async for gen_item in gen:
i += 1
status = gen_item[0]
if status == "not_finished":
status, answer = gen_item
elif status == "finished":
status, answer, n_used_tokens, n_first_dialog_messages_removed = gen_item
else:
raise ValueError(f"Streaming status {status} is unknown")
answer = answer[:4096] # telegram message limit
if i == 0: # send first message (then it'll be edited if message streaming is enabled)
try:
sent_message = await update.message.reply_text(answer, parse_mode=parse_mode)
except telegram.error.BadRequest as e:
if str(e).startswith("Message must be non-empty"): # first answer chunk from openai was empty
i = -1 # try again to send first message
continue
else:
sent_message = await update.message.reply_text(answer)
else: # edit sent message
# update only when 100 new symbols are ready
if abs(len(answer) - len(prev_answer)) < 100 and status != "finished":
continue
try:
await context.bot.edit_message_text(answer, chat_id=sent_message.chat_id, message_id=sent_message.message_id, parse_mode=parse_mode)
except telegram.error.BadRequest as e:
if str(e).startswith("Message is not modified"):
continue
else:
await context.bot.edit_message_text(answer, chat_id=sent_message.chat_id, message_id=sent_message.message_id)
await asyncio.sleep(0.01) # wait a bit to avoid flooding
prev_answer = answer
# update user data
new_dialog_message = {"user": message, "bot": answer, "date": datetime.now()}
db.set_dialog_messages(
user_id,
db.get_dialog_messages(user_id, dialog_id=None) + [new_dialog_message],
dialog_id=None
)
db.set_user_attribute(user_id, "n_used_tokens", n_used_tokens + db.get_user_attribute(user_id, "n_used_tokens"))
except Exception as e:
error_text = f"Something went wrong during completion. Reason: {e}"
logger.error(error_text)
await update.message.reply_text(error_text)
return
# send message if some messages were removed from the context
if n_first_dialog_messages_removed > 0:
if n_first_dialog_messages_removed == 1:
text = "✍️ <i>Note:</i> Your current dialog is too long, so your <b>first message</b> was removed from the context.\n Send /new command to start new dialog"
else:
text = f"✍️ <i>Note:</i> Your current dialog is too long, so <b>{n_first_dialog_messages_removed} first messages</b> were removed from the context.\n Send /new command to start new dialog"
await update.message.reply_text(text, parse_mode=ParseMode.HTML)
async def is_previous_message_not_answered_yet(update: Update, context: CallbackContext):
await register_user_if_not_exists(update, context, update.message.from_user)
user_id = update.message.from_user.id
if user_semaphores[user_id].locked():
text = "⏳ Please <b>wait</b> for a reply to the previous message"
await update.message.reply_text(text, reply_to_message_id=update.message.id, parse_mode=ParseMode.HTML)
return True
else:
return False
async def voice_message_handle(update: Update, context: CallbackContext):
await register_user_if_not_exists(update, context, update.message.from_user)
if await is_previous_message_not_answered_yet(update, context): return
user_id = update.message.from_user.id
db.set_user_attribute(user_id, "last_interaction", datetime.now())
@@ -212,6 +275,8 @@ async def voice_message_handle(update: Update, context: CallbackContext):
async def new_dialog_handle(update: Update, context: CallbackContext):
await register_user_if_not_exists(update, context, update.message.from_user)
if await is_previous_message_not_answered_yet(update, context): return
user_id = update.message.from_user.id
db.set_user_attribute(user_id, "last_interaction", datetime.now())
@@ -224,6 +289,8 @@ async def new_dialog_handle(update: Update, context: CallbackContext):
async def show_chat_modes_handle(update: Update, context: CallbackContext):
await register_user_if_not_exists(update, context, update.message.from_user)
if await is_previous_message_not_answered_yet(update, context): return
user_id = update.message.from_user.id
db.set_user_attribute(user_id, "last_interaction", datetime.now())
@@ -247,11 +314,6 @@ async def set_chat_mode_handle(update: Update, context: CallbackContext):
db.set_user_attribute(user_id, "current_chat_mode", chat_mode)
db.start_new_dialog(user_id)
await query.edit_message_text(
f"<b>{openai_utils.CHAT_MODES[chat_mode]['name']}</b> chat mode is set",
parse_mode=ParseMode.HTML
)
await query.edit_message_text(f"{openai_utils.CHAT_MODES[chat_mode]['welcome_message']}", parse_mode=ParseMode.HTML)
@@ -297,7 +359,7 @@ async def error_handle(update: Update, context: CallbackContext) -> None:
)
# split text into multiple messages due to 4096 character limit
for message_chunk in split_text_into_chunks(message, 4000):
for message_chunk in split_text_into_chunks(message, 4096):
try:
await context.bot.send_message(update.effective_chat.id, message_chunk, parse_mode=ParseMode.HTML)
except telegram.error.BadRequest:
@@ -320,6 +382,7 @@ def run_bot() -> None:
ApplicationBuilder()
.token(config.telegram_token)
.concurrent_updates(True)
.rate_limiter(AIORateLimiter(max_retries=5))
.post_init(post_init)
.build()
)
+1
View File
@@ -17,6 +17,7 @@ openai_api_key = config_yaml["openai_api_key"]
use_chatgpt_api = config_yaml.get("use_chatgpt_api", True)
allowed_telegram_usernames = config_yaml["allowed_telegram_usernames"]
new_dialog_timeout = config_yaml["new_dialog_timeout"]
enable_message_streaming = config_yaml.get("enable_message_streaming", True)
mongodb_uri = f"mongodb://mongo:{config_env['MONGODB_PORT']}"
# chat_modes
+78
View File
@@ -1,5 +1,6 @@
import config
import tiktoken
import openai
openai.api_key = config.openai_api_key
@@ -58,6 +59,60 @@ class ChatGPT:
return answer, n_used_tokens, n_first_dialog_messages_removed
async def send_message_stream(self, message, dialog_messages=[], chat_mode="assistant"):
if chat_mode not in CHAT_MODES.keys():
raise ValueError(f"Chat mode {chat_mode} is not supported")
n_dialog_messages_before = len(dialog_messages)
answer = None
while answer is None:
try:
if self.use_chatgpt_api:
messages = self._generate_prompt_messages_for_chatgpt_api(message, dialog_messages, chat_mode)
r_gen = await openai.ChatCompletion.acreate(
model="gpt-3.5-turbo",
messages=messages,
stream=True,
**OPENAI_COMPLETION_OPTIONS
)
answer = ""
async for r_item in r_gen:
delta = r_item.choices[0].delta
if "content" in delta:
answer += delta.content
yield "not_finished", answer
n_used_tokens = self._count_tokens_for_chatgpt(messages, answer, model="gpt-3.5-turbo")
else:
prompt = self._generate_prompt(message, dialog_messages, chat_mode)
r_gen = await openai.Completion.acreate(
engine="text-davinci-003",
prompt=prompt,
stream=True,
**OPENAI_COMPLETION_OPTIONS
)
answer = ""
async for r_item in r_gen:
answer += r_item.choices[0].text
yield "not_finished", answer
n_used_tokens = self._count_tokens_for_gpt(prompt, answer, model="text-davinci-003")
answer = self._postprocess_answer(answer)
except openai.error.InvalidRequestError as e: # too many tokens
if len(dialog_messages) == 0:
raise ValueError("Dialog messages is reduced to zero, but still has too many tokens to make completion") from e
# forget first message in dialog_messages
dialog_messages = dialog_messages[1:]
n_first_dialog_messages_removed = n_dialog_messages_before - len(dialog_messages)
yield "finished", answer, n_used_tokens, n_first_dialog_messages_removed # sending final answer
def _generate_prompt(self, message, dialog_messages, chat_mode):
prompt = CHAT_MODES[chat_mode]["prompt_start"]
prompt += "\n\n"
@@ -90,6 +145,29 @@ class ChatGPT:
answer = answer.strip()
return answer
def _count_tokens_for_chatgpt(self, prompt_messages, answer, model="gpt-3.5-turbo"):
prompt_messages += [{"role": "assistant", "content": answer}]
encoding = tiktoken.encoding_for_model(model)
n_tokens = 0
for message in prompt_messages:
n_tokens += 4 # every message follows "<im_start>{role/name}\n{content}<im_end>\n"
for key, value in message.items():
if key == "role":
n_tokens += 1
elif key == "content":
n_tokens += len(encoding.encode(value))
else:
raise ValueError(f"Unknown key in message: {key}")
n_tokens -= 1 # remove 1 "<im_end>" token
return n_tokens
def _count_tokens_for_gpt(self, prompt, answer, model="text-davinci-003"):
encoding = tiktoken.encoding_for_model(model)
n_tokens = len(encoding.encode(prompt)) + len(encoding.encode(answer)) + 1
return n_tokens
async def transcribe_audio(audio_file):
r = await openai.Audio.atranscribe("whisper-1", audio_file)