はじめに
Elixir Advent Calendar 2019 23 日目です!(遅れてごめんなさい…!)
今回はニューラルネットワークを用いた手書きの数字認識を行うプログラムを作っていきます。 GitHubでも公開しているので、全体像を見たい方や試してみたい方はこちらで確認してください!
僕の大学のプログラミング初心者も多数存在する授業の最終課題が C 言語でニューラルネットワークを用いた手書きの数字認識を行うプログラムを作成する というものでした。
(数年前は最終課題がじゃんけんのプログラムだったそうですがインフレしすぎではないですかね。。)
今回そこで作成したプログラムを元に Elixir で実装し直していくという形で行なっていきます。
元々アドベントカレンダーに向けてこの記事で終わらせる予定だったのですが、最後の最後で詰まったので 2 回に分けることにします。
前編の今回は学習部分で用いる関数についてです。 全体のプログラムとしてまだ未完成なので後編では少し修正が入っているかもしれないです。
また、ニューラルネットワークに関しては授業で軽く習った浅く狭い知識を掘り返して説明しています。 説明の足りない部分・間違っている部分等ありましたら、是非是非コメントをください。
ニューラルネットワークについての軽すぎる紹介
今回学習のために与えるのは MNIST の 0-9 の手書きの数字のデータセットです。
ニューラルネットワークによる文字認識の学習では、入力(画像)x を与えると出力ベクトル y を返す y = f(x) という関数を用意することをさします。
推論ではこの関数を用いて出力された y の中の最大の要素に対応する数字を推論結果として返します。
今回は 6 層のニューラルネットワークを実装します。
全体の実装について
また、今回Matrixというライブラリも使用しています。
では実装した関数を軽く紹介していきます。 (時間が足りなくて関数内で使わないのに引数を取っているなど、かなり荒削りです…)
実装上の前提
・行列は全て List で表す 例えば 2×3 で要素が全て 1 の行列は
[
[[1],[1],[1]],
[[1],[1],[1]]
]
と表すこととします。
・説明上、y(i, j)などと書いてある場合、行列 y の i 行目、j 列目の要素をさす。 ・説明上、y(t)などと書いてある場合、行ベクトル y の t 番目の要素をさす。
行列の標準出力
def print(m, n, x) do
List.flatten(x)
|> Enum.with_index(1)
|> Enum.each(fn {xx, index} -> if rem(index, n) == 0, do: IO.write(inspect(xx)<>"\n"), else: IO.write(xx) end)
end
m*n の行列 matrix を渡すことでその行列を標準出力します。
行列の積和計算
def fc(m, n, x, a, b) do
x = List.flatten(x)
b
|> List.flatten()
|> Enum.with_index()
|> Enum.map(fn {bx, index} ->
result =
Enum.at(a, index)
|>Enum.with_index()
|>Enum.map(fn{ax, a_index} ->
ax * Enum.at(x, a_index)
end)
|> Enum.sum()
result + bx
end)
|> Enum.chunk_every(1)
end
m*n 行列 a, m 行の列ベクトル b, n 行の列ベクトル x が入力されたときに ax+b を計算する。
ReLU 演算
def relu(x, n) when is_integer(x) == false, do: relu(n, x)
def relu(_n, x) do
x
|> List.flatten()
|> Enum.map(fn xx -> if xx < 0, do: 0, else: xx end)
|> Enum.chunk_every(1)
end
n 行の列ベクトル x が入力された時に 0 未満の要素の値を 0 にする。
Softmax 演算
def softmax(x, n) when is_integer(x) == false, do: softmax(n, x)
def softmax(n, x) do
x_max = x |> List.flatten() |> Enum.max()
denominator = x |> List.flatten() |> Enum.map(fn xx -> :math.exp(xx - x_max) end) |> Enum.sum()
x
|> List.flatten()
|> Enum.map(fn xx -> :math.exp(xx - x_max)/denominator end)
|> Enum.chunk_every(1)
end
入力 x に対し、exp(x(k) −xmax)/∑ i exp(x(i) −xmax)となる行列を返す。
誤差逆伝搬(Softmax 層)
def softmaxwithloss_bwd(m, y, t) do
y = List.flatten(y)
[0,0,0,0,0,0,0,0,0,0]
|> Enum.with_index(0)
|> Enum.map(fn {ans, index} -> if index == t, do: {1, index}, else: {0, index} end)
|> Enum.map(fn {ans, index} ->
Enum.at(y, index) - ans
end)
|> Enum.chunk_every(1)
end
入力された行列 y、正解の数字 t に対して、 y(t) - 1 した行列を返す。
誤差逆伝搬(ReLU 層)
def relu_bwd(m, x, dEdy) do
x = List.flatten(x)
dEdy |> List.flatten()
|> Enum.with_index()
|> Enum.map(fn {dedy, index} -> if Enum.at(x, index) > 0, do: dedy, else: 0 end)
|> Enum.chunk_every(1)
end
n 行の列ベクトル x が入力された時に x0 未満の要素の値を 0 にする。
誤差逆伝搬(FC 層)
def fc_bwd(m, n, x, dEdy, a, dEda, dEdx) do
dEdb = dEdy
a = a |> List.flatten()
dEdy = dEdy |> List.flatten()
x = x |> List.flatten()
dEda =
Task.async(fn ->
dEda
|> Enum.with_index()
|> Enum.map(fn {deda, index} ->
Task.async(fn ->
Enum.with_index(deda)
|> Enum.map(fn {dedaa, indexx} ->
Enum.at(dEdy, index) * Enum.at(x, indexx)
end)
end)
end)
|> Enum.map(&(Task.await(&1, 100000)))
end)
dEdx =
Task.async(fn ->
dEdx
|> Enum.with_index()
|> Enum.map(fn {dedx, index} ->
dedx = 0
a
|> Enum.with_index()
|> Enum.filter(fn {aa, indexx} -> rem(indexx, n) == index end)
|> Enum.map(fn {aa, indexx} ->
index_for_dedy = div(indexx, n)
aa * Enum.at(dEdy, index_for_dedy)
end)
|> Enum.sum()
end)
|> Enum.chunk_every(1)
end)
dEda = Task.await(dEda, 1000000)
dEdx = Task.await(dEdx, 1000000)
{dEda, dEdb, dEdx}
end
行列 dEdy, a, dEda, dEdb, dEdx を引数にとる。 dEdb には dEdy をそのままコピーする。
dEda は以下の式の通りに変更 dEda(i, j) = dEdy(i) * x(j)
dEdx は以下の式の通りに変更 dEdx(i) = ∑ j (a(j, i) * dEdy(j))
この関数が重すぎだったので所々 Task.async / await を用いて並行処理を行うようにしています。
誤差逆伝播
def backward6(a1, a2, a3, b1, b2, b3, x, t, deda1, deda2, deda3, dedb1, dedb2, dedb3) do
copy_fc1 = x
copy_relu1 = fc(50, 784, x, a1, b1)
copy_fc2 = relu(50, copy_relu1)
copy_relu2 = fc(100, 50, copy_fc2, a2, b2)
copy_fc3 = relu(100, copy_relu2)
y = fc(10, 100, copy_fc3, a3, b3) |> softmax(10)
y0 = Matrix.new(784, 1)
y1 = Matrix.new(50, 1)
y2 = Matrix.new(100, 1)
y3 = softmaxwithloss_bwd(10, y, t)
{deda3, dedb3, y2} = fc_bwd(10, 100, copy_fc3, y3, a3, deda3, y2)
y2 = relu_bwd(100, copy_relu2, y2)
{deda2, dedb2, y1} = fc_bwd(100, 50, copy_fc2, y2, a2, deda2, y1)
y1 = relu_bwd(50, copy_relu1, y1)
{deda1, dedb1, y0} = fc_bwd(50, 784, copy_fc1, y1, a1, deda1, y0)
{deda1, deda2, deda3, dedb1, dedb2, dedb3}
end
これまでの関数を組み合わせて 6 層の誤差逆伝播を行う。
学習モードの関数群
def learn_6layers(train_count, train_x, train_y) do
epoch = 1
batch = 100
h = 0.01
batch = 2
deda1 = Matrix.new(50, 784)
deda2 = Matrix.new(100, 50)
deda3 = Matrix.new(10, 100)
dedb1 = Matrix.new(50, 1)
dedb2 = Matrix.new(100, 1)
dedb3 = Matrix.new(10, 1)
a1 = rand_init(50, 784)
a2 = rand_init(100, 50)
a3 = rand_init(10, 100)
b1 = rand_init(50, 1)
b2 = rand_init(100, 1)
b3 = rand_init(10, 1)
index = 0..train_count-1
{a1, a2, a3, b1, b2, b3} = learn_single(a1, a2, a3, b1, b2, b3, deda1, deda2, deda3, dedb1, dedb2, dedb3, train_x, train_y, index, batch, epoch, train_count)
save_file(a1, a2, a3, b1, b2, b3)
{a1, a2, a3, b1, b2, b3}
end
def learn_single(a1, a2, a3, b1, b2, b3, deda1, deda2, deda3, dedb1, dedb2, dedb3, train_x, train_y, index, batch, epoch, train_count, i \\ 1) do
case i do
^epoch ->
IO.puts("processing... Epoch:#{epoch}/#{epoch}")
index = shuffle(index)
#train_count/batchは割り切れると仮定
j = round(train_count/batch)-1
learn_batch(a1, a2, a3, b1, b2, b3, deda1, deda2, deda3, dedb1, dedb2, dedb3, train_x, train_y, index, batch, j)
ii ->
IO.puts("processing... Epoch: #{ii}/#{epoch}")
index = shuffle(index)
#train_count/batchは割り切れると仮定
j = round(train_count/batch)-1
{a1, a2, a3, b1, b2, b3} = learn_batch(a1, a2, a3, b1, b2, b3, deda1, deda2, deda3, dedb1, dedb2, dedb3, train_x, train_y, index, batch, j)
learn_single(a1, a2, a3, b1, b2, b3, deda1, deda2, deda3, dedb1, dedb2, dedb3, train_x, train_y, index, batch, epoch, train_count, ii+1)
end
end
def learn_batch(a1, a2, a3, b1, b2, b3, deda1, deda2, deda3, dedb1, dedb2, dedb3, train_x, train_y, index, batch, j) do
case j do
0 -> {a1, a2, a3, b1, b2, b3}
jj ->
#平均勾配の初期化
deda1_av = Matrix.new(50, 784)
deda2_av = Matrix.new(100, 50)
deda3_av = Matrix.new(10, 100)
dedb1_av = Matrix.new(50, 1)
dedb2_av = Matrix.new(100, 1)
dedb3_av = Matrix.new(10, 1)
IO.puts("processing batch...")
{a1, a2, a3, b1, b2, b3} = cal_gradient(a1, a2, a3, b1, b2, b3, deda1, deda2, deda3, dedb1, dedb2, dedb3, deda1_av, deda2_av, deda3_av, dedb1_av, dedb2_av, dedb3_av, train_x, train_y, index, batch, jj, batch-1)
learn_batch(a1, a2, a3, b1, b2, b3, deda1, deda2, deda3, dedb1, dedb2, dedb3, train_x, train_y, index, batch, jj-1)
end
end
def cal_gradient(a1, a2, a3, b1, b2, b3, deda1, deda2, deda3, dedb1, dedb2, dedb3, deda1_av, deda2_av, deda3_av, dedb1_av, dedb2_av, dedb3_av, train_x, train_y, index, batch, j, time, tasks_list \\ []) do
case time do
0 -> IO.puts("\n")
IO.puts("wait task finishing..")
{deda1_av, deda2_av, deda3_av, dedb1_av, dedb2_av, dedb3_av} =
tasks_list
|> Enum.map(fn task ->
IO.write "."
Task.await(task, 1000000)
end)
|> Enum.reduce(fn {deda1, deda2, deda3, dedb1, dedb2, dedb3}, {deda1_av, deda2_av, deda3_av, dedb1_av, dedb2_av, dedb3_av} ->
deda1_av = Matrix.add(deda1, deda1_av)
deda2_av = Matrix.add(deda2, deda2_av)
deda3_av = Matrix.add(deda3, deda3_av)
dedb1_av = Matrix.add(dedb1, dedb1_av)
dedb2_av = Matrix.add(dedb2, dedb2_av)
dedb3_av = Matrix.add(dedb3, dedb3_av)
{deda1_av, deda2_av, deda3_av, dedb1_av, dedb2_av, dedb3_av}
end)
#ミニバッチで割って平均を求める
h = 0.01
deda1_av = Matrix.scale(deda1_av, -h/batch)
deda2_av = Matrix.scale(deda2_av, -h/batch)
deda3_av = Matrix.scale(deda3_av, -h/batch)
dedb1_av = Matrix.scale(dedb1_av, -h/batch)
dedb2_av = Matrix.scale(dedb2_av, -h/batch)
dedb3_av = Matrix.scale(dedb3_av, -h/batch)
#係数a, bを更新
a1 = Matrix.add(deda1_av, a1)
a2 = Matrix.add(deda2_av, a2)
a3 = Matrix.add(deda3_av, a3)
b1 = Matrix.add(dedb1_av, b1)
b2 = Matrix.add(dedb2_av, b2)
b3 = Matrix.add(dedb3_av, b3)
IO.puts("\n")
{a1, a2, a3, b1, b2, b3}
n ->
task = Task.async(Aimodoki, :backward6, [a1, a2, a3, b1, b2, b3, Enum.at(train_x, 1) ,Enum.at(train_y, 1), deda1, deda2, deda3, dedb1, dedb2, dedb3])
tasks_list = tasks_list ++ [task]
IO.write "."
cal_gradient(a1, a2, a3, b1, b2, b3, deda1, deda2, deda3, dedb1, dedb2, dedb3, deda1_av, deda2_av, deda3_av, dedb1_av, dedb2_av, dedb3_av, train_x, train_y, index, batch, j, n-1, tasks_list)
end
end
再帰が使われまくってかなりわかりにくくなっています
大雑把な流れとしては
- a, b を[−1 : 1]の乱数で初期化
- 以下をエポック回数だけ繰り返す (関数 learn_single)
(a) [0 : train_count − 1] の値を持つ配列 index をランダムに並び替える
(b) 以下のミニバッチ学習を traincount/batch 回繰り返す.(関数 learnbatch)
① 平均勾配を初期化 ② 関数 cal_gradient を用いて、a, b を更新する
となっています。
cal_gradient 内では
- backword6 を用いて、平均勾配の計算
- a, b に平均勾配に学習率をかけたものを加えることで更新
ということを行なっています。
また、所々 Task.async / await での並行処理を行うようになっています。
main 関数
def main(args \\ []) do
if is_learn_mode?(args) do
IO.puts "------ learn mode ------"
train_count = 2000
train_x = MNIST.train_image(train_count)
train_y = MNIST.train_label(train_count)
{a1, a2, a3, b1, b2, b3} = learn_6layers(train_count, train_x, train_y)
#テストを行う
test_count = 100
test_x = MNIST.test_image(test_count)
test_y = MNIST.test_label(test_count)
test_inference(a1, a2, a3, b1, b2, b3, test_x, test_y, test_count)
IO.puts "== end =="
else
IO.puts "------ inference mode ------"
{a1, a2, a3, b1, b2, b3} = load_file()
test_x = MNIST.test_image(1) |> Enum.at(0)
test_y = MNIST.test_label(1)
inference_number = inference6(a1, a2, a3, b1, b2, b3, test_x)
IO.puts "The number is #{inference_number}, I think!"
IO.puts "(The currect number is #{Enum.at(test_y, 0)}.)"
IO.puts "== end =="
end
end
その他便利関数
配列の要素のシャッフル
def shuffle(matrix) do
matrix
|> Enum.shuffle()
end
損失関数
def cross_entropy_error(y, t) do
target = List.flatten(y) |> Enum.at(t)
-1*:math.log(target + 1.0e-7)
end
[-1:1]の乱数で行列の初期化
def rand_init(m, n) do
matrix = Matrix.scale( Matrix.rand(m, n), 2)
mainuser = Matrix.new(m, n, -1)
Matrix.add(matrix, mainuser)
end
ファイルの保存・保存したファイルからの読み込み
def save_file(a1, a2, a3, b1, b2, b3) do
IO.puts("save file ...")
a1 = a1 |> matrix_to_string()
a2 = a2 |> matrix_to_string()
a3 = a3 |> matrix_to_string()
b1 = b1 |> matrix_to_string()
b2 = b2 |> matrix_to_string()
b3 = b3 |> matrix_to_string()
File.write!("a1", a1)
File.write!("a2", a2)
File.write!("a3", a3)
File.write!("b1", b1)
File.write!("b2", b2)
File.write!("b3", b3)
end
def load_file() do
IO.puts("load file ...")
a1 = File.read!("a1") |> String.split(",") |> Enum.filter(&(&1 != "" and &1 != "\n")) |> Enum.map(&(String.to_float(&1))) |> Enum.chunk_every(784)
a2 = File.read!("a2") |> String.split(",") |> Enum.filter(&(&1 != "" and &1 != "\n")) |> Enum.map(&(String.to_float(&1))) |> Enum.chunk_every(50)
a3 = File.read!("a3") |> String.split(",") |> Enum.filter(&(&1 != "" and &1 != "\n")) |> Enum.map(&(String.to_float(&1))) |> Enum.chunk_every(100)
b1 = File.read!("b1") |> String.split(",") |> Enum.filter(&(&1 != "" and &1 != "\n")) |> Enum.map(&(String.to_float(&1))) |> Enum.chunk_every(1)
b2 = File.read!("b2") |> String.split(",") |> Enum.filter(&(&1 != "" and &1 != "\n")) |> Enum.map(&(String.to_float(&1))) |> Enum.chunk_every(1)
b3 = File.read!("b3") |> String.split(",") |> Enum.filter(&(&1 != "" and &1 != "\n")) |> Enum.map(&(String.to_float(&1))) |> Enum.chunk_every(1)
{a1, a2, a3, b1, b2, b3}
end
def matrix_to_string(matrix) do
matrix
|> Enum.map(fn rows ->
rows |> Enum.map(fn elem -> Float.to_string(elem) <> "," end)
end)
end
学習データを用いたテスト
def test_inference(a1, a2, a3, b1, b2, b3, test_x, test_y, test_count) do
IO.puts("testing ...")
currect =
test_x
|> Enum.with_index()
|> Enum.map(fn {x, index} ->
IO.write "."
inference_number = inference6(a1, a2, a3, b1, b2, b3, x)
if inference_number == Enum.at(test_y, index), do: 1, else: 0
end)
|> Enum.sum()
IO.puts "\n"
IO.write "currect percentage (%) : "
IO.inspect (currect * 100 / test_count)
end
引数から学習モードかどうかの判断
def is_learn_mode?(args) do
{opts, target, _} =
args
|> OptionParser.parse(switches: [learn: :boolean])
opts[:learn]
end
次回予告
現在以下の推論に用いる関数が必ず 9 を返す(さらに合っている確率 100%として)というクソバグに引っかかっています。 半日くらいかけて調べていたのですが、fc や relu、softmax などの関数に問題らしいところは見つからず、完全に詰まってしまいました。
# maxのindexを返す
def inference6(a1, a2, a3, b1, b2, b3, x) do
y1 = fc(50, 784, x, a1, b1) |> relu(50)
IO.inspect y1
y2 = fc(100, 50, y1, a2, b2)
IO.inspect y2
y2 = y2 |> relu(100)
IO.inspect y2
y = fc(10, 100, y2, a3, b3)
IO.inspect y #ソート済み
y= y|> softmax(10)
IO.inspect y
y = y
|> List.flatten()
|> Enum.with_index()
|> Enum.sort_by(&(elem(&1, 0)), &>=/2)
|> Enum.at(0)
IO.inspect y
elem(y, 1)
end
後編にご期待ください。
(何かこの関数が原因じゃないかみたいなことを気づいた方がおられましたらぜひ教えてください…)