ラビットチャレンジレポート 深層学習Day4 その3
Section 5 Transformer
5-1 Seq2seq
seq2seqとは
系列(sequence)を入力として、系列を出すもの。Encoder-Decoderモデルとも呼ばれる。
入力系列が内部状態に変換(encode)され、内部状態から系列に変換(decode)される。
実応用上も、入力・出力ともに系列情報的なものが多く、自然言語処理に応用される。
-翻訳
-音声認識
-チャットボット
seq2seqの仕組み
seq2seqはRNNを使用した言語モデルを二つ連結した形になっている。
- 言語モデルとは
言語モデルとは、単語の並びに確率を与えるものである。
単語の並びに尤度、すなわち文章として自然化を確率で評価する。
具体的な手法としては、時刻t-1までの上方で、時刻tの事後確率を求め、同時確率を計算する。
- RNN
RNNは、系列情報を処理して内部状態に変換することが出来る。
- RNN×言語モデル
各地点で次のどの単語が来れば自然(事後確率が最大)かを出力できる。
言語モデルを再現するようにRNNの重みが学習されていれば、ある時点の次の単語を予測することが出来る。
先頭単語を与えれば、文章を生成することも可能になる。
Seq2seqの構造
Encoder側はRNN×言語モデルで内部状態を生成する。
Decoder側はRNN×言語モデルで構造はほぼ同じだが、隠れ状態の初期値にEncoderの内部状態を受け取り、文章を生成する。
学習に関しては、Decoderのoutput側に正解を与えれば、end2endで教師あり学習が行える。
- Teacher Forcing
Q:下記のモデルで起こる問題は?
A:Encoder側から渡された内部状態から生成した出力を次段に入力し、その出力をその次の段に入力し..ということを時系列で繰り返すので、ある段で誤差が生じると、時系列が進むにつれて誤差が増幅してしまうという問題がある。
そこで、正解ラベルを直接Decoderの入力にするというTeacher Forcingの手法が考案された。
Q:上記のTeacher Forcingのモデルで起こる問題は?
A:実際の推論では、教師データをdecoderに与えないため、学習時にはうまく収束したモデルでも、実際の推論では期待通りの動作をしない場合があるという問題がある。
対策として、一定の確率に基づいてTeacher forcingを与える場合とそうでない場合を混ぜて学習をするというScheduled Sampling手法が考案された。
実装演習
seq2seqのコードを実装する。
まず、必要なライブラリのインポートと環境設定をする。
注意点としては、wheel.pep425tagsが最新バージョンには無いため、wheel=0.34.1にダウングレードする必要があること、pytorchは旧バージョンはリンクが消えている場合があるので、最新版をインストールすることである。
pip install wheel==0.34.1 pip install pytorch
from os import path from wheel.pep425tags import get_abbr_impl, get_impl_ver, get_abi_tag platform = '{}{}-{}'.format(get_abbr_impl(), get_impl_ver(), get_abi_tag()) accelerator = 'cu80' if path.exists('/opt/bin/nvidia-smi') else 'cpu' #!pip install -q http://download.pytorch.org/whl/{accelerator}/torch-0.4.0-{platform}-linux_x86_64.whl torchvision import torch print(torch.__version__) print(torch.cuda.is_available()) ! wget https://www.dropbox.com/s/9narw5x4uizmehh/utils.py ! mkdir images data # data取得 ! wget https://www.dropbox.com/s/o4kyc52a8we25wy/dev.en -P data/ ! wget https://www.dropbox.com/s/kdgskm5hzg6znuc/dev.ja -P data/ ! wget https://www.dropbox.com/s/gyyx4gohv9v65uh/test.en -P data/ ! wget https://www.dropbox.com/s/hotxwbgoe2n013k/test.ja -P data/ ! wget https://www.dropbox.com/s/5lsftkmb20ay9e1/train.en -P data/ ! wget https://www.dropbox.com/s/ak53qirssci6f1j/train.ja -P data/ import random import numpy as np from sklearn.model_selection import train_test_split from sklearn.utils import shuffle from nltk import bleu_score import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torch.nn.utils.rnn import pad_packed_sequence, pack_padded_sequence from utils import Vocab # デバイスの設定 device = torch.device("cuda" if torch.cuda.is_available() else "cpu") torch.manual_seed(1) random_state = 42 print(torch.__version__)
次にデータセットの準備をする。今回は英語-日本語の対訳コーパスであるTanaka Corpus(
Tanaka Corpus - EDRDG Wiki )を使用する。
今回は、そのうちの一部分を取り出したsmall_parallel_enja: 50k En/Ja Parallel Corpus for Testing SMT Methods ( https://github.com/odashi/small_parallel_enja )を使用する。
データの読み込みと単語の分割をする
def load_data(file_path): # テキストファイルからデータを読み込むメソッド data = [] for line in open(file_path, encoding='utf-8'): words = line.strip().split() # スペースで単語を分割 data.append(words) return data train_X = load_data('./data/train.en') train_Y = load_data('./data/train.ja') # 訓練データと検証データに分割 train_X, valid_X, train_Y, valid_Y = train_test_split(train_X, train_Y, test_size=0.2, random_state=random_state)
この時点で入力と教師データは以下のようになっている。
print('train data', train_X[0]) print('valid data', valid_X[0])
train data ['where', 'shall', 'we', 'eat', 'tonight', '?'] valid data ['you', 'may', 'extend', 'your', 'stay', 'in', 'tokyo', '.']
単語辞書の作成をする。
データセットに登場する各単語にIDを割り振る。
ここで、特殊文字のPADは、バッチの大きさをそろえるためにダミー。BOSは系列の始まり、EOSは系列の終わりを表す。
UNKは語彙に存在しない単語を表す。
# まず特殊トークンを定義しておく PAD_TOKEN = '<PAD>' # バッチ処理の際に、短い系列の末尾を埋めるために使う (Padding) BOS_TOKEN = '<S>' # 系列の始まりを表す (Beggining of sentence) EOS_TOKEN = '</S>' # 系列の終わりを表す (End of sentence) UNK_TOKEN = '<UNK>' # 語彙に存在しない単語を表す (Unknown) PAD = 0 BOS = 1 EOS = 2 UNK = 3 MIN_COUNT = 2 # 語彙に含める単語の最低出現回数 再提出現回数に満たない単語はUNKに置き換えられる # 単語をIDに変換する辞書の初期値を設定 word2id = { PAD_TOKEN: PAD, BOS_TOKEN: BOS, EOS_TOKEN: EOS, UNK_TOKEN: UNK, } # 単語辞書を作成 vocab_X = Vocab(word2id=word2id) vocab_Y = Vocab(word2id=word2id) vocab_X.build_vocab(train_X, min_count=MIN_COUNT) vocab_Y.build_vocab(train_Y, min_count=MIN_COUNT) vocab_size_X = len(vocab_X.id2word) vocab_size_Y = len(vocab_Y.id2word) print('入力言語の語彙数:', vocab_size_X) print('出力言語の語彙数:', vocab_size_Y)
語彙数は下記である。
入力言語の語彙数: 3725 出力言語の語彙数: 4405
vocab_X.id2wordを見るとIDが0~3の語句は特殊文字に割り当てられ、4以下が文章に出てくる単語に割り当てられている。
次にテンソルへ変換する。
まず、モデルが文章を認識できるように文章を単語IDのリストに変換する。
def sentence_to_ids(vocab, sentence): # 単語(str)のリストをID(int)のリストに変換する関数 ids = [vocab.word2id.get(word, UNK) for word in sentence] ids += [EOS] # EOSを加える return ids train_X = [sentence_to_ids(vocab_X, sentence) for sentence in train_X] train_Y = [sentence_to_ids(vocab_Y, sentence) for sentence in train_Y] valid_X = [sentence_to_ids(vocab_X, sentence) for sentence in valid_X] valid_Y = [sentence_to_ids(vocab_Y, sentence) for sentence in valid_Y]
この時点で入力と教師データは以下のようになっている。
train data [132, 321, 28, 290, 367, 12, 2] valid data [8, 93, 3532, 36, 236, 13, 284, 4, 2]
データセットからバッチを取得するデータローダーを定義する。
この際、ながさのことなる複数の系列をバッチで並列に扱えるように、短い系列にはダミーでパディングし、パッチ内の系列の長さを合わせる。
(batch_size,max_length)サイズの行列を得るが、実際にモデルを学習する時にはバッチを跨いで各時刻ごとにすすめるので、転置する。
def pad_seq(seq, max_length): # 系列(seq)が指定の文長(max_length)になるように末尾をパディングする res = seq + [PAD for i in range(max_length - len(seq))] return res class DataLoader(object): def __init__(self, X, Y, batch_size, shuffle=False): """ :param X: list, 入力言語の文章(単語IDのリスト)のリスト :param Y: list, 出力言語の文章(単語IDのリスト)のリスト :param batch_size: int, バッチサイズ :param shuffle: bool, サンプルの順番をシャッフルするか否か """ self.data = list(zip(X, Y)) self.batch_size = batch_size self.shuffle = shuffle self.start_index = 0 self.reset() def reset(self): if self.shuffle: # サンプルの順番をシャッフルする self.data = shuffle(self.data, random_state=random_state) self.start_index = 0 # ポインタの位置を初期化する def __iter__(self): return self def __next__(self): # ポインタが最後まで到達したら初期化する if self.start_index >= len(self.data): self.reset() raise StopIteration() # バッチを取得 seqs_X, seqs_Y = zip(*self.data[self.start_index:self.start_index+self.batch_size]) # 入力系列seqs_Xの文章の長さ順(降順)に系列ペアをソートする seq_pairs = sorted(zip(seqs_X, seqs_Y), key=lambda p: len(p[0]), reverse=True) seqs_X, seqs_Y = zip(*seq_pairs) # 短い系列の末尾をパディングする lengths_X = [len(s) for s in seqs_X] # 後述のEncoderのpack_padded_sequenceでも用いる lengths_Y = [len(s) for s in seqs_Y] max_length_X = max(lengths_X) max_length_Y = max(lengths_Y) padded_X = [pad_seq(s, max_length_X) for s in seqs_X] padded_Y = [pad_seq(s, max_length_Y) for s in seqs_Y] # tensorに変換し、転置する batch_X = torch.tensor(padded_X, dtype=torch.long, device=device).transpose(0, 1) batch_Y = torch.tensor(padded_Y, dtype=torch.long, device=device).transpose(0, 1) # ポインタを更新する self.start_index += self.batch_size return batch_X, batch_Y, lengths_X
次にモデルの構築をする。
EncoderとDecoderのRNNを定義する。
PackedSequenceというクラスを使用する。これはパディング部分の計算を省略できるため、可変長の系列のバッチを効率よく計算できる。
また、出力されたテンソルはpad_packed_sequenceを用いて通常のテンソルに戻す。
まず、Encoderを構築する。
Encoder側でバッチを処理する際に、pack_padded_sequence関数によってpacked_sequenceに変換し、処理を終えた後にpad_packed_sequence関数によってtensorに戻す。
class Encoder(nn.Module): def __init__(self, input_size, hidden_size): """ :param input_size: int, 入力言語の語彙数 :param hidden_size: int, 隠れ層のユニット数 """ super(Encoder, self).__init__() self.hidden_size = hidden_size self.embedding = nn.Embedding(input_size, hidden_size, padding_idx=PAD) self.gru = nn.GRU(hidden_size, hidden_size) def forward(self, seqs, input_lengths, hidden=None): """ :param seqs: tensor, 入力のバッチ, size=(max_length, batch_size) :param input_lengths: 入力のバッチの各サンプルの文長 :param hidden: tensor, 隠れ状態の初期値, Noneの場合は0で初期化される :return output: tensor, Encoderの出力, size=(max_length, batch_size, hidden_size) :return hidden: tensor, Encoderの隠れ状態, size=(1, batch_size, hidden_size) """ emb = self.embedding(seqs) # seqsはパディング済み packed = pack_padded_sequence(emb, input_lengths) # PackedSequenceオブジェクトに変換 output, hidden = self.gru(packed, hidden) output, _ = pad_packed_sequence(output) return output, hidden
つぎにDecoder側を構築する。
今回はDecoder側でパディングを行わないので、通常のtensorのままRNNに入力する。
class Decoder(nn.Module): def __init__(self, hidden_size, output_size): """ :param hidden_size: int, 隠れ層のユニット数 :param output_size: int, 出力言語の語彙数 :param dropout: float, ドロップアウト率 """ super(Decoder, self).__init__() self.hidden_size = hidden_size self.output_size = output_size self.embedding = nn.Embedding(output_size, hidden_size, padding_idx=PAD) self.gru = nn.GRU(hidden_size, hidden_size) self.out = nn.Linear(hidden_size, output_size) def forward(self, seqs, hidden): """ :param seqs: tensor, 入力のバッチ, size=(1, batch_size) :param hidden: tensor, 隠れ状態の初期値, Noneの場合は0で初期化される :return output: tensor, Decoderの出力, size=(1, batch_size, output_size) :return hidden: tensor, Decoderの隠れ状態, size=(1, batch_size, hidden_size) """ emb = self.embedding(seqs) output, hidden = self.gru(emb, hidden) output = self.out(output) return output, hidden
次に、一連の処理をまとめるEncoderDecoderのクラスを定義する。
Scheduled Samplingを採用し、一定の確率でターゲット系列を入力とするかどうかを切り替えられるようにクラスを定義しておきます。
class EncoderDecoder(nn.Module): """EncoderとDecoderの処理をまとめる""" def __init__(self, input_size, output_size, hidden_size): """ :param input_size: int, 入力言語の語彙数 :param output_size: int, 出力言語の語彙数 :param hidden_size: int, 隠れ層のユニット数 """ super(EncoderDecoder, self).__init__() self.encoder = Encoder(input_size, hidden_size) self.decoder = Decoder(hidden_size, output_size) def forward(self, batch_X, lengths_X, max_length, batch_Y=None, use_teacher_forcing=False): """ :param batch_X: tensor, 入力系列のバッチ, size=(max_length, batch_size) :param lengths_X: list, 入力系列のバッチ内の各サンプルの文長 :param max_length: int, Decoderの最大文長 :param batch_Y: tensor, Decoderで用いるターゲット系列 :param use_teacher_forcing: Decoderでターゲット系列を入力とするフラグ :return decoder_outputs: tensor, Decoderの出力, size=(max_length, batch_size, self.decoder.output_size) """ # encoderに系列を入力(複数時刻をまとめて処理) _, encoder_hidden = self.encoder(batch_X, lengths_X) _batch_size = batch_X.size(1) # decoderの入力と隠れ層の初期状態を定義 decoder_input = torch.tensor([BOS] * _batch_size, dtype=torch.long, device=device) # 最初の入力にはBOSを使用する decoder_input = decoder_input.unsqueeze(0) # (1, batch_size) decoder_hidden = encoder_hidden # Encoderの最終隠れ状態を取得 # decoderの出力のホルダーを定義 decoder_outputs = torch.zeros(max_length, _batch_size, self.decoder.output_size, device=device) # max_length分の固定長 # 各時刻ごとに処理 for t in range(max_length): decoder_output, decoder_hidden = self.decoder(decoder_input, decoder_hidden) decoder_outputs[t] = decoder_output # 次の時刻のdecoderの入力を決定 if use_teacher_forcing and batch_Y is not None: # teacher forceの場合、ターゲット系列を用いる decoder_input = batch_Y[t].unsqueeze(0) else: # teacher forceでない場合、自身の出力を用いる decoder_input = decoder_output.max(-1)[1] return decoder_outputs]
次に訓練を行う。
基本的にはクロスエントロピーを損失関数として扱うが、パディングを行うと
mce = nn.CrossEntropyLoss(size_average=False, ignore_index=PAD) # PADを無視する def masked_cross_entropy(logits, target): logits_flat = logits.view(-1, logits.size(-1)) # (max_seq_len * batch_size, output_size) target_flat = target.view(-1) # (max_seq_len * batch_size, 1) return mce(logits_flat, target_flat)
学習を行う
# ハイパーパラメータの設定 num_epochs = 10 batch_size = 64 lr = 1e-3 # 学習率 teacher_forcing_rate = 0.2 # Teacher Forcingを行う確率 ckpt_path = 'model.pth' # 学習済みのモデルを保存するパス model_args = { 'input_size': vocab_size_X, 'output_size': vocab_size_Y, 'hidden_size': 256, } # データローダを定義 train_dataloader = DataLoader(train_X, train_Y, batch_size=batch_size, shuffle=True) valid_dataloader = DataLoader(valid_X, valid_Y, batch_size=batch_size, shuffle=False) # モデルとOptimizerを定義 model = EncoderDecoder(**model_args).to(device) optimizer = optim.Adam(model.parameters(), lr=lr)
実際に損失関数を計算する関数を定義する。
def compute_loss(batch_X, batch_Y, lengths_X, model, optimizer=None, is_train=True): # 損失を計算する関数 model.train(is_train) # train/evalモードの切替え # 一定確率でTeacher Forcingを行う use_teacher_forcing = is_train and (random.random() < teacher_forcing_rate) max_length = batch_Y.size(0) # 推論 pred_Y = model(batch_X, lengths_X, max_length, batch_Y, use_teacher_forcing) # 損失関数を計算 loss = masked_cross_entropy(pred_Y.contiguous(), batch_Y.contiguous()) if is_train: # 訓練時はパラメータを更新 optimizer.zero_grad() loss.backward() optimizer.step() batch_Y = batch_Y.transpose(0, 1).contiguous().data.cpu().tolist() pred = pred_Y.max(dim=-1)[1].data.cpu().numpy().T.tolist() return loss.item(), batch_Y, pred
LOSS以外に自然言語のモデルの性能を評価する指標で、BLEUというものがある。
BLEUは機械翻訳の分野において最も一般的な自動評価基準の一つで、予め用意した複数の参照訳と、機械翻訳モデルが出力した訳のn-gramのマッチ率に基づく指標である。
NLTK (Natural Language Tool Kit) という自然言語処理で用いられるライブラリを用いて簡単に計算することができる。
def calc_bleu(refs, hyps): """ BLEUスコアを計算する関数 :param refs: list, 参照訳。単語のリストのリスト (例: [['I', 'have', 'a', 'pen'], ...]) :param hyps: list, モデルの生成した訳。単語のリストのリスト (例: ['I', 'have', 'a', 'pen']) :return: float, BLEUスコア(0~100) """ refs = [[ref[:ref.index(EOS)]] for ref in refs] # EOSは評価しないで良いので切り捨てる, refsのほうは複数なのでlistが一個多くかかっている hyps = [hyp[:hyp.index(EOS)] if EOS in hyp else hyp for hyp in hyps] return 100 * bleu_score.corpus_bleu(refs, hyps)
モデルの訓練を行う。
# 訓練 best_valid_bleu = 0. for epoch in range(1, num_epochs+1): train_loss = 0. train_refs = [] train_hyps = [] valid_loss = 0. valid_refs = [] valid_hyps = [] # train for batch in train_dataloader: batch_X, batch_Y, lengths_X = batch loss, gold, pred = compute_loss( batch_X, batch_Y, lengths_X, model, optimizer, is_train=True ) train_loss += loss train_refs += gold train_hyps += pred # valid for batch in valid_dataloader: batch_X, batch_Y, lengths_X = batch loss, gold, pred = compute_loss( batch_X, batch_Y, lengths_X, model, is_train=False ) valid_loss += loss valid_refs += gold valid_hyps += pred # 損失をサンプル数で割って正規化 train_loss = np.sum(train_loss) / len(train_dataloader.data) valid_loss = np.sum(valid_loss) / len(valid_dataloader.data) # BLEUを計算 train_bleu = calc_bleu(train_refs, train_hyps) valid_bleu = calc_bleu(valid_refs, valid_hyps) # validationデータでBLEUが改善した場合にはモデルを保存 if valid_bleu > best_valid_bleu: ckpt = model.state_dict() torch.save(ckpt, ckpt_path) best_valid_bleu = valid_bleu print('Epoch {}: train_loss: {:5.2f} train_bleu: {:2.2f} valid_loss: {:5.2f} valid_bleu: {:2.2f}'.format( epoch, train_loss, train_bleu, valid_loss, valid_bleu)) print('-'*80)
学習結果は下記である。epochが進むにつれて、lossが下がり、bleuが上がる様子がわかる。
Epoch 1: train_loss: 52.31 train_bleu: 3.39 valid_loss: 48.69 valid_bleu: 4.53 -------------------------------------------------------------------------------- Epoch 2: train_loss: 44.39 train_bleu: 7.72 valid_loss: 44.60 valid_bleu: 8.25 -------------------------------------------------------------------------------- Epoch 3: train_loss: 40.10 train_bleu: 11.43 valid_loss: 42.04 valid_bleu: 10.51 -------------------------------------------------------------------------------- Epoch 4: train_loss: 37.07 train_bleu: 14.47 valid_loss: 40.62 valid_bleu: 11.69 -------------------------------------------------------------------------------- Epoch 5: train_loss: 35.06 train_bleu: 16.38 valid_loss: 39.91 valid_bleu: 13.65 -------------------------------------------------------------------------------- Epoch 6: train_loss: 33.08 train_bleu: 18.89 valid_loss: 39.76 valid_bleu: 15.42 -------------------------------------------------------------------------------- Epoch 7: train_loss: 31.52 train_bleu: 20.73 valid_loss: 40.81 valid_bleu: 18.62 -------------------------------------------------------------------------------- Epoch 8: train_loss: 30.28 train_bleu: 22.82 valid_loss: 39.79 valid_bleu: 16.66 -------------------------------------------------------------------------------- Epoch 9: train_loss: 28.73 train_bleu: 25.12 valid_loss: 40.29 valid_bleu: 18.01 -------------------------------------------------------------------------------- Epoch 10: train_loss: 28.03 train_bleu: 26.22 valid_loss: 40.33 valid_bleu: 16.73 -------------------------------------------------------------------------------- >|| 次に、実際に学習したモデルで実際に文章を生成してみる。 >|python| # 学習済みモデルの読み込み ckpt = torch.load(ckpt_path) # cpuで処理する場合はmap_locationで指定する必要があります。 model.load_state_dict(ckpt) model.eval() def ids_to_sentence(vocab, ids): # IDのリストを単語のリストに変換する return [vocab.id2word[_id] for _id in ids] def trim_eos(ids): # IDのリストからEOS以降の単語を除外する if EOS in ids: return ids[:ids.index(EOS)] else: return ids # テストデータの読み込み test_X = load_data('./data/dev.en') test_Y = load_data('./data/dev.ja') test_X = [sentence_to_ids(vocab_X, sentence) for sentence in test_X] test_Y = [sentence_to_ids(vocab_Y, sentence) for sentence in test_Y] test_dataloader = DataLoader(test_X, test_Y, batch_size=1, shuffle=False) # 生成 batch_X, batch_Y, lengths_X = next(test_dataloader) sentence_X = ' '.join(ids_to_sentence(vocab_X, batch_X.data.cpu().numpy()[:-1, 0])) sentence_Y = ' '.join(ids_to_sentence(vocab_Y, batch_Y.data.cpu().numpy()[:-1, 0])) print('src: {}'.format(sentence_X)) print('tgt: {}'.format(sentence_Y)) output = model(batch_X, lengths_X, max_length=20) output = output.max(dim=-1)[1].view(-1).data.cpu().tolist() output_sentence = ' '.join(ids_to_sentence(vocab_Y, trim_eos(output))) output_sentence_without_trim = ' '.join(ids_to_sentence(vocab_Y, output)) print('out: {}'.format(output_sentence)) print('without trim: {}'.format(output_sentence_without_trim))
結果は下記である。それなりにターゲットに近い文章が生成できている。
src: i can 't swim at all . tgt: 私 は 少し も 泳げ な い 。 out: 私 は 全然 泳げ な い 。 without trim: 私 は 全然 泳げ な い 。 </S> </S> </S> </S> </S> </S> </S> </S> </S> </S> </S> </S> </S>
Bleuを評価する。
# BLEUの計算 test_dataloader = DataLoader(test_X, test_Y, batch_size=1, shuffle=False) refs_list = [] hyp_list = [] for batch in test_dataloader: batch_X, batch_Y, lengths_X = batch pred_Y = model(batch_X, lengths_X, max_length=20) pred = pred_Y.max(dim=-1)[1].view(-1).data.cpu().tolist() refs = batch_Y.view(-1).data.cpu().tolist() refs_list.append(refs) hyp_list.append(pred) bleu = calc_bleu(refs_list, hyp_list) print(bleu)
17.71570480086109
- BeamSearch
テストデータにたいしてあらたな文を生成する際、これまでは各時刻で最も確率の高い単語を生成し、それを次のステップの入力として使っていた。しかし、本当にやりたいことは、文全体の尤度が最も高くなるような文章の生成である。そこで各単語だけでなく、もう少し大局的な評価をしていく必要がある。
BeamSearchでは、各時刻において一定の数Kのそれまでのスコアの高い文を保持しながら選択をおこなっていく。
Transformer
ニューラルネット機械翻訳の問題点
ニューラルネット機械翻訳の課題として文章が長くなると精度が落ちるという点がある。
下図は、Encoder-Decoderモデルと統計的機械翻訳モデルの文章の長さに対する精度を比較したものである。
統計的機械翻訳モデルは、文章が長くてもBleuスコアが落ちないのに対し、Encoder-Decoderモデルでは、文章が長くなった時に顕著にBleuスコアが低下することが分かる。
これは、翻訳元の内容を一つのベクトルで表現しているため、文長が長くなると表現力が足りなくなるからである。
Attention(注意機構)
上記課題を解決するものとして、Attentionが提案された。
これは、(翻訳タスクの場合)翻訳先の各単語を選択する際に、翻訳元の文中の隠れ状態に重みをつけて利用する機構である。言い換えると、翻訳の際に、翻訳元のどの単語に注意を当てて処理するかを決める機構である。
下記はAttentionの例である。
英語→フランス語の翻訳で、注意が当たっている語句が白くハイライトされている。対訳の語句が最も白くなっていることが分かるが、関連する語句(EuropianとEconomic等)も薄くハイライトされていることが分かる。
Attentionの仕組み
非常に簡単に言えば、Query(検索クエリ)に一致する
Keyを索引し、対応するValueを取り出す操作である。
つまり、辞書オブジェクトの機能と同じと言える。
Atenttionの性能
下図のように、文章が長くなっても翻訳精度は落ちない。
Transformer
Transformerは、"Attention is all you need"という論文で2017年6月に発表された機械学習翻訳のアルゴリズム。
RNNを使わず、Attentionのみで構成されている。
当時のSOTAをRNNよりはるかに少ない計算量で実現している。
英仏(3600万文)の学習を8GPUで3.5日で完了している。
Transformerの主要モジュール
Transformerの主要モジュールは、下記の通り、Positional Encoding、Dot Product Attention、Feed Foward、Maskingである。
Attentionの種類
AttentionにはSource Target AttentionとSelf attentionの2種類がある。
Source Target Attentionは、Queryは外部のターゲットであり、KeyとValueは自身の入力データになる。Self-Attentionの場合は、Query, Key, Valueすべてが自身の入力データである。
Self-Attention
TransformerのEncoderでは、self-attentionを6層積んで各単語をエンコードする。
Self-Attentionでは、各単語をエンコードして重みづけしたものをそれぞれの単語の内部状態として持つ。
例えとしては、ウィンドウが文章全体に及ぶ畳み込み演算を行っているイメージである。
Position-Wise Feed-forward Network
位置情報を保持したまま順伝搬させるブロック。
2層の全結合ニューラルネットワークを用い、線形変換→ReLu→線形変換の順番で処理をする。
Scaled dot product attention
全単語に関するattentionをまとめて計算する
*は、次元数である。
Multi Head Attention
重みパラメータの異なる8個のヘッドを使用する。
畳み込みで8つのカーネルフィルターを使用するイメージ
Decoder
Encoderと同じく6層のattentionで構成されている。
各層で2種類のattention機構を用いている。
self-attentionは直下の層の出力へ当てんっし恩をかけている。
未来の情報を見ないようにマスクしている。
Encoder-Decoder attentionは入力文の情報を収集する役割である。encoderの出力へアテンションをかけている。
Add&Norm
計算を効率よく行うために加えた層
- Add(Residual Connection): 入出力の差分を学習させる層。具体的には、出力に入力をそのまま加算させるだけである。学習・テストエラーの低減効果を狙っている。
- Norm(Layer Normalization): 各層においてバイアスを除く活性化関数への入力を平均0、分散1に正規化する。学習の高速化を狙っている。
Positional Encoding
RNNを用いないので、単語列の語順情報を追加する必要があるため、この層がある。
単語の位置情報を、下記のような三角関数でエンコードする。
これらをconcatすることで、位置のソフトな2進数表現を得る。
下記はPosition Encodingした単語のイメージで、右半分はcosによる生成、左半分はsinによる生成である。
Attentionの可視化
Attetionを可視化した図が下記である。
左の上の文章は、it was too tiredの「it」はAnimalを強く差している。
一方で「it was too wide」では、「it」はstreetを強く差しており、attentionが文意を踏まえて注意を向けていることが分かる。
実装演習
Transformerの実装の演習をする。
transformerはRNNやCNNを使用せず、Attentionのみを用いるモデルである。
翻訳に限らない自然言語処理のあらゆるタスクで圧倒的な性能を示すことが知られている。
まず、必要なライブラリと環境を準備する。
! wget https://www.dropbox.com/s/9narw5x4uizmehh/utils.py ! mkdir images data # data取得 ! wget https://www.dropbox.com/s/o4kyc52a8we25wy/dev.en -P data/ ! wget https://www.dropbox.com/s/kdgskm5hzg6znuc/dev.ja -P data/ ! wget https://www.dropbox.com/s/gyyx4gohv9v65uh/test.en -P data/ ! wget https://www.dropbox.com/s/hotxwbgoe2n013k/test.ja -P data/ ! wget https://www.dropbox.com/s/5lsftkmb20ay9e1/train.en -P data/ ! wget https://www.dropbox.com/s/ak53qirssci6f1j/train.ja -P data/ import time import numpy as np from sklearn.utils import shuffle from sklearn.model_selection import train_test_split import matplotlib import matplotlib.pyplot as plt import seaborn as sns %matplotlib inline from nltk import bleu_score import torch import torch.nn as nn import torch.optim as optim import torch.nn.functional as F from utils import Vocab device = torch.device("cuda" if torch.cuda.is_available() else "cpu") torch.manual_seed(1) random_state = 42
PAD = 0 UNK = 1 BOS = 2 EOS = 3 PAD_TOKEN = '<PAD>' UNK_TOKEN = '<UNK>' BOS_TOKEN = '<S>' EOS_TOKEN = '</S>'
データセットを準備し、データローダーの定義などをする。
def load_data(file_path): """ テキストファイルからデータを読み込む :param file_path: str, テキストファイルのパス :return data: list, 文章(単語のリスト)のリスト """ data = [] for line in open(file_path, encoding='utf-8'): words = line.strip().split() # スペースで単語を分割 data.append(words) return data train_X = load_data('./data/train.en') train_Y = load_data('./data/train.ja') # 訓練データと検証データに分割 train_X, valid_X, train_Y, valid_Y = train_test_split(train_X, train_Y, test_size=0.2, random_state=random_state) MIN_COUNT = 2 # 語彙に含める単語の最低出現回数 word2id = { PAD_TOKEN: PAD, BOS_TOKEN: BOS, EOS_TOKEN: EOS, UNK_TOKEN: UNK, } vocab_X = Vocab(word2id=word2id) vocab_Y = Vocab(word2id=word2id) vocab_X.build_vocab(train_X, min_count=MIN_COUNT) vocab_Y.build_vocab(train_Y, min_count=MIN_COUNT) vocab_size_X = len(vocab_X.id2word) vocab_size_Y = len(vocab_Y.id2word) def sentence_to_ids(vocab, sentence): """ 単語のリストをインデックスのリストに変換する :param vocab: Vocabのインスタンス :param sentence: list of str :return indices: list of int """ ids = [vocab.word2id.get(word, UNK) for word in sentence] ids = [BOS] + ids + [EOS] # EOSを末尾に加える return ids train_X = [sentence_to_ids(vocab_X, sentence) for sentence in train_X] train_Y = [sentence_to_ids(vocab_Y, sentence) for sentence in train_Y] valid_X = [sentence_to_ids(vocab_X, sentence) for sentence in valid_X] valid_Y = [sentence_to_ids(vocab_Y, sentence) for sentence in valid_Y] class DataLoader(object): def __init__(self, src_insts, tgt_insts, batch_size, shuffle=True): """ :param src_insts: list, 入力言語の文章(単語IDのリスト)のリスト :param tgt_insts: list, 出力言語の文章(単語IDのリスト)のリスト :param batch_size: int, バッチサイズ :param shuffle: bool, サンプルの順番をシャッフルするか否か """ self.data = list(zip(src_insts, tgt_insts)) self.batch_size = batch_size self.shuffle = shuffle self.start_index = 0 self.reset() def reset(self): if self.shuffle: self.data = shuffle(self.data, random_state=random_state) self.start_index = 0 def __iter__(self): return self def __next__(self): def preprocess_seqs(seqs): # パディング max_length = max([len(s) for s in seqs]) data = [s + [PAD] * (max_length - len(s)) for s in seqs] # 単語の位置を表現するベクトルを作成 positions = [[pos+1 if w != PAD else 0 for pos, w in enumerate(seq)] for seq in data] # テンソルに変換 data_tensor = torch.tensor(data, dtype=torch.long, device=device) position_tensor = torch.tensor(positions, dtype=torch.long, device=device) return data_tensor, position_tensor # ポインタが最後まで到達したら初期化する if self.start_index >= len(self.data): self.reset() raise StopIteration() # バッチを取得して前処理 src_seqs, tgt_seqs = zip(*self.data[self.start_index:self.start_index+self.batch_size]) src_data, src_pos = preprocess_seqs(src_seqs) tgt_data, tgt_pos = preprocess_seqs(tgt_seqs) # ポインタを更新する self.start_index += self.batch_size return (src_data, src_pos), (tgt_data, tgt_pos)
- 各モジュールの定義
1.Position Encoding
入力系列の埋め込み行列に単語の位置情報を加算する。
def position_encoding_init(n_position, d_pos_vec): """ Positional Encodingのための行列の初期化を行う :param n_position: int, 系列長 :param d_pos_vec: int, 隠れ層の次元数 :return torch.tensor, size=(n_position, d_pos_vec) """ # PADがある単語の位置はpos=0にしておき、position_encも0にする position_enc = np.array([ [pos / np.power(10000, 2 * (j // 2) / d_pos_vec) for j in range(d_pos_vec)] if pos != 0 else np.zeros(d_pos_vec) for pos in range(n_position)]) position_enc[1:, 0::2] = np.sin(position_enc[1:, 0::2]) # dim 2i position_enc[1:, 1::2] = np.cos(position_enc[1:, 1::2]) # dim 2i+1 return torch.tensor(position_enc, dtype=torch.float)
Position Encodingを可視化すると次のようになる。
縦軸が単語の位置を、横軸が成分の次元を表しており、濃淡が加算される値である。
ここでは最大系列長を50、隠れ層の次元数を256とした。
pe = position_encoding_init(50, 256).numpy() plt.figure(figsize=(16,8)) sns.heatmap(pe, cmap='Blues') plt.show()
2.Attention
Scaled Dot-Product Attention
Attentionには、注意の重みを隠れ層一つのフィードフォワードネットワークで求めるAdditive Attentionと、注意の重みを内積で求めるDot-prudoct Attentionが存在する。
一般にDot-Product Attentionのほうがパラメータが少なくて高速であり、Transformerではこちらを使う。
class ScaledDotProductAttention(nn.Module): def __init__(self, d_model, attn_dropout=0.1): """ :param d_model: int, 隠れ層の次元数 :param attn_dropout: float, ドロップアウト率 """ super(ScaledDotProductAttention, self).__init__() self.temper = np.power(d_model, 0.5) # スケーリング因子 self.dropout = nn.Dropout(attn_dropout) self.softmax = nn.Softmax(dim=-1) def forward(self, q, k, v, attn_mask): """ :param q: torch.tensor, queryベクトル, size=(n_head*batch_size, len_q, d_model/n_head) :param k: torch.tensor, key, size=(n_head*batch_size, len_k, d_model/n_head) :param v: torch.tensor, valueベクトル, size=(n_head*batch_size, len_v, d_model/n_head) :param attn_mask: torch.tensor, Attentionに適用するマスク, size=(n_head*batch_size, len_q, len_k) :return output: 出力ベクトル, size=(n_head*batch_size, len_q, d_model/n_head) :return attn: Attention size=(n_head*batch_size, len_q, len_k) """ # QとKの内積でAttentionの重みを求め、スケーリングする attn = torch.bmm(q, k.transpose(1, 2)) / self.temper # (n_head*batch_size, len_q, len_k) # Attentionをかけたくない部分がある場合は、その部分を負の無限大に飛ばしてSoftmaxの値が0になるようにする attn.data.masked_fill_(attn_mask, -float('inf')) attn = self.softmax(attn) attn = self.dropout(attn) output = torch.bmm(attn, v) return output, attn
Multi-Head Attention
TransformerではAttentionを複数のヘッドで並列に扱うMulti-Head Attentionを採用している。複数のヘッドでAttentionを行うことにより、各ヘッドが異なる部分空間を処理でき、精度が向上する。
class MultiHeadAttention(nn.Module): def __init__(self, n_head, d_model, d_k, d_v, dropout=0.1): """ :param n_head: int, ヘッド数 :param d_model: int, 隠れ層の次元数 :param d_k: int, keyベクトルの次元数 :param d_v: int, valueベクトルの次元数 :param dropout: float, ドロップアウト率 """ super(MultiHeadAttention, self).__init__() self.n_head = n_head self.d_k = d_k self.d_v = d_v # 各ヘッドごとに異なる重みで線形変換を行うための重み # nn.Parameterを使うことで、Moduleのパラメータとして登録できる. TFでは更新が必要な変数はtf.Variableでラップするのでわかりやすい self.w_qs = nn.Parameter(torch.empty([n_head, d_model, d_k], dtype=torch.float)) self.w_ks = nn.Parameter(torch.empty([n_head, d_model, d_k], dtype=torch.float)) self.w_vs = nn.Parameter(torch.empty([n_head, d_model, d_v], dtype=torch.float)) # nn.init.xavier_normal_で重みの値を初期化 nn.init.xavier_normal_(self.w_qs) nn.init.xavier_normal_(self.w_ks) nn.init.xavier_normal_(self.w_vs) self.attention = ScaledDotProductAttention(d_model) self.layer_norm = nn.LayerNorm(d_model) # 各層においてバイアスを除く活性化関数への入力を平均0、分散1に正則化 self.proj = nn.Linear(n_head*d_v, d_model) # 複数ヘッド分のAttentionの結果を元のサイズに写像するための線形層 # nn.init.xavier_normal_で重みの値を初期化 nn.init.xavier_normal_(self.proj.weight) self.dropout = nn.Dropout(dropout) def forward(self, q, k, v, attn_mask=None): """ :param q: torch.tensor, queryベクトル, size=(batch_size, len_q, d_model) :param k: torch.tensor, key, size=(batch_size, len_k, d_model) :param v: torch.tensor, valueベクトル, size=(batch_size, len_v, d_model) :param attn_mask: torch.tensor, Attentionに適用するマスク, size=(batch_size, len_q, len_k) :return outputs: 出力ベクトル, size=(batch_size, len_q, d_model) :return attns: Attention size=(n_head*batch_size, len_q, len_k) """ d_k, d_v = self.d_k, self.d_v n_head = self.n_head # residual connectionのための入力 出力に入力をそのまま加算する residual = q batch_size, len_q, d_model = q.size() batch_size, len_k, d_model = k.size() batch_size, len_v, d_model = v.size() # 複数ヘッド化 # torch.repeat または .repeatで指定したdimに沿って同じテンソルを作成 q_s = q.repeat(n_head, 1, 1) # (n_head*batch_size, len_q, d_model) k_s = k.repeat(n_head, 1, 1) # (n_head*batch_size, len_k, d_model) v_s = v.repeat(n_head, 1, 1) # (n_head*batch_size, len_v, d_model) # ヘッドごとに並列計算させるために、n_headをdim=0に、batch_sizeをdim=1に寄せる q_s = q_s.view(n_head, -1, d_model) # (n_head, batch_size*len_q, d_model) k_s = k_s.view(n_head, -1, d_model) # (n_head, batch_size*len_k, d_model) v_s = v_s.view(n_head, -1, d_model) # (n_head, batch_size*len_v, d_model) # 各ヘッドで線形変換を並列計算(p16左側`Linear`) q_s = torch.bmm(q_s, self.w_qs) # (n_head, batch_size*len_q, d_k) k_s = torch.bmm(k_s, self.w_ks) # (n_head, batch_size*len_k, d_k) v_s = torch.bmm(v_s, self.w_vs) # (n_head, batch_size*len_v, d_v) # Attentionは各バッチ各ヘッドごとに計算させるためにbatch_sizeをdim=0に寄せる q_s = q_s.view(-1, len_q, d_k) # (n_head*batch_size, len_q, d_k) k_s = k_s.view(-1, len_k, d_k) # (n_head*batch_size, len_k, d_k) v_s = v_s.view(-1, len_v, d_v) # (n_head*batch_size, len_v, d_v) # Attentionを計算(p16.左側`Scaled Dot-Product Attention * h`) outputs, attns = self.attention(q_s, k_s, v_s, attn_mask=attn_mask.repeat(n_head, 1, 1)) # 各ヘッドの結果を連結(p16左側`Concat`) # torch.splitでbatch_sizeごとのn_head個のテンソルに分割 outputs = torch.split(outputs, batch_size, dim=0) # (batch_size, len_q, d_model) * n_head # dim=-1で連結 outputs = torch.cat(outputs, dim=-1) # (batch_size, len_q, d_model*n_head) # residual connectionのために元の大きさに写像(p16左側`Linear`) outputs = self.proj(outputs) # (batch_size, len_q, d_model) outputs = self.dropout(outputs) outputs = self.layer_norm(outputs + residual) return outputs, attns
3.Position-Wise Feed Foward Network
単語列の位置ごとに独立して処理する2層のネットワークであるPosition-Wise Feed Forward Networkを定義する。
class PositionwiseFeedForward(nn.Module): """ :param d_hid: int, 隠れ層1層目の次元数 :param d_inner_hid: int, 隠れ層2層目の次元数 :param dropout: float, ドロップアウト率 """ def __init__(self, d_hid, d_inner_hid, dropout=0.1): super(PositionwiseFeedForward, self).__init__() # window size 1のconv層を定義することでPosition wiseな全結合層を実現する. self.w_1 = nn.Conv1d(d_hid, d_inner_hid, 1) self.w_2 = nn.Conv1d(d_inner_hid, d_hid, 1) self.layer_norm = nn.LayerNorm(d_hid) self.dropout = nn.Dropout(dropout) self.relu = nn.ReLU() def forward(self, x): """ :param x: torch.tensor, size=(batch_size, max_length, d_hid) :return: torch.tensor, size=(batch_size, max_length, d_hid) """ residual = x output = self.relu(self.w_1(x.transpose(1, 2))) output = self.w_2(output).transpose(2, 1) output = self.dropout(output) return self.layer_norm(output + residual)
4.Masking
TransformerではAttentionに対して2つのマスクを定義する。
一つはkey側の系列のPADトークンに対してAttentionを行わないようにするマスク。
def get_attn_padding_mask(seq_q, seq_k): """ keyのPADに対するattentionを0にするためのマスクを作成する :param seq_q: tensor, queryの系列, size=(batch_size, len_q) :param seq_k: tensor, keyの系列, size=(batch_size, len_k) :return pad_attn_mask: tensor, size=(batch_size, len_q, len_k) """ batch_size, len_q = seq_q.size() batch_size, len_k = seq_k.size() pad_attn_mask = seq_k.data.eq(PAD).unsqueeze(1) # (N, 1, len_k) PAD以外のidを全て0にする pad_attn_mask = pad_attn_mask.expand(batch_size, len_q, len_k) # (N, len_q, len_k) return pad_attn_mask
もう一つはDecoder側でSelf Attensionを行う際に、各時刻で未来の情報に対するAttentionを行わないようにするマスクである。
def get_attn_subsequent_mask(seq): """ 未来の情報に対するattentionを0にするためのマスクを作成する :param seq: tensor, size=(batch_size, length) :return subsequent_mask: tensor, size=(batch_size, length, length) """ attn_shape = (seq.size(1), seq.size(1)) # 上三角行列(diagonal=1: 対角線より上が1で下が0) subsequent_mask = torch.triu(torch.ones(attn_shape, dtype=torch.uint8, device=device), diagonal=1) subsequent_mask = subsequent_mask.repeat(seq.size(0), 1, 1) return subsequent_mask
- モデルの定義
Encoder
Encoderは、Self AttentionとPosition-Wise Forward Networkからなるブロックを拭く早々繰り返すので、ブロックのクラスEnoderLayerを定義した後にEncoderを定義
class EncoderLayer(nn.Module): """Encoderのブロックのクラス""" def __init__(self, d_model, d_inner_hid, n_head, d_k, d_v, dropout=0.1): """ :param d_model: int, 隠れ層の次元数 :param d_inner_hid: int, Position Wise Feed Forward Networkの隠れ層2層目の次元数 :param n_head: int, ヘッド数 :param d_k: int, keyベクトルの次元数 :param d_v: int, valueベクトルの次元数 :param dropout: float, ドロップアウト率 """ super(EncoderLayer, self).__init__() # Encoder内のSelf-Attention self.slf_attn = MultiHeadAttention( n_head, d_model, d_k, d_v, dropout=dropout) # Postionwise FFN self.pos_ffn = PositionwiseFeedForward(d_model, d_inner_hid, dropout=dropout) def forward(self, enc_input, slf_attn_mask=None): """ :param enc_input: tensor, Encoderの入力, size=(batch_size, max_length, d_model) :param slf_attn_mask: tensor, Self Attentionの行列にかけるマスク, size=(batch_size, len_q, len_k) :return enc_output: tensor, Encoderの出力, size=(batch_size, max_length, d_model) :return enc_slf_attn: tensor, EncoderのSelf Attentionの行列, size=(n_head*batch_size, len_q, len_k) """ # Self-Attentionのquery, key, valueにはすべてEncoderの入力(enc_input)が入る enc_output, enc_slf_attn = self.slf_attn( enc_input, enc_input, enc_input, attn_mask=slf_attn_mask) enc_output = self.pos_ffn(enc_output) return enc_output, enc_slf_attn class Encoder(nn.Module): """EncoderLayerブロックからなるEncoderのクラス""" def __init__( self, n_src_vocab, max_length, n_layers=6, n_head=8, d_k=64, d_v=64, d_word_vec=512, d_model=512, d_inner_hid=1024, dropout=0.1): """ :param n_src_vocab: int, 入力言語の語彙数 :param max_length: int, 最大系列長 :param n_layers: int, レイヤー数 :param n_head: int, ヘッド数 :param d_k: int, keyベクトルの次元数 :param d_v: int, valueベクトルの次元数 :param d_word_vec: int, 単語の埋め込みの次元数 :param d_model: int, 隠れ層の次元数 :param d_inner_hid: int, Position Wise Feed Forward Networkの隠れ層2層目の次元数 :param dropout: float, ドロップアウト率 """ super(Encoder, self).__init__() n_position = max_length + 1 self.max_length = max_length self.d_model = d_model # Positional Encodingを用いたEmbedding self.position_enc = nn.Embedding(n_position, d_word_vec, padding_idx=PAD) self.position_enc.weight.data = position_encoding_init(n_position, d_word_vec) # 一般的なEmbedding self.src_word_emb = nn.Embedding(n_src_vocab, d_word_vec, padding_idx=PAD) # EncoderLayerをn_layers個積み重ねる self.layer_stack = nn.ModuleList([ EncoderLayer(d_model, d_inner_hid, n_head, d_k, d_v, dropout=dropout) for _ in range(n_layers)]) def forward(self, src_seq, src_pos): """ :param src_seq: tensor, 入力系列, size=(batch_size, max_length) :param src_pos: tensor, 入力系列の各単語の位置情報, size=(batch_size, max_length) :return enc_output: tensor, Encoderの最終出力, size=(batch_size, max_length, d_model) :return enc_slf_attns: list, EncoderのSelf Attentionの行列のリスト """ # 一般的な単語のEmbeddingを行う enc_input = self.src_word_emb(src_seq) # Positional EncodingのEmbeddingを加算する enc_input += self.position_enc(src_pos) enc_slf_attns = [] enc_output = enc_input # key(=enc_input)のPADに対応する部分のみ1のマスクを作成 enc_slf_attn_mask = get_attn_padding_mask(src_seq, src_seq) # n_layers個のEncoderLayerに入力を通す for enc_layer in self.layer_stack: enc_output, enc_slf_attn = enc_layer( enc_output, slf_attn_mask=enc_slf_attn_mask) enc_slf_attns += [enc_slf_attn] return enc_output, enc_slf_attns
- Decoder
Self Attention, source-target Atetntion, Position-Wise Feed Foward Networkからなるブロックを複数層繰り返すので、ブロックのクラスDecoderLayerを定義した後に、Decoderを定義する。
class DecoderLayer(nn.Module): """Decoderのブロックのクラス""" def __init__(self, d_model, d_inner_hid, n_head, d_k, d_v, dropout=0.1): """ :param d_model: int, 隠れ層の次元数 :param d_inner_hid: int, Position Wise Feed Forward Networkの隠れ層2層目の次元数 :param n_head: int, ヘッド数 :param d_k: int, keyベクトルの次元数 :param d_v: int, valueベクトルの次元数 :param dropout: float, ドロップアウト率 """ super(DecoderLayer, self).__init__() # Decoder内のSelf-Attention self.slf_attn = MultiHeadAttention(n_head, d_model, d_k, d_v, dropout=dropout) # Encoder-Decoder間のSource-Target Attention self.enc_attn = MultiHeadAttention(n_head, d_model, d_k, d_v, dropout=dropout) # Positionwise FFN self.pos_ffn = PositionwiseFeedForward(d_model, d_inner_hid, dropout=dropout) def forward(self, dec_input, enc_output, slf_attn_mask=None, dec_enc_attn_mask=None): """ :param dec_input: tensor, Decoderの入力, size=(batch_size, max_length, d_model) :param enc_output: tensor, Encoderの出力, size=(batch_size, max_length, d_model) :param slf_attn_mask: tensor, Self Attentionの行列にかけるマスク, size=(batch_size, len_q, len_k) :param dec_enc_attn_mask: tensor, Soutce-Target Attentionの行列にかけるマスク, size=(batch_size, len_q, len_k) :return dec_output: tensor, Decoderの出力, size=(batch_size, max_length, d_model) :return dec_slf_attn: tensor, DecoderのSelf Attentionの行列, size=(n_head*batch_size, len_q, len_k) :return dec_enc_attn: tensor, DecoderのSoutce-Target Attentionの行列, size=(n_head*batch_size, len_q, len_k) """ # Self-Attentionのquery, key, valueにはすべてDecoderの入力(dec_input)が入る dec_output, dec_slf_attn = self.slf_attn( dec_input, dec_input, dec_input, attn_mask=slf_attn_mask) # Source-Target-AttentionのqueryにはDecoderの出力(dec_output), key, valueにはEncoderの出力(enc_output)が入る dec_output, dec_enc_attn = self.enc_attn( dec_output, enc_output, enc_output, attn_mask=dec_enc_attn_mask) dec_output = self.pos_ffn(dec_output) return dec_output, dec_slf_attn, dec_enc_attn class Decoder(nn.Module): """DecoderLayerブロックからなるDecoderのクラス""" def __init__( self, n_tgt_vocab, max_length, n_layers=6, n_head=8, d_k=64, d_v=64, d_word_vec=512, d_model=512, d_inner_hid=1024, dropout=0.1): """ :param n_tgt_vocab: int, 出力言語の語彙数 :param max_length: int, 最大系列長 :param n_layers: int, レイヤー数 :param n_head: int, ヘッド数 :param d_k: int, keyベクトルの次元数 :param d_v: int, valueベクトルの次元数 :param d_word_vec: int, 単語の埋め込みの次元数 :param d_model: int, 隠れ層の次元数 :param d_inner_hid: int, Position Wise Feed Forward Networkの隠れ層2層目の次元数 :param dropout: float, ドロップアウト率 """ super(Decoder, self).__init__() n_position = max_length + 1 self.max_length = max_length self.d_model = d_model # Positional Encodingを用いたEmbedding self.position_enc = nn.Embedding( n_position, d_word_vec, padding_idx=PAD) self.position_enc.weight.data = position_encoding_init(n_position, d_word_vec) # 一般的なEmbedding self.tgt_word_emb = nn.Embedding( n_tgt_vocab, d_word_vec, padding_idx=PAD) self.dropout = nn.Dropout(dropout) # DecoderLayerをn_layers個積み重ねる self.layer_stack = nn.ModuleList([ DecoderLayer(d_model, d_inner_hid, n_head, d_k, d_v, dropout=dropout) for _ in range(n_layers)]) def forward(self, tgt_seq, tgt_pos, src_seq, enc_output): """ :param tgt_seq: tensor, 出力系列, size=(batch_size, max_length) :param tgt_pos: tensor, 出力系列の各単語の位置情報, size=(batch_size, max_length) :param src_seq: tensor, 入力系列, size=(batch_size, n_src_vocab) :param enc_output: tensor, Encoderの出力, size=(batch_size, max_length, d_model) :return dec_output: tensor, Decoderの最終出力, size=(batch_size, max_length, d_model) :return dec_slf_attns: list, DecoderのSelf Attentionの行列のリスト :return dec_slf_attns: list, DecoderのSelf Attentionの行列のリスト """ # 一般的な単語のEmbeddingを行う dec_input = self.tgt_word_emb(tgt_seq) # Positional EncodingのEmbeddingを加算する dec_input += self.position_enc(tgt_pos) # Self-Attention用のマスクを作成 # key(=dec_input)のPADに対応する部分が1のマスクと、queryから見たkeyの未来の情報に対応する部分が1のマスクのORをとる dec_slf_attn_pad_mask = get_attn_padding_mask(tgt_seq, tgt_seq) # (N, max_length, max_length) dec_slf_attn_sub_mask = get_attn_subsequent_mask(tgt_seq) # (N, max_length, max_length) dec_slf_attn_mask = torch.gt(dec_slf_attn_pad_mask + dec_slf_attn_sub_mask, 0) # ORをとる # key(=dec_input)のPADに対応する部分のみ1のマスクを作成 dec_enc_attn_pad_mask = get_attn_padding_mask(tgt_seq, src_seq) # (N, max_length, max_length) dec_slf_attns, dec_enc_attns = [], [] dec_output = dec_input # n_layers個のDecoderLayerに入力を通す for dec_layer in self.layer_stack: dec_output, dec_slf_attn, dec_enc_attn = dec_layer( dec_output, enc_output, slf_attn_mask=dec_slf_attn_mask, dec_enc_attn_mask=dec_enc_attn_pad_mask) dec_slf_attns += [dec_slf_attn] dec_enc_attns += [dec_enc_attn] return dec_output, dec_slf_attns, dec_enc_attns
Transofomer全体のモデルのクラスを定義する。
class Transformer(nn.Module): """Transformerのモデル全体のクラス""" def __init__( self, n_src_vocab, n_tgt_vocab, max_length, n_layers=6, n_head=8, d_word_vec=512, d_model=512, d_inner_hid=1024, d_k=64, d_v=64, dropout=0.1, proj_share_weight=True): """ :param n_src_vocab: int, 入力言語の語彙数 :param n_tgt_vocab: int, 出力言語の語彙数 :param max_length: int, 最大系列長 :param n_layers: int, レイヤー数 :param n_head: int, ヘッド数 :param d_k: int, keyベクトルの次元数 :param d_v: int, valueベクトルの次元数 :param d_word_vec: int, 単語の埋め込みの次元数 :param d_model: int, 隠れ層の次元数 :param d_inner_hid: int, Position Wise Feed Forward Networkの隠れ層2層目の次元数 :param dropout: float, ドロップアウト率 :param proj_share_weight: bool, 出力言語の単語のEmbeddingと出力の写像で重みを共有する """ super(Transformer, self).__init__() self.encoder = Encoder( n_src_vocab, max_length, n_layers=n_layers, n_head=n_head, d_word_vec=d_word_vec, d_model=d_model, d_inner_hid=d_inner_hid, dropout=dropout) self.decoder = Decoder( n_tgt_vocab, max_length, n_layers=n_layers, n_head=n_head, d_word_vec=d_word_vec, d_model=d_model, d_inner_hid=d_inner_hid, dropout=dropout) self.tgt_word_proj = nn.Linear(d_model, n_tgt_vocab, bias=False) nn.init.xavier_normal_(self.tgt_word_proj.weight) self.dropout = nn.Dropout(dropout) assert d_model == d_word_vec # 各モジュールの出力のサイズは揃える if proj_share_weight: # 出力言語の単語のEmbeddingと出力の写像で重みを共有する assert d_model == d_word_vec self.tgt_word_proj.weight = self.decoder.tgt_word_emb.weight def get_trainable_parameters(self): # Positional Encoding以外のパラメータを更新する enc_freezed_param_ids = set(map(id, self.encoder.position_enc.parameters())) dec_freezed_param_ids = set(map(id, self.decoder.position_enc.parameters())) freezed_param_ids = enc_freezed_param_ids | dec_freezed_param_ids return (p for p in self.parameters() if id(p) not in freezed_param_ids) def forward(self, src, tgt): src_seq, src_pos = src tgt_seq, tgt_pos = tgt src_seq = src_seq[:, 1:] src_pos = src_pos[:, 1:] tgt_seq = tgt_seq[:, :-1] tgt_pos = tgt_pos[:, :-1] enc_output, *_ = self.encoder(src_seq, src_pos) dec_output, *_ = self.decoder(tgt_seq, tgt_pos, src_seq, enc_output) seq_logit = self.tgt_word_proj(dec_output) return seq_logit
学習
def compute_loss(batch_X, batch_Y, model, criterion, optimizer=None, is_train=True): # バッチの損失を計算 model.train(is_train) pred_Y = model(batch_X, batch_Y) gold = batch_Y[0][:, 1:].contiguous() # gold = batch_Y[0].contiguous() loss = criterion(pred_Y.view(-1, pred_Y.size(2)), gold.view(-1)) if is_train: # 訓練時はパラメータを更新 optimizer.zero_grad() loss.backward() optimizer.step() gold = gold.data.cpu().numpy().tolist() pred = pred_Y.max(dim=-1)[1].data.cpu().numpy().tolist() return loss.item(), gold, pred MAX_LENGTH = 20 batch_size = 64 num_epochs = 15 lr = 0.001 ckpt_path = 'transformer.pth' max_length = MAX_LENGTH + 2 model_args = { 'n_src_vocab': vocab_size_X, 'n_tgt_vocab': vocab_size_Y, 'max_length': max_length, 'proj_share_weight': True, 'd_k': 32, 'd_v': 32, 'd_model': 128, 'd_word_vec': 128, 'd_inner_hid': 256, 'n_layers': 3, 'n_head': 6, 'dropout': 0.1, } # DataLoaderやモデルを定義 train_dataloader = DataLoader( train_X, train_Y, batch_size ) valid_dataloader = DataLoader( valid_X, valid_Y, batch_size, shuffle=False ) model = Transformer(**model_args).to(device) optimizer = optim.Adam(model.get_trainable_parameters(), lr=lr) criterion = nn.CrossEntropyLoss(ignore_index=PAD, size_average=False).to(device) def calc_bleu(refs, hyps): """ BLEUスコアを計算する関数 :param refs: list, 参照訳。単語のリストのリスト (例: [['I', 'have', 'a', 'pen'], ...]) :param hyps: list, モデルの生成した訳。単語のリストのリスト (例: [['I', 'have', 'a', 'pen'], ...]) :return: float, BLEUスコア(0~100) """ refs = [[ref[:ref.index(EOS)]] for ref in refs] hyps = [hyp[:hyp.index(EOS)] if EOS in hyp else hyp for hyp in hyps] return 100 * bleu_score.corpus_bleu(refs, hyps) # 訓練 best_valid_bleu = 0. for epoch in range(1, num_epochs+1): start = time.time() train_loss = 0. train_refs = [] train_hyps = [] valid_loss = 0. valid_refs = [] valid_hyps = [] # train for batch in train_dataloader: batch_X, batch_Y = batch loss, gold, pred = compute_loss( batch_X, batch_Y, model, criterion, optimizer, is_train=True ) train_loss += loss train_refs += gold train_hyps += pred # valid for batch in valid_dataloader: batch_X, batch_Y = batch loss, gold, pred = compute_loss( batch_X, batch_Y, model, criterion, is_train=False ) valid_loss += loss valid_refs += gold valid_hyps += pred # 損失をサンプル数で割って正規化 train_loss /= len(train_dataloader.data) valid_loss /= len(valid_dataloader.data) # BLEUを計算 train_bleu = calc_bleu(train_refs, train_hyps) valid_bleu = calc_bleu(valid_refs, valid_hyps) # validationデータでBLEUが改善した場合にはモデルを保存 if valid_bleu > best_valid_bleu: ckpt = model.state_dict() torch.save(ckpt, ckpt_path) best_valid_bleu = valid_bleu elapsed_time = (time.time()-start) / 60 print('Epoch {} [{:.1f}min]: train_loss: {:5.2f} train_bleu: {:2.2f} valid_loss: {:5.2f} valid_bleu: {:2.2f}'.format( epoch, elapsed_time, train_loss, train_bleu, valid_loss, valid_bleu)) print('-'*80)
実行結果
Epoch 1 [0.8min]: train_loss: 77.55 train_bleu: 4.72 valid_loss: 41.38 valid_bleu: 11.02 -------------------------------------------------------------------------------- Epoch 2 [0.8min]: train_loss: 39.45 train_bleu: 12.25 valid_loss: 32.30 valid_bleu: 17.59 -------------------------------------------------------------------------------- Epoch 3 [0.8min]: train_loss: 32.22 train_bleu: 17.89 valid_loss: 28.26 valid_bleu: 21.75 -------------------------------------------------------------------------------- Epoch 4 [0.8min]: train_loss: 28.39 train_bleu: 21.59 valid_loss: 25.91 valid_bleu: 24.61 -------------------------------------------------------------------------------- Epoch 5 [0.8min]: train_loss: 25.89 train_bleu: 24.35 valid_loss: 24.46 valid_bleu: 26.96 -------------------------------------------------------------------------------- Epoch 6 [0.8min]: train_loss: 24.04 train_bleu: 26.66 valid_loss: 23.14 valid_bleu: 28.96 -------------------------------------------------------------------------------- Epoch 7 [0.8min]: train_loss: 22.57 train_bleu: 28.52 valid_loss: 22.37 valid_bleu: 29.68 -------------------------------------------------------------------------------- Epoch 8 [0.8min]: train_loss: 21.40 train_bleu: 30.22 valid_loss: 21.59 valid_bleu: 31.05 -------------------------------------------------------------------------------- Epoch 9 [0.8min]: train_loss: 20.37 train_bleu: 31.49 valid_loss: 20.98 valid_bleu: 31.94 -------------------------------------------------------------------------------- Epoch 10 [0.8min]: train_loss: 19.48 train_bleu: 32.70 valid_loss: 20.47 valid_bleu: 33.20 -------------------------------------------------------------------------------- Epoch 11 [0.8min]: train_loss: 18.71 train_bleu: 33.77 valid_loss: 20.19 valid_bleu: 33.62 -------------------------------------------------------------------------------- Epoch 12 [0.8min]: train_loss: 17.98 train_bleu: 35.06 valid_loss: 19.86 valid_bleu: 33.94 -------------------------------------------------------------------------------- Epoch 13 [0.8min]: train_loss: 17.35 train_bleu: 35.94 valid_loss: 19.32 valid_bleu: 34.66 -------------------------------------------------------------------------------- Epoch 14 [0.8min]: train_loss: 16.75 train_bleu: 36.75 valid_loss: 19.23 valid_bleu: 35.18 -------------------------------------------------------------------------------- Epoch 15 [0.8min]: train_loss: 16.23 train_bleu: 37.60 valid_loss: 19.11 valid_bleu: 35.40
評価用コードの作成
def test(model, src, max_length=20): # 学習済みモデルで系列を生成する model.eval() src_seq, src_pos = src batch_size = src_seq.size(0) enc_output, enc_slf_attns = model.encoder(src_seq, src_pos) tgt_seq = torch.full([batch_size, 1], BOS, dtype=torch.long, device=device) tgt_pos = torch.arange(1, dtype=torch.long, device=device) tgt_pos = tgt_pos.unsqueeze(0).repeat(batch_size, 1) # 時刻ごとに処理 for t in range(1, max_length+1): dec_output, dec_slf_attns, dec_enc_attns = model.decoder( tgt_seq, tgt_pos, src_seq, enc_output) dec_output = model.tgt_word_proj(dec_output) out = dec_output[:, -1, :].max(dim=-1)[1].unsqueeze(1) # 自身の出力を次の時刻の入力にする tgt_seq = torch.cat([tgt_seq, out], dim=-1) tgt_pos = torch.arange(t+1, dtype=torch.long, device=device) tgt_pos = tgt_pos.unsqueeze(0).repeat(batch_size, 1) return tgt_seq[:, 1:], enc_slf_attns, dec_slf_attns, dec_enc_attns def ids_to_sentence(vocab, ids): # IDのリストを単語のリストに変換する return [vocab.id2word[_id] for _id in ids] def trim_eos(ids): # IDのリストからEOS以降の単語を除外する if EOS in ids: return ids[:ids.index(EOS)] else: return ids
学習済みモデルと、テストデータの読み込みを行う。
# 学習済みモデルの読み込み model = Transformer(**model_args).to(device) ckpt = torch.load(ckpt_path) model.load_state_dict(ckpt) # テストデータの読み込み test_X = load_data('./data/dev.en') test_Y = load_data('./data/dev.ja') test_X = [sentence_to_ids(vocab_X, sentence) for sentence in test_X] test_Y = [sentence_to_ids(vocab_Y, sentence) for sentence in test_Y]
文章を生成してみる。
test_dataloader = DataLoader( test_X, test_Y, 1, shuffle=False ) src, tgt = next(test_dataloader) src_ids = src[0][0].cpu().numpy() tgt_ids = tgt[0][0].cpu().numpy() print('src: {}'.format(' '.join(ids_to_sentence(vocab_X, src_ids[1:-1])))) print('tgt: {}'.format(' '.join(ids_to_sentence(vocab_Y, tgt_ids[1:-1])))) preds, enc_slf_attns, dec_slf_attns, dec_enc_attns = test(model, src) pred_ids = preds[0].data.cpu().numpy().tolist() print('out: {}'.format(' '.join(ids_to_sentence(vocab_Y, trim_eos(pred_ids)))))
結果
src: show your own business . tgt: 自分 の 事 を しろ 。 out: 君 の 商売 は 自分 の 商売 を 見せ て い る 。
Targetほどこなれてはいないものの、直訳としては大方あっている。
Bleuの評価
# BLEUの評価 test_dataloader = DataLoader( test_X, test_Y, 128, shuffle=False ) refs_list = [] hyp_list = [] for batch in test_dataloader: batch_X, batch_Y = batch preds, *_ = test(model, batch_X) preds = preds.data.cpu().numpy().tolist() refs = batch_Y[0].data.cpu().numpy()[:, 1:].tolist() refs_list += refs hyp_list += preds bleu = calc_bleu(refs_list, hyp_list) print(bleu)
24.974396035182135
計算負荷はRNNより低いにも関わらず、良好なBleu値が出た。