Files
chatgpt_telegram_bot/bot/bot.py
T
Karim Iskakov fdb91a8da6 Move to MongoDB
2023-01-19 14:56:02 -06:00

240 lines
8.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import logging
import traceback
import html
import json
from datetime import datetime
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import (
ApplicationBuilder,
CallbackContext,
CommandHandler,
MessageHandler,
CallbackQueryHandler,
filters
)
from telegram.constants import ParseMode, ChatAction
import config
import database
import chatgpt
# setup
# Module-level objects shared by the handlers below: the database wrapper and
# this module's logger.
db = database.Database()
logger = logging.getLogger(__name__)
# Command overview; sent as-is by /help and embedded in the /start greeting.
HELP_MESSAGE = """Commands:
⚪ /retry Regenerate last bot answer
⚪ /new Start new dialog
⚪ /mode Select chat mode
⚪ /balance Show balance
⚪ /help Show help
"""
async def start_handle(update: Update, context: CallbackContext):
    """Handle /start: register the sender if needed, reset the dialog, greet.

    Args:
        update: Incoming update; ``update.message`` supplies the sender, the
            chat id and the reply target.
        context: Handler context (unused here).
    """
    sender = update.message.from_user
    user_id = sender.id

    # First contact: create the user record before any attribute is written.
    already_registered = db.check_if_user_exists(sender.id)
    if not already_registered:
        db.add_new_user(
            user_id,
            update.message.chat_id,
            username=sender.username,
            first_name=sender.first_name,
            last_name=sender.last_name,
        )

    db.set_user_attribute(user_id, "last_interaction", datetime.now())
    db.start_new_dialog(user_id)

    # Greeting = intro line + command overview + closing line.
    greeting = "".join((
        "Hi! I'm <b>ChatGPT</b> bot implemented with GPT-3.5 OpenAI API 🤖\n\n",
        HELP_MESSAGE,
        "\nAnd now... ask me anything!",
    ))
    await update.message.reply_text(greeting, parse_mode=ParseMode.HTML)
async def help_handle(update: Update, context: CallbackContext):
    """Handle /help: record the interaction time and reply with HELP_MESSAGE.

    Args:
        update: Incoming update; ``update.message`` supplies the sender and
            the reply target.
        context: Handler context (unused here).
    """
    incoming = update.message
    db.set_user_attribute(incoming.from_user.id, "last_interaction", datetime.now())
    await incoming.reply_text(HELP_MESSAGE, parse_mode=ParseMode.HTML)
async def retry_handle(update: Update, context: CallbackContext):
    """Handle /retry: drop the last stored exchange and re-ask its user message.

    Args:
        update: Incoming update; ``update.message`` supplies the sender and
            the reply target.
        context: Handler context, forwarded to ``message_handle``.
    """
    user_id = update.message.from_user.id
    db.set_user_attribute(user_id, "last_interaction", datetime.now())

    history = db.get_dialog_messages(user_id, dialog_id=None)
    if len(history) == 0:
        await update.message.reply_text("No message to retry 🤷‍♂️")
        return

    # Remove the newest exchange from the stored context, then replay only its
    # user side; the timeout check is skipped so the dialog is not reset.
    last_exchange = history.pop()
    db.set_dialog_messages(user_id, history, dialog_id=None)

    await message_handle(
        update,
        context,
        message=last_exchange["user"],
        use_new_dialog_timeout=False,
    )
async def message_handle(update: Update, context: CallbackContext, message=None, use_new_dialog_timeout=True):
    """Answer a text message with a ChatGPT completion and store the exchange.

    Args:
        update: Incoming update; ``update.message`` supplies the sender, the
            text and the reply target.
        context: Handler context (unused here).
        message: Text to send instead of ``update.message.text``;
            ``retry_handle`` passes the last user message here.
        use_new_dialog_timeout: When true, a new dialog is started if the time
            since the user's ``last_interaction`` exceeds
            ``config.new_dialog_timeout`` (compared in seconds).

    Returns:
        None. On a completion failure the error is logged, reported to the
        user, and the handler returns early.
    """
    # Imported locally: the module imports only selected names from
    # ``telegram``, so the bare name ``telegram`` is not bound at module level
    # and ``telegram.error.BadRequest`` would raise NameError.
    from telegram.error import BadRequest

    user_id = update.message.from_user.id

    # new dialog timeout
    if use_new_dialog_timeout:
        idle_time = datetime.now() - db.get_user_attribute(user_id, "last_interaction")
        # total_seconds() includes whole days; ``.seconds`` is only the
        # sub-day remainder and would miss idle periods longer than 24 hours.
        if idle_time.total_seconds() > config.new_dialog_timeout:
            db.start_new_dialog(user_id)
            await update.message.reply_text("Starting new dialog due to timeout ✅")
    db.set_user_attribute(user_id, "last_interaction", datetime.now())

    # send typing action
    await update.message.chat.send_action(action="typing")

    try:
        message = message or update.message.text

        answer, prompt, n_used_tokens, n_first_dialog_messages_removed = chatgpt.ChatGPT().send_message(
            message,
            dialog_messages=db.get_dialog_messages(user_id, dialog_id=None),
            chat_mode=db.get_user_attribute(user_id, "current_chat_mode"),
        )

        # update user data
        new_dialog_message = {"user": message, "bot": answer, "date": datetime.now()}
        db.set_dialog_messages(
            user_id,
            db.get_dialog_messages(user_id, dialog_id=None) + [new_dialog_message],
            dialog_id=None
        )

        db.set_user_attribute(user_id, "n_used_tokens", n_used_tokens + db.get_user_attribute(user_id, "n_used_tokens"))
    except Exception as e:
        # Top-level boundary for the completion step: report and stop rather
        # than letting the failure reach the global error handler.
        error_text = f"Something went wrong during completion. Reason: {e}"
        logger.error(error_text)
        await update.message.reply_text(error_text)
        return

    # send message if some messages were removed from the context
    if n_first_dialog_messages_removed > 0:
        if n_first_dialog_messages_removed == 1:
            text = "✍️ <i>Note:</i> Your current dialog is too long, so your <b>first message</b> was removed from the context.\n Send /new command to start new dialog"
        else:
            text = f"✍️ <i>Note:</i> Your current dialog is too long, so <b>{n_first_dialog_messages_removed} first messages</b> were removed from the context.\n Send /new command to start new dialog"
        await update.message.reply_text(text, parse_mode=ParseMode.HTML)

    try:
        await update.message.reply_text(answer, parse_mode=ParseMode.HTML)
    except BadRequest:
        # answer has invalid characters, so we send it without parse_mode
        await update.message.reply_text(answer)
async def new_dialog_handle(update: Update, context: CallbackContext):
    """Handle /new: start a fresh dialog and send the current mode's welcome.

    Args:
        update: Incoming update; ``update.message`` supplies the sender and
            the reply target.
        context: Handler context (unused here).
    """
    incoming = update.message
    user_id = incoming.from_user.id

    db.set_user_attribute(user_id, "last_interaction", datetime.now())
    db.start_new_dialog(user_id)
    await incoming.reply_text("Starting new dialog ✅")

    # The welcome text is looked up under the user's stored chat mode key.
    mode_key = db.get_user_attribute(user_id, "current_chat_mode")
    welcome = chatgpt.CHAT_MODES[mode_key]['welcome_message']
    await incoming.reply_text(f"{welcome}", parse_mode=ParseMode.HTML)
async def show_chat_modes_handle(update: Update, context: CallbackContext):
    """Handle /mode: reply with an inline keyboard listing every chat mode.

    Args:
        update: Incoming update; ``update.message`` supplies the sender and
            the reply target.
        context: Handler context (unused here).
    """
    user_id = update.message.from_user.id
    db.set_user_attribute(user_id, "last_interaction", datetime.now())

    # One button per row; callback data is "set_chat_mode|<mode key>", which
    # set_chat_mode_handle splits on "|".
    rows = [
        [InlineKeyboardButton(mode_info["name"], callback_data=f"set_chat_mode|{mode_key}")]
        for mode_key, mode_info in chatgpt.CHAT_MODES.items()
    ]

    await update.message.reply_text(
        "Select chat mode:",
        reply_markup=InlineKeyboardMarkup(rows),
    )
async def set_chat_mode_handle(update: Update, context: CallbackContext):
    """Handle a chat-mode button press: store the mode and start a new dialog.

    The keyboard message is edited into a confirmation, and the mode's welcome
    text is sent as a separate message. Two consecutive edits of the same
    message would replace the confirmation immediately.

    Args:
        update: Incoming update; ``update.callback_query`` carries the sender
            and callback data of the form ``"set_chat_mode|<mode key>"``.
        context: Handler context; ``context.bot`` sends the welcome message.
    """
    query = update.callback_query
    user_id = query.from_user.id
    await query.answer()

    # Validate before persisting: an unknown key would be stored and then
    # raise KeyError on the CHAT_MODES lookups below.
    data_parts = query.data.split("|")
    chat_mode = data_parts[1] if len(data_parts) > 1 else None
    if chat_mode not in chatgpt.CHAT_MODES:
        logger.warning("Ignoring callback with unknown chat mode: %r", query.data)
        return

    db.set_user_attribute(user_id, "current_chat_mode", chat_mode)
    db.start_new_dialog(user_id)

    await query.edit_message_text(
        f"<b>{chatgpt.CHAT_MODES[chat_mode]['name']}</b> chat mode is set",
        parse_mode=ParseMode.HTML
    )

    await context.bot.send_message(
        update.effective_chat.id,
        f"{chatgpt.CHAT_MODES[chat_mode]['welcome_message']}",
        parse_mode=ParseMode.HTML
    )
async def show_balance_handle(update: Update, context: CallbackContext):
    """Handle /balance: report the user's token usage and its dollar cost.

    Args:
        update: Incoming update; ``update.message`` supplies the sender and
            the reply target.
        context: Handler context (unused here).
    """
    user_id = update.message.from_user.id
    db.set_user_attribute(user_id, "last_interaction", datetime.now())

    tokens_used = db.get_user_attribute(user_id, "n_used_tokens")
    # Fixed rate, matching the price quoted in the reply text below.
    dollars_spent = tokens_used * (0.01 / 1000)

    report = "".join((
        f"You spent <b>{dollars_spent:.03f}$</b>\n",
        f"You used <b>{tokens_used}</b> tokens <i>(price: 0.01$ per 1000 tokens)</i>\n",
    ))
    await update.message.reply_text(report, parse_mode=ParseMode.HTML)
async def error_handler(update: Update, context: CallbackContext) -> None:
    """Log an unhandled handler exception and report it to the originating chat.

    Args:
        update: The object being processed when the error occurred. It may not
            be an ``Update``, or may have no associated chat; the error is
            then only logged.
        context: Handler context; ``context.error`` is the raised exception
            and ``context.bot`` sends the report.
    """
    logger.error(msg="Exception while handling an update:", exc_info=context.error)

    # Without a chat there is nowhere to send the report; dereferencing
    # ``update.effective_chat.id`` would raise inside the error handler itself.
    chat = update.effective_chat if isinstance(update, Update) else None
    if chat is None:
        return

    # collect error message
    tb_list = traceback.format_exception(None, context.error, context.error.__traceback__)
    tb_string = "".join(tb_list)[:2000]  # cap the traceback portion of the report
    update_str = update.to_dict()
    message = (
        f"An exception was raised while handling an update\n"
        f"<pre>update = {html.escape(json.dumps(update_str, indent=2, ensure_ascii=False))}"
        "</pre>\n\n"
        f"<pre>{html.escape(tb_string)}</pre>"
    )

    await context.bot.send_message(chat.id, message, parse_mode=ParseMode.HTML)
def run_bot() -> None:
    """Build the Telegram application, register every handler, and start polling."""
    builder = ApplicationBuilder()
    application = builder.token(config.telegram_token).build()

    # add handlers
    # An empty allow-list means the bot is open to everyone; otherwise message
    # and command handlers are restricted to the configured usernames.
    if len(config.allowed_telegram_usernames) != 0:
        user_filter = filters.User(username=config.allowed_telegram_usernames)
    else:
        user_filter = filters.ALL

    # Registered in this exact order.
    handlers = (
        CommandHandler("start", start_handle, filters=user_filter),
        CommandHandler("help", help_handle, filters=user_filter),
        MessageHandler(filters.TEXT & ~filters.COMMAND & user_filter, message_handle),
        CommandHandler("retry", retry_handle, filters=user_filter),
        CommandHandler("new", new_dialog_handle, filters=user_filter),
        CommandHandler("mode", show_chat_modes_handle, filters=user_filter),
        CallbackQueryHandler(set_chat_mode_handle, pattern="^set_chat_mode"),
        CommandHandler("balance", show_balance_handle, filters=user_filter),
    )
    for handler in handlers:
        application.add_handler(handler)

    application.add_error_handler(error_handler)

    # start the bot
    application.run_polling()


# Start the bot only when this file is executed as a script.
if __name__ == "__main__":
    run_bot()