1. 概述

1. 为什么需要工作流调度系统?

一个完整的数据分析系统通常都是由大量任务单元组成:shell脚本程序,java程序,mapreduce程序、hive脚本等。各任务单元之间存在时间先后及前后依赖关系。为了很好地组织起这样的复杂执行计划,需要一个工作流调度系统来调度执行。
例如,我们可能有这样一个需求,某个业务系统每天产生20G原始数据,我们每天都要对其进行处理,处理步骤如下所示:

  1. 通过Hadoop先将原始数据上传到HDFS上(HDFS的操作);
  2. 使用MapReduce对原始数据进行清洗(MapReduce的操作);
  3. 将清洗后的数据导入到Hive表中(Hive的导入操作);
  4. 对Hive中多个表的数据进行JOIN处理,得到一张Hive的明细表(创建中间表);
  5. 通过对明细表的统计和分析,得到结果报表信息(Hive的查询操作);

image.png

2. Azkaban的适用场景

以上业务场景,传统方式下,整个的执行过程都需要人工参加,并且得盯着各任务的进度。但是我们的很多任务都是在深更半夜执行的,通过写脚本设置crontab执行。其实,整个过程类似于一个有向无环图(DAG)。每个子任务相当于大任务中的一个节点,也就是,我们需要的就是一个工作流的调度器,而Azkaban就是能解决上述问题的一个调度器。

3. 什么是Azkaban?

Azkaban是由Linkedin公司推出的批量工作流任务调度器,主要用于在一个工作流内以特定的顺序运行一组工作和流程,它的配置是通过简单的key:value对的方式,通过配置中的dependencies来设置依赖关系。Azkaban使用job配置文件建立任务之间的依赖关系,并提供一个易于使用的web用户界面维护和跟踪你的工作流。

2. Azkaban特点

  1. 兼容任何版本的Hadoop。
  2. 易于使用的Web用户界面。
  3. 简单的工作流的上传。
  4. 方便设置任务之间的关系。
  5. 调度工作流。
  6. 模块化和可插拔的插件机制。
  7. 认证/授权(权限的工作)。
  8. 能够杀死并重新启动工作流。
  9. 有关失败和成功的电子邮件提醒。

    3. 常见工作流调度系统

    参考:《大数据作业调度系统比较》

  10. 简单的任务调度:直接使用crontab实现;

  11. 复杂的任务调度:开发调度平台或使用现成的开源调度系统,比如Azkaban、AirFlow、Oozie、Zeus、Rundeck、Conductor等;
  • Ooize和Azkaban特性对比

下面的表格对上述四种Hadoop工作流调度器的关键特性进行了比较,尽管这些工作流调度器能够解决的需求场景基本一致,但在设计理念,目标用户,应用场景等方面还是存在显著的区别,在做技术选型的时候,可以提供参考。

特性 Oozie Azkaban
工作流描述语言 XML text file with key/value pairs
是否要web容器 Yes Yes
进度跟踪 web page web page
Hadoop job调度支持 yes yes
运行模式 daemon daemon
事件通知 no Yes
需要安装 yes yes
支持的hadoop版本 0.20+ currently unknown
重试支持 workflownode evel yes
运行任意命令 yes yes

4. Azkaban的架构

Azkaban基础入门 - 图4
Azkaban由三个关键组件构成:

  • AzkabanWebServer:AzkabanWebServer是整个Azkaban工作流系统的主要管理者,它用户登录认证、负责project管理、定时执行工作流、跟踪工作流执行进度等一系列任务。
  • AzkabanExecutorServer:负责具体的工作流的提交、执行,它们通过mysql数据库来协调任务的执行。
  • MySQL:存储大部分执行流状态,AzkabanWebServer和AzkabanExecutorServer都需要访问数据库。

    5. Azkaban原生支持作业类型

    | 作业类型 | Type参数 | 描述 | | —- | —- | —- | | command | type=command | Linux shell命令行任务 | | gobblin | type=java | 通用数据采集工具 | | hadoopJava | type=hadoopJava | 运行hadoopMR任务 | | java | type=javaprocess | 原生java任务 | | hive | type=hive | 支持执行hiveSQL | | pig | type=pig | pig脚本任务 | | spark | type=spark | Spark任务 | | hdfsToTeradata | type=? | 把数据从hdfs导入Teradata | | teradataToHdfs | type=? | 把数据从Teradata导入hdfs |

6. Azkaban安装部署

参考:《基于CentOS7安装Azkaban-3.47.0》《基于Ubuntu16.0.4安装Azkaban-3.47.0》。

7. Job配置简介

1. Command类型

  1. type=command
  2. command=echo "This is azkaban cmd ... "
  3. command.1=whoami
  4. # 依赖前一个job
  5. dependencies=cmd1

2. Java类型

  1. type=javaprocess
  2. classpath=./lib/*,${azkaban.home}/lib/*
  3. java.class=com.dataeye.java.MyJavaJob

3. HadoopJava类型

  1. type=hadoopJava
  2. job.extend=false
  3. job.class=azkaban.jobtype.examples.java.WordCount
  4. classpath=./lib/*,${azkaban.home}/lib/*
  5. force.output.overwrite=true
  6. input.path=/data/yann/input
  7. output.path=/data/yann/output

4. Hive类型

  1. type=hive
  2. user.to.proxy=azkaban
  3. classpath=./lib/*,${azkaban.home}/lib/*
  4. azk.hive.action=execute.query
  5. hive.script=res/hive.sql

5. Spark类型

  1. type=spark
  2. master=yarn-cluster
  3. execution-jar=lib/spark-template-1.0-SNAPSHOT.jar
  4. class=com.dataeye.template.spark.WordCount
  5. params=hdfs://de-hdfs/data/yann/info.txt paramtest

8. Azkaban实战

Azkaba内置的任务类型支持command、java。

1. 单一job案例

  1. 创建job描述文件。

    1. vim first.job

    内容如下:

    1. # first.job
    2. type=command
    3. command=echo 'this is my first job'
  2. 将job资源文件打包成zip文件。

    1. zip first.zip first.job

    注意目前,**Azkaban上传的工作流文件只支持xxx.zip文件。zip应包含xxx.job运行作业所需的文件和任何文件(文件名后缀必须以.job**结尾,否则无法识别)。作业名称在项目中必须是唯一的。

  3. 通过Azkaban的Web管理平台创建project并上传job的zip包。

  • 首先创建project

Azkaban基础入门 - 图5

  • 上传zip包

Azkaban基础入门 - 图6

  1. 启动执行该job。

Azkaban基础入门 - 图7
点击执行工作流:
Azkaban基础入门 - 图8
点击继续:
Azkaban基础入门 - 图9

  1. Job执行成功。

Azkaban基础入门 - 图10

  1. 点击查看job日志。

Azkaban基础入门 - 图11

2. 多job工作流案例

  1. 创建有依赖关系的多个job描述。
  • 第一个job(start.job)

    1. vim start.job

    内容如下:

    1. # start.job
    2. type=command
    3. command=touch /opt/module/kangkang.txt
  • 第二个job(step1.job依赖start.job)

    1. vim step1.job

    内容如下:

    1. # step1.job
    2. type=command
    3. dependencies=start
    4. command=echo "this is step1 job"
  • 第三个job(step2.job依赖start.job)

    1. vim step2.job

    内容如下:

    1. # step2.job
    2. type=command
    3. dependencies=start
    4. command=echo "this is step2 job"
  • 第四个job(finish.job依赖step1.job和step2.job)

    1. vim finish.job

    内容如下:

    1. # finish.job
    2. type=command
    3. dependencies=step1,step2
    4. command=echo "this is finish job"
  1. 将所有job资源文件打到一个zip包中。

    1. zip jobs.zip start.job step1.job step2.job finish.job
  2. 在Azkaban的Web管理界面创建工程并上传zip包。

Azkaban基础入门 - 图12

  1. 启动工作流flow。

Azkaban基础入门 - 图13 5. 查看结果。

Azkaban基础入门 - 图14 3. java操作任务

  1. 编写java程序。 ```java import java.io.IOException;

public class AzkabanTest {

public void run() throws IOException { // 根据需求编写具体代码 FileOutputStream fos = new FileOutputStream(“/opt/module/azkaban/output.txt”); fos.write(“this is a java progress”.getBytes()); fos.close(); }

public static void main(String[] args) throws IOException { AzkabanTest azkabanTest = new AzkabanTest(); azkabanTest.run(); }

}

  1. 2. java程序打成jar包,创建lib目录,将jar放入lib内。
  2. ```bash
  3. mkdir -p /opt/module/azkaban/lib
  1. 编写job文件。

    1. vi azkabanJava.job

    内容如下:

    1. # azkabanJava.job
    2. type=javaprocess
    3. java.class=com.atguigu.azkaban.AzkabanTest
    4. classpath=/opt/module/azkaban/lib/*
  2. 将job文件打成zip包。

    1. zip azkabanJava.zip azkabanJava.job
  3. 通过azkaban的web管理平台创建project并上传job压缩包,启动执行该job。

    Azkaban基础入门 - 图154. HDFS操作任务

  4. 创建job描述文件。

    1. vi fs.job

    内容如下:

    1. # hdfs job
    2. type=command
    3. command=/opt/module/hadoop-2.7.2/bin/hadoop fs -mkdir /azkaban
  5. 将job资源文件打包成zip文件。

    1. zip fs.zip fs.job
  6. 通过Azkaban的Web管理平台创建project并上传job压缩包。

  7. 启动执行该job。
  8. 查看结果。

Azkaban基础入门 - 图16

5. MapReduce任务

  1. 创建job描述文件,及mr程序jar包。

    1. vi mapreduce.job

    内容如下:

    1. # mapreduce job
    2. type=command
    3. command=/opt/module/hadoop-2.7.2/bin/hadoop jar /opt/module/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar wordcount /wordcount/input /wordcount/output
  2. 将所有job资源文件打到一个zip包中。

    1. zip mapreduce.zip mapreduce.job
  3. 在Azkaban的Web管理界面创建工程并上传zip包。

  4. 启动job。
  5. 查看结果。

Azkaban基础入门 - 图17

6. Hive脚本任务

  1. 创建job描述文件和Hive脚本。
  • Hive脚本(student.sql)

    1. vi student.sql

    内容如下:

    1. use default;
    2. drop table student;
    3. create table student(id int, name string)
    4. row format delimited fields terminated by '\t';
    5. load data local inpath '/opt/module/datas/student.txt' into table student;
    6. insert overwrite local directory '/opt/module/datas/student'
    7. row format delimited fields terminated by '\t'
    8. select * from student;
  • Job描述文件(hive.job)

    1. vi hive.job

    内容如下:

    1. # Hive job
    2. type=command
    3. command=/opt/module/hive/bin/hive -f /opt/module/azkaban/jobs/student.sql
  1. 将所有job资源文件打到一个zip包中。

    1. zip hive.zip hive.job
  2. 在Azkaban的Web管理界面创建工程并上传zip包。

  3. 启动job。
  4. 查看结果。

    1. cat /opt/module/datas/student/000000_0

    Azkaban基础入门 - 图18

    9. FAQ

  5. 上传的sh,Azkaban报错找不到。

因为sh文件的格式不对,不是unix格式。
image.png2. 调用任务报错。
“java.lang.Exception: Cannot request memory (Xms 0 kb, Xmx 0 kb) from system for job hello
at azkaban.jobExecutor.ProcessJob.run(ProcessJob.java:63)
at azkaban.execapp.JobRunner.runJob(JobRunner.java:590)
at azkaban.execapp.JobRunner.run(JobRunner.java:443)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)”
解决方案:
参考:https://github.com/azkaban/azkaban/issues/481
If you don’t need 3GB free memory for some simple shell commands, you can also disable the memory check by setting memCheck.enabled=false in plugins/jobtypes/commonprivate.properties.

  1. 随机性的sqoop import job跑死。

解决方案:通过azkanban的配置参数,错误重跑retry解决。

  1. retries=2
  2. retry.backoff=5000
  1. 多个sqoop export shell 脚本job任务并行在一台slave1上,引起CPU过高。

解决方案:通过ssh slave2 ‘sh *.sh’ 在其他slave上远程调度解决。

  1. 配置任务失败发邮件。

image.png

参考

简书:Azkaban各种类型的Job编写
https://www.jianshu.com/p/f2310a5c38c6