python-opcua icon indicating copy to clipboard operation
python-opcua copied to clipboard

Write data to InfluxDB at Data Change Notification

Open dwei98 opened this issue 6 years ago • 15 comments

Hi,

I am new to both InfluxDB and OPC UA. I need to archive the data and save in InfluxDB. But it seems that I cannot call dbclient.write_points(point). The source is as follows. Please advise how to solve this issue. Thx a lot in advance.

import sys sys.path.insert(0, "..") import time import datetime import logging

from opcua import Client from opcua import ua

from influxdb import InfluxDBClient

DB_USER = 'admin' DB_PASSWORD = 'admin' DBNAME = 'Prosys-rt-var-sub' DB_HOST = 'localhost' DB_PORT = '8086'

class SubHandler(object):

"""
Client to subscription. It will receive events from server
"""

def datachange_notification(self, node, val, data):
    print("Python: New data change event", node, val, data)

    timestamp2 = data.monitored_item.Value.SourceTimestamp

    tag_name = node.nodeid

    print (timestamp2, tag_name, val)
    now = datetime.datetime.utcnow()

    timestamp1 = int(timestamp2.strftime("%s")) * 1000
               
    print(timestamp1)
               
    point = {
            "measurement": tag_name,
            "time": timestamp2,
            "fields": {
                "Value": val,
                "Source Timestamp": timestamp1
            }
    }

    dbclient.write_points(point)

if name == "main": #from IPython import embed logging.basicConfig(level=logging.DEBUG) opcclient = Client("opc.tcp://localhost:53530/OPCUA/SimulationServer/")

try:
    node_var = []
    tag_name = []

    opcclient.connect()
    
    node_var.append(opcclient.get_node(ua.NodeId("Counter1", 5)))
    tag_name.append("Counter1")

    node_var.append(opcclient.get_node(ua.NodeId("Expression1", 5)))
    tag_name.append("Expression1")

    node_var.append(opcclient.get_node(ua.NodeId("Random1", 5)))
    tag_name.append("Random1")

    node_var.append(opcclient.get_node(ua.NodeId("Sawtooth1", 5)))
    tag_name.append("Sawtooth1")

    node_var.append(opcclient.get_node(ua.NodeId("Sinusoid1", 5)))
    tag_name.append("Sinusoid1")

    node_var.append(opcclient.get_node(ua.NodeId("Square1", 5)))
    tag_name.append("Square1")

    node_var.append(opcclient.get_node(ua.NodeId("Triangle1", 5)))
    tag_name.append("Triangle1")

    dbclient = InfluxDBClient(DB_HOST, DB_PORT, DB_USER, DB_PASSWORD, DBNAME)

    print("Create database: " + DBNAME)
    dbclient.create_database(DBNAME)
    dbclient.switch_database(DBNAME)

    handler = SubHandler()
    sub = opcclient.create_subscription(2000, handler)

Subscribe to OPC UA server - receive notification every 1 second

    for i in range(0, len(node_var)):
        handle = sub.subscribe_data_change(node_var[i])
    time.sleep(15)

    #embed()
    sub.unsubscribe(handle)
    sub.delete()

finally:
    opcclient.disconnect()

dwei98 avatar Feb 16 '19 21:02 dwei98

The datachange_notification is called from the network thread. Don't call functions there that block or it will stop your OPC UA connection.

You should only store the subscription data in the call back, then at some other time call your DB operation on the main thread.

Also your code only runs for 15 seconds? You don't really need subscriptions if that is how you want your code to run. You could just loop over your nodes and call node.get_value() (this is much slower than a subscription however). After you have the values you can write them to DB.

zerox1212 avatar Feb 17 '19 05:02 zerox1212

But that code looks fine, as far as I could see. Saving to DB inside callback is ok. This is what we do in the event history stuff. What is the stack trace?

oroulet avatar Feb 17 '19 06:02 oroulet

Zerox1212:

I need to run the code for a shift, like 8~9 hours. This code is only a test. I need to use subscription. Any suggestion?

Many thanks in advance

Dong

dwei98 avatar Feb 17 '19 14:02 dwei98

Hi Oroulet,

Really? Then I really would like to know how you did it. The stack trace is as follows -

ERROR:opcua.common.subscription:Exception calling data change handler Traceback (most recent call last): File "/usr/local/lib/python2.7/dist-packages/opcua/common/subscription.py", line 135, in _call_datachange self._handler.datachange_notification(data.node, item.Value.Value.Value, event_data) File "/home/osboxes/eclipse-workspace/Python_Projects/vSoC-Predvelopment-Test/client_to_prosys_sub.py", line 62, in datachange_notification dbclient.write_points(point) File "/usr/lib/python2.7/dist-packages/influxdb/client.py", line 456, in write_points tags=tags, protocol=protocol) File "/usr/lib/python2.7/dist-packages/influxdb/client.py", line 506, in _write_points protocol=protocol File "/usr/lib/python2.7/dist-packages/influxdb/client.py", line 292, in write data = make_lines(data, precision).encode('utf-8') File "/usr/lib/python2.7/dist-packages/influxdb/line_protocol.py", line 126, in make_lines point.get('measurement', data.get('measurement')) AttributeError: 'str' object has no attribute 'get'

Many thanks

Dong

dwei98 avatar Feb 17 '19 14:02 dwei98

The crash here says you have a variable'data' that is a string and that you seem to think is a dict. This is not related to opcua

oroulet avatar Feb 17 '19 15:02 oroulet

Oroulet,

I did not have this issue when I run the same code when I moved this piece to main(). So I do not believe this is the issue.

dwei98 avatar Feb 17 '19 18:02 dwei98

Well programming is not magical. The error above says that data is a string. And python crashes because of that

oroulet avatar Feb 17 '19 18:02 oroulet

I agree, that stack trace not a UA issue. I would of expected you to get a client timeout issue if you did a blocking operation in the callback.

@oroulet history is fine because it's a local SQL file. If his DB call is across a network or his DB is just slow it will crash. I have seen it happen. Only very fast operations can be done in datachange_notification.

zerox1212 avatar Feb 18 '19 05:02 zerox1212

Hi Oroulet,

You are correct. It works now after I moved dbclient.write_points(point) out of datachange_notification.

Thanks a lot

dwei98 avatar Feb 18 '19 16:02 dwei98

Hey dwei98, I don't know if this issue is stil alive but just a comment for other users. If you put the influxdb string in a list like below, your code runs fine. Also, if you have values coming from different nodes, you need to make sure they all have the same type .

Best regards, Tim

point = [ {
        "measurement": tag_name,
        "time": timestamp2,
        "fields": {
            "Value": val,
            "Source Timestamp": timestamp1
        }
} ]

TimHutse avatar Jul 28 '21 11:07 TimHutse

Hi dwei98, I have the same problem, but I can't understand how to run the reading of the value nodes continuously. Did you resolve your problem and how?

LoSpino87 avatar Sep 16 '21 09:09 LoSpino87

there is already a open source project for a opc ua influx logger https://github.com/coussej/node-opcua-logger

AndreasHeine avatar Sep 16 '21 10:09 AndreasHeine

''' https://reference.opcfoundation.org/v104/Core/docs/Part4/5.13.1/ In simple words: A OPC UA Subscription works like a "Mailbox" which gets emptied in a defined interval by the Client. The OPC UA Client sends a publishrequest, the server takes all notifications since the last publishrequest. and send them in the publishresponse to the client. If there is no Notification in the "Mailbox" the Client will get a "keep-alive" back. The OPC UA Client can add "MonitoredItems" to the Subscription which will generate Notifications. '''

AndreasHeine avatar Sep 16 '21 10:09 AndreasHeine

Great!! Thnak you @AndreasHeine Do you know if there is a python project such as this?

LoSpino87 avatar Sep 16 '21 10:09 LoSpino87

if so i havn't seen it yet

AndreasHeine avatar Sep 16 '21 10:09 AndreasHeine