""" Utilities for network data manipulation and entropy computation. """
import heapq
from .int_part import *
from numba import njit
from scipy.sparse import lil_matrix, coo_matrix
from scipy.special import comb
from itertools import product, combinations
from loky import get_reusable_executor
def db_factorial_ln(val):
    """Compute the natural log of the double factorial, ln(val!!)."""
    m = int(val)
    if m & 0x1 == 1:  # m is odd: m!! = m! / (2^((m-1)/2) * ((m-1)/2)!)
        return gammaln(m + 1) - gammaln((m - 1) / 2 + 1) - ((m - 1) / 2) * np.log(2)
    else:  # m is even: m!! = 2^(m/2) * (m/2)!
        return gammaln(m / 2 + 1) + (m / 2) * np.log(2)
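
# A minimal usage sketch (illustrative values of our own choosing): since
# 5!! = 5 * 3 * 1 = 15 and 6!! = 6 * 4 * 2 = 48, the function should return
# np.log(15.) and np.log(48.), respectively.
def _example_db_factorial_ln():
    assert np.isclose(db_factorial_ln(5), np.log(15.))
    assert np.isclose(db_factorial_ln(6), np.log(48.))
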
# #################
# ENTROPY FUNCTIONS
# #################
# @njit(cache=True)
def partition_entropy(ka=None, kb=None, k=None, na=None, nb=None, n=None, nr=None, allow_empty=False):
    """partition_entropy

    Compute the partition entropy, P(b), for the current partition. It has several variations depending on the
    priors used. In the crudest way (``nr is None``), we formulate P(b) = P(b | B) * P(B). Or, by a two-level
    Bayesian hierarchy, we can do P(b) = P(b | n) * P(n | B) * P(B).

Parameters
----------
ka : ``int``
Number of communities in type-*a*.
kb : ``int``
Number of communities in type-*b*.
k : ``int``
Number of communities in the graph.
na : ``int``
Number of vertices in type-*a*.
nb : ``int``
Number of vertices in type-*b*.
n : ``int``
Number of vertices in the graph.
nr : :class:`numpy.ndarray`
Vertex property array of the block graph which contains the block sizes.
allow_empty : ``bool`` (optional, default: ``False``)
If ``True``, partition description length computed will allow for empty groups.
Returns
-------
ent : ``float``
        The description length (or entropy) in nats of the partition.
"""
if type(n) is int:
n = np.array([n])
k = np.array([k])
elif type(na) is int and type(nb) is int:
na = np.array([na])
ka = np.array([ka])
nb = np.array([nb])
kb = np.array([kb])
if nr is None:
ent = n * np.log(k) + np.log1p(-(1 - 1. / k) ** n) # TODO: check this term
else:
if ka is None and kb is None and k is not None:
if allow_empty:
ent = lbinom(k + n - 1, n)
else:
ent = lbinom(n - 1, k - 1)
ent += (gammaln(n + 1) - gammaln(nr + 1).sum()) + np.log(n) # TODO: check the last term (should be alright)
elif ka is not None and kb is not None and k is None:
if allow_empty:
# TODO
raise NotImplementedError
else:
ent = lbinom(na - 1, ka - 1) + lbinom(nb - 1, kb - 1)
ent += (gammaln(na + 1) + gammaln(nb + 1) - gammaln(nr + 1).sum()) + np.log(na) + np.log(nb)
        else:
            raise AttributeError("Either `k` (unipartite) or both `ka` and `kb` (bipartite) must be specified.")
return ent
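
# A hedged usage sketch (toy numbers of our own choosing): for the bipartite
# prior, `nr` holds the ka + kb block sizes; its type-a entries sum to na and
# its type-b entries sum to nb.
def _example_partition_entropy():
    nr = np.array([5, 5, 7, 8])  # ka = 2 type-a blocks, kb = 2 type-b blocks
    return partition_entropy(ka=2, kb=2, na=10, nb=15, nr=nr)
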
def adjacency_entropy(edgelist, mb, exact=True, multigraph=True):
    """adjacency_entropy

    Calculate the entropy (a.k.a. negative log-likelihood) associated with the current block partition. It does not
    include the model entropy.

    Parameters
    ----------
    edgelist : :class:`numpy.ndarray`
        List of edge tuples.
    mb : :class:`numpy.ndarray`
        Partition :math:`b` of nodes into blocks.
    exact : ``bool``
        If ``True``, return the exact (non-asymptotic) description length; otherwise use the sparse approximation.
    multigraph : ``bool``
        If ``True``, account for parallel edges in the exact computation.

    Returns
    -------
    ent : ``float``
        The description length (or entropy) in nats of the fit.
    """
ent = 0.
e_rs = np.zeros((max(mb) + 1, max(mb) + 1), dtype=np.int_)
m_ij = lil_matrix((len(mb), len(mb)), dtype=np.int_)
for i in edgelist:
# Please do check the index convention of the edgelist
source_group = int(mb[int(i[0])])
target_group = int(mb[int(i[1])])
e_rs[source_group][target_group] += 1
e_rs[target_group][source_group] += 1
m_ij[int(i[0]), int(i[1])] += 1 # we only update the upper triangular part of the adj-matrix
italic_i = 0.
e_r = np.sum(e_rs, axis=1, dtype=np.int_)
sum_m_ii = 0.
sum_m_ij = 0.
sum_e_rs = 0.
sum_e_rr = 0.
sum_e_r = 0.
if exact:
if multigraph:
ind_i, ind_j = m_ij.nonzero()
for ind in zip(ind_i, ind_j):
val = m_ij[ind[0], ind[1]]
if val > 1:
if ind[0] == ind[1]:
sum_m_ii += db_factorial_ln(val)
else:
sum_m_ij += gammaln(val + 1)
for _e_r in e_r:
sum_e_r += gammaln(_e_r + 1)
for ind, e_val in enumerate(np.nditer(e_rs)):
ind_i = int(np.floor(ind / (e_rs.shape[0])))
ind_j = ind % (e_rs.shape[0])
if exact:
if ind_j > ind_i:
sum_e_rs += gammaln(e_val + 1)
elif ind_j == ind_i:
sum_e_rr += db_factorial_ln(e_val)
else:
if e_val != 0.0:
italic_i += e_val * np.log(
e_val / e_r[ind_i] / e_r[ind_j]
)
ent += -italic_i / 2
n_k = assemble_n_k_from_edgelist(edgelist, mb)
ent_deg = 0
for deg, k in enumerate(n_k):
if deg != 0 and k != 0:
ent_deg -= k * gammaln(deg + 1)
ent += ent_deg
if exact:
return ent_deg - sum_e_rs - sum_e_rr + sum_e_r + sum_m_ii + sum_m_ij
else:
num_edges = len(edgelist)
ent += -num_edges
return ent
def model_entropy(e, ka=None, kb=None, na=None, nb=None, nr=None, allow_empty=False, is_bipartite=True):
    """model_entropy

    Computes the amount of information necessary for the parameters of the (bipartite) blockmodel ensemble,
    for ``ka`` type-`a` blocks, ``kb`` type-`b` blocks, ``na`` type-`a` vertices, ``nb`` type-`b` vertices,
    and ``e`` edges, with either a bipartite or a general prior. This includes the entropy from modeling the
    edge counts and the partition.

    Note that if we know `a priori` that the network is bipartite, we can further compress the model.

Parameters
----------
e : ``int``
Number of edges.
ka : ``int``
Number of communities in type-*a*.
kb : ``int``
Number of communities in type-*b*.
na : ``int``
Number of vertices in type-*a*.
nb : ``int``
Number of vertices in type-*b*.
nr : :class:`numpy.ndarray`
Vertex property array of the block graph which contains the block sizes.
allow_empty : ``bool`` (optional, default: ``False``)
If ``True``, partition description length computed will allow for empty groups.
is_bipartite : ``bool`` (optional, default: ``True``)
If ``False``, edge counts description length computed will assume a purely flat :math:`e_{rs}`.
Returns
-------
dl : ``float``
        The description length (or entropy) in nats of the model.
References
----------
.. [peixoto-parsimonious-2013] Tiago P. Peixoto, "Parsimonious module
inference in large networks", Phys. Rev. Lett. 110, 148701 (2013),
:doi:`10.1103/PhysRevLett.110.148701`, :arxiv:`1212.4794`.
.. [peixoto-nonparametric-2017] Tiago P. Peixoto, "Nonparametric
Bayesian inference of the microcanonical stochastic block model",
Phys. Rev. E 95 012317 (2017), :doi:`10.1103/PhysRevE.95.012317`,
:arxiv:`1610.02703`
"""
if not is_bipartite:
k = ka + kb
x = (k * (k + 1)) / 2
else:
x = ka * kb
if nr is False:
dl = lbinom(x + e - 1, e)
else:
dl = lbinom(x + e - 1, e) + partition_entropy(ka=ka, kb=kb, na=na, nb=nb, nr=nr, allow_empty=allow_empty)
    # To use the general prior for the partition entropy, replace the call above with:
    #   n = na + nb
    #   partition_entropy(k=k, n=n, nr=nr, allow_empty=allow_empty)
return dl
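
# A hedged usage sketch (illustrative numbers): the model description length
# for a bipartite blockmodel with 2 + 2 blocks and 100 edges. Passing
# `nr=False` keeps only the edge-count term and drops the partition term.
def _example_model_entropy():
    nr = np.array([5, 5, 7, 8])
    full = model_entropy(100, ka=2, kb=2, na=10, nb=15, nr=nr)
    edges_only = model_entropy(100, ka=2, kb=2, na=10, nb=15, nr=False)
    return full, edges_only
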
def degree_entropy(edgelist, mb, __q_cache=np.array([], ndmin=2), degree_dl_kind="distributed",
                   q_cache_max_e_r=int(1e4)):
    """degree_entropy

    Compute the degree description length of the current partition.

    Parameters
    ----------
    edgelist : ``iterable`` or :class:`numpy.ndarray`
        List of edge tuples.
    mb : ``iterable`` or :class:`numpy.ndarray`
        Partition :math:`b` of nodes into blocks.
    __q_cache : :class:`numpy.ndarray` (optional, default: ``np.array([], ndmin=2)``)
        Pre-computed lookup table for the number of restricted partitions.
    degree_dl_kind : ``str``
        1. ``degree_dl_kind == "uniform"``
           This corresponds to a non-informative prior, where the node
           degrees are sampled from a uniform distribution.
        2. ``degree_dl_kind == "distributed"``
           This option should be preferred in most cases.

Returns
-------
ent : ``float``
The entropy.
"""
ent = 0
n_r = assemble_n_r_from_mb(mb)
e_r = np.zeros(max(mb) + 1)
for i in edgelist:
# Please do check the index convention of the edgelist
source_group = int(mb[int(i[0])])
target_group = int(mb[int(i[1])])
e_r[source_group] += 1
e_r[target_group] += 1
if degree_dl_kind == "uniform":
ent += np.sum(lbinom(n_r + e_r - 1, e_r))
elif degree_dl_kind == "distributed":
if len(__q_cache) == 1:
__q_cache = init_q_cache(q_cache_max_e_r, __q_cache) # the pre-computed lookup table affects precision!
for ind, n_r_ in enumerate(n_r):
ent += log_q(e_r[ind], n_r_, __q_cache)
eta_rk = assemble_eta_rk_from_edgelist_and_mb(edgelist, mb)
for mb_, eta_rk_ in enumerate(eta_rk):
for eta in eta_rk_:
ent -= +gammaln(eta + 1)
ent -= -gammaln(n_r[mb_] + 1)
elif degree_dl_kind == "entropy":
raise NotImplementedError
return ent
def virtual_moves_ds(ori_e_rs, mlists, ka):
    """virtual_moves_ds

    Among the candidate block merges in ``mlists``, find the one that yields the smallest non-negative change in
    description length.

    Parameters
    ----------
    ori_e_rs : :class:`numpy.ndarray`
        Edge count matrix :math:`e_{rs}` before the merge.
    mlists : ``set``
        Set of candidate merges, each encoded as a string ``"<r>+<s>"`` of the two block labels.
    ka : ``int``
        Number of communities in type-*a*.

    Returns
    -------
    dS : ``float``
        The best (smallest non-negative) change in description length.
    _mlist : :class:`numpy.ndarray`
        The pair of block labels that realizes ``dS``.
    """
ori_e_r = np.sum(ori_e_rs, axis=1)
size = ori_e_rs.shape[0] - 1
t = np.inf
dS = 0.
_mlist = np.zeros(2, dtype=np.int_)
    for _ in mlists:
        mlist = np.int_([_.split("+")[0], _.split("+")[1]])
        if (np.max(mlist) >= ka > np.min(mlist)) or (np.max(mlist) == 0 and ka == 1) or (
                np.max(mlist) == ka and ori_e_rs.shape[0] == 1 + ka):
            continue  # skip merges across types, or merges within a type that has only one group left
        else:
            if np.max(mlist) < ka:  # we are merging groups of type-a
                _1, _2 = ori_e_rs[[mlist[0], mlist[1]], ka: size + 1]
            else:  # we are merging groups of type-b
                _1, _2 = ori_e_rs[[mlist[0], mlist[1]], 0: ka]
_ds = 0
_ds -= np.sum(gammaln(_1 + _2 + 1))
_ds += np.sum(gammaln(_1 + 1))
_ds += np.sum(gammaln(_2 + 1))
_3, _4 = ori_e_r[[mlist[0], mlist[1]]]
_ds += gammaln(_3 + _4 + 1)
_ds -= gammaln(_3 + 1) + gammaln(_4 + 1)
if 0 <= _ds < t:
t = _ds
dS, _mlist = _ds, mlist
return dS, _mlist
# ####################
# GENERATION FUNCTIONS
# ####################
def gen_equal_partition(n, total):
    """gen_equal_partition

    Split ``total`` nodes into ``n`` blocks of (nearly) equal size.

    Parameters
    ----------
    n : ``int``
        Number of blocks.
    total : ``int``
        Number of nodes.

    Returns
    -------
    n_blocks : ``list``
        The block sizes, which sum to ``total``.
    """
all_nodes = np.arange(total)
n_blocks = list(map(len, np.array_split(all_nodes, n)))
return n_blocks
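
# Quick illustration: np.array_split puts the remainder in the leading blocks,
# so splitting 10 nodes into 3 blocks yields sizes [4, 3, 3].
def _example_gen_equal_partition():
    return gen_equal_partition(3, 10)  # -> [4, 3, 3]
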
def gen_unequal_partition(n, total, avg_deg, alpha):
"""gen_unequal_partition
Parameters
----------
n : ``int``
Number of communities.
total : ``int``
Number of nodes.
avg_deg : ``float``
Average degree of the network.
    alpha : ``float``
        The concentration parameter of the Dirichlet distribution (the smaller it is, the more uneven the
        distribution). We usually use ``alpha=1`` to generate a sample and reject it whenever the smallest
        community size falls below ``(2 * total / avg_deg) ** 0.5`` (the resolution limit).
Returns
-------
    a : ``list``
        The community sizes, which sum to ``total``.
    _ : ``float``
        The ratio of the largest community size to the smallest one.
"""
cutoff = (2 * total / avg_deg) ** 0.5
d = np.random.dirichlet([alpha] * n, 1)[0]
while not np.all(d * total > cutoff):
d = np.random.dirichlet([alpha] * n, 1)[0]
a = list(map(int, d * total))
remain_a = int(total - sum(a))
for idx, _ in enumerate(range(remain_a)):
a[idx] += 1
return a, max(a) / min(a)
def gen_e_rs(b, n_edges, p=0):
    """gen_e_rs

    Parameters
    ----------
    b : ``int``
        Number of communities within each type (assuming :math:`K_a = K_b = b`).
n_edges : ``int``
Number of edges planted in the system.
p : ``float``
Edge propensity between groups; i.e., the ratio :math:`c_{out} / c_{in}`.
Returns
-------
e_rs : :class:`numpy.ndarray`
Edge counts matrix.
"""
c = n_edges / (b + (b ** 2 - b) * p)
c_in = c
c_out = c * p
e_rs = np.zeros((b * 2, b * 2), dtype=np.int_)
perm = product(range(0, b), range(b, b * 2))
idx_in = np.linspace(0, b ** 2 - 1, b, dtype=np.int_)
    for idx, pair in enumerate(perm):  # `pair` avoids shadowing the parameter `p`
        i, j = pair
if idx in idx_in:
e_rs[i][j] = c_in
else:
e_rs[i][j] = c_out
e_rs[j][i] = e_rs[i][j]
return e_rs
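
# A hedged sanity sketch (toy parameters of our own choosing): with p = 0,
# every edge sits on one of the b planted in-block pairs, so each pair receives
# n_edges / b edges and the matrix total counts every edge once per endpoint.
def _example_gen_e_rs():
    e_rs = gen_e_rs(2, 1000, p=0)
    assert e_rs.sum() == 2 * 1000
    return e_rs
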
def gen_e_rs_harder(ka, kb, n_edges, samples=1, top_k=1):
"""gen_e_rs_harder
Parameters
----------
ka : ``int``
Number of communities within type-*a*.
kb : ``int``
Number of communities within type-*b*.
n_edges : ``int``
Number of edges planted in the system.
samples : ``int``
Number of random draws made on :math:`e_{rs}`.
    top_k : ``int``
        Number of samples to return: the `top-k` samples with the highest profile likelihood.
Returns
-------
e_rs : :class:`numpy.ndarray` or ``list[numpy.ndarray]`` (when ``top_k > 1``)
Edge counts matrix.
"""
if top_k <= 0:
raise ValueError("Argument `top_k` needs to be a positive integer.")
    e_rs_inst = []
    for sample_id in range(int(samples)):
        c = np.int_(np.random.dirichlet([1] * ka * kb, 1)[0] * n_edges)
        e_rs = np.zeros((ka + kb, ka + kb), dtype=np.int_)
        remain_c = n_edges - np.sum(c, dtype=np.int_)
        for idx, _ in enumerate(range(remain_c)):
            c[idx] += 1
        perm = product(range(0, ka), range(ka, ka + kb))
        for idx, pair in enumerate(perm):
            i, j = pair
            e_rs[i][j] = c[idx]
            e_rs[j][i] = e_rs[i][j]
        # `sample_id` breaks ties so that heapq never falls back to comparing ndarrays
        heapq.heappush(e_rs_inst, (-compute_profile_likelihood_from_e_rs(e_rs), sample_id, e_rs))
    if samples == 1 or top_k == 1:
        return heapq.heappop(e_rs_inst)[2]
    else:
        return [heapq.heappop(e_rs_inst)[2] for _ in range(top_k)]
def gen_e_rs_hard(ka, kb, n_edges, p=0):
    """gen_e_rs_hard

    Parameters
    ----------
    ka : ``int``
        Number of communities within type-*a*.
kb : ``int``
Number of communities within type-*b*.
n_edges : ``int``
Number of edges planted in the system.
p : ``float``
Edge propensity between groups; i.e., the ratio :math:`c_{out} / c_{in}`.
Returns
-------
e_rs : :class:`numpy.ndarray`
Edge counts matrix.
"""
k_max = max(ka, kb)
k_min = min(ka, kb)
if k_max > 2 ** k_min - 1:
raise NotImplementedError
else:
blocks = 0
k_min_ = 1
_cum_comb = 0
cum_comb = comb(k_min, 1)
nonzero_indices = []
for i in range(1, k_max + 1):
if i > cum_comb:
k_min_ += 1
_cum_comb = int(cum_comb)
cum_comb += comb(k_min, k_min_)
blocks += k_min_
for __i in list(combinations(range(k_min), k_min_))[i - _cum_comb - 1]:
nonzero_indices += [(__i, i - 1 + k_min)]
c = n_edges / (blocks + (ka * kb - blocks) * p)
c_in = c
c_out = c * p
e_rs = np.zeros((ka + kb, ka + kb), dtype=np.int_)
perm = product(range(0, ka), range(ka, ka + kb))
    for pair in perm:  # `pair` avoids shadowing the parameter `p`
        i, j = pair
if (i, j) in nonzero_indices:
e_rs[i][j] = c_in
else:
e_rs[i][j] = c_out
e_rs[j][i] = e_rs[i][j]
return e_rs
def gen_equal_bipartite_partition(na, nb, ka, kb):
"""gen_equal_bipartite_partition
Parameters
----------
na : ``int``
Number of nodes in type-*a*.
nb : ``int``
Number of nodes in type-*b*.
ka : ``int``
Number of communities in type-*a*.
kb : ``int``
Number of communities in type-*b*.
Returns
-------
    n : :class:`numpy.ndarray`
        The membership (partition) array of all ``na + nb`` nodes.
    """
n_blocks = map(int, gen_equal_partition(ka, na) + gen_equal_partition(kb, nb))
n = []
for idx, i in enumerate(n_blocks):
n += [idx] * i
return np.array(n, dtype=np.int_)
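
# Illustration: 4 type-a nodes in ka = 2 blocks followed by 4 type-b nodes in
# kb = 2 blocks yield the membership array [0, 0, 1, 1, 2, 2, 3, 3].
def _example_gen_equal_bipartite_partition():
    return gen_equal_bipartite_partition(4, 4, 2, 2)
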
@njit(cache=True)
def gen_bicliques_edgelist(b, num_nodes):
"""Generate an array of edgelist and node-type mapping for a group of bi-cliques.
Parameters
----------
b : ``int``
Number of bi-cliques.
num_nodes : ``int``
Number of nodes (size) for each bi-clique.
Returns
-------
el : :class:`numpy.ndarray`
The edgelist of a group of bi-cliques.
    types : :class:`numpy.ndarray`
        The types-array that maps each node id to a bipartite type (``1`` or ``2``).
"""
total_num_nodes = b * num_nodes
types = np.zeros(total_num_nodes, dtype=np.int_)
num_edges_each_clique = int(num_nodes ** 2 / 4)
el = np.zeros((num_edges_each_clique * b, 2), dtype=np.int_)
idx = 0
base = 0
for _b in range(0, b):
for i in range(0, int(num_nodes / 2)):
types[i + base] = 1
for j in range(int(num_nodes / 2), num_nodes):
types[j + base] = 2
el[idx] = [i + base, j + base]
idx += 1
base = (_b + 1) * num_nodes
return el, types
def assemble_old2new_mapping(types):
    """Create a mapping from old node ids to new ones, such that the types-array becomes sorted.
Parameters
----------
types : :class:`numpy.ndarray`
Returns
-------
old2new : ``dict``
Dictionary that maps the old node index to a new one.
new2old : ``dict``
Dictionary that maps the new node index to the old one; i.e., a reverse mapping of ``old2new``.
    new_types : :class:`numpy.ndarray`
        The new types-array, which is sorted and directly applicable to :class:`det_k_bisbm.OptimalKs`.
"""
old2new = dict()
new_types = np.zeros(len(types), dtype=np.int_)
new_id = 0
for _id, t in enumerate(types):
if t == 1:
old2new[_id] = new_id
new_types[new_id] = 1
new_id += 1
for _id, t in enumerate(types):
if t == 2:
old2new[_id] = new_id
new_types[new_id] = 2
new_id += 1
new2old = {value: key for key, value in old2new.items()}
return old2new, new2old, new_types
# ##################
# ASSEMBLE FUNCTIONS
# ##################
def assemble_edgelist_old2new(edgelist, old2new):
"""Assemble the new edgelist array via an old2new mapping.
Parameters
----------
edgelist : :class:`numpy.ndarray`
old2new : ``dict``
Dictionary that maps the old node index to a new one.
Returns
-------
    el : :class:`numpy.ndarray`
        The new edgelist of a group of bi-cliques (directly usable with :class:`det_k_bisbm.OptimalKs`).
"""
el = np.zeros((len(edgelist), 2), dtype=np.int_)
for _id, _ in enumerate(edgelist):
el[_id][0] = old2new[_[0]]
el[_id][1] = old2new[_[1]]
return el
@njit(cache=True)
def assemble_mb_new2old(mb, new2old):
"""Assemble the partition that corresponds to the old space of node indices.
Parameters
----------
mb : :class:`numpy.ndarray`
Partition :math:`b` of nodes into blocks.
new2old : ``dict``
Dictionary that maps the new node index to the old one; i.e., a reverse mapping of ``old2new``.
Returns
-------
old_mb : :class:`numpy.ndarray`
The partition that corresponds to the old space of node indices.
"""
old_mb = np.zeros(len(mb), dtype=np.int_)
for _id, _ in enumerate(mb):
old_mb[new2old[_id]] = _
return old_mb
@njit(cache=True)
def assemble_n_r_from_mb(mb):
"""Get :math:`n_r`, i.e., the number of nodes in each group, from the partition :math:`b`.
Parameters
----------
mb : ``iterable`` or :class:`numpy.ndarray`
Partition :math:`b` of nodes into blocks.
Returns
-------
n_r : :class:`numpy.ndarray`
"""
    n_r = np.zeros(np.max(mb) + 1, dtype=np.int_)  # allocate as integers up front
    for block_id in mb:
        n_r[block_id] += 1
    return n_r
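
# Illustration: three nodes in block 0 and two nodes in block 1.
def _example_assemble_n_r_from_mb():
    return assemble_n_r_from_mb(np.array([0, 0, 0, 1, 1]))  # -> array([3, 2])
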
@njit(cache=True)
def assemble_n_k_from_edgelist(edgelist, mb):
"""Get :math:`n_k`, i.e., the number :math:`n_k` of nodes of degree :math:`k`.
Parameters
----------
edgelist : :class:`numpy.ndarray`
List of edge tuples.
mb : :class:`numpy.ndarray`
Partition :math:`b` of nodes into blocks.
Returns
-------
n_k : :class:`numpy.ndarray`
Array of the number of nodes of degree :math:`k`.
"""
    k = np.zeros(len(mb), dtype=np.int_)  # one degree counter per node
for idx in range(len(edgelist)):
k[edgelist[idx][0]] += 1
k[edgelist[idx][1]] += 1
max_ = np.int_(np.max(k))
n_k = np.zeros(max_ + 1, dtype=np.int_)
for k_ in k:
n_k[k_] += 1
return n_k
def assemble_e_rs_from_mb(edgelist, mb):
"""Get :math:`e_{rs}`, i.e., the matrix of edge counts between blocks.
Parameters
----------
edgelist : :class:`numpy.ndarray`
List of edge tuples.
mb : :class:`numpy.ndarray`
Partition :math:`b` of nodes into blocks.
Returns
-------
e_rs : :class:`numpy.ndarray`
Edge count matrix :math:`e_{rs}`.
"""
sources, targets = zip(*edgelist)
sources = [mb[node] for node in sources]
targets = [mb[node] for node in targets]
data = np.ones(len(sources + targets), dtype=np.int_)
shape = int(np.max(mb) + 1)
e_rs = coo_matrix((data, (sources + targets, targets + sources)), shape=(shape, shape))
return e_rs.toarray()
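
# A hedged sanity sketch (toy graph of our own making): two edges from a
# block-0 node to block-1 nodes contribute two counts to e_01 and two to e_10.
def _example_assemble_e_rs_from_mb():
    edgelist = np.array([[0, 1], [0, 2]])
    mb = np.array([0, 1, 1])
    return assemble_e_rs_from_mb(edgelist, mb)  # -> [[0, 2], [2, 0]]
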
@njit(cache=True)
def assemble_eta_rk_from_edgelist_and_mb(edgelist, mb):
    r"""Get :math:`\eta_{rk}`, i.e., the number of nodes of degree :math:`k` that belong to group :math:`r`.
Parameters
----------
edgelist : :class:`numpy.ndarray`
mb : :class:`numpy.ndarray`
Partition :math:`b` of nodes into blocks.
Returns
-------
eta_rk : :class:`numpy.ndarray`
"""
mb_max_ = np.int_(np.max(mb))
k = np.zeros((mb_max_ + 1, len(mb)), dtype=np.int_)
for idx in range(len(edgelist)):
k[mb[edgelist[idx][0]]][edgelist[idx][0]] += 1
k[mb[edgelist[idx][1]]][edgelist[idx][1]] += 1
max_array = np.empty(k.shape[0], dtype=np.int_)
for _k_idx in range(len(k)):
max_array[_k_idx] = np.max(k[_k_idx])
max_ = np.max(max_array)
eta_rk = np.zeros((mb_max_ + 1, max_ + 1), dtype=np.int_)
for mb_ in range(mb_max_ + 1):
for node_idx, k_ in enumerate(k[mb_]):
if mb[node_idx] == mb_:
eta_rk[mb_][k_] += 1
return eta_rk
def compute_profile_likelihood(edgelist, mb, ka=None, kb=None, k=None):
    """compute_profile_likelihood

    Compute the profile likelihood of a partition directly from the edgelist.
Parameters
----------
edgelist : :class:`numpy.ndarray`
mb : :class:`numpy.ndarray`
Partition :math:`b` of nodes into blocks.
ka : ``int``
Number of communities in type-*a*.
kb : ``int``
Number of communities in type-*b*.
k : ``int``
Total number of communities.
Returns
-------
italic_i : ``float``
"""
# First, let's compute the m_e_rs from the edgelist and mb
m_e_rs = np.zeros((max(mb) + 1, max(mb) + 1))
for i in edgelist:
# Please do check the index convention of the edgelist
source_group = int(mb[int(i[0])])
target_group = int(mb[int(i[1])])
        if ka is not None or kb is not None:
            if source_group == target_group:
                raise ValueError("This is not a bipartite network!")
m_e_rs[source_group][target_group] += 1
m_e_rs[target_group][source_group] += 1
# then, we compute the profile likelihood from the m_e_rs
italic_i = 0.
m_e_r = np.sum(m_e_rs, axis=1)
num_edges = m_e_r.sum() / 2.
for ind, e_val in enumerate(np.nditer(m_e_rs)):
ind_i = int(np.floor(ind / (m_e_rs.shape[0])))
ind_j = ind % (m_e_rs.shape[0])
if e_val != 0.0:
italic_i += e_val / 2. / num_edges * np.log(
e_val / m_e_r[ind_i] / m_e_r[ind_j] * 2 * num_edges
)
if ka is not None or kb is not None:
assert m_e_rs.shape[0] == ka + kb, "[ERROR] m_e_rs dimension (={}) is not equal to ka (={}) + kb (={})!".format(
m_e_rs.shape[0], ka, kb
)
elif k is not None:
assert m_e_rs.shape[0] == k, "[ERROR] m_e_rs dimension (={}) is not equal to k (={})!".format(
m_e_rs.shape[0], k
)
return italic_i
@njit(cache=True)
def compute_profile_likelihood_from_e_rs(e_rs):
    """compute_profile_likelihood_from_e_rs

    Compute the profile likelihood directly from the edge count matrix :math:`e_{rs}`.
Parameters
----------
e_rs : :class:`numpy.ndarray`
Returns
-------
italic_i : ``float``
"""
italic_i = 0.
e_r = np.sum(e_rs, axis=1)
num_edges = e_r.sum() / 2.
for ind, e_val in enumerate(np.nditer(e_rs)):
ind_i = np.int_(np.floor(ind / (e_rs.shape[0])))
ind_j = ind % (e_rs.shape[0])
if e_val != 0.0:
italic_i += e_val / 2. / num_edges * np.log(
e_val / e_r[ind_i] / e_r[ind_j] * 2 * num_edges
)
return italic_i
def get_desc_len_from_data(na, nb, n_edges, ka, kb, edgelist, mb, diff=False, nr=None, allow_empty=False,
                           degree_dl_kind="distributed", q_cache=np.array([], ndmin=2), is_bipartite=True):
    """Compute the description length of a bipartite network given a partition, or, optionally, its difference
    to a randomized instance.
Parameters
----------
na : ``int``
Number of nodes in type-*a*.
nb : ``int``
Number of nodes in type-*b*.
n_edges : ``int``
Number of edges.
ka : ``int``
Number of communities in type-*a*.
kb : ``int``
Number of communities in type-*b*.
    edgelist : :class:`numpy.ndarray`
        List of edge tuples.
mb : :class:`numpy.ndarray`
Partition :math:`b` of nodes into blocks.
    diff : ``bool``
        If ``True``, the returned value is the difference of the description length to that of a random
        bipartite network. Otherwise, it is the entropy (a.k.a. negative log-likelihood) associated with the
        current block partition.
    nr : :class:`numpy.ndarray`
        Vertex property array of the block graph which contains the block sizes.
    allow_empty : ``bool``
        If ``True``, the partition description length allows for empty groups.
    degree_dl_kind : ``str`` (optional, default: ``"distributed"``)
        1. ``degree_dl_kind == "uniform"``
        2. ``degree_dl_kind == "distributed"`` (default)
        3. ``degree_dl_kind == "entropy"``
    is_bipartite : ``bool`` (optional, default: ``True``)
        If ``False``, the edge counts description length assumes a purely flat :math:`e_{rs}`.
Returns
-------
    desc_len : ``float``
        The description length in nats; when ``diff`` is ``True``, the difference of the description length to
        the bipartite ER network, per edge.
"""
desc_len = 0.
# finally, we compute the description length
if diff: # todo: add more options to it; now, only uniform prior for P(e) is included.
italic_i = compute_profile_likelihood(edgelist, mb, ka=ka, kb=kb)
desc_len += (na * np.log(ka) + nb * np.log(kb) - n_edges * (italic_i - np.log(2))) / n_edges
x = float(ka * kb) / n_edges
desc_len += (1 + x) * np.log(1 + x) - x * np.log(x)
desc_len -= (1 + 1 / n_edges) * np.log(1 + 1 / n_edges) - (1 / n_edges) * np.log(1 / n_edges)
else:
desc_len += adjacency_entropy(edgelist, mb)
desc_len += model_entropy(n_edges, ka=ka, kb=kb, na=na, nb=nb, nr=nr, allow_empty=allow_empty,
is_bipartite=is_bipartite)
desc_len += degree_entropy(edgelist, mb, __q_cache=q_cache, degree_dl_kind=degree_dl_kind)
    return float(desc_len)
def get_desc_len_from_data_uni(n, n_edges, k, edgelist, mb):
"""Description length difference to a randomized instance, via PRL 110, 148701 (2013).
Parameters
----------
n : ``int``
Number of nodes.
n_edges : ``int``
Number of edges.
k : ``int``
Number of communities.
edgelist : :class:`numpy.ndarray`
A list of edges.
mb : :class:`numpy.ndarray`
Partition :math:`b` of nodes into blocks.
Returns
-------
desc_len_b : ``float``
Difference of the description length to the ER network, per edge.
"""
italic_i = compute_profile_likelihood(edgelist, mb, k=k)
# finally, we compute the description length
desc_len_b = (n * np.log(k) - n_edges * italic_i) / n_edges
x = float(k * (k + 1)) / 2. / n_edges
desc_len_b += (1 + x) * np.log(1 + x) - x * np.log(x)
desc_len_b -= (1 + 1 / n_edges) * np.log(1 + 1 / n_edges) - (1 / n_edges) * np.log(1 / n_edges)
return desc_len_b
@njit(cache=True, fastmath=True)
def accept_mb_merge(mb, mlist):
"""accept_mb_merge
Accept partition merge.
Parameters
----------
mb : ``iterable`` or :class:`numpy.ndarray`
The partition to be merged.
mlist : ``iterable`` or :class:`numpy.ndarray`
The two block labels to be merged.
Returns
-------
_mb : :class:`numpy.ndarray`
The merged partition.
"""
_mb = np.zeros(mb.size, dtype=np.int_)
mlist.sort()
for _node_id, _g in enumerate(mb):
if _g == mlist[1]:
_mb[_node_id] = mlist[0]
elif _g < mlist[1]:
_mb[_node_id] = _g
else:
_mb[_node_id] = _g - 1
return _mb
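
# Illustration: merging blocks 1 and 2 relabels every 2 as 1 and shifts labels
# above 2 down by one, keeping the label range contiguous.
def _example_accept_mb_merge():
    return accept_mb_merge(np.array([0, 1, 2, 3]), np.array([1, 2]))  # -> array([0, 1, 1, 2])
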
# ###############
# Parallelization
# ###############
def loky_executor(max_workers, timeout, func, feeds):
    """Map ``func`` over ``feeds`` in parallel, using a reusable loky executor."""
    assert type(feeds) is list, "[ERROR] feeds should be a Python list; here it is {}".format(str(type(feeds)))
    executor = get_reusable_executor(max_workers=int(max_workers), timeout=int(timeout))
    results = executor.map(func, feeds)
    return results
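
# A hedged usage sketch: fan a pure function out over a list of inputs. The
# lambda works because loky serializes callables with cloudpickle; the executor
# is reused across calls, so repeated invocations stay cheap.
def _example_loky_executor():
    squares = loky_executor(4, 600, lambda x: x ** 2, list(range(10)))
    return list(squares)
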
# ##########################
# Requires graph-tool to run
# ##########################
def get_flat_entropies(state):
"""get_flat_entropies
Parameters
----------
state : :class:`graph_tool.inference.blockmodel.BlockState`
The stochastic block model state of a given graph, as defined in Graph-tool.
Returns
-------
dl : ``dict``
The entropic report of the partition.
"""
na = sum(state.pclabel.a == 0)
dl = dict()
dl["mdl"] = state.entropy()
dl["ka"] = len(set(state.b.a.tolist()[:na]))
dl["kb"] = len(set(state.b.a.tolist()[na:]))
dl["adjacency"] = state.entropy(adjacency=1) - state.entropy(adjacency=0)
dl["partition"] = state.entropy(partition_dl=1) - state.entropy(partition_dl=0)
dl["degree"] = state.entropy(degree_dl=1) - state.entropy(degree_dl=0)
dl["edges"] = state.entropy(edges_dl=1) - state.entropy(edges_dl=0)
return dl
def get_nested_entropies(state):
    """get_nested_entropies

    Parameters
    ----------
    state : :class:`graph_tool.inference.nested_blockmodel.NestedBlockState`
        The nested stochastic block model state of a given graph, as defined in Graph-tool.

    Returns
    -------
    dl : ``dict``
        The entropic report of the nested partition.
    """
na = sum(state.levels[0].pclabel.a == 0)
dl = dict()
dl["mdl"] = state.entropy()
dl["ka"] = len(set(state.levels[0].b.a.tolist()[:na]))
dl["kb"] = len(set(state.levels[0].b.a.tolist()[na:]))
dl["partition"] = state.levels[0].entropy(partition_dl=1) - state.levels[0].entropy(partition_dl=0)
dl["edges"] = state.levels[0].entropy(edges_dl=1) - state.levels[0].entropy(edges_dl=0)
dl["degree"] = state.levels[0].entropy(degree_dl=1) - state.levels[0].entropy(degree_dl=0)
dl["adjacency"] = state.levels[0].entropy(adjacency=1) - state.levels[0].entropy(adjacency=0)
multigraph_dls = []
for i in range(1, len(state.levels)):
adj = state.levels[i].entropy(adjacency=1, multigraph=1, partition_dl=0, degree_dl=0, edges_dl=0, dense=1)
partition = state.levels[i].entropy(adjacency=0, multigraph=1, partition_dl=1, degree_dl=0, edges_dl=0, dense=1)
if i != len(state.levels) - 1:
edges = 0.
else:
edges = state.levels[i].entropy(adjacency=0, multigraph=1, partition_dl=0, degree_dl=0, edges_dl=1, dense=1)
degree = state.levels[i].entropy(adjacency=0, multigraph=1, partition_dl=0, degree_dl=1, edges_dl=0, dense=1)
total = adj + partition + edges + degree
multigraph_dls += [{
"sum": total,
"adjacency": adj,
"partition": partition,
"edges": edges,
"degree": degree
}]
dl["intermediate_dls"] = multigraph_dls
dl["edge_dl_nested"] = sum(map(lambda x: x["sum"], multigraph_dls))
return dl