# -*- coding: utf-8 -*-
r"""
Calculate first passage statistics between macrostates
------------------------------------------------------
Tools to calculate the first passage time distribution
and phenomenological rate constants between endpoint macrostates
:math:`\mathcal{A}` and :math:`\mathcal{B}`.
.. note::
Install the `pathos` package to parallelize MFPT computations.
"""
import numpy as np
from io import StringIO
import time,os, importlib
#from tqdm import tqdm
np.set_printoptions(linewidth=160)
from . import io as kio
from . import GT
from scipy.sparse import save_npz,load_npz, diags, eye, csr_matrix,bmat
from scipy.sparse.linalg import eigs,inv,spsolve
import scipy as sp
import scipy.linalg as spla
""" test for ipython environment (is this being loaded from a notebook) """
try:
__IPYTHON__
except NameError:
in_notebook = False
else:
in_notebook = True
""" test for tqdm progress bars """
try:
if in_notebook:
from tqdm import tqdm_notebook as tqdm
else:
from tqdm import tqdm
has_tqdm=True
except:
has_tqdm=False
try:
from pathos.multiprocessing import ProcessingPool as Pool
have_pathos = True
except:
have_pathos = False
[docs]def compute_passage_stats(A_sel, B_sel, pi, K, dopdf=True,rt=None):
r"""Compute the A->B and B->A first passage time distribution,
first moment, and second moment using eigendecomposition of a CTMC
rate matrix.
Parameters
----------
A_sel : (N,) array-like
boolean array that selects out the A nodes
B_sel : (N,) array-like
boolean array that selects out the B nodes
pi : (N,) array-like
stationary distribution
K : (N, N) array-like
CTMC rate matrix
dopdf : bool, optional
Do we calculate full fpt distribution or just the moments. Defaults=True.
rt: array, optional
Vector of times to evaluate first passage time distribution in multiples
of :math:`\left<t\right>` for A->B and B->A. If ``None``, defaults to a logscale
array from :math:`0.001\left<t\right>` to :math:`1000\left<t\right>`
in 400 steps, i.e. ``np.logspace(-3,3,400)``.
Only relevant if ``dopdf=True``
Returns
-------
tau : (4,) array-like
First and second moments of first passage time distribution for A->B and B->A [:math:`\mathcal{T}_{\mathcal{B}\mathcal{A}}`, :math:`\mathcal{V}_{\mathcal{B}\mathcal{A}}`, :math:`\mathcal{T}_{\mathcal{A}\mathcal{B}}`, :math:`\mathcal{V}_{\mathcal{A}\mathcal{B}}`]
pt : ( len(rt),4) array-like
time and first passage time distribution p(t) for A->B and B->A
"""
#multiply by negative 1 so eigenvalues are positive instead of negative
Q=-K
if rt is None:
rt = np.logspace(-3,3,400)
#<tauBA>, <tau^2BA>, <tauAB>, <tau^2AB>
tau = np.zeros(4)
if dopdf:
# time*tau_range, p(t) (first 2: A->B, second 2: B->A)
pt = np.zeros((4,len(rt)))
#A -> B
#P(0) is initialized to local boltzman of source community A
rho = pi * A_sel
rho /= rho.sum()
#B is absorbing, so we want Q in space of A U I
M = Q[~B_sel,:][:,~B_sel]
x = spsolve(M,rho[~B_sel])
y = spsolve(M,x)
# first moment tau(A->B) = 1.Q^{-1}.rho(A) = 1.x
tau[0] = x.sum()
# second moment = 2 x 1.Q^{-2}.rho = 2.0* 1.Q^{-1}.x
tau[1] = 2.0*y.sum()
if dopdf:
#time in multiples of the mean first passage time
pt[0] = rt*tau[0]
#nu=eigenvalues, v=left eigenvectors, w=right eigenvectors
nu,v,w = spla.eig(M.todense(),left=True)
#normalization factor
dp = np.sqrt(np.diagonal(w.T.dot(v))).real
#dot product (v.P(0)=rho)
v = (v.real.dot(np.diag(1.0/dp))).T.dot(rho[~B_sel])
#dot product (1.T.w)
w = (w.real.dot(np.diag(1.0/dp))).sum(axis=0)
nu = nu.real
#(v*w/nu).sum() is the same as <tau>, the first bit is the pdf p(t)
pt[1] = (v*w*nu)@np.exp(-np.outer(nu,pt[0]))*(v*w/nu).sum()
#B -> A
rho = pi * B_sel
rho /= rho.sum()
M = Q[~A_sel,:][:,~A_sel]
x = spsolve(M,rho[~A_sel])
y = spsolve(M,x)
tau[2] = x.sum()
tau[3] = 2.0*y.sum()
if dopdf:
pt[2] = rt*tau[2]
nu,v,w = spla.eig(M.todense(),left=True)
dp = np.sqrt(np.diagonal(w.T.dot(v))).real
v = (v.real.dot(np.diag(1.0/dp))).T.dot(rho[~A_sel])
w = (w.real.dot(np.diag(1.0/dp))).sum(axis=0)
nu = nu.real
pt[3] = (v*w*nu)@np.exp(-np.outer(nu,pt[2]))*(v*w/nu).sum()
return tau, pt.T
else:
return tau
[docs]def compute_escape_stats(B_sel, pi, K, tau_escape=None, dopdf=True,rt=None):
r"""Compute escape time distribution and first and second moment
from the basin specified by `B_sel` using eigendecomposition.
Parameters
----------
B_sel : (N,) array-like
boolean array that selects out the nodes in the active basin
pi : (N,) array-like
stationary distribution for CTMC
K : (N, N) array-like
CTMC rate matrix
tau_escape : float
mean time to escape from B. Used to calculate the escape
time distribution in multiple of tau_escape (p(t/tau_escape).
If None, uses the first moment in network defined by K.
dopdf : bool
whether to calculate full escape time distribution, defaults to True
rt: array, optional
Vector of times to evaluate first passage time distribution in multiples
of :math:`\left<t\right>` for A->B and B->A. If ``None``, defaults to a logscale
array from :math:`0.001\left<t\right>` to :math:`1000\left<t\right>`
in 400 steps, i.e. ``np.logspace(-3,3,400)``.
Only relevant if ``dopdf=True``
Returns
-------
tau : (2,) array-like
First and second moments of escape time distribution, [:math:`\left<t\right>_{\mathcal{B}}`, :math:`\left<t^2 \right>_{\mathcal{B}}`]
pt : (2, len(rt)) array-like
time and escape time distribution :math:`p(t)\left<t\right>`
"""
#multiply by negative 1 so eigenvalues are positive instead of negative
Q=-K
if rt is None:
rt = np.logspace(-3,3,400)
#<tau>, <tau^2>
tau = np.zeros(2)
if dopdf:
# time*tau_range, p(t)
pt = np.zeros((2, len(rt)))
rho = pi * B_sel
rho /= rho.sum()
M = Q[B_sel,:][:,B_sel]
x = spsolve(M,rho[B_sel])
y = spsolve(M,x)
tau[0] = x.sum()
tau[1] = 2.0*y.sum()
if tau_escape is None:
tau_escape = tau[0]
if dopdf:
pt[0] = rt*tau_escape
nu,v,w = spla.eig(M.todense(),left=True)
dp = np.sqrt(np.diagonal(w.T.dot(v))).real
v = (v.real.dot(np.diag(1.0/dp))).T.dot(rho[B_sel])
w = (w.real.dot(np.diag(1.0/dp))).sum(axis=0)
nu = nu.real
pt[1] = (v*w*nu)@np.exp(-np.outer(nu,pt[0]))*tau_escape
return tau, pt
else:
return tau
[docs]def compute_rates(A_sel, B_sel, B, tau, pi, initA=None, initB=None, MFPTonly=True, fullGT=False,
pool_size=None, block=1, screen=False, **kwargs):
r"""
In a total state space partitioned into three sets A,B and I, calculate
various approximate A<->B transition rates [Wales09]_ and the MFPT from a
rate matrix K. K can be the matrix of an original Markov chain, or a
partially graph-transformed Markov chain. [Swinburne20a]_
Rate definitions (all assume A,B in local equilibrium) see [Swinburne20a]_ for details
kSS : assumes I in steady state, A,B local equilibrium
kNSS : Relaxes counts for non-steady state in I. Tends to exact rate if reactant metastable
k^F : Boltzmann weighted sum of inverse of exact MFPT from each reactant state, i.e. :math:`\sum_j\pi_j/\mathcal{T}_j`
k^* : Inverse of exact MFPT from reactant, i.e. :math:`1/\sum_j\pi_j\mathcal{T}_j`
``compute_rates()`` differs from ``compute_passage_stats()`` in that this function removes all intervening states
using GT before computing FPT stats and rates on the fully reduced network
with state space :math:`(\mathcal{A} \cup \mathcal{B})`. This implementation also does not rely on a full
eigendecomposition of the non-absorbing matrix; it instead performs a matrix inversion,
or if `fullGT` is specified, all nodes in the set :math:`(\mathcal{A} \cup b)^\mathsf{c}`
are removed using GT for each :math:`b \in \mathcal{B}` so that the
MFPT is given by an average:
.. math::
\begin{equation}
\mathcal{T}_{\mathcal{A}\mathcal{B}} = \frac{1}{\sum_{b \in
\mathcal{B}} p_b(0)} \sum_{b \in \mathcal{B}} \frac{p_b(0)
\tau^\prime_b}{1-P^\prime_{bb}}
\end{equation}
If the MFPT is less than :math:`10^{20}`, `fullGT` should not be needed
since the inversion of the non-absorbing matrix should be numerically
stable. However, a condition number check is performed regardless, which
forces a full GT if the MFPT problem is considered numerically unstable.
Parameters
----------
A_sel : array-like (N,)
selects the N_A nodes in the :math:`\mathcal{A}` set.
B_sel : array-like (N,)
selects the N_B nodes in the :math:`\mathcal{B}` set.
B : sparse or dense matrix (N,N)
branching probability matrix for CTMC
tau : array-like (N,)
vector of waiting times from each node.
pi : array-like (N,)
stationary distribution of CTMC
initA : array-like (N_A,), optional
normalized initial occupation probabilities in :math:`\mathcal{A}` set.
Default= local Boltzmann distribution
initB : array-like (N_B,), optional
normalized initial occupation probabilities in :math:`\mathcal{B}` set.
Default= local Boltzmann distribution
MFPTonly : bool
If True, only MFPTs are calculated (rate calculations ignored).
fullGT : bool
If True, all source nodes are isolated with GT to obtain the average
MFPT.
pool_size : int
Number of cores over which to parallelize fullGT computation.
Returns
-------
results: dictionary
dictionary of results, with keys
'MFPTAB', 'kSSAB', 'kNSSAB', 'kQSDAB', 'k*AB', 'kFAB' for A<-B and
'MFPTBA', 'kSSBA', 'kNSSBA', 'kQSDBA', 'k*BA', 'kFBA' for B<-A
such that
results['MFPTAB'] = mean first passage time for A<-B
"""
N = len(A_sel)
assert(N==len(B_sel))
if A_sel.sum()==1 and B_sel.sum()==1:
raise NotImplementedError('There must be at least 2 microstates in A and B.')
inter_region = ~(A_sel+B_sel)
r_AS = A_sel[~inter_region]
r_BS = B_sel[~inter_region]
rDSS = 1.0/tau[~inter_region]
if initA is None:
initA = pi[A_sel]/pi[A_sel].sum()
if initB is None:
initB = pi[B_sel]/pi[B_sel].sum()
#use GT to renormalize away all I states
rB, rtau, rQ = GT.blockGT(rm_vec=inter_region,B=B,tau=tau,rates=True,block=block,**kwargs)
rD = 1.0/rtau
rN = rtau.size
rQ = -rQ #multiply by -1 so eigenvalues are positive
#first do A->B direction, then B->A
#r_s is the non-absorbing region (A first, then B)
df = {}
dirs = ['BA', 'AB']
inits = [initA, initB]
for i, r_s in enumerate([r_AS, r_BS]) :
if has_tqdm and screen:
pbar = tqdm(total=r_s.sum(),leave=True,mininterval=0.0,desc='Rates')
#local equilibrium distribution in r_s
rho = inits[i]
#MFPTs to B from each source microstate a
T_Ba = np.zeros(r_s.sum())
if not fullGT:
fullGT = bool(np.linalg.cond(rQ[r_s,:][:,r_s])>1.0e12)
if not fullGT:
#MFPT calculation via matrix inversion
invQ = spla.inv(rQ[r_s,:][:,r_s])
mfpt = invQ.dot(rho).sum(axis=0)
T_Ba = invQ.sum(axis=0)
#compare to individual T_Ba quantities from further GT compression
else:
def disconnect_sources(s):
#disconnect all source nodes except for `s`
rm_reg = np.zeros(rN, bool)
rm_reg[r_s] = True
aind = r_s.nonzero()[0][s]
#print(f'Disconnecting source node {aind}')
rm_reg[r_s.nonzero()[0][s]] = False
rfB, tau_Fs = GT.blockGT(rm_vec=rm_reg,B=rB,tau=1.0/rD,rates=False,block=block,screen=screen,Ndense=1)
#escape time tau_F
rfD = 1./tau_Fs
rfN = tau_Fs.size
#remaining network only as 1 in A and 1 in B = 2 states
rf_s = r_s[~rm_reg]
#tau_a^F / P_Ba^F
P_Ba = np.ravel(rfB[~rf_s,:][:,rf_s].sum(axis=0))[0]
if screen and has_tqdm:
if have_pathos and pool_size is not None:
pbar.update(pool_size)
else:
pbar.update(1)
return tau_Fs[rf_s][0]/P_Ba
if have_pathos and pool_size is not None:
with Pool(processes=pool_size) as p:
T_Ba = p.map(disconnect_sources, [s for s in range(r_s.sum())])
else:
for s in range(r_s.sum()):
T_Ba[s] = disconnect_sources(s)
#MFPT_BA = (T_Ba@rho)
mfpt = T_Ba@rho
df[f'MFPT{dirs[i]}'] = mfpt
"""
Rates: SS, NSS, QSD, k*, kF
"""
if not MFPTonly:
#eigendecomposition of rate matrix in non-abosrbing region
#for A, full_RK[r_A, :][:, r_A] is just a 5x5 matrix
l, v = spla.eig(rQ[r_s,:][:,r_s])
#order the eigenvalues from smallest to largest -- they are positive since Q = D - K instead of K-D
qsdo = np.abs(l.real).argsort()
nu = l.real[qsdo]
#v[:, qsdo[0]] is the eigenvector corresponding to smallest eigenvalue
#aka quasi=stationary distribution
qsd = v[:,qsdo[0]]
qsd /= qsd.sum()
#committor C^B_A: probability of reaching B before A: 1_B.B_BA^I (eqn 6 of SwinburneW20)
C = np.ravel(rB[~r_s,:][:,r_s].sum(axis=0))
#for SS, we use waiting times from non-reduced network (DSS)
df[f'kSS{dirs[i]}'] = C.dot(np.diag(rDSS[r_s])).dot(rho)
#for NSS, we use waiting times D^I_s from reduced network
df[f'kNSS{dirs[i]}'] = C.dot(np.diag(rD[r_s])).dot(rho)
#kQSD is same as NSS except using qsd instead of boltzmann
df[f'kQSD{dirs[i]}'] = C.dot(np.diag(rD[r_s])).dot(qsd)
#k* is just 1/MFPT
df[f'k*{dirs[i]}'] = 1./mfpt
#and kF is <1/T_Ab>
df[f'kF{dirs[i]}'] = (rho/T_Ba).sum()
return df