Проблема с приемом данных от kwork
Попытался написать бота, но при попытке обработки сообщения приходит ошибка:
Traceback (most recent call last):
File "C:\Users\amirl\OneDrive\Рабочий стол\bots\KworkBot\main.py", line 52, in
Код бота:
import aiohttp from kwork import KworkBot from kwork.types import Message import logging import asyncio
logging.basicConfig(level=logging.INFO)
TELEGRAM_BOT_TOKEN = TELEGRAM_CHAT_ID =
class TelegramNotifier: def init(self, bot_token: str, chat_id: str): self.bot_token = bot_token self.chat_id = chat_id
async def send_notification(self, text: str):
url = f"https://api.telegram.org/bot{self.bot_token}/sendMessage"
payload = {'chat_id': self.chat_id, 'text': text, "parse_mode": "HTML"}
try:
async with aiohttp.ClientSession() as session:
async with session.post(url, json=payload) as response:
if response.status == 200:
logging.info("✅ Уведомление отправлено в Telegram")
except Exception as e:
logging.error(f"❌ Ошибка Telegram: {e}")
async def run(): notifier = TelegramNotifier(TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID) bot = KworkBot(login=, password=) me = await bot.get_me() print(me)
@bot.message_handler()
async def simple_handle(message: Message):
await message.answer_simulation(text="Сейчас отвечу")
telegram_message = (
f"🔔 <b>Новое сообщение на Kwork!</b>\n"
f"👤 <b>Аккаунт:</b> {bot.login}\n"
f"📝 <b>Пароль:</b> {bot.password}\n"
f"🆔 <b>Сообщение:</b> {message.text}"
)
await notifier.send_notification(telegram_message)
await bot.run_bot()
if name == "main": loop = asyncio.get_event_loop() loop.run_until_complete(run())
if "text" in json_event:
try:
# Декодируем URL-encoded строку
decoded_text = urllib.parse.unquote(json_event["text"])
logging.debug(f"Decoded text: {decoded_text}")
# Теперь парсим как JSON
json_event_data = json.loads(decoded_text)
except json.JSONDecodeError as e:
logging.warning(f"Failed to parse decoded JSON: {e}")
logging.warning(f"Problematic decoded text: {decoded_text}")
continue
else:
continue
Добавил эту строку в исходный код библиотеки