genielibs
"Exception: The clean stage '' does not exist in the json file" while trying to use multiprocessing
The error
Hi,
I was trying to make a pyATS/Genie-based script run faster by using multiprocessing, as recommended in the documentation.
So at some point, in a method called "runChecks", I replaced the following lines:
for device in self.devices:
    # already connected to each device
    device.makeChecks(self.phase, self.name)
with this line:
multiprocessing.Pool().map(self.runChecksDevice, self.devices)
where runChecksDevice is defined as:
def runChecksDevice(self, device):
    device.makeChecks(self.phase, self.name)
So it's supposed to do exactly the same thing, just with multiprocessing.
But I get the following error (in debug mode):
2024-04-22 14:54:41,983 - genie.conf.base.api - DEBUG - Retrieving stage '__getstate__' for a1aaa01-clients (OS:bigip, PLATFORM:None) (reduction.py:51)
2024-04-22 14:54:42,023 - genie.conf.base.api - DEBUG - Retrieving stage '__getstate__' for a1aaa01-clients (OS:bigip, PLATFORM:None) (reduction.py:51)
Traceback (most recent call last):
  File "main.py", line 74, in <module>
    operation.run()
  File "/package/path/my_operation.py", line 252, in run
    self.runChecks()
  File "/package/path/package/my_operation.py", line 332, in runChecks
    multiprocessing.Pool().map(self.runChecksDevice, self.devices)
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 364, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 771, in get
    raise self._value
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 537, in _handle_tasks
    put(task)
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "src/genie/conf/base/api.py", line 581, in genie.conf.base.api.CleanAPI.__getattr__
  File "/package/path/.venv/lib/python3.8/site-packages/genie/libs/clean/utils.py", line 268, in get_clean_function
    raise Exception(f"The clean stage '{name}' does not exist in the json "
Exception: The clean stage '' does not exist in the json file
Before the change to add multiprocessing it was working perfectly, and I don't really get the reason for the error, since it happens right at the beginning of the multiprocessing execution. The method raising the exception (get_clean_function) is part of genielibs, so maybe you have more insight on this.
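For what it's worth, I verified with a toy example (all names here are made up) that Pool.map over a bound method has to pickle the instance along with every item of the iterable, which is presumably how the genie objects end up being serialized:

```python
import pickle

class Toy:
    """Hypothetical stand-in for my operation class."""
    def __init__(self):
        self.phase = "preChecks"

    def run_one(self, item):
        return f"{self.phase}:{item}"

toy = Toy()
# multiprocessing sends each task to a worker by pickling it; for
# pool.map(self.runChecksDevice, self.devices) that means pickling both
# the bound method (and therefore `self`) and every device object.
restored = pickle.loads(pickle.dumps(toy.run_one))
print(restored("device-a"))  # prints: preChecks:device-a
```

So any unpicklable attribute reachable from self or from a device would make the map call fail.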
Context
To give a little more context:
- You can see a class variable self.devices. It is a list of custom devices, and the point of these custom devices is that no matter the OS, I can call the same method to do my checks. For each device in this list, device.pyats_device is the object we use in pyATS to connect/disconnect/run checks.
- In this case I was trying my script on BIG-IP devices, so device.makeChecks(phase, test_name) performs the following:

def makeChecks(self, phase, test_name):
    for uri in self.check_list:
        try:
            output = self.pyats_device.rest.get(api_url=uri, verbose=True).text
            output = json.loads(output)
            output = json.dumps(output, indent=4)
        except icontrol.exceptions.iControlUnexpectedHTTPError:
            logger.error(uri + ' does not exist')
            output = traceback.format_exc()
        try:
            with open(f'{self.operations_path}/{test_name}/{self.name}/{phase}/{(uri.replace("/", "_")).split("?")[0]}.json', 'w') as output_file:
                output_file.write(output)
        except Exception as e:
            logger.error(f'learning {uri} failed!! Exception following lines:\n{e}')
What could lead to this exception? Did I do something wrong in my attempt to use multiprocessing with pyATS/Genie?
https://pubhub.devnetcloud.com/media/pyats/docs/async/multiprocessing.html
Kindly go through the documentation above on multiprocessing and give it a try.
Thank you!
Yes, thanks, but I've already seen this page and also some posts on Stack Overflow, and I still have the issue. Where does my case differ from the given example? The example, with f a method and range(20) an iterable:
with multiprocessing.Pool(2) as pool:
    result = pool.map(f, range(20))
My case, with self.runChecksDevice a method and self.devices an iterable:
multiprocessing.Pool().map(self.runChecksDevice, self.devices)
Just in case, I also tried the following before sending my issue:
with multiprocessing.Pool(2) as pool:
    pool.map(self.runChecksDevice, self.devices)

with multiprocessing.Pool() as pool:  # supposed to use the maximum number of processes possible
    pool.map(self.runChecksDevice, self.devices)
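The only structural difference I can see with the documented example is that my function is a bound method and my iterable holds live device objects, both of which must survive pickling. A sketch of the shape that avoids that constraint (made-up names, not my actual code) maps a module-level function over plain strings:

```python
import multiprocessing

def check_device(name):
    # Hypothetical worker: the real script would reconnect to the device
    # by name and run the checks; here it only returns a marker string.
    return f"checked {name}"

if __name__ == "__main__":
    names = ["a1aaa01-clients", "a1aaa05"]
    with multiprocessing.Pool(2) as pool:
        results = pool.map(check_device, names)
    print(results)  # prints: ['checked a1aaa01-clients', 'checked a1aaa05']
```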
Okay, kindly give me some time. I will check and let you know.
Thank you
The error message indicates that the name variable being passed to the runChecksDevice method is empty. This suggests that the self.name attribute might not be set correctly or is not accessible within the multiprocessing context. Please investigate and resolve this issue.
Additionally, could you provide information on how you are running the job? Are you passing any clean YAML while running?
For the moment I've added some logs to check that the names are not empty. I first checked self.name (a variable of my class in my_operation.py) and then device.name (a variable of the class used for devices) for each device (2 devices in my case):
2024-04-24 09:22:45,408 - package.my_operation - INFO - Name of operation: test_multiprocessing (my_operation.py:324)
2024-04-24 09:22:45,408 - package.my_operation - INFO - One device name is: a1aaa01-clients (my_operation.py:326)
2024-04-24 09:22:45,408 - package.my_operation - INFO - One device name is: a1aaa05 (my_operation.py:326)
2024-04-24 09:22:45,423 - genie.conf.base.api - DEBUG - Retrieving stage '__getstate__' for a1aaa01-clients (OS:bigip, PLATFORM:None) (reduction.py:51)
2024-04-24 09:22:45,465 - genie.conf.base.api - DEBUG - Retrieving stage '__getstate__' for a1aaa01-clients (OS:bigip, PLATFORM:None) (reduction.py:51)
Traceback (most recent call last):
File "main.py", line 74, in <module>
operation.run()
....... (then it's the same traceback)
So I'm still investigating, without really knowing why a "name" would be considered empty, since these are the only "name" variables I defined.
About how I run the job: it all starts with main.py initializing other classes and calling methods on them. I do not use aetest, so I do not use the "run(example.py)" method. I could show more of my code, but it would be even more complex. The fact is that it was working before I added the multiprocessing line, so whatever the issue is, it only prevents multiprocessing from working.
And yes, I'm generating/using a testbed.yaml with the devices I want to connect to. That's the only YAML I use.
devices:
  a1aaa01-clients:
    alias: a1aaa01-clients
    connections:
      rest:
        class: rest.connector.Rest
        ip: a1aaa01-clients
        port: 443
        protocol: https
    custom:
      abstraction:
        order:
          - os
    os: bigip
  a1aaa05:
    alias: a1aaa05
    connections:
      rest:
        class: rest.connector.Rest
        ip: a1aaa05
        port: 443
        protocol: https
    custom:
      abstraction:
        order:
          - os
    os: bigip
testbed:
  credentials:
    default:
      password: '%ASK{}'
      username: user_example
  name: test_multiprocessing
Okay, please investigate and give me an update
Hi, can you give me an update on the above?
Hi, I haven't found out yet
Can you share the debug log from earlier, when it was functioning properly? Also, could you please confirm whether you are using a clean YAML file or a clean stage? Earlier you mentioned a testbed YAML file, but I'm not referring to that.
About the YAML: the testbed is the only YAML I'm using; I never tried a clean YAML file / clean stage or anything like that.
When it was functioning properly (with a for loop instead of multiprocessing), the logs about Retrieving stage '__getstate__' were not there.
When I use a for loop:
...
2024-05-02 07:59:37,272 - rest.connector.libs.bigip.implementation - INFO - Connected successfully to 'a1aaa05' (implementation.py:217)
self.devices: %s [<package.device.DeviceF5 object at 0x7f99b12aadc0>, <package.device.DeviceF5 object at 0x7f99b12aae50>]
2024-05-02 07:59:37,272 - package.my_operation - INFO - Name of operation: test_f5 (my_operation.py:324)
2024-05-02 07:59:37,272 - package.my_operation - INFO - One device name is: a1aaa01-clients (my_operation.py:326)
2024-05-02 07:59:37,272 - package.my_operation - INFO - One device name is: a1aaa05 (my_operation.py:326)
2024-05-02 07:59:37,272 - package.my_operation - INFO - Starting makeChecks for device a1aaa01-clients (my_operation.py:333)
2024-05-02 07:59:37,272 - package.my_operation - INFO - device.name in for loop: a1aaa01-clients (my_operation.py:334)
2024-05-02 07:59:37,272 - rest.connector.libs.bigip.implementation - INFO - Sending GET to 'a1aaa01-clients': https://10.10.10.10:443/mgmt/tm/sys/version (implementation.py:309)
...
When I use multiprocessing as detailed previously:
...
2024-05-02 08:02:45,315 - rest.connector.libs.bigip.implementation - INFO - Connected successfully to 'a1aaa05' (implementation.py:217)
self.devices: %s [<package.device.DeviceF5 object at 0x7f643d9dfdf0>, <package.device.DeviceF5 object at 0x7f643d9dfe80>]
2024-05-02 08:02:45,315 - package.my_operation - INFO - Name of operation: test_f5 (my_operation.py:324)
2024-05-02 08:02:45,315 - package.my_operation - INFO - One device name is: a1aaa01-clients (my_operation.py:326)
2024-05-02 08:02:45,316 - package.my_operation - INFO - One device name is: a1aaa05 (my_operation.py:326)
2024-05-02 08:02:45,336 - genie.conf.base.api - DEBUG - Retrieving stage '__getstate__' for a1aaa01-clients (OS:bigip, PLATFORM:None) (reduction.py:51)
2024-05-02 08:02:45,377 - genie.conf.base.api - DEBUG - Retrieving stage '__getstate__' for a1aaa01-clients (OS:bigip, PLATFORM:None) (reduction.py:51)
Traceback (most recent call last):
  File "main.py", line 74, in <module>
    operation.run()
  File "/package/path/my_operation.py", line 252, in run
    self.runChecks()
  File "/package/path/my_operation.py", line 330, in runChecks
    multiprocessing.Pool().map(self.runChecksDevice, self.devices)
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 364, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 771, in get
    raise self._value
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 537, in _handle_tasks
    put(task)
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "src/genie/conf/base/api.py", line 581, in genie.conf.base.api.CleanAPI.__getattr__
  File "/package/path/.venv/lib/python3.8/site-packages/genie/libs/clean/utils.py", line 268, in get_clean_function
    raise Exception(f"The clean stage '{name}' does not exist in the json "
Exception: The clean stage '' does not exist in the json file
According to these logs, it seems the method I'm trying to run through multiprocessing (self.runChecksDevice) never even executes its first line, which should log a line similar to the one in the for loop:
def runChecksDevice(self, device):
    logger.info(f"Starting makeChecks for device name {device.name}")
    device.makeChecks(self.phase, self.name)
So it should log something like:
package.my_operation - INFO - Starting makeChecks for device a1aaa01-clients (my_operation.py:333)
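That would be consistent with the traceback: the exception is raised in the parent process while the task is being pickled (_handle_tasks → reduction.dumps), so no worker ever starts. The "Retrieving stage '__getstate__'" debug lines hint at the mechanism: on Python 3.8, pickling probes an instance for optional hooks such as __getstate__ with getattr, so an object whose __getattr__ raises a plain Exception for unknown names turns that harmless probe into a crash. A standalone toy illustration (class and message are made up):

```python
class Greedy:
    """Hypothetical: unknown attribute lookups raise a plain Exception
    (as a stage-lookup API might) instead of AttributeError."""
    def __getattr__(self, name):
        raise Exception(f"The stage '{name}' does not exist")

g = Greedy()
# Any probe for a name the object does not define lands in __getattr__
# and explodes; pickle's probe for '__getstate__' on Python 3.8 is
# exactly such a lookup.
try:
    getattr(g, "an_optional_hook")
except Exception as e:
    print(e)  # prints: The stage 'an_optional_hook' does not exist
```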
Could you please share the code files so that I can debug the issue?
Please share the code files to debug the issue.
I'll post an update next Monday/Thursday; I'm currently not available. I think I'll do a minimal example of my case, because if I share all the current code it will only add complexity.
Kindly share the entire code file to debug the issue
I managed to make a minimal version which produces the exact same output when trying to use multiprocessing. There are 4 Python files. Here is the code (many parts of it are not even linked to the issue, but at least you have everything, as requested):
my_main.py:
from package.my_operation import MyOperation
import logging
import sys
import os
from datetime import datetime

stream_handler = logging.StreamHandler(stream=sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(module)s - %(levelname)s - %(message)s')
stream_handler.setFormatter(formatter)
logging.getLogger().addHandler(stream_handler)
logger = logging.getLogger(__name__)
logging.getLogger().setLevel(logging.DEBUG)

operation = MyOperation()
operation.initiate()

input_choice = ''
while input_choice not in ['1', '2', '3', '4']:
    input_choice = input('Which operation do you want to perform? \n1) Pre Checks \n2) Post Checks \n3) Diff \n4) Exit the CLI \nChoice (type number): ').lower()
datetime_start = datetime.now()
if input_choice == '1':
    operation.phase = "preChecks"
    operation.loadTestbed()
elif input_choice == '2':
    operation.phase = "postChecks"
    operation.loadTestbed()
elif input_choice == '3':
    operation.phase = "Diff"
operation.run()
my_operation.py
from genie.utils.diff import Diff
import json
import os
import yaml
from pyats.topology.loader import load
from package.my_device import MyDevice
from package.my_device_f5 import MyDeviceF5
from package.utils.config_loader import config_loader
import pathlib
from package import genie_digger
import logging
import multiprocessing
logger = logging.getLogger(__name__)
class MyOperation:
    def __init__(self):
        self.name = ''
        self.username = ''
        self.devices: list[MyDevice] = []
        self.testbed = ''
        self.root_path = pathlib.Path(__file__).resolve().parents[1]
        self.operations_path = f"{self.root_path}/output/operations"
        self.testbeds_path = f"{self.root_path}/output/testbeds"
        self.phase = None

    def initiate(self):
        with open(f'{self.testbeds_path}/testbed.yaml') as testbed_file:
            self.testbed = yaml.safe_load(testbed_file)
        self.name = self.testbed['testbed']['name']
        self.username = self.testbed['testbed']['credentials']['default']['username']
        for device in self.testbed['devices'].keys():
            if self.testbed['devices'][device]['os'] == 'bigip':
                self.devices.append(MyDeviceF5(device, self.operations_path))

    def run(self):
        logger.info("Starting run('%s')", self.phase)
        if not self.phase:
            raise Exception("No phase defined")
        self.buildTree()
        if self.phase == "Diff":
            self.runDiff()
        else:
            self.runChecks()

    def buildTree(self):
        logger.info("Starting buildTree('%s')", self.phase)
        if not os.path.exists(f'{self.operations_path}/{self.name}'):
            os.makedirs(f'{self.operations_path}/{self.name}')
        for device in self.devices:
            if not os.path.exists(f'{self.operations_path}/{self.name}/{device.name}'):
                os.makedirs(f'{self.operations_path}/{self.name}/{device.name}')
            if not os.path.exists(f'{self.operations_path}/{self.name}/{device.name}/{self.phase}'):
                os.makedirs(f'{self.operations_path}/{self.name}/{device.name}/{self.phase}')

    def loadTestbed(self):
        try:
            self.testbed = load(self.testbed)
        except:
            print('host is not resolved')

    def runChecks(self):
        for device in self.devices:
            logger.info("name of device: %s", device.name)
            logger.info("self.testbed:\n%s", self.testbed)
            device.raiseSessionLimit(self.testbed, device.name)
            device.myConnect(self.testbed, device.name)
            if not os.path.isfile(f'{self.operations_path}/{self.name}/{device.name}/check_list.json'):
                if device.os not in ['bigip', 'fortios']:
                    device.getRunningConfig()
                    device.configAnalyzer(device.mapping_features, device.os)
                    print(f'\nbased on {device.name} running-config, I deduced I gotta check the following fields:')
                    for model in device.check_list:
                        print(f'- {model}')
                    not_checked_list = list(set(genie_digger.getFeatureList(device.os)) - set(device.check_list))
                    print('the following models have been ignored')
                    for model in not_checked_list:
                        print(f'- {model}')
                    with open(f'{self.operations_path}/{self.name}/{device.name}/check_list.json', 'w') as check_list_file:
                        check_list_file.write(str(json.dumps(device.check_list, indent=4)))
                else:
                    device.configAnalyzer()
                    with open(f'{self.operations_path}/{self.name}/{device.name}/check_list.json', 'w') as check_list_file:
                        check_list_file.write(str(json.dumps(device.check_list, indent=4)))
            else:
                with open(f'{self.operations_path}/{self.name}/{device.name}/check_list.json', 'r') as check_list_file:
                    device.check_list = json.load(check_list_file)
        print("self.devices: %s", str(self.devices))
        logger.info(f"Name of operation: {self.name}")
        for device in self.devices:
            logger.info(f"One device name is: {device.name}")
        # with multiprocessing.Pool(2) as pool:
        #     pool.map(self.runChecksDevice, self.devices)
        multiprocessing.Pool().map(self.runChecksDevice, self.devices)
        for device in self.devices:
            logger.info(f"Starting makeChecks for device {device.name}")
            logger.info(f"device.name in for loop: {device.name}")
            # device.makeChecks(self.phase, self.name)
            device.restoreSessionLimit()
            device.pyats_device.disconnect()
            logger.info(f"Finished makeChecks and disconnected from device {device.name}")

    def runChecksDevice(self, device):
        logger.info(f"Starting makeChecks for device name {device.name}")
        device.makeChecks(self.phase, self.name)
        logger.info(f"Finished makeChecks and disconnected from device {device.name}")

    def runDiff(self):
        logging.debug("Running findDiff")
        for device in self.devices:
            logging.debug("runDiff on device %s", str(device.name))
            # Looping the list of uris
            if device.check_list == []:
                with open(f'{self.operations_path}/{self.name}/{device.name}/check_list.json', 'r') as check_list_file:
                    device.check_list = json.load(check_list_file)
                logging.debug("device.checklist was empty, filled with check_list.json in device folder")
            logging.debug("runDiff on device %s on all following features : %s", str(device.name), str(device.check_list))
            for feature in device.check_list:
                logging.debug("runDiff on device %s on feature %s", str(device.name), str(feature))
                pre_checks_path = f'{self.operations_path}/{self.name}/{device.name}/preChecks/{(feature.replace("/", "_")).split("?")[0]}'
                post_checks_path = f'{self.operations_path}/{self.name}/{device.name}/postChecks/{(feature.replace("/", "_")).split("?")[0]}'
                if os.path.isfile(f"{pre_checks_path}.txt"):
                    logging.debug("For device %s, feature %s led to an error in pre checks phase", str(device.name), str(feature))
                    with open(f'{pre_checks_path}.txt', 'r') as f_pre:
                        diff = f_pre.read()
                elif os.path.isfile(f"{post_checks_path}.txt"):
                    logging.debug("For device %s, feature %s led to an error in post checks phase", str(device.name), str(feature))
                    with open(f'{post_checks_path}.txt', 'r') as f_post:
                        diff = f_post.read()
                else:
                    try:
                        with open(f'{pre_checks_path}.json', 'r') as f_pre:
                            lines_pre = json.load(f_pre)
                        with open(f'{post_checks_path}.json', 'r') as f_post:
                            lines_post = json.load(f_post)
                    except FileNotFoundError:
                        logger.error('file not found')
                        continue
                    logging.info("Opened the f_pre and f_post, now running Diff")
                    diff = Diff(lines_pre, lines_post)
                    logging.info("Running findDiff")
                    diff.findDiff()
                if len(str(diff)) > 0:
                    with open(f'{self.operations_path}/{self.name}/{device.name}/Diff/{(feature.replace("/", "_")).split("?")[0]}.diff', 'w') as f_diff:
                        f_diff.write(str(diff))
                    logging.info("Finished to write diff")
my_device.py
from abc import ABC , abstractmethod
from pyats.topology.loader import load
class MyDevice(ABC):
    def __init__(self, name, operations_path):
        self.name = name
        self.operations_path = operations_path

    @abstractmethod
    def makeChecks(self, phase, test_name):
        pass

    def myConnect(self, testbed, hostname):
        pass

    def raiseSessionLimit(self, testbed, hostname):
        pass

    def restoreSessionLimit(self):
        pass
my_device_F5.py
from package.my_device import MyDevice
import unicon
import json
from pyats.topology.loader import load
import icontrol
import logging
import traceback
logger = logging.getLogger(__name__)
class MyDeviceF5(MyDevice):
    # Declare a list of uris which start by /mgmt/tm/sys
    uri_sys = [
        '/mgmt/tm/sys/version',
        '/mgmt/tm/sys/hardware',
        '/mgmt/tm/sys/clock',
        '/mgmt/tm/sys/cpu',
        '/mgmt/tm/sys/license',
        '/mgmt/tm/sys/provision',
        '/mgmt/tm/sys/performance/connections/stats',
        '/mgmt/tm/sys/mcp-state/stats',
        '/mgmt/tm/sys/service/stats'
    ]
    # Declare a list of uris which start by /mgmt/tm/net
    uri_net = [
        '/mgmt/tm/net/interface',
        '/mgmt/tm/net/trunk',
        '/mgmt/tm/net/vlan',
        '/mgmt/tm/net/self',
        '/mgmt/tm/net/arp',
        '/mgmt/tm/net/fdb',
        '/mgmt/tm/net/route',
        '/mgmt/tm/net/routing',
        '/mgmt/tm/net/interface/stats?$select=counters.dropsAll,counters.errorsAll,tmName,mediaActive,status',
        '/mgmt/tm/net/route-domain',
        '/mgmt/tm/net/routing/bgp'
    ]
    # Declare a list of uris which start by /mgmt/tm/gtm
    uri_gtm = [
        '/mgmt/tm/gtm/datacenter',
        '/mgmt/tm/gtm/server/',
        '/mgmt/tm/gtm/wideip/a',
        '/mgmt/tm/gtm/wideip/aaaa',
        '/mgmt/tm/gtm/pool/a',
        '/mgmt/tm/gtm/pool/aaaa',
        '/mgmt/tm/gtm/prober-pool',
        '/mgmt/tm/gtm/monitor',
        '/mgmt/tm/gtm/sync-status',
        '/mgmt/tm/gtm/iquery?$select=serverType,serverName,connectState',
        '/mgmt/tm/gtm/pool/a/stats?$select=status.availabilityState,status.enabledState,tmName',
        '/mgmt/tm/gtm/server/stats?$select=tmName,status.enabledState,status.availabilityState',
        '/mgmt/tm/gtm/datacenter/stats?$select=dcName,status.enabledState,status.availabilityState'
    ]
    # Declare a list of uris which start by /mgmt/tm/ltm
    uri_ltm = [
        '/mgmt/tm/ltm/virtual-address',
        '/mgmt/tm/ltm/virtual-address/stats?$select=tmName.description,status.availabilityState,status.enabledState,status.statusReason',
        '/mgmt/tm/ltm/virtual',
        '/mgmt/tm/ltm/virtual/stats?$select=tmName,status.availabilityState,status.enabledState,status.statusReason',
        '/mgmt/tm/ltm/snatpool',
        '/mgmt/tm/ltm/snat-translation',
        '/mgmt/tm/ltm/snat',
        '/mgmt/tm/ltm/rule',
        '/mgmt/tm/ltm/pool',
        '/mgmt/tm/ltm/pool/stats?$select=tmName,status.availabilityState,status.enabledState,status.statusReason',
        '/mgmt/tm/ltm/policy',
        '/mgmt/tm/ltm/node',
        '/mgmt/tm/ltm/node/stats?$select=tmName,status.availabilityState,status.enabledState,status.statusReason',
        '/mgmt/tm/ltm/monitor',
        '/mgmt/tm/ltm/nat',
        '/mgmt/tm/ltm/persistence'
    ]

    def __init__(self, name, operations_path, protocol='https', port='443'):
        super().__init__(name, operations_path)
        self.protocol = protocol
        self.port = port
        self.os = 'bigip'
        self.running_config = ''
        self.check_list = []
        self.pyats_device = None

    def makeChecks(self, phase, test_name):
        for uri in self.check_list:
            file_extension = "json"
            try:
                output = self.pyats_device.rest.get(api_url=uri, verbose=True).text
                output = json.loads(output)
                output = self.format_json(output)
                output = json.dumps(output, indent=4)
            except icontrol.exceptions.iControlUnexpectedHTTPError:
                logger.error(uri + ' does not exist')
                output = traceback.format_exc()
                file_extension = "txt"
            try:
                with open(f'{self.operations_path}/{test_name}/{self.name}/{phase}/{(uri.replace("/", "_")).split("?")[0]}.{file_extension}', 'w') as output_file:
                    output_file.write(output)
            except Exception as e:
                logger.error(f'learning {uri} failed!! Exception following lines:\n{e}')

    def myConnect(self, testbed, hostname):
        logger.info("testbed in:\n%s", testbed)
        device = testbed.devices[hostname]
        try:
            device.connect(via='rest', alias='rest')
            device.rest.connected
            self.pyats_device = device
        except unicon.core.errors.ConnectionError as e:
            logger.error("-- ERROR --")
            logger.error(f" The error following line:\n{e}")
            logger.error(f" Can't connect to {device.alias}")
            device.disconnect()
            return
        except RuntimeError:
            device.connect(connection_timeout=15, learn_hostname=True, log_stdout=True)
            self.pyats_device = device

    def configAnalyzer(self):
        output_ltm = self.pyats_device.rest.get(api_url='/mgmt/tm/sys/provision/ltm', verbose=True).text
        output_gtm = self.pyats_device.rest.get(api_url='/mgmt/tm/sys/provision/gtm', verbose=True).text
        if '"level":"nominal"' in output_ltm:
            is_ltm = True
        else:
            is_ltm = False
        if '"level":"nominal"' in output_gtm:
            is_dns = True
        else:
            is_dns = False
        if is_dns and not is_ltm:
            self.check_list = MyDeviceF5.uri_sys + MyDeviceF5.uri_net + MyDeviceF5.uri_gtm
        elif is_ltm and not is_dns:
            self.check_list = MyDeviceF5.uri_sys + MyDeviceF5.uri_net + MyDeviceF5.uri_ltm
        elif is_dns and is_ltm:
            self.check_list = MyDeviceF5.uri_sys + MyDeviceF5.uri_net + MyDeviceF5.uri_ltm + MyDeviceF5.uri_gtm

    def format_json(self, json_dict):
        if "items" in json_dict:
            for item in json_dict["items"]:
                if "fullPath" in item:
                    # item_copy = copy.deepcopy(item)
                    new_item = {"partition_" + item["fullPath"]: item}
                    index = json_dict["items"].index(item)
                    if index in json_dict["items"]:
                        logger.warning("Overwriting data in format_json on key %s in items", index)
                    json_dict["items"][index] = new_item
        return json_dict
Thank you for sharing. Kindly give me some time to debug the issue.
Can you share the working code files as well so that we can debug the issue?
Just so you know, I am aware of your comments and requests when you post them. I have other things to manage in parallel; sorry for the delays.
I realized that the error is different when using 24.4 (I was on 24.3 when I got the error posted earlier: "clean stage '' does not exist in the json file"). On 24.4 I get the following:
2024-05-16 12:44:21,106 - genie.conf.base.api - reduction - DEBUG - Retrieving stage '__getstate__' for a1aaa01-clients (OS:bigip, PLATFORM:None)
2024-05-16 12:44:21,329 - genie.conf.base.api - reduction - DEBUG - Retrieving stage '__getstate__' for a1aaa01-clients (OS:bigip, PLATFORM:None)
Traceback (most recent call last):
  File "my_main.py", line 31, in <module>
    operation.run()
  File "/package/path/package/my_operation.py", line 48, in run
    self.runChecks()
  File "/package/path/package/my_operation.py", line 109, in runChecks
    multiprocessing.Pool().map(self.runChecksDevice, self.devices)
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 364, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 771, in get
    raise self._value
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 537, in _handle_tasks
    put(task)
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot pickle 'weakref' object
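This error is easy to reproduce on its own: weak references are explicitly unpicklable, and a connected device object evidently holds one somewhere. A standalone illustration (toy class, not pyATS):

```python
import pickle
import weakref

class Target:
    pass

t = Target()
ref = weakref.ref(t)  # connection-style objects commonly hold weakrefs
try:
    pickle.dumps(ref)
except TypeError as e:
    print(e)  # prints: cannot pickle 'weakref' object
```

Assuming that is the cause here, one way around it would be to send only picklable data (e.g. device names) to the pool and let each worker open its own connection.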
The only difference from the working code is in my_operation.py. I made a little update to this file to skip a dependency that is not needed to test it on your side, so here are the two versions of this script:
- Script my_operation.py using multiprocessing only for the device.makeChecks method (not working):
from genie.utils.diff import Diff
import json
import os
import yaml
from pyats.topology.loader import load
from package.my_device import MyDevice
from package.my_device_f5 import MyDeviceF5
import pathlib
import logging
import multiprocessing
from datetime import datetime
logger = logging.getLogger(__name__)
class MyOperation:
    def __init__(self):
        self.name = ''
        self.username = ''
        self.devices: list[MyDevice] = []
        self.testbed = ''
        self.root_path = pathlib.Path(__file__).resolve().parents[1]
        self.operations_path = f"{self.root_path}/output/operations"
        self.testbeds_path = f"{self.root_path}/output/testbeds"
        self.phase = None

    def initiate(self):
        with open(f'{self.testbeds_path}/testbed.yaml') as testbed_file:
            self.testbed = yaml.safe_load(testbed_file)
        self.name = self.testbed['testbed']['name']
        self.username = self.testbed['testbed']['credentials']['default']['username']
        for device in self.testbed['devices'].keys():
            if self.testbed['devices'][device]['os'] == 'bigip':
                self.devices.append(MyDeviceF5(device, self.operations_path))

    def run(self):
        datetime_start = datetime.now()
        logger.info("Starting run('%s')", self.phase)
        if not self.phase:
            raise Exception("No phase defined")
        self.buildTree()
        if self.phase == "Diff":
            self.runDiff()
        else:
            self.runChecks()
        datetime_end = datetime.now()
        diff = datetime_end - datetime_start
        logger.info("Duration: %s (in hours)", (diff.total_seconds() / 3600))

    def buildTree(self):
        logger.info("Starting buildTree('%s')", self.phase)
        if not os.path.exists(f'{self.operations_path}/{self.name}'):
            os.makedirs(f'{self.operations_path}/{self.name}')
        for device in self.devices:
            if not os.path.exists(f'{self.operations_path}/{self.name}/{device.name}'):
                os.makedirs(f'{self.operations_path}/{self.name}/{device.name}')
            if not os.path.exists(f'{self.operations_path}/{self.name}/{device.name}/{self.phase}'):
                os.makedirs(f'{self.operations_path}/{self.name}/{device.name}/{self.phase}')

    def loadTestbed(self):
        try:
            self.testbed = load(self.testbed)
        except:
            print('host is not resolved')

    def runChecks(self):
        for device in self.devices:
            logger.info("name of device: %s", device.name)
            logger.info("self.testbed:\n%s", self.testbed)
            device.raiseSessionLimit(self.testbed, device.name)
            device.myConnect(self.testbed, device.name)
            if not os.path.isfile(f'{self.operations_path}/{self.name}/{device.name}/check_list.json'):
                if device.os not in ['bigip', 'fortios']:
                    device.getRunningConfig()
                    device.configAnalyzer(device.mapping_features, device.os)
                    print(f'\nbased on {device.name} running-config, I deduced I gotta check the following fields:')
                    for model in device.check_list:
                        print(f'- {model}')
                    with open(f'{self.operations_path}/{self.name}/{device.name}/check_list.json', 'w') as check_list_file:
                        check_list_file.write(str(json.dumps(device.check_list, indent=4)))
                else:
                    device.configAnalyzer()
                    with open(f'{self.operations_path}/{self.name}/{device.name}/check_list.json', 'w') as check_list_file:
                        check_list_file.write(str(json.dumps(device.check_list, indent=4)))
            else:
                with open(f'{self.operations_path}/{self.name}/{device.name}/check_list.json', 'r') as check_list_file:
                    device.check_list = json.load(check_list_file)
        print("self.devices: %s", str(self.devices))
        logger.info(f"Name of operation: {self.name}")
        for device in self.devices:
            logger.info(f"One device name is: {device.name}")
        # with multiprocessing.Pool(2) as pool:
        #     pool.map(self.runChecksDevice, self.devices)
        multiprocessing.Pool().map(self.runChecksDevice, self.devices)
        for device in self.devices:
            logger.info(f"Starting makeChecks for device {device.name}")
            logger.info(f"device.name in for loop: {device.name}")
            # device.makeChecks(self.phase, self.name)
            device.restoreSessionLimit()
            device.pyats_device.disconnect()
            logger.info(f"Finished makeChecks and disconnected from device {device.name}")

    def runChecksDevice(self, device: MyDevice):
        logger.info(f"Starting makeChecks for device name {device.name} in multiprocessing")
        device.makeChecks(self.phase, self.name)
        device.restoreSessionLimit()
        device.pyats_device.disconnect()
        logger.info(f"Finished makeChecks and disconnected from device {device.name} in multiprocessing")

    def runDiff(self):
        logging.debug("Running findDiff")
        for device in self.devices:
            logging.debug("runDiff on device %s", str(device.name))
            # Looping the list of uris
            if device.check_list == []:
                with open(f'{self.operations_path}/{self.name}/{device.name}/check_list.json', 'r') as check_list_file:
                    device.check_list = json.load(check_list_file)
                logging.debug("device.checklist was empty, filled with check_list.json in device folder")
            logging.debug("runDiff on device %s on all following features : %s", str(device.name), str(device.check_list))
            for feature in device.check_list:
                logging.debug("runDiff on device %s on feature %s", str(device.name), str(feature))
                pre_checks_path = f'{self.operations_path}/{self.name}/{device.name}/preChecks/{(feature.replace("/", "_")).split("?")[0]}'
                post_checks_path = f'{self.operations_path}/{self.name}/{device.name}/postChecks/{(feature.replace("/", "_")).split("?")[0]}'
                if os.path.isfile(f"{pre_checks_path}.txt"):
                    logging.debug("For device %s, feature %s led to an error in pre checks phase", str(device.name), str(feature))
                    with open(f'{pre_checks_path}.txt', 'r') as f_pre:
                        diff = f_pre.read()
                elif os.path.isfile(f"{post_checks_path}.txt"):
                    logging.debug("For device %s, feature %s led to an error in post checks phase", str(device.name), str(feature))
                    with open(f'{post_checks_path}.txt', 'r') as f_post:
                        diff = f_post.read()
                else:
                    try:
                        with open(f'{pre_checks_path}.json', 'r') as f_pre:
                            lines_pre = json.load(f_pre)
                        with open(f'{post_checks_path}.json', 'r') as f_post:
                            lines_post = json.load(f_post)
                    except FileNotFoundError:
                        logger.error('file not found')
                        continue
                    logging.info("Opened the f_pre and f_post, now running Diff")
                    diff = Diff(lines_pre, lines_post)
                    logging.info("Running findDiff")
                    diff.findDiff()
                if len(str(diff)) > 0:
                    with open(f'{self.operations_path}/{self.name}/{device.name}/Diff/{(feature.replace("/", "_")).split("?")[0]}.diff', 'w') as f_diff:
                        f_diff.write(str(diff))
                    logging.info("Finished to write diff")
- Script my_operation.py using a for loop only (working):
from genie.utils.diff import Diff
import json
import os
import yaml
from pyats.topology.loader import load
from package.my_device import MyDevice
from package.my_device_f5 import MyDeviceF5
import pathlib
import logging
import multiprocessing
from datetime import datetime

logger = logging.getLogger(__name__)

class MyOperation:

    def __init__(self):
        self.name = ''
        self.username = ''
        self.devices: list[MyDevice] = []
        self.testbed = ''
        self.root_path = pathlib.Path(__file__).resolve().parents[1]
        self.operations_path = f"{self.root_path}/output/operations"
        self.testbeds_path = f"{self.root_path}/output/testbeds"
        self.phase = None

    def initiate(self):
        with open(f'{self.testbeds_path}/testbed.yaml') as testbed_file:
            self.testbed = yaml.safe_load(testbed_file)
        self.name = self.testbed['testbed']['name']
        self.username = self.testbed['testbed']['credentials']['default']['username']
        for device in self.testbed['devices'].keys():
            if self.testbed['devices'][device]['os'] == 'bigip':
                self.devices.append(MyDeviceF5(device, self.operations_path))

    def run(self):
        datetime_start = datetime.now()
        logger.info("Starting run('%s')", self.phase)
        if not self.phase:
            raise Exception("No phase defined")
        self.buildTree()
        if self.phase == "Diff":
            self.runDiff()
        else:
            self.runChecks()
        datetime_end = datetime.now()
        diff = datetime_end - datetime_start
        logger.info("Duration: %s (in hours)", diff.total_seconds() / 3600)

    def buildTree(self):
        logger.info("Starting buildTree('%s')", self.phase)
        if not os.path.exists(f'{self.operations_path}/{self.name}'):
            os.makedirs(f'{self.operations_path}/{self.name}')
        for device in self.devices:
            if not os.path.exists(f'{self.operations_path}/{self.name}/{device.name}'):
                os.makedirs(f'{self.operations_path}/{self.name}/{device.name}')
            if not os.path.exists(f'{self.operations_path}/{self.name}/{device.name}/{self.phase}'):
                os.makedirs(f'{self.operations_path}/{self.name}/{device.name}/{self.phase}')

    def loadTestbed(self):
        try:
            self.testbed = load(self.testbed)
        except:
            print('host is not resolved')

    def runChecks(self):
        for device in self.devices:
            logger.info("name of device: %s", device.name)
            logger.info("self.testbed:\n%s", self.testbed)
            device.raiseSessionLimit(self.testbed, device.name)
            device.myConnect(self.testbed, device.name)
            if not os.path.isfile(f'{self.operations_path}/{self.name}/{device.name}/check_list.json'):
                if device.os not in ['bigip', 'fortios']:
                    device.getRunningConfig()
                    device.configAnalyzer(device.mapping_features, device.os)
                    print(f'\nbased on {device.name} running-config, I deduced I gotta check the following fields:')
                    for model in device.check_list:
                        print(f'- {model}')
                    with open(f'{self.operations_path}/{self.name}/{device.name}/check_list.json', 'w') as check_list_file:
                        check_list_file.write(json.dumps(device.check_list, indent=4))
                else:
                    device.configAnalyzer()
                    with open(f'{self.operations_path}/{self.name}/{device.name}/check_list.json', 'w') as check_list_file:
                        check_list_file.write(json.dumps(device.check_list, indent=4))
            else:
                with open(f'{self.operations_path}/{self.name}/{device.name}/check_list.json', 'r') as check_list_file:
                    device.check_list = json.load(check_list_file)
        print("self.devices: %s" % str(self.devices))
        logger.info(f"Name of operation: {self.name}")
        for device in self.devices:
            logger.info(f"One device name is: {device.name}")
        # with multiprocessing.Pool(2) as pool:
        #     pool.map(self.runChecksDevice, self.devices)
        # multiprocessing.Pool().map(self.runChecksDevice, self.devices)
        for device in self.devices:
            logger.info(f"Starting makeChecks for device {device.name}")
            logger.info(f"device.name in for loop: {device.name}")
            device.makeChecks(self.phase, self.name)
            device.restoreSessionLimit()
            device.pyats_device.disconnect()
            logger.info(f"Finished makeChecks and disconnected from device {device.name}")

    # def runChecksDevice(self, device: MyDevice):
    #     logger.info(f"Starting makeChecks for device name {device.name} in multiprocessing")
    #     device.makeChecks(self.phase, self.name)
    #     device.restoreSessionLimit()
    #     device.pyats_device.disconnect()
    #     logger.info(f"Finished makeChecks and disconnected from device {device.name} in multiprocessing")

    def runDiff(self):
        logging.debug("Running findDiff")
        for device in self.devices:
            logging.debug("runDiff on device %s", str(device.name))
            # Looping the list of uris
            if device.check_list == []:
                with open(f'{self.operations_path}/{self.name}/{device.name}/check_list.json', 'r') as check_list_file:
                    device.check_list = json.load(check_list_file)
                logging.debug("device.checklist was empty, filled with check_list.json in device folder")
            logging.debug("runDiff on device %s on all following features : %s", str(device.name), str(device.check_list))
            for feature in device.check_list:
                logging.debug("runDiff on device %s on feature %s", str(device.name), str(feature))
                pre_checks_path = f'{self.operations_path}/{self.name}/{device.name}/preChecks/{(feature.replace("/", "_")).split("?")[0]}'
                post_checks_path = f'{self.operations_path}/{self.name}/{device.name}/postChecks/{(feature.replace("/", "_")).split("?")[0]}'
                if os.path.isfile(f"{pre_checks_path}.txt"):
                    logging.debug("For device %s, feature %s led to an error in pre checks phase", str(device.name), str(feature))
                    with open(f'{pre_checks_path}.txt', 'r') as f_pre:
                        diff = f_pre.read()
                elif os.path.isfile(f"{post_checks_path}.txt"):
                    logging.debug("For device %s, feature %s led to an error in post checks phase", str(device.name), str(feature))
                    with open(f'{post_checks_path}.txt', 'r') as f_post:
                        diff = f_post.read()
                else:
                    try:
                        with open(f'{pre_checks_path}.json', 'r') as f_pre:
                            lines_pre = json.load(f_pre)
                        with open(f'{post_checks_path}.json', 'r') as f_post:
                            lines_post = json.load(f_post)
                    except FileNotFoundError:
                        logger.error('file not found')
                        continue
                    logging.info("Opened the f_pre and f_post, now running Diff")
                    diff = Diff(lines_pre, lines_post)
                    logging.info("Running findDiff")
                    diff.findDiff()
                if len(str(diff)) > 0:
                    with open(f'{self.operations_path}/{self.name}/{device.name}/Diff/{(feature.replace("/", "_")).split("?")[0]}.diff', 'w') as f_diff:
                        f_diff.write(str(diff))
        logging.info("Finished to write diff")
You can copy this into my_operation.py to try the working (or non-working) version; the only difference between the two is at the end of the runChecks method, where I just commented out different lines.
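One workaround worth noting for the multiprocessing variant, sketched here under the assumption that a connection can be (re)established inside a worker: pass only plain, picklable identifiers (device names) to the pool, and do the connect/check/disconnect entirely inside the worker function. `check_device` is a hypothetical stand-in, not the real makeChecks:

```python
import multiprocessing

def check_device(name):
    # Hypothetical worker: in a real run, connecting, running the checks
    # and disconnecting would all happen here, inside the child process,
    # so no live connection object ever crosses the pickle boundary.
    return f"checked {name}"

if __name__ == '__main__':
    names = ['a1aaa01-clients', 'dev2', 'dev3']
    with multiprocessing.Pool(2) as pool:
        results = pool.map(check_device, names)
    print(results)
```

Only strings are pickled here, so nothing in the device objects (connections, weakrefs, loggers) can break the transfer.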
About the expected structure of files being used:
- my_main.py
- package
  - my_device.py
  - my_device_f5.py
  - my_operation.py
- output
  - testbeds
    - testbed.yaml
I already sent you an example of the testbed I'm using (just replace what has to be replaced, such as hostnames and username).
Okay, will check and let you know
The error is not coming from pyATS; it is coming from your code. Kindly fix it. For reference, kindly check this link: https://stackoverflow.com/questions/71945399/python-3-8-multiprocessing-typeerror-cannot-pickle-weakref-object
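The error in that link can be reproduced in a few lines. This is only an illustration of why objects holding weak references (as live connection objects often do) cannot cross the pickle boundary that multiprocessing relies on:

```python
import pickle
import weakref

class Thing:
    pass

t = Thing()
ref = weakref.ref(t)

try:
    # multiprocessing does exactly this to every argument of Pool.map
    pickle.dumps(ref)
except TypeError as exc:
    print(f"TypeError: {exc}")
```

On Python 3.8 this raises a TypeError along the lines of "cannot pickle 'weakref' object" (the exact wording varies by version).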
I've seen this page during my own research, but I never specify in my code that I use multiprocessing in "spawn" mode, unlike the examples there.
To verify that the default is not spawn mode either, I added the following log to my my_main.py script:
logger.warning("start_method: "+ str(multiprocessing.get_start_method()))
The result:
2024-05-22 14:53:35,999 - __main__ - my_main - WARNING - start_method: fork
So I'm apparently in "fork" mode, which is not addressed in that Stack Overflow question.
I'm on Python 3.8.10 with pyATS 24.4 at the time of this log.
The issue might be with the objects in the self.devices list: they might contain attributes that cannot be pickled. Check the pickle-ability of your MyDevice objects. If your MyDevice class contains complex objects, try to simplify it; for instance, if it has a connection attribute, remove it or replace it with a simpler representation before passing the object to the multiprocessing pool, and modify the runChecks method to handle multiprocessing accordingly.
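One common way to implement the simplification suggested above is to strip unpicklable attributes in `__getstate__`. This is only a sketch with a hypothetical `MyDeviceSketch` class (not the real MyDevice), assuming the worker can reconnect on its own:

```python
import pickle

class MyDeviceSketch:
    """Hypothetical stand-in for MyDevice."""

    def __init__(self, name):
        self.name = name
        # A lambda stands in for an unpicklable live connection;
        # pickling it directly would fail.
        self.pyats_device = lambda: None

    def __getstate__(self):
        # Drop the live connection before the object is pickled for a worker.
        state = self.__dict__.copy()
        state['pyats_device'] = None
        return state

    def __setstate__(self, state):
        # The worker process must re-establish the connection itself.
        self.__dict__.update(state)

d = pickle.loads(pickle.dumps(MyDeviceSketch('a1aaa01-clients')))
print(d.name, d.pyats_device)  # a1aaa01-clients None
```

Without `__getstate__`, the round-trip would fail on the lambda attribute, which is essentially what happens when Pool.map tries to ship a connected device to a worker.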
I will try to see if I can get rid of complex objects. But do you know why the error is not the same in 24.3 ("The clean stage '' does not exist in the json file")?
Hi, in version 24.3 we saw some bugs, but we fixed them in version 24.4. That's why you no longer see the same error.
Have you fixed the issue and tried again? Can you give an update?
Hi, I haven't found a solution yet in the time I've spent on it.
Do you think it could be related to the fact that the method I'm calling in multiprocessing is an instance method? I need to call it with something like "multiprocessing.Pool().map(self.runChecksDevice, self.devices)" (with self.) to be able to use instance variables.
I haven't found examples of this in the documentation, even though I think it's a common case.
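For what it's worth, a bound method is itself picklable in Python 3: pickling `self.runChecksDevice` serializes the whole instance plus the method name, so the bound-method call pattern as such is not the problem — the failure points at some attribute of the instance or of the devices. A minimal illustration with a toy `Ops` class (hypothetical, not the real MyOperation):

```python
import pickle

class Ops:
    def __init__(self):
        self.phase = 'preChecks'

    def work(self, item):
        return f'{self.phase}:{item}'

ops = Ops()
# Pickling the bound method serializes the Ops instance along with it,
# so every attribute of `ops` must itself be picklable.
restored = pickle.loads(pickle.dumps(ops.work))
print(restored('device1'))  # preChecks:device1
```

This also means Pool.map copies the entire operation object into every worker, which is worth keeping in mind if the instance carries heavy state.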
As a test I replaced the runChecks method of my script my_operation.py by the following:
def runChecks(self):
    def methodTest1(x: int):
        logger.info("x² is: %s", x**2)
    multiprocessing.Pool().map(methodTest1, range(10))
And I get the following error:
Traceback (most recent call last):
File "my_main.py", line 42, in <module>
operation.run()
File "/package/path/package/my_operation.py", line 48, in run
self.runChecks()
File "/package/path/package/my_operation.py", line 112, in runChecks
multiprocessing.Pool().map(methodTest1, range(10))
File "/usr/lib/python3.8/multiprocessing/pool.py", line 364, in map
return self._map_async(func, iterable, mapstar, chunksize).get()
File "/usr/lib/python3.8/multiprocessing/pool.py", line 771, in get
raise self._value
File "/usr/lib/python3.8/multiprocessing/pool.py", line 537, in _handle_tasks
put(task)
File "/usr/lib/python3.8/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/usr/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'MyOperation.runChecks.<locals>.methodTest1'
But if I add "global methodTest1" before the definition, it works...
So apparently the function needs to be top-level (addressable at module scope) to be usable with multiprocessing. Maybe that explains why it's not working in my original case — it might not even be related to a "complex object".
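This conclusion can be checked without a pool at all, since Pool.map just pickles the callable: a nested function carries a `<locals>` qualified name and cannot be pickled, while a module-level one can. A minimal sketch:

```python
import pickle

def top_level(x):
    # Module-level: picklable by its qualified name.
    return x ** 2

def factory():
    def nested(x):
        # Lives in factory.<locals>: NOT picklable.
        return x ** 2
    return nested

# The top-level function survives a pickle round-trip.
assert pickle.loads(pickle.dumps(top_level))(3) == 9

try:
    pickle.dumps(factory())
except (pickle.PicklingError, AttributeError) as exc:
    # Same class of failure that Pool.map reported for methodTest1.
    print(type(exc).__name__)
```

This is consistent with the traceback above: the failure happens in `reduction.py` at pickling time, before any worker even runs.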