59 lines
1.5 KiB
Elixir
59 lines
1.5 KiB
Elixir
defmodule Plausible.ClickhouseRepo do
|
|
use Plausible
|
|
|
|
use Ecto.Repo,
|
|
otp_app: :plausible,
|
|
adapter: Ecto.Adapters.ClickHouse,
|
|
read_only: true
|
|
|
|
defmacro __using__(_) do
|
|
quote do
|
|
alias Plausible.ClickhouseRepo
|
|
import Ecto
|
|
import Ecto.Query, only: [from: 1, from: 2]
|
|
end
|
|
end
|
|
|
|
@task_timeout 60_000
|
|
def parallel_tasks(queries, opts \\ []) do
|
|
ctx = OpenTelemetry.Ctx.get_current()
|
|
|
|
execute_with_tracing = fn fun ->
|
|
OpenTelemetry.Ctx.attach(ctx)
|
|
fun.()
|
|
end
|
|
|
|
max_concurrency = Keyword.get(opts, :max_concurrency, 3)
|
|
|
|
task_timeout =
|
|
on_ee do
|
|
@task_timeout
|
|
else
|
|
# Quadruple the repo timeout to ensure the task doesn't timeout before db_connection does.
|
|
# This maintains the default ratio (@task_timeout / default_timeout = 60_000 / 15_000 = 4).
|
|
ch_timeout = Keyword.fetch!(config(), :timeout)
|
|
max(ch_timeout * 4, @task_timeout)
|
|
end
|
|
|
|
Task.async_stream(queries, execute_with_tracing,
|
|
max_concurrency: max_concurrency,
|
|
timeout: task_timeout
|
|
)
|
|
|> Enum.to_list()
|
|
|> Keyword.values()
|
|
end
|
|
|
|
@impl true
|
|
def prepare_query(_operation, query, opts) do
|
|
{plausible_query, opts} = Keyword.pop(opts, :query)
|
|
log_comment = if(plausible_query, do: Jason.encode!(plausible_query.debug_metadata), else: "")
|
|
|
|
opts =
|
|
Keyword.update(opts, :settings, [log_comment: log_comment], fn settings ->
|
|
[{:log_comment, log_comment} | settings]
|
|
end)
|
|
|
|
{query, opts}
|
|
end
|
|
end
|