译者:flink.sojb.cn

Twitter Streaming API提供对Twitter提供的推文流的访问。Flink Streaming附带了一个TwitterSource用于建立与此流的连接的内置类。要使用此连接器,请将以下依赖项添加到项目中:

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-connector-twitter_2.11</artifactId>
  4. <version>1.7-SNAPSHOT</version>
  5. </dependency>

请注意,流连接器当前不是二进制分发的一部分。请参阅此处的链接以进行群集执行。

认证

为了连接到Twitter流,用户必须注册他们的程序并获取用于认证的必要信息。该过程如下所述。

获取认证信息

首先,需要一个Twitter帐户。在twitter.com/signup免费注册 或在Twitter的应用程序管理中登录并通过单击“创建新应用程序”按钮注册该应用程序。填写有关您的计划的表格并接受条款和条件。选择应用程序,APIKeys和APIKeys(称为后twitter-source.consumerKeytwitter-source.consumerSecretTwitterSource分别)位于“APIKeys”选项卡上。可以在“Keys和访问令牌”选项卡上生成和获取必要的OAuth访问令牌数据(twitter-source.tokentwitter-source.tokenSecretin TwitterSource)。请记住保密这些信息,不要将它们推送到公共存储库。

用法

与其他连接器相比,它不TwitterSource依赖于其他服务。例如,以下代码应该正常运行:

  1. Properties props = new Properties();
  2. props.setProperty(TwitterSource.CONSUMER_KEY, "");
  3. props.setProperty(TwitterSource.CONSUMER_SECRET, "");
  4. props.setProperty(TwitterSource.TOKEN, "");
  5. props.setProperty(TwitterSource.TOKEN_SECRET, "");
  6. DataStream<String> streamSource = env.addSource(new TwitterSource(props));
  1. val props = new Properties()
  2. props.setProperty(TwitterSource.CONSUMER_KEY, "")
  3. props.setProperty(TwitterSource.CONSUMER_SECRET, "")
  4. props.setProperty(TwitterSource.TOKEN, "")
  5. props.setProperty(TwitterSource.TOKEN_SECRET, "")
  6. val streamSource = env.addSource(new TwitterSource(props))

所述TwitterSource含有JSON对象,表示鸣叫发射字符串。

包中的TwitterExampleflink-examples-streaming显示了如何使用的完整示例TwitterSource

默认情况下,TwitterSource使用StatusesSampleEndpoint。此端点返回随机的推文样本。有一个TwitterSource.EndpointInitializer界面允许用户提供自定义端点。