Treasure Data から大量のデータを MongoDB にインポートする話

こんにちは。今年の6月に入社した Web Developer の田中です。

Quipper ではデータの収集に Treasure Data(以下 TD) を使っています。今回、収集したデータの一部(数千万件ほど!)を MongoDB にインポートする必要があったので、その時にやった事をまとめました。

tl;dr

  • TD に数千万件あるデータを MongoDB にインポートする必要があった
  • Ruby で愚直に書いたらリソースを使いきってサーバが落ちた
  • Engineering チームと協力して、サーバ強化+別インスタンス化、コードの省メモリ化を行い、無事にインポートできた

概要

以下のような事をやりました。

  • まず、TD で必要なデータを出力する JOB をつくる
  • Rake タスクで、以下を行う
    • JOB ID を指定してデータをダウンロード
    • ダウンロードしたデータを MongoDB に突っ込む
  • つくった Rake タスクを Jenkins 上で実行する

Quipper では Web Application Framework として Ruby on Rails を使っているので、Rake タスクを作成して実行する事にし、 Ruby から TD のデータを取得するには td-client-ruby を使いました。また、検証環境や本番環境に向けた、さまざまなタスクを実行する基盤として Jenkins が使われていますので、今回もそれにのっとって Jenkins 上で実行する事になります。

td-client-ruby で TD からデータを取得する

td-client-rubyTreaureData::Client.authenticate で認証済みのクライアントオブジェクトが手に入るので、これを使ってデータをダウンロードします。TD にログインする時に使うメールアドレス・パスワードを渡す事で認証が完了します。

user = ENV['TD_USER']
pass = ENV['TD_PASSWORD']
td_client = TreasureData::Client.authenticate(user, pass)

ここでは Jenkins からの実行を考えて、認証情報を環境変数から取得しています。

また、大量のデータをダウンロードする事になると、 td-client-ruby が内部で使ってる HTTPClient がタイムアウトする可能性があります(デフォルトで60秒)。このタイムアウト値を設定するためには、 authenticate の呼び出し時に connect_timeout を渡してあげれば良いです。

td_client = TreasureData::Client.authenticate(user, pass, {connect_timeout: 180}) # 単位は秒

結果の取得には TreasureData::Client#job_result が使えます。対象 JOB ID のデータを Ruby の二次元配列として取得できます。配列中の値は、TD で出力したテーブルの型を Ruby に変換してくれてるので、そのまま扱えて便利です。

job_id = ENV['TD_JOB_ID']
td_client.job_result(job_id).each do |result|
  data_a, data_b = result

  model_a = ModelA.create!(data_a: data_a, data_b: data_b)
end

最初、上記のように愚直に実装した所、ダウンロードしてきたデータがすべてメモリに乗る事になり、Quipper の環境だとデータが100万件を超えるあたりから Jenkins サーバが CPU とメモリを使い切り、最終的には落ちてしまいました。。

Jenkins の Slave で実行する

Jenkins 上には毎日の開発に必要なジョブが他にも流れているので、今回のように1つのタスクがリソースを使い切ってしまうのは良くない状況でした。Engineering チーム(Quipper でインフラ周りを専門にやっているチーム)に相談したところ、 今回のジョブに関しては Jenkins の Slave 機能を使って別インスタンスで実行すると良いんじゃないか、との事で、そのような環境を整えてもらいました。ついでに、この新しいインスタンス自体も高性能なものにしてもらっています。

省メモリ化

引き続き、コードについても Engineering チームと相談しました。省メモリ化には、一旦全データをファイルに書き出し、そこから一行ずつ読み込んで処理していく事にしました。

データをファイルに書き出すには TreasureData::Client.job_result_format が使えます。(一度 CSV になるので型情報は失われます。)

tempfile = Tempfile.open(job_id)
job_id = ENV['TD_JOB_ID']
td_client.job_result_format(job_id, 'csv', tempfile)

CSV.foreach(tempfile) do |row|
  data_a, data_b = row
  
  # something good
end

Ruby の Tempfile と組み合わせて、良い感じに書けました。

bulk insert

処理時間も大変長くなっていたので、bulk insert する事にしました。Quipper では ORM として MongoMapper を使っているのですが(現在、Mongoid へ移行中です!)、MongoMapper 自体は bulk insert をサポートしていないので、ruby driver 経由で呼び出す事になります。

Model.collection.insert(
  [
    {foo: 1, bar: 2},
    {foo: 1, bar: 4}
  ]
)

bulk insert するコードは以下のようになりました。

executed_at = Time.zone.now
CSV.foreach(tempfile).each_slice(1000) do |rows|
  records = rows.map{ |row|
    data_a, data_b = row

    {
      data_a: data_a,
      data_b: data_b,
      created_at: executed_at,
      updated_at: executed_at
    }
  }

  ModelA.collection.insert(records)
end

1000件ずつ bulk insert するようにしています。 created_atupdated_at は忘れやすいので注意が必要です。

終わり

これで数千万件のデータを MongoDB にインポートする事ができました。参考までに、最終的なコードを貼っておきます。

user = ENV['TD_USER']
pass = ENV['TD_PASSWORD']
td_client = TreasureData::Client.authenticate(user, pass, {connect_timeout: 180})

job_id = ENV['TD_JOB_ID']
tempfile = Tempfile.open(job_id)
td_client.job_result_format(job_id, 'csv', tempfile)

executed_at = Time.zone.now
CSV.foreach(tempfile).each_slice(1000) do |rows|
  records = rows.map{ |row|
    data_a, data_b = row

    {
      data_a: data_a,
      data_b: data_b,
      created_at: executed_at,
      updated_at: executed_at
    }
  }

  ModelA.collection.insert(records)
end

このような大量のデータを相手にする機会は今まで無かったので、とても新鮮でした。ただ、各改善前後の具体的な数値を残していなかったので、そこが心残りではあります…次に機会があった時は、しっかりと残しておきたいです。

そして Engineering チームの方々には大変お世話になりました…!Engineering チームの方々がいる事で、Web Developer として安心してやりたい事に集中できるので、大変良い環境だなぁと感じています。


★Quipper日本オフィスでは仲間を募集しています。是非お気軽にご応募ください。★