# 🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨 # This file was automatically generated from src/transformers/models/qwen3_vl/modular_qwen3_vl.py. # Do NOT edit this file manually as any edits will be overwritten by the generation of # the file from the modular. If any change should be done, please apply the change to the # modular_qwen3_vl.py file directly. One of our CI enforces this. # 🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨 # coding=utf-8 # Copyright 2025 The Qwen Team and The HuggingFace Inc. team. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from dataclasses import dataclass from typing import Any, Callable, Optional, Union import torch import torch.nn as nn import torch.nn.functional as F from transformers.activations import ACT2FN from transformers.cache_utils import Cache, DynamicCache from transformers.generation import GenerationMixin from transformers.integrations import use_kernel_forward_from_hub from transformers.masking_utils import create_causal_mask from transformers.modeling_flash_attention_utils import FlashAttentionKwargs from transformers.modeling_layers import GradientCheckpointingLayer from transformers.modeling_outputs import BaseModelOutputWithPast, ModelOutput from transformers.modeling_rope_utils import ROPE_INIT_FUNCTIONS, dynamic_rope_update from transformers.modeling_utils import ALL_ATTENTION_FUNCTIONS, PreTrainedModel from transformers.processing_utils import Unpack from transformers.utils import TransformersKwargs, auto_docstring, is_torchdynamo_compiling from transformers.utils.deprecation import deprecate_kwarg from transformers.utils.generic import check_model_inputs from .configuration_qwen3_vl import LimeQwen3VLConfig, LimeQwen3VLTextConfig, Qwen3VLVisionConfig from torch.nn.utils.rnn import pad_sequence class Qwen3VLVisionMLP(nn.Module): def __init__(self, config): super().__init__() self.hidden_size = config.hidden_size self.intermediate_size = config.intermediate_size self.linear_fc1 = nn.Linear(self.hidden_size, self.intermediate_size, bias=True) self.linear_fc2 = nn.Linear(self.intermediate_size, self.hidden_size, bias=True) self.act_fn = ACT2FN[config.hidden_act] def forward(self, hidden_state): return self.linear_fc2(self.act_fn(self.linear_fc1(hidden_state))) class Qwen3VLVisionPatchEmbed(nn.Module): def __init__(self, config) -> None: super().__init__() self.patch_size = config.patch_size self.temporal_patch_size = config.temporal_patch_size self.in_channels = config.in_channels self.embed_dim = config.hidden_size kernel_size = [self.temporal_patch_size, self.patch_size, self.patch_size] self.proj = nn.Conv3d(self.in_channels, self.embed_dim, kernel_size=kernel_size, stride=kernel_size, bias=True) def forward(self, hidden_states: torch.Tensor) -> torch.Tensor: target_dtype = self.proj.weight.dtype hidden_states = hidden_states.view( -1, self.in_channels, self.temporal_patch_size, self.patch_size, self.patch_size ) hidden_states = self.proj(hidden_states.to(dtype=target_dtype)).view(-1, self.embed_dim) return hidden_states class Qwen3VLVisionRotaryEmbedding(nn.Module): inv_freq: torch.Tensor # fix linting for `register_buffer` def __init__(self, dim: int, theta: float = 10000.0) -> None: super().__init__() inv_freq = 1.0 / (theta ** (torch.arange(0, dim, 2, dtype=torch.float) / dim)) self.register_buffer("inv_freq", inv_freq, persistent=False) def forward(self, seqlen: int) -> torch.Tensor: seq = torch.arange(seqlen, device=self.inv_freq.device, dtype=self.inv_freq.dtype) freqs = torch.outer(seq, self.inv_freq) return freqs class Qwen3VLVisionPatchMerger(nn.Module): def __init__(self, config: Qwen3VLVisionConfig, use_postshuffle_norm=False) -> None: super().__init__() self.hidden_size = config.hidden_size * (config.spatial_merge_size**2) self.use_postshuffle_norm = use_postshuffle_norm self.norm = nn.LayerNorm(self.hidden_size if use_postshuffle_norm else config.hidden_size, eps=1e-6) self.linear_fc1 = nn.Linear(self.hidden_size, self.hidden_size) self.act_fn = nn.GELU() self.linear_fc2 = nn.Linear(self.hidden_size, config.out_hidden_size) def forward(self, x: torch.Tensor) -> torch.Tensor: x = self.norm(x.view(-1, self.hidden_size) if self.use_postshuffle_norm else x).view(-1, self.hidden_size) x = self.linear_fc2(self.act_fn(self.linear_fc1(x))) return x def rotate_half(x): """Rotates half the hidden dims of the input.""" x1 = x[..., : x.shape[-1] // 2] x2 = x[..., x.shape[-1] // 2 :] return torch.cat((-x2, x1), dim=-1) def apply_rotary_pos_emb_vision( q: torch.Tensor, k: torch.Tensor, cos: torch.Tensor, sin: torch.Tensor ) -> tuple[torch.Tensor, torch.Tensor]: orig_q_dtype = q.dtype orig_k_dtype = k.dtype q, k = q.float(), k.float() cos, sin = cos.unsqueeze(-2).float(), sin.unsqueeze(-2).float() q_embed = (q * cos) + (rotate_half(q) * sin) k_embed = (k * cos) + (rotate_half(k) * sin) q_embed = q_embed.to(orig_q_dtype) k_embed = k_embed.to(orig_k_dtype) return q_embed, k_embed def repeat_kv(hidden_states: torch.Tensor, n_rep: int) -> torch.Tensor: """ This is the equivalent of torch.repeat_interleave(x, dim=1, repeats=n_rep). The hidden states go from (batch, num_key_value_heads, seqlen, head_dim) to (batch, num_attention_heads, seqlen, head_dim) """ batch, num_key_value_heads, slen, head_dim = hidden_states.shape if n_rep == 1: return hidden_states hidden_states = hidden_states[:, :, None, :, :].expand(batch, num_key_value_heads, n_rep, slen, head_dim) return hidden_states.reshape(batch, num_key_value_heads * n_rep, slen, head_dim) def eager_attention_forward( module: nn.Module, query: torch.Tensor, key: torch.Tensor, value: torch.Tensor, attention_mask: Optional[torch.Tensor], scaling: float, dropout: float = 0.0, **kwargs: Unpack[TransformersKwargs], ): key_states = repeat_kv(key, module.num_key_value_groups) value_states = repeat_kv(value, module.num_key_value_groups) attn_weights = torch.matmul(query, key_states.transpose(2, 3)) * scaling if attention_mask is not None: causal_mask = attention_mask[:, :, :, : key_states.shape[-2]] attn_weights = attn_weights + causal_mask attn_weights = nn.functional.softmax(attn_weights, dim=-1, dtype=torch.float32).to(query.dtype) attn_weights = nn.functional.dropout(attn_weights, p=dropout, training=module.training) attn_output = torch.matmul(attn_weights, value_states) attn_output = attn_output.transpose(1, 2).contiguous() return attn_output, attn_weights class Qwen3VLVisionAttention(nn.Module): def __init__(self, config: Qwen3VLVisionConfig) -> None: super().__init__() self.dim = config.hidden_size self.num_heads = config.num_heads self.head_dim = self.dim // self.num_heads self.num_key_value_groups = 1 # needed for eager attention self.qkv = nn.Linear(self.dim, self.dim * 3, bias=True) self.proj = nn.Linear(self.dim, self.dim) self.scaling = self.head_dim**-0.5 self.config = config self.attention_dropout = 0.0 self.is_causal = False def forward( self, hidden_states: torch.Tensor, cu_seqlens: torch.Tensor, rotary_pos_emb: Optional[torch.Tensor] = None, position_embeddings: Optional[tuple[torch.Tensor, torch.Tensor]] = None, **kwargs, ) -> torch.Tensor: seq_length = hidden_states.shape[0] query_states, key_states, value_states = ( self.qkv(hidden_states).reshape(seq_length, 3, self.num_heads, -1).permute(1, 0, 2, 3).unbind(0) ) cos, sin = position_embeddings query_states, key_states = apply_rotary_pos_emb_vision(query_states, key_states, cos, sin) query_states = query_states.transpose(0, 1).unsqueeze(0) key_states = key_states.transpose(0, 1).unsqueeze(0) value_states = value_states.transpose(0, 1).unsqueeze(0) attention_interface: Callable = eager_attention_forward if self.config._attn_implementation != "eager": attention_interface = ALL_ATTENTION_FUNCTIONS[self.config._attn_implementation] if self.config._attn_implementation == "flash_attention_2": # Flash Attention 2: Use cu_seqlens for variable length attention max_seqlen = (cu_seqlens[1:] - cu_seqlens[:-1]).max() attn_output, _ = attention_interface( self, query_states, key_states, value_states, attention_mask=None, scaling=self.scaling, dropout=0.0 if not self.training else self.attention_dropout, cu_seq_lens_q=cu_seqlens, cu_seq_lens_k=cu_seqlens, max_length_q=max_seqlen, max_length_k=max_seqlen, is_causal=False, **kwargs, ) else: # Other implementations: Process each chunk separately lengths = cu_seqlens[1:] - cu_seqlens[:-1] splits = [ torch.split(tensor, lengths.tolist(), dim=2) for tensor in (query_states, key_states, value_states) ] attn_outputs = [ attention_interface( self, q, k, v, attention_mask=None, scaling=self.scaling, dropout=0.0 if not self.training else self.attention_dropout, is_causal=False, **kwargs, )[0] for q, k, v in zip(*splits) ] attn_output = torch.cat(attn_outputs, dim=1) attn_output = attn_output.reshape(seq_length, -1).contiguous() attn_output = self.proj(attn_output) return attn_output class Qwen3VLVisionBlock(GradientCheckpointingLayer): def __init__(self, config, attn_implementation: str = "sdpa") -> None: super().__init__() self.norm1 = nn.LayerNorm(config.hidden_size, eps=1e-6) self.norm2 = nn.LayerNorm(config.hidden_size, eps=1e-6) self.attn = Qwen3VLVisionAttention(config=config) self.mlp = Qwen3VLVisionMLP(config=config) def forward( self, hidden_states: torch.Tensor, cu_seqlens: torch.Tensor, rotary_pos_emb: Optional[torch.Tensor] = None, position_embeddings: Optional[tuple[torch.Tensor, torch.Tensor]] = None, **kwargs, ) -> torch.Tensor: hidden_states = hidden_states + self.attn( self.norm1(hidden_states), cu_seqlens=cu_seqlens, rotary_pos_emb=rotary_pos_emb, position_embeddings=position_embeddings, **kwargs, ) hidden_states = hidden_states + self.mlp(self.norm2(hidden_states)) return hidden_states class Qwen3VLTextRotaryEmbedding(nn.Module): inv_freq: torch.Tensor # fix linting for `register_buffer` def __init__(self, config: LimeQwen3VLTextConfig, device=None): super().__init__() if hasattr(config, "rope_scaling") and config.rope_scaling is not None: self.rope_type = config.rope_scaling.get("rope_type", "default") else: self.rope_type = "default" self.max_seq_len_cached = config.max_position_embeddings self.original_max_seq_len = config.max_position_embeddings self.config = config self.rope_init_fn = ROPE_INIT_FUNCTIONS[self.rope_type] inv_freq, self.attention_scaling = self.rope_init_fn(self.config, device) self.register_buffer("inv_freq", inv_freq, persistent=False) self.original_inv_freq = self.inv_freq self.mrope_section = config.rope_scaling.get("mrope_section", [24, 20, 20]) def apply_interleaved_mrope(self, freqs, mrope_section): """Apply interleaved MRoPE to 3D rotary embeddings. Reorganizes frequency layout from chunked [TTT...HHH...WWW] to interleaved [THTHWHTHW...TT], preserving frequency continuity. args: x: (3, bs, seq_len, head_dim // 2) mrope_section: (3,) returns: x_t: (bs, seq_len, head_dim // 2) """ freqs_t = freqs[0] # just overwrite the first dimension T for dim, offset in enumerate((1, 2), start=1): # H, W length = mrope_section[dim] * 3 idx = slice(offset, length, 3) freqs_t[..., idx] = freqs[dim, ..., idx] return freqs_t @torch.no_grad() @dynamic_rope_update # power user: used with advanced RoPE types (e.g. dynamic rope) def forward(self, x, position_ids): # In contrast to other models, Qwen3VL has different position ids for the grids # So we expand the inv_freq to shape (3, ...) if position_ids.ndim == 2: position_ids = position_ids[None, ...].expand(3, position_ids.shape[0], -1) inv_freq_expanded = self.inv_freq[None, None, :, None].float().expand(3, position_ids.shape[1], -1, 1) position_ids_expanded = position_ids[:, :, None, :].float() # shape (3, bs, 1, positions) device_type = x.device.type if isinstance(x.device.type, str) and x.device.type != "mps" else "cpu" with torch.autocast(device_type=device_type, enabled=False): # Force float32 freqs = (inv_freq_expanded.float() @ position_ids_expanded.float()).transpose(2, 3) freqs = self.apply_interleaved_mrope(freqs, self.mrope_section) emb = torch.cat((freqs, freqs), dim=-1) cos = emb.cos() * self.attention_scaling sin = emb.sin() * self.attention_scaling return cos.to(dtype=x.dtype), sin.to(dtype=x.dtype) @use_kernel_forward_from_hub("RMSNorm") class Qwen3VLTextRMSNorm(nn.Module): def __init__(self, hidden_size, eps: float = 1e-6) -> None: """ Qwen3VLTextRMSNorm is equivalent to T5LayerNorm """ super().__init__() self.weight = nn.Parameter(torch.ones(hidden_size)) self.variance_epsilon = eps def forward(self, hidden_states: torch.Tensor) -> torch.Tensor: input_dtype = hidden_states.dtype hidden_states = hidden_states.to(torch.float32) variance = hidden_states.pow(2).mean(-1, keepdim=True) hidden_states = hidden_states * torch.rsqrt(variance + self.variance_epsilon) return self.weight * hidden_states.to(input_dtype) def extra_repr(self): return f"{tuple(self.weight.shape)}, eps={self.variance_epsilon}" class LimeSimpleRMSNorm(nn.Module): """ Affine-less RMSNorm. No learnable weight, no bias. Just pure normalization. """ def __init__(self, hidden_size = None, eps: float = 1e-6) -> None: super().__init__() # hidden_size 参数在这里没用,但为了保持接口一致性可以留着 self.variance_epsilon = eps def forward(self, hidden_states: torch.Tensor) -> torch.Tensor: input_dtype = hidden_states.dtype hidden_states = hidden_states.to(torch.float32) variance = hidden_states.pow(2).mean(-1, keepdim=True) hidden_states = hidden_states * torch.rsqrt(variance + self.variance_epsilon) return hidden_states.to(input_dtype) def apply_rotary_pos_emb(q, k, cos, sin, position_ids=None, unsqueeze_dim=1): """Applies Rotary Position Embedding to the query and key tensors. Args: q (`torch.Tensor`): The query tensor. k (`torch.Tensor`): The key tensor. cos (`torch.Tensor`): The cosine part of the rotary embedding. sin (`torch.Tensor`): The sine part of the rotary embedding. position_ids (`torch.Tensor`, *optional*): Deprecated and unused. unsqueeze_dim (`int`, *optional*, defaults to 1): The 'unsqueeze_dim' argument specifies the dimension along which to unsqueeze cos[position_ids] and sin[position_ids] so that they can be properly broadcasted to the dimensions of q and k. For example, note that cos[position_ids] and sin[position_ids] have the shape [batch_size, seq_len, head_dim]. Then, if q and k have the shape [batch_size, heads, seq_len, head_dim], then setting unsqueeze_dim=1 makes cos[position_ids] and sin[position_ids] broadcastable to the shapes of q and k. Similarly, if q and k have the shape [batch_size, seq_len, heads, head_dim], then set unsqueeze_dim=2. Returns: `tuple(torch.Tensor)` comprising of the query and key tensors rotated using the Rotary Position Embedding. """ cos = cos.unsqueeze(unsqueeze_dim) sin = sin.unsqueeze(unsqueeze_dim) q_embed = (q * cos) + (rotate_half(q) * sin) k_embed = (k * cos) + (rotate_half(k) * sin) return q_embed, k_embed class Qwen3VLTextAttention(nn.Module): """Multi-headed attention from 'Attention Is All You Need' paper""" def __init__(self, config: LimeQwen3VLTextConfig, layer_idx: int): super().__init__() self.config = config self.layer_idx = layer_idx self.head_dim = getattr(config, "head_dim", config.hidden_size // config.num_attention_heads) self.num_key_value_groups = config.num_attention_heads // config.num_key_value_heads self.scaling = self.head_dim**-0.5 self.attention_dropout = config.attention_dropout self.is_causal = True self.q_proj = nn.Linear( config.hidden_size, config.num_attention_heads * self.head_dim, bias=config.attention_bias ) self.k_proj = nn.Linear( config.hidden_size, config.num_key_value_heads * self.head_dim, bias=config.attention_bias ) self.v_proj = nn.Linear( config.hidden_size, config.num_key_value_heads * self.head_dim, bias=config.attention_bias ) self.o_proj = nn.Linear( config.num_attention_heads * self.head_dim, config.hidden_size, bias=config.attention_bias ) self.q_norm = Qwen3VLTextRMSNorm(self.head_dim, eps=config.rms_norm_eps) # unlike olmo, only on the head dim! self.k_norm = Qwen3VLTextRMSNorm( self.head_dim, eps=config.rms_norm_eps ) # thus post q_norm does not need reshape @deprecate_kwarg("past_key_value", new_name="past_key_values", version="4.58") def forward( self, hidden_states: torch.Tensor, position_embeddings: tuple[torch.Tensor, torch.Tensor], attention_mask: Optional[torch.Tensor], past_key_values: Optional[Cache] = None, cache_position: Optional[torch.LongTensor] = None, **kwargs: Unpack[FlashAttentionKwargs], ) -> tuple[torch.Tensor, Optional[torch.Tensor]]: input_shape = hidden_states.shape[:-1] hidden_shape = (*input_shape, -1, self.head_dim) query_states = self.q_norm(self.q_proj(hidden_states).view(hidden_shape)).transpose(1, 2) key_states = self.k_norm(self.k_proj(hidden_states).view(hidden_shape)).transpose(1, 2) value_states = self.v_proj(hidden_states).view(hidden_shape).transpose(1, 2) cos, sin = position_embeddings query_states, key_states = apply_rotary_pos_emb(query_states, key_states, cos, sin) if past_key_values is not None: # sin and cos are specific to RoPE models; cache_position needed for the static cache cache_kwargs = {"sin": sin, "cos": cos, "cache_position": cache_position} key_states, value_states = past_key_values.update(key_states, value_states, self.layer_idx, cache_kwargs) attention_interface: Callable = eager_attention_forward if self.config._attn_implementation != "eager": attention_interface = ALL_ATTENTION_FUNCTIONS[self.config._attn_implementation] attn_output, attn_weights = attention_interface( self, query_states, key_states, value_states, attention_mask, dropout=0.0 if not self.training else self.attention_dropout, scaling=self.scaling, **kwargs, ) attn_output = attn_output.reshape(*input_shape, -1).contiguous() attn_output = self.o_proj(attn_output) return attn_output, attn_weights class Qwen3VLTextMLP(nn.Module): def __init__(self, config): super().__init__() self.config = config self.hidden_size = config.hidden_size self.intermediate_size = config.intermediate_size self.gate_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=False) self.up_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=False) self.down_proj = nn.Linear(self.intermediate_size, self.hidden_size, bias=False) self.act_fn = ACT2FN[config.hidden_act] def forward(self, x): down_proj = self.down_proj(self.act_fn(self.gate_proj(x)) * self.up_proj(x)) return down_proj class LimeCrossAttention(nn.Module): """ Lime Cross Attention: Mimics the Qwen3VLTextAttention structure but operates within the reduced 'lime_hidden_size' dimension. Key Architectural Features: 1. Source: Query comes from Text (hidden_states), Key/Value from Vision (visual_context). 2. No RoPE: Cross-attention relies on content retrieval; positional embeddings are distinct. 3. Dimensionality: Operates in the bottleneck dimension (lime_hidden_size) while preserving the original attention head resolution (head_dim). """ def __init__(self, config: LimeQwen3VLTextConfig, layer_idx: int): super().__init__() self.config = config self.layer_idx = layer_idx # --- 1. Dimension Configuration --- # Use the reduced dimension (bottleneck size) defined in the config. self.hidden_size = config.lime_hidden_size # --- 2. Head Calculation --- # Preserve the original 'head_dim' (e.g., 128) to maintain the granularity of # attention features. We adjust 'num_heads' to fit the reduced hidden_size. original_head_dim = getattr(config, "head_dim", config.hidden_size // config.num_attention_heads) self.head_dim = original_head_dim # Ensure the reduced dimension is valid for the given head size. if self.hidden_size % self.head_dim != 0: raise ValueError( f"config.lime_hidden_size ({self.hidden_size}) must be divisible by head_dim ({self.head_dim})" ) self.num_heads = self.hidden_size // self.head_dim # --- 3. Grouped Query Attention (GQA) Adjustment --- # Maintain the Key/Value head ratio from the original model. # If the dimension is too small, ensure at least one KV head exists. original_gqa_ratio = config.num_attention_heads // config.num_key_value_heads self.num_key_value_heads = max(1, self.num_heads // original_gqa_ratio) self.num_key_value_groups = self.num_heads // self.num_key_value_heads self.scaling = self.head_dim**-0.5 self.attention_dropout = config.attention_dropout # --- 4. Projection Layers --- # Projections operate strictly within the reduced 'lime_hidden_size'. self.q_proj = nn.Linear( self.hidden_size, self.num_heads * self.head_dim, bias=config.attention_bias ) self.k_proj = nn.Linear( self.hidden_size, self.num_key_value_heads * self.head_dim, bias=config.attention_bias ) self.v_proj = nn.Linear( self.hidden_size, self.num_key_value_heads * self.head_dim, bias=config.attention_bias ) self.o_proj = nn.Linear( self.num_heads * self.head_dim, self.hidden_size, bias=config.attention_bias ) # Norm layers applied to the specific head dimensions. self.q_norm = Qwen3VLTextRMSNorm(self.head_dim, eps=config.rms_norm_eps) self.k_norm = Qwen3VLTextRMSNorm(self.head_dim, eps=config.rms_norm_eps) def forward( self, hidden_states: torch.Tensor, # Query (Text) in lime_hidden_size visual_context: torch.Tensor, # Key/Value (Vision) in lime_hidden_size attention_mask: Optional[torch.Tensor] = None, ) -> torch.Tensor: # 1. Query Processing (Text Side) input_shape = hidden_states.shape[:-1] # Reshape to (batch, seq_len, num_heads, head_dim) query_shape = (*input_shape, -1, self.head_dim) # Standard Qwen normalization: Norm(Proj(x).view).transpose query_states = self.q_norm(self.q_proj(hidden_states).view(query_shape)).transpose(1, 2) # 2. Key/Value Processing (Vision Side) visual_shape = visual_context.shape[:-1] # (batch, vis_len) kv_shape = (*visual_shape, -1, self.head_dim) # Apply projections and norms for GQA key_states = self.k_norm(self.k_proj(visual_context).view(kv_shape)).transpose(1, 2) value_states = self.v_proj(visual_context).view(kv_shape).transpose(1, 2) # 3. Repeat KV for GQA # Explicitly repeat keys/values to match the number of query heads key_states = repeat_kv(key_states, self.num_key_value_groups) value_states = repeat_kv(value_states, self.num_key_value_groups) # 4. Attention Calculation # Cross-modal retrieval (Text-to-Vision). # Shape: (batch, num_heads, text_len, vis_len) attn_weights = torch.matmul(query_states, key_states.transpose(2, 3)) * self.scaling if attention_mask is not None: # Apply visual mask (usually broadcasting over batch/heads) attn_weights = attn_weights + attention_mask attn_weights = nn.functional.softmax(attn_weights, dim=-1, dtype=torch.float32).to(query_states.dtype) attn_weights = nn.functional.dropout(attn_weights, p=self.attention_dropout, training=self.training) attn_output = torch.matmul(attn_weights, value_states) # 5. Output Projection attn_output = attn_output.transpose(1, 2).contiguous() attn_output = attn_output.reshape(*input_shape, -1).contiguous() attn_output = self.o_proj(attn_output) return attn_output class LimeMLP(nn.Module): """ Lime MLP: A SwiGLU MLP adapted for the bottleneck dimension. """ def __init__(self, config): super().__init__() self.config = config # Operate in the reduced dimension self.hidden_size = config.lime_hidden_size # Calculate intermediate size dynamically to maintain the original model's # expansion ratio (e.g., scaling factor usually around 3.5x or 4x). expansion_ratio = config.intermediate_size / config.hidden_size self.intermediate_size = int(self.hidden_size * expansion_ratio) # Gate, Up, and Down projections without bias (following Qwen conventions) self.gate_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=False) self.up_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=False) self.down_proj = nn.Linear(self.intermediate_size, self.hidden_size, bias=False) self.act_fn = ACT2FN[config.hidden_act] def forward(self, x): # SwiGLU activation mechanism down_proj = self.down_proj(self.act_fn(self.gate_proj(x)) * self.up_proj(x)) return down_proj class LimeMemoryBlock(nn.Module): """ Lime Memory Block (Bottleneck Adapter Architecture). Flow: Input (High-Dim) -> Down-Projection -> [Cross-Attn -> MLP] (Low-Dim) -> Up-Projection -> Gated Residual This design reduces parameter count and computation by performing dense operations in a compressed latent space ('lime_hidden_size'). """ def __init__(self, config, layer_idx): super().__init__() # 1. Dimension Definitions self.orig_hidden_size = config.hidden_size self.lime_dim = config.lime_hidden_size # The bottleneck dimension (e.g., 1024) # 2. Down-Projection Layers (Compression) # Project Text and Vision features from original dimension to bottleneck dimension. self.text_reducer = nn.Linear(self.orig_hidden_size, self.lime_dim, bias=False) self.vision_reducer = nn.Linear(self.orig_hidden_size, self.lime_dim, bias=False) # 3. Up-Projection Layer (Restoration) # Project features back to the original dimension for residual addition. self.output_expander = nn.Linear(self.lime_dim, self.orig_hidden_size, bias=False) # 4. Internal Components (Bottleneck Space) # Norms and Core layers operate in 'lime_dim'. self.input_norm = Qwen3VLTextRMSNorm(self.lime_dim, eps=config.rms_norm_eps) self.post_attn_norm = Qwen3VLTextRMSNorm(self.lime_dim, eps=config.rms_norm_eps) # These modules automatically utilize config.lime_hidden_size internally self.cross_attn = LimeCrossAttention(config, layer_idx) self.mlp = LimeMLP(config) # --- 修改点: 使用无参数的 RMSNorm --- # 即使 config 传进去了 hidden_size,这个类也会忽略它,不会创建 Parameter self.output_norm = LimeSimpleRMSNorm(eps=config.rms_norm_eps) # Learnable Gating Parameter self.gate_alpha = nn.Parameter(torch.zeros(1)) # 5. Zero-Initialization Strategy # Initialize the final output projection to zero. This ensures the module # acts as an identity function (output=0) at the start of training, # preventing disruption to the pre-trained model's features. nn.init.zeros_(self.output_expander.weight) def forward(self, hidden_states, visual_context, visual_mask=None, query_is_visual_mask=None): """ Args: hidden_states: Text Context [Batch, SeqLen, Orig_Dim] visual_context: Visual Memory [Batch, VisLen, Orig_Dim] visual_mask: Attention mask for visual memory. query_is_visual_mask: Mask to silence injection on visual query tokens. """ # --- 短路保护 --- # 如果没有视觉上下文,或者长度为0,直接不做任何计算,返回0(即不影响残差) if visual_context is None or visual_context.numel() == 0: return torch.zeros_like(hidden_states) # ---------------------------- # 1. Down-Projection (High Rank -> Low Rank) # Shapes become [Batch, ..., Lime_Dim] small_hidden = self.text_reducer(hidden_states) small_visual = self.vision_reducer(visual_context) # 2. Cross Attention (Low Rank) normed_hidden = self.input_norm(small_hidden) # Pass the compressed visual context to attention m_raw = self.cross_attn(normed_hidden, small_visual, attention_mask=visual_mask) # 3. MLP Generation (Low Rank) normed_m = self.post_attn_norm(m_raw) m_latent = m_raw + self.mlp(normed_m) # 4. Up-Projection (Low Rank -> High Rank) # Restore shape to [Batch, SeqLen, Orig_Dim] m_restored = self.output_expander(m_latent) # 5. Gated Injection # Apply learnable gate to the restored signal m_normalized = self.output_norm(m_restored) output = self.gate_alpha * m_normalized # 6. Vision Token Silencing # If the current query token is a visual token (indicated by mask), # suppress the output to zero to avoid contaminating visual representations. if query_is_visual_mask is not None: # Expand mask to match feature dimension: [Batch, SeqLen, 1] silence_mask = (1.0 - query_is_visual_mask.to(output.dtype).unsqueeze(-1)) output = output * silence_mask return output class Qwen3VLTextDecoderLayer(GradientCheckpointingLayer): def __init__(self, config: LimeQwen3VLTextConfig, layer_idx: int): super().__init__() self.hidden_size = config.hidden_size self.self_attn = Qwen3VLTextAttention(config=config, layer_idx=layer_idx) self.mlp = Qwen3VLTextMLP(config) self.input_layernorm = Qwen3VLTextRMSNorm(config.hidden_size, eps=config.rms_norm_eps) self.post_attention_layernorm = Qwen3VLTextRMSNorm(config.hidden_size, eps=config.rms_norm_eps) # --- LIME MODIFICATION --- self.lime_block = None lime_layers = getattr(config, "lime_layers", None) if lime_layers is not None and layer_idx in lime_layers: self.lime_block = LimeMemoryBlock(config, layer_idx) # ------------------------- @deprecate_kwarg("past_key_value", new_name="past_key_values", version="4.58") def forward( self, hidden_states: torch.Tensor, position_embeddings: tuple[torch.Tensor, torch.Tensor], attention_mask: Optional[torch.Tensor] = None, position_ids: Optional[torch.LongTensor] = None, past_key_values: Optional[Cache] = None, use_cache: Optional[bool] = False, cache_position: Optional[torch.LongTensor] = None, # --- 🚨 LIME ARGUMENTS 🚨 --- lime_visual_context: Optional[torch.Tensor] = None, lime_visual_mask: Optional[torch.Tensor] = None, visual_pos_masks: Optional[torch.Tensor] = None, # Reusing DeepStack mask: True=Visual, False=Text # ---------------------------- **kwargs: Unpack[TransformersKwargs], ) -> torch.Tensor: residual = hidden_states hidden_states = self.input_layernorm(hidden_states) # Self Attention hidden_states, _ = self.self_attn( hidden_states=hidden_states, attention_mask=attention_mask, position_ids=position_ids, past_key_values=past_key_values, use_cache=use_cache, cache_position=cache_position, position_embeddings=position_embeddings, **kwargs, ) hidden_states = residual + hidden_states # Fully Connected residual = hidden_states hidden_states = self.post_attention_layernorm(hidden_states) hidden_states = self.mlp(hidden_states) # --- 🚨 LIME INJECTION START 🚨 --- if self.lime_block is not None and lime_visual_context is not None: lime_update = self.lime_block( hidden_states=residual, visual_context=lime_visual_context, visual_mask=lime_visual_mask, query_is_visual_mask=visual_pos_masks # Pass the mask here ) hidden_states = hidden_states + lime_update # --- 🚨 LIME INJECTION END 🚨 --- hidden_states = residual + hidden_states return hidden_states @dataclass @auto_docstring( custom_intro=""" Base class for Llava outputs, with hidden states and attentions. """ ) class Qwen3VLModelOutputWithPast(ModelOutput): r""" past_key_values (`Cache`, *optional*, returned when `use_cache=True` is passed or when `config.use_cache=True`): It is a [`~cache_utils.Cache`] instance. For more details, see our [kv cache guide](https://huggingface.co/docs/transformers/en/kv_cache). Contains pre-computed hidden-states (key and values in the self-attention blocks) that can be used (see `past_key_values` input) to speed up sequential decoding. rope_deltas (`torch.LongTensor` of shape `(batch_size, )`, *optional*): The rope index difference between sequence length and multimodal rope. """ last_hidden_state: Optional[torch.FloatTensor] = None past_key_values: Optional[Cache] = None hidden_states: Optional[tuple[torch.FloatTensor]] = None attentions: Optional[tuple[torch.FloatTensor]] = None rope_deltas: Optional[torch.LongTensor] = None @auto_docstring class Qwen3VLPreTrainedModel(PreTrainedModel): config: LimeQwen3VLConfig base_model_prefix = "model" supports_gradient_checkpointing = True _no_split_modules = ["Qwen3VLTextDecoderLayer", "Qwen3VLVisionBlock"] _skip_keys_device_placement = "past_key_values" _supports_flash_attn = True _supports_sdpa = True _can_compile_fullgraph = True _supports_attention_backend = True _can_record_outputs = { "hidden_states": Qwen3VLTextDecoderLayer, "attentions": Qwen3VLTextAttention, } class Qwen3VLVisionModel(Qwen3VLPreTrainedModel): config: Qwen3VLVisionConfig _no_split_modules = ["Qwen3VLVisionBlock"] def __init__(self, config, *inputs, **kwargs) -> None: super().__init__(config, *inputs, **kwargs) self.spatial_merge_size = config.spatial_merge_size self.patch_size = config.patch_size self.spatial_merge_unit = self.spatial_merge_size * self.spatial_merge_size self.patch_embed = Qwen3VLVisionPatchEmbed( config=config, ) self.pos_embed = nn.Embedding(config.num_position_embeddings, config.hidden_size) self.num_grid_per_side = int(config.num_position_embeddings**0.5) head_dim = config.hidden_size // config.num_heads self.rotary_pos_emb = Qwen3VLVisionRotaryEmbedding(head_dim // 2) self.blocks = nn.ModuleList([Qwen3VLVisionBlock(config) for _ in range(config.depth)]) self.merger = Qwen3VLVisionPatchMerger( config=config, use_postshuffle_norm=False, ) self.deepstack_visual_indexes = config.deepstack_visual_indexes self.deepstack_merger_list = nn.ModuleList( [ Qwen3VLVisionPatchMerger( config=config, use_postshuffle_norm=True, ) for _ in range(len(config.deepstack_visual_indexes)) ] ) self.gradient_checkpointing = False def rot_pos_emb(self, grid_thw: torch.Tensor) -> torch.Tensor: merge_size = self.spatial_merge_size max_hw = int(grid_thw[:, 1:].max().item()) freq_table = self.rotary_pos_emb(max_hw) # (max_hw, dim // 2) device = freq_table.device total_tokens = int(torch.prod(grid_thw, dim=1).sum().item()) pos_ids = torch.empty((total_tokens, 2), dtype=torch.long, device=device) offset = 0 for num_frames, height, width in grid_thw: merged_h, merged_w = height // merge_size, width // merge_size block_rows = torch.arange(merged_h, device=device) # block row indices block_cols = torch.arange(merged_w, device=device) # block col indices intra_row = torch.arange(merge_size, device=device) # intra-block row offsets intra_col = torch.arange(merge_size, device=device) # intra-block col offsets # Compute full-resolution positions row_idx = block_rows[:, None, None, None] * merge_size + intra_row[None, None, :, None] col_idx = block_cols[None, :, None, None] * merge_size + intra_col[None, None, None, :] row_idx = row_idx.expand(merged_h, merged_w, merge_size, merge_size).reshape(-1) col_idx = col_idx.expand(merged_h, merged_w, merge_size, merge_size).reshape(-1) coords = torch.stack((row_idx, col_idx), dim=-1) if num_frames > 1: coords = coords.repeat(num_frames, 1) num_tokens = coords.shape[0] pos_ids[offset : offset + num_tokens] = coords offset += num_tokens embeddings = freq_table[pos_ids] # lookup rotary embeddings embeddings = embeddings.flatten(1) return embeddings def fast_pos_embed_interpolate(self, grid_thw): grid_ts, grid_hs, grid_ws = grid_thw[:, 0], grid_thw[:, 1], grid_thw[:, 2] idx_list = [[] for _ in range(4)] weight_list = [[] for _ in range(4)] for t, h, w in zip(grid_ts, grid_hs, grid_ws): h_idxs = torch.linspace(0, self.num_grid_per_side - 1, h) w_idxs = torch.linspace(0, self.num_grid_per_side - 1, w) h_idxs_floor = h_idxs.int() w_idxs_floor = w_idxs.int() h_idxs_ceil = (h_idxs.int() + 1).clip(max=self.num_grid_per_side - 1) w_idxs_ceil = (w_idxs.int() + 1).clip(max=self.num_grid_per_side - 1) dh = h_idxs - h_idxs_floor dw = w_idxs - w_idxs_floor base_h = h_idxs_floor * self.num_grid_per_side base_h_ceil = h_idxs_ceil * self.num_grid_per_side indices = [ (base_h[None].T + w_idxs_floor[None]).flatten(), (base_h[None].T + w_idxs_ceil[None]).flatten(), (base_h_ceil[None].T + w_idxs_floor[None]).flatten(), (base_h_ceil[None].T + w_idxs_ceil[None]).flatten(), ] weights = [ ((1 - dh)[None].T * (1 - dw)[None]).flatten(), ((1 - dh)[None].T * dw[None]).flatten(), (dh[None].T * (1 - dw)[None]).flatten(), (dh[None].T * dw[None]).flatten(), ] for i in range(4): idx_list[i].extend(indices[i].tolist()) weight_list[i].extend(weights[i].tolist()) idx_tensor = torch.tensor(idx_list, dtype=torch.long, device=self.pos_embed.weight.device) weight_tensor = torch.tensor( weight_list, dtype=self.pos_embed.weight.dtype, device=self.pos_embed.weight.device ) pos_embeds = self.pos_embed(idx_tensor) * weight_tensor[:, :, None] patch_pos_embeds = pos_embeds[0] + pos_embeds[1] + pos_embeds[2] + pos_embeds[3] patch_pos_embeds = patch_pos_embeds.split([h * w for h, w in zip(grid_hs, grid_ws)]) patch_pos_embeds_permute = [] merge_size = self.config.spatial_merge_size for pos_embed, t, h, w in zip(patch_pos_embeds, grid_ts, grid_hs, grid_ws): pos_embed = pos_embed.repeat(t, 1) pos_embed = ( pos_embed.view(t, h // merge_size, merge_size, w // merge_size, merge_size, -1) .permute(0, 1, 3, 2, 4, 5) .flatten(0, 4) ) patch_pos_embeds_permute.append(pos_embed) patch_pos_embeds = torch.cat(patch_pos_embeds_permute) return patch_pos_embeds def forward(self, hidden_states: torch.Tensor, grid_thw: torch.Tensor, **kwargs) -> torch.Tensor: """ Args: hidden_states (`torch.Tensor` of shape `(seq_len, hidden_size)`): The final hidden states of the model. grid_thw (`torch.Tensor` of shape `(num_images_or_videos, 3)`): The temporal, height and width of feature shape of each image in LLM. Returns: `torch.Tensor`: hidden_states. """ hidden_states = self.patch_embed(hidden_states) pos_embeds = self.fast_pos_embed_interpolate(grid_thw) hidden_states = hidden_states + pos_embeds rotary_pos_emb = self.rot_pos_emb(grid_thw) seq_len, _ = hidden_states.size() hidden_states = hidden_states.reshape(seq_len, -1) rotary_pos_emb = rotary_pos_emb.reshape(seq_len, -1) emb = torch.cat((rotary_pos_emb, rotary_pos_emb), dim=-1) position_embeddings = (emb.cos(), emb.sin()) cu_seqlens = torch.repeat_interleave(grid_thw[:, 1] * grid_thw[:, 2], grid_thw[:, 0]).cumsum( dim=0, # Select dtype based on the following factors: # - FA2 requires that cu_seqlens_q must have dtype int32 # - torch.onnx.export requires that cu_seqlens_q must have same dtype as grid_thw # See https://github.com/huggingface/transformers/pull/34852 for more information dtype=grid_thw.dtype if torch.jit.is_tracing() else torch.int32, ) cu_seqlens = F.pad(cu_seqlens, (1, 0), value=0) deepstack_feature_lists = [] for layer_num, blk in enumerate(self.blocks): hidden_states = blk( hidden_states, cu_seqlens=cu_seqlens, position_embeddings=position_embeddings, **kwargs, ) if layer_num in self.deepstack_visual_indexes: deepstack_feature = self.deepstack_merger_list[self.deepstack_visual_indexes.index(layer_num)]( hidden_states ) deepstack_feature_lists.append(deepstack_feature) hidden_states = self.merger(hidden_states) return hidden_states, deepstack_feature_lists @auto_docstring( custom_intro=( "Text part of Qwen3VL, " "not a pure text-only model, as DeepStack integrates visual features into the early hidden states." ) ) class Qwen3VLTextModel(Qwen3VLPreTrainedModel): config: LimeQwen3VLTextConfig _no_split_modules = ["Qwen3VLTextDecoderLayer"] def __init__(self, config: LimeQwen3VLTextConfig): super().__init__(config) self.padding_idx = config.pad_token_id self.vocab_size = config.vocab_size self.embed_tokens = nn.Embedding(config.vocab_size, config.hidden_size, self.padding_idx) self.layers = nn.ModuleList( [Qwen3VLTextDecoderLayer(config, layer_idx) for layer_idx in range(config.num_hidden_layers)] ) self.norm = Qwen3VLTextRMSNorm(config.hidden_size, eps=config.rms_norm_eps) self.rotary_emb = Qwen3VLTextRotaryEmbedding(config=config) self.gradient_checkpointing = False # Initialize weights and apply final processing self.post_init() @check_model_inputs() @auto_docstring def forward( self, input_ids: Optional[torch.LongTensor] = None, attention_mask: Optional[torch.Tensor] = None, position_ids: Optional[torch.LongTensor] = None, past_key_values: Optional[Cache] = None, inputs_embeds: Optional[torch.FloatTensor] = None, use_cache: Optional[bool] = None, cache_position: Optional[torch.LongTensor] = None, # args for deepstack visual_pos_masks: Optional[torch.Tensor] = None, deepstack_visual_embeds: Optional[list[torch.Tensor]] = None, # --- 🚨 LIME ARGUMENT START 🚨 --- lime_visual_context: Optional[torch.Tensor] = None, lime_visual_mask: Optional[torch.Tensor] = None, # --- 🚨 LIME ARGUMENT END 🚨 --- **kwargs: Unpack[FlashAttentionKwargs], ) -> Union[tuple, BaseModelOutputWithPast]: r""" visual_pos_masks (`torch.Tensor` of shape `(batch_size, seqlen)`, *optional*): The mask of the visual positions. deepstack_visual_embeds (`list[torch.Tensor]`, *optional*): The deepstack visual embeddings. The shape is (num_layers, visual_seqlen, embed_dim). The feature is extracted from the different visual encoder layers, and fed to the decoder hidden states. It's from the paper DeepStack(https://arxiv.org/abs/2406.04334). lime_visual_context (`torch.Tensor` of shape `(batch_size, vis_seqlen, hidden_size)`, *optional*): The visual context for Lime cross-attention. lime_visual_mask (`torch.Tensor` of shape `(batch_size, 1, 1, vis_seqlen)`, *optional*): The attention mask for Lime cross-attention """ if (input_ids is None) ^ (inputs_embeds is not None): raise ValueError("You must specify exactly one of input_ids or inputs_embeds") # torch.jit.trace() doesn't support cache objects in the output if use_cache and past_key_values is None and not torch.jit.is_tracing(): past_key_values = DynamicCache(config=self.config) if inputs_embeds is None: inputs_embeds = self.embed_tokens(input_ids) if cache_position is None: past_seen_tokens = past_key_values.get_seq_length() if past_key_values is not None else 0 cache_position = torch.arange( past_seen_tokens, past_seen_tokens + inputs_embeds.shape[1], device=inputs_embeds.device ) # the hard coded `3` is for temporal, height and width. if position_ids is None: position_ids = cache_position.view(1, 1, -1).expand(3, inputs_embeds.shape[0], -1) elif position_ids.ndim == 2: position_ids = position_ids[None, ...].expand(3, position_ids.shape[0], -1) if position_ids.ndim == 3 and position_ids.shape[0] == 4: text_position_ids = position_ids[0] position_ids = position_ids[1:] else: text_position_ids = position_ids[0] attention_mask = create_causal_mask( config=self.config, input_embeds=inputs_embeds, attention_mask=attention_mask, cache_position=cache_position, past_key_values=past_key_values, position_ids=text_position_ids, ) hidden_states = inputs_embeds # create position embeddings to be shared across the decoder layers position_embeddings = self.rotary_emb(hidden_states, position_ids) # decoder layers for layer_idx, decoder_layer in enumerate(self.layers): layer_outputs = decoder_layer( hidden_states, attention_mask=attention_mask, position_ids=text_position_ids, past_key_values=past_key_values, cache_position=cache_position, position_embeddings=position_embeddings, # --- 🚨 透传 LIME 参数 🚨 --- lime_visual_context=lime_visual_context, lime_visual_mask=lime_visual_mask, # Pass visual_pos_masks to ALL layers for Lime silencing visual_pos_masks=visual_pos_masks, # ------------------------- **kwargs, ) hidden_states = layer_outputs # add visual features to the hidden states of first several layers if deepstack_visual_embeds is not None and layer_idx in range(len(deepstack_visual_embeds)): hidden_states = self._deepstack_process( hidden_states, visual_pos_masks, deepstack_visual_embeds[layer_idx], ) hidden_states = self.norm(hidden_states) return BaseModelOutputWithPast( last_hidden_state=hidden_states, past_key_values=past_key_values, ) def _deepstack_process( self, hidden_states: torch.Tensor, visual_pos_masks: torch.Tensor, visual_embeds: torch.Tensor ): visual_pos_masks = visual_pos_masks.to(hidden_states.device) visual_embeds = visual_embeds.to(hidden_states.device, hidden_states.dtype) local_this = hidden_states[visual_pos_masks, :].clone() + visual_embeds hidden_states[visual_pos_masks, :] = local_this return hidden_states @auto_docstring class LimeQwen3VLModel(Qwen3VLPreTrainedModel): base_model_prefix = "" _checkpoint_conversion_mapping = {} # Reference: fix gemma3 grad acc #37208 accepts_loss_kwargs = False config: LimeQwen3VLConfig _no_split_modules = ["Qwen3VLTextDecoderLayer", "Qwen3VLVisionBlock"] def __init__(self, config): super().__init__(config) self.visual = Qwen3VLVisionModel._from_config(config.vision_config) self.language_model = Qwen3VLTextModel._from_config(config.text_config) self.rope_deltas = None # cache rope_deltas here # --- 🚨 LIME STATE CACHE 🚨 --- self.lime_visual_context_cache = None # 新增:缓存视觉记忆 self.lime_visual_mask_cache = None # 新增:缓存视觉Mask # ----------------------------- # Initialize weights and apply final processing self.post_init() def get_input_embeddings(self): return self.language_model.get_input_embeddings() def set_input_embeddings(self, value): self.language_model.set_input_embeddings(value) def set_decoder(self, decoder): self.language_model = decoder def get_decoder(self): return self.language_model def get_rope_index( self, input_ids: Optional[torch.LongTensor] = None, image_grid_thw: Optional[torch.LongTensor] = None, video_grid_thw: Optional[torch.LongTensor] = None, attention_mask: Optional[torch.Tensor] = None, ) -> tuple[torch.Tensor, torch.Tensor]: """Different from the original implementation, Qwen3VL use timestamps rather than absolute time position ids.""" # Since we use timestamps to seperate videos, like , the video_grid_thw should also be split if video_grid_thw is not None: video_grid_thw = torch.repeat_interleave(video_grid_thw, video_grid_thw[:, 0], dim=0) video_grid_thw[:, 0] = 1 spatial_merge_size = self.config.vision_config.spatial_merge_size image_token_id = self.config.image_token_id video_token_id = self.config.video_token_id vision_start_token_id = self.config.vision_start_token_id mrope_position_deltas = [] if input_ids is not None and (image_grid_thw is not None or video_grid_thw is not None): total_input_ids = input_ids if attention_mask is None: attention_mask = torch.ones_like(total_input_ids) position_ids = torch.ones( 3, input_ids.shape[0], input_ids.shape[1], dtype=input_ids.dtype, device=input_ids.device, ) image_index, video_index = 0, 0 attention_mask = attention_mask.to(total_input_ids.device) for i, input_ids in enumerate(total_input_ids): input_ids = input_ids[attention_mask[i] == 1] image_nums, video_nums = 0, 0 vision_start_indices = torch.argwhere(input_ids == vision_start_token_id).squeeze(1) vision_tokens = input_ids[vision_start_indices + 1] image_nums = (vision_tokens == image_token_id).sum() video_nums = (vision_tokens == video_token_id).sum() input_tokens = input_ids.tolist() llm_pos_ids_list: list = [] st = 0 remain_images, remain_videos = image_nums, video_nums for _ in range(image_nums + video_nums): if image_token_id in input_tokens and remain_images > 0: ed_image = input_tokens.index(image_token_id, st) else: ed_image = len(input_tokens) + 1 if video_token_id in input_tokens and remain_videos > 0: ed_video = input_tokens.index(video_token_id, st) else: ed_video = len(input_tokens) + 1 if ed_image < ed_video: t, h, w = ( image_grid_thw[image_index][0], image_grid_thw[image_index][1], image_grid_thw[image_index][2], ) image_index += 1 remain_images -= 1 ed = ed_image else: t, h, w = ( video_grid_thw[video_index][0], video_grid_thw[video_index][1], video_grid_thw[video_index][2], ) video_index += 1 remain_videos -= 1 ed = ed_video llm_grid_t, llm_grid_h, llm_grid_w = ( t.item(), h.item() // spatial_merge_size, w.item() // spatial_merge_size, ) text_len = ed - st st_idx = llm_pos_ids_list[-1].max() + 1 if len(llm_pos_ids_list) > 0 else 0 llm_pos_ids_list.append(torch.arange(text_len).view(1, -1).expand(3, -1) + st_idx) # t_index is always 0 because llm_grid_t is always 1 (we use timestamps to encode the temporal information for videos) t_index = torch.arange(llm_grid_t).view(-1, 1).expand(-1, llm_grid_h * llm_grid_w).flatten() h_index = torch.arange(llm_grid_h).view(1, -1, 1).expand(llm_grid_t, -1, llm_grid_w).flatten() w_index = torch.arange(llm_grid_w).view(1, 1, -1).expand(llm_grid_t, llm_grid_h, -1).flatten() llm_pos_ids_list.append(torch.stack([t_index, h_index, w_index]) + text_len + st_idx) st = ed + llm_grid_t * llm_grid_h * llm_grid_w if st < len(input_tokens): st_idx = llm_pos_ids_list[-1].max() + 1 if len(llm_pos_ids_list) > 0 else 0 text_len = len(input_tokens) - st llm_pos_ids_list.append(torch.arange(text_len).view(1, -1).expand(3, -1) + st_idx) llm_positions = torch.cat(llm_pos_ids_list, dim=1).reshape(3, -1) position_ids[..., i, attention_mask[i] == 1] = llm_positions.to(position_ids.device) mrope_position_deltas.append(llm_positions.max() + 1 - len(total_input_ids[i])) mrope_position_deltas = torch.tensor(mrope_position_deltas, device=input_ids.device).unsqueeze(1) return position_ids, mrope_position_deltas else: if attention_mask is not None: position_ids = attention_mask.long().cumsum(-1) - 1 position_ids.masked_fill_(attention_mask == 0, 1) position_ids = position_ids.unsqueeze(0).expand(3, -1, -1).to(attention_mask.device) max_position_ids = position_ids.max(0, keepdim=False)[0].max(-1, keepdim=True)[0] mrope_position_deltas = max_position_ids + 1 - attention_mask.shape[-1] else: position_ids = ( torch.arange(input_ids.shape[1], device=input_ids.device) .view(1, 1, -1) .expand(3, input_ids.shape[0], -1) ) mrope_position_deltas = torch.zeros( [input_ids.shape[0], 1], device=input_ids.device, dtype=input_ids.dtype, ) return position_ids, mrope_position_deltas def get_video_features( self, pixel_values_videos: torch.FloatTensor, video_grid_thw: Optional[torch.LongTensor] = None ): """ Encodes videos into continuous embeddings that can be forwarded to the language model. The deepstack visual features are also returned. Args: pixel_values_videos (`torch.FloatTensor` of shape `(batch_size, num_channels, image_size, image_size)`): The tensors corresponding to the input videos. video_grid_thw (`torch.LongTensor` of shape `(num_videos, 3)`, *optional*): The temporal, height and width of feature shape of each video in LLM. """ # Same implementation as for images return self.get_image_features(pixel_values_videos, video_grid_thw) def get_image_features(self, pixel_values: torch.FloatTensor, image_grid_thw: Optional[torch.LongTensor] = None): """ Encodes images into continuous embeddings that can be forwarded to the language model. The deepstack visual features are also returned. Args: pixel_values (`torch.FloatTensor` of shape `(batch_size, num_channels, image_size, image_size)`): The tensors corresponding to the input images. image_grid_thw (`torch.LongTensor` of shape `(num_images, 3)`, *optional*): The temporal, height and width of feature shape of each image in LLM. """ pixel_values = pixel_values.type(self.visual.dtype) image_embeds, deepstack_image_embeds = self.visual(pixel_values, grid_thw=image_grid_thw) split_sizes = (image_grid_thw.prod(-1) // self.visual.spatial_merge_size**2).tolist() image_embeds = torch.split(image_embeds, split_sizes) return image_embeds, deepstack_image_embeds def get_placeholder_mask( self, input_ids: torch.LongTensor, inputs_embeds: torch.FloatTensor, image_features: Optional[torch.FloatTensor] = None, video_features: Optional[torch.FloatTensor] = None, ): """ Obtains multimodal placeholder mask from `input_ids` or `inputs_embeds`, and checks that the placeholder token count is equal to the length of multimodal features. If the lengths are different, an error is raised. """ if input_ids is None: special_image_mask = inputs_embeds == self.get_input_embeddings()( torch.tensor(self.config.image_token_id, dtype=torch.long, device=inputs_embeds.device) ) special_image_mask = special_image_mask.all(-1) special_video_mask = inputs_embeds == self.get_input_embeddings()( torch.tensor(self.config.video_token_id, dtype=torch.long, device=inputs_embeds.device) ) special_video_mask = special_video_mask.all(-1) else: special_image_mask = input_ids == self.config.image_token_id special_video_mask = input_ids == self.config.video_token_id n_image_tokens = special_image_mask.sum() special_image_mask = special_image_mask.unsqueeze(-1).expand_as(inputs_embeds).to(inputs_embeds.device) if image_features is not None and inputs_embeds[special_image_mask].numel() != image_features.numel(): raise ValueError( f"Image features and image tokens do not match: tokens: {n_image_tokens}, features {image_features.shape[0]}" ) n_video_tokens = special_video_mask.sum() special_video_mask = special_video_mask.unsqueeze(-1).expand_as(inputs_embeds).to(inputs_embeds.device) if video_features is not None and inputs_embeds[special_video_mask].numel() != video_features.numel(): raise ValueError( f"Videos features and video tokens do not match: tokens: {n_video_tokens}, features {video_features.shape[0]}" ) return special_image_mask, special_video_mask @auto_docstring @check_model_inputs() def forward( self, input_ids: torch.LongTensor = None, attention_mask: Optional[torch.Tensor] = None, position_ids: Optional[torch.LongTensor] = None, past_key_values: Optional[Cache] = None, inputs_embeds: Optional[torch.FloatTensor] = None, pixel_values: Optional[torch.Tensor] = None, pixel_values_videos: Optional[torch.FloatTensor] = None, image_grid_thw: Optional[torch.LongTensor] = None, video_grid_thw: Optional[torch.LongTensor] = None, cache_position: Optional[torch.LongTensor] = None, # --- 🚨 LIME CACHE ARGUMENT START 🚨 --- # 用于接收推理时(Generation)缓存的视觉上下文 cached_lime_visual_context: Optional[torch.Tensor] = None, cached_lime_visual_mask: Optional[torch.Tensor] = None, # --- 🚨 LIME CACHE ARGUMENT END 🚨 --- **kwargs: Unpack[TransformersKwargs], ) -> Union[tuple, Qwen3VLModelOutputWithPast]: r""" image_grid_thw (`torch.LongTensor` of shape `(num_images, 3)`, *optional*): The temporal, height and width of feature shape of each image in LLM. video_grid_thw (`torch.LongTensor` of shape `(num_videos, 3)`, *optional*): The temporal, height and width of feature shape of each video in LLM. cached_lime_visual_context (`torch.Tensor` of shape `(batch_size, vis_seqlen, hidden_size)`, *optional*): The cached visual context for Lime cross-attention during generation. cached_lime_visual_mask (`torch.Tensor` of shape `(batch_size, 1, 1, vis_seqlen)`, *optional*): The cached attention mask for Lime cross-attention during generation. """ if (input_ids is None) ^ (inputs_embeds is not None): raise ValueError("You must specify exactly one of input_ids or inputs_embeds") if inputs_embeds is None: inputs_embeds = self.get_input_embeddings()(input_ids) image_mask = None video_mask = None if pixel_values is not None: image_embeds, deepstack_image_embeds = self.get_image_features(pixel_values, image_grid_thw) image_embeds = torch.cat(image_embeds, dim=0).to(inputs_embeds.device, inputs_embeds.dtype) image_mask, _ = self.get_placeholder_mask( input_ids, inputs_embeds=inputs_embeds, image_features=image_embeds ) inputs_embeds = inputs_embeds.masked_scatter(image_mask, image_embeds) if pixel_values_videos is not None: video_embeds, deepstack_video_embeds = self.get_video_features(pixel_values_videos, video_grid_thw) video_embeds = torch.cat(video_embeds, dim=0).to(inputs_embeds.device, inputs_embeds.dtype) _, video_mask = self.get_placeholder_mask( input_ids, inputs_embeds=inputs_embeds, video_features=video_embeds ) inputs_embeds = inputs_embeds.masked_scatter(video_mask, video_embeds) visual_pos_masks = None deepstack_visual_embeds = None if image_mask is not None and video_mask is not None: # aggregate visual_pos_masks and deepstack_visual_embeds image_mask = image_mask[..., 0] video_mask = video_mask[..., 0] visual_pos_masks = image_mask | video_mask deepstack_visual_embeds = [] image_mask_joint = image_mask[visual_pos_masks] video_mask_joint = video_mask[visual_pos_masks] for img_embed, vid_embed in zip(deepstack_image_embeds, deepstack_video_embeds): embed_joint = img_embed.new_zeros(visual_pos_masks.sum(), img_embed.shape[-1]).to(img_embed.device) embed_joint[image_mask_joint, :] = img_embed embed_joint[video_mask_joint, :] = vid_embed deepstack_visual_embeds.append(embed_joint) elif image_mask is not None: image_mask = image_mask[..., 0] visual_pos_masks = image_mask deepstack_visual_embeds = deepstack_image_embeds elif video_mask is not None: video_mask = video_mask[..., 0] visual_pos_masks = video_mask deepstack_visual_embeds = deepstack_video_embeds # ====================================================================== # 🚨 LIME CONTEXT MANAGEMENT # ====================================================================== # 1. 获取当前的历史长度 (判定是否为新序列的关键) current_seq_len = 0 if past_key_values is None else past_key_values.get_seq_length() is_fresh_start = (current_seq_len == 0) # 2. 判断当前输入是否包含新的视觉信息 has_new_vision = (visual_pos_masks is not None) lime_visual_context = None lime_visual_mask = None if cached_lime_visual_context is not None: # Case A: 强制使用外部传入的 Context lime_visual_context = cached_lime_visual_context lime_visual_mask = cached_lime_visual_mask elif has_new_vision: # Case B: 当前输入有图片/视频 -> 提取并更新 Cache # (注意:如果是多轮对话追加图片,这里简化为覆盖。如果需要支持多图累加,需做 concat) bsz = inputs_embeds.shape[0] extracted_visuals = [] for i in range(bsz): mask = visual_pos_masks[i] > 0 feats = inputs_embeds[i][mask] extracted_visuals.append(feats) lime_visual_context = pad_sequence(extracted_visuals, batch_first=True) # 构建 Mask lengths = [x.shape[0] for x in extracted_visuals] max_len = lime_visual_context.shape[1] lime_visual_mask = torch.full( (bsz, 1, 1, max_len), torch.finfo(inputs_embeds.dtype).min, device=inputs_embeds.device, dtype=inputs_embeds.dtype ) for i, l in enumerate(lengths): if l > 0: lime_visual_mask[i, ..., :l] = 0.0 # 更新内部 Cache self.lime_visual_context_cache = lime_visual_context self.lime_visual_mask_cache = lime_visual_mask elif is_fresh_start: # Case C: 是新序列(Start),且没有视觉输入(No Vision) -> 纯文本请求 # 🚨 必须清空 Cache,防止上一轮对话的图片残留 self.lime_visual_context_cache = None self.lime_visual_mask_cache = None lime_visual_context = None lime_visual_mask = None else: # Case D: 是生成过程(Decoding),且没有新图片 -> 使用 Cache lime_visual_context = self.lime_visual_context_cache lime_visual_mask = self.lime_visual_mask_cache # ====================================================================== if position_ids is None: attention_mask_tensor = ( attention_mask if not isinstance(attention_mask, dict) else attention_mask["full_attention"] ) if attention_mask_tensor is not None and attention_mask_tensor.ndim == 4: attention_mask_tensor = torch.diagonal(attention_mask_tensor[:, 0], dim1=1, dim2=2) # Only apply conversion for floating point tensors (inverted masks) if attention_mask_tensor.dtype.is_floating_point: attention_mask_tensor = attention_mask_tensor / torch.finfo(attention_mask_tensor.dtype).min attention_mask_tensor = (1.0 - attention_mask_tensor).int() # Calculate RoPE index once per generation in the pre-fill stage only. # When compiling, we can't check tensor values thus we check only input length # It is safe to assume that `length!=1` means we're in pre-fill because compiled # models currently cannot do asssisted decoding prefill_compiled_stage = is_torchdynamo_compiling() and ( (input_ids is not None and input_ids.shape[1] != 1) or (inputs_embeds is not None and inputs_embeds.shape[1] != 1) ) prefill_noncompiled_stage = not is_torchdynamo_compiling() and ( (cache_position is not None and cache_position[0] == 0) or (past_key_values is None or past_key_values.get_seq_length() == 0) ) if (prefill_compiled_stage or prefill_noncompiled_stage) or self.rope_deltas is None: position_ids, rope_deltas = self.get_rope_index( input_ids, image_grid_thw, video_grid_thw, attention_mask=attention_mask_tensor, ) self.rope_deltas = rope_deltas # then use the prev pre-calculated rope-deltas to get the correct position ids else: batch_size, seq_length, _ = inputs_embeds.shape delta = ( (cache_position[0] + self.rope_deltas).to(inputs_embeds.device) if cache_position is not None else 0 ) position_ids = torch.arange(seq_length, device=inputs_embeds.device) position_ids = position_ids.view(1, -1).expand(batch_size, -1) if cache_position is not None: # otherwise `deltas` is an int `0` delta = delta.repeat_interleave(batch_size // delta.shape[0], dim=0) position_ids = position_ids.add(delta) position_ids = position_ids.unsqueeze(0).expand(3, -1, -1) outputs = self.language_model( input_ids=None, position_ids=position_ids, attention_mask=attention_mask, past_key_values=past_key_values, inputs_embeds=inputs_embeds, cache_position=cache_position, visual_pos_masks=visual_pos_masks, deepstack_visual_embeds=deepstack_visual_embeds, # --- 🚨 PASS LIME CONTEXT 🚨 --- lime_visual_context=lime_visual_context, lime_visual_mask=lime_visual_mask, # ------------------------------- **kwargs, ) return Qwen3VLModelOutputWithPast( last_hidden_state=outputs.last_hidden_state, past_key_values=outputs.past_key_values, rope_deltas=self.rope_deltas, ) @dataclass @auto_docstring( custom_intro=""" Base class for Qwen3VL causal language model (or autoregressive) outputs. """ ) class Qwen3VLCausalLMOutputWithPast(ModelOutput): r""" loss (`torch.FloatTensor` of shape `(1,)`, *optional*, returned when `labels` is provided): Language modeling loss (for next-token prediction). logits (`torch.FloatTensor` of shape `(batch_size, sequence_length, config.vocab_size)`): Prediction scores of the language modeling head (scores for each vocabulary token before SoftMax). past_key_values (`Cache`, *optional*, returned when `use_cache=True` is passed or when `config.use_cache=True`): It is a [`~cache_utils.Cache`] instance. For more details, see our [kv cache guide](https://huggingface.co/docs/transformers/en/kv_cache). Contains pre-computed hidden-states (key and values in the self-attention blocks) that can be used (see `past_key_values` input) to speed up sequential decoding. rope_deltas (`torch.LongTensor` of shape `(batch_size, )`, *optional*): The rope index difference between sequence length and multimodal rope. """ loss: Optional[torch.FloatTensor] = None logits: Optional[torch.FloatTensor] = None past_key_values: Optional[Cache] = None hidden_states: Optional[tuple[torch.FloatTensor]] = None attentions: Optional[tuple[torch.FloatTensor]] = None rope_deltas: Optional[torch.LongTensor] = None class LimeQwen3VLForConditionalGeneration(Qwen3VLPreTrainedModel, GenerationMixin): _checkpoint_conversion_mapping = {} _tied_weights_keys = ["lm_head.weight"] # Reference: fix gemma3 grad acc #37208 accepts_loss_kwargs = False config: LimeQwen3VLConfig def __init__(self, config): super().__init__(config) self.model = LimeQwen3VLModel(config) self.lm_head = nn.Linear(config.text_config.hidden_size, config.text_config.vocab_size, bias=False) self.post_init() def get_input_embeddings(self): return self.model.get_input_embeddings() def set_input_embeddings(self, value): self.model.set_input_embeddings(value) def set_decoder(self, decoder): self.model.set_decoder(decoder) def get_decoder(self): return self.model.get_decoder() def get_video_features( self, pixel_values_videos: torch.FloatTensor, video_grid_thw: Optional[torch.LongTensor] = None ): return self.model.get_video_features(pixel_values_videos, video_grid_thw) def get_image_features(self, pixel_values: torch.FloatTensor, image_grid_thw: Optional[torch.LongTensor] = None): return self.model.get_image_features(pixel_values, image_grid_thw) # Make modules available through conditional class for BC @property def language_model(self): return self.model.language_model @property def visual(self): return self.model.visual @check_model_inputs() def forward( self, input_ids: torch.LongTensor = None, attention_mask: Optional[torch.Tensor] = None, position_ids: Optional[torch.LongTensor] = None, past_key_values: Optional[Cache] = None, inputs_embeds: Optional[torch.FloatTensor] = None, labels: Optional[torch.LongTensor] = None, pixel_values: Optional[torch.Tensor] = None, pixel_values_videos: Optional[torch.FloatTensor] = None, image_grid_thw: Optional[torch.LongTensor] = None, video_grid_thw: Optional[torch.LongTensor] = None, cache_position: Optional[torch.LongTensor] = None, logits_to_keep: Union[int, torch.Tensor] = 0, **kwargs: Unpack[TransformersKwargs], ) -> Union[tuple, Qwen3VLCausalLMOutputWithPast]: r""" labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*): Labels for computing the masked language modeling loss. Indices should either be in `[0, ..., config.vocab_size]` or -100 (see `input_ids` docstring). Tokens with indices set to `-100` are ignored (masked), the loss is only computed for the tokens with labels in `[0, ..., config.vocab_size]`. image_grid_thw (`torch.LongTensor` of shape `(num_images, 3)`, *optional*): The temporal, height and width of feature shape of each image in LLM. video_grid_thw (`torch.LongTensor` of shape `(num_videos, 3)`, *optional*): The temporal, height and width of feature shape of each video in LLM. Example: TODO: Add example """ outputs = self.model( input_ids=input_ids, pixel_values=pixel_values, pixel_values_videos=pixel_values_videos, image_grid_thw=image_grid_thw, video_grid_thw=video_grid_thw, position_ids=position_ids, attention_mask=attention_mask, past_key_values=past_key_values, inputs_embeds=inputs_embeds, cache_position=cache_position, **kwargs, ) hidden_states = outputs[0] # Only compute necessary logits, and do not upcast them to float if we are not computing the loss slice_indices = slice(-logits_to_keep, None) if isinstance(logits_to_keep, int) else logits_to_keep logits = self.lm_head(hidden_states[:, slice_indices, :]) loss = None if labels is not None: loss = self.loss_function(logits=logits, labels=labels, vocab_size=self.config.text_config.vocab_size) return Qwen3VLCausalLMOutputWithPast( loss=loss, logits=logits, past_key_values=outputs.past_key_values, rope_deltas=outputs.rope_deltas, ) def prepare_inputs_for_generation( self, input_ids, past_key_values=None, attention_mask=None, inputs_embeds=None, cache_position=None, position_ids=None, use_cache=True, pixel_values=None, pixel_values_videos=None, image_grid_thw=None, video_grid_thw=None, **kwargs, ): # Overwritten -- in specific circumstances we don't want to forward image inputs to the model model_inputs = super().prepare_inputs_for_generation( input_ids, past_key_values=past_key_values, attention_mask=attention_mask, inputs_embeds=inputs_embeds, cache_position=cache_position, position_ids=position_ids, pixel_values=pixel_values, pixel_values_videos=pixel_values_videos, image_grid_thw=image_grid_thw, video_grid_thw=video_grid_thw, use_cache=use_cache, **kwargs, ) # Qwen3VL position_ids are prepareed with rope_deltas in forward model_inputs["position_ids"] = None if cache_position[0] != 0: model_inputs["pixel_values"] = None model_inputs["pixel_values_videos"] = None return model_inputs def _get_image_nums_and_video_nums( self, input_ids: Optional[torch.LongTensor], inputs_embeds: Optional[torch.Tensor] = None, ) -> tuple[torch.Tensor, torch.Tensor]: """ Get the number of images and videos for each sample to calculate the separation length of the sample tensor. These parameters are not passed through the processor to avoid unpredictable impacts from interface modifications. Args: input_ids (`torch.LongTensor` of shape `(batch_size, sequence_length)`): Indices of input sequence tokens in the vocabulary. Returns: image_nums (`torch.LongTensor` of shape `(batch_size, num_images_sample)`) video_nums (`torch.LongTensor` of shape `(batch_size, num_videos_sample)`) """ image_token_id = self.config.image_token_id video_token_id = self.config.video_token_id vision_start_token_id = self.config.vision_start_token_id if inputs_embeds is not None: vision_start_mask = ( inputs_embeds == self.get_input_embeddings()( torch.tensor(vision_start_token_id, dtype=torch.long, device=inputs_embeds.device) ) )[..., 0] image_mask = ( inputs_embeds == self.get_input_embeddings()( torch.tensor(image_token_id, dtype=torch.long, device=inputs_embeds.device) ) )[..., 0] video_mask = ( inputs_embeds == self.get_input_embeddings()( torch.tensor(video_token_id, dtype=torch.long, device=inputs_embeds.device) ) )[..., 0] else: vision_start_mask = input_ids == vision_start_token_id image_mask = input_ids == image_token_id video_mask = input_ids == video_token_id vision_first_mask = torch.roll(vision_start_mask, shifts=1, dims=1) image_nums = torch.sum(vision_first_mask & image_mask, dim=1) video_nums = torch.sum(vision_first_mask & video_mask, dim=1) return image_nums, video_nums def _expand_inputs_for_generation( self, expand_size: int = 1, is_encoder_decoder: bool = False, input_ids: Optional[torch.LongTensor] = None, **model_kwargs, ) -> tuple[torch.LongTensor, dict[str, Any]]: # Overwritten -- Support for expanding tensors without a batch size dimension # e.g., pixel_values, image_grid_thw, pixel_values_videos, video_grid_thw, second_per_grid_t # pixel_values.shape[0] is sum(seqlen_images for samples) # image_grid_thw.shape[0] is sum(num_images for samples) if expand_size == 1: return input_ids, model_kwargs visual_keys = ["pixel_values", "image_grid_thw", "pixel_values_videos", "video_grid_thw", "second_per_grid_ts"] def _expand_dict_for_generation_visual(dict_to_expand): image_grid_thw = model_kwargs.get("image_grid_thw", None) video_grid_thw = model_kwargs.get("video_grid_thw", None) image_nums, video_nums = self._get_image_nums_and_video_nums( input_ids, inputs_embeds=model_kwargs.get("inputs_embeds", None) ) def _repeat_interleave_samples(x, lengths, repeat_times): samples = torch.split(x, lengths) repeat_args = [repeat_times] + [1] * (x.dim() - 1) result = torch.cat([sample.repeat(*repeat_args) for sample in samples], dim=0) return result for key in dict_to_expand: if key == "pixel_values": # split images into samples samples = torch.split(image_grid_thw, list(image_nums)) # compute the sequence length of images for each sample lengths = [torch.prod(sample, dim=1).sum() for sample in samples] dict_to_expand[key] = _repeat_interleave_samples( dict_to_expand[key], lengths=lengths, repeat_times=expand_size ) elif key == "image_grid_thw": # get the num of images for each sample lengths = list(image_nums) dict_to_expand[key] = _repeat_interleave_samples( dict_to_expand[key], lengths=lengths, repeat_times=expand_size ) elif key == "pixel_values_videos": samples = torch.split(video_grid_thw, list(video_nums)) lengths = [torch.prod(sample, dim=1).sum() for sample in samples] dict_to_expand[key] = _repeat_interleave_samples( dict_to_expand[key], lengths=lengths, repeat_times=expand_size ) elif key == "video_grid_thw": lengths = list(video_nums) dict_to_expand[key] = _repeat_interleave_samples( dict_to_expand[key], lengths=lengths, repeat_times=expand_size ) elif key == "second_per_grid_ts": dict_to_expand[key] = _repeat_interleave_samples( dict_to_expand[key], lengths=list(video_nums), repeat_times=expand_size ) return dict_to_expand def _expand_dict_for_generation(dict_to_expand): for key in dict_to_expand: if ( key != "cache_position" and dict_to_expand[key] is not None and isinstance(dict_to_expand[key], torch.Tensor) and key not in visual_keys ): dict_to_expand[key] = dict_to_expand[key].repeat_interleave(expand_size, dim=0) return dict_to_expand model_kwargs = _expand_dict_for_generation_visual(model_kwargs) if input_ids is not None: input_ids = input_ids.repeat_interleave(expand_size, dim=0) model_kwargs = _expand_dict_for_generation(model_kwargs) if is_encoder_decoder: if model_kwargs.get("encoder_outputs") is None: raise ValueError("If `is_encoder_decoder` is True, make sure that `encoder_outputs` is defined.") model_kwargs["encoder_outputs"] = _expand_dict_for_generation(model_kwargs["encoder_outputs"]) return input_ids, model_kwargs __all__ = [ "Qwen3VLVisionModel", "LimeQwen3VLForConditionalGeneration", "LimeQwen3VLModel", "Qwen3VLPreTrainedModel", "Qwen3VLTextModel", ]