Source code for nxswriter.DBaseSource

#!/usr/bin/env python
#   This file is part of nexdatas - Tango Server for NeXus data writer
#
#    Copyright (C) 2012-2017 DESY, Jan Kotanski <jkotan@mail.desy.de>
#
#    nexdatas is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    nexdatas is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with nexdatas.  If not, see <http://www.gnu.org/licenses/>.
#

""" Definitions of DB datasource """

import xml.etree.ElementTree as et
from lxml.etree import XMLParser
import sys

from .Types import NTP

from .DataSources import DataSource
from .Errors import (PackageError, DataSourceSetupError)


#: (:obj:`list<str>`) list of available databases
DB_AVAILABLE = []

try:
    try:
        import MySQLdb
    except Exception:
        import pymysql
        pymysql.install_as_MySQLdb()
    DB_AVAILABLE.append("MYSQL")
except ImportError:
    pass
    # sys.stdout.write("MYSQL not available: %s\n" % e)
    # sys.stdout.flush()

try:
    import psycopg2
    DB_AVAILABLE.append("PGSQL")
except ImportError:
    pass
    # sys.stdout.write("PGSQL not available: %s\n" % e)
    # sys.stdout.flush()

try:
    import cx_Oracle
    DB_AVAILABLE.append("ORACLE")
except ImportError:
    pass
    # sys.stdout.write("ORACLE not available: %s\n" % e)
    # sys.stdout.flush()


[docs]class DBaseSource(DataSource): """ DataBase data source """ def __init__(self, streams=None, name=None): """ constructor :brief: It sets all member variables to None :param streams: tango-like steamset class :type streams: :class:`StreamSet` or :class:`tango.LatestDeviceImpl` :param name: datasource name :type name: :obj:`str` """ DataSource.__init__(self, streams=streams, name=name) #: (:obj:`str`) name of the host with the data source self.hostname = None #: (:obj:`str`) port related to the host self.port = None #: (:obj:`str`) database query self.query = None #: (:obj:`str`) DSN string self.dsn = None #: (:obj:`str`) database type, i.e. MYSQL, PGSQL, ORACLE self.dbtype = None #: (:obj:`str`) oracle database mode self.mode = None #: (:obj:`str`) database name self.dbname = None #: (:obj:`str`) database user self.user = None #: (:obj:`str`) database password self.passwd = None #: (:obj:`str`) mysql database configuration file self.mycnf = '/etc/my.cnf' #: (:obj:`str`) record format, i.e. `SCALAR`, `SPECTRUM`, `IMAGE` self.format = None #: (:obj:`dict` <:obj:`str`, :obj:`instancemethod`>) map self.__dbConnect = {"MYSQL": self.__connectMYSQL, "PGSQL": self.__connectPGSQL, "ORACLE": self.__connectORACLE} def __str__(self): """ self-description :returns: self-describing string :rtype: :obj:`str` """ return " %s DB %s with %s " % ( self.dbtype, self.dbname if self.dbname else "", self.query)
[docs] def setup(self, xml): """ sets the parrameters up from xml :param xml: datasource parameters :type xml: :obj:`str` :raises: :exc:`nxswriter.Errors.DataSourceSetupError` if :obj:`format` or :obj:`query` is not defined """ if sys.version_info > (3,): xml = bytes(xml, "UTF-8") root = et.fromstring(xml, parser=XMLParser(collect_ids=False)) query = root.find("query") if query is not None: self.format = query.get("format") self.query = self._getText(query) if not self.format or not self.query: if self._streams: self._streams.error( "DBaseSource::setup() - " "Database query or its format not defined: %s" % xml, std=False) raise DataSourceSetupError( "Database query or its format not defined: %s" % xml) db = root.find("database") if db is not None: self.dbname = db.get("dbname") self.dbtype = db.get("dbtype") self.user = db.get("user") self.passwd = db.get("passwd") self.mode = db.get("mode") mycnf = db.get("mycnf") if mycnf: self.mycnf = mycnf self.hostname = db.get("hostname") self.port = db.get("port") self.dsn = self._getText(db)
def __connectMYSQL(self): """ connects to MYSQL database :returns: open database object :rtype: :class:`MySQLdb.connections.Connection` """ args = {} if self.mycnf: args["read_default_file"] = self.mycnf if self.dbname: args["db"] = self.dbname if self.user: args["user"] = self.user if self.passwd: args["passwd"] = self.passwd if self.hostname: args["host"] = self.hostname if sys.version_info < (3,): for k in list(args.keys()): args[k] = args[k].encode() if self.port: args["port"] = int(self.port) return MySQLdb.connect(**args) def __connectPGSQL(self): """ connects to PGSQL database :returns: open database object :rtype: :class:`psycopg2._psycopg.connection` """ args = {} if self.dbname: args["database"] = self.dbname if self.user: args["user"] = self.user if self.passwd: args["password"] = self.passwd if self.hostname: args["host"] = self.hostname if sys.version_info < (3,): for k in list(args.keys()): args[k] = args[k].encode() if self.port: args["port"] = int(self.port) return psycopg2.connect(**args) def __connectORACLE(self): """ connects to ORACLE database :returns: open database object :rtype: :class:`cx_Oracle.Connection` """ args = {} if self.user: args["user"] = self.user if self.passwd: args["password"] = self.passwd if self.dsn: args["dsn"] = self.dsn if self.mode: args["mode"] = self.mode if sys.version_info < (3,): for k in list(args.keys()): args[k] = args[k].encode() return cx_Oracle.connect(**args)
[docs] def getData(self): """ provides access to the data :returns: dictionary with collected data :rtype: :obj:`dict` <:obj:`str`, any> """ db = None if self.dbtype in self.__dbConnect.keys() \ and self.dbtype in DB_AVAILABLE: db = self.__dbConnect[self.dbtype]() else: if self._streams: self._streams.error( "DBaseSource::getData() - " "Support for %s database not available" % self.dbtype, std=False) raise PackageError( "Support for %s database not available" % self.dbtype) if db: cursor = db.cursor() cursor.execute(self.query) if not self.format or self.format == 'SCALAR': # data = copy.deepcopy(cursor.fetchone()) data = cursor.fetchone() dh = {"rank": "SCALAR", "value": data[0], "tangoDType": (NTP.pTt[type(data[0]).__name__]), "shape": [1, 0]} elif self.format == 'SPECTRUM': data = cursor.fetchall() # data = copy.deepcopy(cursor.fetchall()) if len(data[0]) == 1: ldata = list(el[0] for el in data) else: ldata = list(el for el in data[0]) dh = {"rank": "SPECTRUM", "value": ldata, "tangoDType": (NTP.pTt[type(ldata[0]).__name__]), "shape": [len(ldata), 0]} else: data = cursor.fetchall() # data = copy.deepcopy(cursor.fetchall()) ldata = list(list(el) for el in data) dh = {"rank": "IMAGE", "value": ldata, "tangoDType": NTP.pTt[type(ldata[0][0]).__name__], "shape": [len(ldata), len(ldata[0])]} cursor.close() db.close() return dh