Pipecat 的 Asterisk PBX websocket 传输是一个使用 FastAPI websockets 的新实现,现在已经作为 PyPI 包可用。
它支持所有slin编解码器的格式(8-192 kHz),并且能自动检测采样率,自动检测用于信令的websocket子协议,并提供流量控制。
用于 Asterisk 的 Pipecat 社区集成。 此仓库提供传输方式和帧序列化器,用于将用户的 Asterisk 与 Pipecat 管道连接。
特性:
AsteriskWebsocketTransport: 使用 Asterisk 本地处理原始音频流和生命周期事件。
AsteriskFrameSerializer:序列化器,用于将 Asterisk websocket JSON 或纯文本负载以及原始(音频)负载转换为 Pipecat 帧。
流量控制: 内置逻辑来管理 Pipecat 应用程序和 Asterisk 之间的缓冲区利用率。
项目地址:
https://github.com/NikolayShakin/pipecat-asterisk
安装:
uv add pipecat-asterisk
(或者pip install pipecat-asterisk如果您将其pip用于依赖管理,则可以使用此功能)
用法
以下是一个将 Asterisk WebSocket 传输集成到 Pipecat 管道中的基本示例:
from fastapi import FastAPI, WebSocket
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask
from pipecat_asterisk import AsteriskWebsocketTransport
app = FastAPI()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
# Initialize the Asterisk Transport
ws_transport = AsteriskWebsocketTransport(websocket=websocket)
# Build your Pipecat pipeline
pipeline = Pipeline([
ws_transport.input(),
# ... other pipeline components (VAD, LLM, TTS, etc.)
ws_transport.output(),
])
task = PipelineTask(pipeline)
# Run the pipeline
# ...
本文来自作者投稿,版权归原作者所有。如需转载,请注明出处:https://www.nxrte.com/zixun/66607.html