Source code for pyretis.pyvisa.common

# -*- coding: utf-8 -*-
# Copyright (c) 2023, PyRETIS Development Team.
# Distributed under the LGPLv2.1+ License. See LICENSE for more info.
"""This file contains common functions for the path density.

It contains some functions that is used to compare and process data,
like matching similar lists or attempt periodic shifts of values.

Important methods defined here
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

find_rst_file (:py:func: `.find_rst_file`)
    Search for a rst-file from a chosen subdirectory.

read_traj_txt_file (:py:func: `.read_traj_txt_file`)
    Read the sequence of files in a trajectory from a traj.txt file.

recalculate_all (:py:func:`.recalculate_all`)
    Recalculate order parameter and new collective variables
    by finding all trajectory files from a simulation.

shift_data (:py:func: `.shift_data`)
    Finds the median value of a given list of floats, and shifts the
    lower half of the data by the median.

try_data_shift (:py:func: `.try_data_shift`
    Takes in two lists of values, x and y, and calculates a linear
    regression and R**2-correlation of the data set. Attempts a shift of
    each data set by their respective median to increase the correlation.

where_from_to (:py:func: `.where_from_to`)
    Check the initial and final steps of a trj in respect to the interfaces.

get_cv_names (:py:func: `.get_cv_names`)
    Outputs a list of the names of the descriptors in the simulation.

recalculate_all (:py:func: `.recalculate_all`)
    Recompute all the order parameters according to the PyRETIS storage scheme
    or for individual files/folders

find_data  (:py:func: `.find_data`)
    Find suitable frames/trajectories to recompute the orderp parameter on.

"""
import os
import timeit
import scipy
from pyretis.inout import print_to_screen, settings
from pyretis.inout.common import create_backup, TRJ_FORMATS
from pyretis.inout.formats.path import PathExtFile
from pyretis.setup.common import create_orderparameter
from pyretis.tools.recalculate_order import recalculate_order
from pyretis.initiation.initiate_load import write_order_parameters

__all__ = ['find_rst_file', 'read_traj_txt_file',
           'shift_data', 'try_data_shift', 'where_from_to',
           'get_cv_names', 'recalculate_all', 'find_data']


[docs]def try_data_shift(x, y, fixedx): """Check if shifting increases correlation. Function that checks if correlation of data increases by shifting either sets of values, x or y, or both. Correlation is checked by doing a simple linear regression on the different sets of data: - x and y , x and yshift, xshift and y, xshift and yshift. If linear correlation increases (r-squared value), data sets are updated. As a precaution, no shift is performed on x values if they are of the first order parameter 'op1'. Parameters ---------- x, y : list Floats, data values fixedx : bool If True, x is main OP and should not be shifted. Returns ------- x, y : list Floats, updated (or unchanged) data values (If changed, returns x_temp or y_temp or both) """ # The unshifted data _, _, r_val, _, _ = scipy.stats.linregress(x, y) # The Y-shifted data y_temp = shift_data(y) _, _, r_y, _, _ = scipy.stats.linregress(x, y_temp) yshift = r_y**2 > r_val**2 # The X-shifted data x_temp = shift_data(x) _, _, r_x, _, _ = scipy.stats.linregress(x_temp, y) xshift = r_x**2 > r_val**2 and r_x**2 > r_y**2 # Comparing effectiveness of both shifts individually, and combined _, _, r_xy, _, _ = scipy.stats.linregress(x_temp, y_temp) xyshift = r_xy**2 > r_val**2 and r_xy**2 > r_y**2 and r_xy**2 > r_x**2 # If first op is op1, don't shift data if xyshift and not fixedx: return x_temp, y_temp if xshift and not fixedx: return x_temp, y if yshift: return x, y_temp return x, y
[docs]def shift_data(x): """Shifts the data under the median. Function that takes in a list of data, and shifts all values below the median value of the data by the max difference, effectively shifting parts of the data periodically in order to give clusters for visualization. Parameters ---------- x : list Floats, data values Returns ------- xnorm : list Floats where some values are shifted values of x, and some are left unchanged. """ xmin, xmax = min(x), max(x) xnorm = [] # The max difference in x-data diff_x = xmax - xmin # The Median of x-data medix = xmin + 0.5 * diff_x for i in x: if i < medix: xnorm.append(i + diff_x) else: xnorm.append(i) return xnorm
[docs]def read_traj_txt_file(path): """Read a traj.txt file. Function which reads a traj.txt file and returns a dict containing the name of each file in the trajectory and the sign of their velocity. Parameters ---------- path : string Path to the traj.txt file. Returns ------- files : dict Dictionary containing each file in the trajectory and the sign of their velocity. """ files = {} i = 0 with PathExtFile(path, 'r') as pfile: for block in pfile.load(): for data in block['data']: if data[0] == '0': files[i] = [data[1], data[3]] if data[1] != files[i][0]: i += 1 files[i] = [data[1], data[3]] return files
[docs]def find_rst_file(search_dir): """Search for rst-files. Parameters ---------- search_dir : string Path to the .rst file. Returns ------- out[0] : string Path and name of the .rst file. """ for _ in range(len(search_dir.split('/'))): for file_name in os.listdir(search_dir): if file_name.endswith('rst'): os.chdir(search_dir) return search_dir + '/' + file_name os.chdir('../') search_dir = os.getcwd() return search_dir
[docs]def where_from_to(trj, int_a, int_b=float('-inf')): r"""Detect L∕R starts and L / R / \* ends. Given a list of order parameters (a trj), the function will try to establish where the path started (L or R or \*) and where it ended. Note: for the 'REJ' paths, this function results might differ from PyRETIS. Parameters ---------- trj: numpy array The order parameters of the trj. int_a: float The interface that defines state A. int_b: float, optional The interface that defines state B. If not given, it is assumed that the 0^- ensemble is in use without the 0^- L interface. Returns ------- start: string\*1 The initial position of the trajectory in respect to the interfaces given (L eft, R ight or \* for nothing). end: string\*1 The final position of the trajectory in respect to the interfaces given (L eft, R ight or \* for nothing). """ start, end = '*', '*' int_l = min(int_a, int_b) int_t = max(int_a, int_b) if trj[0] >= int_t: start = 'R' if trj[0] < int_l: start = 'L' if trj[-1] >= int_t: end = 'R' if trj[-1] < int_l: end = 'L' return start, end
[docs]def get_cv_names(input_settings): """Return names of op and cv's. Parameters ---------- input_settings: dict Dictionary with the settings from the simulations. Returns ------- names = list List of the names. """ op_names = [] # Collect names of op and cv's if available if 'name' in input_settings['orderparameter'].keys(): op_names.append(input_settings['orderparameter']['name']) if 'collective-variable' in input_settings.keys(): for c_v in input_settings['collective-variable']: if 'name' in c_v.keys(): op_names.append(c_v['name']) return op_names
[docs]def recalculate_all(runfolder, iofile, ensemble_names=None, data=None): """Recalculate order parameter and collective variables. This function performs post-processing by analyzing trajectories of old simulations in order to extract data and do new calculations and write to a new order.txt file. Parameters ---------- runfolder: string The path of the execution directory. iofile: string The input file where the settings are collected. ensemble_names: list, optional List of ensemble names in the simulation to work with. data: string, optional If given, the function will check only the single file or look only in the given directory Returns ------- out : boolean True if the recomputation was successful, False otherwise. """ # Gather collective variables and corresponding option from .rst io_file = os.path.join(runfolder, iofile) assert io_file is not None, 'Input file not given' input_settings = settings.parse_settings_file(io_file) trj_dict = find_data(runfolder, ensemble_names, data=data) print_to_screen('Re-computing the collective variables.', level='message') tic = timeit.default_timer() if not trj_dict: print_to_screen('No data to re-compute from', level='warning') return False try: functions = create_orderparameter(input_settings) except (ImportError, ValueError): msg = 'Invalid Order Parameter' print_to_screen(msg, level='warning') # pragma: no cover return False # pragma: no cover # Create the composite order parameter (Avoid circular reference) for _, ens in trj_dict.items(): if ens.get('main_o') is not None: main_order = os.path.join(runfolder, ens['main_o']) create_backup(main_order) for _, cycles in sorted(ens['traj'].items()): here = os.path.dirname(os.path.abspath(cycles['traj'][0])) new_o = os.path.join(here, 'order.txt') results_dict = [] for trj in cycles['traj']: try: for order_p in recalculate_order(functions, trj, {}): results_dict.append(order_p) # pragma: no cover except (KeyError, AttributeError): print_to_screen(f'File {trj} not valid', level='warning') local_order = cycles.get('o_txt', new_o) create_backup(local_order) write_order_parameters(local_order, results_dict, cycles.get('header', 'Recalculated data')) if ens.get('main_o') is not None: os.system(f"cat {local_order} >> {main_order}") print_to_screen('# Data successfully recomputed!', level='success') print_to_screen('# Time spent: {:.2f}s'.format( timeit.default_timer() - tic), level='success') return True
[docs]def find_data(runfolder, ensemble_names=None, data=None): """Find the trajectory data used to do post-processing. find_traj returns a dict with a structure resembling that of the simulation. Parameters ---------- runfolder: string, optional The path of the execution directory. ensemble_names: list, optional List of ensemble names in the simulation to work with. data: string, optional If given, the function will check only the single file or look only in the given directory Returns ------- trj_dict : dict To each key, ensemble_name (e.g. 000, 001, etc) the values are: the last accepted trajectories given by the `accepted`-key; the generation trajectory or conf files given by the `generation`-key, and lastly the dictionary `stored_traj` that is given by the `traj`-key. `stored_traj` is split up into the dictionaries`traj-acc` and `traj-rej` which have keys for all the accepted and rejected cycles respectively, where the trajectory files for that cycle is stored. """ trj_dict = {} # Specific file/folder if data is not None: sources = [data] if os.path.isfile(data) else _get_trjs(data) if sources: trj_dict['000'] = {'traj': {'0': {'traj': sources}}} return trj_dict # Structured data flag_map = {'traj-acc': 'ACC', 'traj-rej': 'REJ'} if ensemble_names is None: ensemble_names = [i.name for i in os.scandir(runfolder) if (i.is_dir() and i.name.isdigit())] for ens_name in sorted(ensemble_names): order_txt = os.path.join(os.path.abspath(runfolder), ens_name, 'order.txt') trj_dict[ens_name] = {'main_o': order_txt, 'traj': {}} for trj_type in ['traj-acc', 'traj-rej']: here = os.path.join(os.path.abspath(runfolder), ens_name, 'traj', trj_type) if not os.path.isdir(here): continue for cycle in sorted([i.name for i in os.scandir(here) if i.name.isdigit()]): loc = os.path.join(here, cycle, 'traj') o_txt = os.path.join(here, cycle, 'order.txt') header = f'# Cycle: {cycle},' \ f' status: {flag_map[trj_type]}' if os.path.exists(o_txt): with open(o_txt, 'r', encoding='utf-8') as file_in: header = file_in.readline().replace('\n', '') trj_dict[ens_name]['traj'][cycle] = {'header': header, 'o_txt': o_txt, 'traj': _get_trjs(loc)} return trj_dict
[docs]def _get_trjs(runfolder='.'): """Find the trajectory files. Parameters ---------- runfolder : string, optional the location of the main simulation folder. Returns ------- full_name_trj : list The trajectory files contained in the folder. """ trj = [i.name for i in os.scandir(runfolder) if i.name[-4:] in TRJ_FORMATS] full_name_trj = [os.path.join(runfolder, i) for i in trj] return full_name_trj