Docker安装

使用Docker进行拉取镜像安装

  1. docker pull apachepulsar/pulsar:2.8.0

运行,查看
http协议访问端口为8080
pulsar协议访问端口为6650

  1. docker run --name pulsar -d -it -p 6650:6650 -p 8080:8080 -v data -v conf apachepulsar/pulsar:2.8.0 bin/pulsar standalone

pulsar需要我们手动创建租户和命名空间
进入容器内部进行创建

docker exec -it pulsar /bin/bash

关于telnet和namespace的命令如下

  • telnet ```bash

    查看租户

    pulsar-admin tenants list

创建租户

pulsar-admin tenants create my-tenant

删除租户

pulsar-admin tenants delete my-tenant


- namespace
```bash
#查看指定租户下的命名空间
pulsar-admin namespaces list my-tenant

#给指定租户创建命名空间
pulsar-admin namespaces create my-tenant/my-space

#删除命名空间
pulsar-admin namespaces delete my-tenant/my-space

可视化管理

拉取可视化镜像进行安装,查看,管理
官方提供了可视化的管理工具
可以支持多个Pulsar进行管理

  • 下载镜像
    docker pull apachepulsar/pulsar-manager:v0.2.0
    
    启动
    docker run -it --name pulsar-manager\
      -p 9527:9527 -p 7750:7750 \
      -e SPRING_CONFIGURATION_FILE=/pulsar-manager/pulsar-manager/application.properties \
      -d apachepulsar/pulsar-manager:v0.2.0
    
    创建账号
    SRF_TOKEN=$(curl http://localhost:7750/pulsar-manager/csrf-token)
    curl \
      -H "X-XSRF-TOKEN: $CSRF_TOKEN" \
      -H "Cookie: XSRF-TOKEN=$CSRF_TOKEN;" \
      -H 'Content-Type: application/json' \
      -X PUT http://localhost:7750/pulsar-manager/users/superuser \
      -d '{"name": "admin", "password": "123456", "description": "test", "email": "username@test.org"}'
    
    查看界面
    可以通过地址进行访问

    整合SpringBoot

    依赖

    添加pulsar相关依赖
    <!--SpringBoot整合Pulsar-->
    <dependency>
      <groupId>io.github.majusko</groupId>
      <artifactId>pulsar-java-spring-boot-starter</artifactId>
      <version>1.0.4</version>
    </dependency>
    

    配置文件

    pulsar:
    service-url: pulsar://192.168.5.78:6650
    

整合Java

声明了两个Topic,并确定好发送的消息类型

配置类

/**
 * Pulsar配置类
 * Created by macro on 2021/5/21.
 */
@Configuration
public class PulsarConfig {
    @Bean
    public ProducerFactory producerFactory() {
        return new ProducerFactory()
                .addProducer("bootTopic", MessageDto.class)
                .addProducer("stringTopic", String.class);
    }
}

生产者

创建Pulsar生产者,往Topic中发送消息,Pulsar是支持直接发送消息对象的

/**
 * Pulsar消息生产者
 * Created by macro on 2021/5/19.
 */
@Component
public class PulsarProducer {
    @Autowired
    private PulsarTemplate<MessageDto> template;

    public void send(MessageDto message){
        try {
            template.send("bootTopic",message);
        } catch (PulsarClientException e) {
            e.printStackTrace();
        }
    }
}

消费者

创建消费者,丛Topic中获取并消费消息,也是可以直接获取到消息对象的

/**
 * Pulsar消息消费者
 * Created by macro on 2021/5/19.
 */
@Slf4j
@Component
public class PulsarRealConsumer {

    @PulsarConsumer(topic="bootTopic", clazz= MessageDto.class)
    public void consume(MessageDto message) {
        log.info("PulsarRealConsumer consume id:{},content:{}",message.getId(),message.getContent());
    }

}

测试类

/**
 * Pulsar功能测试
 * Created by macro on 2021/5/19.
 */
@Api(tags = "PulsarController", description = "Pulsar功能测试")
@Controller
@RequestMapping("/pulsar")
public class PulsarController {

    @Autowired
    private PulsarProducer pulsarProducer;

    @ApiOperation("发送消息")
    @RequestMapping(value = "/sendMessage", method = RequestMethod.POST)
    @ResponseBody
    public CommonResult sendMessage(@RequestBody MessageDto message) {
        pulsarProducer.send(message);
        return CommonResult.success(null);
    }
}

附录: