import logging
import os
import random
import tempfile
from collections import OrderedDict
from itertools import product

import numpy as np

from biSBM.utils import *  # noqa: F401,F403 -- entropy/assembly helpers used throughout
class OptimalKs(object):
"""Base class for OptimalKs.
Parameters
----------
engine : :class:`engine` (required)
The inference engine class.
edgelist : ``iterable`` or :class:`numpy.ndarray`, required
Edgelist (bipartite network) for model selection.
types : ``iterable`` or :class:`numpy.ndarray`, required
Types of each node specifying the type membership.
verbose : ``bool`` (optional, default: ``False``)
        Whether to output progress information. If ``True``, the logging level is set to ``INFO``.
    default_args : ``bool`` (optional, default: ``True``)
        Whether to initiate the heuristic with default arguments. If ``False``, you must call :func:`set_params`,
        :func:`set_adaptive_ratio`, :func:`set_k_th_neighbor_to_search`, and :func:`set_nm` to set the
        parameters yourself.
    random_init_k : ``bool`` (optional, default: ``False``)
        Whether to start from a random :math:`(K_a, K_b)`, drawn uniformly between 1 and the default initial values.
    bipartite_prior : ``bool`` (optional, default: ``True``)
bipartite_prior : ``bool`` (optional, default: ``True``)
        Whether to use the bipartite prior for edge counts. If ``False``, the algorithm does not assume a
        bipartite structure at the level of the :math:`e_{rs}` matrix. This matches the behavior of
        :func:`graph_tool.inference.minimize.minimize_blockmodel_dl`, even when its ``state_args`` is
        customized to be bipartite.
tempdir : ``str`` (optional, default: ``None``)
        The directory for generated temporary network data, which is removed as soon as the class is deleted.
        If ``tempdir`` is unset or ``None``, we search a standard list of directories and set ``tempdir`` to the
        first one in which the calling user can create files, as :func:`tempfile.gettempdir` dictates. The file
        is then passed to the inference :mod:`engines`.
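
    Examples
    --------
    A minimal usage sketch; the engine construction below is an illustrative
    assumption, not a fixed API (see the inference :mod:`engines` for what ships):

    >>> from biSBM.engines import MCMC  # assumed import path
    >>> mcmc = MCMC()  # hypothetical default construction
    >>> oks = OptimalKs(mcmc, edgelist, types)  # user-prepared `edgelist` and `types`
    >>> oks.minimize_bisbm_dl()  # doctest: +SKIP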
"""
def __init__(self,
engine,
edgelist,
types,
verbose=False,
default_args=True,
random_init_k=False,
bipartite_prior=True,
tempdir=None):
self.engine_ = engine.engine # TODO: check that engine is an object
self.max_n_sweeps_ = engine.MAX_NUM_SWEEPS
self.is_par_ = engine.PARALLELIZATION
self.n_cores_ = engine.NUM_CORES
self.algm_name_ = engine.ALGM_NAME
self._virgin_run = True
self.bm_state = dict()
self.bm_state["types"] = types # TODO: "types" is only used to compute na and nb. Can be made more generic.
self.bm_state["n_a"] = 0
self.bm_state["n_b"] = 0
self.bm_state["n"] = 0
self.bm_state["ref_dl"] = 0
self.bm_state["e_rs"] = None
self.bm_state["mb"] = list()
for _type in types:
self.bm_state["n"] += 1
if _type in ["1", 1]:
self.bm_state["n_a"] += 1
elif _type in ["2", 2]:
self.bm_state["n_b"] += 1
self.edgelist = np.array(edgelist, dtype=np.uint64)
self.bm_state["e"] = len(self.edgelist)
self.i_0s = []
if engine.ALGM_NAME == "mcmc" and default_args:
engine.set_steps(self.bm_state["n"] * 1e5)
engine.set_await_steps(self.bm_state["n"] * 2e3)
engine.set_cooling("abrupt_cool")
engine.set_cooling_param_1(self.bm_state["n"] * 1e3)
engine.set_epsilon(1.)
if default_args:
self.bm_state["ka"] = int(self.bm_state["e"] ** 0.5 / 2)
self.bm_state["kb"] = int(self.bm_state["e"] ** 0.5 / 2)
self.i_0 = 1.
            self.adaptive_ratio = 0.9  # multiplier that shrinks i_0 whenever the heuristic overshoots
self._k_th_nb_to_search = 2
self._nm = 10
self._c = 3.
else:
self.bm_state["ka"] = self.bm_state["kb"] = self.i_0 = \
self.adaptive_ratio = self._k_th_nb_to_search = self._nm = None
        if random_init_k:
            # Note: requires the ka/kb bounds set by the default arguments; with default_args=False
            # these are still None here and the randint calls below would fail.
            self.bm_state["ka"] = np.random.randint(1, self.bm_state["ka"] + 1)
            self.bm_state["kb"] = np.random.randint(1, self.bm_state["kb"] + 1)
# Description Length (dl), e_rs matrix, and partition (mb) are book-kept
self.bookkeeping_dl = OrderedDict()
self.bookkeeping_e_rs = OrderedDict()
self.bookkeeping_mb = dict()
self.bookkeeping_mb["mcmc"] = OrderedDict()
self.bookkeeping_mb["merge"] = OrderedDict()
self.bookkeeping_mb["order"] = OrderedDict()
self.trace_k = [] # only for painter.paint_trace
# for debug/temp variables
self.tempdir = tempdir
self.__del__no_call = True
if self.is_par_:
# To prevent "TypeError: cannot serialize '_io.TextIOWrapper' object" when using loky
self.f_edgelist = tempfile.NamedTemporaryFile(mode='w+b', dir=tempdir, delete=False)
else:
self.f_edgelist = tempfile.NamedTemporaryFile(mode='w+b', dir=tempdir, delete=True)
self._f_edgelist_name = self._get_tempfile_edgelist()
# logging
        _logging_level = "INFO" if verbose else "WARNING"
self._set_logging_level(_logging_level)
self._summary = dict()
self._summary["algm_args"] = {}
self._summary["algm_args"]["init_ka"] = self.bm_state["ka"]
self._summary["algm_args"]["init_kb"] = self.bm_state["kb"]
self._summary["na"] = self.bm_state["n_a"]
self._summary["nb"] = self.bm_state["n_b"]
self._summary["e"] = self.bm_state["e"]
self._summary["avg_k"] = 2 * self.bm_state["e"] / (self.bm_state["n_a"] + self.bm_state["n_b"])
# look-up tables
        self.__q_cache_max_e_r = min(self.bm_state["e"], int(1e4))  # cap the look-up table at 1e4 edges
self.__q_cache = init_q_cache(self.__q_cache_max_e_r)
self.bipartite_prior_ = bipartite_prior
    def minimize_bisbm_dl(self, bipartite_prior=True):
"""Fit the bipartite stochastic block model, by minimizing its description length using an agglomerative
heuristic.
Parameters
----------
        bipartite_prior : ``bool`` (optional, default: ``True``)
            Whether to use the bipartite prior for edge counts (see the class-level parameter of the same name).
Returns
-------
        OptimalKs.bookkeeping_dl : :py:class:`collections.OrderedDict`
            The description lengths (in nats) recorded during the search, keyed by :math:`(K_a, K_b)`.
References
----------
.. [yen-bipartite-2019] Tzu-Chi Yen and Daniel B. Larremore, "Blockmodeling a Bipartite Network with Bipartite Priors", in preparation.
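
        Examples
        --------
        A sketch of reading off the optimal point from the returned bookkeeping
        (``oks`` is assumed to be a constructed instance):

        >>> dls = oks.minimize_bisbm_dl()  # doctest: +SKIP
        >>> ka, kb = min(dls, key=dls.get)  # the (ka, kb) with minimal description length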
"""
self.bipartite_prior_ = bipartite_prior
self._prerunning_checks()
self._compute_dl_and_update(1, 1)
if self.algm_name_ == "mcmc" and self._virgin_run:
self._natural_merge()
if self._check_if_local_minimum(self.bm_state["ka"], self.bm_state["kb"]):
self.trace_k += [("mdl", self.bm_state["ka"], self.bm_state["kb"])]
return self.bookkeeping_dl
else:
dS = 0.
while abs(dS) < self.i_0 * self.bm_state["ref_dl"]:
ka, kb = self.bm_state["ka"], self.bm_state["kb"]
self.trace_k += [("merge_or_rollback", ka, kb)]
if ka * kb != 1:
ka_, kb_, dS, mlist = self._merge_e_rs(ka, kb)
if self._determine_i_0(dS):
ka__, kb__, _ = self.summary(mode="simple")
                    self._logger.info(
                        f"Tried {(ka, kb)} ~~-> {(ka_, kb_)}, "
                        f"but the dS at {(ka_, kb_)} deviates too much from the best DL at {(ka__, kb__)}; "
                        f"dS = {dS}."
                    )
break
mb_ = accept_mb_merge(self.bm_state["mb"], mlist)
e_rs = assemble_e_rs_from_mb(self.edgelist, mb_)
self._update_bm_state(ka_, kb_, e_rs, mb_, record_merge=True)
self._logger.info(f"{(ka, kb)} ~~-> {(ka_, kb_)}")
else:
break
ka, kb = self.bm_state["ka"], self.bm_state["kb"]
self.trace_k += [("escape_to", ka, kb)]
self._logger.info(f"Escape the loop of agglomerative merges. Now {(ka, kb)} looks suspicious.")
return self.minimize_bisbm_dl(bipartite_prior=self.bipartite_prior_)
    def summary(self, mode=None):
"""Return a summary of the algorithmic outcome.
Returns
-------
OptimalKs._summary : ``dict``
A summary of the algorithmic outcome with minimal description length (in nats).
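
        Examples
        --------
        Illustrative output shape (values are not real):

        >>> oks.summary(mode="simple")  # doctest: +SKIP
        (2, 3, 12345.6)  # (ka, kb, minimal description length in nats)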
"""
        ka, kb = min(self.bookkeeping_dl, key=self.bookkeeping_dl.get)
self._summary["mdl"] = self.bookkeeping_dl[(ka, kb)]
if mode == "simple":
return ka, kb, self._summary["mdl"]
self._summary["ka"] = ka
self._summary["kb"] = kb
self._summary["dl"] = self.summary_dl(ka, kb)
del self._summary["dl"]["dl"]
return self._summary
    def summary_dl(self, ka, kb):
        """Decompose the description length at :math:`(K_a, K_b)` into its adjacency, partition, degree,
        and edge-count terms (in nats)."""
_summary = dict()
na, nb, e = self.bm_state["n_a"], self.bm_state["n_b"], self.bm_state["e"]
try:
mb = self.bookkeeping_mb["mcmc"][(ka, kb)]
except KeyError:
raise KeyError(f"Did you compute the partition at {(ka, kb)}?")
nr = assemble_n_r_from_mb(mb)
_summary["adjacency"] = float(adjacency_entropy(self.edgelist, mb))
_summary["partition"] = float(partition_entropy(ka=ka, kb=kb, na=na, nb=nb, nr=nr))
_summary["degree"] = float(degree_entropy(self.edgelist, mb, __q_cache=self.__q_cache))
_summary["edges"] = float(
model_entropy(e, ka=ka, kb=kb, na=na, nb=nb, nr=nr, is_bipartite=self.bipartite_prior_) -
_summary["partition"])
_summary["dl"] = sum(_summary.values())
return _summary
    def compute_and_update(self, ka, kb, recompute=True):
"""Infer the partitions at a specific :math:`(K_a, K_b)` and then update the base class.
Parameters
----------
ka : ``int``
Number of type-`a` communities that we want to partition.
kb : ``int``
Number of type-`b` communities that we want to partition.
        recompute : ``bool`` (optional, default: ``True``)
            If ``True``, discard any cached description length at :math:`(K_a, K_b)` and re-run the engine.
"""
if recompute:
self.bookkeeping_dl[(ka, kb)] = 0
self._compute_dl_and_update(ka, kb, recompute=recompute)
    def compute_dl(self, ka, kb, recompute=False):
        """Execute the partitioning code by spawning child processes in the shell and save its output afterwards.
Parameters
----------
ka : ``int``
Number of type-`a` communities that we want to partition.
kb : ``int``
Number of type-`b` communities that we want to partition.
recompute : ``bool`` (optional, default: ``False``)
TODO.
Returns
-------
dl : ``float``
The description length of the partition found.
        e_rs : :class:`numpy.ndarray`
            The affinity matrix assembled from the group membership vector found by the partitioning engine.
        mb : ``list[int]``
            The group membership vector calculated by the partitioning engine.
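
        Examples
        --------
        A sketch of a direct call (``oks`` assumed constructed; results are cached):

        >>> dl, e_rs, mb = oks.compute_dl(2, 2)  # doctest: +SKIP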
"""
        # Each time we calculate/search at a particular (ka, kb),
        # the relevant information is recorded for reuse.
        key = (ka, kb)
        if self.bookkeeping_dl.get(key, 0) > 0:
            return self.bookkeeping_dl[key], self.bookkeeping_e_rs[key], self.bookkeeping_mb["mcmc"][key]
na, nb, e = self.bm_state["n_a"], self.bm_state["n_b"], self.bm_state["e"]
if ka == 1 and kb == 1:
mb = np.array([0] * na + [1] * nb, dtype=np.int_)
res = self._compute_desc_len(na, nb, e, ka, kb, mb)
return res[0], res[1], res[2]
        if not recompute:
            ka_ = self._summary["algm_args"]["init_ka"]
            kb_ = self._summary["algm_args"]["init_kb"]
            dist = np.sqrt((ka_ - ka) ** 2 + (kb_ - kb) ** 2)
            if dist <= self._k_th_nb_to_search * np.sqrt(2):
                self._logger.info(f"({ka}, {kb}) is near ({ka_}, {kb_}); start MCMC@({ka}, {kb}) from scratch.")
                _mb = None
            else:
                self._logger.info(f"({ka_}, {kb_}) ~~-> ({ka}, {kb}); use that partition to start MCMC@({ka}, {kb}).")
                _mb = self.bookkeeping_mb["mcmc"][(ka_, kb_)]
else:
_mb = None
def run(a, b):
return self.engine_(self._f_edgelist_name, na, nb, a, b, mb=_mb)
# Calculate the biSBM inference several times,
# choose the maximum likelihood (or minimum entropy) result.
results = []
if self.is_par_:
# automatically shutdown after idling for 600s
self.__del__no_call = True
results = list(loky_executor(self.n_cores_, 600, lambda x: run(ka, kb), list(range(self.max_n_sweeps_))))
self.__del__no_call = False
else:
for _ in range(self.max_n_sweeps_):
results += [run(ka, kb)]
result_ = [self._compute_desc_len(na, nb, e, ka, kb, r) for r in results]
result = min(result_, key=lambda x: x[0])
        dl, e_rs, mb = result[:3]
return dl, e_rs, mb
    def natural_merge(self):
        """Phase 1: natural agglomerative merge of the :math:`e_{rs}` blocks, driven by the engine."""
na, nb, e = self.bm_state["n_a"], self.bm_state["n_b"], self.bm_state["e"]
def run(_):
# Note: setting (ka, kb) = (1, 1) is redundant.
return self.engine_(self._f_edgelist_name, na, nb, 1, 1, mb=None, method="natural")
results = []
if self.is_par_:
# automatically shutdown after idling for 600s
self.__del__no_call = True
results = list(loky_executor(self.n_cores_, 600, lambda x: run(0), list(range(self.max_n_sweeps_))))
self.__del__no_call = False
else:
for _ in range(self.max_n_sweeps_):
results += [run(0)]
result_ = [self._compute_desc_len(na, nb, e, r[0], r[1], r[2:]) for r in results]
result = min(result_, key=lambda x: x[0])
        dl, e_rs, mb, (ka, kb) = result
return dl, e_rs, mb, ka, kb, na, nb
def _natural_merge(self):
dl, e_rs, mb, ka, kb, na, nb = self.natural_merge()
assert max(mb) + 1 == ka + kb, "[ERROR] inconsistency between mb. indexes and #blocks. {} != {}".format(
max(mb) + 1, ka + kb)
self._summary["algm_args"]["init_ka"] = ka
self._summary["algm_args"]["init_kb"] = kb
self._logger.info(f"Natural agglomerative merge {(na, nb)} ~~-> {(ka, kb)}.")
self.bookkeeping_dl[(ka, kb)] = dl
self.bookkeeping_e_rs[(ka, kb)] = e_rs
self.bookkeeping_mb["mcmc"][(ka, kb)] = mb
self._set_bookkeeping_mb_search_order(ka, kb)
self._update_bm_state(ka, kb, e_rs, mb)
self._virgin_run = False
    def _determine_i_0(self, dS):
        if self.i_0 < 1:  # i_0 was set manually (via set_params); skip the adaptive determination
            return False
i_0 = dS / self.bm_state["ref_dl"]
self.i_0s += [i_0]
iqr = np.percentile(self.i_0s, 75) - np.percentile(self.i_0s, 25)
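        # Tukey-style outlier rule (interpretive comment): adopt dS / ref_dl as the new i_0
        # only when it exceeds Q3 + c * IQR of the ratios seen so far, and that bound is
        # itself at least 1e-4 (the chained comparison below checks both conditions).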
if i_0 > self._c * iqr + np.percentile(self.i_0s, 75) >= 1e-4:
self.i_0 = i_0
self._summary["algm_args"]["i_0"] = i_0
self._logger.info(f"Determining \u0394 at {i_0}.") # \u0394 = Delta = i_0
return True
else:
return False
def _compute_desc_len(self, n_a, n_b, e, ka, kb, mb):
e_rs = assemble_e_rs_from_mb(self.edgelist, mb)
nr = assemble_n_r_from_mb(mb)
desc_len = get_desc_len_from_data(n_a, n_b, e, ka, kb, self.edgelist, mb, nr=nr, q_cache=self.__q_cache,
is_bipartite=self.bipartite_prior_)
return desc_len, e_rs, mb, (ka, kb)
def _merge_e_rs(self, ka, kb):
"""Apply multiple merges of the original affinity matrix, return the one that least alters the entropy
Parameters
----------
ka : ``int``
Number of type-a communities in the affinity matrix
kb : ``int``
Number of type-b communities in the affinity matrix
Returns
-------
_ka : ``int``
New number of type-a communities in the affinity matrix
_kb : ``int``
New number of type-b communities in the affinity matrix
        dS : ``float``
            Difference between the new entropy and the old one.
_mlist : ``list(int, int)``
The two row-indexes of the original affinity matrix that were finally chosen (and merged)
"""
m = np.arange(ka + kb)
mlists = set()
while len(mlists) == 0:
for _m in m:
pool = random.choices(m, k=self._nm)
_mlist = [[min(x, _m), max(x, _m)] for x in pool]
                for pair in _mlist:
                    # Exclude self-merges, cross-type merges, and merges that would
                    # eliminate the last remaining block of either type.
                    cond = (pair[0] != pair[1]) and not (pair[1] >= ka > pair[0]) and not (
                            pair[0] == 0 and ka == 1) and not (pair[0] == ka and kb == 1)
                    if cond:
                        mlists.add(f"{pair[0]}+{pair[1]}")
dS, _mlist = virtual_moves_ds(self.bm_state["e_rs"], mlists, self.bm_state["ka"])
if np.max(_mlist) < self.bm_state["ka"]:
ka = self.bm_state["ka"] - 1
kb = self.bm_state["kb"]
else:
ka = self.bm_state["ka"]
kb = self.bm_state["kb"] - 1
return ka, kb, dS, _mlist
def _rollback(self):
ka, kb, dl = self.summary(mode="simple")
e_rs = self.bookkeeping_e_rs[(ka, kb)]
mb = self.bookkeeping_mb["mcmc"][(ka, kb)]
self._update_bm_state(ka, kb, e_rs, mb)
return ka, kb, e_rs, dl
def _update_bm_state(self, ka, kb, e_rs, mb, record_merge=False):
self.bm_state["ka"], self.bm_state["kb"] = ka, kb
self.bm_state["e_rs"] = e_rs
self.bm_state["mb"] = mb
if record_merge:
self.bookkeeping_mb["merge"][(ka, kb)] = mb
self._set_bookkeeping_mb_search_order(ka, kb)
def _compute_dl_and_update(self, ka, kb, recompute=False):
dl, e_rs, mb = self.compute_dl(ka, kb, recompute=recompute)
assert max(mb) + 1 == ka + kb, "[ERROR] inconsistency between mb. indexes and #blocks. {} != {}".format(
max(mb) + 1, ka + kb)
self.bookkeeping_dl[(ka, kb)] = dl
self.bookkeeping_e_rs[(ka, kb)] = e_rs
self.bookkeeping_mb["mcmc"][(ka, kb)] = mb
self._set_bookkeeping_mb_search_order(ka, kb)
self.trace_k += [("mcmc", ka, kb)]
self.bm_state["ref_dl"] = self.summary(mode="simple")[2] if self.bm_state["ref_dl"] != 0 else dl
return dl, e_rs, mb
# ###########
# Checkpoints
# ###########
    def _is_mdl_so_far(self, desc_len):
        """Check if `desc_len` is the minimal value recorded so far."""
        return all(desc_len <= dl for dl in self.bookkeeping_dl.values())
def _check_if_local_minimum(self, ka, kb):
"""The `neighborhood search` as described in the paper."""
self._logger.info(f"Is {(ka, kb)} a local minimum? Let's check.")
_dl, _e_rs, _mb = self._compute_dl_and_update(ka, kb)
null_dl = self.bookkeeping_dl[(1, 1)]
if _dl > null_dl:
self._logger.info("DL({}, {}) > DL(1, 1), which is {} compared to {}".format(ka, kb, _dl, null_dl))
self._logger.info(f"~~~- Keep merging -~~~")
self._update_bm_state(ka, kb, _e_rs, _mb)
return False
if _dl > self.summary(mode="simple")[2]:
ka, kb, _, _dl = self._rollback()
_ = self.adaptive_ratio
self.i_0 *= _
self._logger.info(f"Overshooting! There's already a point with lower DL. Let's reduce \u0394 by {_}.")
self._logger.info(f"Move to {(ka, kb)} and re-checking if it is a local minimum.")
nb_points = self._get_neighbor_points(ka, kb)
for _ka, _kb in nb_points:
self._compute_dl_and_update(_ka, _kb)
if not self._is_mdl_so_far(_dl):
_, _, _, mdl = self._rollback()
self._logger.info(
f"Warning. DL{(_ka, _kb)} = {mdl} < DL{(ka, kb)}. We move to {(_ka, _kb)} but NOT reduce \u0394.")
break
if _dl != self.summary(mode="simple")[2]:
self._logger.info(f"Bummer. {(ka, kb)} is NOT a local minimum.")
return False
else:
self._logger.info(f"YES! {(ka, kb)} is a local minimum with DL = {_dl}. We are done.")
return True
def _get_neighbor_points(self, ka, kb):
k_th = self._k_th_nb_to_search
nb_points = [(x + ka, y + kb) for (x, y) in product(range(-k_th, k_th + 1), repeat=2)]
# if any item has values less than 1, delete it. Also, exclude the suspected point (i.e., [ka, kb]).
na = self.bm_state["n_a"]
nb = self.bm_state["n_b"]
nb_points = [(i, j) for i, j in nb_points if na >= i >= 1 and nb >= j >= 1 and (i, j) != (ka, kb)]
        # Visit the two extreme corners (largest and smallest total offset) first,
        # then the remaining neighbors in random order.
        pts = sorted(nb_points, key=lambda x: x[0] - ka + x[1] - kb, reverse=True)
        nb_points = [pts.pop(0), pts.pop(-1)]
        random.shuffle(pts)
        nb_points += pts
return nb_points
def _prerunning_checks(self):
assert self.bm_state["n_a"] > 0, "[ERROR] Number of type-a nodes = 0, which is not allowed"
assert self.bm_state["n_b"] > 0, "[ERROR] Number of type-b nodes = 0, which is not allowed"
assert self.bm_state["n"] == self.bm_state["n_a"] + self.bm_state["n_b"], \
"[ERROR] num_nodes ({}) does not equal to num_nodes_a ({}) plus num_nodes_b ({})".format(
self.bm_state["n"], self.bm_state["n_a"], self.bm_state["n_b"]
)
if self.bm_state["ka"] is None or self.bm_state["kb"] is None or self.i_0 is None:
raise AttributeError("Arguments missing! Please assign `init_ka`, `init_kb`, and `i_0`.")
if self.adaptive_ratio is None:
raise AttributeError("Arguments missing! Please assign `adaptive_ratio`.")
if self._k_th_nb_to_search is None:
raise AttributeError("Arguments missing! Please assign `k_th_nb_to_search`.")
        if self._nm is None:
            raise AttributeError("Arguments missing! Please assign `nm` (via `set_nm`).")
    def _set_bookkeeping_mb_search_order(self, ka, kb):
        order = len(self.bookkeeping_mb["mcmc"]) + len(self.bookkeeping_mb["merge"])
        self.bookkeeping_mb["order"].setdefault((ka, kb), []).append(order - 1)
# #######################
# Set & Get of parameters
# #######################
    def set_params(self, init_ka=10, init_kb=10, i_0=0.005):
"""Set the parameters for the heuristic.
Parameters
----------
        init_ka : ``int`` (required, default: ``10``)
            Initial number of type-`a` communities.
        init_kb : ``int`` (required, default: ``10``)
            Initial number of type-`b` communities.
        i_0 : ``float`` (required, default: ``0.005``)
            Tolerated relative change of the description length during the agglomerative merges.
Notes
-----
TODO.
.. warning::
If :math:`i_0` is set too small, the heuristic will be slow and tends to get trapped in
a local minimum (in the description length landscape) where :math:`K_a` and :math:`K_b` are large.
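
        Examples
        --------
        >>> oks.set_params(init_ka=20, init_kb=20, i_0=0.01)  # doctest: +SKIP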
"""
# params for the heuristic
self.bm_state["ka"] = int(init_ka)
self.bm_state["kb"] = int(init_kb)
self.i_0 = float(i_0)
assert 0. <= self.i_0 < 1, "[ERROR] Allowed range for i_0 is [0, 1)."
assert self.bm_state["ka"] <= self.bm_state[
"n_a"], "[ERROR] Number of type-a communities must be smaller than the # nodes in type-a."
assert self.bm_state["kb"] <= self.bm_state[
"n_b"], "[ERROR] Number of type-b communities must be smaller than the # nodes in type-b."
self._summary["algm_args"]["init_ka"] = self.bm_state["ka"]
self._summary["algm_args"]["init_kb"] = self.bm_state["kb"]
self._summary["algm_args"]["i_0"] = float(i_0)
    def set_adaptive_ratio(self, adaptive_ratio=0.95):
        """Set the adaptive ratio (``float`` between 0 and 1, defaults to ``0.95``)."""
assert 0. < adaptive_ratio < 1, "[ERROR] Allowed range for adaptive_ratio is (0, 1)."
self.adaptive_ratio = float(adaptive_ratio)
    def set_k_th_neighbor_to_search(self, k):
        """Set the order ``k`` of the neighborhood in :math:`(K_a, K_b)`-space searched around a candidate point."""
        self._k_th_nb_to_search = int(k)
    def set_c(self, c):
        """Set the multiplier :math:`c` used in the outlier rule that determines :math:`i_0` (defaults to ``3.``)."""
        self._c = c
    def set_nm(self, s=10):
        """Set the :math:`n_m` parameter, the number of candidate merge pairs drawn per block (defaults to ``10``)."""
        self._nm = int(s)
    def get_f_edgelist_name(self):
return self._f_edgelist_name
    def get__q_cache(self):
return self.__q_cache
def _set_logging_level(self, level):
_level = 0
if level.upper() == "INFO":
_level = logging.INFO
elif level.upper() == "WARNING":
_level = logging.WARNING
logging.basicConfig(
level=_level,
format="%(asctime)s:%(levelname)s:%(message)s"
)
self._logger = logging.getLogger(__name__)
def _get_tempfile_edgelist(self):
try:
self.f_edgelist.seek(0)
except AttributeError:
            delete = not self.is_par_  # parallel runs keep the file on disk; __del__ removes it later
            self.f_edgelist = tempfile.NamedTemporaryFile(mode='wb', dir=self.tempdir, delete=delete)
finally:
for edge in self.edgelist:
                content = f"{edge[0]}\t{edge[1]}\n"
self.f_edgelist.write(content.encode())
self.f_edgelist.flush()
f_edgelist_name = self.f_edgelist.name
if self.is_par_:
del self.f_edgelist
return f_edgelist_name
def __del__(self):
if self.__del__no_call:
return
if self.is_par_:
os.remove(self._f_edgelist_name)