from dataclasses import dataclass
from typing import Unpack, cast

import torch
import torch.nn as nn
from transformers.cache_utils import Cache
from transformers.generation import GenerationMixin
from transformers.modeling_flash_attention_utils import FlashAttentionKwargs
from transformers.modeling_outputs import ModelOutput
from transformers.modeling_utils import PreTrainedModel
from transformers.models.auto import AutoModel
from transformers.utils import TransformersKwargs

from .configuration_heron import HeronConfig


# Copied from https://github.com/NVlabs/VILA/blob/36f6adcd11a10be1580caeb7e647e1b6f8517f89/llava/model/multimodal_projector/base_projector.py#L83
def _flat_square_2x2(x):
    """Fold each 2x2 spatial neighbourhood of a 4-D tensor into its last axis.

    Args:
        x: Tensor of shape ``(n, w, h, c)``.

    Returns:
        A contiguous tensor of shape ``(n, ceil(w / 2), ceil(h / 2), 4 * c)``.
        Odd spatial sizes are zero-padded at the end of the axis first.
    """
    n, w, h, c = x.size()
    if w % 2 == 1:
        # Allocate the padding directly on x's device instead of CPU-then-move.
        pad = torch.zeros((n, 1, h, c), dtype=x.dtype, device=x.device)
        x = torch.concat([x, pad], dim=1).contiguous()
        n, w, h, c = x.size()
    x = x.contiguous()
    if h % 2 == 1:
        pad = torch.zeros((n, w, 1, c), dtype=x.dtype, device=x.device)
        x = torch.concat([x, pad], dim=2).contiguous()
        n, w, h, c = x.size()
    # First merge pairs along h into the channel axis, then pairs along w.
    x = x.view(n, w, h // 2, c * 2)
    x = x.permute(0, 2, 1, 3).contiguous()
    x = x.view(n, h // 2, w // 2, c * 4)
    x = x.permute(0, 2, 1, 3).contiguous()
    return x


class DownSample2x2BlockFix(nn.Module):
    """Reshape a token sequence into a square grid and downsample it 2x2."""

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Downsample ``x`` by folding 2x2 token patches into the feature axis.

        Args:
            x: Tensor whose axis 1 is reshaped into an ``h x w`` grid with
                ``h = w = int(x.shape[1] ** 0.5)``; the reshape fails when
                ``x.shape[1]`` is not a perfect square.

        Returns:
            Tensor of shape ``(x.shape[0], num_tokens, 4 * features)``.
        """
        vit_embeds = x
        h = w = int(vit_embeds.shape[1] ** 0.5)
        vit_embeds = vit_embeds.reshape(vit_embeds.shape[0], h, w, -1)
        vit_embeds = _flat_square_2x2(vit_embeds)
        vit_embeds = vit_embeds.reshape(vit_embeds.shape[0], -1, vit_embeds.shape[-1])
        return vit_embeds


# Copied from https://github.com/NVlabs/VILA/blob/36f6adcd11a10be1580caeb7e647e1b6f8517f89/llava/model/multimodal_projector/base_projector.py#L109
def _flat_square_3x3(x):
    """Fold each 3x3 spatial neighbourhood of a 4-D tensor into its last axis.

    Args:
        x: Tensor of shape ``(n, w, h, c)``.

    Returns:
        A contiguous tensor of shape ``(n, ceil(w / 3), ceil(h / 3), 9 * c)``.
        Spatial sizes that are not multiples of 3 are zero-padded at the end
        of the axis first.
    """
    n, w, h, c = x.size()
    if w % 3 != 0:
        # Allocate the padding directly on x's device instead of CPU-then-move.
        pad = torch.zeros((n, 3 - (w % 3), h, c), dtype=x.dtype, device=x.device)
        x = torch.concat([x, pad], dim=1).contiguous()
        n, w, h, c = x.size()
    x = x.contiguous()
    if h % 3 != 0:
        pad = torch.zeros((n, w, 3 - (h % 3), c), dtype=x.dtype, device=x.device)
        x = torch.concat([x, pad], dim=2).contiguous()
        n, w, h, c = x.size()
    # First merge triples along h into the channel axis, then triples along w.
    x = x.view(n, w, h // 3, c * 3)
    x = x.permute(0, 2, 1, 3).contiguous()
    x = x.view(n, h // 3, w // 3, c * 9)
    x = x.permute(0, 2, 1, 3).contiguous()
    return x


class DownSample3x3BlockFix(nn.Module):
    """Reshape a token sequence into a square grid and downsample it 3x3."""

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Downsample ``x`` by folding 3x3 token patches into the feature axis.

        Args:
            x: Tensor whose axis 1 is reshaped into an ``h x w`` grid with
                ``h = w = int(x.shape[1] ** 0.5)``; the reshape fails when
                ``x.shape[1]`` is not a perfect square.

        Returns:
            Tensor of shape ``(x.shape[0], num_tokens, 9 * features)``.
        """
        vit_embeds = x
        h = w = int(vit_embeds.shape[1] ** 0.5)
        vit_embeds = vit_embeds.reshape(vit_embeds.shape[0], h, w, -1)
        vit_embeds = _flat_square_3x3(vit_embeds)
        vit_embeds = vit_embeds.reshape(vit_embeds.shape[0], -1, vit_embeds.shape[-1])
        return vit_embeds


class HeronMultiModalProjector(nn.Module):
    """Project vision-tower features into the text model's hidden size.

    The layer stack is selected by ``config.mm_projector_type``; both
    supported variants start with a spatial downsampling block.

    Raises:
        ValueError: If ``config.mm_projector_type`` is not a supported value.
    """

    def __init__(self, config: HeronConfig):
        super().__init__()
        mm_projector_type = config.mm_projector_type
        mm_hidden_size = config.vision_config.hidden_size
        output_hidden_size = config.text_config.hidden_size

        if mm_projector_type == "mlp_downsample_2x2_fix":
            self.layers = nn.Sequential(
                DownSample2x2BlockFix(),
                nn.LayerNorm(mm_hidden_size * 4),
                nn.Linear(mm_hidden_size * 4, output_hidden_size),
                nn.GELU(),
                nn.Linear(output_hidden_size, output_hidden_size),
            )
            self.downsample_rate = 2
        elif mm_projector_type == "mlp_downsample_3x3_fix":
            self.layers = nn.Sequential(
                DownSample3x3BlockFix(),
                nn.LayerNorm(mm_hidden_size * 9),
                nn.Linear(mm_hidden_size * 9, mm_hidden_size * 3),
                nn.GELU(),
                nn.LayerNorm(mm_hidden_size * 3),
                nn.Linear(mm_hidden_size * 3, output_hidden_size),
                nn.GELU(),
                nn.Linear(output_hidden_size, output_hidden_size),
            )
            self.downsample_rate = 3
        else:
            raise ValueError(f"Unknown projector type: {mm_projector_type}")

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply the configured downsample + MLP stack to ``x``."""
        return self.layers(x)


@dataclass
class HeronModelOutputWithPast(ModelOutput):
    """Output of :class:`HeronModel`.

    ``image_hidden_states`` holds the projected image features that were
    scattered into the input embeddings, or ``None`` when no ``pixel_values``
    were supplied.
    """

    last_hidden_state: torch.FloatTensor | None = None
    past_key_values: Cache | None = None
    hidden_states: tuple[torch.FloatTensor] | None = None
    attentions: tuple[torch.FloatTensor] | None = None
    image_hidden_states: torch.FloatTensor | None = None


class HeronPreTrainedModel(PreTrainedModel):
    """Shared ``PreTrainedModel`` base holding the Heron capability flags."""

    config: HeronConfig
    base_model_prefix = "model"
    supports_gradient_checkpointing = True
    _skip_keys_device_placement = "past_key_values"  # type: ignore[assignment]
    _supports_flash_attn = True
    _supports_sdpa = True
    _can_compile_fullgraph = True
    _supports_flex_attn = True
    _supports_attention_backend = True


class HeronModel(HeronPreTrainedModel):
    """Vision tower + multimodal projector + language model, without an LM head."""

    def __init__(self, config: HeronConfig):
        super().__init__(config)
        self.vision_tower: PreTrainedModel = AutoModel.from_config(config.vision_config)
        self.multi_modal_projector = HeronMultiModalProjector(config)
        self.language_model: PreTrainedModel = AutoModel.from_config(config.text_config)
        self.post_init()

    def get_input_embeddings(self):
        """Return the language model's input embedding module."""
        return self.language_model.get_input_embeddings()

    def set_input_embeddings(self, value):
        """Replace the language model's input embedding module."""
        self.language_model.set_input_embeddings(value)

    def get_decoder(self):
        """Return the wrapped language model."""
        return self.language_model

    def set_decoder(self, decoder):
        """Replace the wrapped language model."""
        self.language_model = decoder

    def get_image_features(self, pixel_values: torch.FloatTensor, vision_feature_layer: int | None = None):
        """Encode images with the vision tower and project them.

        Args:
            pixel_values: Tensor of shape
                ``(batch_size, num_tiles, n_channels, height, width)`` or
                ``(batch_size, n_channels, height, width)``; the 4-D form is
                treated as a single tile per batch item.
            vision_feature_layer: Index into the vision tower's
                ``hidden_states``; defaults to ``config.vision_feature_layer``.

        Returns:
            Projected features of shape ``(batch_size, num_tiles, tokens, hidden)``.

        Raises:
            ValueError: If ``pixel_values`` is neither 4-D nor 5-D.
        """
        ndim = pixel_values.ndim
        # Explicit raise rather than ``assert`` so validation survives ``python -O``.
        if ndim not in (4, 5):
            raise ValueError(
                "pixel_values should be of shape (batch_size, num_tiles, n_channels, height, width) or (batch_size, n_channels, height, width)"
            )
        if ndim == 4:
            pixel_values = cast(torch.FloatTensor, pixel_values.unsqueeze(1))

        shape = pixel_values.shape
        # Merge batch and tile axes so the vision tower sees a flat batch of images.
        pixel_values = cast(torch.FloatTensor, pixel_values.view(shape[0] * shape[1], *shape[2:]))

        if vision_feature_layer is None:
            vision_feature_layer = self.config.vision_feature_layer

        outputs = self.vision_tower(pixel_values=pixel_values, output_hidden_states=True)
        image_features = outputs.hidden_states[vision_feature_layer]
        image_features = self.multi_modal_projector(image_features)
        # Restore the separate (batch, tile) axes.
        image_features = image_features.view(shape[0], shape[1], *image_features.shape[1:])
        return image_features

    def get_placeholder_mask(
        self, input_ids: torch.LongTensor | None, inputs_embeds: torch.FloatTensor, image_features: torch.FloatTensor
    ):
        """Build a boolean mask marking image-placeholder positions in ``inputs_embeds``.

        When ``input_ids`` is ``None`` the placeholders are located by comparing
        ``inputs_embeds`` against the embedding of ``config.image_token_id``.

        Returns:
            A boolean mask expanded to the shape of ``inputs_embeds``.

        Raises:
            ValueError: Outside of ``torch.compile`` tracing, if the number of
                masked embedding elements differs from ``image_features.numel()``.
        """
        if input_ids is None:
            special_image_mask = inputs_embeds == self.get_input_embeddings()(
                torch.tensor(self.config.image_token_id, dtype=torch.long, device=inputs_embeds.device)
            )
            special_image_mask = special_image_mask.all(-1)
        else:
            special_image_mask = input_ids == self.config.image_token_id

        n_image_tokens = special_image_mask.sum()
        special_image_mask = special_image_mask.unsqueeze(-1).expand_as(inputs_embeds).to(inputs_embeds.device)

        if not torch.compiler.is_compiling():
            if inputs_embeds[special_image_mask].numel() != image_features.numel():
                # Count feature vectors irrespective of how many leading axes
                # (batch, tiles, tokens) the features tensor carries.
                n_image_features = image_features.numel() // image_features.shape[-1]
                raise ValueError(
                    f"Image features and image tokens do not match: tokens: {int(n_image_tokens)}, features {n_image_features}"
                )
        return special_image_mask

    def forward(
        self,
        input_ids: torch.LongTensor | None = None,
        pixel_values: torch.FloatTensor | None = None,
        attention_mask: torch.Tensor | None = None,
        position_ids: torch.LongTensor | None = None,
        past_key_values: Cache | None = None,
        inputs_embeds: torch.FloatTensor | None = None,
        use_cache: bool | None = None,
        output_attentions: bool | None = None,
        output_hidden_states: bool | None = None,
        return_dict: bool | None = None,
        cache_position: torch.LongTensor | None = None,
        **kwargs: Unpack[FlashAttentionKwargs],
    ) -> tuple | HeronModelOutputWithPast:
        """Run the language model on text embeddings with image features merged in.

        Exactly one of ``input_ids`` / ``inputs_embeds`` must be given. When
        ``pixel_values`` is provided, the projected image features overwrite
        the embeddings at the image-placeholder positions.

        Returns:
            A :class:`HeronModelOutputWithPast`, or its tuple form when
            ``return_dict`` resolves to ``False``.

        Raises:
            ValueError: If both or neither of ``input_ids`` and
                ``inputs_embeds`` are supplied.
        """
        output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
        output_hidden_states = (
            output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
        )
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        if (input_ids is None) ^ (inputs_embeds is not None):
            raise ValueError("You must specify exactly one of input_ids or inputs_embeds")

        if inputs_embeds is None:
            inputs_embeds = self.get_input_embeddings()(input_ids)

        image_features = None
        if pixel_values is not None:
            image_features = self.get_image_features(pixel_values)
            image_features = image_features.to(inputs_embeds.device, inputs_embeds.dtype)
            special_image_mask = self.get_placeholder_mask(
                input_ids, inputs_embeds=inputs_embeds, image_features=image_features
            )
            inputs_embeds = cast(torch.FloatTensor, inputs_embeds.masked_scatter(special_image_mask, image_features))

        outputs = self.language_model(
            attention_mask=attention_mask,
            position_ids=position_ids,
            past_key_values=past_key_values,
            inputs_embeds=inputs_embeds,
            use_cache=use_cache,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=True,
            cache_position=cache_position,
            **kwargs,
        )

        output = HeronModelOutputWithPast(
            last_hidden_state=outputs.last_hidden_state,
            past_key_values=outputs.past_key_values,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
            image_hidden_states=image_features,
        )
        return output if return_dict else output.to_tuple()


@dataclass
class HeronCausalLMOutputWithPast(ModelOutput):
    """Output of :class:`HeronForConditionalGeneration`.

    ``loss`` is only populated when ``labels`` are passed to ``forward``.
    """

    loss: torch.FloatTensor | None = None
    logits: torch.FloatTensor | None = None
    past_key_values: Cache | None = None
    hidden_states: tuple[torch.FloatTensor] | None = None
    attentions: tuple[torch.FloatTensor] | None = None
    image_hidden_states: torch.FloatTensor | None = None


class HeronForConditionalGeneration(HeronPreTrainedModel, GenerationMixin):  # type: ignore[misc]
    """:class:`HeronModel` with a linear language-modeling head for generation."""

    _tied_weights_keys = ["lm_head.weight"]  # type: ignore[assignment]

    def __init__(self, config: HeronConfig):
        super().__init__(config)
        self.model = HeronModel(config)
        self.lm_head = nn.Linear(config.text_config.hidden_size, config.text_config.vocab_size, bias=False)
        self.post_init()

    def get_input_embeddings(self):
        """Return the input embedding module of the wrapped model."""
        return self.model.get_input_embeddings()

    def set_input_embeddings(self, value):
        """Replace the input embedding module of the wrapped model."""
        self.model.set_input_embeddings(value)

    def get_output_embeddings(self) -> nn.Module:
        """Return the LM head."""
        return self.lm_head

    def set_output_embeddings(self, new_embeddings):
        """Replace the LM head."""
        self.lm_head = new_embeddings

    def get_decoder(self):
        """Return the language model of the wrapped :class:`HeronModel`."""
        return self.model.get_decoder()

    def set_decoder(self, decoder):
        """Replace the language model of the wrapped :class:`HeronModel`."""
        self.model.set_decoder(decoder)

    def forward(  # type: ignore[misc]
        self,
        input_ids: torch.LongTensor | None = None,
        pixel_values: torch.FloatTensor | None = None,
        attention_mask: torch.Tensor | None = None,
        position_ids: torch.LongTensor | None = None,
        past_key_values: Cache | None = None,
        inputs_embeds: torch.FloatTensor | None = None,
        labels: torch.LongTensor | None = None,
        use_cache: bool | None = None,
        output_attentions: bool | None = None,
        output_hidden_states: bool | None = None,
        return_dict: bool | None = None,
        cache_position: torch.LongTensor | None = None,
        logits_to_keep: int | torch.Tensor = 0,
        **kwargs: Unpack[TransformersKwargs],
    ) -> tuple | HeronCausalLMOutputWithPast:
        """Compute LM logits (and the loss when ``labels`` are given).

        Args:
            logits_to_keep: If an ``int``, logits are computed for the slice
                ``[-logits_to_keep:]`` of the sequence axis (``0`` keeps every
                position); otherwise it is used directly as the index into the
                sequence axis.

        Returns:
            A :class:`HeronCausalLMOutputWithPast`, or its tuple form when
            ``return_dict`` resolves to ``False``.
        """
        output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
        output_hidden_states = (
            output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
        )
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        outputs = self.model(
            input_ids=input_ids,
            pixel_values=pixel_values,
            attention_mask=attention_mask,
            position_ids=position_ids,
            past_key_values=past_key_values,
            inputs_embeds=inputs_embeds,
            use_cache=use_cache,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=True,
            cache_position=cache_position,
            **kwargs,
        )

        hidden_states = outputs[0]
        # Only compute necessary logits, and do not upcast them to float if we are not computing the loss
        slice_indices = slice(-logits_to_keep, None) if isinstance(logits_to_keep, int) else logits_to_keep
        logits = self.lm_head(hidden_states[:, slice_indices, :])

        loss = None
        if labels is not None:
            loss = self.loss_function(
                logits=logits, labels=labels, vocab_size=self.config.text_config.vocab_size, **kwargs
            )

        output = HeronCausalLMOutputWithPast(
            loss=loss,
            logits=logits,
            past_key_values=outputs.past_key_values,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
            image_hidden_states=outputs.image_hidden_states,
        )
        # Honour return_dict the same way HeronModel.forward does.
        return output if return_dict else output.to_tuple()

    def prepare_inputs_for_generation(  # type: ignore[override]
        self,
        input_ids,
        past_key_values=None,
        inputs_embeds=None,
        pixel_values=None,
        attention_mask=None,
        cache_position=None,
        logits_to_keep=None,
        **kwargs,
    ):
        """Build per-step model inputs, forwarding ``pixel_values`` only on the first step.

        ``pixel_values`` is added to the inputs only when ``cache_position[0] == 0``.
        """
        # Overwritten -- in specific circumstances we don't want to forward image inputs to the model
        model_inputs = super().prepare_inputs_for_generation(
            input_ids,
            past_key_values=past_key_values,
            inputs_embeds=inputs_embeds,
            attention_mask=attention_mask,
            cache_position=cache_position,
            logits_to_keep=logits_to_keep,
            **kwargs,
        )

        if cache_position is not None and cache_position[0] == 0:
            # If we're in cached decoding stage, pixel values should be None because input ids do not contain special image token anymore
            # Otherwise we need pixel values to be passed to model
            model_inputs["pixel_values"] = pixel_values

        return model_inputs