# Source code for pyretis.inout.plotting.txt_plotting

# -*- coding: utf-8 -*-
# Copyright (c) 2023, PyRETIS Development Team.
# Distributed under the LGPLv2.1+ License. See LICENSE for more info.
"""A class for writing text data for the analysis.

This module defines a text plotter which supports the same method as
the generic plotter, however, the output is human-readable text.

Important classes defined here
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

TxtPlotter (:py:class:`.TxtPlotter`)
    A class for writing text output.
"""
import gzip
import logging
import os
import shutil
import numpy as np
from pyretis.inout.plotting.plotting import Plotter
from pyretis.inout.common import name_file
from pyretis.inout.common import (ENERFILES, ENERTITLE, FLUXFILES,
                                  ORDERFILES, PATHFILES, PATH_MATCH)
from pyretis.inout.formats.txt_table import txt_save_columns

# NOTE(review): requests the pre-NumPy-2.0 ("1.25") scalar repr style so
# numbers written to the text output keep their historical formatting --
# confirm this is still needed for the installed NumPy version.
np.set_printoptions(legacy='1.25')

logger = logging.getLogger(__name__)  # pylint: disable=invalid-name
logger.addHandler(logging.NullHandler())  # library code: no default handler


# Public API of this module:
__all__ = ['TxtPlotter']


class TxtPlotter(Plotter):
    """A plotter that emits human-readable text files.

    This class writes text-based output only.  It mirrors the interface
    of (:py:class:`.MplPlotter`): the same methods are supported, but
    instead of producing figures it stores the results as column-based
    text files.

    Attributes
    ----------
    out_fmt : string
        Selects format for output plots.

    """
[docs] def __init__(self, out_fmt, backup=False, out_dir=None): """Initialise the text writer. Parameters ---------- out_fmt : string This is the format to use for the images. backup : boolean, optional Determines if we should overwrite or backup old files. out_dir : string, optional Determines if we should write the files to a particular directory. """ super().__init__(backup=backup, plotter_type='text', out_dir=out_dir) self.out_fmt = out_fmt
[docs] def output_flux(self, results): """Store the output from the flux analysis in text files. Parameters ---------- results : dict This is the dict with the results from the flux analysis. Returns ------- outputfile : list of dicts A list containing the files created for the flux and for the error in the flux. """ outfiles = [] # make running average txt and error txt: for i in range(len(results['flux'])): flux = results['flux'][i] runflux = results['runflux'][i] errflux = results['errflux'][i] outfile = name_file(FLUXFILES['runflux'].format(i + 1), self.out_fmt, path=self.out_dir) outfiles.append(outfile) # output running average: txt_save_columns(outfile, 'Time, running average', (flux[:, 0], runflux), backup=self.backup) # output block-error results: outfile = name_file(FLUXFILES['block'].format(i + 1), self.out_fmt, path=self.out_dir) outfiles.append(outfile) _txt_block_error(outfile, 'Block error for flux analysis', errflux, backup=self.backup) return outfiles
[docs] def output_energy(self, results, energies): """Save the output from the energy analysis to text files. Parameters ---------- results : dict Each item in `results` contains the results for the corresponding energy. It is assumed to contains the keys 'vpot', 'ekin', 'etot', 'ham', 'temp', 'elec'. energies : numpy.array This is the raw data for the energy analysis. Returns ------- outfiles : list The output files created by this function. """ outfiles = [] time = energies['time'] # 1) Store the running average: header = ['Running average of energy data: time'] data = [time] for key in ['vpot', 'ekin', 'etot', 'ham', 'temp', 'ext']: if key in results: data.append(results[key]['running']) header.append(key) headertxt = ' '.join(header) outfile = name_file(ENERFILES['run_energies'], self.out_fmt, path=self.out_dir) outfiles.append(outfile) txt_save_columns(outfile, headertxt, data, backup=self.backup) # 2) Save block error data: for key in ['vpot', 'ekin', 'etot', 'temp']: if key in results: outfile = name_file(ENERFILES['block'].format(key), self.out_fmt, path=self.out_dir) outfiles.append(outfile) _txt_block_error(outfile, ENERTITLE[key], results[key]['blockerror'], backup=self.backup) # 3) Save histograms: for key in ['vpot', 'ekin', 'etot', 'temp']: if key in results: outfile = name_file(ENERFILES['dist'].format(key), self.out_fmt, path=self.out_dir) outfiles.append(outfile) _txt_histogram(outfile, f'Histogram for {ENERTITLE[key]}', [results[key]['distribution']], backup=self.backup) return outfiles
[docs] def output_orderp(self, results, orderdata): """Save the output from the order parameter analysis to text files. Parameters ---------- results : dict Each item in `results` contains the results for the corresponding order parameter. orderdata : numpy.array This is the raw data for the order parameter analysis Returns ------- outfiles : list The output files created by this function. Note ---- We are here only outputting results for the first order parameter. I.e. other order parameters or velocities are not written here. This will be changed when the structure of the output order parameter file has been fixed. Also note that, if present, the first order parameter will be plotted against the second one - i.e. the second one will be assumed to represent the velocity here. """ outfiles = [] time = orderdata[:, 0] # output running average: outfile = name_file(ORDERFILES['run_order'], self.out_fmt, path=self.out_dir) txt_save_columns(outfile, 'Time, running average', (time, results[0]['running']), backup=self.backup) outfiles.append(outfile) # output block-error results: outfile = name_file(ORDERFILES['block'], self.out_fmt, path=self.out_dir) _txt_block_error(outfile, 'Block error for order param', results[0]['blockerror'], backup=self.backup) outfiles.append(outfile) # output distributions: outfile = name_file(ORDERFILES['dist'], self.out_fmt, path=self.out_dir) _txt_histogram(outfile, 'Order parameter', [results[0]['distribution']], backup=self.backup) outfiles.append(outfile) # output msd if it was calculated: if 'msd' in results[0]: msd = results[0]['msd'] outfile = name_file(ORDERFILES['msd'], self.out_fmt, path=self.out_dir) txt_save_columns(outfile, 'Time MSD Std', (time[:len(msd)], msd[:, 0], msd[:, 1]), backup=self.backup) outfiles.append(outfile) # TODO: time c/should here be multiplied with the correct dt return outfiles
[docs] def output_path(self, results, path_ensemble): """Output all the results obtained by the path analysis. Parameters ---------- results : dict This dict contains the result from the analysis. path_ensemble : object like :py:class:`.PathEnsemble` This is the path ensemble we have analysed. Returns ------- outfiles : list The output files created by this function. """ ens = path_ensemble.ensemble_name # identify the ensemble ens_simplified = path_ensemble.ensemble_name_simple detect = results['detect'] outfiles = [] if 'pcross' in results: # 1) Output pcross vs lambda: outfile = name_file(PATHFILES['pcross'].format(ens_simplified), self.out_fmt, path=self.out_dir) outfiles.append(outfile) txt_save_columns(outfile, f'Ensemble: {ens}, detect: {detect}', [results['pcross'][0], results['pcross'][1]], backup=self.backup) if 'prun' in results: # 2) Output the running average of p: outfile = name_file(PATHFILES['prun'].format(ens_simplified), self.out_fmt, path=self.out_dir) outfiles.append(outfile) txt_save_columns(outfile, f'Ensemble: {ens}', [results['prun']], backup=self.backup) if 'blockerror' in results: # 3) Block error results: outfile = name_file(PATHFILES['perror'].format(ens_simplified), self.out_fmt, path=self.out_dir) outfiles.append(outfile) _txt_block_error(outfile, f'Ensemble: {ens}', results['blockerror'], backup=self.backup) # 3) Length histograms outfile = name_file(PATHFILES['pathlength'].format(ens_simplified), self.out_fmt, path=self.out_dir) outfiles.append(outfile) _txt_histogram(outfile, 'Histograms for acc and all', [results['pathlength'][0], results['pathlength'][1]], backup=self.backup) # 4) Shoot histograms outfile = name_file(PATHFILES['shoots'].format(ens_simplified), self.out_fmt, path=self.out_dir) outfiles.append(outfile) _txt_shoots_histogram(outfile, results['shoots'][0], results['shoots'][1], ens, backup=self.backup) return outfiles
[docs] def output_pppath(self, results, path_ensemble): """Output all the results obtained by the repptis path analysis. Parameters ---------- results : dict This dict contains the result from the analysis. path_ensemble : object like :py:class:`.PathEnsemble` This is the path ensemble we have analysed. Returns ------- outfiles : list The output files created by this function. """ ens = path_ensemble.ensemble_name # identify the ensemble ens_simplified = path_ensemble.ensemble_name_simple detect = results['detect'] outfiles = [] if 'pcross' in results: # 1) Output pcross vs lambda: outfile = name_file(PATHFILES['pcross'].format(ens_simplified), self.out_fmt, path=self.out_dir) outfiles.append(outfile) txt_save_columns(outfile, f'Ensemble: {ens}, detect: {detect}', [results['pcross'][0], results['pcross'][1]], backup=self.backup) if 'prun_sl' in results: # 2) Output the running average of p: outfile = name_file(PATHFILES['prun_sl'].format(ens_simplified), self.out_fmt, path=self.out_dir) outfiles.append(outfile) txt_save_columns(outfile, f'Ensemble: {ens}', [results['prun_sl']], backup=self.backup) if 'prun_sr' in results: # 2) Output the running average of p: outfile = name_file(PATHFILES['prun_sr'].format(ens_simplified), self.out_fmt, path=self.out_dir) outfiles.append(outfile) txt_save_columns(outfile, f'Ensemble: {ens}', [results['prun_sr']], backup=self.backup) if 'blockerror_sl' in results: # 3) Block error results: outfile = name_file(PATHFILES['perror_sl'].format(ens_simplified), self.out_fmt, path=self.out_dir) outfiles.append(outfile) _txt_block_error(outfile, f'Ensemble: {ens}', results['blockerror_sl'], backup=self.backup) if 'blockerror_sr' in results: # 3) Block error results: outfile = name_file(PATHFILES['perror_sr'].format(ens_simplified), self.out_fmt, path=self.out_dir) outfiles.append(outfile) _txt_block_error(outfile, f'Ensemble: {ens}', results['blockerror_sr'], backup=self.backup) # 3) Length histograms outfile = 
name_file(PATHFILES['pathlength'].format(ens_simplified), self.out_fmt, path=self.out_dir) outfiles.append(outfile) _txt_histogram(outfile, 'Histograms for acc and all', [results['pathlength'][0], results['pathlength'][1]], backup=self.backup) # 4) Shoot histograms outfile = name_file(PATHFILES['shoots'].format(ens_simplified), self.out_fmt, path=self.out_dir) outfiles.append(outfile) _txt_shoots_histogram(outfile, results['shoots'][0], results['shoots'][1], ens, backup=self.backup) return outfiles
[docs] def output_matched_probability(self, path_ensembles, detect, matched, reptis=False): """Output the matched probabilities to a text file. This function will output the matched probabilities for the different ensembles and also output the over-all matched probability. Parameters ---------- path_ensembles : list of strings This is the names for the path ensembles we have calculated the probability for. detect : list of floats These are the detect interfaces used in the analysis. matched : dict This dict contains the results from the matching of the probabilities. We make use of `matched['overall-prob']` and `matched['matched-prob']` here. Returns ------- outfiles : list The files created by this function. """ outfiles = [] # start by creating the matched file, here we use a custom # file writer: if not reptis: outfile = name_file(PATH_MATCH['match'], self.out_fmt, path=self.out_dir) use_gzip = outfile.endswith('.gz') if use_gzip: outfile = outfile[:-3] with open(outfile, 'wb') as fhandle: for prob, ens, idet in zip(matched['matched-prob'], path_ensembles, detect): header = f'Ensemble: {ens}, idetect: {idet}' np.savetxt(fhandle, prob, header=header) if use_gzip: with open(outfile, 'rb') as fhandle: with gzip.open(f'{outfile}.gz', 'wb') as fhandle_out: shutil.copyfileobj(fhandle, fhandle_out) os.remove(outfile) outfile = f'{outfile}.gz' outfiles.append(outfile) # output the over-all matched probability: if reptis: matched['overall-prob'] = np.array([detect, matched['pcross']]).T outfile = name_file(PATH_MATCH['total'], self.out_fmt, path=self.out_dir) interf = ' , '.join([str(idet) for idet in detect]) header = 'Total matched probability. 
Interfaces: {}' txt_save_columns(outfile, header.format(interf), (matched['overall-prob'][:, 0], matched['overall-prob'][:, 1]), backup=self.backup) outfiles.append(outfile) # output the prun matched probability: if 'overall-rrun' in matched: outfile = name_file('overall-rrun', self.out_fmt, path=self.out_dir) interf = ' , '.join([str(idet) for idet in detect]) header = 'Running average overall rate. Interfaces: {}' txt_save_columns(outfile, header.format(interf), (matched['overall-cycle'], matched['overall-rrun']), backup=self.backup) outfiles.append(outfile) if 'overall-prun' in matched: outfile = name_file('overall-prun', self.out_fmt, path=self.out_dir) interf = ' , '.join([str(idet) for idet in detect]) header = 'Running average crossing probability. Interfaces: {}' txt_save_columns(outfile, header.format(interf), (matched['overall-cycle'], matched['overall-prun']), backup=self.backup) outfiles.append(outfile) return outfiles
def _txt_block_error(outputfile, title, error, backup=False):
    """Write the output from the error analysis to a text file.

    Parameters
    ----------
    outputfile : string
        This is the name of the output file to create.
    title : string
        This is an identifier/title to add to the header, e.g.
        'Ensemble: 001', 'Kinetic energy', etc.
    error : list
        This is the result of the error analysis.
    backup : boolean, optional
        Determines if we will do a backup of old files or not.

    """
    # error[4] is the relative error, error[6] the correlation length;
    # error[0]/error[3] are the columns written to the file.
    header = f'{title}, Rel.err: {error[4]:9.6e}, Ncor: {error[6]:9.6f}'
    txt_save_columns(outputfile, header, (error[0], error[3]),
                     backup=backup)
def _txt_histogram(outputfile, title, histograms, backup=False):
    """Write histograms to a text file.

    Parameters
    ----------
    outputfile : string
        This is the name of the output file to create.
    title : string
        A descriptive title to add to the header.
    histograms : tuple or list
        The histograms to store.
    backup : boolean, optional
        Determines if we will do a backup of old files or not.

    """
    columns = []
    headers = [f'{title}']
    for hist in histograms:
        # hist[2] holds (average, standard deviation); hist[1]/hist[0]
        # are the bin positions and counts written as columns.
        headers.append(f'avg: {hist[2][0]:6.2f}, std: {hist[2][1]:6.2f}')
        columns.extend((hist[1], hist[0]))
    txt_save_columns(outputfile, ', '.join(headers), columns,
                     backup=backup)
def _txt_shoots_histogram(outputfile, histograms, scale, ensemble,
                          backup=False):
    """Write the histograms from the shoots analysis to a text file.

    Parameters
    ----------
    outputfile : string
        This is the name of the output file to create.
    histograms : dict
        These are the histograms obtained in the shoots analysis.
    scale : dict
        These are the scale factors for normalising the histograms
        obtained in the shoots analysis.
    ensemble : string
        This is the ensemble identifier, e.g. 001, 002, etc.
    backup : boolean, optional
        Determines if we will do a backup of old files or not.

    """
    columns = []
    headers = [f'Ensemble: {ensemble}']
    for key in ('ACC', 'REJ', 'BWI', 'ALL'):
        # Skip statuses missing from either dict (the original code
        # achieved this by catching KeyError).
        if key not in histograms or key not in scale:
            continue
        hist = histograms[key][0]
        mid = histograms[key][2]
        columns.extend((mid, hist, hist * scale[key]))
        headers.append(f'{key} (mid, hist, hist*scale)')
    txt_save_columns(outputfile, ', '.join(headers), columns,
                     backup=backup)