forked from cclauss/Ten-lines-or-less
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsqlite_get_list_from_table.py
62 lines (45 loc) · 2.21 KB
/
sqlite_get_list_from_table.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# coding: utf-8
"""
Useful tools for reading sqlite database tables into Python data structures.
Each row of your database table is converted into a namedtuple with fieldnames
taken from the sqlite table definition. Using the namedtuple._asdict() method
allows you to use treat each table as a list of dicts or if your table has a
primary key which is unique for each row in the table, as a dict of dicts.
get_list_from_table() + get_dict_from_table() is 12 lines because of the
useful uniqueness test in the dict routine and because doing camelize()
inline would make the code too difficult to understand.
"""
import collections, sqlite3 # noqa
db_filename = "my.db"
def get_list_from_table(sqlite_connection, table_name):
"""convert an sqlite database table into a list of namedtuples"""
def camelize(s): # 'aa_bb_cc_dd' --> 'AaBbCcDd'
return "".join(word.title() for word in s.split("_"))
cursor = sqlite_connection.execute("SELECT * FROM {}".format(table_name))
col_names = " ".join(col_desc[0] for col_desc in cursor.description)
nt = collections.namedtuple(camelize(table_name), col_names)
return [nt(*row) for row in cursor.fetchall()]
def get_dict_from_table(sqlite_connection, table_name, key_col_name="id"):
"""convert an sqlite database table into a dict of namedtuples
useful for tables where each row has a unique primary key"""
the_list = get_list_from_table(sqlite_connection, table_name)
the_dict = {row._asdict()[key_col_name]: row for row in the_list}
fmt = "In {}, {} is not unique: {} {}"
assert len(the_dict) == len(the_list), fmt.format(
table_name, key_col_name, len(the_dict), len(the_list)
)
return the_dict
def get_services():
with sqlite3.connect(db_filename) as conn:
return get_dict_from_table(conn, "service")
def get_employee_dict():
with sqlite3.connect(db_filename) as conn:
return get_dict_from_table(conn, "employee", "employee_id")
services = get_services()
employee_dict = get_employee_dict()
for dataset in [services, employee_dict]:
if isinstance(dataset, dict):
print("\n".join(str(row) for row in dataset.values()))
else:
print("\n".join(str(row) for row in dataset))
print("")