Storm/Trident 集成 Redis
Storm-redis使用Jedis为Redis客户端。
用法
如何使用它?
使用它作为一个maven依赖:
<dependency><groupId>org.apache.storm</groupId><artifactId>storm-redis</artifactId><version>${storm.version}</version><type>jar</type></dependency>
常用Bolt
Storm-redis提供了基本的Bolt实现, RedisLookupBolt and RedisStoreBolt。
根据名称可以知道其功能,RedisLookupBolt使用键从Redis中检索值,而RedisStoreBolt将键/值存储到Redis。 一个元组将匹配一个键/值对,您可以将匹配模式定义为“`TupleMapper```。
您还可以从RedisDataTypeDescription中选择数据类型来使用。请参考 RedisDataTypeDescription.RedisDataType来查看支持哪些数据类型。在一些数据类型(散列和排序集)中,它需要额外的键和从元组转换的元素成为元素。
这些接口与 RedisLookupMapper 和 RedisStoreMapper组合,分别适合 RedisLookupBolt 和RedisStoreBolt。
RedisLookupBolt示例
class WordCountRedisLookupMapper implements RedisLookupMapper {private RedisDataTypeDescription description;private final String hashKey = "wordCount";public WordCountRedisLookupMapper() {description = new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.HASH, hashKey);}@Overridepublic List<Values> toTuple(ITuple input, Object value) {String member = getKeyFromTuple(input);List<Values> values = Lists.newArrayList();values.add(new Values(member, value));return values;}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("wordName", "count"));}@Overridepublic RedisDataTypeDescription getDataTypeDescription() {return description;}@Overridepublic String getKeyFromTuple(ITuple tuple) {return tuple.getStringByField("word");}@Overridepublic String getValueFromTuple(ITuple tuple) {return null;}}
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder().setHost(host).setPort(port).build();RedisLookupMapper lookupMapper = new WordCountRedisLookupMapper();RedisLookupBolt lookupBolt = new RedisLookupBolt(poolConfig, lookupMapper);
RedisStoreBolt示例
class WordCountStoreMapper implements RedisStoreMapper {private RedisDataTypeDescription description;private final String hashKey = "wordCount";public WordCountStoreMapper() {description = new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.HASH, hashKey);}@Overridepublic RedisDataTypeDescription getDataTypeDescription() {return description;}@Overridepublic String getKeyFromTuple(ITuple tuple) {return tuple.getStringByField("word");}@Overridepublic String getValueFromTuple(ITuple tuple) {return tuple.getStringByField("count");}}
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder().setHost(host).setPort(port).build();RedisStoreMapper storeMapper = new WordCountStoreMapper();RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);
非简单的 Bolt
如果您的场景不适合 RedisStoreBolt和 RedisLookupBolt,Storm-redis还提供了 AbstractRedisBolt,让您扩展和应用业务逻辑。
public static class LookupWordTotalCountBolt extends AbstractRedisBolt {private static final Logger LOG = LoggerFactory.getLogger(LookupWordTotalCountBolt.class);private static final Random RANDOM = new Random();public LookupWordTotalCountBolt(JedisPoolConfig config) {super(config);}public LookupWordTotalCountBolt(JedisClusterConfig config) {super(config);}@Overridepublic void execute(Tuple input) {JedisCommands jedisCommands = null;try {jedisCommands = getInstance();String wordName = input.getStringByField("word");String countStr = jedisCommands.get(wordName);if (countStr != null) {int count = Integer.parseInt(countStr);this.collector.emit(new Values(wordName, count));// print lookup result with low probabilityif(RANDOM.nextInt(1000) > 995) {LOG.info("Lookup result - word : " + wordName + " / count : " + count);}} else {// skipLOG.warn("Word not found in Redis - word : " + wordName);}} finally {if (jedisCommands != null) {returnInstance(jedisCommands);}this.collector.ack(input);}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// wordName, countdeclarer.declare(new Fields("wordName", "count"));}}
Trident State 用法
RedisState和RedisMapState,它提供Jedis接口,仅用于单次重新启动。
RedisClusterState和RedisClusterMapState,它们提供JedisCluster接口,仅用于redis集群。
RedisState ```java JedisPoolConfig poolConfig = new JedisPoolConfig.Builder() .setHost(redisHost).setPort(redisPort) .build(); RedisStoreMapper storeMapper = new WordCountStoreMapper(); RedisLookupMapper lookupMapper = new WordCountLookupMapper(); RedisState.Factory factory = new RedisState.Factory(poolConfig);
TridentTopology topology = new TridentTopology();Stream stream = topology.newStream("spout1", spout);stream.partitionPersist(factory,fields,new RedisStateUpdater(storeMapper).withExpire(86400000),new Fields());TridentState state = topology.newStaticState(factory);stream = stream.stateQuery(state, new Fields("word"),new RedisStateQuerier(lookupMapper),new Fields("columnName","columnValue"));
RedisClusterState```javaSet<InetSocketAddress> nodes = new HashSet<InetSocketAddress>();for (String hostPort : redisHostPort.split(",")) {String[] host_port = hostPort.split(":");nodes.add(new InetSocketAddress(host_port[0], Integer.valueOf(host_port[1])));}JedisClusterConfig clusterConfig = new JedisClusterConfig.Builder().setNodes(nodes).build();RedisStoreMapper storeMapper = new WordCountStoreMapper();RedisLookupMapper lookupMapper = new WordCountLookupMapper();RedisClusterState.Factory factory = new RedisClusterState.Factory(clusterConfig);TridentTopology topology = new TridentTopology();Stream stream = topology.newStream("spout1", spout);stream.partitionPersist(factory,fields,new RedisClusterStateUpdater(storeMapper).withExpire(86400000),new Fields());TridentState state = topology.newStaticState(factory);stream = stream.stateQuery(state, new Fields("word"),new RedisClusterStateQuerier(lookupMapper),new Fields("columnName","columnValue"));
