# analytics/lib/plausible/imported.ex
defmodule Plausible.Imported do
@moduledoc """
Context for managing site statistics imports.
For list of currently supported import sources see `Plausible.Imported.ImportSources`.
For more information on implementing importers, see `Plausible.Imported.Importer`.
"""
import Ecto.Query
alias Plausible.{Site, Repo, Imported}
alias Plausible.Imported.SiteImport
alias Plausible.Stats.Query
require Plausible.Imported.SiteImport
# Schema modules for imported stats, one per imported-data table.
@tables [
  Imported.Visitor,
  Imported.Source,
  Imported.Page,
  Imported.EntryPage,
  Imported.ExitPage,
  Imported.CustomEvent,
  Imported.Location,
  Imported.Device,
  Imported.Browser,
  Imported.OperatingSystem
]

# Table names resolved from each schema's `:source` at compile time.
@table_names Enum.map(@tables, & &1.__schema__(:source))

# Maximum number of complete imports to account for when querying stats
@max_complete_imports 5

@doc """
Returns the list of imported-stats schema modules.
"""
@spec schemas() :: [module()]
def schemas, do: @tables

@doc """
Returns the table names backing each imported-stats schema.
"""
@spec tables() :: [String.t()]
def tables, do: @table_names
@doc """
Maximum number of complete imports accounted for when querying stats.
"""
@spec max_complete_imports() :: non_neg_integer()
def max_complete_imports, do: @max_complete_imports
@doc """
Returns the custom property keys supported for imported stats queries.
"""
@spec imported_custom_props() :: [String.t()]
def imported_custom_props do
  # NOTE: Keep up to date with `Plausible.Props.internal_keys/1`,
  # but _ignore_ unsupported keys. Currently, `search_query` is
  # not supported in imported queries.
  for key <- ~w(url path), do: "event:props:" <> key
end
@doc """
Checks whether the site has at least one completed import.
"""
@spec any_completed_imports?(Site.t()) :: boolean()
def any_completed_imports?(site) do
  match?([_ | _], get_completed_imports(site))
end
@doc """
Returns the earliest start date across the site's completed imports,
or `nil` when the site has none.
"""
@spec earliest_import_start_date(Site.t()) :: Date.t() | nil
def earliest_import_start_date(site) do
  case get_completed_imports(site) do
    [] ->
      nil

    completed ->
      completed
      |> Enum.map(& &1.start_date)
      |> Enum.min(Date)
  end
end
@doc """
Returns the ids of the site's completed imports.

When any legacy import is present, the special id `0` is prepended so
legacy data is accounted for as well.
"""
@spec complete_import_ids(Site.t()) :: [non_neg_integer()]
def complete_import_ids(site) do
  completed = get_completed_imports(site)
  ids = Enum.map(completed, & &1.id)

  if Enum.any?(completed, & &1.legacy) do
    [0 | ids]
  else
    ids
  end
end
@doc """
Returns the completed imports whose date span overlaps the query's date range.
"""
@spec completed_imports_in_query_range(Site.t(), Query.t()) :: [SiteImport.t()]
def completed_imports_in_query_range(%Site{} = site, %Query{} = query) do
  date_range = Query.date_range(query)

  # Keep an import when it neither starts after the range ends
  # nor ends before the range starts (i.e. the spans overlap).
  site
  |> get_completed_imports()
  |> Enum.filter(fn site_import ->
    not Date.after?(site_import.start_date, date_range.last) and
      not Date.before?(site_import.end_date, date_range.first)
  end)
end
@doc """
Fetches a single import by id, scoped to the given site.

Returns `nil` when no matching import exists for that site.
"""
@spec get_import(Site.t(), non_neg_integer()) :: SiteImport.t() | nil
def get_import(site, import_id) do
  Repo.get_by(SiteImport, site_id: site.id, id: import_id)
end
# Convenience delegate; see `Plausible.Imported.Importer.listen/0`.
defdelegate listen(), to: Imported.Importer
@doc """
Lists all imports for the site, newest first.

When `status` is given, only imports with that status are returned.
"""
@spec list_all_imports(Site.t(), atom()) :: [SiteImport.t()]
def list_all_imports(site, status \\ nil) do
  SiteImport
  |> where([i], i.site_id == ^site.id)
  |> order_by([i], desc: i.inserted_at)
  |> maybe_filter_by_status(status)
  |> Repo.all()
end
@doc """
Checks whether any other import for the same site is currently
pending or importing.
"""
@spec other_imports_in_progress?(SiteImport.t()) :: boolean()
def other_imports_in_progress?(site_import) do
  active_statuses = [SiteImport.pending(), SiteImport.importing()]

  query =
    from(i in SiteImport,
      where: i.site_id == ^site_import.site_id,
      where: i.id != ^site_import.id,
      where: i.status in ^active_statuses
    )

  Repo.exists?(query)
end
# Leaves the query untouched when no status filter was requested.
defp maybe_filter_by_status(query, nil) do
  query
end

# Narrows the query to imports with the given status.
defp maybe_filter_by_status(query, status), do: where(query, [i], i.status == ^status)
@doc """
Deletes every import record belonging to the site. Always returns `:ok`.
"""
@spec delete_imports_for_site(Site.t()) :: :ok
def delete_imports_for_site(site) do
  SiteImport
  |> where([i], i.site_id == ^site.id)
  |> Repo.delete_all()

  :ok
end
@doc """
Clamps the requested import window for a site.

Derives the site's cutoff date and already-occupied ranges, then delegates
to `clamp_dates/4`.
"""
@spec clamp_dates(Site.t(), Date.t(), Date.t()) ::
        {:ok, Date.t(), Date.t()} | {:error, :no_time_window}
def clamp_dates(site, start_date, end_date) do
  cutoff = get_cutoff_date(site)
  occupied = get_occupied_date_ranges(site)

  clamp_dates(occupied, cutoff, start_date, end_date)
end
@doc """
Finds the longest free window of at least 2 days within `start_date..end_date`
that does not collide with `occupied_ranges` and does not extend past
`cutoff_date`.

Returns `{:ok, first, last}` for the longest such window, or
`{:error, :no_time_window}` when none exists.
"""
@spec clamp_dates([Date.Range.t()], Date.t(), Date.t(), Date.t()) ::
        {:ok, Date.t(), Date.t()} | {:error, :no_time_window}
def clamp_dates(occupied_ranges, cutoff_date, start_date, end_date) do
  # Never import past the cutoff.
  end_date = Enum.min([end_date, cutoff_date], Date)

  free_ranges =
    if Date.diff(end_date, start_date) >= 2 do
      find_free_ranges(start_date, end_date, occupied_ranges)
    else
      []
    end

  case free_ranges do
    [] ->
      {:error, :no_time_window}

    ranges ->
      longest = Enum.max_by(ranges, &Date.diff(&1.last, &1.first))
      {:ok, longest.first, longest.last}
  end
end
@doc """
Returns the date ranges already occupied by completed imports,
sorted ascending by start date.

Imports spanning fewer than 2 days are ignored, mirroring the minimum
window enforced by `clamp_dates/4`.
"""
@spec get_occupied_date_ranges(Site.t()) :: [Date.Range.t()]
def get_occupied_date_ranges(site) do
  ranges =
    for site_import <- Imported.list_all_imports(site, Imported.SiteImport.completed()),
        Date.diff(site_import.end_date, site_import.start_date) >= 2,
        do: Date.range(site_import.start_date, site_import.end_date)

  Enum.sort_by(ranges, & &1.first, Date)
end
@doc """
Returns the latest date importable data may reach: the start of the site's
native stats, or today's date in the site's timezone when there are none yet.
"""
@spec get_cutoff_date(Site.t()) :: Date.t()
def get_cutoff_date(site) do
  native_start = Plausible.Sites.native_stats_start_date(site)

  native_start || site.timezone |> DateTime.now!() |> DateTime.to_date()
end
# Builds the list of unoccupied sub-ranges within `start_date..end_date`.
defp find_free_ranges(start_date, end_date, occupied_ranges) do
  import_range = Date.range(start_date, end_date)
  free_ranges(import_range, start_date, occupied_ranges, [])
end
# This function recursively finds open ranges that are not yet occupied
# by existing imported data. The idea is that we keep moving a dynamic
# date index `d` from start until the end of `imported_range`, hopping
# over each occupied range, and capturing the open ranges step-by-step
# in the `result` array.
#
# Expects `occupied_ranges` sorted ascending by `first`, as produced by
# `get_occupied_date_ranges/1`.
defp free_ranges(import_range, d, [occupied_range | rest_of_occupied_ranges], result) do
  cond do
    # Occupied range ends at or before `d` — already behind us, skip it.
    Date.diff(occupied_range.last, d) <= 0 ->
      free_ranges(import_range, d, rest_of_occupied_ranges, result)

    # `d` sits inside the occupied range, or the gap before it is under
    # 2 days (mirrors the minimum window in `clamp_dates/4`) — jump `d`
    # past the occupied range without capturing anything.
    in_range?(d, occupied_range) || Date.diff(occupied_range.first, d) < 2 ->
      d = occupied_range.last
      free_ranges(import_range, d, rest_of_occupied_ranges, result)

    # Otherwise there is a usable open window from `d` up to the start of
    # the occupied range; capture it, then continue past the occupied range.
    true ->
      free_range = Date.range(d, occupied_range.first)
      result = result ++ [free_range]
      d = occupied_range.last
      free_ranges(import_range, d, rest_of_occupied_ranges, result)
  end
end

# No occupied ranges left: whatever remains up to the end of `import_range`
# is free, provided it still spans at least 2 days.
defp free_ranges(import_range, d, [], result) do
  if Date.diff(import_range.last, d) < 2 do
    result
  else
    result ++ [Date.range(d, import_range.last)]
  end
end
# Strictly inside the range: both boundary dates are excluded.
defp in_range?(date, range) do
  Date.compare(range.first, date) == :lt and Date.compare(range.last, date) == :gt
end
# Preloads and returns the site's `:completed_imports` association.
defp get_completed_imports(site) do
  %{completed_imports: completed} = Repo.preload(site, :completed_imports)
  completed
end
end