"""MagicBERT model implementation for HuggingFace transformers. |
|
|
|
|
|
This module provides HuggingFace-compatible implementations of MagicBERT, |
|
|
a BERT-style model trained for binary file type understanding. |
|
|
""" |
|
|
|
|
|
import math
from dataclasses import dataclass
from typing import Optional, Tuple, Union

import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import PreTrainedModel
from transformers.modeling_outputs import (
    MaskedLMOutput,
    SequenceClassifierOutput,
    BaseModelOutput,
)

try:
    from .configuration_magic_bert import MagicBERTConfig
except ImportError:
    from configuration_magic_bert import MagicBERTConfig


class MagicBERTEmbeddings(nn.Module):
    """MagicBERT embeddings: token + position embeddings."""

    def __init__(self, config: MagicBERTConfig):
        super().__init__()
        self.token_embeddings = nn.Embedding(
            config.vocab_size, config.hidden_size, padding_idx=config.pad_token_id
        )
        self.position_embeddings = nn.Embedding(
            config.max_position_embeddings, config.hidden_size
        )
        self.layer_norm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

        self.register_buffer(
            "position_ids",
            torch.arange(config.max_position_embeddings).expand((1, -1)),
            persistent=False,
        )

    def forward(self, input_ids: torch.Tensor) -> torch.Tensor:
        seq_length = input_ids.size(1)
        token_embeds = self.token_embeddings(input_ids)
        position_ids = self.position_ids[:, :seq_length]
        position_embeds = self.position_embeddings(position_ids)
        embeddings = token_embeds + position_embeds
        embeddings = self.layer_norm(embeddings)
        embeddings = self.dropout(embeddings)
        return embeddings
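

# Note on MagicBERTEmbeddings above (an illustrative sketch, not executed by
# this module): positions use a learned absolute embedding table, so inputs
# longer than config.max_position_embeddings cannot be encoded and must be
# truncated or chunked by the caller beforehand.
#
#   >>> input_ids = input_ids[:, : config.max_position_embeddings]   # truncate long inputs

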
class MagicBERTAttention(nn.Module):
    """Multi-head self-attention."""

    def __init__(self, config: MagicBERTConfig):
        super().__init__()
        self.num_attention_heads = config.num_attention_heads
        self.attention_head_size = config.hidden_size // config.num_attention_heads
        self.all_head_size = self.num_attention_heads * self.attention_head_size

        self.query = nn.Linear(config.hidden_size, self.all_head_size)
        self.key = nn.Linear(config.hidden_size, self.all_head_size)
        self.value = nn.Linear(config.hidden_size, self.all_head_size)
        self.dropout = nn.Dropout(config.attention_probs_dropout_prob)

    def transpose_for_scores(self, x: torch.Tensor) -> torch.Tensor:
        # Reshape [batch, seq_len, all_head_size] -> [batch, heads, seq_len, head_size].
        new_shape = x.size()[:-1] + (self.num_attention_heads, self.attention_head_size)
        x = x.view(new_shape)
        return x.permute(0, 2, 1, 3)

    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: Optional[torch.Tensor] = None,
    ) -> torch.Tensor:
        query_layer = self.transpose_for_scores(self.query(hidden_states))
        key_layer = self.transpose_for_scores(self.key(hidden_states))
        value_layer = self.transpose_for_scores(self.value(hidden_states))

        # Scaled dot-product attention scores: [batch, heads, seq_len, seq_len].
        attention_scores = torch.matmul(query_layer, key_layer.transpose(-1, -2))
        attention_scores = attention_scores / math.sqrt(self.attention_head_size)

        if attention_mask is not None:
            # Broadcast [batch, seq_len] -> [batch, 1, 1, seq_len] and push padded
            # positions to a large negative value so the softmax ignores them.
            attention_mask = attention_mask[:, None, None, :]
            attention_scores = attention_scores + (1.0 - attention_mask) * -10000.0

        attention_probs = F.softmax(attention_scores, dim=-1)
        attention_probs = self.dropout(attention_probs)
        context = torch.matmul(attention_probs, value_layer)
        context = context.permute(0, 2, 1, 3).contiguous()
        new_shape = context.size()[:-2] + (self.all_head_size,)
        context = context.view(new_shape)
        return context
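

# Note on the additive masking in MagicBERTAttention.forward above (an
# illustrative sketch, not executed by this module): a padding mask of shape
# [batch, seq_len] with 1 for real tokens and 0 for padding is broadcast to
# [batch, 1, 1, seq_len] and converted into a -10000.0 bias, so the softmax
# assigns (near-)zero probability to padded positions. For example:
#
#   >>> mask = torch.tensor([[1.0, 1.0, 0.0]])            # last position is padding
#   >>> bias = (1.0 - mask[:, None, None, :]) * -10000.0
#   >>> torch.softmax(torch.zeros(1, 1, 1, 3) + bias, dim=-1)
#   tensor([[[[0.5000, 0.5000, 0.0000]]]])

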
class MagicBERTLayer(nn.Module):
    """Single transformer layer."""

    def __init__(self, config: MagicBERTConfig):
        super().__init__()
        self.attention = MagicBERTAttention(config)
        self.attention_output = nn.Linear(config.hidden_size, config.hidden_size)
        self.attention_norm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
        self.attention_dropout = nn.Dropout(config.hidden_dropout_prob)

        self.intermediate = nn.Linear(config.hidden_size, config.intermediate_size)
        self.output = nn.Linear(config.intermediate_size, config.hidden_size)
        self.output_norm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
        self.output_dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: Optional[torch.Tensor] = None,
    ) -> torch.Tensor:
        # Self-attention block with residual connection and post-layer norm.
        attention_output = self.attention(hidden_states, attention_mask)
        attention_output = self.attention_output(attention_output)
        attention_output = self.attention_dropout(attention_output)
        attention_output = self.attention_norm(hidden_states + attention_output)

        # Feed-forward block (GELU) with residual connection and post-layer norm.
        intermediate_output = self.intermediate(attention_output)
        intermediate_output = F.gelu(intermediate_output)
        layer_output = self.output(intermediate_output)
        layer_output = self.output_dropout(layer_output)
        layer_output = self.output_norm(attention_output + layer_output)
        return layer_output


class MagicBERTEncoder(nn.Module):
    """Stack of transformer layers."""

    def __init__(self, config: MagicBERTConfig):
        super().__init__()
        self.layers = nn.ModuleList(
            [MagicBERTLayer(config) for _ in range(config.num_hidden_layers)]
        )

    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: Optional[torch.Tensor] = None,
    ) -> torch.Tensor:
        for layer in self.layers:
            hidden_states = layer(hidden_states, attention_mask)
        return hidden_states


class MagicBERTPreTrainedModel(PreTrainedModel):
    """Base class for MagicBERT models."""

    config_class = MagicBERTConfig
    base_model_prefix = "magic_bert"
    supports_gradient_checkpointing = False

    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            module.weight.data.normal_(mean=0.0, std=0.02)
            if module.bias is not None:
                module.bias.data.zero_()
        elif isinstance(module, nn.Embedding):
            module.weight.data.normal_(mean=0.0, std=0.02)
            if module.padding_idx is not None:
                module.weight.data[module.padding_idx].zero_()
        elif isinstance(module, nn.LayerNorm):
            module.bias.data.zero_()
            module.weight.data.fill_(1.0)


class MagicBERTModel(MagicBERTPreTrainedModel):
    """MagicBERT base model outputting raw hidden states."""

    def __init__(self, config: MagicBERTConfig):
        super().__init__(config)
        self.config = config
        self.embeddings = MagicBERTEmbeddings(config)
        self.encoder = MagicBERTEncoder(config)
        self.post_init()

    def forward(
        self,
        input_ids: torch.Tensor,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.Tensor] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple[torch.Tensor, torch.Tensor], BaseModelOutput]:
        # token_type_ids is accepted for API compatibility but is not used.
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        hidden_states = self.embeddings(input_ids)
        sequence_output = self.encoder(hidden_states, attention_mask)
        # CLS-style pooling: the hidden state of the first token.
        pooled_output = sequence_output[:, 0, :]

        if not return_dict:
            return (sequence_output, pooled_output)

        # BaseModelOutput has no pooler field, so pooled_output is only
        # available from the tuple form above.
        return BaseModelOutput(
            last_hidden_state=sequence_output,
            hidden_states=None,
            attentions=None,
        )
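

# Usage sketch for MagicBERTModel (assumes an already constructed `config` and
# `model`; not executed by this module): with return_dict=False the forward
# pass returns a (sequence_output, pooled_output) tuple, whereas the
# BaseModelOutput form only carries last_hidden_state, so the CLS vector is
# sliced from it directly.
#
#   >>> sequence_output, pooled_output = model(input_ids, attention_mask, return_dict=False)
#   >>> outputs = model(input_ids, attention_mask, return_dict=True)
#   >>> cls_vector = outputs.last_hidden_state[:, 0, :]   # equals pooled_output

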
class MagicBERTForMaskedLM(MagicBERTPreTrainedModel):
    """MagicBERT for masked language modeling (fill-mask task)."""

    def __init__(self, config: MagicBERTConfig):
        super().__init__(config)
        self.config = config
        self.embeddings = MagicBERTEmbeddings(config)
        self.encoder = MagicBERTEncoder(config)
        self.mlm_head = nn.Linear(config.hidden_size, config.vocab_size)
        self.post_init()

    def forward(
        self,
        input_ids: torch.Tensor,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple[torch.Tensor, ...], MaskedLMOutput]:
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        hidden_states = self.embeddings(input_ids)
        sequence_output = self.encoder(hidden_states, attention_mask)
        logits = self.mlm_head(sequence_output)

        loss = None
        if labels is not None:
            # Positions labeled -100 are ignored, matching the HF masking convention.
            loss_fct = nn.CrossEntropyLoss(ignore_index=-100)
            loss = loss_fct(logits.view(-1, self.config.vocab_size), labels.view(-1))

        if not return_dict:
            output = (logits,)
            return ((loss,) + output) if loss is not None else output

        return MaskedLMOutput(
            loss=loss,
            logits=logits,
            hidden_states=None,
            attentions=None,
        )

    def get_embeddings(
        self,
        input_ids: torch.Tensor,
        attention_mask: Optional[torch.Tensor] = None,
        pooling: str = "cls",
    ) -> torch.Tensor:
        """Get embeddings for downstream tasks.

        Args:
            input_ids: Input token IDs.
            attention_mask: Attention mask.
            pooling: Pooling strategy ("cls" or "mean").

        Returns:
            Pooled embeddings of shape [batch_size, hidden_size].
        """
        hidden_states = self.embeddings(input_ids)
        sequence_output = self.encoder(hidden_states, attention_mask)

        if pooling == "cls":
            return sequence_output[:, 0, :]
        elif pooling == "mean":
            if attention_mask is not None:
                # Masked mean: average only over real (non-padding) tokens.
                mask = attention_mask.unsqueeze(-1).float()
                return (sequence_output * mask).sum(1) / mask.sum(1).clamp(min=1e-9)
            return sequence_output.mean(dim=1)
        else:
            raise ValueError(f"Unknown pooling: {pooling}")
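

# Note on get_embeddings(pooling="mean") above (an illustrative sketch, not
# executed by this module): the masked mean sums hidden states over real
# tokens only and divides by their count, so padding does not dilute the
# embedding.
#
#   >>> mask = attention_mask.unsqueeze(-1).float()           # [batch, seq_len, 1]
#   >>> mean_emb = (sequence_output * mask).sum(1) / mask.sum(1).clamp(min=1e-9)
#   >>> mean_emb.shape                                        # [batch, hidden_size]

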
class MagicBERTForSequenceClassification(MagicBERTPreTrainedModel):
    """MagicBERT for sequence classification (file type classification)."""

    def __init__(self, config: MagicBERTConfig):
        super().__init__(config)
        self.config = config
        self.num_labels = getattr(config, "num_labels", 106)

        self.embeddings = MagicBERTEmbeddings(config)
        self.encoder = MagicBERTEncoder(config)

        # Contrastive projection head; the classifier operates on the
        # L2-normalized projections rather than on the raw pooled output.
        projection_dim = getattr(config, "contrastive_projection_dim", 256)
        self.projection = nn.Sequential(
            nn.Linear(config.hidden_size, config.hidden_size),
            nn.ReLU(),
            nn.Linear(config.hidden_size, projection_dim),
        )
        self.classifier = nn.Linear(projection_dim, self.num_labels)
        self.post_init()

    def forward(
        self,
        input_ids: torch.Tensor,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple[torch.Tensor, ...], SequenceClassifierOutput]:
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        hidden_states = self.embeddings(input_ids)
        sequence_output = self.encoder(hidden_states, attention_mask)
        pooled_output = sequence_output[:, 0, :]

        projections = self.projection(pooled_output)
        projections = F.normalize(projections, p=2, dim=1)
        logits = self.classifier(projections)

        loss = None
        if labels is not None:
            loss_fct = nn.CrossEntropyLoss()
            loss = loss_fct(logits, labels)

        if not return_dict:
            output = (logits,)
            return ((loss,) + output) if loss is not None else output

        return SequenceClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=None,
            attentions=None,
        )

    def get_embeddings(
        self,
        input_ids: torch.Tensor,
        attention_mask: Optional[torch.Tensor] = None,
    ) -> torch.Tensor:
        """Get normalized projection embeddings for similarity search."""
        hidden_states = self.embeddings(input_ids)
        sequence_output = self.encoder(hidden_states, attention_mask)
        pooled_output = sequence_output[:, 0, :]
        projections = self.projection(pooled_output)
        return F.normalize(projections, p=2, dim=1)
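

# ---------------------------------------------------------------------------
# Minimal smoke test: an illustrative sketch only, not part of the library API.
# It assumes MagicBERTConfig accepts the keyword arguments referenced by the
# modules above and provides sensible defaults for the remaining fields
# (pad_token_id, layer_norm_eps, dropout probabilities, ...); adjust the
# construction below to the real configuration signature if it differs.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    config = MagicBERTConfig(
        vocab_size=261,
        hidden_size=64,
        num_hidden_layers=2,
        num_attention_heads=4,
        intermediate_size=128,
        max_position_embeddings=128,
        num_labels=8,
    )
    input_ids = torch.randint(0, config.vocab_size, (2, 16))
    attention_mask = torch.ones(2, 16)

    base = MagicBERTModel(config)
    sequence_output, pooled_output = base(input_ids, attention_mask, return_dict=False)
    print("base:", tuple(sequence_output.shape), tuple(pooled_output.shape))

    mlm = MagicBERTForMaskedLM(config)
    print("mlm logits:", tuple(mlm(input_ids, attention_mask, return_dict=True).logits.shape))

    clf = MagicBERTForSequenceClassification(config)
    print("clf logits:", tuple(clf(input_ids, attention_mask, return_dict=True).logits.shape))
    print("clf embeddings:", tuple(clf.get_embeddings(input_ids, attention_mask).shape))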