Pull the image
$ docker pull mirrorgooglecontainers/kubernetes-kafka:1.0-10.2.1
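If the pull succeeds, the image shows up in the local image list; a quick check (assuming Docker is the container runtime) is:
$ docker images | grep kubernetes-kafka
On a multi-node cluster, every node that may schedule a Kafka pod needs access to this image, since the manifest below uses imagePullPolicy: IfNotPresent.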
Create the PersistentVolumes (PVs)
$ vi kfkpv
Paste in the following:
kind: PersistentVolume
apiVersion: v1
metadata:
  name: pv-kafka1
  namespace: bigdata
  annotations:
    volume.beta.kubernetes.io/storage-class: "anything"
  labels:
    type: local
spec:
  capacity:
    storage: 5Gi
  accessModes:
    - ReadWriteOnce
  hostPath:
    path: "/data/kafka1"
  persistentVolumeReclaimPolicy: Recycle
---
kind: PersistentVolume
apiVersion: v1
metadata:
  name: pv-kafka2
  namespace: bigdata
  annotations:
    volume.beta.kubernetes.io/storage-class: "anything"
  labels:
    type: local
spec:
  capacity:
    storage: 5Gi
  accessModes:
    - ReadWriteOnce
  hostPath:
    path: "/data/kafka2"
  persistentVolumeReclaimPolicy: Recycle
---
kind: PersistentVolume
apiVersion: v1
metadata:
  name: pv-kafka3
  namespace: bigdata
  annotations:
    volume.beta.kubernetes.io/storage-class: "anything"
  labels:
    type: local
spec:
  capacity:
    storage: 5Gi
  accessModes:
    - ReadWriteOnce
  hostPath:
    path: "/data/kafka3"
  persistentVolumeReclaimPolicy: Recycle
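These PVs use hostPath volumes, so depending on your setup you may need to create the backing directories beforehand on each node that can host a Kafka pod (paths taken from the manifest above):
$ mkdir -p /data/kafka1 /data/kafka2 /data/kafka3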
Rename the file to give it a .yaml extension
$ mv kfkpv kfkpv.yaml
Create the PVs from kfkpv.yaml
$ kubectl create -f kfkpv.yaml
Check the PVs
$ kubectl get pv
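At this point the three PVs should show a STATUS of Available; they switch to Bound once the StatefulSet's volumeClaimTemplates create matching PVCs. To inspect a single volume in detail:
$ kubectl describe pv pv-kafka1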
Create the Kafka StatefulSet
$ vi kfksts
Paste in the following:
apiVersion: v1
kind: Service
metadata:
  name: kafka-hs
  namespace: bigdata
  labels:
    app: kafka
spec:
  ports:
  - port: 9093
    name: server
  clusterIP: None
  selector:
    app: kafka
---
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:
  name: kafka-pdb
  namespace: bigdata
spec:
  selector:
    matchLabels:
      app: kafka
  maxUnavailable: 1
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  namespace: bigdata
spec:
  selector:
    matchLabels:
      app: kafka
  serviceName: kafka-hs
  replicas: 3
  podManagementPolicy: Parallel
  updateStrategy:
    type: RollingUpdate
  template:
    metadata:
      labels:
        app: kafka
    spec:
      terminationGracePeriodSeconds: 300
      containers:
      - name: k8skafka
        imagePullPolicy: IfNotPresent
        image: mirrorgooglecontainers/kubernetes-kafka:1.0-10.2.1
        resources:
          requests:
            memory: "256Mi"
            cpu: "0.1"
        ports:
        - containerPort: 9093
          name: server
        command:
        - sh
        - -c
        - "exec kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=${HOSTNAME##*-} \
          --override listeners=PLAINTEXT://:9093 \
          --override zookeeper.connect=zk-cs.bigdata.svc.cluster.local:2181 \
          --override log.dir=/var/lib/kafka \
          --override auto.create.topics.enable=true \
          --override auto.leader.rebalance.enable=true \
          --override background.threads=10 \
          --override compression.type=producer \
          --override delete.topic.enable=false \
          --override leader.imbalance.check.interval.seconds=300 \
          --override leader.imbalance.per.broker.percentage=10 \
          --override log.flush.interval.messages=9223372036854775807 \
          --override log.flush.offset.checkpoint.interval.ms=60000 \
          --override log.flush.scheduler.interval.ms=9223372036854775807 \
          --override log.retention.bytes=-1 \
          --override log.retention.hours=168 \
          --override log.roll.hours=168 \
          --override log.roll.jitter.hours=0 \
          --override log.segment.bytes=1073741824 \
          --override log.segment.delete.delay.ms=60000 \
          --override message.max.bytes=1000012 \
          --override min.insync.replicas=1 \
          --override num.io.threads=8 \
          --override num.network.threads=3 \
          --override num.recovery.threads.per.data.dir=1 \
          --override num.replica.fetchers=1 \
          --override offset.metadata.max.bytes=4096 \
          --override offsets.commit.required.acks=-1 \
          --override offsets.commit.timeout.ms=5000 \
          --override offsets.load.buffer.size=5242880 \
          --override offsets.retention.check.interval.ms=600000 \
          --override offsets.retention.minutes=1440 \
          --override offsets.topic.compression.codec=0 \
          --override offsets.topic.num.partitions=50 \
          --override offsets.topic.replication.factor=3 \
          --override offsets.topic.segment.bytes=104857600 \
          --override queued.max.requests=500 \
          --override quota.consumer.default=9223372036854775807 \
          --override quota.producer.default=9223372036854775807 \
          --override replica.fetch.min.bytes=1 \
          --override replica.fetch.wait.max.ms=500 \
          --override replica.high.watermark.checkpoint.interval.ms=5000 \
          --override replica.lag.time.max.ms=10000 \
          --override replica.socket.receive.buffer.bytes=65536 \
          --override replica.socket.timeout.ms=30000 \
          --override request.timeout.ms=30000 \
          --override socket.receive.buffer.bytes=102400 \
          --override socket.request.max.bytes=104857600 \
          --override socket.send.buffer.bytes=102400 \
          --override unclean.leader.election.enable=true \
          --override zookeeper.session.timeout.ms=6000 \
          --override zookeeper.set.acl=false \
          --override broker.id.generation.enable=true \
          --override connections.max.idle.ms=600000 \
          --override controlled.shutdown.enable=true \
          --override controlled.shutdown.max.retries=3 \
          --override controlled.shutdown.retry.backoff.ms=5000 \
          --override controller.socket.timeout.ms=30000 \
          --override default.replication.factor=1 \
          --override fetch.purgatory.purge.interval.requests=1000 \
          --override group.max.session.timeout.ms=300000 \
          --override group.min.session.timeout.ms=6000 \
          --override inter.broker.protocol.version=0.10.2-IV0 \
          --override log.cleaner.backoff.ms=15000 \
          --override log.cleaner.dedupe.buffer.size=134217728 \
          --override log.cleaner.delete.retention.ms=86400000 \
          --override log.cleaner.enable=true \
          --override log.cleaner.io.buffer.load.factor=0.9 \
          --override log.cleaner.io.buffer.size=524288 \
          --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
          --override log.cleaner.min.cleanable.ratio=0.5 \
          --override log.cleaner.min.compaction.lag.ms=0 \
          --override log.cleaner.threads=1 \
          --override log.cleanup.policy=delete \
          --override log.index.interval.bytes=4096 \
          --override log.index.size.max.bytes=10485760 \
          --override log.message.timestamp.difference.max.ms=9223372036854775807 \
          --override log.message.timestamp.type=CreateTime \
          --override log.preallocate=false \
          --override log.retention.check.interval.ms=300000 \
          --override max.connections.per.ip=2147483647 \
          --override num.partitions=3 \
          --override producer.purgatory.purge.interval.requests=1000 \
          --override replica.fetch.backoff.ms=1000 \
          --override replica.fetch.max.bytes=1048576 \
          --override replica.fetch.response.max.bytes=10485760 \
          --override reserved.broker.max.id=1000 "
        env:
        - name: KAFKA_HEAP_OPTS
          value: "-Xmx256M -Xms256M"
        - name: KAFKA_OPTS
          value: "-Dlogging.level=INFO"
        volumeMounts:
        - name: datadir
          mountPath: /var/lib/kafka
        readinessProbe:
          exec:
            command:
            - sh
            - -c
            - "/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server=localhost:9093"
  volumeClaimTemplates:
  - metadata:
      name: datadir
      annotations:
        volume.beta.kubernetes.io/storage-class: "anything"
    spec:
      accessModes: [ "ReadWriteOnce" ]
      resources:
        requests:
          storage: 5Gi
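Note that zookeeper.connect points at zk-cs.bigdata.svc.cluster.local:2181, so this manifest assumes a ZooKeeper ensemble is already running in the bigdata namespace and exposed through a service named zk-cs. You can confirm the service exists before continuing:
$ kubectl get svc zk-cs -n bigdata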
Rename the file to give it a .yaml extension
$ mv kfksts kfksts.yaml
Create the resources from kfksts.yaml
$ kubectl create -f kfksts.yaml
Check the pods
$ kubectl get pods -n bigdata
NAME      READY   STATUS    RESTARTS   AGE
kafka-0   1/1     Running   0          88m
kafka-1   1/1     Running   0          88m
kafka-2   1/1     Running   0          88m
If all three pods show Running, the Kafka cluster started successfully.
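As a final smoke test, you can create a topic on one broker and pass a message through the cluster. The script paths below assume the Kafka distribution lives under /opt/kafka, as the readiness probe in the manifest suggests; in Kafka 0.10.2, kafka-topics.sh still talks to ZooKeeper directly:
$ kubectl exec kafka-0 -n bigdata -- /opt/kafka/bin/kafka-topics.sh --create \
    --topic test --partitions 3 --replication-factor 3 \
    --zookeeper zk-cs.bigdata.svc.cluster.local:2181
$ kubectl exec kafka-0 -n bigdata -- sh -c \
    "echo hello | /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9093 --topic test"
$ kubectl exec -it kafka-1 -n bigdata -- /opt/kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9093 --topic test --from-beginning
If the consumer on kafka-1 prints hello, messages are replicating across brokers; press Ctrl-C to stop it.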
