Source code for nxsrecconfig.Utils

#!/usr/bin/env python
#   This file is part of nxsrecconfig - NeXus Sardana Recorder Settings
#
#    Copyright (C) 2014-2017 DESY, Jan Kotanski <jkotan@mail.desy.de>
#
#    nexdatas is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    nexdatas is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with nexdatas.  If not, see <http://www.gnu.org/licenses/>.
#

"""  Tango Utilities """

import re
import time
import json
import pickle
import numpy
import fnmatch
import socket
import sys

try:
    import tango
except Exception:
    import PyTango as tango


if sys.version_info > (3,):
    unicode = str


#: (:obj:`bool`) tango bug #213 flag related to EncodedAttributes in python3
PYTG_BUG_213 = False
if sys.version_info > (3,):
    try:
        PYTGMAJOR, PYTGMINOR, PYTGPATCH = list(
            map(int, tango.__version__.split(".")[:3]))
        if PYTGMAJOR <= 9:
            if PYTGMAJOR == 9:
                if PYTGMINOR < 2:
                    PYTG_BUG_213 = True
                elif PYTGMINOR == 2 and PYTGPATCH <= 4:
                    PYTG_BUG_213 = True
            else:
                PYTG_BUG_213 = True
    except Exception:
        pass


[docs]class OldTangoError(Exception): """ Old Tango version Exception class """
[docs]class Utils(object): """ Miscellaneous Utilities """
[docs] @classmethod def tostr(cls, text): """ convert bytestr or unicode to python str :param text: text to convert :type text: :obj:`bytes` or :obj:`unicode` or :obj:`str` :returns: converted text :rtype: :obj:`str` """ if isinstance(text, str): return text else: if sys.version_info > (3,) and \ (isinstance(text, bytes) or isinstance(text, unicode)): return str(text, "utf8") else: return str(text)
[docs] @classmethod def pickleloads(cls, bytestr): """ loads pickle byte string :param bytestr: byte string to convert :type bytesstr: :obj:`bytes` :returns: loaded bytestring :rtype: :obj:`any` """ if sys.version_info > (3,): return pickle.loads(bytestr, encoding='latin1') else: return pickle.loads(bytestr)
[docs] @classmethod def compareDict(cls, dct, dct2): """ copares two dictionaries :param dct: first dictinary :type dct: :obj:`dict` :param dct2: second dictinary :type dct2: :obj:`dict` :returns: if dictionaries are the same :rtype: :obj:`bool` """ if not isinstance(dct, dict): return False if not isinstance(dct2, dict): return False if len(list(dct.keys())) != len(list(dct2.keys())): return False status = True for k, v in dct.items(): if k not in dct2.keys(): status = False break if isinstance(v, dict): status = Utils.compareDict(v, dct2[k]) if not status: break else: if v != dct2[k]: status = False break return status
[docs] @classmethod def getRecord(cls, node): """ provides datasource record from xml dom node :param node: xml DOM node :type node: :class:`lxml.etree.Element` :returns: datasource record :rtype: :obj:`str` """ res = '' host = None port = None dname = None rname = None member = None device = node.findall("device") if device is not None and len(device) > 0: host = device[0].get("hostname") port = device[0].get("port") dname = device[0].get("name") member = device[0].get("member") surfix = "" prefix = "" if member or member != 'attribute': if member == 'property': prefix = '@' elif member == 'command': surfix = '()' record = node.findall("record") if record is not None and len(record) > 0: rname = record[0].get("name") if rname: if dname: if host: if not port: port = '10000' res = '%s:%s/%s/%s%s%s' % ( host, port, dname, prefix, rname, surfix) else: res = '%s/%s%s%s' % (dname, prefix, rname, surfix) else: res = rname return res
[docs] @classmethod def getText(cls, node): """ collects text from text child nodes :param node: parent node :type node: :obj:`xml.etree.ElementTree.Element` """ if node is not None: tnodes = ([node.text] if node.text else []) \ + [child.tail for child in node if child.tail] return unicode("".join(tnodes)).strip() return ""
[docs] @classmethod def stringToDictJson(cls, string, toBool=False): """ converts string to json dictionary :param string: string with list of item or json dictionary :type string: :obj:`str` :param toBool: if true convert dictionary values to bool :type toBool: :obj:`bool` :returns: json dictionary :rtype: :obj:`str` """ try: if not string or string == "Not initialised": return "{}" acps = json.loads(string) if not isinstance(acps, dict): raise AssertionError() jstring = string except (ValueError, AssertionError): lst = re.sub("[:,;]", " ", string).split() if len(lst) % 2: lst.append("") dct = dict(zip(*[iter(lst)] * 2)) if toBool: for k in dct.keys(): dct[k] = False \ if dct[k].lower() == 'false' else True jstring = json.dumps(dct) return jstring
[docs] @classmethod def stringToListJson(cls, string): """ converts string to json list :param string: with list of item or json list :type string: :obj:`str` :returns: json list :rtype: :obj:`str` """ if not string or string == "Not initialised": return "[]" try: acps = json.loads(string) if not isinstance(acps, (list, tuple)): raise AssertionError() jstring = string except (ValueError, AssertionError): lst = re.sub("[:,;]", " ", string).split() jstring = json.dumps(lst) return jstring
[docs] @classmethod def toString(cls, obj): """ converts list/dict/object of unicode/string to string object :param obj: given unicode/string object :type obj: `any` :returns: string object :rtype: :obj:`str` """ if isinstance(obj, unicode) or isinstance(obj, bytes): return Utils.tostr(obj) elif isinstance(obj, list): return [cls.toString(el) for el in obj] elif isinstance(obj, dict): return dict([(cls.toString(key), cls.toString(value)) for key, value in obj.items()]) else: return obj
[docs]class TangoUtils(object): """ Tango Utilities """ #: (:obj:`dict` <:class:`tango.CmdArgType`, :obj:`str`>) #: map of Tango:Numpy types tTnp = {tango.DevLong64: "int64", tango.DevLong: "int32", tango.DevShort: "int16", tango.DevUChar: "uint8", tango.DevULong64: "uint64", tango.DevULong: "uint32", tango.DevUShort: "uint16", tango.DevDouble: "float64", tango.DevFloat: "float32", tango.DevString: "string", tango.DevBoolean: "bool", tango.DevEncoded: "encoded"}
[docs] @classmethod def openProxy(cls, device, counter=1000): """ opens device proxy of the given device :param device: device name :type device: :obj:`str` :returns: DeviceProxy of device :rtype: :class:`tango.DeviceProxy` """ found = False cnt = 0 cnfServer = tango.DeviceProxy(Utils.tostr(device)) while not found and cnt < counter: if cnt > 1: time.sleep(0.01) try: cnfServer.ping() found = True except tango.DevFailed: time.sleep(0.01) found = False if cnt == counter - 1: raise cnt += 1 cnfServer.set_source(tango.DevSource.DEV) return cnfServer
[docs] @classmethod def wait(cls, proxy, counter=100, state="RUNNING"): """waits for device proxy not running :param proxy: device proxy :type proxy: :class:`tango.DeviceProxy` :returns: if proxy device ready :rtype: :obj:`str` """ dstate = getattr(tango.DevState, state) if state else None found = False cnt = 0 while not found and cnt < counter: if cnt > 1: time.sleep(0.01) try: pstate = proxy.state() if dstate: if pstate != dstate: found = True else: found = True except tango.DevFailed: time.sleep(0.01) found = False if cnt == counter - 1: raise cnt += 1 return found
[docs] @classmethod def getProxies(cls, names): """ provides proxies of given device names :param names: given device names :type names: :obj:`list` <:obj:`str`> :returns: list of device DeviceProxies :rtype: :obj:`list` <:class:`tango.DeviceProxy`> """ dps = [] for name in names: dp = tango.DeviceProxy(Utils.tostr(name)) try: dp.ping() dps.append(dp) except tango.DevFailed: pass return dps
[docs] @classmethod def getDeviceName(cls, db, cname): """ finds device of give class :param db: tango database :type db: :class:`tango.DeviceProxy` :param cname: device class name :type cname: :obj:`str` :returns: device name if exists :rtype: :obj:`bool` """ servers = db.get_device_exported_for_class( cname).value_string device = '' for server in servers: try: dp = tango.DeviceProxy(Utils.tostr(server)) dp.ping() device = server break except tango.DevFailed: pass return device
[docs] @classmethod def getFullAttrName(cls, source, fqdn=False): """ provides tango device full name with host and port :param source: string with device name and its attribute :type source: :obj:`str` :param fqdn: if true adds fqdn host name :type fqdn: :obj:`bool` :returns: database host and port in url string :rtype: :obj:`str` """ if ':' in source: if not fqdn: return "tango://%s" % source else: lsource = source.split(":") lsource[0] = socket.getfqdn(lsource[0]) return "tango://%s" % ":".join(lsource) else: db = tango.Database() host, port = db.get_db_host(), db.get_db_port() if fqdn: host = socket.getfqdn(host) return "tango://%s:%s/%s" % (host, port, source)
[docs] @classmethod def getShapeTypeUnit(cls, source): """ retrives shape type units for attribure :param source: string with device name and its attribute :type source: :obj:`str` :returns: (shape, data_type, units) :rtype: (:obj:`list` <:obj:`int`>, :obj:`str`, :obj:`str`) """ vl = None shp = [] dt = 'float64' ut = 'No unit' ap = tango.AttributeProxy(source) da = None ac = None try: ac = ap.get_config() if ac.data_format != tango.AttrDataFormat.SCALAR: da = ap.read() vl = da.value except tango.DevFailed: pass # if ac and ac.data_format != tango.AttrDataFormat.SCALAR \ # and (da is None or not hasattr(da, 'dim_x')): # raise if vl is not None: shp = list(numpy.shape(vl)) elif da is not None: if ac.data_format != tango.AttrDataFormat.SCALAR: if da.dim_x and da.dim_x > 1: shp = [da.dim_y, da.dim_x] \ if da.dim_y \ else [da.dim_x] elif ac is not None: if ac.data_format != tango.AttrDataFormat.SCALAR: if ac.max_dim_x and ac.max_dim_x > 1: shp = [ac.max_dim_y, ac.max_dim_x] \ if ac.max_dim_y \ else [ac.max_dim_x] if ac is not None: dt = cls.tTnp[ac.data_type] ut = ac.unit return (shp, dt, ut)
[docs] @classmethod def command(cls, server, command, *var): """ executes command on server on python package :param server: tango server name or package name :type server: :class:`tango.DeviceProxy` \ or :class:`nxsconfigserver.XMLConfigurator.XMLConfigurator` :param command: command name :type command: :obj:`str` :param var: command variable list :type var: [ `any` ] :returns: command result :rtype: `any` """ if not hasattr(server, "command_inout"): return getattr(server, command)(*var) elif var is None: return server.command_inout(command) else: return server.command_inout(command, *var)
[docs]class MSUtils(object): """ MacroServer Utilities """
[docs] @classmethod def getEnv(cls, var, ms): """ provides environment variable value :param var: variable name :type var: :obj:`str` :param ms: macroserver :type ms: :obj:`str` :returns: environment variable value :rtype: `any` """ active = "" dp = TangoUtils.openProxy(ms) if PYTG_BUG_213: raise OldTangoError( "Reading Encoded Attributes not supported in tango < 9.2.5") rec = dp.Environment if rec[0] == 'pickle': dc = Utils.pickleloads(rec[1]) if 'new' in dc.keys() and \ var in dc['new'].keys(): active = dc['new'][var] return active
[docs] @classmethod def setEnv(cls, var, value, ms): """ sets environment variable value :param var: variable name :type var: :obj:`str` :param value: variable value :type value: `any` :param ms: macroserver :type ms: :obj:`str` """ dp = TangoUtils.openProxy(ms) dc = {'new': {}} dc['new'][var] = value MSUtils.writeEnvAttr(dc, dp)
[docs] @classmethod def setEnvs(cls, varvalues, ms): """ sets environment variable value :param varvalues: variable value dictionary :type varvalues: :obj:`dict` <:obj:`str` , `any`> :param ms: macroserver :type ms: :obj:`str` """ dp = TangoUtils.openProxy(ms) dc = {'new': {}} for var, value in varvalues.items(): dc['new'][var] = value MSUtils.writeEnvAttr(dc, dp)
[docs] @classmethod def usetEnv(cls, var, ms): """ unsets environment variable :param var: variable name :type var: :obj:`str` :param ms: macroserver :type ms: :obj:`str` """ dp = TangoUtils.openProxy(ms) dc = {'del': [var]} MSUtils.writeEnvAttr(dc, dp)
[docs] @classmethod def getMacroServer(cls, db, door, find=True): """ provides macro server of given door :param db: tango database :type db: :class:`tango.Database` :param door: given door :type door: :obj:`str` :param find: find new macroserver if door does not exist :type find: :obj:`bool` :returns: first MacroServer of the given door :rtype: :obj:`str` """ servers = db.get_device_exported_for_class( "MacroServer").value_string ms = "" mss = [] sdoor = door.split("/") hostname = None if len(sdoor) > 1 and ":" in sdoor[0]: door = "/".join(sdoor[1:]) hostname = sdoor[0] for server in servers: if hostname: mserver = "%s/%s" % (hostname, Utils.tostr(server)) else: mserver = Utils.tostr(server) dp = tango.DeviceProxy(Utils.tostr(mserver)) if hasattr(dp, "DoorList"): mss.append(mserver) if hasattr(dp, "DoorList") and dp.DoorList: lst = [str(dr).lower() for dr in dp.DoorList] if lst and door.lower() in lst: ms = mserver break if find and door != 'module' and not ms and mss: if hasattr(dp, "DoorList"): ms = mss[0] return ms
[docs] @classmethod def writeEnvAttr(cls, value, dp): """ sets environment variable value :param value: variable value dictionary :type value: :obj:`dict` <:obj:`str` , `any`> or `any` :param dp: macroserver :type dp: :obj:`str` """ if PYTG_BUG_213: raise OldTangoError( "Writing Encoded Attributes not supported in tango < 9.2.5") try: pk = pickle.dumps(value, protocol=2) dp.Environment = ['pickle', pk] except Exception: if sys.version_info < (3,): raise if isinstance(value, dict): newvalue = {} for key, vl in value.items(): if isinstance(vl, dict): nvl = {} for ky, it in vl.items(): nvl[bytes(ky, "utf8") if isinstance(ky, unicode) else ky] = it newvalue[bytes(key, "utf8") if isinstance(key, unicode) else key] = nvl else: newvalue[bytes(key, "utf8") if isinstance(key, unicode) else key] = vl else: newvalue = value pk = pickle.dumps(newvalue, protocol=2) dp.Environment = ['pickle', pk]
[docs]class PoolUtils(object): """ Pool Utilities """
[docs] @classmethod def getDeviceControllers(cls, pools, devices=None): """ provides device controller full names :param pools: list of pool devices :type pools: :obj:`list` <:class:`tango.DeviceProxy`> :param devices: alias names :type devices: :obj:`list` <:obj:`str`> :returns: device controller full names :rtype: :obj:`dict` <:obj:`str`, :obj:`str`> """ lst = [] for pool in pools: if pool.ExpChannelList: lst += pool.ExpChannelList ctrls = {} for elm in lst: chan = json.loads(elm) if devices is None or chan['name'] in devices: ctrls[chan['name']] = chan['controller'] return ctrls
[docs] @classmethod def getChannelSources(cls, pools, devices): """ provides channel sources :param pools: list of pool devices :type pools: :obj:`list` <:class:`tango.DeviceProxy`> :param devices: alias names :type devices: :obj:`list` <:obj:`str`> :returns: device sources :rtype: :obj:`dict` <:obj:`str`, :obj:`str`> """ lst = [] for pool in pools: if pool.ExpChannelList: lst += pool.ExpChannelList srs = {} for elm in lst: chan = json.loads(elm) if chan['name'] in devices: srs[chan['name']] = chan['source'] return srs
[docs] @classmethod def getElementNames(cls, pools, listattr, typefilter=None): """ provides experimental Channels :param pools: list of pool devices :type pools: :obj:`list` <:class:`tango.DeviceProxy`> :param listattr: pool attribute with list :type listattr: :obj:`str` :param typefilter: pool attribute with list :type typefilter: :obj:`list` <:obj:`str`> :returns: names from given pool listattr :rtype: :obj:`list` <:obj:`str`> """ lst = [] elements = [] for pool in pools: if hasattr(pool, listattr): ellist = getattr(pool, listattr) if ellist: lst += ellist for elm in lst: if elm: chan = json.loads(elm) if chan and isinstance(chan, dict): if typefilter: if chan['type'] not in typefilter: continue elements.append(chan['name']) return elements
[docs] @classmethod def getFullDeviceNames(cls, pools, names=None): """ find device names from aliases :param pools: list of pool devices :type pools: :obj:`list` <:class:`tango.DeviceProxy`> :param names: alias names if None returns name for all aliases :type names: :obj:`list` <:obj:`str`> :returns: full device name :rtype: :obj:`dict` <:obj:`str`, :obj:`str`> """ lst = [] for pool in pools: if pool.AcqChannelList: lst += pool.AcqChannelList argout = {} for elm in lst: chan = json.loads(elm) if names is None or chan['name'] in names: arr = chan['full_name'].split("/") argout[chan['name']] = "/".join(arr[0:-1]) return argout
[docs] @classmethod def getAliases(cls, pools, names=None): """ find aliases from fullnames :param pools: list of pool devices :type pools: :obj:`list` <:class:`tango.DeviceProxy`> :param names: fullnames if None returns all aliases :type names: :obj:`list` <:obj:`str`> :returns: full device name :rtype: :obj:`dict` <:obj:`str`, :obj:`str`> """ lst = [] for pool in pools: if pool.AcqChannelList: lst += pool.AcqChannelList argout = {} for elm in lst: chan = json.loads(elm) arr = chan['full_name'].split("/") fname = "/".join(arr[0:-1]) if names is None or fname in names: argout[fname] = chan['name'] return argout
[docs] @classmethod def getMntGrpName(cls, pools, alias): """ find measurement group name from alias :param pools: list of pool devices :type pools: :obj:`list` <:class:`tango.DeviceProxy`> :param alias: mntgrp alias :type alias: :obj:`str` :returns: full name of the measurement group alias :rtype: :obj:`str` """ lst = [] for pool in pools: if pool.MeasurementGroupList: lst += pool.MeasurementGroupList argout = "" for elm in lst: chan = json.loads(elm) if alias == chan['name']: argout = chan['full_name'] break return argout
[docs] @classmethod def getMotorPositionAttributes(cls, pools): """ find motor names :param pools: list of pool devices :type pools: :obj:`list` <:class:`tango.DeviceProxy`> :returns: full name of the measurement group alias :rtype: :obj:`str` :returns: (name , name , motor position attribute) :rtype: :obj:`list` <:obj:`str`,:obj:`str`, :obj:`str`> """ lst = [] for pool in pools: if pool.MotorList: lst += pool.MotorList argout = [] for elm in lst: chan = json.loads(elm) if "name" in elm and "full_name" in elm: name = chan['name'] fname = chan['full_name'] if name and fname: if fname.startswith("tango://"): fname = fname[8:] if not fname.lower().endswith("/position"): fname = fname + "/position" argout.append([name, name, fname]) return argout
[docs] @classmethod def getTimers(cls, pools, filters=None): """ provides tiemrs of given pools :param pools: list of pool devices :type pools: :obj:`list` <:class:`tango.DeviceProxy`> :param filters: device name filter list :type filters: :obj:`list` <:obj:`str`> :returns: list of timer names :rtype: :obj:`list` <:obj:`str`> """ lst = [] res = [] for pool in pools: if pool.ExpChannelList: lst += pool.ExpChannelList if not filters or not hasattr(filters, '__iter__'): filters = ["*dgg*", "*/timer/*", "*/ctctrl0*"] for elm in lst: chan = json.loads(elm) inter = chan['interfaces'] source = chan['source'] if isinstance(inter, (list, tuple)): if 'CTExpChannel' in inter: found = False for df in filters: found = fnmatch.filter([source], df) if found: break if found: res.append(chan['name']) return res
[docs] @classmethod def filterNames(cls, pools, filters=None, lst=None): """ provides channels of given pools :param pools: list of pool devices :type pools: :obj:`list` <:class:`tango.DeviceProxy`> :param filters: device name filter list :type filters: :obj:`list` <:obj:`str`> :returns: list of channel names :rtype: :obj:`list` <:obj:`str`> """ res = [] if lst is None: lst = [] for pool in pools: if pool.AcqChannelList: lst += pool.AcqChannelList if filters is None or not hasattr(filters, '__iter__'): filters = ["*"] for elm in lst: chan = json.loads(elm) fullname = chan['full_name'] found = False for df in filters: found = fnmatch.filter([fullname], df) if found: break if found: res.append(chan['name']) return res
[docs] @classmethod def filterOutTango(cls, lst, filters=None): """ provides channels of given pools :param lst: list of strings to filter out :type lst: :obj:`list` <:obj:`str` or (:obj:`str`, :obj:`str`)> :param filters: device name filter list :type filters: :obj:`list` <:obj:`str`> :returns: list of channel names :rtype: :obj:`list` <:obj:`str`> """ res = [] lst = lst or [] if filters is None or not hasattr(filters, '__iter__'): return lst for item in lst: found = False for df in filters: if not isinstance(item, tuple): ilst = [item] else: ilst = item for name in ilst: found = fnmatch.fnmatch(name, df) if found: break if found: break if not found: res.append(item) return res
[docs] @classmethod def getSource(cls, name): """ provides datasource from pool device :param name: pool device name :type name: :obj:`str` :returns: source of pool device :rtype: :obj:`str` """ source = None try: dp = tango.DeviceProxy(Utils.tostr(name)) if hasattr(dp, 'DataSource'): ds = dp.DataSource sds = ds.split("://") ap = tango.AttributeProxy(sds[-1]) if ap is None: raise Exception("Empty proxy") source = sds[-1] except tango.DevFailed: pass return source