import os
import shutil
import json
import logging
import asyncio
import time
from typing import Optional, Dict, List, Tuple, Literal
from moviepy import VideoFileClip, concatenate_videoclips
from PIL import Image
from agents import *
import yaml
from interfaces import *
from langchain.chat_models import init_chat_model
from tools.render_backend import RenderBackend
from utils.provider_presets import resolve_chat_model_config
class Script2VideoPipeline:
character_portrait_events = {}
shot_desc_events = {}
frame_events = {}
def __init__(
self,
chat_model: str,
image_generator,
video_generator,
working_dir: str,
):
self.chat_model = chat_model
self.image_generator = image_generator
self.video_generator = video_generator
self.character_extractor = CharacterExtractor(chat_model=self.chat_model)
self.character_portraits_generator = CharacterPortraitsGenerator(image_generator=self.image_generator)
self.storyboard_artist = StoryboardArtist(chat_model=self.chat_model)
self.camera_image_generator = CameraImageGenerator(chat_model=self.chat_model, image_generator=self.image_generator, video_generator=self.video_generator)
self.reference_image_selector = ReferenceImageSelector(chat_model=self.chat_model)
self.working_dir = working_dir
os.makedirs(self.working_dir, exist_ok=True)
@classmethod
def init_from_config(cls, config_path: str):
with open(config_path, "r") as f:
config = yaml.safe_load(f)
chat_model_args = resolve_chat_model_config(config["chat_model"]["init_args"])
chat_model = init_chat_model(**chat_model_args)
backend = RenderBackend.from_config(config)
return cls(
chat_model=chat_model,
image_generator=backend.image_generator,
video_generator=backend.video_generator,
working_dir=config["working_dir"],
)
async def __call__(
self,
script: str,
user_requirement: str,
style: str,
characters: List[CharacterInScene] = None,
character_portraits_registry: Optional[Dict[str, Dict[str, Dict[str, str]]]] = None,
):
if characters is None:
characters = await self.extract_characters(script=script)
if character_portraits_registry is None:
character_portraits_registry_path = os.path.join(self.working_dir, "character_portraits_registry.json")
if os.path.exists(character_portraits_registry_path):
with open(character_portraits_registry_path, "r", encoding="utf-8") as f:
character_portraits_registry = json.load(f)
print(f"🚀 Loaded {len(character_portraits_registry)} character portraits from existing file.")
else:
print(f"🔍 Generating character portraits...")
character_portraits_registry = await self.generate_character_portraits(
characters=characters,
character_portraits_registry=None,
style=style,
)
with open(character_portraits_registry_path, "w", encoding="utf-8") as f:
json.dump(character_portraits_registry, f, ensure_ascii=False, indent=4)
print(f"☑️ Generated {len(character_portraits_registry)} character portraits and saved to {character_portraits_registry_path}.")
storyboard = await self.design_storyboard(
script=script,
characters=characters,
user_requirement=user_requirement,
)
shot_descriptions = await self.decompose_visual_descriptions(
shot_brief_descriptions=storyboard,
characters=characters,
)
camera_tree = await self.construct_camera_tree(
shot_descriptions=shot_descriptions,
)
priority_shot_idxs = [camera.parent_cam_idx for camera in camera_tree if camera.parent_cam_idx is not None]
tasks = [
self.generate_frames_for_single_camera(
camera=camera,
shot_descriptions=shot_descriptions,
characters=characters,
character_portraits_registry=character_portraits_registry,
priority_shot_idxs=priority_shot_idxs,
)
for camera in camera_tree
]
video_tasks = [
self.generate_video_for_single_shot(
shot_description=shot_description,
)
for shot_description in shot_descriptions
]
tasks.extend(video_tasks)
await asyncio.gather(*tasks)
final_video_path = os.path.join(self.working_dir, "final_video.mp4")
if os.path.exists(final_video_path):
print(f"🚀 Skipped concatenating videos, already exists.")
else:
print(f"🎬 Starting concatenating videos...")
video_clips = [
VideoFileClip(os.path.join(self.working_dir, "shots", f"{shot_description.idx}", "video.mp4"))
for shot_description in shot_descriptions
]
final_video = concatenate_videoclips(video_clips)
final_video.write_videofile(final_video_path, codec="libx264", preset="medium")
print(f"☑️ Concatenated videos, saved to {final_video_path}.")
return final_video_path
async def generate_frames_for_single_camera(
self,
camera: Camera,
shot_descriptions: List[ShotDescription],
characters: List[CharacterInScene],
character_portraits_registry: Dict[str, Dict[str, Dict[str, str]]],
priority_shot_idxs: List[int],
):
first_shot_idx = camera.active_shot_idxs[0]
first_shot_ff_path = os.path.join(self.working_dir, "shots", f"{first_shot_idx}", "first_frame.png")
if os.path.exists(first_shot_ff_path):
print(f"🚀 Skipped generating first_frame for shot {first_shot_idx}, already exists.")
self.frame_events[first_shot_idx]["first_frame"].set()
else:
print(f"🖼️ Starting first_frame generation for shot {first_shot_idx}...")
available_image_path_and_text_pairs = []
for character_idx in shot_descriptions[first_shot_idx].ff_vis_char_idxs:
identifier_in_scene = characters[character_idx].identifier_in_scene
registry_item = character_portraits_registry[identifier_in_scene]
for view, item in registry_item.items():
available_image_path_and_text_pairs.append((item["path"], item["description"]))
if camera.parent_shot_idx is not None:
parent_shot_idx = camera.parent_shot_idx
await self.frame_events[parent_shot_idx]["first_frame"].wait()
parent_shot_ff_path = os.path.join(self.working_dir, "shots", f"{parent_shot_idx}", "first_frame.png")
transition_video_path = os.path.join(self.working_dir, "shots", f"{first_shot_idx}", f"transition_video_from_shot_{parent_shot_idx}.mp4")
if os.path.exists(transition_video_path):
print(f"🚀 Skipped generating transition video for shot {first_shot_idx} from shot {parent_shot_idx}, already exists.")
else:
print(f"🖼️ Starting transition video generation for shot {first_shot_idx} from shot {parent_shot_idx}...")
transition_video_output = await self.camera_image_generator.generate_transition_video(
first_shot_visual_desc=shot_descriptions[parent_shot_idx].visual_desc,
second_shot_visual_desc=shot_descriptions[first_shot_idx].visual_desc,
first_shot_ff_path=parent_shot_ff_path,
)
transition_video_output.save(transition_video_path)
print(f"☑️ Generated transition video for shot {first_shot_idx} from shot {parent_shot_idx}, saved to {transition_video_path}.")
new_camera_image_path = os.path.join(self.working_dir, "shots", f"{first_shot_idx}", f"new_camera_{camera.idx}.png")
if os.path.exists(new_camera_image_path):
print(f"🚀 Skipped generating new camera image for shot {first_shot_idx}, already exists.")
else:
print(f"🖼️ Starting new camera image generation for shot {first_shot_idx}...")
new_camera_image = self.camera_image_generator.get_new_camera_image(transition_video_path)
new_camera_image.save(new_camera_image_path)
print(f"☑️ Generated new camera image for shot {first_shot_idx} (not completed), saved to {new_camera_image_path}.")
available_image_path_and_text_pairs.append(
(
new_camera_image_path,
f"The composition and background are correct but some elements may be wrong. The wrong elements should be replaced.\nWrong elements: {camera.missing_info}.\nYou must select this image as the main reference and replace the characters in the image with the provided character portraits. Don't change the background."
)
)
if camera.parent_shot_idx is None or camera.missing_info is not None:
ff_selector_output_path = os.path.join(self.working_dir, "shots", f"{first_shot_idx}", "first_frame_selector_output.json")
if os.path.exists(ff_selector_output_path):
with open(ff_selector_output_path, 'r', encoding='utf-8') as f:
ff_selector_output = json.load(f)
print(f"🚀 Loaded existing reference image selection and prompt for first_frame of shot {first_shot_idx} from {ff_selector_output_path}.")
else:
print(f"🔍 Selecting reference images and generating prompt for first_frame of shot {first_shot_idx}...")
ff_selector_output = await self.reference_image_selector.select_reference_images_and_generate_prompt(
available_image_path_and_text_pairs=available_image_path_and_text_pairs,
frame_description=shot_descriptions[first_shot_idx].ff_desc
)
with open(ff_selector_output_path, 'w', encoding='utf-8') as f:
json.dump(ff_selector_output, f, ensure_ascii=False, indent=4)
print(f"☑️ Selected reference images and generated prompt for first_frame of shot {first_shot_idx}, saved to {ff_selector_output_path}.")
reference_image_path_and_text_pairs, prompt = ff_selector_output["reference_image_path_and_text_pairs"], ff_selector_output["text_prompt"]
prefix_prompt = ""
for i, (image_path, text) in enumerate(reference_image_path_and_text_pairs):
prefix_prompt += f"Image {i}: {text}\n"
prompt = f"{prefix_prompt}\n{prompt}"
reference_image_paths = [item[0] for item in reference_image_path_and_text_pairs]
ff_image: ImageOutput = await self.image_generator.generate_single_image(
prompt=prompt,
reference_image_paths=reference_image_paths,
size="1600x900",
)
ff_image.save(first_shot_ff_path)
self.frame_events[first_shot_idx]["first_frame"].set()
print(f"☑️ Generated first_frame for shot {first_shot_idx}, saved to {first_shot_ff_path}.")
else:
shutil.copy(new_camera_image_path, first_shot_ff_path)
self.frame_events[first_shot_idx]["first_frame"].set()
print(f"☑️ Generated first_frame for shot {first_shot_idx}, saved to {first_shot_ff_path}.")
priority_tasks = []
normal_tasks = []
if shot_descriptions[first_shot_idx].variation_type in ["medium", "large"]:
task = self.generate_frame_for_single_shot(
shot_idx=first_shot_idx,
frame_type="last_frame",
first_shot_ff_path_and_text_pair=(first_shot_ff_path, shot_descriptions[first_shot_idx].ff_desc),
frame_desc=shot_descriptions[first_shot_idx].lf_desc,
visible_characters=[characters[idx] for idx in shot_descriptions[first_shot_idx].lf_vis_char_idxs],
character_portraits_registry=character_portraits_registry,
)
normal_tasks.append(task)
for shot_idx in camera.active_shot_idxs[1:]:
first_frame_task = self.generate_frame_for_single_shot(
shot_idx=shot_idx,
frame_type="first_frame",
first_shot_ff_path_and_text_pair=(first_shot_ff_path, shot_descriptions[first_shot_idx].ff_desc),
frame_desc=shot_descriptions[shot_idx].ff_desc,
visible_characters=[characters[idx] for idx in shot_descriptions[shot_idx].ff_vis_char_idxs],
character_portraits_registry=character_portraits_registry,
)
if shot_idx in priority_shot_idxs:
priority_tasks.append(first_frame_task)
else:
normal_tasks.append(first_frame_task)
if shot_descriptions[shot_idx].variation_type in ["medium", "large"]:
last_frame_task = self.generate_frame_for_single_shot(
shot_idx=shot_idx,
frame_type="last_frame",
first_shot_ff_path_and_text_pair=(first_shot_ff_path, shot_descriptions[first_shot_idx].ff_desc),
frame_desc=shot_descriptions[shot_idx].lf_desc,
visible_characters=[characters[idx] for idx in shot_descriptions[shot_idx].lf_vis_char_idxs],
character_portraits_registry=character_portraits_registry,
)
normal_tasks.append(last_frame_task)
await asyncio.gather(*priority_tasks)
await asyncio.gather(*normal_tasks)
async def generate_video_for_single_shot(
self,
shot_description: ShotDescription,
):
video_path = os.path.join(self.working_dir, "shots", f"{shot_description.idx}", "video.mp4")
if os.path.exists(video_path):
print(f"🚀 Skipped generating video for shot {shot_description.idx}, already exists.")
else:
await self.frame_events[shot_description.idx]["first_frame"].wait()
if shot_description.variation_type in ["medium", "large"]:
await self.frame_events[shot_description.idx]["last_frame"].wait()
frame_paths = []
frame_paths.append(os.path.join(self.working_dir, "shots", f"{shot_description.idx}", "first_frame.png"))
if shot_description.variation_type in ["medium", "large"]:
frame_paths.append(os.path.join(self.working_dir, "shots", f"{shot_description.idx}", "last_frame.png"))
print(f"🎬 Starting video generation for shot {shot_description.idx}...")
video_output = await self.video_generator.generate_single_video(
prompt=shot_description.motion_desc + "\n" + shot_description.audio_desc,
reference_image_paths=frame_paths,
)
video_output.save(video_path)
print(f"☑️ Generated video for shot {shot_description.idx}, saved to {video_path}.")
async def generate_frame_for_single_shot(
self,
shot_idx: int,
frame_type: Literal["first_frame", "last_frame"],
first_shot_ff_path_and_text_pair: Tuple[str, str],
frame_desc: str,
visible_characters: List[CharacterInScene],
character_portraits_registry: Dict[str, Dict[str, Dict[str, str]]],
) -> ImageOutput:
frame_image_path = os.path.join(self.working_dir, "shots", f"{shot_idx}", f"{frame_type}.png")
if os.path.exists(frame_image_path):
print(f"🚀 Skipped generating {frame_type} for shot {shot_idx}, already exists.")
else:
print(f"🖼️ Starting {frame_type} generation for shot {shot_idx}...")
available_image_path_and_text_pairs = []
for visible_character in visible_characters:
identifier_in_scene = visible_character.identifier_in_scene
registry_item = character_portraits_registry[identifier_in_scene]
for view, item in registry_item.items():
available_image_path_and_text_pairs.append((item["path"], item["description"]))
available_image_path_and_text_pairs.append(first_shot_ff_path_and_text_pair)
selector_output_path = os.path.join(self.working_dir, "shots", f"{shot_idx}", f"{frame_type}_selector_output.json")
if os.path.exists(selector_output_path):
with open(selector_output_path, 'r', encoding='utf-8') as f:
selector_output = json.load(f)
print(f"🚀 Loaded existing reference image selection and prompt for {frame_type} frame of shot {shot_idx} from {selector_output_path}.")
else:
print(f"🔍 Selecting reference images and generating prompt for {frame_type} frame of shot {shot_idx}...")
selector_output = await self.reference_image_selector.select_reference_images_and_generate_prompt(
available_image_path_and_text_pairs=available_image_path_and_text_pairs,
frame_description=frame_desc
)
with open(selector_output_path, 'w', encoding='utf-8') as f:
json.dump(selector_output, f, ensure_ascii=False, indent=4)
print(f"☑️ Selected reference images and generated prompt for {frame_type} frame of shot {shot_idx}, saved to {selector_output_path}.")
reference_image_path_and_text_pairs, prompt = selector_output["reference_image_path_and_text_pairs"], selector_output["text_prompt"]
prefix_prompt = ""
for i, (image_path, text) in enumerate(reference_image_path_and_text_pairs):
prefix_prompt += f"Image {i}: {text}\n"
prompt = f"{prefix_prompt}\n{prompt}"
reference_image_paths = [item[0] for item in reference_image_path_and_text_pairs]
frame_image: ImageOutput = await self.image_generator.generate_single_image(
prompt=prompt,
reference_image_paths=reference_image_paths,
size="1600x900",
)
frame_image.save(frame_image_path)
print(f"☑️ Generated {frame_type} frame for shot {shot_idx}, saved to {frame_image_path}.")
self.frame_events[shot_idx][frame_type].set()
return frame_image_path
async def construct_camera_tree(
self,
shot_descriptions: List[ShotDescription],
):
camera_tree_path = os.path.join(self.working_dir, "camera_tree.json")
if os.path.exists(camera_tree_path):
with open(camera_tree_path, "r", encoding="utf-8") as f:
camera_tree = json.load(f)
camera_tree = [Camera.model_validate(camera) for camera in camera_tree]
print(f"🚀 Loaded {len(camera_tree)} cameras from existing file.")
return camera_tree
cameras: List[Camera] = []
for shot_description in shot_descriptions:
if shot_description.cam_idx not in [camera.idx for camera in cameras]:
cameras.append(Camera(idx=shot_description.cam_idx, active_shot_idxs=[shot_description.idx]))
else:
cameras[shot_description.cam_idx].active_shot_idxs.append(shot_description.idx)
camera_tree = await self.camera_image_generator.construct_camera_tree(cameras=cameras, shot_descs=shot_descriptions)
with open(camera_tree_path, "w", encoding="utf-8") as f:
json.dump([camera.model_dump() for camera in camera_tree], f, ensure_ascii=False, indent=4)
print(f"✅ Constructed camera tree and saved to {camera_tree_path}.")
return camera_tree
async def extract_characters(
self,
script: str,
):
save_path = os.path.join(self.working_dir, "characters.json")
if os.path.exists(save_path):
with open(save_path, "r", encoding="utf-8") as f:
characters = json.load(f)
characters = [CharacterInScene.model_validate(character) for character in characters]
print(f"🚀 Loaded {len(characters)} characters from existing file.")
else:
characters = await self.character_extractor.extract_characters(script)
with open(save_path, "w", encoding="utf-8") as f:
json.dump([character.model_dump() for character in characters], f, ensure_ascii=False, indent=4)
print(f"✅ Extracted {len(characters)} characters from script and saved to {save_path}.")
for character in characters:
self.character_portrait_events[character.idx] = asyncio.Event()
return characters
async def generate_character_portraits(
self,
characters: List[CharacterInScene],
character_portraits_registry: Optional[Dict[str, Dict[str, Dict[str, str]]]],
style: str,
):
character_portraits_registry_path = os.path.join(self.working_dir, "character_portraits_registry.json")
if character_portraits_registry is None:
if os.path.exists(character_portraits_registry_path):
with open(character_portraits_registry_path, 'r', encoding='utf-8') as f:
character_portraits_registry = json.load(f)
else:
character_portraits_registry = {}
tasks = [
self.generate_portraits_for_single_character(character, style)
for character in characters
if character.identifier_in_scene not in character_portraits_registry
]
if tasks:
for future in asyncio.as_completed(tasks):
character_portraits_registry.update(await future)
with open(character_portraits_registry_path, 'w', encoding='utf-8') as f:
json.dump(character_portraits_registry, f, ensure_ascii=False, indent=4)
print(f"✅ Completed character portrait generation for {len(characters)} characters.")
else:
print("🚀 All characters already have portraits, skipping portrait generation.")
return character_portraits_registry
async def generate_portraits_for_single_character(
self,
character: CharacterInScene,
style: str,
):
character_dir = os.path.join(self.working_dir, "character_portraits", f"{character.idx}_{character.identifier_in_scene}")
os.makedirs(character_dir, exist_ok=True)
front_portrait_path = os.path.join(character_dir, "front.png")
if os.path.exists(front_portrait_path):
pass
else:
front_portrait_output = await self.character_portraits_generator.generate_front_portrait(character, style)
front_portrait_output.save(front_portrait_path)
side_portrait_path = os.path.join(character_dir, "side.png")
if os.path.exists(side_portrait_path):
pass
else:
side_portrait_output = await self.character_portraits_generator.generate_side_portrait(character, front_portrait_path)
side_portrait_output.save(side_portrait_path)
back_portrait_path = os.path.join(character_dir, "back.png")
if os.path.exists(back_portrait_path):
pass
else:
back_portrait_output = await self.character_portraits_generator.generate_back_portrait(character, front_portrait_path)
back_portrait_output.save(back_portrait_path)
self.character_portrait_events[character.idx].set()
print(f"☑️ Completed character portrait generation for {character.identifier_in_scene}.")
return {
character.identifier_in_scene: {
"front": {
"path": front_portrait_path,
"description": f"A front view portrait of {character.identifier_in_scene}.",
},
"side": {
"path": side_portrait_path,
"description": f"A side view portrait of {character.identifier_in_scene}.",
},
"back": {
"path": back_portrait_path,
"description": f"A back view portrait of {character.identifier_in_scene}.",
},
}
}
async def design_storyboard(
self,
script: str,
characters: List[CharacterInScene],
user_requirement: str,
):
storyboard_path = os.path.join(self.working_dir, "storyboard.json")
if os.path.exists(storyboard_path):
with open(storyboard_path, 'r', encoding='utf-8') as f:
storyboard = json.load(f)
storyboard = [ShotBriefDescription.model_validate(shot) for shot in storyboard]
print(f"🚀 Loaded {len(storyboard)} shot brief descriptions from existing file.")
else:
print(f"🔍 Designing storyboard...")
storyboard = await self.storyboard_artist.design_storyboard(
script=script,
characters=characters,
user_requirement=user_requirement,
retry_timeout=150,
)
with open(storyboard_path, 'w', encoding='utf-8') as f:
json.dump([shot.model_dump() for shot in storyboard], f, ensure_ascii=False, indent=4)
print(f"✅ Designed storyboard and saved to {storyboard_path}.")
for shot_brief_description in storyboard:
self.shot_desc_events[shot_brief_description.idx] = asyncio.Event()
return storyboard
async def decompose_visual_descriptions(
self,
shot_brief_descriptions: List[ShotBriefDescription],
characters: List[CharacterInScene],
):
tasks = [
self.decompose_visual_description_for_single_shot_brief_description(shot_brief_description, characters)
for shot_brief_description in shot_brief_descriptions
]
shot_descriptions = await asyncio.gather(*tasks)
return shot_descriptions
async def decompose_visual_description_for_single_shot_brief_description(
self,
shot_brief_description: ShotBriefDescription,
characters: List[CharacterInScene],
):
shot_description_path = os.path.join(self.working_dir, "shots", f"{shot_brief_description.idx}", "shot_description.json")
os.makedirs(os.path.dirname(shot_description_path), exist_ok=True)
if os.path.exists(shot_description_path):
with open(shot_description_path, 'r', encoding='utf-8') as f:
shot_description = ShotDescription.model_validate(json.load(f))
print(f"🚀 Loaded shot {shot_brief_description.idx} description from existing file.")
else:
shot_description = await self.storyboard_artist.decompose_visual_description(
shot_brief_desc=shot_brief_description,
characters=characters,
retry_timeout=120,
)
with open(shot_description_path, 'w', encoding='utf-8') as f:
json.dump(shot_description.model_dump(), f, ensure_ascii=False, indent=4)
print(f"✅ Decomposed visual description for shot {shot_brief_description.idx} and saved to {shot_description_path}.")
self.shot_desc_events[shot_brief_description.idx].set()
if shot_description.variation_type in ["medium", "large"]:
self.frame_events[shot_brief_description.idx] = {
"first_frame": asyncio.Event(),
"last_frame": asyncio.Event(),
}
else:
self.frame_events[shot_brief_description.idx] = {
"first_frame": asyncio.Event(),
}
return shot_description