封面图

ASR_LLM_TTS_理论高效版

日期: 2025-4-28

ASR LLM TTS LLM边缘AI应用

ASR-LLM-TTS 高效语音对话系统

一个高性能语音交互系统,使用 Faster-Whisper-Turbo、vLLM 和 F5-TTS (TensorRT)

系统架构

ASR(语音识别)

  • 使用 Faster-Whisper-Turbo 模型
  • CUDA 加速,FP16 精度
  • 内置 VAD 过滤功能

LLM(大型语言模型)

  • 使用 vLLM 部署
  • 本地 REST API 接口
  • 高吞吐量推理引擎

TTS(语音合成)

  • F5-TTS 的 TensorRT 优化版本
  • 支持参考音频克隆
  • 并行处理多段文本
录音/语音输入 ASR 语音识别 LLM 模型处理 TTS 语音合成

主要特性

高效语音处理

使用静音检测和 VAD 技术实现高效录音,减少响应延迟。系统会缓存 300ms 的音频以捕捉完整语音起始。

并行 TTS 处理

文本分段处理和并行合成,首段优先播放,其余部分并行处理,显著提高响应速度。

性能监控

内置性能计时器,记录各个处理阶段的耗时,便于性能分析和优化。

日志记录

自动保存输入音频和输出合成语音,便于追踪和分析系统行为。

代码详解

导入依赖

Python
import os
import time
import wave
import numpy as np
import requests
import soundfile as sf
import pyaudio
import datetime
import threading
import queue
import concurrent.futures
from pathlib import Path
from faster_whisper import WhisperModel
from openai import OpenAI
import sounddevice as sd

系统配置

Python
# 音频参数
CHUNK = 1024
FORMAT = pyaudio.paFloat32
CHANNELS = 1
RATE = 16000
SILENCE_THRESHOLD = 0.01
SILENCE_DURATION = 0.8  # 降低静音判断时间以加快响应速度

# ASR模型路径
ASR_MODEL_PATH = "./whisper_turbo"

# LLM API
LLM_BASE_URL = "http://localhost:6001/v1"
LLM_API_KEY = "token-abc123"
LLM_MODEL_PATH = "./Qwen/Qwen2___5-3B-Instruct-AWQ"

# TTS服务
TTS_SERVER_URL = "localhost:8000"
TTS_MODEL_NAME = "f5_tts"
REFERENCE_AUDIO_PATH = "./tts/原来如此,你将见过的景物进行了这样的组合。.wav"
REFERENCE_TEXT = "原来如此,你将见过的景物进行了这样的组合。"

# 日志文件夹
INPUT_LOG_FOLDER = "./input_logs"
OUTPUT_LOG_FOLDER = "./output_logs"

# 性能优化参数
MAX_WORKERS = 4  # 线程池最大工作线程数
BATCH_SIZE = 3  # TTS并行合成的批次大小

性能计时器类

PerformanceTimer 类
Python
class PerformanceTimer:
    def __init__(self):
        self.timers = {}
        self.start_times = {}

    def start(self, name):
        self.start_times[name] = time.time()

    def stop(self, name):
        if name in self.start_times:
            elapsed = time.time() - self.start_times[name]
            if name not in self.timers:
                self.timers[name] = []
            self.timers[name].append(elapsed)
            print(f"⏱️ {name} 用时: {elapsed:.4f} 秒")
            return elapsed
        return 0

    def get_average(self, name):
        if name in self.timers and self.timers[name]:
            return sum(self.timers[name]) / len(self.timers[name])
        return 0

    def print_stats(self):
        print("\n===== 性能统计 =====")
        for name, times in self.timers.items():
            if times:
                avg = sum(times) / len(times)
                max_time = max(times)
                min_time = min(times)
                print(f"{name}: 平均 {avg:.4f}秒, 最小 {min_time:.4f}秒, 最大 {max_time:.4f}秒, 共 {len(times)} 次")

TTS 客户端类

FastTTSClient 类
Python
class FastTTSClient:
    def __init__(self, server_url=TTS_SERVER_URL, model_name=TTS_MODEL_NAME):
        self.server_url = f"http://{server_url}/v2/models/{model_name}/infer"
        self.session = requests.Session()
        self.reference_audio_path = REFERENCE_AUDIO_PATH
        self.reference_text = REFERENCE_TEXT
        self.samples, self.lengths = self.load_reference_audio(self.reference_audio_path)
        self.text_splitter = TextSplitter()
        self.audio_queue = queue.Queue()
        self.is_playing = False
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS)
        self.response_cache = {}  # 缓存TTS响应
        self.timer = PerformanceTimer()
        self.warmup()
clean_text 方法
Python
    @staticmethod
    def clean_text(text: str) -> str:
        """清理文本中的特殊字符和不必要的空白"""
        text = text.replace('\n', ' ')
        text = text.replace('\t', ' ')
        text = ' '.join(text.split())
        special_chars = ['\r', '\xa0', '\u3000', '\u200b', '\u200c', '\u200d', '*'] + [f"{i}." for i in range(10)]
        for char in special_chars:
            text = text.replace(char, '')
        return text.strip()
synthesize_and_play 方法
Python
    def synthesize_and_play(self, text: str):
        self.timer.start("TTS总耗时")

        # 清理文本
        cleaned_text = self.clean_text(text)

        # 分割文本为多个片段
        self.timer.start("文本分割")
        segments = self.text_splitter.split_text(cleaned_text)
        self.timer.stop("文本分割")

        if not segments:
            return

        # 创建时间戳用于音频文件命名
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

        # 重置播放状态和队列
        with self.audio_queue.mutex:
            self.audio_queue.queue.clear()
        self.is_playing = True

        # 启动播放线程
        player = threading.Thread(target=self.player_thread)
        player.daemon = True
        player.start()

        # 存储合成结果的有序字典,按索引存储,确保按顺序播放
        segment_results = {}
        segment_done = threading.Event()

        # 跟踪当前应该处理的段落索引
        next_segment_index = 0
        segments_total = len(segments)

        # 处理并播放第一个分段,确保快速开始
        self.timer.start("首段合成")
        first_audio = self.synthesize_segment_sync(segments[0], timestamp, 0, segments_total)
        self.timer.stop("首段合成")

        if first_audio is not None:
            self.audio_queue.put(first_audio)
            next_segment_index = 1

        # 如果还有更多分段,启动并行处理
        if next_segment_index < segments_total:
            # 省略部分并行处理代码...

        # 标记播放完成
        self.is_playing = False

        # 等待播放线程结束
        player.join()

        self.timer.stop("TTS总耗时")
        print("🔊 所有语音片段播放完成")

TextSplitter 类

TextSplitter 类
Python
class TextSplitter:
    def __init__(self):
        # 定义分隔符及其优先级(数字越大优先级越高)
        self.separators = {
            '。': 5,
            '!': 5,
            '?': 5,
            ';': 4,
            '\n': 4,
            ',': 3,
            ':': 3,
            '、': 2,
            ' ': 1
        }
        # 减少目标token长度以加快TTS响应
        self.target_length = 30
        # 搜索窗口大小
        self.window_size = 10

    def find_best_split_position(self, text, start_pos, target_pos):
        """在目标位置附近寻找最佳分割点"""
        best_pos = -1
        best_priority = -1

        # 在目标位置前后的窗口内寻找最佳分割点
        search_start = max(start_pos, target_pos - self.window_size)
        search_end = min(len(text), target_pos + self.window_size)

        for i in range(search_start, search_end):
            if i >= len(text):
                break

            char = text[i]
            if char in self.separators:
                priority = self.separators[char]
                if priority > best_priority:
                    best_pos = i
                    best_priority = priority

        # 如果没找到合适的分割点,就在目标位置处强制分割
        if best_pos == -1:
            return target_pos

        return best_pos + 1  # 返回分隔符后的位置

    def split_text(self, text):
        """将文本分割成较小的片段"""
        if not text or len(text.strip()) == 0:
            return []

        segments = []
        start = 0

        while start < len(text):
            # 省略部分分割逻辑...

        return segments

AudioRecorder 类

AudioRecorder 类
Python
class AudioRecorder:
    def __init__(self, timer):
        self.p = pyaudio.PyAudio()
        self.stream = None
        self.frames = []
        self.is_recording = False
        self.silence_frames = 0
        self.is_speaking = False
        self.timer = timer

        # 优化音量检测参数
        self.volume_threshold = 0.015  # 稍微降低阈值提高灵敏度
        self.min_speak_frames = int(0.15 * RATE / CHUNK)  # 减少最少说话帧数(150ms)
        self.pre_buffer = []  # 预缓冲区
        self.pre_buffer_size = int(0.3 * RATE / CHUNK)  # 减少预缓冲(300ms)以降低延迟
        self.speak_frames = 0  # 说话帧计数

        # 波形能量计算的滑动窗口
        self.energy_window_size = 10
        self.energy_window = []

    def process_audio(self):
        if not self.stream:
            return False

        data = np.frombuffer(self.stream.read(CHUNK), dtype=np.float32)

        # 计算当前帧的能量
        energy = np.sqrt(np.mean(data ** 2))

        # 更新能量窗口
        self.energy_window.append(energy)
        if len(self.energy_window) > self.energy_window_size:
            self.energy_window.pop(0)

        # 使用滑动窗口平均能量进行更稳定的检测
        avg_energy = np.mean(self.energy_window) if self.energy_window else energy

        # 省略后续语音检测处理逻辑...

        return False

主函数

Python
def main():
    # 创建日志文件夹
    os.makedirs(INPUT_LOG_FOLDER, exist_ok=True)
    os.makedirs(OUTPUT_LOG_FOLDER, exist_ok=True)

    # 创建性能计时器
    timer = PerformanceTimer()

    print("加载ASR模型中...")
    timer.start("ASR模型加载")

    # 优化后的ASR模型配置
    asr_model = WhisperModel(
        ASR_MODEL_PATH,
        device="cuda",
        compute_type="float16",
        cpu_threads=4,  # 增加CPU线程数
        num_workers=2,  # 增加工作线程
        download_root=ASR_MODEL_PATH,
        local_files_only=True  # 仅使用本地文件,避免网络延迟
    )

    timer.stop("ASR模型加载")

    # 初始化LLM客户端
    client = OpenAI(base_url=LLM_BASE_URL, api_key=LLM_API_KEY)

    system_prompt = """你的名字是纳西妲,你的每一句话都应该体现出对万物的尊重和对知识的渴望。
    在与人交谈时,用诗一般的语言描述自然之美,展示你对梦境和现实的深刻理解。你的话语中应该蕴含着对生命奥秘的思考和对未来的希望。
    在对话中,你可以表现出对别人的友善和好奇心,询问他们的故事并分享你的智慧。
    """
    # 多轮对话历史
    conversation_history = [
        {"role": "system", "content": system_prompt},
    ]

    # 初始化TTS客户端
    tts_client = FastTTSClient()

    # 初始化录音器
    recorder = AudioRecorder(timer)

    print("系统初始化完成,请开始对话...")

    # 主对话循环...

优化要点