crewAI-examples icon indicating copy to clipboard operation
crewAI-examples copied to clipboard

CrewAI-LangGraph - possible to use imap server?

Open PiotrEsse opened this issue 1 year ago • 0 comments

Its possible to create a version of this tool with standard imap protocol? I was starting to work with this on 'nodes.pl' but maybe somebody would help/had already done this.

import imaplib
import email
import time
from datetime import datetime, timedelta

class Nodes():

  def __init__(self):
    self.mail = imaplib.IMAP4_SSL('mail.server.com', 993)
    self.mail.login('[email protected]', 'Password')
    self.mail.select('INBOX')

  def check_email(self, state):
    print("# Checking for new emails")

    # Search for emails in last day
    yesterday = (datetime.now() - timedelta(days=1)).strftime("%d-%b-%Y")
    result, message_ids = self.mail.search(None, '(SINCE {date})'.format(date=yesterday))

    new_emails = []
    new_ids = []

    # Loop through new emails and fetch details
    for msg_id in message_ids[0].split():
      if msg_id not in state['checked_emails_ids']:
        result, data = self.mail.fetch(msg_id, '(RFC822)')
        email_msg = email.message_from_bytes(data[0][1])
        
        new_emails.append({
          'id': msg_id,
          'sender': email_msg['From'],
          'subject': email_msg['Subject']
        })
        new_ids.append(msg_id)

    # Update state
    state['checked_emails_ids'].extend(new_ids)
    state['emails'] = new_emails

    return state

  def wait_next_run(self, state):
    print("## Waiting for 3 minutes")  
    time.sleep(180)
    return state

  def new_emails(self, state):
    if len(state['emails']) == 0:
      print("## No new emails")
      return "end"
    else:
      print("## New emails")
      return "continue"


PiotrEsse avatar Jan 27 '24 12:01 PiotrEsse