wazuh-qa
Jobflow engine - Findings and recommendations
Target version | Related issue | Related PR/dev branch |
---|---|---|
4.9 | #4989 | https://github.com/wazuh/wazuh-qa/tree/4996-dtt1-iteration-3-workflow-engine-unit-tests |
4.9 | #4996 | https://github.com/wazuh/wazuh-qa/tree/4989-dtt1-iteration-3-workflow-engine-module-release-workflow-module |
Description
While developing the unit tests for DTT1 iteration 3 - Workflow engine, I had the opportunity to review the workflow engine source code thoroughly. I list the findings in this issue for discussion and triage of several requests that the team will assess to improve the code quality, endurance, and maintainability.
Code Style Remarks and Best Practices:
- I propose to follow the style guides used by the Wazuh Framework team to develop Python code.
- I would like to discuss whether type hints for instance variables and parameters will be used, at least partially.
Exception handling Best Practices
- Use exception chaining: `raise xxx from e`.
- Don't raise the generic `Exception` class. It is preferred that the original exception be bubbled up.
- Define custom exceptions for domain issues (for example, when cycles are found in a workflow configuration file).
- Include the Wazuh copyright headers in all files.
- We should ideally identify the known exceptions we want to handle. For example, when a DAG is instantiated, a `graphlib.TopologicalSorter` is instantiated. The `add` method of this class raises a `ValueError` exception, and the `prepare` method raises a `CycleError`. Those errors should be handled so they can be logged as errors in the workflow log file. The user should receive the original exception (using chaining) and a more informative message whenever possible.
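As an illustration of the chaining and custom-exception points above, here is a minimal sketch. The `WorkflowCycleError` class, the `build_execution_order` function, and the logger name are hypothetical and are not taken from the workflow engine; only `graphlib.TopologicalSorter` and `graphlib.CycleError` are real standard library names.

```python
import graphlib
import logging

logger = logging.getLogger("workflow_engine")  # hypothetical logger name


class WorkflowCycleError(ValueError):
    """Hypothetical domain exception: the workflow has circular dependencies."""


def build_execution_order(dependencies: dict[str, list[str]]) -> list[str]:
    """Return task names in dependency order.

    `dependencies` maps a task name to the names of the tasks it depends on.
    """
    sorter = graphlib.TopologicalSorter()
    for task_name, depends_on in dependencies.items():
        sorter.add(task_name, *depends_on)
    try:
        sorter.prepare()
    except graphlib.CycleError as e:
        # The second element of e.args is the list of nodes forming the cycle.
        message = f"Circular dependency in workflow: {' -> '.join(e.args[1])}"
        logger.error(message)
        # Chain the original exception so the caller still sees the root cause.
        raise WorkflowCycleError(message) from e
    order = []
    while sorter.is_active():
        ready = sorter.get_ready()
        order.extend(ready)
        sorter.done(*ready)
    return order
```

The caller gets a domain-specific message, while the original `CycleError` remains available through `__cause__` and in the traceback.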
Import statements
- Reorder the import statements: import the standard library packages first, then third-party libraries, and finally local imports.
- Group and order alphabetically.
- Remove `import` statements for unused classes, functions, etc.
Suggestions and Ideas
- I recommend using the `Click` library instead of the standard `argparse` to handle command line arguments. It is more powerful and more straightforward to use. The parameters are implemented with decorators. I leave the link: https://click.palletsprojects.com/en/8.1.x/
Requested fixes - cosmetics and bugs
- The `SchemaValidator.validateSchema` method does not comply with the snake_case standard for function names; it should be `validate_schema`.
- Remove `pass` statements in abstract methods; they are marked by `pylint` as unnecessary.
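A small sketch of the `pass` remark above: a docstring is already a valid method body, so an abstract method that has one needs no `pass`. The `Task` and `EchoTask` classes are hypothetical and only illustrate the pattern.

```python
from abc import ABC, abstractmethod


class Task(ABC):
    """Hypothetical abstract base class for workflow tasks."""

    @abstractmethod
    def execute(self) -> None:
        """Run the task. The docstring is the body, so no `pass` is needed."""


class EchoTask(Task):
    """Hypothetical concrete task used only to show the override."""

    def __init__(self, message: str) -> None:
        self.message = message
        self.executed = False

    def execute(self) -> None:
        self.executed = True
```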
Design and Architecture
- Check the parallel libraries used to execute tasks. Threads are being used in the `WorkflowProcessor` class. Multithreaded programming is not 100% "parallel" due to a limitation of Python, the Global Interpreter Lock (GIL): in CPython, only one thread executes Python bytecode at a time, so threads never run Python code in true parallel. The GIL is released when a thread blocks waiting for an input/output operation, which lets another thread run. We must consider whether this covers our requirements. If the only thing the workflow will do is spawn processes, like the `ProcessTask.execute` function (which calls the `subprocess.run` function to run a script with parameters and captures the standard output and `stderr`), the threading libraries may be enough.
- `WorkflowProcessor` constructor: https://github.com/wazuh/wazuh-qa/blob/39e5f11ba37a20d44f8bbe15d77f6547696f9b6f/deployability/modules/workflow_engine/workflow_processor.py#L268-L283
  - I recommend removing the `WorkflowProcessor.dry_run` instance variable and changing the `WorkflowProcessor.run` function signature to accept an optional `dry_run` parameter. https://github.com/wazuh/wazuh-qa/blob/39e5f11ba37a20d44f8bbe15d77f6547696f9b6f/deployability/modules/workflow_engine/workflow_processor.py#L353-L356
  - I recommend changing the name of the `WorkflowProcessor.threads` instance variable to `WorkflowProcessor.max_threads_count` to clarify its purpose.
  - I recommend removing the `WorkflowProcessor.log_level` instance variable and moving the log configuration outside the class. This will reduce the responsibilities of this class. Moreover, if many `WorkflowProcessor` instances are created, each new instance will redefine the log level. The log level is global to the program; it is not particular to each instance.
  - The `WorkflowProcessor` constructor creates a `WorkflowFile` instance, creating a dependency. Options:
    - Use a constructor pattern. Another class constructs the `WorkflowProcessor` instances.
    - Change the class constructor. The constructor receives a `task_collection`.
    - Add a `dag` instance variable and change the `WorkflowProcessor.run` function to use this instance variable. https://github.com/wazuh/wazuh-qa/blob/39e5f11ba37a20d44f8bbe15d77f6547696f9b6f/deployability/modules/workflow_engine/main.py#L31-L32
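The constructor recommendations above could look roughly like the following sketch. `WorkflowProcessorSketch` is a hypothetical, heavily simplified stand-in for the real class: it ignores task dependencies, assumes each task is a dictionary with a `task` key, and takes the task executor as a callable purely for illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable


class WorkflowProcessorSketch:
    """Hypothetical, simplified stand-in for WorkflowProcessor."""

    def __init__(self, task_collection: list[dict], max_threads_count: int = 1) -> None:
        # The parsed task collection is injected, so no WorkflowFile is built here.
        self.task_collection = task_collection
        self.max_threads_count = max_threads_count

    def run(self, execute: Callable[[dict], str], dry_run: bool = False) -> list[str]:
        """Execute every task, or only list the task names when dry_run is True."""
        if dry_run:
            return [task["task"] for task in self.task_collection]
        # Threads are enough when each task mostly waits on I/O, such as a
        # child process started with subprocess.run.
        with ThreadPoolExecutor(max_workers=self.max_threads_count) as pool:
            # map() returns the results in the order of the input tasks.
            return list(pool.map(execute, self.task_collection))
```

With this shape, `dry_run` is a property of one call to `run` rather than of the instance, and the class can be unit tested with an in-memory task list.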
- I propose creating a global configuration object from a YAML config file. In the current design, the `workflow_engine/main.py` entry point parses the command arguments and converts them to a dictionary passed directly to the `WorkflowProcessor` constructor. If a new parameter is defined in the future, the class constructor must be changed. The global configuration can be accessed by any class that needs to be globally parameterized in the future.
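One possible shape for such a configuration object is sketched below. The `EngineConfig` class and its field names are assumptions made for illustration, not the engine's actual parameters. The mapping passed to `from_mapping` could come from parsing a YAML file (for example with PyYAML's `yaml.safe_load`) or from the parsed command line arguments.

```python
from dataclasses import dataclass, fields
from typing import Any, Mapping


@dataclass(frozen=True)
class EngineConfig:
    """Hypothetical global settings; the field names are illustrative."""

    workflow_file: str
    threads: int = 1
    dry_run: bool = False
    log_level: str = "INFO"

    @classmethod
    def from_mapping(cls, data: Mapping[str, Any]) -> "EngineConfig":
        """Build the configuration from a mapping, rejecting unknown keys."""
        known = {f.name for f in fields(cls)}
        unknown = set(data) - known
        if unknown:
            raise ValueError(f"Unknown configuration keys: {sorted(unknown)}")
        return cls(**data)
```

Adding a new setting then means adding one field with a default value, without touching any class constructor.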
- I recommend changing the name of the `logger.py` file to `logging.py`. The import `from workflow_engine.logger.logger import logger` is lengthy and repetitive. The logger can also be imported in the subpackage `__init__.py` file. This way, the import is reduced to `from workflow_engine.logging import logger`.
- I recommend moving the validation of the `ProcessTask.task_parameters` from `ProcessTask.execute` to the `ProcessTask` constructor. https://github.com/wazuh/wazuh-qa/blob/39e5f11ba37a20d44f8bbe15d77f6547696f9b6f/deployability/modules/workflow_engine/task.py#L36-L41
- The `ProcessTask.execute` function has a bug that needs to be fixed. The statement `if "KeyboardInterrupt" in error_msg:` raises an exception because the `error_msg` variable is `None`. The `stderr` parameter must be defined when the `CalledProcessError` is raised. https://github.com/wazuh/wazuh-qa/blob/39e5f11ba37a20d44f8bbe15d77f6547696f9b6f/deployability/modules/workflow_engine/task.py#L67-L73
- I recommend validating the parameters of the `DAG.set_status` function. The `task_name` must be in the `task_collection`, and the status must be one of these values: failed, canceled, or successful. https://github.com/wazuh/wazuh-qa/blob/39e5f11ba37a20d44f8bbe15d77f6547696f9b6f/deployability/modules/workflow_engine/workflow_processor.py#L181-L184
- I recommend creating enumerations for `task status`, `cancel_policies`, etc. One example is in this constructor: https://github.com/wazuh/wazuh-qa/blob/39e5f11ba37a20d44f8bbe15d77f6547696f9b6f/deployability/modules/workflow_engine/workflow_processor.py#L154-L167
- I recommend moving the validation of the `cancel_policy` parameter of the `DAG.cancel_dependant_task` function from the `for` loop to the beginning of the function. https://github.com/wazuh/wazuh-qa/blob/39e5f11ba37a20d44f8bbe15d77f6547696f9b6f/deployability/modules/workflow_engine/workflow_processor.py#L209-L232
- The `WorkflowProcessor.execute_task` function logs the elapsed time only for the normal flow. I recommend logging the elapsed time on exceptions, too. https://github.com/wazuh/wazuh-qa/blob/39e5f11ba37a20d44f8bbe15d77f6547696f9b6f/deployability/modules/workflow_engine/workflow_processor.py#L295-L307
- I suggest reviewing the `WorkflowProcessor.execute_tasks_parallel` exception handler. The exception handler calls the same function recursively with the parameter `reverse=True`. The call could lead to infinite recursion if the `KeyboardInterrupt` is raised again. I consider that this function should be called only if the `reverse` parameter is `False`. https://github.com/wazuh/wazuh-qa/blob/39e5f11ba37a20d44f8bbe15d77f6547696f9b6f/deployability/modules/workflow_engine/workflow_processor.py#L323-L332
- I suggest removing the `WorkflowProcessor.abort_execution` method because it is not referenced by any other file in the `workflow_engine` module.
- I suggest fixing the `element` parameter type of the function `WorkflowFile.__replace_placeholders`. It is defined as `str`, but it should be of type `Any`. https://github.com/wazuh/wazuh-qa/blob/39e5f11ba37a20d44f8bbe15d77f6547696f9b6f/deployability/modules/workflow_engine/workflow_processor.py#L77-L87
- I suggest reviewing the usage of the `parent_key` parameter of the function `WorkflowFile.__replace_placeholders`. The parameter doesn't influence the function's output value. If it is maintained, I suggest defining it as `parent_key: Optional[str] = None`.
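For the two `ProcessTask` items above (validating parameters in the constructor and the `error_msg` bug), a minimal sketch follows. `ProcessTaskSketch`, `TaskExecutionError`, and the `args` key are hypothetical; the real class builds its command differently. The relevant standard library behavior is that `CalledProcessError.stderr` is `None` unless the child's stderr was captured, for example with `capture_output=True`.

```python
import subprocess


class TaskExecutionError(RuntimeError):
    """Hypothetical domain exception for a failed process task."""


class ProcessTaskSketch:
    """Hypothetical, simplified stand-in for ProcessTask."""

    def __init__(self, task_name: str, task_parameters: dict) -> None:
        # Validate up front so a misconfigured task fails before anything runs.
        if not task_parameters.get("path"):
            raise ValueError(f"Task '{task_name}' requires a non-empty 'path' parameter.")
        self.task_name = task_name
        self.task_parameters = task_parameters

    def execute(self) -> str:
        """Run the configured command and return its standard output."""
        command = [self.task_parameters["path"], *self.task_parameters.get("args", [])]
        try:
            # capture_output=True fills stdout/stderr on the result and on
            # CalledProcessError; text=True decodes them to str.
            result = subprocess.run(command, check=True, capture_output=True, text=True)
        except subprocess.CalledProcessError as e:
            error_msg = e.stderr or ""  # never None, so the `in` test is safe
            if "KeyboardInterrupt" in error_msg:
                raise KeyboardInterrupt(f"Task '{self.task_name}' was interrupted.") from e
            raise TaskExecutionError(
                f"Task '{self.task_name}' failed: {error_msg.strip()}"
            ) from e
        return result.stdout
```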
- I suggest improving the function `WorkflowFile.__static_workflow_validation.check_not_existing_tasks`. The function raises the `ValueError` at the first task in a `depends-on` key that is not found in the `task_collection`. It would be better to raise the `ValueError` after checking all the tasks. https://github.com/wazuh/wazuh-qa/blob/39e5f11ba37a20d44f8bbe15d77f6547696f9b6f/deployability/modules/workflow_engine/workflow_processor.py#L140-L147
- The `SchemaValidator.preprocess_data` function validates that the `path` entry exists for the `process` task types but doesn't validate whether the `path` entry is an empty string. I suggest adding this validation. https://github.com/wazuh/wazuh-qa/blob/39e5f11ba37a20d44f8bbe15d77f6547696f9b6f/deployability/modules/workflow_engine/schema_validator.py#L55-L75
- The `SchemaValidator.validateSchema` function logs the `ValidationError` and `Exception` exceptions to the log but does not re-raise the exception to the caller. I suggest re-raising the exception. https://github.com/wazuh/wazuh-qa/blob/39e5f11ba37a20d44f8bbe15d77f6547696f9b6f/deployability/modules/workflow_engine/schema_validator.py#L85-L90
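The `check_not_existing_tasks` suggestion above (check every task, then raise once) could be sketched as follows. This is a standalone illustration, not the engine's code: it assumes each task is a dictionary with a `task` name and an optional `depends-on` list, and the error message format is made up.

```python
def check_not_existing_tasks(task_collection: list[dict]) -> None:
    """Raise one ValueError listing every unknown task named in 'depends-on'."""
    task_names = {task["task"] for task in task_collection}
    # Collect all problems first instead of stopping at the first one.
    missing = [
        f"'{dependency}' (required by '{task['task']}')"
        for task in task_collection
        for dependency in task.get("depends-on", [])
        if dependency not in task_names
    ]
    if missing:
        raise ValueError(f"Tasks not found in the workflow: {', '.join(missing)}")
```

The user can then fix every broken dependency in a single pass instead of rerunning the validation once per error.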
We will discuss this in the daily meeting of 2024/03/11 or 2024/03/13 -> Blocked internal
Move this issue to https://github.com/wazuh/wazuh-qa/issues/4495 as epic
This issue will be included in DTT Tier 2