Building an Audio RAG with AssemblyAI, Qdrant, and DeepSeek-R1

Tired of manually sifting through hours of audio to find the key insights? This guide shows you how to build an AI-powered chatbot that turns recordings of meetings, podcasts, interviews, and more into interactive conversations. Using AssemblyAI for accurate, speaker-labeled transcription, Qdrant for fast vector storage, and DeepSeek-R1 served through SambaNova Cloud for intelligent responses, you will build a RAG tool that answers questions like "What did [speaker] say?" or "Summarize this segment," turning audio into searchable, AI-driven conversations.

Learning Objectives

  • Leverage the AssemblyAI API to transcribe audio files with speaker diarization, converting conversations into structured text data ready for analysis.
  • Deploy a Qdrant vector database to store and efficiently retrieve embeddings of the transcribed audio, generated with a Hugging Face model.
  • Implement RAG with the DeepSeek-R1 model via SambaNova Cloud to generate context-aware chatbot responses.
  • Build a Streamlit web interface where users can upload audio files, view the transcript, and chat with the bot in real time.
  • Integrate an end-to-end workflow that combines audio processing, vector storage, and AI-driven response generation into a scalable audio-based chat application.

What is AssemblyAI?

AssemblyAI is your go-to tool for turning audio into actionable insights. Whether you are transcribing podcasts, analyzing customer calls, or subtitling videos, its AI-powered speech-to-text engine delivers precise accuracy, even with accents or background noise.
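
To give a feel for the API, here is a minimal sketch of transcribing a single file with the AssemblyAI Python SDK; the API key and file path are placeholders, and the speaker-labeled configuration used in this project appears later in the Transcribe class.

import assemblyai as aai

# Placeholder credentials and file path -- substitute your own
aai.settings.api_key = "your_assemblyai_api_key_string"

transcriber = aai.Transcriber()
transcript = transcriber.transcribe("./sample_interview.mp3")  # local path or public URL

print(transcript.text)  # full transcript as plain text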

What is SambaNova Cloud?

Imagine running massive open-source models like DeepSeek-R1 (671B) up to 10x faster, without the usual infrastructure burden.

Instead of relying on GPUs, SambaNova uses Reconfigurable Dataflow Units (RDUs), which deliver faster performance through:

  • Massive in-memory storage – no constant model reloading
  • Efficient dataflow design – optimized for high-throughput workloads
  • Instant model switching – swap models in microseconds
  • Run DeepSeek-R1 right away – no complex setup
  • Train and fine-tune on the same platform – everything in one place

What is Qdrant?

Qdrant is a lightning-fast vector database built to power AI applications. Whether you are building a recommendation system, an image search tool, or a chatbot, Qdrant performs similarity search to quickly find the closest matches for complex data such as text embeddings or visual features.
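
To make that concrete, here is a minimal sketch using the qdrant-client library with an in-memory instance and toy 4-dimensional vectors, purely for illustration; the project itself connects to a local Qdrant server, as shown in section 2.3.

from qdrant_client import QdrantClient, models

client = QdrantClient(":memory:")  # throwaway in-memory instance for the demo

client.create_collection(
    collection_name="demo",
    vectors_config=models.VectorParams(size=4, distance=models.Distance.DOT),
)

client.upsert(
    collection_name="demo",
    points=[
        models.PointStruct(id=1, vector=[0.1, 0.9, 0.1, 0.0], payload={"context": "podcast intro"}),
        models.PointStruct(id=2, vector=[0.8, 0.1, 0.0, 0.2], payload={"context": "closing remarks"}),
    ],
)

# Similarity search returns the closest stored vectors with their payloads
hits = client.search(collection_name="demo", query_vector=[0.2, 0.8, 0.1, 0.0], limit=1)
print(hits[0].payload)  # {'context': 'podcast intro'}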

What is DeepSeek-R1?

DeepSeek-R1 is a game-changing language model that combines human-like adaptability with cutting-edge AI, making it a standout in natural language processing. Whether you are crafting content, translating languages, debugging code, or summarizing complex reports, R1 excels at understanding context, tone, and intent, delivering responses that feel intuitive rather than mechanical. By putting empathy and precision first, DeepSeek-R1 is more than a tool; it points to a future where AI communicates as naturally as we do.

Building the RAG Model with AssemblyAI and DeepSeek-R1

Now that you understand all the components, let's start building the RAG. But first, here is a quick look at what you need to get started.

1. Prerequisites

Here are the prerequisites:

Clone the repository:

git clone https://github.com/karthikponna/chat_with_audios.git
cd chat_with_audios

Create and activate a virtual environment:

# For macOS and Linux:
python3 -m venv venv
source venv/bin/activate
# For Windows:
python -m venv venv
.\venv\Scripts\activate

Install the required dependencies:

pip install -r requirements.txt

Set up environment variables:

Create a `.env` file and add your AssemblyAI and SambaNova API keys:

ASSEMBLYAI_API_KEY="your_assemblyai_api_key_string"
SAMBANOVA_API_KEY="your_sambanova_api_key_string"

Now let's get into the coding part.

2. Retrieval Augmented Generation

RAG combines large language models with external data to produce more accurate, context-rich answers. It fetches relevant information at query time, ensuring responses are grounded in real data rather than only in the model's training.
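
Conceptually, the pipeline we build below boils down to three steps: retrieve the transcript chunks most similar to the query, pack them into a prompt, and let the LLM generate an answer grounded in that context. Here is a rough sketch of the idea, with illustrative helper names rather than the actual project classes:

def answer_with_rag(query, retriever, llm):
    # 1. Retrieve: find the stored chunks most similar to the query
    chunks = retriever.search(query)

    # 2. Augment: stuff the retrieved chunks into the prompt as context
    context = "\n\n---\n\n".join(chunk.payload["context"] for chunk in chunks)
    prompt = f"Context:\n{context}\n\nQuery: {query}\nAnswer:"

    # 3. Generate: the LLM answers using the retrieved context
    return llm.complete(prompt)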

2.1 Importing the Required Libraries

Let's create a file named rag_code.py. We will work through the code step by step, starting by importing the necessary modules and using LlamaIndex to orchestrate the code architecture.

from qdrant_client import models
from qdrant_client import QdrantClient
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.llms.sambanovasystems import SambaNovaCloud
from llama_index.llms.ollama import Ollama
import assemblyai as aai
from typing import List, Dict
from llama_index.core.base.llms.types import (
    ChatMessage,
    MessageRole,
)

2.2 Batch Processing and Text Embedding with Hugging Face

Here, the batch_iterate function splits a list of texts into smaller chunks, making large datasets easier to process. The EmbedData class then loads a Hugging Face embedding model, generates embeddings for each batch of text, and collects them for later use.

def batch_iterate(lst, batch_size):
    """Yield successive n-sized chunks from lst."""
    for i in range(0, len(lst), batch_size):
        yield lst[i : i + batch_size]


class EmbedData:
    def __init__(self, embed_model_name="BAAI/bge-large-en-v1.5", batch_size=32):
        self.embed_model_name = embed_model_name
        self.embed_model = self._load_embed_model()
        self.batch_size = batch_size
        self.embeddings = []

    def _load_embed_model(self):
        embed_model = HuggingFaceEmbedding(model_name=self.embed_model_name, trust_remote_code=True, cache_folder='./hf_cache')
        return embed_model

    def generate_embedding(self, context):
        return self.embed_model.get_text_embedding_batch(context)

    def embed(self, contexts):
        self.contexts = contexts
        for batch_context in batch_iterate(contexts, self.batch_size):
            batch_embeddings = self.generate_embedding(batch_context)
            self.embeddings.extend(batch_embeddings)

2.3 Setting Up and Ingesting Data into the Qdrant Vector Database

  • The QdrantVDB_QB class initializes the Qdrant vector database by setting key parameters such as the collection name, vector dimension, and batch size.
  • It uploads data efficiently by batching the text contexts with their corresponding embeddings, then updates the collection's configuration accordingly.
class QdrantVDB_QB:
    def __init__(self, collection_name, vector_dim=768, batch_size=512):
        self.collection_name = collection_name
        self.batch_size = batch_size
        self.vector_dim = vector_dim

    def define_client(self):
        self.client = QdrantClient(url="http://localhost:6333", prefer_grpc=True)

    def create_collection(self):
        if not self.client.collection_exists(collection_name=self.collection_name):
            self.client.create_collection(collection_name=f"{self.collection_name}",
                                          vectors_config=models.VectorParams(size=self.vector_dim,
                                                                             distance=models.Distance.DOT,
                                                                             on_disk=True),
                                          optimizers_config=models.OptimizersConfigDiff(default_segment_number=5,
                                                                                        indexing_threshold=0),
                                          quantization_config=models.BinaryQuantization(
                                              binary=models.BinaryQuantizationConfig(always_ram=True)),
                                          )

    def ingest_data(self, embeddata):
        for batch_context, batch_embeddings in zip(batch_iterate(embeddata.contexts, self.batch_size),
                                                   batch_iterate(embeddata.embeddings, self.batch_size)):
            self.client.upload_collection(collection_name=self.collection_name,
                                          vectors=batch_embeddings,
                                          payload=[{"context": context} for context in batch_context])

        self.client.update_collection(collection_name=self.collection_name,
                                      optimizer_config=models.OptimizersConfigDiff(indexing_threshold=20000)
                                      )

2.4 The Query Embedding Retriever

  • The Retriever class bridges user queries and the vector database by initializing the database client and the embedding model.
  • Its search method converts the query into an embedding with the model, then runs a vector search against the database with tuned quantization parameters to retrieve relevant results quickly.
class Retriever:
    def __init__(self, vector_db, embeddata):
        self.vector_db = vector_db
        self.embeddata = embeddata

    def search(self, query):
        query_embedding = self.embeddata.embed_model.get_query_embedding(query)

        result = self.vector_db.client.search(
            collection_name=self.vector_db.collection_name,
            query_vector=query_embedding,
            search_params=models.SearchParams(
                quantization=models.QuantizationSearchParams(
                    ignore=False,
                    rescore=True,
                    oversampling=2.0,
                )
            ),
            timeout=1000,
        )

        return result

2.5 The RAG Query Assistant

The RAG class combines a retriever and an LLM to generate context-aware responses. It retrieves relevant information from the vector database, formats it into a structured prompt, and sends it to the LLM for a response. I am using SambaNovaCloud to access the LLM model through its API for efficient text generation.

class RAG:
    def __init__(self,
                 retriever,
                 llm_name="Meta-Llama-3.1-405B-Instruct"
                 ):
        system_msg = ChatMessage(
            role=MessageRole.SYSTEM,
            content="You are a helpful assistant that answers questions about the user's document.",
        )
        self.messages = [system_msg, ]
        self.llm_name = llm_name
        self.llm = self._setup_llm()
        self.retriever = retriever
        self.qa_prompt_tmpl_str = ("Context information is below.\n"
                                   "---------------------\n"
                                   "{context}\n"
                                   "---------------------\n"
                                   "Given the context information above I want you to think step by step to answer the query in a crisp manner, in case you don't know the answer say 'I don't know!'.\n"
                                   "Query: {query}\n"
                                   "Answer: "
                                   )

    def _setup_llm(self):
        return SambaNovaCloud(
            model=self.llm_name,
            temperature=0.7,
            context_window=100000,
        )
        # return Ollama(model=self.llm_name,
        #               temperature=0.7,
        #               context_window=100000,
        #               )

    def generate_context(self, query):
        result = self.retriever.search(query)
        context = [dict(data) for data in result]
        combined_prompt = []

        for entry in context[:2]:
            context = entry["payload"]["context"]
            combined_prompt.append(context)

        return "\n\n---\n\n".join(combined_prompt)

    def query(self, query):
        context = self.generate_context(query=query)
        prompt = self.qa_prompt_tmpl_str.format(context=context, query=query)
        user_msg = ChatMessage(role=MessageRole.USER, content=prompt)
        # self.messages.append(ChatMessage(role=MessageRole.USER, content=prompt))
        streaming_response = self.llm.stream_complete(user_msg.content)

        return streaming_response

2.6 Audio Transcription

The Transcribe class is initialized with the AssemblyAI API key and creates a transcriber. It then processes the audio file with a configuration that enables speaker labels, ultimately returning a list of dictionaries where each entry maps a speaker to their transcribed text.

class Transcribe:
    def __init__(self, api_key: str):
        """Initialize the Transcribe class with AssemblyAI API key."""
        aai.settings.api_key = api_key
        self.transcriber = aai.Transcriber()

    def transcribe_audio(self, audio_path: str) -> List[Dict[str, str]]:
        """
        Transcribe an audio file and return speaker-labeled transcripts.

        Args:
            audio_path: Path to the audio file

        Returns:
            List of dictionaries containing speaker and text information
        """
        # Configure transcription with speaker labels
        config = aai.TranscriptionConfig(
            speaker_labels=True,
            speakers_expected=2  # Adjust this based on your needs
        )

        # Transcribe the audio
        transcript = self.transcriber.transcribe(audio_path, config=config)

        # Extract speaker utterances
        speaker_transcripts = []
        for utterance in transcript.utterances:
            speaker_transcripts.append({
                "speaker": f"Speaker {utterance.speaker}",
                "text": utterance.text
            })

        return speaker_transcripts

3. The Streamlit Application

Streamlit is a Python library that turns data scripts into interactive web apps, making it a great fit for LLM-based solutions.

  • The code below builds a user-friendly app that lets users upload an audio file, view its transcript, and chat with it.
  • AssemblyAI transcribes the uploaded audio into speaker-labeled text.
  • The transcript is embedded and stored in a Qdrant vector database for efficient retrieval.
  • A retriever paired with the RAG engine uses these embeddings to generate context-aware chat responses.
  • Session state manages the chat history and file caching to keep the experience smooth.
import os
import gc
import uuid
import tempfile
import base64
from dotenv import load_dotenv
from rag_code import Transcribe, EmbedData, QdrantVDB_QB, Retriever, RAG
import streamlit as st

if "id" not in st.session_state:
    st.session_state.id = uuid.uuid4()
    st.session_state.file_cache = {}

session_id = st.session_state.id
collection_name = "chat with audios"
batch_size = 32

load_dotenv()

def reset_chat():
    st.session_state.messages = []
    st.session_state.context = None
    gc.collect()

with st.sidebar:
    st.header("Add your audio file!")
    uploaded_file = st.file_uploader("Choose your audio file", type=["mp3", "wav", "m4a"])

    if uploaded_file:
        try:
            with tempfile.TemporaryDirectory() as temp_dir:
                file_path = os.path.join(temp_dir, uploaded_file.name)
                with open(file_path, "wb") as f:
                    f.write(uploaded_file.getvalue())

                file_key = f"{session_id}-{uploaded_file.name}"
                st.write("Transcribing with AssemblyAI and storing in vector database...")

                if file_key not in st.session_state.get('file_cache', {}):
                    # Initialize transcriber
                    transcriber = Transcribe(api_key=os.getenv("ASSEMBLYAI_API_KEY"))

                    # Get speaker-labeled transcripts
                    transcripts = transcriber.transcribe_audio(file_path)
                    st.session_state.transcripts = transcripts

                    # Each speaker segment becomes a separate document for embedding
                    documents = [f"{t['speaker']}: {t['text']}" for t in transcripts]

                    # embed data
                    embeddata = EmbedData(embed_model_name="BAAI/bge-large-en-v1.5", batch_size=batch_size)
                    embeddata.embed(documents)

                    # set up vector database
                    qdrant_vdb = QdrantVDB_QB(collection_name=collection_name,
                                              batch_size=batch_size,
                                              vector_dim=1024)
                    qdrant_vdb.define_client()
                    qdrant_vdb.create_collection()
                    qdrant_vdb.ingest_data(embeddata=embeddata)

                    # set up retriever
                    retriever = Retriever(vector_db=qdrant_vdb, embeddata=embeddata)

                    # set up rag
                    query_engine = RAG(retriever=retriever, llm_name="DeepSeek-R1-Distill-Llama-70B")

                    st.session_state.file_cache[file_key] = query_engine
                else:
                    query_engine = st.session_state.file_cache[file_key]

                # Inform the user that the file is processed
                st.success("Ready to Chat!")

                # Display audio player
                st.audio(uploaded_file)

                # Display speaker-labeled transcript
                st.subheader("Transcript")
                with st.expander("Show full transcript", expanded=True):
                    for t in st.session_state.transcripts:
                        st.text(f"**{t['speaker']}**: {t['text']}")
        except Exception as e:
            st.error(f"An error occurred: {e}")
            st.stop()

col1, col2 = st.columns([6, 1])

with col1:
    st.markdown("""
# RAG over Audio powered by <img src="data:image/png;base64,{}" width="200" style="vertical-align: -15px; padding-right: 10px;"> and <img src="data:image/png;base64,{}" width="200" style="vertical-align: -5px; padding-left: 10px;">
    """.format(base64.b64encode(open("assets/AssemblyAI.png", "rb").read()).decode(),
               base64.b64encode(open("assets/deep-seek.png", "rb").read()).decode()), unsafe_allow_html=True)

with col2:
    st.button("Clear ↺", on_click=reset_chat)

# Initialize chat history
if "messages" not in st.session_state:
    reset_chat()

# Display chat messages from history on app rerun
for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.markdown(message["content"])

# Accept user input
if prompt := st.chat_input("Ask about the audio conversation..."):
    # Add user message to chat history
    st.session_state.messages.append({"role": "user", "content": prompt})
    # Display user message in chat message container
    with st.chat_message("user"):
        st.markdown(prompt)

    # Display assistant response in chat message container
    with st.chat_message("assistant"):
        message_placeholder = st.empty()
        full_response = ""

        # Get streaming response
        streaming_response = query_engine.query(prompt)

        for chunk in streaming_response:
            try:
                new_text = chunk.raw["choices"][0]["delta"]["content"]
                full_response += new_text
                message_placeholder.markdown(full_response + "▌")
            except:
                pass

        message_placeholder.markdown(full_response)

    # Add assistant response to chat history
    st.session_state.messages.append({"role": "assistant", "content": full_response})

Run the app.py file from your terminal with the command below, then upload an audio file and interact with the chatbot.

streamlit run app.py

You can watch a demo of the application here. You can also download the sample audio file from here.

Conclusion

We successfully combined AssemblyAI, SambaNova Cloud, Qdrant, and DeepSeek-R1 to build a chatbot that uses Retrieval Augmented Generation over audio. The rag_code.py file manages the RAG workflow, while app.py provides a simple Streamlit interface. I hope you try this chatbot with different audio files, tweak the code, add new features, and explore the possibilities of audio-based chat solutions.

GitHub Repo:https://github.com/karthikponna/chat_with_audios/tree/main

  • Leveraging AssemblyAI for audio transcription yields accurate, speaker-labeled text, a solid foundation for advanced conversational experiences.
  • Integrating Qdrant ensures fast vector-based retrieval, giving quick access to relevant context for better-informed responses.
  • Applying the RAG approach combines retrieval and generation, keeping answers grounded in actual data.
  • Using SambaNova Cloud for the LLM provides strong language understanding that powers engaging, context-aware interactions.
  • Building the interface with Streamlit gives a straightforward interactive environment and simplifies deploying the audio-based chatbot.
