Patrick's 데이터 세상

Transformer Encoder 카테고리 분류 개발 후기 - 2. train 본문

Deep Learning/프로젝트 후기

Transformer Encoder 카테고리 분류 개발 후기 - 2. train

patrick610 2022. 3. 5. 19:25
반응형
SMALL

 

 

 

2022.02.27 - [Deep Learning/NLP(Natural Language Processing)] - Transformer Encoder 카테고리 분류 개발 후기 - 1. Tokenizer

 

Transformer Encoder 카테고리 분류 개발 후기 - 1. Tokenizer

그동안 회사에서 작업했던 30개 카테고리 class 분류 모델 개발 과정 및 삽질 과정을 기록하려고 한다. 전체 프로세스에 대한 공유 목적이자 다시 공부하려는 목적이기도 하다. https://github.com/hipste

hipster4020.tistory.com

 

 

 

Transformer Encoder 카테고리 분류 모델 1부에서 BPE Tokenizer 학습에 대해 정리해보았다.

2부에서는 model train 전체 프로세스에 대해 정리해보려고 한다!


 

👉🏻 작업 환경

GPU type NVIDIA GeForce RTX 3090
CPU count 48
GPU count 4
CUDA Version 11.4
Python Version 3.6.9
OS Docker tensorflow - Ubuntu

 

 

👉🏻 Issue

해당 분류 모델의 목적은 스크랩된 뉴스 본문 기사를 30개 클래스 카테고리로 분류하는 것이었다.
클래스는 각 기사당 최대 3개까지 예측하기 위해 30개 클래스에 대해 모두 각각의 0~1까지 범위의 확률을 낼 수 있도록 각각의 class에 sigmoid를 적용하여 예측값이 0.8 이상인 값 중 3개를 추출하고 1개도 없으면 etc로 분류하였다.
따라서, 각 기사 당 predict label은 1~3개가 된다.

모델은 Transformer의 self-attn과 feedforward network로 구성된 EncoderLayer모델을 활용하였다.

 

 

 

👉🏻 dataloader

huggingface datasets을 토대로 dataloader pipeline을 구축하였다.

 

 

train.py

import os
import numpy as np
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"

import hydra
from transformers import AutoTokenizer
from dataloader import default_collator, load

# 1
tokenizer = AutoTokenizer.from_pretrained(cfg.ETC.tokenizer_dir)
train_dataset, eval_dataset = load(tokenizer=tokenizer, **cfg.DATASETS)

dataloader.py

def load(
    # 2
    tokenizer,
    seq_len,
    train_data_path: str,
    eval_data_path: Optional[str] = None,
    train_test_split: Optional[float] = None,
    worker: int = 1,
    batch_size: int = 1000,
    shuffle_seed: Optional[int] = None,
):
    def _tokenize_function(sample):
        tokenized = dict()

        e = tokenizer(
            sample["content"],
            max_length=seq_len,
            padding="max_length",
            truncation=True,
            return_tensors="np",
        )
        tokenized["input_ids"] = e["input_ids"]
        tokenized["attention_mask"] = e["attention_mask"]

        label = np.zeros((len(sample["category"]), 30))

        for i, c in enumerate(sample["category"]):
            for j in c:
                label[i, j] = 1

        tokenized["labels"] = label

        return tokenized

    train_data_path = abspath(train_data_path)
    is_eval = False
    _, extention = splitext(train_data_path)

    datafiles = {"train": train_data_path}
    if eval_data_path is not None:
        assert (
            train_test_split is None
        ), "Only one of eval_data_path and train_test_split must be entered."
        datafiles["test"] = abspath(eval_data_path)
        is_eval = True

    if train_test_split is not None:
        assert (
            0.0 < train_test_split < 1.0
        ), "train_test_split must be a value between 0 and 1"
        train_test_split = int(train_test_split * 100)
        train_test_split = {
            "train": f"train[:{train_test_split}%]",
            "test": f"train[{train_test_split}%:]",
        }
        is_eval = True

    data = load_dataset(
        extention.replace(".", ""), data_files=datafiles, split=train_test_split
    )
    if shuffle_seed is not None:
        data = data.shuffle(seed=shuffle_seed)

    data = data.map(
        _tokenize_function,
        batched=True,
        batch_size=batch_size,
        num_proc=worker,
        remove_columns=data["train"].column_names,
    )

    return data["train"], (data["test"] if is_eval else None)

1. train된 BPE Tokenizer를 load해서 hydra로 config.yml의 DATASETS을 가져온 kwargs parameter와 함께 dataloader단으로 넘긴다.

2. 그러면 dataloader에서 tokenizing한 데이터를 DATASETS parameter(train_data_path, eval_data_path, train_test_split, worker, batch_size, shuffle_seed, seq_len)에 의해 생성된 train_dataset과 eval_dataset을 받도록 pipeline 구축하였다.

 

 

 

 

👉🏻 model

model은 총 3개 파일(MainModels.py, TransformderLayer.py, UtilLayers.py)로 구성하였다.

 

 

MainModels.py

class EncoderModel(tf.keras.Model):
    def __init__(
        self,
        embedding_size: int,
        d_model: int,
        num_heads: int,
        num_layers: int,
        vocab_size: int,
        num_classes: int,
        seq_len: int,
        pe: int = 1024,
        rate: float = 0.1,
    ):
        super(EncoderModel, self).__init__()

        self.embedding_size = embedding_size
        self.d_model = d_model
        self.num_heads = num_heads
        self.num_layers = num_layers
        self.vocab_size = vocab_size
        self.num_classes = num_classes
        self.seq_len = seq_len
        self.pe = pe
        self.rate = rate

        # 1
        self.embedding = PositionalEmbedding(vocab_size, embedding_size, pe)
        if embedding_size != d_model:
            self.embedding_intermediate = tf.keras.layers.Dense(d_model)

        self.encoders = [
            EncoderLayer(d_model, num_heads, rate) for _ in range(self.num_layers)
        ]
        self.intermediate_layers = [
            tf.keras.layers.Dense(self.d_model, activation="gelu")
            for _ in range(self.num_layers)
        ]
        self.dropout = [
            tf.keras.layers.Dropout(rate=self.rate) for _ in range(self.num_layers)
        ]
        self.pooling_embedding = tf.keras.layers.Embedding(1, self.d_model)
        self.output_layer = tf.keras.layers.Dense(
            self.num_classes, activation="sigmoid"
        )

 

1. MainModels EncoderModel의 생성자에서는 위치 정보에 해당하는 UtilLayers의 Positional Encoding으로 layer를 만든 PositionalEmbedding를 가져와 embeddingLayer로 넣고 Transformer의 Encoder부분에 해당하는 TransformerLayersEncoderLayerencoders로 가져왔다.

intermediate_layers 중간 계층의 활성화 함수로 zoneout 되고 relu보다 더 빠르게 수렴하고, 낮은 오차를 보이는 gelu를 택한다.

0 또는 1로 이루어진 마스크를 stochastic 하게 곱하면서도 stochasticity를 x의 부호가 아닌 값에 의해서 정하게 된다.

아래는 gelu의 수식이다.

pooling_embedding으로 1차원으로 flatten하게 하고 최종 output_layer에 클래스 갯수 30,  일정 값을 기준으로 0인지 1인지 구분할 목적으로 활성화 함수 sigmoid를 설정한다.

 

 

 

def call(
        self,
        inputs,
        # input_ids,
        # attention_mask=None,
        training=False,
        **kwargs,
    ):
        input_ids = inputs["input_ids"]
        attention_mask = inputs.get("attention_mask")

        # forward
        encoder_output = self.embedding(
            input_ids
        )  # shape [batch_size, sequance_length]
        if self.embedding_size != self.d_model:
            encoder_output = self.embedding_intermediate(encoder_output)

        for i in range(self.num_layers):

            encoder_output = self.encoders[i](
                encoder_output, attention_mask, training=training
            )
            encoder_output = self.intermediate_layers[i](encoder_output)
            encoder_output = self.dropout[i](encoder_output, training=training)
        # shape [batch_size, sequance_length, d_model]

        # add pooling layer
        pooling_code = self.pooling_embedding(  # code_embedding
            tf.range(1, dtype=tf.int32)
        )  # shape: (batch_size, 1, d_model)
        encoder_output = self.dot_attention(
            pooling_code, encoder_output, encoder_output, mask=attention_mask
        )  # shape: (batch_size, 1, d_model). query의 shape를 따라감.
        encoder_output = tf.squeeze(
            encoder_output, [1]
        )  # == tf.keras.Layers.Flatten()(encoder_output)

        output = self.output_layer(
            encoder_output
        )  # shape [batch_size, sequance_length, num_classes]
        return output
def dot_attention(self, q, k, v, mask=None):
    # 2
    logits = tf.matmul(q, k, transpose_b=True)

    if mask is not None:
        logits += tf.cast((1 - mask[:, tf.newaxis, :]), tf.float32) * -1e9

    attention_weights = tf.nn.softmax(logits, axis=-1)
    output = tf.matmul(attention_weights, v)

    return output

2. 추가적으로 call method 부분에 차원을 맞춰주기 위한 목적으로 Dot-Product Attention을 구현하여 적용하였다.

아래 Scaled Dot-Product Attention에서 scale이 빠진 구조.

polling layer, encoder ouput 행렬 연산(내적 연산)을 한 후, attention mask를 받아 mask를 취한다.

softmax로 확률 값으로 변환시켜주고 Value와 행렬곱을 해줘서 각 콘텍스트 벡터를 구해준다.

 

 

👉🏻 Trainer

 

train.py

from metrics import CustomF1Score, accuracy, loss
from models.MainModels import EncoderModel
from trainer import TrainArgument, Trainer

# 1
args = TrainArgument(**cfg.TRAINARGS)
with args.strategy.scope():
    # 2
    model = EncoderModel(vocab_size=tokenizer.vocab_size, **cfg.MODEL)
    model(model._get_sample_data())

    # 3
    weights = np.load(cfg.ETC.weights_dir)
    model.embedding.embedding.set_weights([weights])
    model.embedding.trainable = False

    metrics = [
        accuracy,
        CustomF1Score(cfg.MODEL.num_classes, average="micro"),
    ]

# 4
trainer = Trainer(
    model,
    args,
    train_dataset,
    loss,
    eval_dataset=eval_dataset,
    data_collator=default_collator,
    metrics=metrics,
)

trainer.train()

model.save(cfg.ETC.output_dir)

1. 먼저 TrainingArgument에 train_batch_size, eval_batch_size, epoch, signature, checkpoint_dir, logging_dir, warmup_step 등을 받아 args로 세팅.

args.strategy.scope()로 분산 학습 처리 진행.

 

2. model은 MainModel단에서 EncoderModel을 가져와 tokenizer의 vocab_size MODEL config parameter 등으로 불러온다.

dataset으로 받은 train_dataseteval_dataset, model, metrics으로 loss(tensorflow binary_crossentropy), F1Score (tensorflow_addons micro F1Score), accuracy(tensorflow binary_accuracy)를 담는다.

 

3. 추가적으로 직접 train한 tokenizer로 word2vec로 학습한 weight를 numpy로 불러와 별도로 weight를 embedding에 setting하였다. 코드 참고

 

4. 위에 설명된 trainer에 해당하는 parameter들이 세팅되면 trainer.train으로 학습 진행한다.

 

trainer.py

구축한 trainer pipeline 사용함.

 

그리고 MLOps로 WandB를  사용하였다.

학습 결과에 대해 실시간으로 metrics, loss 등 측정 지표를 다양한 플랫폼에서 접근할 수 있는 장점이 있고, gpu, cpu 사용량, gpu 온도 등을 실시간으로 볼 수 있는 것도 매우 좋다.

 

 

 

👉🏻 결과

transformer encoder으로 category 30개 class 분류 모델을 만드는 과정에서 삽질도 많이 하고 finetuning 과정에서 어려움도 많이 겪었다...

꾸준한 모델링과 학습이 필요할 것 같다.

 

 

 

반응형
LIST
Comments