analytics/lib/plausible/stats/sql/query_builder.ex

287 lines
8.2 KiB
Elixir

defmodule Plausible.Stats.SQL.QueryBuilder do
@moduledoc false
use Plausible
use Plausible.Stats.SQL.Fragments
import Ecto.Query
import Plausible.Stats.Imported
import Plausible.Stats.Util
alias Plausible.Stats.{Query, QueryOptimizer, TableDecider, SQL}
alias Plausible.Stats.SQL.Expression
alias Plausible.Stats.Legacy.TimeOnPage
require Plausible.Stats.SQL.Expression
# Entry point: turns a stats `query` for `site` into a single Ecto query.
#
# Steps, in order:
#   1. `QueryOptimizer.split/1` breaks `query` into `{table_type, table_query}` pairs.
#   2. A per-table query is built for each pair.
#   3. The per-table queries are combined via `join_query_results/2`.
#   4. Ordering, pagination and the optional `total_rows` column are applied.
def build(query, site) do
  table_queries =
    Enum.map(QueryOptimizer.split(query), fn {table_type, table_query} ->
      {table_type, table_query, build_table_query(table_type, site, table_query)}
    end)

  joined = join_query_results(table_queries, query)
  ordered = build_order_by(joined, query)
  paginated = paginate(ordered, query.pagination)

  select_total_rows(paginated, query.include.total_rows)
end
# Applies every entry of `query.order_by` to `q`, one after another.
# A `nil` (or `false`) `order_by` is treated as an empty list, so `q` is
# returned unchanged.
def build_order_by(q, query) do
  order_entries = query.order_by || []

  Enum.reduce(order_entries, q, fn order_entry, ordered_q ->
    build_order_by(ordered_q, query, order_entry)
  end)
end
# Builds the query against a single table.
#
# `:events` reads from "events_v2": filters and metrics come from
# `SQL.WhereBuilder` and `select_event_metrics/1`; the query is then joined
# with sessions if needed, grouped by dimensions, merged with imported data,
# extended with special metrics and finally with legacy time-on-page.
defp build_table_query(:events, site, events_query) do
  events_where = SQL.WhereBuilder.build(:events, events_query)
  event_metrics = select_event_metrics(events_query)

  events_q = from(e in "events_v2", where: ^events_where, select: ^event_metrics)

  # Rebinds the query with a sampling hint inside `on_ee`
  # (presumably only compiled into enterprise builds - confirm against the macro).
  on_ee do
    events_q = Plausible.Stats.Sampling.add_query_hint(events_q, events_query)
  end

  events_q
  |> join_sessions_if_needed(events_query)
  |> build_group_by(:events, events_query)
  |> merge_imported(site, events_query)
  |> SQL.SpecialMetrics.add(site, events_query)
  |> TimeOnPage.merge_legacy_time_on_page(events_query)
end

# `:sessions` reads from "sessions_v2" and follows the same steps as the
# events clause, except that it joins events (not sessions) when needed and
# has no legacy time-on-page step.
defp build_table_query(:sessions, site, sessions_query) do
  sessions_where = SQL.WhereBuilder.build(:sessions, sessions_query)
  session_metrics = select_session_metrics(sessions_query)

  sessions_q = from(s in "sessions_v2", where: ^sessions_where, select: ^session_metrics)

  # Same `on_ee` sampling-hint rebinding as in the events clause.
  on_ee do
    sessions_q = Plausible.Stats.Sampling.add_query_hint(sessions_q, sessions_query)
  end

  sessions_q
  |> join_events_if_needed(sessions_query)
  |> build_group_by(:sessions, sessions_query)
  |> merge_imported(site, sessions_query)
  |> SQL.SpecialMetrics.add(site, sessions_query)
end
# Joins the events query `q` with a per-session subquery when
# `TableDecider.events_join_sessions?/1` reports that the join is required;
# otherwise `q` is returned unchanged.
defp join_sessions_if_needed(q, query) do
  if TableDecider.events_join_sessions?(query) do
    %{session: session_dimensions} = TableDecider.partition_dimensions(query)
    sessions_where = SQL.WhereBuilder.build(:sessions, query)

    # One row per session: filtered sessions with `sign == 1`, grouped by id.
    base_sessions_q =
      from(
        s in "sessions_v2",
        where: ^sessions_where,
        where: s.sign == 1,
        select: %{session_id: s.session_id},
        group_by: s.session_id
      )

    # Session-only dimension columns are selected explicitly in the joined
    # sessions subquery. This makes it possible to combine session-only
    # dimensions (entry and exit pages) with event-only metrics such as revenue.
    sessions_q =
      for dimension <- session_dimensions, reduce: base_sessions_q do
        acc -> Expression.select_dimension_internal(acc, dimension)
      end

    # Rebinds the subquery with a sampling hint inside `on_ee`.
    on_ee do
      sessions_q = Plausible.Stats.Sampling.add_query_hint(sessions_q, query)
    end

    from(
      e in q,
      join: sq in subquery(sessions_q),
      on: e.session_id == sq.session_id
    )
  else
    q
  end
end
# Joins the sessions query `q` with a subquery of distinct event session ids
# when `TableDecider.sessions_join_events?/1` reports that the join is
# required; otherwise `q` is returned unchanged.
def join_events_if_needed(q, query) do
  if TableDecider.sessions_join_events?(query) do
    events_where = SQL.WhereBuilder.build(:events, query)

    # Distinct session ids of the matching events, plus the `_sample_factor` column.
    events_q =
      from(
        ev in "events_v2",
        where: ^events_where,
        select: %{
          session_id: fragment("DISTINCT ?", ev.session_id),
          _sample_factor: fragment("_sample_factor")
        }
      )

    # Rebinds the subquery with a sampling hint inside `on_ee`.
    on_ee do
      events_q = Plausible.Stats.Sampling.add_query_hint(events_q, query)
    end

    from(
      s in q,
      join: ev in subquery(events_q),
      on: s.session_id == ev.session_id
    )
  else
    q
  end
end
# Builds the select map for an events query by merging the map returned by
# `SQL.Expression.event_metric/2` for every metric in `query.metrics`.
defp select_event_metrics(query) do
  Enum.reduce(query.metrics, %{}, fn metric, selected ->
    # The accumulated map is the second argument to `Map.merge/2`, so on a key
    # clash the value from the earlier metric is kept.
    Map.merge(SQL.Expression.event_metric(metric, query), selected)
  end)
end
# Builds the select map for a sessions query by merging the map returned by
# `SQL.Expression.session_metric/2` for every metric in `query.metrics`.
defp select_session_metrics(query) do
  Enum.reduce(query.metrics, %{}, fn metric, selected ->
    # The accumulated map is the second argument to `Map.merge/2`, so on a key
    # clash the value from the earlier metric is kept.
    Map.merge(SQL.Expression.session_metric(metric, query), selected)
  end)
end
# Adds select + GROUP BY for every dimension of `query` to `q`.
#
# For the events table, session-only dimensions (as reported by
# `TableDecider.partition_dimensions/1`) are read from the joined sessions
# table; all remaining dimensions are grouped on the events table itself.
def build_group_by(q, :events, query) do
  %{session: session_only_dimensions} = TableDecider.partition_dimensions(query)
  event_dimensions = query.dimensions -- session_only_dimensions

  grouped_by_events =
    for dimension <- event_dimensions, reduce: q do
      acc -> dimension_group_by(acc, :events, query, dimension)
    end

  for dimension <- session_only_dimensions, reduce: grouped_by_events do
    acc -> dimension_group_by_join(acc, query, dimension)
  end
end

# For the sessions table every dimension is grouped directly.
def build_group_by(q, :sessions, query) do
  for dimension <- query.dimensions, reduce: q do
    acc -> dimension_group_by(acc, :sessions, query, dimension)
  end
end
# Selects `dimension` from the joined table under its short name and groups
# by that selected alias.
defp dimension_group_by_join(q, query, dimension) do
  key = shortname(query, dimension)
  with_dimension = Expression.select_dimension_from_join(q, key, dimension)

  group_by(with_dimension, [], selected_as(^key))
end
# Special case for the "event:goal" dimension on the events table: joins
# against `Expression.event_goal_join/1`, selects the joined value under the
# dimension's short name and groups by it.
defp dimension_group_by(q, :events, query, "event:goal" = dimension) do
  goal_join_data = Plausible.Stats.Goals.goal_join_data(query)
  key = shortname(query, dimension)

  from(
    e in q,
    join: goal in Expression.event_goal_join(goal_join_data),
    # NOTE(review): the "ARRAY" hint presumably turns this into an ARRAY JOIN - confirm.
    hints: "ARRAY",
    on: true,
    select_merge: %{^key => fragment("?", goal)},
    group_by: goal
  )
end

# General case: selects `dimension` from `table` under its short name and
# groups by that selected alias.
defp dimension_group_by(q, table, query, dimension) do
  key = shortname(query, dimension)
  with_dimension = Expression.select_dimension(q, key, dimension, table, query)

  group_by(with_dimension, [], selected_as(^key))
end
# Appends a single ORDER BY term: orders by the selected alias (short name)
# of `metric_or_dimension` in the given `order_direction`.
defp build_order_by(q, query, {metric_or_dimension, order_direction}) do
  key = shortname(query, metric_or_dimension)

  order_by(q, [], {^order_direction, selected_as(^key)})
end
# Only one table is being queried - no join is needed, return its query as-is.
defp join_query_results([{_table_type, _query, q}], _main_query), do: q

# Multiple tables: the first table's query becomes the base subquery and every
# following one is joined onto it (join type taken from
# `main_query.sql_join_type`, join condition from `build_group_by_join/1`).
# Metrics are selected from each table; dimensions are selected at the end.
defp join_query_results(queries, main_query) do
  joined_q =
    Enum.reduce(queries, nil, fn {_table_type, table_query, table_q}, acc ->
      if is_nil(acc) do
        # First table: start from its subquery and select all of its metrics.
        base = from(e in subquery(table_q))

        select_join_metrics(base, table_query, table_query.metrics)
      else
        joined =
          join(acc, main_query.sql_join_type, [], s in subquery(table_q),
            on: ^build_group_by_join(main_query)
          )

        # `:sample_percent` is not selected again from the joined tables.
        select_join_metrics(joined, table_query, table_query.metrics -- [:sample_percent])
      end
    end)

  select_dimensions(joined_q, main_query, queries)
end
# Applies LIMIT/OFFSET from `pagination` to `q`.
# NOTE: Old queries do their own pagination and pass `nil` here, in which
# case `q` is returned untouched.
defp paginate(q, nil = _pagination), do: q

defp paginate(q, pagination) do
  limited = limit(q, ^pagination.limit)

  offset(limited, ^pagination.offset)
end
# When the flag is `true`, adds a `total_rows` column computed with the
# window expression `count() over ()`; when `false`, returns `q` unchanged.
defp select_total_rows(q, true = _include_total_rows) do
  select_merge(q, [], %{total_rows: fragment("count() over ()")})
end

defp select_total_rows(q, false = _include_total_rows), do: q
# Builds the ON condition used when joining per-table results.
#
# Without dimensions there is nothing to match on, so the condition is `true`.
def build_group_by_join(%Query{dimensions: []}), do: true

# With dimensions, the first (`a`) and last (`b`) bindings must agree on every
# dimension's short-name column; the per-dimension checks are AND-ed together.
def build_group_by_join(query) do
  conditions =
    for dimension <- query.dimensions do
      key = shortname(query, dimension)

      dynamic([a, ..., b], field(a, ^key) == field(b, ^key))
    end

  Enum.reduce(conditions, fn condition, combined ->
    dynamic([], ^combined and ^condition)
  end)
end
# Selects each of `metrics` from the last binding of `q` (the most recently
# added table), keyed by the metric's short name in `query`.
defp select_join_metrics(q, query, metrics) do
  for metric <- metrics, reduce: q do
    acc ->
      select_merge_as(acc, [..., x], %{
        shortname(query, metric) => field(x, ^shortname(query, metric))
      })
  end
end
# Selects every dimension of `query` from the joined result. `select_from/3`
# decides per dimension whether to read it from the first binding
# (`:leftmost_table`) or the last binding (`:rightmost_table`).
defp select_dimensions(q, query, queries) do
  for dimension <- query.dimensions, reduce: q do
    acc ->
      case select_from(dimension, query, queries) do
        :leftmost_table ->
          select_merge_as(acc, [x], %{
            shortname(query, dimension) => field(x, ^shortname(query, dimension))
          })

        :rightmost_table ->
          select_merge_as(acc, [..., x], %{
            shortname(query, dimension) => field(x, ^shortname(query, dimension))
          })
      end
  end
end
# Decides which side of the join a dimension should be selected from.
# Returns `:leftmost_table` or `:rightmost_table`.
defp select_from(dimension, query, queries) do
  smeared? =
    Enum.any?(queries, fn {_table_type, table_query, _q} ->
      table_query.smear_session_metrics
    end)

  # Dimensions are generally selected from the left-most table (always so for
  # left joins). The only exception is time:minute/time:hour with smeared
  # session metrics: there the sessions table is used, as smeared sessions are
  # considered on-going during the whole period.
  from_sessions? =
    query.sql_join_type != :left and
      dimension in ["time:minute", "time:hour"] and
      smeared?

  if from_sessions?, do: :rightmost_table, else: :leftmost_table
end
end