from __future__ import annotations
import collections
import functools
import math
import operator
import statistics
from typing import Collection, Dict, Iterable, List, Optional, Set, Tuple
from cytoolz import itertoolz
from spacy.tokens import Doc, Token
from ... import errors, utils
from .. import utils as ext_utils


def yake(
doc: Doc,
*,
normalize: Optional[str] = "lemma",
ngrams: int | Collection[int] = (1, 2, 3),
include_pos: Optional[str | Collection[str]] = ("NOUN", "PROPN", "ADJ"),
window_size: int = 2,
topn: int | float = 10,
) -> List[Tuple[str, float]]:
"""
Extract key terms from a document using the YAKE algorithm.

    Args:
doc: spaCy ``Doc`` from which to extract keyterms.
Must be sentence-segmented; optionally POS-tagged.
normalize: If "lemma", lemmatize terms; if "lower", lowercase terms;
if None, use the form of terms as they appeared in ``doc``.

            .. note:: Unlike the other keyterm extraction functions, this one
               doesn't accept a callable for ``normalize``.
        ngrams: One or more n values of the n-grams to consider as keyterm candidates.
            For example, ``(1, 2, 3)`` includes all unigrams, bigrams, and trigrams,
            while ``2`` includes bigrams only.
include_pos: One or more POS tags with which to filter for good candidate keyterms.
If None, include tokens of all POS tags
            (which also allows keyterm extraction from docs without POS-tagging).
window_size: Number of words to the right and left of a given word
to use as context when computing the "relatedness to context"
component of its score. Note that the resulting sliding window's
full width is ``1 + (2 * window_size)``.
topn: Number of top-ranked terms to return as key terms.
If an integer, represents the absolute number; if a float, value
must be in the interval (0.0, 1.0], which is converted to an int by
            ``int(round(len(candidates) * topn))``.

    Returns:
Sorted list of top ``topn`` key terms and their corresponding YAKE scores.

    References:
Campos, Mangaravite, Pasquali, Jorge, Nunes, and Jatowt. (2018).
A Text Feature Based Automatic Keyword Extraction Method for Single Documents.
Advances in Information Retrieval. ECIR 2018.
Lecture Notes in Computer Science, vol 10772, pp. 684-691.
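
    Example:
        Illustrative usage, a minimal sketch assuming a POS-tagging spaCy pipeline
        such as ``en_core_web_sm`` is installed (results will vary by model)::

            >>> import spacy
            >>> nlp = spacy.load("en_core_web_sm")
            >>> doc = nlp("Automatic keyword extraction identifies the most salient terms in a document.")
            >>> yake(doc, ngrams=(1, 2), topn=5)  # doctest: +SKIP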
"""
# validate / transform args
ngrams = utils.to_collection(ngrams, int, tuple)
include_pos = utils.to_collection(include_pos, str, set)
if isinstance(topn, float):
if not 0.0 < topn <= 1.0:
raise ValueError(
f"topn = {topn} is invalid; "
"must be an int, or a float between 0.0 and 1.0"
)
# bail out on empty docs
if not doc:
return []
stop_words: Set[str] = set()
seen_candidates: Set[str] = set()
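    # NOTE: both sets above are populated as side effects by the helper functions below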
# compute key values on a per-word basis
word_occ_vals = _get_per_word_occurrence_values(
doc, normalize, stop_words, window_size
)
# doc doesn't have any words...
if not word_occ_vals:
return []
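    # a word's frequency is its total number of occurrences in the doc;
    # the per-occurrence "is_uc" list doubles as an occurrence counter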
word_freqs = {w_id: len(vals["is_uc"]) for w_id, vals in word_occ_vals.items()}
word_scores = _compute_word_scores(doc, word_occ_vals, word_freqs, stop_words)
# compute scores for candidate terms based on scores of constituent words
term_scores: Dict[str, float] = {}
# do single-word candidates separately; it's faster and simpler
if 1 in ngrams:
candidates = _get_unigram_candidates(doc, include_pos)
_score_unigram_candidates(
candidates,
word_freqs,
word_scores,
term_scores,
stop_words,
seen_candidates,
normalize,
)
    # now compute combined scores for higher-n ngram candidates
candidates = list(
ext_utils.get_ngram_candidates(
doc, [n for n in ngrams if n > 1], include_pos=include_pos,
)
)
attr_name = _get_attr_name(normalize, True)
ngram_freqs = itertoolz.frequencies(
" ".join(getattr(word, attr_name) for word in ngram) for ngram in candidates
)
_score_ngram_candidates(
candidates, ngram_freqs, word_scores, term_scores, seen_candidates, normalize,
)
# build up a list of key terms in order of increasing score
if isinstance(topn, float):
topn = int(round(len(seen_candidates) * topn))
sorted_term_scores = sorted(
term_scores.items(), key=operator.itemgetter(1), reverse=False,
)
return ext_utils.get_filtered_topn_terms(
sorted_term_scores, topn, match_threshold=0.8
)


def _get_attr_name(normalize: Optional[str], as_strings: bool) -> str:
if normalize is None:
attr_name = "norm"
elif normalize in ("lemma", "lower"):
attr_name = normalize
else:
raise ValueError(
errors.value_invalid_msg("normalize", normalize, {"lemma", "lower", None})
)
if as_strings is True:
attr_name = attr_name + "_"
return attr_name


def _get_per_word_occurrence_values(
doc: Doc, normalize: Optional[str], stop_words: Set[str], window_size: int,
) -> Dict[int, Dict[str, list]]:
"""
Get base values for each individual occurrence of a word, to be aggregated
and combined into a per-word score.
"""
word_occ_vals = collections.defaultdict(lambda: collections.defaultdict(list))
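    # an occurrence counts as "upper-cased" if the token is all-caps (e.g. an acronym)
    # or title-cased anywhere other than the start of a sentence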
def _is_upper_cased(tok):
return tok.is_upper or (tok.is_title and not tok.is_sent_start)
attr_name = _get_attr_name(normalize, False)
padding = [None] * window_size
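    # pad each sentence with ``None`` on both sides so that every token gets a full
    # sliding window; the ``None`` placeholders are filtered out of the context lists below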
for sent_idx, sent in enumerate(doc.sents):
sent_padded = itertoolz.concatv(padding, sent, padding)
for window in itertoolz.sliding_window(1 + (2 * window_size), sent_padded):
lwords, word, rwords = (
window[:window_size],
window[window_size],
window[window_size + 1 :],
)
w_id = getattr(word, attr_name)
if word.is_stop:
stop_words.add(w_id)
word_occ_vals[w_id]["is_uc"].append(_is_upper_cased(word))
word_occ_vals[w_id]["sent_idx"].append(sent_idx)
word_occ_vals[w_id]["l_context"].extend(
getattr(w, attr_name)
for w in lwords
if not (w is None or w.is_punct or w.is_space)
)
word_occ_vals[w_id]["r_context"].extend(
getattr(w, attr_name)
for w in rwords
if not (w is None or w.is_punct or w.is_space)
)
return word_occ_vals


def _compute_word_scores(
doc: Doc,
word_occ_vals: Dict[int, Dict[str, list]],
word_freqs: Dict[int, int],
stop_words: Set[str],
) -> Dict[int, float]:
"""
Aggregate values from per-word occurrence values, compute per-word weights
of several components, then combine components into per-word scores.
"""
word_weights = collections.defaultdict(dict)
# compute summary stats for word frequencies
freqs_nsw = [freq for w_id, freq in word_freqs.items() if w_id not in stop_words]
freq_max = max(word_freqs.values())
freq_baseline = statistics.mean(freqs_nsw) + statistics.stdev(freqs_nsw)
n_sents = itertoolz.count(doc.sents)
for w_id, vals in word_occ_vals.items():
freq = word_freqs[w_id]
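        # "case": emphasis from all-caps / title-case occurrences, damped by log frequency
        # "pos": positional weight; words whose occurrences cluster in early sentences weigh less
        # "freq": frequency normalized against the mean + stdev of non-stopword frequencies
        # "disp": sentence dispersion, i.e. fraction of the doc's sentences containing the word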
word_weights[w_id]["case"] = sum(vals["is_uc"]) / math.log2(1 + freq)
word_weights[w_id]["pos"] = math.log2(
math.log2(3 + statistics.mean(vals["sent_idx"]))
)
word_weights[w_id]["freq"] = freq / freq_baseline
word_weights[w_id]["disp"] = len(set(vals["sent_idx"])) / n_sents
n_unique_lc = len(set(vals["l_context"]))
n_unique_rc = len(set(vals["r_context"]))
try:
wl = n_unique_lc / len(vals["l_context"])
except ZeroDivisionError:
wl = 0.0
try:
wr = n_unique_rc / len(vals["r_context"])
except ZeroDivisionError:
wr = 0.0
pl = n_unique_lc / freq_max
pr = n_unique_rc / freq_max
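        # "rel": relatedness to context; the more varied a word's left/right contexts are
        # relative to its frequency, the more stopword-like it behaves and the higher
        # (i.e. worse) its weight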
word_weights[w_id]["rel"] = 1.0 + (wl + wr) * (freq / freq_max) + pl + pr
# combine individual weights into per-word scores
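    # score = (rel * pos) / (case + freq / rel + disp / rel); lower scores mark more important words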
word_scores = {
w_id: (wts["rel"] * wts["pos"])
/ (wts["case"] + (wts["freq"] / wts["rel"]) + (wts["disp"] / wts["rel"]))
for w_id, wts in word_weights.items()
}
return word_scores


def _get_unigram_candidates(doc: Doc, include_pos: Optional[Set[str]]) -> Iterable[Token]:
candidates = (
word for word in doc if not (word.is_stop or word.is_punct or word.is_space)
)
if include_pos:
candidates = (word for word in candidates if word.pos_ in include_pos)
return candidates


def _score_unigram_candidates(
candidates: Iterable[Token],
word_freqs: Dict[int, int],
word_scores: Dict[int, float],
term_scores: Dict[str, float],
stop_words: Set[str],
seen_candidates: Set[str],
normalize: Optional[str],
) -> None:
attr_name = _get_attr_name(normalize, False)
attr_name_str = _get_attr_name(normalize, True)
for word in candidates:
w_id = getattr(word, attr_name)
if w_id in stop_words or w_id in seen_candidates:
continue
else:
seen_candidates.add(w_id)
        # NOTE: here I've modified the YAKE algorithm to put less emphasis on term freq
# term_scores[word.lower_] = word_scores[w_id] / (word_freqs[w_id] * (1 + word_scores[w_id]))
term_scores[getattr(word, attr_name_str)] = word_scores[w_id] / (
math.log2(1 + word_freqs[w_id]) * (1 + word_scores[w_id])
)


def _score_ngram_candidates(
candidates: List[Tuple[Token, ...]],
ngram_freqs: Dict[str, int],
word_scores: Dict[int, float],
term_scores: Dict[str, float],
seen_candidates: Set[str],
normalize: Optional[str],
) -> None:
attr_name = _get_attr_name(normalize, False)
attr_name_str = _get_attr_name(normalize, True)
for ngram in candidates:
ngtxt = " ".join(getattr(word, attr_name_str) for word in ngram)
if ngtxt in seen_candidates:
continue
else:
seen_candidates.add(ngtxt)
ngram_word_scores = [word_scores[getattr(word, attr_name)] for word in ngram]
# multiply individual word scores together in the numerator
numerator = functools.reduce(operator.mul, ngram_word_scores, 1.0)
        # NOTE: here I've modified the YAKE algorithm to put less emphasis on term freq
# denominator = ngram_freqs[ngtxt] * (1.0 + sum(ngram_word_scores))
denominator = math.log2(1 + ngram_freqs[ngtxt]) * (1.0 + sum(ngram_word_scores))
term_scores[ngtxt] = numerator / denominator