# -*- coding: utf-8 -*-
# Copyright (c) 2023, PyRETIS Development Team.
# Distributed under the LGPLv2.1+ License. See LICENSE for more info.
"""This file contains common functions for the path density.
It contains some functions that is used to compare and process data,
like matching similar lists or attempt periodic shifts of values.
Important methods defined here
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
find_rst_file (:py:func: `.find_rst_file`)
Search for a rst-file from a chosen subdirectory.
read_traj_txt_file (:py:func: `.read_traj_txt_file`)
Read the sequence of files in a trajectory from a traj.txt file.
recalculate_all (:py:func:`.recalculate_all`)
Recalculate order parameter and new collective variables
by finding all trajectory files from a simulation.
shift_data (:py:func: `.shift_data`)
Finds the median value of a given list of floats, and shifts the
lower half of the data by the median.
try_data_shift (:py:func: `.try_data_shift`
Takes in two lists of values, x and y, and calculates a linear
regression and R**2-correlation of the data set. Attempts a shift of
each data set by their respective median to increase the correlation.
where_from_to (:py:func: `.where_from_to`)
Check the initial and final steps of a trj in respect to the interfaces.
get_cv_names (:py:func: `.get_cv_names`)
Outputs a list of the names of the descriptors in the simulation.
recalculate_all (:py:func: `.recalculate_all`)
Recompute all the order parameters according to the PyRETIS storage scheme
or for individual files/folders
find_data (:py:func: `.find_data`)
Find suitable frames/trajectories to recompute the orderp parameter on.
"""
import os
import timeit
import scipy
from pyretis.inout import print_to_screen, settings
from pyretis.inout.common import create_backup, TRJ_FORMATS
from pyretis.inout.formats.path import PathExtFile
from pyretis.setup.common import create_orderparameter
from pyretis.tools.recalculate_order import recalculate_order
from pyretis.initiation.initiate_load import write_order_parameters
__all__ = ['find_rst_file', 'read_traj_txt_file',
'shift_data', 'try_data_shift', 'where_from_to',
'get_cv_names', 'recalculate_all', 'find_data']
[docs]def try_data_shift(x, y, fixedx):
"""Check if shifting increases correlation.
Function that checks if correlation of data increases by shifting
either sets of values, x or y, or both. Correlation is checked by
doing a simple linear regression on the different sets of data:
- x and y , x and yshift, xshift and y, xshift and yshift.
If linear correlation increases (r-squared value), data sets are
updated.
As a precaution, no shift
is performed on x values if they are of the first order parameter 'op1'.
Parameters
----------
x, y : list
Floats, data values
fixedx : bool
If True, x is main OP and should not be shifted.
Returns
-------
x, y : list
Floats, updated (or unchanged) data values
(If changed, returns x_temp or y_temp or both)
"""
# The unshifted data
_, _, r_val, _, _ = scipy.stats.linregress(x, y)
# The Y-shifted data
y_temp = shift_data(y)
_, _, r_y, _, _ = scipy.stats.linregress(x, y_temp)
yshift = r_y**2 > r_val**2
# The X-shifted data
x_temp = shift_data(x)
_, _, r_x, _, _ = scipy.stats.linregress(x_temp, y)
xshift = r_x**2 > r_val**2 and r_x**2 > r_y**2
# Comparing effectiveness of both shifts individually, and combined
_, _, r_xy, _, _ = scipy.stats.linregress(x_temp, y_temp)
xyshift = r_xy**2 > r_val**2 and r_xy**2 > r_y**2 and r_xy**2 > r_x**2
# If first op is op1, don't shift data
if xyshift and not fixedx:
return x_temp, y_temp
if xshift and not fixedx:
return x_temp, y
if yshift:
return x, y_temp
return x, y
[docs]def shift_data(x):
"""Shifts the data under the median.
Function that takes in a list of data, and shifts all values
below the median value of the data by the max difference,
effectively shifting parts of the data periodically in order to
give clusters for visualization.
Parameters
----------
x : list
Floats, data values
Returns
-------
xnorm : list
Floats where some values are shifted values of x,
and some are left unchanged.
"""
xmin, xmax = min(x), max(x)
xnorm = []
# The max difference in x-data
diff_x = xmax - xmin
# The Median of x-data
medix = xmin + 0.5 * diff_x
for i in x:
if i < medix:
xnorm.append(i + diff_x)
else:
xnorm.append(i)
return xnorm
[docs]def read_traj_txt_file(path):
"""Read a traj.txt file.
Function which reads a traj.txt file and returns a dict containing
the name of each file in the trajectory and the sign of their velocity.
Parameters
----------
path : string
Path to the traj.txt file.
Returns
-------
files : dict
Dictionary containing each file in the trajectory and
the sign of their velocity.
"""
files = {}
i = 0
with PathExtFile(path, 'r') as pfile:
for block in pfile.load():
for data in block['data']:
if data[0] == '0':
files[i] = [data[1], data[3]]
if data[1] != files[i][0]:
i += 1
files[i] = [data[1], data[3]]
return files
[docs]def find_rst_file(search_dir):
"""Search for rst-files.
Parameters
----------
search_dir : string
Path to the .rst file.
Returns
-------
out[0] : string
Path and name of the .rst file.
"""
for _ in range(len(search_dir.split('/'))):
for file_name in os.listdir(search_dir):
if file_name.endswith('rst'):
os.chdir(search_dir)
return search_dir + '/' + file_name
os.chdir('../')
search_dir = os.getcwd()
return search_dir
[docs]def where_from_to(trj, int_a, int_b=float('-inf')):
r"""Detect L∕R starts and L / R / \* ends.
Given a list of order parameters (a trj), the function
will try to establish where the path started (L or R or \*)
and where it ended.
Note: for the 'REJ' paths, this function results might differ
from PyRETIS.
Parameters
----------
trj: numpy array
The order parameters of the trj.
int_a: float
The interface that defines state A.
int_b: float, optional
The interface that defines state B. If not given, it is assumed
that the 0^- ensemble is in use without the 0^- L interface.
Returns
-------
start: string\*1
The initial position of the trajectory in respect to the
interfaces given (L eft, R ight or \* for nothing).
end: string\*1
The final position of the trajectory in respect to the
interfaces given (L eft, R ight or \* for nothing).
"""
start, end = '*', '*'
int_l = min(int_a, int_b)
int_t = max(int_a, int_b)
if trj[0] >= int_t:
start = 'R'
if trj[0] < int_l:
start = 'L'
if trj[-1] >= int_t:
end = 'R'
if trj[-1] < int_l:
end = 'L'
return start, end
[docs]def get_cv_names(input_settings):
"""Return names of op and cv's.
Parameters
----------
input_settings: dict
Dictionary with the settings from the simulations.
Returns
-------
names = list
List of the names.
"""
op_names = []
# Collect names of op and cv's if available
if 'name' in input_settings['orderparameter'].keys():
op_names.append(input_settings['orderparameter']['name'])
if 'collective-variable' in input_settings.keys():
for c_v in input_settings['collective-variable']:
if 'name' in c_v.keys():
op_names.append(c_v['name'])
return op_names
[docs]def recalculate_all(runfolder, iofile, ensemble_names=None, data=None):
"""Recalculate order parameter and collective variables.
This function performs post-processing by analyzing trajectories
of old simulations in order to extract data and do new calculations
and write to a new order.txt file.
Parameters
----------
runfolder: string
The path of the execution directory.
iofile: string
The input file where the settings are collected.
ensemble_names: list, optional
List of ensemble names in the simulation to work with.
data: string, optional
If given, the function will check only the single file or
look only in the given directory
Returns
-------
out : boolean
True if the recomputation was successful, False otherwise.
"""
# Gather collective variables and corresponding option from .rst
io_file = os.path.join(runfolder, iofile)
assert io_file is not None, 'Input file not given'
input_settings = settings.parse_settings_file(io_file)
trj_dict = find_data(runfolder, ensemble_names, data=data)
print_to_screen('Re-computing the collective variables.', level='message')
tic = timeit.default_timer()
if not trj_dict:
print_to_screen('No data to re-compute from', level='warning')
return False
try:
functions = create_orderparameter(input_settings)
except (ImportError, ValueError):
msg = 'Invalid Order Parameter'
print_to_screen(msg, level='warning') # pragma: no cover
return False # pragma: no cover
# Create the composite order parameter (Avoid circular reference)
for _, ens in trj_dict.items():
if ens.get('main_o') is not None:
main_order = os.path.join(runfolder, ens['main_o'])
create_backup(main_order)
for _, cycles in sorted(ens['traj'].items()):
here = os.path.dirname(os.path.abspath(cycles['traj'][0]))
new_o = os.path.join(here, 'order.txt')
results_dict = []
for trj in cycles['traj']:
try:
for order_p in recalculate_order(functions, trj, {}):
results_dict.append(order_p) # pragma: no cover
except (KeyError, AttributeError):
print_to_screen(f'File {trj} not valid', level='warning')
local_order = cycles.get('o_txt', new_o)
create_backup(local_order)
write_order_parameters(local_order, results_dict,
cycles.get('header', 'Recalculated data'))
if ens.get('main_o') is not None:
os.system(f"cat {local_order} >> {main_order}")
print_to_screen('# Data successfully recomputed!', level='success')
print_to_screen(f'# Time spent: {timeit.default_timer() - tic:.2f}s',
level='success')
return True
[docs]def find_data(runfolder, ensemble_names=None, data=None):
"""Find the trajectory data used to do post-processing.
find_traj returns a dict with a structure resembling that
of the simulation.
Parameters
----------
runfolder: string, optional
The path of the execution directory.
ensemble_names: list, optional
List of ensemble names in the simulation to work with.
data: string, optional
If given, the function will check only the single file or
look only in the given directory
Returns
-------
trj_dict : dict
To each key, ensemble_name (e.g. 000, 001, etc)
the values are: the last accepted trajectories given by the
`accepted`-key; the generation trajectory or conf files given by
the `generation`-key, and lastly the dictionary `stored_traj`
that is given by the `traj`-key. `stored_traj` is split up into
the dictionaries`traj-acc` and `traj-rej` which have keys for
all the accepted and rejected cycles respectively,
where the trajectory files for that cycle is stored.
"""
trj_dict = {}
# Specific file/folder
if data is not None:
sources = [data] if os.path.isfile(data) else _get_trjs(data)
if sources:
trj_dict['000'] = {'traj': {'0': {'traj': sources}}}
return trj_dict
# Structured data
flag_map = {'traj-acc': 'ACC', 'traj-rej': 'REJ'}
if ensemble_names is None:
ensemble_names = [i.name for i in os.scandir(runfolder) if
(i.is_dir() and i.name.isdigit())]
for ens_name in sorted(ensemble_names):
order_txt = os.path.join(os.path.abspath(runfolder), ens_name,
'order.txt')
trj_dict[ens_name] = {'main_o': order_txt, 'traj': {}}
for trj_type in ['traj-acc', 'traj-rej']:
here = os.path.join(os.path.abspath(runfolder),
ens_name, 'traj', trj_type)
if not os.path.isdir(here):
continue
for cycle in sorted([i.name for i in os.scandir(here) if
i.name.isdigit()]):
loc = os.path.join(here, cycle, 'traj')
o_txt = os.path.join(here, cycle, 'order.txt')
header = f'# Cycle: {cycle},' \
f' status: {flag_map[trj_type]}'
if os.path.exists(o_txt):
with open(o_txt, 'r', encoding='utf-8') as file_in:
header = file_in.readline().replace('\n', '')
trj_dict[ens_name]['traj'][cycle] = {'header': header,
'o_txt': o_txt,
'traj': _get_trjs(loc)}
return trj_dict
[docs]def _get_trjs(runfolder='.'):
"""Find the trajectory files.
Parameters
----------
runfolder : string, optional
the location of the main simulation folder.
Returns
-------
full_name_trj : list
The trajectory files contained in the folder.
"""
trj = [i.name for i in os.scandir(runfolder) if i.name[-4:] in TRJ_FORMATS]
full_name_trj = [os.path.join(runfolder, i) for i in trj]
return full_name_trj