#!/usr/bin/env python
# This file is part of nexdatas - Tango Server for NeXus data writer
#
# Copyright (C) 2012-2017 DESY, Jan Kotanski <jkotan@mail.desy.de>
#
# nexdatas is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# nexdatas is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with nexdatas. If not, see <http://www.gnu.org/licenses/>.
#
""" Definitions of PYEVAL datasource """
import threading
import copy
import sys
import xml.etree.ElementTree as et
from lxml.etree import XMLParser
from .Types import NTP
from .DataHolder import DataHolder
from .DataSources import DataSource
from .Errors import DataSourceSetupError
[docs]class Variables(object):
""" Variables for PyEval datasource
"""
[docs]class PyEvalSource(DataSource):
""" Python Eval data source
"""
def __init__(self, streams=None, name=None):
""" constructor
:brief: It cleans all member variables
:param streams: tango-like steamset class
:type streams: :class:`StreamSet` or :class:`tango.LatestDeviceImpl`
:param name: datasource name
:type name: :obj:`str`
"""
DataSource.__init__(self, streams=streams, name=name)
#: (:obj:`str`) name of data
self.__name = None
#: (:obj:`dict` <:obj:`str` , :obj:`dict` <:obj:`str`, any>>) \
#: the current static JSON object
self.__globalJSON = None
#: (:obj:`dict` <:obj:`str` , :obj:`dict` <:obj:`str`, any>>) \
#: the current dynamic JSON object
self.__localJSON = None
#: (:class:`nxswriter.DataSourcePool.DataSourcePool`) datasource pool
self.__pool = None
#: (:obj:`dict` <:obj:`str`, (:obj:`str`,:obj:`str`) > ) \
#: datasources dictionary with {dsname: (dstype, dsxml)}
self.__sources = {}
#: (:obj:`dict` \
#: <:obj:`str`, (:class:`nxswriter.DataSources.DataSource`) > ) \
#: datasource dictionary {name: DataSource}
self.__datasources = {}
#: (:obj:`str`) python script
self.__script = ""
#: (:obj:`bool`) True if common block used
self.__commonblock = False
#: (:obj:`bool`) True if counter used
self.__counter = False
#: (:class:`threading.Lock`) lock for common block
self.__lock = None
#: (:obj:`dict` <:obj:`str`, any> ) \
#: common block variables
self.__common = None
#: ({"rank": :obj:`str`, "value": any, "tangoDType": :obj:`str`, \
#: "shape": :obj:`list`<int>, "encoding": :obj:`str`, \
#: "decoders": :obj:`str`} ) \
#: data format
self.__result = {"rank": "SCALAR",
"value": None,
"tangoDType": "DevString",
"shape": [1, 0],
"encoding": None,
"decoders": None}
[docs] def setup(self, xml):
""" sets the parrameters up from xml
:param xml: datasource parameters
:type xml: :obj:`str`
"""
if sys.version_info > (3,):
xml = bytes(xml, "UTF-8")
root = et.fromstring(xml, parser=XMLParser(collect_ids=False))
mds = root.find("datasource")
inputs = []
if mds is not None:
inputs = root.findall(".//datasource")
for inp in inputs:
if "name" in inp.attrib and "type" in inp.attrib:
name = inp.get("name")
dstype = inp.get("type")
if len(name) > 0:
if len(name) > 3 and name[:2] == 'ds.':
name = name[3:]
self.__sources[name] = (dstype, self._toxml(inp))
else:
if self._streams:
self._streams.error(
"PyEvalSource::setup() - "
"PyEval input %s not defined" % name,
std=False)
raise DataSourceSetupError(
"PyEvalSource::setup() - "
"PyEval input %s not defined" % name)
else:
if self._streams:
self._streams.error(
"PyEvalSource::setup() - "
"PyEval input name wrongly defined",
std=False)
raise DataSourceSetupError(
"PyEvalSource::setup() - "
"PyEval input name wrongly defined")
res = root.find("result")
if res is not None:
self.__name = res.get("name") or 'result'
if len(self.__name) > 3 and self.__name[:2] == 'ds.':
self.__name = self.__name[3:]
self.__script = self._getText(res)
if len(self.__script) == 0:
if self._streams:
self._streams.error(
"PyEvalSource::setup() - "
"PyEval script %s not defined" % self.__name,
std=False)
raise DataSourceSetupError(
"PyEvalSource::setup() - "
"PyEval script %s not defined" % self.__name)
if "commonblock" in self.__script:
self.__commonblock = True
else:
self.__commonblock = False
def __str__(self):
""" self-description
:returns: self-describing string
:rtype: :obj:`str`
"""
return " PYEVAL %s" % (self.__script)
[docs] def setJSON(self, globalJSON, localJSON=None):
""" sets JSON string
:brief: It sets the currently used JSON string
:param globalJSON: static JSON string
:type globalJSON: \
: :obj:`dict` <:obj:`str` , :obj:`dict` <:obj:`str`, any>>
:param localJSON: dynamic JSON string
:type localJSON: \
: :obj:`dict` <:obj:`str` , :obj:`dict` <:obj:`str`, any>>
"""
self.__globalJSON = globalJSON
self.__localJSON = localJSON
for source in self.__datasources.values():
if hasattr(source, "setJSON"):
source.setJSON(self.__globalJSON,
self.__localJSON)
[docs] def getData(self):
""" provides access to the data
:returns: dictionary with collected data
:rtype: {'rank': :obj:`str`, 'value': any, 'tangoDType': :obj:`str`, \
: 'shape': :obj:`list` <int>, 'encoding': :obj:`str`, \
: 'decoders': :obj:`str`} )
"""
if not self.__name:
if self._streams:
self._streams.error(
"PyEvalSource::getData() - PyEval datasource not set up",
std=False)
raise DataSourceSetupError(
"PyEvalSource::getData() - PyEval datasource not set up")
if self.__commonblock:
self.__pool.common['PYEVAL']["common"]["__counter__"] = \
self.__pool.counter
ds = Variables()
for name, source in self.__datasources.items():
if name in self.__script:
dt = source.getData()
value = None
if dt:
dh = DataHolder(streams=self._streams, **dt)
if dh and hasattr(dh, "value"):
value = dh.value
setattr(ds, name, value)
setattr(ds, self.__name, None)
if not self.__commonblock:
exec(self.__script.strip(), {}, {"ds": ds})
rec = getattr(ds, self.__name)
else:
rec = None
with self.__lock:
exec(self.__script.strip(), {}, {
"ds": ds, "commonblock": self.__common})
rec = copy.deepcopy(getattr(ds, self.__name))
ntp = NTP()
rank, shape, dtype = ntp.arrayRankShape(rec)
if rank in NTP.rTf:
if shape is None:
shape = [1, 0]
return {"rank": NTP.rTf[rank],
"value": rec,
"tangoDType": NTP.pTt[dtype],
"shape": shape}
[docs] def setDecoders(self, decoders):
""" sets the used decoders
:param decoders: pool to be set
:type decoders: :class:`nxswriter.DecoderPool.DecoderPool`
"""
self.__result["decoders"] = decoders
for source in self.__datasources.values():
if hasattr(source, "setDecoders"):
source.setDecoders(decoders)
[docs] def setDataSources(self, pool):
""" sets the datasources
:param pool: datasource pool
:type pool: :class:`nxswriter.DataSourcePool.DataSourcePool`
"""
self.__pool = pool
pool.lock.acquire()
try:
if 'PYEVAL' not in self.__pool.common.keys():
self.__pool.common['PYEVAL'] = {}
if "lock" not in self.__pool.common['PYEVAL'].keys():
self.__pool.common['PYEVAL']["lock"] = threading.Lock()
self.__lock = self.__pool.common['PYEVAL']["lock"]
if "common" not in self.__pool.common['PYEVAL'].keys():
self.__pool.common['PYEVAL']["common"] = {}
if self.__pool.nxroot is not None:
self.__pool.common['PYEVAL']["common"]["__nxroot__"] = \
self.__pool.nxroot.h5object
self.__pool.common['PYEVAL']["common"]["__root__"] = \
self.__pool.nxroot
self.__pool.common['PYEVAL']["common"]["__counter__"] = \
self.__pool.counter
self.__common = self.__pool.common['PYEVAL']["common"]
finally:
pool.lock.release()
for name, inp in self.__sources.items():
if name in self.__script:
if pool and pool.hasDataSource(inp[0]):
self.__datasources[name] = pool.get(inp[0])()
self.__datasources[name].setup(inp[1])
if hasattr(self.__datasources[name], "setJSON") \
and self.__globalJSON:
self.__datasources[name].setJSON(self.__globalJSON)
if hasattr(self.__datasources[name], "setDataSources"):
self.__datasources[name].setDataSources(pool)
else:
if self._streams:
self._streams.error(
"PyEvalSource::setDataSources "
"- Unknown data source")
self.__datasources[name] = DataSource()