# -*- coding: utf-8 -*-
r"""
Calculate matricies of mean first passage times with graph transformation
-------------------------------------------------------------------------
.. note::
Install the `pathos` package to parallelize MFPT computations, with e.g.
.. code-block:: none
```
pip install pathos
```
"""
import numpy as np
from io import StringIO
import time,os, importlib
np.set_printoptions(linewidth=160)
from . import GT
import scipy as sp
import scipy.linalg as spla
""" test for ipython environment (is this being loaded from a notebook) """
try:
__IPYTHON__
except NameError:
in_notebook = False
else:
in_notebook = True
""" test for tqdm progress bars """
try:
if in_notebook:
from tqdm import tqdm_notebook as tqdm
else:
from tqdm import tqdm
has_tqdm=True
except:
has_tqdm=False
try:
from pathos.multiprocessing import ProcessingPool as Pool
have_pathos = True
except:
have_pathos = False
[docs]def full_MFPT_matrix(B, tau, pool_size=1, screen=False, **kwargs):
r"""Compute full matrix of inter-microstate MFPTs with GT.
Parameters
----------
B : sparse or dense matrix (N,N)
branching probability matrix.
tau : array-like (N,)
vector of waiting times from each node.
pool_size : int,optional
Number of cores over which to parallelize computation.
only attempted if ``pool_size>1`` and ``pathos`` package is installed.
Default=1.
screen: bool, optional
Show progress bar. Default=False
Returns
-------
mfpt : np.ndarray[float64] (N,N)
matrix of inter-microstate MFPTs between all pairs of nodes
"""
N = tau.size
mfpt = np.zeros((N,N))
i_a = np.arange(N*N) % N
j_a = np.arange(N*N) // N
matrix_elements = np.vstack((i_a,j_a)).T[i_a<j_a,:]
if screen and has_tqdm:
pbar = tqdm(total=len(matrix_elements),leave=True,mininterval=0.0,desc='MFPT matrix computation')
def given_ij(ij):
i, j = ij
if screen and has_tqdm:
if have_pathos and pool_size>1:
pbar.update(pool_size)
else:
pbar.update(1)
MFPTAB, MFPTBA = compute_MFPT(i, j, B, tau, **kwargs)
return MFPTAB, MFPTBA
if have_pathos and pool_size>1:
with Pool(processes=pool_size) as p:
results = p.map(given_ij, matrix_elements)
for k, result in enumerate(results):
i, j = matrix_elements[k]
mfpt[i][j], mfpt[j][i] = result
else:
for ij in matrix_elements:
i, j = ij
mfpt[i][j], mfpt[j][i] = given_ij(ij)
if screen and has_tqdm:
pbar.close()
return mfpt
[docs]def compute_MFPT(i, j, B, tau, block=1, **kwargs):
r"""Compute the inter-microstate :math:`i\leftrightarrow j` MFPT using GT.
Called by ``full_MFPT_matrix()``. Unlike ``compute_rates()`` function,
which assumes there is at least 2 microstates in the absorbing macrostate,
this function does not require knowledge of equilibrium
occupation probabilities since :math:`\mathcal{T}_{ij}=\tau_j^\prime/B_{ij}^\prime`
when there are only two nodes remaining after GT
Parameters
----------
i : int
node-ID (0-indexed) of first microstate.
j : int
node-ID (0-indexed) of second microstate.
B : sparse or dense matrix (N,N)
branching probability matrix.
tau : array-like (N,)
vector of waiting times from each node.
block : int, optional
block size for matrix generalization GT procedure.
Reverts to slower but guaranteed stable one-by-one GT (block=1)
when ``numpy`` matrix inversion routine raises errors. Default=10
Returns
-------
MFPTij : float
mean first passage time :math:`i \leftarrow j`
MFPTji : float
mean first passage time :math:`j \leftarrow i`
"""
N = B.shape[0]
AS = np.zeros(N, bool)
AS[i] = True
BS = np.zeros(N, bool)
BS[j] = True
#GT away all I states
inter_region = ~(AS+BS)
#left with a 2-state network
rB, tau_Fs = GT.blockGT(rm_vec=inter_region,B=B,tau=tau,rates=False,block=block,**kwargs)
rD = 1.0/tau_Fs
rN = tau_Fs.size
#remaining network only has 1 in A and 1 in B = 2 states
r_AS = AS[~inter_region]
r_BS = BS[~inter_region]
#tau_a^F / P_Ba^F
P_BA = rB[r_BS, :][:, r_AS]
P_AB = rB[r_AS, :][:, r_BS]
MFPTBA = tau_Fs[r_AS]/P_BA
MFPTAB = tau_Fs[r_BS]/P_AB
return MFPTAB[0,0], MFPTBA[0,0]