ラビットチャレンジ 深層学習Day3 その2

Section2 LSTM

RNNの課題

  • 勾配消失

 時間を遡るほど勾配が消失してしまうため、長い時系列の学習が困難だった。
 勾配消失は、誤差逆伝搬法で深い層のニューラルネットワークを遡る時に見られる現象で、1より小さい値になる微分値が多数乗算されることにより引き起こされる。例えば、活性化関数として使用されるシグモイド関数は、微分値は最大でも0.25にしかならないため、2つ以上乗算されると、非常に小さな値となり、勾配はほぼ消失してしまう。

  • 確認テスト

シグモイド関数微分した時、入力値が0の時に最大値をとる。その値として正しいものを選択肢から選べ。
(1)0.15
(2)0.25
(3)0.35
(4)0.45
[解答]
(2)0.25

  • 勾配爆発

 勾配消失とは逆に、勾配が層を逆伝搬するごとに指数関数的に大きくなってしまう問題。活性化関数に恒関数を用いたりすると、この問題が起こりやすい。

  • 演習チャレンジ

RNNや深いモデルでは勾配の消失または勾配爆発が起こる傾向がある。勾配爆発を防ぐために勾配のクリッピングを行うという手法がある。具体的には勾配のノルムが閾値を超えたら、勾配のノルムを閾値に正規化するというものである。以下は勾配のクリッピングを行う関数である。
(さ)にあてはまるコードをかけ
f:id:tibet:20211129184855p:plain
[解答]
gradient*rate

LSTM(Long Short Term Memory)は、RNNの勾配消失や勾配爆発という問題を解決するために考案された構造である。
次に、LSTMの全体像を示す。
f:id:tibet:20211129195005p:plain
入力から出力の間がRNNの中間層に相当する。中間層で自己回帰(青点線)していることがわかる。
中間層内の構造の工夫がLSTMである。

1-2 CEC

 LSTMの中心はCEC(Constant Error carousel:定誤差カルーセル)である。このセルは、勾配を記憶する役割を果たしている。
 具体的には、CECの中では、勾配消失及び勾配爆発の解決法として、勾配を1としている。
 δ^{t-z-1}=δ^{t-z}(Wf'(u^{t-z-1}))=1
 \dfrac{\partial E}{\partial c^{t-1}}=\dfrac{\partial E}{\partial c^t}\dfrac{\partial}{\partial c^{t-1}}(a^t-c^{t-1})=\dfrac{\partial E}{\partial c^t}

CECの課題は、入力データについて時間依存度の関係なく重みが一律のため、学習が出来ないということである。
 つまり、学習と記憶の役割を中間層内で分けている。
 学習については、入力層の重荷については入力ゲート、出力層への重みについては出力ゲートでそれぞれ担っている。

2-2入力ゲートと出力ゲート

入力ゲート

f:id:tibet:20211129210828p:plain
入力ゲートは、CECに記憶する内容を調整する機能を持つ。
具体的には入力側の変数x(t)と、出力からの一つ前の数値の回帰h(t-1)にそれぞれ重みWiとUiを掛け合わせて演算した値をCECの入力との内積演算ユニットに送る。

出力ゲート

f:id:tibet:20211129211301p:plain
出力ゲートは、ECEからの出力を調整して出力層に送る機能を持つ。
具体的には入力側の変数x(t)と、出力からの一つ前の数値の回帰h(t-1)にそれぞれ重みWoとUoを掛け合わせて演算した値をCECの出力との内積演算ユニットに送る。

このように、入力・出力ゲートを追加することで、それぞれのゲートへの入力値の重みを重み行列W,Uで可変可能とすることで、CECの課題を解決している。

2-3忘却ゲート

CECは過去のすべての情報が保管されているが、逆に不要な場合でも削除が出来ない。
そこで、過去の上方がいらなくなった場合、そのタイミングで情報を忘却するための機能として、忘却ゲートが設けられた。
f:id:tibet:20211130095950p:plain
上記の赤枠が忘却ゲートである。
忘却ゲートは、入出力にそれぞれWf, Ufの重みを乗算した関数忘却関数f(t)を作る。次にCECの一つ前の時間の出力c(t-1)と内積を取り、CEC内で入力層からの信号i(t)a(t)と線形結合をとり、下記c(t)を出力する。
 c(t)=i(t)\cdot a(t)+f(t)\cdot c(t-1)
忘却関数f(t)と内積をとることで、CECの記憶の忘却度が調整される。

  • 確認テスト

以下の文章をLSTMに入力し、空欄に当てはまる単語を予測したいとする。文中の「とても」という言葉は空欄の予測において、無くなっても影響を及ぼさないと考えられる。
このような場合、どのゲートが作用すると考えられるか。
「映画おもしろかったね。ところで、とてもおなかが空いたからなにか____。」

[解答]
文意に影響を及ぼさないため、忘却ゲートが作用していると考えられる。

  • 演習チャレンジ

以下のプログラムはLSTMの順伝搬を行うプログラムである。ただし、_sigmoid関数は、要素ごとにシグモイドを作用させる関数である。
(け)にあてはまるコードを示せ。
f:id:tibet:20211130135130p:plain
[解答]
input_gate*a+forget_gate*c

覗き穴結合

CECの保存されている過去の情報を任意のタイミングで他のノードに伝搬させたり、任意のタイミングで忘却させたりしたいが、CEC自身の値は、ゲート制御に影響を与えていないという課題がある。
そこで、覗き穴結合というパスを作り、CEC自身の値に重み行列を介して伝搬可能にした構造を設けた。
f:id:tibet:20211130135346p:plain
上図中の赤枠で囲んだパスが覗き穴結合に当たり、入力ゲート、忘却ゲート、出力ゲートそれぞれにCECに出力にVi, Vf,Voという重みを乗算して伝達している。

Section 3 GRU

 従来のLSTMでは、パラメータ数が多いため、計算負荷が大きかった。そこで、パラメータを大幅に削って計算負荷を小さくし、かつ精度は同等以上を見込める構造として、GRUが考案された。
f:id:tibet:20211130171202p:plain
GRUはリセットゲート、更新ゲート、活性化関数を持つ内部ノードからなり、CECは持っていない。
数式化のために、下記のように信号と重みの定義をする。
x(t):入力信号
W_h:入力重み
W_r:入力からリセットゲートへの重み
U_r:出力からリセットゲートへの重み
b_h(t):リセットゲートのバイアス
r(t):リセットゲートの出力信号
 U_h:出力信号の回帰への重み
 f(x):内部ノードの活性化関数
W_z:入力から更新ゲートへの重み
U_z:出力から更新ゲートへの重み
b_z(t):更新ゲートのバイアス
z(t):更新ゲートの出力信号
h(t):出力信号
まず、リセットゲートの出力は、下記のように表せる。
 r(t)=Wrx(t)+Ur\cdot h(t-1)+b_h(t)
次段の内部演算ユニットの出力は下記のようになる。
 U_h\cdot (r(t)\cdot h(t-1))
よって、活性化関数f(x)を持つ内部ノードの出力は下記のようになる。
 f(W_hx(t)+U_h\cdot (r(t)\cdot h(t-1))
また、更新ゲートの出力は下記のようになる。
 z(t)=W_zx(t)+U_z\cdot h(t-1)+b_z(t)
よって、出力層の出力h(t)は下記のように表せる。
 h(t)=z(t)\cdot h(t-1)+(1-z(t))\cdot f(W_hx(t)+U_h\cdot (r(t)\cdot h(t-1))

  • 確認テスト

LSTMとCECが抱える問題について、それぞれ簡潔に述べよ
[解答]
LSTMはパラメータが多く、計算負荷が高いため、計算に時間がかかる。CECは勾配1で記憶するが、情報処理が出来ない。そこで、周りに情報処理のためのゲートを設けるため、構造が複雑になる。

実装演習

RNNで単語予測をするネットワークをtensorflowで記述した。

import tensorflow as tf
import numpy as np
import re
import glob
import collections
import random
import pickle
import time
import datetime
import os

# logging levelを変更
tf.logging.set_verbosity(tf.logging.ERROR)

class Corpus:
    def __init__(self):
        self.unknown_word_symbol = "<???>" # 出現回数の少ない単語は未知語として定義しておく
        self.unknown_word_threshold = 3 # 未知語と定義する単語の出現回数の閾値
        self.corpus_file = "./corpus/**/*.txt"
        self.corpus_encoding = "utf-8"
        self.dictionary_filename = "./data_for_predict/word_dict.dic"
        self.chunk_size = 5
        self.load_dict()

        words = []
        for filename in glob.glob(self.corpus_file, recursive=True):
            with open(filename, "r", encoding=self.corpus_encoding) as f:

                # word breaking
                text = f.read()
                # 全ての文字を小文字に統一し、改行をスペースに変換
                text = text.lower().replace("\n", " ")
                # 特定の文字以外の文字を空文字に置換する
                text = re.sub(r"[^a-z '\-]", "", text)
                # 複数のスペースはスペース一文字に変換
                text = re.sub(r"[ ]+", " ", text)

                # 前処理: '-' で始まる単語は無視する
                words = [ word for word in text.split() if not word.startswith("-")]


        self.data_n = len(words) - self.chunk_size
        self.data = self.seq_to_matrix(words)

    def prepare_data(self):
        """
        訓練データとテストデータを準備する。
        data_n = ( text データの総単語数 ) - chunk_size
        input: (data_n, chunk_size, vocabulary_size)
        output: (data_n, vocabulary_size)
        """

        # 入力と出力の次元テンソルを準備
        all_input = np.zeros([self.chunk_size, self.vocabulary_size, self.data_n])
        all_output = np.zeros([self.vocabulary_size, self.data_n])

        # 準備したテンソルに、コーパスの one-hot 表現(self.data) のデータを埋めていく
        # i 番目から ( i + chunk_size - 1 ) 番目までの単語が1組の入力となる
        # このときの出力は ( i + chunk_size ) 番目の単語
        for i in range(self.data_n):
            all_output[:, i] = self.data[:, i + self.chunk_size] # (i + chunk_size) 番目の単語の one-hot ベクトル
            for j in range(self.chunk_size):
                all_input[j, :, i] = self.data[:, i + self.chunk_size - j - 1]

        # 後に使うデータ形式に合わせるために転置を取る
        all_input = all_input.transpose([2, 0, 1])
        all_output = all_output.transpose()

        # 訓練データ:テストデータを 4 : 1 に分割する
        training_num = ( self.data_n * 4 ) // 5
        return all_input[:training_num], all_output[:training_num], all_input[training_num:], all_output[training_num:]


    def build_dict(self):
        # コーパス全体を見て、単語の出現回数をカウントする
        counter = collections.Counter()
        for filename in glob.glob(self.corpus_file, recursive=True):
            with open(filename, "r", encoding=self.corpus_encoding) as f:

                # word breaking
                text = f.read()
                # 全ての文字を小文字に統一し、改行をスペースに変換
                text = text.lower().replace("\n", " ")
                # 特定の文字以外の文字を空文字に置換する
                text = re.sub(r"[^a-z '\-]", "", text)
                # 複数のスペースはスペース一文字に変換
                text = re.sub(r"[ ]+", " ", text)

                # 前処理: '-' で始まる単語は無視する
                words = [word for word in text.split() if not word.startswith("-")]

                counter.update(words)

        # 出現頻度の低い単語を一つの記号にまとめる
        word_id = 0
        dictionary = {}
        for word, count in counter.items():
            if count <= self.unknown_word_threshold:
                continue

            dictionary[word] = word_id
            word_id += 1
        dictionary[self.unknown_word_symbol] = word_id

        print("総単語数:", len(dictionary))

        # 辞書を pickle を使って保存しておく
        with open(self.dictionary_filename, "wb") as f:
            pickle.dump(dictionary, f)
            print("Dictionary is saved to", self.dictionary_filename)

        self.dictionary = dictionary

        print(self.dictionary)

    def load_dict(self):
        with open(self.dictionary_filename, "rb") as f:
            self.dictionary = pickle.load(f)
            self.vocabulary_size = len(self.dictionary)
            self.input_layer_size = len(self.dictionary)
            self.output_layer_size = len(self.dictionary)
            print("総単語数: ", self.input_layer_size)

    def get_word_id(self, word):
        # print(word)
        # print(self.dictionary)
        # print(self.unknown_word_symbol)
        # print(self.dictionary[self.unknown_word_symbol])
        # print(self.dictionary.get(word, self.dictionary[self.unknown_word_symbol]))
        return self.dictionary.get(word, self.dictionary[self.unknown_word_symbol])

    # 入力された単語を one-hot ベクトルにする
    def to_one_hot(self, word):
        index = self.get_word_id(word)
        data = np.zeros(self.vocabulary_size)
        data[index] = 1
        return data

    def seq_to_matrix(self, seq):
        print(seq)
        data = np.array([self.to_one_hot(word) for word in seq]) # (data_n, vocabulary_size)
        return data.transpose() # (vocabulary_size, data_n)

class Language:
    """
    input layer: self.vocabulary_size
    hidden layer: rnn_size = 30
    output layer: self.vocabulary_size
    """

    def __init__(self):
        self.corpus = Corpus()
        self.dictionary = self.corpus.dictionary
        self.vocabulary_size = len(self.dictionary) # 単語数
        self.input_layer_size = self.vocabulary_size # 入力層の数
        self.hidden_layer_size = 30 # 隠れ層の RNN ユニットの数
        self.output_layer_size = self.vocabulary_size # 出力層の数
        self.batch_size = 128 # バッチサイズ
        self.chunk_size = 5 # 展開するシーケンスの数。c_0, c_1, ..., c_(chunk_size - 1) を入力し、c_(chunk_size) 番目の単語の確率が出力される。
        self.learning_rate = 0.005 # 学習率
        self.epochs = 1000 # 学習するエポック数
        self.forget_bias = 1.0 # LSTM における忘却ゲートのバイアス
        self.model_filename = "./data_for_predict/predict_model.ckpt"
        self.unknown_word_symbol = self.corpus.unknown_word_symbol

    def inference(self, input_data, initial_state):
        """
        :param input_data: (batch_size, chunk_size, vocabulary_size) 次元のテンソル
        :param initial_state: (batch_size, hidden_layer_size) 次元の行列
        :return:
        """
        # 重みとバイアスの初期化
        hidden_w = tf.Variable(tf.truncated_normal([self.input_layer_size, self.hidden_layer_size], stddev=0.01))
        hidden_b = tf.Variable(tf.ones([self.hidden_layer_size]))
        output_w = tf.Variable(tf.truncated_normal([self.hidden_layer_size, self.output_layer_size], stddev=0.01))
        output_b = tf.Variable(tf.ones([self.output_layer_size]))

        # BasicLSTMCell, BasicRNNCell は (batch_size, hidden_layer_size) が chunk_size 数ぶんつながったリストを入力とする。
        # 現時点での入力データは (batch_size, chunk_size, input_layer_size) という3次元のテンソルなので
        # tf.transpose や tf.reshape などを駆使してテンソルのサイズを調整する。

        input_data = tf.transpose(input_data, [1, 0, 2]) # 転置。(chunk_size, batch_size, vocabulary_size)
        input_data = tf.reshape(input_data, [-1, self.input_layer_size]) # 変形。(chunk_size * batch_size, input_layer_size)
        input_data = tf.matmul(input_data, hidden_w) + hidden_b # 重みWとバイアスBを適用。 (chunk_size, batch_size, hidden_layer_size)
        input_data = tf.split(input_data, self.chunk_size, 0) # リストに分割。chunk_size * (batch_size, hidden_layer_size)

        # RNN のセルを定義する。RNN Cell の他に LSTM のセルや GRU のセルなどが利用できる。
        cell = tf.nn.rnn_cell.BasicRNNCell(self.hidden_layer_size)
        outputs, states = tf.nn.static_rnn(cell, input_data, initial_state=initial_state)
        
        # 最後に隠れ層から出力層につながる重みとバイアスを処理する
        # 最終的に softmax 関数で処理し、確率として解釈される。
        # softmax 関数はこの関数の外で定義する。
        output = tf.matmul(outputs[-1], output_w) + output_b

        return output

    def loss(self, logits, labels):
        cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=logits, labels=labels))

        return cost

    def training(self, cost):
        # 今回は最適化手法として Adam を選択する。
        # ここの AdamOptimizer の部分を変えることで、Adagrad, Adadelta などの他の最適化手法を選択することができる
        optimizer = tf.train.AdamOptimizer(learning_rate=self.learning_rate).minimize(cost)

        return optimizer

    def train(self):
        # 変数などの用意
        input_data = tf.placeholder("float", [None, self.chunk_size, self.input_layer_size])
        actual_labels = tf.placeholder("float", [None, self.output_layer_size])
        initial_state = tf.placeholder("float", [None, self.hidden_layer_size])

        prediction = self.inference(input_data, initial_state)
        cost = self.loss(prediction, actual_labels)
        optimizer = self.training(cost)
        correct = tf.equal(tf.argmax(prediction, 1), tf.argmax(actual_labels, 1))
        accuracy = tf.reduce_mean(tf.cast(correct, tf.float32))

        # TensorBoard で可視化するため、クロスエントロピーをサマリーに追加
        tf.summary.scalar("Cross entropy: ", cost)
        summary = tf.summary.merge_all()

        # 訓練・テストデータの用意
        # corpus = Corpus()
        trX, trY, teX, teY = self.corpus.prepare_data()
        training_num = trX.shape[0]

        # ログを保存するためのディレクトリ
        timestamp = time.time()
        dirname = datetime.datetime.fromtimestamp(timestamp).strftime("%Y%m%d%H%M%S")

        # ここから実際に学習を走らせる
        with tf.Session() as sess:
            sess.run(tf.global_variables_initializer())
            summary_writer = tf.summary.FileWriter("./log/" + dirname, sess.graph)

            # エポックを回す
            for epoch in range(self.epochs):
                step = 0
                epoch_loss = 0
                epoch_acc = 0

                # 訓練データをバッチサイズごとに分けて学習させる (= optimizer を走らせる)
                # エポックごとの損失関数の合計値や(訓練データに対する)精度も計算しておく
                while (step + 1) * self.batch_size < training_num:
                    start_idx = step * self.batch_size
                    end_idx = (step + 1) * self.batch_size

                    batch_xs = trX[start_idx:end_idx, :, :]
                    batch_ys = trY[start_idx:end_idx, :]

                    _, c, a = sess.run([optimizer, cost, accuracy],
                                       feed_dict={input_data: batch_xs,
                                                  actual_labels: batch_ys,
                                                  initial_state: np.zeros([self.batch_size, self.hidden_layer_size])
                                                  }
                                       )
                    epoch_loss += c
                    epoch_acc += a
                    step += 1

                # コンソールに損失関数の値や精度を出力しておく
                print("Epoch", epoch, "completed ouf of", self.epochs, "-- loss:", epoch_loss, " -- accuracy:",
                      epoch_acc / step)

                # Epochが終わるごとにTensorBoard用に値を保存
                summary_str = sess.run(summary, feed_dict={input_data: trX,
                                                           actual_labels: trY,
                                                           initial_state: np.zeros(
                                                               [trX.shape[0],
                                                                self.hidden_layer_size]
                                                           )
                                                           }
                                       )
                summary_writer.add_summary(summary_str, epoch)
                summary_writer.flush()

            # 学習したモデルも保存しておく
            saver = tf.train.Saver()
            saver.save(sess, self.model_filename)

            # 最後にテストデータでの精度を計算して表示する
            a = sess.run(accuracy, feed_dict={input_data: teX, actual_labels: teY,
                                              initial_state: np.zeros([teX.shape[0], self.hidden_layer_size])})
            print("Accuracy on test:", a)


    def predict(self, seq):
        """
        文章を入力したときに次に来る単語を予測する
        :param seq: 予測したい単語の直前の文字列。chunk_size 以上の単語数が必要。
        :return:
        """

        # 最初に復元したい変数をすべて定義してしまいます
        tf.reset_default_graph()
        input_data = tf.placeholder("float", [None, self.chunk_size, self.input_layer_size])
        initial_state = tf.placeholder("float", [None, self.hidden_layer_size])
        prediction = tf.nn.softmax(self.inference(input_data, initial_state))
        predicted_labels = tf.argmax(prediction, 1)

        # 入力データの作成
        # seq を one-hot 表現に変換する。
        words = [word for word in seq.split() if not word.startswith("-")]
        x = np.zeros([1, self.chunk_size, self.input_layer_size])
        for i in range(self.chunk_size):
            word = seq[len(words) - self.chunk_size + i]
            index = self.dictionary.get(word, self.dictionary[self.unknown_word_symbol])
            x[0][i][index] = 1
        feed_dict = {
            input_data: x, # (1, chunk_size, vocabulary_size)
            initial_state: np.zeros([1, self.hidden_layer_size])
        }

        # tf.Session()を用意
        with tf.Session() as sess:
            # 保存したモデルをロードする。ロード前にすべての変数を用意しておく必要がある。
            saver = tf.train.Saver()
            saver.restore(sess, self.model_filename)

            # ロードしたモデルを使って予測結果を計算
            u, v = sess.run([prediction, predicted_labels], feed_dict=feed_dict)

            keys = list(self.dictionary.keys())


            # コンソールに文字ごとの確率を表示
            for i in range(self.vocabulary_size):
                c = self.unknown_word_symbol if i == (self.vocabulary_size - 1) else keys[i]
                print(c, ":", u[0][i])

            print("Prediction:", seq + " " + ("<???>" if v[0] == (self.vocabulary_size - 1) else keys[v[0]]))

        return u[0]


def build_dict():
    cp = Corpus()
    cp.build_dict()

if __name__ == "__main__":
    #build_dict()

    ln = Language()

    # 学習するときに呼び出す
    #ln.train()

    # 保存したモデルを使って単語の予測をする
    ln.predict("some of them looks like")


結果の一部

lipoprotein : 1.3668956e-14
ldl : 1.4424414e-14
statin : 1.378644e-14
a--z : 1.3740787e-14
simvastatin : 1.413377e-14
bmi : 3.1720919e-07
covariates : 2.8344898e-06
yhl : 2.2330354e-07
yol : 9.992968e-11
obesity : 1.3501551e-09
evgfp : 6.1829963e-09
unintended : 4.6785074e-09
sizes : 2.569963e-07
obese : 1.9368042e-07
<???> : 2.9195742e-05
Prediction: some of them looks like et
  • 演習チャレンジ

GROもLSTMと同様にRNNの一種であり、単純な
RNNにおいて問題となる勾配消失問題を解決し、長期的な依存関係を学習することが出来る。LSTMに比べ変数の数やゲートの数が少なく、より単純なモデルであるが、タスクによってはLSTMより良い性能を発揮すr。以下のプログラムはGRUである。ただし、_sigmoid関数は要素ごとにシグモイド関数を作用させる関数である。
(こ)にあてはまるコードを書け。
f:id:tibet:20211130223210p:plain
[解答]
GRUの更新ゲートの出力なので、下記になる。
(1-z)*h+z*h_bar

  • 確認テスト

LSTMとGRUの違いを述べよ
[解答]
LSTMはCEC、入力ゲート、忘却ゲート、出力ゲートを備え、構成が複雑でパラメータが多い。
GRUは、リセットゲート、更新ゲートで更新され、LSTMより構成が単純でパラメータが少ない。

Section4 双方向RNN

 過去の上方だけでなく、未来の情報を加味することで、精度を向上させるモデル。文章の推敲や機械翻訳などに使用される。
f:id:tibet:20211130223758p:plain

  • 演習チャレンジ

以下は双方向RNNの順伝搬を行うプログラムである。順方向については、入力から中間層への重みW_f、1ステップ前の中間層出力から中間層への重みをU_f、逆方向に関しては同様にパラメータW_b、U_bを持ち、両者の中間表現を合わせた特徴から出力層への重みはVである。_rnn関数はRNNの順伝搬を表し中間層の系列を返す関数であるとする。(か)にあてはまるコードを示せ。
f:id:tibet:20211130223912p:plain
[解答]
hsは、順方向と逆方向の中間表現の特徴量を合わせたものなので、下記になる。

np.concatenate([h_f,h_b[::,-1]],axis=1)