-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcStream.py
80 lines (63 loc) · 3.03 KB
/
cStream.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
from kafka import SimpleProducer, KafkaClient
import requests
from dateutil.parser import parse
from datetime import datetime, timedelta
import time
import csv
import requests
from apscheduler.schedulers.blocking import BlockingScheduler
def main():
    """Entry point: wait for the broker, run one pass, then repeat hourly.

    Sleeps briefly so the Kafka broker (started alongside this script,
    e.g. via docker-compose) has time to come up, produces one batch of
    news messages immediately, and then schedules makeStream to run
    every hour on a blocking scheduler (this call never returns).
    """
    # Give Kafka time to accept connections before the first produce.
    time.sleep(20)
    makeStream()
    sched = BlockingScheduler()
    sched.add_job(makeStream, 'interval', hours=1)
    sched.start()
def makeStream():
    """Fetch recent news for the Dow 30 and publish each article to Kafka.

    Downloads the NASDAQ and NYSE company listings (CSV), keeps only the
    Dow 30 symbols, queries NewsAPI for each company's articles from the
    last 5 days, and sends one '$$$$'-delimited message per article to
    the 'newsstream' Kafka topic on localhost:9092.

    Message layout: symbol $$$$ company $$$$ quote $$$$ publishedAt
    $$$$ description [+ " " + content]. Articles without a description
    are skipped entirely.
    """
    client = KafkaClient("localhost:9092")
    producer = SimpleProducer(client)

    # Dow Jones Industrial Average 30 stocks — a set for O(1) membership
    # tests while scanning thousands of CSV rows below.
    dow30 = {'MMM', 'AXP', 'AAPL', 'BA', 'CAT', 'CVX', 'CSCO', 'KO', 'DIS', 'DOW', 'XOM', 'GS', 'HD', 'IBM', 'INTC', 'JNJ',
             'JPM', 'MCD', 'MRK', 'MSFT', 'NKE', 'PFE', 'PG', 'TRV', 'UTX', 'UNH', 'VZ', 'V', 'WMT', 'WBA'}

    CSV_URL1 = 'http://www.nasdaq.com/screening/companies-by-industry.aspx?exchange=NASDAQ&render=download'
    CSV_URL2 = 'http://www.nasdaq.com/screening/companies-by-industry.aspx?exchange=NYSE&render=download'

    # symbol -> [company name, last quote] for the Dow 30 only.
    stock_quotes = {}
    with requests.Session() as s:
        download = s.get(CSV_URL1)
        decoded_content = download.content.decode('utf-8')
        cr = csv.reader(decoded_content.splitlines(), delimiter=',')
        my_list1 = list(cr)
        download = s.get(CSV_URL2)
        decoded_content = download.content.decode('utf-8')
        cr = csv.reader(decoded_content.splitlines(), delimiter=',')
        my_list2 = list(cr)
    # Drop the second file's header here; the first file's header is
    # skipped by the [1:] slice in the loop below.
    my_list = my_list1 + my_list2[1:]
    for row in my_list[1:]:
        if row[0] in dow30:
            stock_quotes[row[0]] = [row[1].replace("'s", "\'"), row[2]]

    delim = '$$$$'
    for symbol, (stock, stock_quote) in stock_quotes.items():
        # Query window: the last 5 days up to now (second precision).
        f_dt = str((datetime.now() - timedelta(days=5)).replace(microsecond=0).isoformat())
        t_dt = str(datetime.now().replace(microsecond=0).isoformat())
        print('Stock being analyzed : ' + stock)
        # Pass the query via params= so requests URL-encodes the company
        # name — names like "McDonald's" or ones with spaces broke the
        # previous hand-built URL.
        # SECURITY: the API key is hardcoded; it should be moved to an
        # environment variable or config file.
        response = requests.get(
            'https://newsapi.org/v2/everything',
            params={
                'q': stock,
                'from': f_dt,
                'to': t_dt,
                'sortBy': 'publishedAt',
                'apiKey': '9714e1d74fb64495aaafdb54d4cdd0bc',
            },
        )
        json_res = response.json()
        # NewsAPI error responses (rate limit, bad key) have no
        # "articles" key — skip them instead of raising KeyError.
        for post in json_res.get("articles", []):
            date_time = post["publishedAt"]
            data = symbol + delim + stock + delim + stock_quote + delim + date_time
            # An article without a description carries no usable text.
            if post["description"] is None:
                continue
            data += delim + post["description"]
            if post["content"] is not None:
                data += " " + post["content"]
            producer.send_messages(b'newsstream', data.encode('utf-8'))
# Run the producer only when executed as a script, not on import.
if __name__ == '__main__':
    main()