littlebot
Published on 2025-04-14 / 0 Visits
0

【源码】基于FFmpeg和FunctionGraph的视频处理服务

项目简介

本项目是一个基于FFmpeg和华为云FunctionGraph的视频处理服务,能帮助用户快速将FFmpeg部署到函数工作流上,提供部分音视频处理能力。

项目的主要特性和功能

  • 音频转码:支持将音频文件转换为不同的格式。
  • 视频截图:从视频中提取指定时间点的截图。
  • 截帧:从视频中提取指定时间段的帧。
  • 时长获取:获取视频的总时长。
  • 元信息获取:获取音视频文件的元信息。
  • 视频转GIF:将视频片段转换为GIF动画。
  • 视频加水印:为视频添加文字或图片水印。

安装使用步骤

前期准备

  • 开通华为云函数工作流(FunctionGraph)服务,用于提供FFmpeg的算力。
  • 开通华为云对象存储服务(OBS),用于存储输入和输出媒体文件。
  • 为函数配置具备访问OBS和SWR服务的委托。

应用部署

  1. 进入华为云函数工作流应用中心,选择“FFmpeg”模板进行创建。
  2. 配置参数,包括委托和函数参数。
  3. 等待创建成功。

应用使用

  1. 获取函数URN:在应用详情页面下方的“资源”中获取函数的URN,用于调用函数。
  2. 创建OBS桶并上传输入文件
  3. 获取AK/SK。
  4. 在华为云OBS服务控制台创建一个OBS桶。
  5. 将输入媒体文件上传到OBS桶中。
  6. 调用函数
  7. 使用Python SDK或REST API方式调用函数。
  8. 提供必要的参数,如OBS bucket名称、对象键等。
  9. 同步或异步获取处理结果。

调用函数示例

同步执行示例

以获取视频时长为例,Python语言的调用示例如下: ```python from huaweicloudsdkcore.auth.credentials import BasicCredentials from huaweicloudsdkfunctiongraph.v2.region.functiongraph_region import FunctionGraphRegion from huaweicloudsdkcore.exceptions import exceptions from huaweicloudsdkfunctiongraph.v2 import *

ak = "xxxxxx" # 替换为你的AccessKeyID sk = "xxxxxxxxxx" # 替换为你的SecretAccessKey credentials = BasicCredentials(ak, sk)

client = FunctionGraphClient.new_builder() \ .with_credentials(credentials) \ .with_region(FunctionGraphRegion.value_of("cn-east-3")) \ .build()

try: request = InvokeFunctionRequest() request.function_urn = "urn:fss:cn-east-3:xxxxxxxx:function:default:fg-ffmpeg-get-duration:latest" # 替换为你的函数URN request.body = { "bucket_name": "test-bucket", "object_key": "a.mp4" } response = client.invoke_function(request) print(response) except exceptions.ClientRequestException as e: print(e.status_code) print(e.request_id) print(e.error_code) print(e.error_msg) ```

异步调用示例

以音频转码为例,Python语言的异步调用示例如下: ```python from huaweicloudsdkcore.auth.credentials import BasicCredentials from huaweicloudsdkfunctiongraph.v2.region.functiongraph_region import FunctionGraphRegion from huaweicloudsdkcore.exceptions import exceptions from huaweicloudsdkfunctiongraph.v2 import *

ak = "xxxxxx" # 替换为你的AccessKeyID sk = "xxxxxxxxxx" # 替换为你的SecretAccessKey credentials = BasicCredentials(ak, sk)

client = FunctionGraphClient.new_builder() \ .with_credentials(credentials) \ .with_region(FunctionGraphRegion.value_of("cn-east-3")) \ .build()

try: request = AsyncInvokeFunctionRequest() request.function_urn = "urn:fss:cn-east-3:xxxxxxxx:function:default:ffmpeg-audio-convert:latest" # 替换为你的函数URN request.body = { "bucket_name": "test-bucket", "object_key": "a.mp3", "output_dir": "output/", "dst_type": ".wav", "ac": 1, "ar": 4000 } response = client.async_invoke_function(request) print(response) except exceptions.ClientRequestException as e: print(e.status_code) print(e.request_id) print(e.error_code) print(e.error_msg) ```

注意事项

  • 本应用为简单示例,仅供学习和参考,实际应用需进一步优化和完善。
  • 部署和使用前请仔细阅读文档,确保满足系统要求。
  • 调用函数时请确保参数正确,避免不必要的错误。
  • 监控函数执行状态,确保服务稳定运行。

下载地址

点击下载 【提取码: 4003】【解压密码: www.makuang.net】