dbx icon indicating copy to clipboard operation
dbx copied to clipboard

Pause test workflow when changes are deployed to production

Open panoptikum opened this issue 2 years ago • 3 comments

Hello,

I am currently thinking about how to implement a process that automatically switches the Job attribute "PAUSE_STATUS" to PAUSED when I push changes to production (whether via a tag or any other method). Is this something dbx supports? I could not find anything... Right now, we usually set up new pipelines in the test environment first, then we ship things to production. When I make changes now, we reactivate that test workflow, but it does not get paused again once the changes move to production as well.

Any input on this matter is much appreciated.

Best Felix

panoptikum avatar Sep 07 '23 12:09 panoptikum

@panoptikum I am using the Python Databricks SDK for something similar to what you are describing. When testing changes to a pipeline I pause the current production job (production referring to live in Databricks, this could be in dev, stage, or prod) and cancel any running job runs. I then deploy the changed one as an adhoc workflow, run it, and then when it's done unpause the job. Below is a copy/paste from my deployment logic, hopefully it will give you some idea of how to accomplish what you are wanting to do.

def dbx_adhoc_run():
    """Deploy the current changes as a one-off ("adhoc") workflow and run it.

    Pauses the standard job's schedule (and cancels its active runs) for the
    duration of the adhoc run, then restores it. Raises if the adhoc run does
    not finish in SUCCESS state.
    """
    args: KnownArgs = ArgsToDataClass(KnownArgs).dclass_instance
    print("")  # separate next printed line from args

    non_default_args = args.get_non_default_args()

    if non_default_args:
        args.print_non_default_args(non_default_args)
    else:
        echo("Skipping adhoc run because all args are default")
        return

    args.adhoc = True

    create_deployment(args)

    # this will deploy with a different name ("adhoc" added to name)
    dbx_deploy(args=args)

    w = WorkspaceClient()

    with pause_current_job(w, args):
        with one_time_job_run(w, args) as run:
            print("")
            echo(f"Created new run: {run.run_page_url}")

            try:
                _ = w.jobs.wait_get_run_job_terminated_or_skipped(
                    run_id=run.run_id, timeout=timedelta(hours=args.timeout_hours)
                )

                run_status = w.jobs.get_run(run_id=run.run_id)

                # collect per-task output so logs/errors can be echoed below
                task_output_dict: dict[str, RunOutput] = {}
                for task in run_status.tasks:
                    task_output_dict[task.task_key] = w.jobs.get_run_output(run_id=task.run_id)

                echo(f"Run ID: {run.run_id} finished in state: {run_status.state.result_state}")
                for task_key, task_output in task_output_dict.items():
                    echo(f"Task logs: {task_key}\n")
                    echo(task_output.logs)
            except KeyboardInterrupt:
                print("")  # skip the line that has the ^C
                echo("Keyboard interrupt, canceling run")
                w.jobs.cancel_run_and_wait(run_id=run.run_id)
                raise

            if run_status.state.result_state != RunResultState.SUCCESS:
                for task_key, task_output in task_output_dict.items():
                    if task_output.error:
                        # BUG FIX: the original called echo("Error: ", x) with two
                        # positional args; if echo is click.echo the second arg is
                        # taken as `file`, not appended to the message. Fold into
                        # a single f-string instead.
                        echo(f"Error: {task_key}\n")
                        echo(f"Error: {task_output.error}")
                        echo(f"Error Trace: {task_output.error_trace}")

                raise Exception(f"Run ID: {run.run_id} finished in state: {run_status.state.result_state}")


@contextmanager
def one_time_job_run(w: WorkspaceClient, args: KnownArgs) -> Generator[Run, None, None]:
    """Start a run of the adhoc job and yield it; delete the job afterwards.

    The one-time job is deleted even if the body raises. A KeyboardInterrupt
    raised in the body is swallowed after cleanup (preserving the original
    behavior), so the caller's Ctrl-C handler can cancel the run and the
    process still exits cleanly.
    """
    # BUG FIX: get_job used to live inside the try block, so a failure there
    # reached the finally-block delete with `one_time_job` unbound (NameError).
    one_time_job = get_job(w, get_package_name(args))
    try:
        run_wait: Run = w.jobs.run_now(job_id=one_time_job.job_id)
        run = w.jobs.get_run(run_id=run_wait.run_id)
        yield run
    except KeyboardInterrupt:
        # intentionally suppressed; the caller already canceled the run
        pass
    finally:
        # BUG FIX: the original stored sys.exc_info()[0] and re-raised the
        # exception *class*, losing the message and traceback. Plain
        # try/finally lets other exceptions propagate untouched.
        echo("Deleting one time job")
        w.jobs.delete(job_id=one_time_job.job_id)


@contextmanager
def pause_current_job(w: WorkspaceClient, args: KnownArgs):
    """Cancel active runs of the standard job and pause its schedule.

    On exit the original settings are restored — but only if a schedule was
    actually paused. A KeyboardInterrupt raised in the body is swallowed after
    the schedule is restored (preserving the original behavior).
    """
    # get the standard job name so we can pause the schedule and cancel any running jobs
    # NOTE(review): one_time_job_run passes `args` to get_package_name while
    # this passes `args.catalog` — confirm which signature is intended.
    job_name = get_package_name(args.catalog)
    standard_job = get_job(w, job_name)
    for job_run in w.jobs.list_runs(job_id=standard_job.job_id, active_only=True):
        echo(f"Canceling existing run: {job_run.run_id}")
        w.jobs.cancel_run_and_wait(run_id=job_run.run_id)

    orig_settings = standard_job.settings

    if not orig_settings.schedule or orig_settings.schedule.pause_status == PauseStatus.PAUSED:
        # Nothing to pause. BUG FIX: the original still ran the "Unpausing
        # schedule" update in its finally block on this path.
        try:
            yield
        except KeyboardInterrupt:
            pass
        return

    echo("Pausing schedule")
    copied_settings = deepcopy(orig_settings)
    copied_settings.schedule.pause_status = PauseStatus.PAUSED

    w.jobs.update(job_id=standard_job.job_id, new_settings=copied_settings)

    try:
        yield
    except KeyboardInterrupt:
        # intentionally suppressed after the schedule is restored below
        pass
    finally:
        # BUG FIX: the original's finally could hit `standard_job` or
        # `orig_settings` unbound if setup failed, and re-raised only the
        # exception *class* (losing message/traceback). With setup outside the
        # try, both names are guaranteed bound here.
        echo("Unpausing schedule")
        w.jobs.update(job_id=standard_job.job_id, new_settings=orig_settings)


def get_job(w: WorkspaceClient, job_name: str):
    """Return the full Job object for the first job whose name matches `job_name`.

    Raises:
        ValueError: if no job with that name exists. (The original raised a
            bare IndexError on `job_list[0]`, which was hard to diagnose.)
    """
    job_list = list(w.jobs.list(name=job_name))
    if not job_list:
        raise ValueError(f"No job found with name: {job_name!r}")

    existing_job = w.jobs.get(job_id=job_list[0].job_id)
    return existing_job

NodeJSmith avatar Sep 08 '23 20:09 NodeJSmith

@panoptikum I am using the Python Databricks SDK for something similar to what you are describing. When testing changes to a pipeline I pause the current production job (production referring to live in Databricks, this could be in dev, stage, or prod) and cancel any running job runs. I then deploy the changed one as an adhoc workflow, run it, and then when it's done unpause the job. Below is a copy/paste from my deployment logic, hopefully it will give you some idea of how to accomplish what you are wanting to do.

def dbx_adhoc_run():
    """Deploy the current changes as a one-off ("adhoc") workflow and run it.

    Pauses the standard job's schedule (and cancels its active runs) for the
    duration of the adhoc run, then restores it. Raises if the adhoc run does
    not finish in SUCCESS state.
    """
    args: KnownArgs = ArgsToDataClass(KnownArgs).dclass_instance
    print("")  # separate next printed line from args

    non_default_args = args.get_non_default_args()

    if non_default_args:
        args.print_non_default_args(non_default_args)
    else:
        echo("Skipping adhoc run because all args are default")
        return

    args.adhoc = True

    create_deployment(args)

    # this will deploy with a different name ("adhoc" added to name)
    dbx_deploy(args=args)

    w = WorkspaceClient()

    with pause_current_job(w, args):
        with one_time_job_run(w, args) as run:
            print("")
            echo(f"Created new run: {run.run_page_url}")

            try:
                _ = w.jobs.wait_get_run_job_terminated_or_skipped(
                    run_id=run.run_id, timeout=timedelta(hours=args.timeout_hours)
                )

                run_status = w.jobs.get_run(run_id=run.run_id)

                # collect per-task output so logs/errors can be echoed below
                task_output_dict: dict[str, RunOutput] = {}
                for task in run_status.tasks:
                    task_output_dict[task.task_key] = w.jobs.get_run_output(run_id=task.run_id)

                echo(f"Run ID: {run.run_id} finished in state: {run_status.state.result_state}")
                for task_key, task_output in task_output_dict.items():
                    echo(f"Task logs: {task_key}\n")
                    echo(task_output.logs)
            except KeyboardInterrupt:
                print("")  # skip the line that has the ^C
                echo("Keyboard interrupt, canceling run")
                w.jobs.cancel_run_and_wait(run_id=run.run_id)
                raise

            if run_status.state.result_state != RunResultState.SUCCESS:
                for task_key, task_output in task_output_dict.items():
                    if task_output.error:
                        # BUG FIX: the original called echo("Error: ", x) with two
                        # positional args; if echo is click.echo the second arg is
                        # taken as `file`, not appended to the message. Fold into
                        # a single f-string instead.
                        echo(f"Error: {task_key}\n")
                        echo(f"Error: {task_output.error}")
                        echo(f"Error Trace: {task_output.error_trace}")

                raise Exception(f"Run ID: {run.run_id} finished in state: {run_status.state.result_state}")


@contextmanager
def one_time_job_run(w: WorkspaceClient, args: KnownArgs) -> Generator[Run, None, None]:
    """Start a run of the adhoc job and yield it; delete the job afterwards.

    The one-time job is deleted even if the body raises. A KeyboardInterrupt
    raised in the body is swallowed after cleanup (preserving the original
    behavior), so the caller's Ctrl-C handler can cancel the run and the
    process still exits cleanly.
    """
    # BUG FIX: get_job used to live inside the try block, so a failure there
    # reached the finally-block delete with `one_time_job` unbound (NameError).
    one_time_job = get_job(w, get_package_name(args))
    try:
        run_wait: Run = w.jobs.run_now(job_id=one_time_job.job_id)
        run = w.jobs.get_run(run_id=run_wait.run_id)
        yield run
    except KeyboardInterrupt:
        # intentionally suppressed; the caller already canceled the run
        pass
    finally:
        # BUG FIX: the original stored sys.exc_info()[0] and re-raised the
        # exception *class*, losing the message and traceback. Plain
        # try/finally lets other exceptions propagate untouched.
        echo("Deleting one time job")
        w.jobs.delete(job_id=one_time_job.job_id)


@contextmanager
def pause_current_job(w: WorkspaceClient, args: KnownArgs):
    """Cancel active runs of the standard job and pause its schedule.

    On exit the original settings are restored — but only if a schedule was
    actually paused. A KeyboardInterrupt raised in the body is swallowed after
    the schedule is restored (preserving the original behavior).
    """
    # get the standard job name so we can pause the schedule and cancel any running jobs
    # NOTE(review): one_time_job_run passes `args` to get_package_name while
    # this passes `args.catalog` — confirm which signature is intended.
    job_name = get_package_name(args.catalog)
    standard_job = get_job(w, job_name)
    for job_run in w.jobs.list_runs(job_id=standard_job.job_id, active_only=True):
        echo(f"Canceling existing run: {job_run.run_id}")
        w.jobs.cancel_run_and_wait(run_id=job_run.run_id)

    orig_settings = standard_job.settings

    if not orig_settings.schedule or orig_settings.schedule.pause_status == PauseStatus.PAUSED:
        # Nothing to pause. BUG FIX: the original still ran the "Unpausing
        # schedule" update in its finally block on this path.
        try:
            yield
        except KeyboardInterrupt:
            pass
        return

    echo("Pausing schedule")
    copied_settings = deepcopy(orig_settings)
    copied_settings.schedule.pause_status = PauseStatus.PAUSED

    w.jobs.update(job_id=standard_job.job_id, new_settings=copied_settings)

    try:
        yield
    except KeyboardInterrupt:
        # intentionally suppressed after the schedule is restored below
        pass
    finally:
        # BUG FIX: the original's finally could hit `standard_job` or
        # `orig_settings` unbound if setup failed, and re-raised only the
        # exception *class* (losing message/traceback). With setup outside the
        # try, both names are guaranteed bound here.
        echo("Unpausing schedule")
        w.jobs.update(job_id=standard_job.job_id, new_settings=orig_settings)


def get_job(w: WorkspaceClient, job_name: str):
    """Return the full Job object for the first job whose name matches `job_name`.

    Raises:
        ValueError: if no job with that name exists. (The original raised a
            bare IndexError on `job_list[0]`, which was hard to diagnose.)
    """
    job_list = list(w.jobs.list(name=job_name))
    if not job_list:
        raise ValueError(f"No job found with name: {job_name!r}")

    existing_job = w.jobs.get(job_id=job_list[0].job_id)
    return existing_job

NodeJSmith avatar Sep 09 '23 00:09 NodeJSmith

Hi @NodeJSmith ,

Thank you for sharing your code. I'll look into it and see how I can adapt it to my needs.

Best, Felix

panoptikum avatar Sep 22 '23 07:09 panoptikum