# Heron-NVILA-Lite-1B-hf / modeling_heron.py
from dataclasses import dataclass
from typing import Unpack, cast
import torch
import torch.nn as nn
from transformers.cache_utils import Cache
from transformers.generation import GenerationMixin
from transformers.modeling_flash_attention_utils import FlashAttentionKwargs
from transformers.modeling_outputs import ModelOutput
from transformers.modeling_utils import PreTrainedModel
from transformers.models.auto import AutoModel
from transformers.utils import TransformersKwargs
from .configuration_heron import HeronConfig
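
# Minimal usage sketch (assumptions: the Hub repo org below and the AutoModelForCausalLM
# auto_map wiring are illustrative placeholders, not confirmed by this file):
#
#     from transformers import AutoModelForCausalLM, AutoProcessor
#
#     repo = "<org>/Heron-NVILA-Lite-1B-hf"  # hypothetical repo id
#     processor = AutoProcessor.from_pretrained(repo, trust_remote_code=True)
#     model = AutoModelForCausalLM.from_pretrained(repo, trust_remote_code=True)
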
# Copied from https://github.com/NVlabs/VILA/blob/36f6adcd11a10be1580caeb7e647e1b6f8517f89/llava/model/multimodal_projector/base_projector.py#L83
def _flat_square_2x2(x):
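    """Merge each 2x2 spatial block into the channel dim: (n, w, h, c) -> (n, w/2, h/2, 4c), zero-padding odd w or h."""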
n, w, h, c = x.size()
if w % 2 == 1:
x = torch.concat([x, torch.zeros((n, 1, h, c), dtype=x.dtype).to(x.device)], dim=1).contiguous()
n, w, h, c = x.size()
x = x.contiguous()
if h % 2 == 1:
x = torch.concat([x, torch.zeros((n, w, 1, c), dtype=x.dtype).to(x.device)], dim=2).contiguous()
n, w, h, c = x.size()
x = x.view(n, w, int(h / 2), int(c * 2))
x = x.permute(0, 2, 1, 3).contiguous()
x = x.view(n, int(h / 2), int(w / 2), int(c * 4))
x = x.permute(0, 2, 1, 3).contiguous()
return x
class DownSample2x2BlockFix(nn.Module):
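    """Token-sequence wrapper around _flat_square_2x2: (b, s, c) -> (b, ~s/4, 4c), assuming s is a perfect square."""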
def forward(self, x: torch.Tensor) -> torch.Tensor:
vit_embeds = x
h = w = int(vit_embeds.shape[1] ** 0.5)
vit_embeds = vit_embeds.reshape(vit_embeds.shape[0], h, w, -1)
vit_embeds = _flat_square_2x2(vit_embeds)
vit_embeds = vit_embeds.reshape(vit_embeds.shape[0], -1, vit_embeds.shape[-1])
return vit_embeds
# Copied from https://github.com/NVlabs/VILA/blob/36f6adcd11a10be1580caeb7e647e1b6f8517f89/llava/model/multimodal_projector/base_projector.py#L109
def _flat_square_3x3(x):
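    """Merge each 3x3 spatial block into the channel dim: (n, w, h, c) -> (n, w/3, h/3, 9c), zero-padding w and h to multiples of 3."""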
n, w, h, c = x.size()
if w % 3 != 0:
x = torch.concat([x, torch.zeros((n, 3 - (w % 3), h, c), dtype=x.dtype).to(x.device)], dim=1).contiguous()
n, w, h, c = x.size()
x = x.contiguous()
if h % 3 != 0:
x = torch.concat([x, torch.zeros((n, w, 3 - (h % 3), c), dtype=x.dtype).to(x.device)], dim=2).contiguous()
n, w, h, c = x.size()
x = x.view(n, w, int(h / 3), int(c * 3))
x = x.permute(0, 2, 1, 3).contiguous()
x = x.view(n, int(h / 3), int(w / 3), int(c * 9))
x = x.permute(0, 2, 1, 3).contiguous()
return x
class DownSample3x3BlockFix(nn.Module):
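    """Token-sequence wrapper around _flat_square_3x3: (b, s, c) -> (b, ~s/9, 9c), assuming s is a perfect square."""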
def forward(self, x: torch.Tensor) -> torch.Tensor:
vit_embeds = x
h = w = int(vit_embeds.shape[1] ** 0.5)
vit_embeds = vit_embeds.reshape(vit_embeds.shape[0], h, w, -1)
vit_embeds = _flat_square_3x3(vit_embeds)
vit_embeds = vit_embeds.reshape(vit_embeds.shape[0], -1, vit_embeds.shape[-1])
return vit_embeds
class HeronMultiModalProjector(nn.Module):
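    """Maps vision-tower hidden states into the language-model embedding space, spatially downsampling the tokens first."""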
def __init__(self, config: HeronConfig):
super().__init__()
mm_projector_type = config.mm_projector_type
mm_hidden_size = config.vision_config.hidden_size
output_hidden_size = config.text_config.hidden_size
if mm_projector_type == "mlp_downsample_2x2_fix":
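            # 2x2 merge: 4x fewer tokens, channel dim grows to 4 * mm_hidden_size, then a 2-layer MLP to the LM hidden size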
self.layers = nn.Sequential(
DownSample2x2BlockFix(),
nn.LayerNorm(mm_hidden_size * 4),
nn.Linear(mm_hidden_size * 4, output_hidden_size),
nn.GELU(),
nn.Linear(output_hidden_size, output_hidden_size),
)
self.downsample_rate = 2
elif mm_projector_type == "mlp_downsample_3x3_fix":
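            # 3x3 merge: 9x fewer tokens, channels step down 9h -> 3h -> LM hidden size with norm + GELU stages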
self.layers = nn.Sequential(
DownSample3x3BlockFix(),
nn.LayerNorm(mm_hidden_size * 9),
nn.Linear(mm_hidden_size * 9, mm_hidden_size * 3),
nn.GELU(),
nn.LayerNorm(mm_hidden_size * 3),
nn.Linear(mm_hidden_size * 3, output_hidden_size),
nn.GELU(),
nn.Linear(output_hidden_size, output_hidden_size),
)
self.downsample_rate = 3
else:
raise ValueError(f"Unknown projector type: {mm_projector_type}")
def forward(self, x: torch.Tensor) -> torch.Tensor:
return self.layers(x)
@dataclass
class HeronModelOutputWithPast(ModelOutput):
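    """Output of HeronModel; mirrors the usual BaseModelOutputWithPast fields plus the projected image features."""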
last_hidden_state: torch.FloatTensor | None = None
past_key_values: list[torch.FloatTensor] | None = None
hidden_states: tuple[torch.FloatTensor] | None = None
attentions: tuple[torch.FloatTensor] | None = None
image_hidden_states: tuple[torch.FloatTensor] | None = None
class HeronPreTrainedModel(PreTrainedModel):
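    """Common base class carrying the Heron config and the attention-backend/compile capability flags."""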
config: HeronConfig
base_model_prefix = "model"
supports_gradient_checkpointing = True
_skip_keys_device_placement = "past_key_values" # type: ignore[assignment]
_supports_flash_attn = True
_supports_sdpa = True
_can_compile_fullgraph = True
_supports_flex_attn = True
_supports_attention_backend = True
class HeronModel(HeronPreTrainedModel):
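    """LLaVA-style multimodal model: a vision tower, a multimodal projector, and a language model wired together."""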
def __init__(self, config: HeronConfig):
super().__init__(config)
self.vision_tower: PreTrainedModel = AutoModel.from_config(config.vision_config)
self.multi_modal_projector = HeronMultiModalProjector(config)
self.language_model: PreTrainedModel = AutoModel.from_config(config.text_config)
self.post_init()
def get_input_embeddings(self):
return self.language_model.get_input_embeddings()
def set_input_embeddings(self, value):
self.language_model.set_input_embeddings(value)
def get_decoder(self):
return self.language_model
def set_decoder(self, decoder):
self.language_model = decoder
def get_image_features(self, pixel_values: torch.FloatTensor, vision_feature_layer: int | None = None):
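        """Encode pixel_values (4D single image or 5D tiled), select one vision hidden layer, and project; returns (batch, num_tiles, seq, text_hidden)."""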
ndim = pixel_values.ndim
assert ndim == 5 or ndim == 4, "pixel_values should be of shape (batch_size, num_tiles, n_channels, height, width) or (batch_size, n_channels, height, width)" # fmt: skip
if ndim == 4:
pixel_values = cast(torch.FloatTensor, pixel_values.unsqueeze(1))
shape = pixel_values.shape
pixel_values = cast(torch.FloatTensor, pixel_values.view(shape[0] * shape[1], *shape[2:]))
if vision_feature_layer is None:
vision_feature_layer = self.config.vision_feature_layer
outputs = self.vision_tower(pixel_values=pixel_values, output_hidden_states=True)
image_features = outputs.hidden_states[vision_feature_layer]
image_features = self.multi_modal_projector(image_features)
image_features = image_features.view(shape[0], shape[1], *image_features.shape[1:])
return image_features
def get_placeholder_mask(
self, input_ids: torch.LongTensor | None, inputs_embeds: torch.FloatTensor, image_features: torch.FloatTensor
):
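        """Boolean mask over inputs_embeds marking image-placeholder positions, validated against the image feature count."""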
if input_ids is None:
special_image_mask = inputs_embeds == self.get_input_embeddings()(
torch.tensor(self.config.image_token_id, dtype=torch.long, device=inputs_embeds.device)
)
special_image_mask = special_image_mask.all(-1)
else:
special_image_mask = input_ids == self.config.image_token_id
n_image_tokens = special_image_mask.sum()
special_image_mask = special_image_mask.unsqueeze(-1).expand_as(inputs_embeds).to(inputs_embeds.device)
if not torch.compiler.is_compiling():
if inputs_embeds[special_image_mask].numel() != image_features.numel():
n_image_features = image_features.shape[0] * image_features.shape[1]
raise ValueError(
f"Image features and image tokens do not match: tokens: {n_image_tokens}, features {n_image_features}"
)
return special_image_mask
def forward(
self,
input_ids: torch.LongTensor | None = None,
pixel_values: torch.FloatTensor | None = None,
attention_mask: torch.Tensor | None = None,
position_ids: torch.LongTensor | None = None,
past_key_values: Cache | None = None,
inputs_embeds: torch.FloatTensor | None = None,
use_cache: bool | None = None,
output_attentions: bool | None = None,
output_hidden_states: bool | None = None,
return_dict: bool | None = None,
cache_position: torch.LongTensor | None = None,
**kwargs: Unpack[FlashAttentionKwargs],
) -> tuple | HeronModelOutputWithPast:
output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
output_hidden_states = (
output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
)
return_dict = return_dict if return_dict is not None else self.config.use_return_dict
if (input_ids is None) ^ (inputs_embeds is not None):
raise ValueError("You must specify exactly one of input_ids or inputs_embeds")
if inputs_embeds is None:
inputs_embeds = self.get_input_embeddings()(input_ids)
if pixel_values is not None:
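            # Encode the images, then scatter the projected features into the image-placeholder slots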
image_features = self.get_image_features(pixel_values)
image_features = image_features.to(inputs_embeds.device, inputs_embeds.dtype)
special_image_mask = self.get_placeholder_mask(
input_ids, inputs_embeds=inputs_embeds, image_features=image_features
)
inputs_embeds = cast(torch.FloatTensor, inputs_embeds.masked_scatter(special_image_mask, image_features))
outputs = self.language_model(
attention_mask=attention_mask,
position_ids=position_ids,
past_key_values=past_key_values,
inputs_embeds=inputs_embeds,
use_cache=use_cache,
output_attentions=output_attentions,
output_hidden_states=output_hidden_states,
return_dict=True,
cache_position=cache_position,
**kwargs,
)
output = HeronModelOutputWithPast(
last_hidden_state=outputs.last_hidden_state,
past_key_values=outputs.past_key_values,
hidden_states=outputs.hidden_states,
attentions=outputs.attentions,
image_hidden_states=image_features if pixel_values is not None else None,
)
return output if return_dict else output.to_tuple()
@dataclass
class HeronCausalLMOutputWithPast(ModelOutput):
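    """Output of HeronForConditionalGeneration; mirrors CausalLMOutputWithPast fields plus image_hidden_states."""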
loss: torch.FloatTensor | None = None
logits: torch.FloatTensor | None = None
past_key_values: list[torch.FloatTensor] | None = None
hidden_states: tuple[torch.FloatTensor] | None = None
attentions: tuple[torch.FloatTensor] | None = None
image_hidden_states: tuple[torch.FloatTensor] | None = None
class HeronForConditionalGeneration(HeronPreTrainedModel, GenerationMixin): # type: ignore[misc]
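    """HeronModel with an LM head on top, for image-conditioned causal language modeling and generation."""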
_tied_weights_keys = ["lm_head.weight"] # type: ignore[assignment]
def __init__(self, config: HeronConfig):
super().__init__(config)
self.model = HeronModel(config)
self.lm_head = nn.Linear(config.text_config.hidden_size, config.text_config.vocab_size, bias=False)
self.post_init()
def get_input_embeddings(self):
return self.model.get_input_embeddings()
def set_input_embeddings(self, value):
self.model.set_input_embeddings(value)
def get_output_embeddings(self) -> nn.Module:
return self.lm_head
def set_output_embeddings(self, new_embeddings):
self.lm_head = new_embeddings
def get_decoder(self):
return self.model.get_decoder()
def set_decoder(self, decoder):
self.model.set_decoder(decoder)
def forward( # type: ignore[misc]
self,
input_ids: torch.LongTensor | None = None,
pixel_values: torch.FloatTensor | None = None,
attention_mask: torch.Tensor | None = None,
position_ids: torch.LongTensor | None = None,
past_key_values: Cache | None = None,
inputs_embeds: torch.FloatTensor | None = None,
labels: torch.LongTensor | None = None,
use_cache: bool | None = None,
output_attentions: bool | None = None,
output_hidden_states: bool | None = None,
return_dict: bool | None = None,
cache_position: torch.LongTensor | None = None,
logits_to_keep: int | torch.Tensor = 0,
**kwargs: Unpack[TransformersKwargs],
) -> tuple | HeronCausalLMOutputWithPast:
output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
output_hidden_states = (
output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
)
return_dict = return_dict if return_dict is not None else self.config.use_return_dict
outputs = self.model(
input_ids=input_ids,
pixel_values=pixel_values,
attention_mask=attention_mask,
position_ids=position_ids,
past_key_values=past_key_values,
inputs_embeds=inputs_embeds,
use_cache=use_cache,
output_attentions=output_attentions,
output_hidden_states=output_hidden_states,
return_dict=True,
cache_position=cache_position,
**kwargs,
)
hidden_states = outputs[0]
# Only compute necessary logits, and do not upcast them to float if we are not computing the loss
slice_indices = slice(-logits_to_keep, None) if isinstance(logits_to_keep, int) else logits_to_keep
logits = self.lm_head(hidden_states[:, slice_indices, :])
loss = None
if labels is not None:
loss = self.loss_function(
logits=logits, labels=labels, vocab_size=self.config.text_config.vocab_size, **kwargs
)
        output = HeronCausalLMOutputWithPast(
            loss=loss,
            logits=logits,
            past_key_values=outputs.past_key_values,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
            image_hidden_states=outputs.image_hidden_states,
        )
        return output if return_dict else output.to_tuple()
def prepare_inputs_for_generation( # type: ignore[override]
self,
input_ids,
past_key_values=None,
inputs_embeds=None,
pixel_values=None,
attention_mask=None,
cache_position=None,
logits_to_keep=None,
**kwargs,
):
# Overwritten -- in specific circumstances we don't want to forward image inputs to the model
model_inputs = super().prepare_inputs_for_generation(
input_ids,
past_key_values=past_key_values,
inputs_embeds=inputs_embeds,
attention_mask=attention_mask,
cache_position=cache_position,
logits_to_keep=logits_to_keep,
**kwargs,
)
if cache_position is not None and cache_position[0] == 0:
            # Forward pixel values only on the prefill step (cache_position[0] == 0). During cached
            # decoding the input ids no longer contain the special image tokens, so pixel_values stays None.
model_inputs["pixel_values"] = pixel_values
return model_inputs