Spark Operator特集・2日目 ハンズオン編:kubeflow/spark-operatorでSparkアプリをK8sにデプロイする

この記事は Distributed Computing Advent Calendar 2025 の8日目の記事です。

qiita.com

今回は、 Spark Operator特集・1日目「まずはSpark on K8sのおさらい」 - やっさんメモ の続きです。

  1. Spark on Kubernetes を理解する(理解編) ※前回
  2. Spark Kubernetes Operatorを動かしてみる(実践編) ※今回の記事

始める前に、想定読者と前提は以下の通りです。

ハンズオンでは、最終的に以下が出来る事を目指します。

  1. k3dでマルチノードのSpark用Kubernetesクラスタを立てる
  2. Spark OperatorをHelmで入れる
  3. 独自のSparkアプリをPySparkとしてKubernetes上で実行する

今回の実践編は、以下の目次の流れで進めていきます。

説明不要な人は「ハンズオン向けの簡単な設定でインストール」から見てください。

Spark の Kubernetes Operator のおさらい

前回はSparkアプリケーションの特徴を説明し、Kubernetes上にSparkアプリケーションをデプロイする方法について説明してきました。 改めてなぜSparkをKubernetes上で、しかも、Operatorを使うのかについてはざっくり以下の通り。

Spark on K8s視点での利点

  1. Spark独自に運用スタックを従来のYARNベースで用意するよりもKubernetesエコシステムに乗っかる方が楽
  2. SparkをKubernetesで稼働させる事で、同じマニフェストがあれば、どんなKubernetesクラスタでも実行が可能
  3. Sparkのワークロードまたは用途に応じてクラスタを分けずに、同一クラスタ上に展開が可能(極端な例としてはバッチとストリームのワークロードを混在したクラスタにも出来る)
  4. 同一クラスタ上で複数のSparkバージョンを混在出来る

SparkのKubernetes Operator視点での利点

  1. マニフェスト+kubectlが出来ればSparkアプリをSubmitできる!(spark-submitでながーいコマンドを打たなくてもOK)
  2. Kubernetes Operator1により、CRD と Contorllerを用いることでSparkアプリのライフサイクル管理をKubernetesに任せられる

SparkのKubernetes Operatorのこれまで(歴史の話)

SparkのKubernetes Operator(以降、Spark Operator)の歴史は割と長く、KubeCon and CloudNativeCon Europe 2018Google Cloudのキーノートで紹介され、

youtu.be

そして、同年の KubeCon + CloudNativeCon China 2018に「Apache Spark on Kubernetes: A Technical Deep Dive」として紹介されています2

www.youtube.com

2018年なので、Kubernetes v1.11~1.13辺り、Spark v2.4 あたりなのでだいぶ古いですね。

また、見ていただいたら分かるように、Spark Operatorの始まりは、 KubeflowでもSpark公式でもなく、Google Cloudからでした。また、リポジトリhttps://github.com/GoogleCloudPlatform/spark-on-k8s-operator となっていました。

リンクから辿ってもらうと分かるように、現在は、KubeflowのSpark Operatorへリダイレクトされていることからも分かる様に後継がKubeflowのSpark Operatorとなっています。

2020~2022年頃から段々とGoogle Cloud側で開発が停滞した結果、

ドイツのデータプラットフォームを提供する Stackable から、以下のSpark Operatorが出てきたりしました。

https://github.com/stackabletech/spark-k8s-operator

その後、Google Cloud側から、以下のように、SparkコミュニティとKubeflow側にリポジトリを引き継がないかと打診がありました(詳しい背景はリンクを参照)。

このSpark OperatorがGolangで出来ていることもあり、SparkコミュニティとしてはJavaで作る事を選択したので、Kubeflow側が引き継ぐことになり、今のリポジトリとなりました。

https://github.com/kubeflow/spark-operator

blog.kubeflow.org

では、歴史はこの辺にして、2大 Spark Operatorの特徴を紹介していきます。

kubeflow/spark-operator(元祖Spark Operator)

まずは元祖Spark Operatorから。
最新バージョンは v2.4.0 で、APIバージョンは v1beta2となっています。ベータですが長い事同じままです。

KubeflowがCNCFのプロジェクトであることも有り、定期的にコミュニティミーティングを開催しています。

最新の開発動向をウォッチしたい場合は参加するかアーカイブをウォッチすると良いです。過去回含めて以下の通り、アーカイブが残っています。

spark-operator/ROADMAP.md at master · kubeflow/spark-operator を見てもらうと分かりますが、引き継いだ当初は現行の改善が主だった内容でしたが、v2.0.0 にて、controller-runtime を使用してSparkオペレータを大規模にリファクタリングしています(#2072)。

さらに今年に入ってからは、Spark v4.0系のサポート (#2564)や CR(Custom Resource)でのSpark Connect (#2569)をサポートしたりと、活発に開発されています。

他にも、Spark History Server MCP がKubeflowに加わっています。

github.com

github.com

公式ドキュメントはこちら:
Kubeflow Spark Operator | Kubeflow

なんとSpark Operator専用のロゴまである3

artwork/examples/incubating.md at main · cncf/artwork · GitHub

本番での利用を検討している方は今年のKubeCon + CloudNativeCon North America 2025 での以下のセッションが導入の際の参考になります。

Spark on Kubernetes, a Practical Guide - Damon Cortesi, Airbnb

kccncna2025.sched.com

中身の話に入っていくと、CRは以下

出来る事は、沢山あって紹介しきれないので Writing a SparkApplication | Kubeflow を読んでください。

内部的な情報を見るにはDeepWikiが参考になりました。

deepwiki.com

kubeflow/spark-operatorを利用する際の大事なポイント

次に、導入するうえで大事な点をいくつか紹介します。

Spark Operator専用のCLI sparkctl がなくなっている(過去から使っている人向け)

Google Cloud時代にあったCLI sparkctl はサポートを止めてDropされたので注意です(便利だったんだがなぁ)。

github.com

github.com

Sparkアプリの再起動を設定を入れる

.spec.restartPolicy を指定して、自動再起動するよう定義する。

  restartPolicy:
     type: OnFailure
     onFailureRetries: 3
     onFailureRetryInterval: 10
     onSubmissionFailureRetries: 5
     onSubmissionFailureRetryInterval: 20

詳細はこの辺に説明があるのでそこを参照ください。

実行が終わったSparkApplicationを自動削除する設定を入れる

.spec.timeToLiveSeconds を指定して実行が終わったSparkApplicationのDriverポッドを一定時間過ぎた後に削除するようにする。

Sparkアプリの実行が終わったらCPU/メモリは消費しないもののDriverポッドのログは残り続けるので自動で削除するように設定しておく。

spec:
  timeToLiveSeconds: 3600

S3を利用する場合の導入先に注意

AWS S3 または S3互換のオブジェクトストレージを使いたい場合は、Hadoop AWS Moduleを入れる必要があります。また、その場合、プロトコルの指定は s3a となります。

利用ケース1: SparkのHistory Serverが参照するストレージとして利用したい場合

Sparkアプリケーションの実行中は、Spark Web UIが利用できるので、kubectl port-forward を使ってDriverポッドに接続すると良いです5

実行が終わったSparkアプリのSparkイベントログをs3(s3a)などに入れたい場合、別途起動しているSpark History Server6が参照するパスをあわせた上で、書き込み側(アプリ側)と読み込み側(Spark History Server側)の両方のコンテナイメージにHadoop AWS Moduleを入れる必要があります。

コンテナイメージにHadoop AWS ModuleのJARを入れる場合は以下のアドカレ2日目の記事を参考にしてください。

qiita.com

History ServerでS3互換ストレージを使う場合のSparkApplicationの定義の一部が以下。S3互換ストレージとして、さくらのオブジェクトストレージを利用しています。

spec:
  # ---- Spark 設定 (spark-submit の --conf 相当) ----
  sparkConf:
    # History ServerのApp Nameで使う
    "spark.app.name": "my-spark-apps"
    # UI / ログ
    "spark.ui.port": "18080"
    "spark.logConf": "true"
    "spark.sql.session.timeZone": "Asia/Tokyo"

    # History Server 用イベントログ
    "spark.eventLog.enabled": "true"
    "spark.eventLog.dir": "s3a://nagatomi-test/spark-events"

    # イベントログのサイズ・頻度調整
    "spark.eventLog.compression.codec": "zstd"
    "spark.eventLog.rolling.enabled": "true"
    "spark.eventLog.rolling.maxFileSize": "128m"
    "spark.eventLog.buffer.kb": "64k"

    # Executor メトリクスをイベントログに書き出す
    "spark.eventLog.logStageExecutorMetrics": "true"

    # さらに細かくやりたければ(任意)
    "spark.executor.metrics.pollingInterval": "100"   # heartbeat 間隔 (ms)(デフォルト 0)
    "spark.executor.processTreeMetrics.enabled": "true"

  # ---- Hadoop (S3A) 設定 ----
  hadoopConf:
    "fs.s3a.endpoint": "s3.isk01.sakurastorage.jp"
    "fs.s3a.endpoint.region": "jp-north-1"
    "fs.s3a.path.style.access": "true"
    "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem"
    "fs.s3a.aws.credentials.provider": "software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider"
    "fs.s3a.connection.ssl.enabled": "true"

利用ケース2: Sparkアプリの参照先して利用したい場合

.spec.mainApplicationFile には、イメージ内のパスを利用するのが一般的ですが、開発中に試行錯誤したい場合に都度、イメージをビルドするのがダルいのでSparkの外から参照したいケースもあります。

その場合は、利用ケース1と同様にSparkApplicationのコンテナイメージにHadoop AWS Moduleを入れる必要があります。

利用ケース3: SparkのPySparkアプリの依存をマニフェスト側からリモートで指定したい場合

spark-submitの --py-files に相当する .spec.deps.pyFiles にて、PySparkの依存を動的に入れる際に、S3(s3a)を使いたい場合は、Spark Operatorのイメージ側Hadoop AWS Moduleを入れる必要があります。

Hadoop AWS Moduleを入れるには「利用ケース1」を参考にしても良いですが、以下も参考になります。

www.kubeflow.org

.spec.sparkConf.spec.sparkConfigMap は同居出来ないので注意

現状のバージョンでは、 .spec.sparkConf.spec.sparkConfigMap は同居出来そうだけど、同居出来ないので注意が必要です。

Sparkの設定は、 .spec.sparkConfigMap でSparkの設定周りのファイル群 (e.g. spark-defaults.conf, spark-env.sh, log4j.properties)をConfigMapに入れて利用出来るので便利です。

一方で、spark-submitの --conf に相当する .spec.sparkConf で指定して、Sparkアプリのプロパティを設定することも可能です。

.spec.sparkConf の場合、Spark Operator側で集約してファイルにまとめたConfigMapを作成し、そのDriverポッドがそのConfigMapをマウントして起動します。Spark Operatorのログを追うと分かるのですが、Driverポッドの起動時には、 --properties-file 使ってspark-submitするようにしてDriverポッドが起動します。

その為、.spec.sparkConfigMap で指定したspark-defaults.confの設定がすべて無視されます。。

ただ、Spark v4 で --load-spark-defaults が導入されたので、これをSpark Operator側が利用する様に実装が変更になれば、共存が可能になりそうなので今後に期待(参考)。

kubeflow/spark-operator を使ってSparkアプリを起動する

次にkubeflow/spark-operatorを使ってSparkアプリを起動してみます。

手元で起動するKubernetesクラスタをk3dで用意する

手元で利用するKubernetesクラスタはk3dを使います7

yassan.hatenablog.jp

必要な人は、SparkやSpark Operatorのイメージをビルド出来るようにローカルのコンテナレジストリを先に起動しておく。

k3d cluster create --registry-use k3d-my-registry.localhost:5000

今回用のKubernetesクラスタの設定を以下のようにして用意する。serverは1ノード、Workerにあたるagentが3ノードの構成で、ブラウザからhttp://localhost:18080History Serverにアクセス出来るようにしてます。

# k3d-config-s-op-kubeflow.yaml
apiVersion: k3d.io/v1alpha5
kind: Simple
metadata:
  name: s-op-kubeflow
servers: 1
agents: 3

# k3dのLBで 18080/tcp をクラスタ内 30080 に割り当て(History Server用)
ports:
  - port: 18080:30080
    nodeFilters:
      - loadbalancer

# ローカルレジストリは事前に作成した k3d-my-registry.localhost:5000 を使う
registries:
  use:
    - k3d-my-registry.localhost:5000

options:
  k3d:
    wait: true
  kubeconfig:
    updateDefaultKubeconfig: true
    switchCurrentContext: true

上記のYAMLを指定して起動。

k3d cluster create --config ./k3d-config-s-op-kubeflow.yaml

あとは待っていれば起動します。 kubectl get node して確認してみてください。

Spark Operatorのインストール

本番を想定した場合の大事なポイント

Sparkアプリを複数のNamespaceにデプロイ出来るようにするには以下のようにリストで複数していすると良いです。Namespace defaultではなく、利用しているKubernetesの環境に合わせて指定してください。また、Spark Operatorはインストール時に、これらのNamespaceとは分けてインストールして個別にリソースを確保しやすいようにしておきます。

spark:
  jobNamespaces:
  - apps

本番を想定するなら、Spark Operatorのコントローラーに割り当てるリソースの上限と下限の設定は必ず実施。

controller:
  resources:
    limits:
      cpu: 100m
      memory: 300Mi
    requests:
      cpu: 100m
      memory: 300Mi

次に、Spark Operatorの可用性の考慮も重要です。

レプリカ数はsplit-brainにならないように必ず奇数にしてください。

controller:
  # -- Number of replicas of controller.
  replicas: 3

  leaderElection:
    # -- Specifies whether to enable leader election for controller.
    enable: true

webhook:
  # -- Number of replicas of webhook server.
  replicas: 3

  leaderElection:
    # -- Specifies whether to enable leader election for webhook.
    enable: true

その他、Spark Operatorポッドの散らし方について、Pod Topology Spread Constraints を考慮することもオススメ8。可用性を考慮してレプリカ数を3にしても同じノードに配置されたら意味がないので、、

# デフォルト設定にあったサンプルより
controller:
  topologySpreadConstraints:
  - maxSkew: 1
    topologyKey: topology.kubernetes.io/zone
    whenUnsatisfiable: ScheduleAnyway
  - maxSkew: 1
    topologyKey: kubernetes.io/hostname
    whenUnsatisfiable: DoNotSchedule

また、PDB(PodDisruptionBudget) が考慮が可能になっているので、これも有効にすることをオススメ9。以下の例では最低1ポッドは利用可能な状態を保ちながらノードをシャットダウンできます(Kubernetesアップデートでローリングアップデートするときとかに効いてくる)。

  podDisruptionBudget:
    # -- Specifies whether to create pod disruption budget for controller.
    enable: true
    # -- The number of pods that must be available.
    # Require `controller.replicas` to be greater than 1
    minAvailable: 1

Spark Operatorのモニタリングについては以下のようにPrometheusでスクレイプ出来るようになっています。 podMonitorにも対応しているので、Prometheus Operatorをインストールしていれば楽にメトリクスが収集出来ます。

prometheus:
  metrics:
    # -- Specifies whether to enable prometheus metrics scraping.
    enable: true

  # Prometheus pod monitor for controller pods
  podMonitor:
    # -- Specifies whether to create pod monitor.
    # Note that prometheus metrics should be enabled as well.
    create: true

以下は、Sparkアプリの場合の例です。Sparkアプリも合わせて対応するしてください。

www.kubeflow.org

その他にも細かいところまで設定できるようになっているので、自身の環境に合わせて、以下のSpark OperatorのHelmチャートのvaluesを一通り読むことをオススメします。

https://github.com/kubeflow/spark-operator/blob/v2.4.0/charts/spark-operator-chart/values.yaml

ハンズオン向けの簡単な設定でインストール

今回は、Spark Operatorは、Operatorは、Namespace spark-operator、Sparkアプリは Namespace apps として分けるようにしてインストールします。

helm repo add --force-update spark-operator https://kubeflow.github.io/spark-operator

helm upgrade -i spark-operator spark-operator/spark-operator \
  --namespace spark-operator --create-namespace \
  --set "spark.jobNamespaces={apps}" \
  --wait

今回はCLI10 でインストールしてますが、本番を想定した場合は、再現性を考慮し、helm show values spark-operator/spark-operator --version 2.4.0values.yaml を作成して編集し、-f values.yaml を使う方が良いです。

Sparkアプリの起動

SparkApplicationの記述例は 公式のサンプル を見ると良いです。

今回は以下の様にしました。

spark-pi-sample.yaml (クリックで展開)

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: apps
spec:
  type: Scala
  mode: cluster
  image: k3d-my-registry.localhost:5000/spark:4.0.1-aws
  imagePullPolicy: IfNotPresent
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples.jar
  sparkVersion: 4.0.1

  dynamicAllocation:
    enabled: true
    initialExecutors: 1
    maxExecutors: 3
    minExecutors: 1

  # ---- Spark 設定 (spark-submit の --conf 相当) ----
  sparkConf:
    # History ServerのApp Nameで使う
    "spark.app.name": "spark-pi"
    # UI / ログ
    "spark.ui.port": "18080"
    "spark.logConf": "true"
    "spark.sql.session.timeZone": "Asia/Tokyo"

    # History Server 用イベントログ
    "spark.eventLog.enabled": "true"
    "spark.eventLog.dir": "s3a://nagatomi-test/spark-events"

    # イベントログのサイズ・頻度調整
    "spark.eventLog.compression.codec": "zstd"
    "spark.eventLog.rolling.enabled": "true"
    "spark.eventLog.rolling.maxFileSize": "128m"
    "spark.eventLog.buffer.kb": "64k"  # デフォルト 100k から少し下げて flush を細かく

    # Executor メトリクスをイベントログに書き出す
    "spark.eventLog.logStageExecutorMetrics": "true"

    # さらに細かくやりたければ(任意)
    "spark.executor.metrics.pollingInterval": "100"   # heartbeat 間隔 (ms)(デフォルト 0)
    "spark.executor.processTreeMetrics.enabled": "true"

  # ---- Hadoop (S3A) 設定 ----
  hadoopConf:
    "fs.s3a.endpoint": "s3.isk01.sakurastorage.jp"
    "fs.s3a.endpoint.region": "jp-north-1"
    "fs.s3a.path.style.access": "true"
    "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem"
    "fs.s3a.aws.credentials.provider": "software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider"
    "fs.s3a.connection.ssl.enabled": "true"

  driver:
    cores: 1
    memory: 512m
    serviceAccount: spark-operator-spark
    envFrom:
      - secretRef:
          name: sakura-s3

    securityContext:
      capabilities:
        drop:
        - ALL
      runAsGroup: 185
      runAsUser: 185
      runAsNonRoot: true
      allowPrivilegeEscalation: false
      seccompProfile:
        type: RuntimeDefault
  executor:
    instances: 1
    cores: 1
    memory: 512m
    envFrom:
      - secretRef:
          name: sakura-s3
    securityContext:
      capabilities:
        drop:
        - ALL
      runAsGroup: 185
      runAsUser: 185
      runAsNonRoot: true
      allowPrivilegeEscalation: false
      seccompProfile:
        type: RuntimeDefault

補足:

  • k3d-my-registry.localhost:5000/spark:4.0.1-awsdocker.io/library/spark:4.0.1Hadoop AWS Moduleを コンテナイメージの ${SPARK_HOME}/jars/ 以下にいれて、k3dで用意したコンテナレジストリにあらかじめPushしたもの。

  • driver・executorの envFrom に指定している secretRefsakura-s3 は、環境変数 AWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEY , AWS_REGION , AWS_DEFAULT_REGION が入ったSecrets

  • spec.driver|executor.securityContextを使ってポッドは非特権ユーザーで動かす例としています

あとは、これをapplyするだけ。

kubectl -n apps apply -f spark-pi-sample.yaml

(小ネタ)以下の様にしてSparkアプリの状況を確認出来ます。

# SparkApplicationの状態
kubectl -n apps get sparkapplications

# SparkApplicationの詳細
kubectl -n apps describe sparkapplications

# SparkApplicationの実行ログ
kubectl -n apps logs spark-pi-driver

PySparkアプリの起動

次にPySparkアプリを実行してみます。

以下の様にマニフェストを作成します。

このマニフェスでは、PySparkで実行するスクリプトは、さくらのオブジェクトストレージ上の s3://nagatomi-test/deps/pg_read.py を利用します。mainApplicationFileにはlocalも利用できますが、SparkコンテナイメージにHadoop AWS Moduleを入れておくことで、s3aプロトコルでS3互換ストレージを参照出来るようになります。

spark-py-pg-read.yaml (クリックで展開)

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-py-pg-read
  namespace: apps
spec:
  type: Python
  pythonVersion: "3"
  mode: cluster
  image: k3d-my-registry.localhost:5000/spark:4.0.1-aws
  imagePullPolicy: IfNotPresent
  # local:// でも良いがS3バケットも指定が出来る
  mainApplicationFile: s3a://nagatomi-test/deps/pg_read.py
  sparkVersion: 4.0.1
  deps:
    packages:
      - org.postgresql:postgresql:42.7.8

  volumes:
    - name: ivy-cache
      emptyDir:
        sizeLimit: 1Gi

  dynamicAllocation:
    enabled: true
    initialExecutors: 1
    maxExecutors: 3
    minExecutors: 1

  sparkConf:
    spark.ui.port: "18080"
    spark.logConf: "true"
    spark.sql.session.timeZone: Asia/Tokyo
    spark.jars.ivy: /tmp
    # JHSへのログ転送
    spark.eventLog.enabled: "true"
    spark.eventLog.dir: s3a://nagatomi-test/spark-events

    # spark-pi-sample.yaml の様に
    # hadoopConf を使わなくても以下の様に記述も出来ます
    spark.hadoop.fs.s3a.endpoint: s3.isk01.sakurastorage.jp
    spark.hadoop.fs.s3a.endpoint.region: jp-north-1
    spark.hadoop.fs.s3a.path.style.access: "true"
    spark.hadoop.fs.s3a.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
    spark.hadoop.fs.s3a.aws.credentials.provider: software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider
    spark.hadoop.fs.s3a.connection.ssl.enabled: "true"

  driver:
    cores: 1
    memory: 512m
    serviceAccount: spark-operator-spark
    envFrom:
      - secretRef:
          name: sakura-s3
      - secretRef:
          name: pg-conf

    volumeMounts:
      - name: ivy-cache
        mountPath: /tmp
        
    securityContext:
      capabilities:
        drop:
        - ALL
      runAsGroup: 185
      runAsUser: 185
      runAsNonRoot: true
      allowPrivilegeEscalation: false
      seccompProfile:
        type: RuntimeDefault
  executor:
    instances: 1
    cores: 1
    memory: 512m
    envFrom:
      - secretRef:
          name: sakura-s3
      - secretRef:
          name: pg-conf
    securityContext:
      capabilities:
        drop:
        - ALL
      runAsGroup: 185
      runAsUser: 185
      runAsNonRoot: true
      allowPrivilegeEscalation: false
      seccompProfile:
        type: RuntimeDefault

この場合、事前に s3://nagatomi-test/deps/pg_read.py に以下のコードをアップしています。

pg_read.py (クリックで展開)

import os
from pyspark.sql import SparkSession

# 環境変数から接続先を取得
host = os.getenv("PGHOST", "127.0.0.1")
port = os.getenv("PGPORT", "5432")
db   = os.getenv("PGDATABASE", "postgres")
user = os.getenv("PG_USER", "postgres")
pw   = os.getenv("PG_PASSWORD", "")

url = f"jdbc:postgresql://{host}:{port}/{db}"

props = {
    "user": user,
    "password": pw,
    "driver": "org.postgresql.Driver",
    "fetchsize": "1000",
}

spark = SparkSession.builder.appName("pg_jdbc_read").getOrCreate()

table_or_query = "(SELECT * FROM public.trips_yellow ORDER BY 1 LIMIT 10000) t"

df = spark.read.jdbc(url=url, table=table_or_query, properties=props)
df.show(5, truncate=False)

spark.stop()

本番を想定するなら、Sparkアプリのコンテナイメージに実行するPythonスクリプトを入れるのが良いです。同様に、pg_read.py ではPostgreSQLのDBを参照するので追加で、deps.packages を使って、PostgreSQLのJDBC Driver を入れることで実行時にMavenからダウンロードしていますが、本番を想定するなら事前にSparkコンテナイメージに入れておく方が良いです。

準備が出来たら、applyするだけ。

kubectl -n apps apply -f spark-py-pg-read.yaml

以下の様に実行されました。

$ kubectl get po -A --output-watch-events --watch -o wide
EVENT      NAMESPACE        NAME                                         READY   STATUS      RESTARTS      AGE     IP           NODE                         NOMINATED NODE   READINESS GATES
:
ADDED      apps             spark-py-pg-read-driver                      0/1     Pending     0             0s      <none>       <none>                       <none>           <none>
MODIFIED   apps             spark-py-pg-read-driver                      0/1     Pending     0             0s      <none>       k3d-s-op-kubeflow-server-0   <none>           <none>
MODIFIED   apps             spark-py-pg-read-driver                      0/1     ContainerCreating   0             0s      <none>       k3d-s-op-kubeflow-server-0   <none>           <none>
MODIFIED   apps             spark-py-pg-read-driver                      1/1     Running             0             1s      10.42.2.53   k3d-s-op-kubeflow-server-0   <none>           <none>
ADDED      apps             pg-jdbc-read-ae35179b1676cbae-exec-1         0/1     Pending             0             0s      <none>       <none>                       <none>           <none>
MODIFIED   apps             pg-jdbc-read-ae35179b1676cbae-exec-1         0/1     Pending             0             0s      <none>       k3d-s-op-kubeflow-agent-2    <none>           <none>
MODIFIED   apps             pg-jdbc-read-ae35179b1676cbae-exec-1         0/1     ContainerCreating   0             0s      <none>       k3d-s-op-kubeflow-agent-2    <none>           <none>
MODIFIED   apps             pg-jdbc-read-ae35179b1676cbae-exec-1         1/1     Running             0             2s      10.42.3.31   k3d-s-op-kubeflow-agent-2    <none>           <none>
MODIFIED   apps             pg-jdbc-read-ae35179b1676cbae-exec-1         1/1     Terminating         0             22s     10.42.3.31   k3d-s-op-kubeflow-agent-2    <none>           <none>
MODIFIED   apps             pg-jdbc-read-ae35179b1676cbae-exec-1         0/1     Completed           0             23s     10.42.3.31   k3d-s-op-kubeflow-agent-2    <none>           <none>
MODIFIED   apps             pg-jdbc-read-ae35179b1676cbae-exec-1         0/1     Completed           0             23s     10.42.3.31   k3d-s-op-kubeflow-agent-2    <none>           <none>
DELETED    apps             pg-jdbc-read-ae35179b1676cbae-exec-1         0/1     Completed           0             23s     10.42.3.31   k3d-s-op-kubeflow-agent-2    <none>           <none>
MODIFIED   apps             spark-py-pg-read-driver                      0/1     Completed           0             40s     10.42.2.53   k3d-s-op-kubeflow-server-0   <none>           <none>
MODIFIED   apps             spark-py-pg-read-driver                      0/1     Completed           0             41s     10.42.2.53   k3d-s-op-kubeflow-server-0   <none>           <none>

▼ Sparkアプリの実行ログ (クリックで展開)

アプリの実行前に Ivy でJDBC Driverをダウンロードしているのが分かります。

WARNING: Using incubator modules: jdk.incubator.vector
:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /tmp/cache
The jars for the packages stored in: /tmp/jars
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-a4d2b39e-89cc-4dea-b717-fcd227a1d921;1.0
    confs: [default]
    found org.postgresql#postgresql;42.7.8 in central
    found org.checkerframework#checker-qual;3.49.5 in central
downloading https://repo1.maven.org/maven2/org/postgresql/postgresql/42.7.8/postgresql-42.7.8.jar ...
    [SUCCESSFUL ] org.postgresql#postgresql;42.7.8!postgresql.jar (257ms)
downloading https://repo1.maven.org/maven2/org/checkerframework/checker-qual/3.49.5/checker-qual-3.49.5.jar ...
    [SUCCESSFUL ] org.checkerframework#checker-qual;3.49.5!checker-qual.jar (91ms)
:: resolution report :: resolve 1246ms :: artifacts dl 365ms
    :: modules in use:
    org.checkerframework#checker-qual;3.49.5 from central in [default]
    org.postgresql#postgresql;42.7.8 from central in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   2   |   2   |   2   |   0   ||   2   |   2   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-a4d2b39e-89cc-4dea-b717-fcd227a1d921
    confs: [default]
    2 artifacts copied, 0 already retrieved (1323kB/20ms)
25/12/13 06:47:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/12/13 06:47:17 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Files file:///tmp/jars/org.postgresql_postgresql-42.7.8.jar from /tmp/jars/org.postgresql_postgresql-42.7.8.jar to /opt/spark/work-dir/org.postgresql_postgresql-42.7.8.jar
Files file:///tmp/jars/org.checkerframework_checker-qual-3.49.5.jar from /tmp/jars/org.checkerframework_checker-qual-3.49.5.jar to /opt/spark/work-dir/org.checkerframework_checker-qual-3.49.5.jar
Files file:///tmp/jars/org.postgresql_postgresql-42.7.8.jar from /tmp/jars/org.postgresql_postgresql-42.7.8.jar to /opt/spark/work-dir/org.postgresql_postgresql-42.7.8.jar
Files file:///tmp/jars/org.checkerframework_checker-qual-3.49.5.jar from /tmp/jars/org.checkerframework_checker-qual-3.49.5.jar to /opt/spark/work-dir/org.checkerframework_checker-qual-3.49.5.jar
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/13 06:47:21 INFO SparkContext: Running Spark version 4.0.1
25/12/13 06:47:21 INFO SparkContext: OS info Linux, 6.6.87.2-microsoft-standard-WSL2, amd64
25/12/13 06:47:21 INFO SparkContext: Java version 17.0.17
25/12/13 06:47:21 INFO ResourceUtils: ==============================================================
25/12/13 06:47:21 INFO ResourceUtils: No custom resources configured for spark.driver.
25/12/13 06:47:21 INFO ResourceUtils: ==============================================================
25/12/13 06:47:21 INFO SparkContext: Submitted application: pg_jdbc_read
25/12/13 06:47:21 INFO SparkContext: Spark configuration:
spark.app.id=spark-df402d88543545fd9772aa2b1f56d372
spark.app.name=pg_jdbc_read
spark.app.startTime=1765608441253
spark.app.submitTime=1765608439885
spark.driver.bindAddress=10.42.2.53
spark.driver.blockManager.port=7079
spark.driver.cores=1
spark.driver.extraJavaOptions=-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-modules=jdk.incubator.vector --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false -Dio.netty.tryReflectionSetAccessible=true
spark.driver.host=spark-py-pg-read-efe0909b16768b79-driver-svc.apps.svc
spark.driver.memory=512m
spark.driver.port=7078
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.initialExecutors=1
spark.dynamicAllocation.maxExecutors=3
spark.dynamicAllocation.minExecutors=1
spark.dynamicAllocation.shuffleTracking.enabled=true
spark.eventLog.buffer.kb=64k
spark.eventLog.compression.codec=zstd
spark.eventLog.dir=s3a://nagatomi-test/spark-events
spark.eventLog.enabled=true
spark.eventLog.logStageExecutorMetrics=true
spark.eventLog.rolling.enabled=true
spark.eventLog.rolling.maxFileSize=128m
spark.executor.cores=1
spark.executor.extraJavaOptions=-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-modules=jdk.incubator.vector --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false -Dio.netty.tryReflectionSetAccessible=true
spark.executor.instances=1
spark.executor.memory=512m
spark.executor.metrics.pollingInterval=100
spark.executor.processTreeMetrics.enabled=true
spark.executorEnv.SPARK_DRIVER_POD_IP=10.42.2.53
spark.files=file:/tmp/jars/org.postgresql_postgresql-42.7.8.jar,file:/tmp/jars/org.checkerframework_checker-qual-3.49.5.jar
spark.hadoop.fs.s3a.aws.credentials.provider=software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider
spark.hadoop.fs.s3a.connection.ssl.enabled=true
spark.hadoop.fs.s3a.endpoint=s3.isk01.sakurastorage.jp
spark.hadoop.fs.s3a.endpoint.region=jp-north-1
spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
spark.hadoop.fs.s3a.path.style.access=true
spark.hadoop.fs.s3a.vectored.read.max.merged.size=2M
spark.hadoop.fs.s3a.vectored.read.min.seek.size=128K
spark.jars=file:/tmp/jars/org.postgresql_postgresql-42.7.8.jar,file:/tmp/jars/org.checkerframework_checker-qual-3.49.5.jar
spark.jars.ivy=/tmp
spark.jars.packages=org.postgresql:postgresql:42.7.8
spark.kubernetes.authenticate.driver.serviceAccountName=spark-operator-spark
spark.kubernetes.container.image=k3d-my-registry.localhost:5000/spark:4.0.1-aws
spark.kubernetes.container.image.pullPolicy=IfNotPresent
spark.kubernetes.driver.container.image=k3d-my-registry.localhost:5000/spark:4.0.1-aws
spark.kubernetes.driver.label.sparkoperator.k8s.io/app-name=spark-py-pg-read
spark.kubernetes.driver.label.sparkoperator.k8s.io/launched-by-spark-operator=true
spark.kubernetes.driver.label.sparkoperator.k8s.io/mutated-by-spark-operator=true
spark.kubernetes.driver.label.sparkoperator.k8s.io/submission-id=0ace691e-9b6a-4ecf-9192-1c57041d257d
spark.kubernetes.driver.pod.name=spark-py-pg-read-driver
spark.kubernetes.driver.podTemplateContainerName=spark-kubernetes-driver
spark.kubernetes.driver.podTemplateFile=/tmp/spark/0ace691e-9b6a-4ecf-9192-1c57041d257d/driver-pod-template.yaml
spark.kubernetes.executor.container.image=k3d-my-registry.localhost:5000/spark:4.0.1-aws
spark.kubernetes.executor.label.sparkoperator.k8s.io/app-name=spark-py-pg-read
spark.kubernetes.executor.label.sparkoperator.k8s.io/launched-by-spark-operator=true
spark.kubernetes.executor.label.sparkoperator.k8s.io/mutated-by-spark-operator=true
spark.kubernetes.executor.label.sparkoperator.k8s.io/submission-id=0ace691e-9b6a-4ecf-9192-1c57041d257d
spark.kubernetes.executor.podTemplateContainerName=spark-kubernetes-executor
spark.kubernetes.executor.podTemplateFile=/opt/spark/pod-template/pod-spec-template.yml
spark.kubernetes.memoryOverheadFactor=0.4
spark.kubernetes.namespace=apps
spark.kubernetes.pyspark.pythonVersion=3
spark.kubernetes.resource.type=python
spark.kubernetes.submission.waitAppCompletion=false
spark.kubernetes.submitInDriver=true
spark.logConf=true
spark.master=k8s://https://10.43.0.1:443
spark.rdd.compress=True
spark.repl.local.jars=file:///tmp/jars/org.postgresql_postgresql-42.7.8.jar,file:///tmp/jars/org.checkerframework_checker-qual-3.49.5.jar
spark.serializer.objectStreamReset=100
spark.sql.artifact.isolation.enabled=false
spark.sql.session.timeZone=Asia/Tokyo
spark.submit.deployMode=client
spark.submit.pyFiles=/tmp/jars/org.postgresql_postgresql-42.7.8.jar,/tmp/jars/org.checkerframework_checker-qual-3.49.5.jar
spark.ui.port=18080
25/12/13 06:47:21 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 512, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
25/12/13 06:47:21 INFO ResourceProfile: Limiting resource is cpus at 1 tasks per executor
25/12/13 06:47:21 INFO ResourceProfileManager: Added ResourceProfile id: 0
25/12/13 06:47:21 INFO SecurityManager: Changing view acls to: spark
25/12/13 06:47:21 INFO SecurityManager: Changing modify acls to: spark
25/12/13 06:47:21 INFO SecurityManager: Changing view acls groups to: spark
25/12/13 06:47:21 INFO SecurityManager: Changing modify acls groups to: spark
25/12/13 06:47:21 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: spark groups with view permissions: EMPTY; users with modify permissions: spark; groups with modify permissions: EMPTY; RPC SSL disabled
25/12/13 06:47:21 INFO Utils: Successfully started service 'sparkDriver' on port 7078.
25/12/13 06:47:21 INFO SparkEnv: Registering MapOutputTracker
25/12/13 06:47:21 INFO SparkEnv: Registering BlockManagerMaster
25/12/13 06:47:22 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
25/12/13 06:47:22 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
25/12/13 06:47:22 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
25/12/13 06:47:22 INFO DiskBlockManager: Created local directory at /var/data/spark-bc97eada-1738-4ed3-b6f1-bb670db556be/blockmgr-4caf3d0f-a418-40c9-8e93-1a9cccb50d56
25/12/13 06:47:22 INFO SparkEnv: Registering OutputCommitCoordinator
25/12/13 06:47:22 INFO JettyUtils: Start Jetty 0.0.0.0:18080 for SparkUI
25/12/13 06:47:22 INFO Utils: Successfully started service 'SparkUI' on port 18080.
25/12/13 06:47:22 INFO SparkContext: Added JAR file:/tmp/jars/org.postgresql_postgresql-42.7.8.jar at spark://spark-py-pg-read-efe0909b16768b79-driver-svc.apps.svc:7078/jars/org.postgresql_postgresql-42.7.8.jar with timestamp 1765608441253
25/12/13 06:47:22 INFO SparkContext: Added JAR file:/tmp/jars/org.checkerframework_checker-qual-3.49.5.jar at spark://spark-py-pg-read-efe0909b16768b79-driver-svc.apps.svc:7078/jars/org.checkerframework_checker-qual-3.49.5.jar with timestamp 1765608441253
25/12/13 06:47:22 INFO SparkContext: Added file file:/tmp/jars/org.postgresql_postgresql-42.7.8.jar at spark://spark-py-pg-read-efe0909b16768b79-driver-svc.apps.svc:7078/files/org.postgresql_postgresql-42.7.8.jar with timestamp 1765608441253
25/12/13 06:47:22 INFO Utils: Copying /tmp/jars/org.postgresql_postgresql-42.7.8.jar to /var/data/spark-bc97eada-1738-4ed3-b6f1-bb670db556be/spark-e1b54af4-20c6-4ec1-a307-8775884cad8e/userFiles-a7dd0a8f-7cca-4ed9-a3c8-b3590a6c2f35/org.postgresql_postgresql-42.7.8.jar
25/12/13 06:47:22 INFO SparkContext: Added file file:/tmp/jars/org.checkerframework_checker-qual-3.49.5.jar at spark://spark-py-pg-read-efe0909b16768b79-driver-svc.apps.svc:7078/files/org.checkerframework_checker-qual-3.49.5.jar with timestamp 1765608441253
25/12/13 06:47:22 INFO Utils: Copying /tmp/jars/org.checkerframework_checker-qual-3.49.5.jar to /var/data/spark-bc97eada-1738-4ed3-b6f1-bb670db556be/spark-e1b54af4-20c6-4ec1-a307-8775884cad8e/userFiles-a7dd0a8f-7cca-4ed9-a3c8-b3590a6c2f35/org.checkerframework_checker-qual-3.49.5.jar
25/12/13 06:47:22 INFO SecurityManager: Changing view acls to: spark
25/12/13 06:47:22 INFO SecurityManager: Changing modify acls to: spark
25/12/13 06:47:22 INFO SecurityManager: Changing view acls groups to: spark
25/12/13 06:47:22 INFO SecurityManager: Changing modify acls groups to: spark
25/12/13 06:47:22 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: spark groups with view permissions: EMPTY; users with modify permissions: spark; groups with modify permissions: EMPTY; RPC SSL disabled
25/12/13 06:47:22 INFO SparkKubernetesClientFactory: Auto-configuring K8S client using current context from users K8S config file
25/12/13 06:47:24 INFO Utils: Using initial executors = 1, max of spark.dynamicAllocation.initialExecutors,spark.dynamicAllocation.minExecutors and spark.executor.instances
25/12/13 06:47:25 INFO ExecutorPodsAllocator: Going to request 1 executors from Kubernetes for ResourceProfile Id: 0, target: 1, known: 0, sharedSlotFromPendingPods: 2147483647.
25/12/13 06:47:25 INFO ExecutorPodsAllocator: Found 0 reusable PVCs from 0 PVCs
25/12/13 06:47:25 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 7079.
25/12/13 06:47:25 INFO NettyBlockTransferService: Server created on spark-py-pg-read-efe0909b16768b79-driver-svc.apps.svc 10.42.2.53:7079
25/12/13 06:47:25 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
25/12/13 06:47:25 INFO BasicExecutorFeatureStep: Decommissioning not enabled, skipping shutdown script
25/12/13 06:47:25 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, spark-py-pg-read-efe0909b16768b79-driver-svc.apps.svc, 7079, None)
25/12/13 06:47:25 INFO BlockManagerMasterEndpoint: Registering block manager spark-py-pg-read-efe0909b16768b79-driver-svc.apps.svc:7079 with 117.0 MiB RAM, BlockManagerId(driver, spark-py-pg-read-efe0909b16768b79-driver-svc.apps.svc, 7079, None)
25/12/13 06:47:25 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, spark-py-pg-read-efe0909b16768b79-driver-svc.apps.svc, 7079, None)
25/12/13 06:47:25 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, spark-py-pg-read-efe0909b16768b79-driver-svc.apps.svc, 7079, None)
25/12/13 06:47:26 INFO RollingEventLogFilesWriter: Logging events to s3a://nagatomi-test/spark-events/eventlog_v2_spark-df402d88543545fd9772aa2b1f56d372/events_1_spark-df402d88543545fd9772aa2b1f56d372.zstd
25/12/13 06:47:27 WARN S3ABlockOutputStream: Application invoked the Syncable API against stream writing to spark-events/eventlog_v2_spark-df402d88543545fd9772aa2b1f56d372/events_1_spark-df402d88543545fd9772aa2b1f56d372.zstd. This is Unsupported
25/12/13 06:47:27 INFO Utils: Using initial executors = 1, max of spark.dynamicAllocation.initialExecutors,spark.dynamicAllocation.minExecutors and spark.executor.instances
25/12/13 06:47:27 INFO ExecutorAllocationManager: Dynamic allocation is enabled without a shuffle service.
25/12/13 06:47:33 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (10.42.3.31:46400) with ID 1, ResourceProfileId 0
25/12/13 06:47:33 INFO ExecutorMonitor: New executor 1 has registered (new total is 1)
25/12/13 06:47:33 INFO KubernetesClusterSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.8
25/12/13 06:47:33 INFO BlockManagerMasterEndpoint: Registering block manager 10.42.3.31:46115 with 117.0 MiB RAM, BlockManagerId(1, 10.42.3.31, 46115, None)
25/12/13 06:47:38 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir.
25/12/13 06:47:38 INFO SharedState: Warehouse path is 'file:/opt/spark/work-dir/spark-warehouse'.
25/12/13 06:47:42 INFO CodeGenerator: Code generated in 503.517041 ms
25/12/13 06:47:42 INFO SparkContext: Starting job: showString at NativeMethodAccessorImpl.java:0
25/12/13 06:47:42 INFO DAGScheduler: Got job 0 (showString at NativeMethodAccessorImpl.java:0) with 1 output partitions
25/12/13 06:47:42 INFO DAGScheduler: Final stage: ResultStage 0 (showString at NativeMethodAccessorImpl.java:0)
25/12/13 06:47:42 INFO DAGScheduler: Parents of final stage: List()
25/12/13 06:47:42 INFO DAGScheduler: Missing parents: List()
25/12/13 06:47:42 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[2] at showString at NativeMethodAccessorImpl.java:0), which has no missing parents
25/12/13 06:47:42 INFO MemoryStore: MemoryStore started with capacity 117.0 MiB
25/12/13 06:47:42 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 23.4 KiB, free 116.9 MiB)
25/12/13 06:47:42 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 9.6 KiB, free 116.9 MiB)
25/12/13 06:47:42 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1676
25/12/13 06:47:42 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[2] at showString at NativeMethodAccessorImpl.java:0) (first 15 tasks are for partitions Vector(0))
25/12/13 06:47:42 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks resource profile 0
25/12/13 06:47:43 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (10.42.3.31,executor 1, partition 0, PROCESS_LOCAL, 10299 bytes) 
25/12/13 06:47:46 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 3996 ms on 10.42.3.31 (executor 1) (1/1)
25/12/13 06:47:46 INFO TaskSchedulerImpl: Removed TaskSet 0.0 whose tasks have all completed, from pool 
25/12/13 06:47:47 INFO DAGScheduler: ResultStage 0 (showString at NativeMethodAccessorImpl.java:0) finished in 4380 ms
25/12/13 06:47:47 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
25/12/13 06:47:47 INFO TaskSchedulerImpl: Canceling stage 0
25/12/13 06:47:47 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage finished
25/12/13 06:47:47 INFO DAGScheduler: Job 0 finished: showString at NativeMethodAccessorImpl.java:0, took 4480.406162 ms
25/12/13 06:47:47 INFO CodeGenerator: Code generated in 117.605374 ms
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|vendorid|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|ratecodeid|store_and_fwd_flag|pulocationid|dolocationid|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|cbd_congestion_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|1       |2025-04-23 19:56:29 |2025-04-23 20:03:08  |1              |0.4          |1         |N                 |236         |263         |1           |7.2        |2.5  |0.5    |2.25      |0.0         |1.0                  |13.45       |2.5                 |0.0        |0.0               |
|1       |2025-04-23 22:56:25 |2025-04-23 23:08:23  |2              |0.9          |1         |N                 |186         |170         |2           |11.4       |3.25 |0.5    |0.0       |0.0         |1.0                  |16.15       |2.5                 |0.0        |0.75              |
|1       |2025-04-23 20:49:23 |2025-04-23 21:25:12  |1              |6.1          |1         |N                 |43          |186         |2           |35.9       |3.25 |0.5    |0.0       |0.0         |1.0                  |40.65       |2.5                 |0.0        |0.75              |
|1       |2025-04-23 20:00:52 |2025-04-23 20:28:25  |1              |6.8          |99        |N                 |185         |168         |1           |31.5       |0.0  |0.5    |0.0       |0.0         |1.0                  |33.0        |0.0                 |0.0        |0.0               |
|1       |2025-04-23 23:43:25 |2025-04-24 00:40:44  |1              |7.6          |1         |N                 |143         |225         |1           |47.8       |3.25 |0.5    |13.15     |0.0         |1.0                  |65.7        |2.5                 |0.0        |0.75              |
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
only showing top 5 rows
25/12/13 06:47:47 INFO SparkContext: SparkContext is stopping with exitCode 0 from stop at NativeMethodAccessorImpl.java:0.
25/12/13 06:47:47 INFO SparkUI: Stopped Spark web UI at http://spark-py-pg-read-efe0909b16768b79-driver-svc.apps.svc:18080
25/12/13 06:47:47 INFO KubernetesClusterSchedulerBackend: Shutting down all executors
25/12/13 06:47:47 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down
25/12/13 06:47:47 INFO ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed.
25/12/13 06:47:48 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
25/12/13 06:47:48 INFO MemoryStore: MemoryStore cleared
25/12/13 06:47:48 INFO BlockManager: BlockManager stopped
25/12/13 06:47:48 INFO BlockManagerMaster: BlockManagerMaster stopped
25/12/13 06:47:48 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
25/12/13 06:47:48 INFO SparkContext: Successfully stopped SparkContext
25/12/13 06:47:49 INFO ShutdownHookManager: Shutdown hook called
25/12/13 06:47:49 INFO ShutdownHookManager: Deleting directory /tmp/artifacts-da2a7d00-743c-4510-b85a-f9076a50766f
25/12/13 06:47:49 INFO ShutdownHookManager: Deleting directory /tmp/spark-5a5293fb-bfc5-452d-aa2f-87f13dda09e1
25/12/13 06:47:49 INFO ShutdownHookManager: Deleting directory /tmp/spark-c98ade2f-e79d-458d-8113-3d5d14a37fd1
25/12/13 06:47:49 INFO ShutdownHookManager: Deleting directory /var/data/spark-bc97eada-1738-4ed3-b6f1-bb670db556be/spark-e1b54af4-20c6-4ec1-a307-8775884cad8e
25/12/13 06:47:49 INFO ShutdownHookManager: Deleting directory /tmp/spark-2bad5978-071f-4d58-9773-2bd026cfc0b3
25/12/13 06:47:49 INFO ShutdownHookManager: Deleting directory /var/data/spark-bc97eada-1738-4ed3-b6f1-bb670db556be/spark-e1b54af4-20c6-4ec1-a307-8775884cad8e/pyspark-2b90040c-fae6-4ea9-b0cf-7da3d4528e8e
25/12/13 06:47:49 INFO MetricsSystemImpl: Stopping s3a-file-system metrics system...
25/12/13 06:47:49 INFO MetricsSystemImpl: s3a-file-system metrics system stopped.
25/12/13 06:47:49 INFO MetricsSystemImpl: s3a-file-system metrics system shutdown complete.

Spark Connectを使ってクラスタの外からPySparkアプリを実行してみる

※ここでは概要紹介のみで、ハンズオンはSpark Operator特集・3日目の記事に分離します。

次に、v2.3.0 から導入された、kind: SparkConnect について。

まだ、kubeflowのSpark Operatorでは出たばかりなので、APIバージョンが v1alpha1 と新しく、破壊的変更が入りやすいので本番で利用する際は注意が必要です。

ちなみにSpark Connectは Spark v3.4 で入ったSparkの中では新しい機能です。

www.databricks.com

Spark v4.0.0 かなりの改善が入り、本番での利用もしやすくなりました。詳しくは以下を参照ください。

国内では以下の利用事例もあります。

developers.microad.co.jp

developers.microad.co.jp

さっそく、ハンズオンを始めていきたいところですが、結構長くなったのと、Spark Connectは面白い機能なので、ここだけハンズオンは、Spark Operator特集・3日目として切り出すことにしました🙏

ハンズオンでは、事前にSpark ConnectをKubernetes上で起動し、Kubernetesクラスタの外からPythonスクリプト経由でSparkのワークロードを実行します。起動済みのSpark Connectにクラスタの外からどうやってPythonスクリプトでしか対応していないライブラリなどの依存を入れるのか?DRAでマルチノードにスケールアウトするのか?などについて見ていきます。

まとめ

今回は、Spark OperatorとKubeflowのSpark Operatorの紹介及び、ハンズオン(SparkConnect除く)を紹介しました。 Spark Operator特集・3日目のSpark Connectの話もよろしくお願いします。

以上、 Distributed Computing Advent Calendar 2025 の8日目の記事でした。

また、Distributed Computing Advent Calendar 2025の枠が空いてるので参加お待ちしてます!


  1. 詳しくは 以下を参照ください。
    Kubernetes Operator 超入門/Kubernetes_Operator_Introduction - Speaker Deck
  2. Spark on Kubernetes や Operatorの思想が語られたとても良い内容です。
  3. ここからダウンロード出来ます。
  4. 今回は取り上げませんが、サンプル見るとイメージ湧きます。
    Running Spark Applications on a Schedule | Kubeflow
  5. Spark Operatorの設定 controller.uiService.enable がデフォルト true なので利用可能になってます。Ingressを使って公開も出来るので詳しくはここを参照ください。
  6. Monitoring and Instrumentation - Spark 4.0.1 Documentation を参考にすると設定方法や起動方法があるのでそれを元にDeploymentでSpark History Serverを起動するマニフェストを作成すればOK。もしくは、以下を導入してもおもしろそうです
    github.com
  7. 去年、k3dは取り上げたので良かったら見てください。
  8. Kubernetesの散らし方や集め方といったスケジューリングについては、チシャ猫さんのスライドがとても参考になります
    賢く「散らす」ための Topology Spread Constraints #k8sjp / Kubernetes Meetup Tokyo 25th - Speaker Deck とか 「詰める」と「散らす」の動力学 - 原理・原則から理解するコンテナ配置戦略 #openshiftjp / OpenShift Run 2019 - Speaker Deck など。
  9. あおいさんの記事も参考になります。
    [イラストでわかるKubernetes] Pod Disruption Budget
  10. --set の扱い方は Using Helm | Helm を読むと分かりやすいです。