aioimaplib

AUTHENTICATE command

Open bamthomas opened this issue 9 years ago • 4 comments

bamthomas, Jul 06 '16 08:07

Hello, do you know of a service where I can test this method?

ikrivosheev, May 04 '18 15:05

Normally, every server compliant with RFC 3501 should implement this.

I've seen lines like this:

2018-05-09 10:00:41,006 DEBUG [aioimaplib:312] Received : b'* CAPABILITY IMAP4rev1 LITERAL+ SASL-IR LOGIN-REFERRALS ID ENABLE AUTH=PLAIN AUTH=LOGIN\r\nGEHP0 OK Pre-login capabilities listed, post-login capabilities have more.\r\n'
2018-05-09 10:00:41,006 DEBUG [aioimaplib:598] tagged status GEHP0 OK Pre-login capabilities listed, post-login capabilities have more.

With orange.fr, outlook.com, yahoo.com, gmail.com and free.fr.

I'd say that whenever a server advertises SASL-IR as a capability, it supports RFC 4959 and therefore the AUTHENTICATE command.
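
To make that check concrete, here is a minimal sketch that parses the untagged CAPABILITY line from the log above and looks for SASL-IR and the advertised AUTH= mechanisms. The helper names `parse_capabilities` and `auth_mechanisms` are hypothetical and not part of aioimaplib; this only illustrates the idea.

```python
def parse_capabilities(line: bytes) -> set:
    """Return the capability tokens from an untagged CAPABILITY response line."""
    text = line.decode("ascii").strip()
    prefix = "* CAPABILITY "
    if not text.startswith(prefix):
        raise ValueError("not an untagged CAPABILITY response")
    return set(text[len(prefix):].split())


def auth_mechanisms(capabilities: set) -> set:
    """Extract the SASL mechanism names advertised as AUTH=<mechanism>."""
    marker = "AUTH="
    return {cap[len(marker):] for cap in capabilities if cap.upper().startswith(marker)}


# First line of the server response quoted in the log above.
line = b"* CAPABILITY IMAP4rev1 LITERAL+ SASL-IR LOGIN-REFERRALS ID ENABLE AUTH=PLAIN AUTH=LOGIN\r\n"
caps = parse_capabilities(line)

supports_initial_response = "SASL-IR" in caps
mechanisms = auth_mechanisms(caps)
print(supports_initial_response, sorted(mechanisms))
```

Note that this server only lists PLAIN and LOGIN before login, so a client wanting XOAUTH2 would need to see AUTH=XOAUTH2 in this list first.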

bamthomas, May 09 '18 08:05

Please add this. It is extremely important nowadays, e.g. with Microsoft requiring XOAUTH2.

ThiefMaster, Nov 23 '22 09:11

FYI here's a very ugly implementation that works. Proper upstream support would be nicer of course :)

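import asyncio
import base64
import ssl
from asyncio import get_running_loop

# Assumption: these names are importable from aioimaplib.aioimaplib,
# as in the aioimaplib version this snippet was written against.
from aioimaplib.aioimaplib import (
    AUTH, CRLF, Abort, Command, IMAP4, IMAP4ClientProtocol, change_state)


class IMAP4ClientProtocolWithXOAUTH2(IMAP4ClientProtocol):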
    def __init__(self, loop, conn_lost_cb=None):
        super().__init__(loop, conn_lost_cb)
        self.literal_auth_data = None

    def _continuation(self, line: bytes) -> None:
        if self.pending_sync_command and self.pending_sync_command.name == 'AUTHENTICATE':
            if self.literal_auth_data is None:
                # The exception must be raised, not just instantiated.
                raise Abort('asked for literal auth data but have no literal auth data to send')
            self.transport.write(self.literal_auth_data)
            self.transport.write(CRLF)
            self.literal_auth_data = None
            return
        super()._continuation(line)

    @change_state
    async def authenticate(self, mechanism: str, data: str):
        self.literal_auth_data = base64.b64encode(data.encode('ascii'))
        response = await self.execute(Command('AUTHENTICATE', self.new_tag(), mechanism, loop=self.loop))
        if 'OK' == response.result:
            self.state = AUTH
            for line in response.lines:
                if b'CAPABILITY' in line:
                    self.capabilities = self.capabilities.union(set(line.decode().replace('CAPABILITY', '').strip().split()))
        return response


class IMAP4WithXOAUTH2(IMAP4):
    def create_client(self, host, port, loop, conn_lost_cb=None, ssl_context=None):
        local_loop = loop if loop is not None else get_running_loop()
        self.protocol = IMAP4ClientProtocolWithXOAUTH2(local_loop, conn_lost_cb)
        local_loop.create_task(local_loop.create_connection(lambda: self.protocol, host, port, ssl=ssl_context))

    async def authenticate(self, mechanism: str, data: str):
        return await asyncio.wait_for(self.protocol.authenticate(mechanism, data), self.timeout)


class IMAP4WithXOAUTH2_SSL(IMAP4WithXOAUTH2):
    def __init__(self, host='127.0.0.1', port=993, loop=None, timeout=IMAP4.TIMEOUT_SECONDS, ssl_context=None):
        super().__init__(host, port, loop, timeout, None, ssl_context)

    def create_client(self, host, port, loop, conn_lost_cb=None, ssl_context=None):
        if ssl_context is None:
            ssl_context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
        super().create_client(host, port, loop, conn_lost_cb, ssl_context)

Used like this:

with open(oauthfile, 'r') as tokenfile:
    # '\1' is the control character 0x01 that XOAUTH2 uses as a field separator.
    oauthdata = 'user={}\1auth=Bearer {}\1\1'.format(user, tokenfile.read().strip())
await imap_client.authenticate('XOAUTH2', oauthdata)
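
For reference, the string handed to `authenticate` above can be built and checked in isolation. This is only a sketch: `build_xoauth2_string` and `encode_sasl_payload` are hypothetical helper names, and the user and token values are placeholders. The encoding step mirrors what the `authenticate` method above does with `base64.b64encode`.

```python
import base64


def build_xoauth2_string(user: str, access_token: str) -> str:
    """Build the raw XOAUTH2 string; fields are separated by the 0x01 control character."""
    return "user={}\x01auth=Bearer {}\x01\x01".format(user, access_token)


def encode_sasl_payload(payload: str) -> bytes:
    """Base64-encode the payload, as it is sent after the server's continuation request."""
    return base64.b64encode(payload.encode("ascii"))


# Placeholder values, not real credentials.
raw = build_xoauth2_string("someuser@example.com", "placeholder-token")
encoded = encode_sasl_payload(raw)
print(encoded)
```

Building the string with `\x01` is equivalent to the `\1` escapes used above; both denote the same single control character.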

ThiefMaster, Nov 23 '22 11:11