#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# simplicial_test -- a python module to realize simplicial degree-size sequences
#
# Copyright (C) 2020-2021 Tzu-Chi Yen <tzuchi.yen@colorado.edu>
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import numpy as np
from dataclasses import dataclass, field
from collections import defaultdict, Counter
from functools import partial
from typing import List
from copy import deepcopy
[docs]@dataclass(frozen=False)
class SimplicialDepot:
degree_list: List
size_list: List
simplicial: bool = False
facets: tuple = ()
prev_d: dict = field(repr=False, default_factory=dict)
prev_s: dict = field(repr=False, default_factory=dict)
prev_b: dict = field(repr=False, default_factory=dict)
mappers: dict = field(repr=False, default_factory=dict)
exempts: dict = field(repr=False, default_factory=dict)
collects: dict = field(repr=False, default_factory=dict)
candidates: dict = field(repr=False, default_factory=dict)
valid_trials: dict = field(repr=False, default_factory=dict)
levels_traj: List = field(repr=False, default_factory=list)
conv_time: int = field(repr=False, default=0)
cutoff: int = field(repr=False, init=False)
def __post_init__(self):
self.explored = defaultdict(set)
self.time = np.zeros(len(self.size_list), np.int_)
self.level_map = defaultdict(partial(np.ndarray, len(self.degree_list), int))
for ind, _ in enumerate(self.degree_list):
self.level_map[0][ind] = ind
for ind, _ in enumerate(self.size_list):
self.valid_trials[ind] = None
self.level_map[1].fill(-1)
[docs] def compute_level_map(self, level, mapping2shrinked):
for _ in range(len(self.degree_list)):
vtx_current_view = self.level_map[level - 1][_]
if vtx_current_view == -1 or vtx_current_view not in mapping2shrinked:
self.level_map[level][_] = -1
else:
self.level_map[level][_] = mapping2shrinked[vtx_current_view]
[docs] def add_to_time_counter(self, level):
self.time[int(level - 1)] += 1
self.conv_time += 1
self.levels_traj += [level - 1]
if self.conv_time >= self.cutoff:
return True
else:
return False
[docs]class SimplexRegistrar(object):
def __init__(self):
self.pointer = 0
self.name2id = dict()
self.id2name = dict()
self.facet_size_per_id = np.array([], dtype=np.int_)
[docs] def register(self, name) -> (tuple, int):
name = tuple(sorted(name, reverse=True))
if name not in self.name2id:
self.name2id[name] = self.pointer
self.id2name[self.pointer] = name
self.pointer += 1
self.facet_size_per_id = np.append(self.facet_size_per_id, [len(name)])
return name, self.name2id[name]
[docs]def flatten(nested_list):
f"""flattens out nested lists."""
return [item for sublist in nested_list for item in sublist]
[docs]def simplify_blocked_sets(bsets):
r"""No more inclusive blocked sets.
Parameters
----------
bsets : ``iterable`` of integer-valued ``tuple`` objects
Facets that must be respected for non-inclusion, or "blocked sets (of facets)".
Returns
-------
simplified_bsets : ``list`` of integer-valued ``tuple`` objects
Simplified blocked sets, of which redundancy is removed.
Examples
--------
>>> bsets = ((7, 44, 109, 273), (7, 273), (44, 273), (109, 273), (2,), (4,), (44,), (56,))
>>> simplified_bsets = simplify_blocked_sets(bsets)
>>> print(simplified_bsets)
[(7, 44, 109, 273), (2,), (4,), (56,)]
"""
simplified_bsets = []
for _bsets in sorted(bsets, key=lambda _: -len(_)):
if not tuple(set(_bsets)) in simplified_bsets:
if len(simplified_bsets) > 0:
to_add = True
for _ in simplified_bsets:
if set(_bsets).issubset(set(_)):
to_add = False
break
if to_add:
simplified_bsets += [tuple(set(_bsets))]
else:
simplified_bsets += [tuple(set(_bsets))]
return simplified_bsets
[docs]def sort_facets(facets):
sorted_facets = []
for facet in facets:
sorted_facets += [tuple(sorted(facet, reverse=False))]
sorted_facets = set(sorted_facets)
return tuple(sorted(sorted_facets, key=lambda _: [-len(_)] + list(_)))
[docs]def prune_included_facets(bsets):
r"""Alias of simplify_blocked_sets."""
return simplify_blocked_sets(bsets)
[docs]def get_indices_of_k_in_blocked_sets(blocked_sets, k):
indices = []
for idx, _ in enumerate(blocked_sets):
if k in _:
indices += [idx]
return indices
[docs]def remove_ones(sizes, wanting_degs, choose_from=None):
# todo, to figure out: you cannot do [-Counter(s)[1]:]
removed_vtx_sites = np.array(list(choose_from), dtype=np.int_)[:Counter(sizes)[1]]
wanting_degs[removed_vtx_sites] = 0
return wanting_degs, removed_vtx_sites.tolist()
[docs]def pair_one_by_one(size_list, degree_list) -> (list, list, int):
"""
This function is used to pair up the ones in size/deg lists, since this is the only plausible situation.
Note that it is only applied when level=0.
Parameters
----------
size_list
degree_list
Returns
-------
"""
size_list = sorted(size_list, reverse=True)
degree_list = sorted(degree_list, reverse=True)
_ = min(Counter(size_list)[1], Counter(degree_list)[1])
for __ in range(_):
size_list.remove(1)
degree_list.remove(1)
return size_list, degree_list, _
[docs]def sort_helper(st) -> list:
d = defaultdict()
for idx, _ in enumerate(st.DEGREE_LIST):
d[idx] = _
inv_map = defaultdict(list)
for _ in d:
inv_map[d[_]] += [_]
joint_seq = st.compute_joint_seq_from_identifier(sorted_deg=False)
if len(joint_seq[0]) == 0:
raise ValueError("Invalid input instance. Perhaps execute SimplicialTest.is_simplicial() first?")
old2new = dict()
for idx, _ in enumerate(st.compute_joint_seq_from_identifier(sorted_deg=False)[1]):
old2new[idx] = inv_map[_].pop(0)
translated = []
for facet in st.identifier2facets():
translated += [sorted([old2new[_] for _ in facet])]
return translated
[docs]def sort_callback(facets):
t = []
for facet in facets:
t += [tuple(sorted(facet, reverse=False))]
return t
# def get_seq2seq_mapping(degs):
# mapping2enlarged = dict()
# nonzero_degs = np.count_nonzero(degs)
# argsort = np.argsort(degs)[::-1]
# for idx in np.arange(0, nonzero_degs, 1):
# mapping2enlarged[idx] = argsort[idx]
# mapping2shrinked = {v: k for k, v in mapping2enlarged.items()}
# return mapping2shrinked, mapping2enlarged
[docs]def get_seq2seq_mapping(degs):
old2new = dict()
for idx, _deg in enumerate(degs):
old2new[idx] = _deg
inv_old2new = defaultdict(list)
for key in old2new.keys():
if old2new[key] != 0:
inv_old2new[old2new[key]] += [key]
_idx = 0
_inv_old2new = deepcopy(inv_old2new)
keys = sorted(inv_old2new.keys(), reverse=True)
for key in keys:
for idx, _ in enumerate(inv_old2new[key]):
inv_old2new[key][idx] = _idx
_idx += 1
mapping2enlarged = dict()
for key in inv_old2new.keys():
for idx, new_key in enumerate(inv_old2new[key]):
mapping2enlarged[new_key] = _inv_old2new[key][idx]
mapping2shrinked = {v: k for k, v in mapping2enlarged.items()}
return mapping2shrinked, mapping2enlarged
[docs]def filter_blocked_facets(blocked_facets, exempt_vids):
r"""Supposedly every existing facet will block potential facets in the next level; however, this only applies if
it contains `exempt_vids` because these vertices will be shared by next-level candidates.
Parameters
----------
blocked_facets
exempt_vids
Returns
-------
"""
filtered = []
for facet in blocked_facets:
if set(exempt_vids).issubset(facet):
filtered += [facet]
return filtered
[docs]def compute_joint_seq(facets) -> (list, list):
if len(facets) == 0:
raise ValueError("Empty input facets. Are you sure the input is correct?")
flattened_facets = [item for sublist in facets for item in sublist]
n = max(flattened_facets) - min(flattened_facets) + 1
degs = np.zeros(n, dtype=np.int_)
sizes = []
for facet in facets:
sizes += [len(facet)]
for vertex_id in facet:
degs[vertex_id] += 1
return sorted(degs, reverse=True), sorted(sizes, reverse=True)
[docs]def if_facets_simplicial(facets) -> bool:
for idx1, facet1 in enumerate(sorted(facets, key=len)):
for idx2, facet2 in enumerate(sorted(facets, key=len)):
if idx2 > idx1:
if set(list(facet1)).issubset(set(list(facet2))):
return False
return True
[docs]def accel_asc(n):
"""source: http://jeromekelleher.net/generating-integer-partitions.html"""
a = [0 for i in range(n + 1)]
k = 1
y = n - 1
while k != 0:
x = a[k - 1] + 1
k -= 1
while 2 * x <= y:
a[k] = x
y -= x
k += 1
l = k + 1
while x <= y:
a[k] = x
a[l] = y
yield a[:k + 2]
x += 1
y -= 1
a[k] = x + y
y = x + y - 1
yield a[:k + 1]
[docs]def get_partition(n=1, sortby="asc"):
gen = accel_asc(n)
partitions = []
while True:
try:
partitions += [sorted(next(gen), reverse=True)]
except StopIteration:
break
if sortby == "asc":
return partitions
else:
return sorted(partitions, key=lambda x: max(x) - len(x), reverse=True)
[docs]def read_hyperedge_list(path, delimiter=","):
with open(path, 'r') as f:
edge_list = set()
for line in f:
if not line.lstrip().startswith("%"):
e = line.strip().split(delimiter)
_edge_list = list()
for _e in e:
_edge_list += [int(_e) - 1]
edge_list.add(tuple(_edge_list))
return list(edge_list)
[docs]def write_simplicial_list(el, path):
with open(path, 'w') as f:
for _el in el:
for __el in _el:
f.write(str(__el) + " ")
f.write("\n")