Source code for nxswriter.DecoderPool

#!/usr/bin/env python
#   This file is part of nexdatas - Tango Server for NeXus data writer
#
#    Copyright (C) 2012-2017 DESY, Jan Kotanski <jkotan@mail.desy.de>
#
#    nexdatas is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    nexdatas is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with nexdatas.  If not, see <http://www.gnu.org/licenses/>.
#

""" Provides a pool  with data decoders """

import struct
import numpy
import sys

if sys.version_info > (3,):
    unicode = str


[docs]class UTF8decoder(object): """ UTF8 decoder """ def __init__(self): """ constructor :brief: It clears the local variables """ #: (:obj:`str`) decoder name self.name = "UTF8" #: (:obj:`str`) decoder format self.format = None #: (:obj:`str`) data type self.dtype = None #: (:class:`numpy.ndarray`) image data self.__value = None #: ([:obj:`str`, :obj:`str`]) header and image data self.__data = None
[docs] def load(self, data): """ loads encoded data :param data: encoded data :type data: [:obj:`str`, :obj:`bytes`] """ self.__data = data self.format = data[0] self.__value = None self.dtype = "string"
[docs] def shape(self): """ provides the data shape :returns: the data shape i.e. `[1, 0]` :rtype: :obj:`list` <:obj:`int` > """ return [1, 0]
[docs] def decode(self): """ provides the decoded data :returns: the decoded data if data was loaded :rtype: :obj:`bytes` """ if not self.__data: return if not self.__value: if isinstance(self.__data[1], unicode): self.__value = bytes(self.__data[1], 'utf8') else: self.__value = bytes(self.__data[1]) return self.__value
[docs]class UINT32decoder(object): """ INT decoder """ def __init__(self): """ constructor :brief: It clears the local variables """ #: (:obj:`str`) decoder name self.name = "UINT32" #: (:obj:`str`) decoder format self.format = None #: (:obj:`str`) data type self.dtype = None #: (:class:`numpy.ndarray`) image data self.__value = None #: ([:obj:`str`, :obj:`str`]) header and image data self.__data = None
[docs] def load(self, data): """ loads encoded data :param data: encoded data :type data: [:obj:`str`, :obj:`bytes`] """ if not hasattr(data, "__iter__"): raise ValueError("Wrong Data Format") if len(data[1]) % 4: raise ValueError("Wrong encoded UINT32 data length") self.__data = data self.format = data[0] self.__value = None self.dtype = "uint32"
[docs] def shape(self): """ provides the data shape :returns: the data shape if data was loaded :rtype: :obj:`list` <:obj:`int` > """ if self.__data: return [len(self.__data[1]) // 4, 0]
[docs] def decode(self): """ provides the decoded data :returns: the decoded data if data was loaded :rtype: :class:`numpy.ndarray` """ if not self.__data: return if len(self.__data[1]) % 4: raise ValueError("Wrong encoded UINT32 data length") if not self.__value: if isinstance(self.__data[1], str): data = bytearray() data.extend(list(map(ord, self.__data[1]))) else: data = self.__data[1] # res = struct.unpack('I' * (len(data) // 4), data) self.__value = numpy.array( struct.unpack('I' * (len(data) // 4), data), dtype=self.dtype).reshape(len(data) // 4) return self.__value
[docs]class DATAARRAYdecoder(object): """ DATA ARRAY LIMA decoder """ def __init__(self): """ constructor :brief: It clears the local variables """ #: (:obj:`str`) decoder name self.name = "DATA_ARRAY" #: (:obj:`str`) decoder format self.format = None #: (:obj:`str`) data type self.dtype = None #: (:class:`numpy.ndarray`) image data self.__value = None #: ([:obj:`str`, :obj:`str`]) header and image data self.__data = None #: (:obj:`str`) struct header format self.__headerFormat = '<IHHIIHHHHHHHHIIIIIIII' #: (:obj:`dict` <:obj:`str`, :obj:`any` > ) header data self.__header = {} #: (:obj:`dict` <:obj:`int`, :obj:`str` > ) format modes self.__formatID = { 0: 'B', 1: 'H', 2: 'I', 3: 'Q', 4: 'b', 5: 'h', 6: 'i', 7: 'q', 8: 'f', 9: 'd' } #: (:obj:`dict` <:obj:`int`, :obj:`str` > ) dtype modes self.__dtypeID = { 0: 'uint8', 1: 'uint16', 2: 'uint32', 3: 'uint64', 4: 'int8', 5: 'int16', 6: 'int32', 7: 'int64', 8: 'float32', 9: 'float64', }
[docs] def load(self, data): """ loads encoded data :param data: encoded data :type data: [:obj:`str`, :obj:`str`] """ self.__data = data self.format = data[0] self._loadHeader(data[1][:struct.calcsize(self.__headerFormat)]) self.__value = None
def _loadHeader(self, headerData): """ loads the image header :param headerData: buffer with header data :type headerData: :obj:`str` """ hdr = struct.unpack(self.__headerFormat, headerData) self.__header = {} self.__header['magic'] = hdr[0] self.__header['headerVersion'] = hdr[1] self.__header['headerSize'] = hdr[2] self.__header['category'] = hdr[3] self.__header['imageMode'] = hdr[4] self.__header['endianness'] = hdr[5] self.__header['dim'] = hdr[6] self.__header['shape'] = [ hdr[7], hdr[8], hdr[9], hdr[10], hdr[11], hdr[12]] self.__header['steps'] = [ hdr[13], hdr[14], hdr[15], hdr[16], hdr[17], hdr[18]] self.__header['padding'] = hdr[19:] self.dtype = self.__dtypeID[self.__header['imageMode']]
[docs] def frameNumber(self): """ no data """
[docs] def shape(self): """ provides the data shape :returns: the data shape if data was loaded :rtype: :obj:`list` <:obj:`int` > """ if self.__header: return list(reversed( [self.__header['shape'][i] for i in range(self.__header['dim'])]))
[docs] def steps(self): """ provides the data steps :returns: the data steps if data was loaded :rtype: :obj:`list` <:obj:`int` > """ if self.__header: return list(reversed( [self.__header['steps'][i] for i in range(self.__header['dim'])]))
[docs] def decode(self): """ provides the decoded data :returns: the decoded data if data was loaded :rtype: :class:`numpy.ndarray` """ if not self.__header or not self.__data: return if self.__value is None: image = self.__data[1][struct.calcsize(self.__headerFormat):] dformat = self.__formatID[self.__header['imageMode']] fSize = struct.calcsize(dformat) self.__value = numpy.array( struct.unpack(dformat * (len(image) // fSize), image), dtype=self.dtype).reshape(self.shape()) fendian = self.__header['endianness'] lendian = ord(struct.pack('=H', 1).decode()[-1]) if fendian != lendian: try: self.__value.byteswap(inplace=False) except TypeError: self.__value = self.__value.byteswap() return self.__value
[docs]class VDEOdecoder(object): """ VIDEO IMAGE LIMA decoder """ def __init__(self): """ constructor :brief: It clears the local variables """ #: (:obj:`str`) decoder name self.name = "LIMA_VIDEO_IMAGE" #: (:obj:`str`) decoder format self.format = None #: (:obj:`str`) data type self.dtype = None #: (:class:`numpy.ndarray`) image data self.__value = None #: ([:obj:`str`, :obj:`str`]) header and image data self.__data = None #: (:obj:`str`) struct header format self.__headerFormat = '!IHHqiiHHHH' #: (:obj:`dict` <:obj:`str`, :obj:`any` > ) header data self.__header = {} #: (:obj:`dict` <:obj:`int`, :obj:`str` > ) format modes self.__formatID = {0: 'B', 1: 'H', 2: 'I', 3: 'Q'} #: (:obj:`dict` <:obj:`int`, :obj:`str` > ) dtype modes self.__dtypeID = {0: 'uint8', 1: 'uint16', 2: 'uint32', 3: 'uint64'}
[docs] def load(self, data): """ loads encoded data :param data: encoded data :type data: [:obj:`str`, :obj:`str`] """ self.__data = data self.format = data[0] self._loadHeader(data[1][:struct.calcsize(self.__headerFormat)]) self.__value = None
def _loadHeader(self, headerData): """ loads the image header :param headerData: buffer with header data :type headerData: :obj:`str` """ hdr = struct.unpack(self.__headerFormat, headerData) self.__header = {} self.__header['magic'] = hdr[0] self.__header['headerVersion'] = hdr[1] self.__header['imageMode'] = hdr[2] self.__header['frameNumber'] = hdr[3] self.__header['width'] = hdr[4] self.__header['height'] = hdr[5] self.__header['endianness'] = hdr[6] self.__header['headerSize'] = hdr[7] self.__header['padding'] = hdr[7:] self.dtype = self.__dtypeID[self.__header['imageMode']]
[docs] def shape(self): """ provides the data shape :returns: the data shape if data was loaded :rtype: :obj:`list` <:obj:`int` > """ if self.__header: return [self.__header['height'], self.__header['width']]
[docs] def decode(self): """ provides the decoded data :returns: the decoded data if data was loaded :rtype: :class:`numpy.ndarray` """ if not self.__header or not self.__data: return if not self.__value: image = self.__data[1][struct.calcsize(self.__headerFormat):] dformat = self.__formatID[self.__header['imageMode']] fSize = struct.calcsize(dformat) self.__value = numpy.array( struct.unpack(dformat * (len(image) // fSize), image), dtype=self.dtype).reshape(self.__header['height'], self.__header['width']) fendian = self.__header['endianness'] lendian = ord(struct.pack('=H', 1).decode()[-1]) if fendian != lendian: try: self.__value.byteswap(inplace=False) except TypeError: self.__value = self.__value.byteswap() return self.__value
[docs]class DecoderPool(object): """ Decoder pool """ def __init__(self, configJSON=None): """ constructor :brief: It creates know decoders :param configJSON: string with decoders :type configJSON: \ : :obj:`dict` <:obj:`str`, :obj:`dict` <:obj:`str`, any>> """ self.__knowDecoders = { "LIMA_VIDEO_IMAGE": VDEOdecoder, "VIDEO_IMAGE": VDEOdecoder, "DATA_ARRAY": DATAARRAYdecoder, "UTF8": UTF8decoder, "UINT32": UINT32decoder} self.__pool = {} self.__createDecoders() self.appendUserDecoders(configJSON)
[docs] def appendUserDecoders(self, configJSON): """ loads user decoders :param configJSON: string with decoders :type configJSON: \ : :obj:`dict` <:obj:`str`, :obj:`dict` <:obj:`str`, any>> """ if configJSON and 'decoders' in configJSON.keys() \ and hasattr(configJSON['decoders'], 'keys'): for dk in configJSON['decoders'].keys(): pkl = configJSON['decoders'][dk].split(".") pkg = ".".join(pkl[:-1]) if pkg in sys.modules.keys(): pdec = sys.modules[pkg] dec = pdec else: dec = __import__(pkg, globals(), locals(), pkl[-1]) self.append(getattr(dec, pkl[-1]), dk)
def __createDecoders(self): """ creates know decoders :brief: It calls constructor of know decoders """ for dk in self.__knowDecoders.keys(): self.__pool[dk] = self.__knowDecoders[dk]()
[docs] def hasDecoder(self, decoder): """ checks it the decoder is registered :param decoder: the given decoder :type decoder: :obj:`str` :returns: True if it the decoder is registered :rtype: :obj:`bool` """ return True if decoder in self.__pool.keys() else False
[docs] def get(self, decoder): """ checks it the decoder is registered :param decoder: the given decoder :type decoder: :obj:`str` :returns: True if it the decoder is registered :rtype: :obj:`object` """ if decoder in self.__pool.keys(): return self.__pool[decoder]
[docs] def pop(self, name): """ adds additional decoder :param name: name of the adding decoder :type name: :obj:`str` """ self.__pool.pop(name, None)
[docs] def append(self, decoder, name): """ adds additional decoder :param name: name of the adding decoder :type name: :obj:`str` :param decoder: instance of the adding decoder :type decoder: :obj:`object` :returns: name of decoder :rtype: :obj:`str` """ instance = decoder() self.__pool[name] = instance if not hasattr(instance, "load") or not hasattr(instance, "name") \ or not hasattr(instance, "shape") \ or not hasattr(instance, "decode") \ or not hasattr(instance, "dtype") \ or not hasattr(instance, "format"): self.pop(name) return return name