Twitter Streaming API提供对Twitter提供的推文流的访问。Flink Streaming附带了一个TwitterSource
用于建立与此流的连接的内置类。要使用此连接器,请将以下依赖项添加到项目中:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-twitter_2.11</artifactId>
<version>1.7-SNAPSHOT</version>
</dependency>
请注意,流连接器当前不是二进制分发的一部分。请参阅此处的链接以进行群集执行。
认证
为了连接到Twitter流,用户必须注册他们的程序并获取用于认证的必要信息。该过程如下所述。
获取认证信息
首先,需要一个Twitter帐户。在twitter.com/signup免费注册 或在Twitter的应用程序管理中登录并通过单击“创建新应用程序”按钮注册该应用程序。填写有关您的计划的表格并接受条款和条件。选择应用程序,APIKeys和APIKeys(称为后twitter-source.consumerKey
和twitter-source.consumerSecret
在TwitterSource
分别)位于“APIKeys”选项卡上。可以在“Keys和访问令牌”选项卡上生成和获取必要的OAuth访问令牌数据(twitter-source.token
和twitter-source.tokenSecret
in TwitterSource
)。请记住保密这些信息,不要将它们推送到公共存储库。
用法
与其他连接器相比,它不TwitterSource
依赖于其他服务。例如,以下代码应该正常运行:
Properties props = new Properties();
props.setProperty(TwitterSource.CONSUMER_KEY, "");
props.setProperty(TwitterSource.CONSUMER_SECRET, "");
props.setProperty(TwitterSource.TOKEN, "");
props.setProperty(TwitterSource.TOKEN_SECRET, "");
DataStream<String> streamSource = env.addSource(new TwitterSource(props));
val props = new Properties()
props.setProperty(TwitterSource.CONSUMER_KEY, "")
props.setProperty(TwitterSource.CONSUMER_SECRET, "")
props.setProperty(TwitterSource.TOKEN, "")
props.setProperty(TwitterSource.TOKEN_SECRET, "")
val streamSource = env.addSource(new TwitterSource(props))
所述TwitterSource
含有JSON对象,表示鸣叫发射字符串。
包中的TwitterExample
类flink-examples-streaming
显示了如何使用的完整示例TwitterSource
。
默认情况下,TwitterSource
使用StatusesSampleEndpoint
。此端点返回随机的推文样本。有一个TwitterSource.EndpointInitializer
界面允许用户提供自定义端点。