#!/usr/bin/env python
# This file is part of nxsrecconfig - NeXus Sardana Recorder Settings
#
# Copyright (C) 2014-2017 DESY, Jan Kotanski <jkotan@mail.desy.de>
#
# nexdatas is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# nexdatas is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with nexdatas. If not, see <http://www.gnu.org/licenses/>.
#
""" Component Describer """
import re
import sys
import json
try:
import tango
except Exception:
import PyTango as tango
import xml.etree.ElementTree as et
from lxml.etree import XMLParser
# from lxml import etree
from .Utils import Utils, TangoUtils
if sys.version_info > (3,):
unicode = str
[docs]class DSItem(object):
""" Basic DataSource item
"""
__slots__ = 'name', 'dstype', 'record', 'parentobj'
#
def __init__(self, name=None, dstype=None, record=None, dsitem=None,
parentobj=None):
""" constructor
:param name: datasource name
:type name: :obj:`str`
:param dstype: datasource type
:type dstype: :obj:`str`
:param record: datasource record
:type record: :obj:`str`
:param dsitem: datasource item
:type dsitem: :class:`DSItem`
:param parentobj: datasource parent object
:type parentobj: :obj:`str`
"""
if dsitem:
#: (:obj:`str`) datasource name
self.name = dsitem.name
#: (:obj:`str`) datasource type
self.dstype = dsitem.dstype
#: (:obj:`str`) datasource record
self.record = dsitem.record
#: (:obj:`str`) datasource parentobj
self.parentobj = dsitem.parentobj
else:
self.name = Utils.tostr(name) if name is not None else None
self.dstype = Utils.tostr(dstype) if dstype is not None else None
self.record = Utils.tostr(record) if record is not None else None
self.parentobj = Utils.tostr(parentobj) \
if parentobj is not None else None
[docs]class ExDSItem(DSItem):
""" Extended DataSource item
"""
__slots__ = 'mode', 'nxtype', 'shape'
#
def __init__(self, dsitem=None, mode=None, nxtype=None, shape=None):
""" constructor
:param dsitem: datasource item
:type dsitem: :class:`DSItem`
:param mode: writing strategy mode
:type mode: :obj:`str`
:param nxtype: nexus type
:type nxstype: :obj:`str`
:param shape: datasource shape
:type shape: :obj:`list` <:obj:`int`>
"""
DSItem.__init__(self, dsitem=dsitem)
#: (:obj:`str`) writing strategy mode
self.mode = Utils.tostr(mode) if mode is not None else None
#: (:obj:`str`) nexus type
self.nxtype = Utils.tostr(nxtype) if nxtype is not None else None
#: (:obj:`list` <:obj:`int`>) datasource shape
self.shape = shape
[docs]class ExDSDict(dict):
""" Extended DataSource Dictionary
"""
def __init__(self, *args, **kw):
""" constructor
:param args: dict args
:type args: :obj:`list` <`any`>
:param kw: dict kw
:type kw: :obj:`dict` <`any` , `any`>
"""
super(ExDSDict, self).__init__(*args, **kw)
#: (:obj:`int`) counter
self.__counter = 1
#: (:obj:`str`) noname datasource prefix
self.__prefix = '__unnamed__'
[docs] def appendDSList(self, dslist, mode, nxtype=None, shape=None):
""" appends a list of ExDSItem
:param dslist: DSItem list
:type dslist: :obj:`list` <:class:`DSItem`>
:param mode: startegy mode
:type mode: :obj:`str`
:param nxtype: NeXus type
:type nxstype: :obj:`str`
:param shape: data shape
:type shape: :obj:`list` <:obj:`int`>
:returns: datasource name for first added datasource
or None if not appended
:rtype: :obj:`str`
"""
fname = None
for dsitem in dslist:
name = dsitem.name
if name:
if name not in self.keys():
self[name] = []
self[name].append(ExDSItem(dsitem, mode, nxtype, shape))
elif dsitem.dstype:
name = self.__prefix + Utils.tostr(self.__counter)
while name in self.keys():
name = self.__prefix + Utils.tostr(self.__counter)
self.__counter = self.__counter + 1
self[name] = []
self[name].append(ExDSItem(dsitem, mode, nxtype, shape))
if not fname:
fname = name
return fname
[docs]class Describer(object):
""" Lists datasources, strategy, dstype and record name
of given component """
def __init__(self, nexusconfig_device, tree=False, pyevalfromscript=False):
""" constructor
:param nexusconfig_device: configserver configuration server
:type nexusconfig_device: :class:`tango.DeviceProxy` \
or :class:`nxsconfigserver.XMLConfigurator.XMLConfigurator`
:param tree: flag for output tree dictionary
:type tree: :obj:`bool`
:param pyevalfromscript: if evalulate PYEVAL datasources from script
:type pyevalfromscript: :obj:`bool`
"""
#: (:class:`tango.DeviceProxy` \
#: or :class:`nxsconfigserver.XMLConfigurator.XMLConfigurator`) \
#: configuration server
self.__nexusconfig_device = nexusconfig_device
#: (:obj:`bool`) flag for output tree dictionary
self.__treeOutput = tree
#: (:obj:`bool`) flag for evalulating PYEVAL datasources from script
self.__pyevalfromscript = pyevalfromscript
#: (:obj:`list` <:obj:`str`>) available configuration server components
self.__availableComponents = TangoUtils.command(
self.__nexusconfig_device,
"availableComponents")
#: (:obj:`list` <:obj:`str`>) \
#: available configuration server datasources
self.__availableDataSources = TangoUtils.command(
self.__nexusconfig_device,
"availableDataSources")
[docs] def components(self, components=None, strategy='', dstype='', cfvars=None):
""" describes given components. If :obj:`tree` = True it returns
| [{ cpname : { dsname : [ \
| (strategy, dstype, record, nxstype, shape), ...] } } ]
else
| [{"dsname": dsname, "strategy": strategy, "dstype": dstype, \
| "record": record, "nxstype": nxstype, "shape": shape , \
| "cpname": cpname}, ...]
:param components: given components.
If None all available ones are taken
:type components: :obj:`list` <:obj:`str`>
:param strategy: list datasets only with given strategy.
If '' all available ones are taken
:type strategy: :obj:`str`
:param dstype: list datasets only with given datasource type.
If '' all available ones are taken
:type dstype: :obj:`str`
:param cfvars: configuration variables in JSON dictionary
:type cfvars: :obj:`str`
:returns: list of dictionary with description of components \
:rtype: [:obj:`dict` <:obj:`str`, :obj:`dict` <:obj:`str`, \
:obj:`list` <(:obj:`str`, :obj:`str`, :obj:`str`, \
:obj:`str`, :obj:`list` <:obj:`int`>)> > > ] or \
[{"dsname": :obj:`str`, "strategy": :obj:`str`, \
"dstype": :obj:`str`, "record": :obj:`str`, \
"nxstype": :obj:`str`, "shape": :obj:`list` <:obj:`int`> , \
"cpname": :obj:`str`}, ...]
"""
result = []
if components is not None:
cps = [cp for cp in components if cp in self.__availableComponents]
else:
cps = list(self.__availableComponents)
if self.__treeOutput:
result = [{}]
result[0] = self.__fillintree(cps, strategy, dstype)
else:
result = self.__fillinlist(cps, strategy, dstype, cfvars)
return result
def __fillinlist(self, cps, strategy, dstype, cfvars):
""" fills in the list of output elements
:param cps: component list
:type components: :obj:`list` <:obj:`str`>
:param strategy: required strategy or None
:type strategy: :obj:`str`
:param dstype: required datasource type or None
:type dstype: :obj:`str`
:param cfvars: dictionary with configuration variables
:type cfvars: :obj:`str`
:returns: list of output dictionary elements
:rtype: [{"dsname": :obj:`str`, "strategy": :obj:`str`, \
"dstype": :obj:`str`, "record": :obj:`str`, \
"nxstype": :obj:`str`, "shape": :obj:`list` <:obj:`int`> , \
"cpname": :obj:`str`}, ...]
"""
result = []
for cp in cps:
dss = self.__getInstDataSourceAttributes(cp, cfvars)
for ds in dss.keys():
for vds in dss[ds]:
if (not strategy or vds.mode == strategy) and \
(not dstype or vds.dstype == dstype):
elem = {}
elem["dsname"] = ds
elem["strategy"] = vds.mode
elem["dstype"] = vds.dstype
elem["parentobj"] = vds.parentobj
elem["record"] = vds.record
elem["nxtype"] = vds.nxtype
elem["shape"] = vds.shape
elem["cpname"] = cp
result.append(elem)
return result
def __fillintree(self, cps, strategy, dstype):
""" fills in the dictionary of output elements
:param cps: component list
:type components: :obj:`list` <:obj:`str`>
:param strategy: required strategy or None
:type strategy: :obj:`str`
:param dstype: required datasource type or None
:type dstype: :obj:`str`
:returns: dictionary of output dictionary elements
:rtype: [:obj:`dict` <:obj:`str`, :obj:`dict` <:obj:`str`, \
:obj:`list` <(:obj:`str`, :obj:`str`, :obj:`str`, \
:obj:`str`, :obj:`list` <:obj:`int`>)> > > ]
"""
result = {}
for cp in cps:
dss = self.__getDataSourceAttributes(cp)
tr = {}
for ds in dss.keys():
for vds in dss[ds]:
if (not strategy or vds.mode == strategy) and \
(not dstype or vds.dstype == dstype):
if ds not in tr:
tr[ds] = []
tr[ds].append((vds.mode, vds.dstype, vds.record,
vds.nxtype, vds.shape, vds.parentobj))
result[cp] = tr
return result
def __getDSFromNode(self, parent, dsl=None):
""" provides datasource item from XML node
:param node: xml node
:type node: :class:`lxml.etree.Element`
:param dsl: list with datasource items (DSItem)
:type dsl: :obj:`list` <:class:`DSItem`>
:returns: list with datasource items (DSItem)
:rtype: :obj:`list` <:class:`DSItem`>
"""
label = 'datasources'
name = None
dstype = None
record = None
dslist = dsl if dsl else []
dsxmls = None
nodes = parent.findall("datasource")
for node in nodes:
dstype = node.get("type")
name = node.get("name")
record = Utils.getRecord(node)
parentobj = (parent.tag
if hasattr(parent, "tag") else None)
dslist.append(
DSItem(name, dstype, record,
parentobj=parentobj))
if name and Utils.tostr(dstype) == 'PYEVAL':
if dsxmls and self.__pyevalfromscript:
if node not in parent:
parentobj = "datasource"
dslist.extend(
self.__findsubdatasources(dsxmls[0]), parentobj)
else:
self.__getDSFromNode(node, dslist)
if not name and not dslist:
dstxt = Utils.getText(parent)
dsitem = DSItem(
parentobj=(parent.tag
if hasattr(parent, "tag") else None))
index = dstxt.find("$%s." % label)
while index != -1:
try:
if sys.version_info > (3,):
subc = re.finditer(
r"[\w]+",
dstxt[(index + len(label) + 2):]
).__next__().group(0)
else:
subc = re.finditer(
r"[\w]+",
dstxt[(index + len(label) + 2):]).next().group(0)
except (StopIteration, IndexError):
subc = ''
name = subc.strip() if subc else ""
if Utils.tostr(name) in self.__availableDataSources:
dsxmls = TangoUtils.command(
self.__nexusconfig_device,
"dataSources", [Utils.tostr(name)])
else:
dsxmls = None
dsitem = DSItem(
name, "__ERROR__", "__ERROR__",
parentobj=(parent.tag
if hasattr(parent, "tag") else None))
if dsxmls:
dsitem = self.__describeDataSource(name, dsxmls[0])
dsitem.parentobj = (
parent.tag
if hasattr(parent, "tag") else None)
if dsitem.dstype:
dstype = dsitem.dstype
break
else:
dsitem = DSItem(
name, None, None,
parentobj=(parent.tag
if hasattr(parent, "tag") else None)
)
index = dstxt.find("$%s." % label, index + 1)
dslist.append(dsitem)
if name and Utils.tostr(dstype) == 'PYEVAL':
if dsxmls and self.__pyevalfromscript:
dslist.extend(
self.__findsubdatasources(
dsxmls[0], "datasource"))
return dslist
def __findsubdatasources(self, dsxml, parentobj="datasource"):
""" finds datasources in pyeval scripts
:param dsxml: pyeval xml
:type dsxml: :obj:`str`
:param parentobj: parentobj
:type parentobj: :obj:`str`
:returns: list with datasource items (DSItem)
:rtype: :obj:`list` <:class:`DSItem`>
"""
dslist = []
result = ""
label = 'datasources'
if sys.version_info > (3,):
root = et.fromstring(bytes(dsxml, "UTF-8"),
parser=XMLParser(collect_ids=False))
else:
root = et.fromstring(dsxml,
parser=XMLParser(collect_ids=False))
cnode = root.findall("datasource")
if cnode:
for child in cnode[0]:
if child.tag == 'result':
result = Utils.getText(child)
index = dsxml.find("$%s." % label)
while index != -1:
try:
if sys.version_info > (3,):
subc = re.finditer(
r"[\w]+",
dsxml[(index + len(label) + 2):]).__next__().group(0)
else:
subc = re.finditer(
r"[\w]+",
dsxml[(index + len(label) + 2):]).next().group(0)
except (StopIteration, IndexError):
subc = ''
name = subc.strip() if subc else ""
if name in result:
chdsxml = TangoUtils.command(
self.__nexusconfig_device,
"dataSources", [Utils.tostr(name)])
if chdsxml:
dsitem = self.__describeDataSource(name, chdsxml[0])
dsitem.parentobj = parentobj
if dsitem.dstype:
dslist.append(dsitem)
else:
dslist.append(
DSItem(name, None, None, parentobj=parentobj))
index = dsxml.find("$%s." % label, index + 1)
return dslist
@classmethod
def __getShape(cls, node):
""" provides shape from node
:param node: xml node
:type node: :class:`lxml.etree.Element`
:returns: shape of node
:rtype :obj:`list` <:obj:`int`>
"""
rank = int(node.get("rank"))
shape = [None] * rank
dims = node.findall("dim")
for dim in dims:
index = int(dim.get("index"))
value = dim.get("value")
if value is not None:
try:
value = int(value)
except ValueError:
value = Utils.tostr(value)
shape[index - 1] = value
else:
dss = dim.findall("datasource")
if dss:
value = dss[0].get("name")
if value is None:
value = '__unnamed__'
shape[index - 1] = "$datasources.%s" % value
else:
value = Utils.getText(dim)
try:
value = int(value)
except Exception:
value = value.strip()
if not value:
value = None
shape[index - 1] = value
return shape
def __getDataSourceAttributes(self, cp):
""" provides datasource ExDSDict of given component
:param cp : component name
:type cp : :obj:`str`
:returns: datasource ExDSDict
:rtype: :class:`ExDSDict`
"""
dcps = TangoUtils.command(self.__nexusconfig_device,
"dependentComponents", [cp])
xmlc = TangoUtils.command(self.__nexusconfig_device,
"components", dcps)
if not len(xmlc) > 0:
return ExDSDict()
return self.__getDSFromXML(xmlc)
def __getInstDataSourceAttributes(self, cp, cfvars=None):
""" provides datasource ExDSDict of given instantiated component
:param cp : component name
:type cp : :obj:`str`
:param cfvars : component variables
:type cpvars : :obj:`str`
:returns: datasource ExDSDict
:rtype: :class:`ExDSDict`
"""
if cfvars:
cv = json.loads(self.__nexusconfig_device.variables)
sv = json.loads(cfvars)
if sv and isinstance(sv, dict):
cv.update(sv)
self.__nexusconfig_device.variables = json.dumps(cv)
dcps = TangoUtils.command(self.__nexusconfig_device,
"dependentComponents", [cp])
xmlc = TangoUtils.command(self.__nexusconfig_device,
"instantiatedComponents", dcps)
if not len(xmlc) > 0:
return ExDSDict()
return self.__getDSFromXML(xmlc)
def __getDSFromXML(self, cpxmls):
""" provides datasource ExDSDict of given component xml
:param cpxml : list of component xmls
:type cpxml : :obj:`list` < obj:`str`>
:returns: datasource ExDSDict
:rtype: :class:`ExDSDict`
"""
dss = ExDSDict()
for cpxml in cpxmls:
if sys.version_info > (3,):
root = et.fromstring(bytes(cpxml, "UTF-8"),
parser=XMLParser(collect_ids=False))
else:
root = et.fromstring(cpxml,
parser=XMLParser(collect_ids=False))
parents = root.findall(".//field")
attrs = root.findall(".//attribute")
dims = root.findall(".//dim")
parents.extend(attrs if attrs else [])
parents.extend(dims if dims else [])
for parent in parents:
strategy = parent.findall("strategy")
for sg in strategy:
mode = sg.get("mode")
if mode:
name = None
nxtype = None
dset = parent
nxtype = dset.get("type")
shape = None
dimensions = parent.findall("dimensions")
if dimensions:
shape = self.__getShape(dimensions[0])
name = dss.appendDSList(self.__getDSFromNode(parent),
mode, nxtype, shape)
if name:
break
return dss
[docs] def dataSources(self, names=None, dstype=''):
""" describes given datasources
:param names: given datasources.
If None all available ones are taken
:type names: :obj:`list` <:obj:`str`>
:param dstype: list datasources only with given type.
If '' all available ones are taken
:type dstype: :obj:`str`
:returns: list of dictionary with description of datasources
:rtype: [:obj:`dict` <:obj:`str`, :class:`ExDSDict` > ] or \
[{"dsname": :obj:`str`, "dstype": :obj:`str`, \
"record": :obj:`str`}, ...]
"""
ads = list(self.__availableDataSources)
if names is not None:
dss = [name for name in names if name in ads]
else:
dss = ads
try:
if dss:
xmls = TangoUtils.command(self.__nexusconfig_device,
"dataSources", dss)
else:
xmls = []
except Exception:
xmls = None
dslist = []
dsres = {}
for i, name in enumerate(dss):
if name:
if xmls:
dsxml = xmls[i]
dsitem = self.__describeDataSource(name, dsxml)
if dstype and dsitem.dstype != dstype:
continue
dsres[name] = dsitem
if dsxml and self.__pyevalfromscript:
dsitems = self.__findsubdatasources(dsxml)
for itm in dsitems:
dsres[itm.name] = itm
if self.__treeOutput:
dslist = [{}]
dslist[0] = dsres
else:
dslist = []
if isinstance(dsres, dict):
for ds in dsres.values():
elem = {}
elem["dsname"] = ds.name
elem["dstype"] = ds.dstype
elem["record"] = ds.record
dslist.append(Utils.tostr(json.dumps(elem)))
return dslist
def __describeDataSource(self, name, dsxml=None):
""" describes the datasource
:param name: datasource name
:param type: :obj:`str`
:param dsxml: datasource xml
:param dsxml: :obj:`str`
:returns: datasource DSItem
:rtype: :class:`DSItem`
"""
dstype = None
record = None
try:
if not dsxml:
dsource = TangoUtils.command(
self.__nexusconfig_device,
"dataSources", [Utils.tostr(name)])
else:
dsource = [dsxml]
except tango.DevFailed:
dsource = []
if len(dsource) > 0:
if sys.version_info > (3,):
root = et.fromstring(bytes(dsource[0], "UTF-8"),
parser=XMLParser(collect_ids=False))
else:
root = et.fromstring(dsource[0],
parser=XMLParser(collect_ids=False))
dss = root.findall(".//datasource")
for ds in dss:
if ds.tag == 'datasource':
if "name" in ds.attrib.keys():
name = ds.get("name")
dstype = ds.get("type")
record = Utils.getRecord(ds)
return DSItem(name, dstype, record)