# -*- coding: utf-8 -*-
# Copyright (c) 2023, PyRETIS Development Team.
# Distributed under the LGPLv2.1+ License. See LICENSE for more info.
"""Module defining the base classes for the PyRETIS output.
Important classes defined here
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
PathStorage (:py:class:`.PathStorage`)
A class for handling storage of external trajectories to an archive.
In this case, it is simply a folder based structure.
PathStorageTar (:py:class:`.PathStorageTar`)
A class for handling storage of external trajectories to an archive
format (tar in this case).
"""
import logging
import tempfile
import shutil
import tarfile
import os
from pyretis.inout.common import OutputBase, create_backup, make_dirs
from pyretis.inout.formats import (
EnergyPathFormatter,
OrderPathFormatter,
PathExtFormatter,
)
# Module-level logger, named after this module so that applications can
# configure its output through the standard ``logging`` hierarchy.
logger = logging.getLogger(__name__)  # pylint: disable=invalid-name
# Attach a no-op handler so that records emitted here are silently dropped
# unless the application using this module installs its own handlers.
logger.addHandler(logging.NullHandler())
def add_to_tar_file(tar_file, files, file_mode='a'):
    """Open a tar file and add some files to it.

    Parameters
    ----------
    tar_file : string
        The path to the tar file to open (or create).
    files : list of tuples
        The path(s) to the file(s) to add, together with the names to
        create internally in the tar file. That is:
        ``files[i] = (source[i], target[i])`` where ``target[i]`` will
        be used as the archive name for ``source[i]``.
    file_mode : string
        The mode passed on to :py:func:`tarfile.open`, which
        determines how we try to open the tar file.

    Returns
    -------
    out : boolean
        True if we managed to write to the file, False if a
        :py:class:`tarfile.ReadError` was raised along the way.

    """
    try:
        with tarfile.open(tar_file, file_mode) as archive:
            for source, arcname in files:
                archive.add(source, arcname=arcname)
    except tarfile.ReadError:
        # Only read errors are turned into a False return value; any
        # other exception is left to propagate to the caller.
        logger.warning('Could not open tar file: "%s"', tar_file)
        return False
    return True
def generate_traj_names(path, target_dir):
    """Generate new file names for moving/copying the files of a path.

    Each distinct source file referenced by the phase points of the
    path is listed once, in the order it is first encountered.

    Parameters
    ----------
    path : object like :py:class:`.PathBase`
        This is the path object we are going to store. Its
        ``phasepoints`` are iterated and, for each of them,
        ``particles.get_pos()`` is expected to return a pair whose
        first item is the name of the file holding the positions.
    target_dir : string
        The location where we are moving the path to.

    Returns
    -------
    files : list of tuples of strings
        The files to add and the file names to use internally
        for storing. That is: ``files[i] = (source[i], target[i])``
        where ``target[i]`` is the internal storage name, built from
        ``target_dir`` and the base name of ``source[i]``.

    """
    seen = set()
    names = []
    for point in path.phasepoints:
        filename, _ = point.particles.get_pos()
        if filename in seen:
            # Several phase points may live in the same file; list it once.
            continue
        seen.add(filename)
        new_name = os.path.join(target_dir, os.path.basename(filename))
        names.append((filename, new_name))
    return names
class PathStorage(OutputBase):
    """A class for handling storage of external trajectories.

    Attributes
    ----------
    target : string
        Determines the target for this output class. Here it will
        be a file archive (i.e. a directory based collection of
        files).
    archive_acc : string
        Basename for the archive with accepted trajectories.
    archive_rej : string
        Basename for the archive with rejected trajectories.
    archive_traj : string
        Basename for a sub-folder containing the actual files
        for a trajectory.
    formatters : dict
        This dict contains the formatters for writing path data,
        with default filenames used for them.
    out_dir_fmt : string
        A format to use for creating directories within the archive.
        This one is applied to the step number for the output.

    """

    target = 'file-archive'
    archive_acc = 'traj-acc'
    archive_rej = 'traj-rej'
    archive_traj = 'traj'
    formatters = {
        'order': {'fmt': OrderPathFormatter(), 'file': 'order.txt'},
        'energy': {'fmt': EnergyPathFormatter(), 'file': 'energy.txt'},
        'traj': {'fmt': PathExtFormatter(), 'file': 'traj.txt'},
    }
    out_dir_fmt = '{}'

    def __init__(self):
        """Set up the storage.

        No formatter is handed to the parent class: this class is
        deliberately less flexible and only intended to do one
        thing - write path data for external trajectories.

        """
        super().__init__(None)

    def archive_name_from_status(self, status):
        """Return the name of the archive to use for the given status.

        The archive for accepted trajectories is selected when
        ``status`` equals ``'ACC'``; every other status selects the
        archive for rejected trajectories.

        """
        if status == 'ACC':
            return self.archive_acc
        return self.archive_rej

    def output_path_files(self, step, data, target_dir):
        """Write the output files for energy, path and order parameter.

        Parameters
        ----------
        step : integer
            The current simulation step.
        data : list
            Here, ``data[0]`` is assumed to be an object like
            :py:class:`.Path` and ``data[1]`` a string containing the
            status of this path.
        target_dir : string
            The path to where we archive the files.

        Returns
        -------
        files : list of tuple of strings
            These are the files created. The tuple contains the files
            as a full path and a relative path (to the given
            target directory). The form is
            ``files[i] = (full_path[i], relative_path[i])``.
            The relative path is useful for organizing internally in
            archives.

        """
        path, status = data[0], data[1]
        step_dir = self.out_dir_fmt.format(step)
        created = []
        for name, setup in self.formatters.items():
            logger.debug('Storing: %s', name)
            filename = setup['file']
            full_path = os.path.join(target_dir, filename)
            created.append((full_path, os.path.join(step_dir, filename)))
            formatted = setup['fmt'].format(step, (path, status))
            with open(full_path, 'w', encoding="utf8") as output:
                output.writelines(
                    '{}\n'.format(line) for line in formatted
                )
        return created

    def output(self, step, data):
        """Format the path data and store the path.

        Parameters
        ----------
        step : integer
            The current simulation step.
        data : list
            Here, ``data[0]`` is assumed to be an object like
            :py:class:`.Path`, ``data[1]`` a string containing the
            status of this path and ``data[2]`` the path ensemble for
            which the path was generated.

        Returns
        -------
        files : list of tuples of strings
            The files added to the archive, as
            ``(full_path, relative_path)`` pairs.

        """
        path, status, path_ensemble = data
        step_dir = self.out_dir_fmt.format(step)
        # The archive location is on the form: /path/to/000/traj/11
        archive_path = os.path.join(
            path_ensemble.directory['traj'],
            self.archive_name_from_status(status),
            step_dir,
        )
        # The trajectory files themselves are kept in a sub-folder,
        # which is on the form: /path/to/000/traj/11/traj
        traj_dir = os.path.join(archive_path, self.archive_traj)
        make_dirs(traj_dir)
        # First the order, energy and traj text files:
        files = self.output_path_files(step, data, archive_path)
        # Then the trajectory files referenced by the path:
        relative_traj_dir = os.path.join(step_dir, self.archive_traj)
        for source, relative in generate_traj_names(path,
                                                    relative_traj_dir):
            copy = os.path.join(traj_dir, os.path.basename(relative))
            shutil.copyfile(source, copy)
            files.append((copy, relative))
        # Finally, the ensemble.restart file - only for accepted paths
        # and only when that file is actually present:
        if status != 'ACC':
            return files
        restart = os.path.join(path_ensemble.directory['path_ensemble'],
                               'ensemble.restart')
        if os.path.isfile(restart):
            copy = os.path.join(archive_path, 'ensemble.restart')
            shutil.copyfile(restart, copy)
            files.append(
                (copy, os.path.join(step_dir, 'ensemble.restart'))
            )
        return files

    def write(self, towrite, end='\n'):
        """Log that writing is unsupported; this object never writes."""
        name = self.__class__.__name__
        logger.critical('%s does *not* support the "write" method!', name)

    def __str__(self):
        """Return basic info."""
        name = self.__class__.__name__
        return '{} - archive writer.'.format(name)
class PathStorageTar(PathStorage):
    """A class for handling storage of external trajectories.

    Here, we collect the external trajectories into tar archives.

    Attributes
    ----------
    archive_acc : string
        File name of the tar archive with accepted trajectories.
    archive_rej : string
        File name of the tar archive with rejected trajectories.

    """

    archive_acc = 'traj-acc.tar'
    archive_rej = 'traj-rej.tar'

    @staticmethod
    def _add_to_archive(archive_file, files):
        """Add some files to an archive file.

        The files are first appended to the archive. If that fails,
        the existing archive is backed up and a new one is written
        from scratch with the given files.

        Parameters
        ----------
        archive_file : string
            The path to the archive file to create.
        files : list of tuples
            The path(s) to the file(s) to add with names to
            create internally in the archive. That is, the form is:
            ``files[i] = (source[i], target[i])`` where ``target[i]``
            will be used as the archive name for file ``source[i]``.

        Returns
        -------
        add : boolean
            True if we added files to the archive.

        """
        add = add_to_tar_file(archive_file, files, file_mode='a')
        if not add:
            logger.info(
                ('Could not write the trajectory file.'
                 ' Will try recreating it.')
            )
            logger.info('Backing up old trajectory file first.')
            # Whatever create_backup returns is logged as the message.
            logger.info(create_backup(archive_file))
            add = add_to_tar_file(archive_file, files, file_mode='w')
        return add

    def output(self, step, data):
        """Format the path data and store the path.

        Parameters
        ----------
        step : integer
            The current simulation step.
        data : list
            Here, ``data[0]`` is assumed to be an object like
            :py:class:`.Path`, ``data[1]`` a string containing the
            status of this path and ``data[2]`` the path ensemble for
            which the path was generated.

        Returns
        -------
        files : list of tuples of strings
            The files added to the archive, as
            ``(source, name_in_archive)`` pairs. A group of files is
            only listed when it was successfully written to the
            archive.

        """
        path, status, path_ensemble = data
        tar_file = self.archive_name_from_status(status)
        tar_path = os.path.join(path_ensemble.directory['traj'], tar_file)
        step_dir = self.out_dir_fmt.format(step)
        files = []
        # We create the files in a temporary directory and then add them
        # to the tar archive. This has to happen before the temporary
        # directory is removed again, i.e. inside the ``with`` block:
        with tempfile.TemporaryDirectory() as tempdir:
            # Write order, energy and traj files:
            files1 = self.output_path_files(step, data, tempdir)
            if self._add_to_archive(tar_path, files1):
                files.extend(files1)
        # Add the trajectory files to the archive:
        target_dir = os.path.join(step_dir, self.archive_traj)
        files2 = generate_traj_names(path, target_dir)
        if self._add_to_archive(tar_path, files2):
            files.extend(files2)
        # Include the ensemble.restart file if path was accepted:
        if status == 'ACC':
            file_src = os.path.join(
                path_ensemble.directory['path_ensemble'],
                'ensemble.restart'
            )
            if os.path.isfile(file_src):
                file_dest = os.path.join(step_dir, 'ensemble.restart')
                files3 = [(file_src, file_dest)]
                # The restart file must actually be written to the tar
                # archive (as the directory based storage does by
                # copying it); it is only reported when that succeeded.
                if self._add_to_archive(tar_path, files3):
                    files.extend(files3)
        return files

    def __str__(self):
        """Return basic info."""
        return '{} - TAR archive writer.'.format(self.__class__.__name__)