Source code for pyretis.core.path

# -*- coding: utf-8 -*-
# Copyright (c) 2023, PyRETIS Development Team.
# Distributed under the LGPLv2.1+ License. See LICENSE for more info.
"""Classes and functions for paths.

The classes and functions defined in this module are useful for
representing paths.


Important classes defined here
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

PathBase (:py:class:`.PathBase`)
    A base class for paths.

Path (:py:class:`.Path`)
    Class for a generic path that stores all possible information.

Important methods defined here
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

paste_paths  (:py:func:`.paste_paths`)
    Function for joining two paths, one is in a backward time
    direction and the other is in the forward time direction.
"""
from abc import abstractmethod

import logging
import numpy as np
from pyretis.core.random_gen import create_random_generator
from pyretis.core.system import System

logger = logging.getLogger(__name__)  # pylint: disable=invalid-name
logger.addHandler(logging.NullHandler())


__all__ = ['PathBase', 'Path', 'paste_paths', 'check_crossing']

# The following defines a human-readable form of the possible path status:
_STATUS = {
    'ACC': 'The path has been accepted',
    'MCR': 'Momenta change rejection',
    'BWI': 'Backward trajectory end at wrong interface',
    'BTL': 'Backward trajectory too long (detailed balance condition)',
    'BTX': 'Backward trajectory too long (max-path exceeded)',
    'BTS': 'Backward trajectory too short',
    'EWI': 'Initial path ends at wrong interface',
    'EXP': 'Exploration path',
    'FTL': 'Forward trajectory too long (detailed balance condition)',
    'FTX': 'Forward trajectory too long (max-path exceeded)',
    'FTS': 'Forward trajectory too short',
    'HAS': 'High Acceptance Swap rejection for SS/WF detailed balance',
    'KOB': 'Kicked outside of boundaries',
    'NCR': 'No crossing with middle interface',
    'NSG': 'Path has no suitable segments',
    'NSS': 'No one-step crossing in stone skipping',
    'SSA': 'Stone Skipping super detailed balance rejection',
    'WFA': 'Wire Fencing super detailed balance rejection',
    'SWI': 'Initial path starts at wrong interface',
    'WTA': 'Web Throwing super detailed balance rejection',
    'TSS': 'Target swap selection rejection',
    'TSA': 'Target swap detailed balance rejection',
    'XSS': 'SS sub path too long in stone skipping',
    '0-L': 'Path in the {0-} ensemble ends at the left interface',
    'SWD': 'REPPTIS swap propagation in wrong direction',
    'SWH': 'REPPTIS swap first half extension rejected',
}


# The following defines a human-readable form of the possible moves:
_GENERATED = {
    'sh': 'Path was generated with a shooting move',
    'is': 'Path was generated by shooting initially prior to Stone Skipping',
    'tr': 'Path was generated with a time-reversal move',
    'ki': 'Path was generated by integration after kicking',
    're': 'Path was loaded from formatted external file(s)',
    'ld': 'Path was loaded from unformatted external file(s)',
    's+': 'Path was generated by a swapping move from +',
    's-': 'Path was generated by a Swapping move from -',
    'ss': 'Path was generated by Stone Skipping',
    'wt': 'Path was generated by Web Throwing',
    'wf': 'Path was generated by Wire Fencing',
    '00': 'Path was generated by a null move',
    'mr': 'Path was generated by a mirror move',
    'ts': 'Path was generated by a target swap move',
}


# Short versions of the moves:
_GENERATED_SHORT = {
    'sh': 'Shoot',
    'tr': 'Time-reversal',
    'ki': '"Kick" initiation',
    're': 'Formatted ext file load',
    'ld': 'Unformatted ext file load',
    's+': 'Swap from +',
    's-': 'Swap from -',
    'ss': 'Stone skipping',
    'wt': 'Web Throwing',
    'wf': 'Wire Fencing',
    '00': 'Null',
    'mr': 'mirror',
    'ts': 'target swap',
}


[docs]def paste_paths(path_back, path_forw, overlap=True, maxlen=None): """Merge a backward with a forward path into a new path. The resulting path is equal to the two paths stacked, in correct time. Note that the ordering is important here so that: ``paste_paths(path1, path2) != paste_paths(path2, path1)``. There are two things we need to take care of here: - `path_back` must be iterated in reverse (it is assumed to be a backward trajectory). - we may have to remove one point in `path2` (if the paths overlap). Parameters ---------- path_back : object like :py:class:`.PathBase` This is the backward trajectory. path_forw : object like :py:class:`.PathBase` This is the forward trajectory. overlap : boolean, optional If True, `path_back` and `path_forw` have a common starting-point, that is, the first point in `path_forw` is identical to the first point in `path_back`. In time-space, this means that the *first* point in `path_forw` is identical to the *last* point in `path_back` (the backward and forward path started at the same location in space). maxlen : float, optional This is the maximum length for the new path. If it's not given, it will just be set to the largest of the `maxlen` of the two given paths. Note ---- Some information about the path will not be set here. This must be set elsewhere. This includes how the path was generated (`path.generated`) and the status of the path (`path.status`). """ if maxlen is None: if path_back.maxlen == path_forw.maxlen: maxlen = path_back.maxlen else: # They are unequal and both is not None, just pick the largest. # In case one is None, the other will be picked. # Note that now there is a chance of truncating the path while # pasting! maxlen = max(path_back.maxlen, path_forw.maxlen) msg = f'Unequal length: Using {maxlen} for the new path!' logger.warning(msg) time_origin = path_back.time_origin - path_back.length + 1 new_path = path_back.empty_path(maxlen=maxlen, time_origin=time_origin) for phasepoint in reversed(path_back.phasepoints): app = new_path.append(phasepoint) if not app: msg = 'Truncated while pasting backwards at: {}' msg = msg.format(new_path.length) logger.warning(msg) return new_path first = True for phasepoint in path_forw.phasepoints: if first and overlap: first = False continue app = new_path.append(phasepoint) if not app: logger.warning("Truncated path at: %d", new_path.length) return new_path return new_path
[docs]def check_crossing(cycle, orderp, interfaces, leftside_prev): """Check if we have crossed an interface during the last step. This function is useful for checking if an interface was crossed from the previous step till the current one. This is for instance used in the MD simulations for the initial flux. If will use a variable to store the previous positions with respect to the interfaces and check if interfaces were crossed here. Parameters ---------- cycle : int This is the current simulation cycle number. orderp : float The current order parameter. interfaces : list of floats These are the interfaces to check. leftside_prev : list of booleans or None These are used to store the previous positions with respect to the interfaces. Returns ------- leftside_curr : list of booleans These are the updated positions with respect to the interfaces. cross : list of tuples If a certain interface is crossed, a tuple will be added to this list. The tuple is of form (cycle number, interface number, direction) where the direction is '-' for a crossing in the negative direction and '+' for a crossing in the positive direction. """ cross = [] if leftside_prev is None: leftside_curr = [orderp < interf for interf in interfaces] else: leftside_curr = leftside_prev for i, (left, interf) in enumerate(zip(leftside_prev, interfaces)): if left and orderp > interf: # Was on the left side, moved to the right side. leftside_curr[i] = False cross.append((cycle, i, '+')) elif not left and orderp < interf: # Was on the right side, moved to the left side. leftside_curr[i] = True cross.append((cycle, i, '-')) return leftside_curr, cross
[docs]class PathBase: """Base class for representation of paths. This class represents a path. A path consists of a series of consecutive snapshots (the trajectory) with the corresponding order parameter. Attributes ---------- generated : tuple This contains information on how the path was generated. `generated[0]` : string, as defined in the variable `_GENERATED` `generated[1:]` : additional information: For ``generated[0] == 'sh'`` the additional information is the index of the shooting point on the old path, the new path and the corresponding order parameter. maxlen : int This is the maximum path length. Some algorithms require this to be set. Others don't, which is indicated by setting `maxlen` equal to None. ordermax : tuple This is the (current) maximum order parameter for the path. `ordermax[0]` is the value, `ordermax[1]` is the index in `self.path`. ordermin : tuple This is the (current) minimum order parameter for the path. `ordermin[0]` is the value, `ordermin[1]` is the index in `self.path`. phasepoints : list of objects like :py:class:`.System` The phase points the path is made up of. rgen : object like :py:class:`.RandomGenerator` This is the random generator that will be used for the paths that required random numbers. time_origin : int This is the location of the phase point `path[0]` relative to its parent. This might be useful for plotting. status : str or None The status of the path. The possibilities are defined in the variable `_STATUS`. weight : real The statistical weight of the path. """
[docs] def __init__(self, rgen=None, maxlen=None, time_origin=0): """Initialise the PathBase object. Parameters ---------- rgen : object like :py:class:`.RandomGenerator`, optional This is the random generator that will be used. maxlen : int, optional This is the max-length of the path. The default value, None, is just a path of arbitrary length. time_origin : int, optional This can be used to store the shooting point of a parent trajectory. """ self.maxlen = maxlen self.weight = 1. self.time_origin = time_origin self.status = None self.generated = None self.phasepoints = [] if rgen is None: rgen = create_random_generator() self.rgen = rgen
@property def length(self): """Compute the length of the path.""" return len(self.phasepoints) @property def ordermin(self): """Compute the minimum order parameter of the path.""" idx = np.argmin([i.order[0] for i in self.phasepoints]) return (self.phasepoints[idx].order[0], idx) @property def ordermax(self): """Compute the maximum order parameter of the path.""" idx = np.argmax([i.order[0] for i in self.phasepoints]) return (self.phasepoints[idx].order[0], idx)
[docs] def check_interfaces(self, interfaces): """Check current status of the path. Get the current status of the path with respect to the interfaces. This is intended to determine if we have crossed certain interfaces or not. Parameters ---------- interfaces : list of floats This list is assumed to contain the three interface values left, middle and right. Returns ------- out[0] : str, 'L' or 'R' or None Start condition: did the trajectory start at the left ('L') or right ('R') interface. out[1] : str, 'L' or 'R' or None Ending condition: did the trajectory end at the left ('L') or right ('R') interface or None of them. out[2] str, 'M' or '*' 'M' if the middle interface is crossed, '*' otherwise. out[3] : list of boolean These values are given by `ordermin < interfaces[i] <= ordermax`. """ if self.length < 1: logger.warning('Path is empty!') return None, None, None, None ordermax, ordermin = self.ordermax[0], self.ordermin[0] cross = [ordermin < interpos <= ordermax for interpos in interfaces] left, right = min(interfaces), max(interfaces) # Check end & start point: end = self.get_end_point(left, right) start = self.get_start_point(left, right) middle = 'M' if cross[1] else '*' return start, end, middle, cross
[docs] def get_end_point(self, left, right=None): """Return the end point of the path as a string. The end point is either to the left of the `left` interface or to the right of the `right` interface, or somewhere in between. Parameters ---------- left : float The left interface. right : float, optional The right interface, equal to left if not specified. Returns ------- out : string A string representing where the end point is ('L' - left, 'R' - right or None). """ if right is None: right = left assert left <= right if self.phasepoints[-1].order[0] <= left: end = 'L' elif self.phasepoints[-1].order[0] >= right: end = 'R' else: end = None logger.debug('Undefined end point.') return end
[docs] def get_start_point(self, left, right=None): """Return the start point of the path as a string. The start point is either to the left of the `left` interface or to the right of the `right` interface. Parameters ---------- left : float The left interface. right : float, optional The right interface, equal to left if not specified. Returns ------- out : string A string representing where the start point is ('L' - left, 'R' - right or None). """ if right is None: right = left assert left <= right if self.phasepoints[0].order[0] <= left: start = 'L' elif self.phasepoints[0].order[0] >= right: start = 'R' else: start = None logger.debug('Undefined starting point.') return start
[docs] @abstractmethod def get_shooting_point(self): """Return a shooting point from the path. Returns ------- phasepoint : object like :py:class:`.System` A phase point which will be the state to shoot from. idx : int The index of the shooting point. """ return
[docs] def append(self, phasepoint): """Append a new phase point to the path. Parameters ---------- out : object like :py:class:`.System` The system information we add to the path. """ if self.maxlen is None or self.length < self.maxlen: self.phasepoints.append(phasepoint) return True logger.debug('Max length exceeded. Could not append to path.') return False
[docs] def get_path_data(self, status, interfaces): """Return information about the path. This information can be stored in a object like :py:class:`.PathEnsemble`. Parameters ---------- status : string This represents the current status of the path. interfaces : list These are just the interfaces we are currently considering. """ path_info = { 'generated': self.generated, 'status': status, 'length': self.length, 'ordermax': self.ordermax, 'ordermin': self.ordermin, 'weight': self.weight, } start, end, middle, _ = self.check_interfaces(interfaces) path_info['interface'] = (start, middle, end) return path_info
[docs] def set_move(self, move): """Update the path move. The path move is a short string that represents how the path was generated. It should preferably match one of the moves defined in `_GENERATED`. Parameters ---------- move : string A short description of the move. """ if self.generated is None: self.generated = (move, 0, 0, 0) else: self.generated = (move, self.generated[1], self.generated[2], self.generated[3])
[docs] def get_move(self): """Return the move used to generate the path.""" if self.generated is None: return None return self.generated[0]
[docs] def success(self, target_interface): """Check if the path is successful. The check is based on the maximum order parameter and the value of `target_interface`. It is successful if the maximum order parameter is greater than `target_interface`. Parameters ---------- target_interface : float The value for which the path is successful, i.e. the "target_interface" interface. """ return self.ordermax[0] > target_interface
[docs] def __iadd__(self, other): """Add path data to a path from another path, i.e. ``self += other``. This will simply append the phase points from `other`. Parameters ---------- other : object of type `Path` The object to add path data from. Returns ------- self : object of type `Path` The updated path object. """ for phasepoint in other.phasepoints: app = self.append(phasepoint.copy()) if not app: logger.warning( 'Truncated path at %d while adding paths', self.length ) return self return self
[docs] def copy(self): """Return a copy of the path.""" new_path = self.empty_path() for phasepoint in self.phasepoints: new_path.append(phasepoint.copy()) new_path.status = self.status new_path.time_origin = self.time_origin new_path.generated = self.generated new_path.maxlen = self.maxlen new_path.weight = self.weight return new_path
[docs] @staticmethod def reverse_velocities(system): """Reverse the velocities in the phase points.""" system.particles.reverse_velocities()
[docs] def reverse(self, order_function=False, rev_v=True): """Reverse a path and return the reverse path as a new path. This will reverse a path and return the reversed path as a new object like :py:class:`.PathBase` object. Returns ------- new_path : object like :py:class:`.PathBase` The time reversed path. order_function : object like :py:class:`.OrderParameter`, optional The method to use to re-calculate the order parameter, if it is velocity dependent. rev_v : boolean, optional If True, also the velocities are reversed, if False, the velocities for each frame are not altered. """ new_path = self.empty_path() new_path.weight = self.weight new_path.maxlen = self.maxlen for phasepoint in reversed(self.phasepoints): new_point = phasepoint.copy() if rev_v: self.reverse_velocities(new_point) app = new_path.append(new_point) if not app: # pragma: no cover msg = 'Could not reverse path' logger.error(msg) return None if order_function and order_function.velocity_dependent and rev_v: for phasepoint in new_path.phasepoints: phasepoint.order = order_function.calculate(phasepoint) return new_path
[docs] def __str__(self): """Return a simple string representation of the Path.""" msg = [f'Path with length {self.length} (max: {self.maxlen})'] msg += [f'Order parameter max: {self.ordermax}'] msg += [f'Order parameter min: {self.ordermin}'] if self.length > 0: msg += [f'Start {self.phasepoints[0].order[0]}'] msg += [f'End {self.phasepoints[-1].order[0]}'] if self.status: msg += [f'Status: {_STATUS[self.status]}'] if self.generated: move = self.generated[0] txtmove = _GENERATED.get(move, 'unknown move') msg += [f'Generated: {txtmove}'] msg += [f'Weight: {self.weight}'] return '\n'.join(msg)
[docs] @abstractmethod def restart_info(self): """Return a dictionary with restart information.""" return
[docs] @abstractmethod def load_restart_info(self, info): """Read a dictionary with restart information.""" return
[docs] @abstractmethod def empty_path(self, **kwargs): """Return an empty path of same class as the current one. This function is intended to spawn sibling paths that share some properties and also some characteristics of the current path. The idea here is that a path of a certain class should only be able to create paths of the same class. Returns ------- out : object like :py:class:`.PathBase` A new empty path. """ return
[docs] def __eq__(self, other): """Check if two paths are equal.""" if self.__class__ != other.__class__: logger.debug('%s and %s.__class__ differ', self, other) return False if set(self.__dict__) != set(other.__dict__): logger.debug('%s and %s.__dict__ differ', self, other) return False # Compare phasepoints: if not len(self.phasepoints) == len(other.phasepoints): return False for i, j in zip(self.phasepoints, other.phasepoints): if not i == j: return False if self.phasepoints: # Compare other attributes: for i in ('maxlen', 'time_origin', 'status', 'generated', 'length', 'ordermax', 'ordermin'): attr_self = hasattr(self, i) attr_other = hasattr(other, i) if attr_self ^ attr_other: # pragma: no cover logger.warning('Failed comparing path due to missing "%s"', i) return False if not attr_self and not attr_other: logger.warning( 'Skipping comparison of missing path attribute "%s"', i) continue if getattr(self, i) != getattr(other, i): return False return True
[docs] def __ne__(self, other): """Check if two paths are not equal.""" return not self == other
[docs] def delete(self, idx): """Remove a phase point from the path. Parameters ---------- idx : integer The index of the frame to remove. """ del self.phasepoints[idx]
[docs] def sorting(self, key, reverse=False): """Re-order the phase points according to the given key. Parameters ---------- key : string The attribute we will sort according to. reverse : boolean, optional If this is False, the sorting is from big to small. Yields ------ out : object like :py:class:`.System` The ordered phase points from the path. """ if key in ('ekin', 'vpot'): sort_after = [getattr(i.particles, key) for i in self.phasepoints] elif key == 'order': sort_after = [getattr(i, key)[0] for i in self.phasepoints] else: sort_after = [getattr(i, key) for i in self.phasepoints] idx = np.argsort(sort_after) if reverse: idx = idx[::-1] self.phasepoints = [self.phasepoints[i] for i in idx]
[docs] def update_energies(self, ekin, vpot): """Update the energies for the phase points. This method is useful in cases where the energies are read from external engines and returned as a list of floats. Parameters ---------- ekin : list of floats The kinetic energies to set. vpot : list of floats The potential energies to set. """ if len(ekin) != len(vpot): logger.debug( 'Kinetic and potential energies have different length.' ) if len(ekin) != len(self.phasepoints): logger.debug( 'Length of kinetic energy and phase points differ %d != %d.', len(ekin), len(self.phasepoints) ) if len(vpot) != len(self.phasepoints): logger.debug( 'Length of potential energy and phase points differ %d != %d.', len(vpot), len(self.phasepoints) ) for i, phasepoint in enumerate(self.phasepoints): try: vpoti = vpot[i] except IndexError: logger.warning( 'Ran out of potential energies, setting to None.' ) vpoti = None try: ekini = ekin[i] except IndexError: logger.warning( 'Ran out of kinetic energies, setting to None.' ) ekini = None phasepoint.particles.vpot = vpoti phasepoint.particles.ekin = ekini
[docs]class Path(PathBase): """A path where the full trajectory is stored in memory. This class represents a path. A path consists of a series of consecutive snapshots (the trajectory) with the corresponding order parameter. Here we store all information for all phase points on the path. """
[docs] def get_shooting_point(self, criteria='rnd', interfaces=None): """Return a shooting point from the path. This will simply draw a shooting point from the path at random. All points can be selected with equal probability with the exception of the end points which are not considered. Parameters ---------- criteria : string, optional The criteria to select the shooting point: 'rnd': random, except the first and last point, standard sh. 'exp': selection towards low density region. list/tuple of floats, optional These are the interface positions of the form ``[left, middle, right]``. Returns ------- out[0] : object like :py:class:`.System` The phase point we selected. out[1] : int The shooting point index. """ keep_list = [] rnd_idx = self.rgen.random_integers(1, self.length - 2) # The method is not done to be working on the 0^- ensemble. if criteria == 'exp' and interfaces[0] != float('-inf'): n_slabs = 42 hyst = [0]*n_slabs for p_p in self.phasepoints: idx = abs(int((p_p.order[0] - interfaces[0]) / (interfaces[-1] - interfaces[0])*n_slabs)) hyst[idx] += 1 # Exclude the extremis, no much interesting and always low value for i in [0, 1, 2, -3, -2, -1]: hyst[i] = max(hyst) # find the not zero minimum h_min = hyst.index(min(hyst)) while hyst[h_min] == 0: hyst[h_min] = max(hyst) h_min = hyst.index(min(hyst)) for idx, p_p in enumerate(self.phasepoints): i_slab = abs(int((p_p.order[0] - interfaces[0]) / (interfaces[-1] - interfaces[0])*n_slabs)) if i_slab == h_min: keep_list.append(idx) idx = rnd_idx if len(keep_list) < 4 else keep_list[ self.rgen.random_integers(1, len(keep_list) - 2)] logger.debug("Selected point with orderp %s", self.phasepoints[idx].order[0]) return self.phasepoints[idx], idx
[docs] def empty_path(self, **kwargs): """Return an empty path of same class as the current one. Returns ------- out : object like :py:class:`.PathBase` A new empty path. """ maxlen = kwargs.get('maxlen', None) time_origin = kwargs.get('time_origin', 0) return self.__class__(self.rgen, maxlen=maxlen, time_origin=time_origin)
[docs] def restart_info(self): """Return a dictionary with restart information.""" info = { 'rgen': self.rgen.get_state(), 'generated': self.generated, 'maxlen': self.maxlen, 'time_origin': self.time_origin, 'status': self.status, 'weight': self.weight, 'phasepoints': [i.restart_info() for i in self.phasepoints] } return info
[docs] def load_restart_info(self, info): """Set up the path using restart information.""" for key, val in info.items(): # For phasepoints, create new System objects # and load the information for these. # The snaps still need to forcefield to be re-initiated. if key == 'phasepoints': for point in val: system = System() system.load_restart_info(point) self.append(system) elif key == 'rgen': self.rgen = create_random_generator(info['rgen']) else: if hasattr(self, key): setattr(self, key, val)