-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstreaming.py
197 lines (148 loc) · 6.06 KB
/
streaming.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# pylint: disable=W1203
# pylint: disable=W0613
'''
IceFlix Streaming Server
'''
import sys
import json
import random
import signal
import string
import logging
import os.path
import glob, os
import random
import hashlib
import uuid
import iceflixrtsp
import ipaddress
import manage_services
import Ice
Ice.loadSlice('iceflix.ice')
# pylint: disable=E0401
# pylint: disable=C0413
import IceFlix
import IceStorm
# Module-level registries of known services. `authenticators` is read by the
# servants below when they need to validate a token; all three are intended
# to be refreshed through StreamProvider.get_show_services.
catalogs = {}
authenticators = {}
streamings = {}
class StreamControllerI(IceFlix.StreamController):
    '''
    Servant that emits one media file over RTSP to the calling client.
    '''

    def __init__(self, file, service_manager):
        '''
        Remember which file to emit.

        file: path of the media file handed to the RTSP emitter.
        service_manager: stored on the instance; not used by this class.
        '''
        self.file = file
        self.service_manager = service_manager
        # Both are created by getSDP(); they stay None until a stream starts
        # so that stop() is safe to call at any time.
        self.rtsp = None
        self.player = None

    def getSDP(self, authentication, port, current=None):
        '''
        Start emitting the file towards the caller and return the emitter's SDP.

        authentication: token checked against a randomly chosen authenticator.
        port: port passed to the RTSP emitter.
        current: Ice invocation context; its connection gives the caller address.

        Raises IceFlix.Unauthorized when the token is rejected and
        IceFlix.TemporaryUnavailable when no authenticator can be queried.
        '''
        # Only the lookup/remote call is guarded: a rejected token must reach
        # the caller as Unauthorized, not be rewritten as TemporaryUnavailable.
        try:
            # NOTE(review): iterating the dict yields its keys, so the keys
            # are assumed to be authenticator proxies - confirm with caller.
            authenticator = random.choice(list(authenticators))
            authorized = authenticator.isAuthorized(authentication)
        except Exception as error:
            raise IceFlix.TemporaryUnavailable() from error
        if not authorized:
            raise IceFlix.Unauthorized()

        connection = current.con.getInfo()
        host_address = ipaddress.ip_address(connection.remoteAddress)
        # ipv4_mapped only exists on IPv6 addresses and is None unless the
        # address is an IPv4-mapped one; fall back to the address itself.
        mapped = getattr(host_address, 'ipv4_mapped', None)
        host = str(mapped if mapped is not None else host_address)

        self.rtsp = iceflixrtsp.RTSPEmitter(self.file, host, port)
        self.rtsp.start()
        self.player = iceflixrtsp.RTSPPlayer()
        self.player.play(self.rtsp.sdp)
        return self.rtsp.sdp

    # TODO: getSyncTopic and refreshAuthentication are not implemented yet.

    def stop(self, current=None):
        '''
        Stop the player and the emitter if a stream was started.
        '''
        if self.player is not None:
            self.player.stop()
            self.player = None
        if self.rtsp is not None:
            self.rtsp.stop()
            self.rtsp = None
class StreamProvider(IceFlix.StreamProvider):
    '''
    Servant that announces the media files of a directory and hands out
    stream controllers for them.
    '''

    def __init__(self, sync_catalog, publisher_availability, directory, adapter):
        '''
        Index every file under `directory` and announce this provider.

        sync_catalog: proxy whose newMedia() is called once per media file.
        publisher_availability: publisher cast to ServiceAvailabilityPrx and
            used to announce this provider through mediaService().
        directory: media directory; the process changes into it.
        adapter: object adapter this servant registers itself with.
        '''
        self.media_list = {}
        self.new_stream_controller = None
        self.sync_catalog = sync_catalog
        # Kept on the instance so reannounceMedia() announces the same id.
        self.streaming_id = str(uuid.uuid4())
        os.chdir(directory)
        proxy = adapter.addWithUUID(self)
        this_streaming = IceFlix.StreamProviderPrx.checkedCast(proxy)
        for dirpath, _dirnames, filenames in os.walk("."):
            for filename in filenames:
                # Join with dirpath so files in subdirectories can be opened.
                path = os.path.normpath(os.path.join(dirpath, filename))
                media_hash = self._hash_file(path)
                sync_catalog.newMedia(media_hash, filename, self.streaming_id)
                self.media_list[media_hash] = path
        self.service_manager = manage_services.ManageServices()
        sync_services = IceFlix.ServiceAvailabilityPrx.uncheckedCast(publisher_availability)
        sync_services.mediaService(this_streaming, self.streaming_id)

    @staticmethod
    def _hash_file(path, chunk_size=65536):
        '''
        Return the SHA-256 hex digest of the file at `path`, read in chunks.
        '''
        digest = hashlib.sha256()
        with open(path, "rb") as media_file:
            for chunk in iter(lambda: media_file.read(chunk_size), b""):
                digest.update(chunk)
        return digest.hexdigest()

    def get_show_services(self, dict_of_catalogs, dict_of_auth, dict_of_streams):
        '''
        Replace the module-level service registries with the given dicts.
        '''
        # Without `global` these assignments would only create locals and the
        # registries read by the servants would never change.
        global authenticators, streamings, catalogs  # pylint: disable=W0603
        authenticators = dict_of_auth
        streamings = dict_of_streams
        catalogs = dict_of_catalogs

    def getStream(self, id, authentication, current):
        '''
        Return a proxy to a new stream controller for the media `id`.

        Raises IceFlix.Unauthorized when the token is rejected,
        IceFlix.TemporaryUnavailable when no authenticator can be queried and
        IceFlix.WrongMediaId when `id` is not served by this provider.
        '''
        # Only the lookup/remote call is guarded: a rejected token must reach
        # the caller as Unauthorized, not be rewritten as TemporaryUnavailable.
        try:
            # NOTE(review): iterating the dict yields its keys, so the keys
            # are assumed to be authenticator proxies - confirm with caller.
            authenticator = random.choice(list(authenticators))
            authorized = authenticator.isAuthorized(authentication)
        except Exception as error:
            raise IceFlix.TemporaryUnavailable() from error
        if not authorized:
            raise IceFlix.Unauthorized()

        if id not in self.media_list:
            raise IceFlix.WrongMediaId(id)

        # Each request gets its own controller; reusing the first one would
        # serve the wrong file to later requests.
        servant = StreamControllerI(self.media_list[id], self.service_manager)
        proxy = current.adapter.addWithUUID(servant)
        self.new_stream_controller = IceFlix.StreamControllerPrx.checkedCast(proxy)
        return self.new_stream_controller

    def isAvailable(self, id, current=None):
        '''
        Return True when the media `id` is served by this provider, else False.
        '''
        return id in self.media_list

    def reannounceMedia(self, current=None):
        '''
        Announce every indexed media file again through the catalog proxy.
        '''
        for media_hash, path in self.media_list.items():
            self.sync_catalog.newMedia(
                media_hash, os.path.basename(path), self.streaming_id)
class Server(Ice.Application):
    '''
    Streaming Server
    '''

    @staticmethod
    def _get_or_create_topic(topic_mgr, topic_name):
        '''
        Return the IceStorm topic `topic_name`, creating it when missing.
        '''
        try:
            return topic_mgr.retrieve(topic_name)
        except IceStorm.NoSuchTopic:  # pylint: disable=E1101
            try:
                return topic_mgr.create(topic_name)
            except IceStorm.TopicExists:  # pylint: disable=E1101
                # Another process created it between retrieve and create.
                return topic_mgr.retrieve(topic_name)

    def run(self, argv):
        '''
        Server loop

        argv: command line; argv[1] is the media directory to serve.
        Returns 0 on clean shutdown, 1 when the media directory argument is
        missing and 2 when the topic manager proxy is invalid.
        '''
        logging.debug('Initializing server...')
        if len(argv) < 2:
            print('Usage: {} <media_directory>'.format(
                argv[0] if argv else 'streaming.py'))
            return 1
        directory = argv[1]

        # MEDIA ANNOUNCEMENT SUBSCRIPTION
        proxy = self.communicator().propertyToProxy('IceFlix.IceStorm/TopicManager')
        topic_mgr = IceStorm.TopicManagerPrx.checkedCast(proxy) #pylint: disable=E1101
        if not topic_mgr:
            print ('Invalid proxy')
            return 2
        qos = {}
        topic_announcement = self._get_or_create_topic(topic_mgr, "MediaAnnouncement")
        publisher_announcement = topic_announcement.getPublisher()
        sync_catalog = IceFlix.StreamAnnouncesPrx.uncheckedCast(publisher_announcement)
        adapter = self.communicator().createObjectAdapter('StreamingAdapter')

        # SERVICE AVAILABILITY SUBSCRIPTION
        topic_availability = self._get_or_create_topic(topic_mgr, "ServiceAvailability")
        publisher_availability = topic_availability.getPublisher()
        properties = self.communicator().getProperties()
        suscriber_servant = StreamProvider(
            sync_catalog, publisher_availability, directory, adapter)
        factory_id = properties.getProperty('StreamingFactoryIdentity')

        # PROXY OF THE ACTUAL SERVER
        proxy = adapter.add(suscriber_servant, self.communicator().stringToIdentity(factory_id))
        adapter.addDefaultServant(suscriber_servant, '')
        subscriber = adapter.addWithUUID(suscriber_servant)
        topic_announcement.subscribeAndGetPublisher(qos, subscriber)
        adapter.activate()
        logging.debug('Adapter ready, servant proxy: {}'.format(proxy))
        print('"{}"'.format(proxy), flush=True)
        # NOTE(review): this repeats the start-up message; it probably was
        # meant to say the server is ready - confirm before changing the text.
        logging.debug('Initializing server...')
        self.shutdownOnInterrupt()
        self.communicator().waitForShutdown()
        topic_announcement.unsubscribe(subscriber)
        return 0
# Script entry point: run the Ice application and exit with its status code.
if __name__ == '__main__':
    sys.exit(Server().main(sys.argv))