Source code for ash_model.paths.walks

from collections import defaultdict, Counter
from itertools import combinations
from typing import Iterator, List, Dict, Optional, Union, Tuple

import networkx as nx

from ash_model import ASH


[docs]def all_simple_paths( h: ASH, s: int, hyperedge_a: Optional[str] = None, hyperedge_b: Optional[str] = None, start: Optional[int] = None, end: Optional[int] = None, cutoff: Optional[int] = None, ) -> Iterator[List[str]]: """ Generate all simple, s-overlap-valid paths in a hypergraph's line graph. A simple path is one with no repeated hyperedges, and additionally, the sequence must satisfy the s-overlap condition in the original hypergraph. Internally, this projects the hypergraph into its s-line-graph and then yields each NetworkX simple path that also passes the is_s_path filter. :param h: ASH hypergraph instance. :param s: Minimum overlap size between consecutive hyperedges. :param hyperedge_a: Identifier of the starting hyperedge (optional). If None, paths may start from any node. :param hyperedge_b: Identifier of the ending hyperedge (optional). If None, paths may end at any node. :param start: Lower time bound for hyperedges (inclusive). :param end: Upper time bound for hyperedges (inclusive). :param cutoff: Maximum path length (number of nodes) to explore. :return: Iterator over valid paths, each as a list of hyperedge IDs. """ # Build the s-overlap line graph lg = h.s_line_graph(s, start, end) # If specific endpoints are requested, ensure they exist if hyperedge_a and hyperedge_a not in lg: return iter([]) if hyperedge_b and hyperedge_b not in lg: return iter([]) # Yield only the simple paths that satisfy the s-path condition for path in nx.all_simple_paths( lg, source=hyperedge_a, target=hyperedge_b, cutoff=cutoff ): if is_s_path(h, path): yield path
[docs]def shortest_s_path( h: ASH, s: int, hyperedge_a: str, hyperedge_b: Optional[str] = None, start: Optional[int] = None, end: Optional[int] = None, cutoff: Optional[int] = None, ) -> List[List[str]]: """ Return all shortest simple, s-overlap-valid paths between two hyperedges. If hyperedge_b is None, this returns the shortest paths from A to all others. :param h: ASH hypergraph instance. :param s: Minimum overlap size. :param hyperedge_a: Starting hyperedge ID. :param hyperedge_b: Ending hyperedge ID (optional). :param start: Start time bound (inclusive). :param end: End time bound (inclusive). :param cutoff: Maximum path length. :return: List of shortest paths, each as list of IDs. """ paths = list(all_simple_paths(h, s, hyperedge_a, hyperedge_b, start, end, cutoff)) if not paths: return [] min_len = min(len(p) for p in paths) return [p for p in paths if len(p) == min_len]
[docs]def all_shortest_s_paths( h: ASH, s: int, hyperedge_a: Optional[str] = None, start: Optional[int] = None, end: Optional[int] = None, cutoff: Optional[int] = None, ) -> Dict[Tuple[str, str], List[List[str]]]: """ Compute shortest s-paths for all hyperedge pairs (or from a source). When hyperedge_a is given, returns shortest paths from that source to all other hyperedges. Otherwise, returns shortest paths for every unordered pair, keyed both ways for convenience. :param h: ASH hypergraph instance. :param s: Minimum overlap size. :param hyperedge_a: Source hyperedge ID (optional). :param start: Start time bound (inclusive). :param end: End time bound (inclusive). :param cutoff: Maximum path length. :return: Dict mapping (u,v) to list of shortest paths. """ result: Dict[Tuple[str, str], List[List[str]]] = defaultdict(list) hyperedges = list(h.hyperedges(start, end)) if hyperedge_a: # From one source to all others for he in hyperedges: if he == hyperedge_a: continue paths = list(all_simple_paths(h, s, hyperedge_a, he, start, end, cutoff)) if not paths: continue min_len = min(len(p) for p in paths) result[(hyperedge_a, he)] = [p for p in paths if len(p) == min_len] else: # Pairwise between all for i, u in enumerate(hyperedges): for v in hyperedges[i + 1 :]: paths = list(all_simple_paths(h, s, u, v, start, end, cutoff)) if not paths: continue min_len = min(len(p) for p in paths) best = [p for p in paths if len(p) == min_len] result[(u, v)] = best result[(v, u)] = [list(reversed(p)) for p in best] return result
[docs]def all_shortest_s_path_lengths( h: ASH, s: int, hyperedge_a: Optional[str] = None, start: Optional[int] = None, end: Optional[int] = None, cutoff: Optional[int] = None, ) -> Dict[str, Dict[str, int]]: """ Compute lengths of shortest s-overlap paths between hyperedges. When hyperedge_a is provided, returns dict lengths from source to others; otherwise returns full matrix mapping each hyperedge to every other. :param h: ASH hypergraph instance. :param s: Minimum overlap size. :param hyperedge_a: Source hyperedge ID (optional). :param start: Start time bound (inclusive). :param end: End time bound (inclusive). :param cutoff: Maximum path length. :return: Nested dict of distances (number of edges). """ result: Dict[str, Dict[str, int]] = defaultdict(lambda: defaultdict(int)) hyperedges = list(h.hyperedges(start, end)) if hyperedge_a: # Distances from source for he in hyperedges: if he == hyperedge_a: result[hyperedge_a][he] = 0 continue paths = list(all_simple_paths(h, s, hyperedge_a, he, start, end, cutoff)) if paths: length = min(len(p) for p in paths) - 1 result[hyperedge_a][he] = length result[he][hyperedge_a] = length else: # Full pairwise for i, u in enumerate(hyperedges): result[u][u] = 0 for v in hyperedges[i + 1 :]: paths = list(all_simple_paths(h, s, u, v, start, end, cutoff)) if paths: length = min(len(p) for p in paths) - 1 result[u][v] = length result[v][u] = length return result
[docs]def all_shortest_s_walks( h: ASH, s: int, hyperedge_a: Optional[str] = None, start: Optional[int] = None, end: Optional[int] = None, weight: bool = False, edge: bool = True, ) -> Union[List[str], Dict[str, List[str]]]: """ Retrieve the shortest s-walk(s) in the hypergraph or its dual. Delegates to shortest_s_walk. :param h: ASH instance. :param s: Minimum overlap size. :param hyperedge_a: Source hyperedge ID (optional). :param start: Start time bound. :param end: End time bound. :return: Single path (list) if two endpoints specified, or dict of {target: path} if one endpoint, or nested dict if neither. """ return shortest_s_walk(h, s, hyperedge_a, None, start, end, weight, edge)
[docs]def all_shortest_s_walk_lengths( h: ASH, s: int, hyperedge_a: Optional[str] = None, start: Optional[int] = None, end: Optional[int] = None, ) -> Dict[str, Dict[str, int]]: """ Compute lengths of shortest s-walks between hyperedges. :param h: ASH instance. :param s: Minimum overlap size. :param hyperedge_a: Source hyperedge ID (optional). :param start: Start time bound. :param end: End time bound. :return: Nested dict mapping hyperedge to target to length. """ walks = all_shortest_s_walks(h, s, hyperedge_a, start, end) lengths: Dict[str, Dict[str, int]] = {} # walks may be dict or list if isinstance(walks, dict): for tgt, path in walks.items(): lengths.setdefault(hyperedge_a, {})[tgt] = len(path) lengths.setdefault(tgt, {})[hyperedge_a] = len(path) return lengths
[docs]def shortest_s_walk( h: ASH, s: int, fr: Optional[str] = None, to: Optional[str] = None, start: Optional[int] = None, end: Optional[int] = None, weight: bool = False, edge: bool = True, ) -> Union[List[str], Dict[str, List[str]]]: """ Compute shortest s-walk(s) considering weight or dual hypergraph. If edge=True, operates on s-line-graph; otherwise uses dual hypergraph node graph. :param h: ASH instance. :param s: Minimum overlap size. :param fr: Starting hyperedge ID (node or edge depending). :param to: Ending hyperedge ID. :param start: Start time bound. :param end: End time bound. :param weight: Use edge weight attribute 'w' if True. :param edge: If False, compute on dual hypergraph. :return: Single path list if both fr and to set, else dict mapping. """ # Build appropriate graph if edge: g = h.s_line_graph(s, start, end) # Guards for explicit endpoints if (fr is not None and fr not in g) or (to is not None and to not in g): return [] if (fr is not None and to is not None) else {} # Case 1: specific source and target -> single path as list if fr is not None and to is not None: return nx.shortest_path( g, source=fr, target=to, weight="w" if weight else None ) # Case 2: specific source only -> dict {target: path} if fr is not None: if weight: return nx.single_source_dijkstra_path(g, fr, weight="w") return nx.single_source_shortest_path(g, fr) # Case 3: no endpoints -> dict {source: {target: path}} if weight: pairs_iter = nx.all_pairs_dijkstra_path(g, weight="w") else: pairs_iter = nx.all_pairs_shortest_path(g) # Materialize iterator (NetworkX >=3.3 returns iterator for all-pairs) return {src: tgt_paths for src, tgt_paths in pairs_iter} else: # Dual graph handling h1, node_to_eid = h.dual_hypergraph(start=start, end=end) eid_to_node = {eid: node for node, eid in node_to_eid.items()} # map fr/to to dual nodes src = node_to_eid.get(fr, fr) dst = node_to_eid.get(to, to) raw = shortest_s_walk(h1, s, src, dst, start, end, weight, edge=True) # remap back if isinstance(raw, dict): return { eid_to_node[k]: [eid_to_node[n] for n in path] for k, path in raw.items() } return [eid_to_node[n] for n in raw]
[docs]def closed_s_walk( h: ASH, s: int, hyperedge_a: str, start: Optional[int] = None, end: Optional[int] = None, ) -> List[List[str]]: """ Find all simple cycles (basis) in the s-line-graph containing a given node. :param h: ASH instance. :param s: Minimum overlap size. :param hyperedge_a: Node ID in s-line-graph to find cycles through. :param start: Start time bound. :param end: End time bound. :return: List of cycles, each as list of hyperedge IDs. """ g = h.s_line_graph(s, start, end) if hyperedge_a not in g: return [] return nx.cycle_basis(g, hyperedge_a)
[docs]def s_distance( h: ASH, s: int, fr: Optional[str] = None, to: Optional[str] = None, start: Optional[int] = None, end: Optional[int] = None, weight: bool = False, edge: bool = True, ) -> Union[int, Dict[str, Dict[str, int]], None]: """ Compute shortest-path distances in s-line-graph or dual hypergraph. If edge=True, uses s-line-graph; otherwise uses dual hypergraph. :param h: ASH instance. :param s: Minimum overlap size. :param fr: Source ID. :param to: Target ID. :param start: Start time bound. :param end: End time bound. :param weight: Use edge weight attribute 'w' if True. :param edge: If False, use dual hypergraph distances. :return: Single distance, nested dict of distances, or None if unreachable. """ if edge: G = h.s_line_graph(s, start, end) # guard: if either endpoint is missing, bail if (fr and fr not in G) or (to and to not in G): return None # pick the right algorithm if fr and to: return nx.shortest_path_length( G, source=fr, target=to, weight="w" if weight else None ) elif fr: # distances from fr to all reachable nodes return nx.single_source_dijkstra_path_length( G, fr, weight="w" if weight else None ) else: # distances from fr to all reachable nodes return { fr: nx.single_source_dijkstra_path_length( G, fr, weight="w" if weight else None ) for fr in G.nodes() } else: # build dual and invert its node‐to‐eid map H_dual, node_to_eid = h.dual_hypergraph( start=start, end=end ) # node represent hyperedges eid_to_node = { eid: node for node, eid in node_to_eid.items() } # maps projected hyperedges to real nodes # while node_to_eid maps real nodes to projected hyperedges # translate your fr/to into the dual graph’s hyperedges src = node_to_eid.get(fr, fr) dst = node_to_eid.get(to, to) # compute distances on the dual graph dual_res = s_distance( H_dual, s, fr=src, to=dst, start=start, end=end, edge=True, weight=weight ) # raise ValueError(f"{dual_res}, {src}, {dst}") if dual_res is None: return None # if it’s a single integer, map back and return if isinstance(dual_res, int): return dual_res # if it's a nested dict, map it back if isinstance(dual_res, dict) and any( isinstance(v, dict) for v in dual_res.values() ): remapped: Dict[str, Dict[str, int]] = {} for dual_src, targets in dual_res.items(): src = eid_to_node.get(dual_src, dual_src) remapped[src] = {} for dual_trg, dist in targets.items(): trg = eid_to_node.get(dual_trg, dual_trg) remapped[src][trg] = dist return remapped # else it must be a normal dict remapped: Dict[str, int] = {} for dual_trg, dist in dual_res.items(): # remap the target hyperedge ID trg = eid_to_node.get(dual_trg, dual_trg) # remap the source hyperedge IDs remapped[trg] = dist print(f"remapped: {remapped}") return remapped
[docs]def average_s_distance( h: ASH, s: int, start: Optional[int] = None, end: Optional[int] = None, weight: bool = False, edge: bool = True, ) -> float: """ Compute the average shortest-path length in the s-line-graph. :param h: ASH instance. :param s: Minimum overlap size. :param start: Start time bound. :param end: End time bound. :param weight: Use edge weight attribute 'w' if True (unused here). :param edge: (Unused) Included for API consistency. :return: Average distance across all connected pairs. """ if not edge: h, _ = h.dual_hypergraph(start=start, end=end) g = h.s_line_graph(s, start, end) if weight: return nx.average_shortest_path_length(g, weight="w") return nx.average_shortest_path_length(g)
[docs]def has_s_walk( h: ASH, s: int, fr: Optional[str] = None, to: Optional[str] = None, start: Optional[int] = None, end: Optional[int] = None, edge: bool = True, ) -> bool: """ Determine if an s-overlap walk exists between two hyperedges. :param h: ASH instance. :param s: Minimum overlap size. :param fr: Source hyperedge ID. :param to: Target hyperedge ID. :param start: Start time bound. :param end: End time bound. :param edge: If False, evaluate on dual hypergraph. :return: True if a path exists, False otherwise. """ if edge: g = h.s_line_graph(s, start, end) if fr and fr not in g or to and to not in g: return False return nx.has_path(g, fr, to) else: h1, node_to_eid = h.dual_hypergraph(start=start, end=end) src = node_to_eid.get(fr, fr) dst = node_to_eid.get(to, to) return has_s_walk(h1, s, src, dst, start, end, edge=True)
[docs]def s_diameter( h: ASH, s: int, start: Optional[int] = None, end: Optional[int] = None, weight: bool = False, edge: bool = True, ) -> int: """ Compute the diameter (longest shortest-path) of the s-line-graph. :param h: ASH instance. :param s: Minimum overlap size. :param start: Start time bound. :param end: End time bound. :param weight: Use weighted distances if True. :param edge: If False, compute on dual hypergraph. :return: Integer diameter of the graph (0 if empty). """ if not edge: h, _ = h.dual_hypergraph(start=start, end=end) g = h.s_line_graph(s, start, end) # lcc comps = list(nx.connected_components(g)) lcc = max(comps, key=len) if comps else set() lcc = g.subgraph(lcc) diam = nx.diameter(lcc, weight="w" if weight else None) return diam if diam is not None else 0
[docs]def s_components( h: ASH, s: int, start: Optional[int] = None, end: Optional[int] = None, edge: bool = True, ) -> Iterator[set]: """ Yield connected components of the s-line-graph or its dual. :param h: ASH instance. :param s: Minimum overlap size. :param start: Start time bound. :param end: End time bound. :param edge: If False, yields on dual hypergraph components. :return: Iterator of sets of hyperedge IDs per component. """ if edge: g = h.s_line_graph(s, start, end) yield from nx.connected_components(g) else: h1, node_to_eid = h.dual_hypergraph(start=start, end=end) translate = {eid: node for node, eid in node_to_eid.items()} # iterate over components in the dual graph for comp in s_components(h1, s, start, end, edge=True): yield {translate[eid] for eid in comp}
[docs]def is_s_path(h: ASH, walk: List[str]) -> bool: """Validate that a hyperedge sequence is a simple s-path. A simple s-path here means: * No hyperedge appears more than once in the sequence. * No node participates in overlaps between more than two hyperedges (prevents reuse of the same node in non-consecutive steps). :param h: ASH instance :param walk: List of hyperedge IDs forming the candidate path :return: True if valid s-path, False otherwise """ # Check for repeated hyperedges if any(count > 1 for count in Counter(walk).values()): return False # Collect all node overlaps between any two hyperedges in the walk node_counts: Counter = Counter() for u, v in combinations(walk, 2): nodes_u = set(h.get_hyperedge_nodes(u)) nodes_v = set(h.get_hyperedge_nodes(v)) node_counts.update(nodes_u & nodes_v) # If any node appears in more than one overlap, path is not simple if any(count > 1 for count in node_counts.values()): return False return True