# Source code for pyretis.core.pathensemble

# -*- coding: utf-8 -*-
# Copyright (c) 2019, PyRETIS Development Team.
# Distributed under the LGPLv2.1+ License. See LICENSE for more info.
"""Classes and functions for path ensembles.

The classes and functions defined in this module are useful for
representing path ensembles.

Important classes defined here
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

PathEnsemble (:py:class:`.PathEnsemble`)
    Class for defining path ensembles.

PathEnsembleExt (:py:class:`.PathEnsembleExt`)
    Class for defining path ensembles when we are working with
    paths stored on disk and not in memory only.
"""
import collections
import logging
import os
import shutil
import tarfile
from pyretis.inout.common import create_backup


# Module-level logger. A NullHandler is attached so that this module can
# log without requiring the application to configure logging first.
logger = logging.getLogger(__name__)  # pylint: disable=C0103
logger.addHandler(logging.NullHandler())


# Names exported by ``from pyretis.core.pathensemble import *``.
__all__ = ['PathEnsemble', 'PathEnsembleExt']


# Format applied to the integer ensemble number; it zero-pads the number
# to (at least) three digits, e.g. 1 -> '001'.
PATH_DIR_FMT = '{:03d}'  # For naming path ensemble (and its output dir).


def _generate_file_names(path, target_dir, prefix=None):
    """Generate new file names for moving copying paths.

    Parameters
    ----------
    path : object like :py:class:`.PathBase`
        This is the path object we are going to store.
    target_dir : string
        The location were we are moving the path to.
    prefix : string or None
        The prefix can be used to prefix the name of the files.

    Returns
    -------
    out[0] : list
        A list with new file names.
    out[1] : dict
        A dict which defines the unique "source -> destination" for
        copy/move operations.

    """
    source = {}
    new_pos = [None for _ in range(len(path.pos))]
    for i, phasepoint in enumerate(path.trajectory(reverse=False)):
        pos_file, idx = phasepoint['pos']
        if pos_file not in source:
            localfile = os.path.basename(pos_file)
            if prefix is not None:
                localfile = '{}{}'.format(prefix, localfile)
            dest = os.path.join(target_dir, localfile)
            source[pos_file] = dest
        dest = source[pos_file]
        new_pos[i] = (dest, idx)
    return new_pos, source


class PathEnsemble:
    """Representation of a path ensemble.

    This class represents a collection of `Paths` in a path ensemble.
    In general paths may be 'long and complicated' so here, we really
    just store a simplified abstraction of the path, which is obtained
    by the `Path.get_path_data()` function for a given `Path` object.
    The returned dictionary is stored in the list `PathEnsemble.paths`.
    The only full path we store, is the last accepted path. This is
    convenient for the RETIS method where paths may be swapped between
    path ensembles.

    Attributes
    ----------
    ensemble : integer
        This integer is used to represent the path ensemble, for retis
        simulations it's useful to identify the path ensemble. The path
        ensembles are numbered sequentially 0, 1, 2, etc. This
        corresponds to ``[0^-]``, ``[0^+]``, ``[1^+]``, etc.
    ensemble_name : string
        A string which can be used for printing the ensemble name.
    ensemble_name_simple : string
        A string with a simpler representation of the ensemble name,
        can be used for creating output files etc.
    start_condition : string
        Set to ``'R'`` for ensemble 0 and to ``'L'`` for all other
        ensembles.
    interfaces : tuple of floats
        Interfaces, specified with the values for the
        order parameters: `[left, middle, right]`.
    detect : float
        Interface to use for analysis.
    paths : list
        This list contains the stored information for the paths. Here
        we only store the data returned by calling the `get_path_data()`
        function of the `Path` object.
    nstats : dict of ints
        This dict store some statistics for the path ensemble. The keys
        are:

        * npath : The number of paths stored.
        * nshoot : The number of accepted paths generated by shooting.
        * ACC, BWI, ... : Number of paths with given status (from
          `_STATUS`).
    maxpath : int
        The maximum number of paths to store.
    last_path : object like :py:class:`.PathBase`
        This is the last **accepted** path.
    directory : OrderedDict
        The directories used by the path ensemble, with the keys
        ``'path-ensemble'``, ``'accepted'``, ``'generate'`` and
        ``'traj'``. The values are None until a base directory is set.

    """

    def __init__(self, ensemble, interfaces, detect=None, maxpath=10000,
                 exe_dir=None):
        """Initialise the PathEnsemble object.

        Parameters
        ----------
        ensemble : integer
            An integer used to identify the ensemble.
        interfaces : list of floats
            These are the interfaces specified with the values
            for the order parameters: [left, middle, right]
        detect : float, optional
            The interface used for detecting successful path in the
            analysis.
        maxpath : integer, optional
            The maximum number of paths to store information for in
            memory. Note, that this will not influence the analysis as
            long as you are using the output files when running the
            analysis.
        exe_dir : string, optional
            The base folder where the simulation was executed from.
            This is used to set up output directories for the path
            ensemble.

        """
        self.ensemble = ensemble
        self.interfaces = tuple(interfaces)  # Should not change interfaces
        self.detect = detect  # detect interface to use for analysis
        self.last_path = None
        self.nstats = {'npath': 0, 'nshoot': 0, 'ACC': 0}
        self.paths = []
        self.maxpath = maxpath
        if self.ensemble == 0:
            self.ensemble_name = '[0^-]'
            self.start_condition = 'R'
        else:
            self.ensemble_name = '[{}^+]'.format(self.ensemble - 1)
            self.start_condition = 'L'
        self.ensemble_name_simple = PATH_DIR_FMT.format(self.ensemble)
        self.directory = collections.OrderedDict()
        self.directory['path-ensemble'] = None
        self.directory['accepted'] = None
        self.directory['generate'] = None
        self.directory['traj'] = None
        if exe_dir is not None:
            path_dir = os.path.join(exe_dir, self.ensemble_name_simple)
            self.update_directories(path_dir)

    def directories(self):
        """Yield the directories PyRETIS should make."""
        yield self.directory['path-ensemble']

    def update_directories(self, path):
        """Update directory names.

        This method will not create new directories, but it will
        update the directory names.

        Parameters
        ----------
        path : string
            The base path to set.

        """
        for key, val in self.directory.items():
            if key == 'path-ensemble':
                self.directory[key] = path
            else:
                self.directory[key] = os.path.join(path, key)
            if val is None:
                logger.debug('Setting directory "%s" to %s', key,
                             self.directory[key])
            else:
                logger.debug('Updating directory "%s": %s -> %s',
                             key, val, self.directory[key])

    def reset_data(self):
        """Erase the stored data in the path ensemble.

        It can be used in combination with flushing the data to a
        file in order to periodically write and empty the amount of data
        stored in memory.

        Notes
        -----
        We do not reset `self.last_path` as this might be used in the
        RETIS function.

        """
        self.paths = []
        for key in self.nstats:
            self.nstats[key] = 0

    def store_path(self, path):
        """Store a new accepted path in the path ensemble.

        Parameters
        ----------
        path : object like :py:class:`.PathBase`
            The path we are going to store.

        Returns
        -------
        None, but we update self.last_path

        """
        self.last_path = path

    def add_path_data(self, path, status, cycle=0):
        """Append data from the given path to `self.paths`.

        This will add the data from a given `path` to the list of path
        data for this ensemble. It will also update `self.last_path`
        if the given `path` is accepted.

        Parameters
        ----------
        path : object like :py:class:`.PathBase`
            This is the object to store data from. It may be None
            if no path could be generated.
        status : string
            This is the status of the path. Note that the path object
            also have a status property. However this one might not be
            set, for instance when path is just None. We therefore use
            `status` here as a parameter.
        cycle : int, optional
            The current cycle number

        """
        if len(self.paths) >= self.maxpath:
            # This is just to limit the data we keep in memory in
            # case of really long simulations.
            logger.debug(('Path-data memory storage reset for ensemble %s.\n'
                          'This is just to limit the amount of data we store '
                          'in memory.\nThis will *NOT* influence the '
                          'simulation'), self.ensemble_name)
            self.paths = []
        if path is None:
            # Here we add a dummy path with minimal info. This is because we
            # could not generate a path for some reason which should be
            # specified by the status.
            path_data = {'status': status, 'generated': ('', 0, 0, 0)}
        else:
            path_data = path.get_path_data(status, self.interfaces)
            if path_data['status'] == 'ACC':  # store the path
                self.store_path(path)
                if path_data['generated'][0] == 'sh':
                    self.nstats['nshoot'] += 1
        path_data['cycle'] = cycle  # also store cycle number
        self.paths.append(path_data)  # store the new data
        # Update statistics; a status seen for the first time starts at 0.
        self.nstats[status] = self.nstats.get(status, 0) + 1
        self.nstats['npath'] += 1

    def get_accepted(self):
        """Yield accepted paths from the PathEnsemble.

        This function will give an iterator useful for iterating over
        accepted paths only. In the PathEnsemble we store both accepted
        and rejected paths. This function will loop over all paths
        stored and yield the accepted paths the correct number of
        times: for each rejected path, the most recent accepted path is
        yielded again.

        Yields
        ------
        out : dict or None
            The data for the most recent accepted path. This is None
            for stored paths preceding the first accepted one.

        """
        last_path = None
        for path in self.paths:
            if path['status'] == 'ACC':
                last_path = path
            yield last_path

    def get_acceptance_rate(self):
        """Return acceptance rate for the path ensemble.

        The acceptance rate is obtained as the fraction of accepted
        paths to the total number of paths in the path ensemble. This
        will only consider the paths that are currently stored in
        `self.paths`.

        Returns
        -------
        out : float
            The acceptance rate. If no paths are currently stored,
            0.0 is returned.

        """
        npath = len(self.paths)
        if npath == 0:
            # Nothing stored (e.g. just after reset_data()): avoid a
            # division by zero.
            return 0.0
        acc = sum(1 for path in self.paths if path['status'] == 'ACC')
        return float(acc) / float(npath)

    def get_paths(self):
        """Yield the different paths stored in the path ensemble.

        It is included here in order to have a simple compatibility
        between the `PathEnsemble` object and the `PathEnsembleFile`
        object defined in `pyretis.inout`. This is useful for the
        analysis.

        Yields
        ------
        out : dict
            This is the dictionary representing the path data.

        """
        for path in self.paths:
            yield path

    def move_path_to_generated(self, path, prefix=None):
        """Move a path for temporary storing.

        This is a no-op for paths that are kept in memory only.
        """

    def __str__(self):
        """Return a string with some info about the path ensemble."""
        msg = ['Path ensemble: {}'.format(self.ensemble_name)]
        msg += ['\tInterfaces: {}'.format(self.interfaces)]
        if self.detect is not None:
            msg += ['\tDetect: {}'.format(self.detect)]
        if self.nstats['npath'] > 0:
            npath = self.nstats['npath']
            nacc = self.nstats.get('ACC', 0)
            msg += ['\tNumber of paths stored: {}'.format(npath)]
            msg += ['\tNumber of accepted paths: {}'.format(nacc)]
            ratio = float(nacc) / float(npath)
            msg += ['\tRatio accepted/total paths: {}'.format(ratio)]
        return '\n'.join(msg)

    def restart_info(self):
        """Return a dictionary with restart information.

        Returns
        -------
        out : dict
            The restart information. The key ``'last_path'`` is only
            present when there is a last accepted path.

        """
        restart = {
            # Copy, so that later updates of the statistics do not
            # alter the returned restart information.
            'nstats': dict(self.nstats),
            'interfaces': self.interfaces,
            'detect': self.detect,
            'ensemble': self.ensemble,
        }
        if self.last_path:
            restart['last_path'] = self.last_path.restart_info()
        return restart

    def load_restart_info(self, path, info, cycle=0):
        """Load restart information.

        Parameters
        ----------
        path : object like :py:class:`.PathBase`
            A object we can load the stored path into.
        info : dict
            A dictionary with the restart information.
        cycle : integer
            The current simulation cycle.

        """
        # Copy, so that we do not modify the caller's dictionary when
        # the statistics are updated later.
        self.nstats = dict(info['nstats'])
        for attr in ('interfaces', 'detect', 'ensemble'):
            if info[attr] != getattr(self, attr):
                logger.warning('Inconsistent ensemble restart info for %s',
                               attr)
        last_path_info = info.get('last_path')
        if last_path_info is None:
            # restart_info() leaves out 'last_path' when there is no
            # accepted path, so there is nothing more to load.
            logger.warning('No last path in restart info for ensemble %s',
                           self.ensemble_name)
            return
        path.load_restart_info(last_path_info)
        path_data = path.get_path_data('ACC', self.interfaces)
        path_data['cycle'] = cycle
        self.last_path = path
        self.paths.append(path_data)
class PathEnsembleExt(PathEnsemble):
    """Representation of a path ensemble.

    This class is similar to :py:class:`.PathEnsemble` but it is made
    to work with external paths. That is, some extra file handling is
    done when accepting a path.

    """

    def __init__(self, ensemble, interfaces, detect=None, maxpath=10000,
                 exe_dir=None):
        """Initialise the PathEnsembleExt object.

        Parameters
        ----------
        ensemble : integer
            An integer used to identify the ensemble.
        interfaces : list of floats
            These are the interfaces specified with the values
            for the order parameters: [left, middle, right]
        detect : float, optional
            Passed on to :py:class:`.PathEnsemble`.
        maxpath : integer, optional
            Passed on to :py:class:`.PathEnsemble`.
        exe_dir : string, optional
            Passed on to :py:class:`.PathEnsemble`. If it is not given,
            the directories (and the trajectory archive name) remain
            unset until `update_directories` is called.

        """
        # This has to exist before the parent initialiser runs, since
        # the parent may call update_directories(), which sets it.
        self._traj_file = None
        super().__init__(ensemble, interfaces, detect=detect,
                         maxpath=maxpath, exe_dir=exe_dir)

    def update_directories(self, path):
        """Update directory names and the trajectory archive name.

        This method will not create new directories.

        Parameters
        ----------
        path : string
            The base path to set.

        """
        super().update_directories(path)
        # Keep the archive location in sync with the 'traj' directory.
        self._traj_file = os.path.join(self.directory['traj'], 'traj.tar')

    def directories(self):
        """Yield the directories PyRETIS should make."""
        for key in self.directory:
            yield self.directory[key]

    @staticmethod
    def _move_path(path, target_dir, prefix=None):
        """Move a path to a given target directory.

        The file references in `path.pos` are updated to the new
        locations. An existing regular file at a destination is removed
        before the move.

        Parameters
        ----------
        path : object like :py:class:`.PathBase`
            This is the path object we are going to store.
        target_dir : string
            The location where we are moving the path to.
        prefix : string or None
            To give a prefix to the name of moved files.

        """
        logger.debug('Moving path to %s', target_dir)
        new_pos, source = _generate_file_names(path, target_dir,
                                               prefix=prefix)
        path.pos = new_pos
        for src, dest in source.items():
            if src == dest:
                logger.debug('Skipping move %s -> %s', src, dest)
                continue
            if os.path.exists(dest) and os.path.isfile(dest):
                logger.debug('Removing %s as it exists', dest)
                os.remove(dest)
            logger.debug('Moving %s -> %s', src, dest)
            shutil.move(src, dest)

    @staticmethod
    def _copy_path(path, target_dir, prefix=None):
        """Copy a path to a given target directory.

        Parameters
        ----------
        path : object like :py:class:`.PathBase`
            This is the path object we are going to store.
        target_dir : string
            The location where we are copying the path to.
        prefix : string or None
            To give a prefix to the name of copied files.

        Returns
        -------
        out : object like py:class:`.PathBase`
            A copy of the input path.

        """
        new_pos, source = _generate_file_names(path, target_dir,
                                               prefix=prefix)
        path_copy = path.copy_path()
        path_copy.pos = new_pos
        for src, dest in source.items():
            if src == dest:
                # As in _move_path: the file is already in place, and
                # shutil.copy would raise SameFileError.
                logger.debug('Skipping copy %s -> %s', src, dest)
                continue
            shutil.copy(src, dest)
        return path_copy

    def store_path(self, path):
        """Store a path by explicitly moving it.

        The files of the path are moved to the "accepted" directory
        and files there which are not part of the path are removed.

        Parameters
        ----------
        path : object like :py:class:`.PathBase`
            This is the path object we are going to store.

        """
        self._move_path(path, self.directory['accepted'])
        self.last_path = path
        for entry in self.list_superfluous():
            try:
                os.remove(entry)
            except OSError as error:  # pragma: no cover
                # The clean-up is best effort, but leave a trace.
                logger.debug('Could not remove "%s": %s', entry, error)

    def list_superfluous(self):
        """List files in accepted directory that we do not need.

        Yields
        ------
        out : string
            The path to a file in the "accepted" directory which is not
            referenced by the last accepted path.

        """
        last = set()
        if self.last_path:
            for phasepoint in self.last_path.trajectory(reverse=False):
                pos_file, _ = phasepoint['pos']
                last.add(pos_file)
        for entry in os.scandir(self.directory['accepted']):
            if entry.is_file() and entry.path not in last:
                yield entry.path

    def move_path_to_generated(self, path, prefix=None):
        """Move a path for temporary storing."""
        self._move_path(path, self.directory['generate'], prefix=prefix)

    def _add_to_archive(self, mode, source):
        """Add the source files to the trajectory archive.

        Parameters
        ----------
        mode : string
            The mode used for opening the archive ('a' or 'w').
        source : dict
            The "source -> destination" mapping. Each source file is
            stored using the base name of its destination.

        """
        with tarfile.open(self._traj_file, mode) as tar:
            for src, dest in source.items():
                tar.add(src, arcname=os.path.basename(dest))

    def generate_output(self, cycle, path):
        """Output a trajectory by adding it to a tar file.

        This method handles the "physical" output. The directories of
        the path ensemble must have been set before it is used.

        Parameters
        ----------
        cycle : dict
            The current cycle number dictionary as obtained from the
            simulation object. This is used to generate a unique name
            for the output file.
        path : object like :py:class:`.PathBase`
            The path to output.

        Returns
        -------
        path_copy : object like :py:class:`.PathBase`
            A path like the input `path`, but with updated file names.

        """
        new_pos, source = _generate_file_names(
            path,
            self.directory['traj'],
            prefix='{}_'.format(cycle['step'])
        )
        path_copy = path.copy_path()
        path_copy.pos = new_pos
        try:
            self._add_to_archive('a', source)
        except tarfile.ReadError:
            # The existing archive could not be read: keep it as a
            # backup and start a new one.
            logger.warning('Could not open trajectory: "%s"',
                           self._traj_file)
            logger.info('Will backup and create new file.')
            logtxt = create_backup(self._traj_file)
            logger.info(logtxt)
            self._add_to_archive('w', source)
        except OSError:  # pragma: no cover
            logger.warning(
                'Could not find trajectory: "%s". Will not write.',
                self._traj_file
            )
        return path_copy

    def load_restart_info(self, path, info, cycle=0):
        """Load restart for external path.

        After loading, the file references of the path are changed to
        point to files with the same base names in the "accepted"
        directory. Missing files are reported, but not treated as
        fatal here.

        Parameters
        ----------
        path : object like :py:class:`.PathBase`
            A object we can load the stored path into.
        info : dict
            A dictionary with the restart information.
        cycle : integer
            The current simulation cycle.

        """
        super().load_restart_info(path, info, cycle=cycle)
        # Update file names:
        directory = self.directory['accepted']
        new_pos = []
        for pos in path.pos:
            filename = os.path.basename(pos[0])
            new_file_name = os.path.join(directory, filename)
            if not os.path.isfile(new_file_name):
                logger.critical('The restart path "%s" does not exist',
                                new_file_name)
            new_pos.append((new_file_name, pos[1]))
        path.pos = new_pos
def get_path_ensemble_class(ensemble_type):
    """Return the path ensemble class for the given ensemble type.

    Parameters
    ----------
    ensemble_type : string
        The type of ensemble we are requesting. The recognised values
        are ``'internal'`` and ``'external'``.

    Returns
    -------
    out : class
        :py:class:`.PathEnsemble` for ``'internal'`` and
        :py:class:`.PathEnsembleExt` for ``'external'``.

    Raises
    ------
    ValueError
        If the requested ensemble type is not recognised.

    """
    known_types = {
        'internal': PathEnsemble,
        'external': PathEnsembleExt,
    }
    try:
        ensemble_class = known_types[ensemble_type]
    except KeyError:
        msg = 'Unknown ensemble type "{}" requested.'.format(ensemble_type)
        logger.critical(msg)
        raise ValueError(msg)
    return ensemble_class