# -*- coding: utf-8 -*-
# Copyright (c) 2023, PyRETIS Development Team.
# Distributed under the LGPLv2.1+ License. See LICENSE for more info.
"""Compiler of PyRETIS simulation data.
This module is part of the PyRETIS library and can be used both for compiling
the simulation data into a compressed file and/or load the data for later
visualization.
Important classes defined here
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Trajectory (:py:class:`.PathBase`)
A base class to store trajectories composed by only orderp,
collective variables and energies.
PathDensity (:py:class:`.PathBase`)
A base class to assemble the data.
PathVisualize (:py:class:`.PathVisualize`)
A base class to prepare for the visualization.
Important methods defined here
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
pyvisa_zip (:py:func: `.pyvisa_zip`)
Compress the PyVisA output file in a .zip format.
pyvisa_unzip (:py:func: `.pyvisa_unzip`)
Decompress the zip into PyVisA output.
remove_nan (:py:func: `.remove_nan`)
Checks for the presence of nan values and replace them with a local,
if available.
pyvisa_compress (:py:func: `.pyvisa_compress`)
Compress PyRETIS outputs to a .hdf5 file.
"""
import warnings
import os
import timeit
import zipfile
import numpy as np
import pandas as pd
from scipy import stats
from pyretis.inout import print_to_screen
from pyretis.inout.settings import parse_settings_file
from pyretis.pyvisa.common import where_from_to, get_cv_names
# Hard-coded labels for energies and time/cycle steps
ENERGYLABELS = ['time', 'cycle', 'potE', 'kinE', 'totE']
[docs]def pyvisa_zip(input_file):
"""Zip compress file of simulation data.
Parameters
----------
input_file : string
The file to compress.
"""
with zipfile.ZipFile(input_file + '.zip', 'w') as zipped_file:
zipped_file.write(input_file, compress_type=zipfile.ZIP_DEFLATED)
os.remove(input_file)
[docs]def pyvisa_unzip(origin, destination=None):
"""Unzip compressed file before load in visualizer.
Parameters
----------
origin : string
Zipped file to unzip.
destination : string, optional
Unzipped file name.
"""
msg = '###################################################\n'
msg += '# File type recognized as `.zip`, unzipping to tmp file \n'
msg += f'# {destination} before loading.\n'
msg += '###################################################\n'
print_to_screen(msg, level='message')
with zipfile.ZipFile(origin) as zipped:
zipped.extractall(path=os.path.dirname(os.path.abspath(origin)))
# We here assume that only a file is stored in 'zipped'.
if destination is not None:
os.rename(zipped.namelist()[0], destination)
[docs]def remove_nan(data):
"""Remove nan from data.
The function shall remove initial nan, assuming that they are originated
by incomplete initial conditions (e.g. no energy file). In the case that
nan appears as last cycle, it will not be fixed and an error shall rise
up later in the code.
Parameters
----------
data : list
Input list. If nan are present, they are replaced by the following
entry. The method accounts for multiple consecutive nan occurrence.
"""
nan = True
inan = -1
while nan:
nan = False
if type(data) in (dict, pd.DataFrame):
for keys in data:
remove_nan(data[keys])
break
for idx, data_point in reversed(list(enumerate(data))):
if type(data_point) in (list, np.ndarray):
remove_nan(data_point)
else:
if np.isnan(data_point):
nan = True
inan = idx
break
if nan and inan == len(data) - 1:
nan = False
if nan and isinstance(data, pd.Series):
data.iloc[inan] = data.iloc[inan + 1]
elif nan:
data[inan] = data[inan + 1]
[docs]def pyvisa_compress(runpath, input_file, pyvisa_dict):
"""Compress simulation data.
Parameters
----------
runpath : string
The execution folder where the input files are.
input_file : string
The input file for compression.
pyvisa_dict : dict
It determines the section of pyvisa to use.
"""
assert input_file, 'No input file'
if input_file.endswith(('hdf5', 'zip')):
msg = 'Cannot compress an already compressed file.'
raise ImportError(msg)
p_data = PathDensity(runpath, input_file)
p_data.walk_dirs(only_ops=pyvisa_dict['only_order'])
p_data.hdf5_data()
[docs]class Trajectory:
"""Class representing a simulation trajectory.
This class defines the trajectories from the completed
simulations, with all information available. The labels of
the order parameter and collective variables will either be
labeled in the fashion of opx, or from the names in the input file
if the number of names given and number of descriptors in the system
are equal.
"""
[docs] def __init__(self, frames, info):
"""Initialize the class.
Parameters
----------
frames : pandas Dataframe or dict
Dataframe/dict containing order parameter and energy-data
for the trajectory. It contains:
* `OP1`: string
Order parameter.
* `OP2`: string
Collective variables, all other CV's will be named in
increasing fashion, OP3, OP4 etc.
* `PotE`: string
Potential energy.
* `KinE`: string
Kinetic energy.
* `TotE`: string
Total energy.
info : dict
Dictionary containing information about the trajectory.
It contains:
* `ensemble_name`: string
The name of the ensemble.
* `cycle`: integer
The cycle number.
* `status`: string
Status of accepted/rejected etc.
* `MC-move`: string
Generation move of the trajectory.
* `MC-start`: string
Generation starting point of the trajectory.
* `ordermax`: int
Max value of OP.
* `ordermin`: int
Min value of OP.
* `stored`: boolean
True if the trajectory has existing trajectory files.
"""
self.info = info
self.frames = frames
[docs]class PathDensity:
"""Perform the path density analysis.
This class defines the path density analysis for completed simulations
with several order parameters.
"""
[docs] def __init__(self, basepath='.', iofile=None):
"""Initialize the class.
Parameters
----------
basepath : string, optional
The path to the input file.
iofile : string, optional
The input file.
Attributes
----------
traj_dict : dict
Values of order params and energy in all ensembles and
info about trajectories.
To each key, ensemble_name (e.g. 000, 001, etc.)
the value is the list of respective Trajectory objects.
infos : dict
Information about the simulation,
it contains:
* `ensemble_names`: list
List of ensemble names.
* `interfaces`: list
List of interface positions.
* `num_op`: int
Number of order parameters.
* `op_labels`: list
List of order parameter names.
* `energy_labels`: list
List of energy entry labels.
"""
self.basepath = os.path.abspath(basepath)
if isinstance(iofile, PathDensity):
self.traj_dict = iofile.traj_dict
self.infos = iofile.infos
return
self.iofile = os.path.join(self.basepath, iofile)
if iofile.endswith('.hdf5'):
self.initialize_compressed(iofile)
return
# Getting interfaces from iofile
settings = parse_settings_file(self.iofile)
interfaces = settings.get('simulation', {}).get('interfaces', [])
assert interfaces, 'Input file does not contain interface setting'
intnames = ['0$^{-}$', '0$^{+}$']
for i in range(1, len(interfaces) - 1):
intnames.append(str(i) + '$^{+}$')
path = []
# Getting ensembles/folders from directory
for fol in sorted(os.listdir(basepath)):
if os.path.isdir(os.path.join(basepath, fol)) and fol.isdigit():
path.append(fol)
assert path, 'No files to analyse.'
# Getting order parameters from order-file of first folder in path.
# Careful here because the order parameter function and/or the
# collective variables ones can produce a LIST. In this case the
# counting will not correspond.
filename = os.path.join(basepath, path[0], 'order.txt')
with open(filename, 'r', encoding='utf-8') as figa:
lines = len(figa.readlines())
assert lines > 3, f'File order.txt in {path[0]} empty.'
# Crash-proof, num_op is the mode of length of the last lines.
with open(os.path.join(basepath, path[0], 'order.txt'),
encoding='utf-8') as temp:
lasts = temp.readlines()[-10:]
num_op = int(stats.mode([len(i.split()) for i in lasts])[0]) - 1
op_names = get_cv_names(settings)
op_labels = op_names if len(op_names) == num_op else \
[f'op{i}' for i in range(1, num_op + 1)]
self.traj_dict = {}
self.order_parameter = op_labels[0]
self.infos = {'ensemble_names': path,
'interfaces': interfaces,
'intf_names': intnames,
'num_op': num_op,
'op_labels': op_labels,
'energy_labels': ENERGYLABELS}
[docs] def walk_dirs(self, only_ops=False):
"""Create a lists in acc or rej dictionary for all order parameters.
First generate list of folders/ensembles to iterate through.
Then search for number of order parameters(columns) in file in one
of the folders of path, and create lists in acc/rej dictionaries
for all order parameters.
Lastly iterate through all folders and files, filling in correct data
to the lists and dictionaries.
Parameters
----------
only_ops : boolean, optional
If true, PathDensity will not collect data from energy files.
"""
tic = [timeit.default_timer(), None]
print_to_screen('###################################################',
level='message')
print_to_screen('# PathDensity performing "walk" in \n# ' +
f'{os.getcwd()}/',
level='message')
print_to_screen('# Number of subfolders (0**) = ' +
str(len(self.infos['ensemble_names'])),
level='message')
print_to_screen((f'# Found {self.infos["num_op"]} order parameters in '
'output'),
level='message')
print_to_screen('###################################################'
+ '\n', level='message')
# Looping over folders, reading energy and orderP
cmax, cmin = 0, 0
for ensemble_name in self.infos['ensemble_names']:
tic[1] = timeit.default_timer()
print_to_screen(f'Reading data from {ensemble_name}',
level='message')
self.get_traj_op(ensemble_name)
if not only_ops:
self.get_traj_energy(ensemble_name)
# Sort traj_dict by cycle
self.traj_dict[ensemble_name] = \
dict(sorted(self.traj_dict[ensemble_name].items()))
for cyc in self.traj_dict[ensemble_name].keys():
traj = self.traj_dict[ensemble_name][cyc]
if int(cyc) > cmax:
cmax = cyc
if self.order_parameter in traj.frames.columns:
traj.info['ordermax'] = np.max(
traj.frames[self.order_parameter])
traj.info['ordermin'] = np.min(
traj.frames[self.order_parameter])
traj.info['length'] = len(traj.frames.index)
line = ('Done with folder, time used: '
f'{timeit.default_timer() - tic[1]:4.4f}s, proceeding.\n')
print_to_screen('=' * len(line) + '\n' + line, level='success')
self.infos['cycles'] = [cmin, cmax]
print_to_screen('###################################################',
level='success')
print_to_screen('# Data successfully retrieved, in cycles:',
level='success')
print_to_screen((f'# {self.infos["cycles"][0]} to '
f'{self.infos["cycles"][1]}'),
level='success')
print_to_screen(
f'# Time spent: {timeit.default_timer() - tic[0]:.2f}s',
level='success'
)
print_to_screen('###################################################'
+ '\n', level='success')
[docs] def hdf5_data(self):
"""Compress the data to a .hdf5 file."""
print_to_screen('###################################################',
level='message')
print_to_screen('# Compress dictionaries to file', level='message')
data = pd.Series(self.traj_dict)
infos = pd.Series(self.infos)
pfile = 'pyvisa_compressed_data.hdf5'
warnings.simplefilter(action='ignore',
category=pd.errors.PerformanceWarning)
data.to_hdf(pfile, key='data', mode='w')
infos.to_hdf(pfile, key='infos')
print_to_screen(f'# {pfile}', level='message')
pyvisa_zip(pfile)
print_to_screen(f'# {pfile}.zip', level='message')
print_to_screen('###################################################'
+ '\n', level='message')
[docs] def get_traj_op(self, ensemble_name):
"""Read order.txt files and collects data OP data.
Parameters
----------
ensemble_name : string
Updates
-------
traj_dict : dict
Dictionary containing Trajectory objects from simulation
with filled in OP-data.
"""
bad_characters = ["(", ")", ",", '"', "'"]
self.traj_dict[ensemble_name] = {}
weight = []
statw = []
frames = {}
cycle = -1
info = {}
with open(os.path.join(self.basepath, ensemble_name, 'order.txt'),
'r', encoding='utf-8') as order:
for idx, oline in enumerate(order):
if oline.startswith('#') and 'Cycle' not in oline:
continue
if oline.startswith('#') and 'Cycle' in oline:
if idx > 1:
self.fill_op(frames, info, ensemble_name, cycle)
text = oline.split()
cycle = int(text[2].rstrip(','))
frames = {}
info = {'ensemble_name': ensemble_name,
'cycle': cycle,
'status': text[4].rstrip(','),
'stored': False}
# Check for stored trajectory-files
if info['status'] == 'ACC':
status_line = 'traj-acc'
# Append statistical weights
statw.append(1)
weight.append(1)
else:
status_line = 'traj-rej'
# Append statistical weights
if weight:
weight[-1] += 1
statw.append(0)
path_to_cycles = (
f"{os.getcwd()}/{ensemble_name}/traj/{status_line}"
)
info['stored'] = os.path.exists(path_to_cycles +
f'/{cycle}/traj')
if len(text) > 6: # For backward comparability
# Add MC move
info['MC-move'] = ''.join(filter(
lambda i: i not in bad_characters, text[6]))
else:
# Read data line
odata = oline.split()
if not odata:
continue
frames[odata[0]] = {self.order_parameter: np.nan}
# Fill in OP-data to the frame
for label, data in zip(self.infos['op_labels'], odata[1:]):
frames[odata[0]][label] = np.array(data)
self.fill_op(frames, info, ensemble_name, cycle)
# Give weights to the trajectories
for cycle in self.traj_dict[ensemble_name].keys():
traj = self.traj_dict[ensemble_name][cycle]
index = list(self.traj_dict[ensemble_name].keys()).index(cycle)
if statw[index] == 1:
traj.info['weight'] = weight[0]
weight.pop(0)
else:
traj.info['weight'] = 0
[docs] def fill_op(self, frames, info, ensemble_name, cycle):
"""Fill in OP-data to a trajectory.
Function that fills the dictionary traj_dict containing
Trajectory objects with data from the order parameter
and the collective variables from the simulation.
Parameters
----------
frames: dict
Dictionary containing energy-data for the trajectory.
info : dict
Information about the trajectory.
ensemble_name : string
The name of the ensemble.
cycle : int
Cycle number.
Updates
-------
traj_dict : Trajectory object
Trajectory object with filled in OP-data.
"""
orderp = self.order_parameter
frames = pd.DataFrame.from_dict(
frames, orient='index', dtype=float)
for var in frames.columns:
remove_nan(frames[var])
if orderp in frames.columns and \
frames[orderp].isnull().sum() != len(frames[orderp].index):
start_end = where_from_to(np.array(frames[orderp]),
self.infos['interfaces'][0],
self.infos['interfaces'][-1])
info['reactive'] = start_end[0] + start_end[1] == 'LR'
self.traj_dict[ensemble_name][cycle] = Trajectory(frames, info)
[docs] def get_traj_energy(self, ensemble_name):
"""Read energy.txt files and collects energy-data.
Function that fills the dictionary traj_dict containing
Trajectory objects with energy data from the simulation.
Parameters
----------
ensemble_name : string
The name of the ensemble.
Updates
-------
traj_dict : dict
Dictionary containing Trajectory objects from simulation
with filled in energy-data.
"""
en_file = os.path.join(self.basepath, ensemble_name, 'energy.txt')
if not os.path.exists(en_file):
return
cycle = 0
eframes = {}
with open(en_file, 'r', encoding='utf-8') as energy:
for idx, eline in enumerate(energy):
if eline.startswith('#') and 'Time' in eline:
continue
if eline.startswith('#') and 'Cycle' in eline:
if idx > 1:
# Fill in energy data to the Trajectory
if cycle in self.traj_dict[ensemble_name].keys():
self.fill_energy(
self.traj_dict[ensemble_name][cycle],
eframes, ensemble_name, cycle)
cycle = int(eline.split()[2].rstrip(','))
eframes = {}
else:
edata = eline.split()
if not edata:
continue
eframes[edata[0]] = {}
# in case of missing lines in energy.txt
if len(edata) < 3:
eframes[edata[0]]['potE'] = np.nan
eframes[edata[0]]['kinE'] = np.nan
continue
eframes[edata[0]]['potE'] = float(edata[1])
eframes[edata[0]]['kinE'] = float(edata[2])
# End of file
# Fill in energy data to the Trajectory
if cycle in self.traj_dict[ensemble_name].keys():
self.fill_energy(self.traj_dict[ensemble_name][cycle],
eframes, ensemble_name, cycle)
[docs] def fill_energy(self, traj, eframes, ensemble_name, cycle):
"""Fill in energy-data to a trajectory.
Parameters
----------
traj: Trajectory object
Trajectory object to be filled in energy-data.
eframes: dict
Dictionary containing energy-data for the trajectory.
ensemble_name : string
The name of the ensemble.
cycle : integer
Cycle number.
Updates
-------
traj_dict : Trajectory object
Trajectory object with filled in energy-data.
"""
if not eframes:
return
energy_frame = pd.DataFrame.from_dict(
eframes, orient='index', dtype=float)
for energy_term in energy_frame.columns:
# If dataframe is only nan
if energy_frame[energy_term].isnull().sum() == \
len(energy_frame[energy_term].index):
return
traj.frames[energy_term] = energy_frame[energy_term]
remove_nan(traj.frames[energy_term])
traj.frames['totE'] = \
pd.to_numeric(traj.frames['potE']) + \
pd.to_numeric(traj.frames['kinE'])
self.traj_dict[ensemble_name][cycle] = traj
[docs] def initialize_compressed(self, input_file):
"""Load PathDensity from a compressed file.
Parameters
----------
input_file : string
The input file.
"""
in_file = os.path.join(self.basepath, input_file)
if os.path.isfile(in_file) and input_file.endswith('hdf5'):
self.traj_dict = pd.read_hdf(in_file, key='data')
self.infos = pd.read_hdf(in_file, key='infos')
else:
raise FileNotFoundError(f'File {in_file} not valid.')
[docs]class PathVisualize:
"""Class to define the visualization of data with PathDensity.
Class definition of the visualization of data gathered from simulation
directory using the PathDensity class.
"""
[docs] def __init__(self, basepath='.', pfile=None):
"""Initialize the PathVisualize class.
If a supported compressed input file is present, loads the pre-compiled
data from it. Else, must use specific functions explicitly.
Parameters
----------
basepath : string, optional
The path of the input file.
pfile : string, optional
The input file.
"""
self.infos, self.traj_data = {}, {}
self.op_labels = []
self.x_list, self.y_list, self.z_list = [], [], []
self.data_origin = []
if pfile is None:
self.pfile = None
elif pfile.endswith('rst'):
self.pfile = pfile
else:
self.pfile = os.path.join(basepath, pfile)
self.load_whatever()
[docs] def load_whatever(self):
"""Load all possible supported files.
This functions directs traffic towards the real loaders.
Essentially, it does almost nothing.
"""
clean = False
assert os.path.isfile(self.pfile), f'{self.pfile} does not exist.'
if self.pfile.endswith('.zip'):
origin = self.pfile
self.pfile = self.pfile.rstrip('.zip')
# If a zip file has been loaded through the file menu
tmp = os.path.join(os.path.dirname(os.path.abspath(origin)),
os.path.splitext(self.pfile)[0] + '.tmp' +
os.path.splitext(self.pfile)[1])
self.pfile = tmp
pyvisa_unzip(origin, tmp)
clean = True
if self.pfile.endswith('.hdf5'):
self.load_hdf5()
else:
raise ValueError(f'Format of {self.pfile} not recognised')
# If from zip, just keep the zip
if clean:
os.remove(tmp)
[docs] def load_hdf5(self):
"""Load precompiled data from a hdf5 file.
Function that loads precompiled data from a .hdf5 file made
using pandas.
"""
self.traj_data = pd.read_hdf(self.pfile, key='data')
self.infos = pd.read_hdf(self.pfile, key='infos')
self.op_labels = self.infos['op_labels']
[docs] def load_traj(self, criteria):
"""Load relevant data from Trajectories.
Parameters
----------
criteria : dict
Dictionary of the selection criteria for which data to load.
It contains:
* `x`: string
Name of parameter to plot.
* `y`: string
Name of parameter to plot.
* `z`: string
Name of parameter to plot.
* `ensemble_name`: string
Name of ensemble to loop through.
* `cycles`: tuple
Cycles to loop over.
* `status`: string
Status of the path: accepted/rejected.
* `MC-move`: string, optional
Generation move of the trajectory.
* `stored`: bool, optional
True if the trajectory has available trajectory files.
* `weights`: bool, optional
Option to apply statistical weights to the trajectories,
used in the weighted density plot.
Returns
-------
x_list : list
List of data from chosen parameter.
y_list : list
List of data from chosen parameter.
z_list : list
List of data from chosen parameter, if required.
data_origin: list
List of ensemble_name and cycle for each point, if required.
"""
x_list, y_list, z_list, data_origin = [], [], [], []
for cyc in range(criteria['cycles'][0], criteria['cycles'][1] + 1):
if cyc in self.traj_data[criteria['ensemble_name']].keys():
traj = self.traj_data[criteria['ensemble_name']][cyc]
# If the trajectory is empty
if 'length' not in traj.info.keys():
continue # pragma: no cover
stat_weight = traj.info['weight'] if criteria.get('weight',
False) else 1
for key in criteria.keys():
# For different frequencies of outputs
if key in ['x', 'y', 'z']:
if criteria[key] not in ['time', 'cycle', 'None'] \
and criteria[key] not in traj.frames.columns:
break # pragma: no cover
if key in traj.info.keys():
if key == 'status':
if criteria[key] == 'BOTH':
continue
if criteria[key] == 'REJ' and \
traj.info[key] != 'ACC':
continue
elif ((key == 'stored' and not criteria[key]) or
(key == 'MC-move' and criteria[key] == 'All') or
(key in ['x', 'y', 'z', 'cycles', 'weight'])):
continue
if criteria[key] != traj.info[key]:
break
else:
for _ in range(stat_weight):
data_origin.extend([[traj.info['ensemble_name'],
traj.info['cycle']]] *
traj.info['length'])
for xyz in {'x': x_list,
'y': y_list,
'z': z_list}.items():
if criteria[xyz[0]] == 'time':
xyz[1].extend(range(0, traj.info['length']))
elif criteria[xyz[0]] == 'cycle':
xyz[1].extend([traj.info['cycle']] *
traj.info['length'])
elif criteria[xyz[0]] == 'None':
xyz[1].extend([1] * traj.info['length'])
else:
xyz[1].extend(
traj.frames[criteria[xyz[0]]])
return x_list, y_list, z_list, data_origin