お手軽・簡単?!Cloud Storage Connectorを使ってHadoopクラスタからGCS・S3にデータを移動する

qiita.com

この記事は MicroAd (マイクロアド) Advent Calendar 2020 - Qiita の3日目の記事です。

昨日は Kotlin大好き? wrongwrong の以下のGitHub ActionsでJava/Kotlin製ライブラリ(ビルドツールはgradle)のCI環境構築する話でした。 qiita.com

3日目の記事は、Hadoopネタです。

HDFSに貯めたデータをクラウドストレージに持っていってクラウド上で何かをしたい(BigQueryから参照させたいetc)。 また、バックアップ先としてクラウドストレージを使いたい、クラウドストレージのデータをHadoopクラスタから参照したい。。。

など、Hadoopクラスタからクラウドへ、また、その逆についてもやりたいケースがありますが、その為にローカルに落として aws s3gsutil などを使って都度都度データを移動するのはとても億劫です。

そこで、今回はHadoopクラスタからHadoopクラスタ外に楽にデータを移動する事が出来るCloud Storage Connectorを紹介します。
Cloud Storage Connectorを使うとGoogle Cloud Storage(GCS)、Amazon S3Microsoft Azure Data Lake Storage (ADLS)といったパブリッククラウドに楽にデータを移動する事が出来ます。

利用イメージは、以下のようなことが出来る様になります。

# Hadoopクラスタからデータを参照
$ hadoop fs -ls gs://gcs-connector-bucket/user/hdfs/

# GCSバケットからデータを取得
$ hadoop distcp gs://gcs-connector-bucket/user/hdfs/db/table/ hdfs://name-node:8020/user/hdfs/db/table/

前提条件

今回は以下を前提とします。

  • CDH v6.3.21
  • Cloud Storage Connectorは GCS のものを利用

利用するCloud Storage Connector について

今回は、GCPのCloud Storage Connectorを利用します。

cloud.google.com

試してはいませんが HDPCDP でも利用可能だと考えます。

また、GCPのCloud Storage Connectorは、CDHがv6以降の場合はCDHに含まれていますが、v5系の場合は別途Cloud Storage ConnectorをParcelにしてクラスタに適用すればいけるかも。

github.com

では、さっそく環境構築から。

環境の用意

GCSバケットの作成

Webコンソールまたは、gsutilにてバケットを作成。 今回は、 gcs-connector-bucket という名前で作成。

cf. ストレージ バケットの作成 | Cloud Storage | Google Cloud

サービスアカウントの作成

Webコンソール または gcloud iam service-accounts create などでサービスアカウントを作成。 また、作成したらJSONフォーマットの秘密鍵も合わせて作成する。 今回は、 gcs-connector-admin@gcp-prj.iam.gserviceaccount.com という名前で作成。

cf. サービス アカウントの作成と管理 | Cloud IAM のドキュメント | Google Cloud

カスタムロールの作成

Webコンソールまたは gcloud iam roles create を使って、以下の権限を含むカスタムロールを作成します。 今回は、 gcs-connector という名前で作成。

  • storage.bucket.get
  • storage.objects.create
  • storage.objects.delete
  • storage.objects.get
  • storage.objects.getIamPolicy
  • storage.objects.list
  • storage.objects.setIamPolicy
  • storage.objects.update

cf. カスタムロールの作成と管理 | Cloud IAM のドキュメント | Google Cloud

GCSバケットに権限の付与

GCSバケット gcs-connector-bucket に対して、カスタムロール gcs-connector を割り当てしたサービスアカウント gcs-connector-admin@gcp-prj.iam.gserviceaccount.com に紐づけます。

cf. IAM 権限の使用 | Cloud Storage | Google Cloud

Cloudera Managerに設定を追加

HDFSサービスの設定 Cluster-wide Advanced Configuration Snippet (Safety Valve) for core-site.xml を以下の様に設定。

Key Value 備考
fs.gs.impl com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem 決め打ち
fs.AbstractFileSystem.gs.impl com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS 決め打ち
fs.gs.project.id gcp-prj GCPのプロジェクトID
fs.gs.auth.service.account.email 例) gcs-connector-admin@gcp-prj.iam.gserviceaccount.com サービスアカウントe-mailアドレス
fs.gs.auth.service.account.enable true サービスアカウント使うので true
fs.gs.auth.service.account.private.key.id 例)55aa3575c〜〜c30cf2 サービスアカウントのJSONにある private_key_id
fs.gs.auth.service.account.private.key 例) ----BEGIN PRIVATE KEY-----\nMI〜〜〜w==\n-----END PRIVATE KEY-----\n サービスアカウントのJSONにある private_key
fs.gs.working.dir (作成したバケット名)/ Trashの置き場とかの起点となるルート。 バケット指定しているので、作成したバケット名の指定が必要。デフォルトは / なので設定が必須。※ 末尾の / 忘れずに
fs.gs.path.encoding uri-path ファイル名にスペースを含む場合に指定。 v1.9.11からデフォルト uri-path となり、 v2.1.0以降は変更不可になって設定値自体が削除されている。
fs.gs.proxy.address proxy:8080 (必要なら)HTTP Proxyの指定。 host:portで指定。

cf. Configuring Google Cloud Storage Connectivity | 6.3.x | Cloudera Documentation

GCSバケット直下にディレクトリ user を作成

Webコンソールやgsutilを使って、GCSバケット直下に userフォルダを作成して gs://gcs-connector-bucket/user にパスを通す。 hdfsコマンドでデータを削除したときやコマンド実行時のテンポラリ領域は、 gs://bucket/user/(実行ユーザ名)/ 以下を使う為、予め作成しておく必要があります。

動作確認

設定が終わったので動作確認します。

hdoopコマンド使えるか(HadoopクラスタからGCSバケットを操作できるか確認)

$ hadoop fs -mkdir gs://gcs-connector-bucket/user/yassan
$ hadoop fs -ls gs://gcs-connector-bucket/user/yassan
Found 1 items
drwx------   - yassan yassan          0 2020-12-01 00:31 gs://gcs-connector-bucket/user/yassan

HiveからSQLでGCSにデータを入れられるか

データベース hoge_db の テーブル page_view を作成するとします。

テーブルのLocationに指定するパスをGCSバケットにフォルダを作成する

$ hadoop fs -mkdir -p gs://gcs-connector-bucket/user/yassan/hoge_db/page_view/

※Webコンソールでもgsutilでも何でも良い

テーブル作成

beeline -u jdbc:hive2://my-cdh-hiveserver2:10000 -n yassan などでHiveにアクセスしたりもしくは、Hueを利用して以下のようなDDLを流してテーブルを作成する。

CREATE EXTERNAL TABLE hoge_db.page_view(
  time TIMESTAMP,
  url  STRING,
  ip   STRING
)
PARTITIONED BY (dt STRING)
STORED AS PARQUET
LOCATION 'gs://gcs-connector-bucket/user/yassan/hoge_db/page_view/'   ★GCSバケットを指定★
TBLPROPERTIES ("parquet.compression"="SNAPPY")

データの挿入

事前にパーティションを作成

ALTER TABLE nagatomi_yasukazu_db.page_view ADD IF NOT EXISTS PARTITION (dt='20201129');

次に、Hadoopクラスタの別のテーブルからGCSバケットがLoactionになっているテーブルにデータを挿入する

INSERT OVERWRITE TABLE hoge_db.page_view
PARTITION (dt = '20201129')
SELECT
  time
  ,NVL(url, 'NULL')
  ,NVL(ip, 'NULL')
FROM huga_db.access_log
WHERE dt = '20201129'
;

データがあるか確認

$ hadoop fs -ls gs://gcs-connector-bucket/user/yassan/hoge_db/page_view/dt=20201129
Found 2 items
drwx------   - yassan yassan          0 2020-12-02 22:16 gs://gcs-connector-bucket/user/yassan/hoge_db/page_view/dt=20201129/.hive-staging_hive_2020-12-02_22-16-14_321_8149762901856143454-1
-rwx------   3 yassan yassan       4136 2020-12-02 22:17 gs://gcs-connector-bucket/user/yassan/hoge_db/page_view/dt=20201129/000000_0
:

DistCp使ってみる

GCSバケットからHadoopクラスタにデータを取ってくる

$ hadoop distcp \
   gs://gcs-connector-bucket/user/yassan/hoge_db/page_view/dt=20201129/000000_0 \
   hdfs:///user/yassan/hoge_db/page_view2/

その逆もやってみる

$ hadoop distcp \
   hdfs:///user/yassan/hoge_db/page_view2/000000_1 \
   gs://gcs-connector-bucket/user/yassan/hoge_db/page_view/dt=20201129/

Sparkを使ってみる(割愛)

Sparkも対象ですがここでは割愛。

professional-services/tools/cloudera-parcel-gcsconnector at master · GoogleCloudPlatform/professional-services が参考になると思います。

注意事項

hadoop fs -rm しただけでは、 gs://bucket/user/(実行ユーザ名)/.Trash/Current/ に移動されるだけなので注意です。

補足

  • CDH v6.3.2に含まれるgcs-connectorは gcs-connector-hadoop3-1.9.10-cdh6.3.2-shaded.jarとなっていて、本家の hadoop3-1.9.10 相当になっていそう(たぶん)
    • jarは /opt/cloudera/parcels/CDH/jars/gcs-connector-hadoop3-1.9.10-cdh6.3.2-shaded.jar にありました
  • core-site.xmlに可能な設定項目とその説明・デフォルト値は ここ が参考になります

最後に

いかがだったでしょうか。今回はGCSを使ってみましたが、 Amazon S3Microsoft Azure Data Lake Storage (ADLS) も使えるようなのでご利用の環境に合せてお試しください。

明日は、 somasomaso-ma の 「akka-http で request-timeout が機能しなかった話」だそうです。お楽しみに!

(´-`).。oO(20日目の記事は何書こうかな…。Hadoop✕コンテナ?)