1. 开篇:我为什么需要这个方案

生图是"一发一收",几秒就回。但生视频是长任务 —— Sora、Veo3、可灵跑一条 720p 视频动辄一两分钟。如果你照着生图那套"同步等结果"的写法去接视频接口,会立刻撞上三个坑:

本文用一套真实的 AI 生视频 SaaS 后端(FastAPI + Celery),讲清楚**"提交任务 → 拿 task_id → 回调/轮询拿结果"**这条异步通路怎么落地,以及通过中转站接 4SAPI 这类聚合平台的 Sora/Veo 时有哪些专属坑。


2. 原理速览

视频生成是两段式异步

① 提交任务  你的应用 → 中转站 → 官方   返回 {task_id, status: queued}
② 拿结果    两条路二选一:
   (A) 回调:平台生成完 POST 推到你的 callback_url   ← 火山/可灵
   (B) 轮询:你定时 GET /videos/{task_id} 查状态     ← Sora/Veo
③ 转存     status=completed → 把视频 URL 转存到自己的 OSS

关键决策:平台支不支持回调,决定了你走 (A) 还是 (B)。这套代码就是按平台分流的。


3. 接入实战(参考 4SAPI 异步接口)

① 提交任务,统一拿 task_id

不同平台返回字段不一样(Sora/Veo 是 id,火山/可灵是 task_id),代码做了归一:

# ai_generation_tasks.py —— 提交后兼容两种字段名
result = service.create_video_task(prompt=prompt, duration=duration, ratio=ratio, ...)
task_id = result.get("task_id") or result.get("id")
history.platform_task_id = task_id
history.status = "processing"   # 等回调或轮询

② 按平台分流:回调 or 轮询

if platform in ["doubao", "kling"]:
    # 支持回调,什么都不用做,等平台 POST 推过来
    logger.info("视频任务已提交,等待回调")
else:
    # sora/veo 不支持回调 → 启动轮询任务,30 秒后开始查
    poll_video_status.apply_async(args=[history_id, platform, task_id], countdown=30)

③ 轮询:用 Celery 的 retry 当定时器

轮询不是写个 while True + sleep,而是用 Celery 任务的 max_retries + default_retry_delay 当节拍器 —— 每 30 秒一次,最多 60 次(即最长等 30 分钟)

@celery_app.task(bind=True, max_retries=60, default_retry_delay=30)
def poll_video_status(self, history_id, platform, platform_task_id):
    result = service.query_video_task(platform_task_id)
    status = result.get("status", "")

    if status in ["succeeded", "completed"]:
        ...                       # 拿到 video_url,进入转存
    elif status in ["queued", "running", "processing", "in_progress"]:
        raise self.retry()        # 还没好,30 秒后再查
    elif status == "failed":
        ...                       # 失败处理(见下)

④ 避坑一:临时失败别误杀

视频跑挂了,不一定是真挂 —— 可能只是上游临时过载/限流。代码维护了一张"可重试关键词表",命中就继续轮询,不退款:

RETRYABLE_POLL_ERROR_KEYWORDS = [
    "reCAPTCHA", "PERMISSION_DENIED", "temporary",
    "overloaded", "rate limit", "服务繁忙", "请稍后再试",
]
if is_retryable_poll_failure(error_message):
    raise self.retry(countdown=60)   # 临时错误,1 分钟后再试
# 否则才判真失败 + 退积分

⑤ 避坑二:查询请求自己抖动,别判任务失败

"查状态"这个请求本身可能 SSL EOF / 503,但上游视频还在好好排队。这时绝不能把生成任务判失败,而是让这次轮询重试:

except Exception as e:
    logger.warning(f"轮询任务异常,30s 后重试: {e}")
    db.rollback()
    raise self.retry(exc=e)   # 网络抖动不背锅

⑥ 避坑三:4sapi 的 Sora-2 不直接给 video_url

这是接中转站 Sora 时最隐蔽的坑 —— completed 了但响应里没有 video_url,要自己拼下载地址,而且下载还得带 Bearer Token:

# Sora2/4sapi: 完成后构造下载 URL,不是直接给链接
video_id = response.get("id", task_id)
result["video_url"] = f"{base_url}/v1/videos/{video_id}/content"

# 下载这种网关 URL 需要带 token,不能走第三方异步转存(它没你的 token)
headers = {"Authorization": f"Bearer {api_key}"}
resp = httpx.Client(timeout=300).get(video_url, headers=headers)

代码对这类"OpenAI 兼容视频网关"专门走本地下载 + 直传 OSS,绕开拿不到 token 的异步转存路径。


4. 成本与风险提示


5. 总结与系列导航

一句话总结:接长任务视频接口,记住三件事 —— 提交后只认 task_id;支持回调就等回调、不支持就用 Celery retry 当轮询节拍器;临时错误重试、查询抖动重试、真失败才退款。接 4sapi 的 Sora 还要额外处理"自己拼下载 URL + 带 token 下载"。

适用人群:要在应用里接 Sora / Veo3 / 可灵等异步视频模型的开发者。

中转站选型与异步接口文档可参考 4SAPI。你有更稳的轮询/回调方案,欢迎评论区交流。