Feature request: Connector to Prefect for triggering flows

Open yanivbh1 opened this issue 2 years ago • 5 comments

Asked by

Peppie (Discord)

yanivbh1, Jul 04 '23 11:07

Linking documentation to launch Prefect flows:

rizziemma, Jul 04 '23 13:07

Are we talking about the following flow?

  • Consume events (source TBD)
  • Call the Prefect REST API
  • Run as part of the Memphis adapter, or
  • Run as a dedicated process based on the adapter's infrastructure
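
A minimal sketch of the first two steps, assuming each event is a JSON object with a `deployment_id` and optional `parameters`, and that flows are started through Prefect's `POST /deployments/{id}/create_flow_run` endpoint. The function name, the event shape, and the example URL are illustrative assumptions, not an agreed design:

```python
from __future__ import annotations

import json


def build_flow_run_request(raw_event: bytes, api_url: str) -> tuple[str, dict]:
    """Turn one consumed event into the (url, JSON body) of a Prefect call."""
    event = json.loads(raw_event.decode("utf-8"))
    deployment_id = event["deployment_id"]  # required field (assumed name)
    url = f"{api_url.rstrip('/')}/deployments/{deployment_id}/create_flow_run"
    body = {"parameters": event.get("parameters", {})}
    return url, body


# Example with a made-up deployment id and a local API URL
url, body = build_flow_run_request(
    b'{"deployment_id": "abc", "parameters": {"x": 1}}',
    "http://localhost:4200/api/",
)
```

Keeping this step free of network calls means the same function could sit inside the adapter or in a dedicated process; only the code that consumes events and sends the request would differ.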

g41797, Jul 09 '23 17:07

I've implemented a very basic connector: a Memphis consumer plus a handler that turns each message into a Prefect flow run. I use the Prefect REST API so that it can easily be generalized into an HTTP connector:

from __future__ import annotations

import ast
import asyncio
import os

import httpx
from memphis import Memphis, MemphisError, MemphisConnectError, MemphisHeaderError


# Prefect API settings, read from the environment
api_url = os.getenv('PREFECT_API_URL')
headers = {
    "Authorization": f"Bearer {os.getenv('PREFECT_API_KEY')}"
}



async def handle_message(msgs, error, context):
    # Report consumer errors up front; inside the loop they would be
    # skipped whenever the batch is empty
    if error:
        print(error)
    try:
        # Reuse one HTTP client for the whole batch
        async with httpx.AsyncClient() as client:
            for msg in msgs:
                print("message: ", msg.get_data())
                # Parse the body as a Python literal; it must be a dict
                # with a "deployment_id" key
                data = ast.literal_eval(msg.get_data().decode("UTF-8"))

                payload = {"parameters": data.get("parameters", {})}
                deployment_id = data["deployment_id"]
                url = f"{api_url}/deployments/{deployment_id}/create_flow_run"
                print(f"sending payload {payload} to url {url}")
                response = await client.post(url, headers=headers, json=payload)
                response.raise_for_status()

                # Ack only after Prefect has accepted the flow run
                await msg.ack()
    except (MemphisError, MemphisConnectError, MemphisHeaderError) as e:
        print(e)
        return
    except Exception as e:
        print(e)
        return



async def main():
    # Create the client before the try block so that memphis is always
    # defined when the finally clause calls close()
    memphis = Memphis()
    try:
        await memphis.connect(
            host=os.getenv("MEMPHIS_HOST"),
            username=os.getenv("MEMPHIS_USERNAME"),
            password=os.getenv("MEMPHIS_PASSWORD"),
            account_id=os.getenv("MEMPHIS_ACCOUNT"),
        )

        consumer = await memphis.consumer(station_name="prefect", consumer_name=os.getenv("MEMPHIS_CONSUMER"))
        consumer.set_context({"target": "prefect"})
        consumer.consume(handle_message)
        # Keep the main coroutine alive so the consumer keeps receiving data
        await asyncio.Event().wait()

    except (MemphisError, MemphisConnectError) as e:
        print(e)

    finally:
        await memphis.close()


if __name__ == "__main__":
    asyncio.run(main())
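
One caveat with the handler above: `ast.literal_eval` parses Python literals, so a producer that sends strict JSON containing `true`, `false`, or `null` would fail to decode. A small sketch of a more tolerant decoder, assuming producers may send either form (`decode_trigger` is a hypothetical helper, not part of the Memphis SDK):

```python
import ast
import json


def decode_trigger(raw: bytes) -> dict:
    """Decode a message body as JSON, falling back to a Python literal."""
    text = raw.decode("utf-8")
    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        # For example "{'deployment_id': 'abc'}", as produced by str(dict)
        data = ast.literal_eval(text)
    if not isinstance(data, dict) or "deployment_id" not in data:
        raise ValueError("message must be a mapping with a 'deployment_id'")
    return data


# Both encodings decode to a plain dict
from_json = decode_trigger(b'{"deployment_id": "abc", "parameters": {"dry_run": true}}')
from_literal = decode_trigger(b"{'deployment_id': 'abc'}")
```

Validating the shape here keeps malformed messages from surfacing later as a `KeyError` in the middle of the HTTP call.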

I'm currently adapting it into a Kubernetes deployment, which should be sufficient for our use case right now. Let me know if we can make anything useful from this!

rizziemma, Aug 01 '23 14:08

Hey @rizziemma, it's a great boilerplate and should be shared with the community! A) @Avitaltrifsik will send you some cool swag. B) Once Memphis functions are released, it would be great to integrate it there.

yanivbh1, Aug 04 '23 13:08

Hey @rizziemma, I would be happy for you to share it on Discord with the whole community. I am sure it can benefit more users! I would also love to contact you there to get your details for the swag pack :) Here is an invite!

Avitaltrifsik, Aug 06 '23 08:08