import torch
from tensordict import TensorDict
from transfer_queue.dataloader.streaming_dataset import chunk_batch_fn
from transfer_queue.metadata import BatchMeta
def test_chunk_batch_fn_dense():
td = TensorDict(
{"a": torch.arange(6).reshape(3, 2), "b": torch.arange(3).reshape(3, 1)},
batch_size=(3,),
)
meta = BatchMeta(global_indexes=[0, 1, 2], partition_ids=["p"] * 3)
chunks = chunk_batch_fn(td, meta, micro_batch_size=2)
assert len(chunks) == 2
assert chunks[0][0].batch_size == (2,)
assert chunks[1][0].batch_size == (1,)
assert torch.equal(chunks[0][0]["a"], torch.tensor([[0, 1], [2, 3]]))
assert torch.equal(chunks[0][0]["b"], torch.tensor([[0], [1]]))
assert torch.equal(chunks[1][0]["a"], torch.tensor([[4, 5]]))
assert torch.equal(chunks[1][0]["b"], torch.tensor([[2]]))
def test_chunk_batch_fn_mixed_dense_and_jagged():
values = [torch.tensor([1, 2]), torch.tensor([3, 4, 5]), torch.tensor([6])]
nested = torch.nested.as_nested_tensor(values, layout=torch.jagged)
td = TensorDict({"a": nested, "b": torch.arange(3)}, batch_size=(3,))
meta = BatchMeta(global_indexes=[0, 1, 2], partition_ids=["p"] * 3)
chunks = chunk_batch_fn(td, meta, micro_batch_size=2)
assert len(chunks) == 2
assert chunks[0][0].batch_size == (2,)
assert chunks[1][0].batch_size == (1,)
assert chunks[0][0]["a"].is_nested
assert torch.equal(chunks[0][0]["a"][0], torch.tensor([1, 2]))
assert torch.equal(chunks[0][0]["a"][1], torch.tensor([3, 4, 5]))
assert torch.equal(chunks[0][0]["b"], torch.tensor([0, 1]))
assert chunks[1][0]["a"].is_nested
assert torch.equal(chunks[1][0]["a"][0], torch.tensor([6]))
assert torch.equal(chunks[1][0]["b"], torch.tensor([2]))
def test_chunk_batch_fn_strided():
values = [torch.tensor([[1, 2], [3, 4]]), torch.tensor([[5, 6], [7, 8]])]
nested = torch.nested.as_nested_tensor(values, layout=torch.strided)
td = TensorDict({"a": nested}, batch_size=(2,))
meta = BatchMeta(global_indexes=[0, 1], partition_ids=["p"] * 2)
chunks = chunk_batch_fn(td, meta, micro_batch_size=1)
assert len(chunks) == 2
assert chunks[0][0].batch_size == (1,)
assert chunks[0][0]["a"].is_nested
assert torch.equal(chunks[0][0]["a"][0], torch.tensor([[1, 2], [3, 4]]))
assert chunks[1][0].batch_size == (1,)
assert chunks[1][0]["a"].is_nested
assert torch.equal(chunks[1][0]["a"][0], torch.tensor([[5, 6], [7, 8]]))