-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcollect_slackusers.py
156 lines (128 loc) · 5.16 KB
/
collect_slackusers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# Module metadata: author link, date, version string and release status.
__author__ = 'https://github.com/password123456/'
__date__ = '2024.11.26'
__version__ = '1.3'
__status__ = 'Production'
import os
import sys
import requests
import json
from datetime import datetime, timezone
from settings import Config
# Append the parent of this script's directory to the module search path.
# NOTE(review): this runs after `from settings import Config`, so it cannot
# help resolve that import -- confirm the intended order.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
def lookup_conversations_members(headers, channel_id, set_proxy, timeout=30):
    """Fetch the raw ``conversations.members`` response for a Slack channel.

    Args:
        headers: HTTP headers sent with the request (the caller puts the
            bearer token here).
        channel_id: ID of the channel whose members are listed.
        set_proxy: ``proxies`` mapping handed to ``requests``; ``None`` means
            no explicit proxy.
        timeout: Seconds to wait for Slack before giving up. Without a
            timeout ``requests`` may block forever on a stalled connection.

    Returns:
        The response body as text, or ``''`` when the request failed. Only a
        single page of up to 1000 members is requested; pagination cursors
        are not followed.
    """
    api_method = 'conversations.members'
    # Let requests build the query string so values are URL-encoded.
    params = {'channel': channel_id, 'limit': 1000, 'pretty': 1}
    result = ''
    try:
        # https://api.slack.com/methods/conversations.members
        response = requests.get(
            f'https://slack.com/api/{api_method}',
            params=params,
            headers=headers,
            proxies=set_proxy,
            timeout=timeout,
        )
        result = response.text
    except KeyboardInterrupt:
        sys.exit(0)
    except Exception as error:
        # Best effort: report the failure and let the caller see ''.
        print(f'- Exception::{error}')
    return result
def lookup_users_info(headers, user_id, set_proxy, timeout=30):
    """Fetch the raw ``users.info`` response for one Slack user.

    Args:
        headers: HTTP headers sent with the request (the caller puts the
            bearer token here).
        user_id: ID of the user to look up.
        set_proxy: ``proxies`` mapping handed to ``requests``; ``None`` means
            no explicit proxy.
        timeout: Seconds to wait for Slack before giving up. Without a
            timeout ``requests`` may block forever on a stalled connection.

    Returns:
        The response body as text, or ``''`` when the request failed.
    """
    api_method = 'users.info'
    result = ''
    try:
        # https://api.slack.com/methods/users.info
        response = requests.get(
            f'https://slack.com/api/{api_method}',
            # Let requests build the query string so the value is URL-encoded.
            params={'user': user_id},
            headers=headers,
            proxies=set_proxy,
            timeout=timeout,
        )
        result = response.text
    except KeyboardInterrupt:
        sys.exit(0)
    except Exception as error:
        # Best effort: report the failure and let the caller see ''.
        print(f'- Exception::{error}')
    return result
def get_value_or_null(value):
    """Return *value* unchanged when it is truthy, otherwise the string 'null'."""
    if value:
        return value
    return 'null'
def export_data(filename, data, mode):
    """Write each item of *data* to *filename* as its own newline-terminated line.

    Args:
        filename: Path of the file to write.
        data: Iterable of items; each one is formatted with ``str`` semantics.
        mode: Mode passed to ``open`` (the caller uses ``'w'`` or ``'a'``).
    """
    # newline='' disables newline translation, so exactly '\n' is written.
    with open(filename, mode, encoding='utf-8', newline='') as handle:
        handle.writelines(f'{entry}\n' for entry in data)
def count_lines_in_file(file_path):
    """Count the lines of a UTF-8 text file.

    Returns:
        The number of lines, ``0`` when the file does not exist, or ``-1``
        when any other error occurs while reading (the error is printed).
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            total = sum(1 for _ in handle)
    except FileNotFoundError:
        return 0
    except Exception as error:
        print(f'- Exception::{error}')
        return -1
    return total
def _build_user_record(index, user_info):
    """Return one comma-separated users_db row for a Slack user object."""
    profile = user_info.get('profile') or {}
    member_id = get_value_or_null(user_info.get('id'))
    display_name = get_value_or_null(user_info.get('name'))
    real_name = get_value_or_null(user_info.get('real_name'))
    mobile = get_value_or_null(profile.get('phone'))
    email = get_value_or_null(profile.get('email'))
    # A missing title comes back as None; normalise it before calling str
    # methods. Spaces are dropped and commas replaced so the value cannot
    # break the comma-separated row.
    raw_title = profile.get('title') or ''
    title = get_value_or_null(raw_title.replace(' ', '').replace(',', '/'))
    timestamp = datetime.now(timezone.utc).astimezone().replace(microsecond=0).isoformat()
    return (f'{index},{timestamp},'
            f'{real_name},{display_name},{member_id},{email},{title},{mobile}')


def main(channel_id=None):
    """Export the non-bot members of a Slack channel to ``Config.USERS_DB``.

    Members are listed with ``conversations.members``, each one is resolved
    with ``users.info``, and rows are flushed to the users DB file in batches
    of 10. A short summary with the resulting line count is printed at the
    end.

    Args:
        channel_id: ID of the channel to export. When omitted it is read from
            ``Config.CHANNEL_ID``.

    Raises:
        ValueError: If no channel ID could be determined.
    """
    if channel_id is None:
        # NOTE(review): the original body referenced an undefined name
        # `channel_id`; assumed to belong in settings.Config -- confirm the
        # attribute name.
        channel_id = getattr(Config, 'CHANNEL_ID', None)
    if not channel_id:
        raise ValueError('channel_id is required: pass it to main() or define Config.CHANNEL_ID')

    headers = {
        'Authorization': f'Bearer {Config.SLACK_BOT}',
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 '
                      '(KHTML, like Gecko) Chrome/49.0.2623.112 Safari/537.36'
    }

    # Use `set_proxy = None` instead if you don't want to use a proxy server.
    set_proxy = Config.PROXY

    result = lookup_conversations_members(headers, channel_id, set_proxy)
    # The lookup returns '' when the request itself failed.
    member_data = json.loads(result) if result else {}

    i = 0
    content_result = []
    # The first flush overwrites the file; every later flush appends to it.
    mode = 'w'

    if member_data.get('ok'):
        for user_id in member_data.get('members', []):
            raw_user = lookup_users_info(headers, user_id, set_proxy)
            user_data = json.loads(raw_user) if raw_user else {}
            user_info = user_data.get('user')
            if not user_info:
                # Failed lookup (request error or ok=false): skip this user
                # instead of aborting the whole export.
                print(f'- users.info failed for {user_id}: {user_data.get("error", "no response")}')
                continue

            # Extract users, excluding bot accounts.
            if user_info.get('is_bot'):
                continue

            i += 1
            content_result.append(_build_user_record(i, user_info))

            # Flush in batches of 10 and reset the buffer so no row is
            # written twice.
            if len(content_result) >= 10:
                export_data(Config.USERS_DB, content_result, mode)
                content_result = []
                mode = 'a'
    else:
        print(f'- conversations.members failed: {member_data.get("error", "no response")}')

    # Flush the remaining (<10) rows. `mode` is still 'w' when no batch was
    # written yet, so a small result replaces a stale file from a previous
    # run instead of being appended to it.
    if content_result:
        export_data(Config.USERS_DB, content_result, mode)

    message = (f'>> collect slack_users <<\n\n'
               f'- {os.uname()[1]}\n'
               f'- *{datetime.now(timezone.utc).replace(microsecond=0).isoformat()}*')

    # -1 stands for both "file missing" and "file unreadable".
    if os.path.exists(Config.USERS_DB):
        line_count = count_lines_in_file(Config.USERS_DB)
    else:
        line_count = -1

    if line_count >= 0:
        message = f'*{message}\n\n`{line_count} user record created.`'
    else:
        message = f'*{message}\n\n`{Config.USERS_DB} not found. something wrong?`'
    print(message)
if __name__ == '__main__':
    # Script entry point: run main() and report any failure on stdout.
    try:
        main()
    except KeyboardInterrupt:
        sys.exit(0)
    except Exception as e:
        # `__name__` is a str and has no `__name__` attribute, so the old
        # handler raised AttributeError and hid the real error. Report the
        # innermost traceback frame, i.e. where the exception was raised.
        import traceback

        origin = traceback.extract_tb(e.__traceback__)[-1]
        print(f'- ::Exception:: Func:[{origin.name}] '
              f'Line:[{origin.lineno}] [{type(e).__name__}] {e}')
"""
*>> collect slack_users <<
- mymacmacmac.local
- *2024-11-22T01:10:51+00:00*
`257 user record created.`
"""