-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
129 lines (99 loc) · 4.09 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from datetime import datetime, timedelta
from dotenv import load_dotenv
import isodate
import requests
import re
import json
import os
load_dotenv() # Same folder so no need to specify path
DEVELOPER_KEY = os.getenv('DEVELOPER_KEY')
YT_API_SERVICE_NAME = 'youtube'
YT_API_VERSION = 'v3'
youtube = build(YT_API_SERVICE_NAME, YT_API_VERSION, developerKey=DEVELOPER_KEY)
def get_channel_id_by_url(channel_url):
response = requests.get(channel_url)
match = re.search(r'\/channel\/(.*?)\"', response.text)
if match:
return match.group(1)
else:
return None
def get_channel_id_by_username(username):
search_response = youtube.search().list(
type='channel',
q=username,
part='id'
).execute()
return search_response['items'][0]['id']['channelId']
def get_recent_vids(channel_id, months = 3, greaterLength = timedelta(minutes = 0)):
# Get the channel's uploads playlist ID
channels_response = youtube.channels().list(
id = channel_id,
part='contentDetails'
).execute()
uploads_playlist_id = channels_response['items'][0]['contentDetails']['relatedPlaylists']['uploads']
# Get videos in the uploads playlist
videos = []
next_page_token = ''
flag_stop = False
few_months_ago = datetime.now() - timedelta(days=months*30)
while (next_page_token is not None):
playlistitems_response = youtube.playlistItems().list(
playlistId=uploads_playlist_id,
part='snippet',
maxResults=50,
pageToken=next_page_token
).execute()
# print(json.dumps(playlistitems_response, indent=4))
for item in playlistitems_response['items']:
published_at = datetime.strptime(item['snippet']['publishedAt'], '%Y-%m-%dT%H:%M:%SZ')
if published_at < few_months_ago:
flag_stop = True
break
video_id = item['snippet']['resourceId']['videoId']
video_response = youtube.videos().list(
id=video_id,
part='statistics,contentDetails'
).execute()
duration = isodate.parse_duration(video_response['items'][0]['contentDetails']['duration'])
if duration <= greaterLength:
continue
video_id = item['snippet']['resourceId']['videoId']
view_count = int(video_response['items'][0]['statistics']['viewCount'])
video_url = f'https:///www.youtube.com/watch?v={video_id}'
videos.append((item['snippet']['title'], view_count, video_url, duration))
if flag_stop: break
next_page_token = playlistitems_response.get('nextPageToken')
return videos
def filterVideos(videos, display = 5):
#print(json.dumps(videos, indent=4))
sorted_videos = sorted(videos, key = lambda x: x[1], reverse=True)
return sorted_videos[:display]
def output(videos):
print(f'>> Top video uploaded in the last {months} months (by view):\n')
for vid in videos:
for feature in vid:
if isinstance(feature, int):
print(" View count: ", format(feature, ','))
continue
print(' ', feature)
print("\n")
def beautify(msg):
if isinstance(msg, bytes):
msg = json.loads(msg.decode('utf-8'))
return json.dumps(msg, indent=4)
if __name__ == '__main__':
try:
channel_username = "ComedyCentral"
channel_url = "https://www.youtube.com/@" + channel_username
months = 1
display = 5 # How many vid
greaterLength = timedelta(minutes=1)
# channel_id = get_channel_id_by_url(channel_url)
channel_id = get_channel_id_by_username(channel_username) # use this for some channel that create custom channel
vids = get_recent_vids(channel_id, months, greaterLength)
vids = filterVideos(vids, display)
output(vids)
except HttpError as err:
print(f'An HTTP error {err.resp.status} occurred:\n{beautify(err.content)}')