Storm/Trident Integration for Redis

Storm-redis uses Jedis as its Redis client.

Usage

How do I use it?

Add it as a Maven dependency:

    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-redis</artifactId>
        <version>${storm.version}</version>
        <type>jar</type>
    </dependency>

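Common Bolts

Storm-redis provides two basic Bolt implementations: RedisLookupBolt and RedisStoreBolt.

As the names suggest, RedisLookupBolt retrieves a value from Redis using a key, and RedisStoreBolt stores a key/value pair to Redis. One tuple maps to one key/value pair, and you define that mapping in a TupleMapper.

You can also choose the Redis data type through RedisDataTypeDescription. See RedisDataTypeDescription.RedisDataType for the supported data types. Some data types (hash and sorted set) require an additional key; in that case the key converted from the tuple becomes the element inside that structure.

These interfaces are combined into RedisLookupMapper and RedisStoreMapper, which fit RedisLookupBolt and RedisStoreBolt respectively.

RedisLookupBolt example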
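To make the role of the additional key concrete, here is a minimal sketch that models the hash data type with plain in-memory maps. It is not storm-redis code: the class `HashMappingSketch` and its methods are hypothetical, and the sketch assumes that a hash store behaves like `HSET additionalKey field value` and a hash lookup like `HGET additionalKey field`.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory stand-in for Redis hashes; illustrative only, not part of storm-redis.
final class HashMappingSketch {
    // Outer key = the additional key (for example "wordCount").
    // Inner map = field -> value, like the contents of one Redis hash.
    private final Map<String, Map<String, String>> hashes = new HashMap<>();

    // Models a hash store: the tuple's key becomes the field inside the hash.
    void store(String additionalKey, String tupleKey, String tupleValue) {
        hashes.computeIfAbsent(additionalKey, k -> new HashMap<>())
              .put(tupleKey, tupleValue);
    }

    // Models a hash lookup: returns null when the hash or the field is missing.
    String lookup(String additionalKey, String tupleKey) {
        Map<String, String> hash = hashes.get(additionalKey);
        return hash == null ? null : hash.get(tupleKey);
    }

    public static void main(String[] args) {
        HashMappingSketch redis = new HashMappingSketch();
        redis.store("wordCount", "storm", "3");
        System.out.println(redis.lookup("wordCount", "storm"));   // prints 3
        System.out.println(redis.lookup("wordCount", "trident")); // prints null
    }
}
```

With a plain string data type there is no outer key, so the tuple's key would be used as the Redis key directly.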

    import java.util.List;

    import com.google.common.collect.Lists;
    import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
    import org.apache.storm.redis.common.mapper.RedisLookupMapper;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.ITuple;
    import org.apache.storm.tuple.Values;

    class WordCountRedisLookupMapper implements RedisLookupMapper {
        private RedisDataTypeDescription description;
        // Additional key: the name of the Redis hash that holds the counts.
        private final String hashKey = "wordCount";

        public WordCountRedisLookupMapper() {
            description = new RedisDataTypeDescription(
                    RedisDataTypeDescription.RedisDataType.HASH, hashKey);
        }

        // Turns the looked-up value into the tuple(s) to emit.
        @Override
        public List<Values> toTuple(ITuple input, Object value) {
            String member = getKeyFromTuple(input);
            List<Values> values = Lists.newArrayList();
            values.add(new Values(member, value));
            return values;
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("wordName", "count"));
        }

        @Override
        public RedisDataTypeDescription getDataTypeDescription() {
            return description;
        }

        @Override
        public String getKeyFromTuple(ITuple tuple) {
            return tuple.getStringByField("word");
        }

        // A lookup does not write anything, so no value is needed.
        @Override
        public String getValueFromTuple(ITuple tuple) {
            return null;
        }
    }

    // Wiring the mapper into a bolt; host and port are defined elsewhere.
    JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
            .setHost(host).setPort(port).build();
    RedisLookupMapper lookupMapper = new WordCountRedisLookupMapper();
    RedisLookupBolt lookupBolt = new RedisLookupBolt(poolConfig, lookupMapper);

RedisStoreBolt example

    import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
    import org.apache.storm.redis.common.mapper.RedisStoreMapper;
    import org.apache.storm.tuple.ITuple;

    class WordCountStoreMapper implements RedisStoreMapper {
        private RedisDataTypeDescription description;
        // Additional key: the name of the Redis hash that holds the counts.
        private final String hashKey = "wordCount";

        public WordCountStoreMapper() {
            description = new RedisDataTypeDescription(
                    RedisDataTypeDescription.RedisDataType.HASH, hashKey);
        }

        @Override
        public RedisDataTypeDescription getDataTypeDescription() {
            return description;
        }

        // The word becomes the field inside the hash.
        @Override
        public String getKeyFromTuple(ITuple tuple) {
            return tuple.getStringByField("word");
        }

        // The count is stored as the field's value.
        @Override
        public String getValueFromTuple(ITuple tuple) {
            return tuple.getStringByField("count");
        }
    }

    // Wiring the mapper into a bolt; host and port are defined elsewhere.
    JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
            .setHost(host).setPort(port).build();
    RedisStoreMapper storeMapper = new WordCountStoreMapper();
    RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);

Non-simple Bolts

If your scenario does not fit RedisStoreBolt or RedisLookupBolt, storm-redis also provides AbstractRedisBolt, which you can extend to apply your own business logic.

    public static class LookupWordTotalCountBolt extends AbstractRedisBolt {
        private static final Logger LOG = LoggerFactory.getLogger(LookupWordTotalCountBolt.class);
        private static final Random RANDOM = new Random();

        public LookupWordTotalCountBolt(JedisPoolConfig config) {
            super(config);
        }

        public LookupWordTotalCountBolt(JedisClusterConfig config) {
            super(config);
        }

        @Override
        public void execute(Tuple input) {
            JedisCommands jedisCommands = null;
            try {
                jedisCommands = getInstance();
                String wordName = input.getStringByField("word");
                String countStr = jedisCommands.get(wordName);
                if (countStr != null) {
                    int count = Integer.parseInt(countStr);
                    this.collector.emit(new Values(wordName, count));

                    // print lookup result with low probability
                    if (RANDOM.nextInt(1000) > 995) {
                        LOG.info("Lookup result - word : " + wordName + " / count : " + count);
                    }
                } else {
                    // skip
                    LOG.warn("Word not found in Redis - word : " + wordName);
                }
            } finally {
                // always hand the connection back, then ack the tuple
                if (jedisCommands != null) {
                    returnInstance(jedisCommands);
                }
                this.collector.ack(input);
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // wordName, count
            declarer.declare(new Fields("wordName", "count"));
        }
    }
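The bolt above borrows a Redis handle with getInstance() and hands it back with returnInstance() in a finally block. The borrow-and-always-return pattern can be sketched on its own, without Storm or Jedis. The class `BorrowReturnSketch` below is a hypothetical stand-in for a pool, not a storm-redis or Jedis class, and it is deliberately not thread-safe.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Function;

// Hypothetical stand-in for a connection pool; illustrative only.
final class BorrowReturnSketch<C> {
    private final Deque<C> idle = new ArrayDeque<>();

    BorrowReturnSketch(Iterable<C> connections) {
        for (C c : connections) {
            idle.push(c);
        }
    }

    // Number of connections currently available for borrowing.
    int idleCount() {
        return idle.size();
    }

    // Borrows a connection, runs the work, and returns the connection to the
    // pool in a finally block so it is released even when the work throws.
    <R> R withConnection(Function<C, R> work) {
        C conn = idle.pop(); // throws NoSuchElementException if the pool is empty
        try {
            return work.apply(conn);
        } finally {
            idle.push(conn);
        }
    }
}
```

Skipping the finally block would leak one connection for every tuple whose processing throws, until the pool runs dry.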

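Trident State usage

  1. RedisState and RedisMapState, which provide the Jedis interface and are for a single Redis instance only.

  2. RedisClusterState and RedisClusterMapState, which provide the JedisCluster interface and are for Redis Cluster only.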

RedisState

```java
// redisHost, redisPort, spout and fields are defined elsewhere.
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
        .setHost(redisHost).setPort(redisPort)
        .build();
RedisStoreMapper storeMapper = new WordCountStoreMapper();
// WordCountLookupMapper is a RedisLookupMapper implementation.
RedisLookupMapper lookupMapper = new WordCountLookupMapper();
RedisState.Factory factory = new RedisState.Factory(poolConfig);

TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);

// Persist each tuple to Redis through the store mapper.
stream.partitionPersist(factory,
        fields,
        new RedisStateUpdater(storeMapper).withExpire(86400000),
        new Fields());

// Query the same state through the lookup mapper.
TridentState state = topology.newStaticState(factory);
stream = stream.stateQuery(state, new Fields("word"),
        new RedisStateQuerier(lookupMapper),
        new Fields("columnName", "columnValue"));
```
RedisClusterState

```java
// redisHostPort is a comma-separated list of host:port pairs;
// spout and fields are defined elsewhere.
Set<InetSocketAddress> nodes = new HashSet<InetSocketAddress>();
for (String hostPort : redisHostPort.split(",")) {
    String[] host_port = hostPort.split(":");
    nodes.add(new InetSocketAddress(host_port[0], Integer.valueOf(host_port[1])));
}
JedisClusterConfig clusterConfig = new JedisClusterConfig.Builder().setNodes(nodes)
        .build();
RedisStoreMapper storeMapper = new WordCountStoreMapper();
RedisLookupMapper lookupMapper = new WordCountLookupMapper();
RedisClusterState.Factory factory = new RedisClusterState.Factory(clusterConfig);

TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);

// Persist each tuple to the cluster through the store mapper.
stream.partitionPersist(factory,
        fields,
        new RedisClusterStateUpdater(storeMapper).withExpire(86400000),
        new Fields());

// Query the same state through the lookup mapper.
TridentState state = topology.newStaticState(factory);
stream = stream.stateQuery(state, new Fields("word"),
        new RedisClusterStateQuerier(lookupMapper),
        new Fields("columnName", "columnValue"));
```
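The node-parsing loop in the cluster example assumes every entry is a well-formed host:port pair. A slightly more defensive version might look like the sketch below. The helper `RedisNodeParser` is hypothetical and not part of storm-redis. It uses `InetSocketAddress.createUnresolved` so that parsing performs no DNS lookup; whether unresolved addresses suit your deployment is an assumption to verify.

```java
import java.net.InetSocketAddress;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical helper; illustrative only, not part of storm-redis.
final class RedisNodeParser {
    // Parses "host1:port1,host2:port2" into socket addresses.
    // Blank entries are skipped; malformed entries raise IllegalArgumentException.
    static Set<InetSocketAddress> parse(String hostPorts) {
        Set<InetSocketAddress> nodes = new LinkedHashSet<>();
        for (String entry : hostPorts.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            // NumberFormatException is a subclass of IllegalArgumentException.
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            // createUnresolved also rejects ports outside 0..65535.
            nodes.add(InetSocketAddress.createUnresolved(host, port));
        }
        return nodes;
    }
}
```

The resulting set could then be passed to `JedisClusterConfig.Builder().setNodes(...)` in place of the hand-built `nodes` set.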