-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathclient.py
364 lines (294 loc) · 12 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
# -*- coding: utf-8 -*-
class ExpiredTokenException(Exception):
    """Signals an expired token; list enumerators catch it to re-auth."""
class FailedRequestException(Exception):
    """Raised when an HTTP request ends with an unexpected status code."""
class ExpectedFailedRequestException(Exception):
    """Raised when an HTTP request fails with a status the caller expected."""
import feedparser
import re
import streaming_httplib2
from httplib2 import Http as StandardHttp
from OpenSSL.crypto import Error as CryptoError
from oauth2client.client import \
SignedJwtAssertionCredentials, AccessTokenRefreshError
from apiclient.discovery import build
from model import User, Document, Folder
from utils import *
from configuration import Configuration
# just reverts to default httplib2 behavior unless
# explicitly required (comes in handy to still use
# Google's code with streaming_httplib2.Http objects)
class StreamingHttp(streaming_httplib2.Http):
    """Http client handing back unread content only when asked to.

    Streaming is requested by passing a method name built with
    encode_streaming_method(); for any other method the content of the
    response is read in full before being returned.
    """

    # incredibly ugly hack, but only way to do that with google's
    # wrapping sitting on top of our class...
    _STREAMING_MARKER = '-STREAMING'
    _STREAMING_MARKER_LENGTH = len(_STREAMING_MARKER)

    def __init__(self, *args, **kwargs):
        super(StreamingHttp, self).__init__(*args, **kwargs)
        # nesting depth of the request() calls in progress: we need to
        # keep track of it because otherwise the 'streaming' flag would
        # be lost when following a redirection
        self._redirection_level = 0

    @classmethod
    def encode_streaming_method(cls, method):
        """Return the given method name tagged with the streaming marker."""
        return '%s%s' % (method, cls._STREAMING_MARKER)

    @classmethod
    def _decode_streaming_method(cls, method):
        """Return the method name without its marker, or None if untagged."""
        marker_length = cls._STREAMING_MARKER_LENGTH
        if method[-marker_length:] != cls._STREAMING_MARKER:
            return None
        return method[:-marker_length]

    def request(self, uri, method='GET', *args, **kwargs):
        """Perform the request and return a (headers, content) tuple.

        The content is returned as is when the method carries the
        streaming marker; otherwise it is read once the outermost
        request() call completes.
        """
        untagged_method = self._decode_streaming_method(method)
        if untagged_method:
            method = untagged_method

        try:
            self._redirection_level += 1
            response = super(StreamingHttp, self).request(
                uri, method, *args, **kwargs
            )
        finally:
            self._redirection_level -= 1

        headers, content = response
        streaming_requested = untagged_method is not None
        if not streaming_requested and self._redirection_level == 0:
            # regular request, and we're back at the outermost call
            content = content.read()
        return (headers, content)
class Credentials(object):
    """Credentials of the Google API app, loaded from the configuration."""

    def __init__(self, http):
        """Load the app's settings and p12 key, then check they are valid.

        `http` is the Http object used to refresh the signed assertion.
        Raises whatever _valid() raises when the credentials are rejected.
        """
        self._email, p12_file, self._p12_secret, self._scopes = \
            Configuration.get('google_app_email', 'google_app_p12_file',
                              'google_app_p12_secret', 'google_api_scopes',
                              not_null=True)

        # a p12 (PKCS#12) key is binary data: read it as such, and make
        # sure the file gets closed even if reading it fails
        with open(p12_file, 'rb') as stream:
            self._p12 = stream.read()

        # check our credentials are those of a valid app
        self._valid(http, True)

    # returns true iff the credentials are from a valid Google API app
    # that's quite independent of the domain
    # throw_excptns set to True will re-raise the underlying exception
    # (with a 'brive_explanation' attribute attached to it) instead of
    # just returning false
    def _valid(self, http, throw_excptns=False):
        try:
            signed_assertion = self.get_signed_assertion()
            signed_assertion.refresh(http)
            Log.debug('App\'s credentials valid')
            return True
        except CryptoError as crypto_error:
            if throw_excptns:
                crypto_error.brive_explanation = \
                    'Incorrect p12 file and/or password'
                raise
        except AccessTokenRefreshError as oauth_error:
            if throw_excptns:
                oauth_error.brive_explanation = 'Invalid app credentials'
                raise
        return False

    def get_signed_assertion(self, **kwargs):
        """Build a SignedJwtAssertionCredentials object for the app.

        Extra keyword arguments are passed on to its constructor.
        """
        return SignedJwtAssertionCredentials(self._email,
                                             self._p12,
                                             self._scopes,
                                             self._p12_secret,
                                             **kwargs)
class Client(object):
    """Entry point to the Google APIs: handles auth, services and requests."""

    # FIXME: check extended scopes, and see that we fail,
    # otherwise issue a warning
    def __init__(self, keep_dirs, streaming):
        """Create the Http object, check the app's credentials, load config.

        `keep_dirs` is passed on to the User objects built by users();
        `streaming` selects a StreamingHttp object over a standard one.
        """
        self._keep_dirs = keep_dirs
        self._streaming = streaming
        self._reset()
        self._creds = Credentials(self._http)
        self._domain, admin_login, \
            self._drive_service_name, self._drive_service_version, \
            self._users_service_name, self._users_service_version = \
            Configuration.get('google_domain_name',
                              'google_domain_admin_login',
                              'google_api_drive_name',
                              'google_api_drive_version',
                              'google_api_users_name',
                              'google_api_users_version',
                              not_null=True)
        self._admin = User(admin_login, self, False)
        Log.debug('Client loaded')

    # authorizes the given user
    def authorize(self, user):
        Log.debug(u'Authorizing client for {}'.format(user.login))
        # start over with a brand new Http object
        self._reset()
        signed_assertion = self._creds.get_signed_assertion(
            prn=self._get_email_address(user)
        )
        signed_assertion.authorize(self._http)

    def authorize_admin(self):
        """Authorize the client for the domain's admin user."""
        return self.authorize(self._admin)

    @property
    def streaming(self):
        return self._streaming

    @property
    def drive_service(self):
        """A drive service object, built anew on every access."""
        return self._build_service(
            self._drive_service_name,
            self._drive_service_version
        )

    @property
    def users_service(self):
        """A users service object, built anew on every access."""
        return self._build_service(
            self._users_service_name,
            self._users_service_version
        )

    def users(self, logins=None, login_regex_pattern=None):
        """Return the list of User objects for the given logins.

        When no login is given, all the users of the domain are fetched.
        When `login_regex_pattern` is given, only the logins it matches
        (with re.search) are kept.
        An AccessTokenRefreshError is re-raised with a 'brive_explanation'
        attribute attached to it.
        """
        try:
            if not logins:
                logins = self._get_all_user_logins()
            # NOTE(review): the pattern is assumed to also filter logins
            # that were given explicitly, and not only the domain-wide
            # list - confirm
            if login_regex_pattern:
                regex = re.compile(login_regex_pattern)
                logins = filter(regex.search, logins)
            result = [User(login, self, self._keep_dirs)
                      for login in logins]
            Log.verbose(u'Found users: {}'.format(result))
            return result
        except AccessTokenRefreshError as oauth_error:
            explanation = \
                u'App not authorized on {} '.format(self._domain) \
                + u'(or your admin user {} doesn\'t exist)'.format(self._admin)
            oauth_error.brive_explanation = explanation
            raise

    # gets the complete list of users
    def _get_all_user_logins(self):
        return [login for login in UserGenerator(self, self._domain)]

    def _build_service(self, service_name, api_version):
        return build(service_name, api_version, self._http)

    @Utils.multiple_tries_decorator(ExpectedFailedRequestException)
    def request(self, uri, method='GET', *args, **kwargs):
        """Perform an HTTP request and return its (headers, content) result.

        Two internal keyword arguments are consumed here, the remaining
        ones are passed on to the Http object:
        - brive_expected_error_status: a status code, or a list of them,
          for which ExpectedFailedRequestException is raised
        - brive_streaming: asks for a streamed content (only honored if
          the client was created with streaming enabled)

        Raises FailedRequestException for any other status but 200.
        """
        # pop a few internal kwargs
        expected_error_status = kwargs.pop('brive_expected_error_status', [])
        if not isinstance(expected_error_status, list):
            expected_error_status = [expected_error_status]
        if kwargs.pop('brive_streaming', False) and self._streaming:
            method = self._http.encode_streaming_method(method)

        result = self._http.request(uri, method, *args, **kwargs)
        headers = result[0]
        status = int(headers.get('status', 0))
        if status != 200:
            if status in expected_error_status:
                raise ExpectedFailedRequestException(status)
            else:
                content = result[1]
                if hasattr(content, 'read'):
                    # a streamed content!
                    content = content.read()
                # decode leniently: an error body that is not valid UTF-8
                # must not hide the actual failure behind a decoding error
                raise FailedRequestException(
                    u'Http request failed (return code: {}, headers: {} '
                    .format(status, headers) +
                    u'and content: {})'.format(
                        content.decode('utf8', 'replace')
                    )
                )
        return result

    def _get_email_address(self, user):
        return u'{}@{}'.format(user.login, self._domain)

    def _reset(self):
        """Replace the current Http object with a fresh one."""
        if self._streaming:
            self._http = StreamingHttp()
        else:
            self._http = StandardHttp()
class ServiceListEnumerator(object):
    """Base iterator over the items of a paginated API 'list' call.

    Sub classes say which service object to list and how to turn each
    raw item into the value handed to the caller.
    """

    # sub classes must override these 2
    _service_object_name = None
    _items_field = None

    def __iter__(self):
        # (re)start from the very first page
        self._current_page_nb = 0
        self._current_page = []
        self._current_page_token = None
        self._next_page_token = None
        self._service = None
        self._already_done_ids = []
        return self

    def _regenerate_service(self):
        """Return a freshly authorized service object (to be overridden)."""
        raise NotImplementedError()

    def _list_kwargs(self):
        """Return the kwargs dict of the 'list' call (to be overridden)."""
        # must be raised, not returned: otherwise a sub class forgetting
        # to override this would only fail later, and far less clearly
        raise NotImplementedError()

    def _process_item(self, item):
        """Turn a raw item of the response into a result (to be overridden)."""
        raise NotImplementedError()

    @Utils.multiple_tries_decorator([ExpiredTokenException, StopIteration])
    def next(self):
        return self._get_next()

    def add_processed_id(self, id):
        """Record an item id to be skipped if its page gets fetched again."""
        self._already_done_ids.append(id)

    def reset_to_current_page(self):
        """Arrange for the current page to be fetched again."""
        self._next_page_token = self._current_page_token
        self._current_page = []
        self._current_page_nb -= 1

    def _get_next(self, first_try=True):
        """Return the next item, re-authenticating once on an expired token.

        Raises Exception when the token is reported as expired a second
        time in a row, and StopIteration when there are no more items.
        """
        try:
            if not first_try or not self._service:
                # we need to re-auth
                self._service = self._regenerate_service()
            return self._do_get_next()
        except ExpiredTokenException:
            if first_try:
                # let's try again
                return self._get_next(False)
            # not every sub class is bound to a user, so don't assume
            # there is one when building the error message
            user = getattr(self, '_user', None)
            if user is None:
                target = u'the list'
            else:
                target = u'{}\'s documents'.format(user.login)
            raise Exception(
                u'Two oauth errors in a row while processing {}, '
                u're-authentication failed'.format(target)
            )

    def _do_get_next(self):
        if not self._current_page:
            self._fetch_next_page()
        try:
            return self._current_page.pop(0)
        except IndexError:
            # no more docs to be fetched
            raise StopIteration

    def _fetch_next_page(self):
        """Load the next page of items, or an empty one when done."""
        cls = self.__class__
        # fetch if nothing was fetched yet, or if there's a page left
        if not self._current_page_nb or self._next_page_token:
            kwargs = self._list_kwargs()
            if self._next_page_token:
                kwargs['pageToken'] = self._next_page_token
            service_object = getattr(self._service, cls._service_object_name)()
            response = service_object.list(**kwargs).execute()
            items = response[cls._items_field]
            Log.debug('Retrieving page # {} : found {} items'
                      .format(self._current_page_nb, len(items)))
            # leave out the items that were already taken care of
            self._current_page = [
                self._process_item(item) for item in items
                if item['id'] not in self._already_done_ids
            ]
            self._current_page_token = self._next_page_token
            self._next_page_token = response.get('nextPageToken')
            self._current_page_nb += 1
        else:
            # we're done
            self._current_page = []
        # no need to keep the processed ids of the current page in memory
        # NOTE(review): assumed to run whichever branch was taken above,
        # and not only once we're done - confirm
        self._already_done_ids = []
class UserGenerator(ServiceListEnumerator):
    """Enumerates the logins of a domain's users via the users service."""

    _service_object_name = 'users'
    _items_field = 'users'

    def __init__(self, client, domain):
        self._client = client
        self._domain = domain

    def _regenerate_service(self):
        # users are listed on behalf of the admin user
        client = self._client
        client.authorize_admin()
        return client.users_service

    def _list_kwargs(self):
        return dict(domain=self._domain)

    def _process_item(self, item):
        # only keep the login, i.e. what comes before the '@<domain>' part
        email = item['primaryEmail']
        domain_suffix = u'@{}'.format(self._domain)
        return email.rsplit(domain_suffix, 1)[0]
class UserDocumentsGenerator(ServiceListEnumerator):
    """Enumerates a user's drive files, wrapped in objects of a given class."""

    _service_object_name = 'files'
    _items_field = 'items'

    def __init__(self, user, query=None, cls=Document):
        self._user = user
        # the query is used for the requests to the API
        # (see doc @ https://developers.google.com/drive/search-parameters)
        self._query = query
        # class instantiated for each of the items found
        self._class = cls

    def _regenerate_service(self):
        return self._user.drive_service

    def _list_kwargs(self):
        # the 'q' parameter is only sent along when there is a query
        query = self._query
        return {'q': query} if query else {}

    def _process_item(self, item):
        item_class = self._class
        return item_class(item, self._user.folders)