
"Exception: The clean stage '' does not exist in the json file" while trying to use multiprocessing

Open Sulray opened this issue 10 months ago • 25 comments

The error

Hi,

I was trying to make the execution of a script based on pyats/genie faster by using multiprocessing as recommended in the documentation.

So at some point, in a method called "runChecks", I replaced the following lines:

        for device in self.devices:
            # already connected to each device
            device.makeChecks(self.phase,self.name)

with this single line: multiprocessing.Pool().map(self.runChecksDevice, self.devices), using this method definition:

def runChecksDevice(self, device):
    device.makeChecks(self.phase, self.name)

So it's supposed to be exactly the same but with multiprocessing.

But I get the following error (in debug mode):

2024-04-22 14:54:41,983 - genie.conf.base.api - DEBUG - Retrieving stage '__getstate__' for a1aaa01-clients (OS:bigip, PLATFORM:None) (reduction.py:51)
2024-04-22 14:54:42,023 - genie.conf.base.api - DEBUG - Retrieving stage '__getstate__' for a1aaa01-clients (OS:bigip, PLATFORM:None) (reduction.py:51)
Traceback (most recent call last):
  File "main.py", line 74, in <module>
    operation.run()
  File "/package/path/my_operation.py", line 252, in run
    self.runChecks()
  File "/package/path/package/my_operation.py", line 332, in runChecks
    multiprocessing.Pool().map(self.runChecksDevice, self.devices)
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 364, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 771, in get
    raise self._value
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 537, in _handle_tasks
    put(task)
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "src/genie/conf/base/api.py", line 581, in genie.conf.base.api.CleanAPI.__getattr__
  File "/package/path/.venv/lib/python3.8/site-packages/genie/libs/clean/utils.py", line 268, in get_clean_function
    raise Exception(f"The clean stage '{name}' does not exist in the json "
Exception: The clean stage '' does not exist in the json file

Before the change to add multiprocessing it was working perfectly, and I don't really understand the reason for the error, since it happens right at the start of the multiprocessing execution. The method raising the exception (get_clean_function) comes from genielibs, so maybe you have more insight on this.

Context

To give a little more context:

  • You can see an attribute self.devices. It is a list of custom devices; the point of these custom devices is that, no matter the OS, I can call the same method to do my checks. For each device in this list, device.pyats_device is the object we use in pyATS to connect / disconnect / run checks.
  • In this case I was trying my script on bigip devices, so device.makeChecks(phase, test_name) performs the following:
    def makeChecks(self,phase,test_name):
            for uri in self.check_list: 
                try:
                    output = self.pyats_device.rest.get( api_url=uri , verbose=True ).text
                    output = json.loads(output)
                    output = json.dumps(output , indent=4)
                except icontrol.exceptions.iControlUnexpectedHTTPError:
                    logger.error(uri +' does not exist')
                    output = traceback.format_exc()
                try :
                    with open(f'{self.operations_path}/{test_name}/{self.name}/{phase}/{(uri.replace("/", "_")).split("?" , )[0]}.json' , 'w') as output_file:
                        output_file.write(output)
                except Exception as e:
                    logger.error(f'learning {uri} failed!! Exception following lines:\n{e}')
    

What could lead to this exception? Did I do something wrong in my attempt to use multiprocessing with pyats/genie?

Sulray avatar Apr 23 '24 09:04 Sulray

https://pubhub.devnetcloud.com/media/pyats/docs/async/multiprocessing.html

Kindly go through the above documentation for multiprocessing and try it.

Thank you!

Harishv01 avatar Apr 23 '24 14:04 Harishv01

Yes, thanks, but I've already seen this page and also some posts on Stack Overflow, and I still have my issue. Where does my case differ from the given example? The example, with f a method and range(20) an iterable:

 with multiprocessing.Pool(2) as pool:
            result = pool.map(f, range(20))

My case, with self.runChecksDevice a method, self.devices an iterable: multiprocessing.Pool().map(self.runChecksDevice, self.devices)

Just in case, I also tried the following before sending my issue:

with multiprocessing.Pool(2) as pool:
            pool.map(self.runChecksDevice, self.devices) 
with multiprocessing.Pool() as pool:  # supposed to use the maximum number of processes possible
            pool.map(self.runChecksDevice, self.devices) 

Sulray avatar Apr 23 '24 14:04 Sulray

Okay, kindly give me some time. I will check and let you know.

Thank you

Harishv01 avatar Apr 24 '24 08:04 Harishv01

The error message indicates that the name variable being passed to the runChecksDevice method is empty. This suggests that the self.name attribute might not be set correctly or is not accessible within the multiprocessing context. Please investigate and resolve this issue.

Additionally, could you provide information on how you are running the job? Are you passing any clean YAML while running?

Harishv01 avatar Apr 24 '24 08:04 Harishv01

For the moment I've added some logs to check that the names are not empty. I first checked self.name (the name attribute of my class in my_operation.py) and then device.name (the name attribute of the class used for devices) for each device (2 devices in my case):

2024-04-24 09:22:45,408 - package.my_operation - INFO - Name of operation: test_multiprocessing (my_operation.py:324)
2024-04-24 09:22:45,408 - package.my_operation - INFO - One device name is:  a1aaa01-clients (my_operation.py:326)
2024-04-24 09:22:45,408 - package.my_operation - INFO - One device name is:  a1aaa05 (my_operation.py:326)
2024-04-24 09:22:45,423 - genie.conf.base.api - DEBUG - Retrieving stage '__getstate__' for  a1aaa01-clients (OS:bigip, PLATFORM:None) (reduction.py:51)
2024-04-24 09:22:45,465 - genie.conf.base.api - DEBUG - Retrieving stage '__getstate__' for  a1aaa01-clients (OS:bigip, PLATFORM:None) (reduction.py:51)
Traceback (most recent call last):
  File "main.py", line 74, in <module>
    operation.run()
....... (then it's the same traceback)

So I'm still investigating, without really knowing why a "name" would be considered empty, since these are the only "name" variables I defined.

About how I run the job: it all starts with main.py initializing other classes and calling methods on these classes. I do not use aetest, so I do not use the "run(example.py)" method. I could show more of my code, but it would be even more complex. The fact is that it was working before I tried to add the multiprocessing line, so whatever the issue is, it only prevents multiprocessing from working.

And yes, I'm generating/using a testbed.yaml with the devices I want to connect to inside. That's the only YAML I use.

devices:
  a1aaa01-clients:
    alias: a1aaa01-clients
    connections:
      rest:
        class: rest.connector.Rest
        ip: a1aaa01-clients
        port: 443
        protocol: https
    custom:
      abstraction:
        order:
        - os
    os: bigip
  a1aaa05:
    alias: a1aaa05
    connections:
      rest:
        class: rest.connector.Rest
        ip: a1aaa05
        port: 443
        protocol: https
    custom:
      abstraction:
        order:
        - os
    os: bigip
testbed:
  credentials:
    default:
      password: '%ASK{}'
      username: user_example
  name: test_multiprocessing

Sulray avatar Apr 24 '24 09:04 Sulray

Okay, please investigate and give me an update

Harishv01 avatar Apr 24 '24 11:04 Harishv01

Hi, can you give me an update on the above?

Harishv01 avatar Apr 25 '24 15:04 Harishv01

Hi, I haven't found the cause yet.

Sulray avatar Apr 26 '24 07:04 Sulray

Can you share the working debug log from earlier, when it was functioning properly? Also, could you please confirm whether you are using a clean YAML file or a clean stage? Earlier you mentioned a testbed YAML file, but I'm not referring to that.

Harishv01 avatar Apr 30 '24 12:04 Harishv01

About the YAML: the testbed is the only YAML I'm using. I never tried a clean YAML file / clean stage or anything like that.

When it was functioning properly (with a for loop instead of multiprocessing), the logs about Retrieving stage '__getstate__' were not there.

When I use a for loop:

...
2024-05-02 07:59:37,272 - rest.connector.libs.bigip.implementation - INFO - Connected successfully to 'a1aaa05' (implementation.py:217)
self.devices: %s [<package.device.DeviceF5 object at 0x7f99b12aadc0>, <package.device.DeviceF5 object at 0x7f99b12aae50>]
2024-05-02 07:59:37,272 - package.my_operation - INFO - Name of operation: test_f5 (my_operation.py:324)
2024-05-02 07:59:37,272 - package.my_operation - INFO - One device name is: a1aaa01-clients (my_operation.py:326)
2024-05-02 07:59:37,272 - package.my_operation - INFO - One device name is: a1aaa05 (my_operation.py:326)
2024-05-02 07:59:37,272 - package.my_operation - INFO - Starting makeChecks for device a1aaa01-clients (my_operation.py:333)
2024-05-02 07:59:37,272 - package.my_operation - INFO - device.name in for loop: a1aaa01-clients (my_operation.py:334)
2024-05-02 07:59:37,272 - rest.connector.libs.bigip.implementation - INFO - Sending GET to 'a1aaa01-clients': https://10.10.10.10:443/mgmt/tm/sys/version (implementation.py:309)
...

When I use multiprocessing as detailed previously:

...
2024-05-02 08:02:45,315 - rest.connector.libs.bigip.implementation - INFO - Connected successfully to 'a1aaa05' (implementation.py:217)
self.devices: %s [<package.device.DeviceF5 object at 0x7f643d9dfdf0>, <package.device.DeviceF5 object at 0x7f643d9dfe80>]
2024-05-02 08:02:45,315 - package.my_operation - INFO - Name of operation: test_f5 (my_operation.py:324)
2024-05-02 08:02:45,315 - package.my_operation - INFO - One device name is: a1aaa01-clients (my_operation.py:326)
2024-05-02 08:02:45,316 - package.my_operation - INFO - One device name is: a1aaa05 (my_operation.py:326)
2024-05-02 08:02:45,336 - genie.conf.base.api - DEBUG - Retrieving stage '__getstate__' for a1aaa01-clients (OS:bigip, PLATFORM:None) (reduction.py:51)
2024-05-02 08:02:45,377 - genie.conf.base.api - DEBUG - Retrieving stage '__getstate__' for a1aaa01-clients (OS:bigip, PLATFORM:None) (reduction.py:51)
Traceback (most recent call last):
  File "main.py", line 74, in <module>
    operation.run()
  File "/package/path/my_operation.py", line 252, in run
    self.runChecks()
  File "/package/path/my_operation.py", line 330, in runChecks
    multiprocessing.Pool().map(self.runChecksDevice, self.devices) 
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 364, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 771, in get
    raise self._value
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 537, in _handle_tasks
    put(task)
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "src/genie/conf/base/api.py", line 581, in genie.conf.base.api.CleanAPI.__getattr__
  File "/package/path/.venv/lib/python3.8/site-packages/genie/libs/clean/utils.py", line 268, in get_clean_function
    raise Exception(f"The clean stage '{name}' does not exist in the json "
Exception: The clean stage '' does not exist in the json file

According to these logs, it seems like the method I'm trying to pass to multiprocessing (self.runChecksDevice) is not even running its first line, which should log a line similar to the one in the for loop:

def runChecksDevice(self, device):
        logger.info(f"Starting makeChecks for device name {device.name}")
        device.makeChecks(self.phase,self.name)

So it should log:

package.my_operation - INFO - Starting makeChecks for device a1aaa01-clients (my_operation.py:333)
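
If that reading is right, the pickling of the task itself is what fails: Pool.map serialises the target and every item of the iterable (pool.py -> _handle_tasks -> put(task) -> _ForkingPickler.dumps) before any worker code runs. A minimal, hypothetical check I could drop into runChecks just before creating the pool (assert_picklable is a made-up helper, not something from pyATS) would be:

import pickle

def assert_picklable(devices):
    # Pool.map will pickle each device exactly like this, so if dumps() raises here,
    # runChecksDevice can never be executed in a worker process.
    for device in devices:
        try:
            pickle.dumps(device)               # the custom MyDeviceF5 wrapper
            pickle.dumps(device.pyats_device)  # the connected pyATS Device object
        except Exception as exc:
            print(f"pickling failed for {device.name}: {exc!r}")

# hypothetical use inside runChecks(), before multiprocessing.Pool() is created:
# assert_picklable(self.devices)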

Sulray avatar May 02 '24 09:05 Sulray

Could you please share the code files so that I can debug it?

Harishv01 avatar May 02 '24 13:05 Harishv01

Could you please share the code files so that I can debug it?

Harishv01 avatar May 06 '24 14:05 Harishv01

Please share the code files to debug the issue.

Harishv01 avatar May 07 '24 15:05 Harishv01

Please share the code files to debug the issue.

Harishv01 avatar May 10 '24 09:05 Harishv01

I'll post an update next Monday/Thursday; I'm currently not available. I think I'll do a minimal example of my case, because if I share all the current code it will add complexity.

Sulray avatar May 10 '24 10:05 Sulray

Kindly share the entire code file to debug the issue

Harishv01 avatar May 13 '24 08:05 Harishv01

I managed to do a minimal version which has the exact same output when trying to use multiprocessing. There are 4 Python files. Here is the code (many parts of it are not even linked to the issue, but at least you have everything, as requested):

my_main.py:

from package.my_operation import MyOperation
import logging
import sys
import os
from datetime import datetime

stream_handler = logging.StreamHandler(stream=sys.stdout)

formatter = logging.Formatter('%(asctime)s - %(name)s - %(module)s - %(levelname)s - %(message)s')
stream_handler.setFormatter(formatter)

logging.getLogger().addHandler(stream_handler)
logger = logging.getLogger(__name__)
logging.getLogger().setLevel(logging.DEBUG)

operation = MyOperation()
operation.initiate()

input_choice=''
while input_choice not in ['1','2','3','4']:
    input_choice = input('Which operation do you want to perform? \n1) Pre Checks \n2) Post Checks \n3) Diff \n4) Exit the CLI \nChoice (type number): ').lower()
datetime_start = datetime.now()
if input_choice == '1':
    operation.phase = "preChecks"
    operation.loadTestbed()
elif input_choice == '2':
    operation.phase = "postChecks"
    operation.loadTestbed()
elif input_choice == '3':
    operation.phase = "Diff"        

operation.run()

my_operation.py

from genie.utils.diff import Diff
import json
import os
import yaml
from pyats.topology.loader import load
from package.my_device import MyDevice
from package.my_device_f5 import MyDeviceF5
from package.utils.config_loader import config_loader
import pathlib
from package import genie_digger
import logging
import multiprocessing

logger = logging.getLogger(__name__)

class MyOperation:
    
    def __init__(self):
        self.name = ''
        self.username = ''
        self.devices:list[MyDevice] = []
        self.testbed = ''
        self.root_path = pathlib.Path(__file__).resolve().parents[1]
        self.operations_path = f"{self.root_path}/output/operations"
        self.testbeds_path = f"{self.root_path}/output/testbeds"
        self.phase = None

    def initiate(self):
        with open( f'{self.testbeds_path}/testbed.yaml') as testbed_file:
            self.testbed = yaml.safe_load(testbed_file)
        
        self.name = self.testbed['testbed']['name']
        self.username = self.testbed['testbed']['credentials']['default']['username']

        for device in self.testbed['devices'].keys():                        
            if self.testbed['devices'][device]['os'] == 'bigip':                    
                self.devices.append(MyDeviceF5(device,self.operations_path))


    def run(self):
        logger.info("Starting run('%s')",self.phase)
        if not self.phase:
            raise Exception("No phase defined")
        self.buildTree()
        if self.phase=="Diff":
            self.runDiff()
        else: 
            self.runChecks()


    def buildTree(self):
        logger.info("Starting buildTree('%s')",self.phase)
        if not os.path.exists(f'{self.operations_path}/{self.name}'):
            os.makedirs(f'{self.operations_path}/{self.name}')
        for device in self.devices:
            if not os.path.exists(f'{self.operations_path}/{self.name}/{device.name}'):
                os.makedirs(f'{self.operations_path}/{self.name}/{device.name}')
            if not os.path.exists(f'{self.operations_path}/{self.name}/{device.name}/{self.phase}'):
                os.makedirs(f'{self.operations_path}/{self.name}/{device.name}/{self.phase}')


    def loadTestbed(self):
        try:
            self.testbed = load(self.testbed)
        except:
            print('host is not resolved')


    def runChecks(self):
        
        for device in self.devices:
                logger.info("name of device: %s",device.name)
                logger.info("self.testbed:\n%s",self.testbed)

                device.raiseSessionLimit(self.testbed , device.name)
                device.myConnect(self.testbed , device.name)

                if not os.path.isfile(f'{self.operations_path}/{self.name}/{device.name}/check_list.json'):

                    if device.os not in ['bigip','fortios']:
                        device.getRunningConfig()
                        device.configAnalyzer(device.mapping_features , device.os)

                        print(f'\nbased on {device.name} running-config, I deduced I gotta check the following fields:')
                        for model in device.check_list:
                            print(f'- {model}')
                        
                        not_checked_list = list(set(genie_digger.getFeatureList(device.os)) - set(device.check_list))

                        print(f'the following models have been ignored')
                        for model in not_checked_list:
                            print(f'- {model}')
                            
                        with open( f'{self.operations_path}/{self.name}/{device.name}/check_list.json' , 'w') as check_list_file:
                            check_list_file.write(str(json.dumps(device.check_list, indent=4)))
                    
                    else:
                        device.configAnalyzer()
                        with open( f'{self.operations_path}/{self.name}/{device.name}/check_list.json' , 'w') as check_list_file:
                            check_list_file.write(str(json.dumps(device.check_list, indent=4)))
                else:
                    with open( f'{self.operations_path}/{self.name}/{device.name}/check_list.json' , 'r') as check_list_file:
                        device.check_list = json.load(check_list_file)
        print("self.devices: %s", str(self.devices))

        logger.info(f"Name of operation: {self.name}")
        for device in self.devices:
            logger.info(f"One device name is: {device.name}")

        # with multiprocessing.Pool(2) as pool:
        #     pool.map(self.runChecksDevice, self.devices) 
            
        multiprocessing.Pool().map(self.runChecksDevice, self.devices) 

        for device in self.devices:
            logger.info(f"Starting makeChecks for device {device.name}")
            logger.info(f"device.name in for loop: {device.name}")
            # device.makeChecks(self.phase,self.name)
            device.restoreSessionLimit()
            device.pyats_device.disconnect()
            logger.info(f"Finished makeChecks and disconnected from device {device.name}")


    def runChecksDevice(self, device):
        logger.info(f"Starting makeChecks for device name {device.name}")
        device.makeChecks(self.phase,self.name)
        logger.info(f"Finished makeChecks and disconnected from device {device.name}")


    def runDiff(self):
        logging.debug("Running findDiff")
        for device in self.devices:
            logging.debug("runDiff on device %s", str(device.name))
            #Looping the list of uris
            if (device.check_list == []):
                with open( f'{self.operations_path}/{self.name}/{device.name}/check_list.json' , 'r') as check_list_file:
                        device.check_list = json.load(check_list_file)
                logging.debug("device.checklist was empty, filled with check_list.json in device folder")
            logging.debug("runDiff on device %s on all following features : %s", str(device.name), str(device.check_list))
            for feature in device.check_list:
                logging.debug("runDiff on device %s on feature %s", str(device.name), str(feature))
                pre_checks_path = f'{self.operations_path}/{self.name}/{device.name}/preChecks/{(feature.replace("/", "_")).split("?" , )[0]}'
                post_checks_path = f'{self.operations_path}/{self.name}/{device.name}/postChecks/{(feature.replace("/", "_")).split("?" , )[0]}'
                if os.path.isfile(f"{pre_checks_path}.txt"): 
                    logging.debug("For device %s, feature %s led to an error in pre checks phase", str(device.name), str(feature))
                    with open(f'{pre_checks_path}.txt' , 'r') as f_pre:
                        diff = f_pre.read()
                elif os.path.isfile(f"{post_checks_path}.txt"):
                    logging.debug("For device %s, feature %s led to an error in post checks phase", str(device.name), str(feature))
                    with open(f'{pre_checks_path}.txt' , 'r') as f_post:
                        diff = f_post.read()
                else:
                    try:
                        with open(f'{pre_checks_path}.json' , 'r') as f_pre:
                            lines_pre = json.load(f_pre)
                        with open(f'{post_checks_path}.json' , 'r') as f_post:
                            lines_post = json.load(f_post)
                    except FileNotFoundError:
                        logger.error('file not found')
                        continue
                    logging.info("Opened the f_pre and f_post, now running Diff")
                    diff = Diff(lines_pre, lines_post)                    
                    logging.info("Running findDiff")
                    diff.findDiff()
                if len(str(diff)) > 0 :
                    with open(f'{self.operations_path}/{self.name}/{device.name}/Diff/{(feature.replace("/", "_")).split("?" , )[0]}.diff' , 'w') as f_diff:
                        f_diff.write(str(diff))
                    logging.info("Finished to write diff")

my_device.py

from abc import ABC , abstractmethod
from pyats.topology.loader import load

class MyDevice(ABC):
    def __init__(self,name,operations_path):
        self.name=name
        self.operations_path = operations_path


    @abstractmethod
    def makeChecks(self,phase,test_name):
        pass
    
    def myConnect(self,testbed,hostname):
        pass
    
    def raiseSessionLimit(self,testbed,hostname):
        pass

    def restoreSessionLimit(self):
        pass

my_device_f5.py

from package.my_device import MyDevice
import unicon
import json
from pyats.topology.loader import load
import icontrol
import logging 
import traceback

logger = logging.getLogger(__name__)


class MyDeviceF5(MyDevice):

    # Declare a list of uris which start by /mgmt/tm/sys
    uri_sys=[  
            '/mgmt/tm/sys/version',
            '/mgmt/tm/sys/hardware',
            '/mgmt/tm/sys/clock',
            '/mgmt/tm/sys/cpu',
            '/mgmt/tm/sys/license',
            '/mgmt/tm/sys/provision',
            '/mgmt/tm/sys/performance/connections/stats',
            '/mgmt/tm/sys/mcp-state/stats',
            '/mgmt/tm/sys/service/stats'
            ]

    # Declare a list of uris which start by /mgmt/tm/net
    uri_net=[  
            '/mgmt/tm/net/interface',
            '/mgmt/tm/net/trunk',
            '/mgmt/tm/net/vlan',
            '/mgmt/tm/net/self',
            '/mgmt/tm/net/arp',
            '/mgmt/tm/net/fdb',
            '/mgmt/tm/net/route',
            '/mgmt/tm/net/routing',
            '/mgmt/tm/net/interface/stats?$select=counters.dropsAll,counters.errorsAll,tmName,mediaActive,status',
            '/mgmt/tm/net/route-domain',
            '/mgmt/tm/net/routing/bgp'
            ]
    

    # Declare a list of uris which start by /mgmt/tm/gtm
    uri_gtm=[
            '/mgmt/tm/gtm/datacenter',
            '/mgmt/tm/gtm/server/',
            '/mgmt/tm/gtm/wideip/a',
            '/mgmt/tm/gtm/wideip/aaaa',
            '/mgmt/tm/gtm/pool/a',
            '/mgmt/tm/gtm/pool/aaaa',
            '/mgmt/tm/gtm/prober-pool',
            '/mgmt/tm/gtm/monitor',
            '/mgmt/tm/gtm/sync-status',
            '/mgmt/tm/gtm/iquery?$select=serverType,serverName,connectState',
            '/mgmt/tm/gtm/pool/a/stats?$select=status.availabilityState,status.enabledState,tmName',
            '/mgmt/tm/gtm/server/stats?$select=tmName,status.enabledState,status.availabilityState',
            '/mgmt/tm/gtm/datacenter/stats?$select=dcName,status.enabledState,status.availabilityState'
            ]

    # Declare a list of uris which start by /mgmt/tm/ltm
    uri_ltm=[
            '/mgmt/tm/ltm/virtual-address',
            '/mgmt/tm/ltm/virtual-address/stats?$select=tmName.description,status.availabilityState,status.enabledState,status.statusReason',
            '/mgmt/tm/ltm/virtual',
            '/mgmt/tm/ltm/virtual/stats?$select=tmName,status.availabilityState,status.enabledState,status.statusReason',
            '/mgmt/tm/ltm/snatpool',
            '/mgmt/tm/ltm/snat-translation',
            '/mgmt/tm/ltm/snat',
            '/mgmt/tm/ltm/rule',
            '/mgmt/tm/ltm/pool',
            '/mgmt/tm/ltm/pool/stats?$select=tmName,status.availabilityState,status.enabledState,status.statusReason',
            '/mgmt/tm/ltm/policy',
            '/mgmt/tm/ltm/node',
            '/mgmt/tm/ltm/node/stats?$select=tmName,status.availabilityState,status.enabledState,status.statusReason',
            '/mgmt/tm/ltm/monitor',
            '/mgmt/tm/ltm/nat',
            '/mgmt/tm/ltm/persistence'
    ]


    def __init__(self,name,operations_path,protocol='https',port='443'):
        super().__init__(name,operations_path)
        self.protocol = protocol
        self.port = port
        self.os = 'bigip'
        self.running_config=''
        self.check_list=[]
        self.pyats_device = None
    

    def makeChecks(self,phase,test_name):
        for uri in self.check_list: 
            file_extension = "json"
            try:
                output = self.pyats_device.rest.get( api_url=uri , verbose=True ).text
                output = json.loads(output)
                output = self.format_json(output)
                output = json.dumps(output , indent=4)
            except icontrol.exceptions.iControlUnexpectedHTTPError:
                logger.error(uri +' does not exist')
                output = traceback.format_exc()
                file_extension = "txt"

            try :
                with open(f'{self.operations_path}/{test_name}/{self.name}/{phase}/{(uri.replace("/", "_")).split("?" , )[0]}.{file_extension}' , 'w') as output_file:
                    output_file.write(output)
            except Exception as e:
                logger.error(f'learning {uri} failed!! Exception following lines:\n{e}')


    def myConnect(self,testbed,hostname):
        logger.info("testbed in:\n%s",testbed)

        device=testbed.devices[hostname]
        try:
            
            device.connect( via='rest' , alias='rest'  )
            device.rest.connected
            self.pyats_device = device
        except unicon.core.errors.ConnectionError as e:
            logger.error("-- ERROR --")
            logger.error(f"  The error following line:\n{e}")
            logger.error(f"  Can't connect to {device.alias}")
            device.disconnect()
            return
        except RuntimeError:
            device.connect( connection_timeout=15,  learn_hostname=True , log_stdout=True )
            self.pyats_device = device
    

    def configAnalyzer(self):
        output_ltm = self.pyats_device.rest.get( api_url='/mgmt/tm/sys/provision/ltm' , verbose=True ).text
        output_gtm = self.pyats_device.rest.get( api_url='/mgmt/tm/sys/provision/gtm' , verbose=True ).text

        if '"level":"nominal"' in output_ltm :
            is_ltm = True
        else:
            is_ltm = False
        
        if '"level":"nominal"' in output_gtm :
            is_dns = True
        else:
            is_dns = False
        
        if is_dns and not is_ltm:
            self.check_list = MyDeviceF5.uri_sys + MyDeviceF5.uri_net + MyDeviceF5.uri_gtm
        elif is_ltm and not is_dns:
            self.check_list = MyDeviceF5.uri_sys + MyDeviceF5.uri_net + MyDeviceF5.uri_ltm
        elif is_dns and is_ltm:
            self.check_list = MyDeviceF5.uri_sys + MyDeviceF5.uri_net + MyDeviceF5.uri_ltm + MyDeviceF5.uri_gtm


    def format_json(self, json_dict):
        if ("items" in json_dict):
            for item in json_dict["items"]:
                    if ("fullPath" in item):
                        # item_copy = copy.deepcopy(item)
                        new_item = {"partition_" + item["fullPath"] : item}
                        index = json_dict["items"].index(item)
                        if index in json_dict["items"]:
                            logger.warning("Overwritting data in format_json on key %s in items", index)
                        json_dict["items"][index] = new_item
        return json_dict

Sulray avatar May 13 '24 14:05 Sulray

Thank you for sharing. Kindly give me some time to debug the issue.

Harishv01 avatar May 15 '24 08:05 Harishv01

Can you share the working code files as well so that we can debug the issue?

Harishv01 avatar May 15 '24 13:05 Harishv01

Can you share the working code files as well so that we can debug the issue?

Harishv01 avatar May 16 '24 13:05 Harishv01

Just so you know, I am aware of your comments and requests when you post them. I have other things to manage in parallel; sorry for each delay.

I realized that the error is different when using 24.4 (I was on 24.3 when I got the error posted earlier: "clean stage '' does not exist in the json file"). In 24.4 I get the following one:

2024-05-16 12:44:21,106 - genie.conf.base.api - reduction - DEBUG - Retrieving stage '__getstate__' for a1aaa01-clients (OS:bigip, PLATFORM:None)
2024-05-16 12:44:21,329 - genie.conf.base.api - reduction - DEBUG - Retrieving stage '__getstate__' for a1aaa01-clients (OS:bigip, PLATFORM:None)
Traceback (most recent call last):
  File "my_main.py", line 31, in <module>
    operation.run()
  File "/package/path/package/my_operation.py", line 48, in run
    self.runChecks()
  File "/package/path/package/my_operation.py", line 109, in runChecks
    multiprocessing.Pool().map(self.runChecksDevice, self.devices) 
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 364, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 771, in get
    raise self._value
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 537, in _handle_tasks
    put(task)
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot pickle 'weakref' object

The only difference for the working code is in my_operation.py. I made a small update to this file to skip a dependency which is not needed to test it on your side, so here are the two versions of this script:

  • Script my_operation.py using multiprocessing only for device.makeChecks method (not working):
from genie.utils.diff import Diff
import json
import os
import yaml
from pyats.topology.loader import load
from package.my_device import MyDevice
from package.my_device_f5 import MyDeviceF5
import pathlib
import logging
import multiprocessing
from datetime import datetime

logger = logging.getLogger(__name__)

class MyOperation:
    
    def __init__(self):
        self.name = ''
        self.username = ''
        self.devices:list[MyDevice] = []
        self.testbed = ''
        self.root_path = pathlib.Path(__file__).resolve().parents[1]
        self.operations_path = f"{self.root_path}/output/operations"
        self.testbeds_path = f"{self.root_path}/output/testbeds"
        self.phase = None

    def initiate(self):
        with open( f'{self.testbeds_path}/testbed.yaml') as testbed_file:
            self.testbed = yaml.safe_load(testbed_file)
        
        self.name = self.testbed['testbed']['name']
        self.username = self.testbed['testbed']['credentials']['default']['username']

        for device in self.testbed['devices'].keys():                        
            if self.testbed['devices'][device]['os'] == 'bigip':                    
                self.devices.append(MyDeviceF5(device,self.operations_path))


    def run(self):
        datetime_start = datetime.now()
        logger.info("Starting run('%s')",self.phase)
        if not self.phase:
            raise Exception("No phase defined")
        self.buildTree()
        if self.phase=="Diff":
            self.runDiff()
        else: 
            self.runChecks()
        datetime_end = datetime.now()
        diff = datetime_end - datetime_start
        logger.info("Duration: %s (in hours)", (diff.total_seconds()/3600))


    def buildTree(self):
        logger.info("Starting buildTree('%s')",self.phase)
        if not os.path.exists(f'{self.operations_path}/{self.name}'):
            os.makedirs(f'{self.operations_path}/{self.name}')
        for device in self.devices:
            if not os.path.exists(f'{self.operations_path}/{self.name}/{device.name}'):
                os.makedirs(f'{self.operations_path}/{self.name}/{device.name}')
            if not os.path.exists(f'{self.operations_path}/{self.name}/{device.name}/{self.phase}'):
                os.makedirs(f'{self.operations_path}/{self.name}/{device.name}/{self.phase}')


    def loadTestbed(self):
        try:
            self.testbed = load(self.testbed)
        except:
            print('host is not resolved')


    def runChecks(self):
        for device in self.devices:
                logger.info("name of device: %s",device.name)
                logger.info("self.testbed:\n%s",self.testbed)

                device.raiseSessionLimit(self.testbed , device.name)
                device.myConnect(self.testbed , device.name)

                if not os.path.isfile(f'{self.operations_path}/{self.name}/{device.name}/check_list.json'):

                    if device.os not in ['bigip','fortios']:
                        device.getRunningConfig()
                        device.configAnalyzer(device.mapping_features , device.os)

                        print(f'\nbased on {device.name} running-config, I deduced I gotta check the following fields:')
                        for model in device.check_list:
                            print(f'- {model}')
                            
                        with open( f'{self.operations_path}/{self.name}/{device.name}/check_list.json' , 'w') as check_list_file:
                            check_list_file.write(str(json.dumps(device.check_list, indent=4)))
                    
                    else:
                        device.configAnalyzer()
                        with open( f'{self.operations_path}/{self.name}/{device.name}/check_list.json' , 'w') as check_list_file:
                            check_list_file.write(str(json.dumps(device.check_list, indent=4)))
                else:
                    with open( f'{self.operations_path}/{self.name}/{device.name}/check_list.json' , 'r') as check_list_file:
                        device.check_list = json.load(check_list_file)
        print("self.devices: %s", str(self.devices))

        logger.info(f"Name of operation: {self.name}")
        for device in self.devices:
            logger.info(f"One device name is: {device.name}")

        # with multiprocessing.Pool(2) as pool:
        #     pool.map(self.runChecksDevice, self.devices) 
            
        multiprocessing.Pool().map(self.runChecksDevice, self.devices) 

        for device in self.devices:
            logger.info(f"Starting makeChecks for device {device.name}")
            logger.info(f"device.name in for loop: {device.name}")
            # device.makeChecks(self.phase,self.name)
            device.restoreSessionLimit()
            device.pyats_device.disconnect()
            logger.info(f"Finished makeChecks and disconnected from device {device.name}")


    def runChecksDevice(self, device:MyDevice):
        logger.info(f"Starting makeChecks for device name {device.name} in multiprocessing")
        device.makeChecks(self.phase,self.name)
        device.restoreSessionLimit()
        device.pyats_device.disconnect()
        logger.info(f"Finished makeChecks and disconnected from device {device.name} in multiprocessing")


    def runDiff(self):
        logging.debug("Running findDiff")
        for device in self.devices:
            logging.debug("runDiff on device %s", str(device.name))
            #Looping the list of uris
            if (device.check_list == []):
                with open( f'{self.operations_path}/{self.name}/{device.name}/check_list.json' , 'r') as check_list_file:
                        device.check_list = json.load(check_list_file)
                logging.debug("device.checklist was empty, filled with check_list.json in device folder")
            logging.debug("runDiff on device %s on all following features : %s", str(device.name), str(device.check_list))
            for feature in device.check_list:
                logging.debug("runDiff on device %s on feature %s", str(device.name), str(feature))
                pre_checks_path = f'{self.operations_path}/{self.name}/{device.name}/preChecks/{(feature.replace("/", "_")).split("?" , )[0]}'
                post_checks_path = f'{self.operations_path}/{self.name}/{device.name}/postChecks/{(feature.replace("/", "_")).split("?" , )[0]}'
                if os.path.isfile(f"{pre_checks_path}.txt"): 
                    logging.debug("For device %s, feature %s led to an error in pre checks phase", str(device.name), str(feature))
                    with open(f'{pre_checks_path}.txt' , 'r') as f_pre:
                        diff = f_pre.read()
                elif os.path.isfile(f"{post_checks_path}.txt"):
                    logging.debug("For device %s, feature %s led to an error in post checks phase", str(device.name), str(feature))
                    with open(f'{pre_checks_path}.txt' , 'r') as f_post:
                        diff = f_post.read()
                else:
                    try:
                        with open(f'{pre_checks_path}.json' , 'r') as f_pre:
                            lines_pre = json.load(f_pre)
                        with open(f'{post_checks_path}.json' , 'r') as f_post:
                            lines_post = json.load(f_post)
                    except FileNotFoundError:
                        logger.error('file not found')
                        continue
                    logging.info("Opened the f_pre and f_post, now running Diff")
                    diff = Diff(lines_pre, lines_post)                    
                    logging.info("Running findDiff")
                    diff.findDiff()
                if len(str(diff)) > 0 :
                    with open(f'{self.operations_path}/{self.name}/{device.name}/Diff/{(feature.replace("/", "_")).split("?" , )[0]}.diff' , 'w') as f_diff:
                        f_diff.write(str(diff))
                    logging.info("Finished to write diff")
  • Script my_operation.py using a for loop only (working):
from genie.utils.diff import Diff
import json
import os
import yaml
from pyats.topology.loader import load
from package.my_device import MyDevice
from package.my_device_f5 import MyDeviceF5
import pathlib
import logging
import multiprocessing
from datetime import datetime

logger = logging.getLogger(__name__)

class MyOperation:
    
    def __init__(self):
        self.name = ''
        self.username = ''
        self.devices:list[MyDevice] = []
        self.testbed = ''
        self.root_path = pathlib.Path(__file__).resolve().parents[1]
        self.operations_path = f"{self.root_path}/output/operations"
        self.testbeds_path = f"{self.root_path}/output/testbeds"
        self.phase = None

    def initiate(self):
        with open( f'{self.testbeds_path}/testbed.yaml') as testbed_file:
            self.testbed = yaml.safe_load(testbed_file)
        
        self.name = self.testbed['testbed']['name']
        self.username = self.testbed['testbed']['credentials']['default']['username']

        for device in self.testbed['devices'].keys():                        
            if self.testbed['devices'][device]['os'] == 'bigip':                    
                self.devices.append(MyDeviceF5(device,self.operations_path))


    def run(self):
        datetime_start = datetime.now()
        logger.info("Starting run('%s')",self.phase)
        if not self.phase:
            raise Exception("No phase defined")
        self.buildTree()
        if self.phase=="Diff":
            self.runDiff()
        else: 
            self.runChecks()
        datetime_end = datetime.now()
        diff = datetime_end - datetime_start
        logger.info("Duration: %s (in hours)", (diff.total_seconds()/3600))


    def buildTree(self):
        logger.info("Starting buildTree('%s')",self.phase)
        if not os.path.exists(f'{self.operations_path}/{self.name}'):
            os.makedirs(f'{self.operations_path}/{self.name}')
        for device in self.devices:
            if not os.path.exists(f'{self.operations_path}/{self.name}/{device.name}'):
                os.makedirs(f'{self.operations_path}/{self.name}/{device.name}')
            if not os.path.exists(f'{self.operations_path}/{self.name}/{device.name}/{self.phase}'):
                os.makedirs(f'{self.operations_path}/{self.name}/{device.name}/{self.phase}')


    def loadTestbed(self):
        try:
            self.testbed = load(self.testbed)
        except:
            print('host is not resolved')


    def runChecks(self):
        for device in self.devices:
                logger.info("name of device: %s",device.name)
                logger.info("self.testbed:\n%s",self.testbed)

                device.raiseSessionLimit(self.testbed , device.name)
                device.myConnect(self.testbed , device.name)

                if not os.path.isfile(f'{self.operations_path}/{self.name}/{device.name}/check_list.json'):

                    if device.os not in ['bigip','fortios']:
                        device.getRunningConfig()
                        device.configAnalyzer(device.mapping_features , device.os)

                        print(f'\nbased on {device.name} running-config, I deduced I gotta check the following fields:')
                        for model in device.check_list:
                            print(f'- {model}')
                            
                        with open( f'{self.operations_path}/{self.name}/{device.name}/check_list.json' , 'w') as check_list_file:
                            check_list_file.write(str(json.dumps(device.check_list, indent=4)))
                    
                    else:
                        device.configAnalyzer()
                        with open( f'{self.operations_path}/{self.name}/{device.name}/check_list.json' , 'w') as check_list_file:
                            check_list_file.write(str(json.dumps(device.check_list, indent=4)))
                else:
                    with open( f'{self.operations_path}/{self.name}/{device.name}/check_list.json' , 'r') as check_list_file:
                        device.check_list = json.load(check_list_file)
        print("self.devices: %s", str(self.devices))

        logger.info(f"Name of operation: {self.name}")
        for device in self.devices:
            logger.info(f"One device name is: {device.name}")

        # with multiprocessing.Pool(2) as pool:
        #     pool.map(self.runChecksDevice, self.devices) 
            
        # multiprocessing.Pool().map(self.runChecksDevice, self.devices) 

        for device in self.devices:
            logger.info(f"Starting makeChecks for device {device.name}")
            logger.info(f"device.name in for loop: {device.name}")
            device.makeChecks(self.phase,self.name)
            device.restoreSessionLimit()
            device.pyats_device.disconnect()
            logger.info(f"Finished makeChecks and disconnected from device {device.name}")


    # def runChecksDevice(self, device:MyDevice):
    #     logger.info(f"Starting makeChecks for device name {device.name} in multiprocessing")
    #     device.makeChecks(self.phase,self.name)
    #     device.restoreSessionLimit()
    #     device.pyats_device.disconnect()
    #     logger.info(f"Finished makeChecks and disconnected from device {device.name} in multiprocessing")


    def runDiff(self):
        logging.debug("Running findDiff")
        for device in self.devices:
            logging.debug("runDiff on device %s", str(device.name))
            #Looping the list of uris
            if (device.check_list == []):
                with open( f'{self.operations_path}/{self.name}/{device.name}/check_list.json' , 'r') as check_list_file:
                        device.check_list = json.load(check_list_file)
                logging.debug("device.checklist was empty, filled with check_list.json in device folder")
            logging.debug("runDiff on device %s on all following features : %s", str(device.name), str(device.check_list))
            for feature in device.check_list:
                logging.debug("runDiff on device %s on feature %s", str(device.name), str(feature))
                pre_checks_path = f'{self.operations_path}/{self.name}/{device.name}/preChecks/{(feature.replace("/", "_")).split("?" , )[0]}'
                post_checks_path = f'{self.operations_path}/{self.name}/{device.name}/postChecks/{(feature.replace("/", "_")).split("?" , )[0]}'
                if os.path.isfile(f"{pre_checks_path}.txt"): 
                    logging.debug("For device %s, feature %s led to an error in pre checks phase", str(device.name), str(feature))
                    with open(f'{pre_checks_path}.txt' , 'r') as f_pre:
                        diff = f_pre.read()
                elif os.path.isfile(f"{post_checks_path}.txt"):
                    logging.debug("For device %s, feature %s led to an error in post checks phase", str(device.name), str(feature))
                    with open(f'{pre_checks_path}.txt' , 'r') as f_post:
                        diff = f_post.read()
                else:
                    try:
                        with open(f'{pre_checks_path}.json' , 'r') as f_pre:
                            lines_pre = json.load(f_pre)
                        with open(f'{post_checks_path}.json' , 'r') as f_post:
                            lines_post = json.load(f_post)
                    except FileNotFoundError:
                        logger.error('file not found')
                        continue
                    logging.info("Opened the f_pre and f_post, now running Diff")
                    diff = Diff(lines_pre, lines_post)                    
                    logging.info("Running findDiff")
                    diff.findDiff()
                if len(str(diff)) > 0 :
                    with open(f'{self.operations_path}/{self.name}/{device.name}/Diff/{(feature.replace("/", "_")).split("?" , )[0]}.diff' , 'w') as f_diff:
                        f_diff.write(str(diff))
                    logging.info("Finished to write diff")

You can just copy either one into my_operation.py to try the working or non-working version; as you can see, the only difference is in fact at the end of the runChecks method, where I just commented out different lines.

About the expected structure of files being used:

  • my_main.py
  • package
    • my_device.py
    • my_device_f5.py
    • my_operation.py
  • output
    • testbeds
      • testbed.yaml

I already sent you an example of the testbed I'm using (you just need to replace what has to be replaced, such as the hostnames and the username).

Sulray avatar May 16 '24 13:05 Sulray

Okay, will check and let you know

Harishv01 avatar May 17 '24 08:05 Harishv01

The error is not coming from pyATS; it is coming from your code. Kindly fix it. For your reference, check this link: https://stackoverflow.com/questions/71945399/python-3-8-multiprocessing-typeerror-cannot-pickle-weakref-object

Harishv01 avatar May 22 '24 08:05 Harishv01

I've seen this page during my own research, but I never specify in my code that I use multiprocessing in "spawn" mode, contrary to the examples on that page.

To verify that the default is not spawn mode either, I added the following log to my my_main.py script:

logger.warning("start_method: " + str(multiprocessing.get_start_method()))

The result:

2024-05-22 14:53:35,999 - __main__ - my_main - WARNING - start_method: fork

So I'm apparently in "fork" mode, which is not addressed in that Stack Overflow issue.

I'm on Python 3.8.10 and pyATS 24.4 for this log.
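
If it helps rule the start method out completely, it can also be forced explicitly per pool; here is a minimal sketch (method_test is just a placeholder top-level function, not from my real code):

import multiprocessing

def method_test(x: int):
    # placeholder top-level worker, picklable by reference under both start methods
    return x ** 2

if __name__ == '__main__':
    ctx = multiprocessing.get_context('spawn')  # or 'fork', the Linux default
    with ctx.Pool(2) as pool:
        print(pool.map(method_test, range(10)))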

Sulray avatar May 22 '24 14:05 Sulray

The issue might be with the objects in the self.devices list: they might contain attributes that cannot be pickled. Check the pickle-ability of your MyDevice objects. If your MyDevice class contains complex objects, try to simplify it. For instance, if it has a connection attribute, you might want to remove it or replace it with a simpler representation before passing it to the multiprocessing pool, and modify the runChecks method to handle multiprocessing accordingly.
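
For example, here is a rough sketch of that idea (run_checks_for_name, the args tuple, and testbed_path are hypothetical; the point is that only plain, picklable values are sent to the pool, and the device is rebuilt and connected inside the child process):

import multiprocessing

from pyats.topology.loader import load
from package.my_device_f5 import MyDeviceF5

def run_checks_for_name(args):
    # The worker receives only picklable strings, never a connected Device object.
    device_name, phase, operation_name, operations_path, testbed_path = args
    device = MyDeviceF5(device_name, operations_path)  # rebuild the wrapper in the child
    testbed = load(testbed_path)                       # reload the testbed in the child
    device.myConnect(testbed, device_name)             # connect inside the worker process
    # check_list would also need to be filled here (e.g. from the saved check_list.json)
    # before makeChecks() has anything to iterate over.
    device.makeChecks(phase, operation_name)
    device.pyats_device.disconnect()

# In MyOperation.runChecks, map over tuples of plain values instead of MyDeviceF5 objects:
# args_list = [(d.name, self.phase, self.name, self.operations_path, testbed_path)
#              for d in self.devices]
# with multiprocessing.Pool(2) as pool:
#     pool.map(run_checks_for_name, args_list)

Note that run_checks_for_name is defined at module level, so the function itself can also be pickled by reference.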

Harishv01 avatar May 23 '24 04:05 Harishv01

I will try to see if I can get rid of the complex objects. But do you know why the error is not the same in 24.3 ("clean stage '' does not exist in the json file")?

Sulray avatar May 27 '24 07:05 Sulray

Hi, in version 24.3 we had seen some bugs, but we have fixed them in version 24.4. That's why you are not seeing the same error anymore.

Harishv01 avatar May 27 '24 09:05 Harishv01

Have you fixed the issue and tried again? Can you give an update?

Harishv01 avatar May 29 '24 08:05 Harishv01

Hi, I haven't found a solution yet in the time I've been able to spend on it.

Do you think it could be related to the fact that the method I'm calling in multiprocessing is a method of my class? I need to call it with something like "multiprocessing.Pool().map(self.runChecksDevice, self.devices)" (with self.) to be able to use the class's attributes.

I haven't found examples of this in the documentation, even though I think it's a common case.

Sulray avatar May 29 '24 09:05 Sulray

As a test, I replaced the runChecks method in my my_operation.py script with the following:

def runChecks(self):
        def methodTest1(x:int):
            logger.info("x² is: %s", x**2)

        multiprocessing.Pool().map(methodTest1, range(10)) 

And I have the following error logs:

Traceback (most recent call last):
  File "my_main.py", line 42, in <module>
    operation.run()
  File "/package/path/package/my_operation.py", line 48, in run
    self.runChecks()
  File "/package/path/package/my_operation.py", line 112, in runChecks
    multiprocessing.Pool().map(methodTest1, range(10)) 
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 364, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 771, in get
    raise self._value
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 537, in _handle_tasks
    put(task)
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'MyOperation.runChecks.<locals>.methodTest1'

But if I add global methodTest1, it works...

So apparently the function needs to be top level to be usable with multiprocessing, so maybe that could explain why it's not working in my original case... It might not even be related to a "complex object".
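
For reference, a minimal contrast of the two cases (square_log is a made-up name; the only point is where the function is defined):

import logging
import multiprocessing

logger = logging.getLogger(__name__)

def square_log(x: int):
    # Module-level function: pickle can reference it by module and qualified name,
    # so Pool.map can send it to the worker processes.
    logger.info("x squared is: %s", x ** 2)

class MyOperation:
    def runChecks(self):
        # Works: square_log is importable from this module in the child process.
        with multiprocessing.Pool(2) as pool:
            pool.map(square_log, range(10))

        # Fails with "Can't pickle local object 'MyOperation.runChecks.<locals>.methodTest1'":
        # a function defined inside runChecks has no importable name.
        # def methodTest1(x: int):
        #     logger.info("x² is: %s", x ** 2)
        # pool.map(methodTest1, range(10))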

Sulray avatar May 29 '24 11:05 Sulray