Source code for pyretis.analysis.path_analysis

# -*- coding: utf-8 -*-
# Copyright (c) 2023, PyRETIS Development Team.
# Distributed under the LGPLv2.1+ License. See LICENSE for more info.
"""Methods for analysis of path ensembles.

Important methods defined here
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

analyse_path_ensemble (:py:func:`.analyse_path_ensemble`)
    Method to analyse a path ensemble, it will calculate crossing
    probabilities and information about moves etc. This method
    can be applied to files as well as path ensemble objects.

analyse_path_ensemble_object (:py:func:`.analyse_path_ensemble_object`)
    Method to analyse a path ensemble, it will calculate crossing
    probabilities and information about moves etc. This method is
    intended to work directly on path ensemble objects.

match_probabilities (:py:func:`.match_probabilities`)
    Match probabilities from several path ensembles and calculate
    efficiencies and the error for the matched probability.

retis_flux (:py:func:`.retis_flux`)
    Calculate the initial flux with errors for a RETIS simulation.

retis_rate (:py:func:`.retis_rate`)
    Calculate the rate constant with errors for a RETIS simulation.
"""
import os
import logging
import numpy as np
from pyretis.analysis.analysis import running_average, block_error_corr
from pyretis.analysis.histogram import histogram, histogram_and_avg
from pyretis.inout.formats import OrderPathFile

logger = logging.getLogger(__name__)  # pylint: disable=invalid-name
logger.addHandler(logging.NullHandler())


__all__ = ['analyse_path_ensemble', 'analyse_repptis_ensemble',
           'match_probabilities', 'retis_flux', 'retis_rate',
           'perm_calculations', 'skip_paths']


SHOOTING_MOVES = {'sh', 'ss', 'wt', 'wf', 'ts'}
INITIAL_MOVES = {'ki', 'ld', 'is', 're'}


[docs]def _pcross_lambda_cumulative(orderparam, ordermin, ordermax, ngrid, weights=None, ha_weights=None): """Obtain crossing probability as a function of the order parameter. It will do the actual calculation of the crossing probability as a function of order parameter. It is split off from `pcross_lambda` since the analysis is intended to be backward compatible with the output/results from the old ``TISMOL FORTRAN`` program. Parameters ---------- orderparam : numpy.array Array containing the order parameters. ordermin : float Minimum allowed order parameter. ordermax : float Maximum allowed order parameter. ngrid : int This is the number of grid points. weights : numpy.array, optional The weight of each order parameter. This is used in order to count a specific order parameter more than once. If not given, the values in `orderparam` will be weighted equally. ha_weights : numpy.array, optional The weights of the high acceptance scheme (e.g. Stone Skipping) Returns ------- lamb: numpy.array The grid on which pcross is based. pcross : numpy.array Estimated crossing probability distribution. """ lamb = np.linspace(ordermin, ordermax, ngrid) pcross = np.zeros(ngrid) delta_l = lamb[1] - lamb[0] sumw = 0.0 for i, orderp in enumerate(orderparam): idx = np.floor((orderp - ordermin) / delta_l) idx = max(0, int(idx) + 1) # +1: idx is here defined so that lamb[idx-1] <= orderp < lamb[idx] # further this lambda will contribute up to and including lamb[idx] # this is accomplished by the idx+1 when summing weights below weight = 1. if weights is None else weights[i] ha_weight = 1. if ha_weights is None else ha_weights[i] sumw += weight*ha_weight if idx >= ngrid: pcross += weight*ha_weight else: pcross[:idx + 1] += weight*ha_weight # +1 to include up to idx pcross /= sumw # normalisation return lamb, pcross
[docs]def _get_path_length(path, ensemble_number): """Return the path length for different moves. Different moves may have a different way of obtaining the path length. (Example: time-reversal vs. shooting move). Parameters ---------- path : dict This is the dict containing the information about the path. It can typically be obtained by iterating over the path ensemble object, e.g. with a `for path in path_ensemble.get_paths():`. ensemble_number : int This integer identifies the ensemble. This is used for the swapping moves in [0^-] and [0^+]. Returns ------- out : int The path length """ move = path['generated'][0] return_table = {'tr': 0, 's+': 0, 's-': 0, '00': 0, 'mr': 0} if move in return_table: if move == 's+' and ensemble_number == 0: return path['length'] - 2 if move == 's-' and ensemble_number == 1: return path['length'] - 2 return return_table[move] if move in SHOOTING_MOVES: return path['length'] - 1 if move in INITIAL_MOVES: logger.info('Skipped initial path (move "%s")', move) return None logger.warning('Skipped path with unknown mc move: %s', move) return None
[docs]def _update_shoot_stats(shoot_stats, path): """Update the shooting statistics with the status of the given path. Parameters ---------- shoot_stats : dict This dict contains the results from the analysis of shooting moves. E.g. `shoot_stats[key]` contain the order parameters for the status `key` which can be the different statuses defined in `pyretis.core.path._STATUS` or 'REJ' (for rejected). path : dict This is the path information, represented as a dictionary. Returns ------- out : None Returns `None` but will update `shoot_stats` for shooting moves. """ move = path['generated'][0] if move in SHOOTING_MOVES: orderp = path['generated'][1] status = path['status'] if status not in shoot_stats: shoot_stats[status] = [] shoot_stats[status].append(orderp) if status != 'ACC': shoot_stats['REJ'].append(orderp) shoot_stats['ALL'].append(orderp)
[docs]def _create_shoot_histograms(shoot_stats, bins): """Create histograms and scale for the shoot analysis. Parameters ---------- shoot_stats : dict This dict contains the results from the analysis of shooting moves. E.g. `shoot_stats[key]` contain the order parameters for the status `key` which can be the different statuses defined in `pyretis.core.path._STATUS` or 'REJ' (for rejected). bins : int The number of bins to use for the histograms. Returns ------- out[0] : dict For each possible status ('ACC, 'BWI', etc.) this dict will contain a histogram as returned by the histogram function. It will also contain a 'REJ' key which is the concatenation of all rejections and a 'ALL' key which is simply all the values. out[1] : dict For each possible status ('ACC, 'BWI', etc.) this dict will contain the scale factors for the histograms. The scale factors are obtained by dividing with the 'ALL' value. See Also -------- :py:func:`.histogram` in :py:mod:`.histogram`. """ histograms = {} scale = {} for key in shoot_stats: if not shoot_stats[key]: logger.warning('No shoots data found for %s (empty histogram)', key) shoot_stats[key] = np.array([]) mind = 0.0 maxd = 0.1 else: shoot_stats[key] = np.array(shoot_stats[key]) mind = shoot_stats[key].min() maxd = shoot_stats[key].max() histograms[key] = histogram(shoot_stats[key], bins=bins, limits=(mind, maxd), density=True) scale[key] = (float(len(shoot_stats[key])) / float(len(shoot_stats['ALL']))) return histograms, scale
[docs]def analyse_path_ensemble(path_ensemble, settings): """Analyse a path ensemble. This function will make use of the different analysis functions and analyse a path ensemble. This function is more general than the `analyse_path_ensemble_object` function in that it should work on both `PathEnsemble` and `PathEnsembleFile` objects. The running average is updated on-the-fly, see Wikipedia for details [wikimov]_. Parameters ---------- path_ensemble : object like :py:class:`.PathEnsemble` This is the path ensemble to analyse. settings : dict This dictionary contains settings for the analysis. We make use of the following keys: * `ngrid`: The number of grid points for calculating the crossing probability as a function of the order parameter. * `maxblock`: The max length of the blocks for the block error analysis. Note that this will maximum be equal the half of the length of the data, see `block_error` in `.analysis`. * `blockskip`: Can be used to skip certain block lengths. A `blockskip` equal to `n` will consider every n'th block up to `maxblock`, i.e. it will use block lengths equal to `1`, `1+n`, `1+2n`, etc. * `bins`: The number of bins to use for creating histograms. Returns ------- out : dict This dictionary contains the main results for the analysis which can be used for plotting or other kinds of output. See Also -------- :py:func:`._update_shoot_stats`, :py:func:`.pcross_lambda_cumulative` and :py:func:`._create_shoot_histograms`. References ---------- .. [wikimov] Wikipedia, "Moving Average", https://en.wikipedia.org/wiki/Moving_average """ detect = settings['tis']['detect'] ens_number = path_ensemble.ensemble_number if ens_number == 0: return analyse_path_ensemble0(path_ensemble, settings) result = {'cycle': [], 'detect': detect, 'ensemble': path_ensemble.ensemble_name, 'ensembleid': ens_number, 'interfaces': list(path_ensemble.interfaces)} orderparam = [] # list of all accepted order parameters weights = [] pdata = [] length_acc = [] length_all = [] shoot_stats = {'REJ': [], 'ALL': []} ha_w = 0 nacc = 0 nacc_swap = 0 npath = 0 npath_swap = 0 skip_lines = settings.get("analysis", {}).get("skip", 0) skip_weight = 0 production_run = False # True from the first ACC path not in I.M. for i, path in enumerate(path_ensemble.get_paths()): # loop over all paths if not production_run and path['status'] == 'ACC' and \ path['generated'][0] in SHOOTING_MOVES: production_run = True if not production_run: continue npath += 1 if path['generated'][0] in ('s-', 's+'): npath_swap += 1 if path['status'] == 'ACC': nacc_swap += 1 if path['status'] == 'ACC': nacc += 1 weights.append(1) ha_w = 1./path.get('weight', 1.) orderparam.append(path['ordermax'][0]) length_acc.append([path['length'], ha_w]) if ens_number != 0: success = 1 if path['ordermax'][0] > detect else 0. pdata.append([success, ha_w]) # Store data for block analysis elif nacc != 0: # just increase the weights weights[-1] += 1 if i < skip_lines: skip_weight += 1 # we also update the running average of the probability here: result['cycle'].append(path['cycle']) # get the length - note that this length depends on the type of move # see the `_get_path_length` function. length = _get_path_length(path, ens_number) if length is not None: length_all.append([length, ha_w]) # update the shoot stats, this will only be done for shooting moves _update_shoot_stats(shoot_stats, path) # Update the weights for skipped paths weights, nacc = skip_paths(weights, nacc, skip_weight) # Check if we have enough data to do analysis: assert nacc != 0, f'No accepted paths to analyse in ensemble {ens_number}' # When restarting a simulations, or by stacking together different # simulations, the cycles might not be in order. We thus reset the counter. result['cycle'] = np.arange(len(result['cycle'])) # 1) result['prun'] is already calculated. pdata = np.array(pdata) result['pdata'] = np.array(pdata) # 2) lambda pcross: analysis = settings['analysis'] if ens_number != 0: # 1) result['prun'] is calculated here with the correct path weights. result['prun'] = running_average(np.repeat(pdata, weights, axis=0)) result['pdata'] = np.array(pdata) # 2) lambda pcross: orderparam = np.array(orderparam) ordermax = min(orderparam.max(), max(path_ensemble.interfaces)) result['pcross'] = _pcross_lambda_cumulative( orderparam, path_ensemble.interfaces[1], ordermax, analysis['ngrid'], weights=weights, ha_weights=result['pdata'][:, 1]) # 3) block error analysis: result['blockerror'] = block_error_corr( data=np.repeat(pdata, weights, axis=0), maxblock=analysis['maxblock'], blockskip=analysis['blockskip']) # 4) length analysis: length_acc = np.array(length_acc) length_all = np.array(length_all) hist1 = histogram_and_avg(np.repeat(length_acc[:, 0], weights), analysis['bins'], density=True, weights=np.repeat(length_acc[:, 1], weights)) hist2 = histogram_and_avg(np.array(length_all[:, 0]), analysis['bins'], density=True, weights=length_all[:, 1]) result['pathlength'] = (hist1, hist2) # 5) shoots analysis: result['shoots'] = _create_shoot_histograms(shoot_stats, analysis['bins']) # 6) Add some simple efficiency metrics: result['tis-cycles'] = npath result['efficiency'] = [float(nacc - nacc_swap) / float(npath - npath_swap)] if npath_swap == 0: result['efficiency'].append(np.nan) else: result['efficiency'].append(float(nacc_swap) / float(npath_swap)) # extra analysis for the [0^- and 0^+] ensembles in case we will determine # the initial flux: if ens_number in [0, 1]: result['lengtherror'] = block_error_corr( data=np.repeat(length_acc, weights, axis=0), maxblock=analysis['maxblock'], blockskip=analysis['blockskip']) lenge2 = result['lengtherror'][4] * hist1[2][0] / (hist1[2][0]-2.) result['fluxlength'] = [hist1[2][0]-2.0, lenge2, lenge2 * (hist1[2][0]-2.)] result['fluxlength'].append(result['efficiency'][1] * lenge2**2) return result
def analyse_path_ensemble0(path_ensemble, settings): """Analyse the [0^-] ensemble. Parameters ---------- path_ensemble : object like :py:class:`.PathEnsemble` This is the path ensemble to analyse. settings : dict This dictionary contains settings for the analysis. We make use of the following keys: * `ngrid`: The number of grid points for calculating the crossing probability as a function of the order parameter. * `maxblock`: The max length of the blocks for the block error analysis. Note that this will maximum be equal the half of the length of the data, see `block_error` in `.analysis`. * `blockskip`: Can be used to skip certain block lengths. A `blockskip` equal to `n` will consider every n'th block up to `maxblock`, i.e. it will use block lengths equal to `1`, `1+n`, `1+2n`, etc. * `bins`: The number of bins to use for creating histograms. Returns ------- result : dict The results from the analysis for the ensemble. """ simulation_settings = settings.get('simulation', {}) permeability = simulation_settings.get('permeability', False) detect = settings['tis']['detect'] ensemble_number = path_ensemble.ensemble_number sub_cycles = settings.get('engine', {}).get('subcycles', 1) timestep = settings.get('engine', {}).get('timestep', 1)*sub_cycles if permeability: path_ensemble.interfaces = (settings['simulation'].get('zero_left', float('-inf') ), path_ensemble.interfaces[1], path_ensemble.interfaces[2]) result = {'cycle': [], 'detect': detect, 'ensemble': path_ensemble.ensemble_name, 'ensembleid': ensemble_number, 'interfaces': list(path_ensemble.interfaces), 'permeability': permeability} length_acc, length_all, weights = [], [], [] # Regular setting[analysis] is overwritten by the code that generates # out.rst, but this one is complete analysis = settings['analysis'] # Permeability analysis bin_edges = analysis.get('tau_ref_bin', None) end_acc, tau_acc, tau_ref_acc = [], [], [] tau_ref_bins = make_tau_ref_bins(list(path_ensemble.interfaces), nbins=10, tau_region=bin_edges) shoot_stats = {'REJ': [], 'ALL': []} nacc, npath = 0, 0 nacc_swap, npath_swap = 0, 0 production_run = True fn_op = path_ensemble.filename.rsplit("/", 1)[0]+"/order.txt" o_ps = None op_data = None skip_lines = settings.get("analysis", {}).get("skip", 0) skip_weight = 0 if os.path.isfile(fn_op): op_file = OrderPathFile(fn_op, 'r') o_ps = op_file.load() op_file.close() for num, path in enumerate(path_ensemble.get_paths()): # loop over paths if o_ps is not None: err = False try: o_p = next(o_ps) except StopIteration: o_ps = None err = True if not err: cycle_comment = [i for i in o_p['comment'] if "Cycle" in i][-1] op_cycle = int( cycle_comment.split(',')[0].split(':')[1].strip()) err = op_cycle != path['cycle'] if not err: op_data = o_p['data'].T if err: op_data = None msg = ("Order-file does not align with path-ensemble setting " "op data to None") logger.warning(msg) if path['generated'][0] in INITIAL_MOVES: production_run = False if not production_run and path['status'] == 'ACC' and \ path['generated'][0] in SHOOTING_MOVES: production_run = True if not production_run: continue npath += 1 if path['generated'][0] in ('s-', 's+'): npath_swap += 1 if path['status'] == 'ACC': nacc_swap += 1 if path['status'] == 'ACC': nacc += 1 ha_w = 1./path.get('weight', 1.) weights.append(1) length_acc.append([path['length'], ha_w]) # Get the end point end_acc.append(path['interface'][-1]) # Calculate tau tau_acc.append(_calc_tau(op_data, bin_edges, timestep=timestep)) tau_ref_acc.append( [_calc_tau(op_data, edges, timestep=timestep) for edges in tau_ref_bins] ) elif nacc != 0: # just increase the weights weights[-1] += 1 if num < skip_lines: skip_weight += 1 result['cycle'].append(path['cycle']) length = _get_path_length(path, ensemble_number) if length is not None: ha_w = 1./path.get('weight', 1.) length_all.append([length, ha_w]) # update the shoot stats, this will only be done for shooting moves _update_shoot_stats(shoot_stats, path) # Update the weights to skip properly weights, nacc = skip_paths(weights, nacc, skip_weight) length_acc = np.array(length_acc) length_all = np.array(length_all) # Check if we have enough data to do analysis: ens_number = 0 assert nacc != 0, f'No accepted paths to analyse in ensemble {ens_number}' # Perform the different analysis tasks: result['cycle'] = np.array(result['cycle']) # 1) length analysis: hist1 = histogram_and_avg(np.repeat(length_acc[:, 0], weights), analysis['bins'], density=True) hist2 = histogram_and_avg(np.array(length_all[:, 0]), analysis['bins'], density=True) result['pathlength'] = (hist1, hist2) # 2) block error of lengths: result['lengtherror'] = block_error_corr(data=np.repeat(length_acc, weights, axis=0), maxblock=analysis['maxblock'], blockskip=analysis['blockskip']) # 3) shoots analysis: result['shoots'] = _create_shoot_histograms(shoot_stats, analysis['bins']) # 4) Add some simple efficiency metrics: result['efficiency'] = [float(nacc - nacc_swap) / float(npath - npath_swap)] if npath_swap == 0: result['efficiency'].append(np.nan) else: result['efficiency'].append(float(nacc_swap) / float(npath_swap)) lenge2 = result['lengtherror'][4] * hist1[2][0] / (hist1[2][0]-2.) result['fluxlength'] = [hist1[2][0]-2.0, lenge2, lenge2 * (hist1[2][0]-2.)] result['fluxlength'].append(result['efficiency'][1] * lenge2**2) result['tis-cycles'] = npath # 5) optional, do the permeability analysis if permeability: end_acc = [1 if i == 'R' else 0 for i in end_acc] # Xi is 1/running average result['xi'] = running_average(np.repeat(end_acc, weights)) result['xierror'] = block_error_corr(result['xi'], maxblock=analysis['maxblock'], blockskip=analysis['blockskip']) result['tau'] = running_average(np.repeat(tau_acc, weights)) result['tauerror'] = block_error_corr(result['tau'], maxblock=analysis['maxblock'], blockskip=analysis['blockskip']) result['tau_bin'] = bin_edges result['tau_ref'] = running_average(np.repeat(tau_ref_acc, repeats=weights, axis=0))[-1] result['tau_ref_bins'] = tau_ref_bins return result
[docs]def analyse_repptis_ensemble(path_ensemble, settings): """Analyse a repptis path ensemble. This function is a modification of analyse_path_ensemble(), which make use of the different analysis functions and analyse a repptis path ensemble. Parameters ---------- path_ensemble : object like :py:class:`.PathEnsemble` This is the path ensemble to analyse. settings : dict This dictionary contains settings for the analysis. We make use of the following keys: * `ngrid`: The number of grid points for calculating the crossing probability as a function of the order parameter. * `maxblock`: The max length of the blocks for the block error analysis. Note that this will maximum be equal the half of the length of the data, see `block_error` in `.analysis`. * `blockskip`: Can be used to skip certain block lengths. A `blockskip` equal to `n` will consider every n'th block up to `maxblock`, i.e. it will use block lengths equal to `1`, `1+n`, `1+2n`, etc. * `bins`: The number of bins to use for creating histograms. Returns ------- out : dict This dictionary contains the main results for the analysis which can be used for plotting or other kinds of output. """ detect = settings['tis']['detect'] ens_number = path_ensemble.ensemble_number if ens_number == 0: return analyse_path_ensemble0(path_ensemble, settings) result = {'cycle': [], 'detect': detect, 'ensemble': path_ensemble.ensemble_name, 'ensembleid': ens_number, 'interfaces': list(path_ensemble.interfaces)} orderparam = [] # list of all accepted order parameters weights = [] nacc_sl, nacc_sr = 0, 0 weights_sl, weights_sr = [], [] pdata = [] pdata_sl = [] # start left pdata_sr = [] # start right length_acc = [] length_all = [] shoot_stats = {'REJ': [], 'ALL': []} ha_w = 0 nacc = 0 nacc_swap = 0 npath = 0 npath_swap = 0 skip_lines = settings.get("analysis", {}).get("skip", 0) skip_weight = 0 production_run = False # True from the first ACC path not in I.M. lmrs = [] # keep track of a path's left, middle and right ordermin = [] intfs000 = path_ensemble.interfaces ptypes = [[0, 0, 0, 0]] # LML, LMR, RMR, RML amounts for i, path in enumerate(path_ensemble.get_paths()): # loop over all paths # we skip the first number of lines that should not be part of # the analysis. if i < skip_lines: continue if not production_run and path['status'] == 'ACC' and \ path['generated'][0] in SHOOTING_MOVES: production_run = True if not production_run: continue npath += 1 if path['generated'][0] in ('s-', 's+'): npath_swap += 1 if path['status'] == 'ACC': nacc_swap += 1 if path['status'] == 'ACC': nacc += 1 weights.append(1) ha_w = 1./path.get('weight', 1.) orderparam.append(path['ordermax'][0]) ordermin.append(path['ordermin'][0]) length_acc.append([path['length'], ha_w]) lmrs.append(''.join(path['interface'])) if ens_number != 0: success = 1 if path['ordermax'][0] > detect else 0. pdata.append([success, ha_w]) # Store data for block analysis if lmrs[-1][0] == 'L': nacc_sl += 1 weights_sl.append(1) success_sl = 1 if path['ordermax'][0] > intfs000[2] else 0. pdata_sl.append([success_sl, ha_w]) ptype = [0, 1, 0, 0] if success_sl == 1 else [1, 0, 0, 0] elif lmrs[-1][0] == 'R': weights_sr.append(1) nacc_sr += 1 success_sr = 1 if path['ordermin'][0] < intfs000[0] else 0. pdata_sr.append([success_sr, ha_w]) ptype = [0, 0, 0, 1] if success_sr == 1 else [0, 0, 1, 0] elif nacc != 0: # just increase the weights weights[-1] += 1 if ens_number != 0: if lmrs[-1][0] == 'L': weights_sl[-1] += 1 elif lmrs[-1][0] == 'R': weights_sr[-1] += 1 if i < skip_lines: skip_weight += 1 # Update ptypes with the last ptype ptypes.append(ptype) # we also update the running average of the probability here: result['cycle'].append(path['cycle']) # get the length - note that this length depends on the type of move # see the `_get_path_length` function. length = _get_path_length(path, ens_number) if length is not None: length_all.append([length, ha_w]) # update the shoot stats, this will only be done for shooting moves _update_shoot_stats(shoot_stats, path) # save the ptypes, which will be used for recursive block analysis result['ptypes'] = np.cumsum(np.array(ptypes), axis=0) # Check if we have enough data to do analysis: assert nacc != 0, f'No accepted paths to analyse in ensemble {ens_number}' # When restarting a simulations, or by stacking together different # simulations, the cycles might not be in order. We thus reset the counter. result['cycle'] = np.arange(len(result['cycle'])) # 1) result['prun'] is already calculated. pdata = np.array(pdata) result['pdata'] = np.array(pdata) # 2) lambda pcross: analysis = settings['analysis'] if ens_number != 0: # 1) result['prun'] is calculated here with the correct path weights. result['prun_sl'] = running_average(np.repeat(pdata_sl, weights_sl, axis=0)) result['prun_sr'] = running_average(np.repeat(pdata_sr, weights_sr, axis=0)) result['pdata'] = np.array(pdata) # 2) lambda pcross: orderparam = np.array(orderparam) ordermax = min(orderparam.max(), max(path_ensemble.interfaces)) result['pcross'] = _pcross_lambda_cumulative( orderparam, path_ensemble.interfaces[1], ordermax, analysis['ngrid'], weights=weights, ha_weights=result['pdata'][:, 1]) # 3) block error analysis: # if we get here, it means that either pdata_sl and/or pdata_sr is not # empty. So we set the non-empty one first, and then equate the empty # one to zeros and copy the blockerror of the non-empty one. if len(pdata_sl) > 0: result['blockerror_sl'] = block_error_corr( data=np.repeat(pdata_sl, weights_sl, axis=0), maxblock=analysis['maxblock'], blockskip=analysis['blockskip']) if len(pdata_sr) > 0: result['blockerror_sr'] = block_error_corr( data=np.repeat(pdata_sr, weights_sr, axis=0), maxblock=analysis['maxblock'], blockskip=analysis['blockskip']) if len(pdata_sl) == 0: result['blockerror_sl'] = result['blockerror_sr'] result['prun_sl'] = np.zeros_like(result['prun_sr']) if len(pdata_sr) == 0: result['blockerror_sr'] = result['blockerror_sl'] result['prun_sr'] = np.zeros_like(result['prun_sl']) # PPRETIS analysis pp_dic = {'lmrs': np.array(lmrs), 'interfaces': path_ensemble.interfaces, 'weights': np.array(weights), 'ordermin': np.array(ordermin), 'ordermax': np.array(orderparam)} pptup = cross_dist_distr(pp_dic) result['repptis'] = pptup # 4) length analysis: length_acc = np.array(length_acc) length_all = np.array(length_all) hist1 = histogram_and_avg(np.repeat(length_acc[:, 0], weights), analysis['bins'], density=True, weights=np.repeat(length_acc[:, 1], weights)) hist2 = histogram_and_avg(np.array(length_all[:, 0]), analysis['bins'], density=True, weights=length_all[:, 1]) result['pathlength'] = (hist1, hist2) # 5) shoots analysis: result['shoots'] = _create_shoot_histograms(shoot_stats, analysis['bins']) # 6) Add some simple efficiency metrics: result['tis-cycles'] = npath result['efficiency'] = [float(nacc - nacc_swap) / float(npath - npath_swap)] swap_acc_ratio = float(nacc_swap) / float(npath_swap) if npath_swap > 0 \ else np.nan result['efficiency'].append(swap_acc_ratio) # extra analysis for the [0^- and 0^+] ensembles in case we will determine # the initial flux: if ens_number in [0, 1]: result['lengtherror'] = block_error_corr( data=np.repeat(length_acc, weights, axis=0), maxblock=analysis['maxblock'], blockskip=analysis['blockskip']) lenge2 = result['lengtherror'][4] * hist1[2][0] / (hist1[2][0]-2.) result['fluxlength'] = [hist1[2][0]-2.0, lenge2, lenge2 * (hist1[2][0]-2.)] result['fluxlength'].append(result['efficiency'][1] * lenge2**2) return result
[docs]def match_probabilities(path_results, detect, settings=None): """Match probabilities from several path ensembles. It calculates the efficiencies and errors for the matched probability. Parameters ---------- path_results : list These are the results from the path analysis. `path_results[i]` contains the output from `analyse_path_ensemble` applied to ensemble no. `i`. Here we make use of the following keys from `path_results[i]`: * `pcross`: The crossing probability. * `prun`: The running average of the crossing probability. * `blockerror`: The output from the block error analysis. * `efficiency`: The output from the efficiency analysis. detect : list of floats These are the detect interfaces used in the analysis. settings : dict, optional This dictionary contains settings for the analysis. Here we make use of the following keys from the analysis section: * `ngrid`: The number of grid points for calculating the crossing probability as a function of the order parameter. * `maxblock`: The max length of the blocks for the block error analysis. Note that this will maximum be equal the half of the length of the data, see `block_error` in `.analysis`. * `blockskip`: Can be used to skip certain block lengths. A `blockskip` equal to `n` will consider every n'th block up to `maxblock`, i.e. it will use block lengths equal to `1`, `1+n`, `1+2n`, etc. * `bins`: The number of bins to use for creating histograms. Returns ------- results : dict These are results for the over-all probability and error and also some over-all TIS efficiencies. """ results = {'matched-prob': [], 'overall-prun': [], 'overall-err': [], 'overall-prob': [[], []]} accprob = 1.0 accprob_err = 0.0 minlen_pdata, minlen_prun = float('inf'), float('inf') for idet, result in zip(detect, path_results): # do match only in part left of idetect: idx = np.where(result['pcross'][0] <= idet)[0] results['overall-prob'][0].extend(result['pcross'][0][idx]) results['overall-prob'][1].extend(result['pcross'][1][idx] * accprob) # update probabilities, error and efficiency: mat = np.column_stack((result['pcross'][0], result['pcross'][1])) mat[:, 1] *= accprob results['matched-prob'].append(mat) accprob *= result['prun'][-1] accprob_err += result['blockerror'][4]**2 # Find the maximum number of cycles for ensemble minlen_pdata = min(minlen_pdata, len(result['pdata'])) minlen_prun = min(minlen_prun, len(result['prun'])) # Finally Construct the cumulative output now results['overall-cycle'] = np.arange(minlen_prun) results['overall-prun'] = [1]*minlen_prun results['overall-pdata'] = [[1, 1]]*minlen_pdata for result in path_results: results['overall-prun'] = np.multiply(result['prun'][-minlen_prun:], results['overall-prun']) results['overall-pdata'] = np.multiply(result['pdata'][-minlen_pdata:], results['overall-pdata']) if settings is not None: analysis = settings['analysis'] results['overall-error'] = block_error_corr(results['overall-pdata'], analysis['maxblock'], analysis['blockskip']) results['overall-prob'] = np.transpose(results['overall-prob']) results['prob'] = accprob results['relerror'] = np.sqrt(accprob_err) return results
[docs]def retis_flux(results0, results1, timestep): """Calculate the initial flux for RETIS. Parameters ---------- results0 : dict Results from the analysis of ensemble [0^-] results1 : dict Results from the analysis of ensemble [0^+] timestep : float The simulation timestep. Returns ------- flux : float The initial flux. flux_error : float The relative error in the initial flux. """ flux0 = results0['fluxlength'] flux1 = results1['fluxlength'] tsum = flux0[0] + flux1[0] flux = 1.0 / (tsum * timestep) flux_error = (np.sqrt((flux0[1]*flux0[0])**2 + (flux1[1]*flux1[0])**2) / tsum) return flux, flux_error
[docs]def retis_rate(pcross, pcross_relerror, flux, flux_relerror): """Calculate the rate constant for RETIS. Parameters ---------- pcross : float Estimated crossing probability pcross_relerror : float Relative error in crossing probability. flux : float The initial flux. flux_relerror : float Relative error in the initial flux. Returns ------- rate : float The rate constant rate_error : float The relative error in the rate constant. """ rate = pcross * flux rate_error = np.sqrt(pcross_relerror**2 + flux_relerror**2) return rate, rate_error
[docs]def perm_calculations(results0, pcross, pcross_err): """Calculate the permeability. This function collects the relevant permeability properties from the output data. It then calculates the permeability and its error based on the values of xi, tau, and the crossing probability pcross. Paramaters ---------- results0 : dict Results from the analysis of ensemble [0^-] pcross : float Estimated crossing probability pcross_err : float Relative error in crossing probability. Returns ------- xxi : float The value of xi tau : float The value of tau perm : float The value of the permeability xi_err : float The error in xi tau_err : float The error in tau perm_err : float The error in permeability """ out = results0['out'] permeability = out.get("permeability", False) xxi, tau, perm, xi_err, tau_err, perm_err = ( None, None, None, float('inf'), float('inf'), float('inf')) if permeability: xxi = out['xi'][-1] xi_err = out['xierror'][4] tau = out['tau'][-1] tau_err = out['tauerror'][4] perm = xxi * (1 / tau) * pcross perm_err = np.sqrt(xi_err**2+tau_err**2+pcross_err**2) return xxi, tau, perm, xi_err, tau_err, perm_err
[docs]def _calc_tau(op_data, bin_edges=None, timestep=1): """Calculate the value of tau. This function calculates tau, a measure of time spent within a certain interval (bin) in the [0^-'] ensemble. It does so by counting the number of times the order parameter falls within this interval defined by the bin_edges. It then multiplies this count by the timestep and divides by the width of the interval. If bin_edges is None or not a 2-element list, or if the order parameter data op_data is None, it just returns 0. Parameters ---------- op_data : list The order parameter data from the [0^-'] ensemble. bin_edges : list, optional The edges of the bin to calculate tau for. If None, it returns 0. timestep : float, optional The simulation timestep. Default is 1. Returns ------- tau : float The value of tau. """ if bin_edges is None or len(bin_edges) != 2 or op_data is None: return 0 order = op_data[1] bin_l, bin_r = tuple(bin_edges) delta = bin_r-bin_l count = sum([1 for i in order if bin_l < i <= bin_r]) return count*timestep/delta
def make_tau_ref_bins(interfaces, nbins=10, tau_region=None): """Make the reference bins for calculating tau. This function makes reference bins for calculating tau based on the [0^-'] interfaces and a number of bins. If the left interface is -inf and no tau region is provided, it returns a list with None. If a tua region is provided, it adjusts the left interface accordingly. It then divides the range from left to right interface into nbins equal parts and returns these as a list of tuples. Parameters ---------- interfaces : list The interfaces from the [0^-'] ensemble. nbins : int, optional The number of bins to divide the range into. Default is 10. tau_region : list, optional The region to calculate tau for. If None, it uses the whole range. Returns ------- ref_bins : list of tuples The reference bins for calculating tau. """ left, _, right = interfaces if left == float('-inf') and (tau_region is None or tau_region == []): return [None] if left == float('-inf'): # Take the left of the tau_region to be the middle left = right-2*(right-tau_region[0]) diff = (right-left)/nbins return [(left+diff*i, left+diff*(i+1)) for i in range(nbins)]
[docs]def skip_paths(weights, nacc, nskip): """Skips not counted paths. Parameters ---------- weights : array of integers The weights of the paths. nacc : integer Number of accepted paths. nskip : integer Number of paths to skip. Returns ------- new_weights : array of integers The new weights of the paths once skipped the nskip ones. new_acc : integer Recomputed number of accepted paths. """ new_weights = [] new_acc = nacc i = 0 while nskip > 0: try: weight = weights[i] except IndexError as err: raise RuntimeError("Skipping more trajs than available") from err new_weight = max(weight-nskip, 0) diff = weight-new_weight new_acc -= 1 nskip -= diff new_weights.append(new_weight) i += 1 # Slicing on an index that does not exist gives an empty list # and always works new_weights += weights[i:] return new_weights, new_acc
def cross_dist_distr(pp_dic): """Return the distribution of lambda values for the LMR and RML paths. It calculates the distribution of lambda_max values for LM* paths, and the distribution of lambda_min values for RM* paths. Parameters ---------- pp_dic : dict The partial-path dictionary. It contains the following keys: 'lmrs' : array of strings (type of paths: "LML", "LMR", ...) 'weights' : array of integers (weights of the paths) 'interfaces' : array of floats (interfaces of the pathensemble) 'ordermin' : array of floats (lambda_mins of the paths) 'ordermax' : array of floats (lambda_maxs of the paths) Returns ------- left : float The left interface of the pathensemble. middle : float The middle interface of the pathensemble. right : float The right interface of the pathensemble. percents : array of floats The distribution of lambda_max values for LM* paths, given at lambda values 'lambs' between middle and right interfaces. lambs : array of floats The lambda values at which the lambda_max distribution is given. percents2 : array of floats The distribution of lambda_min values for RM* paths, given at lambda values 'lambs2' between left and middle interfaces. lambs2 : array of floats The lambda values at which the lambda_min distribution is given. """ lmrs = pp_dic['lmrs'] weights = pp_dic['weights'] interfaces = pp_dic['interfaces'] ordermin = pp_dic['ordermin'] ordermax = pp_dic['ordermax'] # LM*: paths = select_with_or_masks(ordermax, [lmrs == "LML", lmrs == "LMR"]) repeat = np.repeat(paths, select_with_or_masks(weights, [lmrs == "LML", lmrs == "LMR"])) left, middle, right = interfaces[0], interfaces[1], interfaces[2] percents = [] lambs = np.linspace(middle, np.max(paths), 100) if paths.size != 0 else \ np.linspace(middle, right, 100) for i in lambs: percents.append(np.sum(repeat >= i)) percents = percents / percents[0] if percents else percents # RM*: paths2 = select_with_or_masks(ordermin, [lmrs == "RMR", lmrs == "RML"]) repeat2 = np.repeat(paths2, select_with_or_masks(weights, [lmrs == "RMR", lmrs == "RML"])) percents2 = [] lambs2 = np.linspace(np.min(paths2), middle, 100) if paths2.size != 0 \ else np.linspace(left, middle, 100) for i in lambs2: percents2.append(np.sum(repeat2 <= i)) percents2 = percents2 / percents2[-1] if percents2 else percents2 return left, middle, right, percents, lambs, percents2, lambs2 def select_with_or_masks(array, masks): """Select elements of array that are True in at least one of the masks. Parameters ---------- array : np.array Array to select elements from masks : list of masks where each mask is a boolean array with the same shape as array. Returns ------- np.array A new array with the elements of A that are True in at least one of the masks. """ # first check whether masks have the same shape as A for mask in masks: assert mask.shape == array.shape # now we can use the masks to select the elements of A union_mask = np.any(masks, axis=0).astype(bool) return array[union_mask]