Creating custom engines¶
The Engines are used to propagate the equations of motion or alter the state of the system in some other way. In this section, we describe how additional engines can be added to PyRETIS. Here, we make a distinction between two types of engines:
Internal engines which directly interact with the underlying numpy.arrays representing positions, velocities and forces (see Fig. 13-left)
External engines which are used by PyRETIS to control the execution of external programs. In this case, the state of the particles is represented by a file containing the positions and velocities. As shown in the illustration below (Fig. 13-right) , this can be accessed using
ParticlesExt.configwhich is a tuple containing a file name and an index. The interpretation is that the current state of the system can be found in the given file at the given frame/index. Further, the attributeParticlesExt.vel_revis used to determine if the velocities in the configuration should be reversed or not. In this case the engine will typically make use ofParticlesExt.set_pos()in order to update the state of the system.
Fig. 13 Illustration of how internal engines (left) and external engines (right)
interact with the system and particles.
(Left) The internal engine interacts with and alters
the state of the system, and it can directly access the positions
and velocities, as numpy.arrays, using Particles.pos and
Particles.vel.
(Right) The external engine interacts with and alters
the state of the system using ParticlesExt.set_pos() and
the state of the system is represented by the attribute
ParticlesExt.config.¶
See also
Once your engine is implemented, validate that it reproduces the same rate on a shared system with the reproducibility suite – see Extending and validating PyRETIS.
Creating a new internal engine¶
In order to create a new internal engine, we create a new class
for our engine which is sub-classing the generic EngineBase.
Often, we wish to create a new integrator for molecular dynamics
and then we can simplify our work by sub-classing the more specific
MDEngine. The steps you need to complete in order to
create a new MDengine is as follows:
Create a new Python class which is a sub-class of
MDEngineWrite a method to initialise this class, that is a
__init__method. This method will be called when PyRETIS is setting up a new simulation and it will be fed variables from the PyRETIS input file.Write a method to perform a single integration step.
Modify the input script to use the new integrator.
Example: Creating a new internal LeapFrog integrator¶
To be specific, we will in the following show the steps needed to create a new MD engine using the Leapfrog scheme [1]:
where \(\mathbf{r}\) are the positions, \(\mathbf{v}\) the velocities, \(\mathbf{a}\) the accelerations, \(\Delta t\) the time step and \(t\) is the current time.
Step 1 and 2: Sub-classing MDEngine¶
In order to define a new class to use with PyRETIS we
sub-class the MDEngine and define the
__init__ method:
from pyretis.engines import MDEngine
class LeapFrog(MDEngine):
"""Perform Leap Frog integration."""
def __init__(self, timestep):
"""Initiate the engine.
Parameters
----------
timestep : float
Set the time step for the integrator.
"""
super().__init__(timestep, 'Leap Frog integrator', dynamics='NVE')
self.half_timestep = self.timestep * 0.5
self.half_timestep_sq = 0.5 * self.timestep**2
Step 3: Adding the integration step method¶
Now, we add the actual integration routine:
def integration_step(self, system):
"""Perform one step for the Leap Frog integrator."""
particles = system.particles
imass = particles.imass
# get current acceleration:
acc_t = particles.force * imass
# update positions:
particles.pos += (self.timestep * particles.vel +
self.half_timestep_sq * acc_t)
# update forces:
system.potential_and_force()
# update acceleration:
acc_t2 = particles.force * imass
# update velocities:
particles.vel += self.half_timestep * (acc_t + acc_t2)
Step 4: Making use of the new integrator¶
The new integrator can be used by specifying the following engine section:
Engine
------
class = LeapFrog
module = leapfrog.py
timestep = 0.002
where the keyword module specifies the file where you have
stored the new LeapFrog class.
Creating a new external engine¶
The external engines are used to interface external programs.
In PyRETIS there is a generic ExternalMDEngine which
you can make use of in order to set up new custom external engines.
Before we give an overview of the methods you will have to implement
in order to create a new engine, we list some useful methods which
are already present in the generic ExternalMDEngine:
ExternalMDEngine.execute_command()which can be used to execute system commands.Methods to interact with files:
ExternalMDEngine._modify_input()which can be used to modify simple input files for the external software.
In order to create a new external engine, there are several methods that you need to implement. These methods are described in the next section. Before reading this description, please have a look at the figure below which illustrates the interaction between PyRETIS and the external engine:
Implementing methods for a new external engine¶
The following methods must be implemented in the new engine:
__init__ which does some set-up for the external engine.
step which is a method for performing a single MD step with the external engine.
_read_configuration For reading output (configurations) from the external engine. This is used for calculating the order parameter(s).
_reverse_velocities For reversing velocities in a snapshot. This method will typically make use of the _read_configuration.
_extract_frame For extracting a single frame from a trajectory.
_propagate_from The method for propagating the equations of motion using the external engine.
modify_velocities The method used for generating random velocities for shooting points.
Some examples/templates are shown in the following sections.
The __init__ method¶
The __init__ method sets up the engine and makes it ready for use.
This can, for instance, involve checking that the required input files are
in place and checking the consistency of the input files.
def __init__(self, exe, input_path, timestep, subcycles):
"""Set up the engine.
Parameters
----------
exe : string
The engine executable, optionally followed by arguments.
input_path : string
Path to the directory containing the engine input files.
timestep : float
Time step used in the external engine.
subcycles : integer
Number of external steps performed per |pyretis| step.
"""
super().__init__('New external engine', timestep, subcycles)
self.exe = shlex.split(exe)
self.input_path = os.path.abspath(input_path)
for name in REQUIRED_FILES:
full = os.path.join(self.input_path, name)
if not os.path.isfile(full):
msg = f'Engine could not find input file "{name}"'
logger.error(msg)
raise ValueError(msg)
self.input_files[name] = full
In addition, this is a good point to add a method to execute
the external program. Here we will make use of
ExternalMDEngine.execute_command() and the
ExternalMDEngine.exe_dir property. This property
is set internally by PyRETIS and points to a directory which
we will use when executing the external engine. As a concrete
example, if we are generating a path for ensemble \([1^+]\),
this will point to the sub-directory generate of the
ensemble-directory 002.
Here is a short template for executing external commands:
def _run_program(self, deffnm):
"""Execute the external engine in ``self.exe_dir``.
Parameters
----------
deffnm : string
Base name used for the output files of this run.
Returns
-------
out : dict
Mapping ``{kind: absolute path}`` for the files produced.
"""
self.execute_command(self.exe, cwd=self.exe_dir, inputs=None)
out_files = {}
for kind, template in OUTPUT_FILES.items():
full = os.path.join(self.exe_dir, template.format(deffnm))
if os.path.isfile(full):
out_files[kind] = full
return out_files
The step method¶
The step method is used to execute a single step with the external software. This method is used by the PyRETIS engine in the following way:
conf = self.step(system, 'output-file')
That is, the method takes in two parameters: a system object and a string
and returns a single parameter. This single parameter is just a string
which contains the output configuration created after executing the
external program. Here is a short template for the step method:
def step(self, system, name):
"""Perform a single MD step with the external engine.
Parameters
----------
system : object like :py:class:`.System`
The system to advance.
name : string
Base name used for the output files of this step.
Returns
-------
conf : string
Path of the output configuration.
"""
self.dump_frame(system)
out_files = self._run_program(name)
conf = os.path.join(self.exe_dir, f'{name}.{self.ext}')
ekin, vpot = read_energies(out_files['energy'])
system.particles.set_pos((conf, None))
system.particles.set_vel(False)
system.particles.ekin = ekin[-1]
system.particles.vpot = vpot[-1]
return conf
The _read_configuration method¶
The _read_configuration method is used to get the actual
positions and velocities from a configuration file. As
a short template:
@staticmethod
def _read_configuration(filename):
"""Return ``(box, xyz, vel)`` for a configuration file.
Used by :py:meth:`.ExternalMDEngine.calculate_order`.
"""
return read_configuration(filename)
The _reverse_velocities method¶
The _reverse_velocities method is used to reverse velocities for
a single configuration.
@staticmethod
def _reverse_velocities(filename, outfile):
"""Write ``outfile`` as ``filename`` with reversed velocities."""
box, xyz, vel = read_configuration(filename)
write_configuration(outfile, xyz, -vel, box=box)
The _extract_frame method¶
The _extract_frame method is used to extract a single snapshot from
a trajectory file.
def _extract_frame(self, traj_file, idx, out_file):
"""Extract frame ``idx`` of ``traj_file`` into ``out_file``.
Used by :py:meth:`.ExternalMDEngine.dump_config` when dumping
from a trajectory file. The implementation is engine specific;
for common formats one of ``mdtraj``, ``MDAnalysis`` or
``ase`` can do this in a few lines.
"""
raise NotImplementedError
The _propagate_from method¶
The _propagate_from method is used to propagate the equations of motion
using the external engine. In some cases, this can make use of the
step method, in other cases this is
really distinct (e.g. in GROMACS this would correspond to extending a
simulation).
def _propagate_from(self, name, path, ensemble, msg_file, reverse=False):
"""Propagate using the external engine.
Called from :py:meth:`.ExternalMDEngine.propagate` after the
initial configuration (and any required velocity reversal) has
been written to disk.
Parameters
----------
name : string
Base name for the trajectory we are generating.
path : object like :py:class:`.PathBase`
Path object to fill with phase-space points.
ensemble : dict
Contains ``system``, ``order_function`` and ``interfaces``.
msg_file : object like :py:class:`.FileIO`
File used to log propagation progress.
reverse : boolean, optional
If True, the system is propagated backward in time.
Returns
-------
success : boolean
True if an acceptable path was generated.
status : string
Human-readable description of the outcome.
"""
status = f'propagating with new engine (reverse = {reverse})'
system = ensemble['system']
left, _, right = ensemble['interfaces']
traj_file = os.path.join(self.exe_dir, f'{name}.{self.ext}')
msg_file.write(f'# Trajectory file is: {traj_file}')
order = self.calculate_order(ensemble)
success = False
msg_file.write('# Step order parameter cv1 cv2 ...')
out_files = {}
for i in range(path.maxlen):
msg_file.write(f'{i} {" ".join(str(o) for o in order)}')
snapshot = {
'order': order,
'config': (traj_file, i),
'vel_rev': reverse,
}
phase_point = self.snapshot_to_system(system, snapshot)
status, success, stop, _ = self.add_to_path(
path, phase_point, left, right,
)
if stop:
logger.debug('Propagation ended at step %i: %s', i, status)
break
# Advance the engine and refresh the order parameter:
out_files = self._run_program(name)
system.particles.set_pos((traj_file, i + 1))
system.particles.set_vel(reverse)
order = self.calculate_order(ensemble)
msg_file.flush()
msg_file.write('# Propagation done.')
# If the engine writes energies for the whole run, hand them
# to the path object here, e.g.:
# ekin, vpot = read_energies(out_files['energy'])
# path.update_energies(ekin, vpot)
return success, status
The modify_velocities method¶
This method is used to draw random velocities, typically when
performing the shooting move. The generic modify_velocities method
is quite involved, but for most cases is sufficient to only implement
aimless shooting. Here is a template:
def modify_velocities(self, ensemble, vel_settings=None):
"""Draw new velocities for the current configuration.
Only aimless shooting is supported by this template. Soft
velocity perturbations and energy re-scaling can be added in
the same fashion as in :py:mod:`pyretis.engines.cp2k`.
Parameters
----------
ensemble : dict
Contains ``system`` and ``rgen``.
vel_settings : dict, optional
Velocity generation options:
* ``sigma_v`` - negative for aimless shooting; otherwise
a standard deviation for soft perturbations.
* ``momentum`` - if True, reset the linear momentum.
* ``rescale`` - if a positive float, rescale the total
energy to this value.
Returns
-------
dek : float
Change in kinetic energy.
kin_new : float
New kinetic energy.
"""
rgen = ensemble['rgen']
system = ensemble['system']
particles = system.particles
rescale = vel_settings.get('rescale') if vel_settings else None
if rescale is not None and rescale is not False and rescale > 0:
msg = 'New external engine does not support energy rescale!'
logger.error(msg)
raise NotImplementedError(msg)
if not is_aimless_velocity_setting(vel_settings):
msg = 'New external engine only supports aimless shooting!'
logger.error(msg)
raise NotImplementedError(msg)
pos = self.dump_frame(system)
box, xyz, vel = read_configuration(pos)
particles.vel = vel
kin_old = calculate_kinetic_energy(particles)[0]
new_vel, _ = rgen.draw_maxwellian_velocities(system)
particles.vel = new_vel
if vel_settings.get('momentum', False):
reset_momentum(particles)
kin_new = calculate_kinetic_energy(particles)[0]
conf_out = os.path.join(self.exe_dir, f'genvel.{self.ext}')
write_configuration(conf_out, xyz, particles.vel, box=box)
particles.set_pos((conf_out, None))
particles.set_vel(False)
particles.ekin = kin_new
dek = float('inf') if not kin_old else kin_new - kin_old
return dek, kin_new