一、源端

1、结构展示

1.1 外层

image.png

1.2 内层

image.png

2、PROCESS

2.1 ExecuteGroovyScript

a) 描述
  1. 这个处理器负责处理多个传入的keys(逗号分隔),拆分为单条KEY为一个流传出,并将KEY包含再【属性】中传出。

    b) SETTINGS

    image.png

    c) SCHEDULING

    image.png

    d) PROPERTIES

    image.png

    2.2 FetchDistributedMapCache

    a) 描述
  2. 获取上游处理器传入的KEY,通过RedisDistributedMapCacheClientService处理器获取VALUE。

    b) SETTINGS

    image.png

    c) SCHEDULING

    image.png

    d) PROPERTIES

    image.png

    2.2.1 RedisConnectionPoolService

    image.png

    a) 描述
  3. 这是2.2.2 RedisDistributedMapCacheClientService处理器需要的处理器(Redis连接池)

    b) PROPERTIES

    image.png

    2.2.2 RedisDistributedMapCacheClientService

    image.png

    a) 描述
  4. 这是FetchDistributedMapCache或PutDistributedMapCache处理器需要的处理器(Redis客户端)

    b) PROPERTIES

    image.png

    二、目标端

    1、结构展示

    1.1 外层

    image.png

    1.2 内层

    image.png

    2、PROCESS

    2.1 NifiFLow

    a) 描述
  5. 指定集群URL,获取夸集群传入的流文件

    b) PROPERTIES

    image.png

    2.2 PutDistributedMapCache

    a) 描述
  6. 根据用户指定的KEY做put操作

    b) SETTINGS

    image.png

    c) SCHEDULING

    image.png

    d) PROPERTIES

    image.png

    三、附页:

    1、附1:

    ```groovy import java.nio.charset.StandardCharsets import org.apache.commons.io.IOUtils

String keys try { keys = Keys String[] keyList = keys.split(“,”) for (key in keyList) { createFlowFile(key) } } catch (E) { E.printStackTrace() }

def createFlowFile (String key) { flowFile = session.create()

  1. session.putAttribute(flowFile, 'KEY', key)
  2. String json = "{\"key\":\"" + key + "\"}"
  3. session.write(flowFile, {outputStream ->
  4. outputStream.write(json.getBytes(StandardCharsets.UTF_8))
  5. } as OutputStreamCallback)
  6. session.transfer(flowFile, REL_SUCCESS)

} ```