この記事は MicroAd (マイクロアド) Advent Calendar 2020 - Qiita の3日目の記事です。
昨日は Kotlin大好き? wrongwrong の以下のGitHub ActionsでJava/Kotlin製ライブラリ(ビルドツールはgradle)のCI環境構築する話でした。 qiita.com
3日目の記事は、Hadoopネタです。
HDFSに貯めたデータをクラウドストレージに持っていってクラウド上で何かをしたい(BigQueryから参照させたいetc)。 また、バックアップ先としてクラウドストレージを使いたい、クラウドストレージのデータをHadoopクラスタから参照したい。。。
など、Hadoopクラスタからクラウドへ、また、その逆についてもやりたいケースがありますが、その為にローカルに落として aws s3
や gsutil
などを使って都度都度データを移動するのはとても億劫です。
そこで、今回はHadoopクラスタからHadoopクラスタ外に楽にデータを移動する事が出来るCloud Storage Connectorを紹介します。
Cloud Storage Connectorを使うとGoogle Cloud Storage(GCS)、Amazon S3、Microsoft Azure Data Lake Storage (ADLS)といったパブリッククラウドに楽にデータを移動する事が出来ます。
利用イメージは、以下のようなことが出来る様になります。
# Hadoopクラスタからデータを参照 $ hadoop fs -ls gs://gcs-connector-bucket/user/hdfs/ # GCSバケットからデータを取得 $ hadoop distcp gs://gcs-connector-bucket/user/hdfs/db/table/ hdfs://name-node:8020/user/hdfs/db/table/
前提条件
今回は以下を前提とします。
- CDH v6.3.21
- Cloud Storage Connectorは GCS のものを利用
利用するCloud Storage Connector について
今回は、GCPのCloud Storage Connectorを利用します。
試してはいませんが HDP や CDP でも利用可能だと考えます。
また、GCPのCloud Storage Connectorは、CDHがv6以降の場合はCDHに含まれていますが、v5系の場合は別途Cloud Storage ConnectorをParcelにしてクラスタに適用すればいけるかも。
では、さっそく環境構築から。
環境の用意
GCSバケットの作成
Webコンソールまたは、gsutilにてバケットを作成。
今回は、 gcs-connector-bucket
という名前で作成。
cf. ストレージ バケットの作成 | Cloud Storage | Google Cloud
サービスアカウントの作成
Webコンソール または gcloud iam service-accounts create
などでサービスアカウントを作成。
また、作成したらJSONフォーマットの秘密鍵も合わせて作成する。
今回は、 gcs-connector-admin@gcp-prj.iam.gserviceaccount.com
という名前で作成。
cf. サービス アカウントの作成と管理 | Cloud IAM のドキュメント | Google Cloud
カスタムロールの作成
Webコンソールまたは gcloud iam roles create
を使って、以下の権限を含むカスタムロールを作成します。
今回は、 gcs-connector
という名前で作成。
- storage.bucket.get
- storage.objects.create
- storage.objects.delete
- storage.objects.get
- storage.objects.getIamPolicy
- storage.objects.list
- storage.objects.setIamPolicy
- storage.objects.update
cf. カスタムロールの作成と管理 | Cloud IAM のドキュメント | Google Cloud
GCSバケットに権限の付与
GCSバケット gcs-connector-bucket
に対して、カスタムロール gcs-connector
を割り当てしたサービスアカウント gcs-connector-admin@gcp-prj.iam.gserviceaccount.com
に紐づけます。
cf. IAM 権限の使用 | Cloud Storage | Google Cloud
Cloudera Managerに設定を追加
HDFSサービスの設定 Cluster-wide Advanced Configuration Snippet (Safety Valve) for core-site.xml を以下の様に設定。
Key | Value | 備考 |
---|---|---|
fs.gs.impl | com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem | 決め打ち |
fs.AbstractFileSystem.gs.impl | com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS | 決め打ち |
fs.gs.project.id | gcp-prj | GCPのプロジェクトID |
fs.gs.auth.service.account.email | 例) gcs-connector-admin@gcp-prj.iam.gserviceaccount.com | サービスアカウントe-mailアドレス |
fs.gs.auth.service.account.enable | true | サービスアカウント使うので true |
fs.gs.auth.service.account.private.key.id | 例)55aa3575c〜〜c30cf2 | サービスアカウントのJSONにある private_key_id |
fs.gs.auth.service.account.private.key | 例) ----BEGIN PRIVATE KEY-----\nMI〜〜〜w==\n-----END PRIVATE KEY-----\n | サービスアカウントのJSONにある private_key |
fs.gs.working.dir | (作成したバケット名)/ | Trashの置き場とかの起点となるルート。 バケット指定しているので、作成したバケット名の指定が必要。デフォルトは / なので設定が必須。※ 末尾の / 忘れずに |
fs.gs.path.encoding | uri-path | ファイル名にスペースを含む場合に指定。 v1.9.11からデフォルト uri-path となり、 v2.1.0以降は変更不可になって設定値自体が削除されている。 |
fs.gs.proxy.address | proxy:8080 | (必要なら)HTTP Proxyの指定。 host:portで指定。 |
cf. Configuring Google Cloud Storage Connectivity | 6.3.x | Cloudera Documentation
GCSバケット直下にディレクトリ user を作成
Webコンソールやgsutilを使って、GCSバケット直下に userフォルダを作成して gs://gcs-connector-bucket/user
にパスを通す。
hdfsコマンドでデータを削除したときやコマンド実行時のテンポラリ領域は、 gs://bucket/user/(実行ユーザ名)/
以下を使う為、予め作成しておく必要があります。
動作確認
設定が終わったので動作確認します。
hdoopコマンド使えるか(HadoopクラスタからGCSバケットを操作できるか確認)
$ hadoop fs -mkdir gs://gcs-connector-bucket/user/yassan $ hadoop fs -ls gs://gcs-connector-bucket/user/yassan Found 1 items drwx------ - yassan yassan 0 2020-12-01 00:31 gs://gcs-connector-bucket/user/yassan
HiveからSQLでGCSにデータを入れられるか
データベース hoge_db の テーブル page_view を作成するとします。
テーブルのLocationに指定するパスをGCSバケットにフォルダを作成する
$ hadoop fs -mkdir -p gs://gcs-connector-bucket/user/yassan/hoge_db/page_view/
※Webコンソールでもgsutilでも何でも良い
テーブル作成
beeline -u jdbc:hive2://my-cdh-hiveserver2:10000 -n yassan
などでHiveにアクセスしたりもしくは、Hueを利用して以下のようなDDLを流してテーブルを作成する。
CREATE EXTERNAL TABLE hoge_db.page_view( time TIMESTAMP, url STRING, ip STRING ) PARTITIONED BY (dt STRING) STORED AS PARQUET LOCATION 'gs://gcs-connector-bucket/user/yassan/hoge_db/page_view/' ★GCSバケットを指定★ TBLPROPERTIES ("parquet.compression"="SNAPPY")
データの挿入
事前にパーティションを作成
ALTER TABLE nagatomi_yasukazu_db.page_view ADD IF NOT EXISTS PARTITION (dt='20201129');
次に、Hadoopクラスタの別のテーブルからGCSバケットがLoactionになっているテーブルにデータを挿入する
INSERT OVERWRITE TABLE hoge_db.page_view PARTITION (dt = '20201129') SELECT time ,NVL(url, 'NULL') ,NVL(ip, 'NULL') FROM huga_db.access_log WHERE dt = '20201129' ;
データがあるか確認
$ hadoop fs -ls gs://gcs-connector-bucket/user/yassan/hoge_db/page_view/dt=20201129 Found 2 items drwx------ - yassan yassan 0 2020-12-02 22:16 gs://gcs-connector-bucket/user/yassan/hoge_db/page_view/dt=20201129/.hive-staging_hive_2020-12-02_22-16-14_321_8149762901856143454-1 -rwx------ 3 yassan yassan 4136 2020-12-02 22:17 gs://gcs-connector-bucket/user/yassan/hoge_db/page_view/dt=20201129/000000_0 :
DistCp使ってみる
$ hadoop distcp \ gs://gcs-connector-bucket/user/yassan/hoge_db/page_view/dt=20201129/000000_0 \ hdfs:///user/yassan/hoge_db/page_view2/
その逆もやってみる
$ hadoop distcp \ hdfs:///user/yassan/hoge_db/page_view2/000000_1 \ gs://gcs-connector-bucket/user/yassan/hoge_db/page_view/dt=20201129/
Sparkを使ってみる(割愛)
Sparkも対象ですがここでは割愛。
professional-services/tools/cloudera-parcel-gcsconnector at master · GoogleCloudPlatform/professional-services が参考になると思います。
注意事項
hadoop fs -rm
しただけでは、 gs://bucket/user/(実行ユーザ名)/.Trash/Current/
に移動されるだけなので注意です。
補足
- CDH v6.3.2に含まれるgcs-connectorは
gcs-connector-hadoop3-1.9.10-cdh6.3.2-shaded.jar
となっていて、本家の hadoop3-1.9.10 相当になっていそう(たぶん)- jarは
/opt/cloudera/parcels/CDH/jars/gcs-connector-hadoop3-1.9.10-cdh6.3.2-shaded.jar
にありました
- jarは
- core-site.xmlに可能な設定項目とその説明・デフォルト値は ここ が参考になります
最後に
いかがだったでしょうか。今回はGCSを使ってみましたが、 Amazon S3 や Microsoft Azure Data Lake Storage (ADLS) も使えるようなのでご利用の環境に合せてお試しください。
明日は、 somasomaso-ma の 「akka-http で request-timeout が機能しなかった話」だそうです。お楽しみに!