import os.path
from os import listdir
import re
from collections import OrderedDict, deque
try:
    from collections.abc import Sequence
except ImportError:  # Python 2
    from collections import Sequence
import logging
import warnings
try:
from itertools import zip_longest
except ImportError:
from itertools import izip_longest as zip_longest
default_tokenizer = re.compile(r'\S+\s*')
class ToolboxError(Exception): pass
class ToolboxInitError(ToolboxError): pass
class ToolboxAlignmentError(ToolboxError): pass
class ToolboxWarning(UserWarning): pass
def custom_formatwarning(msg, category, *args):
return '{}: {}\n'.format(category.__name__, msg)
warnings.formatwarning = custom_formatwarning
def find_project_file(path):
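    """
    Return the path to the Toolbox project (.prj) file at `path`.

    `path` may be the project file itself or a directory containing
    exactly one .prj file; otherwise a ToolboxInitError is raised.
    """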
proj_path = None
if os.path.isfile(path):
if path.lower().endswith('.prj'):
proj_path = path
elif os.path.isdir(path):
        fns = [fn for fn in listdir(path) if fn.lower().endswith('.prj')]
        if len(fns) == 1:
            proj_path = os.path.join(path, fns[0])
if proj_path is None:
raise ToolboxInitError(
'Toolbox project file not found at {}.'.format(path)
)
return proj_path
toolbox_line_re = re.compile(r'(?P<mkr>\\[^\s]+)( (?P<val>.*\n?))?$')
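# e.g. for a line like '\ref item01', mkr captures '\ref' and val captures
# the rest of the line; lines that do not begin with a backslash marker do
# not match and are treated as wrapped continuations of the previous value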
# inspired by NLTK's implementation:
# http://www.nltk.org/_modules/nltk/toolbox.html
def read_toolbox_file(f, strip=True):
"""
Parse a Toolbox file and yield pairs of (marker, value). By default,
no interpretation or normalization of the data is done besides
stripping the trailing whitespace of each value (set the `strip`
    parameter to False to keep it).
Args:
f: An open file-like object.
strip: If True, strip trailing whitespace from each value.
Yields:
Pairs of (marker, value)
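    Example (illustrative; since `f` only needs to be iterable over
    lines, a plain list of strings stands in for an open file here):
        >>> lines = ['\\ref item01', '\\t inu=ga hoeru']
        >>> list(read_toolbox_file(lines))
        [('\\ref', 'item01'), ('\\t', 'inu=ga hoeru')]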
"""
def make_val(val_lines, strip):
if val_lines == [None]:
return None
val = ''.join([s or '' for s in val_lines]) # first s might be None
if strip:
val = val.rstrip()
return val
mkr = None
val_lines = []
for line in f:
mkr_match = toolbox_line_re.match(line)
if mkr_match is not None:
# first yield the current pair
if mkr is not None:
yield (mkr, make_val(val_lines, strip))
mkr = mkr_match.group('mkr')
val_lines = [mkr_match.group('val')]
else:
val_lines.append(line)
# when we reach the end of the file, yield the final pair
if mkr is not None:
yield (mkr, make_val(val_lines, strip))
def iterparse(pairs, keys):
"""
Yield pairs of (event, result) based on `keys` for the given
`pairs`. Events and associated results are given below:
===== =============================================
event result
===== =============================================
key the (key, value) pair for when \key was seen
start the (key, value) pair for when \+key was seen
end the (key, value) pair for when \-key was seen
data list of (marker, value) pairs between keys
===== =============================================
Args:
pairs: An iterable of (marker, value) pairs.
keys: A container of markers that delimit blocks of associated
data.
Yields:
Pairs of (event, result).
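    Example (illustrative; the markers and values are hypothetical):
        >>> pairs = [('\\id', 'text01'), ('\\ref', '1'),
        ...          ('\\t', 'inu=ga hoeru'), ('\\ref', '2'),
        ...          ('\\t', 'neko=ga naku')]
        >>> for event, result in iterparse(pairs, ['\\id', '\\ref']):
        ...     print(event, result)
        key ('\\id', 'text01')
        key ('\\ref', '1')
        data [('\\t', 'inu=ga hoeru')]
        key ('\\ref', '2')
        data [('\\t', 'neko=ga naku')]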
"""
start_keys = set(re.sub(r'\\(.*)', r'\+\1', k) for k in keys)
end_keys = set(re.sub(r'\\(.*)', r'\-\1', k) for k in keys)
all_keys = set(keys).union(start_keys, end_keys)
data = []
for mkr, val in pairs:
if mkr in all_keys:
if len(data) > 0:
yield ('data', data)
data = []
if mkr in keys:
yield ('key', (mkr, val))
elif mkr in start_keys:
yield ('start', ('\\{}'.format(mkr[2:]), val))
elif mkr in end_keys:
yield ('end', ('\\{}'.format(mkr[2:]), val))
else:
data.append((mkr, val))
# don't forget to yield the last one
if len(data) > 0:
yield ('data', data)
def records(pairs, record_marker, context_keys=None):
"""
An alternative parsing function to iterparse(), which yields pairs
of (context, data), where context is a dictionary mapping each key
    to its most recently seen value, and data is the list of
(marker, value) pairs delimited by the keys. The basic usage is:
records(pairs, '\\ref')
Where `'\\ref'` is the delimiter of records in `pairs`.
Args:
pairs: An iterable of (marker, value) pairs.
        record_marker: The marker(s) that delimit records. If the
            value is a string, it is treated as the only record
            marker. Any other Sequence (list, tuple, etc.) becomes an
            ordered hierarchy of record delimiters. When a higher
marker is encountered, it resets the value of the lower
markers to None. For instance, if ['\\id', '\\ref'] is
used, '\\ref' is reset to None whenever '\\id' is
encountered.
context_keys: A container of additional delimiters to include
in the context, but unlike `record_marker`, these do not
reset other markers. E.g. one might use \\page to group
elements in a section that spans several pages.
Yields:
Pairs of (context, data)
Raises:
ToolboxError when block-start markers (\\+key) or block-end
markers (\\-key) are seen, as they are considered invalid
within records.
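    Example (illustrative; the markers and values are hypothetical):
        >>> pairs = [('\\id', 'text01'), ('\\ref', '1'),
        ...          ('\\t', 'inu=ga hoeru'), ('\\ref', '2'),
        ...          ('\\t', 'neko=ga naku')]
        >>> for context, data in records(pairs, ['\\id', '\\ref']):
        ...     print(context['\\id'], context['\\ref'], data)
        text01 1 [('\\t', 'inu=ga hoeru')]
        text01 2 [('\\t', 'neko=ga naku')]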
"""
if isinstance(record_marker, str):
record_marker = [record_marker]
if not isinstance(record_marker, Sequence):
raise ToolboxError('Record marker must be a string or a sequence.')
keys = set(record_marker).union(context_keys or [])
context = dict((key, None) for key in keys)
for event, result in iterparse(pairs, keys):
if event == 'key':
mkr, val = result
try:
idx = record_marker.index(mkr)
for m in record_marker[idx:]:
context[m] = None
except ValueError:
pass
context[mkr] = val
elif event == 'data':
yield (context, result)
else:
            raise ToolboxError('Illegal event {} in record: {}'
                               .format(event, result))
def field_groups(pairs, aligned_fields):
"""
    Yield lists of (marker, value) pairs where all pairs in the list
    are aligned. An unaligned field is returned as the only pair in
    its own list, and a repeated marker (e.g. from a wrapped field)
    starts a new group.
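    Example (illustrative; the markers and values are hypothetical):
        >>> pairs = [('\\t', 'inu=ga'), ('\\g', 'dog=NOM'),
        ...          ('\\t', 'hoeru'), ('\\g', 'bark'),
        ...          ('\\f', 'The dog barks.')]
        >>> for group in field_groups(pairs, set(['\\t', '\\g'])):
        ...     print(group)
        [('\\t', 'inu=ga'), ('\\g', 'dog=NOM')]
        [('\\t', 'hoeru'), ('\\g', 'bark')]
        [('\\f', 'The dog barks.')]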
"""
group = []
seen = set()
for mkr, val in pairs:
# unaligned or repeated fields start over grouping
if mkr not in aligned_fields or mkr in seen:
if group:
yield group
group = []
seen = set()
if mkr not in aligned_fields:
yield [(mkr, val)]
continue
group.append((mkr, val))
seen.add(mkr)
# yield the last group if non-empty
if group:
yield group
def normalize_record(pairs, aligned_fields, strip=True):
"""
Return a list of pairs of (marker, value) from `pairs`, where values
with the same marker are recombined (i.e. unwrapped). If the marker
    is in `aligned_fields`, spacing is also normalized (each wrapped
    segment is padded to the length of the longest value at its
    position) so that the tokens still align visually in columns.
Args:
pairs: An iterable of (marker, value) pairs.
        aligned_fields: A container of markers that are aligned.
        strip: If True, strip trailing whitespace from each
            recombined value.
    Returns:
The list of pairs with aligned fields normalized.
Example:
>>> data = [
... ('\\t', 'inu=ga ippiki'),
... ('\\m', 'inu =ga ichi -hiki'),
... ('\\g', 'dog =NOM one -CLF.ANIMAL'),
... ('\\t', 'hoeru'),
... ('\\m', 'hoe -ru'),
... ('\\g', 'bark -IPFV'),
... ('\\f', 'One dog barks.')
... ]
        >>> for (mkr, val) in normalize_record(data, set(['\\t', '\\g', '\\m'])):
... print(mkr, val)
\t inu=ga ippiki hoeru
\m inu =ga ichi -hiki hoe -ru
\g dog =NOM one -CLF.ANIMAL bark -IPFV
\f One dog barks.
"""
field_data = OrderedDict()
# gather lines with the same marker, and keep track of the longest
# aligned fields at each position
maxlens = {}
for mkr, val in pairs:
if mkr not in field_data:
field_data[mkr] = []
if val is None:
continue
field_data[mkr].append(val)
i = len(field_data[mkr]) - 1
# this string length counts unicode combining characters, so
# the lengths may appear off when printed
if mkr in aligned_fields and len(val) > maxlens.get(i, -1):
maxlens[i] = len(val)
# join and normalize spacing (use longest length for each position)
mkrs = list(field_data.keys())
for mkr in mkrs:
data = field_data[mkr]
if data == []:
joined = None
elif mkr in aligned_fields:
joined = ' '.join(s.ljust(maxlens[i]) for i, s in enumerate(data))
else:
joined = ' '.join(data)
if strip and joined is not None:
joined = joined.rstrip()
field_data[mkr] = joined
return list(field_data.items())
def align_fields(pairs, alignments=None, tokenizers=None, errors='strict'):
"""
Align source to target tokens for each line in `pairs` using
alignment mappings given in `alignments`. Line values are tokenized
by whitespace by default, but can be handled specially by giving a
custom tokenizer in `tokenizers`.
Args:
pairs: An iterable of (marker, value) pairs
alignments: A dictionary of {marker1: marker2} alignments, where
marker1 is aligned to marker2. If `alignments` is None, each
value will still be put in a list as the only item with an
alignment target of None.
tokenizers: A dictionary of {marker: regex}, where the compiled
regular expression `regex` is used to find sub-parts of the
original value of the field. If `tokenizers` is None or a
tokenizer regex is not given for a marker, and the marker is
the source or target of an alignment, the values will be
split by whitespace.
        errors: Error-handling strategy for misaligned tokens. If
            'strict' (the default), a ToolboxAlignmentError is raised;
            if 'ratio', an overlapping token is assigned to the column
            that contains most of it; if 'reanalyze', the line pair is
            re-tokenized and the alignment is retried. Any other value
            behaves like 'strict'.
Returns:
A list of alignment pairs. Each alignment pair is a structure
(marker, [(target_token, [source_tokens])]). That is, for each
target token, a list of source tokens is aligned to it. For
        unaligned lines, the target token is None and the source
        tokens list has the original line as its only item. Lines that
        are the target but not the source of an alignment have their
        own untokenized line as the target token. If the value of a line is
None (e.g. there was a marker but no content), then both the
target_token and the source_tokens will be None, even if the
line should have been aligned to something.
Example:
>>> data = [
... ('\\t', 'inu=ga ippiki hoeru '),
... ('\\m', 'inu =ga ichi -hiki hoe -ru '),
... ('\\g', 'dog =NOM one -CLF.ANIMAL bark -IPFV'),
... ('\\f', 'One dog barks.'),
... ('\\x', None)
... ]
>>> align_fields(data, alignments={'\\m': '\\t', '\\g': '\\m'})
[('\\t', [('inu=ga ippiki hoeru ',
['inu=ga', 'ippiki', 'hoeru'])]),
('\\m', [('inu=ga', ['inu', '=ga']),
('ippiki', ['ichi', '-hiki']),
('hoeru', ['hoe', '-ru'])]),
('\\g', [('inu', ['dog']),
('=ga', ['=NOM']),
('ichi', ['one']),
        ('-hiki', ['-CLF.ANIMAL']),
('hoe', ['bark']),
('-ru', ['-IPFV'])]),
('\\f', [(None, ['One dog barks.'])]),
('\\x', [(None, None)])
]
"""
    alignments = dict(alignments or [])
    aligned_fields = set(alignments.keys()).union(alignments.values())
tokenizers = dict(tokenizers or [])
prev = {} # previous tokenization matches used for alignment
aligned_pairs = []
for mkr, val in pairs:
tokenizer = tokenizers.get(mkr, default_tokenizer)
# empty content
if val is None:
aligned_pairs.append((mkr, [(None, None)]))
# unaligned fields; don't do any tokenization
elif mkr not in aligned_fields:
aligned_pairs.append((mkr, [(None, [val])]))
else:
toks = list(tokenizer.finditer(val))
prev[mkr] = toks
# target, but not source, of alignments; just tokenize
if mkr not in alignments:
aligned_pairs.append(
(mkr, [(val, [t.group(0).rstrip() for t in toks])])
)
# source of an alignment; tokenize and align
else:
tgt_toks = prev.get(alignments[mkr])
if tgt_toks is None:
logging.warning(
'Alignment target {} must precede source {}.'
.format(alignments[mkr], mkr)
)
continue
aligned = _align_tokens(
toks, tgt_toks, errors=errors
)
aligned_pairs.append((mkr, aligned))
return aligned_pairs
def _align_tokens(src, tgt, errors='strict'):
# make a deque so we can efficiently pop from the front; also this
# makes a copy of src so we don't affect the original
_src = deque(src)
_tgt = deque(tgt)
#last_tgt_idx = len(_tgt) - 1
#last_end = -1 # the end pos of the last source token
aligned = []
while _tgt:
t = _tgt.popleft()
next_t = _tgt[0] if _tgt else None # None indicates end of deque
try:
grp = _collect_aligned_tokens(
_src, t, next_t, errors=errors
)
except ToolboxAlignmentError as e:
if errors == 'reanalyze':
_src, _tgt = _reanalyze_tokens(src, tgt)
aligned = []
                errors = 'reanalyze_1'  # switch modes so reanalysis happens only once
continue
else:
raise
aligned.append((t.group(0).rstrip(), grp))
return aligned
def _collect_aligned_tokens(src, t, next_t, errors='strict'):
grp = []
while src:
s = src[0] # don't pop just yet
s_tok = s.group(0).rstrip()
if next_t is not None:
# basic case; reached next column
if s.start() >= next_t.start():
break
# exceptional case; overlapping columns
s_end = s.start() + len(s_tok)
if s_end >= next_t.start():
msg = 'Possible misalignment at position {} ({}).'.format(
next_t.start(), s_tok
)
# recover in some way, but still warn the user
warnings.warn(msg, ToolboxWarning)
if errors == 'ratio':
ratio = float(s_end - next_t.start()) / len(s_tok)
if ratio >= 0.5:
break
else: # errors == 'strict' or otherwise
raise ToolboxAlignmentError(msg)
# we're good to go. pop the token and continue
src.popleft() # no need to store it again
grp.append(s_tok)
return grp
def _reanalyze_tokens(src, tgt):
# get joined strings
src = ' '.join(x.group(0) for x in src)
tgt = ' '.join(x.group(0) for x in tgt)
# normalize spacing
src = re.sub(r'\s+', ' ', src.strip())
tgt = re.sub(r'\s+', ' ', tgt.strip())
# put them in lists of tokens (initially just the strings)
src = [src]
tgt = [tgt]
# iteratively find deeper alignments if possible
level_delims = [ # the order is reversed, so pop() can be used
r'([\.])', # dot-delimiter for glosses
r'\s+|((?<!\s)[-=~](?!\s))', # space or (delim not next to a space)
r'(?<=[^-=~\s])\s+(?=[^-=~\s])' # space not adjacent to a delimiter
]
while level_delims:
delim = level_delims.pop()
assert len(src) == len(tgt)
# tokenize each token in src and tgt (and filter '' and None)
_src = list(map(lambda x: list(filter(bool, re.split(delim, x))), src))
_tgt = list(map(lambda x: list(filter(bool, re.split(delim, x))), tgt))
pairs = zip_longest(_src, _tgt, fillvalue=[])
if all(len(s) == len(t) for s, t in pairs):
# same # tokens on each; reassign and continue
src = [tok for toks in _src for tok in toks]
tgt = [tok for toks in _tgt for tok in toks]
else:
# reshape src and break
_src = list(map(lambda xs: ' '.join(xs), _src))
src, tgt = zip(*map(lambda st: _ljust_pair(*st), zip(_src, tgt)))
break
# finally return re matches with the default tokenizer
src = deque(default_tokenizer.finditer(' '.join(src)))
tgt = deque(default_tokenizer.finditer(' '.join(tgt)))
return src, tgt
def _ljust_pair(s, t):
maxlen = max(len(s or ''), len(t or ''))
return ((s or '').ljust(maxlen), (t or '').ljust(maxlen))
class ToolboxProject(object):
def __init__(self, path):
self.path = find_project_file(path)
self.alignments = {}
self.initialize()
def initialize(self):
pass
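
# Minimal usage sketch (illustrative only; 'corpus.txt' is a hypothetical
# Toolbox file with '\ref'-delimited records):
#
#     with open('corpus.txt') as f:
#         for context, data in records(read_toolbox_file(f), '\\ref'):
#             fields = normalize_record(data, set(['\\t', '\\m', '\\g']))
#             print(context['\\ref'], fields)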