nipyapi icon indicating copy to clipboard operation
nipyapi copied to clipboard

[Feature] Disabling Process Groups

Open rsaggino opened this issue 5 years ago • 1 comments

Hello,

I have been wondering on how to properly disable a Process Group handling all those situations where an inner processor is in a "middle state" (stopping/starting instead of stopped/started).

The code below gets the job done in all the cases except the ones stated above. Not sure how to recognize those "stopping/starting" situations and consequently handle them (retry, raise etc.).

Any suggestion?

def disable_process_group(pg_id):
    """
    Disable a Process Group and all components.
    Note: Only stopped processors will be disabled.
    Args:
        pg_id (str): The UUID of the target Process Group
    Returns:
         (bool): True of successfully disabled, False if not.
    """
    assert isinstance(pg_id, six.string_types)
    assert isinstance(
        get_process_group(pg_id, 'id'),
        nipyapi.nifi.ProcessGroupEntity
    )
    target_state = 'DISABLED'
    body = nipyapi.nifi.ScheduleComponentsEntity(
        id=pg_id,
        state=target_state
    )

    with nipyapi.utils.rest_exceptions():
        result = nipyapi.nifi.FlowApi().schedule_components(
            id=pg_id,
            body=body
        )
    if result.state == target_state:
        return True
    return False

Thanks.

rsaggino avatar Mar 18 '20 13:03 rsaggino

I agree this could be a useful enhancement. You will need to handle Controllers, and also as you say check for hung threads and coerce them to stop if you want to be sure about it.

Here's some code I wrote for an Ansible module to ensure a process group started, it may help you a bit.

target_controllers = [
    x for x in nipyapi.canvas.list_all_controllers(self.target_object.id)
    if x.component.state != 'ENABLED'
]
for cont in target_controllers:
    if cont.component.validation_errors:
        really_invalid = [
            x for x in cont.component.validation_errors
            if 'is disabled' not in x
        ]
        if really_invalid:
            self.module.fail_json(msg='Controllers in Process group' + self.object_name + 'are Invalid')
while target_controllers:
    _ = [
        nipyapi.canvas.schedule_controller(x, True) for x in target_controllers
        if not x.component.validation_errors
    ]
    # Update worklist
    target_controllers = [
        x for x in nipyapi.canvas.list_all_controllers(self.target_object.id) if
        x.component.state != 'ENABLED'
    ]
# Finished handling Controllers
if nipyapi.canvas.list_invalid_processors(self.target_object.id):
    self.module.fail_json(msg='Processors in Process group ' + self.object_name + ' are Invalid')
# Finished handling Processors
_ = nipyapi.canvas.schedule_process_group(self.target_object.id, True)
self.get_target_object_if_exists()
self.out_changed = True

Chaffelson avatar Jun 21 '20 14:06 Chaffelson