Source code for pyretis.engines.openmm

# -*- coding: utf-8 -*-
# Copyright (c) 2023, PyRETIS Development Team.
# Distributed under the LGPLv2.1+ License. See LICENSE for more info.
"""Definition of the OpenMM engine.

This module defines the class for the OpenMM MD engine.

Important classes defined here
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

OpenMMEngine (:py:class:`.OpenMMEngine`)
    The class for running the OpenMM engine.

"""

import logging
from pyretis.core import System, Particles
from pyretis.core.particlefunctions import (calculate_kinetic_energy,
                                            reset_momentum)
from pyretis.engines.engine import EngineBase
from pyretis.core.box import create_box, box_matrix_to_list
from pyretis.core.common import import_from
try:
    import openmm
    import openmm.unit as u
    HAS_OPENMM = True
except ImportError:
    try:
        from simtk import openmm
        import simtk.unit as u  # pragma: no cover
        HAS_OPENMM = True  # pragma: no cover
    except ImportError:  # pragma: no cover
        HAS_OPENMM = False  # pragma: no cover

logger = logging.getLogger(__name__)  # pylint: disable=invalid-name
logger.addHandler(logging.NullHandler())

__all__ = ['OpenMMEngine']


def make_pyretis_system(settings):
    """Make a PyRETIS system from OpenMM Simulation.

    First this looks for setting->engine->openmm_module and openmm_simulation.
    It then imports that openmm_simulation object and creates a internal
    pyretis system representation
    """
    simulation = import_from(settings['engine']['openmm_module'],
                             settings['engine']['openmm_simulation'])
    state = simulation.context.getState(getPositions=True, getVelocities=True)
    box = state.getPeriodicBoxVectors(asNumpy=True).T
    box = create_box(cell=box_matrix_to_list(box),
                     periodic=[True, True, True])
    system = System(units='gromacs', box=box,
                    temperature=simulation.integrator.getTemperature())
    system.particles = Particles(system.get_dim())
    pos = state.getPositions(asNumpy=True)
    vel = state.getVelocities(asNumpy=True)
    for i, atom in enumerate(simulation.topology.atoms()):
        mass = simulation.system.getParticleMass(i)
        mass = mass.value_in_unit(u.grams/u.mole)
        name = atom.name
        system.particles.add_particle(pos=pos[i], vel=vel[i],
                                      force=[None, None, None],
                                      mass=mass, name=name)
    return system


[docs]class OpenMMEngine(EngineBase): """ A class for interfacing with OpenMM. This class defines the interface to OpenMM. Attributes ---------- simulation: OpenMM.simulation OpenMM simulation object. subcyles: int Number of OpenMM steps per PyRETIS step. """ engine_type = 'openmm'
[docs] def __init__(self, openmm_simulation, subcycles=1, openmm_module=None): """Set up the OpenMM Engine. Parameters ---------- openmm_simulation : OpenMM.simulation, or string The OpenMM simulation object or the variable name if `module` is not None. subcycles: int Number of OpenMM integration steps per PyRETIS step. openmm_module: string, optional If defined and simulation is a string, try loading `simulation` from `module`. """ # Check if openmm is installed. if HAS_OPENMM is False: raise RuntimeError("OpenMM is not installed") super(OpenMMEngine, self).__init__(description='OpenMM') if isinstance(openmm_simulation, str) and openmm_module is not None: openmm_simulation = import_from(openmm_module, openmm_simulation) if isinstance(openmm_simulation, str): msg = "simulation is a string but could not be loaded from module" msg += f" '{openmm_module}'" raise RuntimeError(msg) self.simulation = openmm_simulation self.subcycles = subcycles
[docs] def calculate_order(self, ensemble, xyz=None, vel=None, box=None): """Return the order parameter. This method is just to help to calculate the order parameter in cases where only the engine can do it. Parameters ---------- ensemble : dict It contains: * `system` : object like :py:class:`.System` This is the system that contains the particles we are investigating * `order_function` : object like :py:class:`.OrderParameter` The class used for calculating the order parameter. xyz : numpy.array, optional The positions to use. Typically for internal engines, this is not needed. It is included here as it can be used for testing and also to be compatible with the generic function defined by the parent. vel : numpy.array, optional The velocities to use. box : numpy.array, optional The current box vectors. Returns ------- out : list of floats The calculated order parameter(s). """ system = ensemble['system'] order_function = ensemble['order_function'] if any((xyz is None, vel is None, box is None)): return order_function.calculate(system) system.particles.pos = xyz system.particles.vel = vel system.box.update_size(box) return order_function.calculate(system)
[docs] def clean_up(self): """Clean up after using the engine. Currently, this is only included for compatibility with external integrators. """ return
[docs] def dump_phasepoint(self, phasepoint, deffnm=None): """For compatibility with external integrators.""" return
[docs] def integration_step(self, system): """Perform one integration step of n subcycles. Parameters ---------- system : object like :py:class:`.System` The system to integrate/act on. Assumed to have a particle list in ``system.particles``. Returns ------- out : None Does not return anything, but alters the state of the given system. """ particles = system.particles pos = particles.pos vel = particles.vel box = system.box context = self.simulation.context # Update the simulation to the correct system: context.setPositions(pos) context.setVelocities(vel) # PyRETIS matrix.T is openmm boxvectors: if box is not None: vectors = list(box.box_matrix.T) context.setPeriodicBoxVectors(*vectors) # Run the simulation for n subcycles: self.simulation.step(self.subcycles) # Get the current state: state = context.getState(getPositions=True, getVelocities=True) # Update system: system.particles.pos = state.getPositions(asNumpy=True) system.particles.vel = state.getVelocities(asNumpy=True) if box is not None: box.update_size( box_matrix_to_list(state.getPeriodicBoxVectors(asNumpy=True).T) )
[docs] def kick_across_middle(self, ensemble, middle, tis_settings): """Force a phase point across the middle interface. This is accomplished by repeatedly kicking the phase point so that it crosses the middle interface. Parameters ---------- ensemble : dict It contains: * `system` : object like :py:class:`.System` This is the system that contains the particles we are investigating * `order_function` : object like :py:class:`.OrderParameter` The object used for calculating the order parameter. * `rgen` : object like :py:class:`.RandomGenerator` This is the random generator that will be used. middle : float This is the value for the middle interface. tis_settings : dict This dictionary contains settings for TIS. Explicitly used here: * `zero_momentum`: boolean, determines if the momentum is zeroed. * `rescale_energy`: boolean, determines if energy is re-scaled. Returns ------- out[0] : object like :py:class:`.System` The phase-point just before the interface. out[1] : object like :py:class:`.System` The phase-point just after the interface. Note ---- This function will update the system state. """ # Taken from MDEngine in internal.py # We search for crossing with the middle interface and do this # by sequentially kicking the initial phase point: system = ensemble['system'] previous = None curr = self.calculate_order(ensemble)[0] while True: # pragma: no cover # Save current state: previous = system.copy() # Save previous box matrix: prev_box = system.box.box_matrix # Modify velocities: vel_settings = {'sigma_v': None, 'aimless': True, 'momentum': tis_settings['zero_momentum'], 'rescale': tis_settings['rescale_energy']} self.modify_velocities(ensemble, vel_settings) # Update order parameter in case it is velocity dependent: curr = self.calculate_order(ensemble)[0] previous.order = curr # Store modified velocities: previous.particles.set_vel(system.particles.get_vel()) # Integrate forward one step: self.integration_step(system) # Compare previous order parameter and the new one: prev = curr curr = self.calculate_order(ensemble)[0] if (prev < middle < curr) or (curr < middle < prev): # Middle interface was crossed, just stop the loop. logger.info('Crossing found: %9.6f %9.6f ', prev, curr) break if (prev <= curr <= middle) or (middle <= curr <= prev): # We are getting closer, keep the new point. pass elif prev == middle: # Unlucky case, we want more than 1 point, so search more. pass else: # We did not get closer, fall back to previous point. system.particles = previous.particles.copy() system.box.update_size(box_matrix_to_list(prev_box)) curr = previous.order return previous, system
[docs] def modify_velocities(self, ensemble, vel_settings): """Modify the velocities of the current state. This method will modify the velocities of a time slice. And it is part of the integrator since it, conceptually, fits here: we are acting on the system and modifying it. Parameters ---------- ensemble : dict It contains: * `system` : object like :py:class:`.System` This is the system that contains the particles we are investigating * `rgen` : object like :py:class:`.RandomGenerator` This is the random generator that will be used. vel_settings: dict. It contains info about velocity settings: * `sigma_v` : numpy.array, optional These values can be used to set a standard deviation (one for each particle) for the generated velocities. * `aimless` : boolean, optional Determines if we should do aimless shooting or not. * `momentum` : boolean, optional If True, we reset the linear momentum to zero after generating. * `rescale` : float, optional In some NVE simulations, we may wish to re-scale the energy to a fixed value. If `rescale` is a float > 0, we will re-scale the energy (after modification of the velocities) to match the given float. Returns ------- dek : float The change in the kinetic energy. kin_new : float The new kinetic energy. """ system = ensemble['system'] rescale = vel_settings.get('rescale') particles = system.particles rgen = ensemble['rgen'] if rescale is not None and rescale is not False: if rescale > 0: kin_old = rescale - particles.vpot do_rescale = True else: logger.warning('Ignored re-scale 6.2%f < 0.0.', rescale) return 0.0, calculate_kinetic_energy(particles)[0] else: kin_old = calculate_kinetic_energy(particles)[0] do_rescale = False if vel_settings.get('aimless', False): vel, _ = rgen.draw_maxwellian_velocities(system=system) particles.vel = vel else: # Soft velocity change, from a Gaussian distribution: dvel, _ = rgen.draw_maxwellian_velocities( system=system, sigma_v=vel_settings['sigma_v']) particles.vel = particles.vel + dvel if vel_settings.get('momentum', False): reset_momentum(particles) if do_rescale: system.rescale_velocities(rescale) kin_new = calculate_kinetic_energy(particles)[0] dek = kin_new - kin_old return dek, kin_new
[docs] def propagate(self, path, ensemble, reverse=False): """Generate a path by integrating until a criterion is met. This function will generate a path by calling the function specifying the integration step repeatedly. The integration is carried out until the order parameter has passed the specified interfaces or if we have integrated for more than a specified maximum number of steps. The given system defines the initial state and the system is reset to its initial state when this method is done. Parameters ---------- path : object like :py:class:`.PathBase` This is the path we use to fill in phase-space points. We are here not returning a new path - this since we want to delegate the creation of the path (type) to the method that is running `propagate`. ensemble: dict it contains: * `system` : object like :py:class:`.System` The system object gives the initial state for the integration. The initial state is stored and the system is reset to the initial state when the integration is done. * `order_function` : object like :py:class:`.OrderParameter` The object used for calculating the order parameter. * `interfaces` : list of floats These interfaces define the stopping criterion. reverse : boolean, optional If True, the system will be propagated backward in time. Returns ------- success : boolean This is True if we generated an acceptable path. status : string A text description of the current status of the propagation. """ system = ensemble['system'] status = 'Propagate w/OpenMM engine' logger.debug(status) success = False left, _, right = ensemble['interfaces'] for i in range(path.maxlen): order = self.calculate_order(ensemble) kin = calculate_kinetic_energy(system.particles)[0] snapshot = {'order': order, 'pos': system.particles.pos, 'vel': system.particles.vel, 'box': system.box, 'ekin': kin, 'vpot': None} phasepoint = self.snapshot_to_system(system, snapshot) status, success, stop, _ = self.add_to_path(path, phasepoint, left, right) if stop: logger.debug('Stopping propagate at step: %i ', i) break if reverse: system.particles.vel *= -1.0 self.integration_step(system) system.particles.vel *= -1.0 else: self.integration_step(system) logger.debug('Propagate done: "%s" (success: %s)', status, success) return success, status