spark on k8s using Google spark-operator

Official GitHub repository: https://github.com/GoogleCloudPlatform/spark-on-k8s-operator

  1. Install Helm

Documentation: https://helm.sh/docs/

All binaries: https://github.com/helm/helm/releases

macOS: brew install helm

Verify the installation: helm version

  2. Install the Spark operator

helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator

  • Create a namespace
    kubectl create namespace spark-operator

  • Install the operator (a quick verification sketch follows this list)
    helm install incubator/sparkoperator --namespace spark-operator --set sparkJobNamespace=default --generate-name
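A quick way to confirm the operator came up (a sketch; the release name and pod name are generated, so yours will differ):

helm list -n spark-operator
kubectl get pods -n spark-operator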

  3. Basic commands

    • Create an app: kubectl apply -f spark-pi.yml
    • View the application YAML: kubectl get sparkapplications spark-pi -o=yaml
    • Describe the application: kubectl describe sparkapplication spark-pi
    • View the application logs: kubectl logs -f spark-pi
    • Delete the application: kubectl delete -f spark-pi.yml

      spark-pi.yml reference (an abbreviated sketch follows the link):

https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/examples/spark-pi.yaml
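For orientation, an abbreviated sketch of what that spark-pi.yaml looks like, based on the upstream example; the image tag, Spark version, and service account shown here are the upstream defaults and may differ from the file currently in the repo:

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: default
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v2.4.5"
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.5.jar"
  sparkVersion: "2.4.5"
  restartPolicy:
    type: Never
  driver:
    cores: 1
    memory: "512m"
    serviceAccount: spark
  executor:
    cores: 1
    instances: 1
    memory: "512m"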

spark on k8s using spark-submit with official images

Official documentation: http://spark.apache.org/docs/latest/running-on-kubernetes.html

  1. Build the images:
    cd $SPARK_HOME &&
    ./bin/docker-image-tool.sh -t v2.4.3 build
    cd $SPARK_HOME &&
    ./bin/docker-image-tool.sh -t v2.4.3 -f ./kubernetes/dockerfiles/spark/Dockerfile build
    The commands above build three images: spark:v2.4.3, spark-py:v2.4.3, and spark-r:v2.4.3.
    If you only need the base Spark image, run:
    cd $SPARK_HOME &&
    docker build -t spark:latest -f kubernetes/dockerfiles/spark/Dockerfile .

  2. Submit a test job

Find the Kubernetes API server address: kubectl cluster-info

Client Mode

In client mode the Spark driver runs on the machine you submit from, so the application JAR must point to a path on that machine:

spark-submit --master k8s://https://kubernetes.docker.internal:6443 \
  --deploy-mode client \
  --name spark-pi \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.kubernetes.container.image=spark:v2.4.3 \
  --conf spark.kubernetes.namespace=default \
  local:///Users/sgr/software/bigdata/spark-2.4.3-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.4.3.jar

When the application finishes, Kubernetes automatically deletes the pods it started.
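While the job runs you can watch the executor pods being created and torn down; recent Spark versions label their executor pods with spark-role=executor (a sketch, assuming the default namespace used above):

kubectl get pods -n default -l spark-role=executor -w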

Cluster Mode

In cluster mode the Spark driver runs in a pod started by Kubernetes, so the application JAR must point to a path inside the container image, or to a storage system Spark can read, such as HDFS:

spark-submit --master k8s://https://kubernetes.docker.internal:6443 \
  --deploy-mode cluster \
  --name spark-pi \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.kubernetes.container.image=spark:v2.4.3 \
  --conf spark.kubernetes.namespace=default \
  local:///opt/spark/examples/jars/spark-examples_2.11-2.4.3.jar
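In cluster mode the driver itself is a pod whose name is derived from the application name; the generated suffix will differ on your cluster (a sketch, with the pod name left as a placeholder):

kubectl get pods -n default | grep spark-pi
kubectl logs -f <spark-pi-driver-pod-name> -n default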

Comparison of the two approaches

  1. Spark submit

    • Parameters are configured via --conf, covering both Spark core settings and Kubernetes-related settings
    • Before Spark 3.0 the customization options were limited (Spark 3.0 or later is recommended)
    • Application management is inconvenient (it is recommended to run the Spark driver in a Kubernetes pod)
    • Closest to the existing MLSQL submission workflow
  2. Spark-on-k8s operator

    • Applications are submitted as YAML; each app has its own YAML file
    • Cloud-native deployment via the Kubernetes Operator pattern
    • Provides management commands such as kill, list, restart, and reschedule (see the sparkctl sketch below)
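Most of that management goes through the operator's sparkctl CLI, which is built separately from the operator repo; the exact subcommand set depends on the version you build (a sketch of typical usage):

sparkctl create spark-pi.yml
sparkctl list
sparkctl status spark-pi
sparkctl log spark-pi
sparkctl delete spark-pi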

Weighing the two approaches, we ultimately chose spark-submit for application submission.

Hands-on practice

We use the SparkPi application as a working example.

  • Image preparation
    If you don't want to go through the image-build process, you can use the image I prepared (a build-and-push sketch follows the Dockerfile below):
    docker pull harbor.k8s-test.uc.host.dxy/dev/spark-jdk-slim-14:v3.0.0
    Image environment:

    • Java: 14
    • Spark: 3.0.0

https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7.tgz

The Dockerfile:

    FROM openjdk:14.0-jdk-slim

    RUN set -ex && \
        sed -i 's/http:/https:/g' /etc/apt/sources.list && \
        apt-get update && \
        ln -s /lib /lib64 && \
        apt install -y bash tini libc6 libpam-modules krb5-user libnss3 && \
        mkdir -p /opt/spark && \
        mkdir -p /opt/spark/examples && \
        mkdir -p /opt/spark/work-dir && \
        touch /opt/spark/RELEASE && \
        rm /bin/sh && \
        ln -sv /bin/bash /bin/sh && \
        echo "auth required pam_wheel.so use_uid" >> /etc/pam.d/su && \
        chgrp root /etc/passwd && chmod ug+rw /etc/passwd && \
        rm -rf /var/cache/apt/*

    COPY jars /opt/spark/jars
    COPY bin /opt/spark/bin
    COPY sbin /opt/spark/sbin
    COPY kubernetes/dockerfiles/spark/entrypoint.sh /opt/
    COPY examples /opt/spark/examples
    COPY kubernetes/tests /opt/spark/tests
    COPY data /opt/spark/data

    ENV SPARK_HOME /opt/spark

    WORKDIR /opt/spark/work-dir
    RUN chmod g+w /opt/spark/work-dir

    ENTRYPOINT [ "/opt/entrypoint.sh" ]
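If you do want to build it yourself: the COPY paths above are relative to the Spark distribution root, so run the build from the unpacked spark-3.0.0-bin-hadoop2.7 directory, assuming the Dockerfile above is saved there as Dockerfile; the registry path is simply the one used elsewhere in this document (a sketch):

cd spark-3.0.0-bin-hadoop2.7
docker build -t harbor.k8s-test.uc.host.dxy/dev/spark-jdk-slim-14:v3.0.0 -f Dockerfile .
docker push harbor.k8s-test.uc.host.dxy/dev/spark-jdk-slim-14:v3.0.0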
  • Spark driver Deployment yaml
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      annotations: {}
      name: spark-pi
      namespace: dev
    spec:
      selector:
        matchLabels:
          app: spark-pi
      strategy:
        rollingUpdate:
          maxUnavailable: 0
        type: RollingUpdate
      template:
        metadata:
          labels:
            app: spark-pi
        spec:
          containers:
          - name: spark-pi
            args:
              - >-
                echo "/opt/spark/bin/spark-submit --master k8s://https://192.168.202.231:6443
                --deploy-mode client --class org.apache.spark.examples.SparkPi
                --conf spark.kubernetes.container.image=harbor.k8s-test.uc.host.dxy/dev/spark-jdk-slim-14:v3.0.0
                --conf spark.kubernetes.container.image.pullPolicy=Always
                --conf spark.kubernetes.namespace=dev
                --conf spark.kubernetes.authenticate.driver.serviceAccountName=songgongru
                --conf spark.kubernetes.container.image.pullSecrets=regsecret
                --conf spark.kubernetes.driver.request.cores=1
                --conf spark.kubernetes.driver.limit.cores=1
                --conf spark.kubernetes.executor.request.cores=1
                --conf spark.kubernetes.executor.limit.cores=1
                --conf spark.executor.instances=1
                --conf spark.driver.host=$POD_IP
                local:///opt/spark/examples/jars/spark-examples_2.12-3.0.0.jar " | bash
            command:
              - /bin/sh
              - '-c'
            env:
              - name: POD_IP
                valueFrom:
                  fieldRef:
                    fieldPath: status.podIP
              - name: NODE_IP
                valueFrom:
                  fieldRef:
                    fieldPath: status.hostIP
            image: 'harbor.k8s-test.uc.host.dxy/dev/spark-jdk-slim-14:v3.0.0'
            imagePullPolicy: Always
            resources:
              limits:
                cpu: 1
                memory: 1Gi
              requests:
                cpu: 1
                memory: 1Gi
          imagePullSecrets:
            - name: regsecret
          serviceAccountName: songgongru
          nodeSelector:
            internet: 'false'
    
  • Pod deployment

    • Create the pod: kubectl apply -f spark-pi.yaml
    • Delete the pod: kubectl delete -f spark-pi.yaml
    • List the pods: kubectl get pods -n dev | grep spark-pi
    • Check pod status: kubectl describe po/spark-pi-b4cbfcb68-j2h62 -n dev (look up the pod name first)
    • View pod logs: kubectl logs --tail=10 -f spark-pi-b4cbfcb68-j2h62 -n dev

Integrating with Hadoop

We put the Hadoop configuration files into ConfigMaps and then set the HADOOP_CONF_DIR environment variable.

  1. Create the ConfigMaps

    Create the Hadoop configuration ConfigMap from a directory:


kubectl create configmap hadoop-conf --from-file=hadoop-conf -n dev

Create the keytab ConfigMap:


kubectl create configmap mlsql-keytab-sgr --from-file=songgongru.keytab -n dev

Create the krb5.conf ConfigMap:


kubectl create configmap krb5-conf --from-file=krb5.conf -n dev

List the ConfigMaps (a quick content check follows the listing):


kubectl get cm -n dev

 ➜  ~ kubectl get cm -n dev
NAME                        DATA   AGE
canal-admin-startup-conf    1      43d
canal-server-startup-conf   1      41d
hadoop-conf                 7      7d5h
krb5-conf                   1      6d23h
mlsql-keytab-sgr            0      6d23h
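To confirm that the expected files actually landed in each ConfigMap, you can inspect them directly (a sketch):

kubectl describe configmap hadoop-conf -n dev
kubectl get configmap krb5-conf -n dev -o yaml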
  2. Configure the pod to consume the ConfigMaps through volumes
    apiVersion: apps/v1
    kind: Deployment
    ...
    spec:
      containers:
      - name: spark-mlsql-3
        env:
          - name: HADOOP_CONF_DIR
            value: "/etc/hadoop/conf"
          - name: SPARK_HOME
            value: "/opt/spark"
        image: 'harbor.k8s-test.uc.host.dxy/dev/spark-jdk-slim-14:v3.0.0'
        volumeMounts:
        - mountPath: /data
          name: spark-mlsql
        - name: hadoop-conf
          mountPath: /etc/hadoop/conf
        - name: mlsql-keytab-sgr
          mountPath: /root
        - name: krb5-conf
          mountPath: /etc/krb5.conf
          subPath: krb5.conf
      volumes:
      - name: spark-mlsql
        persistentVolumeClaim:
          claimName: spark-pvc1
      - name: hadoop-conf
        configMap:
          name: hadoop-conf
      - name: mlsql-keytab-sgr
        configMap:
          name: mlsql-keytab-sgr
      - name: krb5-conf
        configMap:
          name: krb5-conf
    

The executors then fail with the following error:

20/08/25 08:38:11 ERROR Executor: Exception in task 0.3 in stage 0.0 (TID 3)
java.io.IOException: Incomplete HDFS URI, no host: hdfs:///tmp/mlsql/streamingpro-mlsql-spark_3.0_2.12-1.7.0-SNAPSHOT.jar
    at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:143)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
    at org.apache.spark.util.Utils$.getHadoopFileSystem(Utils.scala:1853)
    at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:737)
    at org.apache.spark.util.Utils$.fetchFile(Utils.scala:522)
    at org.apache.spark.executor.Executor.$anonfun$updateDependencies$7(Executor.scala:869)
    at org.apache.spark.executor.Executor.$anonfun$updateDependencies$7$adapted(Executor.scala:860)
    at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:877)
    at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
    at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
    at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:876)
    at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:860)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:404)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
    at java.base/java.lang.Thread.run(Thread.java:832)

The cause is that the executors cannot see the Hadoop configuration.
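One way to address this, sketched here rather than verified in this document: the hadoop-conf volume above is only mounted into the driver Deployment, so the executor pods that Spark launches never see it. The Spark-on-Kubernetes documentation provides spark.kubernetes.hadoop.configMapName to have Spark mount a ConfigMap of Hadoop configuration files onto both the driver and the executors; an executor pod template (spark.kubernetes.executor.podTemplateFile) that mounts hadoop-conf is another route.

# Added to the spark-submit flags already used in the Deployment above
--conf spark.kubernetes.hadoop.configMapName=hadoop-conf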