diff --git a/README.md b/README.md
index b066e6f..ab20f69 100644
--- a/README.md
+++ b/README.md
@@ -20,14 +20,10 @@ This repo is ChatGPT re-created with GPT-3.5 LLM as Telegram Bot. **And it works
You can deploy your own bot, or use mine: [@chatgpt_karfly_bot](https://t.me/chatgpt_karfly_bot)
-## News
-- *9 Mar 2023*: Now you can easily create your own Chat Modes by editing `config/chat_modes.yml`
-- *8 Mar 2023*: Added voice message recognition with [OpenAI Whisper API](https://openai.com/blog/introducing-chatgpt-and-whisper-apis). Record a voice message and ChatGPT will answer you!
-- *2 Mar 2023*: Added support of [ChatGPT API](https://platform.openai.com/docs/guides/chat/introduction). It's enabled by default and can be disabled with `use_chatgpt_api` option in config. Don't forget to **rebuild** you docker image (`--build`).
-
## Features
- Low latency replies (it usually takes about 3-5 seconds)
- No request limits
+- Message streaming (watch demo)
- Voice message recognition
- Code highlighting
- Special chat modes: 👩🏼🎓 Assistant, 👩🏼💻 Code Assistant, 📝 Text Improver and 🎬 Movie Expert. You can easily create your own chat modes by editing `config/chat_modes.yml`
@@ -35,6 +31,18 @@ You can deploy your own bot, or use mine: [@chatgpt_karfly_bot](https://t.me/cha
- List of allowed Telegram users
- Track $ balance spent on OpenAI API
+
+
+
+
+---
+
+## News
+- *15 Mar 2023*: Added message streaming. Now you don't have to wait until the whole message is ready, it's streamed to Telegram part-by-part (watch demo)
+- *9 Mar 2023*: Now you can easily create your own Chat Modes by editing `config/chat_modes.yml`
+- *8 Mar 2023*: Added voice message recognition with [OpenAI Whisper API](https://openai.com/blog/introducing-chatgpt-and-whisper-apis). Record a voice message and ChatGPT will answer you!
+- *2 Mar 2023*: Added support of [ChatGPT API](https://platform.openai.com/docs/guides/chat/introduction). It's enabled by default and can be disabled with `use_chatgpt_api` option in config. Don't forget to **rebuild** you docker image (`--build`).
+
## Bot commands
- `/retry` – Regenerate last bot answer
- `/new` – Start new dialog
@@ -48,16 +56,15 @@ You can deploy your own bot, or use mine: [@chatgpt_karfly_bot](https://t.me/cha
2. Get your Telegram bot token from [@BotFather](https://t.me/BotFather)
3. Edit `config/config.example.yml` to set your tokens and run 2 commands below (*if you're advanced user, you can also edit* `config/config.example.env`):
-```bash
-mv config/config.example.yml config/config.yml
-mv config/config.example.env config/config.env
-```
+ ```bash
+ mv config/config.example.yml config/config.yml
+ mv config/config.example.env config/config.env
+ ```
-🔥 And now **run**:
-
-```bash
-docker-compose --env-file config/config.env up --build
-```
+4. 🔥 And now **run**:
+ ```bash
+ docker-compose --env-file config/config.env up --build
+ ```
## ❤️ Top donations
You can be in this list:
diff --git a/bot/bot.py b/bot/bot.py
index 75dcc64..37ecb85 100644
--- a/bot/bot.py
+++ b/bot/bot.py
@@ -1,5 +1,6 @@
import os
import logging
+import asyncio
import traceback
import html
import json
@@ -23,6 +24,7 @@ from telegram.ext import (
CommandHandler,
MessageHandler,
CallbackQueryHandler,
+ AIORateLimiter,
filters
)
from telegram.constants import ParseMode, ChatAction
@@ -35,6 +37,7 @@ import openai_utils
# setup
db = database.Database()
logger = logging.getLogger(__name__)
+user_semaphores = {}
HELP_MESSAGE = """Commands:
⚪ /retry – Regenerate last bot answer
@@ -64,6 +67,9 @@ async def register_user_if_not_exists(update: Update, context: CallbackContext,
if db.get_user_attribute(user.id, "current_dialog_id") is None:
db.start_new_dialog(user.id)
+ if user.id not in user_semaphores:
+ user_semaphores[user.id] = asyncio.Semaphore(1)
+
async def start_handle(update: Update, context: CallbackContext):
await register_user_if_not_exists(update, context, update.message.from_user)
@@ -89,6 +95,8 @@ async def help_handle(update: Update, context: CallbackContext):
async def retry_handle(update: Update, context: CallbackContext):
await register_user_if_not_exists(update, context, update.message.from_user)
+ if await is_previous_message_not_answered_yet(update, context): return
+
user_id = update.message.from_user.id
db.set_user_attribute(user_id, "last_interaction", datetime.now())
@@ -110,72 +118,127 @@ async def message_handle(update: Update, context: CallbackContext, message=None,
return
await register_user_if_not_exists(update, context, update.message.from_user)
+ if await is_previous_message_not_answered_yet(update, context): return
+
user_id = update.message.from_user.id
chat_mode = db.get_user_attribute(user_id, "current_chat_mode")
+
+ async with user_semaphores[user_id]:
+ # new dialog timeout
+ if use_new_dialog_timeout:
+ if (datetime.now() - db.get_user_attribute(user_id, "last_interaction")).seconds > config.new_dialog_timeout and len(db.get_dialog_messages(user_id)) > 0:
+ db.start_new_dialog(user_id)
+ await update.message.reply_text(f"Starting new dialog due to timeout ({openai_utils.CHAT_MODES[chat_mode]['name']} mode) ✅", parse_mode=ParseMode.HTML)
+ db.set_user_attribute(user_id, "last_interaction", datetime.now())
- # new dialog timeout
- if use_new_dialog_timeout:
- if (datetime.now() - db.get_user_attribute(user_id, "last_interaction")).seconds > config.new_dialog_timeout and len(db.get_dialog_messages(user_id)) > 0:
- db.start_new_dialog(user_id)
- await update.message.reply_text(f"Starting new dialog due to timeout ({openai_utils.CHAT_MODES[chat_mode]['name']} mode) ✅", parse_mode=ParseMode.HTML)
- db.set_user_attribute(user_id, "last_interaction", datetime.now())
+ # send typing action
+ await update.message.chat.send_action(action="typing")
- # send typing action
- await update.message.chat.send_action(action="typing")
-
- try:
- message = message or update.message.text
-
- dialog_messages = db.get_dialog_messages(user_id, dialog_id=None)
- chat_mode = db.get_user_attribute(user_id, "current_chat_mode")
-
- chatgpt_instance = openai_utils.ChatGPT(use_chatgpt_api=config.use_chatgpt_api)
- answer, n_used_tokens, n_first_dialog_messages_removed = await chatgpt_instance.send_message(
- message,
- dialog_messages=dialog_messages,
- chat_mode=chat_mode
- )
-
- # update user data
- new_dialog_message = {"user": message, "bot": answer, "date": datetime.now()}
- db.set_dialog_messages(
- user_id,
- db.get_dialog_messages(user_id, dialog_id=None) + [new_dialog_message],
- dialog_id=None
- )
-
- db.set_user_attribute(user_id, "n_used_tokens", n_used_tokens + db.get_user_attribute(user_id, "n_used_tokens"))
-
- except Exception as e:
- error_text = f"Something went wrong during completion. Reason: {e}"
- logger.error(error_text)
- await update.message.reply_text(error_text)
- return
-
- # send message if some messages were removed from the context
- if n_first_dialog_messages_removed > 0:
- if n_first_dialog_messages_removed == 1:
- text = "✍️ Note: Your current dialog is too long, so your first message was removed from the context.\n Send /new command to start new dialog"
- else:
- text = f"✍️ Note: Your current dialog is too long, so {n_first_dialog_messages_removed} first messages were removed from the context.\n Send /new command to start new dialog"
- await update.message.reply_text(text, parse_mode=ParseMode.HTML)
-
- # split answer into multiple messages due to 4096 character limit
- for answer_chunk in split_text_into_chunks(answer, 4000):
try:
+ message = message or update.message.text
+
+ dialog_messages = db.get_dialog_messages(user_id, dialog_id=None)
parse_mode = {
"html": ParseMode.HTML,
"markdown": ParseMode.MARKDOWN
}[openai_utils.CHAT_MODES[chat_mode]["parse_mode"]]
-
- await update.message.reply_text(answer_chunk, parse_mode=parse_mode)
- except telegram.error.BadRequest:
- # answer has invalid characters, so we send it without parse_mode
- await update.message.reply_text(answer_chunk)
+
+ chatgpt_instance = openai_utils.ChatGPT(use_chatgpt_api=config.use_chatgpt_api)
+ if config.enable_message_streaming:
+ gen = chatgpt_instance.send_message_stream(message, dialog_messages=dialog_messages, chat_mode=chat_mode)
+ else:
+ answer, n_used_tokens, n_first_dialog_messages_removed = await chatgpt_instance.send_message(
+ message,
+ dialog_messages=dialog_messages,
+ chat_mode=chat_mode
+ )
+
+ async def fake_gen():
+ yield "finished", answer, n_used_tokens, n_first_dialog_messages_removed
+
+ gen = fake_gen()
+
+ # send message to user
+ prev_answer = ""
+ i = -1
+ async for gen_item in gen:
+ i += 1
+
+ status = gen_item[0]
+ if status == "not_finished":
+ status, answer = gen_item
+ elif status == "finished":
+ status, answer, n_used_tokens, n_first_dialog_messages_removed = gen_item
+ else:
+ raise ValueError(f"Streaming status {status} is unknown")
+
+ answer = answer[:4096] # telegram message limit
+ if i == 0: # send first message (then it'll be edited if message streaming is enabled)
+ try:
+ sent_message = await update.message.reply_text(answer, parse_mode=parse_mode)
+ except telegram.error.BadRequest as e:
+ if str(e).startswith("Message must be non-empty"): # first answer chunk from openai was empty
+ i = -1 # try again to send first message
+ continue
+ else:
+ sent_message = await update.message.reply_text(answer)
+ else: # edit sent message
+ # update only when 100 new symbols are ready
+ if abs(len(answer) - len(prev_answer)) < 100 and status != "finished":
+ continue
+
+ try:
+ await context.bot.edit_message_text(answer, chat_id=sent_message.chat_id, message_id=sent_message.message_id, parse_mode=parse_mode)
+ except telegram.error.BadRequest as e:
+ if str(e).startswith("Message is not modified"):
+ continue
+ else:
+ await context.bot.edit_message_text(answer, chat_id=sent_message.chat_id, message_id=sent_message.message_id)
+
+ await asyncio.sleep(0.01) # wait a bit to avoid flooding
+
+ prev_answer = answer
+
+ # update user data
+ new_dialog_message = {"user": message, "bot": answer, "date": datetime.now()}
+ db.set_dialog_messages(
+ user_id,
+ db.get_dialog_messages(user_id, dialog_id=None) + [new_dialog_message],
+ dialog_id=None
+ )
+
+ db.set_user_attribute(user_id, "n_used_tokens", n_used_tokens + db.get_user_attribute(user_id, "n_used_tokens"))
+ except Exception as e:
+ error_text = f"Something went wrong during completion. Reason: {e}"
+ logger.error(error_text)
+ await update.message.reply_text(error_text)
+ return
+
+ # send message if some messages were removed from the context
+ if n_first_dialog_messages_removed > 0:
+ if n_first_dialog_messages_removed == 1:
+ text = "✍️ Note: Your current dialog is too long, so your first message was removed from the context.\n Send /new command to start new dialog"
+ else:
+ text = f"✍️ Note: Your current dialog is too long, so {n_first_dialog_messages_removed} first messages were removed from the context.\n Send /new command to start new dialog"
+ await update.message.reply_text(text, parse_mode=ParseMode.HTML)
+
+
+async def is_previous_message_not_answered_yet(update: Update, context: CallbackContext):
+ await register_user_if_not_exists(update, context, update.message.from_user)
+
+ user_id = update.message.from_user.id
+ if user_semaphores[user_id].locked():
+ text = "⏳ Please wait for a reply to the previous message"
+ await update.message.reply_text(text, reply_to_message_id=update.message.id, parse_mode=ParseMode.HTML)
+ return True
+ else:
+ return False
async def voice_message_handle(update: Update, context: CallbackContext):
await register_user_if_not_exists(update, context, update.message.from_user)
+ if await is_previous_message_not_answered_yet(update, context): return
+
user_id = update.message.from_user.id
db.set_user_attribute(user_id, "last_interaction", datetime.now())
@@ -212,6 +275,8 @@ async def voice_message_handle(update: Update, context: CallbackContext):
async def new_dialog_handle(update: Update, context: CallbackContext):
await register_user_if_not_exists(update, context, update.message.from_user)
+ if await is_previous_message_not_answered_yet(update, context): return
+
user_id = update.message.from_user.id
db.set_user_attribute(user_id, "last_interaction", datetime.now())
@@ -224,6 +289,8 @@ async def new_dialog_handle(update: Update, context: CallbackContext):
async def show_chat_modes_handle(update: Update, context: CallbackContext):
await register_user_if_not_exists(update, context, update.message.from_user)
+ if await is_previous_message_not_answered_yet(update, context): return
+
user_id = update.message.from_user.id
db.set_user_attribute(user_id, "last_interaction", datetime.now())
@@ -247,11 +314,6 @@ async def set_chat_mode_handle(update: Update, context: CallbackContext):
db.set_user_attribute(user_id, "current_chat_mode", chat_mode)
db.start_new_dialog(user_id)
- await query.edit_message_text(
- f"{openai_utils.CHAT_MODES[chat_mode]['name']} chat mode is set",
- parse_mode=ParseMode.HTML
- )
-
await query.edit_message_text(f"{openai_utils.CHAT_MODES[chat_mode]['welcome_message']}", parse_mode=ParseMode.HTML)
@@ -297,7 +359,7 @@ async def error_handle(update: Update, context: CallbackContext) -> None:
)
# split text into multiple messages due to 4096 character limit
- for message_chunk in split_text_into_chunks(message, 4000):
+ for message_chunk in split_text_into_chunks(message, 4096):
try:
await context.bot.send_message(update.effective_chat.id, message_chunk, parse_mode=ParseMode.HTML)
except telegram.error.BadRequest:
@@ -320,6 +382,7 @@ def run_bot() -> None:
ApplicationBuilder()
.token(config.telegram_token)
.concurrent_updates(True)
+ .rate_limiter(AIORateLimiter(max_retries=5))
.post_init(post_init)
.build()
)
diff --git a/bot/config.py b/bot/config.py
index 41ba363..2cc96aa 100644
--- a/bot/config.py
+++ b/bot/config.py
@@ -17,6 +17,7 @@ openai_api_key = config_yaml["openai_api_key"]
use_chatgpt_api = config_yaml.get("use_chatgpt_api", True)
allowed_telegram_usernames = config_yaml["allowed_telegram_usernames"]
new_dialog_timeout = config_yaml["new_dialog_timeout"]
+enable_message_streaming = config_yaml.get("enable_message_streaming", True)
mongodb_uri = f"mongodb://mongo:{config_env['MONGODB_PORT']}"
# chat_modes
diff --git a/bot/openai_utils.py b/bot/openai_utils.py
index 2b94b6b..032f90c 100644
--- a/bot/openai_utils.py
+++ b/bot/openai_utils.py
@@ -1,5 +1,6 @@
import config
+import tiktoken
import openai
openai.api_key = config.openai_api_key
@@ -58,6 +59,60 @@ class ChatGPT:
return answer, n_used_tokens, n_first_dialog_messages_removed
+ async def send_message_stream(self, message, dialog_messages=[], chat_mode="assistant"):
+ if chat_mode not in CHAT_MODES.keys():
+ raise ValueError(f"Chat mode {chat_mode} is not supported")
+
+ n_dialog_messages_before = len(dialog_messages)
+ answer = None
+ while answer is None:
+ try:
+ if self.use_chatgpt_api:
+ messages = self._generate_prompt_messages_for_chatgpt_api(message, dialog_messages, chat_mode)
+ r_gen = await openai.ChatCompletion.acreate(
+ model="gpt-3.5-turbo",
+ messages=messages,
+ stream=True,
+ **OPENAI_COMPLETION_OPTIONS
+ )
+
+ answer = ""
+ async for r_item in r_gen:
+ delta = r_item.choices[0].delta
+ if "content" in delta:
+ answer += delta.content
+ yield "not_finished", answer
+
+ n_used_tokens = self._count_tokens_for_chatgpt(messages, answer, model="gpt-3.5-turbo")
+ else:
+ prompt = self._generate_prompt(message, dialog_messages, chat_mode)
+ r_gen = await openai.Completion.acreate(
+ engine="text-davinci-003",
+ prompt=prompt,
+ stream=True,
+ **OPENAI_COMPLETION_OPTIONS
+ )
+
+ answer = ""
+ async for r_item in r_gen:
+ answer += r_item.choices[0].text
+ yield "not_finished", answer
+
+ n_used_tokens = self._count_tokens_for_gpt(prompt, answer, model="text-davinci-003")
+
+ answer = self._postprocess_answer(answer)
+
+ except openai.error.InvalidRequestError as e: # too many tokens
+ if len(dialog_messages) == 0:
+ raise ValueError("Dialog messages is reduced to zero, but still has too many tokens to make completion") from e
+
+ # forget first message in dialog_messages
+ dialog_messages = dialog_messages[1:]
+
+ n_first_dialog_messages_removed = n_dialog_messages_before - len(dialog_messages)
+
+ yield "finished", answer, n_used_tokens, n_first_dialog_messages_removed # sending final answer
+
def _generate_prompt(self, message, dialog_messages, chat_mode):
prompt = CHAT_MODES[chat_mode]["prompt_start"]
prompt += "\n\n"
@@ -90,6 +145,29 @@ class ChatGPT:
answer = answer.strip()
return answer
+ def _count_tokens_for_chatgpt(self, prompt_messages, answer, model="gpt-3.5-turbo"):
+ prompt_messages += [{"role": "assistant", "content": answer}]
+
+ encoding = tiktoken.encoding_for_model(model)
+ n_tokens = 0
+ for message in prompt_messages:
+ n_tokens += 4 # every message follows "{role/name}\n{content}\n"
+ for key, value in message.items():
+ if key == "role":
+ n_tokens += 1
+ elif key == "content":
+ n_tokens += len(encoding.encode(value))
+ else:
+ raise ValueError(f"Unknown key in message: {key}")
+
+ n_tokens -= 1 # remove 1 "" token
+ return n_tokens
+
+ def _count_tokens_for_gpt(self, prompt, answer, model="text-davinci-003"):
+ encoding = tiktoken.encoding_for_model(model)
+ n_tokens = len(encoding.encode(prompt)) + len(encoding.encode(answer)) + 1
+ return n_tokens
+
async def transcribe_audio(audio_file):
r = await openai.Audio.atranscribe("whisper-1", audio_file)
diff --git a/config/config.example.yml b/config/config.example.yml
index 08b17b7..fe6f189 100644
--- a/config/config.example.yml
+++ b/config/config.example.yml
@@ -3,7 +3,9 @@ openai_api_key: ""
use_chatgpt_api: true
allowed_telegram_usernames: [] # if empty, the bot is available to anyone. pass a username string to allow it and/or user ids as integers
new_dialog_timeout: 600 # new dialog starts after timeout (in seconds)
+enable_message_streaming: true # if set, messages will be shown to user word-by-word
+# prices
chatgpt_price_per_1000_tokens: 0.002
gpt_price_per_1000_tokens: 0.02
whisper_price_per_1_min: 0.006
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index c6d4c3d..354063b 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,5 +1,6 @@
-python-telegram-bot==20.1
+python-telegram-bot[rate-limiter]==20.1
openai>=0.27.0
+tiktoken>=0.3.0
PyYAML==6.0
pymongo==4.3.3
python-dotenv==0.21.0