"""
This module is intended for use with Archives Unleashed Cloud
derivative data and the Archives Unleashed Cloud notebooks.
For more information, please visit https://archivesunleashed.org/.
"""
from collections import Counter
from nltk.corpus import stopwords
from nltk.tokenize import sent_tokenize, word_tokenize
from nltk.sentiment.vader import SentimentIntensityAnalyzer


class au_notebook:
    """
    Archives Unleashed Notebook helper functions.
    """
    # Maximum number of words to show in output.
    # Jupyter may produce an output error if the number is too high.
    top_count = 30
    # Domain suffixes to strip when cleaning non-U.S. domains so that (e.g.)
    # www.google.co.uk becomes "google".
    stop_domains = ["co", "org", "net", "edu"]  # Domain suffixes to remove.
    # Minimum number of characters for a word to be included in a corpus.
    minimum_word_length = 3  # Eliminates "it", "I", "be", etc.
    # List of substrings used to filter out a text line, if desired.
    line_filter = ["404 Not Found"]
    # How many lines of text to use.
    results_limit = 1000
    # If you want to start at a different line, you can increase this.
    # If results_start is greater than results_limit, you will get no results.
    results_start = 0
    # If you have a large file and want to sample it more broadly,
    # increase this value to read every Nth line.
    results_step = 1
    # Change if you want a different filename.
    output_filename = "./filtered_text.txt"  # Filename to output.
    # Characters to show per text file in output.
    # Larger numbers will result in more text showing in output.
    max_characters = 75
    # The years to include in the analysis.
    # If empty, you will get all available years.
    filtered_years = []  # E.g. ['2015', '2016', '2019'].
    # The domains to include in the analysis.
    # If empty, you will get all available domains.
    filtered_domains = []  # E.g. ["google", "apple", "facebook"].
    # Use nltk stopwords?
    use_nltk = True
    # List of words not to include in a corpus for text analysis. Added to
    # nltk stop words if use_nltk is True.
    stop_words_user = ("north", "south")
    # Will include nltk stop words if use_nltk is True, otherwise just user
    # selected stop words.
    stop_words = ""
    # Collection ID.
    collection = "4867"  # Default collection for auk-notebooks.
    auk_fp = "./data/"
    auk_full_text = ""
    auk_gephi = ""
    auk_graphml = ""
    auk_domains = ""
    auk_filtered_text = ""

    def __init__(self, collection, folder, **kwargs):
        """Set up the collection ID, data folder, and derivative file paths.
        :param collection: The Archives Unleashed Cloud collection ID.
        :param folder: The folder containing the derivative files.
        :param kwargs: Any class attribute to override (e.g. top_count=50).
        """
        self.collection = collection
        if folder is not None:
            self.auk_fp = folder
        for key, value in kwargs.items():
            setattr(self, key.lower(), value)
        self.auk_full_text = self.auk_fp + self.collection + "-fulltext.txt"
        self.auk_gephi = self.auk_fp + self.collection + "-gephi.gexf"
        self.auk_graphml = self.auk_fp + self.collection + "-gephi.graphml"
        self.auk_domains = self.auk_fp + self.collection + "-fullurls.txt"
        self.auk_filtered_text = (self.auk_fp +
                                  self.collection +
                                  "-filtered_text.zip")
        self.stop_words = (
            set(stopwords.words("english")).union(self.stop_words_user)
            if self.use_nltk
            else self.stop_words_user
        )
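    # Illustrative note (not in the original source): with the default
    # collection "4867" and folder "./data/", the derivative paths above
    # resolve to ./data/4867-fulltext.txt, ./data/4867-gephi.gexf,
    # ./data/4867-gephi.graphml, ./data/4867-fullurls.txt, and
    # ./data/4867-filtered_text.zip.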

    def clean_domain(self, s):
        """Extracts the name from the domain (e.g. 'www.google.com' becomes
        'google').
        :param s: The domain name to clean.
        :return: The relevant name.
        """
        ret = ""
        dom = s.split(".")
        if len(dom) < 3:  # x.com is always x.
            ret = dom[0]
        elif dom[-2] in self.stop_domains:  # www.x.co.uk should be x.
            ret = dom[-3]
        else:
            ret = dom[1]
        return ret
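    # Illustrative examples (not in the original source), given the
    # stop_domains check above: "example.com" -> "example",
    # "www.google.com" -> "google", "bbc.co.uk" -> "bbc".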

    def get_domains(self, split_method="full"):
        """Extracts the domains from a file by method.
        :param split_method: Either "full", "name", or "sub". "name" provides
            just the domain name, "sub" produces the name with subdomains,
            and "full" provides the entire name.
        :return: A collections.Counter mapping each domain (per split_method)
            to its count.
        """
        ret = []
        clean = self.clean_domain
        scores = Counter()
        with open(self.auk_domains) as fin:
            for line in fin:
                ret.append(line.strip("()\n").split(","))
        if split_method == "name":
            for url, count in ret:
                scores.update({clean(url): int(count)})
            ret = scores
        elif split_method == "sub":
            splits = [(x[0].split("."), int(x[1])) for x in ret]
            for url, count in splits:
                if len(url) < 3:
                    scores.update({".".join(["www", url[0]]): count})
                else:
                    scores.update({".".join([url[0], url[1]]): count})
            ret = scores
        else:
            for url, count in ret:
                scores.update({url: int(count)})
            ret = scores
        return ret
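    # Illustrative note (hypothetical line format): if the -fullurls.txt
    # derivative contains lines such as "(www.example.com,123)", then
    # get_domains("full") counts "www.example.com", get_domains("sub")
    # counts "www.example", and get_domains("name") counts "example".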

    def get_text(self, by="all"):
        """Get the text from the files (by domain or year if desired).
        :param by: "all", "domain", or "year"; controls how the output is
            grouped. Lines whose text field is shorter than
            minimum_word_length characters, or which contain any line_filter
            substring, are skipped.
        :return: [({year or domain}, textString)] if by is 'domain' or 'year',
            otherwise [textString].
        """
        text = []
        form = range(self.results_start, self.results_limit, self.results_step)
        with open(self.auk_full_text) as fin:
            for num in range(self.results_limit):
                if num in form:
                    line = next(fin)
                    split_line = str(line).split(",", 3)
                    line_filter = set([split_line[3].find(x)
                                       for x in self.line_filter])
                    if len(
                        split_line[3]
                    ) >= self.minimum_word_length and line_filter == {-1}:
                        # Too-short and filtered strings removed.
                        if by == "domain":
                            text.append(
                                (self.clean_domain(split_line[1]),
                                 split_line[3])
                            )
                        elif by == "year":
                            text.append((split_line[0][1:5], split_line[3]))
                        else:
                            text.append(split_line[3])
                else:
                    next(fin)
        return text
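    # Illustrative note (assumed line format, not confirmed by the source):
    # the -fulltext.txt derivative is expected to contain lines such as
    # "(20150101,example.com,http://example.com/page,some page text ...)",
    # i.e. (crawl date, domain, URL, text); split_line[0][1:5] therefore
    # yields the four-digit year.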

    def get_text_tokens(self):
        """Get the data and tokenize the text.
        Words of minimum_word_length characters or fewer are excluded.
        :return: A list of words included in the text file.
        """
        return [
            x.lower()
            for x in word_tokenize(" ".join(self.get_text()))
            if len(x) > self.minimum_word_length
        ]

    def get_tokens_domains(self):
        """Get tokens by domain.
        Words of minimum_word_length characters or fewer are excluded.
        :return: A list of tuples with (domain, Counter).
        """
        return [
            (
                x[0],
                Counter(
                    [
                        y
                        for y in word_tokenize(x[1])
                        if len(y) > self.minimum_word_length
                    ]
                ),
            )
            for x in self.get_text("domain")
        ]

    def get_tokens_years(self):
        """Get tokens by year.
        Words of minimum_word_length characters or fewer are excluded.
        :return: A list of tuples with (year, Counter).
        """
        return [
            (
                x[0],
                Counter(
                    [
                        y
                        for y in word_tokenize(x[1])
                        if len(y) > self.minimum_word_length
                    ]
                ),
            )
            for x in self.get_text("year")
        ]

    def year(self):
        """Used by get_top_tokens_by to get the tokens by year."""
        return self.get_tokens_years()

    def domain(self):
        """Used by get_top_tokens_by to get tokens by domain."""
        return self.get_tokens_domains()

    def get_top_tokens(self):
        """Return the top_count most common tokens in the text."""
        return Counter(self.get_text_tokens()).most_common(self.top_count)
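    # Illustrative output shape (values are made up): a list of
    # (token, count) pairs, e.g. [("archives", 120), ("web", 87), ...].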

    def get_top_tokens_by(self, fun):
        """Get the top tokens by a function.
        :param fun: A function that returns a list of (key,
            Counter([tokenized_list])) tuples, e.g. self.year or self.domain.
        :return: A list of (key, [(token, count), ...]) tuples containing the
            top_count tokens for each key.
        """
        sep = dict()
        tokens = fun()
        sep = {k[0]: Counter() for k in tokens}
        for key, value in tokens:
            sep[key] += value
        ret = [(key, val.most_common(self.top_count))
               for key, val in sep.items()]
        return ret
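    # Illustrative usage (counts are made up): passing the bound year method,
    # e.g. nb.get_top_tokens_by(nb.year), returns something like
    # [("2015", [("archives", 40), ...]), ("2016", [...]), ...].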

    def international(self, text):
        """Attempts to decode the text as UTF-32 or UTF-16.
        :param text: The text to decode (assumed to be UTF-8).
        :return: A UTF-32 or UTF-16 decoded string, or else the original
            string.
        """
        encoded = text.encode("utf-8")
        try:
            ret = encoded.decode("UTF-32-LE")
        except UnicodeDecodeError:
            try:
                ret = encoded.decode("UTF-32-BE")
            except UnicodeDecodeError:
                try:
                    ret = encoded.decode("UTF-16-LE")
                except UnicodeDecodeError:
                    try:
                        ret = encoded.decode("UTF-16-BE")
                    except UnicodeDecodeError:
                        ret = encoded.decode("UTF-8")
        return ret

    def write_output(self, stdout, results):
        """Writes results to a file.
        :param stdout: Filepath for the output file.
        :param results: A list of results.
        :return: None.
        """
        try:
            with open(stdout, "w") as output:
                for value in results:
                    output.write(str(value))
        except Exception as exp:
            print("Error writing the file:", exp)

    def sentiment_scores(self, by="domain"):
        """Calculates sentiment scores for a body of text.
        :param by: Either "year" or "domain".
        :return: A list of tuples with (year/domain, ("neg", score),
            ("neu", score) etc.).
        """
        sep = dict()
        corpus = self.get_text(by)
        sep = {k[0]: [] for k in corpus}
        for key, value in corpus:
            sep[key] += sent_tokenize(value)
        sid = SentimentIntensityAnalyzer()
        result = []
        for a, b in sep.items():
            scores = Counter({"neg": 0, "pos": 0, "neu": 0, "compound": 0})
            for c in b:
                scores.update(sid.polarity_scores(c))
            result += [
                (
                    a,
                    ("neg", scores["neg"] / len(b)),
                    ("pos", scores["pos"] / len(b)),
                    ("neu", scores["neu"] / len(b)),
                    ("compound", scores["compound"] / len(b)),
                )
            ]
        return result
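    # Illustrative output shape (numbers are made up): one tuple per year or
    # domain with averaged VADER scores, e.g.
    # ("2015", ("neg", 0.04), ("pos", 0.11), ("neu", 0.85), ("compound", 0.18)).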


if __name__ == "__main__":
    pass
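    # A minimal usage sketch, not part of the original module. It assumes the
    # Archives Unleashed Cloud derivative files for collection "4867" have
    # been downloaded to ./data/ (e.g. ./data/4867-fulltext.txt and
    # ./data/4867-fullurls.txt) and that the NLTK "stopwords", "punkt", and
    # "vader_lexicon" data packages are available.
    nb = au_notebook("4867", "./data/")
    # Top ten cleaned domain names by count.
    print(nb.get_domains("name").most_common(10))
    # The top_count most common tokens across the sampled full text.
    print(nb.get_top_tokens())
    # Average VADER sentiment scores grouped by year.
    print(nb.sentiment_scores("year"))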