Source code for engines.kl

import os
import re
import shutil
import subprocess
import hashlib
from collections import OrderedDict
import random
import numpy as np

[docs]class KL(object): """Base class for the Kernighan-Lin algorithm. Parameters ---------- f_engine : ``str`` (required, default: ``"engines/bipartiteSBM-KL/biSBM"``) n_sweeps : ``int`` (required, default: ``4``) Note that this will generate ``<n_sweeps>`` output sub-folders in ``<f_kl_output>``. is_parallel : ``bool`` (required, default: ``True``) n_cores : ``int`` (required, default: ``4``) algm_name : ``str`` (required, default: ``kl``) kl_edgelist_delimiter : ``str`` (required, default: ``"\\t"``) Due to the KL code accepts 1-indexed nodes by default, we used the delimiter to transform our 0-indexed input. kl_steps : ``int`` (required, default: ``5``) Number of random initializations (refer to the README_cplusplus.txt file) kl_itertimes : ``int`` (required, default: ``1``) Number of KL runs (within each ``<outputFOLDER>``) for returning an optimal result. f_kl_output : ``str`` (required, default: ``"engines/bipartiteSBM-KL/f_kl_output"``) Path to the KL output dir; recommended to be in the same folder as the binary. kl_verbose : ``bool`` (required, default: ``True``) kl_is_parallel : ``bool`` (required, default: ``False``) """ def __init__(self, f_engine="engines/bipartiteSBM-KL/biSBM", n_sweeps=1, is_parallel=False, n_cores=1, algm_name="kl", kl_edgelist_delimiter="\t", kl_steps=5, kl_itertimes=1, f_kl_output="engines/bipartiteSBM-KL/f_kl_output", kl_verbose=True, kl_is_parallel=False): self.MAX_NUM_SWEEPS = int(n_sweeps) self.PARALLELIZATION = bool(is_parallel) self.NUM_CORES = int(n_cores) self.KL_PARALLELIZATION = bool(kl_is_parallel) self.ALGM_NAME = str(algm_name) # <kl_itertimes> is the number of KL sweeps (<kl_steps> per sweep) performed before returning the optimal result # during each engine run (Note that there are <n_sweeps> engines running in parallel via loky) self.MAX_KL_NUM_SWEEPS = int(kl_itertimes) if self.KL_PARALLELIZATION: raise NotImplementedError("[ERROR] KL calculation accross many cores is not supported.") # for KL if not os.path.isfile(f_engine): raise BaseException("[ERROR] KL engine binary not found!") self.f_engine = f_engine self.kl_steps = int(kl_steps) self.f_kl_output = str(f_kl_output) self.kl_verbose = bool(kl_verbose) self.kl_edgelist_delimiter = kl_edgelist_delimiter pass
[docs] def prepare_engine(self, f_edgelist, na, nb, ka, kb, delimiter=None): """Output shell commands for graph partitioning calculation. Parameters ---------- ka : ``int`` (required) Number of communities for type-`a` nodes to partition. kb : ``int`` (required) Number of communities for type-`b` nodes to partition. Returns ------- action_str : ``str`` The command line string that enables execution of the code. """ if delimiter is None: delimiter = self.kl_edgelist_delimiter try: os.mkdir(self.f_kl_output) except OSError: pass finally: self.f_kl_output += "/" + hashlib.md5(str(random.random()).encode()).hexdigest() try: os.mkdir(self.f_kl_output) except OSError: # clear the working dir, and mkdir a new one shutil.rmtree(self.f_kl_output, ignore_errors=True) os.mkdir(self.f_kl_output) pass filename = hashlib.md5(f_edgelist.encode()).hexdigest() f_edgelist_1_indexed = self.f_kl_output + "/" + filename + "_1-indexed.edgelist" self._save_edgelist_as_1_indexed(f_edgelist, f_edgelist_1_indexed, delimiter) f_types = self.f_kl_output + "/" + filename + ".types" self.types = self._save_types(f_types, na, nb) action_list = [ self.f_engine, f_edgelist_1_indexed, f_types, self.f_kl_output, str(ka), str(kb), '1', # degree-corrected str(self.kl_steps) ] action_str = ' '.join(action_list) return action_str
[docs] def engine(self, f_edgelist, na, nb, ka, kb, mb=None): # TODO: bug when assigned verbose=False """Run the shell code. Parameters ---------- ka : ``int`` (required) Number of communities for type-`a` nodes to partition. kb : ``int`` (required) Number of communities for type-`b` nodes to partition. Returns ------- of_group : ``list[int]`` """ action_str = self.prepare_engine(f_edgelist, na, nb, ka, kb) num_sweeps_ = self.MAX_KL_NUM_SWEEPS verbose_ = self.kl_verbose parallelization_ = self.KL_PARALLELIZATION num_cores_ = self.NUM_CORES if not verbose_: stdout = open(os.devnull, "w") else: stdout = subprocess.PIPE def run(_): p = subprocess.Popen( action_str.split(' '), bufsize=2048, stdout=stdout ) out, err = p.communicate() p.wait() return out, err, p kl_output = OrderedDict() num_sweep_ = 0 while num_sweep_ < num_sweeps_: if not parallelization_: out, err, p = run("") if p.returncode == -11: # when Exception raises from the KL code raise RuntimeError("[ERROR] Exception from C++ program during inference! -- " + action_str) elif p.returncode == 0: num_sweep_ += 1 assert type(self._get_score_by_index(num_sweep_)) == float kl_output[self._get_score_by_index(num_sweep_)] = self._get_of_group_by_index(num_sweep_) else: # spawn processes across cores, collect results, and return the best option # However, in main code, we may calculate each single point in parallel, which # might raise "AssertionError: daemonic processes are not allowed to have children" # TODO: For now, parallel calculation for KL is disabled. (Fix it?) raise NotImplementedError of_group = kl_output[max(kl_output)] try: shutil.rmtree(self.f_kl_output, ignore_errors=True) finally: return np.array(of_group)
[docs] @staticmethod def gen_types(na, nb): types = [1] * int(na) + [2] * int(nb) return types
def _get_of_group_by_index(self, num_sweep_): of_group = [] f = self._open_biDCSBMcomms_file(num_sweep_) for ind, line in enumerate(f): of_group.append(int(line.split('\n')[0])) f.close() return of_group def _get_score_by_index(self, num_sweep_): f = self._get_bisbm_score_file(num_sweep_) for ind, line in enumerate(f): score = float(line.split('\n')[0]) f.close() return score def _get_bisbm_score_file(self, num_sweep_): """:return: file handle""" f = open( self.f_kl_output + '/biDCSBMcomms' + str(int(num_sweep_)) + '.score', 'r' ) return f def _open_biDCSBMcomms_file(self, num_sweep_): """:return: file handle""" f = open( self.f_kl_output + '/biDCSBMcomms' + str(int(num_sweep_)) + '.tsv', 'r' ) return f @staticmethod def _save_edgelist_as_1_indexed(f_edgelist, f_target_edgelist, delimiter="\t"): """ Note that this function always saves with delimiter. Parameters ---------- f_edgelist : ``str`` f_target_edgelist : ``str`` delimiter : ``str`` Returns ------- """ with open(f_target_edgelist, "w") as g: with open(f_edgelist, "r") as f: for line in f: line = line.replace('\r', '').replace('\n', '') edge = re.split(delimiter, line) try: g.write(str(int(edge[0]) + 1) + "\t" + str(int(edge[1]) + 1) + "\n") except ValueError as e: raise ValueError( "[ERROR] Please check if the delimiter for the edgelist file is wrong -- {}".format(e) ) @staticmethod def _save_types(f_types, na, nb): assert na > 0, "[ERROR] Number of type-a nodes = 0, which is not allowed" assert nb > 0, "[ERROR] Number of type-b nodes = 0, which is not allowed" types = [1] * int(na) + [2] * int(nb) with open(f_types, "w") as f: for line in types: f.write(str(line) + '\n') return types