Broadcast consolidated view cache updates across the cluster (#5769)
This commit is contained in:
parent
f52bf3feab
commit
45659fd2a6
|
|
@ -6,6 +6,7 @@ defmodule Plausible.ConsolidatedView do
|
||||||
"""
|
"""
|
||||||
|
|
||||||
use Plausible
|
use Plausible
|
||||||
|
alias Plausible.ConsolidatedView
|
||||||
|
|
||||||
import Ecto.Query
|
import Ecto.Query
|
||||||
|
|
||||||
|
|
@ -105,10 +106,15 @@ defmodule Plausible.ConsolidatedView do
|
||||||
defp do_enable(%Team{} = team) do
|
defp do_enable(%Team{} = team) do
|
||||||
case get(team) do
|
case get(team) do
|
||||||
nil ->
|
nil ->
|
||||||
team
|
{:ok, consolidated_view} =
|
||||||
|> Site.new_for_team(%{consolidated: true, domain: make_id(team)})
|
team
|
||||||
|> change_stats_dates(team)
|
|> Site.new_for_team(%{consolidated: true, domain: make_id(team)})
|
||||||
|> Repo.insert()
|
|> change_stats_dates(team)
|
||||||
|
|> Repo.insert()
|
||||||
|
|
||||||
|
{:ok, site_ids} = site_ids(team)
|
||||||
|
:ok = ConsolidatedView.Cache.broadcast_put(consolidated_view.domain, site_ids)
|
||||||
|
{:ok, consolidated_view}
|
||||||
|
|
||||||
consolidated_view ->
|
consolidated_view ->
|
||||||
{:ok, consolidated_view}
|
{:ok, consolidated_view}
|
||||||
|
|
|
||||||
|
|
@ -64,6 +64,19 @@ defmodule Plausible.Cache do
|
||||||
alias Plausible.Cache.Adapter
|
alias Plausible.Cache.Adapter
|
||||||
require Logger
|
require Logger
|
||||||
|
|
||||||
|
@spec broadcast_put(any(), Keyword.t()) :: :ok
|
||||||
|
def broadcast_put(key, value, opts \\ []) do
|
||||||
|
cache_name = Keyword.get(opts, :cache_name, name())
|
||||||
|
multicall_timeout = Keyword.get(opts, :multicall_timeout, :timer.seconds(5))
|
||||||
|
|
||||||
|
{:ok, _} =
|
||||||
|
Task.start(fn ->
|
||||||
|
:rpc.multicall(Adapter, :put, [cache_name, key, value, opts], multicall_timeout)
|
||||||
|
end)
|
||||||
|
|
||||||
|
:ok
|
||||||
|
end
|
||||||
|
|
||||||
@spec get(any(), Keyword.t()) :: any() | nil
|
@spec get(any(), Keyword.t()) :: any() | nil
|
||||||
def get(key, opts \\ []) when is_list(opts) do
|
def get(key, opts \\ []) when is_list(opts) do
|
||||||
cache_name = Keyword.get(opts, :cache_name, name())
|
cache_name = Keyword.get(opts, :cache_name, name())
|
||||||
|
|
|
||||||
|
|
@ -115,6 +115,15 @@ defmodule Plausible.CacheTest do
|
||||||
assert :changed == ExampleCache.get("item1", cache_name: test, force?: true)
|
assert :changed == ExampleCache.get("item1", cache_name: test, force?: true)
|
||||||
assert :item2 == ExampleCache.get("item2", cache_name: test, force?: true)
|
assert :item2 == ExampleCache.get("item2", cache_name: test, force?: true)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
test "broadcast_put puts into local cache", %{test: test} do
|
||||||
|
{:ok, _} = start_test_cache(test)
|
||||||
|
:ok = ExampleCache.broadcast_put("item1", :item1, cache_name: test)
|
||||||
|
|
||||||
|
assert eventually(fn ->
|
||||||
|
{ExampleCache.get("item1", cache_name: test, force?: true) == :item1, :ok}
|
||||||
|
end)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
describe "warming the cache" do
|
describe "warming the cache" do
|
||||||
|
|
|
||||||
|
|
@ -51,6 +51,15 @@ defmodule Plausible.ConsolidatedViewTest do
|
||||||
assert view.native_stats_start_at == min
|
assert view.native_stats_start_at == min
|
||||||
assert view.stats_start_date == NaiveDateTime.to_date(min)
|
assert view.stats_start_date == NaiveDateTime.to_date(min)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
test "enable/1 updates cache", %{team: team} do
|
||||||
|
site = new_site(team: team)
|
||||||
|
{:ok, _} = ConsolidatedView.enable(team)
|
||||||
|
|
||||||
|
assert eventually(fn ->
|
||||||
|
{ConsolidatedView.Cache.get(team.identifier) == [site.id], :ok}
|
||||||
|
end)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
describe "disable/1" do
|
describe "disable/1" do
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue