
Trigger from another task

Open dvett01 opened this issue 1 year ago • 3 comments

Hi, I am wondering if it is possible to trigger one Plombery task (job2) from another Plombery task (job1)?

I have a task that runs some code (job1), and sometimes it needs to renew the auth token (job2). When it needs to renew, I tried hitting the API endpoint for job2, but the jobs seem to block each other and Plombery freezes.

Maybe someone can help me get my head around it?

dvett01 avatar Jan 04 '24 09:01 dvett01

Can I see a snippet of your code? The best solution I see here is to trigger the second task by making a POST request to the pipeline endpoint.

CurtisFL avatar Feb 17 '24 23:02 CurtisFL
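As a quick illustration of what that POST would involve: the sketch below just builds the request URL and JSON body. The `/api/runs/` path and payload shape are taken from the code snippet shared later in this thread; they are an assumption, not documented API.

```python
import json

def build_trigger_request(base_url, pipeline_id, params=None):
    """Construct the URL and JSON body for triggering a pipeline run.

    NOTE: the /api/runs/ path and {'pipeline_id', 'params'} payload are
    assumptions based on a snippet in this thread, not official docs.
    """
    url = f'{base_url}/api/runs/'
    body = {'pipeline_id': pipeline_id, 'params': params or {}}
    return url, json.dumps(body)

# Example: trigger a hypothetical 'renew_token' pipeline
url, body = build_trigger_request('http://localhost:8080', 'renew_token')
```

The actual request would then be sent with any HTTP client (e.g. `requests.post(url, data=body, headers={'Content-Type': 'application/json'})`).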

by making a post request to the pipeline endpoint

Yes, currently Plombery cannot dynamically edit/add/remove triggers from running jobs; the only way is to trigger via the HTTP API, but the author has not published any HTTP API documentation.

I am also eager for this feature.

mtroym avatar Jul 18 '24 15:07 mtroym

Yes, currently Plombery cannot dynamically edit/add/remove triggers from running jobs; the only way is to trigger via the HTTP API, but the author has not published any HTTP API documentation.

I am also eager for this feature.

It's FastAPI, so it serves interactive API docs by default (at /docs), no?

Anyway, can't we do something like:

import json
import requests

# Adjust to wherever your Plombery instance is running
host, port = 'localhost', 8080


def schedule_task(pipeline_id, params=None):
    """Trigger a pipeline run via the HTTP API and return the new run's id."""
    resp = requests.post(
        f'http://{host}:{port}/api/runs/',
        json={'pipeline_id': pipeline_id, 'params': params or {}},
    )
    return resp.json()['id']


def check_task_status(run_id):
    """Return 1 if the run completed, -1 if it failed, 0 if still running."""
    resp = requests.get(f'http://{host}:{port}/api/runs/{run_id}').json()
    if resp['status'] == 'completed':
        return 1
    elif resp['status'] == 'failed':
        return -1
    return 0


def get_task_data(run_id):
    """Fetch the data produced by the first task of a finished run."""
    resp = requests.get(f'http://{host}:{port}/api/runs/{run_id}').json()
    task_id = resp['tasks_run'][0]['task_id']
    data = requests.get(f'http://{host}:{port}/api/runs/{run_id}/data/{task_id}').json()
    return json.loads(data)

I used schedule_task inside one task to trigger another pipeline, and it worked.
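On the original freezing question: if job1 synchronously waits for job2 inside the same worker, it is safer to poll with a timeout than to block forever. A minimal sketch, assuming a status-check function shaped like `check_task_status` above (returns 1 for completed, -1 for failed, 0 for still running); the timeout and poll interval are arbitrary:

```python
import time

def wait_for_run(check_status, run_id, timeout=300, interval=2):
    """Poll a run's status until it finishes or the timeout elapses.

    check_status(run_id) is expected to return 1 (completed),
    -1 (failed), or 0 (still running). Returns the final status,
    or raises TimeoutError so job1 never hangs indefinitely.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = check_status(run_id)
        if status != 0:
            return status
        time.sleep(interval)
    raise TimeoutError(f'run {run_id} did not finish within {timeout}s')
```

Usage would be `wait_for_run(check_task_status, schedule_task('renew_token'))`, after which job1 can read the result with `get_task_data`.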

D3m0n1cC09n1710n avatar Aug 03 '24 19:08 D3m0n1cC09n1710n