# -*- coding: utf-8 -*-
"""
Created on Tue Dec 1 08:04:28 2015
Module:
bicm - Bipartite Configuration Model
Author:
Mika Straka
Description:
Implementation of the Bipartite Configuration Model (BiCM) for binary
undirected bipartite networks [Saracco2015]_.
Given the biadjacency matrix of a bipartite graph in the form of a binary
array as input, the module allows the user to calculate the biadjacency
matrix of the ensemble average graph :math:`<G>^*` of the BiCM null model.
The matrix entries correspond to the link probabilities :math:`<G>^*_{rc} =
p_{rc}` between nodes of the two distinct bipartite node sets.
Subsequently, one can calculate the p-values of the node similarities for
nodes in the same bipartite layer [Saracco2017]_.
Usage:
Be ``mat`` a two-dimensional binary NumPy array. The nodes of the two
bipartite layers are ordered along the rows and columns, respectively. In
the algorithm, the two layers are identified by the boolean values ``True``
for the **row-nodes** and ``False`` for the **column-nodes**.
Import the module and initialize the Bipartite Configuration Model::
>>> from src.bicm import BiCM
>>> cm = BiCM(bin_mat=mat)
To create the biadjacency matrix of the BiCM, use::
>>> cm.make_bicm()
.. note::
Note that ``make_bicm`` outputs a *status message* in the console, which
informs the user whether the underlying numerical solver has converged
to a solution.
The function is based on the ``scipy.optimize.root`` routine of the
`SciPy package <http://scipy.org>`_ to solve a log-likelihood
maximization problem and uses thus the same arguments (except for
*fun* and *args*, which are specified in our problem).
This means that the user has full control over the selection of a
solver, the initial conditions, tolerance, etc.
As a matter of fact, it may happen that the default function call
``make_bicm()`` results in an unsuccessful solver, which requires
adjusting the function arguments.
In this case, please refer the description of the functions
:func:`BiCM.make_bicm` and :func:`BiCM.solve_equations`, and the
`scipy.optimize.root documentation
<https://docs.scipy.org/doc/scipy-0.19.0/reference/generated/
scipy.optimize.root.html>`_.
The biadjacency matrix of the BiCM null model can be saved in *<filename>*::
>>> cm.save_biadjacency(filename=<filename>, delim='\t')
By default, the file is saved in a human-readable ``.csv`` format with tab
delimiters, which can be changed using the keyword ``delim``. The
information can also be saved as a binary NumPy file ``.npy`` by using::
>>> cm.save_biadjacency(filename=<filename>, binary=True)
If the file is not binary, it should end with ``.csv``. If it is binary
instead, NumPy automatically attaches the ending ``.npy``.
In order to analyze the similarity of the **row-nodes** and to save
the p-values of the corresponding :math:`\\Lambda`-motifs (i.e. of the
number of shared neighbors [Saracco2017]_), use::
>>> cm.lambda_motifs(True, filename=<filename>)
For the **column-nodes**, use::
>>> cm.lambda_motifs(False, filename=<filename>)
By default, the resulting p-values are saved as binary NumPy file to reduce
the required disk space, and the format suffix ``.npy`` is appended. If the
file should be saved in a human-readable ``.csv`` format, use::
>>> cm.lambda_motifs(True, filename=<filename>, delim='\\t', \
binary=False)
or analogously::
>>> cm.lambda_motifs(False, filename=<filename>, delim='\\t', \
binary=False)
.. note::
The p-values are saved as a one-dimensional array with index
:math:`k \\in \\left[0, \\ldots, \\binom{N}{2} - 1\\right]` for a
bipartite layer of :math:`N` nodes. The indices ``(i, j)`` of the
nodes corresponding to entry ``k`` in the array can be reconstructed
using the method :func:`BiCM.flat2_triumat_idx`. The number of nodes
``N`` can be recovered from the length of the array with
:func:`BiCM.flat2_triumat_dim`.
Subsequently, the p-values can be used to perform a multiple hypotheses
testing of the node similarities and to obtain statistically validated
monopartite projections [Saracco2017]_. The p-values are calculated in
parallel by default, see :ref:`parallel` for details.
.. note::
Since the calculation of the p-values is computationally demanding, the
``bicm`` module uses the Python `multiprocessing
<https://docs.python.org/2/library/multiprocessing.html>`_ package by
default for this purpose. The number of parallel processes depends on
the number of CPUs of the work station (see variable ``num_procs`` in
the method :func:`BiCM.get_pvalues_q`).
If the calculation should **not** be performed in parallel, use::
>>> cm.lambda_motifs(<bool>, parallel=False)
instead of::
>>> cm.lambda_motifs(<bool>)
References:
.. [Saracco2015] `F. Saracco, R. Di Clemente, A. Gabrielli, T. Squartini,
Randomizing bipartite networks: the case of the World Trade Web,
Scientific Reports 5, 10595 (2015)
<http://www.nature.com/articles/srep10595>`_
.. [Saracco2017] `F. Saracco, M. J. Straka, R. Di Clemente, A. Gabrielli,
G. Caldarelli, and T. Squartini,
Inferring monopartite projections of bipartite networks: an entropy-based
approach, New J. Phys. 19, 053022 (2017)
<http://stacks.iop.org/1367-2630/19/i=5/a=053022>`_
"""
import ctypes
import multiprocessing
import scipy.optimize as opt
import numpy as np
from poibin.poibin import PoiBin
[docs]class BiCM(object):
"""Bipartite Configuration Model for undirected binary bipartite networks.
This class implements the Bipartite Configuration Model (BiCM), which can
be used as a null model for the analysis of undirected and binary bipartite
networks. The class provides methods for calculating the biadjacency matrix
of the null model and for quantifying node similarities in terms of
p-values.
"""
def __init__(self, bin_mat):
"""Initialize the parameters of the BiCM.
:param bin_mat: binary input matrix describing the biadjacency matrix
of a bipartite graph with the nodes of one layer along the rows
and the nodes of the other layer along the columns.
:type bin_mat: numpy.array
"""
self.bin_mat = np.array(bin_mat, dtype=np.int64)
self.check_input_matrix_is_binary()
[self.num_rows, self.num_columns] = self.bin_mat.shape
self.dseq = self.set_degree_seq()
self.dim = self.dseq.size
self.sol = None # solution of the equation system
self.adj_matrix = None # biadjacency matrix of the null model
self.input_queue = None # queue for parallel processing
self.output_queue = None # queue for parallel processing
[docs] def set_degree_seq(self):
"""Return the node degree sequence of the input matrix.
:returns: node degree sequence [degrees row-nodes, degrees column-nodes]
:rtype: numpy.array
:raise AssertionError: raise an error if the length of the returned
degree sequence does not correspond to the total number of nodes
"""
dseq = np.empty(self.num_rows + self.num_columns)
dseq[self.num_rows:] = np.squeeze(np.sum(self.bin_mat, axis=0))
dseq[:self.num_rows] = np.squeeze(np.sum(self.bin_mat, axis=1))
assert dseq.size == (self.num_rows + self.num_columns)
return dseq
[docs] def make_bicm(self, x0=None, method='hybr', jac=None, tol=None,
callback=None, options=None):
"""Create the biadjacency matrix of the BiCM null model.
Solve the log-likelihood maximization problem to obtain the BiCM
null model which respects constraints on the degree sequence of the
input matrix.
The problem is solved using ``scipy``'s root function with the solver
defined by ``method``. The status of the solver after running
``scipy.root``and the difference between the network and BiCM degrees
are printed in the console.
The default solver is the modified Powell method ``hybr``. Least-squares
can be chosen with ``method='lm'`` for the Levenberg-Marquardt approach.
Depending on the solver, keyword arguments ``kwargs`` can be passed to
the solver. Please refer to the `scipy.optimize.root documentation
<https://docs.scipy.org/doc/scipy-0.19.0/reference/generated/
scipy.optimize.root.html>`_ for detailed descriptions.
.. note::
It can happen that the solver ``method`` used by ``scipy.root``
does not converge to a solution.
In this case, please try another ``method`` or different initial
conditions and refer to the `scipy.optimize.root documentation
<https://docs.scipy.org/doc/scipy-0.19.0/reference/generated/
scipy.optimize.root.html>`_.
:param x0: initial guesses for the solutions. The first entries are the
initial guesses for the row-nodes, followed by the initial guesses
for the column-nodes.
:type x0: 1d numpy.array, optional
:param method: type of solver, default is ‘hybr’. For other
solvers, see the `scipy.optimize.root documentation
<https://docs.scipy.org/doc/
scipy-0.19.0/reference/generated/scipy.optimize.root.html>`_.
:type method: str, optional
:param jac: Jacobian of the system
:type jac: bool or callable, optional
:param tol: tolerance for termination. For detailed control, use
solver-specific options.
:type tol: float, optional
:param callback: optional callback function to be called at
every iteration as ``callback(self.equations, x)``,
see ``scipy.root`` documentation
:type callback: function, optional
:param options: a dictionary of solver options, e.g. ``xtol`` or
``maxiter``, see scipy.root documentation
:type options: dict, optional
:param kwargs: solver-specific options, please refer to the SciPy
documentation
:raise ValueError: raise an error if not enough initial conditions
are provided
"""
self.sol = self.solve_equations(x0=x0, method=method, jac=jac, tol=tol,
callback=callback, options=options)
# create BiCM biadjacency matrix:
self.adj_matrix = self.get_biadjacency_matrix(self.sol.x)
# self.print_max_degree_differences()
# assert self.test_average_degrees(eps=1e-2)
# ------------------------------------------------------------------------------
# Solve coupled nonlinear equations and get BiCM biadjacency matrix
# ------------------------------------------------------------------------------
[docs] def solve_equations(self, x0=None, method='hybr', jac=None, tol=None,
callback=None, options=None):
"""Solve the system of equations of the maximum log-likelihood problem.
The system of equations is solved using ``scipy``'s root function with
the solver defined by ``method``. The solutions correspond to the
Lagrange multipliers
.. math::
x_i = \exp(-\\theta_i).
Depending on the solver, keyword arguments ``kwargs`` can be passed to
the solver. Please refer to the `scipy.optimize.root documentation
<https://docs.scipy.org/doc/scipy-0.19.0/reference/generated/
scipy.optimize.root.html>`_ for detailed descriptions.
The default solver is the modified Powell method ``hybr``. Least-squares
can be chosen with ``method='lm'`` for the Levenberg-Marquardt approach.
.. note::
It can happen that the solver ``method`` used by ``scipy.root``
does not converge to a solution.
In this case, please try another ``method`` or different initial
conditions and refer to the `scipy.optimize.root documentation
<https://docs.scipy.org/doc/scipy-0.19.0/reference/generated/
scipy.optimize.root.html>`_.
:param x0: initial guesses for the solutions. The first entries are the
initial guesses for the row-nodes, followed by the initial guesses
for the column-nodes.
:type x0: 1d numpy.array, optional
:param method: type of solver, default is ‘hybr’. For other
solvers, see the `scipy.optimize.root documentation
<https://docs.scipy.org/doc/
scipy-0.19.0/reference/generated/scipy.optimize.root.html>`_.
:type method: str, optional
:param jac: Jacobian of the system
:type jac: bool or callable, optional
:param tol: tolerance for termination. For detailed control, use
solver-specific options.
:type tol: float, optional
:param callback: optional callback function to be called at
every iteration as ``callback(self.equations, x)``,
see ``scipy.root`` documentation
:type callback: function, optional
:param options: a dictionary of solver options, e.g. ``xtol`` or
``maxiter``, see scipy.root documentation
:type options: dict, optional
:param kwargs: solver-specific options, please refer to the SciPy
documentation
:returns: solution of the equation system
:rtype: scipy.optimize.OptimizeResult
:raise ValueError: raise an error if not enough initial conditions
are provided
"""
# use Jacobian if the hybr solver is chosen
if method is 'hybr':
jac = self.jacobian
# set initial conditions
if x0 is None:
x0 = self.dseq / np.sqrt(np.sum(self.dseq))
else:
if not len(x0) == self.dim:
msg = "One initial condition for each parameter is required."
raise ValueError(msg)
# solve equation system
sol = opt.root(fun=self.equations, x0=x0, method=method, jac=jac,
tol=tol, options=options, callback=callback)
# check whether system has been solved successfully
print "Solver successful:", sol.success
print sol.message
if not sol.success:
errmsg = "Try different initial conditions and/or a" + \
"different solver, see documentation at " + \
"https://docs.scipy.org/doc/scipy-0.19.0/reference/" + \
"generated/scipy.optimize.root.html"
print errmsg
return sol
[docs] def equations(self, xx):
"""Return the equations of the log-likelihood maximization problem.
Note that the equations for the row-nodes depend only on the
column-nodes and vice versa, see [Saracco2015]_.
:param xx: Lagrange multipliers which have to be solved
:type xx: numpy.array
:returns: equations to be solved (:math:`f(x) = 0`)
:rtype: numpy.array
"""
eq = -self.dseq
for i in xrange(0, self.num_rows):
for j in xrange(self.num_rows, self.dim):
dum = xx[i] * xx[j] / (1. + xx[i] * xx[j])
eq[i] += dum
eq[j] += dum
return eq
[docs] def jacobian(self, xx):
"""Return a NumPy array with the Jacobian of the equation system.
:param xx: Lagrange multipliers which have to be solved
:type xx: numpy.array
:returns: Jacobian
:rtype: numpy.array
"""
jac = np.zeros((self.dim, self.dim))
for i in xrange(0, self.num_rows):
# df_c / df_c' = 0 for all c' != c
for j in xrange(self.num_rows, self.dim):
# df_c / dx_c != 0
xxi = xx[i] / (1.0 + xx[i] * xx[j]) ** 2
xxj = xx[j] / (1.0 + xx[i] * xx[j]) ** 2
jac[i, i] += xxj
jac[i, j] = xxi
jac[j, i] = xxj
jac[j, j] += xxi
return jac
[docs] def get_biadjacency_matrix(self, xx):
""" Calculate the biadjacency matrix of the null model.
The biadjacency matrix describes the BiCM null model, i.e. the optimal
average graph :math:`<G>^*` with the average link probabilities
:math:`<G>^*_{rc} = p_{rc}` ,
:math:`p_{rc} = \\frac{x_r \\cdot x_c}{1 + x_r\\cdot x_c}.`
:math:`x` are the solutions of the equation system which has to be
solved for the null model.
Note that :math:`r` and :math:`c` are taken from opposite bipartite
node sets, thus :math:`r \\neq c`.
:param xx: solutions of the equation system (Lagrange multipliers)
:type xx: numpy.array
:returns: biadjacency matrix of the null model
:rtype: numpy.array
:raises ValueError: raise an error if :math:`p_{rc} < 0` or
:math:`p_{rc} > 1` for any :math:`r, c`
"""
mat = np.empty((self.num_rows, self.num_columns))
xp = xx[range(self.num_rows, self.dim)]
for i in xrange(self.num_rows):
mat[i, ] = xx[i] * xp / (1 + xx[i] * xp)
# account for machine precision:
mat += np.finfo(np.float).eps
if np.any(mat < 0):
errmsg = 'Error in get_adjacency_matrix: probabilities < 0 in ' \
+ str(np.where(mat < 0))
raise ValueError(errmsg)
elif np.any(mat > (1. + np.finfo(np.float).eps)):
errmsg = 'Error in get_adjacency_matrix: probabilities > 1 in' \
+ str(np.where(mat > 1))
raise ValueError(errmsg)
assert mat.shape == self.bin_mat.shape, \
"Biadjacency matrix has wrong dimensions."
return mat
# ------------------------------------------------------------------------------
# Test correctness of results:
# ------------------------------------------------------------------------------
[docs] def print_max_degree_differences(self):
"""Print the maximal differences between input network and BiCM degrees.
Check that the degree sequence of the solved BiCM null model graph
corresponds to the degree sequence of the input graph.
"""
ave_deg_columns =np.sum(self.adj_matrix, axis=0)
ave_deg_rows = np.sum(self.adj_matrix, axis=1)
print "Maximal degree differences between data and BiCM:"
print "Columns:", np.abs(np.max(
self.dseq[self.num_rows:] - ave_deg_columns))
print "Rows:", np.abs(np.max(
self.dseq[:self.num_rows] - ave_deg_rows))
[docs] def test_average_degrees(self, eps=1e-2):
"""Test the constraints on the node degrees.
Check that the degree sequence of the solved BiCM null model graph
corresponds to the degree sequence of the input graph.
:param eps: maximum difference between degrees of the real network
and the BiCM
:type eps: float
"""
ave_deg_columns = np.squeeze(np.sum(self.adj_matrix, axis=0))
ave_deg_rows = np.squeeze(np.sum(self.adj_matrix, axis=1))
c_derr = np.where(np.logical_or(
# average degree too small:
ave_deg_rows + eps < self.dseq[:self.num_rows],
# average degree too large:
ave_deg_rows - eps > self.dseq[:self.num_rows]))
p_derr = np.where(np.logical_or(
ave_deg_columns + eps < self.dseq[self.num_rows:],
ave_deg_columns - eps > self.dseq[self.num_rows:]))
# Check row-nodes degrees:
if not np.array_equiv(c_derr, np.array([])):
print '...inaccurate row-nodes degrees:'
for i in c_derr[0]:
print 'Row-node ', i, ':',
print 'input:', self.dseq[i], 'average:', ave_deg_rows[i]
return False
# Check column-nodes degrees:
if not np.array_equiv(p_derr, np.array([])):
print '...inaccurate column-nodes degrees:'
for i in c_derr[0]:
print 'Column-node ', i, ':',
print 'input:', self.dseq[i + self.num_rows], \
'average:', ave_deg_columns[i]
return False
return True
# ------------------------------------------------------------------------------
# Lambda motifs
# ------------------------------------------------------------------------------
[docs] def lambda_motifs(self, bip_set, parallel=True, filename=None,
delim='\t', binary=True, num_chunks=4):
"""Calculate and save the p-values of the :math:`\\Lambda`-motifs.
For each node couple in the bipartite layer specified by ``bip_set``,
calculate the p-values of the corresponding :math:`\\Lambda`-motifs
according to the link probabilities in the biadjacency matrix of the
BiCM null model.
The results can be saved either as a binary ``.npy`` or a
human-readable ``.csv`` file, depending on ``binary``.
.. note::
* The total number of p-values that are calculated is split into
``num_chunks`` chunks, which are processed sequentially in order
to avoid memory allocation errors. Note that a larger value of
``num_chunks`` will lead to less memory occupation, but comes at
the cost of slower processing speed.
* The output consists of a one-dimensional array of p-values. If
the bipartite layer ``bip_set`` contains ``n`` nodes, this means
that the array will contain :math:`\\binom{n}{2}` entries. The
indices ``(i, j)`` of the nodes corresponding to entry ``k`` in
the array can be reconstructed using the method
:func:`BiCM.flat2_triumat_idx`. The number of nodes ``n``
can be recovered from the length of the array with
:func:`BiCM.flat2_triumat_dim`
* If ``binary == False``, the ``filename`` should end with
``.csv``. If ``binary == True``, it will be saved in binary NumPy
``.npy`` format and the suffix ``.npy`` will be appended
automatically. By default, the file is saved in binary format.
:param bip_set: select row-nodes (``True``) or column-nodes (``False``)
:type bip_set: bool
:param parallel: select whether the calculation of the p-values should
be run in parallel (``True``) or not (``False``)
:type parallel: bool
:param filename: name of the output file
:type filename: str
:param delim: delimiter between entries in the ``.csv``file, default is
``\\t``
:type delim: str
:param binary: if ``True``, the file will be saved in the binary
NumPy format ``.npy``, otherwise as ``.csv``
:type binary: bool
:param num_chunks: number of chunks of p-value calculations that are
performed sequentially
:type num_chunks: int
:raise ValueError: raise an error if the parameter ``bip_set`` is
neither ``True`` nor ``False``
"""
if (type(bip_set) == bool) and bip_set:
biad_mat = self.adj_matrix
bin_mat = self.bin_mat
elif (type(bip_set) == bool) and not bip_set:
biad_mat = np.transpose(self.adj_matrix)
bin_mat = np.transpose(self.bin_mat)
else:
errmsg = "'" + str(bip_set) + "' " + 'not supported.'
raise NameError(errmsg)
n = self.get_triup_dim(bip_set)
pval = np.ones(shape=(n, ), dtype='float') * (-0.1)
# handle layers of dimension 2 separately
if n == 1:
nlam = np.dot(bin_mat[0, :], bin_mat[1, :].T)
plam = biad_mat[0, :] * biad_mat[1, :]
pb = PoiBin(plam)
pval[0] = pb.pval(nlam)
else:
# if the dimension of the network is too large, split the
# calculations # of the p-values in ``m`` intervals to avoid memory
# allocation errors
if n > 100:
kk = self.split_range(n, m=num_chunks)
else:
kk = [0]
# calculate p-values for index intervals
for i in range(len(kk) - 1):
k1 = kk[i]
k2 = kk[i + 1]
nlam = self.get_lambda_motif_block(bin_mat, k1, k2)
plam = self.get_plambda_block(biad_mat, k1, k2)
pv = self.get_pvalues_q(plam, nlam, k1, k2)
pval[k1:k2] = pv
# last interval
k1 = kk[len(kk) - 1]
k2 = n - 1
nlam = self.get_lambda_motif_block(bin_mat, k1, k2)
plam = self.get_plambda_block(biad_mat, k1, k2)
# for the last entry we have to INCLUDE k2, thus k2 + 1
pv = self.get_pvalues_q(plam, nlam, k1, k2 + 1)
pval[k1:] = pv
# check that all p-values have been calculated
# assert np.all(pval >= 0) and np.all(pval <= 1)
if filename is None:
fname = 'p_values_' + str(bip_set)
if not binary:
fname += '.csv'
else:
fname = filename
# account for machine precision:
pval += np.finfo(np.float).eps
self.save_array(pval, filename=fname, delim=delim,
binary=binary)
[docs] def get_lambda_motif_block(self, mm, k1, k2):
"""Return a subset of :math:`\\Lambda`-motifs as observed in ``mm``.
Given the binary input matrix ``mm``, count the number of
:math:`\\Lambda`-motifs for all the node couples specified by the
interval :math:`\\left[k_1, k_2\\right[`.
.. note::
* The :math:`\\Lambda`-motifs are counted between the **row-nodes**
of the input matrix ``mm``.
* If :math:`k_2 \equiv \\binom{mm.shape[0]}{2}`, the interval
becomes :math:`\\left[k_1, k_2\\right]`.
:param mm: binary matrix
:type mm: numpy.array
:param k1: lower interval limit
:type k1: int
:param k2: upper interval limit
:type k2: int
:returns: array of observed :math:`\\Lambda`-motifs
:rtype: numpy.array
"""
ndim = mm.shape[0]
# if the upper limit is the largest possible index, i.e. corresponds to
# the node couple (ndim - 2, ndim - 1), where node indices start from 0,
# include the result
if k2 == (ndim * (ndim - 1) / 2 - 1):
flag = 1
else:
flag = 0
aux = np.ones(shape=(k2 - k1 + flag, )) * (-1) # -1 as a test
[i1, j1] = self.flat2triumat_idx(k1, ndim)
[i2, j2] = self.flat2triumat_idx(k2, ndim)
# if limits have the same row index
if i1 == i2:
aux[:k2 - k1] = np.dot(mm[i1, :], mm[j1:j2, :].T)
# if limits have different row indices
else:
k = 0
# get values for lower limit row
fi = np.dot(mm[i1, :], mm[j1:, :].T)
aux[:len(fi)] = fi
k += len(fi)
# get values for intermediate rows
for i in range(i1 + 1, i2):
mid = np.dot(mm[i, :], mm[i + 1:, :].T)
aux[k : k + len(mid)] = mid
k += len(mid)
# get values for upper limit row
if flag == 1:
aux[-1] = np.dot(mm[ndim - 2, :], mm[ndim - 1, :].T)
else:
la = np.dot(mm[i2, :], mm[i2 + 1 : j2, :].T)
aux[k:] = la
return aux
[docs] def get_plambda_block(self, biad_mat, k1, k2):
"""Return a subset of the :math:`\\Lambda` probability matrix.
Given the biadjacency matrix ``biad_mat`` with
:math:`\\mathbf{M}_{rc} = p_{rc}`, which describes the probabilities of
row-node ``r`` and column-node ``c`` being linked, the method returns
the matrix
:math:`P(\\Lambda)_{ij} = \\left(M_{i\\alpha_1} \\cdot M_{j\\alpha_1},
M_{i\\alpha_2} \\cdot M_{j\\alpha_2}, \\ldots\\right),`
for all the node couples in the interval
:math:`\\left[k_1, k_2\\right[`. :math:`(i, j)` are two **row-nodes**
of ``biad_mat`` and :math:`\\alpha_k` runs over the nodes in the
opposite layer.
.. note::
* The probabilities are calculated between the **row-nodes** of the
input matrix ``biad_mat``.
* If :math:`k_2 \equiv \\binom{biad\\_mat.shape[0]}{2}`, the
interval becomes :math:`\\left[k1, k2\\right]`.
:param biad_mat: biadjacency matrix
:type biad_mat: numpy.array
:param k1: lower interval limit
:type k1: int
:param k2: upper interval limit
:type k2: int
:returns: :math:`\\Lambda`-motif probability matrix
:rtype: numpy.array
"""
[ndim1, ndim2] = biad_mat.shape
# if the upper limit is the largest possible index, i.e. corresponds to
# the node couple (ndim - 2, ndim - 1), where node indices start from 0,
# include the result
if k2 == (ndim1 * (ndim1 - 1) / 2 - 1):
flag = 1
else:
flag = 0
paux = np.ones(shape=(k2 - k1 + flag, ndim2), dtype='float') * (-0.1)
[i1, j1] = self.flat2triumat_idx(k1, ndim1)
[i2, j2] = self.flat2triumat_idx(k2, ndim1)
# if limits have the same row index
if i1 == i2:
paux[:k2 - k1, :] = biad_mat[i1, ] * biad_mat[j1:j2, :]
# if limits have different indices
else:
k = 0
# get values for lower limit row
fi = biad_mat[i1, :] * biad_mat[j1:, :]
paux[:len(fi), :] = fi
k += len(fi)
# get values for intermediate rows
for i in range(i1 + 1, i2):
mid = biad_mat[i, :] * biad_mat[i + 1:, :]
paux[k : k + len(mid), :] = mid
k += len(mid)
# get values for upper limit row
if flag == 1:
paux[-1, :] = biad_mat[ndim1 - 2, :] * biad_mat[ndim1 - 1, :]
else:
la = biad_mat[i2, :] * biad_mat[i2 + 1:j2, :]
paux[k:, :] = la
return paux
[docs] def get_pvalues_q(self, plam_mat, nlam_mat, k1, k2, parallel=True):
"""Calculate the p-values of the observed :math:`\\Lambda`-motifs.
For each number of :math:`\\Lambda`-motifs in ``nlam_mat`` for the node
interval :math:`\\left[k1, k2\\right[`, construct the Poisson Binomial
distribution using the corresponding
probabilities in ``plam_mat`` and calculate the p-value.
:param plam_mat: array containing the list of probabilities for the
single observations of :math:`\\Lambda`-motifs
:type plam_mat: numpy.array (square matrix)
:param nlam_mat: array containing the observations of
:math:`\\Lambda`-motifs
:type nlam_mat: numpy.array (square matrix)
:param k1: lower interval limit
:type k1: int
:param k2: upper interval limit
:type k2: int
:param parallel: if ``True``, the calculation is executed in parallel;
if ``False``, only one process is started
:type parallel: bool
"""
n = len(nlam_mat)
# the array must be sharable to be accessible by all processes
shared_array_base = multiprocessing.Array(ctypes.c_double, n)
pval_mat = np.frombuffer(shared_array_base.get_obj())
# number of processes running in parallel has to be tested.
# good guess is multiprocessing.cpu_count() +- 1
if parallel:
num_procs = multiprocessing.cpu_count() - 1
elif not parallel:
num_procs = 1
else:
num_procs = 1
self.input_queue = multiprocessing.Queue()
self.output_queue = multiprocessing.Queue()
p_inqueue = multiprocessing.Process(target=self.add2inqueue,
args=(num_procs, plam_mat, nlam_mat,
k1, k2))
p_outqueue = multiprocessing.Process(target=self.outqueue2pval_mat,
args=(num_procs, pval_mat))
ps = [multiprocessing.Process(target=self.pval_process_worker,
args=()) for i in range(num_procs)]
# start queues
p_inqueue.start()
p_outqueue.start()
# start processes
for p in ps:
p.start() # each process has an id, p.pid
p_inqueue.join()
for p in ps:
p.join()
p_outqueue.join()
return pval_mat
[docs] def add2inqueue(self, nprocs, plam_mat, nlam_mat, k1, k2):
"""Add elements to the in-queue to calculate the p-values.
:param nprocs: number of processes running in parallel
:type nprocs: int
:param plam_mat: array containing the list of probabilities for the
single observations of :math:`\\Lambda`-motifs
:type plam_mat: numpy.array (square matrix)
:param nlam_mat: array containing the observations of
:math:`\\Lambda`-motifs
:type nlam_mat: numpy.array (square matrix)
:param k1: lower interval limit
:type k1: int
:param k2: upper interval limit
:type k2: int
"""
n = len(plam_mat)
# add tuples of matrix elements and indices to the input queue
for k in xrange(k1, k2):
self.input_queue.put((k - k1, plam_mat[k - k1, :],
nlam_mat[k - k1]))
# add as many poison pills "STOP" to the queue as there are workers
for i in xrange(nprocs):
self.input_queue.put("STOP")
[docs] def pval_process_worker(self):
"""Calculate p-values and add them to the out-queue."""
# take elements from the queue as long as the element is not "STOP"
for tupl in iter(self.input_queue.get, "STOP"):
pb = PoiBin(tupl[1])
pv = pb.pval(int(tupl[2]))
# add the result to the output queue
self.output_queue.put((tupl[0], pv))
# once all the elements in the input queue have been dealt with, add a
# "STOP" to the output queue
self.output_queue.put("STOP")
[docs] def outqueue2pval_mat(self, nprocs, pvalmat):
"""Put the results from the out-queue into the p-value array."""
# stop the work after having met nprocs times "STOP"
for work in xrange(nprocs):
for val in iter(self.output_queue.get, "STOP"):
k = val[0]
pvalmat[k] = val[1]
[docs] def get_triup_dim(self, bip_set):
"""Return the number of possible node couples in ``bip_set``.
:param bip_set: selects row-nodes (``True``) or column-nodes
(``False``)
:type bip_set: bool
:returns: return the number of node couple combinations corresponding
to the layer ``bip_set``
:rtype: int
:raise ValueError: raise an error if the parameter ``bip_set`` is
neither ``True`` nor ``False``
"""
if bip_set:
return self.triumat2flat_dim(self.num_rows)
elif not bip_set:
return self.triumat2flat_dim(self.num_columns)
else:
errmsg = "'" + str(bip_set) + "' " + 'not supported.'
raise NameError(errmsg)
[docs] def split_range(self, n, m=4):
"""Split the interval :math:`\\left[0,\ldots, n\\right]` in ``m`` parts.
:param n: upper limit of the range
:type n: int
:param m: number of part in which range should be split
:type n: int
:returns: delimiter indices for the ``m`` parts
:rtype: list
"""
return [i * n / m for i in range(m)]
# ------------------------------------------------------------------------------
# Auxiliary methods
# ------------------------------------------------------------------------------
@staticmethod
[docs] def triumat2flat_idx(i, j, n):
"""Convert an matrix index couple to a flattened array index.
Given a square matrix of dimension ``n`` and the index couple
``(i, j)`` *of the upper triangular part* of the matrix, return the
index which the matrix element would have in a flattened array.
.. note::
* :math:`i \\in [0, ..., n - 1]`
* :math:`j \\in [i + 1, ..., n - 1]`
* returned index :math:`\\in [0,\\, n (n - 1) / 2 - 1]`
:param i: row index
:type i: int
:param j: column index
:type j: int
:param n: dimension of the square matrix
:type n: int
:returns: flattened array index
:rtype: int
"""
return int((i + 1) * n - (i + 2) * (i + 1) / 2. - (n - (j + 1)) - 1)
@staticmethod
[docs] def triumat2flat_dim(n):
"""Return the size of the triangular part of a ``n x n`` matrix.
:param n: the dimension of the square matrix
:type n: int
:returns: number of elements in the upper triangular part of the matrix
(excluding the diagonal)
:rtype: int
"""
return n * (n - 1) / 2
@staticmethod
[docs] def flat2triumat_dim(k):
"""Return the dimension of the matrix hosting ``k`` triangular elements.
:param k: the number of elements in the upper triangular
part of the corresponding square matrix, excluding the diagonal
:type k: int
:returns: dimension of the corresponding square matrix
:rtype: int
"""
return int(0.5 + np.sqrt(0.25 + 2 * k))
@staticmethod
[docs] def flat2triumat_idx(k, n):
"""Convert an array index into the index couple of a triangular matrix.
``k`` is the index of an array of length :math:`\\binom{n}{2}{2}`,
which contains the elements of an upper triangular matrix of dimension
``n`` excluding the diagonal. The function returns the index couple
:math:`(i, j)` that corresponds to the entry ``k`` of the flat array.
.. note::
* :math:`k \\in \left[0,\\ldots, \\binom{n}{2} - 1\\right]`
* returned indices:
* :math:`i \\in [0,\\ldots, n - 1]`
* :math:`j \\in [i + 1,\\ldots, n - 1]`
:param k: flattened array index
:type k: int
:param n: dimension of the square matrix
:type n: int
:returns: matrix index tuple (row, column)
:rtype: tuple
"""
# row index of array index k in the the upper triangular part of the
# square matrix
r = n - 2 - int(0.5 * np.sqrt(-8 * k + 4 * n * (n - 1) - 7) - 0.5)
# column index of array index k in the the upper triangular part of the
# square matrix
c = k + 1 + r * (3 - 2 * n + r) / 2
return r, c
[docs] def save_biadjacency(self, filename, delim='\t', binary=False):
"""Save the biadjacendy matrix of the BiCM null model.
The matrix can either be saved as a binary NumPy ``.npy`` file or as a
human-readable ``.csv`` file.
.. note::
* The relative path has to be provided in the filename, e.g.
*../data/pvalue_matrix.csv*.
* If ``binary==True``, NumPy
automatically appends the format ending ``.npy`` to the file.
:param filename: name of the output file
:type filename: str
:param delim: delimiter between values in file
:type delim: str
:param binary: if ``True``, save as binary ``.npy``, otherwise as a
``.csv`` file
:type binary: bool
"""
self.save_array(self.adj_matrix, filename, delim, binary)
@staticmethod
[docs] def save_array(mat, filename, delim='\t', binary=False):
"""Save the array ``mat`` in the file ``filename``.
The array can either be saved as a binary NumPy ``.npy`` file or as a
human-readable ``.npy`` file.
.. note::
* The relative path has to be provided in the filename, e.g.
*../data/pvalue_matrix.csv*.
* If ``binary==True``, NumPy
automatically appends the format ending ``.npy`` to the file.
:param mat: array
:type mat: numpy.array
:param filename: name of the output file
:type filename: str
:param delim: delimiter between values in file
:type delim: str
:param binary: if ``True``, save as binary ``.npy``, otherwise as a
``.csv`` file
:type binary: bool
"""
if binary:
np.save(filename, mat)
else:
np.savetxt(filename, mat, delimiter=delim)
################################################################################
# Main
################################################################################
if __name__ == "__main__":
pass