Here is the full setup. Save the following dependency list as requirements.txt:
fastapi
uvicorn[standard]
python-dotenv
openai
google-api-python-client
google-auth
google-auth-oauthlib
google-auth-httplib2
import os
import re
import time
import json
import uuid
from pathlib import Path
from typing import Any, Dict, Optional, List
from dotenv import load_dotenv
from fastapi import FastAPI, Request, HTTPException
from openai import OpenAI
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request as GoogleRequest
load_dotenv()

# ---------- Config ----------
OTTER_WEBHOOK_TOKEN = os.getenv("OTTER_WEBHOOK_TOKEN", "")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
OPENAI_TEXT_MODEL = os.getenv("OPENAI_TEXT_MODEL", "gpt-4.1")
OPENAI_VIDEO_MODEL = os.getenv("OPENAI_VIDEO_MODEL", "sora-2")  # or sora-2-pro
VIDEO_SECONDS = os.getenv("VIDEO_SECONDS", "12")  # string is OK per API examples
# NOTE: must be ASCII "x" (e.g. "1280x720") — the original default used the
# Unicode multiplication sign "×", which the API would reject.
VIDEO_SIZE = os.getenv("VIDEO_SIZE", "1280x720")
OUT_DIR = Path(os.getenv("OUT_DIR", "out")).resolve()
OUT_DIR.mkdir(parents=True, exist_ok=True)

# YouTube OAuth
YOUTUBE_CLIENT_SECRETS = os.getenv("YOUTUBE_CLIENT_SECRETS", "client_secrets.json")
YOUTUBE_TOKEN_FILE = os.getenv("YOUTUBE_TOKEN_FILE", "youtube_token.json")
YOUTUBE_PRIVACY = os.getenv("YOUTUBE_PRIVACY", "private")  # private|unlisted|public
YOUTUBE_CATEGORY_ID = os.getenv("YOUTUBE_CATEGORY_ID", "22")  # 22 = People & Blogs (example)
YOUTUBE_TAGS = os.getenv("YOUTUBE_TAGS", "ai,summary,autovideo")
SCOPES = ["https://www.googleapis.com/auth/youtube.upload"]

# Fail fast at startup rather than on the first request.
if not OPENAI_API_KEY:
    raise RuntimeError("Missing OPENAI_API_KEY")

app = FastAPI()
openai_client = OpenAI()
# ---------- Helpers ----------
def extract_otter_transcript(payload: Dict[str, Any]) -> str:
    """
    Extract the transcript text from an Otter webhook payload.

    Otter Workspace Webhooks place the transcript at
    payload["relationships"]["transcript"]["content"]; if that path is
    missing or empty, fall back to a top-level "transcript" key.

    Returns the stripped transcript, or "" when none is present.
    """
    rel = payload.get("relationships", {}) or {}
    t = rel.get("transcript", {}) or {}
    content = t.get("content", "") or ""
    if not content:
        # Fallback if the payload structure differs from the documented shape.
        content = payload.get("transcript", "") or ""
    return content.strip()
def extract_otter_abstract_summary(payload: Dict[str, Any]) -> str:
    """Return Otter's "abstract_summary" field, stripped, or "" when absent/None."""
    return (payload.get("abstract_summary") or "").strip()
def sanitize_for_video_prompt(text: str) -> str:
    """
    Mask obvious personal/contact info (emails, phone numbers, URLs) before
    the text is fed to a video-generation prompt.

    Lightweight heuristics; extend as needed.
    """
    # Escape the dot before the TLD — the original pattern's bare "." matched
    # any character, so "a@b_c" would also have been masked.
    text = re.sub(r"\b[\w.-]+@[\w.-]+\.\w+\b", "[email]", text)
    # The original pattern started the group with a bare "+?", which is an
    # invalid regex (quantifier with nothing to repeat); escape the plus.
    # "-" is escaped inside the class to avoid an accidental range.
    text = re.sub(r"\+?\d[\d\-() ]{7,}\d", "[phone]", text)
    text = re.sub(r"https?://\S+", "[url]", text)
    return text
def chunk_text(s: str, max_chars: int = 12000) -> List[str]:
    """
    Split *s* into chunks of at most *max_chars* characters.

    Prefers to break at a newline when one falls reasonably deep into the
    chunk (more than 2000 chars in), so chunks end on natural boundaries.
    Each chunk is stripped; empty chunks are dropped from the result.
    """
    s = s.strip()
    if len(s) <= max_chars:
        return [s]
    chunks: List[str] = []
    start = 0
    while start < len(s):
        end = min(len(s), start + max_chars)
        # Try to split on a newline boundary, but only when it leaves a
        # usefully large chunk behind.
        nl = s.rfind("\n", start, end)
        if nl != -1 and nl > start + 2000:
            end = nl
        chunks.append(s[start:end].strip())
        start = end
    return [c for c in chunks if c]
def summarize_transcript(transcript: str, otter_summary: Optional[str] = None) -> str:
    """
    Produce a standardized summary (5 bullets + 1 paragraph) of a transcript.

    If Otter already provided an abstract summary, refine/standardize it,
    optionally drawing on the first 6000 chars of the transcript. Otherwise,
    summarize the transcript chunk-by-chunk and combine the partials.
    Both inputs are sanitized of contact info first.
    """
    transcript = sanitize_for_video_prompt(transcript)
    if otter_summary:
        base = sanitize_for_video_prompt(otter_summary)
        prompt = (
            "Refine this meeting summary into:\n"
            "1) a 5-bullet key points list\n"
            "2) a 1-paragraph plain-English summary\n"
            "Avoid names of real people; use roles like 'Speaker A'.\n\n"
            f"Otter summary:\n{base}\n\n"
            "If helpful, you may incorporate details from the transcript below:\n"
            f"{transcript[:6000]}"
        )
        resp = openai_client.responses.create(
            model=OPENAI_TEXT_MODEL,
            input=prompt,
        )
        return resp.output_text.strip()

    # No Otter summary; do chunked summarization, then combine the partials.
    parts = chunk_text(transcript)
    partials = []
    for i, part in enumerate(parts, start=1):
        resp = openai_client.responses.create(
            model=OPENAI_TEXT_MODEL,
            input=(
                "Summarize this transcript chunk into 5-10 bullet points. "
                "No real names; use roles like 'Speaker A'.\n\n"
                f"Chunk {i}/{len(parts)}:\n{part}"
            ),
        )
        partials.append(resp.output_text.strip())
    resp2 = openai_client.responses.create(
        model=OPENAI_TEXT_MODEL,
        input=(
            "Combine these chunk summaries into:\n"
            "1) a 5-bullet key points list\n"
            "2) a 1-paragraph plain-English summary\n"
            "No real names; use roles.\n\n"
            + "\n\n".join(partials)
        ),
    )
    return resp2.output_text.strip()
def build_sora_prompt(summary_text: str) -> str:
    """
    Turn a meeting summary into a single Sora-safe video prompt.

    "Safe" here means: abstract/animated visuals only, no real people or
    public figures, no copyrighted characters, logos, or recognizable music.
    The summary is sanitized of contact info before being sent to the LLM.
    """
    summary_text = sanitize_for_video_prompt(summary_text)
    # Use an LLM to turn the summary into a tight shot-list prompt for Sora.
    resp = openai_client.responses.create(
        model=OPENAI_TEXT_MODEL,
        input=(
            "Create a SINGLE prompt for a short video generator.\n"
            "Rules:\n"
            "- Do NOT depict real people or public figures.\n"
            "- Use abstract/animated visuals, icons, silhouettes, text overlays.\n"
            "- No copyrighted characters, logos, or recognizable music.\n"
            "- Make it look like a modern motion-graphics explainer.\n"
            "- Include camera/scene directions and on-screen text suggestions.\n"
            "- Keep it under 1200 characters.\n\n"
            "Source summary:\n"
            f"{summary_text}"
        ),
    )
    return resp.output_text.strip()
def generate_video_with_sora(prompt: str, out_path: Path) -> None:
    """
    Create a Sora video job, poll until it finishes, then download the MP4.

    Blocks (polling every 5 s) until the job leaves the queued/in-progress
    states. Raises RuntimeError when the final status is not "completed",
    surfacing the API's error message when one is available.
    """
    video = openai_client.videos.create(
        model=OPENAI_VIDEO_MODEL,
        prompt=prompt,
        seconds=VIDEO_SECONDS,
        size=VIDEO_SIZE,
    )
    while video.status in ("queued", "in_progress"):
        time.sleep(5)
        video = openai_client.videos.retrieve(video.id)
    if video.status != "completed":
        # Prefer the API-provided error message when present.
        msg = getattr(getattr(video, "error", None), "message", "Video generation failed")
        raise RuntimeError(msg)
    content = openai_client.videos.download_content(video.id, variant="video")
    content.write_to_file(str(out_path))
def youtube_credentials() -> Credentials:
    """
    Return usable YouTube OAuth credentials.

    Loads cached credentials from YOUTUBE_TOKEN_FILE when present; if they
    are invalid, refreshes them (when a refresh token exists) or runs the
    local-server OAuth flow, then persists the result back to the token file.
    """
    cached: Optional[Credentials] = None
    if os.path.exists(YOUTUBE_TOKEN_FILE):
        cached = Credentials.from_authorized_user_file(YOUTUBE_TOKEN_FILE, SCOPES)
    if cached and cached.valid:
        return cached
    if cached and cached.expired and cached.refresh_token:
        cached.refresh(GoogleRequest())
    else:
        # No (refreshable) token on disk: run the interactive OAuth flow.
        flow = InstalledAppFlow.from_client_secrets_file(YOUTUBE_CLIENT_SECRETS, SCOPES)
        cached = flow.run_local_server(port=0)
    # Persist the (new or refreshed) credentials for next time.
    with open(YOUTUBE_TOKEN_FILE, "w", encoding="utf-8") as token_file:
        token_file.write(cached.to_json())
    return cached
def upload_to_youtube(video_file: Path, title: str, description: str) -> str:
    """
    Upload *video_file* to YouTube via a resumable insert.

    Title is truncated to 95 chars (YouTube caps titles at 100). Tags,
    category, and privacy come from module-level config. Returns the new
    YouTube video ID, or "" if the API response has none.
    """
    creds = youtube_credentials()
    youtube = build("youtube", "v3", credentials=creds)
    body = {
        "snippet": {
            "title": title[:95],
            "description": description,
            "tags": [t.strip() for t in YOUTUBE_TAGS.split(",") if t.strip()],
            "categoryId": YOUTUBE_CATEGORY_ID,
        },
        "status": {"privacyStatus": YOUTUBE_PRIVACY},
    }
    media = MediaFileUpload(str(video_file), chunksize=-1, resumable=True, mimetype="video/mp4")
    request = youtube.videos().insert(part="snippet,status", body=body, media_body=media)
    response = None
    while response is None:
        # next_chunk() returns (status, None) until the upload completes.
        status, response = request.next_chunk()
        if status:
            print(f"YouTube upload progress: {int(status.progress() * 100)}%")
    return response.get("id", "")
def make_youtube_metadata(payload: Dict[str, Any], summary_text: str) -> Dict[str, str]:
    """
    Build the YouTube title and description for a summary video.

    Uses the webhook payload's "title" (falling back to a generic one) and
    embeds the summary text in the description. Returns {"title", "description"}.
    """
    meeting_title = (payload.get("title") or "Otter Summary Video").strip()
    # Simple formatting; customize as needed.
    title = f"{meeting_title} (Auto Summary)"
    description = (
        "Auto-generated summary video from an Otter transcript.\n\n"
        "Summary:\n"
        f"{summary_text}\n"
    )
    return {"title": title, "description": description}
def pipeline(payload: Dict[str, Any]) -> Dict[str, Any]:
    """
    Run the full flow: transcript -> summary -> Sora prompt -> video -> YouTube.

    Raises ValueError when the payload contains no transcript. Returns a dict
    with the job id, local video path, YouTube video id, summary, and prompt.
    """
    transcript = extract_otter_transcript(payload)
    if not transcript:
        raise ValueError("No transcript found in webhook payload")
    otter_abs = extract_otter_abstract_summary(payload)
    summary = summarize_transcript(transcript, otter_summary=otter_abs)
    sora_prompt = build_sora_prompt(summary)
    # Use the payload's id when available so re-deliveries reuse a filename.
    job_id = payload.get("id") or str(uuid.uuid4())
    out_mp4 = OUT_DIR / f"{job_id}.mp4"
    generate_video_with_sora(sora_prompt, out_mp4)
    meta = make_youtube_metadata(payload, summary)
    yt_id = upload_to_youtube(out_mp4, meta["title"], meta["description"])
    return {
        "job_id": job_id,
        "video_file": str(out_mp4),
        "youtube_video_id": yt_id,
        "summary": summary,
        "sora_prompt": sora_prompt,
    }
# ---------- Webhook Endpoint ----------
@app.post("/otter/webhook/{token}")
async def otter_webhook(token: str, request: Request):
    """
    Receive an Otter webhook and run the summary-video pipeline.

    Auth is a shared-secret path segment (Otter's docs don't describe a
    signature header); requests with a missing/wrong token get a 401.
    Pipeline failures are reported in the response body as {"ok": False, ...}.
    """
    if not OTTER_WEBHOOK_TOKEN or token != OTTER_WEBHOOK_TOKEN:
        raise HTTPException(status_code=401, detail="Unauthorized")
    payload = await request.json()
    try:
        result = pipeline(payload)
        return {"ok": True, "result": result}
    except Exception as e:
        # Boundary handler: surface the error to the caller instead of a 500.
        return {"ok": False, "error": str(e)}
To run, install dependencies and set the required environment variables, then start the server:

pip install -r requirements.txt
export OPENAI_API_KEY="..."
export OTTER_WEBHOOK_TOKEN="choose-a-long-random-string"
export YOUTUBE_CLIENT_SECRETS="client_secrets.json"
uvicorn app:app --host 0.0.0.0 --port 8000

Finally, configure Otter to deliver webhooks to:
https://YOUR_PUBLIC_HTTPS_DOMAIN/otter/webhook/<OTTER_WEBHOOK_TOKEN>