Source code for pyretis.engines.gromacs2

# -*- coding: utf-8 -*-
# Copyright (c) 2023, PyRETIS Development Team.
# Distributed under the LGPLv2.1+ License. See LICENSE for more info.
"""A GROMACS external MD integrator interface.

This module defines a class for using GROMACS as an external engine.

Important classes defined here
------------------------------

GromacsEngine2
    A class responsible for interfacing GROMACS.
"""
import logging
import os
import shlex
import signal
import subprocess
from time import sleep
from pyretis.core.box import box_matrix_to_list
from pyretis.engines.gromacs import GromacsEngine
from pyretis.inout.formats.gromacs import (
    read_trr_header,
    read_trr_data,
    TRR_DATA_ITEMS
)
logger = logging.getLogger(__name__)  # pylint: disable=invalid-name
logger.addHandler(logging.NullHandler())

TRR_HEAD_SIZE = 1000
# Actually we don't know the header size, this is just a "large"
# number so that we don't try to read the header before at least
# this number of bytes have been written.


[docs]class GromacsEngine2(GromacsEngine): """ A class for interfacing GROMACS. This class defines an interface to GROMACS. Attributes are similar to :py:class:`.GromacsEngine`. In this particular interface, GROMACS is executed without starting and stopping and we rely on reading the output TRR file from GROMACS while a simulation is running. """
[docs] def __init__(self, gmx, mdrun, input_path, timestep, subcycles, exe_path=os.path.abspath('.'), maxwarn=0, gmx_format='gro', write_vel=True, write_force=False, domain_decomp=False): """Set up the GROMACS engine. Parameters ---------- gmx : string The GROMACS executable. mdrun : string The GROMACS mdrun executable. input_path : string The absolute path to where the input files are stored. timestep : float The time step used in the GROMACS MD simulation. subcycles : integer The number of steps each GROMACS MD run is composed of. exe_path : string, optional The absolute path at which the main PyRETIS simulation will be run. maxwarn : integer, optional Setting for the GROMACS ``grompp -maxwarn`` option. gmx_format : string, optional The format used for GROMACS configurations. write_vel : boolean, optional Determines if GROMACS should write velocities or not. write_force : boolean, optional Determines if GROMACS should write forces or not. domain_decomp: boolean, optional Wheter GROMACS uses domain decomposition or not. This will not set the domain decomposition behavior, the user should know know this. """ super().__init__(gmx, mdrun, input_path, timestep, subcycles, exe_path=exe_path, maxwarn=maxwarn, gmx_format=gmx_format, write_vel=write_vel, write_force=write_force, domain_decomp=domain_decomp)
[docs] def _propagate_from(self, name, path, ensemble, msg_file, reverse=False): """ Propagate with GROMACS from the current system configuration. Here, we assume that this method is called after the propagate() has been called in the parent. The parent is then responsible for reversing the velocities and also for setting the initial state of the system. Parameters ---------- name : string A name to use for the trajectory we are generating. path : object like :py:class:`pyretis.core.Path.PathBase` This is the path we use to fill in phase-space points. ensemble: dict it contains: * `system` : object like :py:class:`.System` The system object gives the initial state for the integration. The initial state is stored and the system is reset to the initial state when the integration is done. * `order_function` : object like :py:class:`.OrderParameter` The object used for calculating the order parameter. * `interfaces` : list of floats These interfaces define the stopping criterion. msg_file : object like :py:class:`.FileIO` An object we use for writing out messages that are useful for inspecting the status of the current propagation. reverse : boolean, optional If True, the system will be propagated backward in time. Returns ------- success : boolean This is True if we generated an acceptable path. status : string A text description of the current status of the propagation. """ status = f'propagating with GROMACS (reverse = {reverse})' system = ensemble['system'] interfaces = ensemble['interfaces'] order_function = ensemble['order_function'] logger.debug(status) success = False left, _, right = interfaces # Dumping of the initial config were done by the parent, here # we will just use it: initial_conf = system.particles.get_pos()[0] # Get the current order parameter: order = self.calculate_order(ensemble) msg_file.write( f'# Initial order parameter: {" ".join([str(i) for i in order])}' ) # So, here we will just blast off GROMACS and check the .trr # output when we can. # 1) Create mdp_file with updated number of steps: settings = {'gen_vel': 'no', 'nsteps': path.maxlen * self.subcycles, 'continuation': 'no'} mdp_file = os.path.join(self.exe_dir, f'{name}.mdp') self._modify_input(self.input_files['input'], mdp_file, settings, delim='=') # 2) Run GROMACS preprocessor: out_files = self._execute_grompp(mdp_file, initial_conf, name) # Generate some names that will be created by mdrun: confout = f'{name}.{self.ext}' out_files['conf'] = confout out_files['cpt_prev'] = f'{name}_prev.cpt' for key in ('cpt', 'edr', 'log', 'trr'): out_files[key] = f'{name}.{key}' # Remove some of these files if present (e.g. left over from a # crashed simulation). This is so that GromacsRunner will not # start reading a .trr left from a previous simulation. remove = [val for key, val in out_files.items() if key != 'tpr'] self._remove_files(self.exe_dir, remove) tpr_file = out_files['tpr'] trr_file = os.path.join(self.exe_dir, out_files['trr']) edr_file = os.path.join(self.exe_dir, out_files['edr']) cmd = shlex.split(self.mdrun.format(tpr_file, name, confout)) # 3) Fire off GROMACS mdrun: logger.debug('Executing GROMACS.') msg_file.write(f'# Trajectory file is: {trr_file}') msg_file.write('# Starting GROMACS.') msg_file.write('# Step order parameter cv1 cv2 ...') with GromacsRunner(cmd, trr_file, edr_file, self.exe_dir) as gro: for i, data in enumerate(gro.get_gromacs_frames()): # Update the configuration file: system.particles.set_pos((trr_file, i)) # Also provide the loaded positions since they are # available: system.particles.pos = data['x'] system.particles.vel = data.get('v', None) if system.particles.vel is not None and reverse: system.particles.vel *= -1 length = box_matrix_to_list(data['box']) system.update_box(length) order = order_function.calculate(system) msg_file.write(f'{i} {" ".join([str(j) for j in order])}') snapshot = {'order': order, 'config': (trr_file, i), 'vel_rev': reverse} phase_point = self.snapshot_to_system(system, snapshot) status, success, stop, _ = self.add_to_path(path, phase_point, left, right) if stop: logger.debug('Ending propagate at %i. Reason: %s', i, status) break logger.debug('GROMACS propagation done, obtaining energies!') msg_file.write('# Propagation done.') msg_file.write(f'# Reading energies from: {out_files["edr"]}') energy = self.get_energies(out_files['edr']) path.update_energies(energy['kinetic en.'], energy['potential']) logger.debug('Removing GROMACS output after propagate.') remove = [val for key, val in out_files.items() if key not in ('trr', 'gro', 'g96')] self._remove_files(self.exe_dir, remove) self._remove_gromacs_backup_files(self.exe_dir) msg_file.flush() return success, status
[docs] def integrate(self, ensemble, steps, thermo='full'): """ Perform several integration steps. This method will perform several integration steps using GROMACS. It will also calculate order parameter(s) and energy terms if requested. Parameters ---------- ensemble: dict it contains: * `system` : object like :py:class:`.System` The system object gives the initial state for the integration. The initial state is stored and the system is reset to the initial state when the integration is done. * `order_function` : object like :py:class:`.OrderParameter` The object used for calculating the order parameter. steps : integer The number of steps we are going to perform. Note that we do not integrate on the first step (e.g. step 0) but we do obtain the other properties. This is to output the starting configuration. thermo : string, optional Select the thermodynamic properties we are to obtain. Yields ------ results : dict The results from a MD step. This contains the state of the system and order parameter(s) and energies (if calculated). """ logger.debug('Integrating with GROMACS') # Dump the initial config: system = ensemble['system'] order_function = ensemble.get('order_function') initial_file = self.dump_frame(system) self.energy_terms = self.select_energy_terms(thermo) if order_function: order = self.calculate_order(ensemble) else: order = None name = 'pyretis-gmx' # 1) Create mdp_file with updated number of steps: # Note the -1 here due do different numbering in GROMACS and PyRETIS. settings = {'nsteps': (steps - 1) * self.subcycles, 'continuation': 'no'} mdp_file = os.path.join(self.exe_dir, f'{name}.mdp') self._modify_input(self.input_files['input'], mdp_file, settings, delim='=') # 2) Run GROMACS preprocessor: out_files = self._execute_grompp(mdp_file, initial_file, name) # Generate some names that will be created by mdrun: confout = f'{name}.{self.ext}' out_files['conf'] = confout out_files['cpt_prev'] = f'{name}_prev.cpt' for key in ('cpt', 'edr', 'log', 'trr'): out_files[key] = f'{name}.{key}' # Remove some of these files if present (e.g. left over from a # crashed simulation). This is so that GromacsRunner will not # start reading a .trr left from a previous simulation. remove = [val for key, val in out_files.items() if key != 'tpr'] self._remove_files(self.exe_dir, remove) tpr_file = out_files['tpr'] trr_file = os.path.join(self.exe_dir, out_files['trr']) edr_file = os.path.join(self.exe_dir, out_files['edr']) cmd = shlex.split(self.mdrun.format(tpr_file, name, confout)) # 3) Fire off GROMACS mdrun: logger.debug('Executing GROMACS.') with GromacsRunner(cmd, trr_file, edr_file, self.exe_dir) as gro: for i, data in enumerate(gro.get_gromacs_frames()): system.particles.pos = data['x'] system.particles.vel = data.get('v', None) length = box_matrix_to_list(data['box']) system.update_box(length) results = {} if order: results['order'] = order if order_function: order = order_function.calculate(system) time1 = (i * self.timestep * self.subcycles - 0.1 * self.timestep) time2 = ((i + 1) * self.timestep * self.subcycles + 0.1 * self.timestep) energy = self.get_energies(out_files['edr'], begin=time1, end=time2) results['thermo'] = self.rename_energies(energy) yield results logger.debug('GROMACS execution done.')
[docs]def get_data(fileh, header): """Read data from the TRR file. Parameters ---------- fileh : file object The file we are reading. header : dict The previously read header. Contains sizes and what to read. Returns ------- data : dict The data read from the file. data_size : integer The size of the data read. """ data_size = sum([header[key] for key in TRR_DATA_ITEMS]) data = read_trr_data(fileh, header) return data, data_size
[docs]def reopen_file(filename, fileh, inode, bytes_read): """Reopen a file if the inode has changed. Parameters ---------- filename : string The name of the file we are working with. fileh : file object The current open file object. inode : integer The current inode we are using. bytes_read : integer The position we should start reading at. Returns ------- out[0] : file object or None The new file object. out[1] : integer or None The new inode. """ if os.stat(filename).st_ino != inode: new_fileh = open(filename, 'rb') fileh.close() new_inode = os.fstat(new_fileh.fileno()).st_ino new_fileh.seek(bytes_read) return new_fileh, new_inode return None, None
[docs]def read_remaining_trr(filename, fileh, start): """Read remaining frames from the TRR file. Parameters ---------- filename : string The file we are reading from. fileh : file object The file object we are reading from. start : integer The current position we are at. Yields ------ out[0] : string The header read from the file out[1] : dict The data read from the file. out[2] : integer The size of the data read. """ stop = False bytes_read = start bytes_total = os.path.getsize(filename) logger.debug('Reading remaing data from: %s', filename) while not stop: if bytes_read >= bytes_total: stop = True continue header = None new_bytes = bytes_read try: header, new_bytes = read_trr_header(fileh) except EOFError: # pragma: no cover # Just assume that we have reached the end of the # file and we just stop here. It should not be reached, # kept for safety stop = True continue if header is not None: bytes_read += new_bytes try: data, new_bytes = get_data(fileh, header) if data is not None: bytes_read += new_bytes yield header, data, bytes_read except EOFError: # pragma: no cover # Hopefully, this code should not be reached. # kept for safety stop = True continue
[docs]class GromacsRunner: """A helper class for running GROMACS. This class handles the reading of the TRR on the fly and it is used to decide when to end the GROMACS execution. Attributes ---------- cmd : string The command for executing GROMACS. trr_file : string The GROMACS TRR file we are going to read. edr_file : string A .edr file we are going to read. exe_dir : string Path to where we are currently running GROMACS. fileh : file object The current open file object. running : None or object like :py:class:`subprocess.Popen` The process running GROMACS. bytes_read : integer The number of bytes read so far from the TRR file. ino : integer The current inode we are using for the file. stop_read : boolean If this is set to True, we will stop the reading. SLEEP : float How long we wait after an unsuccessful read before reading again. data_size : integer The size of the data (x, v, f, box, etc.) in the TRR file. header_size : integer The size of the header in the TRR file. """ SLEEP = 0.1
[docs] def __init__(self, cmd, trr_file, edr_file, exe_dir): """Set the GROMACS command and the files we need. Parameters ---------- cmd : string The command for executing GROMACS. trr_file : string The GROMACS TRR file we are going to read. edr_file : string A .edr file we are going to read. exe_dir : string Path to where we are currently running GROMACS. """ self.cmd = cmd self.trr_file = trr_file self.edr_file = edr_file self.exe_dir = exe_dir self.fileh = None self.running = None self.bytes_read = 0 self.ino = 0 self.stop_read = True self.data_size = 0 self.header_size = 0 self.stdout_name = None self.stderr_name = None self.stdout = None self.stderr = None
[docs] def start(self): """Start execution of GROMACS and wait for output file creation.""" logger.debug('Starting GROMACS execution in %s', self.exe_dir) self.stdout_name = os.path.join(self.exe_dir, 'stdout.txt') self.stderr_name = os.path.join(self.exe_dir, 'stderr.txt') self.stdout = open(self.stdout_name, 'wb') self.stderr = open(self.stderr_name, 'wb') self.running = subprocess.Popen( self.cmd, stdin=subprocess.PIPE, stdout=self.stdout, stderr=self.stderr, shell=False, cwd=self.exe_dir, preexec_fn=os.setsid, ) present = [] # Wait for the TRR/EDR files to appear: for fname in (self.trr_file, self.edr_file): while not os.path.isfile(fname): logger.debug('Waiting for GROMACS file "%s"', fname) sleep(self.SLEEP) poll = self.check_poll() if poll is not None: logger.debug('GROMACS execution stopped') break if os.path.isfile(fname): present.append(fname) # Prepare and open the TRR file: self.bytes_read = 0 # Ok, so GROMACS might have crashed in between writing the # files. Check that both files are indeed here: if self.trr_file in present and self.edr_file in present: self.fileh = open(self.trr_file, 'rb') self.ino = os.fstat(self.fileh.fileno()).st_ino self.stop_read = False else: self.stop_read = True
[docs] def __enter__(self): """Start running GROMACS, for a context manager.""" self.start() return self
[docs] def get_gromacs_frames(self): """Read the GROMACS TRR file on-the-fly.""" first_header = True header = None while not self.stop_read: poll = self.check_poll() if poll is not None: # GROMACS is done, read remaining data. self.stop_read = True if os.path.getsize(self.trr_file) - self.bytes_read > 0: for _, data, _ in read_remaining_trr(self.trr_file, self.fileh, self.bytes_read): yield data else: # First we try to get the header from the file: size = os.path.getsize(self.trr_file) if self.header_size == 0: header_size = TRR_HEAD_SIZE else: header_size = self.header_size if size >= self.bytes_read + header_size: # Try to read next frame: try: header, new_bytes = read_trr_header(self.fileh) except EOFError: new_fileh, new_ino = reopen_file(self.trr_file, self.fileh, self.ino, self.bytes_read) if new_fileh is not None: self.fileh = new_fileh self.ino = new_ino if header is not None: self.bytes_read += new_bytes self.header_size = new_bytes if first_header: logger.debug('TRR header was: %i', new_bytes) first_header = False # Calculate the size of the data: self.data_size = sum([header[key] for key in TRR_DATA_ITEMS]) data = None while data is None: size = os.path.getsize(self.trr_file) if size >= self.bytes_read + self.data_size: try: data, new_bytes = get_data(self.fileh, header) except EOFError: new_fileh, new_ino = reopen_file( self.trr_file, self.fileh, self.ino, self.bytes_read) if new_fileh is not None: self.fileh = new_fileh self.ino = new_ino if data is None: # Data is not ready, just wait: sleep(self.SLEEP) else: self.bytes_read += new_bytes yield data else: # Data is not ready, just wait: sleep(self.SLEEP) else: # Header was not ready, just wait before trying again. sleep(self.SLEEP)
[docs] def close(self): """Close the file, in case that is explicitly needed.""" if self.fileh is not None and not self.fileh.closed: logger.debug('Closing GROMACS file: "%s"', self.trr_file) self.fileh.close() for handle in (self.stdout, self.stderr): if handle is not None and not handle.closed: handle.close()
[docs] def stop(self): """Stop the current GROMACS execution.""" if self.running: for handle in (self.running.stdin, self.running.stdout, self.running.stderr): if handle: try: handle.close() except AttributeError: pass if self.running.returncode is None: logger.debug('Terminating GROMACS execution') os.killpg(os.getpgid(self.running.pid), signal.SIGTERM) self.running.wait(timeout=360) self.stop_read = True self.close() # Close the TRR file.
[docs] def __exit__(self, exc_type, exc_val, exc_tb): """Just stop execution and close file for a context manager.""" self.stop()
[docs] def __del__(self): """Just stop execution and close file.""" self.stop()
[docs] def check_poll(self): """Check the current status of the running subprocess.""" if self.running: poll = self.running.poll() if poll is not None: logger.debug('Execution of GROMACS stopped') logger.debug('Return code was: %i', poll) if poll != 0: logger.error('STDOUT, see file: %s', self.stdout_name) logger.error('STDERR, see file: %s', self.stderr_name) raise RuntimeError('Error in GROMACS execution.') return poll raise RuntimeError('GROMACS is not running.')