本页介绍如何使用Storm消费来自Kestrel集群的项目。

准备阶段

Storm

本教程使用的示例来自于 storm-kestrel 项目和 storm-starter 项目。建议你克隆这些项目并根着示例走。 阅读 Setting up development environmentCreating a new Storm project 来设置你的机器。

Kestrel

它假设您可以在本地选择Kestrel 服务器,如上所述 here.

Kestrel 服务 和 队列

单个kestrel服务器具有一组队列。Kestrel 队列是在JVM上运行的非常简单的消息队列,并使用memcache协议(具有一定的扩展名)与客户端进行通信。对于更加详细的信息,你可查看 KestrelThriftClient 类里面提供的 storm-kestrel 项目.

每个队列按照FIFO(先进先出)的原则进行严格排序。跟随性能项目缓存在系统内存中;但是,只有前128MB保存在内存中。当服务器停止时,队列状态存储在日志文件中。

此外,还可以从 here 找到细节。

Kestrel is: fast small durable(耐久) reliable(稳定)

例如,Twitter 使用 Kestrel 作为其消息传递基础设施的骨干,如上所述here.

添加项目至 Kestrel

首先,我们需要一个可以将项目添加到Kestrel队列的程序。以下方法受益于 KestrelClient的实现 storm-kestrel. 它将句子添加到从包含五个可能句子的数组中随机选择的Kestrel队列中。

  1. private static void queueSentenceItems(KestrelClient kestrelClient, String queueName)
  2. throws ParseError, IOException {
  3. String[] sentences = new String[] {
  4. "the cow jumped over the moon",
  5. "an apple a day keeps the doctor away",
  6. "four score and seven years ago",
  7. "snow white and the seven dwarfs",
  8. "i am at two with nature"};
  9. Random _rand = new Random();
  10. for(int i=1; i<=10; i++){
  11. String sentence = sentences[_rand.nextInt(sentences.length)];
  12. String val = "ID " + i + " " + sentence;
  13. boolean queueSucess = kestrelClient.queue(queueName, val);
  14. System.out.println("queueSucess=" +queueSucess+ " [" + val +"]");
  15. }
  16. }

将项目从 Kestrel 移除

此方法将队列中的项目排队,而不是删除。 ``` private static void dequeueItems(KestrelClient kestrelClient, String queueName) throws IOException, ParseError { for(int i=1; i<=12; i++){

  1. Item item = kestrelClient.dequeue(queueName);
  2. if(item==null){
  3. System.out.println("The queue (" + queueName + ") contains no items.");
  4. }
  5. else
  6. {
  7. byte[] data = item._data;
  8. String receivedVal = new String(data);
  9. System.out.println("receivedItem=" + receivedVal);
  10. }
  11. }
  1. 此方法将队列中的项目排队,然后将其删除。
  2. This method dequeues items from a queue and then removes them.
  1. private static void dequeueAndRemoveItems(KestrelClient kestrelClient, String queueName)
  2. throws IOException, ParseError
  3. {
  4. for(int i=1; i<=12; i++){
  5. Item item = kestrelClient.dequeue(queueName);
  6. if(item==null){
  7. System.out.println("The queue (" + queueName + ") contains no items.");
  8. }
  9. else
  10. {
  11. int itemID = item._id;
  12. byte[] data = item._data;
  13. String receivedVal = new String(data);
  14. kestrelClient.ack(queueName, itemID);
  15. System.out.println("receivedItem=" + receivedVal);
  16. }
  17. }
  18. }
  1. ## 连续添加项目至 Kestrel
  2. 这是我们的最终运行程序,以便连续地将句子项添加到本地运行的Kestrel服务器的名为 **sentence_queue** 的队列中。
  3. 为了阻止它在控制台中键入一个关闭括号 char ']' ,然后按 'Enter'
  1. import java.io.IOException;
  2. import java.io.InputStream;
  3. import java.util.Random;
  4. import org.apache.storm.spout.KestrelClient;
  5. import org.apache.storm.spout.KestrelClient.Item;
  6. import org.apache.storm.spout.KestrelClient.ParseError;
  7. public class AddSentenceItemsToKestrel {
  8. /**
  9. * @param args
  10. */
  11. public static void main(String[] args) {
  12. InputStream is = System.in;
  13. char closing_bracket = ']';
  14. int val = closing_bracket;
  15. boolean aux = true;
  16. try {
  17. KestrelClient kestrelClient = null;
  18. String queueName = "sentence_queue";
  19. while(aux){
  20. kestrelClient = new KestrelClient("localhost",22133);
  21. queueSentenceItems(kestrelClient, queueName);
  22. kestrelClient.close();
  23. Thread.sleep(1000);
  24. if(is.available()>0){
  25. if(val==is.read())
  26. aux=false;
  27. }
  28. }
  29. } catch (IOException e) {
  30. // TODO Auto-generated catch block
  31. e.printStackTrace();
  32. }
  33. catch (ParseError e) {
  34. // TODO Auto-generated catch block
  35. e.printStackTrace();
  36. } catch (InterruptedException e) {
  37. // TODO Auto-generated catch block
  38. e.printStackTrace();
  39. }
  40. System.out.println("end");
  41. }
  42. }
  1. ## 使用 KestrelSpout
  2. 该拓扑结构使用KestrelSpoutKestrel队列中读取句子,将句子分解成其组成词(Bolt: SplitSentence),然后为每个单词发出它之前(Bolt: WordCount)所见到的次数。数据的处理方式如下所述 [Guaranteeing message processing](docs_Guaranteeing-message-processing.html).
  1. TopologyBuilder builder = new TopologyBuilder();
  2. builder.setSpout("sentences", new KestrelSpout("localhost",22133,"sentence_queue",new StringScheme()));
  3. builder.setBolt("split", new SplitSentence(), 10)
  4. .shuffleGrouping("sentences");
  5. builder.setBolt("count", new WordCount(), 20)
  6. .fieldsGrouping("split", new Fields("word"));
  1. ## 执行
  2. 首先,在生产或开发模式下启动您本地的Kestrel服务器。
  3. 等大约5秒,以避免ConnectionException
  4. 现在执行程序将项目添加到队列并启动Storm 拓扑。启动程序的排序并不重要。
  5. 如果您使用TOPOLOGY_DEBUG运行拓扑,您应该会看到在拓扑中发出的元组。