Creating custom engines

The Engines are used to propagate the equations of motion or alter the state of the system in some other way. In this section, we describe how additional engines can be added to PyRETIS. Here, we make a distinction between two types of engines:

  • Internal engines which directly interact with the underlying numpy.arrays representing positions, velocities and forces (see Fig. 13-left)

  • External engines which are used by PyRETIS to control the execution of external programs. In this case, the state of the particles is represented by a file containing the positions and velocities. As shown in the illustration below (Fig. 13-right) , this can be accessed using ParticlesExt.config which is a tuple containing a file name and an index. The interpretation is that the current state of the system can be found in the given file at the given frame/index. Further, the attribute ParticlesExt.vel_rev is used to determine if the velocities in the configuration should be reversed or not. In this case the engine will typically make use of ParticlesExt.set_pos() in order to update the state of the system.

Comparison of internal and external engines.

Fig. 13 Illustration of how internal engines (left) and external engines (right) interact with the system and particles. (Left) The internal engine interacts with and alters the state of the system, and it can directly access the positions and velocities, as numpy.arrays, using Particles.pos and Particles.vel. (Right) The external engine interacts with and alters the state of the system using ParticlesExt.set_pos() and the state of the system is represented by the attribute ParticlesExt.config.

See also

Once your engine is implemented, validate that it reproduces the same rate on a shared system with the reproducibility suite – see Extending and validating PyRETIS.

Creating a new internal engine

In order to create a new internal engine, we create a new class for our engine which is sub-classing the generic EngineBase. Often, we wish to create a new integrator for molecular dynamics and then we can simplify our work by sub-classing the more specific MDEngine. The steps you need to complete in order to create a new MDengine is as follows:

  1. Create a new Python class which is a sub-class of MDEngine

  2. Write a method to initialise this class, that is a __init__ method. This method will be called when PyRETIS is setting up a new simulation and it will be fed variables from the PyRETIS input file.

  3. Write a method to perform a single integration step.

  4. Modify the input script to use the new integrator.

Example: Creating a new internal LeapFrog integrator

To be specific, we will in the following show the steps needed to create a new MD engine using the Leapfrog scheme [1]:

\[\begin{split}\mathbf{r} (t + \Delta t) &= \mathbf{r} (t) + \mathbf{v} (t) \Delta t + \tfrac{1}{2} \mathbf{a} (t) \Delta t^2 \\ \mathbf{v} (t + \Delta t) &= \mathbf{v} (t) + \tfrac{1}{2} \left[\mathbf{a} (t) + \mathbf{a} (t+\Delta t) \right] \Delta t\end{split}\]

where \(\mathbf{r}\) are the positions, \(\mathbf{v}\) the velocities, \(\mathbf{a}\) the accelerations, \(\Delta t\) the time step and \(t\) is the current time.

Step 1 and 2: Sub-classing MDEngine

In order to define a new class to use with PyRETIS we sub-class the MDEngine and define the __init__ method:

from pyretis.engines import MDEngine


class LeapFrog(MDEngine):
    """Perform Leap Frog integration."""

    def __init__(self, timestep):
        """Initiate the engine.

        Parameters
        ----------
        timestep : float
            Set the time step for the integrator.
        """
        super().__init__(timestep, 'Leap Frog integrator', dynamics='NVE')
        self.half_timestep = self.timestep * 0.5
        self.half_timestep_sq = 0.5 * self.timestep**2

Step 3: Adding the integration step method

Now, we add the actual integration routine:

    def integration_step(self, system):
        """Perform one step for the Leap Frog integrator."""
        particles = system.particles
        imass = particles.imass

        # get current acceleration:
        acc_t = particles.force * imass
        # update positions:
        particles.pos += (self.timestep * particles.vel +
                          self.half_timestep_sq * acc_t)
        # update forces:
        system.potential_and_force()
        # update acceleration:
        acc_t2 = particles.force * imass
        # update velocities:
        particles.vel += self.half_timestep * (acc_t + acc_t2)

Step 4: Making use of the new integrator

The new integrator can be used by specifying the following engine section:

Example Engine section for LeapFrog:
Engine
------
class = LeapFrog
module = leapfrog.py
timestep = 0.002

where the keyword module specifies the file where you have stored the new LeapFrog class.

Creating a new external engine

The external engines are used to interface external programs. In PyRETIS there is a generic ExternalMDEngine which you can make use of in order to set up new custom external engines. Before we give an overview of the methods you will have to implement in order to create a new engine, we list some useful methods which are already present in the generic ExternalMDEngine:

In order to create a new external engine, there are several methods that you need to implement. These methods are described in the next section. Before reading this description, please have a look at the figure below which illustrates the interaction between PyRETIS and the external engine:

Implementing methods for a new external engine

The following methods must be implemented in the new engine:

  • __init__ which does some set-up for the external engine.

  • step which is a method for performing a single MD step with the external engine.

  • _read_configuration For reading output (configurations) from the external engine. This is used for calculating the order parameter(s).

  • _reverse_velocities For reversing velocities in a snapshot. This method will typically make use of the _read_configuration.

  • _extract_frame For extracting a single frame from a trajectory.

  • _propagate_from The method for propagating the equations of motion using the external engine.

  • modify_velocities The method used for generating random velocities for shooting points.

Some examples/templates are shown in the following sections.

The __init__ method

The __init__ method sets up the engine and makes it ready for use. This can, for instance, involve checking that the required input files are in place and checking the consistency of the input files.

    def __init__(self, exe, input_path, timestep, subcycles):
        """Set up the engine.

        Parameters
        ----------
        exe : string
            The engine executable, optionally followed by arguments.
        input_path : string
            Path to the directory containing the engine input files.
        timestep : float
            Time step used in the external engine.
        subcycles : integer
            Number of external steps performed per |pyretis| step.

        """
        super().__init__('New external engine', timestep, subcycles)
        self.exe = shlex.split(exe)
        self.input_path = os.path.abspath(input_path)
        for name in REQUIRED_FILES:
            full = os.path.join(self.input_path, name)
            if not os.path.isfile(full):
                msg = f'Engine could not find input file "{name}"'
                logger.error(msg)
                raise ValueError(msg)
            self.input_files[name] = full

In addition, this is a good point to add a method to execute the external program. Here we will make use of ExternalMDEngine.execute_command() and the ExternalMDEngine.exe_dir property. This property is set internally by PyRETIS and points to a directory which we will use when executing the external engine. As a concrete example, if we are generating a path for ensemble \([1^+]\), this will point to the sub-directory generate of the ensemble-directory 002. Here is a short template for executing external commands:

    def _run_program(self, deffnm):
        """Execute the external engine in ``self.exe_dir``.

        Parameters
        ----------
        deffnm : string
            Base name used for the output files of this run.

        Returns
        -------
        out : dict
            Mapping ``{kind: absolute path}`` for the files produced.

        """
        self.execute_command(self.exe, cwd=self.exe_dir, inputs=None)
        out_files = {}
        for kind, template in OUTPUT_FILES.items():
            full = os.path.join(self.exe_dir, template.format(deffnm))
            if os.path.isfile(full):
                out_files[kind] = full
        return out_files

The step method

The step method is used to execute a single step with the external software. This method is used by the PyRETIS engine in the following way:

conf = self.step(system, 'output-file')

That is, the method takes in two parameters: a system object and a string and returns a single parameter. This single parameter is just a string which contains the output configuration created after executing the external program. Here is a short template for the step method:

    def step(self, system, name):
        """Perform a single MD step with the external engine.

        Parameters
        ----------
        system : object like :py:class:`.System`
            The system to advance.
        name : string
            Base name used for the output files of this step.

        Returns
        -------
        conf : string
            Path of the output configuration.

        """
        self.dump_frame(system)
        out_files = self._run_program(name)
        conf = os.path.join(self.exe_dir, f'{name}.{self.ext}')
        ekin, vpot = read_energies(out_files['energy'])
        system.particles.set_pos((conf, None))
        system.particles.set_vel(False)
        system.particles.ekin = ekin[-1]
        system.particles.vpot = vpot[-1]
        return conf

The _read_configuration method

The _read_configuration method is used to get the actual positions and velocities from a configuration file. As a short template:

    @staticmethod
    def _read_configuration(filename):
        """Return ``(box, xyz, vel)`` for a configuration file.

        Used by :py:meth:`.ExternalMDEngine.calculate_order`.

        """
        return read_configuration(filename)

The _reverse_velocities method

The _reverse_velocities method is used to reverse velocities for a single configuration.

    @staticmethod
    def _reverse_velocities(filename, outfile):
        """Write ``outfile`` as ``filename`` with reversed velocities."""
        box, xyz, vel = read_configuration(filename)
        write_configuration(outfile, xyz, -vel, box=box)

The _extract_frame method

The _extract_frame method is used to extract a single snapshot from a trajectory file.

    def _extract_frame(self, traj_file, idx, out_file):
        """Extract frame ``idx`` of ``traj_file`` into ``out_file``.

        Used by :py:meth:`.ExternalMDEngine.dump_config` when dumping
        from a trajectory file. The implementation is engine specific;
        for common formats one of ``mdtraj``, ``MDAnalysis`` or
        ``ase`` can do this in a few lines.
        """
        raise NotImplementedError

The _propagate_from method

The _propagate_from method is used to propagate the equations of motion using the external engine. In some cases, this can make use of the step method, in other cases this is really distinct (e.g. in GROMACS this would correspond to extending a simulation).

    def _propagate_from(self, name, path, ensemble, msg_file, reverse=False):
        """Propagate using the external engine.

        Called from :py:meth:`.ExternalMDEngine.propagate` after the
        initial configuration (and any required velocity reversal) has
        been written to disk.

        Parameters
        ----------
        name : string
            Base name for the trajectory we are generating.
        path : object like :py:class:`.PathBase`
            Path object to fill with phase-space points.
        ensemble : dict
            Contains ``system``, ``order_function`` and ``interfaces``.
        msg_file : object like :py:class:`.FileIO`
            File used to log propagation progress.
        reverse : boolean, optional
            If True, the system is propagated backward in time.

        Returns
        -------
        success : boolean
            True if an acceptable path was generated.
        status : string
            Human-readable description of the outcome.

        """
        status = f'propagating with new engine (reverse = {reverse})'
        system = ensemble['system']
        left, _, right = ensemble['interfaces']
        traj_file = os.path.join(self.exe_dir, f'{name}.{self.ext}')
        msg_file.write(f'# Trajectory file is: {traj_file}')

        order = self.calculate_order(ensemble)
        success = False
        msg_file.write('# Step order parameter cv1 cv2 ...')
        out_files = {}
        for i in range(path.maxlen):
            msg_file.write(f'{i} {" ".join(str(o) for o in order)}')
            snapshot = {
                'order': order,
                'config': (traj_file, i),
                'vel_rev': reverse,
            }
            phase_point = self.snapshot_to_system(system, snapshot)
            status, success, stop, _ = self.add_to_path(
                path, phase_point, left, right,
            )
            if stop:
                logger.debug('Propagation ended at step %i: %s', i, status)
                break
            # Advance the engine and refresh the order parameter:
            out_files = self._run_program(name)
            system.particles.set_pos((traj_file, i + 1))
            system.particles.set_vel(reverse)
            order = self.calculate_order(ensemble)
            msg_file.flush()
        msg_file.write('# Propagation done.')
        # If the engine writes energies for the whole run, hand them
        # to the path object here, e.g.:
        #     ekin, vpot = read_energies(out_files['energy'])
        #     path.update_energies(ekin, vpot)
        return success, status

The modify_velocities method

This method is used to draw random velocities, typically when performing the shooting move. The generic modify_velocities method is quite involved, but for most cases is sufficient to only implement aimless shooting. Here is a template:

    def modify_velocities(self, ensemble, vel_settings=None):
        """Draw new velocities for the current configuration.

        Only aimless shooting is supported by this template. Soft
        velocity perturbations and energy re-scaling can be added in
        the same fashion as in :py:mod:`pyretis.engines.cp2k`.

        Parameters
        ----------
        ensemble : dict
            Contains ``system`` and ``rgen``.
        vel_settings : dict, optional
            Velocity generation options:

            * ``sigma_v`` - negative for aimless shooting; otherwise
              a standard deviation for soft perturbations.
            * ``momentum`` - if True, reset the linear momentum.
            * ``rescale`` - if a positive float, rescale the total
              energy to this value.

        Returns
        -------
        dek : float
            Change in kinetic energy.
        kin_new : float
            New kinetic energy.

        """
        rgen = ensemble['rgen']
        system = ensemble['system']
        particles = system.particles
        rescale = vel_settings.get('rescale') if vel_settings else None
        if rescale is not None and rescale is not False and rescale > 0:
            msg = 'New external engine does not support energy rescale!'
            logger.error(msg)
            raise NotImplementedError(msg)
        if not is_aimless_velocity_setting(vel_settings):
            msg = 'New external engine only supports aimless shooting!'
            logger.error(msg)
            raise NotImplementedError(msg)
        pos = self.dump_frame(system)
        box, xyz, vel = read_configuration(pos)
        particles.vel = vel
        kin_old = calculate_kinetic_energy(particles)[0]
        new_vel, _ = rgen.draw_maxwellian_velocities(system)
        particles.vel = new_vel
        if vel_settings.get('momentum', False):
            reset_momentum(particles)
        kin_new = calculate_kinetic_energy(particles)[0]
        conf_out = os.path.join(self.exe_dir, f'genvel.{self.ext}')
        write_configuration(conf_out, xyz, particles.vel, box=box)
        particles.set_pos((conf_out, None))
        particles.set_vel(False)
        particles.ekin = kin_new
        dek = float('inf') if not kin_old else kin_new - kin_old
        return dek, kin_new

References