Source code for pyretis.initiation.initiate_kick

# -*- coding: utf-8 -*-
# Copyright (c) 2023, PyRETIS Development Team.
# Distributed under the LGPLv2.1+ License. See LICENSE for more info.
"""This file contains functions used for initiation of paths.

Important methods defined here
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

generate_initial_path_kick (:py:func:`.generate_initial_path_kick`)
    Function for generating an initial path by repeatedly kicking a
    phase point.

initiate_kick (:py:func:`.initiate_kick`)
    Helper method, selects either :py:func:`.initiate_kicki` or
    :py:func:`.initiate_kick_max`.

initiate_kicki (:py:func:`.initiate_kicki`)
    A method for initiating a path ensemble by repeatedly modifying
    velocities to find the crossing with the interfaces.

initiate_kick_max (:py:func:`.initiate_kick_max`)
    A method similar to py:meth:`.initiate_kick`. Here, if possible,
    we will use points from the previous paths, closest to the target
    interface.

initiate_path_ensemble_kick (:py:func:`.initiate_path_ensemble_kick`)
    Method to initiate a single path ensemble.

fix_path_by_tis (:py:func:`.fix_path_by_tis`)
    Method to fix an initial path that starts and ends at the wrong
    interface via TIS moves.

"""
import copy
import logging
from pyretis.core.common import compute_weight
from pyretis.core.path import paste_paths, Path
from pyretis.core.tis import shoot, time_reversal
from pyretis.inout.screen import print_to_screen
logger = logging.getLogger(__name__)  # pylint: disable=invalid-name
logger.addHandler(logging.NullHandler())


__all__ = [
    'generate_initial_path_kick',
    'initiate_kick',
    'initiate_kicki',
    'initiate_kick_max',
    'initiate_path_ensemble_kick',
    'fix_path_by_tis'
]


[docs]def initiate_kick(simulation, settings, cycle): """Run the initiation method for several ensembles. Please see documentation for :py:func:`.initiate_path_ensemble_kick`. """ init_settings = settings['initial-path'] start = init_settings.get('kick-from', 'initial').lower() if start == 'previous': logger.info('Kick-initiate using previous configuration') return initiate_kick_max(simulation, settings, cycle) if start == 'initial': logger.info('Kick-initiate using initial configuration') return initiate_kicki(simulation, settings, cycle) errtxt = f'Unknown argument {start} for kick-from' logger.error(errtxt) raise ValueError(errtxt)
[docs]def initiate_kicki(simulation, settings, cycle): """Run the initialization for several ensembles. Please see documentation for :py:func:`.initiate_path_ensemble_kick`. """ for ensemble, set_ens in zip(simulation.ensembles, settings['ensemble']): ens_name = ensemble['path_ensemble'].ensemble_name print_to_screen(f'Running "kick" initiation in: {ens_name}') logger.info('Initiating path ensemble: %s', ensemble['path_ensemble'].ensemble_name) accept, initial_path, status = initiate_path_ensemble_kick( ensemble, set_ens['tis'], cycle) yield accept, initial_path, status, ensemble['path_ensemble']
[docs]def initiate_kick_max(simulation, settings, cycle): """Run the initiation for several ensembles. This method is similar to :py:func:`.initiate_kick`, but here we update the initial point for an ensemble to use that of the previous path (if this exist). Please see documentation for :py:func:`.initiate_path_ensemble_kick`. """ last_paths = [] for ensemble, set_ens in zip(simulation.ensembles, settings['ensemble']): path_ensemble = ensemble['path_ensemble'] logger.info('Initiating path ensemble %s', path_ensemble.ensemble_name) ens_name = path_ensemble.ensemble_name print_to_screen(f'Running kick-max initiation in: {ens_name}') if last_paths: # Look for the phase point closest to the middle interface. middle = path_ensemble.interfaces[1] current = None min_dist = float('inf') for last_path in last_paths: for phase_point in last_path.phasepoints: dist = middle - phase_point.order[0] if 0 <= dist <= min_dist: min_dist = dist current = phase_point.copy() if current is not None: logger.info('Initial state set to: %s', current) ensemble['system'] = current accept, initial_path, status = initiate_path_ensemble_kick( ensemble, set_ens['tis'], cycle) last_paths.append(initial_path) yield accept, initial_path, status, path_ensemble
[docs]def initiate_path_ensemble_kick(ensemble, tis_settings, cycle): """Run the "kick" initiate for a given ensemble. Parameters ---------- ensemble: dict It contains: * `path_ensemble` : object like :py:class:`.PathEnsemble` The path ensemble to create an initial path for. * `system` : object like :py:class:`.System` The system is used here since we need access to the temperature and to the particle list. * `order_function` : object like :py:class:`.OrderParameter` The class used for obtaining the order parameter(s). * `engine` : object like :py:class:`.EngineBase` The engine to use for propagating a path. * `rgen` : object like :py:class:`.RandomGenerator` This is the random generator that will be used. tis_settings : dict This dictionary contains the TIS settings. cycle : integer, optional The cycle number we are initiating at, typically this will be 0 which is the default value. Returns ------- out[0] : boolean True if the initial path was accepted. out[1] : object like py:class:`.PathBase` The initial path. out[2] : string The status of the path. """ initial_path = None logger.info('Will generate initial path by kicking') ensemble['engine'].exe_dir = \ ensemble['path_ensemble'].directory['generate'] for attempt in generate_initial_path_kick(ensemble, tis_settings): if attempt[0] is True: initial_path = attempt[-1] break accept = True status = 'ACC' ensemble['path_ensemble'].add_path_data(initial_path, status, cycle) # Ask the engine to do clean up after the initialisation: ensemble['engine'].clean_up() return accept, initial_path, status
[docs]def generate_initial_path_kick(ensemble, tis_settings): """Generate an initial path with the kicking method. This function will generate an initial path by repeatedly kicking a phase-space point until the middle interface is crossed. The point before and after kicking are stored, so when the middle interface is crossed we have two points we can integrate forward and backward in time. This function is intended for use with TIS. For use with RETIS one should set the appropriate `tis_settings` so that the starting conditions are fine (i.e. for the [0^-] ensemble it might be different for the other ensembles). Parameters ---------- ensemble: dict. It contains: * `path_ensemble` : object like :py:class:`.PathEnsemble` The path ensemble to create an initial path for. * `system` : object like :py:class:`.System` System is used here since we need access to the temperature and to the particle list. * `order_function` : object like :py:class:`.OrderParameter` The class used for obtaining the order parameter(s). * `engine` : object like :py:class:`.EngineBase` The engine to use for propagating a path. * `rgen` : object like :py:class:`.RandomGenerator` This is the random generator that will be used. tis_settings : dict This dictionary contains settings for TIS. Explicitly used here: * `start_cond`: string, starting condition, 'L'eft or 'R'ight. * `maxlength`: integer, maximum allowed length of paths. Yields ------ out[0] : boolean True if we are finished. out[1] : string The current status of this method. out[2] : object like :py:class:`.PathBase` or None If we are finished (and successful) this is the generated initial path. Otherwise, this is equal to None. """ engine = ensemble['engine'] interfaces = ensemble['path_ensemble'].interfaces rgen = ensemble['path_ensemble'].rgen initial_system = ensemble['system'].copy() middle = interfaces[1] kick_required = True if set(ensemble['path_ensemble'].start_condition) == set(['R', 'L']): initial_op = initial_system.order[0] logger.info("Initial OP: %s", initial_op) # If we are allowed to end anywhere and the initial system is between # the two interfaces, don't kick. if interfaces[0] < initial_op < interfaces[-1]: kick_required = False # Else kick to the closest interfaces elif abs(interfaces[0]-initial_op) < abs(interfaces[-1]-initial_op): middle = interfaces[0] else: middle = interfaces[-1] maxlen = tis_settings['maxlength'] while True: # Start from the initial system: ensemble['system'] = initial_system.copy() ensemble['interfaces'] = interfaces if kick_required: logger.info('Searching crossing with middle interface') left_point, right_point = engine.kick_across_middle(ensemble, middle, tis_settings) else: logger.info('Not searching a crossing with the middle interface') right_point = initial_system.copy() # Set starting point for both propagators to the current system left_point = initial_system.copy() logger.info('Propagating from crossing points') # kick_across_middle will return two points, one immediately # left of the interface and one immediately right of the # interface. So we have two points (`leftpoint` and the # current `system.particles`). We then propagate the current # phase point forward: logger.info('Propagating forward from crossing points ' 'for initial path') path_forw = Path(rgen=rgen, maxlen=maxlen) ensemble['system'] = right_point success, msg = engine.propagate(path_forw, ensemble, reverse=False) if 'exp' in tis_settings.get('shooting_move', 'sh'): path_forw.status = 'ACC' path_forw.generated = ('ki', 0, 0, 0) yield (True, 'Initial path generated.', path_forw) if not success: logger.warning('Forward path not successful: %s', msg) yield (False, f'Forward path failed: {msg}', None) continue # And we propagate the `leftpoint` backward: path_back = Path(rgen=rgen, maxlen=maxlen) ensemble['system'] = left_point success, msg = engine.propagate(path_back, ensemble, reverse=True) if not success: logger.warning('Backward path not successful: %s', msg) yield (False, f'Backward path failed: {msg}', None) continue # Merge backward and forward, here we do not set maxlen since # both backward and forward may have this length: initial_path = paste_paths(path_back, path_forw, overlap=False) if initial_path.length >= maxlen: logger.warning('Initial path too long (exceeded "MAXLEN")') yield (False, 'Initial path was too long.', None) continue start, end, _, _ = initial_path.check_interfaces(interfaces) # OK, now its time to check the path: # 0) We can start at the starting condition, pass the middle # and continue all the way to the end - perfect! # 1) We can start at the starting condition, pass the middle # and return to starting condition - this is perfectly fine. # 2) We can start at the wrong interface, pass the middle and # end at the same (wrong) interface - we fix this by # additional TIS moves. # 3) We can start at wrong interface and end at the starting # condition. To fix this, we reverse the path. if start in set(ensemble['path_ensemble'].start_condition): # Case 0) & 1): break else: # Now we do the other cases: if end in set(ensemble['path_ensemble'].start_condition): # Case 3) (and start != start_cond): logger.info('Initial path is in the wrong direction') initial_path = initial_path.reverse(ensemble['order_function']) logger.info('Initial path has been reversed!') break elif end == start: # Case 2): logger.info('Initial path starts and ends at wrong interface') logger.info('Will perform TIS moves to fix it!') yield (False, 'Trying to fix path by TIS moves', None) initial_path = fix_path_by_tis(initial_path, ensemble, tis_settings) break else: # This is reached if the start/end conditions # has been modified or are inconsistent. logger.warning('Could not generate initial path, will retry!') yield (False, 'Could not generate initial path will retry!', None) continue initial_path.status = 'ACC' initial_path.generated = ('ki', 0, 0, 0) if tis_settings.get('high_accept', False): move = tis_settings.get('shooting_move', 'sh') if move == 'wf': int_cap = tis_settings.get('interface_cap', interfaces[2]) interfaces = [interfaces[0], interfaces[1], int_cap] if move in ('ss', 'wf'): initial_path.weight = compute_weight(initial_path, interfaces, move) yield (True, 'Initial path generated.', initial_path)
[docs]def _get_help(start_cond, interfaces): """Define some helper methods for :py:func:`.fix_path_by_tis`. This method returns two methods that :py:func:`.fix_path_by_tis` can use to determine if a new path is an improvement compared to the current path and if the "fixing" is done. Parameters ---------- start_cond : string The starting condition (from the TIS settings). Left or Right. interfaces : list of floats The interfaces, on form ``[left, middle, right]``. Returns ------- out[0] : method The method which determines if a new path represents an improvement over the current path. out[1] : method The method which determines if we are done, that is if we can accept the current path. """ left, middle, right = interfaces improved, done = None, None if start_cond == 'R' or middle == right: def improved_r(newp, current): """Return True if new path is an improvement.""" return (newp.ordermax[0] > current.ordermax[0] and newp.ordermin[0] < middle) def done_r(path): """Return True if the path can be accepted.""" return path.ordermax[0] > right improved = improved_r done = done_r elif start_cond == 'L' or middle == left: def improved_l(newp, current): """Return True if new path is an improvement.""" return (newp.ordermin[0] < current.ordermin[0] and newp.ordermax[0] > middle) def done_l(path): """Return True if the path can be accepted.""" return path.ordermin[0] < left improved = improved_l done = done_l else: logger.error('Unknown start condition (should be "R" or "L")') raise ValueError('Unknown start condition (should be "R" or "L")') return improved, done
[docs]def fix_path_by_tis(initial_path, ensemble, tis_settings): """Fix a path that starts and ends at the wrong interfaces. The given path is amended by performing TIS moves (shooting and time reversal). Note that this function is intended to be used in connection with the initialisation. Parameters ---------- initial_path : object like :py:class:`.PathBase` This is the initial path to fix. It starts & ends at the wrong interface. ensemble: dict. It contains: * `path_ensemble` : object like :py:class:`.PathEnsemble` The path ensemble to create an initial path for. * `system` : object like :py:class:`.System` System is used here since we need access to the temperature and to the particle list. * `order_function` : object like :py:class:`.OrderParameter` The class used for obtaining the order parameter(s). * `engine` : object like :py:class:`.EngineBase` The engine to use for propagating a path. * `rgen` : object like :py:class:`.RandomGenerator` This is the random generator that will be used. tis_settings : dict Settings for TIS method, here we explicitly use: * `start_cond`: string which defines the start condition. * `maxlength`: integer which gives the maximum allowed path length. Note that we here explicitly set some local TIS settings for use in the `make_tis_step` function. Returns ------- out : object like :py:class:`.PathBase` The amended path. """ logger.debug('Attempting to fix path by running TIS moves.') path_ensemble = ensemble['path_ensemble'] local_tis_settings = copy.deepcopy(tis_settings) local_tis_settings['allowmaxlength'] = True local_tis_settings['aimless'] = True improved, check_ok = _get_help(path_ensemble.start_condition, path_ensemble.interfaces) backup_path = True path_ok = False path_ensemble.last_path = initial_path while not path_ok: logger.debug('Performing a TIS move to improve the initial path') if backup_path: # Move the initial path to safe place: logger.debug('Moving initial_path') path_ensemble.move_path_to_generate(initial_path, '_') backup_path = False accept, trial, _ = time_reversal(initial_path, ensemble['order_function'], path_ensemble.interfaces, path_ensemble.start_condition) if not accept: target = path_ensemble.interfaces[1] best_frame = ensemble['system'] for phasepoint in initial_path.phasepoints: if abs(best_frame.order[0] - target) > \ abs(phasepoint.order[0] - target): best_frame = phasepoint.copy() accept, trial, _ = shoot(ensemble, local_tis_settings, path_ensemble.start_condition, best_frame) if accept: if improved(trial, initial_path): logger.debug('TIS move improved path.') initial_path = trial path_ensemble.last_path = initial_path backup_path = True else: logger.debug('TIS move did not improve path') path_ok = check_ok(initial_path) initial_path.generated = ('ki', 0, 0, 0) initial_path.status = 'ACC' return initial_path