Source code for pyretis.core.random_gen

# -*- coding: utf-8 -*-
# Copyright (c) 2023, PyRETIS Development Team.
# Distributed under the LGPLv2.1+ License. See LICENSE for more info.
"""This module handles random number generation.

It derives most of the random number procedures from `RandomState` in
`numpy.random` and defines a class which used `RandomState` to generate
pseudo-random numbers.

Important classes defined here
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

RandomGeneratorBase (:py:class:`.RandomGeneratorBase`)
    An abstract base class for random number generators.

RandomGenerator (:py:class:`.RandomGenerator`)
    A class representing a random number generator.

ReservoirSampler (:py:class:`.ReservoirSampler`)
    A class for reservoir sampling.

MockRandomGenerator (:py:class:`.MockRandomGenerator`)
    A non-random number generator which is useful for testing.
"""
from abc import ABCMeta, abstractmethod
import logging
import numpy as np
from numpy.random import RandomState
from pyretis.core.particlefunctions import (calculate_kinetic_temperature,
                                            reset_momentum)
logger = logging.getLogger(__name__)  # pylint: disable=invalid-name
logger.addHandler(logging.NullHandler())


__all__ = [
    'RandomGeneratorBase',
    'RandomGenerator',
    'ReservoirSampler',
    'MockRandomGenerator',
    'create_random_generator'
]


[docs]class RandomGeneratorBase(metaclass=ABCMeta): """A base class for random number generators. This is a base class for random number generators. It does not actually implement a generator. Attributes ---------- seed : int A seed for the generator """
[docs] def __init__(self, seed=0): """Initialise the random number generator. Parameters ---------- seed : int, optional An integer used for seeding the generator if needed. """ self.seed = seed
[docs] @abstractmethod def rand(self, shape=1): """Draw random numbers in [0, 1). Parameters ---------- shape : int, optional The number of numbers to draw Returns ------- out : float Pseudo-random number in [0, 1) """ return
[docs] @abstractmethod def get_state(self): """Return info about the current state.""" return
[docs] @abstractmethod def set_state(self, state): """Set state for random generator.""" return
[docs] @abstractmethod def random_integers(self, low, high): """Draw random integers in [low, high]. Parameters ---------- low : int This is the lower limit high : int This is the upper limit Returns ------- out : int The pseudo-random integers in [low, high]. """ return
[docs] @abstractmethod def normal(self, loc=0.0, scale=1.0, size=None): """Return values from a normal distribution. Parameters ---------- loc : float, optional The mean of the distribution scale : float, optional The standard deviation of the distribution size : int, tuple of ints, optional Output shape, i.e. how many values to generate. Default is None which is just a single value. Returns ------- out : float, numpy.array of floats The random numbers generated. """ return
[docs] @abstractmethod def multivariate_normal(self, mean, cov, cho=None, size=1): """Draw numbers from a multi-variate distribution. Parameters ---------- mean : numpy array (1D, 2) Mean of the N-dimensional array cov : numpy array (2D, (2, 2)) Covariance matrix of the distribution. cho : numpy.array (2D, (2, 2)), optional Cholesky factorization of the covariance. If not given, it will be calculated here. size : int, optional The number of samples to do. Returns ------- out : float or numpy.array of floats size The random numbers that are drawn here. """ return
[docs] def generate_maxwellian_velocities(self, particles, boltzmann, temperature, dof, selection=None, momentum=True): """Generate velocities from a Maxwell distribution. The velocities are drawn to match a given temperature and this function can be applied to a subset of the particles. The generation is done in three steps: 1) We generate velocities from a standard normal distribution. 2) We scale the velocity of particle `i` with ``1.0/sqrt(mass_i)`` and reset the momentum. 3) We scale the velocities to the set temperature. Parameters ---------- particles : object like :py:class:`.Particles` These are the particles to set the velocity of. boltzmann : float The Boltzmann factor in correct units. temperature : float The desired temperature. Typically, `system.temperature['set']` will be used here. dof : list of floats, optional The degrees of freedom to subtract. Its shape should be equal to the number of dimensions. selection : list of ints, optional A list with indices of the particles to consider. Can be used to only apply it to a selection of particles momentum : boolean, optional If true, we will reset the momentum. Returns ------- out : None Returns `None` but modifies velocities of the selected particles. """ if selection is None: vel, imass = particles.vel, particles.imass else: vel, imass = particles.vel[selection], particles.imass[selection] vel = np.sqrt(imass) * self.normal(loc=0.0, scale=1.0, size=vel.shape) # NOTE: x[None] = x for a numpy.array - this is not valid for a list. particles.vel[selection] = vel if momentum: reset_momentum(particles, selection=selection) _, avgtemp, _ = calculate_kinetic_temperature(particles, boltzmann, dof=dof, selection=selection) scale_factor = np.sqrt(temperature/avgtemp) particles.vel[selection] *= scale_factor
[docs] def draw_maxwellian_velocities(self, system, sigma_v=None): """Draw numbers from a Gaussian distribution. Parameters ---------- system : object like :py:class:`.System` This is used to determine the temperature parameter(s) and the shape (number of particles and dimensionality) sigma_v : numpy.array, optional The standard deviation in velocity, one for each particle. If it's not given it will be estimated. """ if not sigma_v or sigma_v < 0.0: kbt = 1.0/system.temperature['beta'] # sigma_v is (n, 1) matrix sigma_v = np.sqrt(kbt*system.particles.imass) npart, dim = system.particles.vel.shape vel = self.normal(loc=0.0, scale=sigma_v, size=(npart, dim)) return vel, sigma_v
[docs]class RandomGenerator(RandomGeneratorBase): """A random number generator from numpy. This class that defines a random number generator. It will use `numpy.random.RandomState` for the actual generation and we refer to the numpy documentation [1]_. Attributes ---------- seed : int A seed for the pseudo-random generator. rgen : object like :py:class:`numpy.random.RandomState` This is a container for the Mersenne Twister pseudo-random number generator as implemented in numpy [#]_. References ---------- .. [#] The NumPy documentation on RandomState, http://docs.scipy.org/doc/numpy/reference/generated/numpy.random.RandomState.html """
[docs] def __init__(self, seed=0): """Initialise the random number generator. If a seed is given, the random number generator will be seeded. Parameters ---------- seed : int, optional An integer used for seeding the generator if needed. """ super().__init__(seed=seed) self.rgen = RandomState(seed=seed)
[docs] def choice(self, a, size=None, replace=True, p=None): """Chooses random samples. Parameters ---------- a : iterable, int The group to choose from, an int is converted to range(a). size : int, optional The number of samples to choose. replace : bool, optional If to replace samples between draws, default True. p : iterable, optional The probabilities for every option in a, default is an uniform dirstribution. Returns ------- choice : array-like The picked choices. """ return self.rgen.choice(a, size, replace, p)
[docs] def rand(self, shape=1): """Draw random numbers in [0, 1). Parameters ---------- shape : int, optional The number of numbers to draw Returns ------- out : float Pseudo-random number in [0, 1) Note ---- Here, we will just draw a list of numbers and not for an arbitrary shape. """ return self.rgen.rand(shape)
[docs] def get_state(self): """Return current state.""" state = {'seed': self.seed, 'state': self.rgen.get_state(), 'rgen': 'rgen'} return state
[docs] def set_state(self, state): """Set state for random generator.""" return self.rgen.set_state(state['state'])
[docs] def random_integers(self, low, high): """Draw random integers in [low, high]. Parameters ---------- low : int This is the lower limit high : int This is the upper limit Returns ------- out : int The pseudo-random integers in [low, high]. Note ---- np.random.randint(low, high) is defined as drawing from `low` (inclusive) to `high` (exclusive). """ return self.rgen.randint(low, high + 1)
[docs] def normal(self, loc=0.0, scale=1.0, size=None): """Return values from a normal distribution. Parameters ---------- loc : float, optional The mean of the distribution scale : float, optional The standard deviation of the distribution size : int, tuple of ints, optional Output shape, i.e. how many values to generate. Default is None which is just a single value. Returns ------- out : float, numpy.array of floats The random numbers generated. """ return self.rgen.normal(loc=loc, scale=scale, size=size)
[docs] def multivariate_normal(self, mean, cov, cho=None, size=1): """Draw numbers from a multi-variate distribution. This is an attempt on speeding up the call of `RandomState.multivariate_normal` if we need to call it over and over again. Such repeated calling will do an SVD repeatedly, which is wasteful. In this function, this transform can be supplied and it is only estimated if it's not explicitly given. Parameters ---------- mean : numpy array (1D, 2) Mean of the N-dimensional array cov : numpy array (2D, (2, 2)) Covariance matrix of the distribution. cho : numpy.array (2D, (2, 2)), optional Cholesky factorization of the covariance. If not given, it will be calculated here. size : int, optional The number of samples to do. Returns ------- out : float or numpy.array of floats size The random numbers that are drawn here. See also -------- numpy.random.multivariate_normal """ if cho is None: cho = np.linalg.cholesky(cov) norm = self.normal(loc=0.0, scale=1.0, size=2*size) norm = norm.reshape(size, 2) meanm = np.array([mean, ] * size) return meanm + np.dot(norm, cho.T)
[docs]class ReservoirSampler: """A class representing a reservoir sampler. The reservoir sampler will maintain a list of `k` items drawn randomly from a set of `N > k` items. The list is created and maintained so that we only need to store `k`items This is useful when `N` is very large or when storing all `N` items require a lot of memory. The algorithm is described by Knuth [#]_ but here we do a variation, so that each item may be picked several times. Attributes ---------- rgen : object like :py:class:`numpy.random.RandomState` This is a container for the Mersenne Twister pseudo-random number generator as implemented in numpy, see the documentation of `RandomGenerator`. items : integer The number of items seen so far, i.e. the current `N`. reservoir : list The items we have stored. length : integer The maximum number of items to store in the reservoir (i.e. `k`). ret_idx : integer This is the index of the item to return if we are requesting items from the reservoir. References ---------- .. [#] The Art of Computer Programming. """
[docs] def __init__(self, seed=0, length=10, rgen=None): """Initialise the reservoir. Parameters ---------- seed : int, optional An integer used for seeding the generator. length : int, optional The maximum number of items to store. rgen : object like :py:class:`.RandomGenerator`, optional In case we want to re-use a random generator object. If this is specified, the parameter `seed` is ignored. """ if rgen is not None: self.rgen = rgen else: self.rgen = RandomState(seed=seed) self.items = 0 self.reservoir = [] self.length = length self.ret_idx = 0
[docs] def append(self, new_item): """Try to add an item to the reservoir. Parameters ---------- new_item : any type This is the item we try to add to the reservoir. """ self.items += 1 if self.items == 1: self.reservoir = [new_item for _ in range(self.length)] else: factor = 1.0/float(self.items) for i in range(self.length): if self.rgen.rand() < factor: self.reservoir[i] = new_item
[docs] def get_item(self): """Return the next item from the reservoir. Returns ------- out : any type Returns an item from the reservoir. """ if self.ret_idx >= self.length: self.ret_idx = 0 msg = ['Out of bounds in the reservoir sampler!'] msg += ['Please increase the size of the reservoir.'] msgtxt = '\n'.join(msg) logger.critical(msgtxt) ret = self.reservoir[self.ret_idx] self.ret_idx += 1 return ret
[docs]class MockRandomGenerator(RandomGeneratorBase): """A **mock** random generator, useful **only for testing**. This class represents a random generator that can be used for testing algorithms. It will simply return numbers from a small list of pseudo-random numbers. This class is only useful for testing algorithms on different systems. It should *NEVER* be used for actual production runs! """
[docs] def __init__(self, seed=0, norm_shift=False): """Initialise the mock random number generator. Here, we set up some predefined random number which we will use as a pool for the generation. Parameters ---------- seed : int, optional An integer used for seeding the generator if needed. norm_shift: boolean If True is will ensure that the fake 'normal distribution' is shifted to get the right mean. """ super().__init__(seed=seed) self.rgen = [0.78008018, 0.04459916, 0.76596775, 0.97676713, 0.53799598, 0.98657116, 0.36343553, 0.55356511, 0.03172585, 0.48984682, 0.73416687, 0.98453452, 0.55129902, 0.40598753, 0.59448394, 0.26823255, 0.31168372, 0.05072849, 0.44876368, 0.94301709] self.length = len(self.rgen) self.randint = self.seed self.norm_shift = norm_shift logger.critical('You are using a "mock" random generator!\n') if norm_shift: logger.critical('Fake-normal is shifted.\n') logger.critical('Comparison with TISMOL might fail\n') else: logger.critical('Fake-normal is not shifted.\n') logger.critical('Random numbers not centered around 0.\n')
[docs] def rand(self, shape=1): """Draw random numbers in [0, 1). Parameters ---------- shape : int The number of numbers to draw. Returns ------- out : float Pseudo-random number in [0, 1). """ numbers = [] for _ in range(shape): if self.seed >= self.length: self.seed = 0 numbers.append(self.rgen[self.seed]) self.seed += 1 return np.array(numbers)
[docs] def get_state(self): """Return current state.""" state = {'seed': self.seed, 'state': self.seed, 'rgen': 'mock'} return state
[docs] def set_state(self, state): """Set current state.""" self.seed = state['state']
[docs] def random_integers(self, low, high): """Return random integers in [low, high]. Parameters ---------- low : int This is the lower limit high : int This is the upper limit Returns ------- out : int This is a pseudo-random integer in [low, high]. """ idx = self.rand()[0]*(high-low+1) return int(idx) + low
[docs] def normal(self, loc=0.0, scale=1.0, size=None): """Return values from a normal distribution. Parameters ---------- loc : float, optional The mean of the distribution scale : float, optional The standard deviation of the distribution size : int, tuple of ints, optional Output shape, i.e. how many values to generate. Default is None which is just a single value. Returns ------- out : float, numpy.array of floats The random numbers generated. Note ---- This is part of the Mock-random number generator. Hence, it won't provide a true normal distribution, though the mean is set to the value of loc. """ if self.norm_shift: shift = loc-0.5 else: shift = 0. if size is None: return self.rand(shape=1)+shift numbers = np.zeros(size) for i in np.nditer(numbers, op_flags=['readwrite']): i[...] = self.rand(shape=1)[0] + shift return numbers
[docs] def multivariate_normal(self, mean, cov, cho=None, size=1): """Draw numbers from a multi-variate distribution. This is an attempt on speeding up the call of `RandomState.multivariate_normal` if we need to call it over and over again. Such repeated calling will do an SVD repeatedly, which is wasteful. In this function, this transform can be supplied and it is only estimated if it's not explicitly given. Parameters ---------- mean : numpy array (1D, 2) Mean of the N-dimensional array cov : numpy array (2D, (2, 2)) Covariance matrix of the distribution. cho : numpy.array (2D, (2, 2)), optional Cholesky factorization of the covariance. If not given, it will be calculated here. size : int, optional The number of samples to do. Returns ------- out : float or numpy.array of floats size The random numbers that are drawn here. See also -------- numpy.random.multivariate_normal """ norm = self.normal(loc=0.0, scale=1.0, size=2*size) norm = norm.reshape(size, 2) meanm = np.array([mean, ] * size) return 0.01*(meanm + norm)
[docs] def choice(self, a, size=None, replace=True, p=None): """Choose random samples. Parameters ---------- a : iterable The group to choose from. size : int, optional The number of samples to choose. replace : bool, optional If to replace samples between draws, default True. p : iterable, optional The probabilities for every option in a, default is an uniform dirstribution. Returns ------- choice : array-like The picked choices. """ if isinstance(a, int): a = list(range(a)) if p is None: p = [1/len(a) for i in a] if size is None: size = 1 out = [] for _ in range(size): rand = self.rand() prob0 = 0 # Make sure p sums to 1 even after popping p_sum = sum(p) p = [i/p_sum for i in p] for i, probi in zip(a, p): prob0 += probi if rand < prob0: out.append(i) if not replace: idx = a.index(i) _ = a.pop(idx) _ = p.pop(idx) break return out
class Borg: """A class for sharing states of objects.""" class_state = None number_of_borgs = 0 @classmethod def update_state(cls, state): """Update the class state and enable sharing of it.""" if cls.class_state is None: logger.debug('Setting state to the shared.') cls.class_state = state else: logger.debug('Reusing shared state.') return cls.class_state @classmethod def reset_state(cls): """Remove memory of the shared state.""" cls.class_state = None @classmethod def make_new_swarm(cls): """Make new swarm (new class).""" name = cls.__name__ + str(cls.number_of_borgs) cls.number_of_borgs += 1 new_borg = type(name, cls.__bases__, dict(cls.__dict__)) new_borg.reset_state() # pylint: disable=no-member return new_borg def __init__(self, *args, **kwargs): """Enable share of the state. This will update the Borg.class_state and update self.__dict__ to this object, which enables sharing of the state. """ super().__init__(*args, **kwargs) self.__dict__ = self.update_state(self.__dict__) class RandomGeneratorBorg(Borg, RandomGenerator): """A class for sharing the state between RandomGenerator objects.""" class MockRandomGeneratorBorg(Borg, MockRandomGenerator): """A class for sharing the state between MockRandomGenerator objects."""
[docs]def create_random_generator(settings=None): """Create a random generator from given settings. Parameters ---------- settings : dict, optional This is the dict used for creating the random generator. Currently, we will actually just look for a seed value. Returns ------- out : object like :py:class:`.RandomGenerator` The random generator created. """ if settings is None: settings = {} rgen = settings.get('rgen', 'rgen') seed = settings.get('seed', 0) logger.debug('Seed for random generator: %s %d', rgen, seed) class_map = { 'rgen': RandomGenerator, 'rgen-borg': RandomGeneratorBorg, 'mock': MockRandomGenerator, 'mock-borg': MockRandomGeneratorBorg } rgen_class = class_map.get(rgen, RandomGenerator) rgen = rgen_class(seed=seed) if 'state' in settings: rgen.set_state(settings) rgen.status = 'restarted' else: rgen.status = 'new rgen' return rgen