学習と評価#

このノートブックでは,構築されたモデルをデータセットを用いて学習する処理を実装する.モデルの学習ループ(1 iteration)は一般的に次のような手順となる.

  1. ミニバッチの作成

  2. 順伝播

  3. 損失の計算

  4. 勾配計算とパラメータの更新

  5. 学習結果の保存

ここでは,このループを説明するために,moon_datasetを使って分類問題を解く.まずはミニバッチの作成のためにこのデータセットの生成と可視化を行おう.

データセットの作成#

moons datasetは線形分離が不可能なシンプルなトイデータセットであり,クラスタリングや二値分類のアルゴリズムの検証に利用できる.可視化をすると半月が対称に重なるような分布である.

moon_datasetはscikit-learnから容易に利用できる.次のコードで読み込みができる.

from sklearn.datasets import make_moons

X, y = make_moons(n_samples=500, noise=0.1, random_state=1234)

Xy

返り値の Xy はそれぞれモデルの入出力に対応する入力データと正解値となっている.これらの形状(.shape)を確認する.

print('X: ', X.shape)
print('y: ', y.shape)
X:  (500, 2)
y:  (500,)

形状(.shape)からも分かるように入力データは(サンプル数,入力次元数)となっている.つまりは,以下のようにすると,\(i\)番目のサンプルの値を取得できる.

i = 10
print(X[i], y[i])
[-0.91116178  0.41454065] 0

最初の数サンプルをスライシングで出力してみる.

print('X = ', X[:3])
print('y = ', y[:3])
X =  [[ 0.6244503   0.74367979]
 [ 1.54257073 -0.35631793]
 [ 0.74117835 -0.56900493]]
y =  [0 1 1]

y はそのデータに対応する正解値(ここでは,分類問題なのでクラスラベル)が格納されている.moon_datasetは2クラスなので,np.unique 関数などで配列の中に含まれるユニークな要素を出力してみると,01 の二つの値が入っていることがわかる.

import numpy as np
np.unique(y)
array([0, 1])

続いて,これらのデータとラベルからmoon_datasetを可視化してみる.入力特徴次元は2次元なのでそれぞれx軸y軸として散布図を描く.このとき,点の色をラベル値に応じてつける.

import matplotlib.pyplot as plt
plt.scatter(X[:, 0], X[:, 1], c=y, s=10)
plt.title("Moons Data")
plt.grid(True)
plt.show();
../../_images/d683da30b2b60c2b69b572f617d3b11539c7a086db9b1bb870b74eebab5deca8.png

データの分布が可視化できたと思う.データの可視化はこのようなトイデータセットだけでなく画像・言語・信号であってもそのデータの特性を理解する上で非常に重要である(例えば,そもそも線形分離可能なデータであったならば,深層学習を利用する必要がない).

続いて,モデルの学習と評価のために,データセットを 学習(train)検証(validation)評価(test) 用に分割する.これらの役割は

  • 学習データ:モデルのパラメータを学習するため

  • 評価データ:未知のデータに対するモデルの性能を評価するため

  • 検証データ:学習可能なパラメータ以外のパラメータ(例えば,層数や隠れ層の次元)を調整するため

である.インターネット上の説明には学習と評価のみの解説や実装も多いが,検証データがないと,本来未知であるべきの評価データを使ってモデルのパラメータを探索することとなる.これは未知のデータに対する評価ではなくなり,正当なモデルの評価ができているとは言えない実験となる.

この分割自体は次のように,scikit-learn の関数を用いれば容易に実装できる.

from sklearn.model_selection import train_test_split

X_train, X_temp, y_train, y_temp = train_test_split(
    X, y, test_size=0.4, random_state=1234)
X_valid, X_test, y_valid, y_test = train_test_split(
    X_temp, y_temp, test_size=0.5, random_state=1234)

print(f"train data: {len(X_train)}")
print(f"validation data: {len(X_valid)}")
print(f"test data: {len(X_test)}")
train data: 300
validation data: 100
test data: 100

ここで,random_stateを指定したが,これは分割のランダム性を固定する処理である.固定しないと毎回結果が変わるので再現性が担保できない.可能な限り,再現性を意識した実装をしよう.

データローダーの作成#

確率的勾配降下法でモデルを学習するために,作成したデータセットからランダムにデータを取り出して,ミニバッチ(minibatch)と呼ばれるサブセットを作成する必要がある.PyTorchではこの処理をデータローダー(DataLoader)として実装する.この機能は,今回のような数値データだけでなく,画像,音声,言語など様々なデータを扱うことができ,また並列化も内部でサポートしている.

以下のセルでデータローダーの作成を行う.まずはnumpy形式のデータセットをtorch.Tensortorch.from_numpyで変換する.このとき,入力データは torch.float32,ラベル値は整数値として torch.long で型を定義する必要がある.

import torch
_X_train = torch.from_numpy(X_train).to(torch.float32)
_y_train = torch.from_numpy(y_train).to(torch.long)
_X_valid = torch.from_numpy(X_valid).to(torch.float32)
_y_valid = torch.from_numpy(y_valid).to(torch.long)
_X_test = torch.from_numpy(X_test).to(torch.float32)
_y_test = torch.from_numpy(y_test).to(torch.long)

データ型が適切に変換されていることがわかる.

print(type(X_train), type(y_train))
print(type(_X_train), type(_y_train))
<class 'numpy.ndarray'> <class 'numpy.ndarray'>
<class 'torch.Tensor'> <class 'torch.Tensor'>

続いて,変換されたデータをPyTorchのデータセットクラスの一種であるTensorDatasetへ変換する.このデータセットクラスはデータからサンプル一つだけを取り出し,データローダーへ渡すメソッドが定義されている.TensorDatasetの作成は次のように,入力データと正解値をそれぞれ引数として渡せば良い.

from torch.utils.data import TensorDataset

train_dataset = TensorDataset(_X_train, _y_train)
valid_dataset = TensorDataset(_X_valid, _y_valid)
test_dataset = TensorDataset(_X_test, _y_test)

次のようにprintしてもわかるように,データセットクラスとなっている.

print(train_dataset)
<torch.utils.data.dataset.TensorDataset object at 0x14a4acbd4af0>

補足であるが,データセットのサイズや\(i\)番目のサンプルは次のようにデータセットから取得できる.

print(len(train_dataset))

i = 10
x_i, y_i = train_dataset[i]
print(x_i, y_i)
300
tensor([0.8725, 0.3475]) tensor(0)

最後に,データローダーを作成する.並列化や複数GPUでの学習の場合は引数を加える必要があるが,ここではバッチサイズのみを指定する.

from torch.utils.data import DataLoader

train_loader = DataLoader(train_dataset, batch_size=32)
valid_loader = DataLoader(valid_dataset, batch_size=10)
test_loader = DataLoader(test_dataset, batch_size=10)

データローダーはfor文を使って以下のようにミニバッチを作成してくれる.

for batch in train_loader:
    x, y = batch
    print(x.shape, y.shape)
    break
torch.Size([32, 2]) torch.Size([32])

モデルの学習ループの作成#

確率的勾配降下法でも説明したように,ニューラルネットワークのパラメータの一回の更新は(1)optimizer.zero_grad() による勾配の初期化,(2)順伝播の実行と損失の計算,(3).backwardによる勾配計算,(4)optimizer.step()によるパラメータの更新 という一連の処理からなる.そして,この処理を1回実行することを,1 iteration といった.

モデルの構築,損失関数とオプティマイザの設定を行い,この1 iterationを実装しよう.今回の例では,2次元の入力を受け取り,クラス0か1を出力する.したがって,交差エントロピー関数を損失関数として設定する.

補足: 2クラス分類なので出力次元を1としてSigmoid関数と二値エントロピー関数を損失としても良い.

import torch
import torch.nn as nn
from torch import optim

class MLP(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super(MLP, self).__init__()
        self.layer_in = nn.Linear(input_dim, hidden_dim)
        self.layer_hidden = nn.Linear(hidden_dim, hidden_dim)
        self.layer_out = nn.Linear(hidden_dim, output_dim)
        self.act = nn.ReLU()
        self.act_out = nn.Identity()

    def forward(self, x):
        h = x
        h = self.layer_in(h)
        h = self.act(h)
        h = self.layer_hidden(h)
        h = self.act(h)
        h = self.layer_out(h)
        h = self.act_out(h)
        return h
    
model = MLP(input_dim=2, hidden_dim=32, output_dim=2)
loss_function = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.1)

これらをベースにDataLoaderのforループ内に書くと次のようになる.

for batch in train_loader:
    x, y = batch       
    optimizer.zero_grad()
    output = model(x)
    loss = loss_function(output, y)
    loss.backward()
    optimizer.step()

このループは データ数 / ミニバッチ数 回だけ行われる.この回数は len(train_loader) でも取得できる.そして,このループが終了した,つまり,データセットに含まれるすべてのデータを使ってモデルを学習したとき,その1回の回数を 1 epoch と言う.iterationとepochは区別して表記しよう.

このループ処理,つまり,1 epochの処理は,\(N\) epoch回学習を回すので再利用しやすいよう関数化しておこう.

def train_one_epoch(model, loader, loss_function, optimizer):
    model.train()
    train_loss, train_acc = [], []
    for batch in loader:
        x, y = batch
        optimizer.zero_grad()
        output = model(x)
        loss = loss_function(output, y)
        loss.backward()
        optimizer.step()
            
        acc = (output.max(1)[1] == y).float().mean()
        train_loss.append(loss.item())
        train_acc.append(acc.item())
    return np.mean(train_loss), np.mean(train_acc)

ここで,model.train() はモデルを学習モードにする処理である.ニューラルネットワークには学習時と推論時で処理が異なる層がある.それを制御するための呼び出しである.

加えて,各iterationの損失と分類精度を記録するリストをtrain_losstrain_accとして作成してこれらの値を記録している.後述するが損失と精度の推移はうまく学習が進んでいるかを確認するための重要な情報である.毎回記録しよう.

では,これを \(N=100\) epochだけ学習させる.関数化しているので次のように実装すれば良い.

epochs = 100
train_loss, train_acc = [], []
for epoch in range(1, epochs+1):
    loss, acc = train_one_epoch(model, train_loader, loss_function, optimizer)
    if epoch % 10 == 0:
        print(f'Epoch {epoch}/{epochs}: loss - {loss:.4f}, acc - {acc:.4f}')
    train_loss.append(loss)
    train_acc.append(acc)
Epoch 10/100: loss - 0.2146, acc - 0.9125
Epoch 20/100: loss - 0.1577, acc - 0.9344
Epoch 30/100: loss - 0.1089, acc - 0.9500
Epoch 40/100: loss - 0.0662, acc - 0.9781
Epoch 50/100: loss - 0.0403, acc - 0.9875
Epoch 60/100: loss - 0.0268, acc - 0.9969
Epoch 70/100: loss - 0.0195, acc - 1.0000
Epoch 80/100: loss - 0.0151, acc - 1.0000
Epoch 90/100: loss - 0.0121, acc - 1.0000
Epoch 100/100: loss - 0.0101, acc - 1.0000

実行してみると,100 epoch分の学習が実行され,各epochの平均の損失が徐々に減少し,精度が徐々に増加していることがわかる.これはつまり正解ラベルに近い学習ができていることを意味している.上記のコードではこの学習の推移をtrain_losstrain_accに保存している.

この損失が十分に下がり切るまで学習を行えば良い.

精度の計算の補足#

精度の計算 acc = (output.max(1)[1] == y).float().mean() について補足しておく. モデルからの予測結果の出力 output は次のように各データに対して,クラス数分だけ出力されている.つまり,ミニバッチ数を行,クラス数を列とした行列になっている.そして,正解ラベルはミニバッチ数次元だけのベクトルであり,各要素には正解ラベルのインデックスが格納されている.

output = torch.tensor([
    [0.2, 0.5, 0.3],  # サンプル1の予測スコア
    [0.1, 0.1, 0.8],  # サンプル2の予測スコア
    [0.1, 0.3, 0.6],  # サンプル3の予測スコア
    [0.3, 0.4, 0.3]   # サンプル4の予測スコア
])

y = torch.tensor([1, 0, 2, 1])  # 正解ラベル

得られた処理に対して output.max(1)[1] を計算している.output.max(1)では,各サンプルに対して最も大きな値(高い予測スコア)を持つクラスのインデックスとその値を計算して返している.

values, indices = output.max(1)
print('value:', values)
print('indices:', indices)
value: tensor([0.5000, 0.8000, 0.6000, 0.4000])
indices: tensor([1, 2, 2, 1])

そのため,output.max(1)[1] は最大値を持つクラスのインデックスを取得していることとなる.

output.max(1)[1]
tensor([1, 2, 2, 1])

そして,次の output.max(1)[1] == y ではこのインデックスを正解ラベルのインデックスと比較して,同じ場合は True,違う場合は False として,データ数分だけでの要素を持つTensorを返す.

print('indices:', output.max(1)[1])
print('y:', y)

print(output.max(1)[1] == y)
indices: tensor([1, 2, 2, 1])
y: tensor([1, 0, 2, 1])
tensor([ True, False,  True,  True])

この情報からミニバッチあたり何サンプルが正解したかを計算できる.そのために,Trueを1.0,Falseを0.0とする数値のベクトルに変換して,平均を取れば良い.それが精度計算で行なっている処理である.

(output.max(1)[1] == y).float().mean()
tensor(0.7500)

評価#

学習の損失と精度に関しては正解ラベルを使ってパラメータを更新するので,適切なモデルとハイパーパラメータを設定し,バグがなければ基本的には損失は更新を繰り返すたびに減少し,精度も高くなっていく.しかしながら,本来評価したいのは,学習に用いていないデータ の予測精度である.そこで,データセットの作成時に分割した test_dataset を使ってモデルを評価しよう.

評価は train_loader の代わりに test_loader を使えば良い.関数化した train_one_epoch をベースに評価用の関数 test を実装しよう.

def test(model, loader, loss_function):
    model.eval()
    test_loss, test_acc = [], []
    with torch.no_grad():
        for batch in loader:
            x, y = batch
            output = model(x)
            loss = loss_function(output, y)
      
            acc = (output.max(1)[1] == y).float().mean()
            test_loss.append(loss.item())
            test_acc.append(acc.item())
    return np.mean(test_loss), np.mean(test_acc)

評価時はパラメータの更新が必要ないので,optimizer を渡さず,更新処理に関連する勾配計算も行わない.また,評価用モードで実行したいので model.eval() を呼び出している.

以下のように学習が完了したモデルとtest_loaderを渡してテストデータに対する分類性能を確認する.

test_loss, test_acc = test(model, test_loader, loss_function)
print('test_loss = ', test_loss)
print('test_acc = ', test_acc)
test_loss =  0.013278961321339012
test_acc =  1.0

検証#

検証データ(val_dataset)は学習率や層数などのハイパーパラメータを調整するために使用する.評価で用いたtest関数にtest_loaderではなく,val_loaderを渡して,学習時に含まれていないデータでの学習の過程を記録しておこう.そのために,学習で作成した学習ループを次のように修正する.

model = MLP(input_dim=2, hidden_dim=32, output_dim=2)
loss_function = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.1)

epochs = 100
train_loss, train_acc = [], []
valid_loss, valid_acc = [], []
for epoch in range(1, epochs+1):
    loss, acc = train_one_epoch(model, train_loader, loss_function, optimizer)
    train_loss.append(loss)
    train_acc.append(acc)
    
    loss, acc = test(model, valid_loader, loss_function)
    valid_loss.append(loss)
    valid_acc.append(acc)
    
    if epoch % 10 == 0:
        print(f'Epoch {epoch}/{epochs}')
        print(f'train_loss - {loss:.4f}, train_acc - {acc:.4f}')
        print(f'valid_loss - {loss:.4f}, valid_acc - {acc:.4f}')
Epoch 10/100
train_loss - 0.2431, train_acc - 0.8700
valid_loss - 0.2431, valid_acc - 0.8700
Epoch 20/100
train_loss - 0.1835, train_acc - 0.9000
valid_loss - 0.1835, valid_acc - 0.9000
Epoch 30/100
train_loss - 0.1356, train_acc - 0.9300
valid_loss - 0.1356, valid_acc - 0.9300
Epoch 40/100
train_loss - 0.0874, train_acc - 0.9800
valid_loss - 0.0874, valid_acc - 0.9800
Epoch 50/100
train_loss - 0.0527, train_acc - 0.9900
valid_loss - 0.0527, valid_acc - 0.9900
Epoch 60/100
train_loss - 0.0339, train_acc - 0.9900
valid_loss - 0.0339, valid_acc - 0.9900
Epoch 70/100
train_loss - 0.0235, train_acc - 0.9900
valid_loss - 0.0235, valid_acc - 0.9900
Epoch 80/100
train_loss - 0.0173, train_acc - 1.0000
valid_loss - 0.0173, valid_acc - 1.0000
Epoch 90/100
train_loss - 0.0135, train_acc - 1.0000
valid_loss - 0.0135, valid_acc - 1.0000
Epoch 100/100
train_loss - 0.0107, train_acc - 1.0000
valid_loss - 0.0107, valid_acc - 1.0000

これで,各epochで学習と検証を行うことができる.一般的に,評価するモデルは,\(N\) epoch学習が完了した時点でのモデルを利用する,もしくは早期終了(early stopping) と呼ばれる検証データを精度の推移から評価モデルを決定する.評価(テスト)は 未知のデータ である必要があるので,test_dataset を使ってハイパーパラメータ等の調整をしてはいけない.代わりに valid_dataset を利用しよう.

モデルの保存#

学習と評価が完了したモデルを保存する.保存の際には,PyTorchが提供する state_dict を用いて,モデルの全てのパラメータを辞書形式で取得する.state_dictは,PyTorchの各レイヤーやオプティマイザの内部状態が含まれているが,モデル構造そのものは含まず,パラメータの値のみを保存することに注意されたい.

import os
os.makedirs('output', exist_ok=True)

save_path = 'output/model.pth'
torch.save(model.state_dict(), save_path)

保存されたモデルパラメータを再度読み込むには,保存した.model.pthtorch.load関数を用いてstate_dictをロードし,load_state_dictメソッドを使用して保存されたパラメータを読み込む必要がある.

注意点として,state_dictではモデルの構造は保存されていないので,読み込むモデルと保存したモデルの構造が異なる場合はエラーが生ずる.

loaded_model = MLP(input_dim=2, hidden_dim=32, output_dim=2)
loaded_model.load_state_dict(torch.load(save_path))
<All keys matched successfully>

再度,評価を行うと先ほどと同じ結果が得られていることがわかる.

test_loss, test_acc = test(loaded_model, test_loader, loss_function)
print('test_loss = ', test_loss)
print('test_acc = ', test_acc)
test_loss =  0.011800136673264206
test_acc =  1.0

学習結果の保存#

学習の過程を保存した損失と精度を後から読み出せるように保存しておこう.pandascsvpickleを使って保存することもできるが,ここではnumpyのsave_txtを使って保存する.

np.savetxt('output/train_loss.txt', train_loss)
np.savetxt('output/train_acc.txt', train_acc)

np.savetxt('output/valid_loss.txt', valid_loss)
np.savetxt('output/valid_acc.txt', valid_acc)