crewAI-examples
crewAI-examples copied to clipboard
CrewAI-LangGraph - possible to use imap server?
Its possible to create a version of this tool with standard imap protocol? I was starting to work with this on 'nodes.pl' but maybe somebody would help/had already done this.
import imaplib
import email
import time
from datetime import datetime, timedelta
class Nodes():
def __init__(self):
self.mail = imaplib.IMAP4_SSL('mail.server.com', 993)
self.mail.login('[email protected]', 'Password')
self.mail.select('INBOX')
def check_email(self, state):
print("# Checking for new emails")
# Search for emails in last day
yesterday = (datetime.now() - timedelta(days=1)).strftime("%d-%b-%Y")
result, message_ids = self.mail.search(None, '(SINCE {date})'.format(date=yesterday))
new_emails = []
new_ids = []
# Loop through new emails and fetch details
for msg_id in message_ids[0].split():
if msg_id not in state['checked_emails_ids']:
result, data = self.mail.fetch(msg_id, '(RFC822)')
email_msg = email.message_from_bytes(data[0][1])
new_emails.append({
'id': msg_id,
'sender': email_msg['From'],
'subject': email_msg['Subject']
})
new_ids.append(msg_id)
# Update state
state['checked_emails_ids'].extend(new_ids)
state['emails'] = new_emails
return state
def wait_next_run(self, state):
print("## Waiting for 3 minutes")
time.sleep(180)
return state
def new_emails(self, state):
if len(state['emails']) == 0:
print("## No new emails")
return "end"
else:
print("## New emails")
return "continue"