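Zeppelin is mostly used interactively: a user operates it by hand through the UI. Is there another way? What if you don't want to use the Zeppelin UI but still want Zeppelin's ability to submit and manage Flink jobs? Or what if you have written code in Zeppelin and want to run it on a schedule, or integrate it into another system?
If you have needs like these, the Zeppelin Client API (SDK) is what you are looking for. Zeppelin does have a REST API, but it exposes a large number of endpoints, and for people who are not familiar with Zeppelin the barrier to using it is too high. That is why Zeppelin provides a dedicated Client API (SDK) to make integration easier. The Zeppelin Client API (SDK) has two layers, each described in detail below: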

  • Zeppelin Client API (Low Level API)
  • Session API (High Level API)

Zeppelin Client API (Low Level API)

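The Zeppelin Client API operates at the granularity of Notes and Paragraphs. You can first write your code in a notebook (for example, writing and testing it in a notebook during development) and then use the low-level client API to run the job programmatically (for example, scheduling it in production). The most important class in the Zeppelin Client API is ZeppelinClient, which is also its entry point. Some of the key methods are listed below. They are fairly intuitive, so I won't explain each one; note that this is not the complete interface.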

    public String createNote(String notePath) throws Exception

    public void deleteNote(String noteId) throws Exception

    public NoteResult executeNote(String noteId) throws Exception

    public NoteResult executeNote(String noteId,
                                  Map<String, String> parameters) throws Exception

    public NoteResult queryNoteResult(String noteId) throws Exception

    public NoteResult submitNote(String noteId) throws Exception

    public NoteResult submitNote(String noteId,
                                 Map<String, String> parameters) throws Exception

    public NoteResult waitUntilNoteFinished(String noteId) throws Exception

    public String addParagraph(String noteId,
                               String title,
                               String text) throws Exception

    public void updateParagraph(String noteId,
                                String paragraphId,
                                String title,
                                String text) throws Exception

    public ParagraphResult executeParagraph(String noteId,
                                            String paragraphId,
                                            String sessionId,
                                            Map<String, String> parameters) throws Exception

    public ParagraphResult submitParagraph(String noteId,
                                           String paragraphId,
                                           String sessionId,
                                           Map<String, String> parameters) throws Exception

    public void cancelParagraph(String noteId, String paragraphId)

    public ParagraphResult queryParagraphResult(String noteId, String paragraphId)

    public ParagraphResult waitUtilParagraphFinish(String noteId, String paragraphId)
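
Going by the method names, the list pairs blocking calls (executeNote, executeParagraph) with non-blocking ones (submitNote, submitParagraph) that are followed up with the query and wait methods. If an integration needs its own timeout, a submit-then-poll loop is one option. The sketch below uses only the JDK: `statusQuery` is a hypothetical stand-in for something like a call to queryNoteResult, and the terminal status names are assumptions that should be checked against your Zeppelin version.

```java
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

class PollUntilDone {

    // Assumed terminal status names; not confirmed by this article.
    static final Set<String> TERMINAL = Set.of("FINISHED", "ERROR", "ABORT");

    /**
     * Calls statusQuery repeatedly until it reports a terminal status,
     * sleeping for `interval` between calls and giving up after `timeout`.
     */
    static String pollUntilDone(Supplier<String> statusQuery, Duration interval, Duration timeout)
            throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            String status = statusQuery.get();
            if (TERMINAL.contains(status)) {
                return status;
            }
            // Overflow-safe comparison of nanoTime values.
            if (deadline - System.nanoTime() <= 0) {
                throw new TimeoutException("Still " + status + " after " + timeout);
            }
            Thread.sleep(interval.toMillis());
        }
    }

    public static void main(String[] args) throws Exception {
        // Fake status source standing in for the real client: RUNNING twice, then FINISHED.
        Iterator<String> fake = List.of("RUNNING", "RUNNING", "FINISHED").iterator();
        String status = pollUntilDone(fake::next, Duration.ofMillis(10), Duration.ofSeconds(1));
        System.out.println("Final status: " + status);
    }
}
```

In a real integration the supplier would wrap the client call and map its result to a status string; the loop itself does not depend on Zeppelin.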

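So what can this API be used for? A typical use is to write and test code in Zeppelin and then integrate it into a third-party system. For example, the code below programmatically runs the Spark Basic Features note that ships with Zeppelin. You can not only run a Zeppelin note but also get the results back (ParagraphResult). How you process those results is up to your imagination.
In addition, for dynamic forms you can supply parameters at run time, such as maxAge and marital in the example below.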

    ClientConfig clientConfig = new ClientConfig("http://localhost:8080");
    ZeppelinClient zClient = new ZeppelinClient(clientConfig);

    String zeppelinVersion = zClient.getVersion();
    System.out.println("Zeppelin version: " + zeppelinVersion);

    ParagraphResult paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150210-015259_1403135953");
    System.out.println("Execute the 1st spark tutorial paragraph, paragraph result: " + paragraphResult);

    paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150210-015302_1492795503");
    System.out.println("Execute the 2nd spark tutorial paragraph, paragraph result: " + paragraphResult);

    Map<String, String> parameters = new HashMap<>();
    parameters.put("maxAge", "40");
    paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150212-145404_867439529", parameters);
    System.out.println("Execute the 3rd spark tutorial paragraph, paragraph result: " + paragraphResult);

    parameters = new HashMap<>();
    parameters.put("marital", "married");
    paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150213-230422_1600658137", parameters);
    System.out.println("Execute the 4th spark tutorial paragraph, paragraph result: " + paragraphResult);
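
Running such a note on a schedule, one of the motivations mentioned at the start, needs little more than a JDK scheduler around the client call. The following is a minimal sketch under that assumption: `NoteJob` and `NoteScheduler` are hypothetical names, not part of the Zeppelin API, and `NoteJob` stands in for a call such as zClient.executeNote(noteId, parameters).

```java
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class NoteScheduler {

    /** Hypothetical stand-in for the real client call, e.g. zClient.executeNote(noteId, parameters). */
    interface NoteJob {
        void run(String noteId, Map<String, String> parameters) throws Exception;
    }

    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

    /** Runs the job immediately and then again `delay` after each run completes. */
    ScheduledFuture<?> schedule(NoteJob job, String noteId, Map<String, String> parameters,
                                long delay, TimeUnit unit) {
        // Fixed delay (not fixed rate): a slow run pushes the next one back instead of piling up.
        return executor.scheduleWithFixedDelay(() -> {
            try {
                job.run(noteId, parameters);
            } catch (Exception e) {
                // An exception escaping the task would cancel all future runs, so report it here.
                System.err.println("Run of note " + noteId + " failed: " + e);
            }
        }, 0, delay, unit);
    }

    void shutdown() {
        executor.shutdownNow();
    }

    public static void main(String[] args) throws Exception {
        NoteScheduler scheduler = new NoteScheduler();
        scheduler.schedule(
                (noteId, params) -> System.out.println("Would run note " + noteId + " with " + params),
                "2A94M5J1Z", Map.of("maxAge", "40"), 20, TimeUnit.MILLISECONDS);
        Thread.sleep(100);
        scheduler.shutdown();
    }
}
```

A production setup would more likely hand this to an existing workflow scheduler; the point is only that the client call is an ordinary method that any scheduler can invoke.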


Session API (High Level API)

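The Session API is Zeppelin's high-level API. It has no notion of Note or Paragraph; its granularity is the code you submit. The most important class in the Session API is ZSession, which is also its entry point. One ZSession represents one independent Zeppelin interpreter process, which for Flink means one independent Flink session cluster. Some typical methods are listed below. They are fairly intuitive, so I won't explain each one; note that this is not the complete interface.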

    public void start() throws Exception

    public void start(MessageHandler messageHandler) throws Exception

    public void stop() throws Exception

    public ExecuteResult execute(String code) throws Exception

    public ExecuteResult execute(String subInterpreter,
                                 Map<String, String> localProperties,
                                 String code,
                                 StatementMessageHandler messageHandler) throws Exception

    public ExecuteResult submit(String code) throws Exception

    public ExecuteResult submit(String subInterpreter,
                                Map<String, String> localProperties,
                                String code,
                                StatementMessageHandler messageHandler) throws Exception

    public void cancel(String statementId) throws Exception

    public ExecuteResult queryStatement(String statementId) throws Exception

    public ExecuteResult waitUntilFinished(String statementId) throws Exception

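So what can this API be used for? A typical use is to create a session (a Zeppelin interpreter process) on demand, submit code to it dynamically, and get the results back. For example, if you don't want to use the Zeppelin UI and would rather build your own Flink development and management platform, you can build your own UI, let users configure Flink jobs and enter SQL there, send all of that information to your backend, and have the backend call ZSession to run the Flink job.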

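The Java code below runs two Flink SQL statements programmatically and reads the continuously updated SQL results in MyStatementMessageHandler1 and MyStatementMessageHandler2 (how you use these results is up to your imagination).
Note that for interpreters such as Flink, streaming result updates are delivered over WebSocket, which is why the code below uses CompositeMessageHandler, MyStatementMessageHandler1, and MyStatementMessageHandler2.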

    %flink.ssql(type=update)
    select url, count(1) as pv from log group by url

    %flink.ssql(type=update)
    select upper(url), count(1) as pv from log group by url

    ZSession session = null;
    try {
      ClientConfig clientConfig = new ClientConfig("http://localhost:8080");
      Map<String, String> intpProperties = new HashMap<>();

      session = ZSession.builder()
              .setClientConfig(clientConfig)
              .setInterpreter("flink")
              .setIntpProperties(intpProperties)
              .build();

      // CompositeMessageHandler allows you to add a StatementMessageHandler for each statement;
      // otherwise you have to use a global MessageHandler.
      session.start(new CompositeMessageHandler());
      System.out.println("Flink Web UI: " + session.getWeburl());

      System.out.println("-----------------------------------------------------------------------------");
      String initCode = IOUtils.toString(FlinkAdvancedExample.class.getResource("/init_stream.scala"));
      ExecuteResult result = session.execute(initCode);
      System.out.println("Job status: " + result.getStatus() + ", data: " + result.getResults().get(0).getData());

      // run flink ssql
      Map<String, String> localProperties = new HashMap<>();
      localProperties.put("type", "update");
      result = session.submit("ssql", localProperties, "select url, count(1) as pv from log group by url",
              new MyStatementMessageHandler1());
      session.waitUntilFinished(result.getStatementId());

      result = session.submit("ssql", localProperties, "select upper(url), count(1) as pv from log group by url",
              new MyStatementMessageHandler2());
      session.waitUntilFinished(result.getStatementId());

    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      if (session != null) {
        try {
          session.stop();
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    }

    public static class MyStatementMessageHandler1 implements StatementMessageHandler {

      @Override
      public void onStatementAppendOutput(String statementId, int index, String output) {
        System.out.println("MyStatementMessageHandler1, append output: " + output);
      }

      @Override
      public void onStatementUpdateOutput(String statementId, int index, String type, String output) {
        System.out.println("MyStatementMessageHandler1, update output: " + output);
      }
    }

    public static class MyStatementMessageHandler2 implements StatementMessageHandler {

      @Override
      public void onStatementAppendOutput(String statementId, int index, String output) {
        System.out.println("MyStatementMessageHandler2, append output: " + output);
      }

      @Override
      public void onStatementUpdateOutput(String statementId, int index, String type, String output) {
        System.out.println("MyStatementMessageHandler2, update output: " + output);
      }
    }
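
Per-statement handlers imply that something routes each incoming message to the handler registered for its statement. The sketch below illustrates that routing idea with JDK classes only. It is a simplified illustration, not Zeppelin's actual CompositeMessageHandler: `StatementRouter` and `OutputHandler` are hypothetical names, and the real handler interface has more callbacks than the single one modeled here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class StatementRouter {

    /** Hypothetical, reduced version of a per-statement callback. */
    interface OutputHandler {
        void onOutput(String statementId, String output);
    }

    // Concurrent map: messages may arrive on a WebSocket thread while
    // the caller registers handlers from another thread.
    private final Map<String, OutputHandler> handlers = new ConcurrentHashMap<>();

    void register(String statementId, OutputHandler handler) {
        handlers.put(statementId, handler);
    }

    void unregister(String statementId) {
        handlers.remove(statementId);
    }

    /** Forwards the output to the statement's handler; returns false if none is registered. */
    boolean dispatch(String statementId, String output) {
        OutputHandler handler = handlers.get(statementId);
        if (handler == null) {
            return false;
        }
        handler.onOutput(statementId, output);
        return true;
    }

    public static void main(String[] args) {
        StatementRouter router = new StatementRouter();
        List<String> first = new ArrayList<>();
        List<String> second = new ArrayList<>();
        router.register("statement-1", (id, out) -> first.add(out));
        router.register("statement-2", (id, out) -> second.add(out));

        router.dispatch("statement-1", "url\tpv");
        router.dispatch("statement-2", "URL\tpv");
        System.out.println("statement-1 received: " + first);
        System.out.println("statement-2 received: " + second);
    }
}
```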

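In Zeppelin you can use %flink.conf to configure your Flink cluster in great detail, but %flink.conf is plain-text configuration, and people who are not familiar with Flink can easily get it wrong. If you are building your own Flink development platform, you can provide a more complete UI that pins configuration options down with controls such as drop-down lists, so users only have to choose a value instead of typing configuration text.
Paragraph local properties such as type, template, and resumeFromLatestCheckpoint are also easy to mistype. In the same way, your own UI can offer controls that fix these options in advance rather than asking users to enter them as text.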
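
One way to pin such options down in a backend is to model them as typed values and produce the text form only at the last step. The sketch below works under several assumptions: `SsqlParagraph` and `ResultType` are hypothetical names, only the `update` value of type appears in this article (`single` and `append` are assumed), and the rendered header follows the `%flink.ssql(type=update)` form shown earlier.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;

class SsqlParagraph {

    /** Assumed set of values for the "type" local property; only UPDATE is shown in the article. */
    enum ResultType { SINGLE, UPDATE, APPEND }

    /** Builds local properties from typed UI choices, so property names and values cannot be mistyped. */
    static Map<String, String> localProperties(ResultType type, boolean resumeFromLatestCheckpoint) {
        Map<String, String> props = new LinkedHashMap<>(); // keeps insertion order for stable output
        props.put("type", type.name().toLowerCase(Locale.ROOT));
        if (resumeFromLatestCheckpoint) {
            props.put("resumeFromLatestCheckpoint", "true");
        }
        return props;
    }

    /** Renders the properties and SQL in the paragraph form, e.g. "%flink.ssql(type=update)". */
    static String render(Map<String, String> props, String sql) {
        String header = props.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining(",", "%flink.ssql(", ")"));
        return header + "\n" + sql;
    }

    public static void main(String[] args) {
        Map<String, String> props = localProperties(ResultType.UPDATE, true);
        System.out.println(render(props, "select url, count(1) as pv from log group by url"));
    }
}
```

With the Session API the rendering step is not needed: the same map could be passed as the localProperties argument of submit, as in the Java example above.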

How to use the Client API

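The Zeppelin Client API is not in master yet (it is about to be merged). You can download the latest zeppelin build and the zeppelin-client jar from the DingTalk group, then install the zeppelin-client jar into your local Maven repository with the following command: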

    mvn install:install-file -Dfile=zeppelin-client-0.9.0-SNAPSHOT.jar -DgroupId=org.apache.zeppelin -DartifactId=zeppelin-client -Dversion=0.9.0-SNAPSHOT -Dpackaging=jar -DpomFile=zeppelin-client/pom.xml

After that, you can follow zeppelin-client-examples to start using the Zeppelin Client API. All the examples are available here: https://github.com/zjffdu/zeppelin/tree/ZEPPELIN-4981/zeppelin-client-examples

Summary

I believe there is still a lot of room to explore what the Zeppelin Client API can do, so let your imagination run.

Video tutorials

Zeppelin Client API (Low Level API)

Flink on Zeppelin 25. Zeppelin Client API (1).mp4 (45.12MB)

Zeppelin Client API (High Level API)

Flink on Zeppelin 26. Zeppelin Client API (2).mp4 (67.14MB)
