#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# simplicial_test -- a python module to realize simplicial degree-size sequences
#
# Copyright (C) 2020-2021 Tzu-Chi Yen <tzuchi.yen@colorado.edu>
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from .utils import *
from . import validators
from .sample import get_hitting_sets
from .custom_exceptions import SimplicialSignal
from copy import deepcopy
from itertools import combinations_with_replacement, starmap, product
from collections import defaultdict
def compute_dpv(facets, n=None, is_sorted=True):
    """Compute the degree of every vertex (``dpv``: degree per vertex).

    The degree of a vertex is the number of facets that contain it.

    Parameters
    ----------
    facets : iterable of iterable of int
        Facets, each given as a collection of integer vertex ids.
    n : int, optional
        If given, degrees are accumulated in a NumPy integer array of length
        ``n`` indexed by vertex id, and that array is returned
        (``is_sorted`` is ignored in this case).
    is_sorted : bool
        Only used when ``n`` is None. If True, return the degrees sorted in
        non-increasing order. If False, return the degrees indexed by vertex
        id, i.e. entry ``i`` is the degree of vertex ``i``.

    Returns
    -------
    numpy.ndarray or tuple of int
    """
    if n is not None:
        dpv = np.zeros([n], dtype=np.int_)
        for facet in facets:
            for vid in facet:
                dpv[vid] += 1
        return dpv

    counts = defaultdict(int)
    for facet in facets:
        for vid in facet:
            counts[vid] += 1

    if is_sorted:
        return tuple(sorted(counts.values(), reverse=True))
    if not counts:
        return ()
    # Index by vertex id up to the largest id seen, so that ids missing from
    # the facets get degree 0 instead of shifting or truncating the vector.
    return tuple(counts.get(vid, 0) for vid in range(max(counts) + 1))
def get_relabeled_facets(facets):
    """Relabel vertices by non-increasing degree and return the sorted facets.

    The vertex with the largest degree becomes vertex 0, the next one vertex
    1, and so on; vertices of equal degree keep their original relative
    order. Every facet is rewritten with the new labels (as a sorted tuple)
    and the resulting list is passed through ``sort_facets``.
    """
    degrees = compute_dpv(facets, is_sorted=False)
    # A stable sort on the negated degree orders by degree (descending) and
    # breaks ties by the original vertex id (ascending).
    ranking = sorted(range(len(degrees)), key=lambda vid: -degrees[vid])
    old2new = {old: new for new, old in enumerate(ranking)}
    relabeled = [
        tuple(sorted(old2new[vtx] for vtx in facet))
        for facet in facets
    ]
    return sort_facets(relabeled)
def groupby_vtx_equiv_class(d_input):
    """Invert a mapping, grouping its keys by their (hashable) value.

    Returns a dict that maps every distinct value of ``d_input`` to the list
    of keys carrying that value, in the iteration order of ``d_input``.
    """
    classes = {}
    for member, signature in d_input.items():
        classes.setdefault(signature, []).append(member)
    return classes
class EnumRegistrar(object):
    """Bookkeeping shared by the enumeration routines.

    Hands out integer ids to individual facets and to whole facet lists, and
    keeps one "state" record per registered facet list.
    """

    def __init__(self):
        self.m = 0
        self.pointer = 0  # next id handed out by register_facet
        self.facets_pointer = 0  # next id handed out by register_facets
        self.facet_count_per_facets = defaultdict(int)  # number of states per facet count
        self.ns_vtx = 0
        self.dfs_locator = None
        self.deg_seq = self.size_seq = None

        self.facet2id = dict()
        self.id2facet = dict()
        self.facets2id = dict()
        self.id2facets = dict()

        self.states = dict()  # facets id -> state record (see register_state)
        self.facet_size_per_id = np.array([], dtype=np.int_)
        self.logbook = dict()
        self.facet_book = dict()

        self.dpv_m = defaultdict(set)  # facet count -> set of dpv seen
        self.dpv2fid = dict()  # dpv -> most recently registered facets id
        self.dpv2fids = defaultdict(set)  # dpv -> all facets ids with that dpv
        self.dpv_pointer = 0
        self.dpv2id = dict()

    def register_facet(self, facet) -> tuple:
        """Return ``(facet, id)`` for ``facet``, assigning a new id if unseen.

        The facet is normalised to a tuple of vertex ids in ascending order.
        """
        facet = tuple(sorted(facet, reverse=False))
        if facet not in self.facet2id:
            self.facet2id[facet] = self.pointer
            self.id2facet[self.pointer] = facet
            self.pointer += 1
        return facet, self.facet2id[facet]

    def register_facets(self, facets):
        """Return ``(facets, id)`` for a facet list, assigning a new id if unseen.

        ``facets`` is used as a dictionary key as-is, so it must be hashable.
        """
        if facets not in self.facets2id:
            self.facets2id[facets] = self.facets_pointer
            self.id2facets[self.facets_pointer] = facets
            self.facets_pointer += 1
        return facets, self.facets2id[facets]

    def register_state(self, facets, previous):
        """Record the state of an already registered facet list.

        The state stores:

        - ``m``: number of facets
        - ``dpv``: `degree per vertex`, sorted in non-increasing order
        - ``vpf``: `vertex id per facet` (vertex id -> ids of its facets)
        - ``vsc``: `vertex symmetry class` (facet-id tuple -> vertex ids)
        - ``loc``: ``(m, running count of states with m facets, ns_vtx)``
        - ``dfs``: the current ``dfs_locator`` as a tuple
        - ``p``: id of the facet list this one was extended from

        Parameters
        ----------
        facets : hashable
            Facet list previously passed to ``register_facets``.
        previous : int or None
            Id of the parent facet list (None for the first one).
        """
        fid = self.facets2id[facets]
        dpv = compute_dpv(facets, is_sorted=True)

        # for plot use
        if dpv not in self.dpv2id:
            self.dpv2id[dpv] = self.dpv_pointer
            self.dpv_pointer += 1

        # NOTE: states are NOT pruned when their dpv was already seen for this
        # facet count; only an already registered facets id is skipped.
        if fid in self.states:
            return

        n_facets = len(facets)
        vpf = self.compute_vpf(facets)
        vsc = groupby_vtx_equiv_class(self.dict2tuple(vpf))
        self.states[fid] = {
            "m": n_facets,
            "dpv": dpv,
            "vpf": vpf,
            "vsc": vsc,
            "loc": (n_facets, self.facet_count_per_facets[n_facets], self.ns_vtx),
            "dfs": tuple(self.dfs_locator),
            "p": previous
        }
        self.dpv_m[n_facets].add(dpv)
        self.dpv2fids[dpv].add(fid)
        self.dpv2fid[dpv] = fid
        self.facet_count_per_facets[n_facets] += 1

    @staticmethod
    def get_created_vids(facets):
        """Return the set of all vertex ids appearing in ``facets``."""
        return set(flatten(facets))

    def compute_vpf(self, facets) -> dict:
        """Map every vertex id to the list of ids of the facets containing it.

        Every facet must already be known to ``register_facet``.
        """
        vpf = defaultdict(list)
        for facet in facets:
            facet_id = self.facet2id[tuple(facet)]
            for vid in facet:
                vpf[vid].append(facet_id)
        return vpf

    @staticmethod
    def dict2tuple(d_input):
        """Return a copy of ``d_input`` whose values are converted to tuples."""
        return {key: tuple(value) for key, value in d_input.items()}

    def log_forbidden(self, name, reason_id) -> None:
        """Mark ``name`` as non-simplicial in the logbook with the given reason."""
        self.logbook[tuple(name)] = {
            "is_simplicial": False,
            "reason": reason_id
        }

    def update_incrementally(self, facet, facets):
        """Extend ``facets`` by ``facet`` and register the relabeled result.

        When a target degree sequence is set, candidates that use a vertex id
        beyond its length, or that ``validators.simple_validate`` rejects,
        are dropped silently.

        Parameters
        ----------
        facet : sequence of int
            Vertex ids of the facet to add.
        facets : list or tuple of tuple of int
            The registered facet list to extend (empty for the first facet).

        Raises
        ------
        SimplicialSignal
            When the sorted degree sequence of the new facet list equals
            ``self.deg_seq``.
        """
        if self.deg_seq is not None:
            n_vertices = len(self.deg_seq)
            if max(facet) >= n_vertices:
                return
            if len(facets) > 0 and max(flatten(facets)) >= n_vertices:
                return
            # Use a local array: rebinding self.size_seq to an ndarray would
            # break callers that keep treating it as a list (e.g. ``.pop(0)``).
            size_seq = np.array(self.size_seq, dtype=np.int_)
            token = validators.simple_validate(
                self.deg_seq, size_seq[len(facets):], facet,
            )
            if token[0] is False:
                return

        if len(facets) == 0:
            previous = None
            _facets = [tuple(facet)]
        else:
            previous = self.facets2id[tuple(facets)]
            # Build a fresh list so that ``facets`` may be a list or a tuple.
            _facets = list(facets) + [tuple(facet)]

        relabeled_facets = get_relabeled_facets(_facets)
        for relabeled_facet in relabeled_facets:
            self.register_facet(relabeled_facet)
        self.register_facets(relabeled_facets)
        self.register_state(relabeled_facets, previous)

        dpv = compute_dpv(relabeled_facets, is_sorted=True)
        if self.deg_seq is not None and dpv == tuple(self.deg_seq):
            print(f"WeCanStopSignal:{dpv} == {self.deg_seq}")
            raise SimplicialSignal
class Enum(EnumRegistrar):
    """Enumerate facet lists that realise a given facet size sequence.

    Parameters
    ----------
    size_seq : iterable of int
        Facet sizes; stored sorted in non-increasing order.
    deg_seq : sequence of int, optional
        Target degree sequence. When set, the enumeration signals (via
        ``SimplicialSignal``) as soon as a facet list with this degree
        sequence is registered.
    """

    def __init__(self, size_seq, deg_seq=None):
        super().__init__()
        self.size_seq = sorted(size_seq, reverse=True)
        self.m = len(self.size_seq)
        self.created_vids = set()
        self.deg_seq = deg_seq

    @staticmethod
    def get_dfs_navigator(size_seq):
        """Return an iterator over all "number of new vertices" choices.

        Every yielded tuple has one entry per facet: the first entry is
        always 0, the second ranges over ``1..size_seq[1]`` and every later
        entry ``i`` ranges over ``0..size_seq[i]``.
        """
        m = len(size_seq)
        if m == 1:
            return product([0])
        lower = [0] * m
        lower[1] = 1
        upper = [size + 1 for size in size_seq]
        upper[0] = 1
        return product(*starmap(range, zip(lower, upper)))

    def compute(self):
        """Enumerate level by level, consuming ``self.size_seq``.

        The first facet is ``(0, ..., s - 1)``; afterwards every state with
        ``k`` facets is extended by the next facet size, trying every number
        of newly created vertices from 0 up to that size.
        """
        first_size = self.size_seq.pop(0)
        self.dfs_locator = [0]
        self.update_incrementally(list(range(first_size)), [])

        facets_count = 0
        while len(self.size_seq) > 0:
            facets_count += 1
            next_size = self.size_seq.pop(0)
            for fid in self.get_fids_per_m(facets_count):
                self.dfs_locator = list(self.states[fid]["dfs"])
                facets = self.id2facets[fid]
                # Does not depend on ns_vtx, so compute it once per state.
                self.created_vids = self.get_created_vids(facets)
                for ns_vtx in range(0, next_size + 1):
                    self.ns_vtx = ns_vtx
                    self.dfs_locator += [ns_vtx]
                    if ns_vtx == 0:  # must check with hitting set routine
                        self.fill_wo_creating_new_vertices(next_size, facets)
                    else:  # freedom in choosing slots in the shielded region, must consider identical vertices
                        self.fill_w_creating_new_vertices(next_size, facets, ns_vtx)
                    self.dfs_locator.pop(-1)

    @staticmethod
    def get_hs_identifier(vsc, hs):
        """Return a sorted tuple identifying ``hs`` up to vertex symmetry.

        For every vertex of ``hs`` and every symmetry class of ``vsc`` that
        contains it, the first element of that class's key is collected.
        """
        vsc_inv = {tuple(members): key for key, members in vsc.items()}
        collected = []
        for vertex in hs:
            for members, key in vsc_inv.items():
                if vertex in members:
                    collected.append(key[0])
        return tuple(sorted(collected))

    def _fill_from_classes(self, seed, class_keys, n_extra, next_size, facets, vsc):
        """Complete ``seed`` with ``n_extra`` vertices drawn from symmetry classes.

        For every multiset of ``n_extra`` class keys, take from each chosen
        class its next member that is not yet in the facet. A candidate is
        passed to ``update_incrementally`` only if it ends up with exactly
        ``next_size`` distinct vertices.
        """
        for chosen in combinations_with_replacement(class_keys, n_extra):
            facet = list(seed)
            tracker = defaultdict(int)  # next position to try within each class
            for symm_class in chosen:
                members = vsc[symm_class]
                try:
                    while members[tracker[symm_class]] in facet:
                        tracker[symm_class] += 1
                    facet.append(members[tracker[symm_class]])
                    tracker[symm_class] += 1
                except IndexError:
                    # The class has no unused member left; the size check
                    # below discards this incomplete candidate.
                    break
            if len(set(facet)) == next_size:
                self.update_incrementally(facet, facets)

    def fill_wo_creating_new_vertices(self, next_size, facets):
        """Add a facet of ``next_size`` made of existing vertices only.

        Candidates are seeded with the sets returned by ``get_hitting_sets``;
        among seeds sharing the same symmetry identifier only the one with
        the smallest vertex-id sum is kept (the first one wins on ties).
        """
        hs_list = get_hitting_sets(facets, self.created_vids)
        vsc = self.states[self.facets2id[tuple(facets)]]["vsc"]
        if len(hs_list) == 0:
            return

        hs_ids = dict()
        for hs in hs_list:
            identifier = self.get_hs_identifier(vsc, hs)
            if identifier not in hs_ids or sum(hs) < sum(hs_ids[identifier]):
                hs_ids[identifier] = hs

        for hs in hs_ids.values():
            if len(hs) > next_size:
                continue
            # NOTE(review): this compares class keys against the members of
            # the hitting set; kept exactly as in the original -- confirm
            # that this filter is intended.
            class_keys = [key for key in vsc.keys() if key not in hs]
            self._fill_from_classes(hs, class_keys, next_size - len(hs), next_size, facets, vsc)

    def fill_w_creating_new_vertices(self, next_size, facets, ns_vtx):
        """Add a facet of ``next_size`` that creates ``ns_vtx`` new vertices.

        New vertices get the ids following the ``len(self.created_vids)``
        existing ones; the remaining slots are filled from the symmetry
        classes of the current state.
        """
        if ns_vtx == next_size:
            return self.fill_with_only_ones(next_size, facets)
        offset = len(self.created_vids)
        new_vertices = [offset + i for i in range(ns_vtx)]
        vsc = self.states[self.facets2id[tuple(facets)]]["vsc"]
        self._fill_from_classes(new_vertices, vsc.keys(), next_size - ns_vtx, next_size, facets, vsc)

    def fill_with_only_ones(self, next_size, facets):
        """Add a facet made of ``next_size`` brand-new vertices."""
        offset = len(self.created_vids)
        self.update_incrementally([offset + i for i in range(next_size)], facets)

    def get_fids_per_m(self, m):
        """Return the ids of all states that have exactly ``m`` facets."""
        return [fid for fid, state in self.states.items() if state["m"] == m]

    def get_dpvs_per_m(self, m):
        """Return the set of dpv of all states that have exactly ``m`` facets."""
        return {state["dpv"] for state in self.states.values() if state["m"] == m}

    def get_fids_per_dfs_locator(self, dfs):
        """Return the ids of all states whose ``dfs`` entry equals ``dfs``."""
        return {fid for fid, state in self.states.items() if state["dfs"] == dfs}

    def compute_dfs(self):
        """Enumerate by following every path of ``get_dfs_navigator``.

        Returns silently as soon as ``SimplicialSignal`` is raised.
        """
        try:
            # The navigator must be built from the full size sequence,
            # i.e. before the first size is popped.
            nav = self.get_dfs_navigator(self.size_seq)
            first_size = self.size_seq.pop(0)
            self.dfs_locator = tuple([0])
            self.update_incrementally(list(range(first_size)), [])

            for path in nav:
                remaining = list(path)
                remaining.pop(0)  # the first facet has already been placed
                idx = 0
                dfs_locator = [0]
                self.dfs_locator = tuple(dfs_locator)
                while len(remaining) > 0:
                    ns_vtx = remaining.pop(0)
                    next_size = self.size_seq[idx]
                    self.ns_vtx = ns_vtx
                    # Parents are looked up with the locator *before* it is
                    # extended by the current choice.
                    pool = self.get_fids_per_dfs_locator(self.dfs_locator)
                    dfs_locator += [ns_vtx]
                    self.dfs_locator = tuple(dfs_locator)
                    for facets_id in pool:
                        facets = list(self.id2facets[facets_id])
                        self.created_vids = self.get_created_vids(facets)
                        if ns_vtx == 0:
                            self.fill_wo_creating_new_vertices(next_size, facets)
                        elif ns_vtx == next_size:
                            self.fill_with_only_ones(next_size, facets)
                        else:
                            self.fill_w_creating_new_vertices(next_size, facets, ns_vtx)
                    idx += 1
        except SimplicialSignal:
            return

    def traceback(self, p):
        """Return the chain of state ids from ``p`` back to the root.

        The returned list starts with ``p`` and ends with ``None``.
        """
        chain = [p]
        while p is not None:
            p = self.states[p]["p"]
            chain.append(p)
        return chain
# def get_placed_order_per_fid(self, _id):
# flen = len(self.id2facets[_id])
# codes = []
# for c in range(flen):
# fid = tuple(self.id2facets[_id][:c + 1])
# codes += [self.states[self.facets2id[fid]]["loc"][2]]
# return codes