-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathconversation.py
More file actions
159 lines (135 loc) · 5.03 KB
/
conversation.py
File metadata and controls
159 lines (135 loc) · 5.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import asyncio
import io
import ssl
import sys
import signal
import pyaudio
from speechmatics_flow.client import WebsocketClient
from speechmatics_flow.models import (
ConnectionSettings,
Interaction,
AudioSettings,
ConversationConfig,
ServerMessageType,
)
import os
from dotenv import load_dotenv
load_dotenv()
CHUNK_SIZE = 1024  # default 256
SEE_TRANSCRIPTS = True  # when True, transcripts and AI responses are printed
AUTH_TOKEN = os.getenv("SPEECHMATICS_API_KEY")
if not AUTH_TOKEN:
    # Surface the misconfiguration up front instead of leaving the user to
    # decode a connection failure later; the client is still constructed.
    print(
        "Warning: SPEECHMATICS_API_KEY is not set; "
        "the connection will most likely be rejected.",
        file=sys.stderr,
    )

# SSL (Secure Sockets Layer) Configuration
# Certificate and hostname verification stay ENABLED by default: the API key
# is sent over this connection, and skipping verification would let a
# man-in-the-middle capture it. Set SPEECHMATICS_SSL_NO_VERIFY=1 only when
# verification must be bypassed deliberately (e.g. a local test proxy).
ssl_context = ssl.create_default_context()
if os.getenv("SPEECHMATICS_SSL_NO_VERIFY", "").strip().lower() in {"1", "true", "yes"}:
    # check_hostname must be cleared before verify_mode can be CERT_NONE.
    ssl_context.check_hostname = False
    ssl_context.verify_mode = ssl.CERT_NONE

client = WebsocketClient(
    ConnectionSettings(
        url="wss://flow.api.speechmatics.com/v1/flow",
        auth_token=AUTH_TOKEN,
        ssl_context=ssl_context,
    )
)

# Store audio messages in a queue (for playback)
# NOTE(review): this queue is created at import time. On Python < 3.10 an
# asyncio.Queue binds to the event loop current at construction, which may
# differ from the loop asyncio.run() starts later - confirm the minimum
# supported Python version.
audio_queue = asyncio.Queue()
# Handle server messages
def message_handler(msg: dict):
    """Print the text carried by transcript and response messages.

    Handles three values of ``msg["message"]``:

    * ``"AddTranscript"`` - prints ``msg["metadata"]["transcript"]`` as the
      user's speech when it is not blank.
    * ``"ResponseCompleted"`` / ``"ResponseInterrupted"`` - prints
      ``msg["content"]`` as the AI's speech when it is non-empty.

    Nothing is printed unless ``SEE_TRANSCRIPTS`` is truthy. Non-dict
    messages and any other message type are ignored.
    """
    if not isinstance(msg, dict):
        return

    kind = msg.get("message", "")

    # Only process final transcripts, not partials
    if kind == "AddTranscript":
        if "metadata" not in msg or "transcript" not in msg["metadata"]:
            return
        spoken = msg["metadata"]["transcript"]
        if spoken.strip() and SEE_TRANSCRIPTS:
            print(f"User: {spoken}")
        return

    # Flow response completed
    if kind == "ResponseCompleted":
        reply = msg.get("content", "")
        if reply and SEE_TRANSCRIPTS:
            print(f"AI (completed): {reply}")
        return

    # Flow response interrupted
    if kind == "ResponseInterrupted":
        reply = msg.get("content", "")
        if reply and SEE_TRANSCRIPTS:
            print(f"AI (interrupted): {reply}")
# Handle binary audio messages from the server
def binary_msg_handler(msg: bytes):
    """Enqueue a binary audio message from the server for playback.

    Anything that is not ``bytes`` or ``bytearray`` is silently ignored.
    The payload is placed on ``audio_queue`` without waiting.
    """
    if not isinstance(msg, (bytes, bytearray)):
        return
    audio_queue.put_nowait(msg)
# Catch and log issues
def error_handler(msg: dict):
    """Print server-side errors and warnings.

    Args:
        msg: Server message. Only dicts whose ``"message"`` is ``"Error"``
            or ``"Warning"`` produce output; everything else is ignored.

    The printed description is ``msg["details"]`` when that field is
    present and non-empty. Otherwise it is built from the ``"type"`` and
    ``"reason"`` fields, joined with ``": "``, so messages that carry their
    description there no longer print as an empty ``"Error: "`` line.
    """
    if not isinstance(msg, dict):
        return

    message_type = msg.get("message", "")
    if message_type not in ("Error", "Warning"):
        return

    details = msg.get("details")
    if not details:
        # Fall back to the type/reason pair; skip whichever is missing.
        details = ": ".join(
            str(msg[key]) for key in ("type", "reason") if msg.get(key)
        )

    print(f"{message_type}: {details}")
# Register handlers
# Each server message type is routed to the callback that understands it;
# registration order matches the table order.
for _message_type, _callback in (
    (ServerMessageType.AddAudio, binary_msg_handler),
    (ServerMessageType.AddTranscript, message_handler),
    (ServerMessageType.ResponseCompleted, message_handler),
    (ServerMessageType.ResponseInterrupted, message_handler),
    (ServerMessageType.Error, error_handler),
    (ServerMessageType.Warning, error_handler),
):
    client.add_event_handler(_message_type, _callback)
async def audio_playback():
    """Continuously read from the audio queue and play audio back to the user.

    Opens a mono, 16-bit, 16 kHz PyAudio output stream, then loops forever:
    it pulls messages from ``audio_queue`` until at least ``CHUNK_SIZE``
    bytes have accumulated and writes them to the stream. The loop only
    ends when the task is cancelled or an exception is raised.

    The PyAudio instance is always terminated, including when opening the
    stream fails or when stopping/closing the stream raises.
    """
    p = pyaudio.PyAudio()
    try:
        stream = p.open(format=pyaudio.paInt16, channels=1, rate=16000, output=True)
        try:
            while True:
                # Start a fresh playback buffer for each write
                playback_buffer = bytearray()
                # Fill the buffer up to the chunk size
                while len(playback_buffer) < CHUNK_SIZE:
                    playback_buffer += await audio_queue.get()
                # Write the full buffer contents to the player stream.
                # NOTE(review): this is a synchronous call made from a
                # coroutine - confirm it does not stall the event loop
                # long enough to affect the websocket client.
                stream.write(bytes(playback_buffer))
        finally:
            stream.stop_stream()
            stream.close()
    finally:
        p.terminate()
def end_conversation():
    """Print the farewell banner and exit the process with status 0.

    Raises:
        SystemExit: always, with exit code 0.
    """
    farewell = "\nOKAY, I'M DONE. BYE!\n"
    print(farewell)
    raise SystemExit(0)
async def main():
    """Run the conversation session and audio playback concurrently.

    1. Installs a SIGINT handler so Ctrl+C ends the conversation.
    2. Starts the Flow client, feeding it audio read from standard input
       and using ``end_conversation`` as the interaction callback.
    3. Starts the playback task and waits on both tasks together.
    """

    def _on_sigint(signum, frame):
        end_conversation()

    # Handle Ctrl+C gracefully
    signal.signal(signal.SIGINT, _on_sigint)

    # Input audio comes from stdin
    interaction = Interaction(
        stream=sys.stdin.buffer,
        callback=end_conversation,
    )

    # Encoding and sample rate are the values the original notes as defaults
    audio_settings = AudioSettings(
        encoding="pcm_s16le",
        sample_rate=16000,
        chunk_size=CHUNK_SIZE,
    )

    # Alternative template: "flow-service-assistant-amelia"
    conversation_config = ConversationConfig(
        template_id="flow-service-assistant-humphrey",
        template_variables={
            "persona": "Your name is Jojo. You are a witty young man who makes a lot of dad jokes and puns.",
            "style": "Your tone makes people feel at ease and comfortable.",
            "context": "You are having a conversation. You want to entertain and make the other person laugh.",
        },
    )

    session_task = asyncio.create_task(
        client.run(
            interactions=[interaction],
            audio_settings=audio_settings,
            conversation_config=conversation_config,
        )
    )
    playback_task = asyncio.create_task(audio_playback())

    await asyncio.gather(session_task, playback_task)
# Start the conversation only when run as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())