Source code for pyretis.inout.common

# -*- coding: utf-8 -*-
# Copyright (c) 2021, PyRETIS Development Team.
# Distributed under the LGPLv2.1+ License. See LICENSE for more info.
"""This file contains common functions for the input/output.

It contains some slave functions that are used in the in/output function

Important classes defined here

OutputBase (:py:class:`.OutputBase`)
    A base class for handling the output.

Important methods defined here

check_python_version (:py:func:`.check_python_version`)
    A method that will give warnings when we use older and possibly
    unsupported Python versions.

create_backup (:py:func:`.create_backup`)
    A function to handle the creation of backups of old files.

make_dirs (:py:func:`.make_dirs`)
    Create directories (for path simulation).

create_empty_ensembles (:py:func:`.create_ensembles`)
    A method to prepare the ensembles inputs in settings

simplify_ensemble_name (:py:func:`.simplify_ensemble_name`)
    Simplify the name of ensembles for creating directories.

generate_file_name (:py:func:`.generate_file_name`)
    Generate file name for an output task, from settings.

import logging
import errno
import os
import re
import sys
from abc import ABCMeta, abstractmethod
logger = logging.getLogger(__name__)  # pylint: disable=invalid-name

__all__ = [

# Hard-coded patters for energy analysis output files.
# These are just used to make it simpler to change these default
# names in the future.
ENERFILES = {'energies': 'energies',
             'run_energies': 'runenergies',
             'temperature': 'temperature',
             'run_temp': 'runtemperature',
             'block': '{}block',
             'dist': '{}dist'}
# hard-coded information for the energy terms:
ENERTITLE = {'vpot': 'Potential energy',
             'ekin': 'Kinetic energy',
             'etot': 'Total energy',
             'ham': 'Hamilt. energy',
             'temp': 'Temperature',
             'elec': 'Energy (externally computed)'}
# hard-coded patters for flux analysis output files:
FLUXFILES = {'runflux': 'runflux_{}',
             'block': 'errflux_{}'}
# order files:
ORDERFILES = {'order': 'order',
              'ordervel': 'orderv',
              'run_order': 'runorder',
              'dist': 'orderdist',
              'block': 'ordererror',
              'msd': 'ordermsd'}
# hard-coded patters for path analysis output files:
PATHFILES = {'pcross': '{}_pcross',
             'prun': '{}_prun',
             'perror': '{}_perror',
             'lengtherror': '{}_lerror',
             'pathlength': '{}_lpath',
             'shoots': '{}_shoots',
             'shoots_scaled': '{}_shoots_scaled'}
# hard-coded patterns for matched files:
PATH_MATCH = {'total': 'total-probability',
              'progress': 'overall-rrun',
              'error': 'overall-err',
              'match': 'matched-probability'}

[docs]def create_backup(outputfile): """Check if a file exist and create backup if requested. This function will check if the given file name exists and if it does, it will move that file to a new file name such that the given one can be used without overwriting. Parameters ---------- outputfile : string This is the name of the file we wish to create. Returns ------- out : string This string is None if no backup is made, otherwise, it will just say what file was moved (and to where). Note ---- No warning is issued here. This is just in case the `msg` returned here will be part of some more elaborate message. """ filename = str(outputfile) fileid = 0 msg = None while os.path.isfile(filename) or os.path.isdir(filename): filename = f'{outputfile}_{fileid:03d}' fileid += 1 if fileid > 0: msg = f'Backup existing file "{outputfile}" to "{filename}"' os.rename(outputfile, filename) return msg
[docs]def _remove_extension(filename): """Remove the extension of a given file name. Parameters ---------- filename : string The file name to check. Returns ------- out : string The filename with the extension removed. """ try: return os.path.splitext(filename)[0] except IndexError: # pragma: no cover return filename
[docs]def make_dirs(dirname): """Create directories for path simulations. This function will create a folder using a specified path. If the path already exists and if it's a directory, we will do nothing. If the path exists and is a file we will raise an `OSError` exception here. Parameters ---------- dirname : string This is the directory to create. Returns ------- out : string A string with some info on what this function did. Intended for output. """ try: os.makedirs(dirname) msg = f'Created directory: "{dirname}"' except OSError as err: if err.errno != errno.EEXIST: # pragma: no cover raise err if os.path.isfile(dirname): msg = f'"{dirname}" is a file. Will abort!' raise OSError(errno.EEXIST, msg) from err if os.path.isdir(dirname): msg = f'Directory "{dirname}" already exist.' return msg
[docs]def simplify_ensemble_name(ensemble, fmt='{:03d}'): """Simplify path names for file/directory names. Here, we are basically translating ensemble names to more friendly names for directories and files that is: - ``[0^-]`` returns ``000``, - ``[0^+]`` returns ``001``, - ``[1^+]`` returns ``002``, etc. Parameters ---------- ensemble : string This is the string to simplify. fmt : string. optional This is a format to use for the directories. """ match_ensemble ='(?<=\[)(\d+)(?=\^)', ensemble) if match_ensemble: ens = int( else: match_ensemble ='(?<=\[)(\d+)(?=\])', ensemble) if match_ensemble: ens = int( else: return ensemble # Assume that the ensemble is OK as it is. match_dir ='(?<=\^)(.)(?=\])', ensemble) if match_dir: dire = if dire != '-': ens += 1 else: msg = [f'Could not get direction for ensemble {ensemble}.', 'We assume "+", note that this might overwrite files'] logger.warning('\n'.join(msg)) ens += 1 return fmt.format(ens)
def add_dirname(filename, dirname): """Add a directory as a prefix to a filename, i.e. `dirname/filename`. Parameters ---------- filename : string The filename. dirname : string The directory we want to prefix. It can be None, in which case we ignore it. Returns ------- out : string The path to the resulting file. """ if dirname is not None: return os.path.join(dirname, filename) return filename def name_file(name, extension, path=None): """Return a file name by joining a name and an file extension. This function is used to create file names. It will use `os.extsep` to create the file names and `os.path.join` to add a path name if the `path` is given. The returned file name will be of form (example for posix): ``path/name.extension``. Parameters ---------- name : string This is the name, without extension, for the file. extension : string The extension to use for the file name. path : string, optional An optional path to add to the file name. Returns ------- out : string The resulting file name. """ return add_dirname(os.extsep.join([name, extension]), path) def generate_file_name(basename, directory, settings): """Generate file name for an output task, from settings. Parameters ---------- basename : string The base file name to use. directory : string A directory to output to. Can be None to output to the current working directory. settings : dict The input settings Returns ------- filename : string The file name to use. """ prefix = settings['output'].get('prefix', None) if prefix is not None: filename = f'{prefix}{basename}' else: filename = basename filename = add_dirname(filename, directory) return filename
[docs]def check_python_version(): # pragma: no cover """Give a warning about old python version(s).""" pyversion = sys.version.split()[0] if sys.version_info < (3, 0): msgtxt = ('Please upgrade to Python 3.' f'\nPython {pyversion} is not supported!') logger.error(msgtxt) raise SystemExit(msgtxt)
[docs]class OutputBase(metaclass=ABCMeta): """A generic class for handling output. Attributes ---------- formatter : object like :py:class:`.OutputFormatter` The object responsible for formatting output. first_write : boolean Determines if we have written something yet, or if this is the first write. """ target = None """string or None : Determines the target for the output, for instance "screen" or "file"."""
[docs] def __init__(self, formatter): """Create the object and attach a formatter. Parameters ---------- formatter : object like :py:class:`.OutputFormatter` The object responsible for formatting output. """ self.formatter = formatter self.first_write = True
[docs] def output(self, step, data): """Use the formatter to write data to the file. Parameters ---------- step : int The current step number. data : list The data we are going to output. """ if self.first_write and self.formatter.print_header: self.first_write = False self.write(self.formatter.header) for line in self.formatter.format(step, data): self.write(line)
[docs] @abstractmethod def write(self, towrite, end='\n'): """Write a string to the output defined by this class. Parameters ---------- towrite : string The string to write. end : string, optional A "terminator" for the given string. Returns ------- status : boolean True if we managed to write, False otherwise. """ return
[docs] def formatter_info(self): """Return a string with info about the formatter.""" if self.formatter is not None: return self.formatter.__class__ return None
[docs] def __str__(self): """Return basic info.""" return f'{self.__class__.__name__}\n\t* Formatter: {self.formatter}'
[docs]def create_empty_ensembles(settings): """Create missing ensembles in the settings. Checks the input and allocate it to the right ensemble. In theory inouts shall include all these info, but it is not practical. Parameters ---------- settings : dict The current input settings. Returns ------- None, but this method might add data to the input settings. """ ints = settings['simulation']['interfaces'] # Determine how many ensembles are needed. # (In PyRETIS 2 the flux and zero ensemble are always considered) add0 = 1 # if some ensembles have inputs, they need to be kept. if 'ensemble' in settings: orig_set = settings['ensemble'].copy() else: orig_set = [] settings['ensemble'] = [] add = add0 # if in the main settings an ensemble_number is defined, then only # that ensemble will be considered. if 'tis' in settings: idx = settings['tis'].get('ensemble_number') if idx is not None: settings['ensemble'].append({'interface': ints[1], 'tis': {'ensemble_number': idx}}) for sav in orig_set: settings['ensemble'][0] = {**settings['ensemble'][0], **sav} return # if one wants to compute the flux, the 000 ensemble is for it # todo remove this labelling mismatch, and give to the flux # a flux name folder (instead of 000), and leave 000 for the O^+ ens. settings['ensemble'].append({'interface': ints[0], 'tis': {'ensemble_number': 0}}) for i in range(add, len(ints)): settings['ensemble'].append({'interface': ints[i - 1], 'tis': {'ensemble_number': i}}) # create the ensembles in setting, keeping eventual inputs. # nb. in the settings, specific input for an ensemble can be now given. for i_ens, ens in enumerate(settings['ensemble']): for sav in orig_set: if 'tis' in sav and 'ensemble_number' in sav['tis']: if ens['tis']['ensemble_number'] ==\ sav['tis']['ensemble_number']: settings['ensemble'][i_ens].update(sav) elif ens['interface'] == sav['interface']: settings['ensemble'][i_ens].update(sav) return