# Source code for littleballoffur.exploration_sampling.commonneighborawarerandomwalksampler

import random
import numpy as np
import networkx as nx
import networkit as nk
from typing import Union
from littleballoffur.sampler import Sampler


# Graph classes of the two supported backends; they are used in the type
# annotations of the sampler's public ``sample`` method.
# The NetworKit class is taken from a throwaway instance via ``type(...)``.
NKGraph = type(nk.graph.Graph())
# The NetworkX class is referenced directly by its module path.
NXGraph = nx.classes.graph.Graph


class CommonNeighborAwareRandomWalkSampler(Sampler):
    r"""An implementation of node sampling by common neighbor aware random walks.
    The random walker is biased to visit neighbors that have a lower number of
    common neighbors. This way the sampling procedure is able to escape tightly
    knit communities and visit new ones. `"For details about the algorithm see
    this paper." <https://ieeexplore.ieee.org/document/8731555>`_

    Args:
        number_of_nodes (int): Number of nodes. Default is 100.
        seed (int): Random seed. Default is 42.
    """

    def __init__(self, number_of_nodes: int = 100, seed: int = 42):
        self.number_of_nodes = number_of_nodes
        self.seed = seed
        self._set_seed()

    def _create_initial_node_set(self, graph, start_node):
        """
        Choosing an initial node.

        If ``start_node`` is given it must be a valid node index of ``graph``;
        otherwise a node index is drawn at random.

        Raises:
            ValueError: If ``start_node`` is outside ``[0, number of nodes)``.
        """
        node_count = self.backend.get_number_of_nodes(graph)
        if start_node is None:
            self._current_node = random.choice(range(node_count))
        elif 0 <= start_node < node_count:
            self._current_node = start_node
        else:
            raise ValueError("Starting node index is out of range.")
        self._sampled_nodes = {self._current_node}

    def _create_sampler(self, graph):
        """
        Resetting the per-node cache of neighbor lists and transition scores.
        """
        self._sampler = {}

    def _get_node_scores(self, graph, node):
        """
        Computing and caching the transition probabilities of a node.

        Each neighbor is scored with one minus the number of neighbors it
        shares with ``node`` divided by the smaller of the two degrees; the
        scores are then normalized to sum to one. Results are stored in
        ``self._sampler[node]`` under the keys ``"neighbors"`` and ``"scores"``.

        Raises:
            ValueError: If ``node`` has no neighbors to walk to.
        """
        if node in self._sampler:
            # Already cached - no need to recompute.
            return

        neighbors = set(self.backend.get_neighbors(graph, node))
        if not neighbors:
            # Without this guard the failure would surface later as an opaque
            # ValueError from np.random.choice on an empty population.
            raise ValueError(
                f"Node {node} has no neighbors; the random walk cannot continue."
            )

        # A single list drives both the stored neighbors and the score order,
        # so position i of "scores" always belongs to position i of "neighbors".
        neighbor_list = list(neighbors)
        # The degree of the current node does not change inside the loop.
        node_degree = self.backend.get_degree(graph, node)

        scores = []
        for neighbor in neighbor_list:
            fringe = set(self.backend.get_neighbors(graph, neighbor))
            overlap = len(neighbors & fringe)
            smaller_degree = min(node_degree, self.backend.get_degree(graph, neighbor))
            scores.append(1.0 - overlap / smaller_degree)

        scores = np.asarray(scores, dtype=float)
        self._sampler[node] = {
            "neighbors": neighbor_list,
            "scores": scores / np.sum(scores),
        }

    def _do_a_step(self, graph):
        """
        Doing a single random walk step.
        """
        self._get_node_scores(graph, self._current_node)
        entry = self._sampler[self._current_node]
        self._current_node = np.random.choice(
            entry["neighbors"],
            1,
            replace=False,
            p=entry["scores"],
        )[0]
        self._sampled_nodes.add(self._current_node)

    def sample(
        self, graph: Union[NXGraph, NKGraph], start_node: Union[int, None] = None
    ) -> Union[NXGraph, NKGraph]:
        """
        Sampling nodes with a single common neighbor aware random walk.

        Arg types:
            * **graph** *(NetworkX or NetworKit graph)* - The graph to be sampled from.
            * **start_node** *(int, optional)* - The start node.

        Return types:
            * **new_graph** *(NetworkX or NetworKit graph)* - The graph of sampled nodes.
        """
        self._deploy_backend(graph)
        self._check_number_of_nodes(graph)
        self._create_initial_node_set(graph, start_node)
        self._create_sampler(graph)
        # NOTE(review): the walk only moves along edges, so this loop appears
        # not to terminate when the start node's connected component holds
        # fewer than number_of_nodes nodes - presumably callers pass connected
        # graphs; confirm before relying on it.
        while len(self._sampled_nodes) < self.number_of_nodes:
            self._do_a_step(graph)
        new_graph = self.backend.get_subgraph(graph, self._sampled_nodes)
        return new_graph