# Source code for littleballoffur.exploration_sampling.spikyballsampler

import copy
import itertools
import numpy as np
import networkx as nx
import networkit as nk
from typing import Union
from collections import deque
from littleballoffur.sampler import Sampler
from littleballoffur.helpers import Edge


# Concrete graph classes accepted by the sampler, used in Union type hints below.
# NKGraph is resolved via type() of a fresh instance — presumably because
# ``nk.graph.Graph`` is not guaranteed to be the class object itself across
# NetworKit versions (NOTE(review): confirm against the pinned networkit version).
NKGraph = type(nk.graph.Graph())
NXGraph = nx.classes.graph.Graph


class SpikyBallSampler(Sampler):
    r"""An implementation of spiky ball sampling.

    The procedure is a filtered breadth-first search sampling method where the
    expansion is performed over a random subset of neighbors. `"For details
    about the algorithm see this paper." <https://www.mdpi.com/1999-4893/13/11/275>`_

    Args:
        number_of_nodes (int): Number of sampled nodes. Default is 100.
        sampling_probability (float): Edge sampling probability. Default is 0.2.
        initial_nodes_ratio (float): Initial ratio of sampled nodes. Default is 0.1.
        seed (int): Random seed. Default is 42.
        max_hops (int): Number of hops. Default is 100000.
        mode (str): Sampling procedure, one of: (:obj:`"edgeball"`, :obj:`"hubball"`,
            :obj:`"coreball"`, :obj:`"fireball"`, :obj:`"firecoreball"`).
            Default is 'fireball'.
        max_visited_nodes_backlog (int): Maximal number of nodes in restart queue.
            Default is 100.
        restart_hop_size (int): Minimal number of nodes to pop from restart queue.
            Default is 10.
        distrib_coeff (float): Proposal distribution power coefficient. Default is 1.0.
    """

    def __init__(
        self,
        number_of_nodes: int = 100,
        sampling_probability: float = 0.2,
        initial_nodes_ratio: float = 0.1,
        seed: int = 42,
        max_hops: int = 100000,
        mode: str = "fireball",
        max_visited_nodes_backlog: int = 100,
        restart_hop_size: int = 10,
        distrib_coeff: float = 1.0,
    ):
        self.number_of_nodes = number_of_nodes
        self.sampling_probability = sampling_probability
        self.initial_nodes_ratio = initial_nodes_ratio
        self.max_hops = max_hops
        self.seed = seed
        self.mode = mode
        self.max_visited_nodes_backlog = max_visited_nodes_backlog
        self.restart_hop_size = restart_hop_size
        self.distrib_coeff = distrib_coeff
        self._set_modes()
        self._set_seed()

    def _set_modes(self):
        """Record, per sampling mode, which degree statistics the proposal
        distribution needs (source-side and/or target-side degrees)."""
        self._mode_computations = {
            "edgeball": {"source": False, "target": False},
            "hubball": {"source": True, "target": False},
            "coreball": {"source": False, "target": True},
            "fireball": {"source": True, "target": False},
            "firecoreball": {"source": True, "target": True},
        }

    def _get_edge_weight(self, u, v):
        """Return the weight of edge (u, v), or 1.0 for unweighted graphs."""
        if self._is_weighted_graph:
            weight = self.backend.get_edge_weight(self._graph, u, v)
        else:
            weight = 1.0
        return weight

    def _create_node_sets(self):
        """Create the starting set of seed nodes and the restart backlog.

        Assumes node identifiers are 0..n-1 (standard littleballoffur
        convention enforced upstream by the backend checks).
        """
        self._sampled_nodes = set()
        num_nodes = self.backend.get_number_of_nodes(self._graph)
        self._set_of_nodes = set(range(num_nodes))
        # At least one seed node even for tiny target sizes.
        num_initial_nodes = max(int(self.number_of_nodes * self.initial_nodes_ratio), 1)
        self._seed_nodes = set(
            np.random.choice(list(self._set_of_nodes), num_initial_nodes, replace=False)
        )
        # Bounded queue of seen-but-unsampled nodes used to restart dead ends.
        self._visited_nodes = deque(maxlen=self.max_visited_nodes_backlog)
        self._sampled_nodes.update(self._seed_nodes)

    def _get_degree(self, edge_list, selector):
        """Weighted degree per node: sum of edge weights grouped by ``selector``
        (groupby requires the pre-sort on the same key)."""
        return {
            k: sum(map(lambda x: x.weight, g))
            for k, g in itertools.groupby(sorted(edge_list, key=selector), selector)
        }

    def _get_new_edges(self, nodes):
        """Collect edges from ``nodes`` to not-yet-sampled neighbors.

        Returns a dict with the raw edge list plus parallel lists of per-edge
        weights and (mode-dependent) source/target weighted degrees; degrees
        default to 1.0 when the current mode does not need them.
        """
        edge_list = []
        for node in nodes:
            new_neighbors = set(
                self.backend.get_neighbors(self._graph, node)
            ).difference(self._sampled_nodes)
            for neighbor in new_neighbors:
                edge_list.append(
                    Edge(node, neighbor, self._get_edge_weight(node, neighbor))
                )
        if self._mode_computations[self.mode]["source"]:
            source_degree = self._get_degree(edge_list, lambda x: x.source)
        else:
            source_degree = {}
        if self._mode_computations[self.mode]["target"]:
            target_degree = self._get_degree(edge_list, lambda x: x.target)
        else:
            target_degree = {}
        for e in edge_list:
            e.source_degree = source_degree.get(e.source, 1.0)
            e.target_degree = target_degree.get(e.target, 1.0)
        edges_data = {
            "raw": edge_list,
            "weight": list(map(lambda x: x.weight, edge_list)),
            "source_degree": list(map(lambda x: x.source_degree, edge_list)),
            "target_degree": list(map(lambda x: x.target_degree, edge_list)),
        }
        return edges_data

    def _get_probability_density_generic(
        self, edges_data, source_coef, weight_coef, target_coef
    ):
        """Normalized per-edge proposal probabilities:
        p ~ source_degree**a * weight**b * target_degree**c,
        with the degree factors active only when the mode requires them.
        """
        if self._mode_computations[self.mode]["source"]:
            p = np.array(edges_data["source_degree"]) ** source_coef
        else:
            p = np.ones(len(edges_data["source_degree"]))
        p *= np.array(edges_data["weight"]) ** weight_coef
        if self._mode_computations[self.mode]["target"]:
            p *= np.array(edges_data["target_degree"]) ** target_coef
        p_norm = p / np.sum(p)
        return p_norm

    def _get_probability_density(self, edges_data, coeff):
        """Map the sampling mode to its (source, weight, target) exponents and
        delegate to the generic density computation."""
        if self.mode == "edgeball":
            source_coeff, edge_coeff, target_coeff = 0, 1, 0
        elif self.mode == "hubball":
            source_coeff, edge_coeff, target_coeff = coeff, 1, 0
        elif self.mode == "coreball":
            source_coeff, edge_coeff, target_coeff = 0, 1, coeff
        elif self.mode == "fireball":
            source_coeff, edge_coeff, target_coeff = -1, 1, 0
        elif self.mode == "firecoreball":
            source_coeff, edge_coeff, target_coeff = -1, 1, coeff
        else:
            raise ValueError("Unknown ball type.")
        return self._get_probability_density_generic(
            edges_data, source_coeff, edge_coeff, target_coeff
        )

    def _process_hops(self):
        """Expand the sampled set hop by hop until enough nodes are collected,
        the hop budget is exhausted, or nothing remains to explore."""
        hop_count = 0
        layer_nodes = self._seed_nodes.copy()
        while (
            hop_count < self.max_hops
            and len(self._sampled_nodes) < self.number_of_nodes
        ):
            edges_data = self._get_new_edges(layer_nodes)
            new_nodes = [edge.target for edge in edges_data["raw"]]
            if len(new_nodes) == 0:
                # Dead end: restart from previously seen but unsampled nodes.
                # Bug fix: if the restart backlog is also empty, nothing can
                # ever change again (hop_count is not advanced on this path),
                # so break instead of looping forever.
                if len(self._visited_nodes) == 0:
                    break
                layer_nodes = [
                    self._visited_nodes.popleft()
                    for _ in range(min(self.restart_hop_size, len(self._visited_nodes)))
                ]
                continue
            # Density computed only for a non-empty frontier (avoids the 0/0
            # NaN normalization the original hit on dead-end iterations).
            p_norm = self._get_probability_density(edges_data, self.distrib_coeff)
            # Keep a random fraction of the candidate edges (at least one).
            sampled_edges = np.random.choice(
                new_nodes,
                max(round(self.sampling_probability * len(new_nodes)), 1),
                p=p_norm,
                replace=False,
            )
            layer_nodes = set(sampled_edges)
            # Unselected candidates go to the front of the restart backlog.
            self._visited_nodes.extendleft(set(new_nodes).difference(layer_nodes))
            remaining = min(
                self.number_of_nodes - len(self._sampled_nodes), len(layer_nodes)
            )
            layer_nodes = list(layer_nodes)[:remaining]
            self._sampled_nodes.update(layer_nodes)
            hop_count = hop_count + 1

    def _assign_graph(self, graph: Union[NXGraph, NKGraph]):
        """
        Assigning the source graph and making a decision about weighting.
        """
        self._graph = graph
        self._is_weighted_graph = self.backend.is_weighted(graph)

    def sample(self, graph: Union[NXGraph, NKGraph]) -> Union[NXGraph, NKGraph]:
        """
        Sampling nodes iteratively with a spiky ball sampler.

        Arg types:
            * **graph** *(NetworkX or NetworKit graph)* - The graph to be sampled from.

        Return types:
            * **new_graph** *(NetworkX or NetworKit graph)* - The graph of sampled nodes.
        """
        self._deploy_backend(graph)
        self._check_number_of_nodes(graph)
        self._assign_graph(graph)
        self._create_node_sets()
        self._process_hops()
        new_graph = self.backend.get_subgraph(graph, self._sampled_nodes)
        return new_graph