"""Prepare MLS/LibriSpeech/FLEURS manifests for Canary-1B evaluation.
This script only downloads/converts data and writes JSONL manifests. It does not
load the model or run inference.
"""
from __future__ import annotations
import argparse
import io
import json
import math
import os
import random
import re
import tarfile
from concurrent.futures import ThreadPoolExecutor
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any, Iterable
import soundfile as sf
from tqdm import tqdm
FLEURS_CONFIG = {
"en": "en_us",
"de": "de_de",
"es": "es_419",
"fr": "fr_fr",
}
DEFAULT_AST_DIRECTIONS = ["en-de", "en-es", "en-fr", "de-en", "es-en", "fr-en"]
@dataclass
class ManifestItem:
audio_filepath: str
duration: float
answer: str
taskname: str
source_lang: str
target_lang: str
pnc: str
sample_id: str
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Prepare Canary-1B MLS/LibriSpeech/FLEURS eval data")
parser.add_argument("--task", default="all", choices=["librispeech", "asr", "ast", "all"])
parser.add_argument("--data_dir", default="Canary-1B/eval_data")
parser.add_argument("--seed", type=int, default=0)
parser.add_argument("--shuffle", action="store_true", help="Shuffle before taking subsets")
parser.add_argument("--asr_dataset", default="facebook/multilingual_librispeech")
parser.add_argument(
"--asr_configs",
default="german,spanish,french",
help="Comma-separated facebook/multilingual_librispeech configs for ASR manifests.",
)
parser.add_argument("--asr_split", default="test")
parser.add_argument("--asr_parquet_dir", default="", help=(
"Optional local/download directory for facebook/multilingual_librispeech split parquet files. "
"Files are stored as <dir>/<config>/<split>-00000-of-00001.parquet and reused if present."
))
parser.add_argument("--asr_minutes", type=float, default=30.0, help="0 means full split/no minute cap")
parser.add_argument("--asr_limit", type=int, default=0, help="0 means no item-count cap")
parser.add_argument("--asr_pnc", default="no", choices=["yes", "no"])
parser.add_argument(
"--include_librispeech_test_clean",
action=argparse.BooleanOptionalAction,
default=True,
help="Also prepare OpenSLR LibriSpeech test-clean for Open ASR Leaderboard-style performance tests.",
)
parser.add_argument("--librispeech_dataset", default="openslr/librispeech_asr")
parser.add_argument("--librispeech_config", default="clean")
parser.add_argument("--librispeech_split", default="test")
parser.add_argument("--librispeech_dir", default="", help=(
"Optional local/download directory for OpenSLR LibriSpeech archives. "
"For test-clean, the script reuses <dir>/LibriSpeech/test-clean if present, "
"otherwise downloads <dir>/test-clean.tar.gz and extracts it."
))
parser.add_argument(
"--librispeech_minutes",
type=float,
default=None,
help="LibriSpeech minute cap. Defaults to --asr_minutes; 0 means full split/no cap.",
)
parser.add_argument(
"--librispeech_limit",
type=int,
default=None,
help="LibriSpeech item cap. Defaults to --asr_limit; 0 means no item-count cap.",
)
parser.add_argument("--fleurs_dataset", default="google/fleurs")
parser.add_argument("--fleurs_split", default="test")
parser.add_argument("--fleurs_parquet_dir", default="", help=(
"Optional local/download directory for google/fleurs split parquet files. "
"Files are stored as <dir>/<config>/<split>-00000-of-00001.parquet and reused if present."
))
parser.add_argument("--offline", action="store_true", help="Do not download missing local dataset files")
parser.add_argument("--fleurs_limit", type=int, default=50, help="Samples per AST direction; 0 means full split")
parser.add_argument("--ast_directions", default=",".join(DEFAULT_AST_DIRECTIONS))
parser.add_argument("--ast_pnc", default="yes", choices=["yes", "no"])
parser.add_argument(
"--audio_workers",
type=int,
default=min(8, max(1, os.cpu_count() or 1)),
help=(
"Parallel workers for decoding/writing wav files. Set to 1 for serial behavior. "
"Defaults to min(8, CPU count)."
),
)
parser.add_argument(
"--force_rewrite_audio",
action="store_true",
help="Rewrite wav files even when they already exist. By default existing wav files are reused.",
)
return parser.parse_args()
def safe_name(text: str) -> str:
return re.sub(r"[^A-Za-z0-9_.-]+", "_", str(text)).strip("_") or "sample"
def wav_duration(path: Path) -> float:
info = sf.info(path)
return float(info.frames) / float(info.samplerate)
def write_wav(audio: dict[str, Any], path: Path, force: bool = False) -> float:
"""Write a datasets Audio value to 16 kHz wav and return duration.
HF dataset builders usually return {array, sampling_rate}; direct parquet
loading may return {bytes, path}. Support both so FLEURS can be loaded from
its test parquet only, without downloading train parquet files. Existing wav
files are reused by default, which makes repeated manifest preparation much
faster.
"""
path.parent.mkdir(parents=True, exist_ok=True)
if path.exists() and not force:
return wav_duration(path)
if "array" in audio and audio["array"] is not None:
array = audio["array"]
sr = int(audio["sampling_rate"])
elif "bytes" in audio and audio["bytes"] is not None:
array, sr = sf.read(io.BytesIO(audio["bytes"]))
elif "path" in audio and audio["path"]:
array, sr = sf.read(audio["path"])
else:
raise ValueError(f"Unsupported audio field: {audio.keys()}")
if sr != 16000:
import librosa
array = librosa.resample(array, orig_sr=sr, target_sr=16000)
sr = 16000
sf.write(path, array, sr)
return float(len(array)) / float(sr)
def map_audio_jobs(jobs: list[tuple[dict[str, Any], Path]], workers: int, force: bool) -> list[float]:
if workers <= 1 or len(jobs) <= 1:
return [write_wav(audio, path, force=force) for audio, path in jobs]
with ThreadPoolExecutor(max_workers=workers) as executor:
return list(executor.map(lambda job: write_wav(job[0], job[1], force=force), jobs))
def write_manifest(path: Path, items: Iterable[ManifestItem]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
count = 0
with path.open("w", encoding="utf-8") as f:
for item in items:
f.write(json.dumps(asdict(item), ensure_ascii=False) + "\n")
count += 1
print(f"wrote {count} items: {path}")
def write_metadata(path: Path, metadata: dict[str, Any]) -> None:
meta_path = path.with_suffix(path.suffix + ".meta.json")
with meta_path.open("w", encoding="utf-8") as f:
json.dump(metadata, f, ensure_ascii=False, indent=2)
print(f"wrote metadata: {meta_path}")
def maybe_shuffle(ds: Any, shuffle: bool, seed: int) -> Any:
return ds.shuffle(seed=seed) if shuffle else ds
def dataset_has_column(ds: Any, column: str) -> bool:
"""Return whether a map-style or iterable HF dataset exposes a column."""
column_names = getattr(ds, "column_names", None)
if column_names is not None:
return column in column_names
features = getattr(ds, "features", None)
return bool(features is not None and column in features)
def disable_audio_decode(ds: Any, column: str = "audio") -> Any:
"""Keep HF datasets Audio values encoded so torchcodec is not required.
Recent versions of ``datasets`` decode Audio columns through torchcodec when
rows are iterated. On Ascend/NPU environments torchcodec is often not
available or not usable. This script writes audio with soundfile anyway, so
keep the original bytes/path and decode locally in ``write_wav`` instead.
"""
if not dataset_has_column(ds, column):
return ds
from datasets import Audio
return ds.cast_column(column, Audio(decode=False))
def remove_existing_columns(ds: Any, columns: Iterable[str]) -> Any:
existing = [column for column in columns if dataset_has_column(ds, column)]
return ds.remove_columns(existing) if existing else ds
MLS_LANG = {
"german": "de",
"spanish": "es",
"french": "fr",
"dutch": "nl",
"italian": "it",
"portuguese": "pt",
"polish": "pl",
}
def split_csv(value: str) -> list[str]:
return [x.strip() for x in value.split(",") if x.strip()]
def source_lang_for_mls_config(config: str) -> str:
key = config.lower().replace("_", "-")
if key not in MLS_LANG:
raise ValueError(
f"Unsupported multilingual_librispeech config for Canary ASR manifest: {config!r}. "
f"Known configs: {', '.join(sorted(MLS_LANG))}."
)
return MLS_LANG[key]
def librispeech_openslr_subset(config: str, split: str) -> str:
config = config.lower().replace("_", "-")
split = split.lower().replace("_", "-")
if split in {"test-clean", "test-other", "dev-clean", "dev-other", "train-clean-100", "train-clean-360", "train-other-500"}:
return split
if split == "validation":
split = "dev"
if split in {"test", "dev"} and config in {"clean", "other"}:
return f"{split}-{config}"
if split == "train" and config == "clean":
return "train-clean-100"
if split == "train" and config == "other":
return "train-other-500"
raise ValueError(
f"Unsupported local LibriSpeech config/split: config={config!r}, split={split!r}. "
"Use --librispeech_split test-clean/dev-clean/test-other/... or a supported config+split pair."
)
def librispeech_subset_dir(librispeech_dir: str | Path, subset: str) -> Path:
return Path(librispeech_dir) / "LibriSpeech" / subset
def safe_extract_tar(archive: Path, dest_dir: Path) -> None:
dest_dir = dest_dir.resolve()
with tarfile.open(archive, "r:gz") as tar:
for member in tar.getmembers():
target = (dest_dir / member.name).resolve()
if not str(target).startswith(str(dest_dir)):
raise ValueError(f"Refusing to extract path outside destination: {member.name}")
tar.extractall(dest_dir)
def download_librispeech_subset(config: str, split: str, librispeech_dir: str | Path, offline: bool = False) -> Path:
subset = librispeech_openslr_subset(config, split)
root = Path(librispeech_dir)
subset_dir = librispeech_subset_dir(root, subset)
if subset_dir.exists():
print(f"using existing LibriSpeech directory: {subset_dir}")
return subset_dir
archive = root / f"{subset}.tar.gz"
if not archive.exists():
if offline:
raise FileNotFoundError(
f"Offline mode enabled and LibriSpeech data is missing: {subset_dir} or {archive}"
)
from urllib.request import urlretrieve
root.mkdir(parents=True, exist_ok=True)
tmp_archive = archive.with_suffix(archive.suffix + ".tmp")
url = f"https://www.openslr.org/resources/12/{subset}.tar.gz"
print(f"downloading LibriSpeech {subset} to {archive}: {url}")
urlretrieve(url, tmp_archive)
tmp_archive.replace(archive)
else:
print(f"using existing LibriSpeech archive: {archive}")
print(f"extracting LibriSpeech archive: {archive}")
safe_extract_tar(archive, root)
if not subset_dir.exists():
raise FileNotFoundError(f"Expected extracted LibriSpeech directory not found: {subset_dir}")
return subset_dir
def iter_librispeech_local_rows(subset_dir: Path) -> Iterable[dict[str, Any]]:
for transcript in sorted(subset_dir.glob("*/*/*.trans.txt")):
with transcript.open("r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
sample_id, text = line.split(" ", 1)
audio_path = transcript.parent / f"{sample_id}.flac"
if not audio_path.exists():
raise FileNotFoundError(f"Missing LibriSpeech audio file: {audio_path}")
yield {"id": sample_id, "audio": {"path": str(audio_path)}, "text": text}
def librispeech_minutes(args: argparse.Namespace) -> float:
return args.asr_minutes if args.librispeech_minutes is None else args.librispeech_minutes
def librispeech_limit(args: argparse.Namespace) -> int:
return args.asr_limit if args.librispeech_limit is None else args.librispeech_limit
def write_librispeech_manifest(args: argparse.Namespace, rows: Iterable[dict[str, Any]], dataset: str, config: str) -> Path:
out_dir = Path(args.data_dir) / "librispeech_test_clean"
minutes = librispeech_minutes(args)
limit = librispeech_limit(args)
max_seconds = minutes * 60.0 if minutes > 0 else math.inf
total_seconds = 0.0
items: list[ManifestItem] = []
row_iter = iter(rows)
chunk_size = max(1, args.audio_workers * 4)
with tqdm(desc="prepare LibriSpeech") as pbar:
while (not limit or len(items) < limit) and total_seconds < max_seconds:
jobs: list[tuple[dict[str, Any], Path]] = []
metas: list[tuple[dict[str, Any], str, Path]] = []
for row in row_iter:
if limit and len(items) + len(jobs) >= limit:
break
sid = str(row.get("id") or row.get("file") or len(items) + len(jobs))
wav_path = out_dir / "wav" / f"{safe_name(sid)}.wav"
jobs.append((row["audio"], wav_path))
metas.append((row, sid, wav_path))
if len(jobs) >= chunk_size:
break
if not jobs:
break
durations = map_audio_jobs(jobs, args.audio_workers, args.force_rewrite_audio)
for (row, sid, wav_path), duration in zip(metas, durations):
if total_seconds >= max_seconds:
break
total_seconds += duration
items.append(
ManifestItem(
audio_filepath=str(wav_path),
duration=duration,
answer=str(row.get("text") or row.get("transcript") or row.get("transcription") or ""),
taskname="asr",
source_lang="en",
target_lang="en",
pnc=args.asr_pnc,
sample_id=sid,
)
)
pbar.update(1)
manifest = out_dir / "manifest_asr_en.jsonl"
write_manifest(manifest, items)
write_metadata(
manifest,
{
"task": "asr",
"dataset": dataset,
"config": config,
"split": args.librispeech_split,
"minutes_limit": minutes,
"item_limit": limit,
"librispeech_dir": args.librispeech_dir,
"offline": args.offline,
"purpose": "performance / Open ASR Leaderboard-style LibriSpeech test-clean",
"num_items": len(items),
"total_audio_seconds": sum(item.duration for item in items),
},
)
return manifest
def prepare_librispeech_local(args: argparse.Namespace) -> Path:
subset_dir = download_librispeech_subset(
args.librispeech_config, args.librispeech_split, args.librispeech_dir, args.offline
)
rows = list(iter_librispeech_local_rows(subset_dir))
if args.shuffle:
random.Random(args.seed).shuffle(rows)
return write_librispeech_manifest(args, rows, dataset=str(subset_dir), config=args.librispeech_config)
def prepare_librispeech(args: argparse.Namespace) -> Path:
if args.librispeech_dir:
return prepare_librispeech_local(args)
from datasets import load_dataset
print(
f"loading LibriSpeech dataset={args.librispeech_dataset} "
f"config={args.librispeech_config} split={args.librispeech_split}"
)
ds = load_dataset(args.librispeech_dataset, args.librispeech_config, split=args.librispeech_split)
ds = disable_audio_decode(ds)
ds = maybe_shuffle(ds, args.shuffle, args.seed)
return write_librispeech_manifest(args, ds, dataset=args.librispeech_dataset, config=args.librispeech_config)
def mls_parquet_url(config: str, split: str) -> str:
return (
"https://huggingface.co/datasets/facebook/multilingual_librispeech/resolve/main/"
f"{config}/{split}-00000-of-00001.parquet"
)
def mls_parquet_path(parquet_dir: str | Path, config: str, split: str) -> Path:
return Path(parquet_dir) / config / f"{split}-00000-of-00001.parquet"
def download_mls_parquet(config: str, split: str, parquet_dir: str | Path = "", offline: bool = False) -> Path:
if parquet_dir:
local_file = mls_parquet_path(parquet_dir, config, split)
if local_file.exists():
print(f"using existing MLS parquet: {local_file}")
return local_file
if offline:
raise FileNotFoundError(f"Offline mode enabled and MLS parquet is missing: {local_file}")
from urllib.request import urlretrieve
local_file.parent.mkdir(parents=True, exist_ok=True)
tmp_file = local_file.with_suffix(local_file.suffix + ".tmp")
url = mls_parquet_url(config, split)
print(f"downloading MLS parquet to {local_file}: {url}")
urlretrieve(url, tmp_file)
tmp_file.replace(local_file)
return local_file
from huggingface_hub import hf_hub_download
return Path(
hf_hub_download(
repo_id="facebook/multilingual_librispeech",
repo_type="dataset",
filename=f"{config}/{split}-00000-of-00001.parquet",
local_files_only=offline,
)
)
def write_asr_manifest(
args: argparse.Namespace, rows: Iterable[dict[str, Any]], dataset: str, config: str, split: str
) -> Path:
lang = source_lang_for_mls_config(config) if dataset == "facebook/multilingual_librispeech" else "en"
safe_config = safe_name(config.lower().replace("_", "-"))
out_dir = Path(args.data_dir) / f"mls_{split}_{safe_config}"
max_seconds = args.asr_minutes * 60.0 if args.asr_minutes > 0 else math.inf
total_seconds = 0.0
items: list[ManifestItem] = []
row_iter = iter(rows)
chunk_size = max(1, args.audio_workers * 4)
with tqdm(desc=f"prepare ASR {config}/{split}") as pbar:
while (not args.asr_limit or len(items) < args.asr_limit) and total_seconds < max_seconds:
jobs: list[tuple[dict[str, Any], Path]] = []
metas: list[tuple[dict[str, Any], str, Path]] = []
for row in row_iter:
if args.asr_limit and len(items) + len(jobs) >= args.asr_limit:
break
sid = str(row.get("id") or row.get("file") or len(items) + len(jobs))
wav_path = out_dir / "wav" / f"{safe_name(sid)}.wav"
jobs.append((row["audio"], wav_path))
metas.append((row, sid, wav_path))
if len(jobs) >= chunk_size:
break
if not jobs:
break
durations = map_audio_jobs(jobs, args.audio_workers, args.force_rewrite_audio)
for (row, sid, wav_path), duration in zip(metas, durations):
if total_seconds >= max_seconds:
break
total_seconds += duration
items.append(
ManifestItem(
audio_filepath=str(wav_path),
duration=duration,
answer=str(row.get("text") or row.get("transcript") or row.get("transcription") or ""),
taskname="asr",
source_lang=lang,
target_lang=lang,
pnc=args.asr_pnc,
sample_id=sid,
)
)
pbar.update(1)
manifest = out_dir / f"manifest_asr_{lang}.jsonl"
write_manifest(manifest, items)
write_metadata(
manifest,
{
"task": "asr",
"dataset": dataset,
"config": config,
"split": split,
"minutes_limit": args.asr_minutes,
"item_limit": args.asr_limit,
"asr_parquet_dir": args.asr_parquet_dir,
"offline": args.offline,
"num_items": len(items),
"total_audio_seconds": sum(item.duration for item in items),
},
)
return manifest
def prepare_asr_config(args: argparse.Namespace, config: str) -> Path:
from datasets import load_dataset
print(f"loading ASR dataset={args.asr_dataset} config={config} split={args.asr_split}")
if args.asr_dataset == "facebook/multilingual_librispeech":
if args.asr_parquet_dir:
local_file = download_mls_parquet(config, args.asr_split, args.asr_parquet_dir, args.offline)
print(f"loading local MLS parquet: {local_file}")
ds = load_dataset("parquet", data_files={args.asr_split: str(local_file)}, split=args.asr_split, streaming=True)
else:
data_file = mls_parquet_url(config, args.asr_split)
print(f"loading MLS split-only parquet: {data_file}")
try:
ds = load_dataset("parquet", data_files={args.asr_split: data_file}, split=args.asr_split, streaming=True)
except FileNotFoundError as exc:
print(f"direct HTTPS parquet load failed: {exc}")
local_file = download_mls_parquet(config, args.asr_split, offline=args.offline)
print(f"loading local MLS parquet: {local_file}")
ds = load_dataset("parquet", data_files={args.asr_split: str(local_file)}, split=args.asr_split, streaming=True)
else:
ds = load_dataset(args.asr_dataset, config, split=args.asr_split, streaming=True)
ds = disable_audio_decode(ds)
ds = maybe_shuffle(ds, args.shuffle, args.seed)
return write_asr_manifest(args, ds, dataset=args.asr_dataset, config=config, split=args.asr_split)
def prepare_asr(args: argparse.Namespace) -> list[Path]:
manifests = [prepare_asr_config(args, config) for config in split_csv(args.asr_configs)]
if args.include_librispeech_test_clean:
manifests.append(prepare_librispeech(args))
return manifests
def fleurs_text(row: dict[str, Any], pnc: str) -> str:
if pnc == "yes":
return str(row.get("raw_transcription") or row.get("transcription") or "")
return str(row.get("transcription") or row.get("raw_transcription") or "")
def fleurs_parquet_url(config: str, split: str) -> str:
return (
"https://huggingface.co/datasets/google/fleurs/resolve/main/"
f"parquet-data/{config}/{split}-00000-of-00001.parquet"
)
def fleurs_parquet_path(parquet_dir: str | Path, config: str, split: str) -> Path:
return Path(parquet_dir) / config / f"{split}-00000-of-00001.parquet"
def download_fleurs_parquet(config: str, split: str, parquet_dir: str | Path = "", offline: bool = False) -> Path:
if parquet_dir:
local_file = fleurs_parquet_path(parquet_dir, config, split)
if local_file.exists():
print(f"using existing FLEURS parquet: {local_file}")
return local_file
if offline:
raise FileNotFoundError(f"Offline mode enabled and FLEURS parquet is missing: {local_file}")
from urllib.request import urlretrieve
local_file.parent.mkdir(parents=True, exist_ok=True)
tmp_file = local_file.with_suffix(local_file.suffix + ".tmp")
url = fleurs_parquet_url(config, split)
print(f"downloading FLEURS parquet to {local_file}: {url}")
urlretrieve(url, tmp_file)
tmp_file.replace(local_file)
return local_file
from huggingface_hub import hf_hub_download
return Path(
hf_hub_download(
repo_id="google/fleurs",
repo_type="dataset",
filename=f"parquet-data/{config}/{split}-00000-of-00001.parquet",
local_files_only=offline,
)
)
def load_fleurs(dataset_name: str, split: str, lang: str, fleurs_parquet_dir: str = "", offline: bool = False) -> Any:
from datasets import load_dataset
config = FLEURS_CONFIG[lang]
print(f"loading FLEURS dataset={dataset_name} config={config} split={split}")
if dataset_name == "google/fleurs":
if fleurs_parquet_dir:
local_file = download_fleurs_parquet(config, split, fleurs_parquet_dir, offline)
print(f"loading local FLEURS parquet: {local_file}")
ds = load_dataset("parquet", data_files={split: str(local_file)}, split=split, streaming=True)
else:
data_file = fleurs_parquet_url(config, split)
print(f"loading FLEURS split-only parquet: {data_file}")
try:
ds = load_dataset("parquet", data_files={split: data_file}, split=split, streaming=True)
except FileNotFoundError as exc:
print(f"direct HTTPS parquet load failed: {exc}")
local_file = download_fleurs_parquet(config, split, offline=offline)
print(f"loading local FLEURS parquet: {local_file}")
ds = load_dataset("parquet", data_files={split: str(local_file)}, split=split, streaming=True)
else:
ds = load_dataset(dataset_name, config, split=split)
return disable_audio_decode(ds)
def prepare_fleurs_direction(args: argparse.Namespace, src: str, tgt: str) -> Path:
src_ds = maybe_shuffle(
load_fleurs(args.fleurs_dataset, args.fleurs_split, src, args.fleurs_parquet_dir, args.offline),
args.shuffle,
args.seed,
)
tgt_ds = load_fleurs(args.fleurs_dataset, args.fleurs_split, tgt, args.fleurs_parquet_dir, args.offline)
tgt_ds = remove_existing_columns(tgt_ds, ["audio"])
tgt_by_id = {str(row["id"]): row for row in tgt_ds}
out_dir = Path(args.data_dir) / "fleurs" / f"{src}-{tgt}"
items: list[ManifestItem] = []
row_iter = iter(src_ds)
chunk_size = max(1, args.audio_workers * 4)
with tqdm(desc=f"prepare FLEURS {src}->{tgt}") as pbar:
while not args.fleurs_limit or len(items) < args.fleurs_limit:
jobs: list[tuple[dict[str, Any], Path]] = []
metas: list[tuple[str, Path]] = []
for row in row_iter:
if args.fleurs_limit and len(items) + len(jobs) >= args.fleurs_limit:
break
sid = str(row["id"])
if sid not in tgt_by_id:
continue
wav_path = out_dir / "wav" / f"{safe_name(sid)}.wav"
jobs.append((row["audio"], wav_path))
metas.append((sid, wav_path))
if len(jobs) >= chunk_size:
break
if not jobs:
break
durations = map_audio_jobs(jobs, args.audio_workers, args.force_rewrite_audio)
for (sid, wav_path), duration in zip(metas, durations):
items.append(
ManifestItem(
audio_filepath=str(wav_path),
duration=duration,
answer=fleurs_text(tgt_by_id[sid], args.ast_pnc),
taskname="ast",
source_lang=src,
target_lang=tgt,
pnc=args.ast_pnc,
sample_id=sid,
)
)
pbar.update(1)
manifest = out_dir / f"manifest_ast_{src}_{tgt}.jsonl"
write_manifest(manifest, items)
write_metadata(
manifest,
{
"task": "ast",
"dataset": args.fleurs_dataset,
"source_config": FLEURS_CONFIG[src],
"target_config": FLEURS_CONFIG[tgt],
"split": args.fleurs_split,
"direction": f"{src}-{tgt}",
"item_limit": args.fleurs_limit,
"fleurs_parquet_dir": args.fleurs_parquet_dir,
"offline": args.offline,
"num_items": len(items),
"total_audio_seconds": sum(item.duration for item in items),
},
)
return manifest
def main() -> None:
args = parse_args()
manifests: list[Path] = []
if args.task == "librispeech":
manifests.append(prepare_librispeech(args))
if args.task in {"asr", "all"}:
manifests.extend(prepare_asr(args))
if args.task in {"ast", "all"}:
for direction in [x.strip() for x in args.ast_directions.split(",") if x.strip()]:
src, tgt = direction.split("-", 1)
if src not in FLEURS_CONFIG or tgt not in FLEURS_CONFIG:
raise ValueError(f"Unsupported FLEURS direction: {direction}")
manifests.append(prepare_fleurs_direction(args, src, tgt))
print("\nPrepared manifests:")
for manifest in manifests:
print(f" {manifest}")
if __name__ == "__main__":
main()