forked from ee-2/SurrogateGeneration
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsurrogateGeneration.py
executable file
·125 lines (114 loc) · 6.5 KB
/
surrogateGeneration.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import re
import random
import os
import traceback
import importlib
from sgFile import SgFile
from string import punctuation, ascii_lowercase, ascii_uppercase
from entity import Entity
'''
Surrogate Generation
'''
class SurrogateGeneration:
    """Replace annotated privacy-sensitive entities in text files by surrogates.

    For every ``.ann`` file handed to :meth:`collectFiles` the sibling ``.txt``
    file is read, each annotation is replaced by a surrogate, and a new
    ``.txt``/``.ann`` pair is written below ``parameters['settings']['path_output']``.
    Language-dependent substitutions are delegated to ``self.lang``, an
    instance of the first name exported by the module ``lang.<lang>``.

    NOTE(review): the annotation lines look like brat standoff text-bound
    annotations (``T<n>\\tLABEL start end\\ttext``) - confirm against the corpus.
    """

    # Labels that are substituted character by character.
    _CHAR_LABELS = ('UFID', 'PHONE', 'ZIP', 'STREETNO', 'PASS', 'USER')
    # Labels whose scheme / "www" prefix is preserved.
    _URI_LABELS = ('URL', 'EMAIL')
    # Leading part of a URI that is copied unchanged into the surrogate.
    _URI_PREFIX = re.compile('^(<?ftp:|<?file:|<?mailto:|((<?https?:)?(<?www)?))')
    # label -> (name-list attribute of self.lang used for abbreviation lookup
    #           or None, name of the self.lang method generating a surrogate)
    _LANG_SUBSTITUTERS = {
        'DATE': (None, 'subDate'),
        'STREET': (None, 'subStreet'),
        'CITY': ('city', 'subCity'),
        'FEMALE': ('female', 'subFemale'),
        'MALE': ('male', 'subMale'),
        'FAMILY': ('family', 'subFamily'),
        'ORG': (None, 'subOrg'),
    }

    def __init__(self, parameters):
        """Store *parameters* and instantiate the language module.

        ``parameters['settings']['lang']`` names a module inside the ``lang``
        package; the first entry of that module's ``__all__`` is instantiated
        without arguments and kept as ``self.lang``.
        """
        self.parameters = parameters
        module = importlib.import_module('lang.' + parameters['settings']['lang'])
        self.lang = getattr(module, module.__all__[0])()
        self.nrFiles = 0  # number of files processed without an exception

    @staticmethod
    def _txtPath(annPath):
        """Return *annPath* with a trailing ``.ann`` replaced by ``.txt``.

        Only the suffix is touched, so "ann" inside directory or file names
        is left alone; paths without the suffix are returned unchanged.
        """
        if annPath.endswith('.ann'):
            return annPath[:-len('.ann')] + '.txt'
        return annPath

    def genRandomChars(self, tokenTxt):
        """Return *tokenTxt* with digits and letters replaced by random ones.

        Digits become random digits, lower-case letters random ASCII
        lower-case letters, every other alphabetic character a random ASCII
        upper-case letter; all remaining characters are kept.
        """
        chars = []
        for char in tokenTxt:
            if char.isdigit():
                char = str(random.randint(0, 9))
            elif char.isalpha():
                char = random.choice(ascii_lowercase if char.islower() else ascii_uppercase)
            chars.append(char)
        return ''.join(chars)

    def _knownSurrogate(self, sgFile, token):
        """Return the surrogate already stored for *token*, or None.

        Sets the token's normalised (lower-cased) form as a side effect.
        An exact-text entry wins over the entry of the lower-cased form.
        """
        token.setNormCase(token.text.lower())
        known = sgFile.sub[token.label]
        if token.normCase in known:
            return known.get(token.text, known[token.normCase])
        return None

    def _rememberSurrogate(self, sgFile, token, surrogate):
        """Store *surrogate* under the token's text and lower-cased form."""
        known = sgFile.sub[token.label]
        known[token.text] = surrogate
        known[token.normCase] = surrogate
        return surrogate

    def subChar(self, sgFile, token):
        """Substitute the entity with random letters and digits.

        The surrogate is cached in ``sgFile.sub`` so that repeated mentions
        receive the same replacement.
        """
        surrogate = self._knownSurrogate(sgFile, token)
        if surrogate is None:
            surrogate = self._rememberSurrogate(sgFile, token, self.genRandomChars(token.text))
        return surrogate

    def subUri(self, sgFile, token):
        """Substitute an EMAIL or URL entity, keeping its scheme/"www" prefix.

        Everything after the prefix is randomised character by character;
        the result is cached in ``sgFile.sub``.
        """
        surrogate = self._knownSurrogate(sgFile, token)
        if surrogate is None:
            # The pattern can match the empty string, so match() never fails.
            prefixLen = self._URI_PREFIX.match(token.text).end()
            surrogate = token.text[:prefixLen] + self.genRandomChars(token.text[prefixLen:])
            self._rememberSurrogate(sgFile, token, surrogate)
        return surrogate

    def getSubstitute(self, sgFile, token):
        """Return the surrogate for *token*, chosen by its label.

        Text that is found in ``string.punctuation`` is returned unchanged
        regardless of the label. Returns None for a label that is not handled.
        """
        if token.text in punctuation:
            return token.text
        label = token.label
        if label in self._CHAR_LABELS:
            return self.subChar(sgFile, token)
        if label in self._URI_LABELS:
            return self.subUri(sgFile, token)
        if label in self._LANG_SUBSTITUTERS:
            nameListAttr, subMethod = self._LANG_SUBSTITUTERS[label]
            lang = self.lang
            # First truthy result wins: co-surrogate, then (for name-like
            # labels) abbreviation lookup, then a freshly generated surrogate.
            surrogate = lang.getCoSurrogate(sgFile, token)
            if not surrogate and nameListAttr is not None:
                surrogate = lang.getSurrogateAbbreviation(
                    sgFile, token.text, token.label, getattr(lang, nameListAttr))
            return surrogate or getattr(lang, subMethod)(sgFile, token)
        return None

    def subFile(self, sgFile, annotations):
        """Substitute all *annotations* in ``sgFile.txt`` and write the result.

        *annotations* is a sequence of dicts with the keys ``text``,
        ``label``, ``start`` and ``end``, ordered by position. The new text
        and an annotation file with offsets into the new text are written
        below ``path_output``, mirroring the file's location relative to
        ``path_input``.

        Raises:
            TypeError: if an annotation carries a label for which
                :meth:`getSubstitute` yields no surrogate. Nothing is written
                in that case.
        """
        textParts = []
        annLines = []
        begin = 0   # position in the original text consumed so far
        offset = 0  # length of the new text produced so far
        for number, anno in enumerate(annotations, start=1):
            token = Entity(anno['text'], anno['label'], anno['start'], anno['end'])
            sub = self.getSubstitute(sgFile, token)
            if sub is None:
                raise TypeError('no surrogate for label ' + repr(anno['label'])
                                + ' (text ' + repr(anno['text']) + ')')
            gap = sgFile.txt[begin:anno['start']]
            textParts.append(gap)
            textParts.append(sub)
            subStart = offset + len(gap)
            offset = subStart + len(sub)
            begin = anno['end']
            annLines.append('T' + str(number) + '\t' + anno['label'] + ' '
                            + str(subStart) + ' ' + str(offset) + '\t' + sub)
        textParts.append(sgFile.txt[begin:])

        settings = self.parameters['settings']
        pathOutputAnn = os.path.join(settings['path_output'],
                                     os.path.relpath(sgFile.file, settings['path_input']))
        pathOutputTxt = self._txtPath(pathOutputAnn)
        os.makedirs(os.path.dirname(pathOutputAnn), exist_ok=True)
        # newline='' keeps the line endings of the text exactly as they are,
        # otherwise the annotation offsets would no longer fit.
        with open(pathOutputTxt, 'w', encoding='utf-8', newline='') as outTxt:
            outTxt.write(''.join(textParts))
        with open(pathOutputAnn, 'w', encoding='utf-8') as outAnn:
            outAnn.write('\n'.join(annLines).rstrip())

    def collectFiles(self, subset, threadName):
        """Process every annotation file in *subset*.

        For each ``.ann`` path the sibling ``.txt`` file is read, the
        annotations are parsed and handed to :meth:`subFile`. A file that
        raises is reported with its traceback and skipped; ``self.nrFiles``
        counts the files that went through.
        """
        for file in subset:
            print(file)
            try:
                with open(self._txtPath(file), 'r', encoding='utf-8', newline='') as fileInputTxt:
                    inputTxt = fileInputTxt.read()
                annos = {}
                with open(file, 'r', encoding='utf-8') as fileInputAnn:
                    for line in fileInputAnn:
                        # TODO: handle discontinuous annotations
                        fields = line.rstrip().split(None, 4)
                        if not fields:
                            continue  # blank line
                        start, end = int(fields[2]), int(fields[3])
                        # Keyed by span: a later annotation with the same
                        # span replaces an earlier one.
                        annos[(start, end)] = {'start': start,
                                               'end': end,
                                               'label': fields[1],
                                               'text': fields[4]}
                sgFile = SgFile(file, threadName, inputTxt, self.lang.freqMapFemale,
                                self.lang.freqMapMale, self.lang.freqMapFamily,
                                self.lang.freqMapOrg, self.lang.freqMapStreet,
                                self.lang.freqMapCity)
                self.subFile(sgFile, [annos[span] for span in sorted(annos)])
                self.nrFiles += 1
            except Exception:
                print(file + ' not processed:')
                traceback.print_exc()