Source code for ash_model.measures.attribute_analysis

from collections import defaultdict, Counter
from math import log, e

import numpy as np

from ash_model import ASH, NProfile
from ash_model.utils import hyperedge_aggregate_node_profile


def __entropy(labels, base=None):
    """
    Shannon entropy of the empirical distribution of ``labels``.

    :param labels: sequence of observed values; its length is the number of
        observations and its distinct values are the classes
    :param base: base of the logarithm; the natural logarithm is used when
        ``None``

    :return: ``0`` when there are fewer than two observations or only one
        distinct value, otherwise the entropy of the relative frequencies

    """

    total = len(labels)
    if total <= 1:
        # Zero or one observation carries no uncertainty.
        return 0

    _, occurrences = np.unique(labels, return_counts=True)
    frequencies = occurrences / total

    if np.count_nonzero(frequencies) <= 1:
        # A single class: entropy is zero regardless of the base.
        return 0

    log_base = base if base is not None else e

    entropy = 0.0
    for freq in frequencies:
        entropy -= freq * log(freq, log_base)

    return entropy


def __purity(labels):
    """
    purity is the relative frequency of the most common attribute value
    this returns a dictionary mapping the most common value to its purity

    :param labels: collection of hashable attribute values

    :return: a single-entry dictionary ``{most_common_value: purity}``, or an
        empty dictionary when ``labels`` is empty

    """

    total = len(labels)
    if total == 0:
        # Guard before indexing: most_common(1) is an empty list for empty
        # input, so taking element [0] would raise IndexError.
        return {}

    value, count = Counter(labels).most_common(1)[0]
    return {value: count / total}


def hyperedge_profile_purity(h: ASH, hyperedge_id: str, tid: int) -> dict:
    """
    Computes the purity of the hyperedge profile, i.e., the relative frequency
    of the most common attribute value for each attribute in the hyperedge
    nodes' profiles. Only attribute values that are strings are considered.

    :param h: The ASH object
    :param hyperedge_id: The hyperedge id
    :param tid: The temporal id
    :return: A dictionary with attribute names as keys and, as values, a
        dictionary mapping the most common attribute value to its purity

    Examples
    --------
    Build a small temporal ASH from 10 Barabási–Albert graphs, annotate nodes
    with a categorical color, then compute the purity on one hyperedge at
    tid=0.

    >>> import numpy as np, networkx as nx
    >>> from ash_model.utils.networkx import from_networkx_maximal_cliques_list
    >>> Gs = [nx.barabasi_albert_graph(100, 3, seed=i) for i in range(10)]
    >>> rng = np.random.default_rng(42)
    >>> for G in Gs:
    ...     for n in G.nodes():
    ...         G.nodes[n]['color'] = 'red' if rng.integers(0, 2) == 0 else 'blue'
    >>> h = from_networkx_maximal_cliques_list(Gs)
    >>> tid = 0
    >>> he0 = next(iter(h.hyperedges(start=tid, end=tid)))
    >>> hyperedge_profile_purity(h, he0, tid)
    {'color': {'blue': 1.0}}
    """
    # Group the string-valued labels of the member nodes by attribute name.
    # The dict keys double as the set of attributes seen, so no separate set
    # is needed and the result order follows first appearance.
    attr_values = defaultdict(list)
    for node in h.get_hyperedge_nodes(hyperedge_id):
        profile = h.get_node_profile(node, tid)
        for name, value in profile.get_attributes().items():
            if isinstance(value, str):
                attr_values[name].append(value)

    # Each entry has the form {most_common_class: purity}.
    return {name: __purity(values) for name, values in attr_values.items()}
def hyperedge_profile_entropy(h: ASH, hyperedge_id: str, tid: int) -> dict:
    """
    Computes the entropy of the hyperedge profile, i.e., the entropy of the
    attribute values for each attribute in the hyperedge nodes' profiles.
    Only string-valued attributes are considered, and the logarithm base
    passed to the entropy is the number of distinct values observed for that
    attribute in the hyperedge.

    :param h: The ASH object
    :param hyperedge_id: The hyperedge id
    :param tid: The temporal id
    :return: A dictionary with attribute names as keys and their entropy as
        values

    Examples
    --------
    Using the same dataset as above, the entropy for the selected hyperedge
    at tid=0:

    >>> import numpy as np, networkx as nx
    >>> from ash_model.utils.networkx import from_networkx_maximal_cliques_list
    >>> Gs = [nx.barabasi_albert_graph(100, 3, seed=i) for i in range(10)]
    >>> rng = np.random.default_rng(42)
    >>> for G in Gs:
    ...     for n in G.nodes():
    ...         G.nodes[n]['color'] = 'red' if rng.integers(0, 2) == 0 else 'blue'
    >>> h = from_networkx_maximal_cliques_list(Gs)
    >>> tid = 0
    >>> he0 = next(iter(h.hyperedges(start=tid, end=tid)))
    >>> hyperedge_profile_entropy(h, he0, tid)
    {'color': 0}
    """
    labels_by_attr = defaultdict(list)

    for member in h.get_hyperedge_nodes(hyperedge_id):
        member_attrs = h.get_node_profile(member, tid).get_attributes()
        for attr_name, attr_value in member_attrs.items():
            if not isinstance(attr_value, str):
                continue
            labels_by_attr[attr_name].append(attr_value)

    return {
        attr_name: __entropy(labels, len(set(labels)))
        for attr_name, labels in labels_by_attr.items()
    }
def star_profile_entropy(
    h: ASH, node_id: int, tid: int, method: str = "aggregate"
) -> dict:
    """
    Returns the entropy of the star profile of a node, i.e., the entropy of
    the attribute values for each attribute in the profiles of the nodes in
    the star of the given node.

    If method is 'aggregate', it is computed by first aggregating each
    hyperedge into a single profile (see hyperedge_aggregate_node_profile).
    Else if method is 'collapse', all the node's neighbors are considered.
    Only string-valued attributes contribute, and the logarithm base passed
    to the entropy is the number of distinct values collected per attribute.

    :param h: ASH instance
    :param node_id: Specify the node for which we want to calculate the star
        profile entropy
    :param tid: Specify the temporal id
    :param method: Specify the method to be used in calculating the star
        profile entropy. Options are 'aggregate' or 'collapse'.
    :return: A dictionary with the entropy of each attribute
    :raises ValueError: if ``method`` is neither 'aggregate' nor 'collapse'

    Examples
    --------
    Compute the star-profile entropy for a node (aggregate method) on tid=0
    with the dataset above:

    >>> import numpy as np, networkx as nx
    >>> from ash_model.utils.networkx import from_networkx_maximal_cliques_list
    >>> Gs = [nx.barabasi_albert_graph(100, 3, seed=i) for i in range(10)]
    >>> rng = np.random.default_rng(42)
    >>> for G in Gs:
    ...     for n in G.nodes():
    ...         G.nodes[n]['color'] = 'red' if rng.integers(0, 2) == 0 else 'blue'
    >>> h = from_networkx_maximal_cliques_list(Gs)
    >>> tid = 0
    >>> node0 = next(iter(h.nodes(start=tid, end=tid)))
    >>> star_profile_entropy(h, node0, tid, method='aggregate')
    {'color': np.float64(0.7706290693639405)}
    """
    # The star is fetched before the method is validated, as in the original
    # control flow.
    incident_hyperedges = h.star(node_id, start=tid)

    if method == "aggregate":
        # One aggregated profile per hyperedge in the star.
        profiles = [
            hyperedge_aggregate_node_profile(h, hyperedge_id, tid)
            for hyperedge_id in incident_hyperedges
        ]
    elif method == "collapse":
        # One profile per distinct neighbor.
        distinct_neighbors = set(h.neighbors(node_id, start=tid))
        profiles = [h.get_node_profile(nb, tid) for nb in distinct_neighbors]
    else:
        raise ValueError("method must either be 'aggregate' or 'collapse'")

    labels_by_attr = defaultdict(list)
    for star_profile in profiles:
        for attr_name, attr_value in star_profile.get_attributes().items():
            if not isinstance(attr_value, str):
                continue
            labels_by_attr[attr_name].append(attr_value)

    return {
        attr_name: __entropy(labels, len(set(labels)))
        for attr_name, labels in labels_by_attr.items()
    }
def star_profile_homogeneity(
    h: ASH, node_id: int, tid: int, method: str = "aggregate"
) -> dict:
    """
    Returns the homogeneity of the star profile of a node, i.e., the relative
    frequency of the node's attribute value for each attribute in the
    profiles of the nodes in the star of the given node.

    If method is 'aggregate', it is computed by first aggregating each
    hyperedge into a single profile (see hyperedge_aggregate_node_profile).
    Else if method is 'collapse', all the node's neighbors are considered.

    The frequency is taken over the profiles actually compared: one per
    hyperedge of the star for 'aggregate', one per distinct neighbor for
    'collapse'. Only string-valued attributes are considered.

    :param h: ASH instance
    :param node_id: node id
    :param tid: temporal id
    :param method: Specify the method to be used in calculating the star
        profile homogeneity. Options are 'aggregate' or 'collapse'.
    :return: A dictionary with the homogeneity of each attribute
    :raises ValueError: if ``method`` is neither 'aggregate' nor 'collapse'

    Examples
    --------
    Compute the star-profile homogeneity for a node (aggregate method) on
    tid=0 with the dataset above:

    >>> import numpy as np, networkx as nx
    >>> from ash_model.utils.networkx import from_networkx_maximal_cliques_list
    >>> Gs = [nx.barabasi_albert_graph(100, 3, seed=i) for i in range(10)]
    >>> rng = np.random.default_rng(42)
    >>> for G in Gs:
    ...     for n in G.nodes():
    ...         G.nodes[n]['color'] = 'red' if rng.integers(0, 2) == 0 else 'blue'
    >>> h = from_networkx_maximal_cliques_list(Gs)
    >>> tid = 0
    >>> node0 = next(iter(h.nodes(start=tid, end=tid)))
    >>> star_profile_homogeneity(h, node0, tid, method='aggregate')
    {'color': 0.7741935483870968}
    """
    if method == "aggregate":
        # One aggregated profile per hyperedge in the node's star.
        profiles = [
            hyperedge_aggregate_node_profile(h, hyperedge_id, tid)
            for hyperedge_id in h.star(node_id, start=tid)
        ]
    elif method == "collapse":
        # One profile per distinct neighbor.
        profiles = [
            h.get_node_profile(n, tid) for n in set(h.neighbors(node_id, start=tid))
        ]
    else:
        raise ValueError("method must either be 'aggregate' or 'collapse'")

    attributes = defaultdict(list)
    for profile in profiles:
        for attr_name, value in profile.get_attributes().items():
            if isinstance(value, str):
                attributes[attr_name].append(value)

    if not attributes:
        return {}

    # Look the node's own profile up once; it does not depend on the attribute.
    node_profile = h.get_node_profile(node_id, tid=tid)

    # Divide by the number of profiles that were compared. For 'aggregate'
    # this equals the star size; for 'collapse' it is the neighbor count, so
    # the result stays a relative frequency in [0, 1] (dividing neighbor
    # matches by the number of hyperedges could exceed 1).
    n_profiles = len(profiles)
    return {
        attr_name: values.count(node_profile.get_attribute(attr_name)) / n_profiles
        for attr_name, values in attributes.items()
    }
def average_group_degree(h: ASH, tid: int, hyperedge_size: int = None) -> dict:
    """
    Computes the average degree of each group (nodes having the same label in
    the attribute)

    :param h: ASH instance
    :param tid: the temporal id
    :param hyperedge_size: Specify the size of the hyperedges; forwarded to
        ``h.degree`` (``None`` by default)
    :return: A dictionary with attribute names as keys and a dictionary of
        average degrees for each attribute value

    Examples
    --------
    Average degree by color at tid=0 with the dataset above:

    >>> import numpy as np, networkx as nx
    >>> from ash_model.utils.networkx import from_networkx_maximal_cliques_list
    >>> Gs = [nx.barabasi_albert_graph(100, 3, seed=i) for i in range(10)]
    >>> rng = np.random.default_rng(42)
    >>> for G in Gs:
    ...     for n in G.nodes():
    ...         G.nodes[n]['color'] = 'red' if rng.integers(0, 2) == 0 else 'blue'
    >>> h = from_networkx_maximal_cliques_list(Gs)
    >>> average_group_degree(h, tid=0)
    {'color': {'red': 4.854166666666667, 'blue': 5.0}}
    """
    attributes = h.list_node_attributes(tid=tid, categorical=True)
    if not attributes:
        # No categorical attribute at this tid: nothing to group by.
        return {}

    # attribute name -> attribute value -> degrees of the nodes in that group
    group_degrees = {
        attribute: {attribute_value: [] for attribute_value in attributes[attribute]}
        for attribute in attributes
    }

    for n in h.nodes(start=tid):
        # The degree depends only on the node, so compute it once per node
        # rather than once per (node, attribute) pair.
        deg = h.degree(n, hyperedge_size=hyperedge_size, start=tid)
        for attr_name in attributes:
            attr = h.get_node_attribute(n, attr_name=attr_name, tid=tid)
            group_degrees[attr_name][attr].append(deg)

    return {
        attribute: {
            attribute_value: np.mean(group_degrees[attribute][attribute_value])
            for attribute_value in attributes[attribute]
        }
        for attribute in attributes
    }
def attribute_consistency(h: ASH, node: int = None) -> dict:
    """
    The consistency measures the extent to which a nodes' attribute value
    remains constant/change across time. The higher the value, the more
    consistent in time are the node's values for that attribute.

    For each categorical attribute it is computed as one minus the entropy of
    the values the node takes over time, where the logarithm base is the
    number of values listed for that attribute by
    ``h.list_node_attributes(categorical=True)``.

    :param h: ASH instance
    :param node: Specify the node for which we want to calculate the
        consistency. When ``None`` (the default) all nodes are processed.
    :return: If ``node`` is given, a dict mapping each attribute to the
        consistency value of that node; otherwise a dict containing, for each
        node, for each attribute, the consistency value

    Examples
    --------
    Consistency for a specific node over time:

    >>> import numpy as np, networkx as nx
    >>> from ash_model.utils.networkx import from_networkx_maximal_cliques_list
    >>> Gs = [nx.barabasi_albert_graph(100, 3, seed=i) for i in range(10)]
    >>> rng = np.random.default_rng(42)
    >>> for G in Gs:
    ...     for n in G.nodes():
    ...         G.nodes[n]['color'] = 'red' if rng.integers(0, 2) == 0 else 'blue'
    >>> h = from_networkx_maximal_cliques_list(Gs)
    >>> attribute_consistency(h, node=1)
    {'color': np.float64(0.1187091007693073)}

    Note: the node is selected with an explicit ``is not None`` check, so
    ``node=0`` is handled like any other node id.
    """
    res = defaultdict(dict)
    attributes = h.list_node_attributes(categorical=True)

    nodes = h.nodes() if node is None else [node]

    for n in nodes:
        all_profiles = h.get_node_profiles_by_time(n)
        for attr_name in attributes:
            # Values taken by this attribute across the node's profiles;
            # profiles without the attribute are skipped.
            labels = [
                profile.get_attribute(attr_name)
                for profile in all_profiles.values()
                if profile.has_attribute(attr_name)
            ]
            res[n][attr_name] = 1 - __entropy(
                labels, base=len(attributes[attr_name])
            )

    if node is not None:
        return res[node]
    return res