分享

OPCUA-Python

 车厘子V 2019-03-13

pip install opcua

Server

from threading import Thread

import copy

import logging

from datetime import datetime

import time

from math import sin

import sys

sys.path.insert(0, "..")

try:

    from IPython import embed

except ImportError:

    import code

    def embed():

        myvars = globals()

        myvars.update(locals())

        shell = code.InteractiveConsole(myvars)

        shell.interact()

from opcua import ua, uamethod, Server

class SubHandler(object):

    """

    Subscription Handler. To receive events from server for a subscription

    """

    def datachange_notification(self, node, val, data):

        print("Python: New data change event", node, val)

    def event_notification(self, event):

        print("Python: New event", event)

# method to be exposed through server

def func(parent, variant):

    ret = False

    if variant.Value % 2 == 0:

        ret = True

    return [ua.Variant(ret, ua.VariantType.Boolean)]

# method to be exposed through server

# uses a decorator to automatically convert to and from variants

@uamethod

def multiply(parent, x, y):

    print("multiply method call with parameters: ", x, y)

    return x * y

class VarUpdater(Thread):

    def __init__(self, var):

        Thread.__init__(self)

        self._stopev = False

        self.var = var

    def stop(self):

        self._stopev = True

    def run(self):

        while not self._stopev:

            v = sin(time.time() / 10)

            self.var.set_value(v)

            time.sleep(0.1)

if __name__ == "__main__":

    # optional: setup logging

    logging.basicConfig(level=logging.WARN)

    #logger = logging.getLogger("opcua.address_space")

    # logger.setLevel(logging.DEBUG)

    #logger = logging.getLogger("opcua.internal_server")

    # logger.setLevel(logging.DEBUG)

    #logger = logging.getLogger("opcua.binary_server_asyncio")

    # logger.setLevel(logging.DEBUG)

    #logger = logging.getLogger("opcua.uaprocessor")

    # logger.setLevel(logging.DEBUG)

    # now setup our server

    server = Server()

    #server.disable_clock()

    #server.set_endpoint("opc.tcp://localhost:4840/freeopcua/server/")

    server.set_endpoint("opc.tcp://0.0.0.0:4841/freeopcua/server/")

    server.set_server_name("FreeOpcUa Example Server")

    # set all possible endpoint policies for clients to connect through

    server.set_security_policy([

        ua.SecurityPolicyType.NoSecurity,

        # ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt,

        # ua.SecurityPolicyType.Basic256Sha256_Sign

    ])

    # setup our own namespace

    uri = "http://examples.freeopcua."

    idx = server.register_namespace(uri)

    # create a new node type we can instantiate in our address space

    dev = server.nodes.base_object_type.add_object_type(0, "MyDevice")

    dev.add_variable(0, "sensor1", 1.0).set_modelling_rule(True)

    dev.add_property(0, "device_id", "0340").set_modelling_rule(True)

    ctrl = dev.add_object(0, "controller")

    ctrl.set_modelling_rule(True)

    ctrl.add_property(0, "state", "Idle").set_modelling_rule(True)

    # populating our address space

    # First a folder to organise our nodes

    myfolder = server.nodes.objects.add_folder(idx, "myEmptyFolder")

    # instanciate one instance of our device

    mydevice = server.nodes.objects.add_object(idx, "Device0001", dev)

    mydevice_var = mydevice.get_child(["0:controller", "0:state"])  # get proxy to our device state variable 

    # create directly some objects and variables

    myobj = server.nodes.objects.add_object(idx, "MyObject")

    myvar = myobj.add_variable(idx, "MyVariable", 6.7)

    mysin = myobj.add_variable(idx, "MySin", 0, ua.VariantType.Float)

    myvar.set_writable()    # Set MyVariable to be writable by clients

    mystringvar = myobj.add_variable(idx, "MyStringVariable", "Really nice string")

    mystringvar.set_writable()    # Set MyVariable to be writable by clients

    mydtvar = myobj.add_variable(idx, "MyDateTimeVar", datetime.utcnow())

    mydtvar.set_writable()    # Set MyVariable to be writable by clients

    myarrayvar = myobj.add_variable(idx, "myarrayvar", [6.7, 7.9])

    myarrayvar = myobj.add_variable(idx, "myStronglytTypedVariable", ua.Variant([], ua.VariantType.UInt32))

    myprop = myobj.add_property(idx, "myproperty", "I am a property")

    mymethod = myobj.add_method(idx, "mymethod", func, [ua.VariantType.Int64], [ua.VariantType.Boolean])

    multiply_node = myobj.add_method(idx, "multiply", multiply, [ua.VariantType.Int64, ua.VariantType.Int64], [ua.VariantType.Int64])

    # import some nodes from xml

    # server.import_xml("custom_nodes.xml")

    # creating a default event object

    # The event object automatically will have members for all events properties

    # you probably want to create a custom event type, see other examples

    myevgen = server.get_event_generator()

    myevgen.event.Severity = 300

    # starting!

    server.start()

    print("Available loggers are: ", logging.Logger.manager.loggerDict.keys())

    vup = VarUpdater(mysin)  # just  a stupide class update a variable

    vup.start()

    try:

        # enable following if you want to subscribe to nodes on server side

        #handler = SubHandler()

        #sub = server.create_subscription(500, handler)

        #handle = sub.subscribe_data_change(myvar)

        # trigger event, all subscribed clients wil receive it

        var = myarrayvar.get_value()  # return a ref to value in db server side! not a copy!

        var = copy.copy(var)  # WARNING: we need to copy before writting again otherwise no data change event will be generated

        var.append(9.3)

        myarrayvar.set_value(var)

        mydevice_var.set_value("Running")

        myevgen.trigger(message="This is BaseEvent")

        server.set_attribute_value(myvar.nodeid, ua.DataValue(9.9))  # Server side write method which is a but faster than using set_value

        embed()

    finally:

        vup.stop()

        server.stop()


这个服务程序演示了opcua服务端,几乎所有的功能,其中Event部分没有不断发送,所以仅供参考。

下图展示了server端的对象结构


客户端

from IPython import embed

from opcua import Client

class SubHandler(object):

    def event_notification(self, event):

        print("Event:", event.EventId, event.Time, event.proper_random, event.Message.Text)

def main_c():

    url = "opc.tcp://127.0.0.1:4841/freeopcua/server/"

    c = Client(url)

    try:

        c.connect()

        root = c.get_root_node()

        embed()

    except Exception as e:

        print("Client Exception:", e)

    finally:

        c.disconnect()

if __name__ == "__main__":

    main_c()


客户端遍历流程:

获取root下的Objects节点的所有子节点,一般类型都为Object,取一代表为A

获取A的NodeClass(一般为Object)和TypeDefinition,其中NodeId需要与路径["0:Types","0:ObjectTypes","0:BaseObjectType"]下的类型对照(自定义类型也在内)。

继续以A为根,遍历子节点,如果时Object继续2,如果是Variable,NodeId对照["0:Types","0:VariableTypes","0:BaseVariableType"],可以判断是变量(variable,63)还是属性(property,65)。

对于Method使用a_root.call_method(a, arg1)调用。对于Variable使用get_value/get_data_value()获取存储的值。如果是Object继续2.

对于Variable使用access_level获取是否有写权限,如果有set_value可以设置值。

一个比较完美的遍历客户端:

from opcua import Client, ua

def brower_child(root):

    """

    递归调用遍历,格式化不好做,有深度问题

    """

    name = root.get_node_class().name

    # print(name)

    if name == "Object":

        brower_obj(root)

        for c in root.get_children():

            print("  ", end='')

            brower_child(c)

    elif name == 'Variable':

        brower_var(root)

    else:

        brower_method(root)

class CurState():

    def __init__(self, parent=None, p=None, d=0):

        self.parent = parent  # unused

        self.p = p

        self.d = d

def brower_child2(root, max_d=-1, ignore=[]):

    """

    栈+循环遍历,非常好用

    """

    stack = [CurState(None, root, 0)]

    while len(stack):

        cur = stack.pop()

        name = cur.p.get_node_class().name

        print(''.join(['  ' for i in range(cur.d)]), end="")

        if cur.p.get_browse_name().Name in ignore:

            continue

        if name == "Object":

            brower_obj(cur.p)

            if max_d > 0 and cur.d >= max_d:

                continue

            for c in cur.p.get_children():

                stack.append(CurState(cur.p, c, cur.d+1))

        elif name == 'Variable':

            brower_var(cur.p)

        else:

            brower_method(cur.p)

def brower_obj(v):

    # print(v.get_browse_name())

    rw = 'R '

    bname = v.get_browse_name()

    print("*%2d:%-30s (%-2s, %-23s)" %

          (bname.NamespaceIndex, bname.Name, rw, "Object"))

def brower_var(v):

    # print(v.get_browse_name())

    rw = 'R '

    if ua.AccessLevel.CurrentWrite in v.get_access_level():

        rw = "RW"

    bname = v.get_browse_name()

    tv = v.get_data_value().Value

    v_show = tv.Value

    if len(str(v_show)) > 1024:

        v_show = str(v_show[:56]) + "..."

    print("-%2d:%-30s (%-2s, %-23s) =>" %

          (bname.NamespaceIndex, bname.Name, rw, tv.VariantType), v_show)

def brower_method(v):

    # print(v.get_description())

    rw = 'C '

    bname = v.get_browse_name()

    # args = []

    # for a in v.get_properties():

    #     dt = a.get_data_type().NodeIdType.name

    #     args.append(dt)

    print("@%2d:%-30s (%-2s, %-23s)" %

          (bname.NamespaceIndex, bname.Name, rw, "Method"))

def main_c():

    url = "opc.tcp://127.0.0.1:4841/freeopcua/server/"

    c = Client(url)

    try:

        c.connect()

        root = c.get_root_node()

        print("\r\nBrower:")

        brower_child2(root.get_child(["0:Objects"]), -1, ["Server"])

    except Exception as e:

        print("Client Exception:", e)

    finally:

        c.disconnect()

if __name__ == "__main__":

    main_c()

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多