diff --git a/CHANGELOG.md b/CHANGELOG.md index 141640ffdc..516e76ea98 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,16 +16,19 @@ All notable changes to this project will be documented in this file. - Add support for JSON logger, via LOG_FORMAT=json environment variable - Add support for 2FA authentication - Add 'browser_versions.csv' to CSV export +- Add `CLICKHOUSE_MAX_BUFFER_SIZE_BYTES` env var which defaults to `100000` (100KB) ### Removed - Removed the nested custom event property breakdown UI when filtering by a goal in Goal Conversions - Removed the `prop-breakdown.csv` file from CSV export +- Deprecated `CLICKHOUSE_MAX_BUFFER_SIZE` ### Changed - Limit the number of Goal Conversions shown on the dashboard and render a "Details" link when there are more entries to show - Show Outbound Links / File Downloads / 404 Pages / Cloaked Links instead of Goal Conversions when filtering by the corresponding goal - Require custom properties to be explicitly added from Site Settings > Custom Properties in order for them to show up on the dashboard - GA/SC sections moved to new settings: Integrations +- Replace `CLICKHOUSE_MAX_BUFFER_SIZE` with `CLICKHOUSE_MAX_BUFFER_SIZE_BYTES` ### Fixed - Only return `(none)` values in custom property breakdown for the first page (pagination) of results diff --git a/config/runtime.exs b/config/runtime.exs index 27f1b54137..7339e64ec3 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -140,9 +140,15 @@ ch_db_url = |> get_var_from_path_or_env("CLICKHOUSE_FLUSH_INTERVAL_MS", "5000") |> Integer.parse() +if get_var_from_path_or_env(config_dir, "CLICKHOUSE_MAX_BUFFER_SIZE") do + Logger.warning( + "CLICKHOUSE_MAX_BUFFER_SIZE is deprecated, please use CLICKHOUSE_MAX_BUFFER_SIZE_BYTES instead" + ) +end + {ch_max_buffer_size, ""} = config_dir - |> get_var_from_path_or_env("CLICKHOUSE_MAX_BUFFER_SIZE", "10000") + |> get_var_from_path_or_env("CLICKHOUSE_MAX_BUFFER_SIZE_BYTES", "100000") |> Integer.parse() # Can be generated with `Base.encode64(:crypto.strong_rand_bytes(32))` from diff --git a/lib/plausible/application.ex b/lib/plausible/application.ex index b9d58c3e5c..886993d12b 100644 --- a/lib/plausible/application.ex +++ b/lib/plausible/application.ex @@ -17,8 +17,8 @@ defmodule Plausible.Application do {Finch, name: Plausible.Finch, pools: finch_pool_config()}, {Phoenix.PubSub, name: Plausible.PubSub}, Plausible.Session.Salts, - Plausible.Event.WriteBuffer, - Plausible.Session.WriteBuffer, + Supervisor.child_spec(Plausible.Event.WriteBuffer, id: Plausible.Event.WriteBuffer), + Supervisor.child_spec(Plausible.Session.WriteBuffer, id: Plausible.Session.WriteBuffer), ReferrerBlocklist, Supervisor.child_spec({Cachex, name: :user_agents, limit: 10_000, stats: true}, id: :cachex_user_agents diff --git a/lib/plausible/event/write_buffer.ex b/lib/plausible/event/write_buffer.ex index 37a9cd39b0..3966b5a3f7 100644 --- a/lib/plausible/event/write_buffer.ex +++ b/lib/plausible/event/write_buffer.ex @@ -1,79 +1,38 @@ defmodule Plausible.Event.WriteBuffer do - use GenServer - require Logger + @moduledoc false - alias Plausible.IngestRepo + %{ + header: header, + insert_sql: insert_sql, + insert_opts: insert_opts, + fields: fields, + encoding_types: encoding_types + } = + Plausible.Ingestion.WriteBuffer.compile_time_prepare(Plausible.ClickhouseEventV2) - def start_link(_opts) do - GenServer.start_link(__MODULE__, [], name: __MODULE__) - end + def child_spec(opts) do + opts = + Keyword.merge(opts, + name: __MODULE__, + header: unquote(header), + insert_sql: unquote(insert_sql), + insert_opts: unquote(insert_opts) + ) - def init(buffer) do - Process.flag(:trap_exit, true) - timer = Process.send_after(self(), :tick, flush_interval_ms()) - {:ok, %{buffer: buffer, timer: timer}} + Plausible.Ingestion.WriteBuffer.child_spec(opts) end def insert(event) do - GenServer.cast(__MODULE__, {:insert, event}) + row_binary = + [Enum.map(unquote(fields), fn field -> Map.fetch!(event, field) end)] + |> Ch.RowBinary._encode_rows(unquote(encoding_types)) + |> IO.iodata_to_binary() + + :ok = Plausible.Ingestion.WriteBuffer.insert(__MODULE__, row_binary) {:ok, event} end - def flush() do - GenServer.call(__MODULE__, :flush, :infinity) - :ok - end - - def handle_cast({:insert, event}, %{buffer: buffer} = state) do - new_buffer = [event | buffer] - - if length(new_buffer) >= max_buffer_size() do - Logger.info("Buffer full, flushing to disk") - Process.cancel_timer(state[:timer]) - do_flush(new_buffer) - new_timer = Process.send_after(self(), :tick, flush_interval_ms()) - {:noreply, %{buffer: [], timer: new_timer}} - else - {:noreply, %{state | buffer: new_buffer}} - end - end - - def handle_info(:tick, %{buffer: buffer}) do - do_flush(buffer) - timer = Process.send_after(self(), :tick, flush_interval_ms()) - {:noreply, %{buffer: [], timer: timer}} - end - - def handle_call(:flush, _from, %{buffer: buffer} = state) do - Process.cancel_timer(state[:timer]) - do_flush(buffer) - new_timer = Process.send_after(self(), :tick, flush_interval_ms()) - {:reply, nil, %{buffer: [], timer: new_timer}} - end - - def terminate(_reason, %{buffer: buffer}) do - Logger.info("Flushing event buffer before shutdown...") - do_flush(buffer) - end - - defp do_flush(buffer) do - case buffer do - [] -> - nil - - events -> - Logger.info("Flushing #{length(events)} events") - events = Enum.map(events, &(Map.from_struct(&1) |> Map.delete(:__meta__))) - - IngestRepo.insert_all(Plausible.ClickhouseEventV2, events) - end - end - - defp flush_interval_ms() do - Keyword.fetch!(Application.get_env(:plausible, IngestRepo), :flush_interval_ms) - end - - defp max_buffer_size() do - Keyword.fetch!(Application.get_env(:plausible, IngestRepo), :max_buffer_size) + def flush do + Plausible.Ingestion.WriteBuffer.flush(__MODULE__) end end diff --git a/lib/plausible/ingestion/write_buffer.ex b/lib/plausible/ingestion/write_buffer.ex new file mode 100644 index 0000000000..e62580cd5b --- /dev/null +++ b/lib/plausible/ingestion/write_buffer.ex @@ -0,0 +1,149 @@ +defmodule Plausible.Ingestion.WriteBuffer do + @moduledoc false + use GenServer + require Logger + + alias Plausible.IngestRepo + + def start_link(opts) do + GenServer.start_link(__MODULE__, opts, name: Keyword.fetch!(opts, :name)) + end + + def insert(server, row_binary) do + GenServer.cast(server, {:insert, row_binary}) + end + + def flush(server) do + GenServer.call(server, :flush, :infinity) + end + + @impl true + def init(opts) do + buffer = opts[:buffer] || [] + max_buffer_size = opts[:max_buffer_size] || default_max_buffer_size() + flush_interval_ms = opts[:flush_interval_ms] || default_flush_interval_ms() + + Process.flag(:trap_exit, true) + timer = Process.send_after(self(), :tick, flush_interval_ms) + + {:ok, + %{ + buffer: buffer, + timer: timer, + name: Keyword.fetch!(opts, :name), + insert_sql: Keyword.fetch!(opts, :insert_sql), + insert_opts: Keyword.fetch!(opts, :insert_opts), + header: Keyword.fetch!(opts, :header), + buffer_size: IO.iodata_length(buffer), + max_buffer_size: max_buffer_size, + flush_interval_ms: flush_interval_ms + }} + end + + @impl true + def handle_cast({:insert, row_binary}, state) do + state = %{ + state + | buffer: [state.buffer | row_binary], + buffer_size: state.buffer_size + IO.iodata_length(row_binary) + } + + if state.buffer_size >= state.max_buffer_size do + Logger.info("#{state.name} buffer full, flushing to ClickHouse") + Process.cancel_timer(state.timer) + do_flush(state) + new_timer = Process.send_after(self(), :tick, state.flush_interval_ms) + {:noreply, %{state | buffer: [], timer: new_timer, buffer_size: 0}} + else + {:noreply, state} + end + end + + @impl true + def handle_info(:tick, state) do + do_flush(state) + timer = Process.send_after(self(), :tick, state.flush_interval_ms) + {:noreply, %{state | buffer: [], buffer_size: 0, timer: timer}} + end + + @impl true + def handle_call(:flush, _from, state) do + %{timer: timer, flush_interval_ms: flush_interval_ms} = state + Process.cancel_timer(timer) + do_flush(state) + new_timer = Process.send_after(self(), :tick, flush_interval_ms) + {:reply, :ok, %{state | buffer: [], buffer_size: 0, timer: new_timer}} + end + + @impl true + def terminate(_reason, %{name: name} = state) do + Logger.info("Flushing #{name} buffer before shutdown...") + do_flush(state) + end + + defp do_flush(state) do + %{ + buffer: buffer, + buffer_size: buffer_size, + insert_opts: insert_opts, + insert_sql: insert_sql, + header: header, + name: name + } = state + + case buffer do + [] -> + nil + + _not_empty -> + Logger.info("Flushing #{buffer_size} byte(s) RowBinary from #{name}") + IngestRepo.query!(insert_sql, [header | buffer], insert_opts) + end + end + + defp default_flush_interval_ms do + Keyword.fetch!(Application.get_env(:plausible, IngestRepo), :flush_interval_ms) + end + + defp default_max_buffer_size do + Keyword.fetch!(Application.get_env(:plausible, IngestRepo), :max_buffer_size) + end + + @doc false + def compile_time_prepare(schema) do + fields = schema.__schema__(:fields) + + types = + Enum.map(fields, fn field -> + type = schema.__schema__(:type, field) || raise "missing type for #{field}" + + type + |> Ecto.Type.type() + |> Ecto.Adapters.ClickHouse.Schema.remap_type(schema, field) + end) + + encoding_types = Ch.RowBinary.encoding_types(types) + + header = + fields + |> Enum.map(&to_string/1) + |> Ch.RowBinary.encode_names_and_types(types) + |> IO.iodata_to_binary() + + insert_sql = "INSERT INTO #{schema.__schema__(:source)} FORMAT RowBinaryWithNamesAndTypes" + + %{ + fields: fields, + types: types, + encoding_types: encoding_types, + header: header, + insert_sql: insert_sql, + insert_opts: [ + command: :insert, + encode: false, + source: schema.__schema__(:source), + cast_params: [] + ] + } + end +end diff --git a/lib/plausible/session/cache_store.ex b/lib/plausible/session/cache_store.ex index 3350936ffa..b2eeffc1bc 100644 --- a/lib/plausible/session/cache_store.ex +++ b/lib/plausible/session/cache_store.ex @@ -8,7 +8,7 @@ defmodule Plausible.Session.CacheStore do session = if found_session do updated_session = update_session(found_session, event) - buffer.insert([%{updated_session | sign: 1}, %{found_session | sign: -1}]) + buffer.insert([%{found_session | sign: -1}, %{updated_session | sign: 1}]) persist_session(updated_session) else new_session = new_session_from_event(event) diff --git a/lib/plausible/session/write_buffer.ex b/lib/plausible/session/write_buffer.ex index 770fc965c1..1adcbbb67d 100644 --- a/lib/plausible/session/write_buffer.ex +++ b/lib/plausible/session/write_buffer.ex @@ -1,83 +1,43 @@ defmodule Plausible.Session.WriteBuffer do - use GenServer - require Logger + @moduledoc false - alias Plausible.IngestRepo + %{ + header: header, + insert_sql: insert_sql, + insert_opts: insert_opts, + fields: fields, + encoding_types: encoding_types + } = + Plausible.Ingestion.WriteBuffer.compile_time_prepare(Plausible.ClickhouseSessionV2) - def start_link(_opts) do - GenServer.start_link(__MODULE__, [], name: __MODULE__) - end + def child_spec(opts) do + opts = + Keyword.merge(opts, + name: __MODULE__, + header: unquote(header), + insert_sql: unquote(insert_sql), + insert_opts: unquote(insert_opts) + ) - def init(buffer) do - Process.flag(:trap_exit, true) - timer = Process.send_after(self(), :tick, flush_interval_ms()) - {:ok, %{buffer: buffer, timer: timer}} + Plausible.Ingestion.WriteBuffer.child_spec(opts) end def insert(sessions) do - GenServer.cast(__MODULE__, {:insert, sessions}) + row_binary = + sessions + |> Enum.map(fn %{is_bounce: is_bounce} = session -> + {:ok, is_bounce} = Plausible.ClickhouseSessionV2.BoolUInt8.dump(is_bounce) + session = %{session | is_bounce: is_bounce} + Enum.map(unquote(fields), fn field -> Map.fetch!(session, field) end) + end) + |> Ch.RowBinary._encode_rows(unquote(encoding_types)) + |> IO.iodata_to_binary() + + :ok = Plausible.Ingestion.WriteBuffer.insert(__MODULE__, row_binary) {:ok, sessions} end - def flush() do - GenServer.call(__MODULE__, :flush, :infinity) - :ok - end - - def handle_cast({:insert, sessions}, %{buffer: buffer} = state) do - new_buffer = sessions ++ buffer - - if length(new_buffer) >= max_buffer_size() do - Logger.info("Buffer full, flushing to disk") - Process.cancel_timer(state[:timer]) - do_flush(new_buffer) - new_timer = Process.send_after(self(), :tick, flush_interval_ms()) - {:noreply, %{buffer: [], timer: new_timer}} - else - {:noreply, %{state | buffer: new_buffer}} - end - end - - def handle_info(:tick, %{buffer: buffer}) do - do_flush(buffer) - timer = Process.send_after(self(), :tick, flush_interval_ms()) - {:noreply, %{buffer: [], timer: timer}} - end - - def handle_call(:flush, _from, %{buffer: buffer} = state) do - Process.cancel_timer(state[:timer]) - do_flush(buffer) - new_timer = Process.send_after(self(), :tick, flush_interval_ms()) - {:reply, nil, %{buffer: [], timer: new_timer}} - end - - def terminate(_reason, %{buffer: buffer}) do - Logger.info("Flushing session buffer before shutdown...") - do_flush(buffer) - end - - defp do_flush(buffer) do - case buffer do - [] -> - nil - - sessions -> - Logger.info("Flushing #{length(sessions)} sessions") - - sessions = - sessions - |> Enum.map(&(Map.from_struct(&1) |> Map.delete(:__meta__))) - |> Enum.reverse() - - IngestRepo.insert_all(Plausible.ClickhouseSessionV2, sessions) - end - end - - defp flush_interval_ms() do - Keyword.fetch!(Application.get_env(:plausible, IngestRepo), :flush_interval_ms) - end - - defp max_buffer_size() do - Keyword.fetch!(Application.get_env(:plausible, IngestRepo), :max_buffer_size) + def flush do + Plausible.Ingestion.WriteBuffer.flush(__MODULE__) end end diff --git a/test/plausible/session/cache_store_test.exs b/test/plausible/session/cache_store_test.exs index aa4a43798f..555aedd37d 100644 --- a/test/plausible/session/cache_store_test.exs +++ b/test/plausible/session/cache_store_test.exs @@ -88,7 +88,7 @@ defmodule Plausible.Session.CacheStoreTest do CacheStore.on_event(event1, nil, buffer) CacheStore.on_event(event2, nil, buffer) - assert_receive({WriteBuffer, :insert, [[session, _negative_record]]}) + assert_receive({WriteBuffer, :insert, [[_negative_record, session]]}) assert session.is_bounce == false assert session.duration == 10 assert session.pageviews == 2 @@ -113,7 +113,7 @@ defmodule Plausible.Session.CacheStoreTest do CacheStore.on_event(event1, nil, buffer) CacheStore.on_event(event2, nil, buffer) - assert_receive({WriteBuffer, :insert, [[session, _negative_record]]}) + assert_receive({WriteBuffer, :insert, [[_negative_record, session]]}) assert session.duration == 10 end