# This file is part of nexdatas - Tango Server for NeXus data writer
#
# Copyright (C) 2012-2018 DESY, Jan Kotanski <jkotan@mail.desy.de>
#
# nexdatas is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# nexdatas is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with nexdatas. If not, see <http://www.gnu.org/licenses/>.
#
""" Command-line tool for ascess to the nexdatas configuration server """
import sys
import os
import argparse
import json
from .nxsparser import ParserTools, TableTools, TableDictTools, ESRFConverter
from .nxsargparser import (Runner, NXSArgParser, ErrorException)
from .nxsdevicetools import (checkServer, listServers, openServer)
#: (:obj:`bool`) True if tango available
PYTANGO = False
try:
try:
import tango
except Exception:
import PyTango as tango
PYTANGO = True
except Exception:
pass
if sys.version_info > (3,):
raw_input = input
[docs]class ConfigServer(object):
""" configuration server adapter
"""
def __init__(self, device, nonewline=False):
""" constructor
:param device: device name of the configuration server
:type device: :obj:`str`
:param nonewline: if the output should not be separated
by the new line character
:type nonewline: :obj:`bool`
"""
#: (:obj:`str`) spliting character
self.char = " " if nonewline else "\n"
#: (:class:`tango.DeviceProxy`) configuration server proxy
self._cnfServer = openServer(device)
self._cnfServer.Open()
[docs] def listCmd(self, ds, mandatory=False, private=False, profiles=False):
""" lists the DB item names
:param ds: flag set True for datasources
:type ds: :obj:`bool`
:param mandatory: flag set True for mandatory components
:type mandatory: :obj:`bool`
:param private: flag set True for components starting with '__'
:type private: :obj:`bool`
:param profiles: flag set True for profiles
:type profiles: :obj:`bool`
:returns: list op item names
:rtype: :obj:`list` <:obj:`str`>
"""
if ds:
if not mandatory:
return self._cnfServer.AvailableDataSources()
elif profiles:
return self._cnfServer.AvailableSelections()
else:
if mandatory:
return self._cnfServer.MandatoryComponents()
elif private:
return [cp for cp in self._cnfServer.AvailableComponents()
if cp.startswith("__")]
else:
return [cp for cp in self._cnfServer.AvailableComponents()
if not cp.startswith("__")]
return []
[docs] def sourcesCmd(self, components, mandatory=False):
""" lists datasources of the components
:param components: given components
:type components: :obj:`list` <:obj:`str`>
:returns: list of datasource names
:rtype: :obj:`list` <:obj:`str`>
"""
cmps = self._cnfServer.AvailableComponents()
result = []
for component in components:
if component not in cmps:
sys.stderr.write("Error: Component '%s' not stored in "
"the configuration server\n" % component)
sys.stderr.flush()
return []
if not mandatory:
for component in components:
result.extend(self._cnfServer.ComponentDataSources(component))
else:
result = self._cnfServer.ComponentsDataSources(components)
return result
[docs] def componentsCmd(self, components):
""" lists components of the components
:param components: given components
:type components: :obj:`list` <:obj:`str`>
:returns: list of component names
:rtype: :obj:`list` <:obj:`str`>
"""
cmps = self._cnfServer.AvailableComponents()
result = []
for component in components:
if component not in cmps:
sys.stderr.write("Error: Component '%s' not stored in "
"the configuration server\n" % component)
sys.stderr.flush()
return []
result = self._cnfServer.DependentComponents(components)
return result
[docs] def variablesCmd(self, components, mandatory=False):
""" lists variable of the components
:param components: given components
:type components: :obj:`list` <:obj:`str`>
:returns: list of datasource names
:rtype: :obj:`list` <:obj:`str`>
"""
cmps = self._cnfServer.AvailableComponents()
result = []
for component in components:
if component not in cmps:
sys.stderr.write("Error: Component '%s' not stored in "
"the configuration server\n" % component)
sys.stderr.flush()
return []
if not mandatory:
for component in components:
result.extend(self._cnfServer.ComponentVariables(component))
else:
result = self._cnfServer.ComponentsVariables(components)
return result
def __getDataSources(self, name):
""" provides datasources and its records for a given component
:param name: given component or datasource
:type name: :obj:`str`
:returns: tuple with names and records
:rtype: (:obj:`str` , :obj:`str`)
"""
records = []
names = []
interNames = []
xmlcp = self._cnfServer.Components([name])
for xmlc in xmlcp:
dslist = ParserTools.parseDataSources(xmlc)
for ds in dslist:
if ds["source_name"]:
interNames.append(ds["source_name"])
if ds["source"]:
records.append(ds["source"])
allNames = self._cnfServer.ComponentDataSources(name)
for nm in allNames:
if nm not in interNames:
names.append(nm)
return (names, records)
[docs] def recordCmd(self, ds, name):
""" lists datasources of the component
:param ds: flag set True for datasources
:type ds: :obj:`bool`
:param name: given component or datasource
:type name: :obj:`str`
:returns: list of record names
:rtype: :obj:`list` <:obj:`str`>
"""
if not name:
return []
name = name[0]
if not ds:
cmps = self._cnfServer.AvailableComponents()
if name not in cmps:
sys.stderr.write("Error: Component '%s' not stored in "
"the configuration server\n" % name)
sys.stderr.flush()
return []
names, records = self.__getDataSources(name)
else:
names = [name]
records = []
dsrcs = self._cnfServer.AvailableDataSources()
for nm in names:
if nm not in dsrcs:
sys.stderr.write("Error: Datasource '%s' not stored in "
"the configuration server\n" % nm)
sys.stderr.flush()
return []
xmls = self._cnfServer.DataSources(names)
for xml in xmls:
if xml:
try:
rec = ParserTools.parseRecord(xml)
if rec:
records.append(rec)
except Exception:
sys.stderr.write(
"Error: Datasource '%s' cannot be parsed\n" % xml)
sys.stderr.write(str(sys.exc_info()[0]) + ": "
+ str(sys.exc_info()[1]) + '\n')
sys.stderr.flush()
return []
return records
[docs] def showCmd(self, ds, args, mandatory=False, profiles=False,
directory=None):
""" shows the DB items
:param ds: flag set True for datasources
:type ds: :obj:`bool`
:param args: list of item names
:type args: :obj:`list` <:obj:`str`>
:param mandatory: flag set True for mandatory components
:type mandatory: :obj:`bool`
:param profiles: flag set True for profiles
:type profiles: :obj:`bool`
:param directory: output file directory
:type directory: :obj:`str`
:returns: list of XML items
:rtype: :obj:`list` <:obj:`str`>
"""
if ds:
dsrc = self._cnfServer.AvailableDataSources()
for ar in args:
if ar not in dsrc:
sys.stderr.write("Error: DataSource '%s' not stored in "
"the configuration server\n" % ar)
sys.stderr.flush()
return []
elems = self._cnfServer.DataSources(args)
if not directory:
return elems
else:
for i, ar in enumerate(args):
name = os.path.join(directory, "%s.ds.xml" % ar)
with open(name, "w") as text_file:
text_file.write(elems[i])
elif profiles:
dsrc = self._cnfServer.AvailableSelections()
for ar in args:
if ar not in dsrc:
sys.stderr.write("Error: Profile '%s' not stored in "
"the configuration server\n" % ar)
sys.stderr.flush()
return []
elems = self._cnfServer.Selections(args)
if not directory:
return elems
else:
for i, ar in enumerate(args):
name = os.path.join(directory, "%s.json" % ar)
with open(name, "w") as text_file:
text_file.write(elems[i])
else:
cmps = self._cnfServer.AvailableComponents()
for ar in args:
if ar not in cmps:
sys.stderr.write("Error: Component '%s' not stored in "
"the configuration server\n" % ar)
sys.stderr.flush()
return []
if mandatory:
mand = list(self._cnfServer.MandatoryComponents())
mand.extend(args)
elems = self._cnfServer.Components(mand)
else:
elems = self._cnfServer.Components(args)
mand = args
if not directory:
return elems
else:
for i, ar in enumerate(mand):
name = os.path.join(directory, "%s.xml" % ar)
with open(name, "w") as text_file:
text_file.write(elems[i])
return []
[docs] def deleteCmd(self, ds, args, ask=True, profiles=False):
""" delete the DB items
:param ds: flag set True for datasources
:type ds: :obj:`bool`
:param args: list of item names
:type args: :obj:`list` <:obj:`str`>
:param ask: ask flag
:type ask: :obj:`bool`
:param profiles: flag set True for profiles
:type profiles: :obj:`bool`
:returns: list of XML items
:rtype: :obj:`list` <:obj:`str`>
"""
valid = {"yes": True, "y": True, "ye": True,
"no": False, "n": False}
default = "y"
label = "Component"
if ds:
dsrc = self._cnfServer.AvailableDataSources()
label = "DataSource"
elif profiles:
dsrc = self._cnfServer.AvailableSelections()
label = "Profile"
else:
dsrc = self._cnfServer.AvailableComponents()
for ar in args:
if ar not in dsrc:
sys.stderr.write(
"Error: %s '%s' not stored in "
"the configuration server\n" % (label, ar))
sys.stderr.flush()
return []
for ar in args:
choice = default
if ask:
choice = raw_input("Remove %s '%s'? [Y/n] \n" % (
label, ar)).lower()
while True:
if choice == '':
choice = default
break
elif choice in valid:
break
else:
sys.stdout.write("Please respond with 'yes' or 'no' "
"(or 'y' or 'n').\n")
sys.stdout.flush()
if valid[choice]:
if ds:
self._cnfServer.DeleteDataSource(ar)
elif profiles:
self._cnfServer.DeleteSelection(ar)
else:
self._cnfServer.DeleteComponent(ar)
return []
[docs] def uploadCmd(self, ds, args, force=False, profiles=False, directory='.',
mandatory=False, external=None):
""" upload the DB items from files
:param ds: flag set True for datasources
:type ds: :obj:`bool`
:param args: list of item names
:type args: :obj:`list` <:obj:`str`>
:param force: force flag
:type force: :obj:`bool`
:param profiles: flag set True for profiles
:type profiles: :obj:`bool`
:param directory: input file directory
:type directory: :obj:`str`
:param mandatory: mandatory flag
:type mandatory: :obj:`bool`
:param external: external import type
:type external: :obj:`str`
:returns: list of XML items
:rtype: :obj:`list` <:obj:`str`>
"""
label = "Component"
if ds:
dsrc = self._cnfServer.AvailableDataSources()
label = "DataSource"
elif profiles:
dsrc = self._cnfServer.AvailableSelections()
label = "Profile"
else:
dsrc = self._cnfServer.AvailableComponents()
if not force:
for ar in args:
if ar in dsrc:
sys.stderr.write(
"Error: %s '%s' is stored in "
"the configuration server\n" % (label, ar))
sys.stderr.flush()
return []
for ar in args:
if ds:
name = os.path.join(directory, "%s.ds.xml" % ar)
with open(name, 'r') as fl:
txt = fl.read()
self._cnfServer.XMLString = txt
self._cnfServer.StoreDataSource(ar)
elif profiles:
name = os.path.join(directory, "%s.json" % ar)
with open(name, 'r') as fl:
txt = fl.read()
self._cnfServer.Selection = txt
self._cnfServer.StoreSelection(ar)
else:
name = os.path.join(directory, "%s.xml" % ar)
with open(name, 'r') as fl:
txt = fl.read()
if external and external.lower() == "esrf":
txt = ESRFConverter().convert(txt)
self._cnfServer.XMLString = txt
self._cnfServer.StoreComponent(ar)
if mandatory:
self._cnfServer.SetMandatoryComponents([ar])
return []
[docs] def getCmd(self, args):
""" provides final configuration
:param args: list of item names
:type args: :obj:`list` <:obj:`str`>
:returns: XML configuration string
:rtype: :obj:`str`
"""
cmps = self._cnfServer.AvailableComponents()
for ar in args:
if ar not in cmps:
sys.stderr.write(
"Error: Component '%s' not stored in "
"the configuration server\n" % ar)
sys.stderr.flush()
return ""
self._cnfServer.CreateConfiguration(args)
return self._cnfServer.XMLString
def __describeDataSources(self, args, headers=None, filters=None):
""" provides description of datasources
:param args: list of item names
:type args: :obj:`list` <:obj:`str`>
:param headers: list of output parameters
:type headers: :obj:`list` <:obj:`str`>
:param filters: filters for first column names
:type filters: :obj:`list` <:obj:`str`>
:returns: list with description
:rtype: :obj:`list` <:obj:`str`>
"""
xmls = ""
parameters = []
description = []
dss = self._cnfServer.AvailableDataSources()
if not dss:
sys.stderr.write(
"\n'%s' does not have any datasources\n\n"
% self._cnfServer.name())
sys.stderr.flush()
return ""
for ar in args:
if ar not in dss:
sys.stderr.write(
"Error: DataSource '%s' not stored in "
"the configuration server\n" % ar)
sys.stderr.flush()
return ""
headers = headers or ["source_type", "source"]
if args:
dsxmls = self._cnfServer.DataSources(args)
for i, xmls in enumerate(dsxmls):
parameters = ParserTools.parseDataSources(xmls)
ttools = TableTools(parameters,
headers=headers,
filters=filters)
ttools.title = "DataSource: '%s'" % args[i]
description.extend(ttools.generateList())
else:
dsxmls = self._cnfServer.DataSources(dss)
xmls = ParserTools.mergeDefinitions(dsxmls).strip()
parameters.extend(ParserTools.parseDataSources(xmls))
if "source_name" not in headers:
headers = ["source_name"].extend(headers)
ttools = TableTools(parameters,
headers=headers,
filters=filters)
description.extend(ttools.generateList())
if not description:
sys.stderr.write(
"\nHint: add datasource names as command arguments "
"or -m for mandatory components \n\n")
sys.stderr.flush()
return ""
return description
def __describeProfiles(self, args, headers=None):
""" provides description of datasources
:param args: list of item names
:type args: :obj:`list` <:obj:`str`>
:param headers: list of output parameters
:type headers: :obj:`list` <:obj:`str`>
:returns: list with description
:rtype: :obj:`list` <:obj:`str`>
"""
xmls = ""
parameters = []
description = []
dss = self._cnfServer.AvailableSelections()
if not dss:
sys.stderr.write(
"\n'%s' does not have any profiles\n\n"
% self._cnfServer.name())
sys.stderr.flush()
return ""
for ar in args:
if ar not in dss:
sys.stderr.write(
"Error: Profile '%s' not stored in "
"the configuration server\n" % ar)
sys.stderr.flush()
return ""
args = args or dss
dsxmls = self._cnfServer.Selections(args)
for i, xmls in enumerate(dsxmls):
parameters = [json.loads(xmls)]
ttools = TableDictTools(parameters)
ttools.title = "Profile: '%s'" % args[i]
if headers:
ttools.headers = headers
description.extend(ttools.generateList())
if not description:
sys.stderr.write(
"\nHint: add profile names as command arguments "
"or -m for mandatory components \n\n")
sys.stderr.flush()
return ""
return description
def __describeComponents(self, args, headers=None, nonone=None,
private=False, attrs=True, filters=None):
""" provides description of components
:param args: list of item names
:type args: :obj:`list` <:obj:`str`>
:param headers: list of output parameters
:type headers: :obj:`list` <:obj:`str`>
:param nonone: list of parameters which have to exist to be shown
:type nonone: :obj:`list` <:obj:`str`>
:param private: flag set True for components starting with '__'
:type private: :obj:`bool`
:param attrs: flag set True for parsing attributes
:type attrs: :obj:`bool`
:param filters: filters for first column names
:type filters: :obj:`list` <:obj:`str`>
:returns: list with description
:rtype: :obj:`list` <:obj:`str`>
"""
xmls = ""
parameters = []
description = []
cmps = self._cnfServer.AvailableComponents()
if not cmps:
sys.stderr.write(
"\n'%s' does not have any components\n\n"
% self._cnfServer.name())
sys.stderr.flush()
return ""
dargs = []
deps = {}
for ar in args:
dargs.append(ar)
if ar not in cmps:
sys.stderr.write(
"Error: Component '%s' not stored in "
"the configuration server\n" % ar)
sys.stderr.flush()
return ""
else:
dars = self._cnfServer.dependentComponents([ar])
dars = list(set(dars) - set([ar]))
if dars:
dargs.extend(dars)
deps[ar] = dars
if not dargs:
if private:
dargs = [cp for cp in cmps if cp.startswith("__")]
else:
dargs = [cp for cp in cmps if not cp.startswith("__")]
dargs = dargs or cmps
if dargs:
try:
cpxmls = self._cnfServer.instantiatedComponents(dargs)
except Exception:
cpxmls = []
for ar in dargs:
try:
cpxmls.extend(
self._cnfServer.instantiatedComponents([ar]))
except Exception:
cpxmls.extend(self._cnfServer.Components([ar]))
sys.stderr.write(
"Error: Component '%s' cannot be instantiated\n"
% ar)
sys.stderr.flush()
for i, xmls in enumerate(cpxmls):
parameters = ParserTools.parseFields(xmls)
if attrs:
parameters.extend(ParserTools.parseAttributes(xmls))
parameters.extend(ParserTools.parseLinks(xmls))
ttools = TableTools(parameters, nonone,
headers,
filters)
if dargs[i] in deps:
ttools.title = "Component: '%s' %s" % (
dargs[i], deps[dargs[i]])
else:
ttools.title = "Component: '%s'" % dargs[i]
description.extend(ttools.generateList())
if not description:
sys.stderr.write(
"\nHint: add component names as command arguments "
"or -m for mandatory components \n\n")
sys.stderr.flush()
return ""
return description
def __describeConfiguration(self, args, headers=None, nonone=None,
attrs=True, filters=None):
""" provides description of final configuration
:param args: list of item names
:type args: :obj:`list` <:obj:`str`>
:param headers: list of output parameters
:type headers: :obj:`list` <:obj:`str`>
:param nonone: list of parameters which have to exist to be shown
:type nonone: :obj:`list` <:obj:`str`>
:param attrs: flag set True for parsing attributes
:type attrs: :obj:`bool`
:param filters: filters for first column names
:type filters: :obj:`list` <:obj:`str`>
:returns: list with description
:rtype: :obj:`list` <:obj:`str`>
"""
xmls = ""
description = []
cmps = self._cnfServer.AvailableComponents()
if not cmps:
sys.stderr.write(
"\n'%s' does not have any components\n\n"
% self._cnfServer.name())
sys.stderr.flush()
return ""
for ar in args:
if ar not in cmps:
sys.stderr.write(
"Error: Component '%s' not stored in "
"the configuration server\n" % ar)
sys.stderr.flush()
return ""
self._cnfServer.CreateConfiguration(args)
xmls = str(self._cnfServer.XMLString).strip()
if xmls:
description.extend(ParserTools.parseFields(xmls))
if attrs:
description.extend(ParserTools.parseAttributes(xmls))
description.extend(ParserTools.parseLinks(xmls))
if not description:
sys.stderr.write(
"\nHint: add components as command arguments "
"or -m for mandatory components \n\n")
sys.stderr.flush()
return ""
ttools = TableTools(description, nonone,
headers=headers,
filters=filters)
if headers:
ttools.headers = headers
return ttools.generateList()
[docs] def describeCmd(self, ds, args, md, pr, headers=None, filters=None):
""" provides description of configuration elements
:param ds: flag set True for datasources
:type ds: :obj:`bool`
:param args: list of item names
:type args: :obj:`list` <:obj:`str`>
:param md: flag set True for mandatory components
:type md: :obj:`bool`
:param pr: flag set True for private components
:type pr: :obj:`bool`
:param pr: column headers
:type pr: :obj:`str`
:param filters: filters for first column names separated by comma
:type filters: :obj:`str`
:returns: list with description
:rtype: :obj:`list` <:obj:`str`>
"""
if headers:
headers = str(headers).split(',')
else:
headers = None
if filters:
filters = str(filters).split(',')
else:
filters = None
if ds:
return self.__describeDataSources(args, headers, filters=filters)
elif not md:
return self.__describeComponents(
args, headers, private=pr, filters=filters)
else:
return self.__describeConfiguration(
args, headers, filters=filters)
[docs] def infoCmd(self, ds, args, md, pr, profiles):
""" Provides info for given elements
:param ds: flag set True for datasources
:type ds: :obj:`bool`
:param args: list of item names
:type args: :obj:`list` <:obj:`str`>
:param md: flag set True for mandatory components
:type md: :obj:`bool`
:param pr: flag set True for private components
:type pr: :obj:`bool`
:param profiles: flag set True for profiles
:type profiles: :obj:`bool`
:returns: list with description
:rtype: :obj:`list` <:obj:`str`>
"""
cpheaders = [
"source_name",
"source_type",
"nexus_type",
"shape",
"strategy",
"source",
]
nonone = ["source_name"]
if ds:
return self.__describeDataSources(args)
elif profiles:
return self.__describeProfiles(args)
elif not md:
return self.__describeComponents(args, cpheaders, nonone, pr)
else:
return self.__describeConfiguration(args, cpheaders, nonone)
[docs] def geometryCmd(self, args, md, pr):
""" provides geometry info for given elements
:param args: list of item names
:type args: :obj:`list` <:obj:`str`>
:param md: flag set True for mandatory components
:type md: :obj:`bool`
:param pr: flag set True for private components
:type pr: :obj:`bool`
:returns: list with description
:rtype: :obj:`list` <:obj:`str`>
"""
cpheaders = [
"nexus_path",
"source_name",
"units",
"trans_type",
"trans_vector",
"trans_offset",
"depends_on",
]
if not md:
return self.__describeComponents(args, cpheaders, private=pr,
attrs=False)
else:
return self.__describeConfiguration(args, cpheaders,
attrs=False)
[docs] def dataCmd(self, args):
""" provides varaible values
:param args: list of item names
:type args: :obj:`list` <:obj:`str`>
:returns: JSON with variables
:rtype: :obj:`str`
"""
if args and len(args) > 0 and args[0]:
self._cnfServer.Variables = args[0]
return [str(self._cnfServer.Variables)]
[docs] def mergeCmd(self, args):
""" provides merged components
:param args: list of item names
:type args: :obj:`list` <:obj:`str`>
:returns: XML configuration string with merged components
:rtype: :obj:`str`
"""
cmps = self._cnfServer.AvailableComponents()
for ar in args:
if ar not in cmps:
sys.stderr.write(
"Error: Component '%s' not stored "
"in the configuration server\n" % ar)
sys.stderr.flush()
return ""
return self._cnfServer.Merge(args)
[docs]class List(Runner):
""" List runner"""
#: (:obj:`str`) command description
description = "list names of available components, datasources or profiles"
#: (:obj:`str`) command epilog
epilog = "" \
+ " examples:\n" \
+ " nxsconfig list\n" \
+ "\n"
[docs] def create(self):
""" creates parser
"""
parser = self._parser
parser.add_argument("-s", "--server", dest="server",
help=("configuration server device name"))
parser.add_argument("-d", "--datasources", action="store_true",
default=False, dest="datasources",
help="perform operation for datasources")
parser.add_argument("-r", "--profiles", action="store_true",
default=False, dest="profiles",
help="perform operation for profiles")
parser.add_argument("-m", "--mandatory", action="store_true",
default=False, dest="mandatory",
help="make use mandatory components")
parser.add_argument("-p", "--private", action="store_true",
default=False, dest="private",
help="make use private components,"
" i.e. starting with '__'")
parser.add_argument("-n", "--no-newlines", action="store_true",
default=False, dest="nonewlines",
help="split result with space characters")
[docs] def run(self, options):
""" the main program function
:param options: parser options
:type options: :class:`argparse.Namespace`
:returns: output information
:rtype: :obj:`str`
"""
cnfserver = ConfigServer(options.server, options.nonewlines)
string = cnfserver.char.join(cnfserver.listCmd(
options.datasources, options.mandatory, options.private,
options.profiles
))
return string
[docs]class Show(Runner):
""" Show runner"""
#: (:obj:`str`) command description
description = "show (or write to files) " \
"components, datasources or profiles with given names"
#: (:obj:`str`) command epilog
epilog = "" \
+ " examples:\n" \
+ " nxsconfig show dcm\n" \
+ "\n"
[docs] def create(self):
""" creates parser
"""
parser = self._parser
parser.add_argument("-s", "--server", dest="server",
help=("configuration server device name"))
parser.add_argument("-d", "--datasources", action="store_true",
default=False, dest="datasources",
help="perform operation for datasources")
parser.add_argument("-r", "--profiles", action="store_true",
default=False, dest="profiles",
help="perform operation for profiles")
parser.add_argument("-o", "--directory", dest="directory",
help=("output file directory"))
parser.add_argument('args', metavar='name', type=str, nargs='*',
help='names of components, datasources '
'or profiles')
[docs] def run(self, options):
""" the main program function
:param options: parser options
:type options: :class:`argparse.Namespace`
:returns: output information
:rtype: :obj:`str`
"""
cnfserver = ConfigServer(options.server, False)
string = cnfserver.char.join(cnfserver.showCmd(
options.datasources, options.args, False,
options.profiles, options.directory
))
return string
[docs]class Delete(Runner):
""" Show runner"""
#: (:obj:`str`) command description
description = "delete components, datasources or profiles " \
"with given names from ConfigServer"
#: (:obj:`str`) command epilog
epilog = "" \
+ " examples:\n" \
+ " nxsconfig delete pilatus1a\n" \
+ "\n"
[docs] def create(self):
""" creates parser
"""
parser = self._parser
parser.add_argument("-s", "--server", dest="server",
help=("configuration server device name"))
parser.add_argument("-d", "--datasources", action="store_true",
default=False, dest="datasources",
help="perform operation for datasources")
parser.add_argument("-r", "--profiles", action="store_true",
default=False, dest="profiles",
help="perform operation for profiles")
parser.add_argument("-f", "--force", action="store_true",
default=False, dest="force",
help="do not ask")
parser.add_argument('args', metavar='name', type=str, nargs='*',
help='names of components, datasources '
'or profiles')
[docs] def run(self, options):
""" the main program function
:param options: parser options
:type options: :class:`argparse.Namespace`
:returns: output information
:rtype: :obj:`str`
"""
cnfserver = ConfigServer(options.server, False)
string = cnfserver.char.join(cnfserver.deleteCmd(
options.datasources, options.args, not options.force,
options.profiles
))
return string
[docs]class Upload(Runner):
""" Store runner"""
#: (:obj:`str`) command description
description = "upload components, datasources or profiles " \
"with given names from locale filesystem into ConfigServer"
#: (:obj:`str`) command epilog
epilog = "" \
+ " examples:\n" \
+ " nxsconfig upload exp_c01 exp_c02\n" \
+ "\n"
[docs] def create(self):
""" creates parser
"""
parser = self._parser
parser.add_argument("-s", "--server", dest="server",
help=("configuration server device name"))
parser.add_argument("-d", "--datasources", action="store_true",
default=False, dest="datasources",
help="perform operation for datasources")
parser.add_argument("-m", "--mandatory", action="store_true",
default=False, dest="mandatory",
help="set the component as mandatory")
parser.add_argument("-r", "--profiles", action="store_true",
default=False, dest="profiles",
help="perform operation for profiles")
parser.add_argument("-f", "--force", action="store_true",
default=False, dest="force",
help="do not ask")
parser.add_argument("-i", "--directory", dest="directory",
default=".",
help=("input file directory, default: '.'"))
parser.add_argument("-e", "--external", dest="external",
default="",
help=("external format of xml, e.g. 'esrf'"))
parser.add_argument('args', metavar='name', type=str, nargs='*',
help='names of components, datasources '
'or profiles')
[docs] def run(self, options):
""" the main program function
:param options: parser options
:type options: :class:`argparse.Namespace`
:returns: output information
:rtype: :obj:`str`
"""
cnfserver = ConfigServer(options.server, False)
string = cnfserver.char.join(cnfserver.uploadCmd(
options.datasources, options.args, options.force,
options.profiles, options.directory, options.mandatory,
options.external
))
return string
[docs]class Get(Runner):
""" Get runner"""
#: (:obj:`str`) command description
description = "get full configuration of components"
#: (:obj:`str`) command epilog
epilog = "" \
+ " examples:\n" \
+ " nxsconfig get dcm source slit1 slit2\n" \
+ "\n"
[docs] def create(self):
""" creates parser
"""
parser = self._parser
parser.add_argument("-s", "--server", dest="server",
help=("configuration server device name"))
parser.add_argument('args', metavar='name', type=str, nargs='*',
help='names of components')
[docs] def run(self, options):
""" the main program function
:param options: parser options
:type options: :class:`argparse.Namespace`
:returns: output information
:rtype: :obj:`str`
"""
cnfserver = ConfigServer(options.server)
string = str(cnfserver.getCmd(options.args))
return string
[docs]class Merge(Runner):
""" Merge runner"""
#: (:obj:`str`) command description
description = "get merged configuration of components or datasources"
#: (:obj:`str`) command epilog
epilog = "" \
+ " examples:\n" \
+ " nxsconfig merge slit1 dcm \n" \
+ "\n"
[docs] def create(self):
""" creates parser
"""
parser = self._parser
parser.add_argument("-s", "--server", dest="server",
help=("configuration server device name"))
parser.add_argument('args', metavar='name', type=str, nargs='*',
help='names of components')
[docs] def run(self, options):
""" the main program function
:param options: parser options
:type options: :class:`argparse.Namespace`
:returns: output information
:rtype: :obj:`str`
"""
cnfserver = ConfigServer(options.server)
string = str(cnfserver.getCmd(options.args))
return string
[docs]class Sources(Runner):
""" Sources runner"""
#: (:obj:`str`) command description
description = "get a list of component datasources"
#: (:obj:`str`) command epilog
epilog = "" \
+ " examples:\n" \
+ " nxsconfig sources slit1\n" \
+ "\n"
[docs] def create(self):
""" creates parser
"""
parser = self._parser
parser.add_argument("-s", "--server", dest="server",
help=("configuration server device name"))
parser.add_argument("-m", "--mandatory", action="store_true",
default=False, dest="mandatory",
help="make use mandatory components")
parser.add_argument("-n", "--no-newlines", action="store_true",
default=False, dest="nonewlines",
help="split result with space characters")
parser.add_argument('args', metavar='name', type=str, nargs='*',
help='names of components')
[docs] def run(self, options):
""" the main program function
:param options: parser options
:type options: :class:`argparse.Namespace`
:returns: output information
:rtype: :obj:`str`
"""
cnfserver = ConfigServer(options.server, options.nonewlines)
string = cnfserver.char.join(cnfserver.sourcesCmd(
options.args, options.mandatory))
return string
[docs]class Components(Runner):
""" Components runner"""
#: (:obj:`str`) command description
description = "get a list of dependent components"
#: (:obj:`str`) command epilog
epilog = "" \
+ " examples:\n" \
+ " nxsconfig components dcm\n" \
+ "\n"
[docs] def create(self):
""" creates parser
"""
parser = self._parser
parser.add_argument("-s", "--server", dest="server",
help=("configuration server device name"))
parser.add_argument("-n", "--no-newlines", action="store_true",
default=False, dest="nonewlines",
help="split result with space characters")
parser.add_argument('args', metavar='name', type=str, nargs='*',
help='names of components')
[docs] def run(self, options):
""" the main program function
:param options: parser options
:type options: :class:`argparse.Namespace`
:returns: output information
:rtype: :obj:`str`
"""
cnfserver = ConfigServer(options.server, options.nonewlines)
string = cnfserver.char.join(cnfserver.componentsCmd(
options.args))
return string
[docs]class Variables(Runner):
""" Variables runner"""
#: (:obj:`str`) command description
description = "get a list of component variables"
#: (:obj:`str`) command epilog
epilog = "" \
+ " examples:\n" \
+ " nxsconfig variables dcm\n" \
+ "\n"
[docs] def create(self):
""" creates parser
"""
parser = self._parser
parser.add_argument("-s", "--server", dest="server",
help=("configuration server device name"))
parser.add_argument("-m", "--mandatory", action="store_true",
default=False, dest="mandatory",
help="make use mandatory components")
parser.add_argument("-n", "--no-newlines", action="store_true",
default=False, dest="nonewlines",
help="split result with space characters")
parser.add_argument('args', metavar='name', type=str, nargs='*',
help='names of components')
[docs] def run(self, options):
""" the main program function
:param options: parser options
:type options: :class:`argparse.Namespace`
:returns: output information
:rtype: :obj:`str`
"""
cnfserver = ConfigServer(options.server, options.nonewlines)
string = cnfserver.char.join(cnfserver.variablesCmd(
options.args, options.mandatory))
return string
[docs]class Data(Runner):
""" Data runner"""
#: (:obj:`str`) command description
description = "get/set values of component variables"
#: (:obj:`str`) command epilog
epilog = "" \
+ " examples:\n" \
+ " nxsconfig data \n" \
+ " nxsconfig data '{\"sample_name\":\"H2O\"}'\n" \
+ "\n"
[docs] def create(self):
""" creates parser
"""
parser = self._parser
parser.add_argument("-s", "--server", dest="server",
help=("configuration server device name"))
parser.add_argument('args', metavar='name', type=str, nargs='?',
help='data dictionary in json string')
[docs] def run(self, options):
""" the main program function
:param options: parser options
:type options: :class:`argparse.Namespace`
:returns: output information
:rtype: :obj:`str`
"""
cnfserver = ConfigServer(options.server)
string = cnfserver.char.join(cnfserver.dataCmd(
options.args))
return string
[docs]class Record(Runner):
""" Record runner"""
#: (:obj:`str`) command description
description = "get a list of datasource record names for " \
+ "components or datasources"
#: (:obj:`str`) command epilog
epilog = "" \
+ " examples:\n" \
+ " nxsconfig record -d exp_mot01 \n" \
+ " nxsconfig record dcm \n" \
+ "\n"
[docs] def create(self):
""" creates parser
"""
parser = self._parser
parser.add_argument("-d", "--datasources", action="store_true",
default=False, dest="datasources",
help="perform operation for datasources")
parser.add_argument("-s", "--server", dest="server",
help=("configuration server device name"))
parser.add_argument("-n", "--no-newlines", action="store_true",
default=False, dest="nonewlines",
help="split result with space characters")
parser.add_argument('args', metavar='name', type=str, nargs='?',
help='name of components or datasources')
[docs] def run(self, options):
""" the main program function
:param options: parser options
:type options: :class:`argparse.Namespace`
:returns: output information
:rtype: :obj:`str`
"""
cnfserver = ConfigServer(options.server, options.nonewlines)
string = cnfserver.char.join(cnfserver.recordCmd(
options.datasources, options.args))
return string
[docs]class Servers(Runner):
""" Servers runner"""
#: (:obj:`str`) command description
description = "get a list of configuration servers from" \
+ " the current tango host"
#: (:obj:`str`) command epilog
epilog = "" \
+ " examples:\n" \
+ " nxsconfig servers\n" \
+ "\n"
[docs] def create(self):
""" creates parser
"""
parser = self._parser
parser.add_argument("-s", "--server", dest="server",
help=("tango host or configuration server"))
parser.add_argument("-n", "--no-newlines", action="store_true",
default=False, dest="nonewlines",
help="split result with space characters")
[docs] def run(self, options):
""" the main program function
:param options: parser options
:type options: :class:`argparse.Namespace`
:returns: output information
:rtype: :obj:`str`
"""
char = " " if options.nonewlines else "\n"
return char.join(listServers(options.server, 'NXSConfigServer'))
[docs]class Describe(Runner):
""" Describe runner"""
#: (:obj:`str`) command description
description = "show all parameters of given components" \
+ " or datasources"
#: (:obj:`str`) command epilog
epilog = "" \
+ " examples:\n" \
+ " nxsconfig describe pilatus\n" \
+ "\n"
[docs] def create(self):
""" creates parser
"""
parser = self._parser
parser.add_argument(
"-s", "--server", dest="server",
help=("configuration server device name"))
parser.add_argument(
"-d", "--datasources", action="store_true",
default=False, dest="datasources",
help="perform operation for datasources")
parser.add_argument(
"-m", "--mandatory", action="store_true",
default=False, dest="mandatory",
help="make use mandatory components")
self._parser.add_argument(
"-c", "--columns",
help="names of column to be shown (separated by commas "
"without spaces). The possible names are: "
"depends_on, dtype, full_path, nexus_path, nexus_type, shape,"
" source, source_name, source_type, strategy, trans_type, "
"trans_offset, trans_vector, units, value",
dest="headers", default="")
self._parser.add_argument(
"-f", "--filters",
help="filters on the first column (separated by commas "
"without spaces). Default: '*'. E.g. '*:NXsample/*'",
dest="filters", default="")
parser.add_argument(
"-p", "--private", action="store_true",
default=False, dest="private",
help="make use private components,"
" i.e. starting with '__'")
parser.add_argument(
'args', metavar='name', type=str, nargs='*',
help='names of components or datasources')
[docs] def run(self, options):
""" the main program function
:param options: parser options
:type options: :class:`argparse.Namespace`
:returns: output information
:rtype: :obj:`str`
"""
cnfserver = ConfigServer(options.server)
string = cnfserver.char.join(cnfserver.describeCmd(
options.datasources, options.args, options.mandatory,
options.private, options.headers, options.filters))
return string
[docs]class Info(Runner):
""" List runner"""
#: (:obj:`str`) command description
description = "show general parameters of given components," \
+ " datasources or profile"
#: (:obj:`str`) command epilog
epilog = "" \
+ " examples:\n" \
+ " nxsconfig info slit1\n" \
+ "\n"
[docs] def create(self):
""" creates parser
"""
parser = self._parser
parser.add_argument("-s", "--server", dest="server",
help=("configuration server device name"))
parser.add_argument("-d", "--datasources", action="store_true",
default=False, dest="datasources",
help="perform operation for datasources")
parser.add_argument("-r", "--profiles", action="store_true",
default=False, dest="profiles",
help="perform operation for profiles")
parser.add_argument("-m", "--mandatory", action="store_true",
default=False, dest="mandatory",
help="make use mandatory components")
parser.add_argument("-p", "--private", action="store_true",
default=False, dest="private",
help="make use private components,"
" i.e. starting with '__'")
parser.add_argument('args', metavar='name', type=str, nargs='*',
help='names of components, datasources '
'or profiles')
[docs] def run(self, options):
""" the main program function
:param options: parser options
:type options: :class:`argparse.Namespace`
:returns: output information
:rtype: :obj:`str`
"""
cnfserver = ConfigServer(options.server)
string = cnfserver.char.join(cnfserver.infoCmd(
options.datasources, options.args, options.mandatory,
options.private, options.profiles))
return string
[docs]class Geometry(Runner):
""" List runner"""
#: (:obj:`str`) command description
description = "show transformation parameters of given components" \
+ " or datasources"
epilog = "" \
+ " examples:\n" \
+ " nxsconfig geometry dcm\n" \
+ "\n"
[docs] def create(self):
""" creates parser
"""
parser = self._parser
parser.add_argument("-s", "--server", dest="server",
help=("configuration server device name"))
parser.add_argument("-m", "--mandatory", action="store_true",
default=False, dest="mandatory",
help="make use mandatory components")
parser.add_argument("-p", "--private", action="store_true",
default=False, dest="private",
help="make use private components,"
" i.e. starting with '__'")
parser.add_argument('args', metavar='name', type=str, nargs='*',
help='names of components or datasources')
[docs] def run(self, options):
""" the main program function
:param options: parser options
:type options: :class:`argparse.Namespace`
:returns: output information
:rtype: :obj:`str`
"""
cnfserver = ConfigServer(options.server)
string = cnfserver.char.join(cnfserver.geometryCmd(
options.args, options.mandatory,
options.private))
return string
[docs]def main():
""" the main program function
"""
#: pipe arguments
pipe = []
if not sys.stdin.isatty():
#: system pipe
pipe = sys.stdin.readlines()
description = "Command-line tool for reading NeXus configuration " \
+ "from NXSConfigServer"
epilog = 'For more help:\n nxsconfig <sub-command> -h'
parser = NXSArgParser(
description=description, epilog=epilog,
formatter_class=argparse.RawDescriptionHelpFormatter)
parser.cmdrunners = [('list', List),
('show', Show),
('get', Get),
('delete', Delete),
('upload', Upload),
('variables', Variables),
('sources', Sources),
('record', Record),
('merge', Merge),
('components', Components),
('data', Data),
('describe', Describe),
('info', Info),
('geometry', Geometry),
('servers', Servers)]
runners = parser.createSubParsers()
try:
options = parser.parse_args()
except ErrorException as e:
sys.stderr.write("Error: %s\n" % str(e))
sys.stderr.flush()
parser.print_help()
print("")
sys.exit(255)
if hasattr(options, "datasources") and \
hasattr(options, "profiles") and \
options.datasources and options.profiles:
sys.stderr.write(
"Error: %s\n" % str(
"argument -d/--datasources and -r/--profiles "
"cannot be selected simultaneously"))
sys.stderr.flush()
parser.print_help()
print("")
sys.exit(255)
if options.subparser is None:
sys.stderr.write(
"Error: %s\n" % str("too few arguments"))
sys.stderr.flush()
parser.print_help()
print("")
sys.exit(255)
if options.subparser != 'servers':
if not options.server:
options.server = checkServer()
if not options.server:
print(list(parser.subparsers.keys()))
parser.subparsers[options.subparser].print_help()
print("")
sys.exit(255)
#: command-line and pipe arguments
parg = []
if hasattr(options, "args"):
if not isinstance(options.args, list):
options.args = [options.args] if options.args else []
parg = options.args or []
if pipe:
parg.extend([p.strip() for p in pipe])
if hasattr(options, "args"):
options.args[:] = parg
try:
result = runners[options.subparser].run(options)
# except tango.DevFailed as
except Exception as e:
if isinstance(e, EOFError) \
and str(e).startswith("EOF when reading a line"):
sys.stderr.write("Error: %s. Consider to use the "
"--force option \n" % str(e))
sys.stderr.flush()
sys.exit(255)
if PYTANGO and isinstance(e, tango.DevFailed):
# print(str((e.args[0]).desc))
if str((e.args[0]).desc).startswith(
"NonregisteredDBRecordError: The datasource "):
mydss = str((e.args[0]).desc)[43:].split()
if not mydss or not mydss[0]:
mydss = ["UKNOWN"]
sys.stderr.write(
"Error: Datasource %s not stored in Configuration Server\n"
% mydss[0])
sys.stderr.flush()
elif str((e.args[0]).desc).startswith(
"nxsconfigserver.Errors.NonregisteredDBRecordError:"
" The datasource "):
mydss = str((e.args[0]).desc)[66:].split()
if not mydss or not mydss[0]:
mydss = ["UKNOWN"]
sys.stderr.write(
"Error: Datasource %s not stored in Configuration Server\n"
% mydss[0])
sys.stderr.flush()
elif str((e.args[0]).desc).startswith(
"NonregisteredDBRecordError: Component "):
mydss = str((e.args[0]).desc)[38:].split()
if not mydss or not mydss[0]:
mydss = ["UKNOWN"]
sys.stderr.write(
"Error: Component %s not stored in Configuration Server\n"
% mydss[0])
sys.stderr.flush()
elif str((e.args[0]).desc).startswith(
"nxsconfigserver.Errors.NonregisteredDBRecordError:"
" Component "):
mydss = str((e.args[0]).desc)[61:].split()
if not mydss or not mydss[0]:
mydss = ["UKNOWN"]
sys.stderr.write(
"Error: Component %s not stored in Configuration Server\n"
% mydss[0])
sys.stderr.flush()
elif str((e.args[0]).desc).startswith(
'IncompatibleNodeError: '):
sys.stderr.write("Error:%s\n" % (e.args[0]).desc[22:])
sys.stderr.flush()
elif str((e.args[0]).desc).startswith(
'nxsconfigserver.Errors.IncompatibleNodeError: '):
sys.stderr.write("Error:%s\n" % (e.args[0]).desc[45:])
sys.stderr.flush()
elif str((e.args[0]).desc).startswith(
'ExpatError: '):
sys.stderr.write("Error from XML parser: %s\n"
% (e.args[0]).desc[12:])
sys.stderr.flush()
elif str((e.args[0]).desc).startswith(
'nxsconfigserver.Errors.ExpatError: '):
sys.stderr.write("Error from XML parser: %s\n"
% (e.args[0]).desc[35:])
sys.stderr.flush()
else:
sys.stderr.write("Error: %s\n" % str(e))
sys.stderr.flush()
sys.exit(255)
else:
sys.stderr.write("Error: %s\n" % str(e))
sys.stderr.flush()
sys.exit(255)
# raise
if result and str(result).strip():
print(result)
if __name__ == "__main__":
main()