Drop event explicitly on session lock timeout (#4582)

* Drop event explicitly on session lock timeout

* Make session `user_id` more random in tests to avoid excess locking

* Improve testability to event ingestion

* Test lock timeout in ingestion
This commit is contained in:
Adrian Gruntkowski 2024-09-16 12:36:53 +02:00 committed by GitHub
parent 9f1017e2b4
commit ae0c5a173d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 131 additions and 76 deletions

View File

@ -113,11 +113,18 @@ defmodule Plausible.Cache.Adapter do
[]
end
@spec with_lock!(atom(), any(), pos_integer(), (-> result)) :: result when result: any()
def with_lock!(cache_name, key, timeout, fun) do
ConCache.isolated(cache_name, key, timeout, fun)
@spec with_lock(atom(), any(), pos_integer(), (-> result)) :: {:ok, result} | {:error, :timeout}
when result: any()
def with_lock(cache_name, key, timeout, fun) do
result = ConCache.isolated(cache_name, key, timeout, fun)
{:ok, result}
catch
:exit, {:timeout, _} ->
Logger.error("Timeout while executing with lock on key in '#{inspect(cache_name)}'")
Sentry.capture_message(
"Timeout while executing with lock on key in '#{inspect(cache_name)}'",
extra: %{key: key}
)
{:error, :timeout}
end
end

View File

@ -48,8 +48,8 @@ defmodule Plausible.Ingestion.Event do
changeset: %Ecto.Changeset{}
}
@spec build_and_buffer(Request.t()) :: {:ok, %{buffered: [t()], dropped: [t()]}}
def build_and_buffer(%Request{domains: domains} = request) do
@spec build_and_buffer(Request.t(), Keyword.t()) :: {:ok, %{buffered: [t()], dropped: [t()]}}
def build_and_buffer(%Request{domains: domains} = request, context \\ []) do
processed_events =
if spam_referrer?(request) do
for domain <- domains, do: drop(new(domain, request), :spam_referrer)
@ -60,7 +60,7 @@ defmodule Plausible.Ingestion.Event do
processed =
domain
|> new(site, request)
|> process_unless_dropped(pipeline())
|> process_unless_dropped(pipeline(), context)
[processed | acc]
@ -107,39 +107,39 @@ defmodule Plausible.Ingestion.Event do
defp pipeline() do
[
drop_verification_agent: &drop_verification_agent/1,
drop_datacenter_ip: &drop_datacenter_ip/1,
drop_shield_rule_hostname: &drop_shield_rule_hostname/1,
drop_shield_rule_page: &drop_shield_rule_page/1,
drop_shield_rule_ip: &drop_shield_rule_ip/1,
put_geolocation: &put_geolocation/1,
drop_shield_rule_country: &drop_shield_rule_country/1,
put_user_agent: &put_user_agent/1,
put_basic_info: &put_basic_info/1,
put_referrer: &put_referrer/1,
put_utm_tags: &put_utm_tags/1,
put_props: &put_props/1,
put_revenue: &put_revenue/1,
put_salts: &put_salts/1,
put_user_id: &put_user_id/1,
validate_clickhouse_event: &validate_clickhouse_event/1,
register_session: &register_session/1,
write_to_buffer: &write_to_buffer/1
drop_verification_agent: &drop_verification_agent/2,
drop_datacenter_ip: &drop_datacenter_ip/2,
drop_shield_rule_hostname: &drop_shield_rule_hostname/2,
drop_shield_rule_page: &drop_shield_rule_page/2,
drop_shield_rule_ip: &drop_shield_rule_ip/2,
put_geolocation: &put_geolocation/2,
drop_shield_rule_country: &drop_shield_rule_country/2,
put_user_agent: &put_user_agent/2,
put_basic_info: &put_basic_info/2,
put_referrer: &put_referrer/2,
put_utm_tags: &put_utm_tags/2,
put_props: &put_props/2,
put_revenue: &put_revenue/2,
put_salts: &put_salts/2,
put_user_id: &put_user_id/2,
validate_clickhouse_event: &validate_clickhouse_event/2,
register_session: &register_session/2,
write_to_buffer: &write_to_buffer/2
]
end
defp process_unless_dropped(%__MODULE__{} = initial_event, pipeline) do
defp process_unless_dropped(%__MODULE__{} = initial_event, pipeline, context) do
Enum.reduce_while(pipeline, initial_event, fn {step_name, step_fn}, acc_event ->
Plausible.PromEx.Plugins.PlausibleMetrics.measure_duration(
telemetry_pipeline_step_duration(),
fn -> execute_step(step_fn, acc_event) end,
fn -> execute_step(step_fn, acc_event, context) end,
%{step: "#{step_name}"}
)
end)
end
defp execute_step(step_fn, acc_event) do
case step_fn.(acc_event) do
defp execute_step(step_fn, acc_event, context) do
case step_fn.(acc_event, context) do
%__MODULE__{dropped?: true} = dropped -> {:halt, dropped}
%__MODULE__{dropped?: false} = event -> {:cont, event}
end
@ -171,7 +171,7 @@ defmodule Plausible.Ingestion.Event do
struct!(event, clickhouse_session_attrs: Map.merge(event.clickhouse_session_attrs, attrs))
end
defp drop_verification_agent(%__MODULE__{} = event) do
defp drop_verification_agent(%__MODULE__{} = event, _context) do
case event.request.user_agent do
@verification_user_agent ->
drop(event, :verification_agent)
@ -181,7 +181,7 @@ defmodule Plausible.Ingestion.Event do
end
end
defp drop_datacenter_ip(%__MODULE__{} = event) do
defp drop_datacenter_ip(%__MODULE__{} = event, _context) do
case event.request.ip_classification do
"dc_ip" ->
drop(event, :dc_ip)
@ -191,7 +191,7 @@ defmodule Plausible.Ingestion.Event do
end
end
defp drop_shield_rule_ip(%__MODULE__{} = event) do
defp drop_shield_rule_ip(%__MODULE__{} = event, _context) do
if Plausible.Shields.ip_blocked?(event.domain, event.request.remote_ip) do
drop(event, :site_ip_blocklist)
else
@ -199,7 +199,7 @@ defmodule Plausible.Ingestion.Event do
end
end
defp drop_shield_rule_hostname(%__MODULE__{} = event) do
defp drop_shield_rule_hostname(%__MODULE__{} = event, _context) do
if Plausible.Shields.hostname_allowed?(event.domain, event.request.hostname) do
event
else
@ -207,7 +207,7 @@ defmodule Plausible.Ingestion.Event do
end
end
defp drop_shield_rule_page(%__MODULE__{} = event) do
defp drop_shield_rule_page(%__MODULE__{} = event, _context) do
if Plausible.Shields.page_blocked?(event.domain, event.request.pathname) do
drop(event, :site_page_blocklist)
else
@ -215,7 +215,7 @@ defmodule Plausible.Ingestion.Event do
end
end
defp put_user_agent(%__MODULE__{} = event) do
defp put_user_agent(%__MODULE__{} = event, _context) do
case parse_user_agent(event.request) do
%UAInspector.Result{client: %UAInspector.Result.Client{name: "Headless Chrome"}} ->
drop(event, :bot)
@ -237,7 +237,7 @@ defmodule Plausible.Ingestion.Event do
end
end
defp put_basic_info(%__MODULE__{} = event) do
defp put_basic_info(%__MODULE__{} = event, _context) do
update_event_attrs(event, %{
domain: event.domain,
site_id: event.site.id,
@ -248,7 +248,7 @@ defmodule Plausible.Ingestion.Event do
})
end
defp put_referrer(%__MODULE__{} = event) do
defp put_referrer(%__MODULE__{} = event, _context) do
ref = parse_referrer(event.request.uri, event.request.referrer)
source = get_referrer_source(event.request, ref)
channel = Plausible.Ingestion.Acquisition.get_channel(event.request, source)
@ -260,7 +260,7 @@ defmodule Plausible.Ingestion.Event do
})
end
defp put_utm_tags(%__MODULE__{} = event) do
defp put_utm_tags(%__MODULE__{} = event, _context) do
query_params = event.request.query_params
update_session_attrs(event, %{
@ -272,7 +272,7 @@ defmodule Plausible.Ingestion.Event do
})
end
defp put_geolocation(%__MODULE__{} = event) do
defp put_geolocation(%__MODULE__{} = event, _context) do
case event.request.ip_classification do
"anonymous_vpn_ip" ->
update_session_attrs(event, %{country_code: "A1"})
@ -284,7 +284,8 @@ defmodule Plausible.Ingestion.Event do
end
defp drop_shield_rule_country(
%__MODULE__{domain: domain, clickhouse_session_attrs: %{country_code: cc}} = event
%__MODULE__{domain: domain, clickhouse_session_attrs: %{country_code: cc}} = event,
_context
)
when is_binary(domain) and is_binary(cc) do
if Plausible.Shields.country_blocked?(domain, cc) do
@ -294,9 +295,9 @@ defmodule Plausible.Ingestion.Event do
end
end
defp drop_shield_rule_country(%__MODULE__{} = event), do: event
defp drop_shield_rule_country(%__MODULE__{} = event, _context), do: event
defp put_props(%__MODULE__{request: %{props: %{} = props}} = event) do
defp put_props(%__MODULE__{request: %{props: %{} = props}} = event, _context) do
# defensive: ensuring the keys/values are always in the same order
{keys, values} = Enum.unzip(props)
@ -306,9 +307,9 @@ defmodule Plausible.Ingestion.Event do
})
end
defp put_props(%__MODULE__{} = event), do: event
defp put_props(%__MODULE__{} = event, _context), do: event
defp put_revenue(event) do
defp put_revenue(event, _context) do
on_ee do
attrs = Plausible.Ingestion.Event.Revenue.get_revenue_attrs(event)
update_event_attrs(event, attrs)
@ -317,11 +318,11 @@ defmodule Plausible.Ingestion.Event do
end
end
defp put_salts(%__MODULE__{} = event) do
defp put_salts(%__MODULE__{} = event, _context) do
%{event | salts: Plausible.Session.Salts.fetch()}
end
defp put_user_id(%__MODULE__{} = event) do
defp put_user_id(%__MODULE__{} = event, _context) do
update_event_attrs(event, %{
user_id:
generate_user_id(
@ -333,7 +334,7 @@ defmodule Plausible.Ingestion.Event do
})
end
defp validate_clickhouse_event(%__MODULE__{} = event) do
defp validate_clickhouse_event(%__MODULE__{} = event, _context) do
clickhouse_event =
event
|> Map.fetch!(:clickhouse_event_attrs)
@ -348,7 +349,10 @@ defmodule Plausible.Ingestion.Event do
end
end
defp register_session(%__MODULE__{} = event) do
defp register_session(%__MODULE__{} = event, context) do
write_buffer_insert =
Keyword.get(context, :session_write_buffer_insert, &Plausible.Session.WriteBuffer.insert/1)
previous_user_id =
generate_user_id(
event.request,
@ -357,20 +361,27 @@ defmodule Plausible.Ingestion.Event do
event.salts.previous
)
session =
session_result =
Plausible.Session.CacheStore.on_event(
event.clickhouse_event,
event.clickhouse_session_attrs,
previous_user_id
previous_user_id,
write_buffer_insert
)
%{
event
| clickhouse_event: ClickhouseEventV2.merge_session(event.clickhouse_event, session)
}
case session_result do
{:ok, session} ->
%{
event
| clickhouse_event: ClickhouseEventV2.merge_session(event.clickhouse_event, session)
}
{:error, :timeout} ->
drop(event, :lock_timeout)
end
end
defp write_to_buffer(%__MODULE__{clickhouse_event: clickhouse_event} = event) do
defp write_to_buffer(%__MODULE__{clickhouse_event: clickhouse_event} = event, _context) do
{:ok, _} = Plausible.Event.WriteBuffer.insert(clickhouse_event)
emit_telemetry_buffered(event)
event

View File

@ -11,7 +11,7 @@ defmodule Plausible.Session.CacheStore do
def on_event(event, session_attributes, prev_user_id, buffer_insert \\ &WriteBuffer.insert/1) do
lock_requested_at = System.monotonic_time()
Plausible.Cache.Adapter.with_lock!(
Plausible.Cache.Adapter.with_lock(
:sessions,
{event.site_id, event.user_id},
@lock_timeout,

View File

@ -278,6 +278,46 @@ defmodule Plausible.Ingestion.EventTest do
assert dropped.drop_reason == :payment_required
end
@tag :slow
test "drops events on session lock timeout" do
site = insert(:site)
very_slow_buffer = fn sessions ->
Process.sleep(1000)
Plausible.Session.WriteBuffer.insert(sessions)
end
first_conn =
build_conn(:post, "/api/events", %{
name: "pageview",
url: "http://dummy.site",
d: "#{site.domain}"
})
assert {:ok, first_request} = Request.build(first_conn)
second_conn =
build_conn(:post, "/api/events", %{
name: "page_scrolled",
url: "http://dummy.site",
d: "#{site.domain}"
})
assert {:ok, second_request} = Request.build(second_conn)
Task.start(fn ->
assert {:ok, %{buffered: [_event], dropped: []}} =
Event.build_and_buffer(first_request,
session_write_buffer_insert: very_slow_buffer
)
end)
Process.sleep(100)
assert {:ok, %{buffered: [], dropped: [dropped]}} = Event.build_and_buffer(second_request)
assert dropped.drop_reason == :lock_timeout
end
@tag :ee_only
test "saves revenue amount" do
site = insert(:site)

View File

@ -1,8 +1,6 @@
defmodule Plausible.Session.CacheStoreTest do
use Plausible.DataCase
import ExUnit.CaptureLog
alias Plausible.Session.CacheStore
setup do
@ -156,9 +154,7 @@ defmodule Plausible.Session.CacheStoreTest do
CacheStore.on_event(event3, session_params, nil, buffer)
end)
capture_log(fn ->
Task.await_many([async1, async2, async3])
end) =~ "Timeout while executing with lock on key in ':sessions'"
Task.await_many([async1, async2, async3])
assert_receive({:very_slow_buffer, :insert, [[_session]]})
refute_receive({:buffer, :insert, [[_updated_session]]})

View File

@ -4,7 +4,7 @@ defmodule PlausibleWeb.Api.ExternalStatsController.AggregateTest do
alias Plausible.Billing.Feature
setup [:create_user, :create_new_site, :create_api_key, :use_api_key]
@user_id 123
@user_id Enum.random(1000..9999)
describe "feature access" do
test "cannot filter by a custom prop without access to the props feature", %{

View File

@ -2,7 +2,7 @@ defmodule PlausibleWeb.Api.ExternalStatsController.BreakdownTest do
use PlausibleWeb.ConnCase
alias Plausible.Billing.Feature
@user_id 1231
@user_id Enum.random(1000..9999)
setup [:create_user, :create_new_site, :create_api_key, :use_api_key]

View File

@ -1,7 +1,7 @@
defmodule PlausibleWeb.Api.ExternalStatsController.QueryGoalDimensionTest do
use PlausibleWeb.ConnCase
@user_id 1231
@user_id Enum.random(1000..9999)
setup [:create_user, :create_new_site, :create_api_key, :use_api_key]

View File

@ -1,7 +1,7 @@
defmodule PlausibleWeb.Api.ExternalStatsController.QueryImportedTest do
use PlausibleWeb.ConnCase
@user_id 1231
@user_id Enum.random(1000..9999)
setup [:create_user, :create_new_site, :create_api_key, :use_api_key]

View File

@ -1,7 +1,7 @@
defmodule PlausibleWeb.Api.ExternalStatsController.QueryTest do
use PlausibleWeb.ConnCase
@user_id 1231
@user_id Enum.random(1000..9999)
setup [:create_user, :create_new_site, :create_api_key, :use_api_key]

View File

@ -114,7 +114,8 @@ defmodule PlausibleWeb.Api.ExternalStatsController.TimeseriesTest do
end
end
@user_id 123
@user_id Enum.random(1000..9999)
test "shows hourly data for a certain date", %{conn: conn, site: site} do
populate_stats(site, [
build(:pageview, user_id: @user_id, timestamp: ~N[2021-01-01 00:00:00]),

View File

@ -1,7 +1,7 @@
defmodule PlausibleWeb.Api.StatsController.ConversionsTest do
use PlausibleWeb.ConnCase
@user_id 123
@user_id Enum.random(1000..9999)
describe "GET /api/stats/:domain/conversions" do
setup [:create_user, :log_in, :create_new_site]

View File

@ -4,8 +4,8 @@ defmodule PlausibleWeb.Api.StatsController.FunnelsTest do
@moduletag :ee_only
on_ee do
@user_id 123
@other_user_id 456
@user_id Enum.random(1000..9999)
@other_user_id @user_id + 1
@build_funnel_with [
{"page_path", "/blog/announcement"},

View File

@ -2,7 +2,7 @@ defmodule PlausibleWeb.Api.StatsController.ImportedTest do
use PlausibleWeb.ConnCase
use Timex
@user_id 123
@user_id Enum.random(1000..9999)
defp import_data(ga_data, site_id, import_id, table_name) do
ga_data

View File

@ -1,7 +1,7 @@
defmodule PlausibleWeb.Api.StatsController.MainGraphTest do
use PlausibleWeb.ConnCase
@user_id 123
@user_id Enum.random(1000..9999)
describe "GET /api/stats/main-graph - plot" do
setup [:create_user, :log_in, :create_new_site, :create_legacy_site_import]

View File

@ -1,7 +1,7 @@
defmodule PlausibleWeb.Api.StatsController.PagesTest do
use PlausibleWeb.ConnCase
@user_id 123
@user_id Enum.random(1000..9999)
describe "GET /api/stats/:domain/pages" do
setup [:create_user, :log_in, :create_new_site, :create_legacy_site_import]

View File

@ -1,7 +1,7 @@
defmodule PlausibleWeb.Api.StatsController.SourcesTest do
use PlausibleWeb.ConnCase
@user_id 123
@user_id Enum.random(1000..9999)
describe "GET /api/stats/:domain/sources" do
setup [:create_user, :log_in, :create_new_site, :create_legacy_site_import]

View File

@ -1,7 +1,7 @@
defmodule PlausibleWeb.Api.StatsController.TopStatsTest do
use PlausibleWeb.ConnCase
@user_id 123
@user_id Enum.random(1000..9999)
describe "GET /api/stats/top-stats - default" do
setup [:create_user, :log_in, :create_new_site]

View File

@ -182,7 +182,7 @@ defmodule Plausible.TestUtils do
defp populate_native_stats(events) do
for event_params <- events do
session = Plausible.Session.CacheStore.on_event(event_params, event_params, nil)
{:ok, session} = Plausible.Session.CacheStore.on_event(event_params, event_params, nil)
event_params
|> Plausible.ClickhouseEventV2.merge_session(session)