ZZhiyu Chengadd files
abf93d00创建于 2025年10月23日历史提交
from typing import Optional, Union, List

import numpy as np

from transformers.feature_extraction_utils import BatchFeature
from transformers.image_utils import ImageInput
from transformers.processing_utils import ImagesKwargs, MultiModalData, ProcessingKwargs, ProcessorMixin, Unpack, VideosKwargs
from transformers.tokenization_utils_base import PreTokenizedInput, TextInput
from transformers.video_utils import VideoInput


class NemotronNanoVLV2ImagesKwargs(ImagesKwargs):
    min_pixels: Optional[int]
    max_pixels: Optional[int]
    patch_size: Optional[int]
    temporal_patch_size: Optional[int]
    merge_size: Optional[int]


class NemotronNanoVLV2ProcessorKwargs(ProcessingKwargs, total=False):
    images_kwargs: NemotronNanoVLV2ImagesKwargs
    videos_kwargs: VideosKwargs
    _defaults = {
        "text_kwargs": {
            "padding": False,
        },
    }


class NemotronNanoVLV2Processor(ProcessorMixin):
    r"""
    Constructs a Nemotron Nano VL V2 processor which wraps an image processor and a tokenizer into a single processor.
    [`NemotronNanoVLV2Processor`] offers all the functionalities of the image processor and tokenizer. See the
    [`~NemotronNanoVLV2Processor.__call__`] and [`~NemotronNanoVLV2Processor.decode`] for more information.
    Args:
        image_processor ([`AutoImageProcessor`], *optional*):
            The image processor is a required input.
        tokenizer ([`AutoTokenizer`], *optional*):
            The tokenizer is a required input.
        chat_template (`str`, *optional*): A Jinja template which will be used to convert lists of messages
            in a chat into a tokenizable string.
    """

    attributes = ["image_processor", "tokenizer"]

    image_processor_class = "AutoImageProcessor"
    video_processor_class = "AutoVideoProcessor"
    tokenizer_class = ("AutoTokenizer")

    def __init__(self, image_processor=None, tokenizer=None, chat_template=None, **kwargs):
        self.image_token = "<image>" if not hasattr(tokenizer, "image_token") else tokenizer.image_token
        self.video_token = "<video>" if not hasattr(tokenizer, "video_token") else tokenizer.video_token
        self.image_start_token = "<img>" if not hasattr(tokenizer, "image_start_token") else tokenizer.image_start_token
        self.image_end_token = "</img>" if not hasattr(tokenizer, "image_end_token") else tokenizer.image_end_token
        self.image_token_id = (
            tokenizer.image_token_id
            if getattr(tokenizer, "image_token_id", None)
            else tokenizer.convert_tokens_to_ids(self.image_token)
        )
        self.video_token_id = (
            tokenizer.video_token_id
            if getattr(tokenizer, "video_token_id", None)
            else tokenizer.convert_tokens_to_ids(self.video_token)
        )
        super().__init__(image_processor, tokenizer, chat_template=chat_template)

    def __call__(
        self,
        images: ImageInput = None,
        text: Union[TextInput, PreTokenizedInput, List[TextInput], List[PreTokenizedInput]] = None,
        videos: VideoInput = None,
        **kwargs: Unpack[NemotronNanoVLV2ProcessorKwargs],
    ) -> BatchFeature:
        """
        Main method to prepare multimodal inputs (text, images, videos) for the model. This method processes text by 
        replacing image/video tokens with appropriate placeholder sequences, processes images and videos through the 
        image processor, and tokenizes the final text.

        The method performs the following key operations:
        1. Processes images using the image processor to get pixel values and patch counts
        2. Processes videos using the image processor with max_num_tiles=1 to get video pixel values  
        3. Replaces `<image>` tokens in text with `<img>` + image tokens + `</img>` sequences
        4. Replaces `<video>` tokens in text with frame-by-frame descriptions including timestamps (if metadata provided)
        5. Tokenizes the processed text and combines all outputs

        Args:
            images (`PIL.Image.Image`, `np.ndarray`, `torch.Tensor`, `List[PIL.Image.Image]`, `List[np.ndarray]`, `List[torch.Tensor]`, *optional*):
                The image or batch of images to be prepared. Each image can be a PIL image, NumPy array or PyTorch
                tensor. Both channels-first and channels-last formats are supported.
            text (`str`, `List[str]`, *optional*):
                The sequence or batch of sequences to be encoded. Each sequence should be a string. The text can contain
                special tokens `<image>` and `<video>` that will be replaced with appropriate token sequences.
            videos (`np.ndarray`, `torch.Tensor`, `List[np.ndarray]`, `List[torch.Tensor]`, *optional*):
                The video or batch of videos to be prepared. Each video should be a 4D NumPy array or PyTorch
                tensor with shape (num_frames, channels, height, width). Both channels-first and channels-last formats 
                are supported. Note: Currently only supports batch size of 1 for videos.
            images_kwargs (`Dict`, *optional*):
                Additional keyword arguments for image processing, including:
                - `min_pixels` (`int`, *optional*): Minimum number of pixels for image processing
                - `max_pixels` (`int`, *optional*): Maximum number of pixels for image processing  
                - `patch_size` (`int`, *optional*): Size of patches for image processing
                - `temporal_patch_size` (`int`, *optional*): Size of temporal patches
                - `merge_size` (`int`, *optional*): Size for merging patches
            videos_kwargs (`Dict`, *optional*):
                Additional keyword arguments for video processing, including:
                - `video_metadata` (`VideoMetadata`, *optional*): Metadata containing fps information for timestamp calculation
            text_kwargs (`Dict`, *optional*):
                Additional keyword arguments for text tokenization, including:
                - `return_tensors` (`str` or [`~utils.TensorType`], *optional*): Framework for returned tensors ('tf', 'pt', 'np', 'jax')
                - `padding` (`bool`, *optional*): Whether to pad sequences (defaults to False)

        Returns:
            [`BatchFeature`]: A [`BatchFeature`] with the following fields:

            - **input_ids** -- List of token ids to be fed to a model. Returned when `text` is not `None`.
            - **attention_mask** -- List of indices specifying which tokens should be attended to by the model (when
              `return_attention_mask=True` or if *"attention_mask"* is in `self.model_input_names` and if `text` is not
              `None`).
            - **pixel_values** -- Pixel values to be fed to a model. Returned when `images` is not `None`.
            - **num_patches** -- Number of patches per image. Returned when `images` is not `None`.
            - **pixel_values_videos** -- Pixel values of videos to be fed to a model. Returned when `videos` is not `None`.

        Raises:
            AssertionError: If videos are provided with batch size > 1 (not currently supported).

        Note:
            - Image tokens `<image>` in text are replaced with `<img>` + repeated image tokens + `</img>`
            - Video tokens `<video>` in text are replaced with frame-by-frame descriptions
            - When video metadata with fps is provided, frame descriptions include timestamps
            - Videos are processed with max_num_tiles=1 regardless of the images setting
        """
        output_kwargs = self._merge_kwargs(
            NemotronNanoVLV2ProcessorKwargs,
            tokenizer_init_kwargs=self.tokenizer.init_kwargs,
            **kwargs,
        )
        image_inputs = videos_inputs = {}
        if images is not None:
            image_inputs = self.image_processor(images=images, **output_kwargs["images_kwargs"])
            image_num_patches = image_inputs["num_patches"]

        if videos is not None:
            orig_tiles = self.image_processor.max_num_tiles
            self.image_processor.max_num_tiles = 1
            videos_inputs = self.image_processor(images=videos, **output_kwargs["images_kwargs"])
            self.image_processor.max_num_tiles = orig_tiles
            video_num_patches = [sum(videos_inputs["num_patches"])]
            videos_inputs["pixel_values_videos"] = videos_inputs["pixel_values"]
            del videos_inputs["pixel_values"]

        if not isinstance(text, list):
            text = [text]

        text = text.copy()  # below lines change text in-place
        if images is not None:
            index = 0
            for i in range(len(text)):
                while self.image_token in text[i]:
                    text[i] = text[i].replace(self.image_token, self.image_start_token + "<|placeholder|>" * image_num_patches[index] * self.image_processor.num_image_token + self.image_end_token, 1)
                    index += 1
                text[i] = text[i].replace("<|placeholder|>", self.image_token)
        if videos is not None:
            assert len(text) == 1, "Video is not supported for batch size > 1"
            video_metadata = output_kwargs.get("videos_kwargs", {}).get("video_metadata", None)
            i = 0
            index = 0
            if self.video_token in text[i]:
                each_frame = self.image_start_token + "<|placeholder|>" * self.image_processor.num_image_token + self.image_end_token
                video_prompt = "This is a video:\n"
                for j in range(video_num_patches[index]):
                    if video_metadata is not None and video_metadata.fps is not None:
                        timestamp = j / video_metadata.fps
                        video_prompt += f"Frame {j+1} sampled at {timestamp:.2f} seconds: {each_frame}\n"
                    else:
                        # Fallback to original format without timestamps
                        video_prompt += f"Frame {j+1}: {each_frame}\n"
                
                text[i] = text[i].replace(self.video_token, video_prompt, 1)
            text[i] = text[i].replace("<|placeholder|>", self.video_token)

        return_tensors = output_kwargs["text_kwargs"].pop("return_tensors", None)
        text_inputs = self.tokenizer(text, **output_kwargs["text_kwargs"])
        return BatchFeature(data={**text_inputs, **image_inputs, **videos_inputs}, tensor_type=return_tensors)

    def _get_num_multimodal_tokens(self, image_sizes=None, video_sizes=None, **kwargs):
        """
        Computes the number of placeholder tokens needed for multimodal inputs with the given sizes.
        Args:
            image_sizes (`list[list[int]]`, *optional*):
                The input sizes formatted as (height, width) per each image.
            video_sizes (`list[list[int]]`, *optional*):
                The input sizes formatted as (num_frames, height, width) per each video.
        Returns:
            `MultiModalData`: A `MultiModalData` object holding number of tokens per each of the provided
            input modalities, along with other useful data.
        """

        vision_data = {}
        if image_sizes is not None:
            images_kwargs = NemotronNanoVLV2ProcessorKwargs._defaults.get("images_kwargs", {})
            images_kwargs.update(kwargs)
            merge_size = images_kwargs.get("merge_size", None) or self.image_processor.merge_size

            num_image_patches = [
                self.image_processor.get_number_of_image_patches(*image_size, images_kwargs)
                for image_size in image_sizes
            ]
            num_image_tokens = [(num_patches // merge_size**2) for num_patches in num_image_patches]
            vision_data.update({"num_image_tokens": num_image_tokens, "num_image_patches": num_image_patches})
        return MultiModalData(**vision_data)

    def batch_decode(self, *args, **kwargs):
        """
        This method forwards all its arguments to the tokenizer's [`~PreTrainedTokenizer.batch_decode`]. Please
        refer to the docstring of this method for more information.
        """
        return self.tokenizer.batch_decode(*args, **kwargs)

    def decode(self, *args, **kwargs):
        """
        This method forwards all its arguments to the tokenizer's [`~PreTrainedTokenizer.decode`]. Please refer to
        the docstring of this method for more information.
        """
        return self.tokenizer.decode(*args, **kwargs)

    def post_process_image_text_to_text(
        self, generated_outputs, skip_special_tokens=True, clean_up_tokenization_spaces=False, **kwargs
    ):
        """
        Post-process the output of the model to decode the text.

        Args:
            generated_outputs (`torch.Tensor` or `np.ndarray`):
                The output of the model `generate` function. The output is expected to be a tensor of shape `(batch_size, sequence_length)`
                or `(sequence_length,)`.
            skip_special_tokens (`bool`, *optional*, defaults to `True`):
                Whether or not to remove special tokens in the output. Argument passed to the tokenizer's `batch_decode` method.
            clean_up_tokenization_spaces (`bool`, *optional*, defaults to `False`):
                Whether or not to clean up the tokenization spaces. Argument passed to the tokenizer's `batch_decode` method.
            **kwargs:
                Additional arguments to be passed to the tokenizer's `batch_decode method`.

        Returns:
            `list[str]`: The decoded text.
        """
        return self.tokenizer.batch_decode(
            generated_outputs,
            skip_special_tokens=skip_special_tokens,
            clean_up_tokenization_spaces=clean_up_tokenization_spaces,
            **kwargs,
        )

    @property
    def model_input_names(self):
        tokenizer_input_names = self.tokenizer.model_input_names
        image_processor_input_names = self.image_processor.model_input_names
        names_from_processor = list(dict.fromkeys(tokenizer_input_names + image_processor_input_names))
        return names_from_processor + ["second_per_grid_ts"]


__all__ = ["NemotronNanoVLV2Processor"]