Implement dedicated persistence service (#5653)

* Implement a very crude HTTP relay to persistor service

* Temporarily disable local session and event persistence

* Temporarily disable Promex in dev

* Setup dedicated Finch pool for persistor

* Temporarily adjust load script parameters

* Fiddle with pool opts

* Only log errors

* Bump rate

* Put persistor configuration in runtime config

* Move persistor client code to a separate module

* Move event and session persistence behind a switchable adapter

* Add scaffolding of relaying persistor

* Instrument relayed persistence

* Adjust persistor API to accept full ingest event

* Fix persistor URL in config defaults

* Switch local dev env to the embedded_with_relay backend

* Revert "Temporarily disable Promex in dev"

This reverts commit d9c9e9075d130cc5177a2a7559b98a2026077327.

* Lower default `PERSISTOR_COUNT`

* Refactor payload encoding and decoding slightly

* Test and slightly improve persistor logic
This commit is contained in:
Adrian Gruntkowski 2025-09-01 11:13:41 +02:00 committed by GitHub
parent 91e60fe148
commit 40ff36a4fb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 551 additions and 33 deletions

View File

@ -3,6 +3,7 @@ SECURE_COOKIE=false
DATABASE_URL=postgres://postgres:postgres@127.0.0.1:5432/plausible_dev DATABASE_URL=postgres://postgres:postgres@127.0.0.1:5432/plausible_dev
CLICKHOUSE_DATABASE_URL=http://127.0.0.1:8123/plausible_events_db CLICKHOUSE_DATABASE_URL=http://127.0.0.1:8123/plausible_events_db
CLICKHOUSE_MAX_BUFFER_SIZE_BYTES=1000000 CLICKHOUSE_MAX_BUFFER_SIZE_BYTES=1000000
PERSISTOR_BACKEND=embedded_with_relay
SECRET_KEY_BASE=/njrhntbycvastyvtk1zycwfm981vpo/0xrvwjjvemdakc/vsvbrevlwsc6u8rcg SECRET_KEY_BASE=/njrhntbycvastyvtk1zycwfm981vpo/0xrvwjjvemdakc/vsvbrevlwsc6u8rcg
TOTP_VAULT_KEY=Q3BD4nddbkVJIPXgHuo5NthGKSIH0yesRfG05J88HIo= TOTP_VAULT_KEY=Q3BD4nddbkVJIPXgHuo5NthGKSIH0yesRfG05J88HIo=
ENVIRONMENT=dev ENVIRONMENT=dev

View File

@ -141,6 +141,20 @@ end
|> get_var_from_path_or_env("CLICKHOUSE_MAX_BUFFER_SIZE_BYTES", "100000") |> get_var_from_path_or_env("CLICKHOUSE_MAX_BUFFER_SIZE_BYTES", "100000")
|> Integer.parse() |> Integer.parse()
persistor_backend =
case get_var_from_path_or_env(config_dir, "PERSISTOR_BACKEND", "embedded") do
"embedded" -> Plausible.Ingestion.Persistor.Embedded
"embedded_with_relay" -> Plausible.Ingestion.Persistor.EmbeddedWithRelay
"remote" -> Plausible.Ingestion.Persistor.Remote
end
persistor_url =
get_var_from_path_or_env(config_dir, "PERSISTOR_URL", "http://localhost:8001/event")
persistor_count = get_int_from_path_or_env(config_dir, "PERSISTOR_COUNT", 200)
persistor_timeout_ms = get_int_from_path_or_env(config_dir, "PERSISTOR_TIMEOUT_MS", 10_000)
# Can be generated with `Base.encode64(:crypto.strong_rand_bytes(32))` from # Can be generated with `Base.encode64(:crypto.strong_rand_bytes(32))` from
# iex shell or `openssl rand -base64 32` from command line. # iex shell or `openssl rand -base64 32` from command line.
totp_vault_key = totp_vault_key =
@ -649,6 +663,13 @@ config :plausible, Plausible.ImportDeletionRepo,
transport_opts: ch_transport_opts, transport_opts: ch_transport_opts,
pool_size: 1 pool_size: 1
config :plausible, Plausible.Ingestion.Persistor, backend: persistor_backend
config :plausible, Plausible.Ingestion.Persistor.Remote,
url: persistor_url,
count: persistor_count,
timeout_ms: persistor_timeout_ms
config :ex_money, config :ex_money,
open_exchange_rates_app_id: get_var_from_path_or_env(config_dir, "OPEN_EXCHANGE_RATES_APP_ID"), open_exchange_rates_app_id: get_var_from_path_or_env(config_dir, "OPEN_EXCHANGE_RATES_APP_ID"),
retrieve_every: :timer.hours(24) retrieve_every: :timer.hours(24)

View File

@ -212,6 +212,7 @@ defmodule Plausible.Application do
"https://icons.duckduckgo.com", "https://icons.duckduckgo.com",
Config.Reader.merge(default_opts, conn_opts: [transport_opts: [timeout: 15_000]]) Config.Reader.merge(default_opts, conn_opts: [transport_opts: [timeout: 15_000]])
) )
|> maybe_add_persistor_pool(default_opts)
|> maybe_add_sentry_pool(default_opts) |> maybe_add_sentry_pool(default_opts)
|> maybe_add_paddle_pool(default_opts) |> maybe_add_paddle_pool(default_opts)
|> maybe_add_google_pools(default_opts) |> maybe_add_google_pools(default_opts)
@ -227,6 +228,37 @@ defmodule Plausible.Application do
end end
end end
# Adds a dedicated Finch pool for the remote persistor service, but only
# when a backend that actually talks to it (remote or embedded-with-relay)
# is configured. Pool size and connect timeout come from the
# `Plausible.Ingestion.Persistor.Remote` runtime config.
defp maybe_add_persistor_pool(pool_config, default) do
  relaying_backends = [
    Plausible.Ingestion.Persistor.Remote,
    Plausible.Ingestion.Persistor.EmbeddedWithRelay
  ]

  backend =
    :plausible
    |> Application.fetch_env!(Plausible.Ingestion.Persistor)
    |> Keyword.fetch!(:backend)

  if backend in relaying_backends do
    conf = Application.get_env(:plausible, Plausible.Ingestion.Persistor.Remote)

    pool_opts =
      Config.Reader.merge(
        default,
        protocol: :http2,
        count: Keyword.fetch!(conf, :count),
        conn_opts: [transport_opts: [timeout: Keyword.fetch!(conf, :timeout_ms)]]
      )

    # Pools are keyed by URL; requests to the persistor URL use this pool.
    Map.put(pool_config, Keyword.fetch!(conf, :url), pool_opts)
  else
    pool_config
  end
end
defp maybe_add_paddle_pool(pool_config, default) do defp maybe_add_paddle_pool(pool_config, default) do
paddle_conf = Application.get_env(:plausible, :paddle) paddle_conf = Application.get_env(:plausible, :paddle)

View File

@ -36,6 +36,9 @@ defmodule Plausible.Ingestion.Event do
| :verification_agent | :verification_agent
| :lock_timeout | :lock_timeout
| :no_session_for_engagement | :no_session_for_engagement
| :persist_timeout
| :persist_error
| :persist_decode_error
@type t() :: %__MODULE__{ @type t() :: %__MODULE__{
domain: String.t() | nil, domain: String.t() | nil,
@ -143,8 +146,7 @@ defmodule Plausible.Ingestion.Event do
put_salts: &put_salts/2, put_salts: &put_salts/2,
put_user_id: &put_user_id/2, put_user_id: &put_user_id/2,
validate_clickhouse_event: &validate_clickhouse_event/2, validate_clickhouse_event: &validate_clickhouse_event/2,
register_session: &register_session/2, register_session: &register_session/2
write_to_buffer: &write_to_buffer/2
] ]
end end
@ -381,8 +383,7 @@ defmodule Plausible.Ingestion.Event do
end end
defp register_session(%__MODULE__{} = event, context) do defp register_session(%__MODULE__{} = event, context) do
write_buffer_insert = persistor_opts = Keyword.get(context, :persistor_opts, [])
Keyword.get(context, :session_write_buffer_insert, &Plausible.Session.WriteBuffer.insert/1)
previous_user_id = previous_user_id =
generate_user_id( generate_user_id(
@ -392,35 +393,16 @@ defmodule Plausible.Ingestion.Event do
event.salts.previous event.salts.previous
) )
session_result = case Plausible.Ingestion.Persistor.persist_event(event, previous_user_id, persistor_opts) do
Plausible.Session.CacheStore.on_event( {:ok, event} ->
event.clickhouse_event, emit_telemetry_buffered(event)
event.clickhouse_session_attrs, event
previous_user_id,
buffer_insert: write_buffer_insert
)
case session_result do {:error, reason} ->
{:ok, :no_session_for_engagement} -> drop(event, reason)
drop(event, :no_session_for_engagement)
{:error, :timeout} ->
drop(event, :lock_timeout)
{:ok, session} ->
%{
event
| clickhouse_event: ClickhouseEventV2.merge_session(event.clickhouse_event, session)
}
end end
end end
defp write_to_buffer(%__MODULE__{clickhouse_event: clickhouse_event} = event, _context) do
{:ok, _} = Plausible.Event.WriteBuffer.insert(clickhouse_event)
emit_telemetry_buffered(event)
event
end
@click_id_params ["gclid", "gbraid", "wbraid", "msclkid", "fbclid", "twclid"] @click_id_params ["gclid", "gbraid", "wbraid", "msclkid", "fbclid", "twclid"]
defp get_click_id_param(nil), do: nil defp get_click_id_param(nil), do: nil

View File

@ -0,0 +1,19 @@
defmodule Plausible.Ingestion.Persistor do
  @moduledoc """
  Registers and persists sessions and events.

  Dispatches to the configured persistence backend. An explicit `:backend`
  option overrides the backend set in application configuration.
  """

  def persist_event(event, previous_user_id, opts) do
    case Keyword.pop(opts, :backend) do
      {nil, remaining_opts} ->
        configured_backend().persist_event(event, previous_user_id, remaining_opts)

      {override, remaining_opts} ->
        override.persist_event(event, previous_user_id, remaining_opts)
    end
  end

  # Backend module as set in runtime config (`PERSISTOR_BACKEND`).
  defp configured_backend() do
    :plausible
    |> Application.fetch_env!(__MODULE__)
    |> Keyword.fetch!(:backend)
  end
end

View File

@ -0,0 +1,42 @@
defmodule Plausible.Ingestion.Persistor.Embedded do
  @moduledoc """
  Embedded implementation of session and event persistence.

  Registers the session in the local cache store and, on success, writes the
  event (merged with its session) to the local event write buffer.
  """

  alias Plausible.ClickhouseEventV2

  require Logger

  def persist_event(ingest_event, previous_user_id, opts) do
    %{clickhouse_event: event, clickhouse_session_attrs: session_attrs} = ingest_event

    # Buffer insert functions are injectable so tests can stub them out.
    insert_session =
      Keyword.get(opts, :session_write_buffer_insert, &Plausible.Session.WriteBuffer.insert/1)

    insert_event =
      Keyword.get(opts, :event_write_buffer_insert, &Plausible.Event.WriteBuffer.insert/1)

    event
    |> Plausible.Session.CacheStore.on_event(
      session_attrs,
      previous_user_id,
      buffer_insert: insert_session
    )
    |> case do
      {:ok, :no_session_for_engagement} ->
        # Engagement events with no live session cannot be persisted.
        {:error, :no_session_for_engagement}

      {:error, :timeout} ->
        # Session lock could not be acquired in time.
        {:error, :lock_timeout}

      {:ok, session} ->
        merged_event = ClickhouseEventV2.merge_session(event, session)
        {:ok, _} = insert_event.(merged_event)
        {:ok, %{ingest_event | clickhouse_event: merged_event}}
    end
  end
end

View File

@ -0,0 +1,82 @@
defmodule Plausible.Ingestion.Persistor.EmbeddedWithRelay do
  @moduledoc """
  Embedded implementation with async relay to remote.

  Persists via the embedded backend (the authoritative result returned to
  the caller) while also relaying the event to the remote persistor. The
  relay outcome only feeds telemetry and never affects the return value.
  Pass `sync?: true` (used by tests) to wait for the relay to complete
  before persisting locally.
  """

  alias Plausible.Ingestion.Persistor

  def persist_event(event, previous_user_id, opts) do
    {sync?, opts} = Keyword.pop(opts, :sync?, false)

    if sync? do
      relay(event, previous_user_id, opts)
    else
      # Fire-and-forget: failures surface only via telemetry and logs.
      Task.start(fn -> relay(event, previous_user_id, opts) end)
    end

    Persistor.Embedded.persist_event(event, previous_user_id, opts)
  end

  # Relays the event to the remote persistor, measuring step duration.
  # Shared by both the sync and async paths (previously duplicated).
  defp relay(event, previous_user_id, opts) do
    Plausible.PromEx.Plugins.PlausibleMetrics.measure_duration(
      telemetry_pipeline_step_duration(),
      fn -> do_persist_event(event, previous_user_id, opts) end,
      %{step: "remote_register_session"}
    )
  end

  # Emits buffered/dropped telemetry depending on the remote outcome.
  defp do_persist_event(event, previous_user_id, opts) do
    case Persistor.Remote.persist_event(event, previous_user_id, opts) do
      {:ok, event} ->
        emit_telemetry_buffered(event)

      {:error, reason} ->
        emit_telemetry_dropped(event, reason)
    end
  end

  def emit_telemetry_buffered(event) do
    :telemetry.execute(telemetry_event_buffered(), %{}, %{
      domain: event.domain,
      request_timestamp: event.request.timestamp,
      tracker_script_version: event.request.tracker_script_version
    })
  end

  def emit_telemetry_dropped(event, reason) do
    :telemetry.execute(
      telemetry_event_dropped(),
      %{},
      %{
        domain: event.domain,
        reason: reason,
        request_timestamp: event.request.timestamp,
        tracker_script_version: event.request.tracker_script_version
      }
    )
  end

  def telemetry_event_buffered() do
    [:plausible, :remote_ingest, :event, :buffered]
  end

  def telemetry_event_dropped() do
    [:plausible, :remote_ingest, :event, :dropped]
  end

  def telemetry_pipeline_step_duration() do
    [:plausible, :remote_ingest, :pipeline, :step]
  end
end

View File

@ -0,0 +1,95 @@
defmodule Plausible.Ingestion.Persistor.Remote do
  @moduledoc """
  Remote implementation of session and event persistence.

  Ships the event and its session attributes to an external persistor
  service over HTTP and decodes the persisted event from the response.
  """

  require Logger

  def persist_event(ingest_event, previous_user_id, opts) do
    event = ingest_event.clickhouse_event
    session_attrs = ingest_event.clickhouse_session_attrs
    site_id = event.site_id
    current_user_id = event.user_id

    # Tests override the URL to point at a local Bypass instance.
    override_url = Keyword.get(opts, :url)

    headers = [
      {"x-site-id", site_id},
      {"x-current-user-id", current_user_id},
      {"x-previous-user-id", previous_user_id}
    ]

    case Req.post(persistor_url(override_url),
           finch: Plausible.Finch,
           body: encode_payload(event, session_attrs),
           headers: headers
         ) do
      {:ok, %{status: 200, body: event_payload}} ->
        case decode_payload(event_payload) do
          {:ok, event} ->
            {:ok, %{ingest_event | clickhouse_event: event}}

          {:error, decode_error} ->
            log_error(site_id, current_user_id, previous_user_id, decode_error)
            {:error, :persist_decode_error}
        end

      {:ok, %{body: error}} ->
        # Non-200 responses carry a known error tag as a plain-text body.
        log_error(site_id, current_user_id, previous_user_id, error)
        {:error, decode_error(error)}

      {:error, %{reason: :timeout}} ->
        {:error, :persist_timeout}

      {:error, error} ->
        log_error(site_id, current_user_id, previous_user_id, error)
        {:error, :persist_error}
    end
  end

  # Serializes the event (struct fields minus Ecto metadata) together with
  # the session attributes as base64-encoded ETF.
  defp encode_payload(event, session_attrs) do
    event_data =
      event
      |> Map.from_struct()
      |> Map.delete(:__meta__)

    {event_data, session_attrs}
    |> :erlang.term_to_binary()
    |> Base.encode64(padding: false)
  end

  # Decodes the persistor response back into a ClickhouseEventV2 struct.
  # `:safe` prevents a malformed or malicious response from allocating new
  # atoms (atoms are never garbage-collected); any resulting decode error
  # is caught below and reported as `{:error, :invalid_payload}`.
  defp decode_payload(payload) do
    case Base.decode64(payload, padding: false) do
      {:ok, data} ->
        event_data = :erlang.binary_to_term(data, [:safe])
        event = struct(Plausible.ClickhouseEventV2, event_data)
        {:ok, event}

      _ ->
        {:error, :invalid_web_encoding}
    end
  catch
    _, _ ->
      {:error, :invalid_payload}
  end

  # Maps known server-side error tags to ingestion drop reasons.
  defp decode_error("no_session_for_engagement"), do: :no_session_for_engagement
  defp decode_error("lock_timeout"), do: :lock_timeout
  defp decode_error(_), do: :persist_error

  defp persistor_url(nil) do
    Keyword.fetch!(Application.fetch_env!(:plausible, __MODULE__), :url)
  end

  defp persistor_url(url) when is_binary(url) do
    url
  end

  defp log_error(site_id, current_user_id, previous_user_id, error) do
    Logger.warning(
      "Persisting event for (#{site_id};#{current_user_id},#{previous_user_id}) failed: #{inspect(error)}"
    )
  end
end

View File

@ -33,9 +33,9 @@ export const options = {
scenarios: { scenarios: {
constant_rps: { constant_rps: {
executor: "constant-arrival-rate", executor: "constant-arrival-rate",
rate: 12000, rate: 6000,
timeUnit: "1s", timeUnit: "1s",
duration: "15m", duration: "1m",
preAllocatedVUs: 10000, preAllocatedVUs: 10000,
maxVUs: 30000, maxVUs: 30000,
}, },

View File

@ -343,7 +343,7 @@ defmodule Plausible.Ingestion.EventTest do
Task.start(fn -> Task.start(fn ->
assert {:ok, %{buffered: [_event], dropped: []}} = assert {:ok, %{buffered: [_event], dropped: []}} =
Event.build_and_buffer(first_request, Event.build_and_buffer(first_request,
session_write_buffer_insert: very_slow_buffer persistor_opts: [session_write_buffer_insert: very_slow_buffer]
) )
end) end)
@ -351,7 +351,7 @@ defmodule Plausible.Ingestion.EventTest do
:slow_buffer_insert_started -> :slow_buffer_insert_started ->
assert {:ok, %{buffered: [], dropped: [dropped]}} = assert {:ok, %{buffered: [], dropped: [dropped]}} =
Event.build_and_buffer(second_request, Event.build_and_buffer(second_request,
session_write_buffer_insert: very_slow_buffer persistor_opts: [session_write_buffer_insert: very_slow_buffer]
) )
assert dropped.drop_reason == :lock_timeout assert dropped.drop_reason == :lock_timeout

View File

@ -0,0 +1,244 @@
defmodule Plausible.Ingestion.PersistorTest do
  # Exercises the persistor backends: embedded (local), remote (HTTP via
  # Bypass), and embedded-with-relay (local persistence + shadow HTTP relay).
  use Plausible.DataCase

  import ExUnit.CaptureLog

  alias Plausible.Ingestion.Event
  alias Plausible.Ingestion.Persistor

  # Session attributes attached to every test event; also used to assert
  # the payload relayed to the fake persistor is intact.
  @session_params %{
    referrer: "ref",
    referrer_source: "refsource",
    utm_medium: "medium",
    utm_source: "source",
    utm_campaign: "campaign",
    utm_content: "content",
    utm_term: "term",
    browser: "browser",
    browser_version: "55",
    country_code: "EE",
    screen_size: "Desktop",
    operating_system: "Mac",
    operating_system_version: "11"
  }

  test "ingests using default, embedded persistor" do
    event = build(:event, name: "pageview")
    ingest_event = %Event{clickhouse_event: event, clickhouse_session_attrs: @session_params}

    assert {:ok, ingested_event} = Persistor.persist_event(ingest_event, nil, [])

    refute ingested_event.dropped?
    # Session attrs are merged into the event and a local session is created.
    assert ingested_event.clickhouse_event.operating_system == "Mac"
    assert is_integer(ingested_event.clickhouse_event.session_id)
  end

  test "ingests using remote persistor" do
    event = build(:event, name: "pageview")
    ingest_event = %Event{clickhouse_event: event, clickhouse_session_attrs: @session_params}

    bypass = Bypass.open()

    expect_persistor(bypass, fn input_event, session_attrs ->
      assert session_attrs == @session_params
      assert input_event.user_id == event.user_id

      input_event
      |> Map.merge(session_attrs)
      |> Map.put(:session_id, 123)
    end)

    assert {:ok, ingested_event} =
             Persistor.persist_event(ingest_event, nil,
               backend: Persistor.Remote,
               url: bypass_url(bypass)
             )

    refute ingested_event.dropped?
    # The remote backend adopts the session_id assigned by the server.
    assert ingested_event.clickhouse_event.session_id == 123
  end

  test "ingests using persistor with relay" do
    conn =
      Phoenix.ConnTest.build_conn(:post, "/api/events", %{
        name: "pageview",
        url: "http://dummy.site"
      })

    # The relay backend emits telemetry from event.request, so a real
    # ingestion request is required here.
    {:ok, request} = Plausible.Ingestion.Request.build(conn)

    event = build(:event, name: "pageview")

    ingest_event = %Event{
      clickhouse_event: event,
      clickhouse_session_attrs: @session_params,
      request: request
    }

    bypass = Bypass.open()

    expect_persistor(bypass, fn input_event, session_attrs ->
      assert session_attrs == @session_params
      assert input_event.user_id == event.user_id

      input_event
      |> Map.merge(session_attrs)
      |> Map.put(:session_id, 123)
    end)

    # sync?: true makes the relay run inline so the Bypass expectation is
    # guaranteed to be met before the test ends.
    assert {:ok, ingested_event} =
             Persistor.persist_event(ingest_event, nil,
               backend: Persistor.EmbeddedWithRelay,
               url: bypass_url(bypass),
               sync?: true
             )

    refute ingested_event.dropped?
    # The embedded result wins: a locally-generated session_id is kept and
    # the remote's 123 is ignored (relay is shadow traffic only).
    assert is_integer(ingested_event.clickhouse_event.session_id)
    assert ingested_event.clickhouse_event.session_id != 123
  end

  test "remote persistor failing due to invalid response payload" do
    event = build(:event, name: "pageview")
    ingest_event = %Event{clickhouse_event: event, clickhouse_session_attrs: @session_params}

    bypass = Bypass.open()

    # Valid base64, but not a valid ETF term inside.
    Bypass.expect_once(bypass, "POST", "/event", fn conn ->
      event_payload = Base.encode64("invalid", padding: false)

      conn
      |> Plug.Conn.resp(200, event_payload)
    end)

    assert capture_log(fn ->
             assert {:error, :persist_decode_error} =
                      Persistor.persist_event(ingest_event, nil,
                        backend: Persistor.Remote,
                        url: bypass_url(bypass)
                      )
           end) =~ "invalid_payload"
  end

  test "remote persistor failing due to invalid response payload encoding" do
    event = build(:event, name: "pageview")
    ingest_event = %Event{clickhouse_event: event, clickhouse_session_attrs: @session_params}

    bypass = Bypass.open()

    # Not even valid base64 (contains a space).
    Bypass.expect_once(bypass, "POST", "/event", fn conn ->
      conn
      |> Plug.Conn.resp(200, "invalid encoding")
    end)

    assert capture_log(fn ->
             assert {:error, :persist_decode_error} =
                      Persistor.persist_event(ingest_event, nil,
                        backend: Persistor.Remote,
                        url: bypass_url(bypass)
                      )
           end) =~ "invalid_web_encoding"
  end

  test "remote persistor failing due to no session for engagement" do
    event = build(:event, name: "pageview")
    ingest_event = %Event{clickhouse_event: event, clickhouse_session_attrs: @session_params}

    bypass = Bypass.open()

    # Known server error tags arrive as plain-text 500 bodies and are mapped
    # back to ingestion drop reasons.
    Bypass.expect_once(bypass, "POST", "/event", fn conn ->
      conn
      |> Plug.Conn.resp(500, "no_session_for_engagement")
    end)

    assert capture_log(fn ->
             assert {:error, :no_session_for_engagement} =
                      Persistor.persist_event(ingest_event, nil,
                        backend: Persistor.Remote,
                        url: bypass_url(bypass)
                      )
           end) =~ "no_session_for_engagement"
  end

  test "remote persistor failing due to lock timeout" do
    event = build(:event, name: "pageview")
    ingest_event = %Event{clickhouse_event: event, clickhouse_session_attrs: @session_params}

    bypass = Bypass.open()

    Bypass.expect_once(bypass, "POST", "/event", fn conn ->
      conn
      |> Plug.Conn.resp(500, "lock_timeout")
    end)

    assert capture_log(fn ->
             assert {:error, :lock_timeout} =
                      Persistor.persist_event(ingest_event, nil,
                        backend: Persistor.Remote,
                        url: bypass_url(bypass)
                      )
           end) =~ "lock_timeout"
  end

  test "remote persistor failing due to unknown server error" do
    event = build(:event, name: "pageview")
    ingest_event = %Event{clickhouse_event: event, clickhouse_session_attrs: @session_params}

    bypass = Bypass.open()

    # Unrecognized error tags fall back to the generic :persist_error.
    Bypass.expect_once(bypass, "POST", "/event", fn conn ->
      conn
      |> Plug.Conn.resp(500, "unknown_error")
    end)

    assert capture_log(fn ->
             assert {:error, :persist_error} =
                      Persistor.persist_event(ingest_event, nil,
                        backend: Persistor.Remote,
                        url: bypass_url(bypass)
                      )
           end) =~ "unknown_error"
  end

  test "remote persistor failing due to network error" do
    event = build(:event, name: "pageview")
    ingest_event = %Event{clickhouse_event: event, clickhouse_session_attrs: @session_params}

    bypass = Bypass.open()
    # Simulate the persistor service being unreachable.
    Bypass.down(bypass)

    assert capture_log(fn ->
             assert {:error, :persist_error} =
                      Persistor.persist_event(ingest_event, nil,
                        backend: Persistor.Remote,
                        url: bypass_url(bypass)
                      )
           end) =~ "econnrefused"
  end

  defp bypass_url(bypass) do
    "http://localhost:#{bypass.port}/event"
  end

  # Stands in for the persistor service: decodes the base64-ETF request
  # payload, passes it to `callback_fn` for assertions/transformation, then
  # responds with the (session-merged) event encoded the same way.
  defp expect_persistor(bypass, callback_fn) do
    Bypass.expect_once(bypass, "POST", "/event", fn conn ->
      {:ok, body, conn} = Plug.Conn.read_body(conn)

      {input_event, session_attrs} =
        body
        |> Base.decode64!(padding: false)
        |> :erlang.binary_to_term()

      output_event = callback_fn.(input_event, session_attrs)

      event_payload =
        output_event
        |> Map.merge(session_attrs)
        |> Map.put(:session_id, 123)
        |> :erlang.term_to_binary()
        |> Base.encode64(padding: false)

      conn
      |> Plug.Conn.resp(200, event_payload)
    end)
  end
end