python-opcua
Write data to InfluxDB at Data Change Notification
Hi,
I am new to both InfluxDB and OPC UA. I need to archive the data and save it in InfluxDB, but it seems that I cannot call dbclient.write_points(point). The source is as follows. Please advise how to solve this issue. Thx a lot in advance.
import sys
sys.path.insert(0, "..")
import time
import datetime
import logging

from opcua import Client
from opcua import ua

from influxdb import InfluxDBClient

DB_USER = 'admin'
DB_PASSWORD = 'admin'
DBNAME = 'Prosys-rt-var-sub'
DB_HOST = 'localhost'
DB_PORT = '8086'
class SubHandler(object):
    """
    Client to subscription. It will receive events from server
    """

    def datachange_notification(self, node, val, data):
        print("Python: New data change event", node, val, data)
        timestamp2 = data.monitored_item.Value.SourceTimestamp
        tag_name = node.nodeid
        print(timestamp2, tag_name, val)
        now = datetime.datetime.utcnow()
        timestamp1 = int(timestamp2.strftime("%s")) * 1000
        print(timestamp1)
        point = {
            "measurement": tag_name,
            "time": timestamp2,
            "fields": {
                "Value": val,
                "Source Timestamp": timestamp1
            }
        }
        dbclient.write_points(point)
if __name__ == "__main__":
    #from IPython import embed
    logging.basicConfig(level=logging.DEBUG)
    opcclient = Client("opc.tcp://localhost:53530/OPCUA/SimulationServer/")
    try:
        node_var = []
        tag_name = []
        opcclient.connect()
        node_var.append(opcclient.get_node(ua.NodeId("Counter1", 5)))
        tag_name.append("Counter1")
        node_var.append(opcclient.get_node(ua.NodeId("Expression1", 5)))
        tag_name.append("Expression1")
        node_var.append(opcclient.get_node(ua.NodeId("Random1", 5)))
        tag_name.append("Random1")
        node_var.append(opcclient.get_node(ua.NodeId("Sawtooth1", 5)))
        tag_name.append("Sawtooth1")
        node_var.append(opcclient.get_node(ua.NodeId("Sinusoid1", 5)))
        tag_name.append("Sinusoid1")
        node_var.append(opcclient.get_node(ua.NodeId("Square1", 5)))
        tag_name.append("Square1")
        node_var.append(opcclient.get_node(ua.NodeId("Triangle1", 5)))
        tag_name.append("Triangle1")
        dbclient = InfluxDBClient(DB_HOST, DB_PORT, DB_USER, DB_PASSWORD, DBNAME)
        print("Create database: " + DBNAME)
        dbclient.create_database(DBNAME)
        dbclient.switch_database(DBNAME)
        handler = SubHandler()
        sub = opcclient.create_subscription(2000, handler)
        # Subscribe to OPC UA server - receive notification every 1 second
        for i in range(0, len(node_var)):
            handle = sub.subscribe_data_change(node_var[i])
        time.sleep(15)
        #embed()
        sub.unsubscribe(handle)
        sub.delete()
    finally:
        opcclient.disconnect()
The datachange_notification is called from the network thread. Don't call functions there that block or it will stop your OPC UA connection.
You should only store the subscription data in the call back, then at some other time call your DB operation on the main thread.
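The hand-off described above could be sketched roughly as follows. This is only an illustration, not code from python-opcua: `QueueingHandler`, `drain` and the `write_batch` callable are hypothetical names, and it uses the Python 3 `queue` module (called `Queue` on Python 2.7). The handler does nothing but enqueue; the main thread collects items and hands them to whatever function performs the database write.

```python
import queue


class QueueingHandler(object):
    """Subscription handler that only enqueues; it never touches the database."""

    def __init__(self, out_queue):
        self.out_queue = out_queue

    def datachange_notification(self, node, val, data):
        # Called on the OPC UA network thread: keep it fast and non-blocking.
        self.out_queue.put_nowait((node, val, data))


def drain(in_queue, write_batch, max_items=500, timeout=1.0):
    """Collect up to max_items queued changes and pass them to write_batch.

    write_batch is a hypothetical callable supplied by the caller, for example
    a function that builds InfluxDB points and calls dbclient.write_points().
    Returns the number of items handed over.
    """
    batch = []
    try:
        # Wait briefly for the first item so the caller's loop does not spin.
        batch.append(in_queue.get(timeout=timeout))
        while len(batch) < max_items:
            batch.append(in_queue.get_nowait())
    except queue.Empty:
        pass
    if batch:
        write_batch(batch)
    return len(batch)
```

On the main thread, one would pass a `QueueingHandler` to `create_subscription` and then call `drain` in a loop for as long as the client should run, instead of a single `time.sleep`.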
Also your code only runs for 15 seconds? You don't really need subscriptions if that is how you want your code to run. You could just loop over your nodes and call node.get_value() (this is much slower than a subscription however). After you have the values you can write them to DB.
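The polling alternative mentioned above might look roughly like this. It is a sketch under assumptions: `poll_values`, `poll_loop` and the `handle_values` callback are hypothetical names, and the only thing taken from python-opcua is that each node object offers `get_value()`.

```python
import time


def poll_values(nodes, names):
    """Read each node once and return a list of (name, value) pairs.

    Each element of nodes is expected to offer get_value(), as python-opcua
    Node objects do. Every call is a separate request to the server.
    """
    return [(name, node.get_value()) for name, node in zip(names, nodes)]


def poll_loop(nodes, names, handle_values, interval=1.0, rounds=None):
    """Poll at a fixed interval and pass each result to handle_values.

    handle_values is a hypothetical callback, e.g. one that writes to the DB.
    rounds=None loops until interrupted; a number limits the loop.
    """
    done = 0
    while rounds is None or done < rounds:
        handle_values(poll_values(nodes, names))
        done += 1
        if rounds is None or done < rounds:
            time.sleep(interval)
```

Because everything here runs on the main thread, a slow database write only delays the next poll rather than blocking the network thread.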
But that code looks fine, as far as I could see. Saving to DB inside callback is ok. This is what we do in the event history stuff. What is the stack trace?
Zerox1212:
I need to run the code for a shift, like 8~9 hours. This code is only a test. I need to use subscription. Any suggestion?
Many thanks in advance
Dong
Hi Oroulet,
Really? Then I really would like to know how you did it. The stack trace is as follows -
ERROR:opcua.common.subscription:Exception calling data change handler
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/opcua/common/subscription.py", line 135, in _call_datachange
    self._handler.datachange_notification(data.node, item.Value.Value.Value, event_data)
  File "/home/osboxes/eclipse-workspace/Python_Projects/vSoC-Predvelopment-Test/client_to_prosys_sub.py", line 62, in datachange_notification
    dbclient.write_points(point)
  File "/usr/lib/python2.7/dist-packages/influxdb/client.py", line 456, in write_points
    tags=tags, protocol=protocol)
  File "/usr/lib/python2.7/dist-packages/influxdb/client.py", line 506, in _write_points
    protocol=protocol
  File "/usr/lib/python2.7/dist-packages/influxdb/client.py", line 292, in write
    data = make_lines(data, precision).encode('utf-8')
  File "/usr/lib/python2.7/dist-packages/influxdb/line_protocol.py", line 126, in make_lines
    point.get('measurement', data.get('measurement'))
AttributeError: 'str' object has no attribute 'get'
Many thanks
Dong
The crash here says you have a variable 'data' that is a string and that you seem to think is a dict. This is not related to opcua.
Oroulet,
I did not have this issue when I moved this piece of code to main() and ran it there. So I do not believe this is the issue.
Well, programming is not magical. The error above says that data is a string, and Python crashes because of that.
I agree, that stack trace is not a UA issue. I would have expected you to get a client timeout issue if you did a blocking operation in the callback.
@oroulet history is fine because it's a local SQL file. If his DB call is across a network or his DB is just slow it will crash. I have seen it happen. Only very fast operations can be done in datachange_notification.
Hi Oroulet,
You are correct. It works now after I moved dbclient.write_points(point) out of datachange_notification.
Thanks a lot
Hey dwei98, I don't know if this issue is still alive, but here is a comment for other users. If you put the InfluxDB point in a list like below, your code runs fine. Also, if you have values coming from different nodes, you need to make sure they all have the same type.
Best regards, Tim
point = [
    {
        "measurement": tag_name,
        "time": timestamp2,
        "fields": {
            "Value": val,
            "Source Timestamp": timestamp1
        }
    }
]
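The traceback earlier in the thread is consistent with this: if the client loops over whatever it is given as points, a bare dict yields its string keys, and calling `.get` on a key raises the `AttributeError` shown. The sketch below illustrates both the failure and the list form. `make_points` and `first_failure` are hypothetical helpers, and coercing numbers to float is only one assumed way of keeping the field type the same across nodes.

```python
def make_points(measurement, value, timestamp):
    """Build a one-element list of point dicts, the shape used in the fix above.

    Hypothetical helper. Numbers are coerced to float (an assumption) so that
    the "Value" field keeps one type; bool values are left untouched.
    """
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        value = float(value)
    return [{
        "measurement": str(measurement),
        "time": timestamp,
        "fields": {"Value": value},
    }]


def first_failure(points):
    """Mimic a consumer that calls .get() on every element of points.

    Returns the error message of the first failure, or None if all succeed.
    """
    for point in points:
        try:
            point.get("measurement")
        except AttributeError as exc:
            return str(exc)
    return None
```

Passing the list from `make_points` to `first_failure` returns `None`, while passing a single point dict returns the same kind of message as in the stack trace, because the loop then sees the dict's keys instead of point dicts.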
Hi dwei98, I have the same problem, but I can't understand how to run the reading of the value nodes continuously. Did you resolve your problem and how?
There is already an open source project for an OPC UA InfluxDB logger: https://github.com/coussej/node-opcua-logger
'''
https://reference.opcfoundation.org/v104/Core/docs/Part4/5.13.1/

In simple words:
An OPC UA Subscription works like a "mailbox" which gets emptied at a defined interval by the Client. The OPC UA Client sends a publish request; the server takes all notifications since the last publish request and sends them to the client in the publish response. If there is no Notification in the "mailbox", the Client gets a "keep-alive" back. The OPC UA Client can add "MonitoredItems" to the Subscription, which will generate Notifications.
'''
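The mailbox picture can be mimicked with a few lines of Python. This is a toy model only: `MailboxSubscription` is a hypothetical class, not part of python-opcua, and it deliberately ignores publishing intervals, queue sizes and keep-alive counters.

```python
class MailboxSubscription(object):
    """Toy model of the server side of a subscription (not python-opcua API)."""

    KEEP_ALIVE = "keep-alive"

    def __init__(self):
        self._pending = []

    def notify(self, item, value):
        # A monitored item changed: drop a notification into the mailbox.
        self._pending.append((item, value))

    def publish(self):
        # The client's publish request empties the mailbox; if nothing is
        # waiting, the response is only a keep-alive.
        if not self._pending:
            return self.KEEP_ALIVE
        notifications, self._pending = self._pending, []
        return notifications
```

Calling `publish()` on an empty mailbox returns the keep-alive marker; after a few `notify()` calls it returns those notifications in order and the mailbox is empty again.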
Great!! Thank you @AndreasHeine. Do you know if there is a Python project such as this?
If so, I haven't seen it yet.