nipyapi
nipyapi copied to clipboard
[Feature] Disabling Process Groups
Hello,
I have been wondering on how to properly disable a Process Group handling all those situations where an inner processor is in a "middle state" (stopping/starting instead of stopped/started).
The code below gets the job done in all the cases except the ones stated above. Not sure how to recognize those "stopping/starting" situations and consequently handle them (retry, raise etc.).
Any suggestion?
def disable_process_group(pg_id):
"""
Disable a Process Group and all components.
Note: Only stopped processors will be disabled.
Args:
pg_id (str): The UUID of the target Process Group
Returns:
(bool): True of successfully disabled, False if not.
"""
assert isinstance(pg_id, six.string_types)
assert isinstance(
get_process_group(pg_id, 'id'),
nipyapi.nifi.ProcessGroupEntity
)
target_state = 'DISABLED'
body = nipyapi.nifi.ScheduleComponentsEntity(
id=pg_id,
state=target_state
)
with nipyapi.utils.rest_exceptions():
result = nipyapi.nifi.FlowApi().schedule_components(
id=pg_id,
body=body
)
if result.state == target_state:
return True
return False
Thanks.
I agree this could be a useful enhancement. You will need to handle Controllers, and also as you say check for hung threads and coerce them to stop if you want to be sure about it.
Here's some code I wrote for an Ansible module to ensure a process group started, it may help you a bit.
target_controllers = [
x for x in nipyapi.canvas.list_all_controllers(self.target_object.id)
if x.component.state != 'ENABLED'
]
for cont in target_controllers:
if cont.component.validation_errors:
really_invalid = [
x for x in cont.component.validation_errors
if 'is disabled' not in x
]
if really_invalid:
self.module.fail_json(msg='Controllers in Process group' + self.object_name + 'are Invalid')
while target_controllers:
_ = [
nipyapi.canvas.schedule_controller(x, True) for x in target_controllers
if not x.component.validation_errors
]
# Update worklist
target_controllers = [
x for x in nipyapi.canvas.list_all_controllers(self.target_object.id) if
x.component.state != 'ENABLED'
]
# Finished handling Controllers
if nipyapi.canvas.list_invalid_processors(self.target_object.id):
self.module.fail_json(msg='Processors in Process group ' + self.object_name + ' are Invalid')
# Finished handling Processors
_ = nipyapi.canvas.schedule_process_group(self.target_object.id, True)
self.get_target_object_if_exists()
self.out_changed = True