发送和接收媒体流
本文介绍如何集成服务端 Python SDK,通过少量代码从 0 开始实现发送和接收媒体流。
前提条件
在实现功能以前,请按照以下要求准备开发环境:
-
已下载 PyCharm 或 Visual Studio Code
-
Python 3.10 或以上版本
-
服务器接入公网,有公网 IP。服务器允许访问
.agora.io
以及.agoralab.co
域名。 -
操作系统与硬件环境需满足下列要求:
开发平台 操作系统版本 CPU 架构 设备性能 Linux - Ubuntu 18.04 LTS 或以上
- CentOS 7.0 或以上
x86-64 - CPU:8 核,1.8 GHz 主频
- 内存:2 GB 或以上,推荐 4 GB 或以上
-
一个有效的声网账号以及声网项目。请参考开通服务从声网控制台获得以下信息:
- App ID:声网随机生成的字符串,用于识别你的项目。
- 临时 Token:Token 也称为动态密钥,在客户端加入频道时对用户鉴权。临时 Token 的有效期为 24 小时。
集成 SDK
在终端中,进入你的项目根目录,然后运行以下命令集成 SDK:
pip install agora_python_server_sdk
实现步骤
本小节介绍如何实现服务端发送和接收媒体流,你可以按照实现步骤了解核心 API 调用。下图展示了基本的实现流程:
初始化 SDK
在调用其他声网 API 前,需要创建并初始化 AgoraService
实例:
- 调用
AgoraService()
来创建AgoraService
实例。 - 调用
initialize
进行初始化。
agora_service = AgoraService()
config = AgoraServiceConfig()
config.audio_scenario = AudioScenarioType.AUDIO_SCENARIO_CHORUS
config.appid = sample_options.app_id
agora_service.initialize(config)
与频道建立连接
初始化后,按照以下步骤与声网 RTC 频道建立连接:
-
调用
create_rtc_connection
方法创建RTCConnection
对象,用于与声网服务器建立连接。注意如需创建多个 Connection,声网推荐使用多进程的方式,以避免内存泄露等问题。
Pythoncon_config = RTCConnConfig(
client_role_type=ClientRoleType.CLIENT_ROLE_BROADCASTER,
channel_profile=ChannelProfileType.CHANNEL_PROFILE_LIVE_BROADCASTING,
)
connection = agora_service.create_rtc_connection(con_config)
if not connection:
logger.error("create connection failed")
return -
调用
register_observer
方法注册连接事件观测器。示例代码中的DYSConnectionObserver
继承了IRTCConnectionObserver
类。Python# DYSConnectionObserver 继承自 IRTCConnectionObserver 类
conn_observer = DYSConnectionObserver()
connection.register_observer(conn_observer) -
调用
connect
与声网 RTC 频道建立连接。Pythonret = connection.connect(sample_options.token, channel_id, uid)
if ret < 0:
logger.error(f"connect failed: {ret}")
return
发送媒体流
成功建立连接后,参考以下步骤向客户端发送媒体流:
-
创建
MediaNodeFactory
媒体节点工厂对象。Pythonmedia_node_factory = agora_service.create_media_node_factory()
if not media_node_factory:
logger.error("create media node factory failed")
return -
在
MediaNodeFactory
对象中,根据实际需求创建音视频数据发送器。AudioPcmDataSender
:发送 PCM 格式的音频数据。VideoFrameSender
:发送 YUV 格式的视频数据。AudioEncodedFrameSender
:发送已编码的音频数据。VideoEncodedImageSender
:发送已编码的视频数据。
Python# 创建可发送 PCM 格式的音频数据的发送器
pcm_data_sender = media_node_factory.create_audio_pcm_data_sender()
if not pcm_data_sender:
logger.error("create pcm data sender failed")
return
# 创建可发送已编码的音频数据的发送器
audio_sender = media_node_factory.create_audio_encoded_frame_sender()
if not audio_sender:
logger.error("create audio sender failed")
return
# 创建可发送 YUV 格式的视频数据的发送器
video_sender = media_node_factory.create_video_frame_sender()
if not video_sender:
logger.error("create video frame sender failed")
return
# 创建可发送已编码的视频数据的发送器
video_sender = media_node_factory.create_video_encoded_image_sender()
if not video_sender:
logger.error("create video sender failed")
return -
使用步骤 2 中创建好的音视频数据发送器,创建
LocalAudioTrack
对象和LocalVideoTrack
对象,分别对应本地音频轨道和本地视频轨道。Python# 创建自定义音频轨道(使用可发送 PCM 格式的音频数据的发送器)
audio_track = agora_service.create_custom_audio_track_pcm(pcm_data_sender)
if not audio_track:
logger.error("create audio track failed")
return
# 创建自定义音频轨道(使用可发送已编码的音频数据的发送器)
audio_track = agora_service.create_custom_audio_track_encoded(audio_sender, 1)
if not audio_track:
logger.error("create audio track failed")
return
# 创建自定义视频轨道(可发送 YUV 格式的视频数据的发送器)
video_track = agora_service.create_custom_video_track_frame(video_sender)
if not video_track:
logger.error("create video track failed")
return
# 创建自定义视频轨道(可发送已编码的视频数据的发送器)
video_track = agora_service.create_custom_video_track_encoded(video_sender, sender_options)
if not video_track:
logger.error("create video track failed")
return -
调用
set_enabled
方法分别开启本地音频轨道和视频轨道。Python# 开启本地音频轨道
audio_track.set_enabled(1)
# 开启本地视频轨道
video_track.set_enabled(1) -
调用
publish_audio
和publish_video
方法在频道中分别发布本地音频轨道和视频轨道。Python# 发布本地音频轨道
local_user.publish_audio(audio_track)
# 发布本地视频轨道
local_user.publish_video(video_track) -
开启音视频发送线程。
Pythonasync def send(self,sample_options:ExampleOptions, pcm_data_sender, yuv_data_sender):
# 创建并启动音频发送任务
pcm_task = asyncio.create_task(push_pcm_data_from_file(sample_options.sample_rate, sample_options.num_of_channels, pcm_data_sender, sample_options.audio_file, self._exit))
# 创建并启动视频发送任务
yuv_task = asyncio.create_task(push_yuv_data_from_file(sample_options.width, sample_options.height, sample_options.fps, yuv_data_sender, sample_options.video_file, self._exit))
# 等待音频发送任务完成
await pcm_task
# 等待视频发送任务完成
await yuv_task
logger.info("send finish") -
发送音视频数据。本节以发送 PCM 音频数据和 YUV 视频数据为例,示例代码如下:
Python# 发送 PCM 音频数据
async def push_pcm_data_from_file(sample_rate, num_of_channels , pcm_data_sender:AudioPcmDataSender, audio_file_path, _exit:Event):
with open(audio_file_path, "rb") as audio_file:
pcm_sendinterval = 0.1
pacer_pcm = Pacer(pcm_sendinterval)
pcm_count = 0
send_size = int(sample_rate*num_of_channels*pcm_sendinterval*2)
frame_buf = bytearray(send_size)
while not _exit.is_set():
success = audio_file.readinto(frame_buf)
if not success:
audio_file.seek(0)
continue
frame = PcmAudioFrame()
frame.data = frame_buf
frame.timestamp = 0
frame.samples_per_channel = int(sample_rate * pcm_sendinterval)
frame.bytes_per_sample = 2
frame.number_of_channels = num_of_channels
frame.sample_rate = sample_rate
ret = pcm_data_sender.send_audio_pcm_data(frame)
pcm_count += 1
logger.info(f"send pcm: count,ret={pcm_count}, {ret}, {send_size}, {pcm_sendinterval}")
await pacer_pcm.apace_interval(0.1)
frame_buf = NonePython# 发送 YUV 视频数据
async def push_yuv_data_from_file(width, height, fps, video_sender:VideoFrameSender, video_file_path, _exit:Event):
with open(video_file_path, "rb") as video_file:
yuv_sendinterval = 1.0/fps
pacer_yuv = Pacer(yuv_sendinterval)
yuv_count = 0
yuv_len = int(width*height*3/2)
frame_buf = bytearray(yuv_len)
while not _exit.is_set():
success = video_file.readinto(frame_buf)
if not success:
video_file.seek(0)
continue
frame = ExternalVideoFrame()
frame.buffer = frame_buf
frame.type = 1
frame.format = 1
frame.stride = width
frame.height = height
frame.timestamp = 0
frame.metadata = "hello metadata"
ret = video_sender.send_video_frame(frame)
yuv_count += 1
logger.info("send yuv: count,ret=%d, %s", yuv_count, ret)
await pacer_yuv.apace_interval(yuv_sendinterval)
frame_buf = None
接收媒体流
成功建立连接后,参考以下步骤从客户端接收媒体流:
-
根据实际需求,调用
register_audio_frame_observer
和register_video_frame_observer
方法注册音视频数据观测器,并监听相关回调。支持以下几种数据观测器,示例代码以注册IAudioFrameObserver
和IVideoFrameObserver
观测器为例。IAudioFrameObserver
:音频数据观测器。IVideoFrameObserver
:原始视频数据观测器。IVideoEncodedImageReceiver
:已编码视频数据观测器。
Python# 在 register_audio_frame_observer 方法之前调用 set_playback_audio_frame_before_mixing_parameters 方法,设置音频数据的采样率
local_user.set_playback_audio_frame_before_mixing_parameters(1, 16000)
# SampleAudioFrameObserver 类继承了 IAudioFrameObserver
audio_frame_observer = SampleAudioFrameObserver()
local_user.register_audio_frame_observer(audio_frame_observer)
# SampleVideoFrameObserver 类继承了 IVideoFrameObserver
video_frame_observer = SampleVideoFrameObserver()
local_user.register_video_frame_observer(video_frame_observer) -
当客户端开始发送媒体流时,SDK 会通过相关回调接收媒体流。以接收已编码的视频、YUV 格式的视频和 PCM 格式的音频为例,示例代码如下:
Python# 通过 on_encoded_video_frame 回调接收已编码的视频并以文件形式输出
class SampleVideoEncodedFrameObserver(IVideoEncodedFrameObserver):
def on_encoded_video_frame(self, uid, image_buffer, length, video_encoded_frame_info):
file_path = os.path.join(log_folder, str(uid) + '.h264')
with open(file_path, 'ab') as f:
f.write(image_buffer[:length])
return 1
# 通过 on_frame 回调接收 YUV 格式的视频并以文件形式输出
class SampleVideoFrameObserver(IVideoFrameObserver):
def on_frame(self, channel_id, remote_uid, frame:VideoFrame):
file_path = os.path.join(log_folder, channel_id + "_" + remote_uid + '.yuv')
y_size = frame.y_stride * frame.height
uv_size = (frame.u_stride * frame.height # 2)
with open(file_path, 'ab') as f:
f.write(frame.y_buffer[:y_size])
f.write(frame.u_buffer[:uv_size])
f.write(frame.v_buffer[:uv_size])
return 1
# 通过 on_playback_audio_frame_before_mixing 回调接收 PCM 格式的音频并以文件形式输出
class SampleAudioFrameObserver(IAudioFrameObserver):
def on_playback_audio_frame_before_mixing(self, agora_local_user, channelId, uid, audio_frame:AudioFrame):
file_path = os.path.join(log_folder, channelId + "_" + uid + '.pcm')
with open(file_path, "ab") as f:
f.write(audio_frame.buffer)
return 1
停止发送和接收媒体流
如果你不再需要发送和接收媒体流,你可以调用以下方法停止发送和接收媒体流。
停止发送媒体流
-
调用
unpublish_audio
和unpublish_video
方法停止发布音视频轨道。Pythonlocal_user.unpublish_audio(audio_track)
local_user.unpublish_video(video_track) -
调用
set_enabled
方法分别关闭本地音频轨道和视频轨道。Pythonlocal_user.unpublish_audio(audio_track)
local_user.unpublish_video(video_track)
audio_track.set_enabled(0)
video_track.set_enabled(0) -
调用
release
方法依次销毁本地音视频轨道、音视频数据发送器、媒体节点工厂对象。Python# 销毁 PCM 音频数据发送器
pcm_data_sender.release()
# 销毁本地音频轨道
audio_track.release()
# 销毁 YUV 视频数据发送器
yuv_data_sender.release()
# 销毁本地视频轨道
video_track.release()
# 销毁媒体节点工厂对象
media_node_factory.release()
pcm_data_sender = None
audio_track = None
yuv_data_sender = None
video_track = None
media_node_factory = None
video_frame_observer = None
audio_frame_observer = None
停止接收媒体流
调用 unregister_audio_frame_observer
和 unregister_video_frame_observer
方法注销音视频数据观测器。
local_user.unregister_audio_frame_observer(audio_frame_observer)
local_user.unregister_video_frame_observer(video_frame_observer)
与频道断开连接
如果你不再需要对频道进行任何操作,或者不再需要接收任何频道的事件,按照以下步骤与频道断开连接:
你必须严格按照步骤释放相关资源。
-
调用
disconnect
方法断开与声网服务器的连接。Pythonret = connection.disconnect()
if ret < 0:
logger.error(f"disconnect failed: {ret}")
return -
调用
unregister_observer
方法注销连接事件观测器。Pythonlocal_user.unregister_local_user_observer()
connection.unregister_observer() -
调用
release
方法释放RTCConnection
对象。Pythonconnection.release()
释放 SDK 资源
当你退出服务端程序时,为避免资源浪费,调用 release
方法注销 AgoraService
对象,以释放 SDK 的资源。
agora_service.release()
调试项目
按照以下步骤来测试你的项目:
-
在终端运行你的 Python 文件。你需要填写以下参数。
appId
: 你声网项目的 App ID。channelId
: 你的频道名,本节以test_example
为例。userId
: 你的用户 ID,本节以6
为例。audioFile
: 音频数据文件的存放路径,本节以./test_data/demo.pcm
为例。为方便测试,声网提供了测试数据,你可以下载并解压至Agora-Python-Server-SDK
路径下使用。sampleRate
: 采样率,本节以16000
为例。numOfChannels
: 声道数,本节以1
为例。
Bash# 假设你的 Python 文件路径为 Desktop/videocall.py
python Desktop/videocall.py --appId=0a****************************99 --channelId=test_dys --userId=6 --audioFile=./test_data/demo.pcm --sampleRate=16000 --numOfChannels=1 -
打开声网实时互动 Web Demo,完成初始化设置,在浏览器中模拟一个客户端。App ID 需和你在步骤 2 中填入的 App ID 一致。
-
进入基础视频通话页面,在 Channel 填入与步骤 2 相同的频道名,依次点击 Create Client、Join Channel。
-
成功加入频道后,你可以开始发流。当频道中有发流用户时,Web Demo 会在 Step4 Subscribe & Play 下方的输入框中自动填充发流用户的用户 ID,点击 Subscribe & Play 按钮订阅发流用户。
由于 Web Demo 的限制,体验服务端发流时,你需要先在 Web 端完成加入频道,然后再在服务端运行项目发流;体验服务端收流时,你需要先在服务端运行项目连接频道,然后再在 Web 端加入频道、发流。
示例项目
声网提供了开源的服务端示例项目供你参考,你可以前往下载或查看其中的源代码。