Source code for nxswriter.ThreadPool

#!/usr/bin/env python
#   This file is part of nexdatas - Tango Server for NeXus data writer
#
#    Copyright (C) 2012-2017 DESY, Jan Kotanski <jkotan@mail.desy.de>
#
#    nexdatas is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    nexdatas is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with nexdatas.  If not, see <http://www.gnu.org/licenses/>.
#

""" Provides a pool with element threads """

import sys

from .ElementThread import ElementThread
from .Errors import ThreadError

if sys.version_info > (3,):
    import queue as Queue
else:
    import Queue


[docs]class ThreadPool(object): """ Pool with threads """ def __init__(self, numberOfThreads=None, streams=None, maxruntime=0): """ constructor :brief: It cleans the member variables :param numberOfThreads: number of threads :type numberOfThreads: :obj:`int` :param streams: tango-like steamset class :type streams: :class:`StreamSet` or :class:`tango.LatestDeviceImpl` :param maxruntime: maxruntime :type maxruntime: :obj:`int` """ #: (:obj:`int`) maximal number of threads self.numberOfThreads = numberOfThreads or -1 #: (:class:`Queue.Queue`) queue of the appended elements self.__elementQueue = Queue.Queue() #: (:obj:`list` <:class:`nxswriter.Element.Element`>) \ #: list of the appended elements self.__elementList = [] #: (:obj:`list` <:class:`nxswriter.ElementThread.ElementThread`>) \ #: list of the threads related to the appended elements self.__threadList = [] #: (:class:`StreamSet` or :class:`tango.LatestDeviceImpl`) stream set self._streams = streams #: (:obj:`float`) maximal runtime self.maxRuntime = maxruntime
[docs] def append(self, elem): """ appends the thread element :param elem: the thread element :type elem: :class:`nxswriter.Element.Element` """ self.__elementList.append(elem)
[docs] def setJSON(self, globalJSON, localJSON=None): """ sets the JSON string to threads :param globalJSON: the static JSON string :type globalJSON: \ : :obj:`dict` <:obj:`str` , :obj:`dict` <:obj:`str`, any>> :param localJSON: the dynamic JSON string :type localJSON: \ : :obj:`dict` <:obj:`str`, :obj:`dict` <:obj:`str`, any>> :returns: self object :rtype: :class:`ThreadPool` """ for el in self.__elementList: if hasattr(el.source, "setJSON") and callable(el.source.setJSON): el.source.setJSON(globalJSON, localJSON) return self
[docs] def run(self): """ thread runner :brief: It runs the threads from the pool """ self.__threadList = [] self.__elementQueue = Queue.Queue() for eth in self.__elementList: self.__elementQueue.put(eth) if self.numberOfThreads < 1: self.numberOfThreads = len(self.__elementList) for i in range(min(self.numberOfThreads, len(self.__elementList))): th = ElementThread(i, self.__elementQueue) self.__threadList.append(th) th.start()
[docs] def join(self, timeout=None): """ waits for all thread from the pool :param timeout: the maximal waiting time :type timeout: :obj:`int` """ for th in self.__threadList: if th.is_alive(): th.join(timeout)
[docs] def runAndWait(self): """ runner with waiting :brief: It runs and waits the threads from the pool """ self.run() self.join()
[docs] def checkErrors(self): """ checks errors from threads """ errors = [] for el in self.__elementList: if hasattr(el, "runtime") and \ el.runtime and self.maxRuntime > 0 \ and el.runtime > self.maxRuntime: path = "" if hasattr(el.h5Object, "path"): path = str(el.h5Object.path) elif hasattr(el.h5Object, "path"): path = str(el.h5Object.name) mess = "ThreadPool::checkErrors() - The maximal " \ "element record time (%s s) exceeded: %s s : %s " \ % (self.maxRuntime, el.runtime, path) if self._streams: self._streams.warn(mess) if el.error: if isinstance(el.error, tuple): serror = str(tuple([str(e) for e in el.error])) else: serror = str(el.error) mess = "ThreadPool::checkErrors() - %s" % serror if hasattr(el, "canfail") and el.canfail: if hasattr(el, "markFailed"): el.markFailed(serror) if self._streams: self._streams.warn(mess) else: errors.append(el.error) if self._streams: self._streams.error(mess, std=False) if errors: raise ThreadError("Problems in storing data: %s" % str(errors))
[docs] def close(self): """ closer :brief: It close the threads from the pool """ for el in self.__elementList: if hasattr(el.h5Object, "close"): el.h5Object.close() self.__threadList = [] self.__elementList = [] self.__elementQueue = None