spark on k8s using Google spark-operator
Official GitHub repo: https://github.com/GoogleCloudPlatform/spark-on-k8s-operator
- Install Helm
Documentation: https://helm.sh/docs/
All binaries: https://github.com/helm/helm/releases
macOS: brew install helm
Verify: helm version
- Install the Spark operator
helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator
Create the namespace:
kubectl create namespace spark-operator
Install the operator:
helm install incubator/sparkoperator --namespace spark-operator --set sparkJobNamespace=default --generate-name
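A quick way to check that the operator came up (the Helm release name is generated, so yours will differ):
helm list -n spark-operator
kubectl get pods -n spark-operator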
Basic commands
- Create the app: kubectl apply -f spark-pi.yml
- View the app YAML: kubectl get sparkapplications spark-pi -o=yaml
- Describe the app: kubectl describe sparkapplication spark-pi
- View the app logs: kubectl logs -f spark-pi
- Delete the app: kubectl delete -f spark-pi.yml
spark-pi.yml reference:
https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/examples/spark-pi.yaml
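For orientation, a minimal SparkApplication sketch along the lines of that example (the image, jar path, and service account below follow the upstream sample and are assumptions, not taken from this setup):
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: default
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v2.4.3"    # assumed image; swap in your own build
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.3.jar"
  sparkVersion: "2.4.3"
  restartPolicy:
    type: Never
  driver:
    cores: 1
    memory: "512m"
    serviceAccount: spark                        # assumed service account for the operator
  executor:
    cores: 1
    instances: 1
    memory: "512m"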
spark on k8s using spark-submit with official images
Official docs: http://spark.apache.org/docs/latest/running-on-kubernetes.html
Build the images:
cd $SPARK_HOME && ./bin/docker-image-tool.sh -t v2.4.3 build
or, specifying the Dockerfile explicitly:
cd $SPARK_HOME && ./bin/docker-image-tool.sh -t v2.4.3 -f ./kubernetes/dockerfiles/spark/Dockerfile build
The commands above build three images: spark:v2.4.3 / spark-py:v2.4.3 / spark-r:v2.4.3
If you only need the base Spark image, run:
cd $SPARK_HOME && docker build -t spark:latest -f kubernetes/dockerfiles/spark/Dockerfile .
Submit a test job
Find the k8s API server address: kubectl cluster-info
Client Mode
In client mode the Spark driver runs on the machine you submit from, so the application jar must point to a path on that machine:
spark-submit --master k8s://https://kubernetes.docker.internal:6443 \
  --deploy-mode client \
  --name spark-pi \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.kubernetes.container.image=spark:v2.4.3 \
  --conf spark.kubernetes.namespace=default \
  local:///Users/sgr/software/bigdata/spark-2.4.3-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.4.3.jar
When the application finishes, k8s automatically deletes the pods it started.
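While the job is running you can watch the executor pods appear and get cleaned up:
kubectl get pods -n default -w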
Cluster Mode
In cluster mode the Spark driver runs in a pod started by k8s, so the application jar must be a path inside the container image, or on a filesystem Spark can read, such as HDFS:
spark-submit --master k8s://https://kubernetes.docker.internal:6443 \
  --deploy-mode cluster \
  --name spark-pi \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.kubernetes.container.image=spark:v2.4.3 \
  --conf spark.kubernetes.namespace=default \
  local:///opt/spark/examples/jars/spark-examples_2.11-2.4.3.jar
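In cluster mode the driver log lives in the driver pod, so to find it and follow the log (the pod name placeholder is illustrative):
kubectl get pods -n default | grep spark-pi
kubectl logs -f <spark-pi-driver-pod> -n default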
Comparison of the two approaches
spark-submit
- Everything is configured via --conf, covering both Spark core settings and k8s-specific settings
- Before Spark 3.0 there were relatively few customization options (Spark 3.0+ is recommended)
- Application management is less convenient (it is recommended to run the Spark driver in a k8s pod)
- Closest to the existing MLSQL submission workflow
Spark-on-k8s operator
- Applications are submitted as YAML, one YAML file per app
- Deployed the cloud-native way as a k8s Operator
- Provides commands such as kill, list, restart, and reschedule
Weighing the two approaches, we ultimately chose spark-submit for application submission.
Hands-on
We will walk through submitting the Spark Pi application as the example.
Image preparation
If you don't want to go through the image build process, you can pull the image I prepared: docker pull harbor.k8s-test.uc.host.dxy/dev/spark-jdk-slim-14:v3.0.0
Image environment:
- Java: 14
- Spark: 3.0.0
Spark distribution: https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7.tgz
Dockerfile (adapted from kubernetes/dockerfiles/spark/Dockerfile shipped with the distribution):
FROM openjdk:14.0-jdk-slim
RUN set -ex && \
sed -i 's/http:/https:/g' /etc/apt/sources.list && \
apt-get update && \
ln -s /lib /lib64 && \
apt install -y bash tini libc6 libpam-modules krb5-user libnss3 && \
mkdir -p /opt/spark && \
mkdir -p /opt/spark/examples && \
mkdir -p /opt/spark/work-dir && \
touch /opt/spark/RELEASE && \
rm /bin/sh && \
ln -sv /bin/bash /bin/sh && \
echo "auth required pam_wheel.so use_uid" >> /etc/pam.d/su && \
chgrp root /etc/passwd && chmod ug+rw /etc/passwd && \
rm -rf /var/cache/apt/*
COPY jars /opt/spark/jars
COPY bin /opt/spark/bin
COPY sbin /opt/spark/sbin
COPY kubernetes/dockerfiles/spark/entrypoint.sh /opt/
COPY examples /opt/spark/examples
COPY kubernetes/tests /opt/spark/tests
COPY data /opt/spark/data
ENV SPARK_HOME /opt/spark
WORKDIR /opt/spark/work-dir
RUN chmod g+w /opt/spark/work-dir
ENTRYPOINT [ "/opt/entrypoint.sh" ]
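Assuming the Dockerfile above is saved as ./Dockerfile at the top of the unpacked spark-3.0.0-bin-hadoop2.7 directory (the path and registry here are the ones used elsewhere in this post, not requirements), the build and push would look like:
cd $SPARK_HOME
docker build -t harbor.k8s-test.uc.host.dxy/dev/spark-jdk-slim-14:v3.0.0 -f Dockerfile .
docker push harbor.k8s-test.uc.host.dxy/dev/spark-jdk-slim-14:v3.0.0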
- Spark driver Deployment YAML
apiVersion: apps/v1
kind: Deployment
metadata:
  annotations: {}
  name: spark-pi
  namespace: dev
spec:
  selector:
    matchLabels:
      app: spark-pi
  strategy:
    rollingUpdate:
      maxUnavailable: 0
    type: RollingUpdate
  template:
    metadata:
      labels:
        app: spark-pi
    spec:
      containers:
        - name: spark-pi
          args:
            - >-
              echo "/opt/spark/bin/spark-submit
              --master k8s://https://192.168.202.231:6443
              --deploy-mode client
              --class org.apache.spark.examples.SparkPi
              --conf spark.kubernetes.container.image=harbor.k8s-test.uc.host.dxy/dev/spark-jdk-slim-14:v3.0.0
              --conf spark.kubernetes.container.image.pullPolicy=Always
              --conf spark.kubernetes.namespace=dev
              --conf spark.kubernetes.authenticate.driver.serviceAccountName=songgongru
              --conf spark.kubernetes.container.image.pullSecrets=regsecret
              --conf spark.kubernetes.driver.request.cores=1
              --conf spark.kubernetes.driver.limit.cores=1
              --conf spark.kubernetes.executor.request.cores=1
              --conf spark.kubernetes.executor.limit.cores=1
              --conf spark.executor.instances=1
              --conf spark.driver.host=$POD_IP
              local:///opt/spark/examples/jars/spark-examples_2.12-3.0.0.jar
              " | bash
          command:
            - /bin/sh
            - '-c'
          env:
            - name: POD_IP
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
            - name: NODE_IP
              valueFrom:
                fieldRef:
                  fieldPath: status.hostIP
          image: 'harbor.k8s-test.uc.host.dxy/dev/spark-jdk-slim-14:v3.0.0'
          imagePullPolicy: Always
          resources:
            limits:
              cpu: 1
              memory: 1Gi
            requests:
              cpu: 1
              memory: 1Gi
      imagePullSecrets:
        - name: regsecret
      serviceAccountName: songgongru
      nodeSelector:
        internet: 'false'
Pod deployment
- Create the pod: kubectl apply -f spark-pi.yaml
- Delete the pod: kubectl delete -f spark-pi.yaml
- List the pods: kubectl get pods -n dev | grep spark-pi
- Describe the pod: kubectl describe po/spark-pi-b4cbfcb68-j2h62 -n dev (you need the actual pod name; see the lookup command after this list)
- Tail the pod logs: kubectl logs --tail=10 -f spark-pi-b4cbfcb68-j2h62 -n dev
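The pod name used above can be looked up via the app label set in the Deployment:
kubectl get pods -n dev -l app=spark-pi -o name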
Integrating with Hadoop
We put the Hadoop configuration files into ConfigMaps and then set the HADOOP_CONF_DIR environment variable.
- Create the ConfigMaps
Create the Hadoop configuration ConfigMap from the hadoop-conf directory:
kubectl create configmap hadoop-conf --from-file=hadoop-conf -n dev
Create the keytab ConfigMap:
kubectl create configmap mlsql-keytab-sgr --from-file=songgongru.keytab -n dev
Create the krb5.conf ConfigMap:
kubectl create configmap krb5-conf --from-file=krb5.conf -n dev
List the ConfigMaps:
➜ ~ kubectl get cm -n dev
NAME                        DATA   AGE
canal-admin-startup-conf    1      43d
canal-server-startup-conf   1      41d
hadoop-conf                 7      7d5h
krb5-conf                   1      6d23h
mlsql-keytab-sgr            0      6d23h
- Mount the ConfigMaps into the pod as volumes
apiVersion: apps/v1
kind: Deployment
...
spec:
  containers:
    - name: spark-mlsql-3
      env:
        - name: HADOOP_CONF_DIR
          value: "/etc/hadoop/conf"
        - name: SPARK_HOME
          value: "/opt/spark"
      image: 'harbor.k8s-test.uc.host.dxy/dev/spark-jdk-slim-14:v3.0.0'
      volumeMounts:
        - mountPath: /data
          name: spark-mlsql
        - name: hadoop-conf
          mountPath: /etc/hadoop/conf
        - name: mlsql-keytab-sgr
          mountPath: /root
        - name: krb5-conf
          mountPath: /etc/krb5.conf
          subPath: krb5.conf
  volumes:
    - name: spark-mlsql
      persistentVolumeClaim:
        claimName: spark-pvc1
    - name: hadoop-conf
      configMap:
        name: hadoop-conf
    - name: mlsql-keytab-sgr
      configMap:
        name: mlsql-keytab-sgr
    - name: krb5-conf
      configMap:
        name: krb5-conf
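To confirm the ConfigMaps actually landed inside the container, you can exec into the running pod (the pod name placeholder is illustrative):
kubectl exec -it <spark-mlsql-pod> -n dev -- ls /etc/hadoop/conf
kubectl exec -it <spark-mlsql-pod> -n dev -- cat /etc/krb5.conf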
The executor pods then report the following error:
20/08/25 08:38:11 ERROR Executor: Exception in task 0.3 in stage 0.0 (TID 3)
java.io.IOException: Incomplete HDFS URI, no host: hdfs:///tmp/mlsql/streamingpro-mlsql-spark_3.0_2.12-1.7.0-SNAPSHOT.jar
at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:143)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
at org.apache.spark.util.Utils$.getHadoopFileSystem(Utils.scala:1853)
at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:737)
at org.apache.spark.util.Utils$.fetchFile(Utils.scala:522)
at org.apache.spark.executor.Executor.$anonfun$updateDependencies$7(Executor.scala:869)
at org.apache.spark.executor.Executor.$anonfun$updateDependencies$7$adapted(Executor.scala:860)
at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:877)
at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:876)
at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:860)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:404)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
at java.base/java.lang.Thread.run(Thread.java:832)
The cause is that the executors cannot read the Hadoop configuration.
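A sketch of one possible fix, not validated here: the executor pods are launched by spark-submit, not by the Deployment, so they do not inherit its volume mounts; the Hadoop configuration has to reach them through Spark itself. Spark 3.x documents spark.kubernetes.hadoop.configMapName for mounting an existing ConfigMap of HADOOP_CONF_DIR files, so one option is to point it at the hadoop-conf ConfigMap created above:
# Assumption: spark.kubernetes.hadoop.configMapName (Spark 3.x) mounts an existing
# ConfigMap containing the HADOOP_CONF_DIR files into the Spark pods.
# Added to the spark-submit flags inside the Deployment's container args:
--conf spark.kubernetes.hadoop.configMapName=hadoop-conf \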