ETS

  1. ETS做缓存
  2. 竞态条件

每当我们需要查找一个桶,我们就要发送一个信息给注册表.这时我们的注册表会被多个进程并发访问,就会遇到瓶颈.

本章我们会学习ETS(Erlang条件存储)以及如何使用它作为缓存机制.

警告!不要贸然地使用ETS做缓存!查看日志并分析你的应用表现,确定那一部分是瓶颈,这样你就可以知道是否应该使用缓存,以及缓存什么.本章仅仅是一个如何使用ETS做缓存的例子.

ETS做缓存

ETS允许我们存储任何Elixir条件到一个内存中的表格.操作ETS表格需要通过Erlang的:ets模块:

  1. iex> table = :ets.new(:buckets_registry, [:set, :protected])
  2. 8207
  3. iex> :ets.insert(table, {"foo", self})
  4. true
  5. iex> :ets.lookup(table, "foo")
  6. [{"foo", #PID<0.41.0>}]

创建一个ETS表格时,需要两个参数:表格名以及一些选项.通过这些选项,我们传送了表格类型和它的访问规则.我们已经选择了:set类型,它意味着键不可以被复制.我们也将表格访问设置成了:protected,意味着只有创建了这个表格的进程可以写入它,但所有进程都可以从表中读取.这些都是默认值,所以我们将在之后跳过它们.

ETS表格可以被命名,允许我们通过名称来访问:

  1. iex> :ets.new(:buckets_registry, [:named_table])
  2. :buckets_registry
  3. iex> :ets.insert(:buckets_registry, {"foo", self})
  4. true
  5. iex> :ets.lookup(:buckets_registry, "foo")
  6. [{"foo", #PID<0.41.0>}]

让我们修改KV.Registry来使用ETS表格.由于我们的注册表需要一个名字作为参数,我们可以用相同的名字命名ETS表格.ETS名与进程名存储在不同的位置,所以它们不会冲突.

打开lib/kv/registry.ex,让我们来改变它的实现.我们已经为源代码的修改添加了注释:

  1. defmodule KV.Registry do
  2. use GenServer
  3. ## Client API
  4. @doc """
  5. Starts the registry with the given `name`.
  6. """
  7. def start_link(name) do
  8. # 1. 传送名字给GenServer的init
  9. GenServer.start_link(__MODULE__, name, name: name)
  10. end
  11. @doc """
  12. Looks up the bucket pid for `name` stored in `server`.
  13. Returns `{:ok, pid}` if the bucket exists, `:error` otherwise.
  14. """
  15. def lookup(server, name) when is_atom(server) do
  16. # 2. 查找将直接在ETS中运行,不需要访问服务器
  17. case :ets.lookup(server, name) do
  18. [{^name, pid}] -> {:ok, pid}
  19. [] -> :error
  20. end
  21. end
  22. @doc """
  23. Ensures there is a bucket associated to the given `name` in `server`.
  24. """
  25. def create(server, name) do
  26. GenServer.cast(server, {:create, name})
  27. end
  28. @doc """
  29. Stops the registry.
  30. """
  31. def stop(server) do
  32. GenServer.stop(server)
  33. end
  34. ## Server callbacks
  35. def init(table) do
  36. # 3. 我们已经用ETS表格取代了名称映射
  37. names = :ets.new(table, [:named_table, read_concurrency: true])
  38. refs = %{}
  39. {:ok, {names, refs}}
  40. end
  41. # 4. 之前用于查找的handle_call回调已经删除
  42. def handle_cast({:create, name}, {names, refs}) do
  43. # 5. 对ETS表格进行读写,而非对映射
  44. case lookup(names, name) do
  45. {:ok, _pid} ->
  46. {:noreply, {names, refs}}
  47. :error ->
  48. {:ok, pid} = KV.Bucket.Supervisor.start_bucket
  49. ref = Process.monitor(pid)
  50. refs = Map.put(refs, ref, name)
  51. :ets.insert(names, {name, pid})
  52. {:noreply, {names, refs}}
  53. end
  54. end
  55. def handle_info({:DOWN, ref, :process, _pid, _reason}, {names, refs}) do
  56. # 6. 从ETS表中删除而不是从映射中
  57. {name, refs} = Map.pop(refs, ref)
  58. :ets.delete(names, name)
  59. {:noreply, {names, refs}}
  60. end
  61. def handle_info(_msg, state) do
  62. {:noreply, state}
  63. end
  64. end

注意到在修改之前,KV.Reigstry.lookup/2会将请求发送给服务器,但现在会直接从ETS表格中读取,改表格会被所有进程分享.这就是我们实现的缓存机制背后的主要原理.

为了使缓存机制运行,ETS表格需要有让对其的访问:protected(默认),这样所有客户端都可以从中读取,而只有KV.Registry进程能写入.我们也在创建表格时设置了read_concurrency: true,为普通的并发读取操作的脚本做了表格优化.

上诉修改破坏了我们的测试,因为之前为所有操作使用的是注册表进程的pid,而现在注册表查找要求的是ETS表格名.然而,由于注册表进程和ETS表格有着相同的名字,就很好解决这个问题.将test/kv/registry_test.exs中的设置函数修改为:

  1. setup context do
  2. {:ok, _} = KV.Registry.start_link(context.test)
  3. {:ok, registry: context.test}
  4. end

我们修改了setup,仍然有一些测试失败了.你可能会注意到每次测试的结果不一致.例如,”生成桶”的测试:

  1. test "spawns buckets", %{registry: registry} do
  2. assert KV.Registry.lookup(registry, "shopping") == :error
  3. KV.Registry.create(registry, "shopping")
  4. assert {:ok, bucket} = KV.Registry.lookup(registry, "shopping")
  5. KV.Bucket.put(bucket, "milk", 1)
  6. assert KV.Bucket.get(bucket, "milk") == 1
  7. end

可能会在这里失败:

  1. {:ok, bucket} = KV.Registry.lookup(registry, "shopping")

为什么这条会失败,我们明明已经在上一条代码中创建了桶?

原因就是,出于教学目的,我们犯了两个错误:

\1. 我们贸然做了优化(通过添加这个缓存层) \2. 我们使用了cast/2(应该使用call/2)

竞态条件?

使用Elixir开发并不能使你的代码魔兽竞态条件影响.然而,Elixir中默认的不共用任何事物的简单概念使得我们更容易发现产生竞态条件的原因.

在发出操作和从ETS表格观察到改变之间会有延迟,导致了我们测试的失败.下面是我们希望看到的:

\1. 我们调用KV.Rgistry.create(registry, "shopping") \2. 注册表创建了桶并更新了缓存表格 \3. 我们使用KV.Registry.lookup(registry, "shopping")从表格中获取信息 \4. 返回{:ok, bucket}

然而,由于KV.Registry.create/2是一个投掷操作,这条命令将会在我们真正写入表格之前返回!换句话或,实际发生的是:

\1. 我们调用KV.Rgistry.create(registry, "shopping") \2. 我们使用KV.Registry.lookup(ets, "shopping")从表格中获取信息 \3. 返回:error \4. 注册表创建了桶并更新了缓存表格

为了修正这个错误,我们需要使得KV.Registry.create/2变为异步的,通过使用call/2代替cast/2.这就保证了客户端只会在对表格的改动发生过后继续.让我们修改函数,以及它的回调:

  1. def create(server, name) do
  2. GenServer.call(server, {:create, name})
  3. end
  4. def handle_call({:create, name}, _from, {names, refs}) do
  5. case lookup(names, name) do
  6. {:ok, pid} ->
  7. {:reply, pid, {names, refs}}
  8. :error ->
  9. {:ok, pid} = KV.Bucket.Supervisor.start_bucket
  10. ref = Process.monitor(pid)
  11. refs = Map.put(refs, ref, name)
  12. :ets.insert(names, {name, pid})
  13. {:reply, pid, {names, refs}}
  14. end
  15. end

我们简单地将回调从handle_cast/2改为了handle_call/3,并回复被创建了的桶的pid.通常来说,Elixir开发者更喜欢使用call/2而不是cast/2,因为它也提供了背压(你会被挡住直到得到回复).在不必要时使用cast/2也可以被看做是贸然的优化.

让我们再次运行测试.这一次,我们会传送--trace选项:

  1. $ mix test --trace

--trace选项在你的测试死锁或遇到竞态条件时很有用,因为它会异步运行所有测试(async: true无效)并展示每个测试的详细信息.这一次我们应该会得到一两个断断续续的失败:

  1. 1) test removes buckets on exit (KV.RegistryTest)
  2. test/kv/registry_test.exs:19
  3. Assertion with == failed
  4. code: KV.Registry.lookup(registry, "shopping") == :error
  5. lhs: {:ok, #PID<0.109.0>}
  6. rhs: :error
  7. stacktrace:
  8. test/kv/registry_test.exs:23

根据错误信息,我们期望桶不再存在,但它仍在那儿!这个问题与我们刚才解决的正相反:之前在命令创建桶与更新表格之间存在延迟,现在是在桶进程死亡与它在表中的记录被删除之间存在延迟.

不走运的是这一次我们不能简单地将handle_info/2这个负责清洁ETS表格的操作,改变为一个异步操作.相反我们需要找到一个方法来保证注册表已经处理了:DOWN通知的发送,当桶崩溃时.

简单的方法是发送一个异步请求给注册表:因为信息会被按顺序处理,如果注册表回复了一个在Agent.stop调用之后的发送的请求,就意味着:DOWN消息已经被处理了.让我们创建一个”bogus”桶,它是一个异步请求,在每个测试中排在Agent.stop之后:

  1. test "removes buckets on exit", %{registry: registry} do
  2. KV.Registry.create(registry, "shopping")
  3. {:ok, bucket} = KV.Registry.lookup(registry, "shopping")
  4. Agent.stop(bucket)
  5. # Do a call to ensure the registry processed the DOWN message
  6. _ = KV.Registry.create(registry, "bogus")
  7. assert KV.Registry.lookup(registry, "shopping") == :error
  8. end
  9. test "removes bucket on crash", %{registry: registry} do
  10. KV.Registry.create(registry, "shopping")
  11. {:ok, bucket} = KV.Registry.lookup(registry, "shopping")
  12. # Kill the bucket and wait for the notification
  13. Process.exit(bucket, :shutdown)
  14. # Wait until the bucket is dead
  15. ref = Process.monitor(bucket)
  16. assert_receive {:DOWN, ^ref, _, _, _}
  17. # Do a call to ensure the registry processed the DOWN message
  18. _ = KV.Registry.create(registry, "bogus")
  19. assert KV.Registry.lookup(registry, "shopping") == :error
  20. end

我们的测试现在可以(一直)通过了!

该总结一下我们的优化章节了.我们使用了ETS作为缓存机制,一人写万人读.更重要的是,一但数据可以被同步读取,就要当心竞态条件.

下一章我们将讨论外部和内部的依赖,以及Mix如何帮助我们管理巨型代码库.