Source code for pyretis.inout.fileio

# -*- coding: utf-8 -*-
# Copyright (c) 2023, PyRETIS Development Team.
# Distributed under the LGPLv2.1+ License. See LICENSE for more info.
"""Module defining the base classes for the PyRETIS output.

Important classes defined here
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

FileIO (:py:class:`.FileIO`)
    A generic class for handling input & output with files.

Important methods defined here
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

read_some_lines (:py:func:`.read_some_lines`)
    Method to read lines from PyRETIS data files.

"""
import os
import logging
from pyretis.inout.common import OutputBase, create_backup


# Module-level logger, named after this module.
logger = logging.getLogger(__name__)  # pylint: disable=invalid-name
# Attach a no-op handler to this module's logger.
logger.addHandler(logging.NullHandler())


# Names exported by a star-import of this module.
__all__ = ['FileIO', 'read_some_lines']


class FileIO(OutputBase):
    """A generic class for handling IO with files.

    This class defines how PyRETIS stores and reads data.
    Formatting is handled by an object like :py:class:`.OutputFormatter`

    Attributes
    ----------
    filename : string
        Name (e.g. path) to the file to read or write.
    file_mode : string
        Specifies the mode in which the file is opened.
    backup : boolean
        Determines the behavior if we want to write to a file
        that is already existing.
    fileh : object like :py:class:`io.IOBase`
        The file handle we are interacting with. It is None until
        one of the open methods has succeeded.
    last_flush : object like :py:class:`datetime.datetime`
        The previous time for flushing to the file. It is set to None
        on construction and is not updated by the methods defined in
        this class.

    """

    target = 'file'

    def __init__(self, filename, file_mode, formatter, backup=True):
        """Set up the file object.

        Parameters
        ----------
        filename : string
            The path to the file to open or read.
        file_mode : string
            Specifies the mode for opening the file.
        formatter : object like :py:class:`.OutputFormatter`
            The object responsible for formatting output.
        backup : boolean, optional
            Defines how we handle cases where we write to a file
            which is already existing. Values that are not equal to
            True or False are replaced by True.

        """
        super().__init__(formatter)
        self.filename = filename
        self.file_mode = file_mode
        if backup not in (True, False):
            logger.info('Setting backup to default: True')
            self.backup = True
        else:
            self.backup = backup
        self.fileh = None
        # When appending, the header should not be written again.
        if self.file_mode.startswith('a') and self.formatter is not None:
            self.formatter.print_header = False
        self.last_flush = None

    def open_file_read(self):
        """Open a file for reading.

        Returns
        -------
        out : object like :py:class:`io.IOBase` or None
            The opened file handle, or the previous value of
            `self.fileh` (None for a fresh object) if the file
            could not be opened.

        Raises
        ------
        ValueError
            If `self.file_mode` is not a mode for reading.

        """
        if not self.file_mode.startswith('r'):
            raise ValueError(
                f'Inconsistent file mode "{self.file_mode}" for reading'
            )
        try:
            # Binary modes do not accept an encoding argument, text
            # modes are read as UTF-8:
            encoding = None if 'b' in self.file_mode else 'utf-8'
            self.fileh = open(self.filename, self.file_mode,
                              encoding=encoding)
        except (OSError, IOError) as error:
            logger.critical(
                'Could not open file "%s" for reading', self.filename
            )
            # Use %s here, errno/strerror may be None for some errors.
            logger.critical(
                'I/O error (%s): %s', error.errno, error.strerror
            )
        return self.fileh

    def open_file_write(self):
        """Open a file for writing.

        In this method, we also handle the possible backup settings:
        if the file exists and we are not appending, a backup is
        created when `self.backup` is True, otherwise the file is
        overwritten.

        Returns
        -------
        out : object like :py:class:`io.IOBase` or None
            The opened file handle, or the previous value of
            `self.fileh` (None for a fresh object) if the file
            could not be opened.

        Raises
        ------
        ValueError
            If `self.file_mode` is not a mode for writing/appending.

        """
        if not self.file_mode.startswith(('a', 'w')):
            raise ValueError(
                f'Inconsistent file mode "{self.file_mode}" for writing'
            )
        try:
            if os.path.isfile(self.filename):
                if self.file_mode.startswith('a'):
                    logger.info(
                        'Appending to existing file "%s"', self.filename
                    )
                elif self.backup:
                    msg = create_backup(self.filename)
                    logger.debug(msg)
                else:
                    logger.debug(
                        'Overwriting existing file "%s"', self.filename
                    )
            # Binary modes do not accept an encoding argument, text
            # modes are written as UTF-8:
            encoding = None if 'b' in self.file_mode else 'utf-8'
            self.fileh = open(self.filename, self.file_mode,
                              encoding=encoding)
        except (OSError, IOError) as error:  # pragma: no cover
            logger.critical(
                'Could not open file "%s" for writing', self.filename
            )
            # strerror is a string (or None), so %s is needed here.
            logger.critical(
                'I/O error (%s): %s', error.errno, error.strerror
            )
        return self.fileh

    def open(self):
        """Open a file for reading or writing.

        Returns
        -------
        out : object like :py:class:`io.IOBase` or None
            The file handle. If a handle has already been created,
            it is returned as it is (even if it has been closed).

        Raises
        ------
        ValueError
            If `self.file_mode` does not start with "r", "a" or "w".

        """
        if self.fileh is not None:
            logger.debug(
                '%s asked to open file, but it has already opened a file.',
                self.__class__.__name__
            )
            return self.fileh
        if self.file_mode.startswith('r'):
            return self.open_file_read()
        if self.file_mode.startswith(('a', 'w')):
            return self.open_file_write()
        raise ValueError(f'Unknown file mode "{self.file_mode}"')

    def load(self):
        """Read blocks or lines from the file.

        The reading is delegated to the `load` method of the formatter.
        """
        return self.formatter.load(self.filename)

    def write(self, towrite, end='\n'):
        """Write a string to the file.

        Parameters
        ----------
        towrite : string
            The string to output to the file.
        end : string, optional
            Appended to `towrite` when writing, can be used to print a
            new line after the input `towrite`. If None, `towrite` is
            written as it is.

        Returns
        -------
        status : boolean
            True if we managed to write, False otherwise.

        """
        status = False
        if towrite is None:
            return status
        if self.fileh is None:
            logger.critical(
                'Attempting to write to empty file handle for file %s',
                self.filename
            )
            return status
        if self.fileh.closed:
            logger.warning('Ignored writing to closed file %s', self.filename)
            return status
        try:
            if end is not None:
                self.fileh.write(f'{towrite}{end}')
            else:
                self.fileh.write(towrite)
            status = True
        except (OSError, IOError) as error:  # pragma: no cover
            msg = f'Write I/O error ({error.errno}): {error.strerror}'
            logger.critical(msg)
        # The buffers are flushed after each write attempt:
        self.flush()
        return status

    def close(self):
        """Close the file.

        The buffers are flushed first, and the file is closed even
        if the flushing fails.
        """
        if self.fileh is not None and not self.fileh.closed:
            try:
                self.flush()
            finally:
                self.fileh.close()

    def flush(self):
        """Flush file buffers to file."""
        if self.fileh is not None and not self.fileh.closed:
            self.fileh.flush()
            os.fsync(self.fileh.fileno())

    def output(self, step, data):
        """Open file before first write.

        If `self.first_write` is set, the file is opened before the
        output is handed over to the `output` method of the parent class.
        """
        if self.first_write:
            self.open()
        return super().output(step, data)

    def __del__(self):
        """Close the file in case the object is deleted."""
        # The attribute is missing if __init__ failed before it was set.
        if getattr(self, 'fileh', None) is not None:
            self.close()

    def __enter__(self):
        """Context manager for opening the file."""
        self.open()
        return self

    def __exit__(self, *args):
        """Context manager for closing the file."""
        self.close()

    def __iter__(self):
        """Make it possible to iterate over lines in the file."""
        return self

    def __next__(self):
        """Let the file object handle the iteration.

        The iteration stops if there is no file handle, or if the
        file has been closed.
        """
        if self.fileh is None or self.fileh.closed:
            raise StopIteration
        return next(self.fileh)

    def __str__(self):
        """Return basic info."""
        msg = [f'FileIO (file: "{self.filename}")']
        if self.fileh is not None and not self.fileh.closed:
            msg += ['\t* File is open']
            msg += [f'\t* Mode: {self.fileh.mode}']
        msg += [f'\t* Formatter: {self.formatter}']
        return '\n'.join(msg)
def _read_line_data(ncol, stripline, line_parser):
    """Parse a single line for :py:func:`.read_some_lines`.

    Parameters
    ----------
    ncol : integer
        The expected number of columns. The value -1 signals that
        the number of columns has not been determined yet, and it is
        then taken from the line parsed here.
    stripline : string
        The line to read. Note that we assume that leading and
        trailing spaces have been removed.
    line_parser : callable
        A method we use to parse a single line. If it is None, the
        line is returned without any parsing.

    Returns
    -------
    out[0] : object
        The parsed line (or `stripline` itself when no parser is
        given). It is None if the parser raised a ValueError or an
        IndexError, or if the number of columns did not match `ncol`.
    out[1] : boolean
        True if the line was accepted, False otherwise.
    out[2] : integer
        The number of columns to expect for the following lines,
        -1 if the line was rejected.

    """
    rejected = (None, False, -1)
    if line_parser is None:
        # No parser: hand back the raw line and leave ncol untouched.
        return stripline, True, ncol
    try:
        parsed = line_parser(stripline)
    except (ValueError, IndexError):
        return rejected
    width = len(parsed)
    # The first parsed line defines the expected number of columns.
    expected = width if ncol == -1 else ncol
    if width != expected:
        # Inconsistent number of columns, treat the line as malformed.
        return rejected
    return parsed, True, expected
def read_some_lines(filename, line_parser, block_label='#'):
    """Open a file and read it block by block.

    This method will read a file using the given `line_parser`.
    Lines that cannot be parsed, lines with an inconsistent number of
    columns and lines that give no data (e.g. blank lines) are skipped
    with a warning, and the reading continues with the next line.

    The data is read in blocks and a block is yielded when a new
    block is found. A special string (`block_label`) is assumed to
    identify the start of blocks: consecutive lines starting with
    `block_label` make up the comment of a block, and the lines that
    follow make up the data of the block.

    Parameters
    ----------
    filename : string
        This is the name/path of the file to open and read.
    line_parser : callable or None
        This is a function which knows how to translate a given line
        to a desired internal format. If None is given, the lines
        (stripped for leading and trailing white space) are stored
        without any parsing.
    block_label : string, optional
        This string is used to identify blocks.

    Yields
    ------
    data : dict
        The data read from the file. The key 'comment' holds the list
        of comment lines for the block and the key 'data' holds the
        list of parsed lines.

    """
    ncol = -1  # The number of columns, -1 means "not yet known".
    new_block = {'comment': [], 'data': []}
    yield_block = False
    read_comment = False
    with open(filename, 'r', encoding='utf-8') as fileh:
        # Note: ``i`` is the zero-based index of the line in the file.
        for i, line in enumerate(fileh):
            stripline = line.strip()
            if stripline.startswith(block_label):
                # This is a comment, then a new block will follow,
                # unless this is a multi-line comment.
                if read_comment:  # Part of a multi-line comment.
                    new_block['comment'].append(stripline)
                    continue
                if yield_block:
                    # Yield the current block before starting a new one.
                    yield new_block
                new_block = {'comment': [stripline], 'data': []}
                yield_block = True  # The new block has content.
                ncol = -1
                read_comment = True
                continue
            read_comment = False
            data, _yieldb, _ncol = _read_line_data(ncol, stripline,
                                                   line_parser)
            # Test explicitly for "no data" instead of relying on the
            # truth value, which is ambiguous for array-like objects.
            if data is not None and len(data) > 0:
                new_block['data'].append(data)
                ncol = _ncol
                yield_block = _yieldb
            else:
                logger.warning('Skipped malformed data in "%s", line: %i',
                               filename, i)
    # If the last block has not been yielded, yield it.
    if yield_block:
        yield new_block