wazuh-qa
Jobflow engine - Findings and recommendations
| Target version | Related issue | Related PR/dev branch |
|---|---|---|
| 4.9 | #4989 | https://github.com/wazuh/wazuh-qa/tree/4989-dtt1-iteration-3-workflow-engine-module-release-workflow-module |
| 4.9 | #4996 | https://github.com/wazuh/wazuh-qa/tree/4996-dtt1-iteration-3-workflow-engine-unit-tests |
Description
While developing the unit tests for DTT1 iteration 3 - Workflow engine, I had the opportunity to review the workflow engine source code thoroughly. I list the findings in this issue for discussion and triage of several requests that the team will assess to improve the code quality, endurance, and maintainability.
Code Style Remarks and Best Practices:
- I propose to follow the style guides used by the Wazuh Framework team to develop Python code.
- I would like to discuss whether instance variable and parameter typing will be partially used.
Exception handling Best Practices
- Use exception chaining: `raise xxx from e`.
- Don't raise the generic `Exception` class. It is preferred that the original exception be bubbled.
- Define custom exceptions for domain issues (for example, when cycles are found in a workflow configuration file).
- Include the Wazuh copyright headers in all files.
- We should ideally identify the known exceptions we want to handle. For example, when a DAG is instantiated, a `graphlib.TopologicalSorter` is instantiated. The `add` method of this class raises a `ValueError` exception, and the `prepare` method raises a `CycleError`. Those errors should be handled so they can be logged as errors in the workflow log file. The user should receive the original exception (using chaining) and a more informative message whenever possible.
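The chaining and custom-exception points above can be sketched as follows. This is only an illustration: `WorkflowCycleError`, `build_execution_order`, and the logger name are hypothetical and not part of the workflow engine. Only the `graphlib` calls come from the standard library.

```python
import graphlib
import logging

logger = logging.getLogger("workflow_engine")  # hypothetical logger name


class WorkflowCycleError(Exception):
    """Hypothetical domain exception: the workflow has circular dependencies."""


def build_execution_order(dependencies: dict[str, list[str]]) -> list[str]:
    """Return task names in a valid execution order.

    `dependencies` maps each task name to the names of the tasks it depends on.
    """
    sorter = graphlib.TopologicalSorter()
    for task_name, depends_on in dependencies.items():
        sorter.add(task_name, *depends_on)

    try:
        sorter.prepare()
    except graphlib.CycleError as e:
        # The second element of `args` is the list of nodes forming the cycle.
        cycle = " -> ".join(e.args[1])
        logger.error("Cycle detected in workflow: %s", cycle)
        # Chain the original exception so the caller keeps the full context.
        raise WorkflowCycleError(f"Cycle detected in workflow: {cycle}") from e

    order: list[str] = []
    while sorter.is_active():
        ready = sorter.get_ready()
        order.extend(ready)
        sorter.done(*ready)
    return order
```

The caller receives a domain-specific exception with a readable message, while the original `CycleError` stays available through `__cause__`.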
Import statements
- Change the order of import statements. Import the standard library packages first, then third-party libraries, and finally, local imports.
- Group and order alphabetically.
- Remove `import` statements with unused classes, functions, etc.
Suggestions and Ideas
- I recommend using the `Click` library instead of the standard `argparse` to handle command line arguments. It is more powerful and more straightforward to use. The parameters are implemented with decorators. I leave the link: https://click.palletsprojects.com/en/8.1.x/
Requested fixes - cosmetics and bugs
- The SchemaValidator.validateSchema method does not comply with the snake_case standard for function names; it should be validate_schema.
- Remove `pass` statements in abstract methods; they are marked by `pylint` as unnecessary.
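As a small illustration of the `pass` remark, an abstract method whose body is only a docstring is already syntactically complete. The `Task` and `EchoTask` classes below are illustrative stand-ins, not the engine's actual classes.

```python
from abc import ABC, abstractmethod


class Task(ABC):
    """Illustrative abstract task."""

    @abstractmethod
    def execute(self) -> None:
        """Run the task. The docstring is the method body, so no `pass` is needed."""


class EchoTask(Task):
    """Illustrative concrete task that records that it ran."""

    def __init__(self) -> None:
        self.executed = False

    def execute(self) -> None:
        self.executed = True
```

Instantiating `Task` directly still raises `TypeError`, so removing `pass` does not change the behavior.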
Design and Architecture
- Check the parallel libraries used to execute tasks. Threads are being used in the `WorkflowProcessor` class. Multithreaded programming is not 100% "parallel" due to a limitation of CPython, the Global Interpreter Lock (GIL): only one thread executes Python bytecode at a time, never in true parallel. The GIL is released when a thread is blocked waiting for an input/output operation, which lets another thread run. We must consider whether this covers our requirements. If the only thing the workflow will do is spawn processes, as the `ProcessTask.execute` function does (it calls the `subprocess.run` function to run a script with parameters and captures `stdout` and `stderr`), the threading libraries may be enough.
- `WorkflowProcessor` constructor https://github.com/wazuh/wazuh-qa/blob/39e5f11ba37a20d44f8bbe15d77f6547696f9b6f/deployability/modules/workflow_engine/workflow_processor.py#L268-L283
  - I recommend removing the `WorkflowProcessor.dry_run` instance variable and changing the `WorkflowProcessor.run` function signature to accept a `dry_run` optional parameter. https://github.com/wazuh/wazuh-qa/blob/39e5f11ba37a20d44f8bbe15d77f6547696f9b6f/deployability/modules/workflow_engine/workflow_processor.py#L353-L356
  - I recommend changing the name of the `WorkflowProcessor.threads` instance variable to `WorkflowProcessor.max_threads_count` to clarify its purpose.
  - I recommend removing the `WorkflowProcessor.log_level` instance variable and moving the log configuration outside the class. This will reduce the responsibilities of this class. Indeed, if many `WorkflowProcessor` instances are created, each new instance will redefine the log level. The log level is global to the program; it is not particular to each instance.
  - The `WorkflowProcessor` constructor creates a `WorkflowFile` instance, creating a dependency. Options:
    - Use a constructor pattern. Another class constructs the `WorkflowProcessor` instances.
    - Change the class constructor. The constructor receives a `task_collection`.
    - Add a `dag` instance variable and change the `WorkflowProcessor.run` function to use this instance variable. https://github.com/wazuh/wazuh-qa/blob/39e5f11ba37a20d44f8bbe15d77f6547696f9b6f/deployability/modules/workflow_engine/main.py#L31-L32
- I propose creating a global configuration object from a YAML config file. In the current design, the `workflow_engine/main.py` entry point parses the command arguments and converts them to a dictionary passed directly to the `WorkflowProcessor` constructor. If a new parameter is defined in the future, the class constructor must be changed. The global configuration can be accessed by any class that should be globally parameterized in the future.
- I recommend changing the name of the `logger.py` file to `logging.py`. The import `from workflow_engine.logger.logger import logger` is lengthy and repetitive. The logger can also be imported in the subpackage `__init__.py` file. This way, the import is reduced to `from workflow_engine.logging import logger`.
- I recommend moving the validation of the `ProcessTask.task_parameters` from `ProcessTask.execute` to the `ProcessTask` constructor. https://github.com/wazuh/wazuh-qa/blob/39e5f11ba37a20d44f8bbe15d77f6547696f9b6f/deployability/modules/workflow_engine/task.py#L36-L41
- The `ProcessTask.execute` function has a bug that needs to be fixed. The statement `if "KeyboardInterrupt" in error_msg:` generates an exception because the `error_msg` variable is `None`. The `stderr` parameter must be defined when the `CalledProcessError` is raised. https://github.com/wazuh/wazuh-qa/blob/39e5f11ba37a20d44f8bbe15d77f6547696f9b6f/deployability/modules/workflow_engine/task.py#L67-L73
- I recommend validating the parameters of the `DAG.set_status` function. The `task_name` must be in the `task_collection`, and the status must be one of these values: failed, canceled, or successful. https://github.com/wazuh/wazuh-qa/blob/39e5f11ba37a20d44f8bbe15d77f6547696f9b6f/deployability/modules/workflow_engine/workflow_processor.py#L181-L184
- I recommend creating enumerations for `task status`, `cancel_policies`, etc. One example is in this constructor: https://github.com/wazuh/wazuh-qa/blob/39e5f11ba37a20d44f8bbe15d77f6547696f9b6f/deployability/modules/workflow_engine/workflow_processor.py#L154-L167
- I recommend moving the validation of the `cancel_policy` parameter of `DAG.cancel_dependant_task` from the `for` loop to the beginning of the function. https://github.com/wazuh/wazuh-qa/blob/39e5f11ba37a20d44f8bbe15d77f6547696f9b6f/deployability/modules/workflow_engine/workflow_processor.py#L209-L232
- The `WorkflowProcessor.execute_task` function logs the elapsed time only for the normal flow. I recommend logging the elapsed time on exceptions, too. https://github.com/wazuh/wazuh-qa/blob/39e5f11ba37a20d44f8bbe15d77f6547696f9b6f/deployability/modules/workflow_engine/workflow_processor.py#L295-L307
- I suggest reviewing the `WorkflowProcessor.execute_tasks_parallel` exception handler. The exception handler calls the same function recursively with the parameter `reverse=True`. The call could lead to infinite recursion if the `KeyboardInterrupt` is raised again. I consider that this function should be called only if the `reverse` parameter is `False`. https://github.com/wazuh/wazuh-qa/blob/39e5f11ba37a20d44f8bbe15d77f6547696f9b6f/deployability/modules/workflow_engine/workflow_processor.py#L323-L332
- I suggest removing the `WorkflowProcessor.abort_execution` method because it is not referred to by any other file in the `workflow_engine` module.
- I suggest fixing the `element` parameter type of the function `WorkflowFile.__replace_placeholders`. It is defined as `str`, but it should be of type `Any`. https://github.com/wazuh/wazuh-qa/blob/39e5f11ba37a20d44f8bbe15d77f6547696f9b6f/deployability/modules/workflow_engine/workflow_processor.py#L77-L87
- I suggest reviewing the usage of the `parent_key` parameter of the function `WorkflowFile.__replace_placeholders`. The parameter doesn't influence the function's output value. If it is maintained, I suggest defining it as `parent_key: Optional[str] = None`.
- I suggest improving the function `WorkflowFile.__static_workflow_validation.check_not_existing_tasks`. The function raises the `ValueError` as soon as a task listed in a task's `depends-on` key is not found in the `task_collection`. It would be better to raise the `ValueError` after checking all the tasks, so that every missing task is reported at once. https://github.com/wazuh/wazuh-qa/blob/39e5f11ba37a20d44f8bbe15d77f6547696f9b6f/deployability/modules/workflow_engine/workflow_processor.py#L140-L147
- The `SchemaValidator.preprocess_data` function validates that the `path` entry exists for the `process` task types but doesn't validate whether the `path` entry is an empty string. I suggest adding this validation. https://github.com/wazuh/wazuh-qa/blob/39e5f11ba37a20d44f8bbe15d77f6547696f9b6f/deployability/modules/workflow_engine/schema_validator.py#L55-L75
- The `SchemaValidator.validateSchema` function logs the `ValidationError` and `Exception` exceptions to the log but does not re-raise the exception to the caller. I suggest re-raising the exception. https://github.com/wazuh/wazuh-qa/blob/39e5f11ba37a20d44f8bbe15d77f6547696f9b6f/deployability/modules/workflow_engine/schema_validator.py#L85-L90
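The enumeration and `DAG.set_status` validation suggestions from the list above could look roughly like the sketch below. It assumes only the three status values named there. The `TaskStatus` name, the simplified `DAG` class, and the `"task"` key used to read task names are assumptions for illustration, not the engine's actual code.

```python
from enum import Enum
from typing import Union


class TaskStatus(Enum):
    """Allowed task statuses (values taken from the list above)."""

    FAILED = "failed"
    CANCELED = "canceled"
    SUCCESSFUL = "successful"


class DAG:
    """Simplified stand-in for the engine's DAG class (illustration only)."""

    def __init__(self, task_collection: list[dict]) -> None:
        self.task_collection = task_collection
        # Assumption: each task dictionary stores its name under the "task" key.
        self._task_names = {task["task"] for task in task_collection}
        self.execution_status: dict[str, TaskStatus] = {}

    def set_status(self, task_name: str, status: Union[TaskStatus, str]) -> None:
        """Record the status of a task after validating both parameters."""
        if task_name not in self._task_names:
            raise ValueError(f"Task '{task_name}' is not in the task collection.")
        try:
            # Accepts either a TaskStatus member or its string value.
            self.execution_status[task_name] = TaskStatus(status)
        except ValueError as e:
            allowed = ", ".join(s.value for s in TaskStatus)
            raise ValueError(
                f"Invalid status '{status}'. Allowed values: {allowed}."
            ) from e
```

With an enumeration, a misspelled status fails at the call site instead of silently producing a state that no other part of the engine checks for.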
We will discuss this in the daily meeting of 2024/03/11 or 2024/03/13 -> Blocked internal
Move this issue to https://github.com/wazuh/wazuh-qa/issues/4495 as epic
This issue will be included in DTT Tier 2