aioimaplib
aioimaplib copied to clipboard
AUTHENTICATE command
Hello, do you know the service where I can check this method?
Normally every server compliant with rfc3501 should implement this.
I've seen lines like this :
2018-05-09 10:00:41,006 DEBUG [aioimaplib:312] Received : b'* CAPABILITY IMAP4rev1 LITERAL+ SASL-IR LOGIN-REFERRALS ID ENABLE AUTH=PLAIN AUTH=LOGIN\r\nGEHP0 OK Pre-login capabilities listed, post-login capabilities have more.\r\n'
2018-05-09 10:00:41,006 DEBUG [aioimaplib:598] tagged status GEHP0 OK Pre-login capabilities listed, post-login capabilities have more.
With orange.fr, outlook.com, yahoo.com, gmail.com, free.fr
I'd say that whenever a server says SASL-IR as capability means it supports rfc-4959 so the AUTHENTICATE method.
Please add this. it is extremely important nowadays with e.g. Microsoft requiring XOAUTH2.
FYI here's a very ugly implementation that works. Proper upstream support would be nicer of course :)
class IMAP4ClientProtocolWithXOAUTH2(IMAP4ClientProtocol):
def __init__(self, loop, conn_lost_cb=None):
super().__init__(loop, conn_lost_cb)
self.literal_auth_data = None
def _continuation(self, line: bytes) -> None:
if self.pending_sync_command and self.pending_sync_command.name == 'AUTHENTICATE':
if self.literal_auth_data is None:
Abort('asked for literal auth data but have no literal auth data to send')
self.transport.write(self.literal_auth_data)
self.transport.write(CRLF)
self.literal_auth_data = None
return
super()._continuation(line)
@change_state
async def authenticate(self, mechanism: str, data: str):
self.literal_auth_data = base64.b64encode(data.encode('ascii'))
response = await self.execute(Command('AUTHENTICATE', self.new_tag(), mechanism, loop=self.loop))
if 'OK' == response.result:
self.state = AUTH
for line in response.lines:
if b'CAPABILITY' in line:
self.capabilities = self.capabilities.union(set(line.decode().replace('CAPABILITY', '').strip().split()))
return response
class IMAP4WithXOAUTH2(IMAP4):
def create_client(self, host, port, loop, conn_lost_cb=None, ssl_context=None):
local_loop = loop if loop is not None else get_running_loop()
self.protocol = IMAP4ClientProtocolWithXOAUTH2(local_loop, conn_lost_cb)
local_loop.create_task(local_loop.create_connection(lambda: self.protocol, host, port, ssl=ssl_context))
async def authenticate(self, mechanism: str, data: str):
return await asyncio.wait_for(self.protocol.authenticate(mechanism, data), self.timeout)
class IMAP4WithXOAUTH2_SSL(IMAP4WithXOAUTH2):
def __init__(self, host = '127.0.0.1', port=993, loop=None, timeout=IMAP4.TIMEOUT_SECONDS, ssl_context=None):
super().__init__(host, port, loop, timeout, None, ssl_context)
def create_client(self, host, port, loop, conn_lost_cb=None, ssl_context=None):
if ssl_context is None:
ssl_context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
super().create_client(host, port, loop, conn_lost_cb, ssl_context)
Used like this:
with open(oauthfile, 'r') as tokenfile:
oauthdata = 'user={}\1auth=Bearer {}\1\1'.format(user, f.read().strip())
await imap_client.authenticate('XOAUTH2', oauthdata)