# "$Name: $";
# "$Header: $";
# =============================================================================
#
# file : NXSRecSelector.py
#
# description : Python source for the NXSRecSelector and its commands.
# The class is derived from Device. It represents the
# CORBA servant object which will be accessed from the
# network. All commands which can be executed on the
# NXSRecSelector are implemented in this file.
#
# project : TANGO Device Server
#
# $Author: $
#
# $Revision: $
#
# $Log: $
#
# copyleft : European Synchrotron Radiation Facility
# BP 220, Grenoble 38043
# FRANCE
#
# =============================================================================
# This file is generated by POGO
# (Program Obviously used to Generate tango Object)
#
# (c) - Software Engineering Group - ESRF
# =============================================================================
#
""" Selector Server for NeXus Sardana Recorder """
# ==================================================================
# ==================================================================
try:
import tango
except Exception:
import PyTango as tango
from .Settings import Settings as STG
from .Utils import Utils
[docs]class NXSRecSelector(tango.LatestDeviceImpl):
""" NXSRecSelector server interface
:brief: Tango Server for Nexus Sardana Recorder Settings.
Device States Description:
DevState.ON - Server is ON,
DevState.RUNNING - Performing a query
"""
def __init__(self, cl, name):
""" Device constructor
:param cl: class name
:type cl: :obj:`str`
:param name: device name
:type name: :obj:`str`
"""
tango.LatestDeviceImpl.__init__(self, cl, name)
self.debug_stream("In __init__()")
#: (:class:`nxsrecconfig.Settings.Settings`) \
#: Recorder Settings
self.__stg = None
#: (:class:`tango.DeviceProxy`) self device proxy
self.__dp = None
#: (:obj:`list` <:obj:`str`>) \
#: memorize attribute to be updated on conf change
self.__toupdate = ['ConfigDevice', 'Door']
NXSRecSelector.init_device(self)
[docs] def delete_device(self):
""" Device destructor
"""
self.debug_stream("In delete_device()")
if hasattr(self, 'stg') and self.__stg:
del self.__stg
self.__stg = None
self.set_state(tango.DevState.OFF)
[docs] def init_device(self):
""" Device initialization
"""
self.debug_stream("In init_device()")
if hasattr(self, 'stg') and self.__stg:
del self.__stg
self.__stg = None
self.get_device_properties(self.get_device_class())
numberofthreads = self.NumberOfThreads or None
defaultpath = self.DefaultNeXusPath or None
defaultudatapath = self.DefaultUserDataPath or None
defaultzone = self.DefaultTimeZone or None
defaultmntgrp = self.DefaultMntGrp or None
defaultnexustype = self.DefaultNeXusType or None
syncsnapshot = bool(self.SyncSnapshot)
writepoolmotorpositions = bool(self.WritePoolMotorPositions)
self.__stg = STG(self, numberofthreads, defaultpath,
defaultzone, defaultmntgrp, syncsnapshot,
writepoolmotorpositions, defaultnexustype,
defaultudatapath)
self.set_state(tango.DevState.ON)
self.__stg.poolBlacklist = self.PoolBlacklist or []
self.__stg.timerFilters = self.TimerFilters or [
"*dgg*", "*/timer/*", "*/ctctrl0*"]
self.__stg.masterTimerFirst = bool(self.MasterTimerFirst)
self.__stg.mergeProfilesToMntGrps = bool(self.MergeProfilesToMntGrps)
self.__stg.resetInvalidDoor = bool(self.ResetInvalidDoor)
self.__stg.masterTimer = bool(self.MasterTimer)
self.__stg.mutedChannelFilters = self.MutedChannelFilters \
or ["*tip551*"]
self.__stg.mutedPreScanAttrFilters = self.MutedPreScanAttrFilters or []
self.__stg.adminDataNames = self.AdminDataNames or []
self.__stg.defaultPreselectedComponents = \
self.DefaultPreselectedComponents or []
self.__stg.defaultCanFailDataSources = \
self.DefaultCanFailDataSources or []
self.__stg.tangoSourceErrorStates = \
self.TangoSourceErrorStates or []
self.__stg.tangoSourceWarningStates = \
self.TangoSourceWarningStates or []
# print(self.TangoSourceErrorStates)
# print(self.TangoSourceWarningStates)
self.__stg.clientRecordKeys = \
self.ClientRecordKeys or []
[docs] def always_executed_hook(self):
""" Always excuted hook method
"""
self.debug_stream("In always_excuted_hook()")
# ==================================================================
#
# NXSRecSelector read/write attribute methods
#
# ==================================================================
[docs] def read_attr_hardware(self, _):
""" Read Attribute Hardware
"""
self.debug_stream("In read_attr_hardware()")
[docs] def read_Components(self, attr):
""" Read Components attribute
:param attr: read attribute
:type attr: :class:`tango.Attribute`
"""
self.debug_stream("In read_Components()")
attr.set_value(self.__stg.components)
[docs] def read_DescriptionErrors(self, attr):
""" Read DescriptionErrors attribute
:param attr: read attribute
:type attr: :class:`tango.Attribute`
"""
self.debug_stream("In read_DescriptionErrors()")
attr.set_value(self.__stg.descriptionErrors)
[docs] def read_Version(self, attr):
""" Read Version attribute
:param attr: read attribute
:type attr: :class:`tango.Attribute`
"""
self.debug_stream("In read_Version()")
attr.set_value(self.__stg.version)
[docs] def read_MacroServer(self, attr):
""" Read MacroServer attribute
:param attr: read attribute
:type attr: :class:`tango.Attribute`
"""
self.debug_stream("In read_MacroServer()")
attr.set_value(self.__stg.macroServer)
[docs] def read_Door(self, attr):
""" Read Door attribute
:param attr: read attribute
:type attr: :class:`tango.Attribute`
"""
self.debug_stream("In read_Door()")
attr.set_value(self.__stg.door)
[docs] def write_Door(self, attr):
""" Write Door attribute
:param attr: written attribute
:type attr: :class:`tango.Attribute`
"""
self.debug_stream("In write_Door()")
self.__stg.door = attr.get_write_value()
[docs] def read_StepDataSources(self, attr):
""" Read StepDataSources attribute
:param attr: read attribute
:type attr: :class:`tango.Attribute`
"""
self.debug_stream("In read_StepDataSources()")
attr.set_value(self.__stg.stepdatasources or "")
[docs] def write_StepDataSources(self, attr):
""" Write StepDataSources attribute
:param attr: written attribute
:type attr: :class:`tango.Attribute`
"""
self.debug_stream("In write_StepDataSources()")
if self.is_StepDataSources_write_allowed():
self.__stg.stepdatasources = attr.get_write_value() or ""
else:
self.warn_stream("To change the settings please close the server.")
raise Exception(
"To change the settings please close the server.")
[docs] def is_StepDataSources_write_allowed(self):
""" StepDataSources attribute Write State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def read_CanFailDataSources(self, attr):
""" Read CanFailDataSources attribute
:param attr: read attribute
:type attr: :class:`tango.Attribute`
"""
self.debug_stream("In read_CanFailDataSources()")
attr.set_value(self.__stg.canfaildatasources or "")
[docs] def write_CanFailDataSources(self, attr):
""" Write CanFailDataSources attribute
:param attr: written attribute
:type attr: :class:`tango.Attribute`
"""
self.debug_stream("In write_CanFailDataSources()")
if self.is_CanFailDataSources_write_allowed():
self.__stg.canfaildatasources = attr.get_write_value() or ""
else:
self.warn_stream("To change the settings please close the server.")
raise Exception(
"To change the settings please close the server.")
[docs] def is_CanFailDataSources_write_allowed(self):
""" CanFailDataSources attribute Write State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def read_LinkDataSources(self, attr):
""" Read LinkDataSources attribute
:param attr: read attribute
:type attr: :class:`tango.Attribute`
"""
self.debug_stream("In read_LinkDataSources()")
attr.set_value(self.__stg.linkdatasources or "")
[docs] def write_LinkDataSources(self, attr):
""" Write LinkDataSources attribute
:param attr: written attribute
:type attr: :class:`tango.Attribute`
"""
self.debug_stream("In write_LinkDataSources()")
if self.is_LinkDataSources_write_allowed():
self.__stg.linkdatasources = attr.get_write_value() or ""
else:
self.warn_stream("To change the settings please close the server.")
raise Exception(
"To change the settings please close the server.")
[docs] def is_LinkDataSources_write_allowed(self):
""" LinkDataSources attribute Write State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def read_ConfigDevice(self, attr):
""" Read ConfigDevice attribute
:param attr: read attribute
:type attr: :class:`tango.Attribute`
"""
self.debug_stream("In read_ConfigDevice()")
attr.set_value(self.__stg.configDevice)
[docs] def write_ConfigDevice(self, attr):
""" Write ConfigDevice attribute
:param attr: written attribute
:type attr: :class:`tango.Attribute`
"""
self.debug_stream("In write_ConfigDevice()")
self.__stg.configDevice = attr.get_write_value()
[docs] def read_MntGrp(self, attr):
""" Read MntGrp attribute
:param attr: read attribute
:type attr: :class:`tango.Attribute`
"""
self.debug_stream("In read_MntGrp()")
attr.set_value(self.__stg.mntGrp)
[docs] def write_MntGrp(self, attr):
""" Write MntGrp attribute
:param attr: written attribute
:type attr: :class:`tango.Attribute`
"""
self.debug_stream("In write_MntGrp()")
self.__stg.mntGrp = attr.get_write_value()
[docs] def read_ScanDir(self, attr):
""" Read ScanDir attribute
:param attr: read attribute
:type attr: :class:`tango.Attribute`
"""
self.debug_stream("In read_ScanDir()")
attr.set_value(self.__stg.scanDir)
[docs] def write_ScanDir(self, attr):
""" Write ScanDir attribute
:param attr: written attribute
:type attr: :class:`tango.Attribute`
"""
self.debug_stream("In write_ScanDir()")
self.__stg.scanDir = attr.get_write_value()
[docs] def read_ScanFile(self, attr):
""" Read ScanFile attribute
:param attr: read attribute
:type attr: :class:`tango.Attribute`
"""
self.debug_stream("In read_ScanFile()")
attr.set_value(self.__stg.scanFile or "")
[docs] def write_ScanFile(self, attr):
""" Write ScanFile attribute
:param attr: written attribute
:type attr: :class:`tango.Attribute`
"""
self.debug_stream("In write_ScanFile()")
self.__stg.scanFile = attr.get_write_value() or ""
[docs] def read_ScanID(self, attr):
""" Read ScanID attribute
:param attr: read attribute
:type attr: :class:`tango.Attribute`
"""
self.debug_stream("In read_ScanID()")
attr.set_value(self.__stg.scanID)
[docs] def write_ScanID(self, attr):
""" Write ScanID attribute
:param attr: written attribute
:type attr: :class:`tango.Attribute`
"""
self.debug_stream("In write_ScanID()")
self.__stg.scanID = attr.get_write_value()
[docs] def read_WriterDevice(self, attr):
""" Read WriterDevice attribute
:param attr: read attribute
:type attr: :class:`tango.Attribute`
"""
self.debug_stream("In read_WriterDevice()")
attr.set_value(self.__stg.writerDevice)
[docs] def write_WriterDevice(self, attr):
""" Write WriterDevice attribute
:param attr: written attribute
:type attr: :class:`tango.Attribute`
"""
self.debug_stream("In write_WriterDevice()")
self.__stg.writerDevice = attr.get_write_value()
[docs] def read_DeviceGroups(self, attr):
""" Read DeviceGroups attribute
:param attr: read attribute
:type attr: :class:`tango.Attribute`
"""
self.debug_stream("In read_DeviceGroups()")
attr.set_value(self.__stg.deviceGroups)
[docs] def write_DeviceGroups(self, attr):
""" Write DeviceGroups attribute
:param attr: written attribute
:type attr: :class:`tango.Attribute`
"""
self.debug_stream("In write_DeviceGroups()")
self.__stg.deviceGroups = attr.get_write_value()
[docs] def read_UserData(self, attr):
""" Read UserData attribute
:param attr: read attribute
:type attr: :class:`tango.Attribute`
"""
self.debug_stream("In read_UserData()")
attr.set_value(self.__stg.userData)
[docs] def write_UserData(self, attr):
""" Write UserData attribute
:param attr: written attribute
:type attr: :class:`tango.Attribute`
"""
self.debug_stream("In write_UserData()")
self.__stg.userData = attr.get_write_value()
[docs] def read_DataSources(self, attr):
""" Read DataSources attribute
:param attr: read attribute
:type attr: :class:`tango.Attribute`
"""
self.debug_stream("In read_DataSources()")
attr.set_value(self.__stg.dataSources)
[docs] def read_ProfileConfiguration(self, attr):
""" Read ProfileConfiguration attribute
:param attr: read attribute
:type attr: :class:`tango.Attribute`
"""
self.debug_stream("In read_DataSources()")
attr.set_value(self.__stg.profileConfiguration)
[docs] def write_ProfileConfiguration(self, attr):
""" Write ProfileConfiguration attribute
:param attr: written attribute
:type attr: :class:`tango.Attribute`
"""
self.debug_stream("In write_ProfileConfiguration()")
self.__stg.profileConfiguration = attr.get_write_value()
try:
self.__dp = self.__dp or tango.DeviceProxy(
Utils.tostr(self.get_name()))
for var in self.__toupdate:
if var in self.__stg.names():
if hasattr(self.__dp, var):
self.__dp.write_attribute(
Utils.tostr(var), self.__stg.value(var))
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
[docs] def read_AppendEntry(self, attr):
""" Read AppendEntry attribute
:param attr: read attribute
:type attr: :class:`tango.Attribute`
"""
self.debug_stream("In read_AppendEntry()")
attr.set_value(self.__stg.appendEntry)
[docs] def write_AppendEntry(self, attr):
""" Write AppendEntry attribute
:param attr: written attribute
:type attr: :class:`tango.Attribute`
"""
self.debug_stream("In write_AppendEntry()")
self.__stg.appendEntry = attr.get_write_value()
[docs] def read_ConfigVariables(self, attr):
""" Read ConfigVariables attribute
:param attr: read attribute
:type attr: :class:`tango.Attribute`
"""
self.debug_stream("In read_ConfigVariables()")
attr.set_value(self.__stg.configVariables)
[docs] def write_ConfigVariables(self, attr):
""" Write ConfigVariables attribute
:param attr: written attribute
:type attr: :class:`tango.Attribute`
"""
self.debug_stream("In write_ConfigVariables()")
self.__stg.configVariables = attr.get_write_value()
[docs] def read_ProfileFile(self, attr):
""" Read ProfileFile attribute
:param attr: read attribute
:type attr: :class:`tango.Attribute`
"""
self.debug_stream("In read_ProfileFile()")
attr.set_value(self.__stg.profileFile)
[docs] def write_ProfileFile(self, attr):
""" Write ProfileFile attribute
:param attr: written attribute
:type attr: :class:`tango.Attribute`
"""
self.debug_stream("In write_ProfileFile()")
self.__stg.profileFile = attr.get_write_value()
# ==================================================================
#
# NXSRecSelector command methods
#
# ==================================================================
[docs] def LoadProfile(self):
""" LoadProfile command
:brief: Load server configuration
"""
self.debug_stream("In LoadProfile()")
try:
self.set_state(tango.DevState.RUNNING)
self.__stg.loadProfile()
# updating memorized attributes
self.__dp = self.__dp or tango.DeviceProxy(
Utils.tostr(self.get_name()))
for var in self.__toupdate:
if var in self.__stg.names():
if hasattr(self.__dp, var):
self.__dp.write_attribute(
Utils.tostr(var), self.__stg.value(var))
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
[docs] def is_LoadProfile_allowed(self):
""" LoadProfile command State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def FetchProfile(self):
""" FetchProfile command
:brief: Fetch server configuration
"""
self.debug_stream("In FetchProfile()")
try:
self.set_state(tango.DevState.RUNNING)
self.__stg.fetchProfile()
# updating memorized attributes
self.__dp = self.__dp or tango.DeviceProxy(
Utils.tostr(self.get_name()))
for var in self.__toupdate:
if var in self.__stg.names():
if hasattr(self.__dp, var):
self.__dp.write_attribute(
Utils.tostr(var), self.__stg.value(var))
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
[docs] def is_FetchProfile_allowed(self):
""" FetchProfile command State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def SaveProfile(self):
""" SaveProfile command
:brief: Save server configuration
"""
self.debug_stream("In SaveProfile()")
try:
self.set_state(tango.DevState.RUNNING)
self.__stg.saveProfile()
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
[docs] def is_SaveProfile_allowed(self):
""" SaveProfile command State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def StoreProfile(self):
""" StoreProfile command command
:brief: Store server configuration
"""
self.debug_stream("In StoreProfile()")
try:
self.set_state(tango.DevState.RUNNING)
self.__stg.storeProfile()
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
[docs] def is_StoreProfile_allowed(self):
""" StoreProfile command State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def PreselectComponents(self):
""" PreselectComponents command
:brief: Check existing devices of pools
"""
self.debug_stream("In PreselectComponents()")
try:
self.set_state(tango.DevState.RUNNING)
self.__stg.preselectComponents()
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
[docs] def is_PreselectComponents_allowed(self):
""" PreselectComponents command State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def ResetPreselectedComponents(self):
""" ResetPreselectedComponents command
:brief: reset PreselectedComponents
to DefaultPreselectedComponents
"""
self.debug_stream("In ResetPreselectedComponents()")
try:
self.set_state(tango.DevState.RUNNING)
self.__stg.resetPreselectedComponents()
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
[docs] def is_ResetPreselectedComponents_allowed(self):
""" ResetPreselectedComponents command State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def DeleteAllProfiles(self):
""" DeleteAllProfiles command
:brief: Delete all profiles from Configuration Server
"""
self.debug_stream("In DeleteAllProfiles()")
try:
self.set_state(tango.DevState.RUNNING)
self.__stg.deleteAllProfiles()
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
[docs] def is_DeleteAllProfiles_allowed(self):
""" DeleteAllProfiles command State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def UpdateConfigVariables(self):
""" UpdateConfigVariables command
:brief: Send ConfigVariables into ConfigServer
and update serialno if appendEntry selected
"""
self.debug_stream("In UpdateConfigVariables()")
try:
self.set_state(tango.DevState.RUNNING)
self.__stg.updateConfigVariables()
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
[docs] def is_UpdateConfigVariables_allowed(self):
""" UpdateConfigVariables command State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def IsMntGrpUpdated(self):
""" IsMntGrpUpdated command
:returns: True if mntgrp was changed
:rtype: :obj:`bool`
"""
self.debug_stream("In IsMntGrpUpdated()")
try:
self.set_state(tango.DevState.RUNNING)
conf = bool(self.__stg.isMntGrpUpdated())
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
return conf
[docs] def is_IsMntGrpUpdated_allowed(self):
""" IsMntGrpUpdated command State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def MntGrpConfiguration(self):
""" MntGrpConfiguration command
:brief: returns mntgrp configuration
:returns: Provide mntgrp configuration in json string
:rtype: :obj:`str`
"""
self.debug_stream("In MntGrpConfiguration()")
try:
self.set_state(tango.DevState.RUNNING)
conf = Utils.tostr(self.__stg.mntGrpConfiguration())
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
return conf
[docs] def is_MntGrpConfiguration_allowed(self):
""" MntGrpConfiguration command State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def UpdateProfile(self):
""" UpdateProfile command
:brief: Update mntgrp configuration
:returns: JSON string with mntgrp configuration info
:rtype: :obj:`str`
"""
self.debug_stream("In UpdateProfile()")
try:
self.set_state(tango.DevState.RUNNING)
conf = Utils.tostr(self.__stg.updateProfile())
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
return conf
[docs] def is_UpdateProfile_allowed(self):
""" UpdateProfile command State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def UpdateMntGrp(self):
""" UpdateMntGrp command
:brief: Update mntgrp configuration
:returns: mntgrp configuration string
:rtype: :obj:`str`
"""
self.debug_stream("In UpdateMntGrp()")
try:
self.set_state(tango.DevState.RUNNING)
conf = Utils.tostr(self.__stg.updateMntGrp())
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
return conf
[docs] def is_UpdateMntGrp_allowed(self):
""" UpdateMntGrp command State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def SwitchProfile(self):
""" SwitchProfile command
:brief: Switch mntgrp configuration
"""
self.debug_stream("In SwitchProfile()")
try:
self.set_state(tango.DevState.RUNNING)
self.__stg.switchProfile()
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
[docs] def is_SwitchProfile_allowed(self):
""" SwitchProfile command State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def ImportMntGrp(self):
""" ImportMntGrp command
:brief: Import active mntgrp configuration
"""
self.debug_stream("In ImportMntGrp()")
try:
self.set_state(tango.DevState.RUNNING)
self.__stg.importMntGrp()
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
[docs] def is_ImportMntGrp_allowed(self):
""" ImportMntGrp command State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def ImportEnvProfile(self):
""" ImportEnvProfile command
:brief: Import all environment variables
"""
self.debug_stream("In ImportEnvProfile()")
try:
self.set_state(tango.DevState.RUNNING)
self.__stg.importEnvProfile()
# updating memorized attributes
self.__dp = self.__dp or tango.DeviceProxy(
Utils.tostr(self.get_name()))
for var in self.__toupdate:
if var in self.__stg.names():
if hasattr(self.__dp, var):
self.__dp.write_attribute(
Utils.tostr(var), self.__stg.value(var))
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
[docs] def is_ImportEnv_allowed(self):
""" ImportMntGrp command State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def ExportEnvProfile(self):
""" ExportEnvProfile command
:brief: Export all environment variables
"""
self.debug_stream("In ExportEnvProfile()")
try:
self.set_state(tango.DevState.RUNNING)
self.__stg.exportEnvProfile()
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
[docs] def is_ExportEnv_allowed(self):
""" ExportMntGrp command State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def AvailableTimers(self):
""" AvailableTimers command
:brief: Returns a list of available component names
:returns: DevVarStringArray list of available component names
:rtype: :obj:`list` <:obj:`str`>
"""
self.debug_stream("In AvailableTimers()")
try:
self.set_state(tango.DevState.RUNNING)
argout = self.__stg.availableTimers()
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
return argout
[docs] def is_AvailableTimers_allowed(self):
""" AvailableTimers command State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def MutedChannels(self):
""" MutedChannels command
:brief: Return a list of muted channel names
:returns: DevVarStringArray list of muted channel names
:rtype: :obj:`list` <:obj:`str`>
"""
self.debug_stream("In MutedChannels()")
try:
self.set_state(tango.DevState.RUNNING)
argout = self.__stg.mutedChannels()
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
return argout
[docs] def is_MutedChannels_allowed(self):
""" MutedChannels command State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def AvailableComponents(self):
""" AvailableComponents command
:brief: Return a list of available component names
:returns: DevVarStringArray list of available component names
:rtype: :obj:`list` <:obj:`str`>
"""
self.debug_stream("In AvailableComponents()")
try:
self.set_state(tango.DevState.RUNNING)
argout = self.__stg.availableComponents()
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
return argout
[docs] def is_AvailableComponents_allowed(self):
""" AvailableComponents command State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def ComponentDescription(self):
""" ComponentDescription command
:brief: Return a list of available component names
:returns: DevString list of available component names
:rtype: :obj:`str`
"""
self.debug_stream("In ComponentDescription()")
try:
self.set_state(tango.DevState.RUNNING)
argout = self.__stg.componentDescription()
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
return argout
[docs] def is_ComponentDescription_allowed(self):
""" ComponentDescription command State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def SetScanEnvVariables(self, argin):
""" SetScanEnvVariables command
:brief: Store ScanDir, ScanFile and NeXusSelectorDevice
in environment variables
:param argin: json dictionary with environment data
:type argin: :obj:`str`
:returns: DevLong scan ID
:rtype: :obj:`int`
"""
self.debug_stream("In SetScanEnvVariables()")
try:
self.set_state(tango.DevState.RUNNING)
argout = int(self.__stg.setScanEnvVariables(argin))
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
return argout
[docs] def is_SetScanEnvVariables_allowed(self):
""" SetScanEnvVariables command State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def FullDeviceNames(self):
""" FullDeviceNames command
:brief: Return a JSON with full device names for all aliases
:returns: DevString JSON dictionary with full device names
:rtype: :obj:`str`
"""
self.debug_stream("In FullDeviceNames()")
try:
self.set_state(tango.DevState.RUNNING)
argout = self.__stg.fullDeviceNames()
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
return argout
[docs] def is_FullDeviceNames_allowed(self):
""" FullDeviceNames command State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def ScanEnvVariables(self):
""" ScanEnvVariables command
:brief: Fetch ScanDir, ScanFile, ScanID and NeXusSelectorDevice
in environment variables
:returns: DevString json dictionary with environment data
:rtype: :obj:`str`
"""
self.debug_stream("In ScanEnvVariables()")
try:
self.set_state(tango.DevState.RUNNING)
argout = Utils.tostr(self.__stg.scanEnvVariables())
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
return argout
[docs] def is_ScanEnvVariables_allowed(self):
""" ScanEnvVariables command State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def VariableComponents(self):
""" VariableComponents command
:brief: Returns a list of available component names
:returns: DevString list of available component names
:rtype: :obj:`str`
"""
self.debug_stream("In VariableComponents()")
try:
self.set_state(tango.DevState.RUNNING)
argout = self.__stg.variableComponents()
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
return argout
[docs] def is_VariableComponents_allowed(self):
""" VariableComponents command State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def AvailableProfiles(self):
""" AvailableProfiles command
:brief: Return a list of available selection names
:returns: DevVarStringArray list of available selection names
:rtype: :obj:`list` <:obj:`str`>
"""
self.debug_stream("In AvailableProfiles()")
try:
self.set_state(tango.DevState.RUNNING)
argout = self.__stg.availableProfiles()
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
return argout
[docs] def is_AvailableProfiles_allowed(self):
""" AvailableProfiles command State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def AvailableMntGrps(self):
""" AvailableMntGrps command
:brief: Return a list of available mntgrp names
:returns: DevVarStringArray list of available mntgrp names
:rtype: :obj:`list` <:obj:`str`>
"""
self.debug_stream("In AvailableMntGrps()")
try:
self.set_state(tango.DevState.RUNNING)
argout = self.__stg.availableMntGrps()
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
return argout
[docs] def is_AvailableMntGrps_allowed(self):
""" AvailableMntGrps command State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def AvailableDataSources(self):
""" AvailableDataSources command
:brief: Return a list of available DataSource names
:returns: DevVarStringArray list of available DataSource names
:rtype: :obj:`list` <:obj:`str`>
"""
self.debug_stream("In AvailableDataSources()")
try:
self.set_state(tango.DevState.RUNNING)
argout = self.__stg.availableDataSources()
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
return argout
[docs] def is_AvailableDataSources_allowed(self):
""" AvailableDataSources command State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def PoolElementNames(self, argin):
""" PoolElementNames command
:brief: Return a list of available pool channels
:param argin: DevString name of pool list attribute
:type argin: :obj:`str`
:returns: DevVarStringArray list of available pool elements
:rtype: :obj:`list` <:obj:`str`>
"""
self.debug_stream("In PoolElementNames()")
try:
self.set_state(tango.DevState.RUNNING)
argout = self.__stg.poolElementNames(argin)
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
return argout
[docs] def is_PoolElementNames_allowed(self):
""" PoolElementNames command State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def ComponentDataSources(self):
""" ComponentDataSources command
:brief: Provide the component datasources
:returns: DevVarStringArray component names
:rtype: :obj:`list` <:obj:`str`>
"""
self.debug_stream("In ComponentDataSources()")
try:
self.set_state(tango.DevState.RUNNING)
argout = self.__stg.componentDataSources()
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
return argout
[docs] def is_ComponentDataSources_allowed(self):
""" ComponentDataSources command State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def SelectedDataSources(self):
""" SelectedDataSources command
:brief: Provide the selected datasources
:returns: DevVarStringArray component names
:rtype: :obj:`list` <:obj:`str`>
"""
self.debug_stream("In SelectedDataSources()")
try:
self.set_state(tango.DevState.RUNNING)
argout = self.__stg.selectedDataSources()
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
return argout
[docs] def is_SelectedDataSources_allowed(self):
""" SelectedDataSources command State Machine
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def AdministratorDataNames(self):
""" AdministratorDataNames command
:brief: Provide Administrator Data Names
:returns: DevVarStringArray data record names
:rtype: :obj:`list` <:obj:`str`>
"""
self.debug_stream("In AdministratorDataNames()")
try:
self.set_state(tango.DevState.RUNNING)
argout = self.__stg.administratorDataNames()
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
return argout
[docs] def is_AdministratorDataNames_allowed(self):
""" AdministratorDataNames command State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def MandatoryComponents(self):
""" MandatoryComponents command
:brief: Set the mandatory components
:returns: DevVarStringArray component names
:rtype: :obj:`list` <:obj:`str`>
"""
self.debug_stream("In MandatoryComponents()")
try:
self.set_state(tango.DevState.RUNNING)
argout = self.__stg.mandatoryComponents()
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
return argout
[docs] def is_MandatoryComponents_allowed(self):
""" MandatoryComponents command State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def SelectedComponents(self):
""" SelectedComponents command
:brief: Provide the selected components
:returns: DevVarStringArray component names
:rtype: :obj:`list` <:obj:`str`>
"""
self.debug_stream("In SelectedComponents()")
try:
self.set_state(tango.DevState.RUNNING)
argout = self.__stg.selectedComponents()
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
return argout
[docs] def is_SelectedComponents_allowed(self):
""" SelectedComponents command State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def PreselectedComponents(self):
""" PreselectedComponents command
:brief: Provide the preselected components
:returns: DevVarStringArray component names
:rtype: :obj:`list` <:obj:`str`>
"""
self.debug_stream("In PreselectedComponents()")
try:
self.set_state(tango.DevState.RUNNING)
argout = self.__stg.preselectedComponents()
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
return argout
[docs] def is_PreselectedComponents_allowed(self):
""" PreselectedComponents command State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def PreselectedDataSources(self):
""" PreselectedDataSources command
:brief: Provide the preselected components
:returns: DevVarStringArray component names
:rtype: :obj:`list` <:obj:`str`>
"""
self.debug_stream("In PreselectedDataSources()")
try:
self.set_state(tango.DevState.RUNNING)
argout = self.__stg.preselectedDataSources()
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
return argout
[docs] def is_PreselectedDataSources_allowed(self):
""" PreselectedDataSources command State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def CreateDynamicComponent(self, argin):
""" CreateDynamicComponent command
:brief: Create dynamic component
:param argin: DevVarStringArray list of datasource parameters
:type argin: :obj:`list` <:obj:`str`>
:returns: DevVarString name of created dynamic component
:rtype: :obj:`str`
"""
self.debug_stream("In CreateDynamicComponent()")
try:
self.set_state(tango.DevState.RUNNING)
argout = self.__stg.createDynamicComponent(argin)
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
return argout
[docs] def is_CreateDynamicComponent_allowed(self):
""" CreateDynamicComponent command State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def RemoveDynamicComponent(self, argin):
""" RemoveDynamicComponent command
:brief: Delete the given dynamic component
:param argin: DevString dynamic component name
:type argin: :obj:`str`
"""
self.debug_stream("In RemoveDynamicComponent()")
try:
self.set_state(tango.DevState.RUNNING)
self.__stg.removeDynamicComponent(argin)
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
[docs] def is_RemoveDynamicComponent_allowed(self):
""" RemoveDynamicComponent command State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def DeleteProfile(self, argin):
""" DeleteProfile command
:brief: Delete the given mntgrp
:param argin: DevString measurement group name
:type argin: :obj:`str`
"""
self.debug_stream("In DeleteProfile()")
try:
self.set_state(tango.DevState.RUNNING)
self.__stg.deleteProfile(argin)
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
[docs] def is_DeleteProfile_allowed(self):
""" DeleteProfile command State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def ChannelProperties(self, argin):
""" ChannelProperties command
:brief: Provide JSON dictionary with channel properties
{channel:property}
:param argin: DevString property type
:type argin: :obj:`str`
:returns: DevString JSON dictionary with channel properties
:rtype: :obj:`str`
"""
self.debug_stream("In ChannelProperties()")
try:
self.set_state(tango.DevState.RUNNING)
argout = self.__stg.channelProperties(argin)
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
return argout
[docs] def is_ChannelProperties_allowed(self):
""" ChannelProperties command State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def SetChannelProperties(self, argin):
""" SetChannelProperties command SetChannelProperties
:brief: Set Channel Properties of the given type
:param argin: DevVarStringArray two element list with a property type
and JSON value dictionary {channel:property}
:type argin: [:obj:`str`, :obj:`str`]
"""
self.debug_stream("In SetChannelProperties()")
try:
self.set_state(tango.DevState.RUNNING)
argout = self.__stg.setChannelProperties(argin)
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
return argout
[docs] def is_SetChannelProperties_allowed(self):
""" SetChannelProperties command State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def ComponentClientSources(self, argin):
""" ComponentClientSources command
:brief: Describe client datasources from components
:param argin: DevVarStringArray list of component names
:type argin: :obj:`list` <:obj:`str`>
:returns: DevVarString description of component datasources
:rtype: :obj:`str`
"""
self.debug_stream("In ComponentClientSources()")
try:
self.set_state(tango.DevState.RUNNING)
argout = self.__stg.componentClientSources(argin)
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
return argout
[docs] def is_ComponentClientSources_allowed(self):
""" ComponentClientSources command State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def ComponentSources(self, argin):
""" ComponentSources command
:brief: Describe datasources from components
:param argin: DevVarStringArray list of component names
:type argin: :obj:`list` <:obj:`str`>
:returns: DevVarString description of component datasources
:rtype: :obj:`str`
"""
self.debug_stream("In ComponentSources()")
try:
self.set_state(tango.DevState.RUNNING)
argout = self.__stg.componentSources(argin)
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
return argout
[docs] def is_ComponentSources_allowed(self):
""" ComponentSources command State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def DataSourceDescription(self, argin):
""" DataSourceDescription command
:brief: Provide datasource description
:param argin: DevVarStringArray list of datasource names
:type argin: :obj:`list` <:obj:`str`>
:returns: DevVarStringArray description of datasources
:rtypes: :obj:`list` <:obj:`str`>
"""
self.debug_stream("In DataSourceDescription()")
try:
self.set_state(tango.DevState.RUNNING)
argout = self.__stg.dataSourceDescription(argin)
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
return argout
[docs] def AddStepDataSources(self, argin):
""" AddStepDataSources command
:brief: switch datasources to step mode
:param argin: DevVarStringArray list of datasource names
:type argin: :obj:`list` <:obj:`str`>
:returns: DevVarStringArray list of datasources not found in components
:rtypes: :obj:`list` <:obj:`str`>
"""
self.debug_stream("In AddStepDataSources()")
try:
self.set_state(tango.DevState.RUNNING)
argout = self.__stg.addStepDataSources(argin)
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
return argout
[docs] def is_DataSourceDescription_allowed(self):
""" DataSourceDescription command State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def CreateWriterConfiguration(self, argin):
""" CreateWriterConfiguration command
:brief: Create configuration from the given components
:param argin: DevVarStringArray list of component names
:type argin: :obj:`list` <:obj:`str`>
:returns: DevVarString XML configuration string
:rtype: :obj:`str`
"""
self.debug_stream("In CreateWriterConfiguration()")
try:
self.set_state(tango.DevState.RUNNING)
argout = self.__stg.createWriterConfiguration(argin)
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
return argout
[docs] def is_CreateWriterConfiguration_allowed(self):
""" CreateWriterConfiguration command State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
if self.get_state() in [tango.DevState.RUNNING]:
return False
return True
[docs] def CreateDataSources(self, argin):
""" It creates new DataSources on the ConfigServer
:param argin: JSON dictionary with {``dsname``: ``tangosource``, ...}
:type argin: :obj:`str`
"""
self.debug_stream("In createDataSources()")
try:
self.set_state(tango.DevState.RUNNING)
argout = self.__stg.createDataSources(argin)
self.set_state(tango.DevState.ON)
finally:
if self.get_state() == tango.DevState.RUNNING:
self.set_state(tango.DevState.ON)
return argout
[docs] def is_CreateDataSources_allowed(self):
""" CreateDataSources command State Machine
:returns: True if the operation allowed
:rtype: :obj:`bool`
"""
state_ok = not (self.get_state() in [tango.DevState.RUNNING])
return state_ok
# ==================================================================
#
# NXSRecSelectorClass class definition
#
# ==================================================================
[docs]class NXSRecSelectorClass(tango.DeviceClass):
#: (:obj:`dict` <:obj:`str`, \
#: [ :obj:`str`, :class:`tango.CmdArgType`, \
#: [ :obj:`list` <:obj:`int`> ] ] > ) Class Properties
class_property_list = {
}
#: (:obj:`dict` <:obj:`str`, \
#: [ :obj:`str`, :class:`tango.CmdArgType`, \
#: [ :obj:`list` <:obj:`int`> ] ] > ) Device Properties
device_property_list = {
'NumberOfThreads':
[tango.DevLong,
"maximal number of threads",
[20]],
'DefaultNeXusPath':
[tango.DevString,
"default NeXus path",
["/$var.entryname#'scan'$var.serialno:NXentry/"
"NXinstrument/collection"]],
'DefaultUserDataPath':
[tango.DevString,
"default NeXus User Data path",
["/$var.entryname#'scan'$var.serialno:NXentry/"
"user_data:NXparameters"]],
'DefaultNeXusType':
[tango.DevString,
"default dynamic component NeXus data type",
["NX_CHAR"]],
'DefaultTimeZone':
[tango.DevString,
"default Time Zone",
["Europe/Berlin"]],
'DefaultMntGrp':
[tango.DevString,
"default measurement group name",
["nxsmntgrp"]],
'PoolBlacklist':
[tango.DevVarStringArray,
"blacklist of pools",
[]],
'TimerFilters':
[tango.DevVarStringArray,
"list of timer device name filters",
[]],
'MutedChannelFilters':
[tango.DevVarStringArray,
"list of muted channel filters",
[]],
'MutedPreScanAttrFilters':
[tango.DevVarStringArray,
"list of muted attribute channel filters for PreScanSnapshot",
[]],
'AdminDataNames':
[tango.DevVarStringArray,
"list of administrator data names",
[]],
'DefaultPreselectedComponents':
[tango.DevVarStringArray,
"list of default preselected components",
[]],
'ClientRecordKeys':
[tango.DevVarStringArray,
"list of record keys for CLIENT datasources",
[]],
'SyncSnapshot':
[tango.DevBoolean,
"preselection merges the current ScanSnapshot",
[False]],
'WritePoolMotorPositions':
[tango.DevBoolean,
"add dynamic components for all pool motor positions",
[False]],
'MasterTimerFirst':
[tango.DevBoolean,
"the master timer channel of MG with the index: 0",
[True]],
'ResetInvalidDoor':
[tango.DevBoolean,
"reset Door when it is invalid",
[True]],
'MergeProfilesToMntGrps':
[tango.DevBoolean,
"merge profiles to available measurement groups",
[False]],
'MasterTimer':
[tango.DevBoolean,
"set the master timer/monitor channel for older MG",
[False]],
'DefaultCanFailDataSources':
[tango.DevVarStringArray,
"list of default datasources in the CanFail mode",
[]],
'TangoSourceErrorStates':
[tango.DevVarStringArray,
"list of tango error states for tango datasources",
["OFF", "INIT", "INSERT", "CLOSE", "UNKNOWN",
"FAULT"]],
'TangoSourceWarningStates':
[tango.DevVarStringArray,
"list of tango warning states for tango datasources",
["ALARM", "DISABLE"]],
}
#: (:obj:`dict` <:obj:`str`, \
#: [[ :class:`tango.CmdArgType`, :obj:`str`]] >)
#: Command definitions
cmd_list = {
'SetScanEnvVariables':
[[tango.DevString, "environment data"],
[tango.DevLong, "scanID"]],
'ScanEnvVariables':
[[tango.DevVoid, ""],
[tango.DevString, "environment data"]],
'LoadProfile':
[[tango.DevVoid, ""],
[tango.DevVoid, ""]],
'SaveProfile':
[[tango.DevVoid, ""],
[tango.DevVoid, ""]],
'FetchProfile':
[[tango.DevVoid, ""],
[tango.DevVoid, ""]],
'StoreProfile':
[[tango.DevVoid, ""],
[tango.DevVoid, ""]],
'DeleteProfile':
[[tango.DevString, "mntgrp name"],
[tango.DevVoid, ""]],
'ImportMntGrp':
[[tango.DevVoid, ""],
[tango.DevVoid, ""]],
'ImportEnvProfile':
[[tango.DevVoid, ""],
[tango.DevVoid, ""]],
'ExportEnvProfile':
[[tango.DevVoid, ""],
[tango.DevVoid, ""]],
'UpdateMntGrp':
[[tango.DevVoid, ""],
[tango.DevString, "configuration"]],
'UpdateProfile':
[[tango.DevVoid, ""],
[tango.DevString, "mntgrp configuration string"]],
'SwitchProfile':
[[tango.DevVoid, ""],
[tango.DevVoid, ""]],
'MntGrpConfiguration':
[[tango.DevVoid, ""],
[tango.DevString, "mntgrp configuration string"]],
'IsMntGrpUpdated':
[[tango.DevVoid, ""],
[tango.DevBoolean, "true if mntgrp changed"]],
'PreselectComponents':
[[tango.DevVoid, ""],
[tango.DevVoid, ""]],
'ResetPreselectedComponents':
[[tango.DevVoid, ""],
[tango.DevVoid, ""]],
'DeleteAllProfiles':
[[tango.DevVoid, ""],
[tango.DevVoid, ""]],
'UpdateConfigVariables':
[[tango.DevVoid, ""],
[tango.DevVoid, ""]],
'AvailableComponents':
[[tango.DevVoid, ""],
[tango.DevVarStringArray,
"list of available component names"]],
'AvailableProfiles':
[[tango.DevVoid, ""],
[tango.DevVarStringArray,
"list of available selection names"]],
'AvailableMntGrps':
[[tango.DevVoid, ""],
[tango.DevVarStringArray,
"list of available mntgrp names"]],
'AvailableDataSources':
[[tango.DevVoid, ""],
[tango.DevVarStringArray,
"list of available DataSource names"]],
'AvailableTimers':
[[tango.DevVoid, ""],
[tango.DevVarStringArray,
"list of available timers"]],
'MutedChannels':
[[tango.DevVoid, ""],
[tango.DevVarStringArray,
"list of muted channels"]],
'ComponentDescription':
[[tango.DevVoid, ""],
[tango.DevString,
"JSON component description"]],
'VariableComponents':
[[tango.DevVoid, ""],
[tango.DevString,
"JSON Dictionary with all Components for "
+ " configuration variable"]],
'FullDeviceNames':
[[tango.DevVoid, ""],
[tango.DevString,
"JSON Dictionary with full device names for "
+ " all aliases "]],
'ComponentDataSources':
[[tango.DevVoid, ""],
[tango.DevVarStringArray, "profile component datasources"]],
'SelectedDataSources':
[[tango.DevVoid, ""],
[tango.DevVarStringArray, "profile datasources"]],
'AdministratorDataNames':
[[tango.DevVoid, ""],
[tango.DevVarStringArray, "administrator data names"]],
'PoolElementNames':
[[tango.DevString, "pool list attribute name"],
[tango.DevVarStringArray, "list of available pool elements"]],
'MandatoryComponents':
[[tango.DevVoid, ""],
[tango.DevVarStringArray, "component names"]],
'SelectedComponents':
[[tango.DevVoid, ""],
[tango.DevVarStringArray, "component names"]],
'PreselectedComponents':
[[tango.DevVoid, ""],
[tango.DevVarStringArray, "component names"]],
'PreselectedDataSources':
[[tango.DevVoid, ""],
[tango.DevVarStringArray, "datasources names"]],
'CreateDynamicComponent':
[[tango.DevVarStringArray,
"list of JSON strings with datasource parameters"],
[tango.DevString, "name of dynamic Component"]],
'DataSourceDescription':
[[tango.DevVarStringArray, "list of required datasources"],
[tango.DevVarStringArray,
"list of JSON with description of CLIENT Datasources"]],
'AddStepDataSources':
[[tango.DevVarStringArray, "list of required datasources"],
[tango.DevVarStringArray,
"list of datasources not found in components"]],
'ComponentClientSources':
[[tango.DevVarStringArray, "list of component "
"client datasources"],
[tango.DevString,
"JSON with description of component CLIENT Datasources"]],
'ComponentSources':
[[tango.DevVarStringArray, "list of components"],
[tango.DevString,
"JSON with description of component Datasources"]],
'ChannelProperties':
[[tango.DevString, "property type"],
[tango.DevString,
"JSON dictionary with channel properties {channel:property}"]],
'SetChannelProperties':
[[tango.DevVarStringArray,
"a two element list with a property type and "
"JSON value dictionary {channel:property}"],
[tango.DevVoid, ""]],
'CreateWriterConfiguration':
[[tango.DevVarStringArray, "list of required components"],
[tango.DevString,
"XML Settinges"]],
'RemoveDynamicComponent':
[[tango.DevString, "name of dynamic Component"],
[tango.DevVoid, ""]],
'CreateDataSources':
[[tango.DevString,
"JSON dictionary with {``dsname``: ``tangosource``, ...}"],
[tango.DevVoid, ""]],
}
#: (:obj:`dict` <:obj:`str`, \
# [[ :class:`tango.CmdArgType`, \
# :class:`tango.AttrDataFormat`, \
# :class:`tango.AttrWriteType` ], \
# :obj:`dict` <:obj:`str` , any> ] > ) Attribute definitions
attr_list = {
'Components':
[[tango.DevString,
tango.SPECTRUM,
tango.READ, 10000],
{
'label': "Selected Components",
'description': "list of Selected Components",
}],
'StepDataSources':
[[tango.DevString,
tango.SCALAR,
tango.READ_WRITE],
{
'label': "list of datasources to be switch into step mode",
'description': "list of datasources to be switched" +
" into step mode",
'Display level': tango.DispLevel.EXPERT,
}],
'CanFailDataSources':
[[tango.DevString,
tango.SCALAR,
tango.READ_WRITE],
{
'label': "list of datasources to be switch into canfail mode",
'description': "list of datasources to be switched" +
" into canfail mode",
'Display level': tango.DispLevel.EXPERT,
}],
'LinkDataSources':
[[tango.DevString,
tango.SCALAR,
tango.READ_WRITE],
{
'label': "list of datasources to which a link will be added",
'description': "list of datasources to which "
"a link will be added",
'Display level': tango.DispLevel.EXPERT,
}],
'DescriptionErrors':
[[tango.DevString,
tango.SPECTRUM,
tango.READ, 10000],
{
'label': "Descriptive Component Errors",
'description': "list of Descriptive Component Errors",
}],
'Version':
[[tango.DevString,
tango.SCALAR,
tango.READ],
{
'label': "Version",
'description': "server version",
}],
'MacroServer':
[[tango.DevString,
tango.SCALAR,
tango.READ],
{
'label': "MacroServer",
'description': "Macro Server device name",
}],
'MntGrp':
[[tango.DevString,
tango.SCALAR,
tango.READ_WRITE],
{
'label': " Measurement Group",
'description': " Measurement Group name",
}],
'ScanDir':
[[tango.DevString,
tango.SCALAR,
tango.READ_WRITE],
{
'label': "Scan Directory",
'description': "Scan Directory",
}],
'ScanID':
[[tango.DevLong,
tango.SCALAR,
tango.READ_WRITE],
{
'label': "Scan ID",
'description': "Scan ID",
}],
'ScanFile':
[[tango.DevString,
tango.SCALAR,
tango.READ_WRITE],
{
'label': "Scan File(s)",
'description': "Scan File(s)",
}],
'ConfigDevice':
[[tango.DevString,
tango.SCALAR,
tango.READ_WRITE],
{
'label': "Configuration Device",
'description': "Configuration device name",
'Memorized': "true",
}],
'Door':
[[tango.DevString,
tango.SCALAR,
tango.READ_WRITE],
{
'label': "Door",
'description': "Door device name",
'Memorized': "true",
}],
'WriterDevice':
[[tango.DevString,
tango.SCALAR,
tango.READ_WRITE],
{
'label': "Writer Device",
'description': "Writer device device name",
}],
'UserData':
[[tango.DevString,
tango.SCALAR,
tango.READ_WRITE],
{
'label': "Client Data",
'description': "JSON dictionary with User Data",
'Display level': tango.DispLevel.EXPERT,
}],
'DeviceGroups':
[[tango.DevString,
tango.SCALAR,
tango.READ_WRITE],
{
'label': "Device groups",
'description': "JSON dictionary with device groups",
'Memorized': "true",
'Display level': tango.DispLevel.EXPERT,
}],
'DataSources':
[[tango.DevString,
tango.SPECTRUM,
tango.READ, 10000],
{
'label': "Selected Datasources",
'description': "list of Selected Datasources",
}],
'ProfileConfiguration':
[[tango.DevString,
tango.SCALAR,
tango.READ_WRITE],
{
'label': "Profile Configuration",
'description': "JSON dict of server configuration",
'Display level': tango.DispLevel.EXPERT,
}],
'AppendEntry':
[[tango.DevBoolean,
tango.SCALAR,
tango.READ_WRITE],
{
'label': "Append Entry",
'description': "flag for entry appending",
}],
'ConfigVariables':
[[tango.DevString,
tango.SCALAR,
tango.READ_WRITE],
{
'label': "Configuration Variables",
'description': "JSON dictionary with configuration variables"
+ "for templated components",
'Display level': tango.DispLevel.EXPERT,
}],
'ProfileFile':
[[tango.DevString,
tango.SCALAR,
tango.READ_WRITE],
{
'label': "Profile File with its Path",
'description': "config file with its full path",
'Memorized': "true",
}],
}
def __init__(self, name):
""" NXSRecSelectorclass Constructor
"""
tango.DeviceClass.__init__(self, name)
self.set_type(name)
print("In NXSRecSelectorClass constructor")
# ==================================================================
#
# NXSRecSelector class main method
#
# ==================================================================
if __name__ == '__main__':
pass