#!/usr/bin/env python
# This file is part of nxsrecconfig - NeXus Sardana Recorder Settings
#
# Copyright (C) 2014-2017 DESY, Jan Kotanski <jkotan@mail.desy.de>
#
# nexdatas is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# nexdatas is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with nexdatas. If not, see <http://www.gnu.org/licenses/>.
#
""" Dynamic Component """
import json
import sys
import lxml.etree
import xml.etree.ElementTree as et
from lxml.etree import XMLParser
try:
import tango
except Exception:
import PyTango as tango
from .Utils import Utils, TangoUtils, PoolUtils
[docs]class DynamicComponent(object):
""" Creates dynamic component
of given component """
def __init__(self, nexusconfig_device,
defaultpath="/$var.entryname#'scan'$var.serialno:NXentry/"
"NXinstrument/collection",
defaulttype="NX_CHAR",
defaultuserpath="/$var.entryname#'scan'$var.serialno:NXentry/"
"user_data:NXparameters"):
""" constructor
:param nexusconfig_device: configserver configuration server
:type nexusconfig_device: :obj:`tango.DeviceProxy` \
or :class:`nxsconfigserver.XMLConfigurator.XMLConfigurator`
:param defaultpath: default dynamic component path
:type defaultpath: :obj:`str`
"""
#: (:class:`tango.DeviceProxy` \
#: or :class:`nxsconfigserver.XMLConfigurator.XMLConfigurator`) \
#: configuration server
self.__nexusconfig_device = nexusconfig_device
#: (:obj:`list` <:obj:`dict` <:obj:`str` , `any`> >) \
#: step datasources
self.__stepdsourcesDict = []
#: (:obj:`list` <:obj:`str`>) step datasources
self.__stepdsources = []
#: (:obj:`list` <:obj:`str`>) init datasources
self.__initdsources = []
#: (:obj:`str`) default dynamic component name
self.__defaultCP = "__dynamic_component__"
#: (:obj:`str`) dynamic component name
self.__dynamicCP = ""
#: (:obj:`dict` <:obj:`str` , :obj:`str`> ) \
#: alias label dictionary
self.__nexuslabels = {}
#: (:obj:`dict` <:obj:`str` , :obj:`str`> ) \
#: alias path dictionary
self.__nexuspaths = {}
#: (:obj:`dict` <:obj:`str` , :obj:`bool`> ) \
#: alias link dictionary
self.__nexuslinks = {}
#: (:obj:`dict` <:obj:`str` , :obj:`str`> ) \
#: alias nexus types dictionary
self.__nexustypes = {}
#: (:obj:`dict` <:obj:`str` , :obj:`list` <:obj:`int`> > ) \
#: alias nexus types dictionary
self.__nexusshapes = {}
#: (:class:`tango.Database` ) pytango database server
self.__db = tango.Database()
#: (:obj:`str`) default dynamic component path
self.__ldefaultpath = defaultpath
#: (:obj:`str`) standard dynamic component path
self.__defaultpath = defaultpath
#: (:obj:`str`) standard user data dynamic component path
self.__defaultuserpath = defaultuserpath
#: (:obj:`bool`) standard dynamic link flag
self.__links = True
#: (:obj:`bool`) standard dynamic link flag for INIT strategy
self.__ilinks = False
#: (:obj:`str`) default data type
self.__defaulttype = defaulttype
#: (:obj:`dict` <:obj:`str` , :obj:`str`> ) \
#: map of numpy types : NEXUS
self.__npTn = {"float32": "NX_FLOAT32", "float64": "NX_FLOAT64",
"float": "NX_FLOAT32", "double": "NX_FLOAT64",
"int": "NX_INT", "int64": "NX_INT64",
"int32": "NX_INT32", "int16": "NX_INT16",
"int8": "NX_INT8", "uint64": "NX_UINT64",
"uint32": "NX_UINT32", "uint16": "NX_UINT16",
"uint8": "NX_UINT8", "uint": "NX_UINT64",
"str": "NX_CHAR",
"string": "NX_CHAR", "bool": "NX_BOOLEAN"}
def __get_alias(self, name):
""" provides a device alias
:param name: device name
:type name: :obj:`str`
:returns: device alias
:rtype: :obj:`str`
"""
if name.startswith("tango://"):
name = name[8:]
# if name does not contain a "/" it's probably an alias
if name.find("/") == -1:
return name
# haso107klx:10000/expchan/hasysis3820ctrl/1
if name.find(':') >= 0:
lst = name.split("/")
name = "/".join(lst[1:])
return self.__db.get_alias(name)
[docs] def setStepDictDSources(self, dctlist):
""" sets user datasources with type and shape
:param dctlist: json list of parameter dictionaries
[{"name": <dsname>, "dtype": <num_type>, "shape":<list>}, ...]
:type dctlist: :obj:`str`
"""
self.__stepdsourcesDict = []
if isinstance(dctlist, list):
for dct in dctlist:
if "name" not in dct.keys():
continue
self.__stepdsourcesDict.append(dct)
if "dtype" not in dct.keys():
self.__stepdsourcesDict[-1]["dtype"] = "string"
if "shape" not in dct.keys():
self.__stepdsourcesDict[-1]["shape"] = []
[docs] def setStepDSources(self, dsources):
""" sets step datasources
:param dsources: list of step datasources
:type dsources: :obj:`list` <:obj:`str`>
"""
self.__stepdsources = list(dsources)
if not isinstance(self.__stepdsources, list):
self.__stepdsources = []
#
[docs] def setInitDSources(self, dsources):
""" sets init datasources
:param dsources: list of init datasources
:type dsources: :obj:`list` <:obj:`str`>
"""
self.__initdsources = list(dsources)
if not isinstance(self.__initdsources, list):
self.__initdsources = []
#
[docs] def setLabelParams(self, labels, paths, links, types, shapes):
""" sets label parameters for specific dynamic components
:param labels: label dictionaries
:type labels: :obj:`dict` <:obj:`str` , :obj:`str`>
:param paths: nexus path dictionaries
:type paths: :obj:`dict` <:obj:`str` , :obj:`str`>
:param links: link dictionaries
:type links: :obj:`dict` <:obj:`str` , :obj:`bool`>
:param types: nexus type dictionaries
:type types: :obj:`dict` <:obj:`str` , :obj:`str`>
:param shapes: data shape dictionaries
:type shapes: :obj:`dict` <:obj:`str` , :obj:`list` <:obj:`int`> >
"""
self.__nexuslabels = json.loads(labels)
if not isinstance(self.__nexuslabels, dict):
self.__nexuslabels = {}
self.__nexuspaths = json.loads(paths)
if not isinstance(self.__nexuspaths, dict):
self.__nexuspaths = {}
self.__nexuslinks = json.loads(links)
if not isinstance(self.__nexuslinks, dict):
self.__nexuslinks = {}
self.__nexustypes = json.loads(types)
if not isinstance(self.__nexustypes, dict):
self.__nexustypes = {}
self.__nexusshapes = json.loads(shapes)
if not isinstance(self.__nexusshapes, dict):
self.__nexusshapes = {}
[docs] def setDefaultLinkPath(self, dynamicLinks, dynamicPath,
dynamicInitLinks=None):
""" sets default nexus path and link flag for dynamic components
:brief: if dynamicPath is None or "" it is reset to default one
:param dynamicPath: nexus default path
:type dynamicPath: :obj:`str`
:param dynamicLinks: default link flag
:type dynamicLinks: :obj:`bool`
:param dynamicInitLinks: default link flag
:type dynamicInitLinks: :obj:`bool`
"""
self.__links = dynamicLinks
self.__ilinks = dynamicInitLinks
self.__defaultpath = dynamicPath
if not self.__defaultpath:
self.__defaultpath = self.__ldefaultpath
def __shapeFromTango(self, ds):
""" provices datasource shape and NeXus type from Tango device
:param ds: datasource name
:type ds: :obj:`str`
:returns: (shape, NeXus type) tuple
:returns: (:obj:`list` <:obj:`int`>, :obj:`str`) tuple
"""
nxtype = None
shape = None
dstype = ds.get("type")
if dstype == 'TANGO':
source = Utils.tostr(Utils.getRecord(ds))
shape, dt, _ = TangoUtils.getShapeTypeUnit(source)
nxtype = self.__npTn[dt] \
if dt in self.__npTn.keys() else nxtype
return shape, nxtype
def __createSardanaNodes(self, created, definition):
""" creates XML nodes for sardana devices
:param created: list of created devices
:type created: :obj:`list` <:obj:`str`>
:param definition: definition node
:type definition: :class:`lxml.etree.Element`
"""
special = ["strategy", "dtype", "name", "unit", "shape"]
for dd in self.__stepdsourcesDict:
defaultpath = self.__defaultpath
strategy = dd["strategy"] if "strategy" in dd else "STEP"
links = self.__links
if strategy == "INIT":
defaultpath = self.__defaultuserpath
links = self.__ilinks
alias = self.__get_alias(Utils.tostr(dd["name"]))
path, field = self.__getPathField(
self.__nexuspaths, self.__nexuslabels,
alias, defaultpath)
link = self.__getProp(
self.__nexuslinks, self.__nexuslabels,
alias, links)
(parent, nxdata) = self.__createGroupTree(
definition, path, link)
created.append(alias)
nxtype = self.__npTn[dd["dtype"]] \
if dd["dtype"] in self.__npTn.keys() else 'NX_CHAR'
xmlfield = self.__createField(
parent, field, nxtype, alias, dd["name"],
dd["shape"], strategy=strategy, dstype='CLIENT')
if "unit" in dd:
xmlfield.attrib["units"] = str(dd["unit"])
for ky, vl in dd.items():
if ky not in special:
xmlfield.attrib[ky] = str(vl)
if link:
self.__createLink(nxdata, path, field)
def __createNonSardanaNodes(self, created, avds, definition,
strategy="STEP"):
""" creates XML nodes for non sardana devices
:param created: list of created devices
:type created: :obj:`list` <:obj:`str`>
:param avds: available datasources
:type avds: :obj:`list` <:obj:`str`>
:param definition: definition node
:type definition: :class:`lxml.etree.Element`
"""
dsources = self.__initdsources \
if strategy == 'INIT' else self.__stepdsources
for ds in dsources:
if ds not in created:
path, field = self.__getPathField(
self.__nexuspaths, self.__nexuslabels,
ds, self.__defaultpath)
link = self.__getProp(
self.__nexuslinks, self.__nexuslabels, ds,
self.__ilinks if strategy == 'INIT'
else self.__links)
(parent, nxdata) = self.__createGroupTree(
definition, path, link)
created.append(ds)
shape, nxtype = None, self.__defaulttype
if ds in avds:
dsource = TangoUtils.command(
self.__nexusconfig_device, "dataSources",
[Utils.tostr(ds)])
if sys.version_info > (3,):
root = et.fromstring(
bytes(dsource[0], "UTF-8"),
parser=XMLParser(collect_ids=False))
else:
root = et.fromstring(
dsource[0],
parser=XMLParser(collect_ids=False))
dss = root.findall(".//datasource")
if dss and shape is None:
shape, nxtype = self.__shapeFromTango(dss[0])
if not nxtype:
nxtype = self.__defaulttype
nxtype = self.__getProp(
self.__nexustypes, self.__nexuslabels, ds, nxtype)
shape = self.__getProp(
self.__nexusshapes, self.__nexuslabels, ds, shape)
if ds in avds:
self.__createField(
parent, field, nxtype, ds,
dsnode=dss[0], shape=shape, strategy=strategy)
else:
self.__createField(parent, field, nxtype, ds,
ds, shape, strategy=strategy)
if link:
self.__createLink(nxdata, path, field)
[docs] def create(self):
""" creates dynamic component
:returns: dynanic component name
:rtype: :obj:`str`
"""
cps = TangoUtils.command(self.__nexusconfig_device,
"availableComponents")
name = self.__defaultCP
while name in cps:
name = name + "x"
self.__dynamicCP = name
definition = lxml.etree.Element("definition")
avds = TangoUtils.command(self.__nexusconfig_device,
"availableDataSources")
created = []
self.__createSardanaNodes(created, definition)
self.__createNonSardanaNodes(created, avds, definition, 'STEP')
self.__createNonSardanaNodes(created, avds, definition, 'INIT')
if sys.version_info > (3,):
xmls = Utils.tostr(
lxml.etree.tostring(
definition, encoding='unicode',
method='xml', pretty_print=True))
else:
xmls = Utils.tostr(
lxml.etree.tostring(
definition, encoding='utf8',
method='xml', pretty_print=True))
if xmls.startswith("<?xml"):
self.__nexusconfig_device.xmlstring = xmls
else:
self.__nexusconfig_device.xmlstring = \
"<?xml version='1.0' encoding='utf8'?>\n" + xmls
TangoUtils.command(self.__nexusconfig_device, "storeComponent",
Utils.tostr(self.__dynamicCP))
# print("Dynamic Component:\n%s" % root.toprettyxml(indent=" "))
return self.__dynamicCP
@classmethod
def __getProp(cls, nexusprop, nexuslabels, name, default):
""" gets the property value for the given datasource
:param nexusprop: nexus property dictionary
:type nexusprop: :obj:`dict` <:obj:`str` , :`any`>
:param nexuslabel: nexus label dictionary
:type nexuslabels: :obj:`dict` <:obj:`str` , :obj:`str`>
:param default: default value if property is not defined
:type default: `any`
:returns: propery value
:rtype: `any`
"""
prop = nexusprop.get(nexuslabels.get(name, ""), None)
if prop is None:
prop = nexusprop.get(name, default)
return prop
@classmethod
def __getPathField(cls, nexuspaths, nexuslabels, alias, defaultpath):
""" gets the Nexus path and for the given datasource
:param nexuspaths: nexus property path dictionary
:type nexuspaths: :obj:`dict` <:obj:`str` , :obj:`str`>
:param nexuslabels: nexus label dictionary
:type nexuslabels: :obj:`dict` <:obj:`str` , :obj:`str`>
:param alias : datasource alias
:type alias : :obj:`str`
:param defaultpath: default path if path is not defined
:type defaultpath: :obj:`str`
:returns: (path, fieldname)
:rtype: (:obj:`str` , :obj:`str`)
"""
path = nexuspaths.get(nexuslabels.get(alias, ""), "")
if not path:
path = nexuspaths.get(alias, "")
if path:
spath = path.split('/')
field = spath[-1]
path = '/'.join(spath[:-1]) if len(spath) > 1 else defaultpath
else:
path = defaultpath
field = alias
if len(field) > 12 and field[:8] == 'tango://':
field = field[8:]
return (path,
field.replace(" ", "_").replace("/", "_").replace(
":", "_").replace(".", "_").replace("\\", "_").replace(
";", "_").lower())
@classmethod
def __createLink(cls, entry, path, name):
""" creates XML node for nexus link
:param root: root node
:type root: :class:`lxml.etree.Element`
:param entry: entry node
:type entry: :class:`lxml.etree.Element`
:param path: nexus path
:type path: :obj:`str`
:param name: link name
:type name: :obj:`str`
"""
if name is not None and entry is not None:
link = lxml.etree.Element("link")
entry.append(link)
link.attrib["target"] = "%s/%s" % (path, name)
link.attrib["name"] = name
@classmethod
def __findDataSource(cls, name):
""" finds datasource details:
(attribute name, device name, host, port) tuple
:param name: datasource name
:type name: :obj:`str`
:returns: (attribute name, device name, host, port) tuple
:rtype: (:obj:`str`, :obj:`str`, :obj:`str`)
"""
attr = None
device = None
host = None
port = None
source = None
sname = name.split("://")
if name and sname[0] == 'tango' and sname[-1].count('/') > 2:
source = sname[-1]
else:
source = PoolUtils.getSource(name)
if source:
arr = source.split("/")
if len(arr) > 4 and ":" in arr[0]:
device = "/".join(arr[1:-1])
attr = arr[-1]
hat = arr[0].split(":")
if hat > 1:
host = hat[0]
port = hat[1]
elif len(arr) > 3:
device = "/".join(arr[:-1])
attr = arr[-1]
return (attr, device, host, port)
@classmethod
def __createField(cls, parent, fname, nxtype, sname,
record=None, shape=None, dsnode=None,
strategy='STEP', dstype=None):
""" creates XML node for NeXus field
:param parent: parent node
:type parent: :class:`lxml.etree.Element`
:param fname: field name
:type fname: :obj:`str`
:param nxtype: field NeXus type
:type nxtype: :obj:`str`
:param sname: data source name
:type sname: :obj:`str`
:param record: record attribute
:type record: :obj:`str`
:param shape: field shape
:type shape: :obj:`list`< :obj:`int`>:
:param dsnode: datasource node
:type dsnode: :obj:`str`
:param strategy: strategy mode
:type strategy: :obj:`str`
:param dstype: datasource type
:type dstyp: :obj:`str`
:returns: etree xml field element
:rtype dstyp: :class:`lxml.etree.Element`
"""
field = lxml.etree.Element("field")
parent.append(field)
field.attrib["type"] = nxtype
field.attrib["name"] = fname
strategynode = lxml.etree.Element("strategy")
field.append(strategynode)
strategynode.attrib["mode"] = strategy
if dsnode is not None:
dsource = dsnode
else:
if dstype == 'CLIENT' and record:
device = None
attr = None
else:
(attr, device, host, port) = cls.__findDataSource(sname)
if device and attr:
dsource = lxml.etree.Element("datasource")
dsource.attrib["name"] = sname
dsource.attrib["type"] = "TANGO"
dev = lxml.etree.Element("device")
dsource.append(dev)
dev.attrib["member"] = "attribute"
dev.attrib["name"] = device
if host and port:
dev.attrib["hostname"] = host
dev.attrib["port"] = port
rec = lxml.etree.Element("record")
dsource.append(rec)
rec.attrib["name"] = attr
else:
dsource = lxml.etree.Element("datasource")
dsource.attrib["name"] = sname
dsource.attrib["type"] = "CLIENT"
rec = lxml.etree.Element("record")
dsource.append(rec)
rec.attrib["name"] = record
field.append(dsource)
if shape:
dm = lxml.etree.Element("dimensions")
dm.attrib["rank"] = Utils.tostr(len(shape))
field.append(dm)
for i in range(len(shape)):
dim = lxml.etree.Element("dim")
dm.append(dim)
dim.attrib["index"] = Utils.tostr(i + 1)
dim.attrib["value"] = Utils.tostr(shape[i])
return field
[docs] def remove(self, name):
""" removes dynamic component
:param name: dynamic component name
:type name: :obj:`str`
"""
if self.__defaultCP not in name:
raise Exception(
"Dynamic component name should contain: %s" % self.__defaultCP)
cps = TangoUtils.command(self.__nexusconfig_device,
"availableComponents")
if name in cps:
TangoUtils.command(self.__nexusconfig_device,
"deleteComponent", Utils.tostr(name))
@classmethod
def __createGroupTree(cls, definition, path, links=False):
""" creates group tree
:param definition: definition node
:type definition: :class:`lxml.etree.Element`
:param path: NeXus path
:type path: :obj:`str`
:param links: if NXdata should be created
:type links: :obj:`bool`
:returns (last group node, nxdata group node) tuple
:rtype (:class:`lxml.etree.Element`, :class:`lxml.etree.Element`)
"""
spath = path.split('/')
entry = None
parent = definition
nxdata = None
for dr in spath:
if dr.strip():
node = lxml.etree.Element("group")
parent.append(node)
if entry is None:
entry = node
w = dr.split(':')
if len(w) == 1:
if len(w[0]) > 2 and w[0][:2] == 'NX':
w.insert(0, w[0][2:])
else:
w.append("NX" + w[0])
node.attrib["type"] = w[1]
node.attrib["name"] = w[0]
parent = node
if links and entry is not None:
nxdata = lxml.etree.Element("group")
entry.append(nxdata)
nxdata.attrib["type"] = "NXdata"
nxdata.attrib["name"] = "data"
return parent, nxdata