# Source code for ash_model.measures.hyper_conformity

from ash_model.paths.walks import all_shortest_s_walk_lengths, s_components
from ash_model.utils import hyperedge_most_frequent_node_attribute_value
from ash_model import ASH

import networkx as nx
from itertools import combinations
from collections import defaultdict
import tqdm


def __label_frequency(
    g: nx.Graph, u: object, nodes: list, labels: list, hierarchies: dict = None
) -> float:
    """
    Compute the similarity between the profile of ``u`` and the profiles of ``nodes``

    For every label in ``labels`` that is set on ``u``, each node ``v`` in
    ``nodes`` that belongs to ``g`` and carries the label contributes 1 when
    it shares the value of ``u`` and the (non-positive) hierarchy distance
    otherwise. The contribution is weighted by the fraction of the neighbors
    of ``v`` having the same value as ``v``. Contributions are summed, divided
    by ``len(nodes)``, and the per-label results are multiplied together.

    :param g: a networkx Graph object
    :param u: node id
    :param nodes: list of node ids to compare ``u`` against
    :param labels: list of node categorical labels
    :param hierarchies: dict of labels hierarchies

    :return: node profiles similarity score in [-1, 1]; 1 when none of the
        labels is set on ``u``
    """
    s = 1
    for label in labels:
        if label not in g.nodes[u]:
            continue
        a_u = g.nodes[u][label]

        # signed, frequency-weighted contribution of each node in ``nodes``
        sgn = {}
        for v in nodes:
            if not g.has_node(v) or label not in g.nodes[v]:
                continue
            a_v = g.nodes[v][label]

            # indicator function that exploits label hierarchical structure
            sgn[v] = 1 if a_u == a_v else __distance(label, a_u, a_v, hierarchies)

            v_neigh = list(g.neighbors(v))
            if not v_neigh:
                continue

            # Fraction of v's neighbors sharing v's value. Neighbors that do
            # not carry the label count as a mismatch instead of raising
            # KeyError.
            same = sum(
                1
                for x in v_neigh
                if label in g.nodes[x] and g.nodes[x][label] == a_v
            )
            f_label = same / len(v_neigh)

            # a zero frequency leaves the contribution unweighted
            if f_label > 0:
                sgn[v] *= f_label

        # an empty ``nodes`` list contributes nothing (avoids a division by zero)
        s *= (sum(sgn.values()) / len(nodes)) if nodes else 0.0

    return s


def __distance(label: str, v1: str, v2: str, hierarchies: dict = None) -> float:
    """
    Compute the (negated) distance of two label values in a plain hierarchy

    :param label: label name
    :param v1: first label value
    :param v2: second label value
    :param hierarchies: labels hierarchies, mapping each label name to a
        dict that assigns a numeric level to every label value

    :return: -1 when no hierarchy is available for ``label``; otherwise the
        negated absolute level difference divided by the number of values
        in the hierarchy minus one
    """
    if hierarchies is not None and label in hierarchies:
        levels = hierarchies[label]
        gap = abs(levels[v1] - levels[v2])
        return -gap / (len(levels) - 1)

    # no hierarchy for this label: differing values are maximally distant
    return -1


def __normalize(u: object, scores: dict, max_dist: int, alphas: list) -> dict:
    """
    Normalize the computed scores of ``u`` in [-1, 1]

    For every damping factor the score of ``u`` is divided by
    ``sum(d ** -alpha for d in 1..max_dist)``. The update is done in place.

    :param u: node
    :param scores: nested dict ``{str(alpha): {profile: {node: score}}}``
        containing the computed scores
    :param max_dist: largest distance taken into account for ``u``
    :param alphas: list of damping factor

    :return: scores updated (the same object received as input)
    """
    for alpha in alphas:
        norm = sum(d**-alpha for d in range(1, max_dist + 1))
        if norm == 0:
            # max_dist < 1: no distance contributed to the score of ``u``,
            # so there is nothing to rescale (and dividing would fail).
            continue

        for node_scores in scores[str(alpha)].values():
            if u in node_scores:
                node_scores[u] /= norm

    return scores


def hyper_conformity(
    h: ASH,
    alphas: list,
    labels: list,
    s: int = 1,
    profile_size: int = 1,
    hierarchies: dict = None,
    tid: int = None,
) -> list:
    """
    Compute the Attribute-Profile Conformity for the considered graph

    For every s-component of ``h`` the induced hypergraph and its s-line
    graph are built. Each connected component of the line graph is annotated
    with the most frequent node attribute value of every hyperedge, and a
    conformity score is computed for each of its nodes, for every damping
    factor and every attribute profile (combination of up to
    ``profile_size`` labels).

    :param h: the ASH instance to analyze
    :param alphas: list of damping factors
    :param labels: list of node categorical labels
    :param s: value forwarded to the s-component, s-line-graph and s-walk
        helpers (presumably the minimum hyperedge overlap -- confirm against
        those helpers)
    :param profile_size: maximum number of labels combined in a profile
    :param hierarchies: label hierarchies
    :param tid: value forwarded as both ``start`` and ``end`` of the s-line
        graph and to the attribute lookup (presumably a temporal snapshot
        id -- confirm)
    :return: a list with one dict per connected component of the s-line
        graphs, shaped ``{str(alpha): {"_".join(profile): {hyperedge: score}}}``
    :raises ValueError: if ``profile_size`` exceeds ``len(labels)`` or if
        ``alphas`` or ``labels`` is empty

    Examples
    --------
    Build the dataset described in the docs and compute hyper-conformity for
    label 'color' at tid=0:

    >>> import numpy as np, networkx as nx
    >>> from ash_model.utils.networkx import from_networkx_maximal_cliques_list
    >>> Gs = [nx.barabasi_albert_graph(100, 3, seed=i) for i in range(10)]
    >>> rng = np.random.default_rng(42)
    >>> for G in Gs:
    ...     for n in G.nodes():
    ...         G.nodes[n]['color'] = 'red' if rng.integers(0, 2) == 0 else 'blue'
    >>> h = from_networkx_maximal_cliques_list(Gs)
    >>> res = hyper_conformity(h, alphas=[1], labels=['color'], s=1, profile_size=1, tid=0)
    >>> len(res)
    1
    >>> ap = list(res[0].keys())[0]
    >>> feat = list(res[0][ap].keys())[0]
    >>> sorted(list(res[0][ap][feat].items()))[:3]
    [('e1', 0.6287411100578744), ('e10', 0.5843765336901121), ('e100', 0.6358755716986335)]
    """
    # Validate once, up front: the checks do not depend on the hypergraph and
    # must fire even when there is no component to process.
    if profile_size > len(labels):
        raise ValueError("profile_size must be <= len(labels)")

    if len(alphas) < 1 or len(labels) < 1:
        raise ValueError(
            "At least one value must be specified for both alphas and labels"
        )

    # attribute profiles: every combination of 1..profile_size labels
    profiles = []
    for i in range(1, profile_size + 1):
        profiles.extend(combinations(labels, i))

    full_res = []
    for comp in s_components(h, s):
        b, he_map = h.induced_hypergraph(comp)
        g = b.s_line_graph(s=s, start=tid, end=tid)

        for cmp in nx.connected_components(g):
            g1 = nx.subgraph(g, cmp).copy()

            # Annotate the line graph with the most frequent label value of
            # each hyperedge.
            # NOTE(review): this iterates over all hyperedges of ``b``, so
            # hyperedges outside ``cmp`` are added to ``g1`` as extra nodes --
            # confirm this is intended.
            for he in b.hyperedges():
                for label in labels:
                    values = list(
                        hyperedge_most_frequent_node_attribute_value(
                            b, he, label, tid
                        ).keys()
                    )
                    if len(values) == 0:
                        continue
                    g1.add_node(he, **{label: values[0]})

            res = {
                str(a): {
                    "_".join(profile): {n: 0 for n in g1.nodes()}
                    for profile in profiles
                }
                for a in alphas
            }

            for u in tqdm.tqdm(g1.nodes()):
                sp = dict(all_shortest_s_walk_lengths(b, s, u))

                # group the reached nodes by their distance
                dist_to_nodes = defaultdict(list)
                for _, nodelist in sp.items():
                    for node, dist in nodelist.items():
                        dist_to_nodes[dist].append(node)

                for dist, nodes in dist_to_nodes.items():
                    if dist == 0:
                        continue
                    for profile in profiles:
                        p_name = "_".join(profile)
                        sim = __label_frequency(
                            g1, u, nodes, list(profile), hierarchies
                        )
                        for alpha in alphas:
                            res[str(alpha)][p_name][u] += sim / (dist**alpha)

                # Nothing was accumulated when no node lies at a positive
                # distance; skip the normalization, which would otherwise
                # fail on an empty or all-zero distance set.
                max_dist = max(dist_to_nodes, default=0)
                if max_dist > 0:
                    res = __normalize(u, res, max_dist, alphas)

            # remap line-graph nodes through he_map, dropping unmapped ones
            for ap, conf_dict in res.items():
                for feat, node_dict in conf_dict.items():
                    res[ap][feat] = {
                        he_map[n]: v for n, v in node_dict.items() if n in he_map
                    }

            full_res.append(res)

    return full_res