-
Notifications
You must be signed in to change notification settings - Fork 40
/
pyutil.py
114 lines (87 loc) · 3.53 KB
/
pyutil.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
"""Miscellaneous utility functions."""
from __future__ import absolute_import, division, with_statement
import zlib
import os
import fcntl
class ObjectDict(dict):
    """A ``dict`` whose entries can also be read and written as attributes.

    ``obj.key`` returns ``obj['key']`` and ``obj.key = value`` stores
    ``obj['key'] = value``.  Reading a key that is not present raises
    `AttributeError` rather than `KeyError`.
    """

    def __getattr__(self, name):
        # Python only calls __getattr__ after normal attribute lookup has
        # failed, so regular dict methods are resolved before we get here.
        try:
            found = self[name]
        except KeyError:
            # Report a missing key the way attribute access is expected to
            # fail, so getattr()/hasattr() behave normally.
            raise AttributeError(name)
        else:
            return found

    def __setattr__(self, name, value):
        # Attribute assignment is stored as a mapping entry.
        self[name] = value
class GzipDecompressor(object):
    """Streaming gzip decompressor.

    The interface is like that of `zlib.decompressobj` (without the
    optional arguments), but it understands gzip headers and checksums.
    The wrapped zlib object is available as ``self.decompressobj``.
    """

    def __init__(self):
        # Adding 16 to the window-bits argument is the "magic" value that
        # makes the zlib module understand a gzip header:
        # http://stackoverflow.com/questions/1838699/how-can-i-decompress-a-gzip-stream-with-zlib
        # This works on cpython and pypy, but not jython.
        gzip_wbits = zlib.MAX_WBITS + 16
        self.decompressobj = zlib.decompressobj(gzip_wbits)

    def decompress(self, value):
        """Feed one chunk of compressed input and return the output it yields.

        Some data may stay buffered inside the decompressor; call `flush`
        once there is no more input so that everything is processed.
        """
        inflater = self.decompressobj
        return inflater.decompress(value)

    def flush(self):
        """Return whatever buffered data `decompress` has not yet returned.

        This simply delegates to the wrapped zlib object's ``flush``.
        No other methods may be called on this object after `flush`.
        """
        inflater = self.decompressobj
        return inflater.flush()
def import_object(name):
    """Imports an object by name.

    import_object('x') is equivalent to 'import x'.
    import_object('x.y.z') is equivalent to 'from x.y import z'.

    >>> import tornado.escape
    >>> import_object('tornado.escape') is tornado.escape
    True
    >>> import_object('tornado.escape.utf8') is tornado.escape.utf8
    True
    >>> import_object('tornado') is tornado
    True

    ``name`` is a dotted path given as a string.  The return value is the
    module, or the attribute of the parent module, that the path names.

    Raises `ImportError` if the module part cannot be imported, and
    `AttributeError` if the parent module has no attribute named by the
    final component.
    """
    if '.' not in name:
        # A bare top-level name has no parent package.  Without this guard
        # the parent would be the empty string, which __import__ rejects.
        return __import__(name, None, None, [], 0)
    parts = name.split('.')
    # A non-empty fromlist makes __import__ return the innermost package
    # (x.y) instead of the top-level one (x); level 0 forces an absolute
    # import.
    obj = __import__('.'.join(parts[:-1]), None, None, [parts[-1]], 0)
    return getattr(obj, parts[-1])
# Fake byte literal support: python 2.6+ accepts b"foo" as a byte literal
# (str in 2.x, bytes in 3.x), but python 2.5 has no such syntax, so string
# literals are passed through the b() wrapper instead.  b() should only be
# applied to literal latin1 strings.  Once support for 2.5 is dropped, this
# function can be removed in favour of real byte literals.
#
# ``bytes_type`` names the byte-string type of the running interpreter.
if str is not unicode:
    # str and unicode are distinct types, so a plain literal is already a
    # byte string and can be returned untouched.
    bytes_type = str

    def b(s):
        return s
else:
    # str is the unicode type here, so literals must be encoded to bytes.
    bytes_type = bytes

    def b(s):
        return s.encode('latin1')
def raise_exc_info(exc_info):
    """Re-raise an exception (with original traceback) from an exc_info tuple.

    The argument is a ``(type, value, traceback)`` tuple as returned by
    `sys.exc_info`.

    This function never returns normally: both branches end in ``raise``.
    """
    # 2to3 isn't smart enough to convert three-argument raise
    # statements correctly in some cases.
    if isinstance(exc_info[1], exc_info[0]):
        # The value is already an instance of the given type, so raise
        # that instance itself with the saved traceback attached.
        raise exc_info[1], None, exc_info[2]
        # After 2to3: raise exc_info[1].with_traceback(exc_info[2])
    else:
        # I think this branch is only taken for string exceptions,
        # which were removed in Python 2.6.
        raise exc_info[0], exc_info[1], exc_info[2]
        # After 2to3: raise exc_info[0](exc_info[1]).with_traceback(exc_info[2])
def doctests():
    """Build and return a `doctest.DocTestSuite` for this module."""
    # DocTestSuite() with no argument inspects the module of its direct
    # caller, so it has to be called from right here, not from a helper.
    from doctest import DocTestSuite
    suite = DocTestSuite()
    return suite
def set_close_exec(fd):
    """Turn on the close-on-exec flag (``FD_CLOEXEC``) for descriptor ``fd``.

    The descriptor's other flags are read first and preserved.
    """
    current = fcntl.fcntl(fd, fcntl.F_GETFD)
    updated = current | fcntl.FD_CLOEXEC
    fcntl.fcntl(fd, fcntl.F_SETFD, updated)
def _set_nonblocking(fd):
    """Turn on ``O_NONBLOCK`` in the file status flags of descriptor ``fd``.

    The existing status flags are read first and preserved.
    """
    status = fcntl.fcntl(fd, fcntl.F_GETFL)
    status_nonblocking = status | os.O_NONBLOCK
    fcntl.fcntl(fd, fcntl.F_SETFL, status_nonblocking)