#!/usr/bin/env python
# This file is part of nxsrecconfig - NeXus Sardana Recorder Settings
#
# Copyright (C) 2014-2017 DESY, Jan Kotanski <jkotan@mail.desy.de>
#
# nexdatas is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# nexdatas is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with nexdatas. If not, see <http://www.gnu.org/licenses/>.
#
""" Tango Utilities """
import re
import time
import json
import pickle
import numpy
import fnmatch
import socket
import sys
try:
import tango
except Exception:
import PyTango as tango
if sys.version_info > (3,):
unicode = str
#: (:obj:`bool`) tango bug #213 flag related to EncodedAttributes in python3
PYTG_BUG_213 = False
if sys.version_info > (3,):
try:
PYTGMAJOR, PYTGMINOR, PYTGPATCH = list(
map(int, tango.__version__.split(".")[:3]))
if PYTGMAJOR <= 9:
if PYTGMAJOR == 9:
if PYTGMINOR < 2:
PYTG_BUG_213 = True
elif PYTGMINOR == 2 and PYTGPATCH <= 4:
PYTG_BUG_213 = True
else:
PYTG_BUG_213 = True
except Exception:
pass
[docs]class OldTangoError(Exception):
""" Old Tango version Exception class
"""
[docs]class Utils(object):
""" Miscellaneous Utilities """
[docs] @classmethod
def tostr(cls, text):
""" convert bytestr or unicode to python str
:param text: text to convert
:type text: :obj:`bytes` or :obj:`unicode` or :obj:`str`
:returns: converted text
:rtype: :obj:`str`
"""
if isinstance(text, str):
return text
else:
if sys.version_info > (3,) and \
(isinstance(text, bytes) or isinstance(text, unicode)):
return str(text, "utf8")
else:
return str(text)
[docs] @classmethod
def pickleloads(cls, bytestr):
""" loads pickle byte string
:param bytestr: byte string to convert
:type bytesstr: :obj:`bytes`
:returns: loaded bytestring
:rtype: :obj:`any`
"""
if sys.version_info > (3,):
return pickle.loads(bytestr, encoding='latin1')
else:
return pickle.loads(bytestr)
[docs] @classmethod
def compareDict(cls, dct, dct2):
""" copares two dictionaries
:param dct: first dictinary
:type dct: :obj:`dict`
:param dct2: second dictinary
:type dct2: :obj:`dict`
:returns: if dictionaries are the same
:rtype: :obj:`bool`
"""
if not isinstance(dct, dict):
return False
if not isinstance(dct2, dict):
return False
if len(list(dct.keys())) != len(list(dct2.keys())):
return False
status = True
for k, v in dct.items():
if k not in dct2.keys():
status = False
break
if isinstance(v, dict):
status = Utils.compareDict(v, dct2[k])
if not status:
break
else:
if v != dct2[k]:
status = False
break
return status
[docs] @classmethod
def getRecord(cls, node):
""" provides datasource record from xml dom node
:param node: xml DOM node
:type node: :class:`lxml.etree.Element`
:returns: datasource record
:rtype: :obj:`str`
"""
res = ''
host = None
port = None
dname = None
rname = None
member = None
device = node.findall("device")
if device is not None and len(device) > 0:
host = device[0].get("hostname")
port = device[0].get("port")
dname = device[0].get("name")
member = device[0].get("member")
surfix = ""
prefix = ""
if member or member != 'attribute':
if member == 'property':
prefix = '@'
elif member == 'command':
surfix = '()'
record = node.findall("record")
if record is not None and len(record) > 0:
rname = record[0].get("name")
if rname:
if dname:
if host:
if not port:
port = '10000'
res = '%s:%s/%s/%s%s%s' % (
host, port, dname, prefix, rname, surfix)
else:
res = '%s/%s%s%s' % (dname, prefix, rname, surfix)
else:
res = rname
return res
[docs] @classmethod
def getText(cls, node):
""" collects text from text child nodes
:param node: parent node
:type node: :obj:`xml.etree.ElementTree.Element`
"""
if node is not None:
tnodes = ([node.text] if node.text else []) \
+ [child.tail for child in node if child.tail]
return unicode("".join(tnodes)).strip()
return ""
[docs] @classmethod
def stringToDictJson(cls, string, toBool=False):
""" converts string to json dictionary
:param string: string with list of item or json dictionary
:type string: :obj:`str`
:param toBool: if true convert dictionary values to bool
:type toBool: :obj:`bool`
:returns: json dictionary
:rtype: :obj:`str`
"""
try:
if not string or string == "Not initialised":
return "{}"
acps = json.loads(string)
if not isinstance(acps, dict):
raise AssertionError()
jstring = string
except (ValueError, AssertionError):
lst = re.sub("[:,;]", " ", string).split()
if len(lst) % 2:
lst.append("")
dct = dict(zip(*[iter(lst)] * 2))
if toBool:
for k in dct.keys():
dct[k] = False \
if dct[k].lower() == 'false' else True
jstring = json.dumps(dct)
return jstring
[docs] @classmethod
def stringToListJson(cls, string):
""" converts string to json list
:param string: with list of item or json list
:type string: :obj:`str`
:returns: json list
:rtype: :obj:`str`
"""
if not string or string == "Not initialised":
return "[]"
try:
acps = json.loads(string)
if not isinstance(acps, (list, tuple)):
raise AssertionError()
jstring = string
except (ValueError, AssertionError):
lst = re.sub("[:,;]", " ", string).split()
jstring = json.dumps(lst)
return jstring
[docs] @classmethod
def toString(cls, obj):
""" converts list/dict/object of unicode/string to string object
:param obj: given unicode/string object
:type obj: `any`
:returns: string object
:rtype: :obj:`str`
"""
if isinstance(obj, unicode) or isinstance(obj, bytes):
return Utils.tostr(obj)
elif isinstance(obj, list):
return [cls.toString(el) for el in obj]
elif isinstance(obj, dict):
return dict([(cls.toString(key), cls.toString(value))
for key, value in obj.items()])
else:
return obj
[docs]class TangoUtils(object):
""" Tango Utilities """
#: (:obj:`dict` <:class:`tango.CmdArgType`, :obj:`str`>)
#: map of Tango:Numpy types
tTnp = {tango.DevLong64: "int64", tango.DevLong: "int32",
tango.DevShort: "int16", tango.DevUChar: "uint8",
tango.DevULong64: "uint64", tango.DevULong: "uint32",
tango.DevUShort: "uint16", tango.DevDouble: "float64",
tango.DevFloat: "float32", tango.DevString: "string",
tango.DevBoolean: "bool", tango.DevEncoded: "encoded"}
[docs] @classmethod
def openProxy(cls, device, counter=1000):
""" opens device proxy of the given device
:param device: device name
:type device: :obj:`str`
:returns: DeviceProxy of device
:rtype: :class:`tango.DeviceProxy`
"""
found = False
cnt = 0
cnfServer = tango.DeviceProxy(Utils.tostr(device))
while not found and cnt < counter:
if cnt > 1:
time.sleep(0.01)
try:
cnfServer.ping()
found = True
except tango.DevFailed:
time.sleep(0.01)
found = False
if cnt == counter - 1:
raise
cnt += 1
cnfServer.set_source(tango.DevSource.DEV)
return cnfServer
[docs] @classmethod
def wait(cls, proxy, counter=100, state="RUNNING"):
"""waits for device proxy not running
:param proxy: device proxy
:type proxy: :class:`tango.DeviceProxy`
:returns: if proxy device ready
:rtype: :obj:`str`
"""
dstate = getattr(tango.DevState, state) if state else None
found = False
cnt = 0
while not found and cnt < counter:
if cnt > 1:
time.sleep(0.01)
try:
pstate = proxy.state()
if dstate:
if pstate != dstate:
found = True
else:
found = True
except tango.DevFailed:
time.sleep(0.01)
found = False
if cnt == counter - 1:
raise
cnt += 1
return found
[docs] @classmethod
def getProxies(cls, names):
""" provides proxies of given device names
:param names: given device names
:type names: :obj:`list` <:obj:`str`>
:returns: list of device DeviceProxies
:rtype: :obj:`list` <:class:`tango.DeviceProxy`>
"""
dps = []
for name in names:
dp = tango.DeviceProxy(Utils.tostr(name))
try:
dp.ping()
dps.append(dp)
except tango.DevFailed:
pass
return dps
[docs] @classmethod
def getDeviceName(cls, db, cname):
""" finds device of give class
:param db: tango database
:type db: :class:`tango.DeviceProxy`
:param cname: device class name
:type cname: :obj:`str`
:returns: device name if exists
:rtype: :obj:`bool`
"""
servers = db.get_device_exported_for_class(
cname).value_string
device = ''
for server in servers:
try:
dp = tango.DeviceProxy(Utils.tostr(server))
dp.ping()
device = server
break
except tango.DevFailed:
pass
return device
[docs] @classmethod
def getFullAttrName(cls, source, fqdn=False):
""" provides tango device full name with host and port
:param source: string with device name and its attribute
:type source: :obj:`str`
:param fqdn: if true adds fqdn host name
:type fqdn: :obj:`bool`
:returns: database host and port in url string
:rtype: :obj:`str`
"""
if ':' in source:
if not fqdn:
return "tango://%s" % source
else:
lsource = source.split(":")
lsource[0] = socket.getfqdn(lsource[0])
return "tango://%s" % ":".join(lsource)
else:
db = tango.Database()
host, port = db.get_db_host(), db.get_db_port()
if fqdn:
host = socket.getfqdn(host)
return "tango://%s:%s/%s" % (host, port, source)
[docs] @classmethod
def getShapeTypeUnit(cls, source):
""" retrives shape type units for attribure
:param source: string with device name and its attribute
:type source: :obj:`str`
:returns: (shape, data_type, units)
:rtype: (:obj:`list` <:obj:`int`>, :obj:`str`, :obj:`str`)
"""
vl = None
shp = []
dt = 'float64'
ut = 'No unit'
ap = tango.AttributeProxy(source)
da = None
ac = None
try:
ac = ap.get_config()
if ac.data_format != tango.AttrDataFormat.SCALAR:
da = ap.read()
vl = da.value
except tango.DevFailed:
pass
# if ac and ac.data_format != tango.AttrDataFormat.SCALAR \
# and (da is None or not hasattr(da, 'dim_x')):
# raise
if vl is not None:
shp = list(numpy.shape(vl))
elif da is not None:
if ac.data_format != tango.AttrDataFormat.SCALAR:
if da.dim_x and da.dim_x > 1:
shp = [da.dim_y, da.dim_x] \
if da.dim_y \
else [da.dim_x]
elif ac is not None:
if ac.data_format != tango.AttrDataFormat.SCALAR:
if ac.max_dim_x and ac.max_dim_x > 1:
shp = [ac.max_dim_y, ac.max_dim_x] \
if ac.max_dim_y \
else [ac.max_dim_x]
if ac is not None:
dt = cls.tTnp[ac.data_type]
ut = ac.unit
return (shp, dt, ut)
[docs] @classmethod
def command(cls, server, command, *var):
""" executes command on server on python package
:param server: tango server name or package name
:type server: :class:`tango.DeviceProxy` \
or :class:`nxsconfigserver.XMLConfigurator.XMLConfigurator`
:param command: command name
:type command: :obj:`str`
:param var: command variable list
:type var: [ `any` ]
:returns: command result
:rtype: `any`
"""
if not hasattr(server, "command_inout"):
return getattr(server, command)(*var)
elif var is None:
return server.command_inout(command)
else:
return server.command_inout(command, *var)
[docs]class MSUtils(object):
""" MacroServer Utilities """
[docs] @classmethod
def getEnv(cls, var, ms):
""" provides environment variable value
:param var: variable name
:type var: :obj:`str`
:param ms: macroserver
:type ms: :obj:`str`
:returns: environment variable value
:rtype: `any`
"""
active = ""
dp = TangoUtils.openProxy(ms)
if PYTG_BUG_213:
raise OldTangoError(
"Reading Encoded Attributes not supported in tango < 9.2.5")
rec = dp.Environment
if rec[0] == 'pickle':
dc = Utils.pickleloads(rec[1])
if 'new' in dc.keys() and \
var in dc['new'].keys():
active = dc['new'][var]
return active
[docs] @classmethod
def setEnv(cls, var, value, ms):
""" sets environment variable value
:param var: variable name
:type var: :obj:`str`
:param value: variable value
:type value: `any`
:param ms: macroserver
:type ms: :obj:`str`
"""
dp = TangoUtils.openProxy(ms)
dc = {'new': {}}
dc['new'][var] = value
MSUtils.writeEnvAttr(dc, dp)
[docs] @classmethod
def setEnvs(cls, varvalues, ms):
""" sets environment variable value
:param varvalues: variable value dictionary
:type varvalues: :obj:`dict` <:obj:`str` , `any`>
:param ms: macroserver
:type ms: :obj:`str`
"""
dp = TangoUtils.openProxy(ms)
dc = {'new': {}}
for var, value in varvalues.items():
dc['new'][var] = value
MSUtils.writeEnvAttr(dc, dp)
[docs] @classmethod
def usetEnv(cls, var, ms):
""" unsets environment variable
:param var: variable name
:type var: :obj:`str`
:param ms: macroserver
:type ms: :obj:`str`
"""
dp = TangoUtils.openProxy(ms)
dc = {'del': [var]}
MSUtils.writeEnvAttr(dc, dp)
[docs] @classmethod
def getMacroServer(cls, db, door, find=True):
""" provides macro server of given door
:param db: tango database
:type db: :class:`tango.Database`
:param door: given door
:type door: :obj:`str`
:param find: find new macroserver if door does not exist
:type find: :obj:`bool`
:returns: first MacroServer of the given door
:rtype: :obj:`str`
"""
servers = db.get_device_exported_for_class(
"MacroServer").value_string
ms = ""
mss = []
sdoor = door.split("/")
hostname = None
if len(sdoor) > 1 and ":" in sdoor[0]:
door = "/".join(sdoor[1:])
hostname = sdoor[0]
for server in servers:
if hostname:
mserver = "%s/%s" % (hostname, Utils.tostr(server))
else:
mserver = Utils.tostr(server)
dp = tango.DeviceProxy(Utils.tostr(mserver))
if hasattr(dp, "DoorList"):
mss.append(mserver)
if hasattr(dp, "DoorList") and dp.DoorList:
lst = [str(dr).lower() for dr in dp.DoorList]
if lst and door.lower() in lst:
ms = mserver
break
if find and door != 'module' and not ms and mss:
if hasattr(dp, "DoorList"):
ms = mss[0]
return ms
[docs] @classmethod
def writeEnvAttr(cls, value, dp):
""" sets environment variable value
:param value: variable value dictionary
:type value: :obj:`dict` <:obj:`str` , `any`> or `any`
:param dp: macroserver
:type dp: :obj:`str`
"""
if PYTG_BUG_213:
raise OldTangoError(
"Writing Encoded Attributes not supported in tango < 9.2.5")
try:
pk = pickle.dumps(value, protocol=2)
dp.Environment = ['pickle', pk]
except Exception:
if sys.version_info < (3,):
raise
if isinstance(value, dict):
newvalue = {}
for key, vl in value.items():
if isinstance(vl, dict):
nvl = {}
for ky, it in vl.items():
nvl[bytes(ky, "utf8")
if isinstance(ky, unicode) else ky] = it
newvalue[bytes(key, "utf8")
if isinstance(key, unicode) else key] = nvl
else:
newvalue[bytes(key, "utf8")
if isinstance(key, unicode) else key] = vl
else:
newvalue = value
pk = pickle.dumps(newvalue, protocol=2)
dp.Environment = ['pickle', pk]
[docs]class PoolUtils(object):
""" Pool Utilities """
[docs] @classmethod
def getDeviceControllers(cls, pools, devices=None):
""" provides device controller full names
:param pools: list of pool devices
:type pools: :obj:`list` <:class:`tango.DeviceProxy`>
:param devices: alias names
:type devices: :obj:`list` <:obj:`str`>
:returns: device controller full names
:rtype: :obj:`dict` <:obj:`str`, :obj:`str`>
"""
lst = []
for pool in pools:
if pool.ExpChannelList:
lst += pool.ExpChannelList
ctrls = {}
for elm in lst:
chan = json.loads(elm)
if devices is None or chan['name'] in devices:
ctrls[chan['name']] = chan['controller']
return ctrls
[docs] @classmethod
def getChannelSources(cls, pools, devices):
""" provides channel sources
:param pools: list of pool devices
:type pools: :obj:`list` <:class:`tango.DeviceProxy`>
:param devices: alias names
:type devices: :obj:`list` <:obj:`str`>
:returns: device sources
:rtype: :obj:`dict` <:obj:`str`, :obj:`str`>
"""
lst = []
for pool in pools:
if pool.ExpChannelList:
lst += pool.ExpChannelList
srs = {}
for elm in lst:
chan = json.loads(elm)
if chan['name'] in devices:
srs[chan['name']] = chan['source']
return srs
[docs] @classmethod
def getElementNames(cls, pools, listattr, typefilter=None):
""" provides experimental Channels
:param pools: list of pool devices
:type pools: :obj:`list` <:class:`tango.DeviceProxy`>
:param listattr: pool attribute with list
:type listattr: :obj:`str`
:param typefilter: pool attribute with list
:type typefilter: :obj:`list` <:obj:`str`>
:returns: names from given pool listattr
:rtype: :obj:`list` <:obj:`str`>
"""
lst = []
elements = []
for pool in pools:
if hasattr(pool, listattr):
ellist = getattr(pool, listattr)
if ellist:
lst += ellist
for elm in lst:
if elm:
chan = json.loads(elm)
if chan and isinstance(chan, dict):
if typefilter:
if chan['type'] not in typefilter:
continue
elements.append(chan['name'])
return elements
[docs] @classmethod
def getFullDeviceNames(cls, pools, names=None):
""" find device names from aliases
:param pools: list of pool devices
:type pools: :obj:`list` <:class:`tango.DeviceProxy`>
:param names: alias names if None returns name for all aliases
:type names: :obj:`list` <:obj:`str`>
:returns: full device name
:rtype: :obj:`dict` <:obj:`str`, :obj:`str`>
"""
lst = []
for pool in pools:
if pool.AcqChannelList:
lst += pool.AcqChannelList
argout = {}
for elm in lst:
chan = json.loads(elm)
if names is None or chan['name'] in names:
arr = chan['full_name'].split("/")
argout[chan['name']] = "/".join(arr[0:-1])
return argout
[docs] @classmethod
def getAliases(cls, pools, names=None):
""" find aliases from fullnames
:param pools: list of pool devices
:type pools: :obj:`list` <:class:`tango.DeviceProxy`>
:param names: fullnames if None returns all aliases
:type names: :obj:`list` <:obj:`str`>
:returns: full device name
:rtype: :obj:`dict` <:obj:`str`, :obj:`str`>
"""
lst = []
for pool in pools:
if pool.AcqChannelList:
lst += pool.AcqChannelList
argout = {}
for elm in lst:
chan = json.loads(elm)
arr = chan['full_name'].split("/")
fname = "/".join(arr[0:-1])
if names is None or fname in names:
argout[fname] = chan['name']
return argout
[docs] @classmethod
def getMntGrpName(cls, pools, alias):
""" find measurement group name from alias
:param pools: list of pool devices
:type pools: :obj:`list` <:class:`tango.DeviceProxy`>
:param alias: mntgrp alias
:type alias: :obj:`str`
:returns: full name of the measurement group alias
:rtype: :obj:`str`
"""
lst = []
for pool in pools:
if pool.MeasurementGroupList:
lst += pool.MeasurementGroupList
argout = ""
for elm in lst:
chan = json.loads(elm)
if alias == chan['name']:
argout = chan['full_name']
break
return argout
[docs] @classmethod
def getMotorPositionAttributes(cls, pools):
""" find motor names
:param pools: list of pool devices
:type pools: :obj:`list` <:class:`tango.DeviceProxy`>
:returns: full name of the measurement group alias
:rtype: :obj:`str`
:returns: (name , name , motor position attribute)
:rtype: :obj:`list` <:obj:`str`,:obj:`str`, :obj:`str`>
"""
lst = []
for pool in pools:
if pool.MotorList:
lst += pool.MotorList
argout = []
for elm in lst:
chan = json.loads(elm)
if "name" in elm and "full_name" in elm:
name = chan['name']
fname = chan['full_name']
if name and fname:
if fname.startswith("tango://"):
fname = fname[8:]
if not fname.lower().endswith("/position"):
fname = fname + "/position"
argout.append([name, name, fname])
return argout
[docs] @classmethod
def getTimers(cls, pools, filters=None):
""" provides tiemrs of given pools
:param pools: list of pool devices
:type pools: :obj:`list` <:class:`tango.DeviceProxy`>
:param filters: device name filter list
:type filters: :obj:`list` <:obj:`str`>
:returns: list of timer names
:rtype: :obj:`list` <:obj:`str`>
"""
lst = []
res = []
for pool in pools:
if pool.ExpChannelList:
lst += pool.ExpChannelList
if not filters or not hasattr(filters, '__iter__'):
filters = ["*dgg*", "*/timer/*", "*/ctctrl0*"]
for elm in lst:
chan = json.loads(elm)
inter = chan['interfaces']
source = chan['source']
if isinstance(inter, (list, tuple)):
if 'CTExpChannel' in inter:
found = False
for df in filters:
found = fnmatch.filter([source], df)
if found:
break
if found:
res.append(chan['name'])
return res
[docs] @classmethod
def filterNames(cls, pools, filters=None, lst=None):
""" provides channels of given pools
:param pools: list of pool devices
:type pools: :obj:`list` <:class:`tango.DeviceProxy`>
:param filters: device name filter list
:type filters: :obj:`list` <:obj:`str`>
:returns: list of channel names
:rtype: :obj:`list` <:obj:`str`>
"""
res = []
if lst is None:
lst = []
for pool in pools:
if pool.AcqChannelList:
lst += pool.AcqChannelList
if filters is None or not hasattr(filters, '__iter__'):
filters = ["*"]
for elm in lst:
chan = json.loads(elm)
fullname = chan['full_name']
found = False
for df in filters:
found = fnmatch.filter([fullname], df)
if found:
break
if found:
res.append(chan['name'])
return res
[docs] @classmethod
def filterOutTango(cls, lst, filters=None):
""" provides channels of given pools
:param lst: list of strings to filter out
:type lst: :obj:`list` <:obj:`str` or (:obj:`str`, :obj:`str`)>
:param filters: device name filter list
:type filters: :obj:`list` <:obj:`str`>
:returns: list of channel names
:rtype: :obj:`list` <:obj:`str`>
"""
res = []
lst = lst or []
if filters is None or not hasattr(filters, '__iter__'):
return lst
for item in lst:
found = False
for df in filters:
if not isinstance(item, tuple):
ilst = [item]
else:
ilst = item
for name in ilst:
found = fnmatch.fnmatch(name, df)
if found:
break
if found:
break
if not found:
res.append(item)
return res
[docs] @classmethod
def getSource(cls, name):
""" provides datasource from pool device
:param name: pool device name
:type name: :obj:`str`
:returns: source of pool device
:rtype: :obj:`str`
"""
source = None
try:
dp = tango.DeviceProxy(Utils.tostr(name))
if hasattr(dp, 'DataSource'):
ds = dp.DataSource
sds = ds.split("://")
ap = tango.AttributeProxy(sds[-1])
if ap is None:
raise Exception("Empty proxy")
source = sds[-1]
except tango.DevFailed:
pass
return source