From 40ff36a4fbd626359b32543063495faebb5c229e Mon Sep 17 00:00:00 2001 From: Adrian Gruntkowski Date: Mon, 1 Sep 2025 11:13:41 +0200 Subject: [PATCH] Implement dedicated persistence service (#5653) * Implement a very crude HTTP relay to persistor service * Temporarily disable local session and event persistence * Temporarily disable Promex in dev * Setup dedicated Finch pool for persistor * Temporarily adjust load script parameters * Fiddle with pool opts * Only log errors * Bump rate * Put persistor configuration in runtime config * Move persistor client code to a separate module * Move event and session persistence behind a switchable adapter * Add scaffolding of relaying persistor * Instrument relayed persistence * Adjust persistor API to accept full ingest event * Fix persistor URL in config defaults * Switch local dev env of embedded_with_relay backend * Revert "Temporarily disable Promex in dev" This reverts commit d9c9e9075d130cc5177a2a7559b98a2026077327. * Lower default `PERSISTOR_COUNT` * Refactor payload encoding and decoding slightly * Test and slightly improve persistor logic --- config/.env.dev | 1 + config/runtime.exs | 21 ++ lib/plausible/application.ex | 32 +++ lib/plausible/ingestion/event.ex | 40 +-- lib/plausible/ingestion/persistor.ex | 19 ++ lib/plausible/ingestion/persistor/embedded.ex | 42 +++ .../persistor/embedded_with_relay.ex | 82 ++++++ lib/plausible/ingestion/persistor/remote.ex | 95 +++++++ test/load/script.js | 4 +- test/plausible/ingestion/event_test.exs | 4 +- test/plausible/ingestion/persistor_test.exs | 244 ++++++++++++++++++ 11 files changed, 551 insertions(+), 33 deletions(-) create mode 100644 lib/plausible/ingestion/persistor.ex create mode 100644 lib/plausible/ingestion/persistor/embedded.ex create mode 100644 lib/plausible/ingestion/persistor/embedded_with_relay.ex create mode 100644 lib/plausible/ingestion/persistor/remote.ex create mode 100644 test/plausible/ingestion/persistor_test.exs diff --git a/config/.env.dev b/config/.env.dev index 6295c57bdf..8ab232f09b 100644 --- a/config/.env.dev +++ b/config/.env.dev @@ -3,6 +3,7 @@ SECURE_COOKIE=false DATABASE_URL=postgres://postgres:postgres@127.0.0.1:5432/plausible_dev CLICKHOUSE_DATABASE_URL=http://127.0.0.1:8123/plausible_events_db CLICKHOUSE_MAX_BUFFER_SIZE_BYTES=1000000 +PERSISTOR_BACKEND=embedded_with_relay SECRET_KEY_BASE=/njrhntbycvastyvtk1zycwfm981vpo/0xrvwjjvemdakc/vsvbrevlwsc6u8rcg TOTP_VAULT_KEY=Q3BD4nddbkVJIPXgHuo5NthGKSIH0yesRfG05J88HIo= ENVIRONMENT=dev diff --git a/config/runtime.exs b/config/runtime.exs index 7e83ba356e..e1ccf94ff7 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -141,6 +141,20 @@ end |> get_var_from_path_or_env("CLICKHOUSE_MAX_BUFFER_SIZE_BYTES", "100000") |> Integer.parse() +persistor_backend = + case get_var_from_path_or_env(config_dir, "PERSISTOR_BACKEND", "embedded") do + "embedded" -> Plausible.Ingestion.Persistor.Embedded + "embedded_with_relay" -> Plausible.Ingestion.Persistor.EmbeddedWithRelay + "remote" -> Plausible.Ingestion.Persistor.Remote + end + +persistor_url = + get_var_from_path_or_env(config_dir, "PERSISTOR_URL", "http://localhost:8001/event") + +persistor_count = get_int_from_path_or_env(config_dir, "PERSISTOR_COUNT", 200) + +persistor_timeout_ms = get_int_from_path_or_env(config_dir, "PERSISTOR_TIMEOUT_MS", 10_000) + # Can be generated with `Base.encode64(:crypto.strong_rand_bytes(32))` from # iex shell or `openssl rand -base64 32` from command line. totp_vault_key = @@ -649,6 +663,13 @@ config :plausible, Plausible.ImportDeletionRepo, transport_opts: ch_transport_opts, pool_size: 1 +config :plausible, Plausible.Ingestion.Persistor, backend: persistor_backend + +config :plausible, Plausible.Ingestion.Persistor.Remote, + url: persistor_url, + count: persistor_count, + timeout_ms: persistor_timeout_ms + config :ex_money, open_exchange_rates_app_id: get_var_from_path_or_env(config_dir, "OPEN_EXCHANGE_RATES_APP_ID"), retrieve_every: :timer.hours(24) diff --git a/lib/plausible/application.ex b/lib/plausible/application.ex index ffc144a01a..8364c67f22 100644 --- a/lib/plausible/application.ex +++ b/lib/plausible/application.ex @@ -212,6 +212,7 @@ defmodule Plausible.Application do "https://icons.duckduckgo.com", Config.Reader.merge(default_opts, conn_opts: [transport_opts: [timeout: 15_000]]) ) + |> maybe_add_persistor_pool(default_opts) |> maybe_add_sentry_pool(default_opts) |> maybe_add_paddle_pool(default_opts) |> maybe_add_google_pools(default_opts) @@ -227,6 +228,37 @@ defmodule Plausible.Application do end end + defp maybe_add_persistor_pool(pool_config, default) do + backend = + :plausible + |> Application.fetch_env!(Plausible.Ingestion.Persistor) + |> Keyword.fetch!(:backend) + + persistor_conf = Application.get_env(:plausible, Plausible.Ingestion.Persistor.Remote) + + if backend in [ + Plausible.Ingestion.Persistor.Remote, + Plausible.Ingestion.Persistor.EmbeddedWithRelay + ] do + persistor_url = Keyword.fetch!(persistor_conf, :url) + count = Keyword.fetch!(persistor_conf, :count) + timeout_ms = Keyword.fetch!(persistor_conf, :timeout_ms) + + Map.put( + pool_config, + persistor_url, + Config.Reader.merge( + default, + protocol: :http2, + count: count, + conn_opts: [transport_opts: [timeout: timeout_ms]] + ) + ) + else + pool_config + end + end + defp maybe_add_paddle_pool(pool_config, default) do paddle_conf = Application.get_env(:plausible, :paddle) diff --git a/lib/plausible/ingestion/event.ex b/lib/plausible/ingestion/event.ex index 967f7b8b75..bf59d039e8 100644 --- a/lib/plausible/ingestion/event.ex +++ b/lib/plausible/ingestion/event.ex @@ -36,6 +36,9 @@ defmodule Plausible.Ingestion.Event do | :verification_agent | :lock_timeout | :no_session_for_engagement + | :persist_timeout + | :persist_error + | :persist_decode_error @type t() :: %__MODULE__{ domain: String.t() | nil, @@ -143,8 +146,7 @@ defmodule Plausible.Ingestion.Event do put_salts: &put_salts/2, put_user_id: &put_user_id/2, validate_clickhouse_event: &validate_clickhouse_event/2, - register_session: ®ister_session/2, - write_to_buffer: &write_to_buffer/2 + register_session: ®ister_session/2 ] end @@ -381,8 +383,7 @@ defmodule Plausible.Ingestion.Event do end defp register_session(%__MODULE__{} = event, context) do - write_buffer_insert = - Keyword.get(context, :session_write_buffer_insert, &Plausible.Session.WriteBuffer.insert/1) + persistor_opts = Keyword.get(context, :persistor_opts, []) previous_user_id = generate_user_id( @@ -392,35 +393,16 @@ defmodule Plausible.Ingestion.Event do event.salts.previous ) - session_result = - Plausible.Session.CacheStore.on_event( - event.clickhouse_event, - event.clickhouse_session_attrs, - previous_user_id, - buffer_insert: write_buffer_insert - ) + case Plausible.Ingestion.Persistor.persist_event(event, previous_user_id, persistor_opts) do + {:ok, event} -> + emit_telemetry_buffered(event) + event - case session_result do - {:ok, :no_session_for_engagement} -> - drop(event, :no_session_for_engagement) - - {:error, :timeout} -> - drop(event, :lock_timeout) - - {:ok, session} -> - %{ - event - | clickhouse_event: ClickhouseEventV2.merge_session(event.clickhouse_event, session) - } + {:error, reason} -> + drop(event, reason) end end - defp write_to_buffer(%__MODULE__{clickhouse_event: clickhouse_event} = event, _context) do - {:ok, _} = Plausible.Event.WriteBuffer.insert(clickhouse_event) - emit_telemetry_buffered(event) - event - end - @click_id_params ["gclid", "gbraid", "wbraid", "msclkid", "fbclid", "twclid"] defp get_click_id_param(nil), do: nil diff --git a/lib/plausible/ingestion/persistor.ex b/lib/plausible/ingestion/persistor.ex new file mode 100644 index 0000000000..b54a2421f7 --- /dev/null +++ b/lib/plausible/ingestion/persistor.ex @@ -0,0 +1,19 @@ +defmodule Plausible.Ingestion.Persistor do + @moduledoc """ + Registers and persists sessions and events. + """ + + def persist_event(event, previous_user_id, opts) do + {backend_override, opts} = Keyword.pop(opts, :backend) + + backend(backend_override).persist_event(event, previous_user_id, opts) + end + + defp backend(nil) do + :plausible + |> Application.fetch_env!(__MODULE__) + |> Keyword.fetch!(:backend) + end + + defp backend(override), do: override +end diff --git a/lib/plausible/ingestion/persistor/embedded.ex b/lib/plausible/ingestion/persistor/embedded.ex new file mode 100644 index 0000000000..fadbc1d5b8 --- /dev/null +++ b/lib/plausible/ingestion/persistor/embedded.ex @@ -0,0 +1,42 @@ +defmodule Plausible.Ingestion.Persistor.Embedded do + @moduledoc """ + Embedded implementation of session and event persistence. + """ + + alias Plausible.ClickhouseEventV2 + + require Logger + + def persist_event(ingest_event, previous_user_id, opts) do + event = ingest_event.clickhouse_event + session_attrs = ingest_event.clickhouse_session_attrs + + session_write_buffer_insert = + Keyword.get(opts, :session_write_buffer_insert, &Plausible.Session.WriteBuffer.insert/1) + + event_write_buffer_insert = + Keyword.get(opts, :event_write_buffer_insert, &Plausible.Event.WriteBuffer.insert/1) + + session_result = + Plausible.Session.CacheStore.on_event( + event, + session_attrs, + previous_user_id, + buffer_insert: session_write_buffer_insert + ) + + case session_result do + {:ok, :no_session_for_engagement} -> + {:error, :no_session_for_engagement} + + {:error, :timeout} -> + {:error, :lock_timeout} + + {:ok, session} -> + event = ClickhouseEventV2.merge_session(event, session) + {:ok, _} = event_write_buffer_insert.(event) + + {:ok, %{ingest_event | clickhouse_event: event}} + end + end +end diff --git a/lib/plausible/ingestion/persistor/embedded_with_relay.ex b/lib/plausible/ingestion/persistor/embedded_with_relay.ex new file mode 100644 index 0000000000..bb5642ca68 --- /dev/null +++ b/lib/plausible/ingestion/persistor/embedded_with_relay.ex @@ -0,0 +1,82 @@ +defmodule Plausible.Ingestion.Persistor.EmbeddedWithRelay do + @moduledoc """ + Embedded implementation with async relay to remote. + """ + + alias Plausible.Ingestion.Persistor + + def persist_event(event, previous_user_id, opts) do + {sync?, opts} = Keyword.pop(opts, :sync?, false) + + if sync? do + persist_remote_event_sync(event, previous_user_id, opts) + else + persist_remote_event_async(event, previous_user_id, opts) + end + + Persistor.Embedded.persist_event(event, previous_user_id, opts) + end + + defp persist_remote_event_async(event, previous_user_id, opts) do + Task.start(fn -> + Plausible.PromEx.Plugins.PlausibleMetrics.measure_duration( + telemetry_pipeline_step_duration(), + fn -> do_persist_event(event, previous_user_id, opts) end, + %{step: "remote_register_session"} + ) + end) + end + + defp persist_remote_event_sync(event, previous_user_id, opts) do + Plausible.PromEx.Plugins.PlausibleMetrics.measure_duration( + telemetry_pipeline_step_duration(), + fn -> do_persist_event(event, previous_user_id, opts) end, + %{step: "remote_register_session"} + ) + end + + defp do_persist_event(event, previous_user_id, opts) do + result = Persistor.Remote.persist_event(event, previous_user_id, opts) + + case result do + {:ok, event} -> + emit_telemetry_buffered(event) + + {:error, reason} -> + emit_telemetry_dropped(event, reason) + end + end + + def emit_telemetry_buffered(event) do + :telemetry.execute(telemetry_event_buffered(), %{}, %{ + domain: event.domain, + request_timestamp: event.request.timestamp, + tracker_script_version: event.request.tracker_script_version + }) + end + + def emit_telemetry_dropped(event, reason) do + :telemetry.execute( + telemetry_event_dropped(), + %{}, + %{ + domain: event.domain, + reason: reason, + request_timestamp: event.request.timestamp, + tracker_script_version: event.request.tracker_script_version + } + ) + end + + def telemetry_event_buffered() do + [:plausible, :remote_ingest, :event, :buffered] + end + + def telemetry_event_dropped() do + [:plausible, :remote_ingest, :event, :dropped] + end + + def telemetry_pipeline_step_duration() do + [:plausible, :remote_ingest, :pipeline, :step] + end +end diff --git a/lib/plausible/ingestion/persistor/remote.ex b/lib/plausible/ingestion/persistor/remote.ex new file mode 100644 index 0000000000..d0b96ebe8c --- /dev/null +++ b/lib/plausible/ingestion/persistor/remote.ex @@ -0,0 +1,95 @@ +defmodule Plausible.Ingestion.Persistor.Remote do + @moduledoc """ + Remote implementation of session and event persistence. + """ + + require Logger + + def persist_event(ingest_event, previous_user_id, opts) do + event = ingest_event.clickhouse_event + session_attrs = ingest_event.clickhouse_session_attrs + + site_id = event.site_id + current_user_id = event.user_id + override_url = Keyword.get(opts, :url) + + headers = [ + {"x-site-id", site_id}, + {"x-current-user-id", current_user_id}, + {"x-previous-user-id", previous_user_id} + ] + + case Req.post(persistor_url(override_url), + finch: Plausible.Finch, + body: encode_payload(event, session_attrs), + headers: headers + ) do + {:ok, %{status: 200, body: event_payload}} -> + case decode_payload(event_payload) do + {:ok, event} -> + {:ok, %{ingest_event | clickhouse_event: event}} + + {:error, decode_error} -> + log_error(site_id, current_user_id, previous_user_id, decode_error) + {:error, :persist_decode_error} + end + + {:ok, %{body: error}} -> + log_error(site_id, current_user_id, previous_user_id, error) + {:error, decode_error(error)} + + {:error, %{reason: :timeout}} -> + {:error, :persist_timeout} + + {:error, error} -> + log_error(site_id, current_user_id, previous_user_id, error) + + {:error, :persist_error} + end + end + + defp encode_payload(event, session_attrs) do + event_data = + event + |> Map.from_struct() + |> Map.delete(:__meta__) + + {event_data, session_attrs} + |> :erlang.term_to_binary() + |> Base.encode64(padding: false) + end + + defp decode_payload(payload) do + case Base.decode64(payload, padding: false) do + {:ok, data} -> + event_data = :erlang.binary_to_term(data) + event = struct(Plausible.ClickhouseEventV2, event_data) + + {:ok, event} + + _ -> + {:error, :invalid_web_encoding} + end + catch + _, _ -> + {:error, :invalid_payload} + end + + defp decode_error("no_session_for_engagement"), do: :no_session_for_engagement + defp decode_error("lock_timeout"), do: :lock_timeout + defp decode_error(_), do: :persist_error + + defp persistor_url(nil) do + Keyword.fetch!(Application.fetch_env!(:plausible, __MODULE__), :url) + end + + defp persistor_url(url) when is_binary(url) do + url + end + + defp log_error(site_id, current_user_id, previous_user_id, error) do + Logger.warning( + "Persisting event for (#{site_id};#{current_user_id},#{previous_user_id}) failed: #{inspect(error)}" + ) + end +end diff --git a/test/load/script.js b/test/load/script.js index 0d2c4fab20..60799791fe 100644 --- a/test/load/script.js +++ b/test/load/script.js @@ -33,9 +33,9 @@ export const options = { scenarios: { constant_rps: { executor: "constant-arrival-rate", - rate: 12000, + rate: 6000, timeUnit: "1s", - duration: "15m", + duration: "1m", preAllocatedVUs: 10000, maxVUs: 30000, }, diff --git a/test/plausible/ingestion/event_test.exs b/test/plausible/ingestion/event_test.exs index d85d51b0ab..d36246ef3a 100644 --- a/test/plausible/ingestion/event_test.exs +++ b/test/plausible/ingestion/event_test.exs @@ -343,7 +343,7 @@ defmodule Plausible.Ingestion.EventTest do Task.start(fn -> assert {:ok, %{buffered: [_event], dropped: []}} = Event.build_and_buffer(first_request, - session_write_buffer_insert: very_slow_buffer + persistor_opts: [session_write_buffer_insert: very_slow_buffer] ) end) @@ -351,7 +351,7 @@ defmodule Plausible.Ingestion.EventTest do :slow_buffer_insert_started -> assert {:ok, %{buffered: [], dropped: [dropped]}} = Event.build_and_buffer(second_request, - session_write_buffer_insert: very_slow_buffer + persistor_opts: [session_write_buffer_insert: very_slow_buffer] ) assert dropped.drop_reason == :lock_timeout diff --git a/test/plausible/ingestion/persistor_test.exs b/test/plausible/ingestion/persistor_test.exs new file mode 100644 index 0000000000..e50f67bb34 --- /dev/null +++ b/test/plausible/ingestion/persistor_test.exs @@ -0,0 +1,244 @@ +defmodule Plausible.Ingestion.PersistorTest do + use Plausible.DataCase + + import ExUnit.CaptureLog + + alias Plausible.Ingestion.Event + alias Plausible.Ingestion.Persistor + + @session_params %{ + referrer: "ref", + referrer_source: "refsource", + utm_medium: "medium", + utm_source: "source", + utm_campaign: "campaign", + utm_content: "content", + utm_term: "term", + browser: "browser", + browser_version: "55", + country_code: "EE", + screen_size: "Desktop", + operating_system: "Mac", + operating_system_version: "11" + } + + test "ingests using default, embedded persistor" do + event = build(:event, name: "pageview") + ingest_event = %Event{clickhouse_event: event, clickhouse_session_attrs: @session_params} + + assert {:ok, ingested_event} = Persistor.persist_event(ingest_event, nil, []) + refute ingested_event.dropped? + assert ingested_event.clickhouse_event.operating_system == "Mac" + assert is_integer(ingested_event.clickhouse_event.session_id) + end + + test "ingests using remote persistor" do + event = build(:event, name: "pageview") + ingest_event = %Event{clickhouse_event: event, clickhouse_session_attrs: @session_params} + + bypass = Bypass.open() + + expect_persistor(bypass, fn input_event, session_attrs -> + assert session_attrs == @session_params + assert input_event.user_id == event.user_id + + input_event + |> Map.merge(session_attrs) + |> Map.put(:session_id, 123) + end) + + assert {:ok, ingested_event} = + Persistor.persist_event(ingest_event, nil, + backend: Persistor.Remote, + url: bypass_url(bypass) + ) + + refute ingested_event.dropped? + assert ingested_event.clickhouse_event.session_id == 123 + end + + test "ingests using persistor with relay" do + conn = + Phoenix.ConnTest.build_conn(:post, "/api/events", %{ + name: "pageview", + url: "http://dummy.site" + }) + + {:ok, request} = Plausible.Ingestion.Request.build(conn) + + event = build(:event, name: "pageview") + + ingest_event = %Event{ + clickhouse_event: event, + clickhouse_session_attrs: @session_params, + request: request + } + + bypass = Bypass.open() + + expect_persistor(bypass, fn input_event, session_attrs -> + assert session_attrs == @session_params + assert input_event.user_id == event.user_id + + input_event + |> Map.merge(session_attrs) + |> Map.put(:session_id, 123) + end) + + assert {:ok, ingested_event} = + Persistor.persist_event(ingest_event, nil, + backend: Persistor.EmbeddedWithRelay, + url: bypass_url(bypass), + sync?: true + ) + + refute ingested_event.dropped? + assert is_integer(ingested_event.clickhouse_event.session_id) + assert ingested_event.clickhouse_event.session_id != 123 + end + + test "remote persistor failing due to invalid response payload" do + event = build(:event, name: "pageview") + ingest_event = %Event{clickhouse_event: event, clickhouse_session_attrs: @session_params} + + bypass = Bypass.open() + + Bypass.expect_once(bypass, "POST", "/event", fn conn -> + event_payload = Base.encode64("invalid", padding: false) + + conn + |> Plug.Conn.resp(200, event_payload) + end) + + assert capture_log(fn -> + assert {:error, :persist_decode_error} = + Persistor.persist_event(ingest_event, nil, + backend: Persistor.Remote, + url: bypass_url(bypass) + ) + end) =~ "invalid_payload" + end + + test "remote persistor failing due to invalid response payload encoding" do + event = build(:event, name: "pageview") + ingest_event = %Event{clickhouse_event: event, clickhouse_session_attrs: @session_params} + + bypass = Bypass.open() + + Bypass.expect_once(bypass, "POST", "/event", fn conn -> + conn + |> Plug.Conn.resp(200, "invalid encoding") + end) + + assert capture_log(fn -> + assert {:error, :persist_decode_error} = + Persistor.persist_event(ingest_event, nil, + backend: Persistor.Remote, + url: bypass_url(bypass) + ) + end) =~ "invalid_web_encoding" + end + + test "remote persistor failing due to no session for engagement" do + event = build(:event, name: "pageview") + ingest_event = %Event{clickhouse_event: event, clickhouse_session_attrs: @session_params} + + bypass = Bypass.open() + + Bypass.expect_once(bypass, "POST", "/event", fn conn -> + conn + |> Plug.Conn.resp(500, "no_session_for_engagement") + end) + + assert capture_log(fn -> + assert {:error, :no_session_for_engagement} = + Persistor.persist_event(ingest_event, nil, + backend: Persistor.Remote, + url: bypass_url(bypass) + ) + end) =~ "no_session_for_engagement" + end + + test "remote persistor failing due to lock timeout" do + event = build(:event, name: "pageview") + ingest_event = %Event{clickhouse_event: event, clickhouse_session_attrs: @session_params} + + bypass = Bypass.open() + + Bypass.expect_once(bypass, "POST", "/event", fn conn -> + conn + |> Plug.Conn.resp(500, "lock_timeout") + end) + + assert capture_log(fn -> + assert {:error, :lock_timeout} = + Persistor.persist_event(ingest_event, nil, + backend: Persistor.Remote, + url: bypass_url(bypass) + ) + end) =~ "lock_timeout" + end + + test "remote persistor failing due to unknown server error" do + event = build(:event, name: "pageview") + ingest_event = %Event{clickhouse_event: event, clickhouse_session_attrs: @session_params} + + bypass = Bypass.open() + + Bypass.expect_once(bypass, "POST", "/event", fn conn -> + conn + |> Plug.Conn.resp(500, "unknown_error") + end) + + assert capture_log(fn -> + assert {:error, :persist_error} = + Persistor.persist_event(ingest_event, nil, + backend: Persistor.Remote, + url: bypass_url(bypass) + ) + end) =~ "unknown_error" + end + + test "remote persistor failing due to network error" do + event = build(:event, name: "pageview") + ingest_event = %Event{clickhouse_event: event, clickhouse_session_attrs: @session_params} + + bypass = Bypass.open() + Bypass.down(bypass) + + assert capture_log(fn -> + assert {:error, :persist_error} = + Persistor.persist_event(ingest_event, nil, + backend: Persistor.Remote, + url: bypass_url(bypass) + ) + end) =~ "econnrefused" + end + + defp bypass_url(bypass) do + "http://localhost:#{bypass.port}/event" + end + + defp expect_persistor(bypass, callback_fn) do + Bypass.expect_once(bypass, "POST", "/event", fn conn -> + {:ok, body, conn} = Plug.Conn.read_body(conn) + + {input_event, session_attrs} = + body + |> Base.decode64!(padding: false) + |> :erlang.binary_to_term() + + output_event = callback_fn.(input_event, session_attrs) + + event_payload = + output_event + |> Map.merge(session_attrs) + |> Map.put(:session_id, 123) + |> :erlang.term_to_binary() + |> Base.encode64(padding: false) + + conn + |> Plug.Conn.resp(200, event_payload) + end) + end +end