Source code for textacy.representations.matrix_utils

"""
:mod:`textacy.vsm.matrix_utils`: Functions for computing corpus-wide term- or
document-based values, like term frequency, document frequency, and document length,
and filtering terms from a matrix by their document frequency.
"""
import numpy as np
import scipy.sparse as sp

from .. import errors


[docs]def get_term_freqs(doc_term_matrix, *, type_="linear"): """ Compute frequencies for all terms in a document-term matrix, with optional sub-linear scaling. Args: doc_term_matrix (:class:`scipy.sparse.csr_matrix`): M x N sparse matrix, where M is the # of docs and N is the # of unique terms. Values must be the linear, un-scaled counts of term n per doc m. type_ ({'linear', 'sqrt', 'log'}): Scaling applied to absolute term counts. If 'linear', term counts are left as-is, since the sums are already linear; if 'sqrt', tf => sqrt(tf); if 'log', tf => log(tf) + 1. Returns: :class:`numpy.ndarray`: Array of term frequencies, with length equal to the # of unique terms (# of columns) in ``doc_term_matrix``. Raises: ValueError: if ``doc_term_matrix`` doesn't have any non-zero entries, or if ``type_`` isn't one of {"linear", "sqrt", "log"}. """ if doc_term_matrix.nnz == 0: raise ValueError("`doc_term_matrix` must have at least 1 non-zero entry") tfs = np.asarray(doc_term_matrix.sum(axis=0)).ravel() if type_ == "linear": return tfs # tfs is already linear elif type_ == "sqrt": return np.sqrt(tfs) elif type_ == "log": return np.log(tfs) + 1.0 else: raise ValueError( errors.value_invalid_msg("type_", type_, {"linear", "sqrt", "log"}) )
[docs]def get_doc_freqs(doc_term_matrix): """ Compute document frequencies for all terms in a document-term matrix. Args: doc_term_matrix (:class:`scipy.sparse.csr_matrix`): M x N sparse matrix, where M is the # of docs and N is the # of unique terms. .. note:: Weighting on the terms doesn't matter! Could be binary or tf or tfidf, a term's doc freq will be the same. Returns: :class:`numpy.ndarray`: Array of document frequencies, with length equal to the # of unique terms (# of columns) in ``doc_term_matrix``. Raises: ValueError: if ``doc_term_matrix`` doesn't have any non-zero entries. """ if doc_term_matrix.nnz == 0: raise ValueError("`doc_term_matrix` must have at least 1 non-zero entry") _, n_terms = doc_term_matrix.shape return np.bincount(doc_term_matrix.indices, minlength=n_terms)
[docs]def get_inverse_doc_freqs(doc_term_matrix, *, type_="smooth"): """ Compute inverse document frequencies for all terms in a document-term matrix, using one of several IDF formulations. Args: doc_term_matrix (:class:`scipy.sparse.csr_matrix`): M x N sparse matrix, where M is the # of docs and N is the # of unique terms. The particular weighting of matrix values doesn't matter. type_ ({'standard', 'smooth', 'bm25'}): Type of IDF formulation to use. If 'standard', idfs => log(n_docs / dfs) + 1.0; if 'smooth', idfs => log(n_docs + 1 / dfs + 1) + 1.0, i.e. 1 is added to all document frequencies, equivalent to adding a single document to the corpus containing every unique term; if 'bm25', idfs => log((n_docs - dfs + 0.5) / (dfs + 0.5)), which is a form commonly used in BM25 ranking that allows for extremely common terms to have negative idf weights. Returns: :class:`numpy.ndarray`: Array of inverse document frequencies, with length equal to the # of unique terms (# of columns) in ``doc_term_matrix``. Raises: ValueError: if ``type_`` isn't one of {"standard", "smooth", "bm25"}. """ dfs = get_doc_freqs(doc_term_matrix) n_docs, _ = doc_term_matrix.shape if type_ == "standard": return np.log(n_docs / dfs) + 1.0 elif type_ == "smooth": n_docs += 1 dfs += 1 return np.log(n_docs / dfs) + 1.0 elif type_ == "bm25": return np.log((n_docs - dfs + 0.5) / (dfs + 0.5)) else: raise ValueError( errors.value_invalid_msg("type_", type_, {"standard", "smooth", "bm25"}) )
[docs]def get_doc_lengths(doc_term_matrix, *, type_="linear"): """ Compute the lengths (i.e. number of terms) for all documents in a document-term matrix. Args: doc_term_matrix (:class:`scipy.sparse.csr_matrix`): M x N sparse matrix, where M is the # of docs, N is the # of unique terms, and values are the absolute counts of term n per doc m. type_ ({'linear', 'sqrt', 'log'}): Scaling applied to absolute doc lengths. If 'linear', lengths are left as-is, since the sums are already linear; if 'sqrt', dl => sqrt(dl); if 'log', dl => log(dl) + 1. Returns: :class:`numpy.ndarray`: Array of document lengths, with length equal to the # of documents (# of rows) in ``doc_term_matrix``. Raises: ValueError: if ``type_`` isn't one of {"linear", "sqrt", "log"}. """ dls = np.asarray(doc_term_matrix.sum(axis=1)).ravel() if type_ == "linear": return dls # dls is already linear elif type_ == "sqrt": return np.sqrt(dls) elif type_ == "log": return np.log(dls) + 1.0 else: raise ValueError( errors.value_invalid_msg("type_", type_, {"linear", "sqrt", "log"}) )
[docs]def get_information_content(doc_term_matrix): """ Compute information content for all terms in a document-term matrix. IC is a float in [0.0, 1.0], defined as ``-df * log2(df) - (1 - df) * log2(1 - df)``, where df is a term's normalized document frequency. Args: doc_term_matrix (:class:`scipy.sparse.csr_matrix`): M x N sparse matrix, where M is the # of docs and N is the # of unique terms. .. note:: Weighting on the terms doesn't matter! Could be binary or tf or tfidf, a term's information content will be the same. Returns: :class:`numpy.ndarray`: Array of term information content values, with length equal to the # of unique terms (# of columns) in ``doc_term_matrix``. Raises: ValueError: if ``doc_term_matrix`` doesn't have any non-zero entries. """ dfs = get_doc_freqs(doc_term_matrix) # normalize doc freqs by total number of docs # TODO: is this *really* what we want to do? dfs = dfs / doc_term_matrix.shape[0] ics = -dfs * np.log2(dfs) - (1 - dfs) * np.log2(1 - dfs) ics[np.isnan(ics)] = 0.0 # NaN values not permitted! return ics
[docs]def apply_idf_weighting(doc_term_matrix, *, type_="smooth"): """ Apply inverse document frequency (idf) weighting to a term-frequency (tf) weighted document-term matrix, using one of several IDF formulations. Args: doc_term_matrix (:class:`scipy.sparse.csr_matrix`): M x N sparse matrix, where M is the # of docs and N is the # of unique terms. type_ ({'standard', 'smooth', 'bm25'}): Type of IDF formulation to use. Returns: :class:`scipy.sparse.csr_matrix`: Sparse matrix of shape M x N, where value (i, j) is the tfidf weight of term j in doc i. See Also: :func:`get_inverse_doc_freqs()` """ idfs = get_inverse_doc_freqs(doc_term_matrix, type_=type_) return doc_term_matrix.dot(sp.diags(idfs, 0))
[docs]def filter_terms_by_df( doc_term_matrix, term_to_id, *, max_df=1.0, min_df=1, max_n_terms=None, ): """ Filter out terms that are too common and/or too rare (by document frequency), and compactify the top ``max_n_terms`` in the ``id_to_term`` mapping accordingly. Borrows heavily from the ``sklearn.feature_extraction.text`` module. Args: doc_term_matrix (:class:`scipy.sparse.csr_matrix`): M X N matrix, where M is the # of docs and N is the # of unique terms. term_to_id (Dict[str, int]): Mapping of term string to unique term id, e.g. :attr:`Vectorizer.vocabulary_terms`. min_df (float or int): if float, value is the fractional proportion of the total number of documents and must be in [0.0, 1.0]; if int, value is the absolute number; filter terms whose document frequency is less than ``min_df`` max_df (float or int): if float, value is the fractional proportion of the total number of documents and must be in [0.0, 1.0]; if int, value is the absolute number; filter terms whose document frequency is greater than ``max_df`` max_n_terms (int): only include terms whose *term* frequency is within the top `max_n_terms` Returns: :class:`scipy.sparse.csr_matrix`: Sparse matrix of shape (# docs, # unique filtered terms), where value (i, j) is the weight of term j in doc i. Dict[str, int]: Term to id mapping, where keys are unique *filtered* terms as strings and values are their corresponding integer ids. Raises: ValueError: if ``max_df`` or ``min_df`` or ``max_n_terms`` < 0. """ if max_df == 1.0 and min_df == 1 and max_n_terms is None: return doc_term_matrix, term_to_id if max_df < 0 or min_df < 0 or (max_n_terms is not None and max_n_terms < 0): raise ValueError("max_df, min_df, and max_n_terms may not be negative") n_docs, n_terms = doc_term_matrix.shape max_doc_count = max_df if isinstance(max_df, int) else int(max_df * n_docs) min_doc_count = min_df if isinstance(min_df, int) else int(min_df * n_docs) if max_doc_count < min_doc_count: raise ValueError("max_df corresponds to fewer documents than min_df") # calculate a mask based on document frequencies dfs = get_doc_freqs(doc_term_matrix) mask = np.ones(n_terms, dtype=bool) if max_doc_count < n_docs: mask &= dfs <= max_doc_count if min_doc_count > 1: mask &= dfs >= min_doc_count if max_n_terms is not None and mask.sum() > max_n_terms: tfs = get_term_freqs(doc_term_matrix, type_="linear") top_mask_inds = (-tfs[mask]).argsort()[:max_n_terms] new_mask = np.zeros(n_terms, dtype=bool) new_mask[np.where(mask)[0][top_mask_inds]] = True mask = new_mask # map old term indices to new ones new_indices = np.cumsum(mask) - 1 term_to_id = { term: new_indices[old_index] for term, old_index in term_to_id.items() if mask[old_index] } kept_indices = np.where(mask)[0] if len(kept_indices) == 0: raise ValueError( "After filtering, no terms remain; " "try a lower `min_df` or higher `max_df`" ) return (doc_term_matrix[:, kept_indices], term_to_id)
[docs]def filter_terms_by_ic(doc_term_matrix, term_to_id, *, min_ic=0.0, max_n_terms=None): """ Filter out terms that are too common and/or too rare (by information content), and compactify the top ``max_n_terms`` in the ``id_to_term`` mapping accordingly. Borrows heavily from the ``sklearn.feature_extraction.text`` module. Args: doc_term_matrix (:class:`scipy.sparse.csr_matrix`): M X N sparse matrix, where M is the # of docs and N is the # of unique terms. term_to_id (Dict[str, int]): Mapping of term string to unique term id, e.g. :attr:`Vectorizer.vocabulary_terms`. min_ic (float): filter terms whose information content is less than this value; must be in [0.0, 1.0] max_n_terms (int): only include terms whose information content is within the top ``max_n_terms`` Returns: :class:`scipy.sparse.csr_matrix`: Sparse matrix of shape (# docs, # unique filtered terms), where value (i, j) is the weight of term j in doc i. Dict[str, int]: Term to id mapping, where keys are unique *filtered* terms as strings and values are their corresponding integer ids. Raises: ValueError: if ``min_ic`` not in [0.0, 1.0] or ``max_n_terms`` < 0. """ if min_ic == 0.0 and max_n_terms is None: return doc_term_matrix, term_to_id if min_ic < 0.0 or min_ic > 1.0: raise ValueError("min_ic must be a float in [0.0, 1.0]") if max_n_terms is not None and max_n_terms < 0: raise ValueError("max_n_terms may not be negative") _, n_terms = doc_term_matrix.shape # calculate a mask based on document frequencies ics = get_information_content(doc_term_matrix) mask = np.ones(n_terms, dtype=bool) if min_ic > 0.0: mask &= ics >= min_ic if max_n_terms is not None and mask.sum() > max_n_terms: top_mask_inds = (-ics[mask]).argsort()[:max_n_terms] new_mask = np.zeros(n_terms, dtype=bool) new_mask[np.where(mask)[0][top_mask_inds]] = True mask = new_mask # map old term indices to new ones new_indices = np.cumsum(mask) - 1 term_to_id = { term: new_indices[old_index] for term, old_index in term_to_id.items() if mask[old_index] } kept_indices = np.where(mask)[0] if len(kept_indices) == 0: raise ValueError("After filtering, no terms remain; try a lower `min_ic`") return (doc_term_matrix[:, kept_indices], term_to_id)