Telethon icon indicating copy to clipboard operation
Telethon copied to clipboard

Telethon can't send to a specific chat

Open MuhammadSawalhy opened this issue 2 years ago • 3 comments

Checklist

  • [x] The error is in the library's code, and not in my own.
  • [x] I have searched for this issue before posting it and there isn't a duplicate.
  • [x] I ran pip install -U https://github.com/LonamiWebs/Telethon/archive/master.zip and triggered the bug in the latest version.

Code that causes the issue Note: this code is not my code, cuz my code is somehow tons of lines and may be complicated. But It is what I am trying to do.

When the TARGET_USERNAME equals any other valid username, Telethon sends it without any problem. I don't know what is the problem here but it is likely from Telethon itself. Telethon can receive messages from the change due to the event listener as you can see in the code. However, for some reason, Telethon can not send any message to this specific chat: @da77e7_civil_bot.

I tried to remove the session and make a new one with a disappointment of the same issue still happens unpredictably.

from telethon import TelegramClient, events
from collections import deque
import asyncio

TARGET_USERNAME = "@da77e7_civil_bot"

async def send_to_target(message, telegram_client):
    done = False
    list_of_messages = deque([
        "message 1",
        "message 2",
        "message 3",
    ])

    async def send_message():
        message = list_of_messages.popleft()
        await telegram_client.send_message(TARGET_USERNAME, message)

    @telegram_client.on(events.NewMessage(from_users=TARGET_USERNAME))
    async def _(event):
        nonlocal done
        recieved_message = event.message.message.split("\n")[
            0]  # first line only
        logging.info(f"[{TARGET_USERNAME}] recieved: " + recieved_message)
        if len(list_of_messages) > 0:
            await send_message()
        else:
            done = True

    await send_message()
    while not done:
        await asyncio.sleep(1)

# then here I will use a thread and a loop inside it to run the send_to_target function
# I also send to multiple targets at once using asyncio.wait(tasks, timeout=30)

Telethon DEBUG logs with some logs of mine

Using selector: EpollSelector
[07/Apr/2022 20:44:48] "POST /broadcast/message/ HTTP/1.1" 302 0
Connecting to 149.154.167.92:443/TcpFull...
Connection attempt 1...
Connection success!
Starting send loop
Starting receive loop
Connection to 149.154.167.92:443/TcpFull complete!
Waiting for messages to send...
Assigned msg_id = 7083965680032254644 to InvokeWithLayerRequest (7fb4ecdff0d0)
Encrypting 1 message(s) in 80 bytes for sending
Encrypted messages put in a queue to be sent
Waiting for messages to send...
Receiving items from the network...
Handling bad salt for message 7083965680032254644
1 message(s) will be resent
Receiving items from the network...
Assigned msg_id = 7083965680937886308 to InvokeWithLayerRequest (7fb4ecdff0d0)
Encrypting 1 message(s) in 80 bytes for sending
Encrypted messages put in a queue to be sent
Waiting for messages to send...
Assigned msg_id = 7083965680940522264 to MsgsAck (7fb4ecdfee90)
Encrypting 1 message(s) in 36 bytes for sending
Encrypted messages put in a queue to be sent
Waiting for messages to send...
Handling bad msg BadMsgNotification(bad_msg_id=7083965680937886308, bad_msg_seqno=3, error_code=17)
Updated time offset (old offset 0, bad 7083965681561392852, good 7083934730803531777, new -7207)
System clock is wrong, set time offset to -7207s
1 messages will be resent due to bad msg
Receiving items from the network...
Assigned msg_id = 7083934727733104336 to InvokeWithLayerRequest (7fb4ecdff0d0)
Encrypting 1 message(s) in 80 bytes for sending
Encrypted messages put in a queue to be sent
Waiting for messages to send...
Assigned msg_id = 7083934727734432804 to MsgsAck (7fb4ed05e7a0)
Encrypting 1 message(s) in 36 bytes for sending
Handling bad msg BadMsgNotification(bad_msg_id=7083965680940522264, bad_msg_seqno=4, error_code=17)
System clock is wrong, set time offset to -7207s
1 messages will be resent due to bad msg
Receiving items from the network...
Encrypted messages put in a queue to be sent
Waiting for messages to send...
Assigned msg_id = 7083934727736997236 to MsgsAck (7fb4ecdfee90)
Assigned msg_id = 7083934727737218488 to MsgsAck (7fb4ee4703a0)
Encrypting 2 message(s) in 96 bytes for sending
Encrypted messages put in a queue to be sent
Waiting for messages to send...
Handling container
Handling new session created
Handling acknowledge for [7083934727733104336]
Receiving items from the network...
Handling RPC result for message 7083934727733104336
Receiving items from the network...
Assigned msg_id = 7083934728395837416 to GetUsersRequest (7fb4ece60550)
Encrypting 1 message(s) in 32 bytes for sending
Encrypted messages put in a queue to be sent
Waiting for messages to send...
Assigned msg_id = 7083934728397019016 to MsgsAck (7fb4ee457490)
Encrypting 1 message(s) in 60 bytes for sending
Encrypted messages put in a queue to be sent
Waiting for messages to send...
Handling RPC result for message 7083934728395837416
Receiving items from the network...
Using selector: EpollSelector
[@da77e7_civil_bot] Alhamdulilah, all are done 💚
[@da77e7_civil_bot] sending: صل على النبي
Assigned msg_id = 7083934733413443796 to ResolveUsernameRequest (7fb4ecda75b0)
Encrypting 1 message(s) in 40 bytes for sending
Encrypted messages put in a queue to be sent
Waiting for messages to send...
Assigned msg_id = 7083934733415362588 to MsgsAck (7fb4ecda4400)
Encrypting 1 message(s) in 36 bytes for sending
Encrypted messages put in a queue to be sent
Waiting for messages to send...
Handling RPC result for message 7083934733413443796
Receiving items from the network...
Handling update UpdateShort
Receiving items from the network...
Handling container
Handling update UpdateShort
Handling update UpdateShort
Receiving items from the network...
Handling update UpdateShort
Receiving items from the network...
Handling gzipped data
Handling update Updates
Receiving items from the network...
Handling gzipped data
Handling update Updates
Receiving items from the network...
[@da77e7_civil_bot] recieved: اختار/ (ي) ي بشمهندس/ (ة)👷‍♂👇
Handling update UpdateShort
Receiving items from the network...
Disconnecting from 149.154.167.92:443/TcpFull...
Closing current connection...
Cancelling 0 pending message(s)...
Disconnection from 149.154.167.92:443/TcpFull complete!
Task was destroyed but it is pending!
task: <Task pending name='Task-8' coro=<send_to_bot() running at /home/ms/myp/da7ee7-bot-platform/broadcast/broadcast.py:190> wait_for=<Future finished result=None>>

MuhammadSawalhy avatar Apr 07 '22 19:04 MuhammadSawalhy

I can see in the logs multiple Waiting for messages to send, I don't know what is the reason for that.

MuhammadSawalhy avatar Apr 07 '22 19:04 MuhammadSawalhy

OK, now Telethon can send to this chat, I also don't know why.

OH, another chat the Telethon can't send to now, @Da7ee7_Civil_1st_Year_Bot.

Any help, please!

MuhammadSawalhy avatar Apr 07 '22 19:04 MuhammadSawalhy

I think I have fixed this issue. Telethon runs in a thread inside a Django app which is also not the main thread but another secondary thread.

In this thread, I have multiple tasks that send messages to multiple targets at once using asyncio.wait(tasks, timeout=30). Telethon doesn't send directly inside these nested asyncio things. I have to debounce the send method that contains client.send_message to wait until the target completes his reply. Debouncing uses threading.Timer.

Now I have dove deeply inside threads and asycio universe so I lost the connection with the universe of Telethon.

A missing piece in this puzzle is that when the message is successfully sent to the targeted chat, await client.send_message doesn't actually stop awaiting. It stops waiting forever and the line of code after await which logs some message indicating success is never logged. So I was forced to move it before await.

I couldn't await client.send_file as well, telethon freezes with no progress. So this may be a result of the same issue.

import asyncio
from threading import Timer
from telethon import TelegramClient, events
from collections import deque

targets = [
    "username1",
    "username2",
    "username",
]


def send_to_targets_in_background(message, *, image, bots_usernames):
    broadcasting_thread = threading.Thread(
        target=send_to_targets_sync,
        args=(message,),
        kwargs={
            'image': image,
            'bots_usernames': bots_usernames,
        })
    broadcasting_thread.daemon = True
    broadcasting_thread.start()


def send_to_targets_sync(targets, message):
    try:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(send_to_tagets(targets, message))
        loop.close()
    finally:
        logging.info("*."*10 + " sending to targets is done")


async def send_to_targets(targets, message):
    async with get_telegram_client() as telegram_client:
        tasks = []
        for target in targets:
            task = asyncio.create_task(send_to_target(target, message))
            tasks.append(task)
        await asyncio.wait(tasks, timeout=30)


def get_telegram_client():
    return TelegramClient(StringSession(TELETHON_SESSION), TELEGRAM_API_ID, TELEGRAM_API_HASH)


async def send_to_target(message, telegram_client):
    done = False
    list_of_messages = deque([
        "message 1",
        "message 2",
        "message 3",
    ])

    @debounce_async(0.5)
    async def send_message():
        message = list_of_messages.popleft()
        await telegram_client.send_message(TARGET_USERNAME, message)

    @telegram_client.on(events.NewMessage(from_users=TARGET_USERNAME))
    async def _(event):
        nonlocal done
        recieved_message = event.message.message.split("\n")[
            0]  # first line only
        logging.info(f"[{TARGET_USERNAME}] recieved: " + recieved_message)
        if len(list_of_messages) > 0:
            await send_message()
        else:
            done = True

    await send_message()
    while not done:
        await asyncio.sleep(1)


def debounce_async(secs):
    def decorator(func):
        @debounce(secs)
        def intermediate_function(*args, **kwargs):
            asyncio.run(func(*args, **kwargs))
        async def wrapper(*args, **kwargs):
            intermediate_function(*args, **kwargs)
        return wrapper
    return decorator


def debounce(secs):
    """ Decorator that will postpone a functions
        execution until after wait seconds
        have elapsed since the last time it was invoked. """
    def decorator(fn):
        task = None
        def debounced(*args, **kwargs):
            nonlocal task
            def call_it():
                fn(*args, **kwargs)
            if task:
                task.cancel()
            task = Timer(secs, call_it)
            task.start()

        return debounced
    return decorator

To get everything working together I abandoned the deboucing and replaced it with some similar functionality that solved my issue.

All I did is to track the time of the received messages, and check if the last received message time is larger than X seconds, then send the next message and empty the list of received messages. Repeat the process to until all the messages are sent.

MuhammadSawalhy avatar Apr 08 '22 06:04 MuhammadSawalhy

Closing as I don't think there's anything to do in the Telethon side. I appreciate your investigation, since it may be helpful to others.

Lonami avatar Sep 20 '22 14:09 Lonami