import tensorflow as tf
from tensorflow.keras import layers, Model
import numpy as np


class DeCorrelationLoss(tf.keras.layers.Layer):
    """DeCov regularizer (Cogswell et al., 2016), applied as a pass-through layer.

    Penalizes the squared off-diagonal entries of the batch covariance of the
    incoming activations, encouraging de-correlated hidden features.  The
    penalty is registered via ``add_loss`` and the inputs are returned
    unchanged.

    Args:
        lambda_decov: Scaling coefficient applied to the DeCov penalty.
    """

    def __init__(self, lambda_decov=1e-4, **kwargs):
        super(DeCorrelationLoss, self).__init__(**kwargs)
        self.lambda_decov = lambda_decov

    def build(self, input_shape):
        super(DeCorrelationLoss, self).build(input_shape)

    def call(self, inputs):
        batch_size = tf.cast(tf.shape(inputs)[0], tf.float32)
        # Center each feature over the batch.
        inputs_centered = inputs - tf.reduce_mean(inputs, axis=0, keepdims=True)
        # Sample covariance matrix.  Fix: guard the (n - 1) denominator so a
        # batch of size 1 does not divide by zero.
        covariance = tf.matmul(
            inputs_centered, inputs_centered, transpose_a=True
        ) / tf.maximum(batch_size - 1.0, 1.0)
        # Remove the diagonal so only cross-feature correlations are penalized.
        covariance_off_diagonal = covariance - tf.linalg.diag(
            tf.linalg.diag_part(covariance)
        )
        # DeCov loss: 0.5 * squared Frobenius norm of the off-diagonal part.
        decov_loss = 0.5 * tf.reduce_sum(tf.square(covariance_off_diagonal))
        self.add_loss(self.lambda_decov * decov_loss)
        return inputs


class MalConv(Model):
    """MalConv (Raff et al., 2017): gated-convolution detector over raw bytes.

    Pipeline: byte embedding -> parallel ReLU / sigmoid Conv1D branches
    multiplied element-wise (gated convolution, paper Figure 1) -> global max
    pooling -> dense layer (optionally DeCov-regularized) -> dropout ->
    sigmoid output.

    Args:
        max_input_length: Nominal maximum byte-sequence length (stored only;
            the network itself accepts variable-length input).
        embedding_size: Dimensionality of the byte embedding.
        filter_size: Kernel width of both convolution branches.
        stride: Stride of both convolution branches.
        num_filters: Number of filters per convolution branch.
        fc_size: Width of the fully connected layer.
        use_decov: Whether to apply DeCov regularization to the penultimate
            activations.
        lambda_decov: DeCov penalty coefficient.
    """

    def __init__(self, max_input_length=2_000_000, embedding_size=8,
                 filter_size=500, stride=500, num_filters=128, fc_size=128,
                 use_decov=True, lambda_decov=1e-4, **kwargs):
        super(MalConv, self).__init__(**kwargs)
        self.max_input_length = max_input_length
        self.use_decov = use_decov

        # Byte embedding over values 0-255.
        # NOTE(review): the original MalConv paper uses 257 embeddings (256
        # byte values + a distinct padding token); this variant assumes inputs
        # carry raw byte values only -- confirm padding handling upstream.
        # Fix: the deprecated no-op `input_length=None` argument was dropped
        # (it is rejected by Keras 3).
        self.embedding = layers.Embedding(
            input_dim=256,
            output_dim=embedding_size,
            mask_zero=False,
            name='byte_embedding',
        )

        # Gated convolution branches (paper Figure 1): a ReLU feature branch
        # modulated by a sigmoid gate branch.
        self.conv_A = layers.Conv1D(
            filters=num_filters,
            kernel_size=filter_size,
            strides=stride,
            padding='valid',
            activation='relu',
            name='conv_A',
        )
        self.conv_B = layers.Conv1D(
            filters=num_filters,
            kernel_size=filter_size,
            strides=stride,
            padding='valid',
            activation='sigmoid',
            name='conv_B',
        )
        # Fix: create the Multiply layer once here.  The original built a new
        # layer via `layers.multiply(..., name='gated_conv')` on every call(),
        # which breaks weight/layer tracking and can raise duplicate-name
        # errors on repeated invocation.
        self.gate = layers.Multiply(name='gated_conv')

        # Global max pooling over the temporal axis.
        self.global_max_pool = layers.GlobalMaxPooling1D(name='global_max_pool')

        # Fully connected (penultimate) layer.
        self.fc = layers.Dense(fc_size, activation='relu', name='fc_layer')

        # Optional DeCov regularization on the penultimate activations.
        if use_decov:
            self.decov_layer = DeCorrelationLoss(lambda_decov=lambda_decov)
        self.dropout = layers.Dropout(0.5, name='dropout')
        self.output_layer = layers.Dense(1, activation='sigmoid', name='output')

    def call(self, inputs, training=None):
        # 1. Byte embedding.
        x = self.embedding(inputs)
        # 2. Gated convolution (core of the paper).
        gated_conv = self.gate([self.conv_A(x), self.conv_B(x)])
        # 3. Global max pooling.
        pooled = self.global_max_pool(gated_conv)
        # 4. Fully connected layer.
        fc_out = self.fc(pooled)
        # 5. DeCov regularization on the penultimate layer.
        if self.use_decov:
            fc_out = self.decov_layer(fc_out)
        # 6. Dropout.  Fix: always delegate to the Dropout layer with the
        # `training` flag.  The original `if training:` silently skipped
        # dropout when `training` was None (Keras resolves the phase itself
        # in that case) and would fail on a symbolic tensor.
        fc_out = self.dropout(fc_out, training=training)
        # 7. Sigmoid output.
        return self.output_layer(fc_out)


def create_malconv_model(max_input_length=2_000_000):
    """Build and compile a MalConv model following the paper's training recipe.

    Optimizer: SGD with Nesterov momentum 0.9 and a staircase exponential
    learning-rate decay starting at 0.01 (the schedule mentioned in the
    paper).  Loss: binary cross-entropy.  Metrics: accuracy, precision,
    recall, and AUC.

    Args:
        max_input_length: Forwarded to :class:`MalConv`.

    Returns:
        A compiled :class:`MalConv` instance.
    """
    model = MalConv(max_input_length=max_input_length)

    initial_lr = 0.01
    lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
        initial_learning_rate=initial_lr,
        decay_steps=1000,
        decay_rate=0.96,  # exponential decay as described in the paper
        staircase=True,
    )
    optimizer = tf.keras.optimizers.SGD(
        learning_rate=lr_schedule,
        momentum=0.9,
        nesterov=True,
    )

    model.compile(
        optimizer=optimizer,
        loss='binary_crossentropy',
        metrics=[
            'accuracy',
            tf.keras.metrics.Precision(name='precision'),
            tf.keras.metrics.Recall(name='recall'),
            tf.keras.metrics.AUC(name='auc'),
        ],
    )
    return model