author: Meryl Dakin author_link: categories: general date: 2019-01-15 layout: post title: Connecting Elixir to Kafka with Kaffe excerpt: >

A codealong to help connect Kafka to your Elixir project with the wrapper Kaffe.

Elixir Kaffe Codealong

如果我们要在 Elixir 项目中使用流行的消息传递系统 Kafka,我们可以选择几种包装器。此博客文章介绍如何集成其中一个,Kaffe,该资源不多,因此很难进行故障排除。

在此代码中,我们将构建一个简单的 Elixir 应用程序,并使用 Kaffe 将其连接到本地运行的 Kafka 服务器。 稍后我们将介绍几个变体,以连接 docker 化的 Kafka 服务器或 umbrella Elixir 应用程序。

这篇文章假设你已经拥有 Elixir 的基本知识,而没有 Kafka 或 Kaffe 的知识。 这是完整项目的地址:Elixir Kaffe Codealong

Kafka 是啥, 简单点?

Kafka 是一个消息传递系统。 它实际上完成了三件事:

  1. 接收来自应用程序的消息
  2. 保持这些消息的接收顺序
  3. 允许其他应用程序按顺序读取这些消息

一个使用 Kafka 的情境:

假设我们要为用户保留活动日志。 每当用户在您的网站上触发事件(登录,搜索,单击 banner 等)时,您都希望记录该活动。 您还希望允许多个服务访问此活动日志,例如市场营销跟踪器,用户数据聚合器,当然还有您网站的前端应用程序。 我们可以将每个活动发送到 Kafka,而不是将每个活动都保存到您自己的数据库中,并允许所有这些应用程序仅从其中读取所需内容。


Kafka Flow 示例

从 Kafka 读取的三个服务将仅获取它们所需的数据。 例如,第一个服务只能从 banner_click topic 中读取,而最后一个只能从 search_term 中读取。 关心活跃用户的第二项服务可能会从这两个 topic 中读取信息,以捕获所有站点活动。

Kafka 基本术语

在开始进入代码之前,让我们先澄清一些常见的 Kafka 术语,这些术语在您学习有关此服务的更多知识时会遇到:

  • 消费者: 从 Kafka 接收邮件
  • 生产者: 向 Kafka 发送消息
  • 主题: 一种组织消息并允许消费者仅订阅他们想要接收的消息的方法
  • 分区: 允许将主题在多台计算机之间拆分并保留相同的数据,以便一次有多个消费者可以从一个主题中读取内容
  • 领导者/副本: 这些是分区的类型。 有一个领导者和多个副本。 领导者确保副本具有相同和最新的数据。 如果领导者失败,则副本将接管领导者。
  • 偏移量: 消息的唯一标识符,可在 Kafka 中保持其顺序

    代码: 基础的 elixir 应用 & 本地运行的 Kafka

    设置 Kafka 服务器

    请遵循 Apache Kafka 的快速开始说明 的前两个步骤:
  1. 下载代码

  2. 启动服务器

    Zookeeper(一项为 Kafka 处理某些协调和状态管理的服务)

    bin / config /


    bin / config /

设置 Elixir 应用

  • 1. 创建一个新项目

    mix new elixir_kaffe_codealong

  • 2. 配置 kaffe

    • 2.a:mix.exs 中添加 :kaffe 至额外的应用列表中:

      1. def application do
      2. [
      3. extra_applications: [:logger, :kaffe]
      4. ]
      5. end
    • 2.b: 添加 kaffe 到依赖列表中:

      1. defp deps do
      2. [
      3. {:kaffe, "~> 1.9"}
      4. ]
      5. end
    • 2.c: 在命令行终端运行 mix deps.get 以锁定新的依赖.

  • 3. 配置 producer

    config/config.exs 中添加:

    1. config :kaffe,
    2. producer: [
    3. endpoints: [localhost: 9092],
    4. # endpoints references [hostname: port]. Kafka is configured to run on port 9092.
    5. # In this example, the hostname is localhost because we've started the Kafka server
    6. # straight from our machine. However, if the server is dockerized, the hostname will
    7. # be called whatever is specified by that container (usually "kafka")
    8. topics: ["our_topic", "another_topic"], # add a list of topics you plan to produce messages to
    9. ]
  • 4. 配置 consumer

    • 4.a:/lib/application.ex 中添加以下代码:

      1. defmodule ElixirKaffeCodealong.Application do
      2. use Application # read more about Elixir's Application module here:
      3. def start(_type, args) do
      4. import Supervisor.Spec
      5. children = [
      6. worker(Kaffe.Consumer, []) # calls to start Kaffe's Consumer module
      7. ]
      8. opts = [strategy: :one_for_one, name: ExampleConsumer.Supervisor]
      9. Supervisor.start_link(children, opts)
      10. end
      11. end
    • 4.b: 回到 mix.exs, 在 application 函数中新增一个条目:

      1. def application do
      2. [
      3. extra_applications: [:logger, :kaffe],
      4. mod: {ElixirKaffeCodealong.Application, []}
      5. # now that we're using the Application module, this is where we'll tell it to start.
      6. # We use the keyword `mod` with applications that start a supervision tree,
      7. # which we configured when adding our Kaffe.Consumer to Application above.
      8. ]
      9. end
      • 4.c:/lib/example_consumer.ex 添加一个 consumer 模块接收来自 Kafka 的消息
      1. defmodule ExampleConsumer do
      2. # function to accept Kafka messaged MUST be named "handle_message"
      3. # MUST accept arguments structured as shown here
      4. # MUST return :ok
      5. # Can do anything else within the function with the incoming message
      6. def handle_message(%{key: key, value: value} = message) do
      7. IO.inspect(message)
      8. IO.puts("#{key}: #{value}")
      9. :ok
      10. end
      11. end
      • 4.d:/config/config.exs 中配置 consumer 模块
      1. config :kaffe,
      2. consumer: [
      3. endpoints: [localhost: 9092],
      4. topics: ["our_topic", "another_topic"], # the topic(s) that will be consumed
      5. consumer_group: "example-consumer-group", # the consumer group for tracking offsets in Kafka
      6. message_handler: ExampleConsumer, # the module that will process messages
      7. ]
  • 5. 添加生产者模块(可选,也可以从控制台调用 Kaffe)

    我们将在自己的 ExampleProducer 方法中包装 Kaffe 提供的功能。 直接调用 Kaffe 也可以; produce_sync 函数最终将我们的消息发送给 Kafka。

    /lib/example_producer.ex 中增加以下代码:

    1. defmodule ExampleProducer do
    2. def send_my_message({key, value}, topic) do
    3. Kaffe.Producer.produce_sync(topic, [{key, value}])
    4. end
    5. def send_my_message(key, value) do
    6. Kaffe.Producer.produce_sync(key, value)
    7. end
    8. def send_my_message(value) do
    9. Kaffe.Producer.produce_sync("sample_key", value)
    10. end
    11. end
  • 6. 在控制台中发送和接收消息!

    现在,我们已经完成了所有配置,可以使用我们创建的模块通过 Kafka 发送和读取消息!

    1. 我们将调用生产者将消息发送到 Kafka 服务器。
    2. Kafka 服务器收到消息。
    3. 我们配置为订阅名为 another_topic 的主题的使用者,将收到我们发送的消息并将其打印到控制台。

    使用 iex -S mix 启动一个 elixir 交互式命令行,然后调用以下命令:

    1. iex> ExampleProducer.send_my_message({"Metamorphosis", "Franz Kafka"}, "another_topic")
    2. ...>[debug] event#produce_list topic=another_topic
    3. ...>[debug] event#produce_list_to_topic topic=another_topic partition=0
    4. ...>:ok
    5. iex> %{
    6. ...> attributes: 0,
    7. ...> crc: 2125760860, # will vary
    8. ...> key: "Metamorphosis",
    9. ...> magic_byte: 1,
    10. ...> offset: 1, # will vary
    11. ...> partition: 0,
    12. ...> topic: "another_topic",
    13. ...> ts: 1546634470702, # will vary
    14. ...> ts_type: :create,
    15. ...> value: "Franz Kafka"
    16. ...> }
    17. ...> Metamorphosis: Franz Kafka

变体: Docker & Umbrella Apps

  • 如果您是从 docker 容器(在实际应用中最常见)运行 Kafka,则将在配置文件中使用该主机名,而不是 localhost

  • 在 umbrella app 中,您将在运行它的子应用程序中配置 Kaffe。 如果您的应用程序被环境分隔开,则可以通过将其构造为子级来启动使用者,如下所示:

  1. children = case args do
  2. [env: :prod] -> [worker(Kaffe.Consumer, [])]
  3. [env: :test] -> []
  4. [env: :dev] -> [worker(Kaffe.Consumer, [])]
  5. [_] -> []
  6. end


  • 没有 leader 错误

    1. ** (MatchError) no match of right hand side value: {:error, :LeaderNotAvailable}

    解决方案:再试一次。 只需一分钟即可预热。

  • 无效的 Topic 错误

    1. ** (MatchError) no match of right hand side value: {:error, :InvalidTopicException}

    解决方案:您的 topic 中不应有空格,对吗?


这篇文章应该已经为您提供了基本设置,让您可以自己开始探索更多功能,但是使用 Kaffe 可以做更多的事情,因此请检查发送多条消息,使用 consumer 组等。如果您遇到其他您已解决的错误 ,请通过 在此处创建问题 告诉我们。
