Implement support for cache partitioning for sessions (#5073)
* Implement very rudimentary support for cache partitioning for sessions * Convenience for starting partitioned caches * Test basic partitioning expectations * Include put_many in test * Use div/2 * Remove unused alias --------- Co-authored-by: Adam Rutkowski <hq@mtod.org>
This commit is contained in:
parent
417e996c1a
commit
a0c13383e7
|
|
@ -970,4 +970,6 @@ unless s3_disabled? do
|
|||
imports_bucket: s3_env_value.("S3_IMPORTS_BUCKET")
|
||||
end
|
||||
|
||||
config :plausible, Plausible.Cache.Adapter, sessions: [partitions: 4]
|
||||
|
||||
config :phoenix_storybook, enabled: env !== "prod"
|
||||
|
|
|
|||
|
|
@ -31,7 +31,7 @@ defmodule Plausible.Application do
|
|||
ttl_check_interval: :timer.minutes(5),
|
||||
global_ttl: :timer.minutes(60)
|
||||
),
|
||||
Plausible.Cache.Adapter.child_spec(:sessions, :cache_sessions,
|
||||
Plausible.Cache.Adapter.child_specs(:sessions, :cache_sessions,
|
||||
ttl_check_interval: :timer.seconds(10),
|
||||
global_ttl: :timer.minutes(30)
|
||||
),
|
||||
|
|
@ -109,6 +109,7 @@ defmodule Plausible.Application do
|
|||
help_scout_vault()
|
||||
end
|
||||
]
|
||||
|> List.flatten()
|
||||
|> Enum.reject(&is_nil/1)
|
||||
|
||||
opts = [strategy: :one_for_one, name: Plausible.Supervisor]
|
||||
|
|
|
|||
|
|
@ -9,6 +9,23 @@ defmodule Plausible.Cache.Adapter do
|
|||
|
||||
require Logger
|
||||
|
||||
@spec child_specs(atom(), atom(), Keyword.t()) :: [Supervisor.child_spec()]
|
||||
def child_specs(name, child_id, opts \\ [])
|
||||
when is_atom(name) and is_atom(child_id) and is_list(opts) do
|
||||
partitions = partitions(name)
|
||||
|
||||
if partitions == 1 do
|
||||
[child_spec(name, child_id, opts)]
|
||||
else
|
||||
Enum.map(1..partitions, fn partition ->
|
||||
partition_name = String.to_atom("#{name}_#{partition}")
|
||||
partition_child_id = String.to_atom("#{child_id}_#{partition}")
|
||||
|
||||
child_spec(partition_name, partition_child_id, opts)
|
||||
end)
|
||||
end
|
||||
end
|
||||
|
||||
@spec child_spec(atom(), atom(), Keyword.t()) :: Supervisor.child_spec()
|
||||
def child_spec(name, child_id, opts \\ [])
|
||||
when is_atom(name) and is_atom(child_id) and is_list(opts) do
|
||||
|
|
@ -29,14 +46,18 @@ defmodule Plausible.Cache.Adapter do
|
|||
|
||||
@spec size(atom()) :: non_neg_integer() | nil
|
||||
def size(cache_name) do
|
||||
ConCache.size(cache_name)
|
||||
cache_name
|
||||
|> get_names()
|
||||
|> Enum.map(&ConCache.size/1)
|
||||
|> Enum.sum()
|
||||
catch
|
||||
:exit, _ -> nil
|
||||
end
|
||||
|
||||
@spec get(atom(), any()) :: any()
|
||||
def get(cache_name, key) do
|
||||
ConCache.get(cache_name, key)
|
||||
full_cache_name = get_name(cache_name, key)
|
||||
ConCache.get(full_cache_name, key)
|
||||
catch
|
||||
:exit, _ ->
|
||||
Logger.error("Error retrieving key from '#{inspect(cache_name)}'")
|
||||
|
|
@ -45,7 +66,8 @@ defmodule Plausible.Cache.Adapter do
|
|||
|
||||
@spec get(atom(), any(), (-> any())) :: any()
|
||||
def get(cache_name, key, fallback_fn) do
|
||||
ConCache.get_or_store(cache_name, key, fallback_fn)
|
||||
full_cache_name = get_name(cache_name, key)
|
||||
ConCache.get_or_store(full_cache_name, key, fallback_fn)
|
||||
catch
|
||||
:exit, _ ->
|
||||
Logger.error("Error retrieving key from '#{inspect(cache_name)}'")
|
||||
|
|
@ -54,7 +76,8 @@ defmodule Plausible.Cache.Adapter do
|
|||
|
||||
@spec fetch(atom(), any(), (-> any())) :: any()
|
||||
def fetch(cache_name, key, fallback_fn) do
|
||||
ConCache.fetch_or_store(cache_name, key, fallback_fn)
|
||||
full_cache_name = get_name(cache_name, key)
|
||||
ConCache.fetch_or_store(full_cache_name, key, fallback_fn)
|
||||
catch
|
||||
:exit, _ ->
|
||||
Logger.error("Error fetching key from '#{inspect(cache_name)}'")
|
||||
|
|
@ -63,10 +86,12 @@ defmodule Plausible.Cache.Adapter do
|
|||
|
||||
@spec put(atom(), any(), any()) :: any()
|
||||
def put(cache_name, key, value, opts \\ []) do
|
||||
full_cache_name = get_name(cache_name, key)
|
||||
|
||||
if opts[:dirty?] do
|
||||
:ok = ConCache.dirty_put(cache_name, key, value)
|
||||
:ok = ConCache.dirty_put(full_cache_name, key, value)
|
||||
else
|
||||
:ok = ConCache.put(cache_name, key, value)
|
||||
:ok = ConCache.put(full_cache_name, key, value)
|
||||
end
|
||||
|
||||
value
|
||||
|
|
@ -78,7 +103,12 @@ defmodule Plausible.Cache.Adapter do
|
|||
|
||||
@spec put_many(atom(), [any()]) :: :ok
|
||||
def put_many(cache_name, items) when is_list(items) do
|
||||
true = :ets.insert(ConCache.ets(cache_name), items)
|
||||
items
|
||||
|> Enum.group_by(fn {key, _} -> get_name(cache_name, key) end)
|
||||
|> Enum.each(fn {full_cache_name, items} ->
|
||||
true = :ets.insert(ConCache.ets(full_cache_name), items)
|
||||
end)
|
||||
|
||||
:ok
|
||||
catch
|
||||
:exit, _ ->
|
||||
|
|
@ -88,7 +118,8 @@ defmodule Plausible.Cache.Adapter do
|
|||
|
||||
@spec delete(atom(), any()) :: :ok
|
||||
def delete(cache_name, key) do
|
||||
ConCache.dirty_delete(cache_name, key)
|
||||
full_cache_name = get_name(cache_name, key)
|
||||
ConCache.dirty_delete(full_cache_name, key)
|
||||
catch
|
||||
:exit, _ ->
|
||||
Logger.error("Error deleting a key in '#{cache_name}'")
|
||||
|
|
@ -97,16 +128,11 @@ defmodule Plausible.Cache.Adapter do
|
|||
|
||||
@spec keys(atom()) :: Enumerable.t()
|
||||
def keys(cache_name) do
|
||||
ets = ConCache.ets(cache_name)
|
||||
|
||||
Stream.resource(
|
||||
fn -> :ets.first(ets) end,
|
||||
fn
|
||||
:"$end_of_table" -> {:halt, nil}
|
||||
prev_key -> {[prev_key], :ets.next(ets, prev_key)}
|
||||
end,
|
||||
fn _ -> :ok end
|
||||
)
|
||||
cache_name
|
||||
|> get_names()
|
||||
|> Enum.reduce([], fn full_cache_name, stream ->
|
||||
Stream.concat(stream, get_keys(full_cache_name))
|
||||
end)
|
||||
catch
|
||||
:exit, _ ->
|
||||
Logger.error("Error retrieving key from '#{inspect(cache_name)}'")
|
||||
|
|
@ -116,7 +142,8 @@ defmodule Plausible.Cache.Adapter do
|
|||
@spec with_lock(atom(), any(), pos_integer(), (-> result)) :: {:ok, result} | {:error, :timeout}
|
||||
when result: any()
|
||||
def with_lock(cache_name, key, timeout, fun) do
|
||||
result = ConCache.isolated(cache_name, key, timeout, fun)
|
||||
full_cache_name = get_name(cache_name, key)
|
||||
result = ConCache.isolated(full_cache_name, key, timeout, fun)
|
||||
{:ok, result}
|
||||
catch
|
||||
:exit, {:timeout, _} ->
|
||||
|
|
@ -127,4 +154,43 @@ defmodule Plausible.Cache.Adapter do
|
|||
|
||||
{:error, :timeout}
|
||||
end
|
||||
|
||||
@spec get_names(atom()) :: [atom()]
|
||||
def get_names(cache_name) do
|
||||
partitions = partitions(cache_name)
|
||||
|
||||
if partitions == 1 do
|
||||
[cache_name]
|
||||
else
|
||||
Enum.map(1..partitions, &String.to_existing_atom("#{cache_name}_#{&1}"))
|
||||
end
|
||||
end
|
||||
|
||||
defp get_keys(full_cache_name) do
|
||||
ets = ConCache.ets(full_cache_name)
|
||||
|
||||
Stream.resource(
|
||||
fn -> :ets.first(ets) end,
|
||||
fn
|
||||
:"$end_of_table" -> {:halt, nil}
|
||||
prev_key -> {[prev_key], :ets.next(ets, prev_key)}
|
||||
end,
|
||||
fn _ -> :ok end
|
||||
)
|
||||
end
|
||||
|
||||
defp get_name(cache_name, key) do
|
||||
partitions = partitions(cache_name)
|
||||
|
||||
if partitions == 1 do
|
||||
cache_name
|
||||
else
|
||||
chosen_partition = :erlang.phash2(key, partitions) + 1
|
||||
String.to_existing_atom("#{cache_name}_#{chosen_partition}")
|
||||
end
|
||||
end
|
||||
|
||||
defp partitions(cache_name) do
|
||||
Application.get_env(:plausible, __MODULE__)[cache_name][:partitions] || 1
|
||||
end
|
||||
end
|
||||
|
|
|
|||
|
|
@ -63,14 +63,30 @@ defmodule Plausible.Cache.Stats do
|
|||
end
|
||||
|
||||
def hit_rate(cache_name) do
|
||||
hit = :ets.lookup_element(__MODULE__, {cache_name, @hit}, 2, 0)
|
||||
miss = :ets.lookup_element(__MODULE__, {cache_name, @miss}, 2, 0)
|
||||
hit_miss = hit + miss
|
||||
cache_name
|
||||
|> Plausible.Cache.Adapter.get_names()
|
||||
|> Enum.reduce(
|
||||
%{hit: 0, miss: 0, hit_miss: 0.0},
|
||||
fn name, acc ->
|
||||
hit =
|
||||
acc.hit + :ets.lookup_element(__MODULE__, {name, @hit}, 2, 0)
|
||||
|
||||
if hit_miss == 0 do
|
||||
0.0
|
||||
else
|
||||
hit / hit_miss * 100
|
||||
end
|
||||
miss =
|
||||
acc.miss + :ets.lookup_element(__MODULE__, {name, @miss}, 2, 0)
|
||||
|
||||
hit_miss = hit + miss
|
||||
|
||||
hit_miss = if(hit_miss == 0, do: 0.0, else: hit / hit_miss * 100)
|
||||
|
||||
acc
|
||||
|> Map.put(:hit, hit)
|
||||
|> Map.put(:miss, miss)
|
||||
|> Map.put(
|
||||
:hit_miss,
|
||||
hit_miss
|
||||
)
|
||||
end
|
||||
)
|
||||
|> Map.fetch!(:hit_miss)
|
||||
end
|
||||
end
|
||||
|
|
|
|||
|
|
@ -0,0 +1,40 @@
|
|||
defmodule Plausible.Cache.AdapterTest do
|
||||
use Plausible.DataCase, async: false
|
||||
|
||||
alias Plausible.Cache.Adapter
|
||||
|
||||
describe "adapter - partitioning" do
|
||||
test "multiple partitions are routed to", %{test: test} do
|
||||
name = :cache_partitions_test
|
||||
|
||||
iterations = 100
|
||||
partitions = 4
|
||||
|
||||
patch_env(Adapter, [{name, partitions: partitions}])
|
||||
|
||||
{:ok, _} =
|
||||
Supervisor.start_link(
|
||||
Adapter.child_specs(name, name, []),
|
||||
strategy: :one_for_one,
|
||||
name: :"cache_supervisor_#{test}"
|
||||
)
|
||||
|
||||
half = div(iterations, 2)
|
||||
for i <- 1..half, do: Adapter.put(name, i, i)
|
||||
Adapter.put_many(name, for(i <- half..iterations, do: {i, i}))
|
||||
|
||||
assert Adapter.size(name) == iterations
|
||||
|
||||
for i <- 1..iterations, do: assert(Adapter.get(name, i) == i)
|
||||
|
||||
assert name |> Adapter.keys() |> Enum.sort() == Enum.to_list(1..iterations)
|
||||
|
||||
for i <- 1..partitions do
|
||||
assert ConCache.size(:"#{name}_#{i}") > 0
|
||||
assert ConCache.size(:"#{name}_#{i}") < iterations
|
||||
end
|
||||
|
||||
assert {:ok, %{hit_rate: 100.0, count: ^iterations}} = Plausible.Cache.Stats.gather(name)
|
||||
end
|
||||
end
|
||||
end
|
||||
Loading…
Reference in New Issue