# -*- coding: utf-8 -*-
# Copyright (c) 2023, PyRETIS Development Team.
# Distributed under the LGPLv2.1+ License. See LICENSE for more info.
"""Classes and functions for paths.

The classes and functions defined in this module are useful for
representing paths.

Important classes defined here
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

PathBase (:py:class:`.PathBase`)
    A base class for paths.

Path (:py:class:`.Path`)
    Class for a generic path that stores all possible information.

Important methods defined here
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

paste_paths (:py:func:`.paste_paths`)
    Function for joining two paths, one is in a backward time
    direction and the other is in the forward time direction.

"""
from abc import abstractmethod
import logging
import numpy as np
from pyretis.core.random_gen import create_random_generator
from pyretis.core.system import System
# Module-level logger. A NullHandler is attached so that using this module
# without any logging configuration does not emit "no handler" warnings.
logger = logging.getLogger(__name__) # pylint: disable=invalid-name
logger.addHandler(logging.NullHandler())
# Names exported by ``from <this module> import *``.
__all__ = ['PathBase', 'Path', 'paste_paths', 'check_crossing']
# Human-readable descriptions of the possible path statuses. The keys are
# the short status codes stored in ``path.status``; the values are used when
# printing a path (see ``PathBase.__str__``).
_STATUS = {
    'ACC': 'The path has been accepted',
    'MCR': 'Momenta change rejection',
    'BWI': 'Backward trajectory end at wrong interface',
    'BTL': 'Backward trajectory too long (detailed balance condition)',
    'BTX': 'Backward trajectory too long (max-path exceeded)',
    'BTS': 'Backward trajectory too short',
    'EWI': 'Initial path ends at wrong interface',
    'EXP': 'Exploration path',
    'FTL': 'Forward trajectory too long (detailed balance condition)',
    'FTX': 'Forward trajectory too long (max-path exceeded)',
    'FTS': 'Forward trajectory too short',
    'HAS': 'High Acceptance Swap rejection for SS/WF detailed balance',
    'KOB': 'Kicked outside of boundaries',
    'NCR': 'No crossing with middle interface',
    'NSG': 'Path has no suitable segments',
    'NSS': 'No one-step crossing in stone skipping',
    'SSA': 'Stone Skipping super detailed balance rejection',
    'WFA': 'Wire Fencing super detailed balance rejection',
    'SWI': 'Initial path starts at wrong interface',
    'WTA': 'Web Throwing super detailed balance rejection',
    'TSS': 'Target swap selection rejection',
    'TSA': 'Target swap detailed balance rejection',
    'XSS': 'SS sub path too long in stone skipping',
    '0-L': 'Path in the {0-} ensemble ends at the left interface',
    'SWD': 'REPPTIS swap propagation in wrong direction',
    'SWH': 'REPPTIS swap first half extension rejected',
}
# Human-readable descriptions of the possible moves. The keys are the
# two-character move codes stored as ``path.generated[0]``; the values are
# used when printing a path (see ``PathBase.__str__``).
_GENERATED = {
    'sh': 'Path was generated with a shooting move',
    'is': 'Path was generated by shooting initially prior to Stone Skipping',
    'tr': 'Path was generated with a time-reversal move',
    'ki': 'Path was generated by integration after kicking',
    're': 'Path was loaded from formatted external file(s)',
    'ld': 'Path was loaded from unformatted external file(s)',
    's+': 'Path was generated by a swapping move from +',
    's-': 'Path was generated by a Swapping move from -',
    'ss': 'Path was generated by Stone Skipping',
    'wt': 'Path was generated by Web Throwing',
    'wf': 'Path was generated by Wire Fencing',
    '00': 'Path was generated by a null move',
    'mr': 'Path was generated by a mirror move',
    'ts': 'Path was generated by a target swap move',
}
# Short labels for the moves, keyed by the same two-character move codes
# as `_GENERATED`.
# NOTE(review): the 'is' code is present in `_GENERATED` but has no short
# form here -- confirm whether that omission is intentional.
_GENERATED_SHORT = {
    'sh': 'Shoot',
    'tr': 'Time-reversal',
    'ki': '"Kick" initiation',
    're': 'Formatted ext file load',
    'ld': 'Unformatted ext file load',
    's+': 'Swap from +',
    's-': 'Swap from -',
    'ss': 'Stone skipping',
    'wt': 'Web Throwing',
    'wf': 'Wire Fencing',
    '00': 'Null',
    'mr': 'mirror',
    'ts': 'target swap',
}
def paste_paths(path_back, path_forw, overlap=True, maxlen=None):
    """Merge a backward with a forward path into a new path.

    The resulting path is equal to the two paths stacked, in correct
    time. Note that the ordering is important here so that:
    ``paste_paths(path1, path2) != paste_paths(path2, path1)``.

    There are two things we need to take care of here:

    - `path_back` must be iterated in reverse (it is assumed to be a
      backward trajectory).
    - we may have to remove one point in `path_forw` (if the paths
      overlap).

    Parameters
    ----------
    path_back : object like :py:class:`.PathBase`
        This is the backward trajectory.
    path_forw : object like :py:class:`.PathBase`
        This is the forward trajectory.
    overlap : boolean, optional
        If True, `path_back` and `path_forw` have a common
        starting-point, that is, the first point in `path_forw` is
        identical to the first point in `path_back`. In time-space, this
        means that the *first* point in `path_forw` is identical to the
        *last* point in `path_back` (the backward and forward path
        started at the same location in space). The first point of
        `path_forw` is then skipped while pasting.
    maxlen : float, optional
        This is the maximum length for the new path. If it's not given,
        it will just be set to the largest of the `maxlen` of the two
        given paths. If only one of the two paths defines a `maxlen`,
        that one is used.

    Returns
    -------
    new_path : object like :py:class:`.PathBase`
        The pasted path, created with ``path_back.empty_path``. It may
        be shorter than the two input paths combined if `maxlen` was
        reached while pasting.

    Note
    ----
    Some information about the path will not be set here. This must be
    set elsewhere. This includes how the path was generated
    (`path.generated`) and the status of the path (`path.status`).

    The phase points are appended as-is (they are not copied).

    """
    if maxlen is None:
        back_max, forw_max = path_back.maxlen, path_forw.maxlen
        if back_max == forw_max:
            maxlen = back_max
        else:
            # They are unequal, so at most one of them is None. Pick the
            # largest of the defined values; `None` cannot be ordered
            # against numbers in Python 3, so it has to be filtered out.
            # Note that now there is a chance of truncating the path while
            # pasting!
            maxlen = max(
                i for i in (back_max, forw_max) if i is not None
            )
            msg = f'Unequal length: Using {maxlen} for the new path!'
            logger.warning(msg)
    # The last point of the backward path becomes the first point of the
    # new path, so the time origin is shifted accordingly.
    time_origin = path_back.time_origin - path_back.length + 1
    new_path = path_back.empty_path(maxlen=maxlen, time_origin=time_origin)
    for phasepoint in reversed(path_back.phasepoints):
        if not new_path.append(phasepoint):
            msg = 'Truncated while pasting backwards at: {}'
            msg = msg.format(new_path.length)
            logger.warning(msg)
            return new_path
    # Skip the shared shooting point so that it is not stored twice.
    start = 1 if overlap else 0
    for phasepoint in path_forw.phasepoints[start:]:
        if not new_path.append(phasepoint):
            logger.warning("Truncated path at: %d", new_path.length)
            return new_path
    return new_path
def check_crossing(cycle, orderp, interfaces, leftside_prev):
    """Check if we have crossed an interface during the last step.

    This function is useful for checking if an interface was crossed
    from the previous step till the current one. This is for instance
    used in the MD simulations for the initial flux.
    It will use a variable to store the previous positions with respect
    to the interfaces and check if interfaces were crossed here.

    Parameters
    ----------
    cycle : int
        This is the current simulation cycle number.
    orderp : float
        The current order parameter.
    interfaces : list of floats
        These are the interfaces to check.
    leftside_prev : list of booleans or None
        These are used to store the previous positions with respect
        to the interfaces. If None, the positions are initialised from
        `orderp` and no crossings are reported. This list is not
        modified; use the returned `leftside_curr` for the next call.

    Returns
    -------
    leftside_curr : list of booleans
        These are the updated positions with respect to the interfaces.
    cross : list of tuples
        If a certain interface is crossed, a tuple will be added to this
        list. The tuple is of form
        (cycle number, interface number, direction)
        where the direction is '-' for a crossing in the negative
        direction and '+' for a crossing in the positive direction.

    """
    cross = []
    if leftside_prev is None:
        # First call: just record on which side of each interface we are.
        return [orderp < interf for interf in interfaces], cross
    # Work on a copy so that the caller's list is not changed in place.
    leftside_curr = list(leftside_prev)
    for i, (left, interf) in enumerate(zip(leftside_prev, interfaces)):
        if left and orderp > interf:
            # Was on the left side, moved to the right side.
            leftside_curr[i] = False
            cross.append((cycle, i, '+'))
        elif not left and orderp < interf:
            # Was on the right side, moved to the left side.
            leftside_curr[i] = True
            cross.append((cycle, i, '-'))
    return leftside_curr, cross
class PathBase:
    """Base class for representation of paths.

    This class represents a path. A path consists of a series of
    consecutive snapshots (the trajectory) with the corresponding order
    parameter.

    Note that this class does not use ``ABCMeta`` as its metaclass, so
    the methods decorated with ``abstractmethod`` are not enforced at
    instantiation; sub-classes are still expected to implement them.

    Attributes
    ----------
    generated : tuple
        This contains information on how the path was generated.
        `generated[0]` : string, as defined in the variable `_GENERATED`
        `generated[1:]` : additional information:
        For ``generated[0] == 'sh'`` the additional information is the
        index of the shooting point on the old path, the new path and
        the corresponding order parameter.
    maxlen : int
        This is the maximum path length. Some algorithms require this
        to be set. Others don't, which is indicated by setting `maxlen`
        equal to None.
    ordermax : tuple
        This is the (current) maximum order parameter for the path.
        `ordermax[0]` is the value, `ordermax[1]` is the index in
        `self.phasepoints`.
    ordermin : tuple
        This is the (current) minimum order parameter for the path.
        `ordermin[0]` is the value, `ordermin[1]` is the index in
        `self.phasepoints`.
    phasepoints : list of objects like :py:class:`.System`
        The phase points the path is made up of.
    rgen : object like :py:class:`.RandomGenerator`
        This is the random generator that will be used for the
        paths that required random numbers.
    time_origin : int
        This is the location of the phase point `path[0]` relative to
        its parent. This might be useful for plotting.
    status : str or None
        The status of the path. The possibilities are defined
        in the variable `_STATUS`.
    weight : real
        The statistical weight of the path.

    """

    def __init__(self, rgen=None, maxlen=None, time_origin=0):
        """Initialise the PathBase object.

        Parameters
        ----------
        rgen : object like :py:class:`.RandomGenerator`, optional
            This is the random generator that will be used. If not
            given, one is created with `create_random_generator`.
        maxlen : int, optional
            This is the max-length of the path. The default value,
            None, is just a path of arbitrary length.
        time_origin : int, optional
            This can be used to store the shooting point of a parent
            trajectory.

        """
        self.maxlen = maxlen
        self.weight = 1.
        self.time_origin = time_origin
        self.status = None
        self.generated = None
        self.phasepoints = []
        if rgen is None:
            rgen = create_random_generator()
        self.rgen = rgen

    @property
    def length(self):
        """Compute the length of the path."""
        return len(self.phasepoints)

    @property
    def ordermin(self):
        """Compute the minimum order parameter of the path.

        Returns a tuple ``(value, index)``. This requires a non-empty
        path; ``numpy.argmin`` raises a ValueError for an empty one.
        """
        idx = np.argmin([i.order[0] for i in self.phasepoints])
        return (self.phasepoints[idx].order[0], idx)

    @property
    def ordermax(self):
        """Compute the maximum order parameter of the path.

        Returns a tuple ``(value, index)``. This requires a non-empty
        path; ``numpy.argmax`` raises a ValueError for an empty one.
        """
        idx = np.argmax([i.order[0] for i in self.phasepoints])
        return (self.phasepoints[idx].order[0], idx)

    def check_interfaces(self, interfaces):
        """Check current status of the path.

        Get the current status of the path with respect to the
        interfaces. This is intended to determine if we have crossed
        certain interfaces or not.

        Parameters
        ----------
        interfaces : list of floats
            This list is assumed to contain the three interface values
            left, middle and right.

        Returns
        -------
        out[0] : str, 'L' or 'R' or None
            Start condition: did the trajectory start at the left ('L')
            or right ('R') interface.
        out[1] : str, 'L' or 'R' or None
            Ending condition: did the trajectory end at the left ('L')
            or right ('R') interface or None of them.
        out[2] str, 'M' or '*'
            'M' if the middle interface is crossed, '*' otherwise.
        out[3] : list of boolean
            These values are given by
            `ordermin < interfaces[i] <= ordermax`.

        For an empty path, a warning is logged and all four outputs
        are None.

        """
        if self.length < 1:
            logger.warning('Path is empty!')
            return None, None, None, None
        ordermax, ordermin = self.ordermax[0], self.ordermin[0]
        cross = [ordermin < interpos <= ordermax for interpos in interfaces]
        left, right = min(interfaces), max(interfaces)
        # Check end & start point:
        end = self.get_end_point(left, right)
        start = self.get_start_point(left, right)
        middle = 'M' if cross[1] else '*'
        return start, end, middle, cross

    def get_end_point(self, left, right=None):
        """Return the end point of the path as a string.

        The end point is either to the left of the `left` interface or
        to the right of the `right` interface, or somewhere in between.

        Parameters
        ----------
        left : float
            The left interface.
        right : float, optional
            The right interface, equal to left if not specified.

        Returns
        -------
        out : string
            A string representing where the end point is ('L' - left,
            'R' - right or None).

        """
        if right is None:
            right = left
        assert left <= right
        last_order = self.phasepoints[-1].order[0]
        if last_order <= left:
            end = 'L'
        elif last_order >= right:
            end = 'R'
        else:
            end = None
            logger.debug('Undefined end point.')
        return end

    def get_start_point(self, left, right=None):
        """Return the start point of the path as a string.

        The start point is either to the left of the `left` interface or
        to the right of the `right` interface.

        Parameters
        ----------
        left : float
            The left interface.
        right : float, optional
            The right interface, equal to left if not specified.

        Returns
        -------
        out : string
            A string representing where the start point is ('L' - left,
            'R' - right or None).

        """
        if right is None:
            right = left
        assert left <= right
        first_order = self.phasepoints[0].order[0]
        if first_order <= left:
            start = 'L'
        elif first_order >= right:
            start = 'R'
        else:
            start = None
            logger.debug('Undefined starting point.')
        return start

    @abstractmethod
    def get_shooting_point(self):
        """Return a shooting point from the path.

        Returns
        -------
        phasepoint : object like :py:class:`.System`
            A phase point which will be the state to shoot from.
        idx : int
            The index of the shooting point.

        """
        return

    def append(self, phasepoint):
        """Append a new phase point to the path.

        Parameters
        ----------
        phasepoint : object like :py:class:`.System`
            The system information we add to the path.

        Returns
        -------
        out : boolean
            True if the phase point was added, False if the path
            already has `maxlen` points and nothing was added.

        """
        if self.maxlen is None or self.length < self.maxlen:
            self.phasepoints.append(phasepoint)
            return True
        logger.debug('Max length exceeded. Could not append to path.')
        return False

    def get_path_data(self, status, interfaces):
        """Return information about the path.

        This information can be stored in a object like
        :py:class:`.PathEnsemble`.

        Parameters
        ----------
        status : string
            This represents the current status of the path.
        interfaces : list
            These are just the interfaces we are currently considering.

        Returns
        -------
        path_info : dict
            The keys are 'generated', 'status', 'length', 'ordermax',
            'ordermin', 'weight' and 'interface', where the latter
            is the tuple ``(start, middle, end)`` obtained from
            `check_interfaces`.

        """
        path_info = {
            'generated': self.generated,
            'status': status,
            'length': self.length,
            'ordermax': self.ordermax,
            'ordermin': self.ordermin,
            'weight': self.weight,
        }
        start, end, middle, _ = self.check_interfaces(interfaces)
        path_info['interface'] = (start, middle, end)
        return path_info

    def set_move(self, move):
        """Update the path move.

        The path move is a short string that represents how the path
        was generated. It should preferably match one of the moves
        defined in `_GENERATED`.

        The additional information already stored in
        `generated[1:4]` is kept. `generated` always ends up as a
        4-tuple: missing entries are filled with 0.

        Parameters
        ----------
        move : string
            A short description of the move.

        """
        if self.generated is None:
            extra = ()
        else:
            extra = tuple(self.generated[1:4])
        # Pad with zeros so that a short `generated` does not fail.
        self.generated = (move,) + extra + (0,) * (3 - len(extra))

    def get_move(self):
        """Return the move used to generate the path."""
        if self.generated is None:
            return None
        return self.generated[0]

    def success(self, target_interface):
        """Check if the path is successful.

        The check is based on the maximum order parameter and the value
        of `target_interface`. It is successful if the maximum order
        parameter is greater than `target_interface`.

        Parameters
        ----------
        target_interface : float
            The value for which the path is successful, i.e. the
            "target_interface" interface.

        """
        return self.ordermax[0] > target_interface

    def __iadd__(self, other):
        """Add path data to a path from another path, i.e. ``self += other``.

        This will simply append copies of the phase points from `other`.
        If the maximum length is reached, a warning is logged and the
        remaining points are not added.

        Parameters
        ----------
        other : object of type `Path`
            The object to add path data from.

        Returns
        -------
        self : object of type `Path`
            The updated path object.

        """
        for phasepoint in other.phasepoints:
            app = self.append(phasepoint.copy())
            if not app:
                logger.warning(
                    'Truncated path at %d while adding paths', self.length
                )
                return self
        return self

    def copy(self):
        """Return a copy of the path."""
        new_path = self.empty_path()
        for phasepoint in self.phasepoints:
            new_path.append(phasepoint.copy())
        new_path.status = self.status
        new_path.time_origin = self.time_origin
        new_path.generated = self.generated
        # The maximum length is set after the points have been added.
        new_path.maxlen = self.maxlen
        new_path.weight = self.weight
        return new_path

    @staticmethod
    def reverse_velocities(system):
        """Reverse the velocities in the phase points."""
        system.particles.reverse_velocities()

    def reverse(self, order_function=False, rev_v=True):
        """Reverse a path and return the reverse path as a new path.

        This will reverse a path and return the reversed path as
        a new object like :py:class:`.PathBase` object.

        Parameters
        ----------
        order_function : object like :py:class:`.OrderParameter`, optional
            The method to use to re-calculate the order parameter,
            if it is velocity dependent.
        rev_v : boolean, optional
            If True, also the velocities are reversed, if False, the
            velocities for each frame are not altered.

        Returns
        -------
        new_path : object like :py:class:`.PathBase`
            The time reversed path, or None if a phase point could
            not be appended to the new path.

        """
        new_path = self.empty_path()
        new_path.weight = self.weight
        new_path.maxlen = self.maxlen
        for phasepoint in reversed(self.phasepoints):
            new_point = phasepoint.copy()
            if rev_v:
                self.reverse_velocities(new_point)
            app = new_path.append(new_point)
            if not app:  # pragma: no cover
                msg = 'Could not reverse path'
                logger.error(msg)
                return None
        if order_function and order_function.velocity_dependent and rev_v:
            for phasepoint in new_path.phasepoints:
                phasepoint.order = order_function.calculate(phasepoint)
        return new_path

    def __str__(self):
        """Return a simple string representation of the Path."""
        msg = [f'Path with length {self.length} (max: {self.maxlen})']
        if self.length > 0:
            # The order parameter extrema are only defined for a
            # non-empty path.
            msg += [f'Order parameter max: {self.ordermax}']
            msg += [f'Order parameter min: {self.ordermin}']
            msg += [f'Start {self.phasepoints[0].order[0]}']
            msg += [f'End {self.phasepoints[-1].order[0]}']
        if self.status:
            txtstatus = _STATUS.get(self.status, 'unknown status')
            msg += [f'Status: {txtstatus}']
        if self.generated:
            move = self.generated[0]
            txtmove = _GENERATED.get(move, 'unknown move')
            msg += [f'Generated: {txtmove}']
        msg += [f'Weight: {self.weight}']
        return '\n'.join(msg)

    @abstractmethod
    def restart_info(self):
        """Return a dictionary with restart information."""
        return

    @abstractmethod
    def load_restart_info(self, info):
        """Read a dictionary with restart information."""
        return

    @abstractmethod
    def empty_path(self, **kwargs):
        """Return an empty path of same class as the current one.

        This function is intended to spawn sibling paths that share some
        properties and also some characteristics of the current path.
        The idea here is that a path of a certain class should only be
        able to create paths of the same class.

        Returns
        -------
        out : object like :py:class:`.PathBase`
            A new empty path.

        """
        return

    def __eq__(self, other):
        """Check if two paths are equal.

        Two paths are equal if they are of the same class, have the
        same set of instance attributes and equal phase points. For
        non-empty paths, `maxlen`, `time_origin`, `status`,
        `generated`, `length`, `ordermax` and `ordermin` must also be
        equal. The weight is not compared.
        """
        if self.__class__ != other.__class__:
            logger.debug('%s and %s.__class__ differ', self, other)
            return False
        if set(self.__dict__) != set(other.__dict__):
            logger.debug('%s and %s.__dict__ differ', self, other)
            return False
        # Compare phasepoints:
        if not len(self.phasepoints) == len(other.phasepoints):
            return False
        for i, j in zip(self.phasepoints, other.phasepoints):
            if not i == j:
                return False
        if self.phasepoints:
            # Compare other attributes:
            for i in ('maxlen', 'time_origin', 'status', 'generated',
                      'length', 'ordermax', 'ordermin'):
                attr_self = hasattr(self, i)
                attr_other = hasattr(other, i)
                if attr_self ^ attr_other:  # pragma: no cover
                    logger.warning('Failed comparing path due to missing "%s"',
                                   i)
                    return False
                if not attr_self and not attr_other:
                    logger.warning(
                        'Skipping comparison of missing path attribute "%s"',
                        i)
                    continue
                if getattr(self, i) != getattr(other, i):
                    return False
        return True

    def __ne__(self, other):
        """Check if two paths are not equal."""
        return not self == other

    def delete(self, idx):
        """Remove a phase point from the path.

        Parameters
        ----------
        idx : integer
            The index of the frame to remove.

        """
        del self.phasepoints[idx]

    def sorting(self, key, reverse=False):
        """Re-order the phase points according to the given key.

        The phase points are re-ordered in place, that is
        `self.phasepoints` is replaced by the sorted list and nothing
        is returned.

        Parameters
        ----------
        key : string
            The attribute we will sort according to. For 'ekin' and
            'vpot' the attribute is read from the particles of each
            phase point, for 'order' the first order parameter is used.
        reverse : boolean, optional
            If this is False, the sorting is from small to big. If it
            is True, the sorting is from big to small.

        """
        if key in ('ekin', 'vpot'):
            sort_after = [getattr(i.particles, key) for i in self.phasepoints]
        elif key == 'order':
            sort_after = [getattr(i, key)[0] for i in self.phasepoints]
        else:
            sort_after = [getattr(i, key) for i in self.phasepoints]
        idx = np.argsort(sort_after)
        if reverse:
            idx = idx[::-1]
        self.phasepoints = [self.phasepoints[i] for i in idx]

    def update_energies(self, ekin, vpot):
        """Update the energies for the phase points.

        This method is useful in cases where the energies are
        read from external engines and returned as a list of
        floats.

        If a list is shorter than the number of phase points, a
        warning is logged and the missing energies are set to None.

        Parameters
        ----------
        ekin : list of floats
            The kinetic energies to set.
        vpot : list of floats
            The potential energies to set.

        """
        if len(ekin) != len(vpot):
            logger.debug(
                'Kinetic and potential energies have different length.'
            )
        if len(ekin) != len(self.phasepoints):
            logger.debug(
                'Length of kinetic energy and phase points differ %d != %d.',
                len(ekin), len(self.phasepoints)
            )
        if len(vpot) != len(self.phasepoints):
            logger.debug(
                'Length of potential energy and phase points differ %d != %d.',
                len(vpot), len(self.phasepoints)
            )
        for i, phasepoint in enumerate(self.phasepoints):
            try:
                vpoti = vpot[i]
            except IndexError:
                logger.warning(
                    'Ran out of potential energies, setting to None.'
                )
                vpoti = None
            try:
                ekini = ekin[i]
            except IndexError:
                logger.warning(
                    'Ran out of kinetic energies, setting to None.'
                )
                ekini = None
            phasepoint.particles.vpot = vpoti
            phasepoint.particles.ekin = ekini
class Path(PathBase):
    """A path where the full trajectory is stored in memory.

    This class represents a path. A path consists of a series of
    consecutive snapshots (the trajectory) with the corresponding
    order parameter. Here we store all information for all phase points
    on the path.

    """

    def get_shooting_point(self, criteria='rnd', interfaces=None):
        """Return a shooting point from the path.

        This will simply draw a shooting point from the path at
        random. All points can be selected with equal probability with
        the exception of the end points which are not considered.

        Parameters
        ----------
        criteria : string, optional
            The criteria to select the shooting point:
            'rnd': random, except the first and last point, standard sh.
            'exp': selection towards low density region.
        interfaces : list/tuple of floats, optional
            These are the interface positions of the form
            ``[left, middle, right]``. They are only used for the
            'exp' criteria. If they are not given, or if the first
            interface is minus infinity, the selection falls back to
            the random one.

        Returns
        -------
        out[0] : object like :py:class:`.System`
            The phase point we selected.
        out[1] : int
            The shooting point index.

        """
        keep_list = []
        # The uniform index is always drawn first, also when it ends up
        # not being used, so the number of random numbers consumed does
        # not depend on the outcome of the 'exp' selection below.
        rnd_idx = self.rgen.random_integers(1, self.length - 2)
        # The method is not done to be working on the 0^- ensemble.
        if (criteria == 'exp' and interfaces is not None
                and interfaces[0] != float('-inf')):
            n_slabs = 42
            left = interfaces[0]
            width = interfaces[-1] - interfaces[0]
            # Slab index for each phase point. Points at or beyond the
            # last interface are put in the last slab, so that the
            # index is always valid for the histogram.
            slabs = [
                min(abs(int((p_p.order[0] - left) / width * n_slabs)),
                    n_slabs - 1)
                for p_p in self.phasepoints
            ]
            hyst = [0] * n_slabs
            for slab in slabs:
                hyst[slab] += 1
            # Assigning the maximum to a slab never changes the maximum,
            # so it is computed only once.
            peak = max(hyst)
            # Exclude the extremis, no much interesting and always low value
            for i in (0, 1, 2, -3, -2, -1):
                hyst[i] = peak
            # find the not zero minimum
            h_min = hyst.index(min(hyst))
            while hyst[h_min] == 0:
                hyst[h_min] = peak
                h_min = hyst.index(min(hyst))
            keep_list = [
                idx for idx, slab in enumerate(slabs) if slab == h_min
            ]
        if len(keep_list) < 4:
            # Too few candidates (or the 'rnd' criteria): uniform pick.
            idx = rnd_idx
        else:
            # Pick among the candidates, excluding the first and last.
            idx = keep_list[
                self.rgen.random_integers(1, len(keep_list) - 2)
            ]
        logger.debug("Selected point with orderp %s",
                     self.phasepoints[idx].order[0])
        return self.phasepoints[idx], idx

    def empty_path(self, **kwargs):
        """Return an empty path of same class as the current one.

        The new path shares the random generator of the current path.
        The keyword arguments `maxlen` (default None) and
        `time_origin` (default 0) are passed on to the new path.

        Returns
        -------
        out : object like :py:class:`.PathBase`
            A new empty path.

        """
        maxlen = kwargs.get('maxlen', None)
        time_origin = kwargs.get('time_origin', 0)
        return self.__class__(self.rgen, maxlen=maxlen,
                              time_origin=time_origin)

    def restart_info(self):
        """Return a dictionary with restart information."""
        info = {
            'rgen': self.rgen.get_state(),
            'generated': self.generated,
            'maxlen': self.maxlen,
            'time_origin': self.time_origin,
            'status': self.status,
            'weight': self.weight,
            'phasepoints': [i.restart_info() for i in self.phasepoints]
        }
        return info

    def load_restart_info(self, info):
        """Set up the path using restart information.

        Parameters
        ----------
        info : dict
            The restart information, as returned by `restart_info`.
            Keys that are not attributes of the path are ignored.

        """
        for key, val in info.items():
            # For phasepoints, create new System objects
            # and load the information for these.
            # The snaps still need to forcefield to be re-initiated.
            if key == 'phasepoints':
                for point in val:
                    system = System()
                    system.load_restart_info(point)
                    self.append(system)
            elif key == 'rgen':
                self.rgen = create_random_generator(info['rgen'])
            elif hasattr(self, key):
                setattr(self, key, val)