A minimal MVT demo of the AI audio assistant concept.
Signed-off-by: Alex Lau (AvengerMoJo) <alau@suse.com>
parent 35055a80b4
commit 7ba9d8d3db
audio_utils.py (new file)
# audio_utils.py

import csv
import math
import os
import tempfile

from pydub import AudioSegment
from tqdm import tqdm


class AudioSplit:
    """Split an audio file into smaller chunks in a format the AI models can consume."""

    def __init__(self, filename, title="Output"):
        """Load the audio, resampling MP3 input to 16 kHz WAV."""
        self.folder = tempfile.TemporaryDirectory()
        self.title = os.path.splitext(title)[0]
        os.mkdir(os.path.join(self.folder.name, 'audio'))
        if filename.endswith('.mp3'):
            another = tempfile.TemporaryDirectory()
            original = os.path.join(another.name, 'original.wav')
            sound = AudioSegment.from_mp3(filename)
            sound = sound.set_frame_rate(16000)
            sound.export(original, format="wav")
            self.audio = AudioSegment.from_wav(original)
        elif filename.endswith('.wav'):
            self.audio = AudioSegment.from_wav(filename)
        self.metadata = 'metadata.csv'

    def get_folder(self):
        """Return the temporary working folder."""
        return self.folder.name

    def get_duration(self):
        """Return the audio duration in seconds."""
        return self.audio.duration_seconds

    def single_split(self, from_sec, to_sec, split_filename):
        """Export the [from_sec, to_sec) slice as a WAV file."""
        start_ms = from_sec * 1000
        end_ms = to_sec * 1000
        split_audio = self.audio[start_ms:end_ms]
        split_audio.export(os.path.join(self.folder.name, 'audio', split_filename), format="wav")

    def multiple_split(self, sec_per_split):
        """Split the whole file into sec_per_split-second chunks and write a metadata CSV."""
        with open(os.path.join(self.folder.name, 'audio', self.metadata), 'w', encoding='utf8') as csv_file:
            writer = csv.writer(csv_file)
            writer.writerow(['id', 'file_name', 'transcription'])
            total_splits = math.ceil(self.get_duration() / sec_per_split)
            for i in tqdm(range(total_splits)):
                split_fn = self.title + '_' + str(i) + '.wav'
                self.single_split(i * sec_per_split, (i + 1) * sec_per_split, split_fn)
                writer.writerow([i, split_fn, "Export wave file " + str(i)])
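A quick usage sketch for the class above; the input file name and split length are illustrative assumptions, not part of the commit:

from audio_utils import AudioSplit

splitter = AudioSplit("talk.mp3", title="talk")  # hypothetical input recording
splitter.multiple_split(sec_per_split=10)        # writes talk_0.wav, talk_1.wav, ... plus metadata.csv
print(splitter.get_duration())                   # total length in seconds
print(splitter.get_folder())                     # temp folder holding the audio/ subfolder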
data/audio/i_dont_get.wav (new binary file, not shown)
data/audio/suse_intro.wav (new binary file, not shown)
demo_assistant.py (new file)
import queue
import re
import tempfile
import threading
import time
import wave

import pyaudio
import torch
from faster_whisper import WhisperModel
from gpt4all import GPT4All
from pydub import AudioSegment
from TTS.api import TTS


CHUNK = 1024
FORMAT = pyaudio.paInt16  # 16-bit resolution
CHANNELS = 1
RATE = 16000    # sample rate (Hz)
DURATION = 3    # seconds per listening pass while idle
Q_DURATION = 7  # seconds per listening pass while collecting a question
SUSE = r"s+u+s+e"  # loose wake-word match for "suse" and noisy transcriptions
THANK = r"Thank\s*(?:you|u)\b"


g_active = False  # True while the assistant is collecting a question
g_wait = False    # True while the assistant is speaking (recording paused)
g_lock = threading.Lock()
counter = 0

p_audio = pyaudio.PyAudio()
playback_stream = p_audio.open(format=FORMAT, channels=CHANNELS, rate=24000, output=True)
record_stream = p_audio.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK)


def record_audio():
    """Producer thread: push microphone chunks onto the shared queue."""
    print("Recording started...")
    while True:
        if g_active:
            print("c", end="")
        if g_wait:
            # The assistant is talking; drop queued audio so it does not hear itself.
            print("w", end="")
            while not audio_queue.empty():
                audio_queue.get()
            time.sleep(1)
            continue
        audio_data = record_stream.read(CHUNK, exception_on_overflow=False)
        audio_queue.put(audio_data)
        print(".", end="")

    # Unreachable until the loop above gets a stop condition.
    record_stream.stop_stream()
    record_stream.close()
    p_audio.terminate()


def speech_to_text():
    """Consumer thread: transcribe queued audio, detect the wake word, answer questions."""
    global g_active
    global g_wait
    global counter
    while True:
        time_duration = Q_DURATION if g_active else DURATION
        tf = tempfile.NamedTemporaryFile(suffix=".wav", delete=True, mode='wb')
        mp3_tf = tempfile.NamedTemporaryFile(suffix=".mp3", delete=True, mode='wb')
        with wave.open(tf.name, 'wb') as wav_file:
            wav_file.setnchannels(CHANNELS)
            wav_file.setsampwidth(p_audio.get_sample_size(FORMAT))
            wav_file.setframerate(RATE)
            # Read audio from the queue for the specified duration
            # (RATE // CHUNK chunks make up one second of audio).
            for i in range(0, RATE // CHUNK * time_duration):
                print("r", end="")
                audio_data = audio_queue.get()
                wav_file.writeframes(audio_data)
                audio_queue.task_done()
        print(f"{time_duration} sec recording done.")
        # Perform speech recognition
        audio = AudioSegment.from_wav(tf.name)
        audio.export(mp3_tf.name, format="mp3")
        # segments, info = model.transcribe(mp3_tf.name, beam_size=5)
        segments, _ = model.transcribe(mp3_tf.name)
        questions = []
        if g_active:
            counter += 1
            for segment in segments:
                print("[%.2fs -> %.2fs] %s" % (segment.start, segment.end, segment.text))
                if segment.text:
                    questions.append(segment.text)
            q = re.sub(THANK, "", " ".join(questions))
            print(f"Question: {q} counter: {counter}")
            if len(q) > 40 and counter >= 2:
                counter = 0
                output = gpt_model.generate(" ".join(questions), max_tokens=50)
                print(f"Answer: {output}")
                reply_wav = tempfile.NamedTemporaryFile(suffix=".wav", delete=True, mode='wb')
                tts.tts_to_file(text=output, file_path=reply_wav.name)
                play(playback_stream, reply_wav.name)
                with g_lock:
                    g_active = False
                time.sleep(5)
            continue
        for segment in segments:
            print("[%.2fs -> %.2fs] %s" % (segment.start, segment.end, segment.text))
            text_input = segment.text.lower()
            if text_input.find("hey") != -1:
                if re.search(SUSE, text_input):
                    counter = 1
                    with g_lock:
                        g_active = True
                        g_wait = True
                    play(playback_stream, "data/audio/suse_intro.wav")
                    print("Finished SUSE intro")
                    with g_lock:
                        g_wait = False
                    time.sleep(5)


def play(play_stream, filename):
    """Stream a WAV file to the playback stream chunk by chunk."""
    wave_file = wave.open(filename, 'rb')
    print(f"Wave: rate={wave_file.getframerate()} channels={wave_file.getnchannels()} width={wave_file.getsampwidth()}")
    out_data = wave_file.readframes(CHUNK)
    while out_data:
        play_stream.write(out_data)
        out_data = wave_file.readframes(CHUNK)


# Pick the device for TTS
device = "cuda" if torch.cuda.is_available() else "cpu"

# Init TTS
# tts = TTS("tts_models/multilingual/multi-dataset/xtts_v2").to(device)
tts = TTS("tts_models/en/blizzard2013/capacitron-t2-c150_v2").to(device)
gpt_model = GPT4All("orca-mini-3b-gguf2-q4_0.gguf")

# Queue shared between the recording and speech-to-text threads
audio_queue = queue.Queue()

# model_size = "large-v2"
model_size = "small.en"
# model_size = "tiny.en"

# Run Whisper on CPU with INT8 quantization
model = WhisperModel(model_size, device="cpu", compute_type="int8")


print(f"Stream: playback->{playback_stream.get_write_available()}")
# Create and start the recording thread
recording_thread = threading.Thread(target=record_audio)
recording_thread.start()

# Create and start the speech-to-text thread
speech_to_text_thread = threading.Thread(target=speech_to_text)
speech_to_text_thread.start()

# Wait for the threads to finish (define a stop condition to end the loops above)
recording_thread.join()
speech_to_text_thread.join()

p_audio.terminate()
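Both worker loops above run forever, so the closing join() calls never return and the terminate() cleanup is unreachable. A minimal sketch of a shared stop event that would make shutdown reachable; stop_event and the Ctrl-C handling are illustrative assumptions, not part of the demo:

import threading
import time

stop_event = threading.Event()  # hypothetical shared shutdown flag

def worker(name):
    # Replace `while True:` with a loop that checks the event.
    while not stop_event.is_set():
        time.sleep(0.1)  # stand-in for one record/transcribe pass
    print(f"{name} stopped")

threads = [threading.Thread(target=worker, args=(n,)) for n in ("record", "stt")]
for t in threads:
    t.start()
try:
    while True:
        time.sleep(0.5)
except KeyboardInterrupt:
    stop_event.set()  # Ctrl-C asks both workers to exit their loops
for t in threads:
    t.join()          # now returns, so terminate() can run afterwards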
requirements.txt (new file)
pyaudio
pydub
transformers
datasets
faster_whisper
torch
TTS
gpt4all
tqdm     # used by audio_utils.py
pyttsx3  # used by test/pyttsx_test.py
setup.sh (new file)
sudo zypper install python311-devel portaudio-devel espeak-ng
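setup.sh covers only the system packages; the Python dependencies still need to be installed. A reasonable follow-up, assuming a plain pip workflow (the virtualenv step is optional):

python3.11 -m venv .venv && source .venv/bin/activate
pip install -r requirements.txt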
test/coqui_tts_out.py (new file)
import torch
from TTS.api import TTS

# Pick the device for TTS
device = "cuda" if torch.cuda.is_available() else "cpu"

# Init TTS
# tts = TTS("tts_models/multilingual/multi-dataset/xtts_v2").to(device)
tts = TTS("tts_models/en/blizzard2013/capacitron-t2-c150_v2").to(device)
# tts.tts_to_file(text="Hello, I am your SUSE ... assistant. What can I do for you today?", file_path="/tmp/out.wav")
tts.tts_to_file(text="Sorry, I didn't get your question. Please say that again.", file_path="/tmp/out.wav")
# The generated /tmp/out.wav can be played back with test/test_audio.py.
test/fast_whisper.py (new file)
import queue
import re
import tempfile
import wave

import pyaudio
import torch
from faster_whisper import WhisperModel
from pydub import AudioSegment
from TTS.api import TTS


CHUNK = 1024
FORMAT = pyaudio.paInt16  # 16-bit resolution
CHANNELS = 1
RATE = 16000  # sample rate (Hz)
DURATION = 2
SUSE = r"s+u+s+e"  # loose wake-word match for "suse" and noisy transcriptions

p = pyaudio.PyAudio()


def record(stream):
    """Push microphone chunks onto the shared queue (helper, unused below)."""
    print("Recording started...")
    while True:
        audio_data = stream.read(CHUNK)
        audio_queue.put(audio_data)

    # Unreachable until the loop above gets a stop condition.
    stream.stop_stream()
    stream.close()
    p.terminate()


def play(play_stream, filename):
    """Stream a WAV file to the playback stream chunk by chunk."""
    wave_file = wave.open(filename, 'rb')
    print(f"Wave: rate={wave_file.getframerate()} channels={wave_file.getnchannels()} width={wave_file.getsampwidth()}")
    out_data = wave_file.readframes(CHUNK)
    while out_data:
        play_stream.write(out_data)
        out_data = wave_file.readframes(CHUNK)


# Pick the device for TTS
device = "cuda" if torch.cuda.is_available() else "cpu"

# Init TTS
# tts = TTS("tts_models/multilingual/multi-dataset/xtts_v2").to(device)
tts = TTS("tts_models/en/blizzard2013/capacitron-t2-c150_v2").to(device)

# Queue shared with the record() helper
audio_queue = queue.Queue()

# model_size = "large-v2"
model_size = "small.en"
# model_size = "tiny.en"

# Run Whisper on CPU with INT8 quantization
model = WhisperModel(model_size, device="cpu", compute_type="int8")

# Scan for an audio device named "default" (informational only)
device_info = ""
for i in range(p.get_device_count()):
    device_info = p.get_device_info_by_index(i)
    if device_info['name'] == "default":
        print(device_info)
        break


playback_stream = p.open(format=p.get_format_from_width(2),
                         channels=1,
                         rate=24000,
                         # output_device_index=device_info['index'],
                         output=True)

print(f"Stream: playback->{playback_stream.get_write_available()}")
# generator = pipeline(task="automatic-speech-recognition", model="microsoft/speecht5_asr")

while True:
    tf = tempfile.NamedTemporaryFile(suffix=".wav", delete=True, mode='wb')
    mp3_tf = tempfile.NamedTemporaryFile(suffix=".mp3", delete=True, mode='wb')
    temp_filename = tf.name
    mp3_tf_filename = mp3_tf.name
    with wave.open(temp_filename, 'wb') as wav_file:
        wav_file.setnchannels(CHANNELS)
        wav_file.setsampwidth(p.get_sample_size(FORMAT))
        wav_file.setframerate(RATE)
        stream = p.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK)
        print("Listening...")
        frames = []
        # Read audio data from the stream for the specified duration
        for i in range(0, RATE // CHUNK * DURATION):
            audio_data = stream.read(CHUNK)
            frames.append(audio_data)
            wav_file.writeframes(audio_data)
        # print(f"{DURATION} sec recording done.")
        stream.close()

    audio = AudioSegment.from_wav(temp_filename)
    audio.export(mp3_tf_filename, format="mp3")
    # segments, info = model.transcribe(mp3_tf_filename, beam_size=5)
    # print("Detected language '%s' with probability %f" % (info.language, info.language_probability))
    segments, _ = model.transcribe(mp3_tf_filename)
    for segment in segments:
        print("[%.2fs -> %.2fs] %s" % (segment.start, segment.end, segment.text))
        # out_wav = tempfile.NamedTemporaryFile(suffix=".mp3", delete=True, mode='wb')
        text_input = segment.text.lower()
        if text_input.find("hey") != -1:
            if re.search(SUSE, text_input):
                # wav = tts.tts(text=segment.text, speaker_wav=speak_wav.name, language="en")
                # tts.tts_to_file(text="This is SUSE assistant what can I do for you today?", language="en", file_path=out_wav.name)
                # tts.tts_to_file(text="This is SUSE assistant what can I do for you today?", file_path=out_wav.name)
                # wave_file = wave.open(out_wav.name, 'rb')
                play(playback_stream, "data/audio/suse_intro.wav")

# Unreachable until the loop above gets a stop condition.
p.terminate()
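The scan above looks for a device literally named "default"; PyAudio can also report the default output device directly. A small sketch of that alternative:

import pyaudio

p = pyaudio.PyAudio()
info = p.get_default_output_device_info()  # avoids scanning every device by name
print(info['index'], info['name'], info['defaultSampleRate'])
p.terminate()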
test/fast_whisper2.py (new file)
import queue
import re
import tempfile
import threading
import time
import wave

import pyaudio
import torch
from faster_whisper import WhisperModel
from gpt4all import GPT4All
from pydub import AudioSegment
from TTS.api import TTS


CHUNK = 1024
FORMAT = pyaudio.paInt16  # 16-bit resolution
CHANNELS = 1
RATE = 16000  # sample rate (Hz)
DURATION = 5
SUSE = r"s+u+s+e"  # loose wake-word match for "suse" and noisy transcriptions
THANK = r"Thank\s*(?:you|u)\b"


g_active = False  # True while the assistant is collecting a question
g_wait = False    # True while the assistant is speaking (recording paused)
g_lock = threading.Lock()
counter = 0

p_audio = pyaudio.PyAudio()
playback_stream = p_audio.open(format=FORMAT, channels=CHANNELS, rate=24000, output=True)
record_stream = p_audio.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK)


def record_audio():
    """Producer thread: push microphone chunks onto the shared queue."""
    print("Recording started...")
    while True:
        if g_active:
            print("c", end="")
        if g_wait:
            # The assistant is talking; drop queued audio so it does not hear itself.
            print("w", end="")
            while not audio_queue.empty():
                audio_queue.get()
            time.sleep(1)
            continue
        audio_data = record_stream.read(CHUNK)
        audio_queue.put(audio_data)
        print(".", end="")

    # Unreachable until the loop above gets a stop condition.
    record_stream.stop_stream()
    record_stream.close()
    p_audio.terminate()


def speech_to_text():
    """Consumer thread: transcribe queued audio, detect the wake word, answer questions."""
    global g_active
    global g_wait
    global counter
    while True:
        tf = tempfile.NamedTemporaryFile(suffix=".wav", delete=True, mode='wb')
        mp3_tf = tempfile.NamedTemporaryFile(suffix=".mp3", delete=True, mode='wb')
        with wave.open(tf.name, 'wb') as wav_file:
            wav_file.setnchannels(CHANNELS)
            wav_file.setsampwidth(p_audio.get_sample_size(FORMAT))
            wav_file.setframerate(RATE)
            # Read audio from the queue for the specified duration
            for i in range(0, RATE // CHUNK * DURATION):
                print("r", end="")
                audio_data = audio_queue.get()
                wav_file.writeframes(audio_data)
                audio_queue.task_done()
        print(f"{DURATION} sec recording done.")
        # Perform speech recognition
        audio = AudioSegment.from_wav(tf.name)
        audio.export(mp3_tf.name, format="mp3")
        # segments, info = model.transcribe(mp3_tf.name, beam_size=5)
        segments, _ = model.transcribe(mp3_tf.name)
        questions = []
        if g_active:
            counter += 1
            for segment in segments:
                print("[%.2fs -> %.2fs] %s" % (segment.start, segment.end, segment.text))
                if segment.text:
                    questions.append(segment.text)
            q = re.sub(THANK, "", " ".join(questions))
            print(f"Question: {q} counter: {counter}")
            if len(q) > 40 and counter > 3:
                counter = 0
                output = gpt_model.generate(" ".join(questions), max_tokens=50)
                print(f"Answer: {output}")
                reply_wav = tempfile.NamedTemporaryFile(suffix=".wav", delete=True, mode='wb')
                tts.tts_to_file(text=output, file_path=reply_wav.name)
                play(playback_stream, reply_wav.name)
                with g_lock:
                    g_active = False
                time.sleep(5)
            continue
        for segment in segments:
            print("[%.2fs -> %.2fs] %s" % (segment.start, segment.end, segment.text))
            text_input = segment.text.lower()
            if text_input.find("hey") != -1:
                if re.search(SUSE, text_input):
                    counter = 1
                    with g_lock:
                        g_active = True
                        g_wait = True
                    play(playback_stream, "data/audio/suse_intro.wav")
                    print("Finished SUSE intro")
                    with g_lock:
                        g_wait = False
                    time.sleep(5)


def play(play_stream, filename):
    """Stream a WAV file to the playback stream chunk by chunk."""
    wave_file = wave.open(filename, 'rb')
    print(f"Wave: rate={wave_file.getframerate()} channels={wave_file.getnchannels()} width={wave_file.getsampwidth()}")
    out_data = wave_file.readframes(CHUNK)
    while out_data:
        play_stream.write(out_data)
        out_data = wave_file.readframes(CHUNK)


# Pick the device for TTS
device = "cuda" if torch.cuda.is_available() else "cpu"

# Init TTS
# tts = TTS("tts_models/multilingual/multi-dataset/xtts_v2").to(device)
tts = TTS("tts_models/en/blizzard2013/capacitron-t2-c150_v2").to(device)
gpt_model = GPT4All("orca-mini-3b-gguf2-q4_0.gguf")

# Queue shared between the recording and speech-to-text threads
audio_queue = queue.Queue()

# model_size = "large-v2"
model_size = "small.en"
# model_size = "tiny.en"

# Run Whisper on CPU with INT8 quantization
model = WhisperModel(model_size, device="cpu", compute_type="int8")


print(f"Stream: playback->{playback_stream.get_write_available()}")
# Create and start the recording thread
recording_thread = threading.Thread(target=record_audio)
recording_thread.start()

# Create and start the speech-to-text thread
speech_to_text_thread = threading.Thread(target=speech_to_text)
speech_to_text_thread.start()

# Wait for the threads to finish (define a stop condition to end the loops above)
recording_thread.join()
speech_to_text_thread.join()

p_audio.terminate()
test/pyttsx_test.py (new file)
import pyttsx3


def text_to_speech(text):
    # Initialize the TTS engine
    engine = pyttsx3.init()

    # Set properties (optional)
    engine.setProperty('rate', 130)    # Speed of speech
    engine.setProperty('volume', 0.6)  # Volume level (0.0 to 1.0)

    # Convert text to speech
    engine.say(text)

    # Wait for the speech to finish
    engine.runAndWait()


# Example usage
text = "Hello, this is a simple text-to-speech example in Python."
text_to_speech(text)
test/test_audio.py (new file)
import pyaudio
import wave

filename = '/tmp/out.wav'

# Set chunk size of 1024 samples per data frame
chunk = 1024

# Open the sound file
wf = wave.open(filename, 'rb')

# Create an interface to PortAudio
p = pyaudio.PyAudio()

# Open a stream to write the WAV file to
# 'output = True' indicates that the sound will be played rather than recorded
stream = p.open(format=p.get_format_from_width(wf.getsampwidth()),
                channels=wf.getnchannels(),
                rate=wf.getframerate(),
                output=True)

# Read data in chunks
data = wf.readframes(chunk)

# Play the sound by writing the audio data to the stream
# (readframes returns bytes, so test truthiness rather than comparing to '')
while data:
    stream.write(data)
    data = wf.readframes(chunk)

# Close and terminate the stream
stream.close()
p.terminate()
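For reference, the clip length can be derived from the WAV header with the same wave API; a short sketch:

import wave

with wave.open('/tmp/out.wav', 'rb') as wf:
    seconds = wf.getnframes() / wf.getframerate()  # total frames divided by sample rate
    print(f"{seconds:.2f} s")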