Files
PaddleOCR-VL/image_processing.py
2025-11-03 08:09:57 +00:00

564 lines
24 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Image processor class for PaddleOCR-VL."""
import math
from typing import Dict, List, Optional, Union
import numpy as np
import torch
from transformers.image_processing_utils import BaseImageProcessor, BatchFeature
from torchvision.transforms import functional as TF
from transformers.image_transforms import (
convert_to_rgb,
resize,
to_channel_dimension_format,
)
from transformers.image_utils import (
OPENAI_CLIP_MEAN,
OPENAI_CLIP_STD,
ChannelDimension,
PILImageResampling,
get_image_size,
infer_channel_dimension_format,
is_scaled_image,
is_valid_image,
make_list_of_images,
to_numpy_array,
valid_images,
validate_preprocess_arguments,
)
from transformers.utils import TensorType, is_vision_available, logging
logger = logging.get_logger(__name__)
if is_vision_available():
from PIL import Image
ImageInput = Union[
"PIL.Image.Image",
np.ndarray,
"torch.Tensor",
List["PIL.Image.Image"],
List[np.ndarray],
List["torch.Tensor"],
] # noqa
VideoInput = Union[
List["PIL.Image.Image"],
"np.ndarray",
"torch.Tensor",
List["np.ndarray"],
List["torch.Tensor"],
List[List["PIL.Image.Image"]],
List[List["np.ndarrray"]],
List[List["torch.Tensor"]],
] # noqa
def make_batched_images(images) -> List[List[ImageInput]]:
"""
Accepts images in list or nested list format, and makes a list of images for preprocessing.
Args:
images (`Union[List[List[ImageInput]], List[ImageInput], ImageInput]`):
The input image.
Returns:
list: A list of images.
"""
if (
isinstance(images, (list, tuple))
and isinstance(images[0], (list, tuple))
and is_valid_image(images[0][0])
):
return [img for img_list in images for img in img_list]
elif isinstance(images, (list, tuple)) and is_valid_image(images[0]):
return images
elif is_valid_image(images):
return [images]
raise ValueError(f"Could not make batched images from {images}")
def adjust_size(size, patch_size):
num_patches = size // patch_size
if num_patches % 2 != 0: # 如果是奇数减1
num_patches -= 1
return num_patches * patch_size
def make_batched_videos(videos) -> List[VideoInput]:
if (
isinstance(videos, (list, tuple))
and isinstance(videos[0], (list, tuple))
and is_valid_image(videos[0][0])
):
return videos
elif isinstance(videos, (list, tuple)) and is_valid_image(videos[0]):
if isinstance(videos[0], Image.Image):
return [videos]
elif len(videos[0].shape) == 4:
return [list(video) for video in videos]
elif is_valid_image(videos) and len(videos.shape) == 4:
return [list(videos)]
raise ValueError(f"Could not make batched video from {videos}")
def smart_resize(
height: int,
width: int,
factor: int = 28,
min_pixels: int = 28 * 28 * 130,
max_pixels: int = 28 * 28 * 1280,
):
"""Rescales the image so that the following conditions are met:
1. Both dimensions (height and width) are divisible by 'factor'.
2. The total number of pixels is within the range ['min_pixels', 'max_pixels'].
3. The aspect ratio of the image is maintained as closely as possible.
"""
if height < factor:
width = round((width * factor) / height)
height = factor
if width < factor:
height = round((height * factor) / width)
width = factor
if max(height, width) / min(height, width) > 200:
raise ValueError(
f"absolute aspect ratio must be smaller than 200, got {max(height, width) / min(height, width)}"
)
h_bar = round(height / factor) * factor
w_bar = round(width / factor) * factor
if h_bar * w_bar > max_pixels:
beta = math.sqrt((height * width) / max_pixels)
h_bar = math.floor(height / beta / factor) * factor
w_bar = math.floor(width / beta / factor) * factor
elif h_bar * w_bar < min_pixels:
beta = math.sqrt(min_pixels / (height * width))
h_bar = math.ceil(height * beta / factor) * factor
w_bar = math.ceil(width * beta / factor) * factor
return h_bar, w_bar
class SiglipImageProcessor(BaseImageProcessor):
r"""
Constructs a Siglip image processor that dynamically resizes images based on the original images.
Args:
do_resize (`bool`, *optional*, defaults to `True`):
Whether to resize the image's (height, width) dimensions.
resample (`PILImageResampling`, *optional*, defaults to `Resampling.BICUBIC`):
Resampling filter to use when resizing the image.
do_rescale (`bool`, *optional*, defaults to `True`):
Whether to rescale the image by the specified scale `rescale_factor`.
rescale_factor (`int` or `float`, *optional*, defaults to `1/255`):
Scale factor to use if rescaling the image.
do_normalize (`bool`, *optional*, defaults to `True`):
Whether to normalize the image.
image_mean (`float` or `List[float]`, *optional*, defaults to `[0.48145466, 0.4578275, 0.40821073]`):
Mean to use if normalizing the image. This is a float or list of floats for each channel in the image.
image_std (`float` or `List[float]`, *optional*, defaults to `[0.26862954, 0.26130258, 0.27577711]`):
Standard deviation to use if normalizing the image. This is a float or list of floats for each channel in the image.
do_convert_rgb (`bool`, *optional*, defaults to `True`):
Whether to convert the image to RGB.
min_pixels (`int`, *optional*, defaults to `28 * 28 * 130`):
The min pixels of the image to resize the image.
max_pixels (`int`, *optional*, defaults to `28 * 28 * 1670`):
The max pixels of the image to resize the image.
patch_size (`int`, *optional*, defaults to 14):
The spacial patch size of the vision encoder.
temporal_patch_size (`int`, *optional*, defaults to 2):
The temporal patch size of the vision encoder.
merge_size (`int`, *optional*, defaults to 2):
The merge size of the vision encoder to llm encoder.
"""
model_input_names = [
"pixel_values",
"image_grid_thw",
"pixel_values_videos",
"video_grid_thw",
]
def __init__(
self,
do_resize: bool = True,
resample: PILImageResampling = PILImageResampling.BICUBIC,
do_rescale: bool = True,
rescale_factor: Union[int, float] = 1 / 255,
do_normalize: bool = True,
image_mean: Optional[Union[float, List[float]]] = None,
image_std: Optional[Union[float, List[float]]] = None,
do_convert_rgb: bool = True,
min_pixels: int = 28 * 28 * 130,
max_pixels: int = 28 * 28 * 1280,
patch_size: int = 14,
temporal_patch_size: int = 1,
merge_size: int = 2,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.do_resize = do_resize
self.resample = resample
self.do_rescale = do_rescale
self.rescale_factor = rescale_factor
self.do_normalize = do_normalize
self.image_mean = image_mean if image_mean is not None else OPENAI_CLIP_MEAN
self.image_std = image_std if image_std is not None else OPENAI_CLIP_STD
self.min_pixels = min_pixels
self.max_pixels = max_pixels
self.patch_size = patch_size
self.temporal_patch_size = temporal_patch_size
self.merge_size = merge_size
self.size = {"min_pixels": min_pixels, "max_pixels": max_pixels} # not used
self.do_convert_rgb = do_convert_rgb
def mvit_rescale(self, image: Image.Image, merge_size: int = 2) -> Image.Image:
try:
w, h = image.size
except:
raise ValueError(str((type(image), image)))
patch_size = self.patch_size
if (w // patch_size) * (h // patch_size) > self.in_token_limit:
scale = math.sqrt(
self.in_token_limit / ((w // patch_size) * (h // patch_size))
)
new_w, new_h = int(w * scale), int(h * scale)
image = image.resize((new_w, new_h), Image.Resampling.BICUBIC)
if self.pad_input:
new_w, new_h = image.size
pad_size_h = merge_size * patch_size
pad_size_w = merge_size * patch_size
pad_h = (pad_size_h - new_h % pad_size_h) % pad_size_h
pad_w = (pad_size_w - new_w % pad_size_w) % pad_size_w
image = TF.pad(image, (0, 0, pad_w, pad_h))
else:
new_w, new_h = image.size
new_w = new_w - new_w % patch_size
new_h = new_h - new_h % patch_size
new_w = adjust_size(new_w, patch_size)
new_h = adjust_size(new_h, patch_size)
image = TF.center_crop(image, (new_h, new_w))
w, h = image.size
if w // patch_size >= 512 or h // patch_size >= 512:
new_h = min(patch_size * 510, h)
new_w = min(patch_size * 510, w)
image = TF.center_crop(image, (new_h, new_w))
# raise ValueError("Exceed pos emb")
return image
def _preprocess(
self,
images: Union[ImageInput, VideoInput],
do_resize: bool = None,
resample: PILImageResampling = None,
do_rescale: bool = None,
rescale_factor: float = None,
do_normalize: bool = None,
image_mean: Optional[Union[float, List[float]]] = None,
image_std: Optional[Union[float, List[float]]] = None,
do_convert_rgb: bool = None,
data_format: Optional[ChannelDimension] = ChannelDimension.FIRST,
input_data_format: Optional[Union[str, ChannelDimension]] = None,
):
"""
Preprocess an image or batch of images. Copy of the `preprocess` method from `CLIPImageProcessor`.
Args:
images (`ImageInput`):
Image or batch of images to preprocess. Expects pixel values ranging from 0 to 255. If pixel values range from 0 to 1, set `do_rescale=False`.
vision_info (`List[Dict]`, *optional*):
Optional list of dictionaries containing additional information about vision inputs.
do_resize (`bool`, *optional*, defaults to `self.do_resize`):
Whether to resize the image.
resample (`PILImageResampling`, *optional*, defaults to `self.resample`):
Resampling filter to use if resizing the image. This can be one of the `PILImageResampling` enums.
do_rescale (`bool`, *optional*, defaults to `self.do_rescale`):
Whether to rescale the image.
rescale_factor (`float`, *optional*, defaults to `self.rescale_factor`):
Scale factor to use if rescaling the image.
do_normalize (`bool`, *optional*, defaults to `self.do_normalize`):
Whether to normalize the image.
image_mean (`float` or `List[float]`, *optional*, defaults to `self.image_mean`):
Mean to use if normalizing the image. Can be a float or a list of floats corresponding to the number of channels in the image.
image_std (`float` or `List[float]`, *optional*, defaults to `self.image_std`):
Standard deviation to use if normalizing the image. Can be a float or a list of floats corresponding to the number of channels in the image.
do_convert_rgb (`bool`, *optional*, defaults to `self.do_convert_rgb`):
Whether to convert the image to RGB.
data_format (`ChannelDimension`, *optional*, defaults to `ChannelDimension.FIRST`):
The channel dimension format for the output image. Can be one of:
- `"channels_first"` or `ChannelDimension.FIRST`: image in (num_channels, height, width) format.
- `"channels_last"` or `ChannelDimension.LAST`: image in (height, width, num_channels) format.
- Unset: Use the channel dimension format of the input image.
input_data_format (`ChannelDimension` or `str`, *optional*):
The channel dimension format for the input image. Can be one of:
- `"channels_first"` or `ChannelDimension.FIRST`: image in (num_channels, height, width) format.
- `"channels_last"` or `ChannelDimension.LAST`: image in (height, width, num_channels) format.
- `"none"` or `ChannelDimension.NONE`: image in (height, width) format. - `"none"` or `ChannelDimension.NONE`: image in (height, width) format.
"""
images = make_list_of_images(images)
if do_convert_rgb:
images = [convert_to_rgb(image) for image in images]
# All transformations expect numpy arrays.
images = [to_numpy_array(image) for image in images]
if is_scaled_image(images[0]) and do_rescale:
logger.warning_once(
"It looks like you are trying to rescale already rescaled images. If the input"
" images have pixel values between 0 and 1, set `do_rescale=False` to avoid rescaling them again."
)
if input_data_format is None:
# We assume that all images have the same channel dimension format.
input_data_format = infer_channel_dimension_format(images[0])
height, width = get_image_size(images[0], channel_dim=input_data_format)
resized_height, resized_width = height, width
processed_images = []
for image in images:
if do_resize:
resized_height, resized_width = smart_resize(
height,
width,
factor=self.patch_size * self.merge_size,
min_pixels=self.min_pixels,
max_pixels=self.max_pixels,
)
image = resize(
image,
size=(resized_height, resized_width),
resample=resample,
input_data_format=input_data_format,
)
if do_rescale:
image = self.rescale(
image, scale=rescale_factor, input_data_format=input_data_format
)
if do_normalize:
image = self.normalize(
image=image,
mean=image_mean,
std=image_std,
input_data_format=input_data_format,
)
image = to_channel_dimension_format(
image, data_format, input_channel_dim=input_data_format
)
processed_images.append(image)
patches = np.array(processed_images)
if data_format == ChannelDimension.LAST:
patches = patches.transpose(0, 3, 1, 2)
if patches.shape[0] == 1:
patches = np.tile(patches, (self.temporal_patch_size, 1, 1, 1))
init_patches = patches
channel = patches.shape[1]
grid_t = patches.shape[0] // self.temporal_patch_size
grid_h, grid_w = (
resized_height // self.patch_size,
resized_width // self.patch_size,
)
patches = patches.reshape(
grid_t,
self.temporal_patch_size,
channel,
grid_h,
self.patch_size,
grid_w,
self.patch_size,
)
patches = patches.transpose(0, 3, 5, 2, 1, 4, 6)
assert self.temporal_patch_size == 1
flatten_patches = patches.reshape(
grid_t * grid_h * grid_w, channel, self.patch_size, self.patch_size
)
return flatten_patches, (grid_t, grid_h, grid_w)
def preprocess(
self,
images: ImageInput,
videos: VideoInput = None,
do_resize: bool = None,
size: Dict[str, int] = None,
resample: PILImageResampling = None,
do_rescale: bool = None,
rescale_factor: float = None,
do_normalize: bool = None,
image_mean: Optional[Union[float, List[float]]] = None,
image_std: Optional[Union[float, List[float]]] = None,
do_convert_rgb: bool = None,
return_tensors: Optional[Union[str, TensorType]] = None,
data_format: Optional[ChannelDimension] = ChannelDimension.FIRST,
input_data_format: Optional[Union[str, ChannelDimension]] = None,
):
"""
Args:
images (`ImageInput`):
Image to preprocess. Expects a single or batch of images with pixel values ranging from 0 to 255. If
passing in images with pixel values between 0 and 1, set `do_rescale=False`.
videos (`VideoInput`):
Video to preprocess. Expects a single or batch of videos with pixel values ranging from 0 to 255. If
passing in videos with pixel values between 0 and 1, set `do_rescale=False`.
do_resize (`bool`, *optional*, defaults to `self.do_resize`):
Whether to resize the image.
size (`Dict[str, int]`, *optional*, defaults to `self.size`):
Size of the image after resizing. Shortest edge of the image is resized to size["shortest_edge"], with
the longest edge resized to keep the input aspect ratio.
resample (`int`, *optional*, defaults to `self.resample`):
Resampling filter to use if resizing the image. This can be one of the enum `PILImageResampling`. Only
has an effect if `do_resize` is set to `True`.
do_rescale (`bool`, *optional*, defaults to `self.do_rescale`):
Whether to rescale the image.
rescale_factor (`float`, *optional*, defaults to `self.rescale_factor`):
Rescale factor to rescale the image by if `do_rescale` is set to `True`.
do_normalize (`bool`, *optional*, defaults to `self.do_normalize`):
Whether to normalize the image.
image_mean (`float` or `List[float]`, *optional*, defaults to `self.image_mean`):
Image mean to use for normalization. Only has an effect if `do_normalize` is set to `True`.
image_std (`float` or `List[float]`, *optional*, defaults to `self.image_std`):
Image standard deviation to use for normalization. Only has an effect if `do_normalize` is set to
`True`.
do_convert_rgb (`bool`, *optional*, defaults to `self.do_convert_rgb`):
Whether to convert the image to RGB.
return_tensors (`str` or `TensorType`, *optional*):
The type of tensors to return. Can be one of:
- Unset: Return a list of `np.ndarray`.
- `TensorType.TENSORFLOW` or `'tf'`: Return a batch of type `tf.Tensor`.
- `TensorType.PYTORCH` or `'pt'`: Return a batch of type `torch.Tensor`.
- `TensorType.NUMPY` or `'np'`: Return a batch of type `np.ndarray`.
- `TensorType.JAX` or `'jax'`: Return a batch of type `jax.numpy.ndarray`.
data_format (`ChannelDimension` or `str`, *optional*, defaults to `ChannelDimension.FIRST`):
The channel dimension format for the output image. Can be one of:
- `"channels_first"` or `ChannelDimension.FIRST`: image in (num_channels, height, width) format.
- `"channels_last"` or `ChannelDimension.LAST`: image in (height, width, num_channels) format.
- Unset: Use the channel dimension format of the input image.
input_data_format (`ChannelDimension` or `str`, *optional*):
The channel dimension format for the input image. If unset, the channel dimension format is inferred
from the input image. Can be one of:
- `"channels_first"` or `ChannelDimension.FIRST`: image in (num_channels, height, width) format.
- `"channels_last"` or `ChannelDimension.LAST`: image in (height, width, num_channels) format.
- `"none"` or `ChannelDimension.NONE`: image in (height, width) format.
"""
do_resize = do_resize if do_resize is not None else self.do_resize
size = size if size is not None else self.size
resample = resample if resample is not None else self.resample
do_rescale = do_rescale if do_rescale is not None else self.do_rescale
rescale_factor = (
rescale_factor if rescale_factor is not None else self.rescale_factor
)
do_normalize = do_normalize if do_normalize is not None else self.do_normalize
image_mean = image_mean if image_mean is not None else self.image_mean
image_std = image_std if image_std is not None else self.image_std
do_convert_rgb = (
do_convert_rgb if do_convert_rgb is not None else self.do_convert_rgb
)
if images is not None:
images = make_batched_images(images)
if videos is not None:
videos = make_batched_videos(videos)
if images is not None and not valid_images(images):
raise ValueError(
"Invalid image type. Must be of type PIL.Image.Image, numpy.ndarray, "
"torch.Tensor, tf.Tensor or jax.ndarray."
)
validate_preprocess_arguments(
rescale_factor=rescale_factor,
do_normalize=do_normalize,
image_mean=image_mean,
image_std=image_std,
do_resize=do_resize,
size=size,
resample=resample,
)
if images is not None:
pixel_values, vision_grid_thws = [], []
for image in images:
patches, image_grid_thw = self._preprocess(
image,
do_resize=do_resize,
resample=resample,
do_rescale=do_rescale,
rescale_factor=rescale_factor,
do_normalize=do_normalize,
image_mean=image_mean,
image_std=image_std,
data_format=data_format,
do_convert_rgb=do_convert_rgb,
input_data_format=input_data_format,
)
pixel_values.extend(patches)
vision_grid_thws.append(image_grid_thw)
pixel_values = np.array(pixel_values)
vision_grid_thws = np.array(vision_grid_thws)
data = {"pixel_values": pixel_values, "image_grid_thw": vision_grid_thws}
if videos is not None:
pixel_values, vision_grid_thws = [], []
for images in videos:
patches, video_grid_thw = self._preprocess(
images,
do_resize=do_resize,
resample=resample,
do_rescale=do_rescale,
rescale_factor=rescale_factor,
do_normalize=do_normalize,
image_mean=image_mean,
image_std=image_std,
data_format=data_format,
do_convert_rgb=do_convert_rgb,
input_data_format=input_data_format,
)
pixel_values.extend(patches)
vision_grid_thws.append(video_grid_thw)
pixel_values = np.array(pixel_values)
vision_grid_thws = np.array(vision_grid_thws)
data = {
"pixel_values_videos": pixel_values,
"video_grid_thw": vision_grid_thws,
}
return BatchFeature(data=data, tensor_type=return_tensors)