# -*- coding: utf-8 -*-
# Copyright (c) 2023, PyRETIS Development Team.
# Distributed under the LGPLv2.1+ License. See LICENSE for more info.
"""Module for handling GROMACS input/output.
Important methods defined here
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
read_gromacs_generic (:py:func:`.read_gromacs_generic`)
A method for reading snapshots from any GROMACS file.
read_gromacs_file (:py:func:`.read_gromacs_file`)
A method for reading snapshots from a GROMACS GRO file.
read_gromacs_gro_file (:py:func:`.read_gromacs_gro_file`)
Read a single snapshot from a GROMACS GRO file.
write_gromacs_gro_file (:py:func:`.write_gromacs_gro_file`)
Write configuration in GROMACS GRO format.
read_gromos96_file (:py:func:`.read_gromos96_file`)
Read a single configuration GROMACS .g96 file.
write_gromos96_file (:py:func:`.write_gromos96_file`)
Write configuration in GROMACS g96 format.
read_xvg_file (:py:func:`.read_xvg_file`)
For reading .xvg files from GROMACS.
read_trr_header (:py:func:`.read_trr_header`)
Read a header from an open TRR file.
read_trr_data (:py:func:`.read_trr_data`)
Read data from an open TRR file.
skip_trr_data (:py:func:`.skip_trr_data`)
Skip reading data from an open TRR file and move on to the
next header.
read_trr_file (:py:func:`.read_trr_file`)
Yield frames from a TRR file.
trr_frame_to_g96 (:py:func:`.trr_frame_to_g96`)
Dump a specific frame from a TRR file to a .g96 file.
write_trr_frame (:py:func:`.write_trr_frame`)
A simple method to write to a TRR file.
"""
import logging
import os
import struct
import io
import numpy as np
from pyretis.core.box import box_matrix_to_list
logger = logging.getLogger(__name__) # pylint: disable=invalid-name
logger.addHandler(logging.NullHandler())
np.set_printoptions(legacy='1.25')
__all__ = [
'read_gromacs_generic',
'read_gromacs_file',
'read_gromacs_gro_file',
'write_gromacs_gro_file',
'read_gromos96_file',
'write_gromos96_file',
'read_xvg_file',
'read_trr_header',
'read_trr_data',
'skip_trr_data',
'read_trr_file',
'trr_frame_to_g96',
'write_trr_frame',
]
# Define formats for the trajectory output:
_GRO_FMT = '{0:5d}{1:5s}{2:5s}{3:5d}{4:8.3f}{5:8.3f}{6:8.3f}'
_GRO_VEL_FMT = _GRO_FMT + '{7:8.4f}{8:8.4f}{9:8.4f}'
_GRO_BOX_FMT = '{:15.9f}'
_G96_FMT = '{0:}{1:15.9f}{2:15.9f}{3:15.9f}\n'
_G96_FMT_FULL = '{0:5d} {1:5s} {2:5s}{3:7d}{4:15.9f}{5:15.9f}{6:15.9f}\n'
_G96_BOX_FMT = '{:15.9f}' * 9 + '\n'
_G96_BOX_FMT_3 = '{:15.9f}' * 3 + '\n'
# Definitions for the TRR reader.
_GROMACS_MAGIC = 1993
_DIM = 3
_TRR_VERSION = 'GMX_trn_file'
_TRR_VERSION_B = b'GMX_trn_file'
_SIZE_FLOAT = struct.calcsize('f')
_SIZE_DOUBLE = struct.calcsize('d')
_HEAD_FMT = '{}13i'
_HEAD_ITEMS = ('ir_size', 'e_size', 'box_size', 'vir_size', 'pres_size',
'top_size', 'sym_size', 'x_size', 'v_size', 'f_size',
'natoms', 'step', 'nre', 'time', 'lambda')
TRR_DATA_ITEMS = ('box_size', 'vir_size', 'pres_size',
'x_size', 'v_size', 'f_size')
def read_gromacs_lines(lines):
"""Read and parse GROMACS GRO data.
This method will read a GROMACS file and yield the different
snapshots found in the file.
Parameters
----------
lines : iterable
Some lines of text data representing a GROMACS GRO file.
Yields
------
out : dict
This dict contains the snapshot.
"""
lines_to_read = 0
snapshot = {}
read_natoms = False
gro = (5, 5, 5, 5, 8, 8, 8, 8, 8, 8)
gro_keys = ('residunr', 'residuname', 'atomname', 'atomnr',
'x', 'y', 'z', 'vx', 'vy', 'vz')
gro_type = (int, str, str, int, float, float, float, float, float, float)
for line in lines:
if read_natoms:
read_natoms = False
lines_to_read = int(line.strip()) + 1
continue # just skip to next line
if lines_to_read == 0: # new snapshot
if snapshot:
_add_matrices_to_snapshot(snapshot)
yield snapshot
snapshot = {'header': line.strip()}
read_natoms = True
elif lines_to_read == 1: # read box
snapshot['box'] = np.array(
[float(i) for i in line.strip().split()]
)
lines_to_read -= 1
else: # read atoms
lines_to_read -= 1
current = 0
for i, key, gtype in zip(gro, gro_keys, gro_type):
val = line[current:current+i].strip()
if not val:
# This typically happens if we try to read velocities
# and they are not present in the file.
break
value = gtype(val)
current += i
try:
snapshot[key].append(value)
except KeyError:
snapshot[key] = [value]
if snapshot:
_add_matrices_to_snapshot(snapshot)
yield snapshot
[docs]def _add_matrices_to_snapshot(snapshot):
"""Extract positions and velocities as matrices from GROMACS.
The extracted positions and velocities will be added to the given
snapshot.
Parameters
----------
snapshot : dict
This dict contains the data read from the GROMACS file.
Returns
-------
xyz : numpy.array
The positions as an array, (N, 3).
vel : numpy.array
The velocities as an array, (N, 3).
"""
xyz = np.zeros((len(snapshot['atomnr']), 3))
for i, key in enumerate(('x', 'y', 'z')):
if key in snapshot:
xyz[:, i] = snapshot[key]
vel = np.zeros_like(xyz)
for i, key in enumerate(('vx', 'vy', 'vz')):
if key in snapshot:
vel[:, i] = snapshot[key]
snapshot['xyz'] = xyz
snapshot['vel'] = vel
return xyz, vel
[docs]def read_gromacs_generic(filename):
"""Read GROMACS files.
This method will read a GROMACS file and yield the different
snapshots found in the file. This file is intended to be used
to just count the n of snapshots stored in a file.
Parameters
----------
filename : string
The file to check.
Yields
------
out : None.
"""
if filename[-4:] == '.gro':
for _ in read_gromacs_file(filename):
yield None
if filename[-4:] == '.g96':
yield None
if filename[-4:] == '.trr':
for _ in read_trr_file(filename):
yield None
[docs]def read_gromacs_file(filename):
"""Read GROMACS GRO files.
This method will read a GROMACS file and yield the different
snapshots found in the file. This file is intended to be used
if we want to read all snapshots present in a file.
Parameters
----------
filename : string
The file to open.
Yields
------
out : dict
This dict contains the snapshot.
Examples
--------
>>> from pyretis.inout.formats.gromacs import read_gromacs_file
>>> for snapshot in read_gromacs_file('traj.gro'):
... print(snapshot['x'][0])
"""
with open(filename, 'r', encoding='utf-8') as fileh:
for snapshot in read_gromacs_lines(fileh):
yield snapshot
[docs]def read_gromacs_gro_file(filename):
"""Read a single configuration GROMACS GRO file.
This method will read the first configuration from the GROMACS
GRO file and return the data as give by
:py:func:`.read_gromacs_lines`. It will also explicitly
return the matrices with positions, velocities and box size.
Parameters
----------
filename : string
The file to read.
Returns
-------
frame : dict
This dict contains all the data read from the file.
xyz : numpy.array
The positions. The array is (N, 3) where N is the
number of particles.
vel : numpy.array
The velocities. The array is (N, 3) where N is the
number of particles.
box : numpy.array
The box dimensions.
"""
snapshot = None
xyz = None
vel = None
box = None
with open(filename, 'r', encoding='utf8') as fileh:
snapshot = next(read_gromacs_lines(fileh))
box = snapshot.get('box', None)
xyz = snapshot.get('xyz', None)
vel = snapshot.get('vel', None)
return snapshot, xyz, vel, box
[docs]def write_gromacs_gro_file(outfile, txt, xyz, vel=None, box=None):
"""Write configuration in GROMACS GRO format.
Parameters
----------
outfile : string
The name of the file to create.
txt : dict of lists of strings
This dict contains the information on residue-numbers, names,
etc. required to write the GRO file.
xyz : numpy.array
The positions to write.
vel : numpy.array, optional
The velocities to write.
box: numpy.array, optional
The box matrix.
"""
resnum = txt['residunr']
resname = txt['residuname']
atomname = txt['atomname']
atomnr = txt['atomnr']
npart = len(xyz)
with open(outfile, 'w', encoding='utf-8') as output:
output.write(f'{txt["header"]}\n')
output.write(f'{npart}\n')
for i in range(npart):
if vel is None:
buff = _GRO_FMT.format(
resnum[i],
resname[i],
atomname[i],
atomnr[i],
xyz[i, 0],
xyz[i, 1],
xyz[i, 2])
else:
buff = _GRO_VEL_FMT.format(
resnum[i],
resname[i],
atomname[i],
atomnr[i],
xyz[i, 0],
xyz[i, 1],
xyz[i, 2],
vel[i, 0],
vel[i, 1],
vel[i, 2])
output.write(f'{buff}\n')
if box is None:
box = ' '.join([_GRO_BOX_FMT.format(i) for i in txt['box']])
else:
box = ' '.join([_GRO_BOX_FMT.format(i) for i in box])
output.write(f'{box}\n')
[docs]def read_gromos96_file(filename):
"""Read a single configuration GROMACS .g96 file.
Parameters
----------
filename : string
The file to read.
Returns
-------
rawdata : dict of list of strings
This is the raw data read from the file grouped into sections.
Note that this does not include the actual positions and
velocities as these are returned separately.
xyz : numpy.array
The positions.
vel : numpy.array
The velocities.
box : numpy.array
The simulation box.
"""
_len = 15
_pos = 24
rawdata = {'TITLE': [], 'POSITION': [], 'VELOCITY': [], 'BOX': [],
'POSITIONRED': [], 'VELOCITYRED': []}
section = None
with open(filename, 'r', encoding='utf-8', errors='replace') as gromosfile:
for lines in gromosfile:
new_section = False
stripline = lines.strip()
if stripline == 'END':
continue
for key in rawdata:
if stripline == key:
new_section = True
section = key
break
if new_section:
continue
rawdata[section].append(lines.rstrip())
txtdata = {}
xyzdata = {}
for key in ('POSITION', 'VELOCITY'):
txtdata[key] = []
xyzdata[key] = []
for line in rawdata[key]:
txt = line[:_pos]
txtdata[key].append(txt)
pos = [float(line[i:i+_len]) for i in range(_pos, 4*_len, _len)]
xyzdata[key].append(pos)
for line in rawdata[key+'RED']:
txt = line[:_pos]
txtdata[key].append(txt)
pos = [float(line[i:i+_len]) for i in range(0, 3*_len, _len)]
xyzdata[key].append(pos)
xyzdata[key] = np.array(xyzdata[key])
rawdata['POSITION'] = txtdata['POSITION']
rawdata['VELOCITY'] = txtdata['VELOCITY']
if not rawdata['VELOCITY']:
# No velocities were found in the input file.
xyzdata['VELOCITY'] = np.zeros_like(xyzdata['POSITION'])
logger.info('Input g96 did not contain velocities')
if rawdata['BOX']:
box = np.array([float(i) for i in rawdata['BOX'][0].split()])
else:
box = None
logger.info('Input g96 did not contain box vectors.')
return rawdata, xyzdata['POSITION'], xyzdata['VELOCITY'], box
[docs]def write_gromos96_file(filename, raw, xyz, vel, box=None):
"""Write configuration in GROMACS .g96 format.
Parameters
----------
filename : string
The name of the file to create.
raw : dict of lists of strings
This contains the raw data read from a .g96 file.
xyz : numpy.array
The positions to write.
vel : numpy.array
The velocities to write.
box: numpy.array, optional
The box matrix.
"""
_keys = ('TITLE', 'POSITION', 'VELOCITY', 'BOX')
with open(filename, 'w', encoding='utf-8') as outfile:
for key in _keys:
if key not in raw:
continue
outfile.write(f'{key}\n')
for i, line in enumerate(raw[key]):
if key == 'POSITION':
outfile.write(_G96_FMT.format(line, *xyz[i]))
elif key == 'VELOCITY':
if vel is not None:
outfile.write(_G96_FMT.format(line, *vel[i]))
elif box is not None and key == 'BOX':
if len(box) == 3:
outfile.write(_G96_BOX_FMT_3.format(*box))
else:
outfile.write(_G96_BOX_FMT.format(*box))
else:
outfile.write(f'{line}\n')
outfile.write('END\n')
[docs]def read_xvg_file(filename):
"""Return data in xvg file as numpy array."""
data = []
legends = []
with open(filename, 'r', encoding='utf-8') as fileh:
for lines in fileh:
if lines.startswith('@ s') and lines.find('legend') != -1:
legend = lines.split('legend')[-1].strip()
legend = legend.replace('"', '')
legends.append(legend.lower())
else:
if lines.startswith('#') or lines.startswith('@'):
pass
else:
data.append([float(i) for i in lines.split()])
data = np.array(data)
data_dict = {'step': np.arange(tuple(data.shape)[0])}
for i, key in enumerate(legends):
data_dict[key] = data[:, i+1]
return data_dict
def swap_integer(integer):
"""Convert little/big endian."""
return (((integer << 24) & 0xff000000) | ((integer << 8) & 0x00ff0000) |
((integer >> 8) & 0x0000ff00) | ((integer >> 24) & 0x000000ff))
def swap_endian(endian):
"""Just swap the string for selecting big/little."""
if endian == '>':
return '<'
if endian == '<':
return '>'
raise ValueError('Undefined swap!')
def read_struct_buff(fileh, fmt):
"""Unpack from a file handle with a given format.
Parameters
----------
fileh : file object
The file handle to unpack from.
fmt : string
The format to use for unpacking.
Returns
-------
out : tuple
The unpacked elements according to the given format.
Raises
------
EOFError
We will raise an EOFError if `fileh.read()` attempts to read
past the end of the file.
"""
buff = fileh.read(struct.calcsize(fmt))
if not buff:
raise EOFError
return struct.unpack(fmt, buff)
def read_matrix(fileh, endian, double):
"""Read a matrix from the TRR file.
Here, we assume that the matrix will be of
dimensions (_DIM, _DIM).
Parameters
----------
fileh : file object
The file handle to read from.
endian : string
Determines the byte order.
double : boolean
If true, we will assume that the numbers
were stored in double precision.
Returns
-------
mat : numpy.array
The matrix as an array.
"""
if double:
fmt = f'{endian}{_DIM**2}d'
else:
fmt = f'{endian}{_DIM**2}f'
read = read_struct_buff(fileh, fmt)
mat = np.zeros((_DIM, _DIM))
for i in range(_DIM):
for j in range(_DIM):
mat[i, j] = read[i * _DIM + j]
return mat
def read_coord(fileh, endian, double, natoms):
"""Read a coordinate section from the TRR file.
This method will read the full coordinate section from a TRR
file. The coordinate section may be positions, velocities or
forces.
Parameters
----------
fileh : file object
The file handle to read from.
endian : string
Determines the byte order.
double : boolean
If true, we will assume that the numbers
were stored in double precision.
natoms : int
The number of atoms we have stored coordinates for.
Returns
-------
mat : numpy.array
The coordinates as a numpy array. It will have
``natoms`` rows and ``_DIM`` columns.
"""
if double:
fmt = f'{endian}{natoms * _DIM}d'
else:
fmt = f'{endian}{natoms * _DIM}f'
read = read_struct_buff(fileh, fmt)
mat = np.array(read)
mat.shape = (natoms, _DIM)
return mat
def is_double(header):
"""Determine if we should use double precision.
This method determined the precision to use when reading
the TRR file. This is based on the header read for a given
frame which defines the sizes of certain "fields" like the box
or the positions. From this size, the precision can be obtained.
Parameters
----------
header : dict
The header read from the TRR file.
Returns
-------
out : boolean
True if we should use double precision.
"""
key_order = ('box_size', 'x_size', 'v_size', 'f_size')
size = 0
for key in key_order:
if header[key] != 0:
if key == 'box_size':
size = int(header[key] / _DIM**2)
break
size = int(header[key] / (header['natoms'] * _DIM))
break
if size not in (_SIZE_FLOAT, _SIZE_DOUBLE):
raise ValueError('Could not determine size!')
return size == _SIZE_DOUBLE
[docs]def skip_trr_data(fileh, header):
"""Skip coordinates/box data etc.
This method is used when we want to skip a data section in
the TRR file. Rather than reading the data, it will use the
size read in the header to skip ahead to the next frame.
Parameters
----------
fileh : file object
The file handle for the file we are reading.
header : dict
The header read from the TRR file.
"""
offset = sum([header[key] for key in TRR_DATA_ITEMS])
fileh.seek(offset, 1)
[docs]def read_trr_data(fileh, header):
"""Read box, coordinates etc. from a TRR file.
Parameters
----------
fileh : file object
The file handle for the file we are reading.
header : dict
The header read from the file.
Returns
-------
data : dict
The data we read from the file. It may contain the following
keys if the data was found in the frame:
- ``box`` : the box matrix,
- ``vir`` : the virial matrix,
- ``pres`` : the pressure matrix,
- ``x`` : the coordinates,
- ``v`` : the velocities, and
- ``f`` : the forces
"""
data = {}
endian = header['endian']
double = header['double']
for key in ('box', 'vir', 'pres'):
header_key = f'{key}_size'
if header[header_key] != 0:
data[key] = read_matrix(fileh, endian, double)
for key in ('x', 'v', 'f'):
header_key = f'{key}_size'
if header[header_key] != 0:
data[key] = read_coord(fileh, endian, double,
header['natoms'])
return data
[docs]def read_trr_file(filename, read_data=True):
"""Yield frames from a TRR file."""
with open(filename, 'rb') as infile:
while True:
try:
header, _ = read_trr_header(infile)
if read_data:
data = read_trr_data(infile, header)
else:
skip_trr_data(infile, header)
data = None
yield header, data
except EOFError:
return None, None
except struct.error:
logger.warning(
'Could not read a frame from the TRR file. Aborting!'
)
return None, None
def read_trr_frame(filename, index):
"""Return a given frame from a TRR file."""
idx = 0
with open(filename, 'rb') as infile:
while True:
try:
header, _ = read_trr_header(infile)
if idx == index:
data = read_trr_data(infile, header)
return header, data
skip_trr_data(infile, header)
idx += 1
if idx > index:
logger.error('Frame %i not found in %s', index, filename)
return None, None
except EOFError:
return None, None
[docs]def trr_frame_to_g96(trr_file, index, outfile):
"""Dump a TRR frame to a GROMOS96 (.g96) file.
Parameters
----------
trr_file : string
The TRR file to read from.
index : integer
The index for the frame to dump from ``trr_file``.
outfile : string
The g96 file to write.
"""
header, data = read_trr_frame(trr_file, index)
with open(outfile, 'w', encoding='utf-8') as output:
output.write('TITLE\n')
output.write(f' Dump from TRR, index = {index}, '
f'time = {header["time"]}\n')
output.write('END\n')
output.write('POSITION\n')
for i, posi in enumerate(data['x']):
output.write(_G96_FMT_FULL.format(i, 'DUM', 'X', i, *posi))
output.write('END\n')
if 'v' in data:
output.write('VELOCITY\n')
for i, veli in enumerate(data['v']):
output.write(_G96_FMT_FULL.format(i, 'DUM', 'X', i, *veli))
output.write('END\n')
output.write('BOX\n')
box = data['box']
output.write(_G96_BOX_FMT.format(*box_matrix_to_list(box, full=True)))
output.write('END\n')
[docs]def _get_chunks(start, end, size):
"""Yield chunks between start and end of the given size."""
while start < end:
new_size = min(size, end - start)
start += new_size
yield start, new_size
def reverse_trr(filename, outname, print_progress=True):
"""Reverse a GROMACS TRR file.
Parameters
----------
filename : string
The file path to the input file.
outname : string
The file path to the output file.
print_progress : boolean, optional
This can be used to print out information about the
progress to the screen. Might be useful if we are reading
a large file.
"""
# We first loop over the file until we reach the
# last header, we skip the data and read all the headers.
all_headers = []
buff_size = io.DEFAULT_BUFFER_SIZE
with open(filename, 'rb') as infile, open(outname, 'wb') as outfile:
while True:
try:
header, header_size = read_trr_header(infile)
header_loc = infile.tell()
skip_trr_data(infile, header)
all_headers.append((header, header_loc, header_size))
except EOFError:
break
# Loop through headers in reverse and write data.
for header, header_loc, header_size in reversed(all_headers):
if print_progress: # pragma: no cover
print(f'Processing step {header["step"]} '
f'time {header["time"]}')
data_size = sum([header[key] for key in TRR_DATA_ITEMS])
start = header_loc - header_size
infile.seek(start)
end = start + header_size + data_size
for _, chunk_size in _get_chunks(start, end, buff_size):
outfile.write(infile.read(chunk_size))
[docs]def write_trr_frame(filename, data, endian=None, double=False, append=False):
"""Write data in TRR format to a file.
Parameters
----------
filename : string
The name/path of the file to write to.
data : dict
The data we will write to the file.
endian : string, optional
Select the byte order; big-endian or little-endian. If not
specified, the native byte order will be used.
double : boolean, optional
If True, we will write in double precision.
append : boolean, optional
If True, we will append to the file, otherwise, the file
will be overwritten.
"""
if append:
mode = 'ab'
else:
mode = 'wb'
if double:
size = _SIZE_DOUBLE
floatfmt = '{}d'
else:
floatfmt = '{}f'
size = _SIZE_FLOAT
if endian:
floatfmt = endian + floatfmt
header = {}
for key in _HEAD_ITEMS:
header[key] = 0
header['natoms'] = data['natoms']
header['step'] = data['step']
header['box_size'] = size * _DIM * _DIM
for i in ('x', 'v', 'f'):
if i in data:
header[f'{i}_size'] = data['natoms'] * size * _DIM
header['endian'] = endian
header['double'] = double
header['time'] = data['time']
header['lambda'] = data['lambda']
with open(filename, mode) as outfile:
write_trr_header(outfile, header, floatfmt, endian=endian)
for key in TRR_DATA_ITEMS:
if header[key] != 0:
matrix = data[key.split('_', 1)[0]]
fmt = floatfmt.format(matrix.size)
outfile.write(struct.pack(fmt, *matrix.flatten()))
return header
def write_trr_header(outfile, header, floatfmt, endian=None):
"""Write TRR header.
Parameters
----------
outfile : filehandle
The file we can write to.
header : dict
The header data for the TRR file.
floatfmt : string
The string which gives the format for floats. It should indicate
if we are writing for double or single precision.
endian : string, optional
Can be used to force endianess.
"""
slen = (13, 12)
fmt = ['1i', '2i', f'{slen[0] - 1}s', '13i']
if endian:
fmt = [endian + i for i in fmt]
outfile.write(struct.pack(fmt[0], _GROMACS_MAGIC))
outfile.write(struct.pack(fmt[1], *slen))
outfile.write(struct.pack(fmt[2], _TRR_VERSION_B))
head = [header[key] for key in _HEAD_ITEMS[:13]]
outfile.write(struct.pack(fmt[3], *head))
outfile.write(struct.pack(floatfmt.format(1), header['time']))
outfile.write(struct.pack(floatfmt.format(1), header['lambda']))
outfile.flush()
def gromacs_settings(settings, input_path):
"""Read and processes GROMACS settings.
Parameters
----------
settings : dict
The current input settings..
input_path : string
The GROMACS input path
"""
ext = settings['engine'].get('gmx_format', 'gro')
default_files = {'conf': f'conf.{ext}',
'input_o': 'grompp.mdp',
'topology': 'topol.top',
'index': 'index.ndx'}
settings['engine']['input_files'] = {}
for key in ('conf', 'input_o', 'topology', 'index'):
# Add input path and the input files if input is no given:
settings['engine']['input_files'][key] = \
settings['engine'].get(key,
os.path.join(input_path,
default_files[key]))