Source code for nxstools.nxsfileinfo

#   This file is part of nexdatas - Tango Server for NeXus data writer
#
#    Copyright (C) 2012-2018 DESY, Jan Kotanski <jkotan@mail.desy.de>
#
#    nexdatas is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    nexdatas is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with nexdatas.  If not, see <http://www.gnu.org/licenses/>.
#

""" Command-line tool for showing meta data from Nexus Files"""

import sys
import argparse
import json
import uuid
import os
import stat
import re
import time
import pytz
import datetime
import pwd
import grp
import fnmatch
import yaml
import base64
import math
import shutil
import numpy as np
from io import BytesIO

from .nxsparser import TableTools
from .nxsfileparser import (NXSFileParser, FIOFileParser,
                            numpyEncoder, numpyEncoderNull, isoDate)
from .nxsargparser import (Runner, NXSArgParser, ErrorException)
from . import filewriter
from .ontology import id_techniques, nexus_panet


if sys.version_info > (3,):
    basestring = str


WRITERS = {}
try:
    from . import h5pywriter
    WRITERS["h5py"] = h5pywriter
except Exception:
    pass

try:
    from . import h5cppwriter
    WRITERS["h5cpp"] = h5cppwriter
except Exception:
    pass

try:
    import matplotlib
    MATPLOTLIB = True
except Exception:
    MATPLOTLIB = False

# try:
#     import PIL
#     import PIL.Image
#     #: (:obj:`bool`) PIL imported
#     PILLOW = True
# except ImportError:
#     #: (:obj:`bool`) PIL imported
#     PILLOW = False


def getlist(text):
    """ converts a text string to a list of lists
    with respect to newline and space characters

    :param text: parser options
    :type text: :obj:`str`
    :returns: a list of lists
    :rtype: :obj:`list` < :obj:`list` <:obj:`str`> >
    """
    lst = []
    if text:
        lines = text.strip().split("\n")
        lst = [line.strip().split(" ") for line in lines
               if (line.strip() and not line.strip().startswith("#"))]
    return lst
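
# A minimal usage sketch of getlist with a hypothetical option string;
# comment lines are skipped and each remaining line is split on spaces:
#
#     >>> getlist("a b\n# comment\nc d e")
#     [['a', 'b'], ['c', 'd', 'e']]
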
def splittext(text, lmax=68):
    """ split text into lines

    :param text: parser options
    :type text: :obj:`str`
    :param lmax: maximal line length
    :type lmax: :obj:`int`
    :returns: split text
    :rtype: :obj:`str`
    """
    lnew = []
    lw = [" " + ee for ee in text.split(" ") if ee]
    nw = []
    for ew in lw:
        ww = [ee + "," for ee in ew.split(",") if ee]
        ww[-1] = ww[-1][:-1]
        nw.extend(ww)
    for ll in nw:
        if ll:
            if not lnew and ll[1:]:
                lnew.append(ll[1:])
            elif len(lnew[-1]) + len(ll) < lmax:
                lnew[-1] = lnew[-1] + ll
            else:
                lnew.append(ll)
    return "\n".join(lnew)
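
# A minimal usage sketch of splittext with hypothetical input: words are
# packed into lines no longer than lmax, continuation lines keeping their
# leading space:
#
#     >>> splittext("aa bb cc", lmax=6)
#     'aa bb\n cc'
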
class General(Runner):

    """ General runner"""

    #: (:obj:`str`) command description
    description = "show general information for the nexus file"
    #: (:obj:`str`) command epilog
    epilog = "" \
        + " examples:\n" \
        + " nxsfileinfo general /user/data/myfile.nxs\n" \
        + "\n"
    def create(self):
        """ creates parser
        """
        self._parser.add_argument(
            "--h5py", action="store_true",
            default=False, dest="h5py",
            help="use h5py module as a nexus reader")
        self._parser.add_argument(
            "--h5cpp", action="store_true",
            default=False, dest="h5cpp",
            help="use h5cpp module as a nexus reader")
    def postauto(self):
        """ parser creator after autocomplete run """
        self._parser.add_argument(
            'args', metavar='nexus_file', type=str, nargs=1,
            help='new nexus file name')
    def run(self, options):
        """ the main program function

        :param options: parser options
        :type options: :class:`argparse.Namespace`
        :returns: output information
        :rtype: :obj:`str`
        """
        if options.h5cpp:
            writer = "h5cpp"
        elif options.h5py:
            writer = "h5py"
        elif "h5cpp" in WRITERS.keys():
            writer = "h5cpp"
        else:
            writer = "h5py"
        if (options.h5py and options.h5cpp) or \
           writer not in WRITERS.keys():
            sys.stderr.write("nxsfileinfo: Writer '%s' cannot be opened\n"
                             % writer)
            sys.stderr.flush()
            self._parser.print_help()
            sys.exit(255)
        wrmodule = WRITERS[writer.lower()]
        try:
            fl = filewriter.open_file(
                options.args[0], readonly=True, writer=wrmodule)
        except Exception:
            sys.stderr.write("nxsfileinfo: File '%s' cannot be opened\n"
                             % options.args[0])
            sys.stderr.flush()
            self._parser.print_help()
            sys.exit(255)
        root = fl.root()
        self.show(root)
        fl.close()
    @classmethod
    def parseentry(cls, entry, description):
        """ parse entry of nexus file

        :param entry: nexus entry node
        :type entry: :class:`filewriter.FTGroup`
        :param description: dict description list
        :type description: :obj:`list` <:obj:`dict` <:obj:`str`, `any`> >
        :return: (key, value) name pair of table headers
        :rtype: [:obj:`str`, :obj:`str`]
        """
        key = "A"
        value = "B"
        at = None
        try:
            at = entry.attributes["NX_class"]
        except Exception:
            pass
        if at and filewriter.first(at.read()) == 'NXentry':
            # description.append(None)
            # value = filewriter.first(value)
            key = "Scan entry:"
            value = entry.name
            # description.append({key: "Scan entry:", value: entry.name})
            # description.append(None)
            try:
                vl = filewriter.first(entry.open("title").read())
                description.append({key: "Title:", value: vl})
            except Exception:
                sys.stderr.write("nxsfileinfo: title cannot be found\n")
                sys.stderr.flush()
            try:
                vl = filewriter.first(
                    entry.open("experiment_identifier").read())
                description.append(
                    {key: "Experiment identifier:", value: vl})
            except Exception:
                sys.stderr.write(
                    "nxsfileinfo: experiment identifier cannot be found\n")
                sys.stderr.flush()
            for ins in entry:
                if isinstance(ins, filewriter.FTGroup):
                    iat = ins.attributes["NX_class"]
                    if iat and filewriter.first(iat.read()) == 'NXinstrument':
                        try:
                            vl = filewriter.first(ins.open("name").read())
                            description.append({
                                key: "Instrument name:", value: vl})
                        except Exception:
                            sys.stderr.write(
                                "nxsfileinfo: instrument name cannot"
                                " be found\n")
                            sys.stderr.flush()
                        try:
                            vl = filewriter.first(
                                ins.open("name").attributes[
                                    "short_name"].read())
                            description.append({
                                key: "Instrument short name:", value: vl})
                        except Exception:
                            sys.stderr.write(
                                "nxsfileinfo: instrument short name cannot"
                                " be found\n")
                            sys.stderr.flush()
                        for sr in ins:
                            if isinstance(sr, filewriter.FTGroup):
                                sat = sr.attributes["NX_class"]
                                if sat and filewriter.first(sat.read()) \
                                   == 'NXsource':
                                    try:
                                        vl = filewriter.first(
                                            sr.open("name").read())
                                        description.append({
                                            key: "Source name:", value: vl})
                                    except Exception:
                                        sys.stderr.write(
                                            "nxsfileinfo: source name"
                                            " cannot be found\n")
                                        sys.stderr.flush()
                                    try:
                                        vl = filewriter.first(
                                            sr.open("name").attributes[
                                                "short_name"].read())
                                        description.append({
                                            key: "Source short name:",
                                            value: vl})
                                    except Exception:
                                        sys.stderr.write(
                                            "nxsfileinfo: source short name"
                                            " cannot be found\n")
                                        sys.stderr.flush()
                    elif iat and filewriter.first(iat.read()) == 'NXsample':
                        try:
                            vl = filewriter.first(ins.open("name").read())
                            description.append({
                                key: "Sample name:", value: vl})
                        except Exception:
                            sys.stderr.write(
                                "nxsfileinfo: sample name cannot be found\n")
                            sys.stderr.flush()
                        try:
                            vl = filewriter.first(
                                ins.open("chemical_formula").read())
                            description.append({
                                key: "Sample formula:", value: vl})
                        except Exception:
                            sys.stderr.write(
                                "nxsfileinfo: sample formula cannot"
                                " be found\n")
                            sys.stderr.flush()
            try:
                vl = filewriter.first(entry.open("start_time").read())
                description.append({key: "Start time:", value: vl})
            except Exception:
                sys.stderr.write("nxsfileinfo: start time cannot be found\n")
                sys.stderr.flush()
            try:
                vl = filewriter.first(entry.open("end_time").read())
                description.append({key: "End time:", value: vl})
            except Exception:
                sys.stderr.write("nxsfileinfo: end time cannot be found\n")
                sys.stderr.flush()
            if "program_name" in entry.names():
                pn = entry.open("program_name")
                pname = filewriter.first(pn.read())
                attr = pn.attributes
                names = [att.name for att in attr]
                if "scan_command" in names:
                    scommand = filewriter.first(attr["scan_command"].read())
                    pname = "%s (%s)" % (pname, scommand)
                description.append({key: "Program:", value: pname})
        return [key, value]
    def show(self, root):
        """ show general information

        :param root: nexus file root
        :type root: :class:`filewriter.FTGroup`
        """
        description = []
        attr = root.attributes
        names = [at.name for at in attr]
        fname = filewriter.first(
            (attr["file_name"].read() if "file_name" in names else " ")
            or " ")
        title = "File name: '%s'" % fname
        print("")
        for en in root:
            description = []
            headers = self.parseentry(en, description)
            ttools = TableTools(description)
            ttools.title = title
            ttools.headers = headers
            rstdescription = ttools.generateList()
            title = ""
            print("\n".join(rstdescription).strip())
            print("")
class BeamtimeLoader(object):

    facilityalias = {
        "PETRA III": "petra3",
        "PETRA IV": "petra4",
    }

    btmdmap = {}

    newbtmdmap = {
        "principalInvestigator": ["applicant.email"],
        # "pid": "beamtimeId",    # ?? is not unique for dataset
        "owner": ["leader.lastname", "applicant.lastname"],
        "contactEmail": ["pi.email", "applicant.email"],
        "sourceFolder": ["corePath"],
        "endTime": ["eventEnd"],    # ?? should be endTime for dataset
        "ownerEmail": ["leader.email", "applicant.email"],
        "description": ["title"],   # ?? should be from dataset
        # "createdAt": ["generated"],   # ?? should be automatic
        # "updatedAt": ["generated"],   # ?? should be automatic
        # "proposalId": "proposalId",
        "proposalId": ["beamtimeId"],
    }

    oldbtmdmap = {
        "createdAt": ["generated"],   # ?? should be automatic
        "updatedAt": ["generated"],   # ?? should be automatic
    }

    strcre = {
        "creationLocation": "/DESY/{facility}/{beamlineAlias}",
        "instrumentId": "/{facility}/{beamline}",
        "type": "raw",
        "keywords": ["scan"],
        "isPublished": False,
        "ownerGroup": "{beamtimeId}-dmgt",
        "accessGroups": [
            "{beamtimeId}-dmgt", "{beamtimeId}-clbt", "{beamtimeId}-part",
            "{beamline}dmgt", "{beamline}staff"],
    }

    cre = {
        "creationTime": [],     # ?? startTime for dataset !!!
        "ownerGroup": [],       # ??? !!!
        "sampleId": [],         # ???
        "publisheddataId": [],
        "accessGroups": [],     # ???
        "createdBy": [],        # ???
        "updatedBy": [],        # ???
        "createdAt": [],        # ???
        "updatedAt": [],        # ???
        "isPublished": ["false"],
        "dataFormat": [],
        "scientificMetadata": {},
        "orcidOfOwner": "ORCID of owner https://orcid.org "
                        "if available",
        "sourceFolderHost": [],
        "size": [],
        "packedSize": [],
        "numberOfFiles": [],
        "numberOfFilesArchived": [],
        "validationStatus": [],
        "keywords": [],
        "datasetName": [],
        "classification": [],
        "license": [],
        "version": [],
        "techniques": [],
        "instrumentId": [],
        "history": [],
        "datasetlifecycle": [],
    }

    dr = {
        "eventStart": [],
        "beamlineAlias": [],
        "leader": [],
        "onlineAnalysis": [],
        "pi.*": [],
        "applicant.*": [],
        "proposalType": [],
        "users": [],
    }

    copymap = {
        "endTime": "scientificMetadata.end_time.value",
        "description": "scientificMetadata.title.value",
        "scientificMetadata.ScanCommand":
            "scientificMetadata.program_name.scan_command",
    }

    copylist = [
        ["creationTime", "endTime"],
    ]

    def __init__(self, options):
        """ loader constructor

        :param options: parser options
        :type options: :class:`argparse.Namespace`
        """
        self.btmdmap = dict(self.newbtmdmap)
        if not hasattr(options, "scicatversion") or \
           int(options.scicatversion) < 4:
            self.btmdmap.update(self.oldbtmdmap)
        self.__pid = options.pid
        self.__pap = options.pap
        self.__relpath = options.relpath
        self.__ownergroup = options.ownergroup
        self.__accessgroups = None
        if options.accessgroups is not None:
            self.__accessgroups = options.accessgroups.split(",")
        self.__keywords = []
        if hasattr(options, "keywords") \
           and options.keywords is not None:
            self.__keywords = [
                kw for kw in options.keywords.split(",") if kw]
        dct = {}
        if options.beamtimemeta:
            with open(options.beamtimemeta, "r") as fl:
                # jstr = fl.read()
                # # print(jstr)
                dct = json.load(fl)
        self.__btmeta = dct
        dct = {}
        if options.scientificmeta:
            with open(options.scientificmeta, "r") as fl:
                jstr = fl.read()
                # print(jstr)
                try:
                    dct = json.loads(jstr)
                except Exception:
                    if jstr:
                        nan = float('nan')    # noqa: F841
                        dct = eval(jstr.strip())
        if 'scientificMetadata' in dct.keys():
            self.__scmeta = dct['scientificMetadata']
        else:
            self.__scmeta = dct
        self.__metadata = {}
    def run(self):
        """ runner for DESY beamtime file parser

        :returns: metadata dictionary
        :rtype: :obj:`dict` <:obj:`str`, `any`>
        """
        if self.__pap:
            self.btmdmap["proposalId"] = ["proposalId"]
        if self.__btmeta:
            for sc, dss in self.btmdmap.items():
                found = False
                for ds in dss:
                    sds = ds.split(".")
                    md = self.__btmeta
                    for sd in sds:
                        if sd in md:
                            md = md[sd]
                        else:
                            break
                    else:
                        self.__metadata[sc] = md
                        found = True
                        if md:
                            break
                if not found:
                    print("%s cannot be found" % ds)
            for sc, vl in self.strcre.items():
                if isinstance(vl, list):
                    self.__metadata[sc] = [
                        (vv.format(**self.__btmeta)
                         if hasattr(vv, "format") else vv)
                        for vv in vl]
                else:
                    if hasattr(vl, "format"):
                        self.__metadata[sc] = vl.format(**self.__btmeta)
                    else:
                        self.__metadata[sc] = vl
        if self.__relpath and "sourceFolder" in self.__metadata:
            self.__metadata["sourceFolder"] = os.path.join(
                self.__metadata["sourceFolder"], self.__relpath)
        if self.__scmeta or self.__btmeta:
            self.__metadata["scientificMetadata"] = {}
        if self.__scmeta:
            self.__metadata["scientificMetadata"].update(self.__scmeta)
        if self.__btmeta and \
           "beamtimeId" not in self.__metadata["scientificMetadata"]:
            self.__metadata["scientificMetadata"]["beamtimeId"] = \
                self.__btmeta["beamtimeId"]
        if self.__btmeta and \
           "DOOR_proposalId" not in self.__metadata["scientificMetadata"]:
            self.__metadata["scientificMetadata"]["DOOR_proposalId"] = \
                self.__btmeta["proposalId"]
        if self.__pid:
            self.__metadata["pid"] = self.__pid
        if self.__ownergroup:
            self.__metadata["ownerGroup"] = self.__ownergroup
        if self.__accessgroups is not None:
            self.__metadata["accessGroups"] = self.__accessgroups
        if self.__keywords:
            if "keywords" not in self.__metadata:
                self.__metadata["keywords"] = []
            self.__metadata["keywords"].extend(self.__keywords)
        # print(self.__metadata)
        return self.__metadata
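
    # A minimal sketch (hypothetical beamtime metadata) of how run()
    # resolves the dotted btmdmap entries and strcre templates, e.g. for
    #
    #     {"applicant": {"email": "jane.doe@example.org",
    #                    "lastname": "Doe"},
    #      "beamtimeId": "12345678", "beamline": "p09",
    #      "beamlineAlias": "p09", "facility": "petra3",
    #      "corePath": "/gpfs/current", "title": "test experiment",
    #      "eventEnd": "2024-01-01T12:00:00Z", "proposalId": "I-20240001"}
    #
    # it yields among others "contactEmail": "jane.doe@example.org"
    # (the applicant.email fallback, since pi.email is missing),
    # "creationLocation": "/DESY/petra3/p09" and
    # "ownerGroup": "12345678-dmgt".
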
    def merge(self, metadata):
        """ update metadata with dictionary

        :param metadata: metadata dictionary to merge in
        :type metadata: :obj:`dict` <:obj:`str`, `any`>
        :returns: metadata dictionary
        :rtype: :obj:`dict` <:obj:`str`, `any`>
        """
        if not self.__metadata:
            return metadata
        elif not metadata:
            return metadata
        return dict(self._mergedict(metadata, self.__metadata))
    def merge_copy_maps(self, cmap):
        """ merge copy maps

        :param cmap: overwrite dictionary
        :type cmap: :obj:`dict` <:obj:`str`, :obj:`str`>
        :returns: merged copy map
        :rtype: :obj:`dict` <:obj:`str`, :obj:`str`>
        """
        if cmap is None:
            cpmap = dict(self.copymap)
        else:
            cpmap = dict(self.copymap)
            cpmap.update(cmap)
        return cpmap
    def merge_copy_lists(self, clist):
        """ merge copy lists

        :param clist: overwrite copy list
        :type clist: :obj:`list` < [:obj:`str`, :obj:`str`] >
        :returns: merged copy list
        :rtype: :obj:`list` < [:obj:`str`, :obj:`str`] >
        """
        if clist is None:
            cplist = list(self.copylist)
        else:
            cplist = list(self.copylist)
            cplist.extend(clist)
        return cplist
    def append_copymap_field(self, metadata, cmap, clist, cmapfield=None):
        """ append copy-map entries read from a nexus field
        to the copy map and the copy list

        :param metadata: metadata dictionary to merge in
        :type metadata: :obj:`dict` <:obj:`str`, `any`>
        :param cmap: overwrite dictionary, updated in place
        :type cmap: :obj:`dict` <:obj:`str`, :obj:`str`>
        :param clist: copy list to overwrite metadata, updated in place
        :type clist: :obj:`list` < [:obj:`str`, :obj:`str`] >
        :param cmapfield: copy map nexus field
        :type cmapfield: :obj:`str`
        """
        if cmapfield and metadata and cmap:
            vls = cmapfield.split(".")
            md = metadata
            for vl in vls:
                if vl in md:
                    md = md[vl]
                else:
                    break
            else:
                if md:
                    try:
                        dct = yaml.safe_load(str(md).strip())
                        if dct and isinstance(dct, dict):
                            cmap.update(dct)
                        elif dct:
                            if isinstance(dct, basestring):
                                dct = getlist(str(md).strip())
                            if isinstance(dct, list):
                                for line in dct:
                                    if isinstance(line, list):
                                        clist.append(line[:2])
                    except Exception as e:
                        sys.stderr.write(
                            "nxsfileinfo: copymap update: '%s'\n" % str(e))
    def overwrite(self, metadata, cmap=None, clist=None, cmapfield=None):
        """ overwrite metadata with dictionary

        :param metadata: metadata dictionary to merge in
        :type metadata: :obj:`dict` <:obj:`str`, `any`>
        :param cmap: copy map to overwrite dictionary
        :type cmap: :obj:`dict` <:obj:`str`, :obj:`str`>
        :param clist: copy list to overwrite metadata
        :type clist: :obj:`list` < [:obj:`str`, :obj:`str`] >
        :param cmapfield: copy map nexus field
        :type cmapfield: :obj:`str`
        :returns: metadata dictionary
        :rtype: :obj:`dict` <:obj:`str`, `any`>
        """
        cpmap = self.merge_copy_maps(cmap)
        cplist = self.merge_copy_lists(clist)
        self.append_copymap_field(metadata, cpmap, cplist, cmapfield)
        if metadata:
            for ts, vs in cpmap.items():
                if ts and vs and isinstance(ts, basestring) \
                   and isinstance(vs, basestring) \
                   and not ts.startswith(vs + "."):
                    vls = vs.split(".")
                    md = metadata
                    for vl in vls:
                        if vl in md:
                            md = md[vl]
                        else:
                            break
                    else:
                        tgs = ts.split(".")
                        td = metadata
                        parent = None
                        for tg in tgs:
                            parent = td
                            if tg in td:
                                td = td[tg]
                            else:
                                td[tg] = {}
                                td = td[tg]
                        parent[tg] = md
            for line in cplist:
                if line and len(line) > 1 and line[0] and line[1] and \
                   isinstance(line[0], basestring) and \
                   isinstance(line[1], basestring) and \
                   not line[0].startswith(line[1] + "."):
                    ts = line[0]
                    vs = line[1]
                    vls = vs.split(".")
                    md = metadata
                    for vl in vls:
                        if vl in md:
                            md = md[vl]
                        else:
                            break
                    else:
                        tgs = ts.split(".")
                        td = metadata
                        parent = None
                        for tg in tgs:
                            parent = td
                            if tg in td:
                                td = td[tg]
                            else:
                                td[tg] = {}
                                td = td[tg]
                        parent[tg] = md
        return metadata
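
    # A minimal sketch of the copy-map semantics applied by overwrite()
    # (hypothetical metadata): each entry maps a dotted target path to a
    # dotted source path, so the default
    # {"endTime": "scientificMetadata.end_time.value"} applied to
    #
    #     {"scientificMetadata": {"end_time": {"value": "2024-01-01"}}}
    #
    # adds "endTime": "2024-01-01" at the top level, and the default
    # copylist entry ["creationTime", "endTime"] then copies that value
    # to "creationTime" as well.
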
    def remove_metadata(self, metadata, cmap=None, clist=None,
                        cmapfield=None):
        """ remove metadata pointed to by copy-map entries
        with an empty input or output

        :param metadata: metadata dictionary to merge in
        :type metadata: :obj:`dict` <:obj:`str`, `any`>
        :param cmap: overwrite dictionary
        :type cmap: :obj:`dict` <:obj:`str`, :obj:`str`>
        :param clist: copy list to overwrite metadata
        :type clist: :obj:`list` < [:obj:`str`, :obj:`str`] >
        :param cmapfield: copy map nexus field
        :type cmapfield: :obj:`str`
        :returns: metadata dictionary
        :rtype: :obj:`dict` <:obj:`str`, `any`>
        """
        cpmap = self.merge_copy_maps(cmap)
        cplist = list(clist or [])
        self.append_copymap_field(metadata, cpmap, cplist, cmapfield)
        if metadata:
            for ts, vs in cpmap.items():
                vv = None
                if not ts:
                    vv = vs
                if not vs:
                    vv = ts
                if vv and isinstance(vv, basestring):
                    vls = vv.split(".")
                    md = metadata
                    parent = None
                    for vl in vls:
                        parent = md
                        if vl in md:
                            md = md[vl]
                        else:
                            break
                    else:
                        parent.pop(vl)
            for line in cplist:
                vv = None
                if line:
                    if len(line) == 1 and line[0]:
                        vv = line[0]
                    elif len(line) > 1:
                        if line[0] and not line[1]:
                            vv = line[0]
                        if line[1] and not line[0]:
                            vv = line[1]
                if vv and isinstance(vv, basestring):
                    vls = vv.split(".")
                    md = metadata
                    parent = None
                    for vl in vls:
                        parent = md
                        if vl in md:
                            md = md[vl]
                        else:
                            break
                    else:
                        parent.pop(vl)
        return metadata
    @classmethod
    def _mergedict(cls, dct1, dct2):
        """ merge two dictionaries recursively,
        values of dct2 winning on leaf conflicts """
        for key in set(dct1) | set(dct2):
            if key in dct1 and key in dct2:
                if isinstance(dct1[key], dict) and \
                   isinstance(dct2[key], dict):
                    yield (key, dict(cls._mergedict(dct1[key], dct2[key])))
                else:
                    yield (key, dct2[key])
            elif key in dct1:
                yield (key, dct1[key])
            else:
                yield (key, dct2[key])
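
    # A short sketch of the recursive merge (hypothetical dictionaries):
    #
    #     >>> dict(BeamtimeLoader._mergedict(
    #     ...     {"a": {"b": 1, "c": 2}}, {"a": {"c": 3}, "d": 4}))
    #     {'a': {'b': 1, 'c': 3}, 'd': 4}
    #
    # (key order may differ since the keys are iterated over a set)
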
    def update_pid(self, metadata, filename=None, puuid=False,
                   pfname=False, beamtimeid=None):
        """ update pid metadata with dictionary

        :param metadata: metadata dictionary to merge in
        :type metadata: :obj:`dict` <:obj:`str`, `any`>
        :param filename: nexus filename
        :type filename: :obj:`str`
        :param puuid: pid with uuid
        :type puuid: :obj:`bool`
        :param pfname: pid with file name
        :type pfname: :obj:`bool`
        :param beamtimeid: beamtime id
        :type beamtimeid: :obj:`str`
        :returns: metadata dictionary
        :rtype: :obj:`dict` <:obj:`str`, `any`>
        """
        metadata = metadata or {}
        if "pid" not in metadata:
            beamtimeid = beamtimeid or ""
            if not beamtimeid and "scientificMetadata" in metadata \
               and "beamtimeId" in metadata["scientificMetadata"]:
                beamtimeid = metadata["scientificMetadata"]["beamtimeId"]
            scanid = ""
            fsscanid = ""
            fiscanid = None
            fname = ""
            if filename:
                fdir, fname = os.path.split(filename)
                fname, fext = os.path.splitext(fname)
                lfl = fname.split("_")
                fsscanid = lfl[-1]
                res = re.search(r'\d+$', fsscanid)
                fiscanid = int(res.group()) if res else None
            esscanid = ""
            eiscanid = None
            lfl = None
            if "scientificMetadata" in metadata and \
               "name" in metadata["scientificMetadata"] and \
               metadata["scientificMetadata"]["name"]:
                lfl = metadata["scientificMetadata"]["name"].split("_")
                esscanid = lfl[-1]
                res = re.search(r'\d+$', esscanid)
                eiscanid = int(res.group()) if res else None
            # if eiscanid:
            #     print("WWWW:", eiscanid)
            if not pfname and fname and fiscanid is not None:
                scanid = fname
            elif not pfname and fname and eiscanid is not None:
                scanid = "%s_%s" % (fname, eiscanid)
            elif not pfname and fname:
                scanid = fname
            elif fiscanid is not None:
                scanid = str(fiscanid)
            elif eiscanid is not None:
                scanid = str(eiscanid)
            elif fsscanid and esscanid and esscanid == fsscanid:
                scanid = esscanid
            elif fsscanid and esscanid and esscanid != fsscanid:
                scanid = "%s_%s" % (fsscanid, esscanid)
            elif fsscanid:
                scanid = fsscanid
            else:
                scanid = esscanid
            if beamtimeid and scanid:
                if puuid:
                    metadata["pid"] = "%s/%s/%s" % \
                        (beamtimeid, scanid, str(uuid.uuid4()))
                else:
                    metadata["pid"] = "%s/%s" % (beamtimeid, scanid)
        if "datasetName" not in metadata and "pid" in metadata:
            spid = metadata["pid"].split("/")
            if len(spid) > 1:
                metadata["datasetName"] = spid[1]
            else:
                metadata["datasetName"] = metadata["pid"]
        return metadata
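
    # A minimal sketch of the generated pid (hypothetical values): for
    # filename "/tmp/myscan_00012.nxs" and beamtime id "12345678" the
    # pid becomes "12345678/myscan_00012" (plus "/<uuid4>" with
    # puuid=True) and "datasetName" falls back to "myscan_00012".
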
    def update_sampleid(self, metadata, sampleid=None, sidfromname=False):
        """ update sampleid

        :param metadata: metadata dictionary to merge in
        :type metadata: :obj:`dict` <:obj:`str`, `any`>
        :param sampleid: sample id
        :type sampleid: :obj:`str`
        :param sidfromname: sample id from its name
        :type sidfromname: :obj:`bool`
        :returns: metadata dictionary
        :rtype: :obj:`dict` <:obj:`str`, `any`>
        """
        if sampleid:
            metadata["sampleId"] = sampleid
        elif sidfromname:
            if "scientificMetadata" in metadata and \
               "sample" in metadata["scientificMetadata"] and \
               "name" in metadata["scientificMetadata"]["sample"]:
                sname = metadata["scientificMetadata"]["sample"]["name"]
                if isinstance(sname, dict):
                    if "value" in sname.keys() and sname["value"]:
                        metadata["sampleId"] = sname["value"]
                elif sname:
                    metadata["sampleId"] = sname
        else:
            try:
                if "scientificMetadata" in metadata and \
                   "sample" in metadata["scientificMetadata"] and \
                   "description" in \
                   metadata["scientificMetadata"]["sample"]:
                    gdes = \
                        metadata["scientificMetadata"]["sample"][
                            "description"]
                    if "value" in gdes:
                        sampleid = None
                        try:
                            des = yaml.safe_load(gdes["value"])
                            if "sample_id" in des:
                                sampleid = des["sample_id"]
                            elif "sampleId" in des:
                                sampleid = des["sampleId"]
                            else:
                                sampleid = gdes["value"]
                        except Exception:
                            sampleid = gdes["value"]
                        if sampleid:
                            metadata["sampleId"] = sampleid
            except Exception as e:
                sys.stderr.write("nxsfileinfo: '%s'\n" % str(e))
        return metadata
    def update_instrumentid(self, metadata):
        """ update instrument id

        :param metadata: metadata dictionary to merge in
        :type metadata: :obj:`dict` <:obj:`str`, `any`>
        :returns: metadata dictionary
        :rtype: :obj:`dict` <:obj:`str`, `any`>
        """
        if "instrumentId" in metadata:
            ins = metadata["instrumentId"]
            for fac, alias in self.facilityalias.items():
                ins = ins.replace(fac, alias)
            ins = ins.lower()
            metadata["instrumentId"] = ins
        return metadata
    def update_techniques(self, metadata, techniques=None):
        """ update techniques

        :param metadata: metadata dictionary to merge in
        :type metadata: :obj:`dict` <:obj:`str`, `any`>
        :param techniques: a list of techniques separated by commas
        :type techniques: :obj:`str`
        :returns: metadata dictionary
        :rtype: :obj:`dict` <:obj:`str`, `any`>
        """
        if techniques:
            metadata["techniques"] = \
                self.generate_techniques(techniques.split(","))
        if metadata and "techniques" not in metadata:
            try:
                if "scientificMetadata" in metadata and \
                   "definition" in metadata["scientificMetadata"]:
                    gdefin = metadata["scientificMetadata"]["definition"]
                    if "value" in gdefin:
                        defin = gdefin["value"].split(",")
                        defin = [
                            (df[2:] if (df and str(df).startswith("NX"))
                             else df)
                            for df in defin]
                        if defin:
                            metadata["techniques"] = \
                                self.generate_techniques(defin)
            except Exception as e:
                sys.stderr.write("nxsfileinfo: '%s'\n" % str(e))
            try:
                if "scientificMetadata" in metadata and \
                   "experiment_description" in \
                   metadata["scientificMetadata"]:
                    gexpdes = \
                        metadata["scientificMetadata"][
                            "experiment_description"]
                    if "value" in gexpdes:
                        try:
                            pids = None
                            tes = None
                            expdes = yaml.safe_load(gexpdes["value"])
                            if "techniques" in expdes:
                                tes = expdes["techniques"]
                            elif "technique" in expdes:
                                tes = [expdes["technique"]]
                            if "techniques_pids" in expdes:
                                pids = expdes["techniques_pids"]
                            elif "technique_pid" in expdes:
                                pids = [expdes["technique_pid"]]
                            if tes is not None and pids is not None:
                                metadata["techniques"] = \
                                    self.generate_techniques(tes, pids)
                            elif tes is not None:
                                metadata["techniques"] = \
                                    self.generate_techniques(tes)
                            elif pids is not None:
                                metadata["techniques"] = \
                                    self.generate_techniques(pids)
                            else:
                                metadata["techniques"] = \
                                    self.generate_techniques(
                                        [gexpdes["value"]])
                        except Exception:
                            metadata["techniques"] = \
                                self.generate_techniques(
                                    [gexpdes["value"]])
            except Exception as e:
                sys.stderr.write("nxsfileinfo: '%s'\n" % str(e))
        if metadata and "techniques" not in metadata:
            metadata["techniques"] = []
        return metadata
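
    # A minimal sketch (hypothetical NeXus content): when the field
    # scientificMetadata.experiment_description.value holds YAML like
    #
    #     technique: small angle x-ray scattering
    #     technique_pid: http://purl.org/pan-science/PaNET/PaNETxxxxx
    #
    # (with a hypothetical pid) update_techniques() fills
    # metadata["techniques"] via generate_techniques(tes, pids);
    # without any hint the key defaults to an empty list.
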
    def generate_techniques(self, techniques, techniques_pids=None):
        """ generate a list of technique dictionaries

        :param techniques: a list of technique names
        :type techniques: :obj:`list` <:obj:`str`>
        :param techniques_pids: a list of technique pids
        :type techniques_pids: :obj:`list` <:obj:`str`>
        :returns: a list of technique dictionaries
        :rtype: :obj:`list` <:obj:`dict` <:obj:`str`, :obj:`str`> >
        """
        result = []
        # print(techniques)
        # print(techniques_pids)
        for it, te in enumerate(techniques):
            pid = None
            name = te
            if techniques_pids and len(techniques_pids) > it and \
               techniques_pids[it] is not None:
                pid = techniques_pids[it]
            elif te.startswith("http:/") and te in id_techniques.keys():
                pid = te
                name = id_techniques[pid]
            elif te in nexus_panet.keys():
                pid = nexus_panet[te]
                name = id_techniques[pid]
            elif te.startswith("PaNET"):
                nm = "http://purl.org/pan-science/PaNET/%s" % te
                if nm in id_techniques.keys():
                    pid = nm
                    name = id_techniques[pid]
            if pid:
                result.append({"pid": pid, "name": name})
            elif name:
                result.append({"pid": name, "name": name})
        # print(result)
        return result
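
    # A hedged sketch of the returned structure: with
    # generate_techniques(["myname"],
    #     ["http://purl.org/pan-science/PaNET/PaNETxxxxx"])
    # (hypothetical name and pid) the result is
    #     [{"pid": "http://purl.org/pan-science/PaNET/PaNETxxxxx",
    #       "name": "myname"}],
    # while an unknown name without a pid falls back to
    #     [{"pid": "myname", "name": "myname"}].
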
class Metadata(Runner):

    """ Metadata runner"""

    #: (:obj:`str`) command description
    description = "show metadata information for the nexus file"
    #: (:obj:`str`) command epilog
    epilog = "" \
        + " examples:\n" \
        + " nxsfileinfo metadata /user/data/myfile.nxs\n" \
        + " nxsfileinfo metadata /user/data/myfile.fio\n" \
        + " nxsfileinfo metadata /user/data/myfile.nxs -p 'Group'\n" \
        + " nxsfileinfo metadata /user/data/myfile.nxs -s\n" \
        + " nxsfileinfo metadata /user/data/myfile.nxs " \
        + "-a units,NX_class\n" \
        + "\n"
    def create(self):
        """ creates parser
        """
        self._parser.add_argument(
            "-a", "--attributes",
            help="names of field or group attributes to be shown "
            "(separated by commas without spaces). "
            "The default takes all attributes",
            dest="attrs", default=None)
        self._parser.add_argument(
            "-n", "--hidden-attributes",
            help="names of field or group attributes to be hidden "
            "(separated by commas without spaces). "
            "The default: 'nexdatas_source,nexdatas_strategy,units'",
            dest="nattrs",
            default="nexdatas_source,nexdatas_strategy,units")
        self._parser.add_argument(
            "-v", "--values",
            help="field names of multi-dimensional datasets"
            " whose values should be shown"
            " (separated by commas without spaces)",
            dest="values", default="")
        self._parser.add_argument(
            "-w", "--owner-group",
            default="", dest="ownergroup",
            help="owner group name. Default is {beamtimeid}-dmgt")
        self._parser.add_argument(
            "-c", "--access-groups",
            default=None, dest="accessgroups",
            help="access group names separated by commas. "
            "Default is {beamtimeId}-dmgt,{beamtimeid}-clbt,"
            "{beamtimeId}-part,{beamline}dmgt,{beamline}staff")
        self._parser.add_argument(
            "-z", "--keywords",
            default=None, dest="keywords",
            help="dataset keywords separated by commas.")
        self._parser.add_argument(
            "-g", "--group-postfix",
            help="postfix to be added to NeXus group name. "
            "The default: ''",
            dest="group_postfix", default="")
        self._parser.add_argument(
            "-t", "--entry-classes",
            help="names of entry NX_class to be shown"
            " (separated by commas without spaces)."
            " If name is '' all groups are shown. "
            "The default: 'NXentry'",
            dest="entryclasses", default="NXentry")
        self._parser.add_argument(
            "-e", "--entry-names",
            help="names of entry groups to be shown"
            " (separated by commas without spaces)."
            " If name is '' all groups are shown. "
            "The default: ''",
            dest="entrynames", default="")
        self._parser.add_argument(
            "-q", "--techniques",
            help="names of techniques"
            " (separated by commas without spaces)."
            " The default: ''",
            dest="techniques", default="")
        self._parser.add_argument(
            "-j", "--sample-id",
            help="sampleId",
            dest="sampleid", default="")
        self._parser.add_argument(
            "--sample-id-from-name", action="store_true",
            default=False, dest="sampleidfromname",
            help="get sampleId from the sample name")
        self._parser.add_argument(
            "-y", "--instrument-id",
            help="instrumentId",
            dest="instrumentid", default="")
        self._parser.add_argument(
            "--raw-instrument-id", action="store_true",
            default=False, dest="rawinstrumentid",
            help="leave raw instrument id")
        self._parser.add_argument(
            "-m", "--raw-metadata", action="store_true",
            default=False, dest="rawscientific",
            help="do not store NXentry as scientificMetadata")
        self._parser.add_argument(
            "--add-empty-units", action="store_true",
            default=False, dest="emptyunits",
            help="add empty units for fields without units")
        self._parser.add_argument(
            "--oned", action="store_true",
            default=False, dest="oned",
            help="add 1d values to scientificMetadata")
        self._parser.add_argument(
            "--max-oned-size", default=1024,
            dest="maxonedsize",
            help="add max and min (or first and last)"
            " values of 1d records to scientificMetadata"
            " if its size exceeds the --max-oned-size value")
        self._parser.add_argument(
            "-p", "--pid", dest="pid",
            help="dataset pid")
        self._parser.add_argument(
            "-i", "--beamtimeid", dest="beamtimeid",
            help="beamtime id")
        self._parser.add_argument(
            "-u", "--pid-with-uuid", action="store_true",
            default=False, dest="puuid",
            help="generate pid with uuid")
        self._parser.add_argument(
            "-d", "--pid-without-filename", action="store_true",
            default=False, dest="pfname",
            help="generate pid without file name")
        self._parser.add_argument(
            "-f", "--file-format", dest="fileformat",
            help="input file format, e.g. 'nxs'. "
            "Default is defined by the file extension")
        self._parser.add_argument(
            "--proposal-as-proposal", action="store_true",
            default=False, dest="pap",
            help="Store the DESY proposal as the SciCat proposal")
        self._parser.add_argument(
            "-k", "--scicat-version", default=4,
            dest="scicatversion",
            help="major scicat version metadata")
        self._parser.add_argument(
            "--h5py", action="store_true",
            default=False, dest="h5py",
            help="use h5py module as a nexus reader")
        self._parser.add_argument(
            "--h5cpp", action="store_true",
            default=False, dest="h5cpp",
            help="use h5cpp module as a nexus reader")
        self._parser.add_argument(
            "-x", "--chmod", dest="chmod",
            help="json metadata file mod bits, e.g. 0o662")
        self._parser.add_argument(
            "--copy-map", dest="copymap",
            help="json or yaml map of {output: input} "
            "or [[output, input],] or a text file list "
            "to re-arrange metadata")
        self._parser.add_argument(
            "--copy-map-field", dest="copymapfield",
            help="field json or yaml with map {output: input} "
            "or [[output, input],] or a text file list "
            "to re-arrange metadata. The default: "
            "'scientificMetadata.nxsfileinfo_parameters.copymap.value'",
            default='scientificMetadata.nxsfileinfo_parameters.'
                    'copymap.value')
        self._parser.add_argument(
            "--copy-map-error", action="store_true",
            default=False, dest="copymaperror",
            help="Raise an error when the copy map file does not exist")
    def postauto(self):
        """ parser creator after autocomplete run """
        self._parser.add_argument(
            "-b", "--beamtime-meta", dest="beamtimemeta",
            help="beamtime metadata file")
        self._parser.add_argument(
            "-s", "--scientific-meta", dest="scientificmeta",
            help="scientific metadata file")
        self._parser.add_argument(
            "-l", "--copy-map-file", dest="copymapfile",
            help="json or yaml file containing the copy map, "
            "see also --copy-map")
        self._parser.add_argument(
            "-o", "--output", dest="output",
            help="output scicat metadata file")
        self._parser.add_argument(
            "-r", "--relative-path", dest="relpath",
            help="relative path to the scan files")
        self._parser.add_argument(
            'args', metavar='nexus_file', type=str, nargs="*",
            help='new nexus file name')
    def run(self, options):
        """ the main program function

        :param options: parser options
        :type options: :class:`argparse.Namespace`
        :returns: output information
        :rtype: :obj:`str`
        """
        if options.h5cpp:
            writer = "h5cpp"
        elif options.h5py:
            writer = "h5py"
        elif "h5cpp" in WRITERS.keys():
            writer = "h5cpp"
        else:
            writer = "h5py"
        if (options.h5cpp and options.h5py) or \
           writer not in WRITERS.keys():
            sys.stderr.write("nxsfileinfo: Writer '%s' cannot be opened\n"
                             % writer)
            sys.stderr.flush()
            self._parser.print_help()
            sys.exit(255)
        root = None
        nxfl = None
        if not hasattr(options, "fileformat"):
            options.fileformat = ""
        if hasattr(options, "maxonedsize"):
            try:
                options.maxonedsize = int(options.maxonedsize)
            except Exception:
                sys.stderr.write(
                    "nxsfileinfo: Max 1d size '%s' cannot be "
                    "converted to int\n" % options.maxonedsize)
                sys.stderr.flush()
                self._parser.print_help()
                sys.exit(255)
        if options.args:
            wrmodule = WRITERS[writer.lower()]
            if not options.fileformat:
                rt, ext = os.path.splitext(options.args[0])
                if ext and len(ext) > 1 and ext.startswith("."):
                    options.fileformat = ext[1:]
            try:
                if options.fileformat in ['nxs', 'h5', 'nx', 'ndf']:
                    nxfl = filewriter.open_file(
                        options.args[0], readonly=True, writer=wrmodule)
                    root = nxfl.root()
                elif options.fileformat in ['fio']:
                    with open(options.args[0]) as fl:
                        root = fl.read()
            except Exception:
                sys.stderr.write("nxsfileinfo: File '%s' cannot be opened\n"
                                 % options.args[0])
                sys.stderr.flush()
                self._parser.print_help()
                sys.exit(255)
        self.show(root, options)
        if nxfl is not None:
            nxfl.close()
    @classmethod
    def _cure(cls, result):
        if 'creationTime' not in result:
            result['creationTime'] = isoDate(filewriter.FTFile.currenttime())
        else:
            result['creationTime'] = isoDate(result['creationTime'])
        if 'endTime' in result:
            result['endTime'] = isoDate(result['endTime'])
        if 'type' not in result:
            result['type'] = 'raw'
        if 'creationLocation' not in result:
            result['creationLocation'] = "/DESY/PETRA III"
        if 'ownerGroup' not in result:
            result['ownerGroup'] = "ingestor"
        return result
    @classmethod
    def metadata(cls, root, options):
        """ get metadata from nexus and beamtime file

        :param root: nexus file root
        :type root: :class:`filewriter.FTGroup`
        :param options: parser options
        :type options: :class:`argparse.Namespace`
        :returns: nexus file root metadata
        :rtype: :obj:`str`
        """
        values = []
        attrs = None
        entryclasses = []
        entrynames = []
        usercopymap = {}
        usercopylist = []
        if options.values:
            values = options.values.split(',')
        if options.attrs:
            attrs = options.attrs.split(',')
        elif options.attrs is not None:
            attrs = []
        if options.nattrs not in [None, '', "''", '""']:
            nattrs = options.nattrs.split(',')
        else:
            nattrs = []
        if options.entryclasses not in [None, '', "''", '""']:
            entryclasses = options.entryclasses.split(',')
        if options.entrynames not in [None, '', "''", '""']:
            entrynames = options.entrynames.split(',')
        copymapfield = None
        if hasattr(options, "copymapfield") and options.copymapfield:
            copymapfield = options.copymapfield
        if hasattr(options, "copymap") and options.copymap:
            dct = yaml.safe_load(options.copymap.strip())
            if dct and isinstance(dct, dict):
                usercopymap.update(dct)
            elif dct:
                if isinstance(dct, basestring):
                    dct = getlist(options.copymap.strip())
                if isinstance(dct, list):
                    for line in dct:
                        if isinstance(line, list):
                            usercopylist.append(line[:2])
        if hasattr(options, "copymapfile") and options.copymapfile:
            if os.path.isfile(options.copymapfile):
                with open(options.copymapfile, "r") as fl:
                    jstr = fl.read()
                # print(jstr)
                try:
                    dct = yaml.safe_load(jstr.strip())
                except Exception:
                    if jstr:
                        nan = float('nan')    # noqa: F841
                        try:
                            dct = eval(jstr.strip())
                        except Exception:
                            dct = " "
                            # mdflatten(dstr, [], dct)
                    else:
                        dct = ""
                if dct and isinstance(dct, dict):
                    usercopymap.update(dct)
                elif dct:
                    if isinstance(dct, basestring):
                        dct = getlist(jstr.strip())
                    if isinstance(dct, list):
                        for line in dct:
                            if isinstance(line, list):
                                usercopylist.append(line[:2])
            elif hasattr(options, "copymaperror") and options.copymaperror:
                raise Exception("Copy-map file '%s' does not exist"
                                % options.copymapfile)
        result = None
        nxsparser = None
        if not hasattr(options, "fileformat"):
            options.fileformat = ""
        if options.args and not options.fileformat:
            rt, ext = os.path.splitext(options.args[0])
            if ext and len(ext) > 1 and ext.startswith("."):
                options.fileformat = ext[1:]
        if root is not None:
            if options.fileformat in ['nxs', 'h5', 'nx', 'ndf']:
                nxsparser = NXSFileParser(root)
                nxsparser.valuestostore = values
                nxsparser.group_postfix = options.group_postfix
                nxsparser.entryclasses = entryclasses
                nxsparser.entrynames = entrynames
                nxsparser.scientific = not options.rawscientific
                if hasattr(options, "emptyunits"):
                    nxsparser.emptyunits = options.emptyunits
                nxsparser.attrs = attrs
                nxsparser.hiddenattrs = nattrs
                if hasattr(options, "oned"):
                    nxsparser.oned = options.oned
                if hasattr(options, "maxonedsize"):
                    nxsparser.maxonedsize = int(options.maxonedsize)
                nxsparser.parseMeta()
            elif options.fileformat in ['fio']:
                nxsparser = FIOFileParser(root)
                nxsparser.group_postfix = options.group_postfix
                # nxsparser.attrs = attrs
                # nxsparser.hiddenattrs = nattrs
                if hasattr(options, "oned"):
                    nxsparser.oned = options.oned
                if hasattr(options, "maxonedsize"):
                    nxsparser.maxonedsize = int(options.maxonedsize)
                nxsparser.parseMeta()
        if nxsparser is None:
            bl = BeamtimeLoader(options)
            result = bl.run()
            result = bl.overwrite(
                result, usercopymap or None, usercopylist or None,
                copymapfield)
            result = bl.update_pid(
                result, None, options.puuid, options.pfname,
                options.beamtimeid)
            if not options.rawscientific:
                techniques = None
                if hasattr(options, "techniques"):
                    techniques = options.techniques
                result = bl.update_techniques(result, techniques)
                sampleid = None
                if hasattr(options, "sampleid"):
                    sampleid = options.sampleid
                sid_from_name = None
                if hasattr(options, "sampleidfromname"):
                    sid_from_name = options.sampleidfromname
                result = bl.update_sampleid(result, sampleid, sid_from_name)
                rawinstrumentid = None
                if hasattr(options, "rawinstrumentid"):
                    rawinstrumentid = options.rawinstrumentid
                if hasattr(options, "instrumentid"):
                    instrumentid = options.instrumentid
                    if instrumentid:
                        result["instrumentId"] = instrumentid
                    elif "instrumentId" in result and \
                            result["instrumentId"] and not rawinstrumentid:
                        result = bl.update_instrumentid(result)
            result = bl.remove_metadata(
                result, usercopymap or None, usercopylist or None,
                copymapfield)
            if not options.rawscientific:
                result = cls._cure(result)
        elif nxsparser and nxsparser.description:
            if len(nxsparser.description) == 1:
                desc = nxsparser.description[0]
                if not options.beamtimemeta:
                    try:
                        if "scientificMetadata" in desc \
                           and "experiment_identifier" in \
                           desc["scientificMetadata"] \
                           and "beamtime_filename" in \
                           desc["scientificMetadata"][
                               "experiment_identifier"]:
                            options.beamtimemeta = \
                                desc["scientificMetadata"][
                                    "experiment_identifier"][
                                        "beamtime_filename"]
                    except Exception:
                        pass
                bl = BeamtimeLoader(options)
                bl.run()
                result = bl.merge(desc)
                result = bl.overwrite(
                    result, usercopymap or None, usercopylist or None,
                    copymapfield)
                result = bl.update_pid(
                    result, options.args[0], options.puuid, options.pfname,
                    options.beamtimeid)
                if not options.rawscientific:
                    techniques = None
                    if hasattr(options, "techniques"):
                        techniques = options.techniques
                    result = bl.update_techniques(result, techniques)
                    sampleid = None
                    if hasattr(options, "sampleid"):
                        sampleid = options.sampleid
                    sid_from_name = None
                    if hasattr(options, "sampleidfromname"):
                        sid_from_name = options.sampleidfromname
                    result = bl.update_sampleid(
                        result, sampleid, sid_from_name)
                    rawinstrumentid = None
                    if hasattr(options, "rawinstrumentid"):
                        rawinstrumentid = options.rawinstrumentid
                    if hasattr(options, "instrumentid"):
                        instrumentid = options.instrumentid
                        if instrumentid:
                            result["instrumentId"] = instrumentid
                        elif "instrumentId" in result and \
                                result["instrumentId"] and \
                                not rawinstrumentid:
                            result = bl.update_instrumentid(result)
                result = bl.remove_metadata(
                    result, usercopymap or None, usercopylist or None,
                    copymapfield)
                if not options.rawscientific:
                    result = cls._cure(result)
            else:
                result = []
                for desc in nxsparser.description:
                    if not options.beamtimemeta:
                        try:
                            if "scientificMetadata" in desc \
                               and "experimental_identifier" in \
                               desc["scientificMetadata"] \
                               and "beamtime_filename" in \
                               desc["scientificMetadata"][
                                   "experimental_identifier"]:
                                options.beamtimemeta = \
                                    desc["scientificMetadata"][
                                        "experimental_identifier"][
                                            "beamtime_filename"]
                        except Exception:
                            pass
                    bl = BeamtimeLoader(options)
                    bl.run()
                    rst = bl.merge(desc)
                    rst = bl.overwrite(
                        rst, usercopymap or None, usercopylist or None,
                        copymapfield)
                    rst = bl.update_pid(
                        rst, options.args[0], options.puuid, options.pfname,
                        options.beamtimeid)
                    if not options.rawscientific:
                        techniques = None
                        if hasattr(options, "techniques"):
                            techniques = options.techniques
                        rst = bl.update_techniques(rst, techniques)
                        sampleid = None
                        if hasattr(options, "sampleid"):
                            sampleid = options.sampleid
                        sid_from_name = None
                        if hasattr(options, "sampleidfromname"):
                            sid_from_name = options.sampleidfromname
                        rst = bl.update_sampleid(
                            rst, sampleid, sid_from_name)
                        rawinstrumentid = None
                        if hasattr(options, "rawinstrumentid"):
                            rawinstrumentid = options.rawinstrumentid
                        if hasattr(options, "instrumentid"):
                            instrumentid = options.instrumentid
                            if instrumentid:
                                rst["instrumentId"] = instrumentid
                            elif "instrumentId" in rst and \
                                    rst["instrumentId"] and \
                                    not rawinstrumentid:
                                rst = bl.update_instrumentid(rst)
                    rst = bl.remove_metadata(
                        rst, usercopymap or None, usercopylist or None,
                        copymapfield)
                    if not options.rawscientific:
                        rst = cls._cure(rst)
                    result.append(rst)
        if result is not None:
            return json.dumps(
                result, sort_keys=True, indent=4, cls=numpyEncoderNull)
    def show(self, root, options):
        """ the main function

        :param options: parser options
        :type options: :class:`argparse.Namespace`
        :param root: nexus file root
        :type root: :class:`filewriter.FTGroup`
        """
        try:
            metadata = self.metadata(root, options)
            if metadata:
                if options.output:
                    chmod = None
                    try:
                        chmod = int(options.chmod, 8)
                    except Exception:
                        options.chmod = None
                    if options.chmod:
                        oldmask = os.umask(0)

                        def opener(path, flags):
                            return os.open(path, flags, chmod)

                        try:
                            with open(options.output, "w",
                                      opener=opener) as fl:
                                fl.write(metadata)
                        except Exception:
                            with open(options.output, "w") as fl:
                                fl.write(metadata)
                            os.chmod(options.output, chmod)
                        os.umask(oldmask)
                    else:
                        with open(options.output, "w") as fl:
                            fl.write(metadata)
                else:
                    print(metadata)
        except Exception as e:
            sys.stderr.write("nxsfileinfo: '%s'\n" % str(e))
            sys.stderr.flush()
            self._parser.print_help()
            sys.exit(255)
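
    # Example usage (hypothetical paths): write the SciCat metadata file
    # with explicit mode bits,
    #
    #     nxsfileinfo metadata /user/data/myfile.nxs \
    #         -o /user/data/myfile.scan.json -x 0o644
    #
    # show() parses the -x value as an octal literal and passes it to
    # os.open() via the opener callback, so the output file is created
    # with the requested permissions.
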
class GroupMetadata(Runner):

    """ Group Metadata runner"""

    #: (:obj:`str`) command description
    description = "group scan metadata information"
    #: (:obj:`str`) command epilog
    epilog = "" \
        + " examples:\n" \
        + " nxsfileinfo groupmetadata" \
        + " -o /user/data/myscan.scan.json " \
        + " -t /user/data/myscan.attachment.json " \
        + " -l /user/data/myscan.origdatablock.json " \
        + " -c /home/user/group_config.txt " \
        + " -m /user/data/myscan_00023.scan.json " \
        + " -d /user/data/myscan_00023.origdatablock.json " \
        + " -a /user/data/myscan_00023.attachment.json \n" \
        + " \n" \
        + " nxsfileinfo groupmetadata myscan_m001 " \
        + " -m /user/data/myscan_00021.scan.json\n" \
        + " -c /home/user/group_config.txt " \
        + " \n" \
        + " \n" \
        + " nxsfileinfo groupmetadata myscan_m001 " \
        + " -c /home/user/group_config.txt " \
        + " -m /user/data/myscan_00023.scan.json " \
        + " -d /user/data/myscan_00023.origdatablock.json " \
        + " -a /user/data/myscan_00023.attachment.json \n" \
        + " \n" \
        + " nxsfileinfo groupmetadata " \
        + " -m /user/data/myscan_00023.scan.json " \
        + " -d /user/data/myscan_00023.origdatablock.json " \
        + " -c /home/user/group_config.txt " \
        + " \n" \
        + "\n"

    listtype = ["List", "L", "l", "list"]
    dicttype = ["Dict", "D", "d", "dict"]
    rangetype = ["Range", "R", "r", "range"]
    minmaxtype = ["MinMax", "M", "m", "minmax"]
    mintype = ["Min", "min"]
    maxtype = ["Max", "max"]
    uniquelisttype = ["UniqueList", "U", "u", "uniquelist"]
    endpointstype = ["Endpoints", "endpoints", "E", "e"]
    firstlasttype = ["FirstLast", "firstlast"]
    lasttype = ["Last", "last", "l", "L"]
    firsttype = ["First", "first", "f", "F"]
    # avaragetype = ["Average", "A", "a", "average"]
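
    # A hedged sketch of a group-map entry list (see --group-map): each
    # line is [target, source] with an optional merge-type token from the
    # lists above, e.g. (hypothetical fields)
    #
    #     [["inputDatasets", "pid", "list"],
    #      ["creationTime", "creationTime", "last"],
    #      ["scientificMetadata.temperature.value",
    #       "scientificMetadata.temperature.value", "minmax"]]
    #
    # "list" collects values from all grouped scans, "last" keeps the
    # most recent one, and "minmax" keeps only the extreme values.
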
    def create(self):
        """ creates parser
        """
        # self._parser.add_argument(
        #     "-w", "--owner-group",
        #     default="", dest="ownergroup",
        #     help="owner group name. Default is {beamtimeid}-dmgt")
        # self._parser.add_argument(
        #     "-c", "--access-groups",
        #     default=None, dest="accessgroups",
        #     help="access group names separated by commas. "
        #     "Default is {beamtimeId}-dmgt,{beamtimeid}-clbt,"
        #     "{beamtimeId}-part,{beamline}dmgt,{beamline}staff")
        self._parser.add_argument(
            "-p", "--pid", dest="pid",
            help="dataset pid")
        self._parser.add_argument(
            "-i", "--beamtimeid", dest="beamtimeid",
            help="beamtime id")
        # self._parser.add_argument(
        #     "-u", "--pid-with-uuid", action="store_true",
        #     default=False, dest="puuid",
        #     help="generate pid with uuid")
        # self._parser.add_argument(
        #     "-d", "--pid-without-filename", action="store_true",
        #     default=False, dest="pfname",
        #     help="generate pid without file name")
        self._parser.add_argument(
            "-s", "--skip-group-datablock", action="store_true",
            default=False, dest="skipgroupdatablock",
            help="skip group datablock")
        self._parser.add_argument(
            "-w", "--allow-duplication", action="store_true",
            default=False, dest="nounique",
            help="allow to merge metadata with duplicated pid")
        self._parser.add_argument(
            "-q", "--raw", action="store_true",
            default=False, dest="raw",
            help="raw dataset type")
        self._parser.add_argument(
            "-f", "--write-files", action="store_true",
            default=False, dest="writefiles",
            help="write output to files")
        self._parser.add_argument(
            "-k", "--scicat-version", default=4,
            dest="scicatversion",
            help="major scicat version metadata")
        self._parser.add_argument(
            "-x", "--chmod", dest="chmod",
            help="json metadata file mod bits, e.g. 0o662")
        self._parser.add_argument(
            "-g", "--group-map", dest="groupmap",
            help="json or yaml map of {output: input} "
            "or [[output, input],] or a text file list "
            "to re-arrange metadata")
        self._parser.add_argument(
            'group', metavar='group', type=str, nargs="*",
            help='group name')
        self._parser.add_argument(
            "-e", "--group-map-error", action="store_true",
            default=False, dest="groupmaperror",
            help="Raise an error when the group map file does not exist")
    def postauto(self):
        """ parser creator after autocomplete run """
        # self._parser.add_argument(
        #     "-b", "--beamtime-meta", dest="beamtimemeta",
        #     help="beamtime metadata file")
        # self._parser.add_argument(
        #     "-s", "--scientific-meta", dest="scientificmeta",
        #     help="scientific metadata file")
        self._parser.add_argument(
            "-r", "--group-map-file", dest="groupmapfile",
            help="json or yaml file containing the group map, "
            "see also --group-map")
        self._parser.add_argument(
            "-m", "--metadata", dest="metadatafile",
            help="json metadata file")
        self._parser.add_argument(
            "-d", "--origdatablock", dest="origdatablockfile",
            help="json origdatablock file")
        self._parser.add_argument(
            "-a", "--attachment", dest="attachmentfile",
            help="json attachment file")
        self._parser.add_argument(
            "-o", "--output", dest="output",
            help="output scicat group metadata file")
        self._parser.add_argument(
            "-l", "--datablock-output", dest="dboutput",
            help="output scicat group datablocks list file")
        self._parser.add_argument(
            "-t", "--attachment-output", dest="atoutput",
            help="output scicat group attachments list file")
        # self._parser.add_argument(
        #     "-r", "--relative-path", dest="relpath",
        #     help="relative path to the scan files")
    def run(self, options):
        """ the main program function

        :param options: parser options
        :type options: :class:`argparse.Namespace`
        :returns: output information
        :rtype: :obj:`str`
        """
        self.show(options)
@classmethod def _cure(cls, result): if 'creationTime' not in result: result['creationTime'] = isoDate(filewriter.FTFile.currenttime()) else: result['creationTime'] = isoDate(result['creationTime']) if 'endTime' in result: result['endTime'] = isoDate(result['endTime']) if 'type' not in result: result['type'] = 'raw' if 'creationLocation' not in result: result['creationLocation'] = "/DESY/PETRA III" if 'ownerGroup' not in result: result['ownerGroup'] = "ingestor" return result @classmethod def _group_metadata(cls, grfile, scfile, clist, options): """ group scan metadata :param grfile: grouped metadata file :type grfile: :obj:`str` :param scfile: scan metadata file :type scfile: :obj:`str` :param clist: copy list to overwrite metadata :type clist: :obj:`list` < [:obj:`str`, :obj:`str`] > :param options: parser options :type options: :class:`argparse.Namespace` :returns: grouped metadata :rtype: :obj:`dct` <:obj:`str`, `any`> """ # print("GROUP", grfile, scfile) try: with open(scfile, "r") as fl: sfl = fl.read() ds = json.loads(sfl) if not isinstance(ds, dict): ds = {} except Exception as e: print("WARNING: %s" % str(e)) ds = {} try: with open(grfile, "r") as fl: sfl = fl.read() metadata = json.loads(sfl) if not isinstance(ds, dict): metadata = {} except Exception as e: print("WARNING: %s" % str(e)) metadata = {} metadata = cls._update_metadata(metadata, ds, clist, options.nounique) return metadata @classmethod def _update_metadata(cls, gr, ds, cplist, nounique=False): """ update and group scan metadata :param gr: group metadata :type gr: :obj:`dict` :param ds: scan metadata :type ds: :obj:`dict` :param clist: copy list to overwrite metadata :type clist: :obj:`list` < [:obj:`str`, :obj:`str`] > :returns: grouped metadata :rtype: :obj:`dct` <:obj:`str`, `any`> """ first = True for line in cplist: if line and len(line) > 1 and line[0] and line[1] and \ isinstance(line[0], basestring) and \ isinstance(line[1], basestring) and \ not line[0].startswith(line[1] + "."): ts = line[0] vs = line[1] vls = vs.split(".") md = ds for vl in vls: if vl in md: md = md[vl] else: break else: tgs = ts.split(".") td = gr parent = None for tg in tgs: parent = td if tg in td: td = td[tg] else: td[tg] = {} td = td[tg] if not nounique and first and \ tg == 'inputDatasets' and \ isinstance(td, list) and md in td: return gr tgtype = line[2] if len(line) > 2 else None cls._merge_meta(parent, tg, md, tgtype) first = False return gr @classmethod def _merge_string(cls, parent, key, md, tgtype=None): """ update and group scan metadata :param parent: node metadata :type parent: :obj:`dict` :param key: metadata key :type key: :obj:`str` :param md: new metadata :type md: :obj:`str` or :obj:`dict` :param tgtype: target type :type tgtype: :obj:`str` """ tg = None if key in parent.keys(): tg = parent[key] if tgtype in cls.listtype: if not isinstance(tg, list): if tg: parent[key] = [tg] else: parent[key] = [] if tgtype in cls.dicttype: if not isinstance(tg, dict): parent[key] = {} sz = str(len(parent[key])) parent[key][sz] = md elif tgtype in cls.lasttype: parent[key] = md elif tgtype in cls.firsttype: if tg in [None, [], {}]: parent[key] = md elif tgtype in cls.firstlasttype: if not isinstance(tg, dict): parent[key] = {} parent[key]["last"] = md if "first" not in parent[key].keys(): parent[key]["first"] = md elif tgtype in cls.endpointstype: if not isinstance(tg, list) or len(parent[key]) != 2: parent[key] = [md, md] else: parent[key][1] = md elif key in parent and isinstance(parent[key], list): parent[key].append(md) elif not 
tg: parent[key] = md elif tg != md: parent[key] = [tg, md] @classmethod def _list_depth(cls, lst): """calculate list depth :param lst: target type :type lst: :obj:`list` :returns: list depth :rtype: :obj:`int` """ if not isinstance(lst, list): return 0 elif not len(lst): return 1 else: return cls._list_depth(lst[0]) + 1 @classmethod def _merge_range_list(cls, parent, key, md, unit): """ merge list :param parent: node metadata :type parent: :obj:`dict` :param key: metadata key :type key: :obj:`str` :param md: new metadata :type md: :obj:`str` or :obj:`dict` :param unit: physical unit :type unit: :obj:`str` """ tg = None if key in parent.keys(): tg = parent[key] if not isinstance(tg, dict): parent[key] = {} tg = parent[key] if "value" not in tg: tg["value"] = [] if "unit" not in tg: tg["unit"] = unit try: mmin = min(md) except Exception: mmin = md try: mmax = max(md) except Exception: mmax = md if not isinstance(tg["value"], list) or len(tg["value"]) != 2: tg["value"] = [mmin, mmax] try: if tg["value"][0] > mmin: tg["value"][0] = mmin if tg["value"][1] < mmax: tg["value"][1] = mmax except Exception: tg["value"] = [mmin, mmax] return @classmethod def _merge_minmax_list(cls, parent, key, md, unit): """ merge list :param parent: node metadata :type parent: :obj:`dict` :param key: metadata key :type key: :obj:`str` :param md: new metadata :type md: :obj:`str` or :obj:`dict` :param unit: physical unit :type unit: :obj:`str` """ tg = None if key in parent.keys(): tg = parent[key] if not isinstance(tg, dict): parent[key] = {} tg = parent[key] try: mmin = min(md) except Exception: mmin = md try: mmax = max(md) except Exception: mmax = md if not isinstance(tg, dict): parent[key] = {"min": {"value": mmin, "unit": unit}, "max": {"value": mmax, "unit": unit}} if "min" not in parent[key]: parent[key]["min"] = {"value": mmin, "unit": unit} if "max" not in parent[key]: parent[key]["max"] = {"value": mmax, "unit": unit} try: if parent[key]["min"]["value"] > mmin: parent[key]["min"]["value"] = mmin if parent[key]["max"]["value"] < mmax: parent[key]["max"]["value"] = mmax except Exception: parent[key] = {"min": {"value": mmin, "unit": unit}, "max": {"value": mmax, "unit": unit}} return @classmethod def _merge_max_list(cls, parent, key, md, unit): """ merge list :param parent: node metadata :type parent: :obj:`dict` :param key: metadata key :type key: :obj:`str` :param md: new metadata :type md: :obj:`str` or :obj:`dict` :param unit: physical unit :type unit: :obj:`str` """ tg = None if key in parent.keys(): tg = parent[key] if not isinstance(tg, dict): parent[key] = {} tg = parent[key] try: mmax = max(md) except Exception: mmax = md if not isinstance(tg, dict): parent[key] = {"value": mmax, "unit": unit} if "value" not in parent[key]: parent[key]["value"] = mmax parent[key]["unit"] = unit try: if parent[key]["value"] < mmax: parent[key]["value"] = mmax except Exception: parent[key] = {"value": mmax, "unit": unit} return @classmethod def _merge_min_list(cls, parent, key, md, unit): """ merge list :param parent: node metadata :type parent: :obj:`dict` :param key: metadata key :type key: :obj:`str` :param md: new metadata :type md: :obj:`str` or :obj:`dict` :param unit: physical unit :type unit: :obj:`str` """ tg = None if key in parent.keys(): tg = parent[key] if not isinstance(tg, dict): parent[key] = {} tg = parent[key] try: mmin = min(md) except Exception: mmin = md if not isinstance(tg, dict): parent[key] = {"value": mmin, "unit": unit} if "value" not in parent[key]: parent[key]["value"] = mmin 
parent[key]["unit"] = unit try: if parent[key]["value"] > mmin: parent[key]["value"] = mmin except Exception: parent[key] = {"value": mmin, "unit": unit} return @classmethod def _merge_list_list(cls, parent, key, md, unit, tgtype): """ merge list :param parent: node metadata :type parent: :obj:`dict` :param key: metadata key :type key: :obj:`str` :param md: new metadata :type md: :obj:`str` or :obj:`dict` :param unit: physical unit :type unit: :obj:`str` """ tg = None if key in parent.keys(): tg = parent[key] if not unit: if not isinstance(tg, list): if tg is not None and tg != {} and tg != []: parent[key] = [tg] else: parent[key] = [] elif tgtype in cls.uniquelisttype and \ len(tg) > 0 and \ cls._list_depth(tg) <= cls._list_depth(md) \ and md != parent[key]: parent[key] = [tg] if tgtype not in cls.uniquelisttype or \ (md not in parent[key] and md != parent[key]): if tgtype in cls.uniquelisttype and not parent[key]: parent[key] = md else: parent[key].append(md) else: if not isinstance(tg, dict): parent[key] = {} tg = parent[key] if "value" not in tg: tg["value"] = 0 if "unit" not in tg: tg["unit"] = unit if not isinstance(tg["value"], list): tg["value"] = [] elif tgtype in cls.uniquelisttype and \ len(tg["value"]) > 0 and \ cls._list_depth(tg["value"]) <= cls._list_depth(md) \ and md != tg["value"]: parent[key]["value"] = [tg["value"]] tg = parent[key] if tgtype not in cls.uniquelisttype or \ (md not in tg["value"] and md != tg["value"]): if tgtype in cls.uniquelisttype \ and not parent[key]["value"]: tg["value"] = md else: tg["value"].append(md) @classmethod def _merge_dict_list(cls, parent, key, md, unit): """ merge list :param parent: node metadata :type parent: :obj:`dict` :param key: metadata key :type key: :obj:`str` :param md: new metadata :type md: :obj:`str` or :obj:`dict` :param unit: physical unit :type unit: :obj:`str` """ tg = None if key in parent.keys(): tg = parent[key] if not unit: if not isinstance(tg, dict): if tg is not None and tg != {} and tg != []: parent[key] = {"0": tg} else: parent[key] = {} sz = str(len(parent[key])) parent[key][sz] = md else: if not isinstance(tg, dict): parent[key] = {} sz = str(len(parent[key])) parent[key][sz] = {"value": md, "unit": unit} @classmethod def _merge_average_list(cls, parent, key, md, unit): """ merge list :param parent: node metadata :type parent: :obj:`dict` :param key: metadata key :type key: :obj:`str` :param md: new metadata :type md: :obj:`str` or :obj:`dict` :param unit: physical unit :type unit: :obj:`str` """ tg = None if key in parent.keys(): tg = parent[key] if isinstance(md[0], basestring): if isinstance(tg, list): parent[key].extend( [mi for mi in md if mi not in tg]) elif not tg: parent[key] = md elif (isinstance(md[0], float) or isinstance(md[0], int)) \ and len(md) < 4: if isinstance(tg, list): if md not in parent[key]: parent[key].append(md) elif not tg: parent[key] = [md] else: for mi in md: if not isinstance(mi, float) and \ not isinstance(mi, int): break else: value = md tg = None if key in parent.keys(): tg = parent[key] if not isinstance(tg, dict): parent[key] = {} tg = parent[key] if "value" not in tg: tg["value"] = 0 if "unit" not in tg: tg["unit"] = unit if "min" not in tg: tg["min"] = min(value) if "max" not in tg: tg["max"] = max(value) if "std" not in tg: tg["std"] = 0.0 if "counts" not in tg: tg["counts"] = 0 ov = tg["value"] ocnts = tg["counts"] ostd = tg["std"] if ostd is not None: os2 = ostd * ostd nn = len(md) ncnts = ocnts + nn tg["counts"] = ncnts if tg["unit"] == unit and tg["value"] is not 
None: tg["value"] = \ float((ov * ocnts) + sum(value)) / ncnts minv = min(value) if tg["min"] > minv: tg["min"] = minv maxv = max(value) if tg["max"] < maxv: tg["max"] = maxv if ncnts > 1 and tg["std"] is not None: if (ocnts == 1 or nn == 1): tg["std"] = float( np.std([ov] + value, ddof=1)) elif ostd == 0.0: tg["std"] = float( np.std(value, ddof=1)) elif ocnts > 1 and nn > 1: nvar = float( np.var(value, ddof=1)) tg["std"] = \ math.sqrt( ((ocnts - 1) * os2 + (nn - 1) * nvar) / (ncnts - 2)) if isinstance(tg["std"], float) and \ (math.isinf(tg["std"]) or math.isnan(tg["std"])): tg["std"] = None if isinstance(tg["value"], float) and \ (math.isinf(tg["value"]) or math.isnan(tg["value"])): tg["value"] = None @classmethod def _merge_first_list(cls, parent, key, md, unit): """ merge list :param parent: node metadata :type parent: :obj:`dict` :param key: metadata key :type key: :obj:`str` :param md: new metadata :type md: :obj:`str` or :obj:`dict` :param unit: physical unit :type unit: :obj:`str` """ tg = None if key in parent.keys(): tg = parent[key] if not isinstance(tg, dict): parent[key] = {} tg = parent[key] if not isinstance(tg, dict): parent[key] = {"value": md, "unit": unit} if "value" not in parent[key]: parent[key]["value"] = md parent[key]["unit"] = unit return @classmethod def _merge_last_list(cls, parent, key, md, unit): """ merge list :param parent: node metadata :type parent: :obj:`dict` :param key: metadata key :type key: :obj:`str` :param md: new metadata :type md: :obj:`str` or :obj:`dict` :param unit: physical unit :type unit: :obj:`str` """ tg = None if key in parent.keys(): tg = parent[key] if not isinstance(tg, dict): parent[key] = {} tg = parent[key] parent[key] = {"value": md, "unit": unit} return @classmethod def _merge_firstlast_list(cls, parent, key, md, unit): """ merge list :param parent: node metadata :type parent: :obj:`dict` :param key: metadata key :type key: :obj:`str` :param md: new metadata :type md: :obj:`str` or :obj:`dict` :param unit: physical unit :type unit: :obj:`str` """ tg = None if key in parent.keys(): tg = parent[key] if not isinstance(tg, dict): parent[key] = {} if not isinstance(parent[key], dict): parent[key] = {"first": {"value": md, "unit": unit}, "last": {"value": md, "unit": unit}} if "first" not in parent[key]: parent[key]["first"] = {"value": md, "unit": unit} parent[key]["last"] = {"value": md, "unit": unit} return @classmethod def _merge_endpoints_list(cls, parent, key, md, unit): """ merge list :param parent: node metadata :type parent: :obj:`dict` :param key: metadata key :type key: :obj:`str` :param md: new metadata :type md: :obj:`str` or :obj:`dict` :param unit: physical unit :type unit: :obj:`str` """ tg = None if key in parent.keys(): tg = parent[key] if not isinstance(tg, dict): parent[key] = {} if not isinstance(parent[key], dict): parent[key] = {"value": [md, md], "unit": unit} if "value" not in parent[key] or len(parent[key]["value"]) != 2: parent[key] = {"value": [md, md], "unit": unit} parent[key]["value"][1] = md return @classmethod def _merge_list(cls, parent, key, md, unit, tgtype=None): """ update and group scan metadata :param parent: node metadata :type parent: :obj:`dict` :param key: metadata key :type key: :obj:`str` :param md: new metadata :type md: :obj:`str` or :obj:`dict` :param unit: physical unit :type unit: :obj:`str` :param tgtype: target type :type tgtype: :obj:`str` """ if tgtype in cls.rangetype and md and \ (isinstance(md[0], float) or isinstance(md[0], int)): return cls._merge_range_list(parent, key, md, 
unit) if tgtype in cls.minmaxtype and md and \ (isinstance(md[0], float) or isinstance(md[0], int)): return cls._merge_minmax_list(parent, key, md, unit) if tgtype in cls.maxtype and md and \ (isinstance(md[0], float) or isinstance(md[0], int)): return cls._merge_max_list(parent, key, md, unit) if tgtype in cls.mintype and md and \ (isinstance(md[0], float) or isinstance(md[0], int)): return cls._merge_min_list(parent, key, md, unit) if tgtype in cls.firsttype: return cls._merge_first_list(parent, key, md, unit) if tgtype in cls.lasttype: return cls._merge_last_list(parent, key, md, unit) if tgtype in cls.firstlasttype: return cls._merge_firstlast_list(parent, key, md, unit) if tgtype in cls.endpointstype: return cls._merge_endpoints_list(parent, key, md, unit) if (tgtype in cls.listtype or tgtype in cls.uniquelisttype): return cls._merge_list_list(parent, key, md, unit, tgtype) elif tgtype in cls.dicttype: return cls._merge_dict_list(parent, key, md, unit) elif md: return cls._merge_average_list(parent, key, md, unit)
@classmethod def _merge_range_number(cls, parent, key, md, unit): """ merge metadata number to range type :param parent: node metadata :type parent: :obj:`dict` :param key: metadata key :type key: :obj:`str` :param md: new metadata :type md: :obj:`str` or :obj:`dict` :param unit: physical unit :type unit: :obj:`str` """ tg = parent[key] if "value" not in tg: tg["value"] = 0 if "unit" not in tg: tg["unit"] = unit if not isinstance(tg["value"], list) or len(tg["value"]) != 2: tg["value"] = [md, md] try: if tg["value"][0] > md: tg["value"][0] = md if tg["value"][1] < md: tg["value"][1] = md except Exception: tg["value"] = [md, md] return
@classmethod def _merge_min_number(cls, parent, key, md, unit): """ merge metadata number to min type :param parent: node metadata :type parent: :obj:`dict` :param key: metadata key :type key: :obj:`str` :param md: new metadata :type md: :obj:`str` or :obj:`dict` :param unit: physical unit :type unit: :obj:`str` """ if not isinstance(parent[key], dict): parent[key] = {"value": md, "unit": unit} if "value" not in parent[key]: parent[key] = {"value": md, "unit": unit} try: if parent[key]["value"] > md: parent[key]["value"] = md except Exception: parent[key] = {"value": md, "unit": unit} return
@classmethod def _merge_max_number(cls, parent, key, md, unit): """ merge metadata number to max type :param parent: node metadata :type parent: :obj:`dict` :param key: metadata key :type key: :obj:`str` :param md: new metadata :type md: :obj:`str` or :obj:`dict` :param unit: physical unit :type unit: :obj:`str` """ if not isinstance(parent[key], dict): parent[key] = {"value": md, "unit": unit} if "value" not in parent[key]: parent[key] = {"value": md, "unit": unit} try: if parent[key]["value"] < md: parent[key]["value"] = md except Exception: parent[key] = {"value": md, "unit": unit} return
@classmethod def _merge_minmax_number(cls, parent, key, md, unit): """ merge metadata number to minmax type :param parent: node metadata :type parent: :obj:`dict` :param key: metadata key :type key: :obj:`str` :param md: new metadata :type md: :obj:`str` or :obj:`dict` :param unit: physical unit :type unit: :obj:`str` """ if not isinstance(parent[key], dict): parent[key] = {"min": {"value": md, "unit": unit}, "max": {"value": md, "unit": unit}} if "min" not in parent[key]: parent[key]["min"] = {"value": md, "unit": unit} if "max" not in parent[key]: parent[key]["max"] = {"value": md, "unit": unit} try: if parent[key]["min"]["value"] > md: parent[key]["min"]["value"] = md
if parent[key]["max"]["value"] < md: parent[key]["max"]["value"] = md except Exception: parent[key] = {"min": {"value": md, "unit": unit}, "max": {"value": md, "unit": unit}} return
@classmethod def _merge_list_number(cls, parent, key, md, unit): """ merge metadata number to list type :param parent: node metadata :type parent: :obj:`dict` :param key: metadata key :type key: :obj:`str` :param md: new metadata :type md: :obj:`str` or :obj:`dict` :param unit: physical unit :type unit: :obj:`str` """ tg = parent[key] if "value" not in tg: tg["value"] = 0 if "unit" not in tg: tg["unit"] = unit if not isinstance(tg["value"], list): tg["value"] = [] tg["value"].append(md) return
@classmethod def _merge_dict_number(cls, parent, key, md, unit): """ merge metadata number to dict type :param parent: node metadata :type parent: :obj:`dict` :param key: metadata key :type key: :obj:`str` :param md: new metadata :type md: :obj:`str` or :obj:`dict` :param unit: physical unit :type unit: :obj:`str` """ if not isinstance(parent[key], dict): parent[key] = {} sz = str(len(parent[key])) parent[key][sz] = {"value": md, "unit": unit} return
@classmethod def _merge_first_number(cls, parent, key, md, unit): """ merge metadata number to first type :param parent: node metadata :type parent: :obj:`dict` :param key: metadata key :type key: :obj:`str` :param md: new metadata :type md: :obj:`str` or :obj:`dict` :param unit: physical unit :type unit: :obj:`str` """ if not isinstance(parent[key], dict): parent[key] = {"value": md, "unit": unit} if "value" not in parent[key]: parent[key] = {"value": md, "unit": unit} return
@classmethod def _merge_last_number(cls, parent, key, md, unit): """ merge metadata number to last type :param parent: node metadata :type parent: :obj:`dict` :param key: metadata key :type key: :obj:`str` :param md: new metadata :type md: :obj:`str` or :obj:`dict` :param unit: physical unit :type unit: :obj:`str` """ parent[key] = {"value": md, "unit": unit} return
@classmethod def _merge_firstlast_number(cls, parent, key, md, unit): """ merge metadata number to firstlast type :param parent: node metadata :type parent: :obj:`dict` :param key: metadata key :type key: :obj:`str` :param md: new metadata :type md: :obj:`str` or :obj:`dict` :param unit: physical unit :type unit: :obj:`str` """ if not isinstance(parent[key], dict): parent[key] = {"first": {"value": md, "unit": unit}, "last": {"value": md, "unit": unit}} if "first" not in parent[key]: parent[key]["first"] = {"value": md, "unit": unit} parent[key]["last"] = {"value": md, "unit": unit} return
@classmethod def _merge_endpoints_number(cls, parent, key, md, unit): """ merge metadata number to endpoints type :param parent: node metadata :type parent: :obj:`dict` :param key: metadata key :type key: :obj:`str` :param md: new metadata :type md: :obj:`str` or :obj:`dict` :param unit: physical unit :type unit: :obj:`str` """ if not isinstance(parent[key], dict): parent[key] = {"value": [md, md], "unit": unit} if "value" not in parent[key] or len(parent[key]["value"]) != 2: parent[key] = {"value": [md, md], "unit": unit} parent[key]["value"][1] = md return
@classmethod def _merge_average_number(cls, parent, key, md, unit): """ merge metadata number to average type :param parent: node metadata :type parent: :obj:`dict` :param key: metadata key :type key: :obj:`str` :param md: new metadata :type md: :obj:`str` or :obj:`dict` :param unit: physical unit :type unit: :obj:`str` """ value = md tg = parent[key] if not isinstance(tg, dict): parent[key] = {}
if "value" not in tg: tg["value"] = 0 if "unit" not in tg: tg["unit"] = unit if "min" not in tg: tg["min"] = value if "max" not in tg: tg["max"] = value if "std" not in tg: tg["std"] = 0.0 if "counts" not in tg: tg["counts"] = 0 ov = tg["value"] ocnts = tg["counts"] ostd = tg["std"] if ostd is not None: os2 = ostd * ostd ncnts = ocnts + 1 if tg["unit"] == unit and tg["value"] is not None: tg["value"] = float((ov * ocnts) + value)/(ncnts) if tg["min"] > value: tg["min"] = value if tg["max"] < value: tg["max"] = value tg["counts"] += 1 if ncnts == 2 and tg["value"] is not None: ns2 = float( (value - tg["value"]) * (value - tg["value"]) + (ov - tg["value"]) * (ov - tg["value"])) tg["std"] = math.sqrt(ns2) elif ncnts > 2 and tg["value"] is not None and ostd is not None: # ns2 = float( # (ncnts - 2) * os2 # +x (value - tg["value"]) * (value - ov) # ) \ (ncnts - 1) ns2 = float( (ncnts - 2) * os2 + (value - tg["value"]) * (value - ov) ) / (ncnts - 1) tg["std"] = math.sqrt(ns2) if isinstance(tg["std"], float) and \ (math.isinf(tg["std"]) or math.isnan(tg["std"])): tg["std"] = None if isinstance(tg["value"], float) and \ (math.isinf(tg["value"]) or math.isnan(tg["value"])): tg["value"] = None @classmethod def _merge_number(cls, parent, key, md, unit, tgtype=None): """ update and group scan metadata :param parent: node metadata :type parent: :obj:`dict` :param key: metadata key :type key: :obj:`str` :param md: new metadata :type md: :obj:`str` or :obj:`dict` :param unit: physical unit :type unit: :obj:`str` :param tgtype: target type :type tgtype: :obj:`str` """ tg = None if key in parent.keys(): tg = parent[key] if not isinstance(tg, dict): parent[key] = {} if tgtype in cls.rangetype: return cls._merge_range_number(parent, key, md, unit) if tgtype in cls.mintype: return cls._merge_min_number(parent, key, md, unit) if tgtype in cls.maxtype: return cls._merge_max_number(parent, key, md, unit) if tgtype in cls.minmaxtype: return cls._merge_minmax_number(parent, key, md, unit) if tgtype in cls.listtype: return cls._merge_list_number(parent, key, md, unit) if tgtype in cls.dicttype: return cls._merge_dict_number(parent, key, md, unit) if tgtype in cls.firsttype: return cls._merge_first_number(parent, key, md, unit) if tgtype in cls.lasttype: return cls._merge_last_number(parent, key, md, unit) if tgtype in cls.firstlasttype: return cls._merge_firstlast_number(parent, key, md, unit) if tgtype in cls.endpointstype: return cls._merge_endpoints_number(parent, key, md, unit) return cls._merge_average_number(parent, key, md, unit) @classmethod def _merge_meta(cls, parent, key, md, tgtype=None): """ update and group scan metadata :param parent: node metadata :type parent: :obj:`dict` :param key: metadata key :type key: :obj:`str` :param md: new metadata :type md: :obj:`str` or :obj:`dict` :param tgtype: target type :type tgtype: :obj:`str` """ if isinstance(md, basestring): cls._merge_string(parent, key, md, tgtype) unit = "" if isinstance(md, dict): if "unit" in md: unit = md["unit"] if "value" in md and not isinstance(md["value"], dict): md = md["value"] else: if isinstance(md, dict): for ky in md.keys(): parent[key] if not isinstance(parent[key], dict): if parent[key] is not None: vl = parent[key] parent[key] = {key: vl} else: parent[key] = {} tg = parent[key] cls._merge_meta(tg, ky, md[ky], tgtype) md = None if isinstance(md, list): cls._merge_list(parent, key, md, unit, tgtype) elif isinstance(md, float) or isinstance(md, int): cls._merge_number(parent, key, md, unit, tgtype) @classmethod def 
_create_metadata(cls, scfile, clist, options): """ group scan metadata :param scfile: scan metadata file :type scfile: :obj:`str` :param clist: copy list to overwrite metadata :type clist: :obj:`list` < [:obj:`str`, :obj:`str`] > :param options: parser options :type options: :class:`argparse.Namespace` :returns: [grouped metadata, grouped origdatablocks, grouped attachments] :rtype: [:obj:`str`,:obj:`str`, :obj:`str`] """ cpmap = { "accessGroups": "accessGroups", "creationTime": "creationTime", "contactEmail": "contactEmail", "sourceFolderHost": "sourceFolderHost", "description": "description", "isPublished": "isPublished", "owner": "owner", "ownerEmail": "ownerEmail", "ownerGroup": "ownerGroup", "investigator": "principalInvestigator", "sourceFolder": "sourceFolder", "techniques": "techniques", "scientificMetadata.instrumentId": "instrumentId", "scientificMetadata.creationLocation": "creationLocation", "scientificMetadata.proposalId": "proposalId", "scientificMetadata.DOOR_proposalId": "scientificMetadata.DOOR_proposalId", "scientificMetadata.beamtimeId": "scientificMetadata.beamtimeId", } metadata = { "type": "derived", "inputDatasets": [], "keywords": ["measurement"], "usedSoftware": "https://github.com/nexdatas/nxstools", "jobParameters": {"command": "nxsfileinfo groupmetadata"}, } rawmetadata = { "type": "raw", "scientificMetadata": { "inputDatasets": [], "usedSoftware": "https://github.com/nexdatas/nxstools", "jobParameters": {"command": "nxsfileinfo groupmetadata"} }, "keywords": ["measurement"], } if options.raw: metadata = rawmetadata try: with open(scfile, "r") as fl: sfl = fl.read() ds = json.loads(sfl) if not isinstance(ds, dict): ds = {} except Exception as e: print("WARNING: %s" % str(e)) ds = {} beamtimeid = "00000000" group = "" if "pid" in ds and ds["pid"]: spid = ds["pid"].split("/") else: spid = [] if options.group and options.group[0]: group = options.group[0] if options.beamtimeid: beamtimeid = options.beamtimeid else: if len(spid) > 1: beamtimeid = spid[-2] elif len(spid): beamtimeid = spid[0] if options.pid: metadata["pid"] = str(options.pid) else: if len(spid) > 1: spid[-1] = beamtimeid spid[-2] = group metadata["pid"] = str("/".join(spid)) else: metadata["pid"] = str( "%s/%s" % (group, beamtimeid)) if group: metadata["keywords"].append(group) if "datasetName" not in metadata: metadata["datasetName"] = group for ts, vs in cpmap.items(): if options.raw: ts = vs if ts and vs and isinstance(ts, basestring) \ and isinstance(vs, basestring) \ and not ts.startswith(vs + "."): vls = vs.split(".") md = ds for vl in vls: if vl in md: md = md[vl] else: break else: tgs = ts.split(".") td = metadata parent = None for tg in tgs: parent = td if tg in td: td = td[tg] else: td[tg] = {} td = td[tg] parent[tg] = md metadata = cls._update_metadata(metadata, ds, clist) return metadata
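# The running statistics in _merge_average_number above use the standard
# one-pass (Welford-style) update: with n counts, mean m_n and sample
# variance s_n^2,
#     m_n = (m_{n-1} * (n - 1) + x) / n
#     (n - 1) * s_n^2 = (n - 2) * s_{n-1}^2 + (x - m_n) * (x - m_{n-1})
# A minimal standalone sketch of that recurrence (not part of the module;
# all names below are local to this example), checked against numpy:
#
#     import math
#     import numpy as np
#
#     values = [1.0, 2.0, 4.0, 8.0]
#     mean, std, counts = 0.0, 0.0, 0
#     for x in values:
#         counts += 1
#         old = mean
#         mean = (old * (counts - 1) + x) / counts
#         if counts == 2:
#             std = math.sqrt((x - mean) ** 2 + (old - mean) ** 2)
#         elif counts > 2:
#             std = math.sqrt(
#                 ((counts - 2) * std * std
#                  + (x - mean) * (x - old)) / (counts - 1))
#     assert abs(mean - np.mean(values)) < 1e-12
#     assert abs(std - np.std(values, ddof=1)) < 1e-12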
[docs] @classmethod def groupmetadata(cls, options): """ group scan metadata :param options: parser options :type options: :class:`argparse.Namespace` :returns: [grouped metadata, grouped origdatablocks, grouped attachments] :rtype: [:obj:`str`,:obj:`str`, :obj:`str`] """ result = None dresult = [] aresult = [] imfile = options.metadatafile omfile = None metadir = os.getcwd() if imfile and os.path.isfile(imfile): if options.output: omfile = options.output elif options.group and options.group[0]: if imfile: metadir, _ = os.path.split(os.path.abspath(imfile)) omfile = os.path.join( metadir, "%s.scan.json" % options.group[0]) if options.writefiles: options.output = omfile idfile = options.origdatablockfile odfile = None if options.dboutput: odfile = options.dboutput elif options.group and options.group[0]: if imfile and os.path.isfile(imfile): metadir, _ = os.path.split(os.path.abspath(imfile)) elif idfile: metadir, _ = os.path.split(os.path.abspath(idfile)) elif omfile: metadir, _ = os.path.split(os.path.abspath(omfile)) if metadir: odfile = os.path.join( metadir, "%s.origdatablock.json" % options.group[0]) if options.writefiles: options.dboutput = odfile iafile = options.attachmentfile oafile = None if options.atoutput: oafile = options.atoutput elif options.group and options.group[0]: if imfile and os.path.isfile(imfile): metadir, _ = os.path.split(os.path.abspath(imfile)) elif iafile: metadir, _ = os.path.split(os.path.abspath(iafile)) elif omfile: metadir, _ = os.path.split(os.path.abspath(omfile)) if metadir: oafile = os.path.join( metadir, "%s.attachment.json" % options.group[0]) if options.writefiles: options.atoutput = oafile usergroupmap = {} usergrouplist = [] if options.raw: grouplist = [["scientificMetadata.inputDatasets", "pid"]] else: grouplist = [["inputDatasets", "pid"]] if hasattr(options, "groupmap") and options.groupmap: dct = yaml.safe_load(options.groupmap.strip()) if dct and isinstance(dct, dict): usergroupmap.update(dct) elif dct: if isinstance(dct, basestring): dct = getlist(options.groupmap.strip()) if isinstance(dct, list): for line in dct: if isinstance(line, list): if len(line) > 2: usergrouplist.append(line[:3]) else: usergrouplist.append(line[:2]) if hasattr(options, "groupmapfile") and options.groupmapfile: if os.path.isfile(options.groupmapfile): with open(options.groupmapfile, "r") as fl: jstr = fl.read() # print(jstr) try: dct = yaml.safe_load(jstr.strip()) except Exception: if jstr: nan = float('nan') # noqa: F841 try: dct = eval(jstr.strip()) except Exception: dct = " " # mdflatten(dstr, [], dct) if dct and isinstance(dct, dict): usergroupmap.update(dct) elif dct: if isinstance(dct, basestring): dct = getlist(jstr.strip()) if isinstance(dct, list): for line in dct: if isinstance(line, list): if len(line) > 2: usergrouplist.append(line[:3]) else: usergrouplist.append(line[:2]) elif hasattr(options, "groupmaperror") and options.groupmaperror: raise Exception("Group-map file '%s' does not exist" % options.groupmapfile) grouplist.extend(usergrouplist) for ky, vl in usergroupmap.items(): if vl: grouplist.append([ky, vl]) if omfile: if os.path.isfile(omfile): result = cls._group_metadata( omfile, imfile, grouplist, options) else: result = cls._create_metadata( imfile, grouplist, options) ogroupfilename = "" if odfile: odir, ofile = os.path.split(odfile) ogroupfilename = os.path.join(odir, "_" + ofile) if odfile and os.path.isfile(odfile): with open(odfile, "r") as fl: jstr = fl.read() try: dresult = json.loads(jstr) if not isinstance(dresult, list): 
shutil.copy(odfile, ogroupfilename) dresult = [ogroupfilename] except Exception: dresult = [] if idfile and os.path.isfile(idfile) and idfile not in dresult: dresult.append(idfile) if not options.skipgroupdatablock and os.path.isfile(ogroupfilename) \ and ogroupfilename not in dresult: dresult.insert(0, ogroupfilename) agroupfilename = "" if oafile: adir, afile = os.path.split(oafile) agroupfilename = os.path.join(adir, "_" + afile) if oafile and os.path.isfile(oafile): with open(oafile, "r") as fl: jstr = fl.read() try: aresult = json.loads(jstr) if not isinstance(aresult, list): shutil.copy(oafile, agroupfilename) aresult = [agroupfilename] except Exception: aresult = [] if iafile and os.path.isfile(iafile) and iafile not in aresult: aresult.append(iafile) if not options.skipgroupdatablock and os.path.isfile(agroupfilename) \ and agroupfilename not in aresult: aresult.insert(0, agroupfilename) jsnresult = None if result is not None: jsnresult = json.dumps( result, sort_keys=True, indent=4, cls=numpyEncoderNull) return [jsnresult, json.dumps(dresult), json.dumps(aresult)]
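# The --group-map (and --group-map-file) input accepted by groupmetadata
# above may be either a YAML mapping of target to source metadata paths,
# or plain whitespace-separated rows parsed with getlist(), where '#'
# starts a comment line. An illustrative sketch (the metadata paths are
# hypothetical; judging from the code, an optional third column selects
# the merge type used by the _merge_* helpers):
#
#     scientificMetadata.temperature: scientificMetadata.sample.temperature
#
# or, equivalently, as plain rows:
#
#     # target  source  [merge-type]
#     scientificMetadata.temperature scientificMetadata.sample.temperature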
[docs] def show(self, options): """ the main function :param options: parser options :type options: :class:`argparse.Namespace` """ try: metadata, datablocks, attachments = self.groupmetadata(options) if metadata: if options.output: chmod = None try: chmod = int(options.chmod, 8) except Exception: options.chmod = None if options.chmod: oldmask = os.umask(0) def opener(path, flags): return os.open(path, flags, chmod) try: with open(options.output, "w", opener=opener) as fl: fl.write(metadata) except Exception: with open(options.output, "w") as fl: fl.write(metadata) os.chmod(options.output, chmod) os.umask(oldmask) else: with open(options.output, "w") as fl: fl.write(metadata) else: print(metadata) if options.dboutput: chmod = None try: chmod = int(options.chmod, 8) except Exception: options.chmod = None if options.chmod: oldmask = os.umask(0) def opener(path, flags): return os.open(path, flags, chmod) try: with open(options.dboutput, "w", opener=opener) as fl: fl.write(datablocks) except Exception: with open(options.dboutput, "w") as fl: fl.write(datablocks) os.chmod(options.dboutput, chmod) os.umask(oldmask) else: with open(options.dboutput, "w") as fl: fl.write(datablocks) else: print(datablocks) if options.atoutput: chmod = None try: chmod = int(options.chmod, 8) except Exception: options.chmod = None if options.chmod: oldmask = os.umask(0) def opener(path, flags): return os.open(path, flags, chmod) try: with open(options.atoutput, "w", opener=opener) as fl: fl.write(attachments) except Exception: with open(options.atoutput, "w") as fl: fl.write(attachments) os.chmod(options.atoutput, chmod) os.umask(oldmask) else: with open(options.atoutput, "w") as fl: fl.write(attachments) else: print(attachments) except Exception as e: sys.stderr.write("nxsfileinfo: '%s'\n" % str(e)) sys.stderr.flush() self._parser.print_help() sys.exit(255)
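# The chmod handling in show() above (and in the other show() methods
# below) relies on the 'opener' parameter of open() so that the requested
# mode is applied already at file creation. A minimal standalone sketch of
# the pattern (the file name is hypothetical):
#
#     import os
#
#     chmod = int("0o662", 8)              # as parsed from --chmod
#
#     def opener(path, flags):
#         return os.open(path, flags, chmod)
#
#     oldmask = os.umask(0)                # avoid masking mode bits out
#     try:
#         with open("output.json", "w", opener=opener) as fl:
#             fl.write("{}")
#     finally:
#         os.umask(oldmask)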
[docs]class OrigDatablock(Runner): """ OrigDatablock runner""" #: (:obj:`str`) command description description = "generate description of all scan files" #: (:obj:`str`) command epilog epilog = "" \ + " examples:\n" \ + " nxsfileinfo origdatablock /user/data/scan_12345\n" \ + "\n"
[docs] def create(self): """ creates parser """ self._parser.add_argument( "-p", "--pid", dest="pid", help=("dataset pid")) self._parser.add_argument( "-i", "--beamtimeid", dest="beamtimeid", help=("beamtime id")) self._parser.add_argument( "-b", "--beamline", dest="beamline", help=("beamline")) self._parser.add_argument( "-w", "--owner-group", default="", dest="ownergroup", help="owner group name. Default is {beamtimeid}-dmgt") self._parser.add_argument( "-c", "--access-groups", default=None, dest="accessgroups", help="access group names separated by commas. " "Default is {beamtimeId}-dmgt,{beamtimeid}-clbt,{beamtimeId}-part," "{beamline}dmgt,{beamline}staff") self._parser.add_argument( "-s", "--skip", help="filters for files to be skipped (separated by commas " "without spaces). Default: ''. E.g. '*.pyc,*~'", dest="skip", default="") self._parser.add_argument( "-a", "--add", help="list of files to be added (separated by commas " "without spaces). Default: ''. E.g. 'scan1.nxs,scan2.nxs'", dest="add", default="") self._parser.add_argument( "-x", "--chmod", dest="chmod", help=("json metadata file mod bits, e.g. 0o662"))
[docs] def postauto(self): """ parser creator after autocomplete run """ self._parser.add_argument( 'args', metavar='scan_name', type=str, nargs='+', help='scan name') self._parser.add_argument( "-o", "--output", dest="output", help=("output scicat metadata file")) self._parser.add_argument( "-r", "--relative-path", dest="relpath", help=("relative path to scan files"))
[docs] def run(self, options): """ the main program function :param options: parser options :type options: :class:`argparse.Namespace` """ self.show(options)
[docs] def isotime(self, tme): """ returns an ISO-8601 time string for the given timestamp :param tme: timestamp in seconds since the epoch, e.g. from os.stat() :type tme: :obj:`float` :returns: iso time :rtype: :obj:`str` """ tzone = time.tzname[0] try: tz = pytz.timezone(tzone) except Exception: import tzlocal tz = tzlocal.get_localzone() fmt = '%Y-%m-%dT%H:%M:%S.%f%z' starttime = tz.localize(datetime.datetime.fromtimestamp(tme)) return str(starttime.strftime(fmt))
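# isotime() renders timestamps with microseconds and a numeric UTC
# offset, e.g. (illustrative, in the Europe/Berlin zone):
#
#     >>> self.isotime(1514764800.0)
#     '2018-01-01T01:00:00.000000+0100'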
[docs] def filterout(self, fpath, filters): """ checks if the file path matches any of the skip filters :param fpath: file path :type fpath: :obj:`str` :param filters: fnmatch filters for files to be skipped :type filters: :obj:`list` <:obj:`str`> :returns: non-empty match result if the file should be skipped :rtype: :obj:`bool` or :obj:`list` <:obj:`str`> """ found = False if filters: for df in filters: found = fnmatch.filter([fpath], df) if found: break return found
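# filterout() treats a non-empty fnmatch result as "skip this file":
#
#     >>> import fnmatch
#     >>> fnmatch.filter(["/data/scan_1.pyc"], "*.pyc")
#     ['/data/scan_1.pyc']
#     >>> fnmatch.filter(["/data/scan_1.nxs"], "*.pyc")
#     []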
[docs] def datafiles(self, scanpath, scdir, scfiles, relpath, filters=None): """ creates data-file records for the given scan files :param scanpath: scan directory path :type scanpath: :obj:`str` :param scdir: subdirectory with the files :type scdir: :obj:`str` :param scfiles: file names :type scfiles: :obj:`list` <:obj:`str`> :param relpath: relative path to be prepended to the file paths :type relpath: :obj:`str` :param filters: fnmatch filters for files to be skipped :type filters: :obj:`list` <:obj:`str`> :returns: (list of data-file records, their total size in bytes) :rtype: (:obj:`list` <:obj:`dict`>, :obj:`int`) """ dtfiles = [] totsize = 0 pdc = {'7': 'rwx', '6': 'rw-', '5': 'r-x', '4': 'r--', '3': '-wx', '2': '-w-', '1': '--x', '0': '---'} if scdir and scanpath: scdir = os.path.relpath(scdir, scanpath) for fl in scfiles: rec = {} fpath = os.path.join(scanpath, scdir, fl) if self.filterout(fpath, filters): continue status = os.stat(fpath) prm = str(oct(status.st_mode)[-3:]) isdir = 'd' if stat.S_ISDIR(status.st_mode) else '-' islink = 'l' if stat.S_ISLNK(status.st_mode) else isdir perm = islink + ''.join(pdc.get(x, x) for x in prm) path = os.path.join(scdir, fl) if path.startswith("./"): path = path[2:] if relpath: path = os.path.join(relpath, path) rec["path"] = os.path.normpath(path) rec["size"] = status.st_size rec["time"] = self.isotime(status.st_ctime) try: rec["uid"] = pwd.getpwuid(status.st_uid).pw_name except Exception: rec["uid"] = status.st_uid try: rec["gid"] = grp.getgrgid(status.st_gid).gr_name except Exception: rec["gid"] = status.st_gid rec["perm"] = perm dtfiles.append(rec) totsize += rec["size"] return dtfiles, totsize
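# The 'perm' field above is built from the last three octal digits of
# st_mode via the pdc lookup table. A standalone sketch (the mode value
# is illustrative):
#
#     import stat
#
#     mode = 0o100644                       # a regular rw-r--r-- file
#     pdc = {'7': 'rwx', '6': 'rw-', '5': 'r-x', '4': 'r--',
#            '3': '-wx', '2': '-w-', '1': '--x', '0': '---'}
#     prm = oct(mode)[-3:]                  # '644'
#     isdir = 'd' if stat.S_ISDIR(mode) else '-'
#     perm = isdir + ''.join(pdc.get(x, x) for x in prm)
#     assert perm == '-rw-r--r--'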
[docs] def datablock(self, options): """ dump scan datablock JSON :param options: parser options :type options: :class:`argparse.Namespace` :returns: output information :rtype: :obj:`str` """ skip = None add = [] if options.skip: skip = options.skip.split(',') if options.add: add = options.add.split(',') result = { } dtfiles = [] scanfiles = [] scandirs = [] totsize = 0 fscandir = None for arg in options.args: scandir, scanname = os.path.split(os.path.abspath(arg)) if not fscandir: fscandir = fscandir or scandir relpath = options.relpath else: relpath = os.path.relpath(scandir, fscandir) if options.relpath: relpath = os.path.join(options.relpath, relpath) for (dirpath, dirnames, filenames) in os.walk(scandir): scanfiles = [f for f in filenames if f.startswith(scanname)] scandirs = [f for f in dirnames if f.startswith(scanname)] break flist, tsize = self.datafiles(scandir, "", scanfiles, relpath, skip) dtfiles.extend(flist) totsize += tsize for fl in add: if os.path.isfile(fl): ascandir, ascanname = os.path.split(os.path.abspath(fl)) flist, tsize = self.datafiles( scandir, ascandir, [ascanname], relpath) dtfiles.extend(flist) totsize += tsize for scdir in scandirs: for (dirpath, dirnames, filenames) in os.walk( os.path.join(scandir, scdir)): flist, tsize = self.datafiles( scandir, dirpath, filenames, relpath, skip) dtfiles.extend(flist) totsize += tsize result["dataFileList"] = dtfiles result["size"] = totsize if options.ownergroup: result["ownerGroup"] = options.ownergroup if options.accessgroups is not None: accessgroups = options.accessgroups.split(",") result["accessGroups"] = accessgroups if options.pid: result["datasetId"] = options.pid beamtimeid = "" if hasattr(options, "beamtimeid") and options.beamtimeid: beamtimeid = options.beamtimeid if not beamtimeid and "datasetId" in result \ and result["datasetId"] and \ len(result["datasetId"].split("/")) > 1: bts = result["datasetId"].split("/") try: int(bts[1]) beamtimeid = bts[1] except Exception: try: int(bts[0]) beamtimeid = bts[0] except Exception: beamtimeid = bts[1] if "ownerGroup" not in result and beamtimeid: result["ownerGroup"] = "%s-dmgt" % (beamtimeid) if "accessGroups" not in result: accessgroups = [] if beamtimeid: accessgroups = [ "%s-clbt" % (beamtimeid), "%s-part" % (beamtimeid), "%s-dmgt" % (beamtimeid)] if hasattr(options, "beamline") and options.beamline: accessgroups.extend([ "%sdmgt" % (options.beamline), "%sstaff" % (options.beamline) ]) if accessgroups: result["accessGroups"] = accessgroups return json.dumps( result, sort_keys=True, indent=4, cls=numpyEncoder)
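# An illustrative origdatablock document produced by datablock() for
# 'nxsfileinfo origdatablock -p 99001234/scan_12345 -b p00 scan_12345'
# (all values below are hypothetical):
#
#     {
#         "accessGroups": ["99001234-clbt", "99001234-part",
#                          "99001234-dmgt", "p00dmgt", "p00staff"],
#         "dataFileList": [
#             {"gid": "p00staff",
#              "path": "scan_12345.nxs",
#              "perm": "-rw-r--r--",
#              "size": 1234,
#              "time": "2024-01-01T12:00:00.000000+0100",
#              "uid": "jdoe"}
#         ],
#         "datasetId": "99001234/scan_12345",
#         "ownerGroup": "99001234-dmgt",
#         "size": 1234
#     }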
[docs] def show(self, options): """ the main function :param options: parser options :type options: :class:`argparse.Namespace` """ try: metadata = self.datablock(options) if metadata: if options.output: chmod = None try: chmod = int(options.chmod, 8) except Exception: options.chmod = None if options.chmod: oldmask = os.umask(0) def opener(path, flags): return os.open(path, flags, chmod) try: with open(options.output, "w", opener=opener) as fl: fl.write(metadata) except Exception: with open(options.output, "w") as fl: fl.write(metadata) os.chmod(options.output, chmod) os.umask(oldmask) else: with open(options.output, "w") as fl: fl.write(metadata) else: print(metadata) except Exception as e: sys.stderr.write("nxsfileinfo: '%s'\n" % str(e)) sys.stderr.flush() self._parser.print_help() sys.exit(255)
[docs]class Sample(Runner): """ Sample runner""" #: (:obj:`str`) command description description = "generate description of sample" #: (:obj:`str`) command epilog epilog = "" \ + " examples:\n" \ + " nxsfileinfo sample -i petra3/h2o/234234 -d 'HH water' " \ + "-s ~/cm.json \n" \ + "\n"
[docs] def create(self): """ creates parser """ self._parser.add_argument( "-s", "--sample-id", dest="sampleid", help=("sample id")) self._parser.add_argument( "-i", "--beamtimeid", dest="beamtimeid", help=("beamtime id")) self._parser.add_argument( "-b", "--beamline", dest="beamline", help=("beamline")) self._parser.add_argument( "-d", "--description", dest="description", help=("sample description")) self._parser.add_argument( "-r", "--owner", dest="owner", help=("sample owner")) self._parser.add_argument( "-p", "--published", dest="published", action="store_true", help=("sample is published"), default=False) self._parser.add_argument( "-w", "--owner-group", default="", dest="ownergroup", help="owner group name. Default is {beamtimeid}-dmgt") self._parser.add_argument( "-c", "--access-groups", default=None, dest="accessgroups", help="access group names separated by commas. " "Default is {beamtimeId}-dmgt,{beamtimeid}-clbt,{beamtimeId}-part," "{beamline}dmgt,{beamline}staff") self._parser.add_argument( "-x", "--chmod", dest="chmod", help=("json metadata file mod bits, e.g. 0o662"))
[docs] def postauto(self): """ parser creator after autocomplete run """ self._parser.add_argument( "-m", "--sample-characteristics", dest="characteristicsmeta", help=("sample characteristics metadata file")) self._parser.add_argument( "-o", "--output", dest="output", help=("output scicat metadata file"))
[docs] def run(self, options): """ the main program function :param options: parser options :type options: :class:`argparse.Namespace` """ self.show(options)
[docs] def sample(self, options): """ create sample metadata :param options: parser options :type options: :class:`argparse.Namespace` :returns: output information :rtype: :obj:`str` """ dct = {} result = {} if hasattr(options, "sampleid") and options.sampleid: result["sampleId"] = options.sampleid if hasattr(options, "description") and options.description: result["description"] = options.description if hasattr(options, "owner") and options.owner: result["owner"] = options.owner if hasattr(options, "published") and options.published: result["isPublished"] = True else: result["isPublished"] = False if options.ownergroup: result["ownerGroup"] = options.ownergroup if hasattr(options, "beamtimeid") and options.beamtimeid: if "ownerGroup" not in result: result["ownerGroup"] = "%s-dmgt" % (options.beamtimeid) if options.accessgroups is not None: accessgroups = options.accessgroups.split(",") result["accessGroups"] = accessgroups if "accessGroups" not in result: accessgroups = [] if hasattr(options, "beamtimeid") and options.beamtimeid: accessgroups = [ "%s-clbt" % (options.beamtimeid), "%s-part" % (options.beamtimeid), "%s-dmgt" % (options.beamtimeid)] if hasattr(options, "beamline") and options.beamline: accessgroups.extend([ "%sdmgt" % (options.beamline), "%sstaff" % (options.beamline) ]) if accessgroups: result["accessGroups"] = accessgroups if options.characteristicsmeta: with open(options.characteristicsmeta, "r") as fl: jstr = fl.read() try: dct = json.loads(jstr) except Exception: if jstr: nan = float('nan') # noqa: F841 dct = eval(jstr) if 'sampleCharacteristics' in dct.keys(): dct = dct['sampleCharacteristics'] result['sampleCharacteristics'] = dct return json.dumps( result, sort_keys=True, indent=4, cls=numpyEncoder)
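# An illustrative result of sample() for
# 'nxsfileinfo sample -s H2O/1 -i 99001234 -b p00 -d "HH water"'
# (identifiers are hypothetical):
#
#     {
#         "accessGroups": ["99001234-clbt", "99001234-part",
#                          "99001234-dmgt", "p00dmgt", "p00staff"],
#         "description": "HH water",
#         "isPublished": false,
#         "ownerGroup": "99001234-dmgt",
#         "sampleId": "H2O/1"
#     }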
[docs] def show(self, options): """ the main function :param options: parser options :type options: :class:`argparse.Namespace` """ try: metadata = self.sample(options) if metadata: if options.output: chmod = None try: chmod = int(options.chmod, 8) except Exception: options.chmod = None if options.chmod: oldmask = os.umask(0) def opener(path, flags): return os.open(path, flags, chmod) try: with open(options.output, "w", opener=opener) as fl: fl.write(metadata) except Exception: with open(options.output, "w") as fl: fl.write(metadata) os.chmod(options.output, chmod) os.umask(oldmask) else: with open(options.output, "w") as fl: fl.write(metadata) else: print(metadata) except Exception as e: sys.stderr.write("nxsfileinfo: '%s'\n" % str(e)) sys.stderr.flush() self._parser.print_help() sys.exit(255)
[docs]class Attachment(Runner): """ Attachment runner""" #: (:obj:`str`) command description description = "generate description of attachment" #: (:obj:`str`) command epilog epilog = "" \ + " examples:\n" \ + " nxsfileinfo attachment -b p00 -i 2342342 -t 'HH water' " \ + "-o ~/at1.json thumbnail.png \n" \ + " nxsfileinfo attachment -b p00 -i 2342342 -t 'HH water' " \ + "-o ~/at2.json -s pilatus myscan_00123.nxs \n" \ + " nxsfileinfo attachment -b p00 -i 2342342 -t 'HH water' " \ + "-o ~/at2.json myscan_00124.fio \n" \ + "\n"
[docs] def create(self): """ creates parser """ self._parser.add_argument( "-a", "--id", dest="atid", help=("attachment id")) self._parser.add_argument( "-t", "--caption", dest="caption", help=("caption text")) self._parser.add_argument( "-i", "--beamtimeid", dest="beamtimeid", help=("beamtime id")) self._parser.add_argument( "-b", "--beamline", dest="beamline", help=("beamline")) self._parser.add_argument( "-r", "--owner", dest="owner", help=("attachment owner")) self._parser.add_argument( "-w", "--owner-group", default="", dest="ownergroup", help="owner group name. Default is {beamtimeid}-dmgt") self._parser.add_argument( "-c", "--access-groups", default=None, dest="accessgroups", help="access group names separated by commas. " "Default is {beamtimeId}-dmgt,{beamtimeid}-clbt,{beamtimeId}-part," "{beamline}dmgt,{beamline}staff") self._parser.add_argument( "-f", "--file-format", dest="fileformat", help=("input file format, e.g. 'nxs'. " "Default is defined by the file extension")) self._parser.add_argument( "--h5py", action="store_true", default=False, dest="h5py", help="use h5py module as a nexus reader") self._parser.add_argument( "--h5cpp", action="store_true", default=False, dest="h5cpp", help="use h5cpp module as a nexus reader") self._parser.add_argument( "-x", "--chmod", dest="chmod", help=("json metadata file mod bits, e.g. 0o662")) self._parser.add_argument( "-s", "--signals", dest="signals", help=("signals data name(s) separated by comma")) self._parser.add_argument( "-e", "--axes", dest="axes", help=("axis/axes data name(s) separated by comma")) self._parser.add_argument( "-q", "--scan-command-axes", dest="scancmdaxes", default='{"hklscan":"h;k;l","qscan":"qz;qpar"}', help=("a JSON dictionary with scan-command axes to override, " "axis/axes data name(s) separated by comma for detectors" " and by semicolon for more plots. Default: " '{"hklscan":"h;k;l","qscan":"qz;qpar"}')) self._parser.add_argument( "-m", "--frame", dest="frame", help=("a frame number, used if the data contains " "more than one 2D image")) self._parser.add_argument( "--signal-label", dest="slabel", help=("signal label")) self._parser.add_argument( "--xlabel", dest="xlabel", help=("x-axis label")) self._parser.add_argument( "--ylabel", dest="ylabel", help=("y-axis label")) self._parser.add_argument( "-u", "--override", action="store_true", default=False, dest="override", help="override NeXus entries by script parameters") self._parser.add_argument( "--parameters-in-caption", action="store_true", default=False, dest="ppflag", help="add plot parameters to the caption") self._parser.add_argument( "-n", "--nexus-path", help="base nexus path to the element to be shown. " "If the path is '' the default group is shown. " "The default: ''", dest="nexuspath", default="")
[docs] def postauto(self): """ parser creator after autocomplete run """ self._parser.add_argument( "-o", "--output", dest="output", help=("output scicat metadata file")) self._parser.add_argument( 'args', metavar='image_file', type=str, nargs="*", help='png or NeXus image file name')
[docs] def run(self, options): """ the main program function :param options: parser options :type options: :class:`argparse.Namespace` """ if options.h5cpp: writer = "h5cpp" elif options.h5py: writer = "h5py" elif "h5cpp" in WRITERS.keys(): writer = "h5cpp" else: writer = "h5py" if (options.h5cpp and options.h5py) or writer not in WRITERS.keys(): sys.stderr.write("nxsfileinfo: Writer '%s' cannot be opened\n" % writer) sys.stderr.flush() self._parser.print_help() sys.exit(255) root = None nxfl = None if not hasattr(options, "fileformat"): options.fileformat = "" if options.args: wrmodule = WRITERS[writer.lower()] if not options.fileformat: rt, ext = os.path.splitext(options.args[0]) if ext and len(ext) > 1 and ext.startswith("."): options.fileformat = ext[1:] try: if options.fileformat in ['nxs', 'h5', 'nx', 'ndf']: nxfl = filewriter.open_file( options.args[0], readonly=True, writer=wrmodule) root = nxfl.root() elif options.fileformat in ['fio']: with open(options.args[0]) as fl: root = fl.read() elif options.fileformat in ['png']: with open(options.args[0], "rb") as fl: root = "data:image/png;base64," + \ base64.b64encode(fl.read()).decode('utf-8') except Exception: sys.stderr.write("nxsfileinfo: File '%s' cannot be opened\n" % options.args[0]) sys.stderr.flush() self._parser.print_help() sys.exit(255) self.show(root, options) if nxfl is not None: nxfl.close()
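# Thumbnails are embedded as RFC 2397 data URIs. A minimal sketch of the
# encoding applied to png inputs above (the file name is hypothetical):
#
#     import base64
#
#     with open("thumbnail.png", "rb") as fl:
#         uri = ("data:image/png;base64,"
#                + base64.b64encode(fl.read()).decode("utf-8"))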
[docs] def attachment(self, root, options): """ get metadata from nexus and beamtime file :param root: nexus file root :type root: :class:`filewriter.FTGroup` :param options: parser options :type options: :class:`argparse.Namespace` :returns: atttachment metadata :rtype: :obj:`str` """ result = {} if hasattr(options, "atid") and options.atid: result["id"] = options.atid if hasattr(options, "caption") and options.caption: result["caption"] = options.caption if options.ownergroup: result["ownerGroup"] = options.ownergroup if hasattr(options, "beamtimeid") and options.beamtimeid: if "ownerGroup" not in result: result["ownerGroup"] = "%s-dmgt" % (options.beamtimeid) if options.accessgroups is not None: accessgroups = options.accessgroups.split(",") result["accessGroups"] = accessgroups if "accessGroups" not in result: accessgroups = [] if hasattr(options, "beamtimeid") and options.beamtimeid: accessgroups = [ "%s-clbt" % (options.beamtimeid), "%s-part" % (options.beamtimeid), "%s-dmgt" % (options.beamtimeid)] if hasattr(options, "beamline") and options.beamline: accessgroups.extend([ "%sdmgt" % (options.beamline), "%sstaff" % (options.beamline) ]) if accessgroups: result["accessGroups"] = accessgroups if not hasattr(options, "fileformat"): options.fileformat = "" if options.args and not options.fileformat: rt, ext = os.path.splitext(options.args[0]) if ext and len(ext) > 1 and ext.startswith("."): options.fileformat = ext[1:] if root is not None: if options.fileformat in ['png']: result["thumbnail"] = root elif MATPLOTLIB: signals = None axes = [] xlabel = None ylabel = None slabel = None frame = None if options.signals: signals = options.signals.split(",") if options.axes: axes = options.axes.split(",") if options.frame is not None: try: frame = int(options.frame) except Exception: frame = None if options.xlabel: xlabel = options.xlabel if options.ylabel: ylabel = options.ylabel if options.slabel: slabel = options.slabel if options.fileformat in ['nxs', 'h5', 'nx', 'ndf']: tn, pars = self._nxsplot( root, signals, axes, slabel, xlabel, ylabel, frame, options.caption, options.override, options.scancmdaxes, options.nexuspath) if tn: result["thumbnail"] = tn if "caption" not in result: result["caption"] = "" if options.ppflag: result["caption"] += " " + pars elif options.fileformat in ['fio']: nxsparser = FIOFileParser(root) nxsparser.oned = True nxsparser.maxonedsize = -1 nxsparser.parseMeta() data = None sdata = None adata = None signal = None params = {} scancmd = None if nxsparser.description and nxsparser.columns: desc = nxsparser.description[0] sm = None if "scientificMetadata" in desc.keys(): sm = desc["scientificMetadata"] if sm and "data" in sm.keys(): data = sm["data"] if sm and "parameters" in sm.keys(): params = sm["parameters"] if sm: if "ScanCommand" in sm and sm["ScanCommand"]: scancmd = sm["ScanCommand"] ax = self._axesfromcommand( scancmd, options.scancmdaxes, data, axes) if ax: axes = ax if not signal and 'signalcounter' in params \ and params['signalcounter']: if data and params['signalcounter'] in data: signal = params['signalcounter'] sdata = data[signal] if not slabel: slabel = signal if not signal and 'signal' in params \ and params['signal']: if data and params['signal'] in data: signal = params['signal'] sdata = data[signal] if not slabel: slabel = signal if not signal or options.override: if data and signals: for sg in signals: if sg and sg in data.keys(): signal = sg sdata = data[signal] if not slabel: slabel = signal break if not signal: if len(nxsparser.columns) > 1 
and \ len(nxsparser.columns[1]) > 1: sdata = nxsparser.columns[1][1] if not slabel: slabel = nxsparser.columns[1][0] elif len(nxsparser.columns) > 0 and \ len(nxsparser.columns[0]) > 1: sdata = nxsparser.columns[0][1] if not slabel: slabel = nxsparser.columns[0][0] axis = None if data and axes: for ax in axes: if ax and ax in data.keys(): axis = ax adata = data[ax] if not xlabel: xlabel = ax break if not axis and len(nxsparser.columns) > 1 and \ len(nxsparser.columns[0]) > 1: adata = nxsparser.columns[0][1] if not xlabel: xlabel = nxsparser.columns[0][0] if sdata: result["thumbnail"], pars = self._plot1d( sdata, adata, xlabel, slabel, options.caption, scancmd=scancmd) if "caption" not in result: result["caption"] = "" if options.ppflag: result["caption"] += " " + pars if "thumbnail" in result and result["thumbnail"]: if "caption" not in result: result["caption"] = "" return json.dumps( result, sort_keys=True, indent=4, cls=numpyEncoder)
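# _axesfromcommand() below resolves the --scan-command-axes entry for the
# first word of the scan command; semicolon-separated candidate axes
# compete, and the one spanning the largest value range wins. A
# standalone sketch of that rule (the data values are hypothetical):
#
#     data = {"qz": [0.0, 0.1, 0.2], "qpar": [0.0, 1.0, 2.0]}
#     candidates = "qz;qpar".split(";")
#     best = max(candidates,
#                key=lambda a: max(data[a]) - min(data[a]))
#     assert best == "qpar"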
def _axesfromcommand(self, scmd, scmdaxes, data, useraxes=None): """ create plot from nexus file :param scmd: scan command :type scmd: :obj:`str` :param scmdaxes: JSON dictionry with scan command axes :type scmdaxes: :obj:`str` :param data: nxdata nexus file :type data: class:`filewriter.FTGroup` or dict <:obj:`str`, `any`> :returns: axis from scan command :rtype: :obj:`str` """ axes = [] useraxes = useraxes or [] try: if scmdaxes: scaxes = json.loads(scmdaxes) if scaxes and isinstance(scaxes, dict): scmd1 = [sc for sc in scmd.split(" ") if sc][0] if scmd1 in scaxes: axs = scaxes[scmd1].split(",") for ax in axs: maxsz = 0 ta = "" for aa in ax.split(";"): adata = None if isinstance(data, dict): if aa and aa in data.keys(): adata = data[aa] elif hasattr(data, "names") and \ hasattr(data, "open"): if aa and aa in data.names(): try: adata = data.open(aa).read() except Exception: pass if adata is not None: mx = max(adata) - min(adata) if mx > maxsz: maxsz = mx ta = aa if maxsz: axes = [ta] break except Exception: pass if not axes and not useraxes: scmds = [sc for sc in scmd.split(" ") if sc] if len(scmds) > 1: aa = scmds[1] if isinstance(data, dict): if aa and aa in data.keys(): axes = [aa] elif hasattr(data, "names") and \ aa and aa in data.names(): axes = [aa] return axes def _nxsplot(self, root, signals, axes, slabel, xlabel, ylabel, frame, title, override=False, scmdaxes=None, nexuspath=None): """ create plot from nexus file :param root: nexus file root :type root: class:`filewriter.FTGroup` :param signals: signal names :type signals: :obj:`list`<:obj:`str`> :param axes: axes names :type axes: :obj:`list`<:obj:`str`> :param slabel: s-label :type slabel: :obj:`str` :param xlabel: x-label :type xlabel: :obj:`str` :param ylabel: y-label :type ylabel: :obj:`str` :param title: title :type title: :obj:`str` :param scmdaxes: JSON dictionry with scan command axes :type scmdaxes: :obj:`str` :param nexuspath: attachment nexuspath :type nexuspath: :obj:`str` :returns: thumbnail string :rtype: :obj:`str` """ # print(signals) sgnode = root.default_field(signals, nexuspath) nxdata = None entry = None signal = None # print(sgnode) if sgnode is not None: nxdata = sgnode.parent if nxdata is not None: if override and signals: for sg in signals: if sg in nxdata.names(): sgnode = nxdata.open(sg) signal = sg elif not signal: signal = sgnode.name entry = nxdata.parent if nxdata is None: entry = None attrs = root.attributes if hasattr(root, "names") and "default" in attrs.names(): nname = filewriter.first(attrs["default"].read()) if nname in root.names(): entry = root.open(nname) if entry is None: enames = ["entry", "scan"] for enm in enames: if enm in root.names(): entry = root.open(enm) break if entry is not None: attrs = entry.attributes if hasattr(entry, "names") and "default" in attrs.names(): nname = filewriter.first(attrs["default"].read()) if nname in entry.names(): nxdata = entry.open(nname) if entry is not None and nxdata is None: enames = ["data"] for enm in enames: if enm in entry.names(): entry = entry.open(enm) break if nxdata is not None: attrs = nxdata.attributes if hasattr(nxdata, "names") and "signal" in attrs.names(): nname = filewriter.first(attrs["signal"].read()) if nname in nxdata.names(): sgnode = nxdata.open(nname) if nxdata is not None and (override or sgnode is None) and \ signal in nxdata.names(): sgnode = nxdata.open(signal) if sgnode is None and nxdata is not None and \ "data" in nxdata.names(): sgnode = nxdata.open("data") signal = "data" if sgnode is not None: try: dtshape = 
sgnode.shape dtsize = sgnode.size # print(dtshape) # print(nxdata.names()) # print(signal) # print(slabel) # print(xlabel) # print(ylabel) scommand = None if len(dtshape) == 1 and dtsize > 1: if hasattr(entry, "names") and \ hasattr(nxdata, "names") and \ "program_name" in entry.names(): pn = entry.open("program_name") attr = pn.attributes names = [att.name for att in attr] if "scan_command" in names: scommand = filewriter.first( attr["scan_command"].read()) ax = self._axesfromcommand( scommand, scmdaxes, nxdata, axes) if ax: axes = ax return self._nxsplot1d( sgnode, signal, axes, slabel, xlabel, ylabel, title, override, scancmd=scommand) elif len(dtshape) == 2 and dtsize > 1: return self._nxsplot2d( sgnode, signal, slabel, title, override, scancmd=scommand) elif len(dtshape) == 3 and dtsize > 1: return self._nxsplot3d( sgnode, signal, slabel, title, override, frame, scancmd=scommand) except Exception: return None, None return None, None def _nxsplot3d(self, sgnode, signal, slabel, title, override=False, frame=None, scancmd=None): """ create plot 1d from nexus file :param sgnode: nexus signal field node :type sgnode: class:`filewriter.FTField` :param signal: signal name :type signal: :obj:`str` :param slabel: s-label :type slabel: :obj:`str` :param title: title :type title: :obj:`str` :param override: override nexus attributes flag :type override: :obj:`bool` :param frame: frame number to plot :type frame: :obj:`int` :param scancmd: scan command :type scancmd: :obj:`str` :returns: thumbnail string :rtype: :obj:`str` """ signal = sgnode.name nxdata = sgnode.parent shape = sgnode.shape mxframe = shape[0] if frame is None: frame = int(mxframe / 2) if frame and frame < 0: frame = mxframe + frame if frame and frame < 0: frame = 0 if frame and frame >= mxframe: frame = mxframe - 1 sdata = sgnode[frame, :, :] sunits = None slname = None if "units" in sgnode.attributes.names(): sunits = filewriter.first( sgnode.attributes["units"].read()) if "long_name" in sgnode.attributes.names(): slname = filewriter.first( sgnode.attributes["long_name"].read()) if not override or not slabel: if slname: slabel = "%s[%s]" % (slname, frame) else: slabel = "%s[%s]" % (signal, frame) if sunits: slabel = "%s (%s)" % (slabel, sunits) if (not override or not title) and nxdata.parent is not None and \ "title" in nxdata.parent.names(): title = filewriter.first(nxdata.parent.open("title").read()) return self._plot2d(sdata, slabel, title, scancmd=scancmd) def _nxsplot2d(self, sgnode, signal, slabel, title, override=False, frame=None, scancmd=None): """ create plot 1d from nexus file :param sgnode: nexus signal field node :type sgnode: class:`filewriter.FTField` :param signal: signal name :type signal: :obj:`str` :param slabel: s-label :type slabel: :obj:`str` :param title: title :type title: :obj:`str` :param override: override nexus attributes flag :type override: :obj:`bool` :param frame: frame number to display :type frame: :obj:`int` :param scancmd: scan command :type scancmd: :obj:`str` :returns: thumbnail string :rtype: :obj:`str` """ signal = sgnode.name nxdata = sgnode.parent sdata = sgnode.read() sunits = None slname = None if "units" in sgnode.attributes.names(): sunits = filewriter.first( sgnode.attributes["units"].read()) if "long_name" in sgnode.attributes.names(): slname = filewriter.first( sgnode.attributes["long_name"].read()) if not override or not slabel: if slname: slabel = slname elif not slabel: slabel = signal if sunits: slabel = "%s (%s)" % (slabel, sunits) if (not override or not title) and 
nxdata.parent is not None and \ "title" in nxdata.parent.names(): title = filewriter.first(nxdata.parent.open("title").read()) return self._plot2d(sdata, slabel, title, scancmd=scancmd) def _nxsplot1d(self, sgnode, signal, axes, slabel, xlabel, ylabel, title, override=False, scancmd=None): """ create plot 1d from nexus file :param sgnode: nexus signal field node :type sgnode: class:`filewriter.FTField` :param signal: signal name :type signal: :obj:`str` :param axes: axes names :type axes: :obj:`list`<:obj:`str`> :param slabel: s-label :type slabel: :obj:`str` :param xlabel: x-label :type xlabel: :obj:`str` :param ylabel: y-label :type ylabel: :obj:`str` :param title: title :type title: :obj:`str` :param override: override nexus attributes flag :type override: :obj:`bool` :param scancmd: scan command :type scancmd: :obj:`str` :returns: thumbnail string :rtype: :obj:`str`, """ signal = sgnode.name nxdata = sgnode.parent attrs = nxdata.attributes if hasattr(nxdata, "names") and "axes" in attrs.names(): naxes = filewriter.first(attrs["axes"].read()) if not override and naxes: axes = [naxes] adata = [] anode = None axis = "" if axes: for ax in axes: if ax in nxdata.names(): try: anode = nxdata.open(ax) axis = ax adata = anode.read() break except Exception: pass sdata = sgnode.read() sunits = None aunits = None slname = None alname = None if "units" in sgnode.attributes.names(): sunits = filewriter.first( sgnode.attributes["units"].read()) if "long_name" in sgnode.attributes.names(): slname = filewriter.first( sgnode.attributes["long_name"].read()) if not override or not slabel: if slname: slabel = slname elif not slabel: slabel = signal if sunits: slabel = "%s (%s)" % (slabel, sunits) if anode is not None and "units" in anode.attributes.names(): aunits = filewriter.first(anode.attributes["units"].read()) if anode is not None and "long_name" in anode.attributes.names(): alname = filewriter.first( anode.attributes["long_name"].read()) if (not override or not xlabel) and axes and axes[0]: if alname: xlabel = alname elif not xlabel and axis: xlabel = axis if aunits: xlabel = "%s (%s)" % (xlabel, aunits) if (not override or not title) and nxdata.parent is not None and \ "title" in nxdata.parent.names(): title = filewriter.first(nxdata.parent.open("title").read()) return self._plot1d( sdata, adata, xlabel, slabel, title, scancmd=scancmd) def _plot1d(self, data, axis, xlabel, ylabel, title, maxo=25, scancmd=None, maxtitle=68): """ create oned thumbnail plot :param data: 1d signal data :type data: :obj:`list`<:obj:`float`> or :obj:`list`<:obj:`int`> :param axis: 1d axis data :type axis: :obj:`list`<:obj:`float`> or :obj:`list`<:obj:`int`> :param xlabel: x-label :type xlabel: :obj:`str` :param ylabel: y-label :type ylabel: :obj:`str` :param title: title :type title: :obj:`str` :param maxo: maximal number of point to display dots :type maxo: :obj:`int` :param scancmd: scan command :type scancmd: :obj:`str` :param maxtitle: maximal title size :type maxtitle: :obj:`int` :returns: thumbnail string :rtype: :obj:`str` """ pars = {} matplotlib.interactive(False) matplotlib.use('Agg') import matplotlib.pyplot as plt fig, ax = plt.subplots() if scancmd and len(scancmd) > maxtitle: # scancmd = splittext(scancmd) scancmd = scancmd[:maxtitle] if ylabel: if title: title = title else: title = ylabel if axis is not None and len(axis) == len(data): if len(data) < maxo: ax.plot(axis, data, 'o', axis, data) else: ax.plot(axis, data) ax.set(xlabel=xlabel, ylabel=ylabel, title=scancmd) if xlabel: pars["xlabel"] = xlabel 
else: axis = list(range(len(data))) if len(data) < maxo: ax.plot(axis, data, 'o', axis, data) else: ax.plot(axis, data) ax.set(ylabel=ylabel, title=scancmd) fig.suptitle(title, fontsize=20, y=1) if ylabel: pars["ylabel"] = ylabel if title: pars["suptitle"] = title if scancmd: pars["title"] = scancmd buffer = BytesIO() plt.savefig(buffer, format='png') plt.close(fig) buffer.seek(0) png_image = buffer.getvalue() buffer.close() # with open("/tmp/myttt.png", "wb") as fl: # fl.write(png_image) thumbnail = base64.b64encode(png_image) thumbnail = "data:image/png;base64," + thumbnail.decode('utf-8') return thumbnail, json.dumps(pars) def _plot2d(self, data, slabel, title, maxratio=10, scancmd=None, maxtitle=68): """ create oned thumbnail plot :param data: 1d signal data :type data: :obj:`list`<:obj:`float`> or :obj:`list`<:obj:`int`> :param slabel: signal-label :type slabel: :obj:`str` :param title: title :type title: :obj:`str` :param maxratio: max ratio to do not change aspect ratio :type maxratio: :obj:`float` :param scancmd: scan command :type scancmd: :obj:`str` :param maxtitle: maximal title size :type maxtitle: :obj:`int` :returns: thumbnail string :rtype: :obj:`str` """ pars = {} matplotlib.interactive(False) matplotlib.use('Agg') import matplotlib.pyplot as plt fig, ax = plt.subplots() shape = data.shape if int(shape[0]/shape[1]) > maxratio or \ int(shape[1]/shape[0]) > maxratio: ax.imshow(data, aspect='auto') pars["aspect"] = 'auto' else: ax.imshow(data) if scancmd and len(scancmd) > maxtitle: scancmd = scancmd[:maxtitle] # scancmd = splittext(scancmd) if slabel: if title: title = "%s: %s" % (title, slabel) else: title = slabel if title: fig.suptitle(title, fontsize=20, y=1) pars["suptitle"] = title if scancmd: ax.set(title=scancmd) pars["title"] = scancmd buffer = BytesIO() plt.savefig(buffer, format='png') plt.close(fig) buffer.seek(0) png_image = buffer.getvalue() buffer.close() thumbnail = base64.b64encode(png_image) thumbnail = "data:image/png;base64," + thumbnail.decode('utf-8') return thumbnail, json.dumps(pars)
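# To inspect a generated thumbnail, the data URI returned by _plot1d or
# _plot2d can be decoded back into a png file (a hedged usage sketch;
# 'thumbnail' and the file name are placeholders):
#
#     import base64
#
#     prefix = "data:image/png;base64,"
#     assert thumbnail.startswith(prefix)
#     with open("preview.png", "wb") as fl:
#         fl.write(base64.b64decode(thumbnail[len(prefix):]))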
[docs]    def show(self, root, options):
        """ the main function

        :param root: nexus file root
        :type root: :class:`filewriter.FTGroup`
        :param options: parser options
        :type options: :class:`argparse.Namespace`
        """
        try:
            metadata = self.attachment(root, options)
            if metadata:
                if options.output:
                    chmod = None
                    try:
                        chmod = int(options.chmod, 8)
                    except Exception:
                        options.chmod = None
                    if options.chmod:
                        # create the output file with the requested mode bits
                        oldmask = os.umask(0)

                        def opener(path, flags):
                            return os.open(path, flags, chmod)

                        try:
                            with open(options.output, "w",
                                      opener=opener) as fl:
                                fl.write(metadata)
                        except Exception:
                            # fallback: write first, then set the mode bits
                            with open(options.output, "w") as fl:
                                fl.write(metadata)
                            os.chmod(options.output, chmod)
                        os.umask(oldmask)
                    else:
                        with open(options.output, "w") as fl:
                            fl.write(metadata)
                else:
                    print(metadata)
        except Exception as e:
            sys.stderr.write("nxsfileinfo: '%s'\n" % str(e))
            sys.stderr.flush()
            self._parser.print_help()
            sys.exit(255)
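    # Editorial sketch (not part of nxstools) of the opener/umask pattern
    # used in show() above: the output file is created directly with the
    # requested mode bits instead of relying on the process umask. All
    # names below are illustrative.
    #
    #     import os
    #
    #     def write_with_mode(path, text, mode=0o662):
    #         """ write text to path created with explicit mode bits """
    #         oldmask = os.umask(0)
    #
    #         def opener(p, flags):
    #             return os.open(p, flags, mode)
    #
    #         try:
    #             with open(path, "w", opener=opener) as fl:
    #                 fl.write(text)
    #         finally:
    #             os.umask(oldmask)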
[docs]class Instrument(Runner):

    """ Instrument runner"""

    #: (:obj:`str`) command description
    description = "generate description of instrument"
    #: (:obj:`str`) command epilog
    epilog = "" \
        + " examples:\n" \
        + "       nxsfileinfo instrument -p /petra3/p00 -n P00 -m " \
        + "~/cm.json\n" \
        + "\n"
[docs]    def create(self):
        """ creates parser
        """
        self._parser.add_argument(
            "-p", "--pid", dest="pid",
            help="instrument pid")
        self._parser.add_argument(
            "-n", "--name", dest="name",
            help="instrument name")
        self._parser.add_argument(
            "-i", "--beamtimeid", dest="beamtimeid",
            help="beamtime id")
        self._parser.add_argument(
            "-b", "--beamline", dest="beamline",
            help="beamline")
        self._parser.add_argument(
            "-w", "--owner-group", default="",
            dest="ownergroup",
            help="owner group name. Default is {beamtimeid}-dmgt")
        self._parser.add_argument(
            "-c", "--access-groups", default=None,
            dest="accessgroups",
            help="access group names separated by commas. "
                 "Default is {beamtimeid}-dmgt,{beamtimeid}-clbt,"
                 "{beamtimeid}-part,{beamline}dmgt,{beamline}staff")
        self._parser.add_argument(
            "-x", "--chmod", dest="chmod",
            help="json metadata file mod bits, e.g. 0o662")
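    # Editorial note: the -x value is parsed later with int(value, 8),
    # so both "662" and "0o662" are accepted, e.g.
    #
    #     >>> int("0o662", 8) == int("662", 8) == 0o662
    #     True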
[docs]    def postauto(self):
        """ parser creator after autocomplete run """
        self._parser.add_argument(
            "-m", "--custom-metadata",
            dest="custommeta",
            help="instrument characteristics metadata file")
        self._parser.add_argument(
            "-o", "--output", dest="output",
            help="output scicat metadata file")
[docs]    def run(self, options):
        """ the main program function

        :param options: parser options
        :type options: :class:`argparse.Namespace`
        """
        self.show(options)
[docs]    def instrument(self, options):
        """ create instrument metadata

        :param options: parser options
        :type options: :class:`argparse.Namespace`
        :returns: instrument metadata as a JSON string
        :rtype: :obj:`str`
        """
        dct = {}
        result = {}
        if hasattr(options, "pid") and options.pid:
            result["pid"] = options.pid
        if hasattr(options, "name") and options.name:
            result["name"] = options.name
        if options.ownergroup:
            result["ownerGroup"] = options.ownergroup
        if hasattr(options, "beamtimeid") and options.beamtimeid:
            if "ownerGroup" not in result:
                result["ownerGroup"] = "%s-dmgt" % (options.beamtimeid)
        if options.accessgroups is not None:
            accessgroups = options.accessgroups.split(",")
            result["accessGroups"] = accessgroups
        if "accessGroups" not in result:
            # build the default access groups from beamtime id and beamline
            accessgroups = []
            if hasattr(options, "beamtimeid") and options.beamtimeid:
                accessgroups = [
                    "%s-clbt" % (options.beamtimeid),
                    "%s-part" % (options.beamtimeid),
                    "%s-dmgt" % (options.beamtimeid)]
            if hasattr(options, "beamline") and options.beamline:
                accessgroups.extend([
                    "%sdmgt" % (options.beamline),
                    "%sstaff" % (options.beamline)
                ])
            if accessgroups:
                result["accessGroups"] = accessgroups
        if options.custommeta:
            with open(options.custommeta, "r") as fl:
                jstr = fl.read()
            try:
                dct = json.loads(jstr)
            except Exception:
                if jstr:
                    # fallback for python-literal metadata containing nan
                    nan = float('nan')    # noqa: F841
                    dct = eval(jstr)
            if 'customMetadata' in dct.keys():
                dct = dct['customMetadata']
            result['customMetadata'] = dct
        return json.dumps(
            result, sort_keys=True, indent=4,
            cls=numpyEncoder)
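    # Editorial example: for an illustrative call such as
    #
    #     nxsfileinfo instrument -p /petra3/p00 -n P00 -b p00 -i 1234567
    #
    # the code above would produce JSON like (reconstructed from the logic
    # above, not captured from a real run):
    #
    #     {
    #         "accessGroups": [
    #             "1234567-clbt",
    #             "1234567-part",
    #             "1234567-dmgt",
    #             "p00dmgt",
    #             "p00staff"
    #         ],
    #         "name": "P00",
    #         "ownerGroup": "1234567-dmgt",
    #         "pid": "/petra3/p00"
    #     }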
[docs]    def show(self, options):
        """ the main function

        :param options: parser options
        :type options: :class:`argparse.Namespace`
        """
        try:
            metadata = self.instrument(options)
            if metadata:
                if options.output:
                    chmod = None
                    try:
                        chmod = int(options.chmod, 8)
                    except Exception:
                        options.chmod = None
                    if options.chmod:
                        oldmask = os.umask(0)

                        def opener(path, flags):
                            return os.open(path, flags, chmod)

                        try:
                            with open(options.output, "w",
                                      opener=opener) as fl:
                                fl.write(metadata)
                        except Exception:
                            with open(options.output, "w") as fl:
                                fl.write(metadata)
                            os.chmod(options.output, chmod)
                        os.umask(oldmask)
                    else:
                        with open(options.output, "w") as fl:
                            fl.write(metadata)
                else:
                    print(metadata)
        except Exception as e:
            sys.stderr.write("nxsfileinfo: '%s'\n" % str(e))
            sys.stderr.flush()
            self._parser.print_help()
            sys.exit(255)
[docs]class Field(Runner):

    """ Field runner"""

    #: (:obj:`str`) command description
    description = "show field information for the nexus file"
    #: (:obj:`str`) command epilog
    epilog = "" \
        + " examples:\n" \
        + "       nxsfileinfo field /user/data/myfile.nxs\n" \
        + "       nxsfileinfo field /user/data/myfile.nxs -g\n" \
        + "       nxsfileinfo field /user/data/myfile.nxs -s\n" \
        + "\n"
[docs]    def create(self):
        """ creates parser
        """
        self._parser.add_argument(
            "-c", "--columns",
            help="names of columns to be shown (separated by commas "
                 "without spaces). The possible names are: "
                 "depends_on, dtype, full_path, nexus_path, nexus_type, "
                 "shape, source, source_name, source_type, strategy, "
                 "trans_type, trans_offset, trans_vector, units, value",
            dest="headers", default="")
        self._parser.add_argument(
            "-f", "--filters",
            help="full_path filters (separated by commas "
                 "without spaces). Default: '*'. E.g. '*:NXsample/*'",
            dest="filters", default="")
        self._parser.add_argument(
            "-v", "--values",
            help="field names whose values should be stored "
                 "(separated by commas without spaces). "
                 "Default: depends_on",
            dest="values", default="")
        self._parser.add_argument(
            "-g", "--geometry", action="store_true",
            default=False, dest="geometry",
            help="perform geometry full_path filters, i.e. "
                 "*:NXtransformations/*,*/depends_on. "
                 "It works only when -f is not defined")
        self._parser.add_argument(
            "-s", "--source", action="store_true",
            default=False, dest="source",
            help="show datasource parameters")
        self._parser.add_argument(
            "--h5py", action="store_true",
            default=False, dest="h5py",
            help="use h5py module as a nexus reader")
        self._parser.add_argument(
            "--h5cpp", action="store_true",
            default=False, dest="h5cpp",
            help="use h5cpp module as a nexus reader")
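    # Editorial sketch: the -f filters are shell-style wildcard patterns
    # matched against "full_path" strings. The exact matching is done by
    # NXSFileParser; the snippet below only illustrates the pattern syntax
    # with fnmatch, using made-up paths (assumption: filters behave like
    # shell wildcards).
    #
    #     import fnmatch
    #
    #     paths = [
    #         "/entry:NXentry/sample:NXsample/name",
    #         "/entry:NXentry/instrument:NXinstrument/detector/data",
    #     ]
    #     matched = [p for p in paths
    #                if fnmatch.fnmatch(p, "*:NXsample/*")]
    #     # -> ["/entry:NXentry/sample:NXsample/name"]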
[docs]    def postauto(self):
        """ parser creator after autocomplete run """
        self._parser.add_argument(
            'args', metavar='nexus_file', type=str, nargs=1,
            help='nexus file name')
[docs]    def run(self, options):
        """ the main program function

        :param options: parser options
        :type options: :class:`argparse.Namespace`
        """
        # select the nexus reader backend
        if options.h5cpp:
            writer = "h5cpp"
        elif options.h5py:
            writer = "h5py"
        elif "h5cpp" in WRITERS.keys():
            writer = "h5cpp"
        else:
            writer = "h5py"
        if (options.h5cpp and options.h5py) or \
                writer not in WRITERS.keys():
            sys.stderr.write("nxsfileinfo: Writer '%s' cannot be opened\n"
                             % writer)
            sys.stderr.flush()
            self._parser.print_help()
            sys.exit(255)
        wrmodule = WRITERS[writer.lower()]
        try:
            fl = filewriter.open_file(
                options.args[0], readonly=True,
                writer=wrmodule)
        except Exception:
            sys.stderr.write("nxsfileinfo: File '%s' cannot be opened\n"
                             % options.args[0])
            sys.stderr.flush()
            self._parser.print_help()
            sys.exit(255)
        root = fl.root()
        self.show(root, options)
        fl.close()
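    # Editorial sketch of the reader selection performed in run(): the same
    # file can be opened programmatically with a chosen writer module
    # (assuming the h5py backend is installed; the file name is
    # hypothetical).
    #
    #     from nxstools import filewriter, h5pywriter
    #
    #     fl = filewriter.open_file(
    #         "/user/data/myfile.nxs", readonly=True, writer=h5pywriter)
    #     root = fl.root()
    #     # ... inspect the tree, e.g. via NXSFileParser(root)
    #     fl.close()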
[docs]    def show(self, root, options):
        """ the main function

        :param root: nexus file root
        :type root: :class:`filewriter.FTGroup`
        :param options: parser options
        :type options: :class:`argparse.Namespace`
        """
        #: (:obj:`list`<:obj:`str`>) \
        #:     parameters which have to exist to be shown
        toshow = None
        #: (:obj:`list`<:obj:`str`>) full_path filters
        filters = []
        #: (:obj:`list`<:obj:`str`>) column headers
        headers = ["nexus_path", "source_name", "units",
                   "dtype", "shape", "value"]
        if options.geometry:
            filters = ["*:NXtransformations/*", "*/depends_on"]
            headers = ["nexus_path", "source_name", "units",
                       "trans_type", "trans_vector", "trans_offset",
                       "depends_on"]
        if options.source:
            headers = ["source_name", "nexus_type", "shape",
                       "strategy", "source"]
            toshow = ["source_name"]
        #: (:obj:`list`<:obj:`str`>) field names whose values
        #:     should be stored
        values = ["depends_on"]

        if options.headers:
            headers = options.headers.split(',')
        if options.filters:
            filters = options.filters.split(',')
        if options.values:
            values = options.values.split(',')

        nxsparser = NXSFileParser(root)
        nxsparser.filters = filters
        nxsparser.valuestostore = values
        nxsparser.parse()

        description = []
        ttools = TableTools(nxsparser.description, toshow)
        ttools.title = "File name: '%s'" % options.args[0]
        ttools.headers = headers
        description.extend(ttools.generateList())
        print("\n".join(description))
[docs]def main():
    """ the main program function
    """
    description = "Command-line tool for showing meta data" \
                  + " from Nexus Files"

    epilog = 'For more help:\n  nxsfileinfo <sub-command> -h'
    parser = NXSArgParser(
        description=description, epilog=epilog,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.cmdrunners = [
        ('field', Field),
        ('general', General),
        ('metadata', Metadata),
        ('groupmetadata', GroupMetadata),
        ('origdatablock', OrigDatablock),
        ('sample', Sample),
        ('instrument', Instrument),
        ('attachment', Attachment),
    ]
    runners = parser.createSubParsers()

    try:
        options = parser.parse_args()
    except ErrorException as e:
        sys.stderr.write("Error: %s\n" % str(e))
        sys.stderr.flush()
        parser.print_help()
        print("")
        sys.exit(255)

    if options.subparser is None:
        sys.stderr.write(
            "Error: too few arguments\n")
        sys.stderr.flush()
        parser.print_help()
        print("")
        sys.exit(255)

    result = runners[options.subparser].run(options)

    if result and str(result).strip():
        print(result)
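# Editorial examples of typical invocations (file paths hypothetical),
# collected from the sub-command epilogs above:
#
#     $ nxsfileinfo general /user/data/myfile.nxs
#     $ nxsfileinfo field /user/data/myfile.nxs -g
#     $ nxsfileinfo instrument -p /petra3/p00 -n P00 -o instrument.json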
if __name__ == "__main__":
    main()