Gunosyデータ分析ブログ

Gunosyで働くデータエンジニアが知見を共有するブログです。

Sparkで利用できるDeep Learningフレームワークまとめ

こんにちは、Gunosyデータ分析部に所属している森本です。
主な担当業務は記事配信アルゴリズムの改善、ログ基盤運用です。
最近良く聞く音楽はOne Direction - Live While We're Youngです。

本記事では、Sparkで利用できるDeep Learningフレームワークをまとめました。

GunosyではChainerで畳み込みニューラルネットワークを応用し、ユーザーのデモグラフィック推定を行っています。

Chainer以外にも多数のDeep LearningフレームワークがPythonを中心に数多く存在します。 TensorFlow, Keras, Caffe, Theanoなどなど。どのフレームワークが優れているかという回答は状況に応じて変わりますが、Pythonを使用する大きな強みは前処理からモデル生成、評価まで一気通貫した処理ができることです。

一方で、次のようなのニーズもあります。

  • S3等に保存されている膨大なデータを分散処理して学習したい
  • ハイパーパラメーターを効率的に探索し、全体の計算時間を短縮したい

そこで分散処理に定評のあるSparkを活用し、このようなニーズを解消するフレームワークがないか調査してみました。

Elephas

http://maxpumperla.github.io/elephas/

ElephasはKerasを拡張したフレームワークです。ソースコードを見てみると、Kerasの直感的で理解しやすいコードがそのまま登場します。 MNISTのサンプルコードの一部を見てみましょう。

elephas/mnist_mlp_spark.py at master · maxpumperla/elephas · GitHub

# Convert class vectors to binary class matrices
y_train = np_utils.to_categorical(y_train, nb_classes)
y_test = np_utils.to_categorical(y_test, nb_classes)

model = Sequential()
model.add(Dense(128, input_dim=784))
model.add(Activation('relu'))
model.add(Dropout(0.2))
model.add(Dense(128))
model.add(Activation('relu'))
model.add(Dropout(0.2))
model.add(Dense(10))
model.add(Activation('softmax'))

sgd = SGD(lr=0.1)

# Build RDD from numpy features and labels
rdd = to_simple_rdd(sc, x_train, y_train)

# Initialize SparkModel from Keras model and Spark context
adagrad = elephas_optimizers.Adagrad()
spark_model = SparkModel(sc,
                         model,
                         optimizer=adagrad,
                         frequency='epoch',
                         mode='asynchronous',
                         num_workers=2,master_optimizer=sgd)

モデル定義部分はKerasそのものであり、コードは非常に馴染みやすくなっています。

Deeplearning4j

github.com

Deeplearning4JはJavaとScalaで開発されたフレームワークです。独自実装を行い既存のPythonライブラリを拡張していないフレームワークです。サンプルコードが豊富であり、次のリポジトリには、Scalaを利用したサンプルコードがまとまっています。

GitHub - kogecoo/dl4j-examples-scala: a simple scala porting of https://github.com/deeplearning4j/dl4j-examples MNISTのコードの一部を見てみましょう。モデル定義部分はわかりやすく記述できます。

val conf = new NeuralNetConfiguration.Builder()
                .seed(rngSeed)
                .optimizationAlgo(OptimizationAlgorithm.STOCHASTIC_GRADIENT_DESCENT)
                .iterations(1)
                .learningRate(0.006)
                .updater(Updater.NESTEROVS).momentum(0.9)
                .regularization(true).l2(1e-4)
                .list()
                .layer(0, new DenseLayer.Builder()
                  .nIn(numRows*numColumns)
                  .nOut(1000)
                  .activation("relu")
                  .weightInit(WeightInit.XAVIER)
                  .build())
                .layer(1, new OutputLayer.Builder(LossFunction.NEGATIVELOGLIKELIHOOD)
                  .nIn(1000)
                  .nOut(outputNum)
                  .activation("softmax")
                  .weightInit(WeightInit.XAVIER)
                  .build())
                .pretrain(false).backprop(true)
                .build()

一方、 Spark Summit 2016の「What Is Deeper Comparison of Deep Learning Frameworks Atop Spark」の資料17ページに示してあるように、他のフレームワークよりベンチマーク結果は低調のようでした。

SparkNet

github.com

Sparkを生んだAMPLabが開発したTensorflowやCaffeの拡張フレームワークです。

MNISTのモデル定義部分は次のように表現できます

https://github.com/amplab/SparkNet/blob/master/models/tensorflow/mnist/mnist_graph.py

def model(data, train=False):
    """The Model definition."""
    # 2D convolution, with 'SAME' padding (i.e. the output feature map has
    # the same size as the input). Note that {strides} is a 4D array whose
    # shape matches the data layout: [image index, y, x, depth].
    conv = tf.nn.conv2d(data,
                        conv1_weights,
                        strides=[1, 1, 1, 1],
                        padding='SAME')
    # Bias and rectified linear non-linearity.
    relu = tf.nn.relu(tf.nn.bias_add(conv, conv1_biases))
    # Max pooling. The kernel size spec {ksize} also follows the layout of
    # the data. Here we have a pooling window of 2, and a stride of 2.
    pool = tf.nn.max_pool(relu,
                          ksize=[1, 2, 2, 1],
                          strides=[1, 2, 2, 1],
                          padding='SAME')
    conv = tf.nn.conv2d(pool,
                        conv2_weights,
                        strides=[1, 1, 1, 1],
                        padding='SAME')
    relu = tf.nn.relu(tf.nn.bias_add(conv, conv2_biases))
    pool = tf.nn.max_pool(relu,
                          ksize=[1, 2, 2, 1],
                          strides=[1, 2, 2, 1],
                          padding='SAME')
    # Reshape the feature map cuboid into a 2D matrix to feed it to the
    # fully connected layers.
    pool_shape = pool.get_shape().as_list()
    reshape = tf.reshape(
        pool,
        [pool_shape[0], pool_shape[1] * pool_shape[2] * pool_shape[3]])
    # Fully connected layer. Note that the '+' operation automatically
    # broadcasts the biases.
    hidden = tf.nn.relu(tf.matmul(reshape, fc1_weights) + fc1_biases)
    # Add a 50% dropout during training only. Dropout also scales
    # activations such that no rescaling is needed at evaluation time.
    # if train:
    #   hidden = tf.nn.dropout(hidden, 0.5, seed=SEED)
    return tf.matmul(hidden, fc2_weights) + fc2_biases

コミットログを見ると継続的な開発が停止しているように見えました。Spark 2系へも対応してほしいですね。

CaffeOnSpark

github.com

CaffeOnSparkはYahooが開発しているフレームワークです。HDFS上のデータを直接利用できます。またLMDBなど既存資産を利用できるのでCaffeユーザーには馴染みやすいフレームワークになっています。Spark 2系にも対応しています。

TensorFlow on Spark

arimo.com

ARIMO社が提供する、文字通りTensorflowを拡張するフレームワークです。Spark Summitのスライドに詳細が記されています。

github.com

まとめ

SparkでDeep Learningできるフレームワークを紹介しました。Sparkで分散処理できると大規模データの活用以外にも、ハイパーパラメーターの探索時間短縮にも効果を発揮できます。一方、データマイニングエンジニアの状況を考えるとJavaやScalaを使用せずにPythonで完結するほうが親和性が高いと感じました。また、紹介したフレームワークを載せたSparkの環境構築はGPUまわりも考慮すると大変そうです。EMRなどのサービスでボタン一発で利用できるようになると、利便性が向上しさらに活発に利用されていくのではないでしょうか。