-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.py
181 lines (152 loc) · 7.06 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
import os
import json
import base64
from quart import Quart, websocket
from pyngrok import ngrok
from dotenv import load_dotenv
from deepgram import Deepgram
from signalwire.voice_response import VoiceResponse, Start, Stream, Dial
from signalwire.rest import Client as SignalwireClient
from loguru import logger
from pydub import AudioSegment
# Load environment variables (e.g. from a local .env file) into os.environ.
load_dotenv()
# Initiate Quart app
app = Quart(__name__)
# Application settings, read from environment variables.
app.config.update(
    # Number every inbound call is forwarded to.
    TO_NUMBER=os.getenv("TO_NUMBER"),
    # SignalWire number whose voice URL is pointed at this app's /inbound route.
    WEBHOOK_NUMBER=os.getenv("WEBHOOK_NUM"),
    # Local port shared by the ngrok tunnel and the Quart server. Defaulting to
    # 5000 (Quart's default) keeps both on the same port when PORT is unset;
    # otherwise pyngrok would tunnel to its own default of port 80.
    PORT=os.getenv('PORT', '5000'),
    PROJECT=os.getenv('PROJECT'),
    SW_TOKEN=os.getenv('SW_TOKEN'),
    SPACE=os.getenv('SPACE'),
    # Shared Deepgram client; each ActiveCall opens its own live socket with it.
    dg_client=Deepgram(os.getenv('DEEPGRAM_TOKEN')),
    # Filled in by start_ngrok() once the tunnel is up.
    PUBLIC_URL=None,
)
# Initiate Signalwire client
client = SignalwireClient(app.config['PROJECT'], app.config['SW_TOKEN'], signalwire_space_url=f"{app.config['SPACE']}")
class ActiveCall:
    """State for one active call: its call sid and the Deepgram client used to transcribe it."""

    def __init__(self, sid: str, dg_client: Deepgram):
        # Call sid; prefixes every logged transcript line.
        self.sid = sid
        # Deepgram client used to open the live transcription socket.
        self.dg_client = dg_client

    async def get_transcript(self, data: dict):
        """Log the transcript in a Deepgram result message as ``"<sid> - <text>"``.

        Args:
            data: A Deepgram transcription message. The text is read from
                ``data["channel"]["alternatives"][0]["transcript"]``.

        Messages that carry no alternatives, or an empty transcript, are ignored.
        """
        # `or [{}]` also covers an empty (or null) alternatives list, which
        # would otherwise raise when indexed.
        alternatives = data.get('channel', {}).get('alternatives') or [{}]
        transcript = alternatives[0].get('transcript')
        if transcript:
            # Logs Call sid and transcription
            logger.info(f"{self.sid} - {transcript}")

    async def connect_to_deepgram(self):
        """Open a Deepgram live transcription socket configured for phone-call audio.

        Registers a handler that logs when the socket closes and routes each
        transcript message to ``get_transcript``.

        Returns:
            The live transcription socket; callers feed it audio with ``send``.
        """
        socket = await self.dg_client.transcription.live({
            'punctuate': True,
            'encoding': "mulaw",  # Match encoding of phone calls
            'sample_rate': 8000,  # Match audio sample rate
            'channels': 2,  # Match Audio Channel amount
            'model': 'phonecall',  # Improves accuracy of transcribing phone call audio
            'language': 'en-US',  # Sets language of transcription to english
            'tier': 'nova',  # Sets transcription tier to Nova - tiers: standard/enhanced/nova
            'interim_results': False,  # Only gives transcription results on final speech (when silence is heard)
        })
        socket.registerHandler(socket.event.CLOSE, lambda _: logger.info("Connection Closed..."))
        socket.registerHandler(socket.event.TRANSCRIPT_RECEIVED, self.get_transcript)
        return socket
def _to_websocket_url(http_url: str) -> str:
    """Return *http_url* with a leading ``http``/``https`` scheme replaced by ``wss``.

    Only the scheme is rewritten, so an "http" substring elsewhere in the URL
    (host or path) is left intact. URLs without an http(s) scheme are returned
    unchanged.
    """
    scheme, separator, rest = http_url.partition("://")
    if separator and scheme.lower() in ("http", "https"):
        # The ngrok tunnel is opened with bind_tls=True, so streams use secure websockets.
        return f"wss://{rest}"
    return http_url


# Handles inbound calls
@app.route('/inbound', methods=['POST', 'GET'])
async def inbound_call():
    """Answer an inbound call: stream its audio to ``/media`` and forward it to TO_NUMBER.

    Returns:
        The VoiceResponse XML that starts a both-track media stream to this
        app's ``/media`` websocket and then dials ``TO_NUMBER``. If no public
        URL is configured, returns a 503 response instead.
    """
    public_url = app.config.get('PUBLIC_URL')
    if not public_url:
        # PUBLIC_URL is only set by start_ngrok(); without it there is no
        # address SignalWire can stream the call audio to.
        logger.error("PUBLIC_URL is not set; cannot start media stream")
        return "Public URL not configured", 503
    stream_url = _to_websocket_url(public_url) + '/media'

    # Start Stream
    response = VoiceResponse()
    start = Start()
    start.append(Stream(name='stream', url=stream_url, track="both_tracks"))
    response.append(start)

    # Forward call
    dial = Dial()
    dial.number(f"{app.config['TO_NUMBER']}")
    response.append(dial)
    return response.to_xml()
@app.websocket('/media')
async def websocket_endpoint():
    """Receive a call's audio stream from SignalWire and forward it to Deepgram.

    Each frame is a JSON object whose ``event`` field selects the action:

    * ``start``: read ``start.callSid`` and open a Deepgram live session for it.
    * ``media``: base64-decode ``media.payload`` and append it to the buffer
      for ``media.track`` (``inbound`` or ``outbound``). Audio received before
      a Deepgram session exists is dropped.
    * ``stop``: tell Deepgram to close (empty send) and end the loop.

    Whenever both track buffers hold at least ``buffer_size`` bytes, one chunk
    from each is combined into a two-channel (stereo) segment and sent to
    Deepgram. Any ``Exception`` is logged and ends the session. The websocket
    is always closed with the normal-closure code on exit.
    """
    deepgram_socket = None
    in_buffer = bytearray()
    out_buffer = bytearray()
    # 3200 bytes per track: 0.4 s of 8 kHz audio at one byte per sample.
    buffer_size = 20 * 160
    try:
        while True:
            message = await websocket.receive()
            # Frames are JSON; a malformed frame raises and ends the session below.
            data = json.loads(message)
            event = data.get('event')
            if event == "start":
                sid = data.get('start', {}).get('callSid')
                if sid:
                    # initializes a new ActiveCall, and then makes a connection to Deepgram
                    call_class = ActiveCall(sid, app.config["dg_client"])
                    deepgram_socket = await call_class.connect_to_deepgram()
                    logger.info(f"{sid} - Session is starting...")
            elif event == "media":
                if deepgram_socket is None:
                    # No Deepgram session yet: drop the audio rather than buffer it
                    # indefinitely or fail when sending the mixed chunk below.
                    continue
                media = data.get('media', {})
                payload = base64.b64decode(media.get('payload', ''))
                track = media.get('track')
                if track == 'inbound':
                    in_buffer.extend(payload)
                elif track == 'outbound':
                    out_buffer.extend(payload)
            elif event == "stop":
                if deepgram_socket is not None:
                    # Sending an empty byte string closes the connection with Deepgram
                    deepgram_socket.send(b'')
                break

            # Once both tracks have a full chunk, combine the two mono chunks into
            # one stereo segment (inbound = first channel, outbound = second) and
            # send its raw bytes to Deepgram.
            while len(in_buffer) >= buffer_size and len(out_buffer) >= buffer_size:
                as_inbound = AudioSegment(bytes(in_buffer[:buffer_size]), sample_width=1, frame_rate=8000, channels=1)
                as_outbound = AudioSegment(bytes(out_buffer[:buffer_size]), sample_width=1, frame_rate=8000, channels=1)
                mixed = AudioSegment.from_mono_audiosegments(as_inbound, as_outbound)
                deepgram_socket.send(mixed.raw_data)
                # Drop the consumed bytes in place instead of copying the remainder.
                del in_buffer[:buffer_size]
                del out_buffer[:buffer_size]
    except Exception as error:
        logger.exception(error)
    finally:
        # 1000 is the WebSocket "normal closure" status code (RFC 6455).
        await websocket.close(1000)
def start_ngrok():
    """Start ngrok for tunneling and point the webhook number's voice URL at it.

    Stores the tunnel's public URL in ``app.config['PUBLIC_URL']``. If the
    webhook number is not found, logs an error and returns without updating
    anything. Errors from the phone-number update are logged, not raised.
    Errors from opening the tunnel or listing numbers propagate to the caller.
    """
    logger.info("Starting ngrok tunnel...")
    tunnel_url = ngrok.connect(app.config['PORT'], bind_tls=True).public_url
    app.config['PUBLIC_URL'] = tunnel_url

    # Getting sid of webhook number
    incoming_phone_numbers = client.incoming_phone_numbers.list(phone_number=app.config['WEBHOOK_NUMBER'])
    if not incoming_phone_numbers:
        # Without a sid there is nothing to update; stop here rather than call
        # the API with sid=None.
        logger.error("Invalid Webhook number")
        return
    sid = incoming_phone_numbers[0].sid

    # Update the voice URL
    try:
        # noinspection PyTypeChecker
        client.incoming_phone_numbers(sid).update(voice_url=f"{tunnel_url}/inbound", voice_receive_mode="voice")
        logger.info(f"Signalwire Number updated...\n Public Url: {tunnel_url}")
        logger.info(f"Call {app.config['WEBHOOK_NUMBER']} to start transcribing a call...")
    except Exception as e:
        logger.error(e)
if __name__ == "__main__":
    import subprocess

    try:
        start_ngrok()
    except Exception as e:
        logger.error(f"{e}")
        logger.info("Killing ngrok process...")
        # Kill any running ngrok process with the platform's tool (taskkill on
        # Windows, pkill elsewhere), then retry once. The command runs as an
        # argument list, without a shell.
        command = ["taskkill", "/IM", "ngrok.exe", "/F"] if os.name == 'nt' else ["pkill", "ngrok"]
        try:
            # A non-zero exit (e.g. no ngrok process running) is not an error here.
            subprocess.run(command, check=False)
        except OSError as kill_error:
            # e.g. the kill tool is not installed; keep going and retry anyway.
            logger.error(f"Could not run {command[0]}: {kill_error}")
        # Try starting ngrok again after killing the process
        try:
            start_ngrok()
        except Exception as e:
            logger.error(f"Error restarting ngrok after killing process: {e}")
            # NOTE(review): the server still starts below without a public URL;
            # /inbound will then answer 503 or fail. Consider exiting instead.
    app.run('localhost', port=app.config['PORT'], debug=False)