Source code for engines.mcmc

import os
import numpy as np
import subprocess


[docs]class MCMC(object): """Base class for the Markov chain Monte Carlo algorithm. Parameters ---------- f_engine : ``str`` (required, default: `"engines/bipartiteSBM-MCMC/bin/mcmc"`) Path to the graph partitioning binary. n_sweeps : ``int`` (required, default: `1`) Number of partitioning computations for each :math:`(K_a, K_b)` data point. is_parallel : ``bool`` (required, default: `False`) Whether to compute the partitioning in parallel. n_cores : ``int`` (required, default: `1`) The number of cores used when `is_parallel is True`. algm_name : ``str`` (required, default: `mcmc`) The name of the algorithm. mcmc_steps : ``int`` (required, default: `1e5`) Number of sweeps to perform. During each sweep, a move attempt is made for each node. mcmc_await_steps : ``int`` (required, default: `2e3`) Number of iterations to wait for a record-breaking event. The algorithm will stop if there is no record-breaking event within the interval or the overall MCMC sweeps exceed ``mcmc_steps``, whichever happens earlier. mcmc_cooling : ``str`` (required, default: `abrupt_cool`) Annealing scheme used, which can be either ``exponential``, ``logarithm``, ``linear``, ``constant``, or ``abrupt_cool``. mcmc_cooling_param_1 : ``int`` (required, default: `1e3`) Parameter 1 for the annealing. mcmc_cooling_param_2 : ``float`` (required, default: `0.1`) Parameter 2 for the annealing. mcmc_epsilon : ``float`` (required, default: `1.`) The :math:`\epsilon` parameter used in the proposal moves. """ def __init__(self, f_engine="engines/bipartiteSBM-MCMC/bin/mcmc", n_sweeps=1, is_parallel=False, n_cores=1, algm_name="mcmc", mcmc_steps=1e5, mcmc_await_steps=2e3, mcmc_cooling="abrupt_cool", mcmc_cooling_param_1=1e3, mcmc_cooling_param_2=0.1, mcmc_epsilon=1.): self.MAX_NUM_SWEEPS = int(n_sweeps) self.PARALLELIZATION = bool(is_parallel) self.NUM_CORES = int(n_cores) self.ALGM_NAME = str(algm_name) # for MCMC if not os.path.isfile(f_engine): raise BaseException("[ERROR] MCMC engine binary not found!") self.f_engine = f_engine self.mcmc_steps_ = int(mcmc_steps) self.mcmc_await_steps_ = int(mcmc_await_steps) self.mcmc_cooling_ = str(mcmc_cooling) self.mcmc_cooling_param_1 = mcmc_cooling_param_1 self.mcmc_cooling_param_2 = mcmc_cooling_param_2 self.mcmc_epsilon_ = mcmc_epsilon pass
[docs] def set_steps(self, steps): self.mcmc_steps_ = int(steps)
[docs] def set_await_steps(self, await_steps): self.mcmc_await_steps_ = int(await_steps)
[docs] def set_cooling(self, cooling): self.mcmc_cooling_ = str(cooling)
[docs] def set_cooling_param_1(self, cooling_param_1): self.mcmc_cooling_param_1 = cooling_param_1
[docs] def set_cooling_param_2(self, cooling_param_2): self.mcmc_cooling_param_2 = cooling_param_2
[docs] def set_epsilon(self, epsilon): self.mcmc_epsilon_ = epsilon
[docs] def prepare_engine(self, f_edgelist, na, nb, ka, kb, mb=None, method=None): """Output shell commands for graph partitioning calculation. Parameters ---------- ka : ``int`` (required) Number of communities for type-`a` nodes to partition. kb : ``int`` (required) Number of communities for type-`b` nodes to partition. Returns ------- action_str : ``str`` the command line string that enables execution of the code """ params_ = "" if self.mcmc_cooling_ in ["exponential", "linear", "logarithmic"]: params_ = str(self.mcmc_cooling_param_1) + " " + str(self.mcmc_cooling_param_2) elif self.mcmc_cooling_ in ["constant", "abrupt_cool"]: params_ = str(self.mcmc_cooling_param_1) if mb is None: means_ = "-g" else: means_ = "--mb" + " " + " ".join(map(str, mb)) if method == "natural": means_ = "-g -u" # n_blocks_ = " ".join( # self._constrained_sum_sample_pos(ka, na) # ) + " " + " ".join( # self._constrained_sum_sample_pos(kb, nb) # ) n_blocks_ = self._gen_init_n_blocks(na, nb, ka, kb) n_types_ = str(na) + " " + str(nb) action_list = [ self.f_engine, "-e", f_edgelist, "-n", n_blocks_, "-t", str(self.mcmc_steps_), "-x", str(self.mcmc_await_steps_), "-c", self.mcmc_cooling_, "-a", params_, "-y", n_types_, "-z", str(ka) + " " + str(kb), "-E", str(self.mcmc_epsilon_), means_ ] action_str = ' '.join(action_list) # print(action_str) return action_str
[docs] def engine(self, f_edgelist, na, nb, ka, kb, mb=None, method=None): # TODO: bug when assigned verbose=False """Run the shell code. Parameters ---------- f_edgelist : ``str`` na : ``int`` nb : ``int`` ka : ``int``, required Number of communities for type-*a* nodes to partition. kb : ``int``, required Number of communities for type-*b* nodes to partition. mb : :class:`numpy.ndarray` method : Returns ------- of_group : :class:`numpy.ndarray` """ of_group = [] action_str = self.prepare_engine(f_edgelist, na, nb, ka, kb, mb=mb, method=method) num_sweeps_ = 1 def _run_engine(_): p = subprocess.Popen( action_str.split(' '), bufsize=2048, stdout=subprocess.PIPE ) out, err = p.communicate() p.wait() return out, err, p num_sweep_ = 0 while num_sweep_ < num_sweeps_: out, err, p = _run_engine("") if p.returncode == -11: # when Exception raises from the mcmc code raise RuntimeError("Exception from C++ program during inference! -- " + action_str) elif p.returncode == 0: num_sweep_ += 1 of_group = out.replace(b' \n', b'').split(b' ') # Note the space before the line break of_group = list(map(int, of_group)) return np.array(of_group, dtype=np.int_)
@staticmethod def _gen_init_n_blocks(na, nb, ka, kb): num_nodes_a = np.arange(na) n_blocks_a = map(len, np.array_split(num_nodes_a, ka)) num_nodes_b = np.arange(nb) n_blocks_b = map(len, np.array_split(num_nodes_b, kb)) n_blocks_ = " ".join(map(str, n_blocks_a)) + " " + " ".join(map(str, n_blocks_b)) return n_blocks_ @staticmethod def _constrained_sum_sample_pos(n, total): # in this setting, there will be no empty groups generated by this function n = int(n) total = int(total) normalized_list = [int(total) + 1] while sum(normalized_list) > total and np.greater_equal(normalized_list, np.zeros(n)).all(): indicator = True while indicator: normalized_list = list(map(round, map(lambda x: x * total, np.random.dirichlet(np.ones(n), 1).tolist()[0]))) normalized_list = list(map(int, normalized_list)) indicator = len(normalized_list) - np.count_nonzero(normalized_list) != 0 sum_ = 0 for ind, q in enumerate(normalized_list): if ind < len(normalized_list) - 1: sum_ += q # TODO: there is a bug here; sometimes it assigns -1 to the end of the array, but pass the while condition normalized_list[len(normalized_list) - 1] = abs(total - sum_) assert sum(normalized_list) == total, "ERROR: the constrainedSumSamplePos-sampled list does not sum to #edges." return map(str, normalized_list)
[docs] @staticmethod def gen_types(na, nb): types = [1] * int(na) + [2] * int(nb) return types