Source code for nxsrecconfig.NXSConfig

#    "$Name:  $";
#    "$Header:  $";
# =============================================================================
#
# file :        NXSRecSelector.py
#
# description : Python source for the NXSRecSelector and its commands.
#                The class is derived from Device. It represents the
#                CORBA servant object which will be accessed from the
#                network. All commands which can be executed on the
#                NXSRecSelector are implemented in this file.
#
# project :     TANGO Device Server
#
# $Author:  $
#
# $Revision:  $
#
# $Log:  $
#
# copyleft :    European Synchrotron Radiation Facility
#               BP 220, Grenoble 38043
#               FRANCE
#
# =============================================================================
#          This file is generated by POGO
#    (Program Obviously used to Generate tango Object)
#
#         (c) - Software Engineering Group - ESRF
# =============================================================================
#

""" Selector Server for NeXus Sardana Recorder """

# ==================================================================
# ==================================================================

try:
    import tango
except Exception:
    import PyTango as tango

from .Settings import Settings as STG
from .Utils import Utils


[docs]class NXSRecSelector(tango.LatestDeviceImpl): """ NXSRecSelector server interface :brief: Tango Server for Nexus Sardana Recorder Settings. Device States Description: DevState.ON - Server is ON, DevState.RUNNING - Performing a query """ def __init__(self, cl, name): """ Device constructor :param cl: class name :type cl: :obj:`str` :param name: device name :type name: :obj:`str` """ tango.LatestDeviceImpl.__init__(self, cl, name) self.debug_stream("In __init__()") #: (:class:`nxsrecconfig.Settings.Settings`) \ #: Recorder Settings self.__stg = None #: (:class:`tango.DeviceProxy`) self device proxy self.__dp = None #: (:obj:`list` <:obj:`str`>) \ #: memorize attribute to be updated on conf change self.__toupdate = ['ConfigDevice', 'Door'] NXSRecSelector.init_device(self)
[docs] def delete_device(self): """ Device destructor """ self.debug_stream("In delete_device()") if hasattr(self, 'stg') and self.__stg: del self.__stg self.__stg = None self.set_state(tango.DevState.OFF)
[docs] def init_device(self): """ Device initialization """ self.debug_stream("In init_device()") if hasattr(self, 'stg') and self.__stg: del self.__stg self.__stg = None self.get_device_properties(self.get_device_class()) numberofthreads = self.NumberOfThreads or None defaultpath = self.DefaultNeXusPath or None defaultudatapath = self.DefaultUserDataPath or None defaultzone = self.DefaultTimeZone or None defaultmntgrp = self.DefaultMntGrp or None defaultnexustype = self.DefaultNeXusType or None syncsnapshot = bool(self.SyncSnapshot) writepoolmotorpositions = bool(self.WritePoolMotorPositions) self.__stg = STG(self, numberofthreads, defaultpath, defaultzone, defaultmntgrp, syncsnapshot, writepoolmotorpositions, defaultnexustype, defaultudatapath) self.set_state(tango.DevState.ON) self.__stg.poolBlacklist = self.PoolBlacklist or [] self.__stg.timerFilters = self.TimerFilters or [ "*dgg*", "*/timer/*", "*/ctctrl0*"] self.__stg.masterTimerFirst = bool(self.MasterTimerFirst) self.__stg.mergeProfilesToMntGrps = bool(self.MergeProfilesToMntGrps) self.__stg.resetInvalidDoor = bool(self.ResetInvalidDoor) self.__stg.masterTimer = bool(self.MasterTimer) self.__stg.mutedChannelFilters = self.MutedChannelFilters \ or ["*tip551*"] self.__stg.mutedPreScanAttrFilters = self.MutedPreScanAttrFilters or [] self.__stg.adminDataNames = self.AdminDataNames or [] self.__stg.defaultPreselectedComponents = \ self.DefaultPreselectedComponents or [] self.__stg.defaultCanFailDataSources = \ self.DefaultCanFailDataSources or [] self.__stg.tangoSourceErrorStates = \ self.TangoSourceErrorStates or [] self.__stg.tangoSourceWarningStates = \ self.TangoSourceWarningStates or [] # print(self.TangoSourceErrorStates) # print(self.TangoSourceWarningStates) self.__stg.clientRecordKeys = \ self.ClientRecordKeys or []
[docs] def always_executed_hook(self): """ Always excuted hook method """ self.debug_stream("In always_excuted_hook()")
# ================================================================== # # NXSRecSelector read/write attribute methods # # ==================================================================
[docs] def read_attr_hardware(self, _): """ Read Attribute Hardware """ self.debug_stream("In read_attr_hardware()")
[docs] def read_Components(self, attr): """ Read Components attribute :param attr: read attribute :type attr: :class:`tango.Attribute` """ self.debug_stream("In read_Components()") attr.set_value(self.__stg.components)
[docs] def read_DescriptionErrors(self, attr): """ Read DescriptionErrors attribute :param attr: read attribute :type attr: :class:`tango.Attribute` """ self.debug_stream("In read_DescriptionErrors()") attr.set_value(self.__stg.descriptionErrors)
[docs] def read_Version(self, attr): """ Read Version attribute :param attr: read attribute :type attr: :class:`tango.Attribute` """ self.debug_stream("In read_Version()") attr.set_value(self.__stg.version)
[docs] def read_MacroServer(self, attr): """ Read MacroServer attribute :param attr: read attribute :type attr: :class:`tango.Attribute` """ self.debug_stream("In read_MacroServer()") attr.set_value(self.__stg.macroServer)
[docs] def read_Door(self, attr): """ Read Door attribute :param attr: read attribute :type attr: :class:`tango.Attribute` """ self.debug_stream("In read_Door()") attr.set_value(self.__stg.door)
[docs] def write_Door(self, attr): """ Write Door attribute :param attr: written attribute :type attr: :class:`tango.Attribute` """ self.debug_stream("In write_Door()") self.__stg.door = attr.get_write_value()
[docs] def read_StepDataSources(self, attr): """ Read StepDataSources attribute :param attr: read attribute :type attr: :class:`tango.Attribute` """ self.debug_stream("In read_StepDataSources()") attr.set_value(self.__stg.stepdatasources or "")
[docs] def write_StepDataSources(self, attr): """ Write StepDataSources attribute :param attr: written attribute :type attr: :class:`tango.Attribute` """ self.debug_stream("In write_StepDataSources()") if self.is_StepDataSources_write_allowed(): self.__stg.stepdatasources = attr.get_write_value() or "" else: self.warn_stream("To change the settings please close the server.") raise Exception( "To change the settings please close the server.")
[docs] def is_StepDataSources_write_allowed(self): """ StepDataSources attribute Write State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def read_CanFailDataSources(self, attr): """ Read CanFailDataSources attribute :param attr: read attribute :type attr: :class:`tango.Attribute` """ self.debug_stream("In read_CanFailDataSources()") attr.set_value(self.__stg.canfaildatasources or "")
[docs] def write_CanFailDataSources(self, attr): """ Write CanFailDataSources attribute :param attr: written attribute :type attr: :class:`tango.Attribute` """ self.debug_stream("In write_CanFailDataSources()") if self.is_CanFailDataSources_write_allowed(): self.__stg.canfaildatasources = attr.get_write_value() or "" else: self.warn_stream("To change the settings please close the server.") raise Exception( "To change the settings please close the server.")
[docs] def is_CanFailDataSources_write_allowed(self): """ CanFailDataSources attribute Write State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def read_LinkDataSources(self, attr): """ Read LinkDataSources attribute :param attr: read attribute :type attr: :class:`tango.Attribute` """ self.debug_stream("In read_LinkDataSources()") attr.set_value(self.__stg.linkdatasources or "")
[docs] def write_LinkDataSources(self, attr): """ Write LinkDataSources attribute :param attr: written attribute :type attr: :class:`tango.Attribute` """ self.debug_stream("In write_LinkDataSources()") if self.is_LinkDataSources_write_allowed(): self.__stg.linkdatasources = attr.get_write_value() or "" else: self.warn_stream("To change the settings please close the server.") raise Exception( "To change the settings please close the server.")
[docs] def is_LinkDataSources_write_allowed(self): """ LinkDataSources attribute Write State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def read_ConfigDevice(self, attr): """ Read ConfigDevice attribute :param attr: read attribute :type attr: :class:`tango.Attribute` """ self.debug_stream("In read_ConfigDevice()") attr.set_value(self.__stg.configDevice)
[docs] def write_ConfigDevice(self, attr): """ Write ConfigDevice attribute :param attr: written attribute :type attr: :class:`tango.Attribute` """ self.debug_stream("In write_ConfigDevice()") self.__stg.configDevice = attr.get_write_value()
[docs] def read_MntGrp(self, attr): """ Read MntGrp attribute :param attr: read attribute :type attr: :class:`tango.Attribute` """ self.debug_stream("In read_MntGrp()") attr.set_value(self.__stg.mntGrp)
[docs] def write_MntGrp(self, attr): """ Write MntGrp attribute :param attr: written attribute :type attr: :class:`tango.Attribute` """ self.debug_stream("In write_MntGrp()") self.__stg.mntGrp = attr.get_write_value()
[docs] def read_ScanDir(self, attr): """ Read ScanDir attribute :param attr: read attribute :type attr: :class:`tango.Attribute` """ self.debug_stream("In read_ScanDir()") attr.set_value(self.__stg.scanDir)
[docs] def write_ScanDir(self, attr): """ Write ScanDir attribute :param attr: written attribute :type attr: :class:`tango.Attribute` """ self.debug_stream("In write_ScanDir()") self.__stg.scanDir = attr.get_write_value()
[docs] def read_ScanFile(self, attr): """ Read ScanFile attribute :param attr: read attribute :type attr: :class:`tango.Attribute` """ self.debug_stream("In read_ScanFile()") attr.set_value(self.__stg.scanFile or "")
[docs] def write_ScanFile(self, attr): """ Write ScanFile attribute :param attr: written attribute :type attr: :class:`tango.Attribute` """ self.debug_stream("In write_ScanFile()") self.__stg.scanFile = attr.get_write_value() or ""
[docs] def read_ScanID(self, attr): """ Read ScanID attribute :param attr: read attribute :type attr: :class:`tango.Attribute` """ self.debug_stream("In read_ScanID()") attr.set_value(self.__stg.scanID)
[docs] def write_ScanID(self, attr): """ Write ScanID attribute :param attr: written attribute :type attr: :class:`tango.Attribute` """ self.debug_stream("In write_ScanID()") self.__stg.scanID = attr.get_write_value()
[docs] def read_WriterDevice(self, attr): """ Read WriterDevice attribute :param attr: read attribute :type attr: :class:`tango.Attribute` """ self.debug_stream("In read_WriterDevice()") attr.set_value(self.__stg.writerDevice)
[docs] def write_WriterDevice(self, attr): """ Write WriterDevice attribute :param attr: written attribute :type attr: :class:`tango.Attribute` """ self.debug_stream("In write_WriterDevice()") self.__stg.writerDevice = attr.get_write_value()
[docs] def read_DeviceGroups(self, attr): """ Read DeviceGroups attribute :param attr: read attribute :type attr: :class:`tango.Attribute` """ self.debug_stream("In read_DeviceGroups()") attr.set_value(self.__stg.deviceGroups)
[docs] def write_DeviceGroups(self, attr): """ Write DeviceGroups attribute :param attr: written attribute :type attr: :class:`tango.Attribute` """ self.debug_stream("In write_DeviceGroups()") self.__stg.deviceGroups = attr.get_write_value()
[docs] def read_UserData(self, attr): """ Read UserData attribute :param attr: read attribute :type attr: :class:`tango.Attribute` """ self.debug_stream("In read_UserData()") attr.set_value(self.__stg.userData)
[docs] def write_UserData(self, attr): """ Write UserData attribute :param attr: written attribute :type attr: :class:`tango.Attribute` """ self.debug_stream("In write_UserData()") self.__stg.userData = attr.get_write_value()
[docs] def read_DataSources(self, attr): """ Read DataSources attribute :param attr: read attribute :type attr: :class:`tango.Attribute` """ self.debug_stream("In read_DataSources()") attr.set_value(self.__stg.dataSources)
[docs] def read_ProfileConfiguration(self, attr): """ Read ProfileConfiguration attribute :param attr: read attribute :type attr: :class:`tango.Attribute` """ self.debug_stream("In read_DataSources()") attr.set_value(self.__stg.profileConfiguration)
[docs] def write_ProfileConfiguration(self, attr): """ Write ProfileConfiguration attribute :param attr: written attribute :type attr: :class:`tango.Attribute` """ self.debug_stream("In write_ProfileConfiguration()") self.__stg.profileConfiguration = attr.get_write_value() try: self.__dp = self.__dp or tango.DeviceProxy( Utils.tostr(self.get_name())) for var in self.__toupdate: if var in self.__stg.names(): if hasattr(self.__dp, var): self.__dp.write_attribute( Utils.tostr(var), self.__stg.value(var)) self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON)
[docs] def read_AppendEntry(self, attr): """ Read AppendEntry attribute :param attr: read attribute :type attr: :class:`tango.Attribute` """ self.debug_stream("In read_AppendEntry()") attr.set_value(self.__stg.appendEntry)
[docs] def write_AppendEntry(self, attr): """ Write AppendEntry attribute :param attr: written attribute :type attr: :class:`tango.Attribute` """ self.debug_stream("In write_AppendEntry()") self.__stg.appendEntry = attr.get_write_value()
[docs] def read_ConfigVariables(self, attr): """ Read ConfigVariables attribute :param attr: read attribute :type attr: :class:`tango.Attribute` """ self.debug_stream("In read_ConfigVariables()") attr.set_value(self.__stg.configVariables)
[docs] def write_ConfigVariables(self, attr): """ Write ConfigVariables attribute :param attr: written attribute :type attr: :class:`tango.Attribute` """ self.debug_stream("In write_ConfigVariables()") self.__stg.configVariables = attr.get_write_value()
[docs] def read_ProfileFile(self, attr): """ Read ProfileFile attribute :param attr: read attribute :type attr: :class:`tango.Attribute` """ self.debug_stream("In read_ProfileFile()") attr.set_value(self.__stg.profileFile)
[docs] def write_ProfileFile(self, attr): """ Write ProfileFile attribute :param attr: written attribute :type attr: :class:`tango.Attribute` """ self.debug_stream("In write_ProfileFile()") self.__stg.profileFile = attr.get_write_value()
# ================================================================== # # NXSRecSelector command methods # # ==================================================================
[docs] def LoadProfile(self): """ LoadProfile command :brief: Load server configuration """ self.debug_stream("In LoadProfile()") try: self.set_state(tango.DevState.RUNNING) self.__stg.loadProfile() # updating memorized attributes self.__dp = self.__dp or tango.DeviceProxy( Utils.tostr(self.get_name())) for var in self.__toupdate: if var in self.__stg.names(): if hasattr(self.__dp, var): self.__dp.write_attribute( Utils.tostr(var), self.__stg.value(var)) self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON)
[docs] def is_LoadProfile_allowed(self): """ LoadProfile command State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def FetchProfile(self): """ FetchProfile command :brief: Fetch server configuration """ self.debug_stream("In FetchProfile()") try: self.set_state(tango.DevState.RUNNING) self.__stg.fetchProfile() # updating memorized attributes self.__dp = self.__dp or tango.DeviceProxy( Utils.tostr(self.get_name())) for var in self.__toupdate: if var in self.__stg.names(): if hasattr(self.__dp, var): self.__dp.write_attribute( Utils.tostr(var), self.__stg.value(var)) self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON)
[docs] def is_FetchProfile_allowed(self): """ FetchProfile command State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def SaveProfile(self): """ SaveProfile command :brief: Save server configuration """ self.debug_stream("In SaveProfile()") try: self.set_state(tango.DevState.RUNNING) self.__stg.saveProfile() self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON)
[docs] def is_SaveProfile_allowed(self): """ SaveProfile command State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def StoreProfile(self): """ StoreProfile command command :brief: Store server configuration """ self.debug_stream("In StoreProfile()") try: self.set_state(tango.DevState.RUNNING) self.__stg.storeProfile() self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON)
[docs] def is_StoreProfile_allowed(self): """ StoreProfile command State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def PreselectComponents(self): """ PreselectComponents command :brief: Check existing devices of pools """ self.debug_stream("In PreselectComponents()") try: self.set_state(tango.DevState.RUNNING) self.__stg.preselectComponents() self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON)
[docs] def is_PreselectComponents_allowed(self): """ PreselectComponents command State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def ResetPreselectedComponents(self): """ ResetPreselectedComponents command :brief: reset PreselectedComponents to DefaultPreselectedComponents """ self.debug_stream("In ResetPreselectedComponents()") try: self.set_state(tango.DevState.RUNNING) self.__stg.resetPreselectedComponents() self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON)
[docs] def is_ResetPreselectedComponents_allowed(self): """ ResetPreselectedComponents command State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def DeleteAllProfiles(self): """ DeleteAllProfiles command :brief: Delete all profiles from Configuration Server """ self.debug_stream("In DeleteAllProfiles()") try: self.set_state(tango.DevState.RUNNING) self.__stg.deleteAllProfiles() self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON)
[docs] def is_DeleteAllProfiles_allowed(self): """ DeleteAllProfiles command State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def UpdateConfigVariables(self): """ UpdateConfigVariables command :brief: Send ConfigVariables into ConfigServer and update serialno if appendEntry selected """ self.debug_stream("In UpdateConfigVariables()") try: self.set_state(tango.DevState.RUNNING) self.__stg.updateConfigVariables() self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON)
[docs] def is_UpdateConfigVariables_allowed(self): """ UpdateConfigVariables command State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def IsMntGrpUpdated(self): """ IsMntGrpUpdated command :returns: True if mntgrp was changed :rtype: :obj:`bool` """ self.debug_stream("In IsMntGrpUpdated()") try: self.set_state(tango.DevState.RUNNING) conf = bool(self.__stg.isMntGrpUpdated()) self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON) return conf
[docs] def is_IsMntGrpUpdated_allowed(self): """ IsMntGrpUpdated command State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def MntGrpConfiguration(self): """ MntGrpConfiguration command :brief: returns mntgrp configuration :returns: Provide mntgrp configuration in json string :rtype: :obj:`str` """ self.debug_stream("In MntGrpConfiguration()") try: self.set_state(tango.DevState.RUNNING) conf = Utils.tostr(self.__stg.mntGrpConfiguration()) self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON) return conf
[docs] def is_MntGrpConfiguration_allowed(self): """ MntGrpConfiguration command State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def UpdateProfile(self): """ UpdateProfile command :brief: Update mntgrp configuration :returns: JSON string with mntgrp configuration info :rtype: :obj:`str` """ self.debug_stream("In UpdateProfile()") try: self.set_state(tango.DevState.RUNNING) conf = Utils.tostr(self.__stg.updateProfile()) self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON) return conf
[docs] def is_UpdateProfile_allowed(self): """ UpdateProfile command State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def UpdateMntGrp(self): """ UpdateMntGrp command :brief: Update mntgrp configuration :returns: mntgrp configuration string :rtype: :obj:`str` """ self.debug_stream("In UpdateMntGrp()") try: self.set_state(tango.DevState.RUNNING) conf = Utils.tostr(self.__stg.updateMntGrp()) self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON) return conf
[docs] def is_UpdateMntGrp_allowed(self): """ UpdateMntGrp command State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def SwitchProfile(self): """ SwitchProfile command :brief: Switch mntgrp configuration """ self.debug_stream("In SwitchProfile()") try: self.set_state(tango.DevState.RUNNING) self.__stg.switchProfile() self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON)
[docs] def is_SwitchProfile_allowed(self): """ SwitchProfile command State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def ImportMntGrp(self): """ ImportMntGrp command :brief: Import active mntgrp configuration """ self.debug_stream("In ImportMntGrp()") try: self.set_state(tango.DevState.RUNNING) self.__stg.importMntGrp() self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON)
[docs] def is_ImportMntGrp_allowed(self): """ ImportMntGrp command State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def ImportEnvProfile(self): """ ImportEnvProfile command :brief: Import all environment variables """ self.debug_stream("In ImportEnvProfile()") try: self.set_state(tango.DevState.RUNNING) self.__stg.importEnvProfile() # updating memorized attributes self.__dp = self.__dp or tango.DeviceProxy( Utils.tostr(self.get_name())) for var in self.__toupdate: if var in self.__stg.names(): if hasattr(self.__dp, var): self.__dp.write_attribute( Utils.tostr(var), self.__stg.value(var)) self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON)
[docs] def is_ImportEnv_allowed(self): """ ImportMntGrp command State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def ExportEnvProfile(self): """ ExportEnvProfile command :brief: Export all environment variables """ self.debug_stream("In ExportEnvProfile()") try: self.set_state(tango.DevState.RUNNING) self.__stg.exportEnvProfile() self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON)
[docs] def is_ExportEnv_allowed(self): """ ExportMntGrp command State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def AvailableTimers(self): """ AvailableTimers command :brief: Returns a list of available component names :returns: DevVarStringArray list of available component names :rtype: :obj:`list` <:obj:`str`> """ self.debug_stream("In AvailableTimers()") try: self.set_state(tango.DevState.RUNNING) argout = self.__stg.availableTimers() self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON) return argout
[docs] def is_AvailableTimers_allowed(self): """ AvailableTimers command State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def MutedChannels(self): """ MutedChannels command :brief: Return a list of muted channel names :returns: DevVarStringArray list of muted channel names :rtype: :obj:`list` <:obj:`str`> """ self.debug_stream("In MutedChannels()") try: self.set_state(tango.DevState.RUNNING) argout = self.__stg.mutedChannels() self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON) return argout
[docs] def is_MutedChannels_allowed(self): """ MutedChannels command State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def AvailableComponents(self): """ AvailableComponents command :brief: Return a list of available component names :returns: DevVarStringArray list of available component names :rtype: :obj:`list` <:obj:`str`> """ self.debug_stream("In AvailableComponents()") try: self.set_state(tango.DevState.RUNNING) argout = self.__stg.availableComponents() self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON) return argout
[docs] def is_AvailableComponents_allowed(self): """ AvailableComponents command State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def ComponentDescription(self): """ ComponentDescription command :brief: Return a list of available component names :returns: DevString list of available component names :rtype: :obj:`str` """ self.debug_stream("In ComponentDescription()") try: self.set_state(tango.DevState.RUNNING) argout = self.__stg.componentDescription() self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON) return argout
[docs] def is_ComponentDescription_allowed(self): """ ComponentDescription command State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def SetScanEnvVariables(self, argin): """ SetScanEnvVariables command :brief: Store ScanDir, ScanFile and NeXusSelectorDevice in environment variables :param argin: json dictionary with environment data :type argin: :obj:`str` :returns: DevLong scan ID :rtype: :obj:`int` """ self.debug_stream("In SetScanEnvVariables()") try: self.set_state(tango.DevState.RUNNING) argout = int(self.__stg.setScanEnvVariables(argin)) self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON) return argout
[docs] def is_SetScanEnvVariables_allowed(self): """ SetScanEnvVariables command State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def FullDeviceNames(self): """ FullDeviceNames command :brief: Return a JSON with full device names for all aliases :returns: DevString JSON dictionary with full device names :rtype: :obj:`str` """ self.debug_stream("In FullDeviceNames()") try: self.set_state(tango.DevState.RUNNING) argout = self.__stg.fullDeviceNames() self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON) return argout
[docs] def is_FullDeviceNames_allowed(self): """ FullDeviceNames command State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def ScanEnvVariables(self): """ ScanEnvVariables command :brief: Fetch ScanDir, ScanFile, ScanID and NeXusSelectorDevice in environment variables :returns: DevString json dictionary with environment data :rtype: :obj:`str` """ self.debug_stream("In ScanEnvVariables()") try: self.set_state(tango.DevState.RUNNING) argout = Utils.tostr(self.__stg.scanEnvVariables()) self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON) return argout
[docs] def is_ScanEnvVariables_allowed(self): """ ScanEnvVariables command State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def VariableComponents(self): """ VariableComponents command :brief: Returns a list of available component names :returns: DevString list of available component names :rtype: :obj:`str` """ self.debug_stream("In VariableComponents()") try: self.set_state(tango.DevState.RUNNING) argout = self.__stg.variableComponents() self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON) return argout
[docs] def is_VariableComponents_allowed(self): """ VariableComponents command State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def AvailableProfiles(self): """ AvailableProfiles command :brief: Return a list of available selection names :returns: DevVarStringArray list of available selection names :rtype: :obj:`list` <:obj:`str`> """ self.debug_stream("In AvailableProfiles()") try: self.set_state(tango.DevState.RUNNING) argout = self.__stg.availableProfiles() self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON) return argout
[docs] def is_AvailableProfiles_allowed(self): """ AvailableProfiles command State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def AvailableMntGrps(self): """ AvailableMntGrps command :brief: Return a list of available mntgrp names :returns: DevVarStringArray list of available mntgrp names :rtype: :obj:`list` <:obj:`str`> """ self.debug_stream("In AvailableMntGrps()") try: self.set_state(tango.DevState.RUNNING) argout = self.__stg.availableMntGrps() self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON) return argout
[docs] def is_AvailableMntGrps_allowed(self): """ AvailableMntGrps command State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def AvailableDataSources(self): """ AvailableDataSources command :brief: Return a list of available DataSource names :returns: DevVarStringArray list of available DataSource names :rtype: :obj:`list` <:obj:`str`> """ self.debug_stream("In AvailableDataSources()") try: self.set_state(tango.DevState.RUNNING) argout = self.__stg.availableDataSources() self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON) return argout
[docs] def is_AvailableDataSources_allowed(self): """ AvailableDataSources command State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def PoolElementNames(self, argin): """ PoolElementNames command :brief: Return a list of available pool channels :param argin: DevString name of pool list attribute :type argin: :obj:`str` :returns: DevVarStringArray list of available pool elements :rtype: :obj:`list` <:obj:`str`> """ self.debug_stream("In PoolElementNames()") try: self.set_state(tango.DevState.RUNNING) argout = self.__stg.poolElementNames(argin) self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON) return argout
[docs] def is_PoolElementNames_allowed(self): """ PoolElementNames command State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def ComponentDataSources(self): """ ComponentDataSources command :brief: Provide the component datasources :returns: DevVarStringArray component names :rtype: :obj:`list` <:obj:`str`> """ self.debug_stream("In ComponentDataSources()") try: self.set_state(tango.DevState.RUNNING) argout = self.__stg.componentDataSources() self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON) return argout
[docs] def is_ComponentDataSources_allowed(self): """ ComponentDataSources command State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def SelectedDataSources(self): """ SelectedDataSources command :brief: Provide the selected datasources :returns: DevVarStringArray component names :rtype: :obj:`list` <:obj:`str`> """ self.debug_stream("In SelectedDataSources()") try: self.set_state(tango.DevState.RUNNING) argout = self.__stg.selectedDataSources() self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON) return argout
[docs] def is_SelectedDataSources_allowed(self): """ SelectedDataSources command State Machine """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def AdministratorDataNames(self): """ AdministratorDataNames command :brief: Provide Administrator Data Names :returns: DevVarStringArray data record names :rtype: :obj:`list` <:obj:`str`> """ self.debug_stream("In AdministratorDataNames()") try: self.set_state(tango.DevState.RUNNING) argout = self.__stg.administratorDataNames() self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON) return argout
[docs] def is_AdministratorDataNames_allowed(self): """ AdministratorDataNames command State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def MandatoryComponents(self): """ MandatoryComponents command :brief: Set the mandatory components :returns: DevVarStringArray component names :rtype: :obj:`list` <:obj:`str`> """ self.debug_stream("In MandatoryComponents()") try: self.set_state(tango.DevState.RUNNING) argout = self.__stg.mandatoryComponents() self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON) return argout
[docs] def is_MandatoryComponents_allowed(self): """ MandatoryComponents command State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def SelectedComponents(self): """ SelectedComponents command :brief: Provide the selected components :returns: DevVarStringArray component names :rtype: :obj:`list` <:obj:`str`> """ self.debug_stream("In SelectedComponents()") try: self.set_state(tango.DevState.RUNNING) argout = self.__stg.selectedComponents() self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON) return argout
[docs] def is_SelectedComponents_allowed(self): """ SelectedComponents command State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def PreselectedComponents(self): """ PreselectedComponents command :brief: Provide the preselected components :returns: DevVarStringArray component names :rtype: :obj:`list` <:obj:`str`> """ self.debug_stream("In PreselectedComponents()") try: self.set_state(tango.DevState.RUNNING) argout = self.__stg.preselectedComponents() self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON) return argout
[docs] def is_PreselectedComponents_allowed(self): """ PreselectedComponents command State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def PreselectedDataSources(self): """ PreselectedDataSources command :brief: Provide the preselected components :returns: DevVarStringArray component names :rtype: :obj:`list` <:obj:`str`> """ self.debug_stream("In PreselectedDataSources()") try: self.set_state(tango.DevState.RUNNING) argout = self.__stg.preselectedDataSources() self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON) return argout
[docs] def is_PreselectedDataSources_allowed(self): """ PreselectedDataSources command State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def CreateDynamicComponent(self, argin): """ CreateDynamicComponent command :brief: Create dynamic component :param argin: DevVarStringArray list of datasource parameters :type argin: :obj:`list` <:obj:`str`> :returns: DevVarString name of created dynamic component :rtype: :obj:`str` """ self.debug_stream("In CreateDynamicComponent()") try: self.set_state(tango.DevState.RUNNING) argout = self.__stg.createDynamicComponent(argin) self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON) return argout
[docs] def is_CreateDynamicComponent_allowed(self): """ CreateDynamicComponent command State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def RemoveDynamicComponent(self, argin): """ RemoveDynamicComponent command :brief: Delete the given dynamic component :param argin: DevString dynamic component name :type argin: :obj:`str` """ self.debug_stream("In RemoveDynamicComponent()") try: self.set_state(tango.DevState.RUNNING) self.__stg.removeDynamicComponent(argin) self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON)
[docs] def is_RemoveDynamicComponent_allowed(self): """ RemoveDynamicComponent command State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def DeleteProfile(self, argin): """ DeleteProfile command :brief: Delete the given mntgrp :param argin: DevString measurement group name :type argin: :obj:`str` """ self.debug_stream("In DeleteProfile()") try: self.set_state(tango.DevState.RUNNING) self.__stg.deleteProfile(argin) self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON)
[docs] def is_DeleteProfile_allowed(self): """ DeleteProfile command State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def ChannelProperties(self, argin): """ ChannelProperties command :brief: Provide JSON dictionary with channel properties {channel:property} :param argin: DevString property type :type argin: :obj:`str` :returns: DevString JSON dictionary with channel properties :rtype: :obj:`str` """ self.debug_stream("In ChannelProperties()") try: self.set_state(tango.DevState.RUNNING) argout = self.__stg.channelProperties(argin) self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON) return argout
[docs] def is_ChannelProperties_allowed(self): """ ChannelProperties command State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def SetChannelProperties(self, argin): """ SetChannelProperties command SetChannelProperties :brief: Set Channel Properties of the given type :param argin: DevVarStringArray two element list with a property type and JSON value dictionary {channel:property} :type argin: [:obj:`str`, :obj:`str`] """ self.debug_stream("In SetChannelProperties()") try: self.set_state(tango.DevState.RUNNING) argout = self.__stg.setChannelProperties(argin) self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON) return argout
[docs] def is_SetChannelProperties_allowed(self): """ SetChannelProperties command State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def ComponentClientSources(self, argin): """ ComponentClientSources command :brief: Describe client datasources from components :param argin: DevVarStringArray list of component names :type argin: :obj:`list` <:obj:`str`> :returns: DevVarString description of component datasources :rtype: :obj:`str` """ self.debug_stream("In ComponentClientSources()") try: self.set_state(tango.DevState.RUNNING) argout = self.__stg.componentClientSources(argin) self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON) return argout
[docs] def is_ComponentClientSources_allowed(self): """ ComponentClientSources command State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def ComponentSources(self, argin): """ ComponentSources command :brief: Describe datasources from components :param argin: DevVarStringArray list of component names :type argin: :obj:`list` <:obj:`str`> :returns: DevVarString description of component datasources :rtype: :obj:`str` """ self.debug_stream("In ComponentSources()") try: self.set_state(tango.DevState.RUNNING) argout = self.__stg.componentSources(argin) self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON) return argout
[docs] def is_ComponentSources_allowed(self): """ ComponentSources command State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def DataSourceDescription(self, argin): """ DataSourceDescription command :brief: Provide datasource description :param argin: DevVarStringArray list of datasource names :type argin: :obj:`list` <:obj:`str`> :returns: DevVarStringArray description of datasources :rtypes: :obj:`list` <:obj:`str`> """ self.debug_stream("In DataSourceDescription()") try: self.set_state(tango.DevState.RUNNING) argout = self.__stg.dataSourceDescription(argin) self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON) return argout
[docs] def AddStepDataSources(self, argin): """ AddStepDataSources command :brief: switch datasources to step mode :param argin: DevVarStringArray list of datasource names :type argin: :obj:`list` <:obj:`str`> :returns: DevVarStringArray list of datasources not found in components :rtypes: :obj:`list` <:obj:`str`> """ self.debug_stream("In AddStepDataSources()") try: self.set_state(tango.DevState.RUNNING) argout = self.__stg.addStepDataSources(argin) self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON) return argout
[docs] def is_DataSourceDescription_allowed(self): """ DataSourceDescription command State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def CreateWriterConfiguration(self, argin): """ CreateWriterConfiguration command :brief: Create configuration from the given components :param argin: DevVarStringArray list of component names :type argin: :obj:`list` <:obj:`str`> :returns: DevVarString XML configuration string :rtype: :obj:`str` """ self.debug_stream("In CreateWriterConfiguration()") try: self.set_state(tango.DevState.RUNNING) argout = self.__stg.createWriterConfiguration(argin) self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON) return argout
[docs] def is_CreateWriterConfiguration_allowed(self): """ CreateWriterConfiguration command State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ if self.get_state() in [tango.DevState.RUNNING]: return False return True
[docs] def CreateDataSources(self, argin): """ It creates new DataSources on the ConfigServer :param argin: JSON dictionary with {``dsname``: ``tangosource``, ...} :type argin: :obj:`str` """ self.debug_stream("In createDataSources()") try: self.set_state(tango.DevState.RUNNING) argout = self.__stg.createDataSources(argin) self.set_state(tango.DevState.ON) finally: if self.get_state() == tango.DevState.RUNNING: self.set_state(tango.DevState.ON) return argout
[docs] def is_CreateDataSources_allowed(self): """ CreateDataSources command State Machine :returns: True if the operation allowed :rtype: :obj:`bool` """ state_ok = not (self.get_state() in [tango.DevState.RUNNING]) return state_ok
# ================================================================== # # NXSRecSelectorClass class definition # # ==================================================================
[docs]class NXSRecSelectorClass(tango.DeviceClass): #: (:obj:`dict` <:obj:`str`, \ #: [ :obj:`str`, :class:`tango.CmdArgType`, \ #: [ :obj:`list` <:obj:`int`> ] ] > ) Class Properties class_property_list = { } #: (:obj:`dict` <:obj:`str`, \ #: [ :obj:`str`, :class:`tango.CmdArgType`, \ #: [ :obj:`list` <:obj:`int`> ] ] > ) Device Properties device_property_list = { 'NumberOfThreads': [tango.DevLong, "maximal number of threads", [20]], 'DefaultNeXusPath': [tango.DevString, "default NeXus path", ["/$var.entryname#'scan'$var.serialno:NXentry/" "NXinstrument/collection"]], 'DefaultUserDataPath': [tango.DevString, "default NeXus User Data path", ["/$var.entryname#'scan'$var.serialno:NXentry/" "user_data:NXparameters"]], 'DefaultNeXusType': [tango.DevString, "default dynamic component NeXus data type", ["NX_CHAR"]], 'DefaultTimeZone': [tango.DevString, "default Time Zone", ["Europe/Berlin"]], 'DefaultMntGrp': [tango.DevString, "default measurement group name", ["nxsmntgrp"]], 'PoolBlacklist': [tango.DevVarStringArray, "blacklist of pools", []], 'TimerFilters': [tango.DevVarStringArray, "list of timer device name filters", []], 'MutedChannelFilters': [tango.DevVarStringArray, "list of muted channel filters", []], 'MutedPreScanAttrFilters': [tango.DevVarStringArray, "list of muted attribute channel filters for PreScanSnapshot", []], 'AdminDataNames': [tango.DevVarStringArray, "list of administrator data names", []], 'DefaultPreselectedComponents': [tango.DevVarStringArray, "list of default preselected components", []], 'ClientRecordKeys': [tango.DevVarStringArray, "list of record keys for CLIENT datasources", []], 'SyncSnapshot': [tango.DevBoolean, "preselection merges the current ScanSnapshot", [False]], 'WritePoolMotorPositions': [tango.DevBoolean, "add dynamic components for all pool motor positions", [False]], 'MasterTimerFirst': [tango.DevBoolean, "the master timer channel of MG with the index: 0", [True]], 'ResetInvalidDoor': [tango.DevBoolean, "reset Door when it is invalid", [True]], 'MergeProfilesToMntGrps': [tango.DevBoolean, "merge profiles to available measurement groups", [False]], 'MasterTimer': [tango.DevBoolean, "set the master timer/monitor channel for older MG", [False]], 'DefaultCanFailDataSources': [tango.DevVarStringArray, "list of default datasources in the CanFail mode", []], 'TangoSourceErrorStates': [tango.DevVarStringArray, "list of tango error states for tango datasources", ["OFF", "INIT", "INSERT", "CLOSE", "UNKNOWN", "FAULT"]], 'TangoSourceWarningStates': [tango.DevVarStringArray, "list of tango warning states for tango datasources", ["ALARM", "DISABLE"]], } #: (:obj:`dict` <:obj:`str`, \ #: [[ :class:`tango.CmdArgType`, :obj:`str`]] >) #: Command definitions cmd_list = { 'SetScanEnvVariables': [[tango.DevString, "environment data"], [tango.DevLong, "scanID"]], 'ScanEnvVariables': [[tango.DevVoid, ""], [tango.DevString, "environment data"]], 'LoadProfile': [[tango.DevVoid, ""], [tango.DevVoid, ""]], 'SaveProfile': [[tango.DevVoid, ""], [tango.DevVoid, ""]], 'FetchProfile': [[tango.DevVoid, ""], [tango.DevVoid, ""]], 'StoreProfile': [[tango.DevVoid, ""], [tango.DevVoid, ""]], 'DeleteProfile': [[tango.DevString, "mntgrp name"], [tango.DevVoid, ""]], 'ImportMntGrp': [[tango.DevVoid, ""], [tango.DevVoid, ""]], 'ImportEnvProfile': [[tango.DevVoid, ""], [tango.DevVoid, ""]], 'ExportEnvProfile': [[tango.DevVoid, ""], [tango.DevVoid, ""]], 'UpdateMntGrp': [[tango.DevVoid, ""], [tango.DevString, "configuration"]], 'UpdateProfile': [[tango.DevVoid, ""], [tango.DevString, "mntgrp configuration string"]], 'SwitchProfile': [[tango.DevVoid, ""], [tango.DevVoid, ""]], 'MntGrpConfiguration': [[tango.DevVoid, ""], [tango.DevString, "mntgrp configuration string"]], 'IsMntGrpUpdated': [[tango.DevVoid, ""], [tango.DevBoolean, "true if mntgrp changed"]], 'PreselectComponents': [[tango.DevVoid, ""], [tango.DevVoid, ""]], 'ResetPreselectedComponents': [[tango.DevVoid, ""], [tango.DevVoid, ""]], 'DeleteAllProfiles': [[tango.DevVoid, ""], [tango.DevVoid, ""]], 'UpdateConfigVariables': [[tango.DevVoid, ""], [tango.DevVoid, ""]], 'AvailableComponents': [[tango.DevVoid, ""], [tango.DevVarStringArray, "list of available component names"]], 'AvailableProfiles': [[tango.DevVoid, ""], [tango.DevVarStringArray, "list of available selection names"]], 'AvailableMntGrps': [[tango.DevVoid, ""], [tango.DevVarStringArray, "list of available mntgrp names"]], 'AvailableDataSources': [[tango.DevVoid, ""], [tango.DevVarStringArray, "list of available DataSource names"]], 'AvailableTimers': [[tango.DevVoid, ""], [tango.DevVarStringArray, "list of available timers"]], 'MutedChannels': [[tango.DevVoid, ""], [tango.DevVarStringArray, "list of muted channels"]], 'ComponentDescription': [[tango.DevVoid, ""], [tango.DevString, "JSON component description"]], 'VariableComponents': [[tango.DevVoid, ""], [tango.DevString, "JSON Dictionary with all Components for " + " configuration variable"]], 'FullDeviceNames': [[tango.DevVoid, ""], [tango.DevString, "JSON Dictionary with full device names for " + " all aliases "]], 'ComponentDataSources': [[tango.DevVoid, ""], [tango.DevVarStringArray, "profile component datasources"]], 'SelectedDataSources': [[tango.DevVoid, ""], [tango.DevVarStringArray, "profile datasources"]], 'AdministratorDataNames': [[tango.DevVoid, ""], [tango.DevVarStringArray, "administrator data names"]], 'PoolElementNames': [[tango.DevString, "pool list attribute name"], [tango.DevVarStringArray, "list of available pool elements"]], 'MandatoryComponents': [[tango.DevVoid, ""], [tango.DevVarStringArray, "component names"]], 'SelectedComponents': [[tango.DevVoid, ""], [tango.DevVarStringArray, "component names"]], 'PreselectedComponents': [[tango.DevVoid, ""], [tango.DevVarStringArray, "component names"]], 'PreselectedDataSources': [[tango.DevVoid, ""], [tango.DevVarStringArray, "datasources names"]], 'CreateDynamicComponent': [[tango.DevVarStringArray, "list of JSON strings with datasource parameters"], [tango.DevString, "name of dynamic Component"]], 'DataSourceDescription': [[tango.DevVarStringArray, "list of required datasources"], [tango.DevVarStringArray, "list of JSON with description of CLIENT Datasources"]], 'AddStepDataSources': [[tango.DevVarStringArray, "list of required datasources"], [tango.DevVarStringArray, "list of datasources not found in components"]], 'ComponentClientSources': [[tango.DevVarStringArray, "list of component " "client datasources"], [tango.DevString, "JSON with description of component CLIENT Datasources"]], 'ComponentSources': [[tango.DevVarStringArray, "list of components"], [tango.DevString, "JSON with description of component Datasources"]], 'ChannelProperties': [[tango.DevString, "property type"], [tango.DevString, "JSON dictionary with channel properties {channel:property}"]], 'SetChannelProperties': [[tango.DevVarStringArray, "a two element list with a property type and " "JSON value dictionary {channel:property}"], [tango.DevVoid, ""]], 'CreateWriterConfiguration': [[tango.DevVarStringArray, "list of required components"], [tango.DevString, "XML Settinges"]], 'RemoveDynamicComponent': [[tango.DevString, "name of dynamic Component"], [tango.DevVoid, ""]], 'CreateDataSources': [[tango.DevString, "JSON dictionary with {``dsname``: ``tangosource``, ...}"], [tango.DevVoid, ""]], } #: (:obj:`dict` <:obj:`str`, \ # [[ :class:`tango.CmdArgType`, \ # :class:`tango.AttrDataFormat`, \ # :class:`tango.AttrWriteType` ], \ # :obj:`dict` <:obj:`str` , any> ] > ) Attribute definitions attr_list = { 'Components': [[tango.DevString, tango.SPECTRUM, tango.READ, 10000], { 'label': "Selected Components", 'description': "list of Selected Components", }], 'StepDataSources': [[tango.DevString, tango.SCALAR, tango.READ_WRITE], { 'label': "list of datasources to be switch into step mode", 'description': "list of datasources to be switched" + " into step mode", 'Display level': tango.DispLevel.EXPERT, }], 'CanFailDataSources': [[tango.DevString, tango.SCALAR, tango.READ_WRITE], { 'label': "list of datasources to be switch into canfail mode", 'description': "list of datasources to be switched" + " into canfail mode", 'Display level': tango.DispLevel.EXPERT, }], 'LinkDataSources': [[tango.DevString, tango.SCALAR, tango.READ_WRITE], { 'label': "list of datasources to which a link will be added", 'description': "list of datasources to which " "a link will be added", 'Display level': tango.DispLevel.EXPERT, }], 'DescriptionErrors': [[tango.DevString, tango.SPECTRUM, tango.READ, 10000], { 'label': "Descriptive Component Errors", 'description': "list of Descriptive Component Errors", }], 'Version': [[tango.DevString, tango.SCALAR, tango.READ], { 'label': "Version", 'description': "server version", }], 'MacroServer': [[tango.DevString, tango.SCALAR, tango.READ], { 'label': "MacroServer", 'description': "Macro Server device name", }], 'MntGrp': [[tango.DevString, tango.SCALAR, tango.READ_WRITE], { 'label': " Measurement Group", 'description': " Measurement Group name", }], 'ScanDir': [[tango.DevString, tango.SCALAR, tango.READ_WRITE], { 'label': "Scan Directory", 'description': "Scan Directory", }], 'ScanID': [[tango.DevLong, tango.SCALAR, tango.READ_WRITE], { 'label': "Scan ID", 'description': "Scan ID", }], 'ScanFile': [[tango.DevString, tango.SCALAR, tango.READ_WRITE], { 'label': "Scan File(s)", 'description': "Scan File(s)", }], 'ConfigDevice': [[tango.DevString, tango.SCALAR, tango.READ_WRITE], { 'label': "Configuration Device", 'description': "Configuration device name", 'Memorized': "true", }], 'Door': [[tango.DevString, tango.SCALAR, tango.READ_WRITE], { 'label': "Door", 'description': "Door device name", 'Memorized': "true", }], 'WriterDevice': [[tango.DevString, tango.SCALAR, tango.READ_WRITE], { 'label': "Writer Device", 'description': "Writer device device name", }], 'UserData': [[tango.DevString, tango.SCALAR, tango.READ_WRITE], { 'label': "Client Data", 'description': "JSON dictionary with User Data", 'Display level': tango.DispLevel.EXPERT, }], 'DeviceGroups': [[tango.DevString, tango.SCALAR, tango.READ_WRITE], { 'label': "Device groups", 'description': "JSON dictionary with device groups", 'Memorized': "true", 'Display level': tango.DispLevel.EXPERT, }], 'DataSources': [[tango.DevString, tango.SPECTRUM, tango.READ, 10000], { 'label': "Selected Datasources", 'description': "list of Selected Datasources", }], 'ProfileConfiguration': [[tango.DevString, tango.SCALAR, tango.READ_WRITE], { 'label': "Profile Configuration", 'description': "JSON dict of server configuration", 'Display level': tango.DispLevel.EXPERT, }], 'AppendEntry': [[tango.DevBoolean, tango.SCALAR, tango.READ_WRITE], { 'label': "Append Entry", 'description': "flag for entry appending", }], 'ConfigVariables': [[tango.DevString, tango.SCALAR, tango.READ_WRITE], { 'label': "Configuration Variables", 'description': "JSON dictionary with configuration variables" + "for templated components", 'Display level': tango.DispLevel.EXPERT, }], 'ProfileFile': [[tango.DevString, tango.SCALAR, tango.READ_WRITE], { 'label': "Profile File with its Path", 'description': "config file with its full path", 'Memorized': "true", }], } def __init__(self, name): """ NXSRecSelectorclass Constructor """ tango.DeviceClass.__init__(self, name) self.set_type(name) print("In NXSRecSelectorClass constructor")
# ================================================================== # # NXSRecSelector class main method # # ================================================================== if __name__ == '__main__': pass