Source code for simplicial_test.Test

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# simplicial_test -- a python module to realize simplicial degree-size sequences
#
# Copyright (C) 2020-2021 Tzu-Chi Yen <tzuchi.yen@colorado.edu>
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from . import validators
from .utils import *
from .custom_exceptions import NoMoreBalls, SimplicialSignal, GoToNextLevel
from copy import deepcopy

try:
    from sage.all import Combinations as combinations
except ImportError:
    from more_itertools import distinct_combinations as combinations
else:
    import sys

    sys.setrecursionlimit(100000)


class Test(SimplexRegistrar):
    r"""Base class for SimplicialCheck.

    Performs a level-by-level backtracking search for a set of facets that
    realizes the given degree and size sequences.

    Parameters
    ----------
    degree_list : ``iterable`` or :class:`numpy.ndarray`, required
        Sequence of vertex degree distribution.

    size_list : ``iterable`` or :class:`numpy.ndarray`, required
        Sequence of facet size distribution.

    verbose : ``bool`` (optional, default ``False``)
        If ``True``, progress information will be shown.

    **kwargs : keyword arguments
        Only the following keywords are recognized; anything else raises
        ``ValueError``.

        s_depot
            State container to use. When omitted, a ``SimplicialDepot`` is built
            from the two input sequences sorted in descending order.
        trim_ones_first : ``bool`` (default ``True``)
            If true, the sequences are first passed through ``pair_one_by_one``
            and one singleton facet per paired "one" is appended to the final
            answer. Otherwise the sequences stored in ``s_depot`` are used.
        depth : number (default ``inf``)
            When a level deeper than ``depth`` is exhausted, the search rolls
            back directly to level ``depth``.
        width : number (default ``1e3``)
            Maximum number of candidate facets that are validated per call to
            :meth:`sample_candidate_facet`.
        cutoff : number (default ``1e5``)
            Stored as ``s_depot.cutoff``.
            NOTE(review): presumably the budget consulted by
            ``s_depot.add_to_time_counter`` -- confirm in ``SimplicialDepot``.

    Notes
    -----
    ``blocked_sets`` (facets that must be respected for non-inclusion) is *not*
    a constructor argument; the attribute starts out as an empty list and is
    updated while the search descends.

    Examples
    --------
    >>> from simplicial_test import *
    >>> degree_list, size_list = ([2, 1, 3, 2, 1], [3, 2, 2, 2])
    >>> st = Test(degree_list, size_list, verbose=False)
    >>> is_simplicial, facets = st.is_simplicial()
    >>> is_simplicial, facets
    (True, ((0, 1, 3), (0, 2), (0, 4), (1, 2)))

    """

    def __init__(self, degree_list, size_list, verbose=False, **kwargs):
        super().__init__()
        self._level = 0

        # Build the default depot lazily so that no throw-away instance (and no
        # needless sorting) happens when the caller supplies their own.
        s_depot = kwargs.pop("s_depot", None)
        if s_depot is None:
            s_depot = SimplicialDepot(
                sorted(degree_list, reverse=True), sorted(size_list, reverse=True)
            )
        self.s_depot = s_depot

        trim_ones_first = kwargs.pop("trim_ones_first", True)
        self.facets_to_append = []
        if trim_ones_first:
            self.size_list, self.degree_list, num_ones = pair_one_by_one(size_list, degree_list)
            # One singleton facet per trimmed "one"; their vertex ids are
            # numbered right after the vertices that remain.
            first_vid = len(self.degree_list)
            self.facets_to_append = [(first_vid + offset,) for offset in range(num_ones)]
        else:
            self.degree_list = self.s_depot.degree_list
            self.size_list = self.s_depot.size_list

        self.blocked_sets = []
        # ``np.infty`` was removed in NumPy 2.0; ``np.inf`` is the same value
        # and exists in every NumPy release.
        self.depth = kwargs.pop("depth", np.inf)
        self.width = kwargs.pop("width", 1e3)
        self.s_depot.cutoff = kwargs.pop("cutoff", 1e5)
        self.verbose = verbose
        self.current_fids = []
        self.non_simplicial_signal = False
        if kwargs:
            raise ValueError(f"unrecognized keyword arguments: {str(list(kwargs.keys()))}")

    def fids2facets(self):
        r"""Translate current selected facet ids to facets."""
        return [self.id2name[fid] for fid in self.current_fids]

    @staticmethod
    def get_distinct_selection(size, degs, blocked_sets, level_map):
        r"""Select a candidate facet (of size ``size``).

        This is a generator: every ``next()`` yields one more candidate, and the
        generator finishes normally once all combinations have been produced.

        Parameters
        ----------
        size : ``int``
            Number of vertices to make the facet.

        degs : ``list`` of ``int``
            Sequence of wanting/residual vertex degree distribution.

        blocked_sets : ``list`` of integer-valued ``tuple`` objects
            Facets that must be respected for non-inclusion.

        level_map : ``dict``
            Maps a vertex index at the original view to its index at the
            current view; ``-1`` marks a vertex that is skipped.

        Yields
        ------
        facet : ``tuple``
            The candidate facet.

        Notes
        -----
        Vertices are grouped into equivalence classes keyed by (the blocked sets
        they appear in, their degree at the original view). Combinations are
        drawn over class keys, and the ``k``-th occurrence of a key inside one
        combination is resolved to the ``k``-th vertex of that class.

        """
        equiv2vid = defaultdict(list)
        pool = []
        for vid, deg in enumerate(degs):
            current_vid = level_map[vid]
            if current_vid == -1:
                continue
            key = tuple(get_indices_of_k_in_blocked_sets(blocked_sets, current_vid) + [deg])
            equiv2vid[key].append(current_vid)
            pool.append(key)

        # A plain ``for`` loop lets the generator end cleanly instead of relying
        # on PEP 479 to turn an inner StopIteration into a RuntimeError.
        for equiv_classes in combinations(pool, size):
            tracker = defaultdict(int)
            facet = []
            for equiv_class in equiv_classes:
                facet.append(equiv2vid[equiv_class][tracker[equiv_class]])
                tracker[equiv_class] += 1
            yield tuple(facet)

    def sample_candidate_facet(self, size, valid_trials=None):
        """Draw candidate facets at the current level until one is accepted.

        Parameters
        ----------
        size : ``int``
            Number of vertices to make the facet.

        valid_trials
            Existing candidate generator of this level. When falsy, a fresh one
            is created via :meth:`get_distinct_selection` and stored in
            ``s_depot.valid_trials``.

        Returns
        -------
        picked_facet
            The facet as returned by ``self.register`` once a candidate passes
            :meth:`validate`.

        Raises
        ------
        NoMoreBalls : :class:`simplicial_test.custom_exceptions.NoMoreBalls`
            If ``width`` candidates were validated, the non-simplicial signal is
            set, or the candidate generator is exhausted.

        Notes
        -----
        Exceptions raised by :meth:`validate` propagate to the caller.

        """
        slot = self._level - 1
        if not valid_trials:
            self.s_depot.valid_trials[slot] = self.get_distinct_selection(
                size, self.s_depot.prev_d[1], self.blocked_sets, self.s_depot.level_map[slot])

        counter = 0
        while True:
            if counter >= self.width or self.non_simplicial_signal:
                raise NoMoreBalls
            try:
                facet = next(self.s_depot.valid_trials[slot])
            except (StopIteration, RuntimeError):
                # StopIteration: the generator ran dry. RuntimeError is still
                # accepted for generators that end via the PEP 479 conversion.
                raise NoMoreBalls
            if validators.validate_issubset_blocked_sets(facet, self.blocked_sets):
                # Skipped candidates do not count towards ``width``.
                continue
            counter += 1
            ind, reason = self.validate(facet)
            if ind:
                picked_facet, picked_facet_id = self.register(facet)
                self.current_fids += [picked_facet_id]
                return picked_facet
            self.non_simplicial_signal = self.s_depot.add_to_time_counter(slot)

    def validate(self, facet) -> (bool, str):
        r"""This function must return True in order for the candidate facet to be considered.

        Parameters
        ----------
        facet : ``tuple``
            The candidate facet.

        Returns
        -------
        token : ``(bool, str)``
            A tuple of the form ``(ind, reason)``, where ``ind`` is the indicator for whether
            the candidate facet passes the validations and ``reason`` explains why they fail.
            ``ind`` is only ``True`` when no sizes remain; every other accepted
            candidate leaves this method through ``GoToNextLevel``.

        Raises
        ------
        NoMoreBalls : :class:`simplicial_test.custom_exceptions.NoMoreBalls`
            if we depleted this level and wanted to go up (i.e., from `lv` to `lv - 1`).
            Raised here when the resulting (degrees, blocked sets) state has
            already been explored.

        SimplicialSignal : :class:`simplicial_test.custom_exceptions.SimplicialSignal`
            Signal that indicates a simplicial instance has been found.

        GoToNextLevel : :class:`simplicial_test.custom_exceptions.GoToNextLevel`
            Carries ``(sorted wanting degrees, sizes, blocked sets)`` for the
            next level of the search.

        """
        sizes = self.size_list
        if len(sizes) == 0:
            return True, "Last facet explored."

        passed, wanting_degs = validators.simple_validate(self.degree_list, sizes, facet)
        if not passed:
            # On failure the second element is a short explanation string.
            return passed, wanting_degs

        blocked_sets = simplify_blocked_sets(self.blocked_sets)
        # Vertices that still want degree; computed once, it is loop-invariant.
        remaining_vids = set(np.nonzero(wanting_degs)[0])
        if any(remaining_vids.issubset(set(blocked_set)) for blocked_set in blocked_sets):
            return False, "Rejected b/c the remaining facets are doomed to fail the no-inclusion constraint."

        if Counter(wanting_degs)[len(sizes)] != 0:
            # Some vertex must appear in every remaining facet: reduce first.
            passed, (wanting_degs, sizes, collected_facets, exempt_vids) = validators.validate_reduced_seq(
                wanting_degs, sizes, [facet], blocked_sets, verbose=self.verbose
            )
            if not passed:
                return False, "Rejected while reducing the sequences"
            if np.sum(wanting_degs) == np.sum(sizes) == 0:
                for collected_facet in collected_facets:
                    if set(exempt_vids).issubset(collected_facet):
                        raise SimplicialSignal([facet] + collected_facets)
                raise SimplicialSignal([facet] + [exempt_vids] + collected_facets)
        else:
            exempt_vids = []
            collected_facets = []

        filtered = filter_blocked_facets([facet] + blocked_sets, exempt_vids)

        # Refuse to revisit a state that has been explored before.
        sorted_wanting_degs = sorted(wanting_degs, reverse=True)
        degs_key = tuple(sorted_wanting_degs)
        blocked_key = tuple(sort_facets(self.blocked_sets))
        if blocked_key in self.s_depot.explored[degs_key]:
            raise NoMoreBalls
        self.s_depot.explored[degs_key].add(blocked_key)

        # Record what is needed to translate the next level's facets back.
        level = self._level
        self.s_depot.mappers[level] = get_seq2seq_mapping(wanting_degs)  # (mapping2shrinked, mapping2enlarged)
        self.s_depot.compute_level_map(level, self.s_depot.mappers[level][0])
        self.s_depot.collects[level] = collected_facets
        self.s_depot.exempts[level] = exempt_vids
        self.s_depot.candidates[level] = [facet]
        next_blocked_sets = transform_facets(filtered, self.s_depot.mappers[level][0], to="l+1")
        raise GoToNextLevel(sorted_wanting_degs, sizes, next_blocked_sets)

    def is_simplicial(self):
        """Run the search and report whether the input sequences are simplicial.

        Returns
        -------
        (is_simplicial, facets)
            ``is_simplicial`` is ``True`` or ``False``; ``facets`` is the sorted
            realization that was found (built from an empty tuple on failure).

        Notes
        -----
        The result is also stored on ``s_depot``, and the depot's
        ``valid_trials`` and ``explored`` attributes are deleted on return.

        """
        if sum(self.size_list) == 0:
            return True, self.__mark(True, self._assemble_simplicial_facets(self.fids2facets()))
        if not validators.preprocess(self.degree_list, self.size_list):
            self.s_depot.add_to_time_counter(0)
            return False, self.__mark(False, tuple())

        while True:
            self._level += 1
            self._verbose_logging()
            self._stage_state()
            # Check the stop signal before consuming a size, so that the early
            # exit can never trip over an empty size list.
            if self.non_simplicial_signal:
                return False, self.__mark(False, tuple())
            s = self.size_list.pop(0)
            try:
                self.sample_candidate_facet(s, valid_trials=self.s_depot.valid_trials[self._level - 1])
            except SimplicialSignal as e:
                return True, self.__mark(True, self._assemble_simplicial_facets(e.message))
            except GoToNextLevel as e:
                degree_list, size_list, blocked_sets = e.message
                if not validators.preprocess(degree_list, size_list):
                    self._rollback(self._level - 1)
                else:
                    self.degree_list, self.size_list = degree_list, size_list
                    self.blocked_sets = sort_facets(blocked_sets)
            except NoMoreBalls:
                if self._level == 1:
                    # Nothing left to try at the top level.
                    return False, self.__mark(False, tuple())
                if self._level > self.depth:
                    self._rollback(self.depth)
                else:
                    self._rollback(self._level - 1)
            else:
                if sum(self.size_list) == 0:
                    return True, self.__mark(True, self._assemble_simplicial_facets(self.fids2facets()))

    def _clean_valid_trials(self):
        """Drop the candidate generators of the current level and all deeper ones."""
        for ind, _ in enumerate(self.s_depot.size_list):
            if ind >= self._level - 1:
                self.s_depot.valid_trials[ind] = None

    def _assemble_simplicial_facets(self, seed_facets):
        """Translate facets found at the current level back to the original view.

        Walks from level ``self._level - 1`` down to 1; at each level the facets
        are mapped with that level's second mapper, prefixed with the level's
        exempt vertices, and merged with its candidate and collected facets.
        """
        if self._level != 1:
            for lv in np.arange(self._level - 1, 0, -1):
                facets = transform_facets(seed_facets, self.s_depot.mappers[lv][1], to="l-1")
                facets = [self.s_depot.exempts[lv] + list(s) for s in facets]
                seed_facets = self.s_depot.candidates[lv] + facets + self.s_depot.collects[lv]
        return tuple(sort_callback(seed_facets) + self.facets_to_append)

    def __mark(self, simplicial, facets):
        """Store the final verdict on the depot and release its search state."""
        self.s_depot.simplicial = simplicial
        self.s_depot.facets = sort_facets(facets)
        del self.s_depot.valid_trials
        del self.s_depot.explored
        return self.s_depot.facets

    def _rollback(self, level):
        """Restore the state staged at ``level`` so that level is retried.

        Rolling back to level 0 only raises the non-simplicial signal.
        """
        if level == 0:
            self.non_simplicial_signal = True
            return
        self._clean_valid_trials()
        self.non_simplicial_signal = self.s_depot.add_to_time_counter(self._level - 1)
        for lv in np.arange(self._level, level, -1):
            self.s_depot.candidates[lv] = dict()
            self.s_depot.exempts[lv] = dict()
            self.s_depot.collects[lv] = dict()
        self._level = level
        self.size_list = self.s_depot.prev_s[self._level]
        self.degree_list = self.s_depot.prev_d[self._level]
        self.blocked_sets = self.s_depot.prev_b[self._level]
        # The main loop increments the level again before it retries.
        self._level -= 1

    def _stage_state(self):
        """Snapshot the sequences and blocked sets of the current level."""
        self.s_depot.prev_s[self._level] = deepcopy(self.size_list)
        self.s_depot.prev_d[self._level] = deepcopy(self.degree_list)
        self.s_depot.prev_b[self._level] = deepcopy(self.blocked_sets)

    def _verbose_logging(self):
        """Print the current level's state when ``verbose`` is enabled."""
        if self.verbose:
            print(f"========== (l={self._level}) ==========\n"
                  f"Size list: {[_ for _ in self.size_list if _ > 0]}\n"
                  f"Degree list: {[_ for _ in self.degree_list if _ > 0]}\n"
                  f"blocked_sets = {self.blocked_sets}\n"
                  )