# This file is part of nexdatas - Tango Server for NeXus data writer
#
# Copyright (C) 2012-2018 DESY, Jan Kotanski <jkotan@mail.desy.de>
#
# nexdatas is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# nexdatas is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with nexdatas. If not, see <http://www.gnu.org/licenses/>.
#
""" Creator of XML configuration files """
try:
import tango
except Exception:
import PyTango as tango
import sys
import lxml.etree
if sys.version_info > (3,):
unicode = str
def _tostr(text):
""" convert bytestr or unicode to python str
:param text: text to convert
:type text: :obj:`bytes` or :obj:`unicode` or :obj:`str`
:returns: converted text
:rtype: :obj:`str`
"""
if isinstance(text, str):
return text
else:
if sys.version_info > (3,) and \
(isinstance(text, bytes) or isinstance(text, unicode)):
return str(text, "utf8")
else:
return str(text)
[docs]class NTag(object):
""" tag wrapper
"""
#
def __init__(self, parent, tagName, nameAttr="", typeAttr=""):
""" constructor
:param parent: parent tag element
:type parent: :class:`NTag`
:param tagName: tag name
:type tagName: :obj:`str`
:param nameAttr: value of name attribute
:type nameAttr: :obj:`str`
:param typeAttr: value of type attribute
:type typeAttr: :obj:`str`
"""
#: (:class:`lxml.etree.Element`) tag element from etree
self.elem = lxml.etree.Element(tagName)
parent.elem.append(self.elem)
if nameAttr != "":
self.elem.attrib["name"] = nameAttr
if typeAttr != "":
self.elem.attrib["type"] = typeAttr
[docs] def addTagAttr(self, name, value):
""" adds tag attribute
:param name: attribute name
:type name: :obj:`str`
:param value: attribute value
:type value: :obj:`str`
"""
self.elem.attrib[name] = value
[docs] def setText(self, text):
""" sets tag content
:param text: tag content
:type text: :obj:`str`
"""
self.elem.text = text
[docs] def addText(self, text):
""" adds tag content
:param text: tag content
:type text: :obj:`str`
"""
self.elem.text = self.elem.text + text
[docs]class NAttr(NTag):
""" Attribute tag wrapper
"""
def __init__(self, parent, nameAttr, typeAttr=""):
""" constructor
:param parent: parent tag element
:type parent: :class:`NTag`
:param nameAttr: name attribute
:type nameAttr: :obj:`str`
:param typeAttr: type attribute
:type typeAttr: :obj:`str`
"""
NTag.__init__(self, parent, "attribute", nameAttr, typeAttr)
[docs] def setStrategy(self, mode="STEP", trigger=None, value=None, canfail=None):
""" sets the attribute strategy
:param mode: mode data writing, i.e. INIT, STEP, FINAL, POSTRUN
:type mode: :obj:`str`
:param trigger: for asynchronous writting,
e.g. with different subentries
:type trigger: :obj:`str`
:param value: label for postrun mode
:type value: :obj:`str`
:param canfail: can fail strategy flag
:type canfail: :obj:`bool`
"""
#: strategy of data writing, i.e. INIT, STEP, FINAL, POSTRUN
strategy = NTag(self, "strategy")
if strategy:
strategy.addTagAttr("mode", mode)
if trigger:
strategy.addTagAttr("trigger", trigger)
if canfail:
strategy.addTagAttr("canfail", "true")
if value:
strategy.setText(value)
[docs]class NGroup(NTag):
""" Group tag wrapper
"""
def __init__(self, parent, nameAttr, typeAttr=""):
""" constructor
:param parent: parent tag element
:type parent: :class:`NTag`
:param nameAttr: name attribute
:type nameAttr: :obj:`str`
:param typeAttr: type attribute
:type typeAttr: :obj:`str`
"""
NTag.__init__(self, parent, "group", nameAttr, typeAttr)
#: (:obj:`list` <:obj:`str`> ) list of doc tag contents
self._doc = []
#: (:obj:`dict` <:obj:`str` , :obj:`str`> )
#: container with attribute tag wrappers
self._gAttr = {}
[docs] def addDoc(self, doc):
""" adds doc tag content
:param doc: doc tag content
:type doc: :obj:`str`
"""
self._doc.append(NTag(self, "doc"))
self._doc[-1].addText(doc)
[docs] def addAttr(self, attrName, attrType, attrValue=""):
"""adds attribute: tag
:param attrName: name attribute
:type attrName: :obj:`str`
:param attrType: type attribute
:type attrType: :obj:`str`
:param attrValue: content of the attribute tag
:type attrValue: :obj:`str`
"""
print("%s %s %s" % (attrName, attrType, attrValue))
at = NAttr(self, attrName, attrType)
self._gAttr[attrName] = at
if attrValue != "":
at.setText(attrValue)
return self._gAttr[attrName]
[docs]class NLink(NTag):
""" Link tag wrapper
"""
def __init__(self, parent, nameAttr, gTarget):
""" constructor
:param parent: parent tag element
:type parent: :class:`NTag`
:param nameAttr: name attribute
:type nameAttr: :obj:`str`
:param gTarget: target attribute
:type gTarget: :obj:`str`
"""
NTag.__init__(self, parent, "link", nameAttr)
self.addTagAttr("target", gTarget)
#: list of doc tag contents
self._doc = []
[docs] def addDoc(self, doc):
""" adds doc tag content
:param doc: doc tag content
:type doc: :obj:`str`
"""
self._doc.append(NTag(self, "doc"))
self._doc[-1].addText(doc)
[docs]class NDimensions(NTag):
""" Dimensions tag wrapper
"""
def __init__(self, parent, rankAttr):
""" constructor
:param parent: parent tag element
:type parent: :class:`NTag`
:param rankAttr: rank attribute
:type rankAttr: :obj:`str`
"""
NTag.__init__(self, parent, "dimensions")
self.addTagAttr("rank", rankAttr)
#: (:obj:`dict` <:obj:`str`, :class:`NDim`>)
#: container with dim tag wrapper
self.dims = {}
[docs] def dim(self, indexAttr, valueAttr):
""" adds dim tag
:param indexAttr: index attribute
:type indexAttr: :obj:`str`
:param valueAttr: value attribute
:type valueAttr: :obj:`str`
"""
self.dims[indexAttr] = NDim(self, indexAttr, valueAttr)
[docs]class NDim(NTag):
""" Dim tag wrapper
"""
def __init__(self, parent, indexAttr, valueAttr):
""" constructor
:param parent: parent tag element
:type parent: :class:`NTag`
:param indexAttr: index attribute
:type indexAttr: :obj:`str`
:param valueAttr: value attribute
:type valueAttr: :obj:`str`
"""
NTag.__init__(self, parent, "dim")
self.addTagAttr("index", indexAttr)
self.addTagAttr("value", valueAttr)
[docs]class NField(NTag):
""" Field tag wrapper
"""
def __init__(self, parent, nameAttr, typeAttr=""):
"""constructor
:param parent: parent tag element
:type parent: :class:`NTag`
:param nameAttr: name attribute
:type nameAttr: :obj:`str`
:param typeAttr: type attribute
:type typeAttr: :obj:`str`
"""
NTag.__init__(self, parent, "field", nameAttr, typeAttr)
#: (:obj:`list` <:obj:`str`> ) list of doc tag contents
self._doc = []
#: (:obj:`dict` <:obj:`str` , :obj:`str`> )
#: container with attribute tag wrappers
self._attr = {}
[docs] def setStrategy(self, mode="STEP", trigger=None, value=None,
grows=None, compression=False, rate=None,
shuffle=None, canfail=None, compression_opts=None):
""" sets the field strategy
:param mode: mode data writing, i.e. INIT, STEP, FINAL, POSTRUN
:type mode: :obj:`str`
:param trigger: for asynchronous writting,
e.g. with different subentries
:type trigger: :obj:`str`
:param value: label for postrun mode
:type value: :obj:`str`
:param grows: growing dimension
:type grows: :obj:`str`
:param compression: flag if compression shuold be applied
:type compression: :obj:`str`
:param rate: compression rate
:type rate: :obj:`str`
:param shuffle: flag if compression shuffle
:type shuffle: :obj:`str`
:param canfail: can fail strategy flag
:type canfail: :obj:`bool`
"""
#: strategy of data writing, i.e. INIT, STEP, FINAL, POSTRUN
strategy = NTag(self, "strategy")
if strategy:
strategy.addTagAttr("mode", mode)
if grows:
strategy.addTagAttr("grows", grows)
if trigger:
strategy.addTagAttr("trigger", trigger)
if value:
strategy.setText(value)
if canfail:
strategy.addTagAttr("canfail", "true")
if compression:
if int(compression) == 1:
strategy.addTagAttr("compression", "true")
if rate is not None:
strategy.addTagAttr("rate", str(rate))
else:
strategy.addTagAttr(
"compression", int(compression))
if compression_opts:
strategy.addTagAttr(
"compression_opts",
",".join([str(opts) for opts in compression_opts]))
if shuffle is not None:
strategy.addTagAttr(
"shuffle",
"true" if shuffle else "false")
[docs] def setUnits(self, unitsAttr):
""" sets the field unit
:param unitsAttr: the field unit
:type unitsAttr: :obj:`str`
"""
self.addTagAttr("units", unitsAttr)
[docs] def addDoc(self, doc):
""" adds doc tag content
:param doc: doc tag content
:type doc: :obj:`str`
"""
self._doc.append(NTag(self, "doc"))
self._doc[-1].addText(doc)
[docs] def addAttr(self, attrName, attrType, attrValue=""):
""" adds attribute tag
:param attrName: name attribute
:type attrName: :obj:`str`
:param attrType: type attribute
:type attrType: :obj:`str`
:param attrValue: content of the attribute tag
:type attrValue: :obj:`str`
"""
self._attr[attrName] = NAttr(self, attrName, attrType)
if attrValue != '':
self._attr[attrName].setText(attrValue)
return self._attr[attrName]
[docs]class NDSource(NTag):
""" Source tag wrapper
"""
def __init__(self, parent):
""" constructor
:param parent: parent tag element
:type parent: :class:`NTag`
"""
NTag.__init__(self, parent, "datasource")
#: list of doc tag contents
self._doc = []
[docs] def initDBase(self, name, dbtype, query, dbname=None, rank=None,
mycnf=None, user=None,
passwd=None, dsn=None, mode=None, host=None, port=None):
""" sets parameters of DataBase
:param name: name of datasource
:type name: :obj:`str`
:param dbname: name of used DataBase
:type dbname: :obj:`str`
:param query: database query
:type query: :obj:`str`
:param dbtype: type of the database, i.e. MYSQL, PGSQL, ORACLE
:type dbtype: :obj:`str`
:param rank: rank of the query output, i.e. SCALAR, SPECTRUM, IMAGE
:type rank: :obj:`str`
:param mycnf: MYSQL config file
:type mycnf: :obj:`str`
:param user: database user name
:type user: :obj:`str`
:param passwd: database user password
:type passwd: :obj:`str`
:param dsn: DSN string to initialize ORACLE and PGSQL databases
:type dsn: :obj:`str`
:param mode: mode for ORACLE databases, i.e. SYSDBA or SYSOPER
:type mode: :obj:`str`
:param host: name of the host
:type host: :obj:`str`
:param port: port number
:type port: :obj:`str`
"""
self.addTagAttr("type", "DB")
self.addTagAttr("name", name)
da = NTag(self, "database")
da.addTagAttr("dbtype", dbtype)
if host:
da.addTagAttr("hostname", host)
if port:
da.addTagAttr("port", port)
if dbname:
da.addTagAttr("dbname", dbname)
if user:
da.addTagAttr("user", user)
if passwd:
da.addTagAttr("passwd", passwd)
if mycnf:
da.addTagAttr("mycnf", mycnf)
if mode:
da.addTagAttr("mode", mode)
if dsn:
da.addText(dsn)
da = NTag(self, "query")
if rank:
da.addTagAttr("format", rank)
da.addText(query)
[docs] def initTango(self, name, device, memberType, recordName, host=None,
port=None, encoding=None, group=None):
""" sets paramters for Tango device
:param name: name of datasource
:type name: :obj:`str`
:param device: device name
:type device: :obj:`str`
:param memberType: type of the data object, i.e. attribute,
property, command
:type memberType: :obj:`str`
:param recordName: name of the data object
:type recordName: :obj:`str`
:param host: host name
:type host: :obj:`str`
:param port: port
:type port: :obj:`str`
:param encoding: encoding of DevEncoded data
:type encoding: :obj:`str`
"""
self.addTagAttr("type", "TANGO")
self.addTagAttr("name", name)
dv = NTag(self, "device")
dv.addTagAttr("name", device)
if memberType:
dv.addTagAttr("member", memberType)
if host:
dv.addTagAttr("hostname", host)
if port:
dv.addTagAttr("port", port)
if encoding:
dv.addTagAttr("encoding", encoding)
if group:
dv.addTagAttr("group", group)
da = NTag(self, "record")
da.addTagAttr("name", recordName)
[docs] def initClient(self, name, recordName):
""" sets paramters for Client data
:param name: name of datasource
:type name: :obj:`str`
:param recordName: name of the data object
:type recordName: :obj:`str`
"""
self.addTagAttr("type", "CLIENT")
self.addTagAttr("name", name)
da = NTag(self, "record")
da.addTagAttr("name", recordName)
[docs] def addDoc(self, doc):
""" adds doc tag content
:param doc: doc tag content
:type doc: :obj:`str`
"""
self._doc.append(NTag(self, "doc"))
self._doc[-1].addText(doc)
[docs]class NDeviceGroup(NGroup):
""" Tango device tag creator
"""
#: (:obj:`list` < :obj:`str`>) Tango types
tTypes = ["DevVoid",
"DevBoolean",
"DevShort",
"DevLong",
"DevFloat",
"DevDouble",
"DevUShort",
"DevULong",
"DevString",
"DevVarCharArray",
"DevVarShortArray",
"DevVarLongArray",
"DevVarFloatArray",
"DevVarDoubleArray",
"DevVarUShortArray",
"DevVarULongArray",
"DevVarStringArray",
"DevVarLongStringArray",
"DevVarDoubleStringArray",
"DevState",
"ConstDevString",
"DevVarBooleanArray",
"DevUChar",
"DevLong64",
"DevULong64",
"DevVarLong64Array",
"DevVarULong64Array",
"DevInt",
"DevEncoded"]
#: (:obj:`list` <:obj:`str`>) NeXuS types corresponding to the Tango types
nTypes = ["NX_CHAR",
"NX_BOOLEAN",
"NX_INT32",
"NX_INT32",
"NX_FLOAT32",
"NX_FLOAT64",
"NX_UINT32",
"NX_UINT32",
"NX_CHAR",
"NX_CHAR",
"NX_INT32",
"NX_INT32",
"NX_FLOAT32",
"NX_FLOAT64",
"NX_UINT32",
"NX_UINT32",
"NX_CHAR",
"NX_CHAR",
"NX_CHAR",
"NX_CHAR",
"NX_CHAR",
"NX_BOOLEAN",
"NX_CHAR",
"NX_INT64",
"NX_UINT64",
"NX_INT64",
"NX_UINT64",
"NX_INT32",
"NX_CHAR"]
def __init__(self, parent, deviceName, nameAttr, typeAttr="",
commands=True, blackAttrs=None):
""" constructor
:param parent: parent tag element
:type parent: :class:`NTag`
:param deviceName: tango device name
:type deviceName: :obj:`str`
:param nameAttr: name attribute
:type nameAttr: :obj:`str`
:param typeAttr: type attribute
:type typeAttr: :obj:`str`
:param commands: if we call the commands
:type commands: :obj:`bool`
:param blackAttrs: list of excluded attributes
:type blackAttrs: :obj:`list` <:obj:`str`>
"""
NGroup.__init__(self, parent, nameAttr, typeAttr)
#: (:class:`tango.DeviceProxy`) device proxy
self._proxy = tango.DeviceProxy(deviceName)
#: (:obj:`dict` <:obj:`str`, :class:`NTag`>) fields of the device
self._fields = {}
#: (:obj:`list` <:obj:`str`>) blacklist for Attributes
self._blackAttrs = blackAttrs if blackAttrs else []
#: (:obj:`str`) the device name
self._deviceName = deviceName
self._fetchProperties()
self._fetchAttributes()
if commands:
self._fetchCommands()
def _fetchProperties(self):
""" fetches properties
:brief: It collects the device properties
"""
prop = self._proxy.get_property_list('*')
print("PROPERIES %s" % prop)
for pr in prop:
self.addAttr(pr, "NX_CHAR",
str(self._proxy.get_property(pr)[pr][0]))
if pr not in self._fields:
self._fields[pr] = NField(self, pr, "NX_CHAR")
self._fields[pr].setStrategy("STEP")
sr = NDSource(self._fields[pr])
sr.initTango(
self._deviceName, self._deviceName, "property",
pr, host="haso228k.desy.de", port="10000")
def _fetchAttributes(self):
""" fetches Attributes
:brief: collects the device attributes
"""
#: device attirbutes
attr = self._proxy.get_attribute_list()
for at in attr:
print(at)
cf = self._proxy.attribute_query(at)
print("QUERY")
print(cf)
print(cf.name)
print(cf.data_format)
print(cf.standard_unit)
print(cf.display_unit)
print(cf.unit)
print(self.tTypes[cf.data_type])
print(self.nTypes[cf.data_type])
print(cf.data_type)
if at not in self._fields and at not in self._blackAttrs:
self._fields[at] = NField(self, at, self.nTypes[cf.data_type])
encoding = None
if str(cf.data_format).split('.')[-1] == "SPECTRUM":
da = self._proxy.read_attribute(at)
d = NDimensions(self._fields[at], "1")
d.dim("1", str(da.dim_x))
if str(da.type) == 'DevEncoded':
encoding = 'VDEO'
if str(cf.data_format).split('.')[-1] == "IMAGE":
da = self._proxy.read_attribute(at)
d = NDimensions(self._fields[at], "2")
d.dim("1", str(da.dim_x))
d.dim("2", str(da.dim_y))
if str(da.type) == 'DevEncoded':
encoding = 'VDEO'
if cf.unit != 'No unit':
self._fields[at].setUnits(cf.unit)
self._fields[at].setUnits(cf.unit)
if cf.description != 'No description':
self._fields[at].addDoc(cf.description)
self.addAttr('URL', "NX_CHAR", "tango://" + self._deviceName)
self._fields[at].setStrategy("STEP")
sr = NDSource(self._fields[at])
sr.initTango(self._deviceName, self._deviceName, "attribute",
at, host="haso228k.desy.de", port="10000",
encoding=encoding)
def _fetchCommands(self):
""" fetches commands
:brief: It collects results of the device commands
"""
#: list of the device commands
cmd = self._proxy.command_list_query()
print("COMMANDS %s" % cmd)
for cd in cmd:
if str(cd.in_type).split(".")[-1] == "DevVoid" \
and str(cd.out_type).split(".")[-1] != "DevVoid" \
and str(cd.out_type).split(".")[-1] in self.tTypes \
and cd.cmd_name not in self._fields:
self._fields[cd.cmd_name] = \
NField(
self, cd.cmd_name,
self.nTypes[self.tTypes.index(
str(cd.out_type).split(".")[-1])])
self._fields[cd.cmd_name].setStrategy("STEP")
sr = NDSource(self._fields[cd.cmd_name])
sr.initTango(self._deviceName, self._deviceName,
"command", cd.cmd_name,
host="haso228k.desy.de", port="10000")
[docs]class XMLFile(object):
""" XML file object
"""
def __init__(self, fname):
""" constructor
:param fname: XML file name
:type fname: :obj:`str`
"""
#: (:obj:`str`) XML file name
self.fname = fname
#: (:class:`lxml.etree.Element`) XML root instance
self.elem = lxml.etree.Element("definition")
[docs] def prettyPrint(self, etNode=None):
"""prints pretty XML making use of etree
:param etNode: node
:type etNode: :class:`lxml.etree.Element`
"""
node = etNode if etNode is not None else self.elem
if sys.version_info > (3,):
xmls = _tostr(
lxml.etree.tostring(
node, encoding='unicode',
method='xml', pretty_print=True))
else:
xmls = _tostr(
lxml.etree.tostring(
node, encoding='utf8',
method='xml', pretty_print=True))
if not xmls.startswith("<?xml"):
xmls = "<?xml version='1.0' encoding='utf8'?>\n" + xmls
return xmls
[docs] def setDependencies(self, components, entry=None):
""" sets tag content
:param components: component dependencies
:type components: :obj:`list` <:obj:`str`>
:param entry: entry node
:type entry: :class:`lxml.etree.Element`
"""
text = "\n"
for cp in components:
text += "$components.%s\n" % cp
if entry is not None:
if entry.elem.tail is not None:
entry.elem.tail += text
else:
entry.elem.tail = text
else:
if self.elem.text:
self.elem.text += text
else:
self.elem.text = text
[docs] def dump(self):
""" dumps XML structure into the XML file
:brief: It opens XML file, calls prettyPrint and closes the XML file
"""
with open(self.fname, "w") as myfile:
myfile.write(self.prettyPrint(self.elem))
[docs]def main():
""" the main function
"""
#: handler to XML file
df = XMLFile("test.xml")
#: entry
en = NGroup(df, "entry1", "NXentry")
#: instrument
ins = NGroup(en, "instrument", "NXinstrument")
#: NXsource
src = NGroup(ins, "source", "NXsource")
#: field
f = NField(src, "distance", "NX_FLOAT")
f.setUnits("m")
f.setText("100.")
df.dump()
if __name__ == "__main__":
main()