analytics/lib/plausible/clickhouse_repo.ex

59 lines
1.5 KiB
Elixir

defmodule Plausible.ClickhouseRepo do
use Plausible
use Ecto.Repo,
otp_app: :plausible,
adapter: Ecto.Adapters.ClickHouse,
read_only: true
defmacro __using__(_) do
quote do
alias Plausible.ClickhouseRepo
import Ecto
import Ecto.Query, only: [from: 1, from: 2]
end
end
@task_timeout 60_000
def parallel_tasks(queries, opts \\ []) do
ctx = OpenTelemetry.Ctx.get_current()
execute_with_tracing = fn fun ->
OpenTelemetry.Ctx.attach(ctx)
fun.()
end
max_concurrency = Keyword.get(opts, :max_concurrency, 3)
task_timeout =
on_ee do
@task_timeout
else
# Quadruple the repo timeout to ensure the task doesn't timeout before db_connection does.
# This maintains the default ratio (@task_timeout / default_timeout = 60_000 / 15_000 = 4).
ch_timeout = Keyword.fetch!(config(), :timeout)
max(ch_timeout * 4, @task_timeout)
end
Task.async_stream(queries, execute_with_tracing,
max_concurrency: max_concurrency,
timeout: task_timeout
)
|> Enum.to_list()
|> Keyword.values()
end
@impl true
def prepare_query(_operation, query, opts) do
{plausible_query, opts} = Keyword.pop(opts, :query)
log_comment = if(plausible_query, do: Jason.encode!(plausible_query.debug_metadata), else: "")
opts =
Keyword.update(opts, :settings, [log_comment: log_comment], fn settings ->
[{:log_comment, log_comment} | settings]
end)
{query, opts}
end
end