bacpypes
[Question] Best practice to read/write points of multiple devices at the same IP address
Hello, I am using bacpypes (version 0.18.6, Python 3.10.12) to expose readable/writable points of one or more devices at a specific IP address, so that the points can be accessed by a building automation system (I am using Niagara N4). I got this working with a single device, but could not make it work with multiple devices. I would like to understand the best practice for reading/writing points of multiple devices at the same IP address. For context, I describe below (1) the desired configuration, (2) what I did for a single device, and (3) what I did for multiple devices.
Desired configuration
I have a building automation system and multiple simulation models on a single machine or on different machines. bacpypes is used to expose 10 to 100+ readable/writable points from each simulation model as BACnet points, preferably bound to a specific IP address, so that the building automation system can read/write points inside the simulation models (over an Ethernet cable, if the building automation system and the simulation models live on different machines).
What I did for a single device
A custom LocalDeviceObject is created and attached to a custom BIPSimpleApplication, and then the application is bound to an IP address (0.0.0.0) for testing. This implementation worked well, and I could read/write points as expected. The following is part of my script implementing that.
@bacpypes_debugging
class CustomBACnetApplication(BIPSimpleApplication,
                              ReadWritePropertyMultipleServices,
                              DeviceCommunicationControlServices,):
    pass
class CustomBACnetDevice(LocalDeviceObject):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._date_time: datetime = None

    def ReadProperty(self, propid, arrayIndex=None):
        if propid == "localTime" and self._date_time is not None:
            time = Time(str(self._date_time.time()))
            return time.value
        if propid == "localDate" and self._date_time is not None:
            date = Date(str(self._date_time.date()))
            return date.value
        return super().ReadProperty(propid, arrayIndex)
...
class BACnetBridge():
    def __init__(self, host, site_id: SiteID) -> None:
        self.device = CustomBACnetDevice(
            objectName="Proxy",
            objectIdentifier=int(599),
            maxApduLengthAccepted=int(1024),
            segmentationSupported="segmentedBoth",
            vendorIdentifier=555,
            vendorName=CharacterString("Vendor"),
            modelName=CharacterString("BACnet Bridge"),
            systemStatus=DeviceStatus(1),
            description=CharacterString("BACpypes (Python) based tool for exposing points"),
            firmwareRevision="0.0.0",
            applicationSoftwareVersion="0.0.0",
            protocolVersion=1,
            protocolRevision=0)
        self.application = CustomBACnetApplication(self.device, "0.0.0.0")
        self.points = {}
        ...
        for point in self.points.values():
            self.application.add_object(point)
What I did for multiple devices
Based on my search (https://github.com/JoelBender/bacpypes/issues/159), to scale up the above use case I think it is necessary to create a virtual router bound to the same IP address, with multiple BACnet devices attached to the router. I modified my script based on the sample IP2VLANRouter.py (https://github.com/JoelBender/bacpypes/blob/master/samples/IP2VLANRouter.py) as follows (mostly borrowed from the sample, but I include it in case I missed something):
@bacpypes_debugging
class CustomBACnetApplication(Application,
                              ReadWritePropertyMultipleServices,
                              DeviceCommunicationControlServices,
                              WhoIsIAmServices,):
    def __init__(self, vlan_device, vlan_address, aseID=None):
        ...
        # normal initialization
        Application.__init__(self, vlan_device, aseID=aseID)
        # include an application decoder
        self.asap = ApplicationServiceAccessPoint()
        # pass the device object to the state machine access point so it can know if it should support segmentation
        self.smap = StateMachineAccessPoint(vlan_device)
        # the segmentation state machines need access to the same device information cache as the application
        self.smap.deviceInfoCache = self.deviceInfoCache
        # a network service access point will be needed
        self.nsap = NetworkServiceAccessPoint()
        # give the NSAP a generic network layer service element
        self.nse = NetworkServiceElement()
        bind(self.nse, self.nsap)
        # bind the top layers
        bind(self, self.asap, self.smap, self.nsap)
        # create a vlan node at the assigned address
        self.vlan_node = Node(vlan_address)
        # bind the stack to the node, no network number, no address
        self.nsap.bind(self.vlan_node)

    def request(self, apdu):
        if _debug: CustomBACnetApplication._debug("[%s]request %r", self.vlan_node.address, apdu)
        Application.request(self, apdu)

    def indication(self, apdu):
        if _debug: CustomBACnetApplication._debug("[%s]indication %r", self.vlan_node.address, apdu)
        Application.indication(self, apdu)

    def response(self, apdu):
        if _debug: CustomBACnetApplication._debug("[%s]response %r", self.vlan_node.address, apdu)
        Application.response(self, apdu)

    def confirmation(self, apdu):
        if _debug: CustomBACnetApplication._debug("[%s]confirmation %r", self.vlan_node.address, apdu)
        Application.confirmation(self, apdu)
class CustomBACnetDevice(LocalDeviceObject):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._date_time: datetime = None

    def ReadProperty(self, propid, arrayIndex=None):
        if propid == "localTime" and self._date_time is not None:
            time = Time(str(self._date_time.time()))
            return time.value
        if propid == "localDate" and self._date_time is not None:
            date = Date(str(self._date_time.date()))
            return date.value
        return super().ReadProperty(propid, arrayIndex)
@bacpypes_debugging
@register_object_type(vendor_id=555)
class LocalAnalogValueObject(AnalogValueCmdObject):
    def __init__(self, sim_value, **kwargs):
        super().__init__(**kwargs)
        self._sim_value = sim_value

    def ReadProperty(self, propid, arrayIndex=None):
        if propid == "presentValue":
            return self._sim_value
        return super().ReadProperty(propid, arrayIndex)
@bacpypes_debugging
class VLANRouter:
    def __init__(self, local_address, local_network):
        if _debug: VLANRouter._debug("__init__ %r %r", local_address, local_network)
        # a network service access point will be needed
        self.nsap = NetworkServiceAccessPoint()
        # give the NSAP a generic network layer service element
        self.nse = NetworkServiceElement()
        bind(self.nse, self.nsap)
        # create a BIPSimple, bound to the Annex J server on the UDP multiplexer
        #self.bip = BIPSimple(local_address)
        # create a BBMD, bound to the Annex J server on the UDP multiplexer
        self.bip = BIPBBMD(local_address)
        self.annexj = AnnexJCodec()
        self.mux = UDPMultiplexer(local_address)
        # bind the bottom layers
        bind(self.bip, self.annexj, self.mux.annexJ)
        # bind the BIP stack to the local network
        self.nsap.bind(self.bip, local_network, local_address)
class BACnetBridge:
    def __init__(self, host, alias_base: str, num_sites: int, verbose=True) -> None:
        ...
        ## Setup virtual router for BACnet devices
        # create the VLAN router, bind it to the local network
        addr1 = '0.0.0.0'
        net1, net2 = 5, 6
        self.router = VLANRouter(Address(addr1), net1)
        # create a VLAN
        self.vlan = Network(broadcast_address=LocalBroadcast())
        # create a node for the router, address 1 on the VLAN
        router_addr = Address(1)
        router_node = Node(router_addr)
        self.vlan.add_node(router_node)
        # bind the router stack to the vlan network through this node
        self.router.nsap.bind(router_node, net2, router_addr)
        # send network topology
        deferred(self.router.nse.i_am_router_to_network)
        ## Setup each BACnet device
        for alias_each in self.alias:
            ...
            self.device[alias_each] = CustomBACnetDevice(
                objectName=self.bacnet_device_name[alias_each],
                objectIdentifier=('device', self.device_instance[alias_each]),
                maxApduLengthAccepted=int(1024),
                segmentationSupported="noSegmentation",
                vendorIdentifier=555,
                vendorName=CharacterString("Vendor"),
                modelName=CharacterString("BACnet Bridge"),
                systemStatus=DeviceStatus(1),
                description=CharacterString("BACpypes (Python) based tool for exposing points"),
                firmwareRevision="0.0.0",
                applicationSoftwareVersion="0.0.0",
                protocolVersion=1,
                protocolRevision=0)
            self.vlan_address[alias_each] = Address(10 + self.index_site[alias_each])
            self.application[alias_each] = CustomBACnetApplication(
                vlan_device=self.device[alias_each],
                vlan_address=self.vlan_address[alias_each])
            self.vlan.add_node(self.application[alias_each].vlan_node)
            self.points[alias_each] = {}
            self.points_prev[alias_each] = {}
            self.points_new[alias_each] = {}
            for point in self.points[alias_each].values():
                self.application[alias_each].add_object(point)
        ...
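To make the addressing in the script above easier to follow, here is a minimal, bacpypes-free sketch of the address plan it sets up: the IP side is network 5, the VLAN is network 6, the router sits at VLAN address 1, and each device gets VLAN address 10 + its site index. The RoutedDevice class, the plan_addresses helper, and the alias names are hypothetical and exist only for illustration; the sketch also assumes the site index starts at 0.

```python
from dataclasses import dataclass

IP_NETWORK = 5         # net1 in the script above
VLAN_NETWORK = 6       # net2 in the script above
ROUTER_VLAN_MAC = 1    # Address(1) used for the router node
FIRST_DEVICE_MAC = 10  # Address(10 + index) used for each device


@dataclass(frozen=True)
class RoutedDevice:
    """Hypothetical record for one virtual device behind the router."""
    alias: str
    network: int
    mac: int

    @property
    def routed_address(self) -> str:
        # "network:mac" text form of an address on a remote network
        return f"{self.network}:{self.mac}"


def plan_addresses(aliases):
    """Assign VLAN MAC addresses 10, 11, ... in alias order (index from 0)."""
    return [
        RoutedDevice(alias, VLAN_NETWORK, FIRST_DEVICE_MAC + index)
        for index, alias in enumerate(aliases)
    ]


# Hypothetical alias names, two simulated sites
devices = plan_addresses(["site_a", "site_b"])
for dev in devices:
    print(dev.alias, "->", dev.routed_address)
```

With two sites this yields network 6 with MAC addresses 10 and 11, which matches what the building automation system reports after discovery: a client on network 5 sees the devices by network number and VLAN MAC address, not by their own IP address.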
The problems I encountered
First, I got a warning saying "path error (1)" for each device I created (for example, I got 5 warnings when I created 5 devices). I am not sure whether this warning is the root cause of the following issue.
Second, I could discover the devices from my building automation system (Niagara N4), but could not add them to it. I also cannot see the list of points or read/write the points. I could not read any information about the devices except for the network number (6), MAC addresses (10 and 11), and object identifiers (shown as device names). I could read other information like vendor, model, firmware/app version, device name along with the other information.
I tried changing the IP address from 0.0.0.0 to 0.0.0.0/24 or 0.0.0.0:47808, but that did not work. I am new to BACnet and building automation systems, so I may have missed something simple in my implementation. I appreciate any comments and/or suggestions, and feel free to let me know if there is a better way or example I can refer to for implementing what I need.
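As a side note on the address strings tried above, the following sketch uses only the Python standard library (not bacpypes) to show what a host/mask string implies. It rests on the assumption that the mask is there to derive the subnet broadcast address, and 192.168.1.10/24 is a hypothetical interface address chosen for illustration. The describe_binding helper is also hypothetical.

```python
import ipaddress

# 47808 is the conventional BACnet/IP UDP port, 0xBAC0 in hexadecimal
BACNET_DEFAULT_PORT = 0xBAC0


def describe_binding(cidr: str) -> dict:
    """Summarize what a 'host/mask' string implies for an IPv4 binding."""
    iface = ipaddress.ip_interface(cidr)
    return {
        "host": str(iface.ip),
        "broadcast": str(iface.network.broadcast_address),
        "is_wildcard": iface.ip.is_unspecified,
    }


print(BACNET_DEFAULT_PORT)
print(describe_binding("192.168.1.10/24"))  # hypothetical real interface
print(describe_binding("0.0.0.0/24"))       # wildcard address with a mask
```

For the wildcard address, a /24 mask gives 0.0.0.255 as the broadcast address, which does not correspond to any real subnet. This may be one reason why adding a mask or the default port to 0.0.0.0 changed nothing, although I have not confirmed that it is related to the problems described here.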
So my issue is similar to the one in https://github.com/JoelBender/bacpypes/issues/460, in that I could discover the devices but their MAC addresses are shown as 1, 2, 3, or 4 instead of an actual IP address. I was able to achieve the goal using the second solution proposed there: creating a single BACnet device/application and adding all the points from multiple building simulations to that application (in other words, the same implementation as the one I used for a single device in the initial post).
I think it will work well for 10+ or 100+ building simulations in my case (each building simulation has 100 to 200 points), but I would like to know the difference between this approach and the approach using a virtual router. For example, what is the advantage of using a virtual router to manage multiple devices rather than adding all the points to a single BACnet device/application, assuming we have 100 to 200 points per device (for example, can a virtual router handle a large number of devices more smoothly)?
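One thing the single-device approach has to handle is keeping object identifiers unique, since all simulations then share one device's object list. Below is a minimal sketch of one possible numbering scheme, assuming each simulation gets a fixed block of instance numbers. The point_instance helper and the block size of 1000 are hypothetical choices, not something bacpypes provides. The upper limit reflects the 22-bit object instance field, with 4194303 conventionally reserved as an unset/wildcard value.

```python
MAX_INSTANCE = 4194302  # 22-bit instance field; 4194303 is treated as reserved


def point_instance(sim_index: int, point_index: int, points_per_sim: int = 1000) -> int:
    """Map (simulation, point) to a unique object instance number.

    Each simulation owns the block
    [sim_index * points_per_sim, (sim_index + 1) * points_per_sim).
    """
    if not 0 <= point_index < points_per_sim:
        raise ValueError("point_index does not fit in the per-simulation block")
    instance = sim_index * points_per_sim + point_index
    if not 0 <= instance <= MAX_INSTANCE:
        raise ValueError("instance number outside the valid BACnet range")
    return instance


# 100 simulations with 200 points each, as in the scenario described above
instances = {
    point_instance(sim, point)
    for sim in range(100)
    for point in range(200)
}
print(len(instances), max(instances))
```

With blocks of 1000 there is room for a few thousand simulations per object type before the instance range runs out, so for 100+ simulations the numbering itself should not be the limiting factor.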
If using a virtual router has a significant advantage in terms of scalability, I might include it as a long-term TODO in case I need to use a large number of building simulations at the same time.