Source code for textacy.extract.keyterms.sgrank

from __future__ import annotations

import collections
import itertools
import math
from operator import itemgetter
from typing import Callable, Collection, Counter, Dict, List, Optional, Set, Tuple

import networkx as nx
from spacy.tokens import Doc, Span

from ... import utils
from .. import utils as ext_utils


Candidate = collections.namedtuple("Candidate", ["text", "idx", "length", "count"])


[docs]def sgrank( doc: Doc, *, normalize: Optional[str | Callable[[Span], str]] = "lemma", ngrams: int | Collection[int] = (1, 2, 3, 4, 5, 6), include_pos: Optional[str | Collection[str]] = ("NOUN", "PROPN", "ADJ"), window_size: int = 1500, topn: int | float = 10, idf: Dict[str, float] = None, ) -> List[Tuple[str, float]]: """ Extract key terms from a document using the SGRank algorithm. Args: doc: spaCy ``Doc`` from which to extract keyterms. normalize: If "lemma", lemmatize terms; if "lower", lowercase terms; if None, use the form of terms as they appeared in ``doc``; if a callable, must accept a ``Span`` and return a str, e.g. :func:`textacy.spacier.utils.get_normalized_text()` ngrams: n of which n-grams to include. For example, ``(1, 2, 3, 4, 5, 6)`` (default) includes all ngrams from 1 to 6; `2` if only bigrams are wanted include_pos: One or more POS tags with which to filter for good candidate keyterms. If None, include tokens of all POS tags (which also allows keyterm extraction from docs without POS-tagging.) window_size: Size of sliding window in which term co-occurrences are determined to occur. Note: Larger values may dramatically increase runtime, owing to the larger number of co-occurrence combinations that must be counted. topn: Number of top-ranked terms to return as keyterms. If int, represents the absolute number; if float, must be in the open interval (0.0, 1.0), and is converted to an integer by ``int(round(len(candidates) * topn))`` idf: Mapping of ``normalize(term)`` to inverse document frequency for re-weighting of unigrams (n-grams with n > 1 have df assumed = 1). Results are typically better with idf information. Returns: Sorted list of top ``topn`` key terms and their corresponding SGRank scores Raises: ValueError: if ``topn`` is a float but not in (0.0, 1.0] or ``window_size`` < 2 References: Danesh, Sumner, and Martin. "SGRank: Combining Statistical and Graphical Methods to Improve the State of the Art in Unsupervised Keyphrase Extraction." Lexical and Computational Semantics (* SEM 2015) (2015): 117. """ # validate / transform args ngrams = utils.to_collection(ngrams, int, tuple) include_pos = utils.to_collection(include_pos, str, set) if window_size < 2: raise ValueError("`window_size` must be >= 2") if isinstance(topn, float): if not 0.0 < topn <= 1.0: raise ValueError("`topn` must be an int, or a float between 0.0 and 1.0") n_toks = len(doc) window_size = min(n_toks, window_size) # bail out on (nearly) empty docs if n_toks < 2: return [] candidates, candidate_counts = _get_candidates(doc, normalize, ngrams, include_pos) # scale float topn based on total number of initial candidates if isinstance(topn, float): topn = int(round(len(candidate_counts) * topn)) candidates, unique_candidates = _prefilter_candidates( candidates, candidate_counts, topn, idf ) term_weights = _compute_term_weights( candidates, candidate_counts, unique_candidates, n_toks, idf ) # filter terms to only those with positive weights candidates = [cand for cand in candidates if term_weights[cand.text] > 0] edge_weights = _compute_edge_weights(candidates, term_weights, window_size, n_toks) # build the weighted directed graph from edges, rank nodes by pagerank graph = nx.DiGraph() graph.add_edges_from(edge_weights) term_ranks = nx.pagerank_scipy(graph, alpha=0.85, weight="weight") sorted_term_ranks = sorted(term_ranks.items(), key=itemgetter(1, 0), reverse=True) return ext_utils.get_filtered_topn_terms(sorted_term_ranks, topn, match_threshold=0.8)
def _get_candidates( doc: Doc, normalize: Optional[str | Callable[[Span], str]], ngrams: Tuple[int, ...], include_pos: Set[str], ) -> Tuple[List[Candidate], Counter[str]]: """ Get n-gram candidate keyterms from ``doc``, with key information for each: its normalized text string, position within the doc, number of constituent words, and frequency of occurrence. """ min_term_freq = min(max(len(doc) // 1000, 1), 4) cand_tuples = list( ext_utils.get_ngram_candidates(doc, ngrams, include_pos=include_pos) ) cand_texts = [ " ".join(ext_utils.terms_to_strings(ctup, normalize or "orth")) for ctup in cand_tuples ] cand_counts = collections.Counter(cand_texts) candidates = [ Candidate(text=ctext, idx=ctup[0].i, length=len(ctup), count=cand_counts[ctext]) for ctup, ctext in zip(cand_tuples, cand_texts) ] if min_term_freq > 1: candidates = [ candidate for candidate in candidates if candidate.count >= min_term_freq ] return candidates, cand_counts def _prefilter_candidates( candidates: List[Candidate], candidate_counts: Counter[str], topn: int, idf: Optional[Dict[str, float]], ) -> Tuple[List[Candidate], Set[str]]: """ Filter initial set of candidates to only those with sufficiently high TF or (if available) modified TF*IDF. """ topn_prefilter = max(3 * topn, 100) if idf: mod_tfidfs = { c.text: c.count * idf.get(c.text, 1) if " " not in c.text else c.count for c in candidates } unique_candidates = { ctext for ctext, _ in sorted( mod_tfidfs.items(), key=itemgetter(1), reverse=True )[:topn_prefilter] } else: unique_candidates = { ctext for ctext, _ in candidate_counts.most_common(topn_prefilter) } candidates = [cand for cand in candidates if cand.text in unique_candidates] return candidates, unique_candidates def _compute_term_weights( candidates: List[Candidate], candidate_counts: Dict[str, int], unique_candidates: Set[str], n_toks: int, idf: Optional[Dict[str, float]], ) -> Dict[str, float]: """ Compute term weights from statistical attributes: position of first occurrence, not subsumed frequency, and number of constituent words. """ clen: float term_weights = {} seen_terms = set() n_toks_p1 = n_toks + 1 for cand in candidates: # we only want the *first* occurrence of a unique term (by its text) if cand.text in seen_terms: continue else: seen_terms.add(cand.text) pos_first_occ = math.log(n_toks_p1 / (cand.idx + 1)) # TODO: do we want to sub-linear scale term len or not? clen = math.sqrt(cand.length) # subtract from subsum_count for the case when ctext == ctext2 subsum_count = ( sum( candidate_counts[ctext2] for ctext2 in unique_candidates if cand.text in ctext2 ) - cand.count ) term_freq_factor = cand.count - subsum_count if idf and clen == 1: term_freq_factor *= idf.get(cand.text, 1) term_weights[cand.text] = term_freq_factor * pos_first_occ * clen return term_weights def _compute_edge_weights( candidates: List[Candidate], term_weights: Dict[str, float], window_size: int, n_toks: int, ) -> List[Tuple[str, str, Dict[str, float]]]: """ Compute weights between candidates that occur within a sliding window(s) of each other, then combine with statistical ``term_weights`` and normalize by the total number of outgoing edge weights. """ n_coocs = collections.defaultdict(lambda: collections.defaultdict(int)) sum_logdists = collections.defaultdict(lambda: collections.defaultdict(float)) # iterate over windows log_ = math.log # localize this, for performance for start_idx in range(n_toks): end_idx = start_idx + window_size window_cands = (cand for cand in candidates if start_idx <= cand.idx < end_idx) # get all token combinations within window for c1, c2 in itertools.combinations(window_cands, 2): n_coocs[c1.text][c2.text] += 1 try: sum_logdists[c1.text][c2.text] += log_( window_size / abs(c1.idx - c2.idx) ) except ZeroDivisionError: sum_logdists[c1.text][c2.text] += log_(window_size) if end_idx >= n_toks: break # compute edge weights between co-occurring terms (nodes) edge_weights = collections.defaultdict(lambda: collections.defaultdict(float)) for c1, c2_dict in sum_logdists.items(): for c2, sum_logdist in c2_dict.items(): edge_weights[c1][c2] = ( ((1.0 + sum_logdist) / n_coocs[c1][c2]) * term_weights[c1] * term_weights[c2] ) # normalize edge weights by sum of outgoing edge weights per term (node) norm_edge_weights: List[Tuple[str, str, Dict[str, float]]] = [] for c1, c2s in edge_weights.items(): sum_edge_weights = sum(c2s.values()) norm_edge_weights.extend( (c1, c2, {"weight": weight / sum_edge_weights}) for c2, weight in c2s.items() ) return norm_edge_weights