Skip to content

Commit

Permalink
Added CLI support
Browse files Browse the repository at this point in the history
  • Loading branch information
Krishanu Konar committed Oct 24, 2022
1 parent 72bc707 commit 3d771c4
Showing 1 changed file with 50 additions and 21 deletions.
71 changes: 50 additions & 21 deletions twitter_image_downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import os
import wget
import sys
import argparse
import re

MEDIA_FORMATS = {
# https://help.twitter.com/en/using-twitter/tweeting-gifs-and-pictures
Expand All @@ -21,31 +23,57 @@
}

def main():
#Authentication
# Authentication
api = authenticate()

print ('\nTwitter Image Downloader:\n========================\n')

# CLI definition
parser = argparse.ArgumentParser(description="Download media from a given Twitter handle.")
parser.add_argument("-H", "--handle", type=str, help="Twitter Handle", required = False)
parser.add_argument("-n", "--max-tweets", type=int, nargs="?", help=" Max. number of tweets to search",
required=False, const=100)
parser.add_argument("-t", "--type", type=str, nargs="*", help="Type of media to download", required=False,
choices=[ "images", "videos", "gifs", "all" ])

args = parser.parse_args()

handle, max_tweets, media_formats = None, None, None

if args.handle is None:
handle, max_tweets, media_formats = interactiveUI()
else:
handle = args.handle
max_tweets = args.max_tweets or 100
media_formats = args.type or ["images"]

username = input("\nEnter the twitter handle of the Account to download media from: ")
max_tweets = int(input("Enter Max. number of tweets to search (default: 1000): ") or 1000)
media_formats = input("Enter type of media (images/gifs/videos/all) (default: images) ") or "images"

all_tweets = getTweetsFromUser(api, username, max_tweets)
print (handle, max_tweets, media_formats)
all_tweets = getTweetsFromUser(api, handle, max_tweets)
media_URLs = getTweetMediaURL(all_tweets, media_formats)

downloadFiles(media_URLs,username)
downloadFiles(media_URLs, handle)
print('\n\nFinished Downloading.\n')

def getTweetsFromUser(api, username, max_tweets=1000):
def interactiveUI():
handle = input("\nEnter the twitter handle of the Account to download media from: ").strip()
max_tweets = int(input("Enter Max. number of tweets to search (default: 1000): ").strip() or 1000)
media_formats = re.split('\s|,', input("Enter type of media (images/gifs/videos/all) (default: images) "))

# adding default
if media_formats == ['']: media_formats = ['images']

return handle, max_tweets, media_formats

def getTweetsFromUser(api, handle, max_tweets=1000):
'''
Fetches Tweets from user with the handle 'username'
Fetches Tweets from user with the handle 'handle'
upto max of 'max_tweets' tweets.
'''

last_tweet_id = 0

try:
raw_tweets = api.user_timeline(screen_name=username, include_rts=False, exclude_replies=True)
raw_tweets = api.user_timeline(screen_name = handle, include_rts = False, exclude_replies = True)
except Exception as e:
print (e)
sys.exit(-1)
Expand All @@ -58,7 +86,7 @@ def getTweetsFromUser(api, username, max_tweets=1000):
sys.stdout.write("\rTweets fetched: %d" % len(raw_tweets))
sys.stdout.flush()

tweets = api.user_timeline(screen_name=username, max_id=last_tweet_id, \
tweets = api.user_timeline(screen_name = handle, max_id=last_tweet_id, \
include_rts=False, exclude_replies=True)

if len(tweets) == 0:
Expand All @@ -68,10 +96,10 @@ def getTweetsFromUser(api, username, max_tweets=1000):
last_tweet_id = int(tweets[-1].id - 1)
raw_tweets = raw_tweets + tweets

print ('\nFinished fetching ' + str(min(len(raw_tweets),max_tweets)) + ' Tweets.')
print ('\nFinished fetching ' + str(min(len(raw_tweets), max_tweets)) + ' Tweets.')
return raw_tweets

def getTweetMediaURL(all_tweets, media_formats = "images"):
def getTweetMediaURL(all_tweets, media_formats = ["images"]):
'''
Fetches the media URLs from downloaded tweets.
'''
Expand All @@ -82,11 +110,13 @@ def getTweetMediaURL(all_tweets, media_formats = "images"):
for tweet in all_tweets:
media = tweet.entities.get('media',[])
if len(media) > 0:
# print(media)
if media_formats == "all":
if "all" in media_formats:
tweets_with_media.add(media[0]['media_url_https'])
else:
if media[0]['media_url_https'].split(".")[-1] in MEDIA_FORMATS[media_formats]:
allowed_extensions = []
for format in media_formats:
allowed_extensions.extend(MEDIA_FORMATS[format])
if media[0]['media_url_https'].split(".")[-1] in allowed_extensions:
tweets_with_media.add(media[0]['media_url_https'])

sys.stdout.write("\rMedia Links fetched: %d" % len(tweets_with_media))
Expand All @@ -95,7 +125,7 @@ def getTweetMediaURL(all_tweets, media_formats = "images"):
print ('\nFinished fetching ' + str(len(tweets_with_media)) + ' links.')
return tweets_with_media

def downloadFiles(media_url, username):
def downloadFiles(media_url, handle):
'''
Downloads the fetched media URLs.
'''
Expand All @@ -109,10 +139,10 @@ def downloadFiles(media_url, username):
os.chdir('twitter_images')

try:
os.mkdir(username)
os.chdir(username)
os.mkdir(handle)
os.chdir(handle)
except:
os.chdir(username)
os.chdir(handle)

for url in media_url:
wget.download(url)
Expand All @@ -124,6 +154,5 @@ def authenticate():
api = API(auth)
return api


if __name__ == '__main__':
main()

0 comments on commit 3d771c4

Please sign in to comment.