forked from scottkleinman/rollingwindows
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path__init__.py
333 lines (296 loc) · 12.3 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
"""__init__.py.
Last Update: May 29 2024
"""
import re
from collections.abc import Iterator
from dataclasses import dataclass
from typing import Callable, Iterable, List, Union
import spacy
from spacy.tokens.doc import Doc
from timer import timer
from rollingwindows import helpers
from rollingwindows.registry import rw_components
def get_rw_component(id: str):
    """Look up a rolling-windows component in the registry.

    Args:
        id (str): The registry id of the component.

    Returns:
        The object the `rw_components` registry holds under `id`.
    """
    component = rw_components.get(id)
    return component
def sliding_str_windows(
    input: Union[List[spacy.tokens.span.Span], spacy.tokens.doc.Doc, str],
    n: int = 1000,
    alignment_mode: str = "contract",
) -> Iterator:
    """Return a generator of string windows.

    One window is produced for every start position in the input, so the
    final windows are shorter than `n` as the window runs off the end.

    Args:
        input (Union[List[spacy.tokens.span.Span], spacy.tokens.doc.Doc, str]):
            A list of spaCy spans, a spaCy doc, or a plain string.
        n (int): The size of the window (spans for list input, characters
            otherwise).
        alignment_mode (str): How character indices snap to token boundaries.
            Ignored for list input and forced to "strict" for string input.
            - "strict" (no snapping)
            - "contract" (span of all tokens completely within the character span)
            - "expand" (span of all tokens at least partially covered by the character span)

    Returns:
        A generator of window strings.

    Note:
        Window boundaries are snapped to token boundaries in the original doc.
        "Contract" means that the window will contain all tokens completely
        within the boundaries of `i:i + n`. "Expand" means that window will
        contain all tokens partially within those boundaries. Setting
        `alignment_mode="strict"` in `doc.char_span()` is not advised
        because it returns `None` for any string that cannot be aligned to
        token boundaries. As a result, a slice method is used if you want
        to simply cut the text strictly on `n` characters.
    """
    if isinstance(input, list):
        # Each span becomes a standalone doc so that a run of them can be
        # merged back into a single text with Doc.from_docs().
        input_spans = [span.as_doc() for span in input]
        for start in range(len(input_spans)):
            yield Doc.from_docs(input_spans[start : start + n]).text.strip()
        return

    if isinstance(input, str):
        # A plain string has no token boundaries to snap to.
        text = input
        alignment_mode = "strict"
    else:
        text = input.text

    if alignment_mode == "strict":
        # Slice the raw text: indexing a Doc would slice by token, not by
        # character, and a str slice has no `.text` attribute.
        for start_char in range(len(text)):
            yield text[start_char : start_char + n]
    else:
        for start_char in range(len(text)):
            span = input.char_span(
                start_char, start_char + n, alignment_mode=alignment_mode
            )
            # char_span() returns None when no token span can be aligned.
            if span is not None:
                yield span.text
def sliding_windows(
    input: Union[List[spacy.tokens.span.Span], spacy.tokens.doc.Doc],
    n: int = 1000,
    window_units: str = "characters",
    alignment_mode: str = "strict",
) -> Iterator:
    """Create the windows generator.

    One window is produced for every start position in the input, so the
    final windows are shorter than `n` as the window runs off the end.

    Args:
        input (Union[List[spacy.tokens.span.Span], spacy.tokens.doc.Doc]): A spaCy doc or a list of spaCy spans.
        n (int): The size of the window.
        window_units (str): The type of units to use ("characters", "tokens", "lines", "sentences", "spans").
        alignment_mode (str): How character indices snap to token boundaries.
            Only used when `window_units` is "characters".
            - "strict" (no snapping)
            - "contract" (span of all tokens completely within the character span)
            - "expand" (span of all tokens at least partially covered by the character span)

    Returns:
        Iterator: A generator of sliding windows. Character windows are
        strings; all other windows are slices of `input`.

    Raises:
        ValueError: If `window_units` is not one of the supported units. The
            check runs when the function is called, not on first iteration.
    """
    if window_units not in ("characters", "lines", "sentences", "spans", "tokens"):
        raise ValueError("Invalid window unit.")

    def _windows() -> Iterator:
        """Lazily produce the windows for the already-validated unit."""
        if window_units == "characters":
            # Read the text once instead of once per window.
            text = input.text
            if alignment_mode == "strict":
                for start_char in range(len(text)):
                    yield text[start_char : start_char + n]
            else:
                for start_char in range(len(text)):
                    window = input.char_span(
                        start_char, start_char + n, alignment_mode=alignment_mode
                    )
                    # char_span() returns None when no token span can be aligned.
                    if window is not None:
                        yield window.text
        else:
            # Span and token windows are plain slices of the input sequence.
            for start in range(len(input)):
                yield input[start : start + n]

    return _windows()
# Windows class
@dataclass
class Windows:
    """Bundle an iterable of windows with the settings used to build it."""

    # The windows themselves; iterating the instance iterates this object.
    windows: Iterable
    # Unit the windows are measured in (e.g. "characters" or "tokens").
    window_units: str
    # Size of each window, expressed in `window_units`.
    n: int
    # How character indices were snapped to token boundaries.
    alignment_mode: str = "strict"

    def __iter__(self):
        """Return an iterator over the stored windows."""
        return iter(self.windows)
# RollingWindows class
class RollingWindows:
    """A class for managing a rolling windows workflow.

    The methods enforce this order of use: `set_windows()` must run before
    `calculate()`, and `calculate()` must run before `plot()`.
    """

    def __init__(
        self,
        doc: spacy.tokens.doc.Doc,
        model: str,
        *,
        patterns: Union[list, str] = None,
    ):
        """Initialise a RollingWindows object.

        Args:
            doc (spacy.tokens.doc.Doc): A spaCy Doc object.
            model (str): The name of a spaCy model.
            patterns (Union[list, str]): The patterns to match.
        """
        self.doc = doc
        self.nlp = spacy.load(model)
        if patterns:
            self.patterns = helpers.ensure_list(patterns)
        else:
            self.patterns = []
        self.metadata = {"model": model}

    def _get_search_method(self, window_units: str = None) -> str:
        """Get the search method based on the window type.

        Args:
            window_units (str): The type of window unit.

        Returns:
            str: The preliminary search method; "re_finditer" for any unit
            that has no dedicated entry.
        """
        methods = {
            "characters": "count",
            "tokens": "spacy_matcher",
            "lines": "spacy_matcher",
            "sentences": "spacy_matcher",
        }
        return methods.get(window_units, "re_finditer")

    def _get_units(
        self, doc: spacy.tokens.doc.Doc, window_units: str = "characters"
    ) -> Union[List[spacy.tokens.span.Span], spacy.tokens.doc.Doc]:
        """Get the units to slide over: sentences, lines, or the doc itself.

        Args:
            doc (spacy.tokens.doc.Doc): The spaCy doc to split into units.
            window_units (str): "characters", "lines", "sentences", or "tokens".

        Returns:
            Union[List[spacy.tokens.span.Span], spacy.tokens.doc.Doc]: A list
            of spans for "sentences" and "lines"; the original doc for any
            other unit.

        Raises:
            ValueError: If sentences are requested but the doc has no
                sentence boundaries set.
        """
        if window_units == "sentences":
            # Fail here with a clear message rather than returning None and
            # letting a later len(None) surface as an obscure TypeError.
            if not doc.has_annotation("SENT_START"):
                raise ValueError(
                    "The doc has no sentence boundaries; sentence windows "
                    "require a pipeline that sets them."
                )
            return list(doc.sents)
        if window_units == "lines":
            regex = r"^(.+)\n+|(.+)\n+|(.+)$"
            lines = []
            for match in re.finditer(regex, doc.text):
                start, end = match.span()
                span = doc.char_span(start, end)
                # Matches that do not align with token boundaries are skipped.
                if span is not None:
                    lines.append(span)
            return lines
        return doc

    @timer
    def calculate(
        self,
        patterns: Union[list, str] = None,
        calculator: Union[Callable, str] = "rw_calculator",
        query: str = "counts",
        show_spacy_rules: bool = False,
    ) -> None:
        """Set up and run a calculator, storing its output in `self.result`.

        Args:
            patterns: (Union[list, str]): The patterns to search for. Only
                applied when `calculator` is given as a registry id.
            calculator (Union[Callable, str]): The calculator to use, either
                a registry id or an already-configured calculator object.
            query (str): String designating whether to return "counts", "averages", or "ratios".
            show_spacy_rules (bool): Whether to use spaCy rules or strings in column labels

        Raises:
            Exception: If `set_windows()` has not been called yet.
        """
        if not hasattr(self, "windows"):
            raise Exception("You must call set_windows() before running calculations.")
        if calculator:
            # A string id is resolved through the registry and instantiated
            # with this object's patterns and windows.
            if isinstance(calculator, str):
                if patterns is not None:
                    self.patterns = patterns
                calculator = get_rw_component(calculator)
                calculator = calculator(
                    patterns=self.patterns, windows=self.windows, query=query
                )
            calculator.run(query=calculator.query)
            self.metadata["calculator"] = calculator.metadata
            self.result = calculator.to_df(show_spacy_rules=show_spacy_rules)

    def plot(
        self,
        plotter: Union[Callable, str] = "rw_simple_plotter",
        show: bool = False,
        file: str = None,
        **kwargs,
    ) -> None:
        """Set up and run the plotter, storing the figure in `self.fig`.

        Args:
            plotter (Union[Callable, str]): The plotter to use, either a
                registry id or an already-configured plotter object.
            show (bool): Whether to show the generated figure.
            file (str): The filepath to save the file, if desired.
            **kwargs: Passed through to the plotter's `show()` and `save()`.

        Raises:
            Exception: If no calculator result is available.
        """
        if not hasattr(self, "result") or self.result is None:
            raise Exception(
                "You must run a calculator on your data before generating a plot."
            )
        # A string id is resolved through the registry and instantiated with
        # its default config.
        if isinstance(plotter, str):
            plotter = get_rw_component(plotter)
            plotter = plotter()
        plotter.run(self.result)
        self.metadata["plotter"] = plotter.metadata
        self.fig = plotter.fig
        if show:
            plotter.show(**kwargs)
        if file:
            plotter.save(file, **kwargs)

    # @timer
    def set_windows(
        self,
        n: int = 1000,
        window_units: str = "characters",
        *,
        alignment_mode: str = "strict",
        filter: Union[Callable, str] = None,
    ) -> None:
        """Set the object's windows.

        Args:
            n (int): The size of each window, measured in `window_units`.
            window_units (str): "characters", "lines", "sentences", or "tokens".
            alignment_mode (str): How character indices snap to token boundaries.
                - "strict" (no snapping)
                - "contract" (span of all tokens completely within the character span)
                - "expand" (span of all tokens at least partially covered by the character span)
            filter (Union[Callable, str]): The name of a filter or a filter object to apply to the document.

        Note:
            The stored windows are lazy generators, so they can be iterated
            only once.
        """
        if filter:
            # A string id is resolved through the registry and instantiated
            # on this object's doc with the default config.
            if isinstance(filter, str):
                filter = get_rw_component(filter)
                filter = filter(self.doc)
            doc = filter.apply()
        else:
            doc = self.doc

        # _get_units() returns either a doc or a list of spans. The doc is used to slide over
        # characters or tokens, and the list is used to slide over sentences or lines.
        units = self._get_units(doc, window_units)

        # Both sliding functions return a generator of string or span windows.
        if window_units == "characters":
            if isinstance(units, list):
                units = " ".join([span.text for span in units])
            windows = sliding_str_windows(units, n, alignment_mode)
        else:
            windows = sliding_windows(units, n, window_units, alignment_mode)

        # Since spans windows are lists of multiple spans, we need to get the first and last
        # token from the original doc to get a window that combines them into a single span.
        if window_units in ["lines", "sentences", "spans"]:
            span_windows = (doc[window[0].start : window[-1].end] for window in windows)
            self.windows = Windows(span_windows, window_units, n, alignment_mode)
        else:
            self.windows = Windows(windows, window_units, n, alignment_mode)

        # For convenience's sake, we detect the search method here, but the calculator
        # will override it based on the pattern.
        search_method = self._get_search_method(window_units)
        metadata = {
            "n": n,
            "window_units": window_units,
            "alignment_mode": alignment_mode,
            "search_method": search_method,
        }
        if filter:
            metadata["filter"] = filter.metadata
        else:
            # Drop filter metadata left over from an earlier filtered call.
            self.metadata.pop("filter", None)
        self.metadata = self.metadata | metadata