Source code for nxswriter.TangoDataWriter

#!/usr/bin/env python
#   This file is part of nexdatas - Tango Server for NeXus data writer
#
#    Copyright (C) 2012-2017 DESY, Jan Kotanski <jkotan@mail.desy.de>
#
#    nexdatas is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    nexdatas is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with nexdatas.  If not, see <http://www.gnu.org/licenses/>.
#

""" Tango Data Writer implementation """

import os
import shutil

from xml import sax
import json
import sys
import gc
import weakref
import time

try:
    from cStringIO import StringIO
except ImportError:
    from io import StringIO

from .NexusXMLHandler import NexusXMLHandler
from .FetchNameHandler import FetchNameHandler
from .StreamSet import StreamSet
from nxstools import filewriter as FileWriter

from .H5Elements import EFile
from .EGroup import EGroup
from .DecoderPool import DecoderPool
from .DataSourcePool import DataSourcePool
from .Metadata import Metadata, NXSMETA


WRITERS = {}
try:
    from nxstools import h5pywriter as H5PYWriter
    WRITERS["h5py"] = H5PYWriter
except Exception:
    pass

try:
    from nxstools import h5cppwriter as H5CppWriter
    WRITERS["h5cpp"] = H5CppWriter
except Exception:
    pass


try:
    from nxstools import h5rediswriter as H5RedisWriter
    WRITERS["h5redis"] = H5RedisWriter
except Exception:
    pass
DEFAULTWRITERS = ["h5cpp", "h5py", "h5redis"]


#: (:obj:`bool`) tango bug #213 flag related to EncodedAttributes in python3
PYTG_BUG_213 = False
if sys.version_info > (3,):
    unicode = str
    try:
        try:
            import tango
        except Exception:
            import PyTango as tango
        PYTGMAJOR, PYTGMINOR, PYTGPATCH = list(
            map(int, tango.__version__.split(".")[:3]))
        if PYTGMAJOR <= 9:
            if PYTGMAJOR == 9:
                if PYTGMINOR < 2:
                    PYTG_BUG_213 = True
                elif PYTGMINOR == 2 and PYTGPATCH <= 4:
                    PYTG_BUG_213 = True
            else:
                PYTG_BUG_213 = True
    except Exception:
        pass


[docs]class TangoDataWriter(object): """ NeXuS data writer """ def __init__(self, server=None): """ constructor :brief: It initialize the data writer for the H5 output file :param server: Tango server :type server: :class:`tango.LatestDeviceImpl` """ #: (:obj:`str`) output file name and optional nexus parent path self.__parent = "" #: (:obj:`str`) output file name self.__fileName = "" #: (:obj:`str`) output file name prefix self.__fileprefix = "" #: (:obj:`str`) output file name extension self.__fileext = "" #: (:obj:`dict` <:obj:`str` , :obj:`any`>) open file parameters self.__pars = {} #: (:obj:`str`) XML string with file settings self.__xmlsettings = "" #: (:obj:`str`) global JSON string with data records self.__json = "{}" #: (:obj:`str`) nexus parent path of (name, type) self.__parents = [] #: (:obj:`int`) maximal number of threads self.numberOfThreads = 100 # self.numberOfThreads = 1 #: (:obj:`float`) maximal element record time in sec self.maxElementRuntime = 0.0 #: (:obj:`float`) maximal record time in sec self.maxRecordRuntime = 0.0 #: (:class:`ThreadPool.ThreadPool`) thread pool with INIT elements self.__initPool = None #: (:class:`ThreadPool.ThreadPool`) thread pool with STEP elements self.__stepPool = None #: (:class:`ThreadPool.ThreadPool`) thread pool with FINAL elements self.__finalPool = None #: (:obj:`dict`< :obj:`str`, \ #: :class:`nxswriter.ThreadPool.ThreadPool` >) \ #: collection of thread pool with triggered STEP elements self.__triggerPools = {} #: (:obj:`list` <:obj:`dict` <:obj:`str`, :obj:`str` > >) \ #: list of entry group attributes self.__entryAttrs = [] #: (:class:`nxswriter.FileWriter.FTGroup`) H5 file handle self.__nxRoot = None #: (:obj: `list` <:class:`nxswriter.FileWriter.FTGroup` >) # group path of H5 file handles self.__nxPath = [] #: (:class:`nxswriter.FileWriter.FTFile`) H5 file handle self.__nxFile = None #: (:class:`nxswriter.H5Elements.EFile`) element file objects self.__eFile = None #: (:obj:`bool`) default value of canfail flag self.__defaultCanFail = True #: (:obj:`str`) writer type self.writer = None for wr in DEFAULTWRITERS: if wr in WRITERS.keys(): self.writer = wr break #: (:obj:`str`) metadata output self.metadataOutput = '' #: (:obj:`int`) steps per file self.stepsperfile = 0 #: (:obj:`int`) current file id self.__currentfileid = 0 #: (:class:`nxswriter.DecoderPool.DecoderPool`) pool with decoders self.__decoders = DecoderPool() #: (:class:`nxswriter.DataSourcePool.DataSourcePool`) \ #: pool with datasources self.__datasources = DataSourcePool() #: (:class:`nxswriter.FetchNameHandler.FetchNameHandler`) \ #: group name parser self.__fetcher = None #: (:obj:`str`) adding logs self.addingLogs = True #: (:obj:`int`) counter for open entries self.__entryCounter = 0 #: (:class:`nxswriter.FileWriter.FTGroup`) group with Nexus log Info self.__logGroup = None #: (:obj:`list` < :obj:`str`>) file names self.__filenames = [] #: (:obj:`dict` < :obj:`str`, :obj:`str`>) file time stamps self.__filetimes = {} #: (:class:`StreamSet` or :class:`tango.LatestDeviceImpl`) stream set self._streams = StreamSet(weakref.ref(server) if server else None) #: (:obj:`bool`) skip acquisition flag self.skipacquisition = False if PYTG_BUG_213: self._streams.error( "TangoDataWriter::TangoDataWriter() - " "Reading Encoded Attributes for python3 and tango < 9.2.5" " is not supported ") def __setWriter(self, writer): """ set method for writer module name :param jsonstring: value of writer module name :type jsonstring: :obj:`str` """ if '?' in writer: writer, params = writer.split('?') if not writer: for wr in DEFAULTWRITERS: if wr in WRITERS.keys(): writer = wr break wrmodule = WRITERS[writer.lower()] FileWriter.setwriter(wrmodule) return wrmodule def __getCanFail(self): """ get method for the global can fail flag :returns: global can fail flag :rtype: :obj:`bool` """ return self.__datasources.canfail def __setCanFail(self, canfail): """ set method for the global can fail flag :param canfail: the global can fail flag :type canfail: :obj:`bool` """ self.__datasources.canfail = canfail #: the global can fail flag canfail = property(__getCanFail, __setCanFail, doc='(:obj:`bool`) the global can fail flag') def __getDefaultCanFail(self): """ get method for the global can fail flag :returns: global can fail flag :rtype: :obj:`bool` """ return self.__defaultCanFail def __setDefaultCanFail(self, canfail): """ set method for the global can fail flag :param canfail: the global can fail flag :type canfail: :obj:`bool` """ self.__defaultCanFail = canfail self.__datasources.canfail = canfail #: the global can fail flag defaultCanFail = property( __getDefaultCanFail, __setDefaultCanFail, doc='(:obj:`bool`) default value of the global can fail flag') def __getJSON(self): """ get method for jsonrecord attribute :returns: value of jsonrecord :rtype: :obj:`str` """ return self.__json def __setJSON(self, jsonstring): """ set method for jsonrecord attribute :param jsonstring: value of jsonrecord :type jsonstring: :obj:`str` """ self.__decoders.appendUserDecoders(json.loads(jsonstring)) self.__datasources.appendUserDataSources(json.loads(jsonstring)) self.__json = jsonstring def __delJSON(self): """ del method for jsonrecord attribute """ del self.__json #: the json data string jsonrecord = property(__getJSON, __setJSON, __delJSON, doc='(:obj:`str`) the json data string') def __getCurrentFileID(self): """ get method for jsonrecord attribute :returns: value of jsonrecord :rtype: :obj:`str` """ return self.__currentfileid #: the json data string currentfileid = property(__getCurrentFileID, doc='(:obj:`str`) the current file id') def __getXML(self): """ get method for xmlsettings attribute :returns: value of jsonrecord :rtype: :obj:`str` """ return self.__xmlsettings def __setXML(self, xmlset): """ set method for xmlsettings attribute :param xmlset: xml settings :type xmlset: :obj:`str` """ self.__fetcher = FetchNameHandler(streams=self._streams) if sys.version_info > (3,): sax.parseString(xmlset, self.__fetcher) else: if isinstance(xmlset, unicode): sax.parseString(xmlset.encode('utf-8'), self.__fetcher) else: sax.parseString(xmlset, self.__fetcher) self.__xmlsettings = xmlset def __delXML(self): """ del method for xmlsettings attribute """ del self.__xmlsettings #: the xmlsettings xmlsettings = property(__getXML, __setXML, __delXML, doc='(:obj:`str`) the xml settings') def __getFileName(self): """ get method for parent attribute :returns: value of parent nexus path :rtype: :obj:`str` """ return self.__parent def __setFileName(self, parent): """ set method for parent attribute :param parent: parent nexus path :type parent: :obj:`str` """ parent = parent or "" lparent = str(parent).split(":/") if len(lparent) >= 3: fileName = lparent[1] nxpath = ":/".join(lparent[2:]) elif len(lparent) == 2: fileName = lparent[0] nxpath = lparent[1] elif len(lparent) == 1: fileName = lparent[0] nxpath = "" spath = nxpath.split("/") parents = [] for dr in spath: if dr.strip(): w = dr.split(':') if len(w) == 1: if len(w[0]) > 2 and w[0][:2] == 'NX': w.insert(0, w[0][2:]) else: w.append("NX" + w[0]) parents.append((w[0], w[1])) self.__fileName = fileName self.__parents = parents self.__parent = parent def __delFileName(self): """ del method for parent attribute """ del self.__parent #: the parent nexus path fileName = property(__getFileName, __setFileName, __delFileName, doc='(:obj:`str`) file name and optional nexus path')
[docs] def getFile(self): """ the H5 file handle :returns: the H5 file handle :rtype: :class:`nxswriter.FileWriter.FTFile` """ return self.__nxFile
[docs] def openFile(self): """ the H5 file opening :brief: It opens the H5 file """ try: self.closeFile() except Exception: self._streams.warn( "TangoDataWriter::openFile() - File cannot be closed") wrmodule = self.__setWriter(self.writer) self.__nxFile = None self.__eFile = None self.__initPool = None self.__stepPool = None self.__finalPool = None self.__triggerPools = {} self.__currentfileid = 0 self.__pars = self.__getParams(self.writer) pars = dict(self.__pars) pars["writer"] = wrmodule if os.path.isfile(self.__fileName): self.__nxFile = FileWriter.open_file( self.__fileName, False, **pars) else: self.__nxFile = FileWriter.create_file( self.__fileName, **pars) self.__fileprefix, self.__fileext = os.path.splitext( str(self.__fileName)) self.__nxRoot = self.__nxFile.root() self.__nxRoot.stepsperfile = self.stepsperfile self.__nxRoot.currentfileid = self.__currentfileid #: element file objects self.__eFile = EFile([], None, self.__nxRoot) last = self.__eFile for gname, gtype in self.__parents: last = EGroup( {"name": gname, "type": gtype}, last, reloadmode=True) self.__nxPath.append(last) if self.addingLogs: name = "nexus_logs" if not self.__nxRoot.exists(name): ngroup = self.__nxRoot.create_group(name) else: ngroup = self.__nxRoot.open(name) name = "configuration" error = True counter = 1 cname = name while error: cname = name if counter == 1 else \ ("%s_%s" % (name, counter)) if not ngroup.exists(cname): error = False else: counter += 1 self.__logGroup = ngroup.create_group(cname) vfield = self.__logGroup.create_field( "python_version", "string") vfield.write(str(sys.version)) vfield.close()
def __getParams(self, url): """ fetch parameters from url :param url: url string, i.e. something?key1=value1&key2=value2&... :type url: :obj:`str` :returns: dictionary of parameters :rtype: :obj:`dict` < :obj:`str`, :obj:`str`> """ pars = {} if '?' in url: _, params = url.split('?') if params: parlist = params.split('&') for par in parlist: if "=" in par: ky, vl = par.split('=') if ky: if ky == "swmr": vl = True if vl.lower() == "true" else False pars[ky] = vl if "swmr" in pars and pars["swmr"] and "libver" not in pars: pars["libver"] = "latest" return pars
[docs] def openEntry(self): """ opens the data entry corresponding to a new XML settings :brief: It parse the XML settings, creates thread pools and runs the INIT pool. """ if self.xmlsettings: # flag for INIT mode self.__datasources.counter = -1 self.__datasources.nxroot = self.__nxRoot errorHandler = sax.ErrorHandler() parser = sax.make_parser() handler = NexusXMLHandler( self.__nxPath[-1] if self.__nxPath else self.__eFile, self.__datasources, self.__decoders, self.__fetcher.groupTypes, parser, json.loads(self.jsonrecord), self._streams, self.skipacquisition ) parser.setContentHandler(handler) parser.setErrorHandler(errorHandler) inpsrc = sax.InputSource() if sys.version_info > (3,): inpsrc.setByteStream(StringIO(self.xmlsettings)) else: if isinstance(self.xmlsettings, unicode): inpsrc.setByteStream( StringIO(self.xmlsettings.encode('utf-8'))) else: inpsrc.setByteStream(StringIO(self.xmlsettings)) parser.parse(inpsrc) self.__initPool = handler.initPool self.__stepPool = handler.stepPool self.__finalPool = handler.finalPool self.__triggerPools = handler.triggerPools self.__entryAttrs = handler.entryAttrs self.__initPool.numberOfThreads = self.numberOfThreads self.__stepPool.numberOfThreads = self.numberOfThreads self.__finalPool.numberOfThreads = self.numberOfThreads self.__initPool.maxRuntime = self.maxElementRuntime self.__stepPool.maxRuntime = self.maxElementRuntime self.__finalPool.maxRuntime = self.maxElementRuntime for pool in self.__triggerPools.keys(): self.__triggerPools[pool].numberOfThreads = \ self.numberOfThreads self.__triggerPools[pool].maxRuntime = \ self.maxElementRuntime self.__initPool.setJSON(json.loads(self.jsonrecord)) if not self.skipacquisition: self.__initPool.runAndWait() self.__initPool.checkErrors() self.skipacquisition = False if self.addingLogs: self.__entryCounter += 1 if not self.__logGroup.is_valid: self.__logGroup.reopen() lfield = self.__logGroup.create_field( "nexus__entry__%s_xml" % str(self.__entryCounter), "string") lfield.write(self.xmlsettings) lfield.close() if self.__nxFile and hasattr(self.__nxFile, "flush"): self.__nxFile.flush() if self.stepsperfile > 0: self.__filenames = [] self.__filetimes = {} self.__nextfile() elif "swmr" in self.__pars.keys() and self.__pars["swmr"]: self.__nxFile.reopen(readonly=False, **self.__pars) if self.__nxFile and hasattr(self.__nxFile, "prepare"): # print("START") self.__nxFile.prepare()
def __nextfile(self): self.__nxFile.close() self.__currentfileid += 1 self.__nxRoot.currentfileid = self.__currentfileid self.__filenames.append("%s_%05d%s" % ( self.__fileprefix, self.__currentfileid, self.__fileext) ) shutil.copy2(self.__fileName, self.__filenames[-1]) self.__filetimes[self.__filenames[-1]] = self.__nxFile.currenttime() self.__nxFile.name = self.__filenames[-1] self.__nxFile.reopen(readonly=False, **self.__pars) def __previousfile(self): self.__nxFile.close() self.__currentfileid -= 1 self.__nxRoot.currentfileid = self.__currentfileid self.__filenames.pop() self.__nxFile.name = self.__filenames[-1] self.__nxFile.reopen(readonly=False, **self.__pars) def __removefile(self): if self.__filenames: filename = self.__filenames.pop() self.__nxFile.close() self.__currentfileid -= 1 self.__nxRoot.currentfileid = self.__currentfileid os.remove(filename) self.__nxFile.name = self.__filenames[-1] self.__nxFile.reopen(readonly=False, **self.__pars)
[docs] def record(self, jsonstring=None): """ runs threads form the STEP pool :brief: It runs threads from the STEP pool :param jsonstring: local JSON string with data records :type jsonstring: :obj:`str` """ st = time.time() # flag for STEP mode if self.__datasources.counter > 0: self.__datasources.counter += 1 else: self.__datasources.counter = 1 localJSON = None if jsonstring: localJSON = json.loads(jsonstring) if self.__stepPool: self._streams.info( "TangoDataWriter::record() - Default trigger", False ) self.__stepPool.setJSON(json.loads(self.jsonrecord), localJSON) if not self.skipacquisition: self.__stepPool.runAndWait() self.__stepPool.checkErrors() triggers = None if localJSON and 'triggers' in localJSON.keys(): triggers = localJSON['triggers'] if hasattr(triggers, "__iter__"): for pool in triggers: if pool in self.__triggerPools.keys(): self._streams.info( "TangoDataWriter:record() - Trigger: %s" % pool, False ) self.__triggerPools[pool].setJSON( json.loads(self.jsonrecord), localJSON) if not self.skipacquisition: self.__triggerPools[pool].runAndWait() self.__triggerPools[pool].checkErrors() if self.__nxFile and hasattr(self.__nxFile, "flush"): self.__nxFile.flush() if self.__nxFile and hasattr(self.__nxFile, "start") and \ self.__datasources.counter == 1: # print("START") self.__nxFile.start() if self.stepsperfile > 0: if (self.__datasources.counter) % self.stepsperfile == 0: self.__nextfile() self.skipacquisition = False dt = time.time() - st if dt and self.maxRecordRuntime and dt > self.maxRecordRuntime: mess = "TangoDataWriter.record() - " \ "The maximal record time for #%s exceeded: %s s (%s s) " \ % (self.__datasources.counter, dt, self.maxRecordRuntime) if self._streams: self._streams.warn(mess)
def __updateNXRoot(self): fname = self.__filenames[-1] self.__nxRoot.attributes.create( "file_name", "string", overwrite=True)[...] = fname if fname in self.__filetimes and len(self.__filenames) > 1: self.__nxRoot.attributes.create( "file_time", "string", overwrite=True)[...] = str(self.__filetimes[fname])
[docs] def closeEntry(self): """ closes the data entry :brief: It runs threads from the FINAL pool and removes the thread pools """ # flag for FINAL mode if self.stepsperfile > 0: os.remove(self.__fileName) if (self.__datasources.counter) % self.stepsperfile == 0: self.__removefile() self.__datasources.counter = -2 self.__datasources.canfail = self.defaultCanFail if self.addingLogs and self.__logGroup: self.__logGroup.close() # self.__logGroup = None if self.__finalPool: self.__finalPool.setJSON(json.loads(self.jsonrecord)) if not self.skipacquisition: self.__finalPool.runAndWait() if self.stepsperfile > 0: self.__updateNXRoot() while len(self.__filenames) > 1: self.__previousfile() if not self.skipacquisition: self.__finalPool.runAndWait() self.__updateNXRoot() if not self.skipacquisition: self.__finalPool.checkErrors() self.skipacquisition = False if self.__initPool: self.__initPool.close() self.__initPool = None if self.__stepPool: self.__stepPool.close() self.__stepPool = None if self.__finalPool: self.__finalPool.close() self.__finalPool = None if self.__triggerPools: for pool in self.__triggerPools.keys(): self.__triggerPools[pool].close() self.__triggerPools = {} if self.addingLogs and self.__logGroup: self.__logGroup.close() if self.__nxFile and hasattr(self.__nxFile, "flush"): self.__nxFile.flush() if self.__nxFile and hasattr(self.__nxFile, "finish"): self.__nxFile.finish() if NXSMETA and self.__nxRoot: args = {} if self.metadataOutput and \ 'file' in self.metadataOutput.split(","): for ad in self.__entryAttrs: nm = ad.get("name", None) tp = ad.get("type", None) if nm and tp in ["NXentry"]: args["output"] = "%s.%s.json" % ( self.__fileName, ad["name"]) args["args"] = [self.__fileName] args["entrynames"] = nm mdata = Metadata(self.__nxRoot).get(**args) if mdata: if args["output"]: with open(args["output"], "w") as fl: fl.write(mdata) gc.collect()
[docs] def closeFile(self): """ the H5 file closing :brief: It closes the H5 file """ self.__currentfileid = 0 if self.__nxRoot: self.__nxRoot.currentfileid = self.__currentfileid if self.__initPool: self.__initPool.close() self.__initPool = None if self.__stepPool: self.__stepPool.close() self.__stepPool = None if self.__finalPool: self.__finalPool.close() self.__finalPool = None if self.__triggerPools: for pool in self.__triggerPools.keys(): self.__triggerPools[pool].close() self.__triggerPools = {} if self.__nxRoot: self.__nxRoot.close() if self.__nxFile: self.__nxFile.close() if self.addingLogs and self.__logGroup: self.__logGroup.close() self.__nxPath = [] self.__nxRoot = None self.__nxFile = None self.__eFile = None self.__logGroup = None gc.collect()
if __name__ == "__main__": # Create a TDW object #: (:class:`TangoDataWriter`) instance of TangoDataWriter tdw = TangoDataWriter() tdw.fileName = 'test.h5' tdw.numberOfThreads = 1 #: (:obj:`str`) xml file name xmlf = "../XMLExamples/MNI.xml" # xmlf = "../XMLExamples/test.xml" print("usage: TangoDataWriter.py <XMLfile1> " "<XMLfile2> ... <XMLfileN> <H5file>") #: (:obj:`str`) No arguments argc = len(sys.argv) if argc > 2: tdw.fileName = sys.argv[argc - 1] if argc > 1: print("opening the H5 file") tdw.openFile() for i in range(1, argc - 1): xmlf = sys.argv[i] #: (:obj:`str`) xml string with open(xmlf, 'r') as fl: xml = fl.read() tdw.xmlsettings = xml print("opening the data entry ") tdw.openEntry() print("recording the H5 file") tdw.record('{"data": {"emittance_x": 0.8} , ' ' "triggers":["trigger1", "trigger2"] }') print("sleeping for 1s") time.sleep(1) print("recording the H5 file") tdw.record('{"data": {"emittance_x": 1.2}, ' ' "triggers":["trigger2"] }') print("sleeping for 1s") time.sleep(1) print("recording the H5 file") tdw.record('{"data": {"emittance_x": 1.1} , ' ' "triggers":["trigger1"] }') print("sleeping for 1s") time.sleep(1) print("recording the H5 file") tdw.record('{"data": {"emittance_x": 0.7} }') print("closing the data entry ") tdw.closeEntry() print("closing the H5 file") tdw.closeFile()