213 lines
6.6 KiB
Elixir
213 lines
6.6 KiB
Elixir
defmodule Plausible.Imported do
|
|
@moduledoc """
|
|
Context for managing site statistics imports.
|
|
|
|
For list of currently supported import sources see `Plausible.Imported.ImportSources`.
|
|
|
|
For more information on implementing importers, see `Plausible.Imported.Importer`.
|
|
"""
|
|
|
|
import Ecto.Query
|
|
|
|
alias Plausible.{Site, Repo, Imported}
|
|
alias Plausible.Imported.SiteImport
|
|
alias Plausible.Stats.Query
|
|
|
|
require Plausible.Imported.SiteImport
|
|
|
|
@tables [
|
|
Imported.Visitor,
|
|
Imported.Source,
|
|
Imported.Page,
|
|
Imported.EntryPage,
|
|
Imported.ExitPage,
|
|
Imported.CustomEvent,
|
|
Imported.Location,
|
|
Imported.Device,
|
|
Imported.Browser,
|
|
Imported.OperatingSystem
|
|
]
|
|
|
|
@table_names Enum.map(@tables, & &1.__schema__(:source))
|
|
# Maximum number of complete imports to account for when querying stats
|
|
@max_complete_imports 5
|
|
|
|
@spec schemas() :: [module()]
|
|
def schemas, do: @tables
|
|
|
|
@spec tables() :: [String.t()]
|
|
def tables, do: @table_names
|
|
|
|
@spec max_complete_imports() :: non_neg_integer()
|
|
def max_complete_imports() do
|
|
@max_complete_imports
|
|
end
|
|
|
|
@spec imported_custom_props() :: [String.t()]
|
|
def imported_custom_props do
|
|
# NOTE: Keep up to date with `Plausible.Props.internal_keys/1`,
|
|
# but _ignore_ unsupported keys. Currently, `search_query` is
|
|
# not supported in imported queries.
|
|
Enum.map(~w(url path), &("event:props:" <> &1))
|
|
end
|
|
|
|
@spec any_completed_imports?(Site.t()) :: boolean()
|
|
def any_completed_imports?(site) do
|
|
get_completed_imports(site) != []
|
|
end
|
|
|
|
@spec earliest_import_start_date(Site.t()) :: Date.t() | nil
|
|
def earliest_import_start_date(site) do
|
|
site
|
|
|> get_completed_imports()
|
|
|> Enum.map(& &1.start_date)
|
|
|> Enum.min(Date, fn -> nil end)
|
|
end
|
|
|
|
@spec complete_import_ids(Site.t()) :: [non_neg_integer()]
|
|
def complete_import_ids(site) do
|
|
imports = get_completed_imports(site)
|
|
has_legacy? = Enum.any?(imports, fn %{legacy: legacy?} -> legacy? end)
|
|
ids = Enum.map(imports, fn %{id: id} -> id end)
|
|
|
|
# account for legacy imports as well
|
|
if has_legacy? do
|
|
[0 | ids]
|
|
else
|
|
ids
|
|
end
|
|
end
|
|
|
|
@spec completed_imports_in_query_range(Site.t(), Query.t()) :: [SiteImport.t()]
|
|
def completed_imports_in_query_range(%Site{} = site, %Query{} = query) do
|
|
date_range = Query.date_range(query)
|
|
|
|
site
|
|
|> get_completed_imports()
|
|
|> Enum.reject(fn site_import ->
|
|
Date.after?(site_import.start_date, date_range.last) or
|
|
Date.before?(site_import.end_date, date_range.first)
|
|
end)
|
|
end
|
|
|
|
@spec get_import(Site.t(), non_neg_integer()) :: SiteImport.t() | nil
|
|
def get_import(site, import_id) do
|
|
Repo.get_by(SiteImport, id: import_id, site_id: site.id)
|
|
end
|
|
|
|
defdelegate listen(), to: Imported.Importer
|
|
|
|
@spec list_all_imports(Site.t(), atom()) :: [SiteImport.t()]
|
|
def list_all_imports(site, status \\ nil) do
|
|
from(i in SiteImport, where: i.site_id == ^site.id, order_by: [desc: i.inserted_at])
|
|
|> maybe_filter_by_status(status)
|
|
|> Repo.all()
|
|
end
|
|
|
|
@spec other_imports_in_progress?(SiteImport.t()) :: boolean()
|
|
def other_imports_in_progress?(site_import) do
|
|
Repo.exists?(
|
|
from(i in SiteImport,
|
|
where: i.site_id == ^site_import.site_id and i.id != ^site_import.id,
|
|
where: i.status in ^[SiteImport.pending(), SiteImport.importing()]
|
|
)
|
|
)
|
|
end
|
|
|
|
defp maybe_filter_by_status(query, nil), do: query
|
|
|
|
defp maybe_filter_by_status(query, status) do
|
|
where(query, [i], i.status == ^status)
|
|
end
|
|
|
|
@spec delete_imports_for_site(Site.t()) :: :ok
|
|
def delete_imports_for_site(site) do
|
|
Repo.delete_all(from i in SiteImport, where: i.site_id == ^site.id)
|
|
|
|
:ok
|
|
end
|
|
|
|
@spec clamp_dates(Site.t(), Date.t(), Date.t()) ::
|
|
{:ok, Date.t(), Date.t()} | {:error, :no_time_window}
|
|
def clamp_dates(site, start_date, end_date) do
|
|
cutoff_date = get_cutoff_date(site)
|
|
occupied_ranges = get_occupied_date_ranges(site)
|
|
|
|
clamp_dates(occupied_ranges, cutoff_date, start_date, end_date)
|
|
end
|
|
|
|
@spec clamp_dates([Date.Range.t()], Date.t(), Date.t(), Date.t()) ::
|
|
{:ok, Date.t(), Date.t()} | {:error, :no_time_window}
|
|
def clamp_dates(occupied_ranges, cutoff_date, start_date, end_date) do
|
|
end_date = Enum.min([end_date, cutoff_date], Date)
|
|
|
|
with true <- Date.diff(end_date, start_date) >= 2,
|
|
[_ | _] = free_ranges <- find_free_ranges(start_date, end_date, occupied_ranges) do
|
|
longest = Enum.max_by(free_ranges, &Date.diff(&1.last, &1.first))
|
|
{:ok, longest.first, longest.last}
|
|
else
|
|
_ -> {:error, :no_time_window}
|
|
end
|
|
end
|
|
|
|
@spec get_occupied_date_ranges(Site.t()) :: [Date.Range.t()]
|
|
def get_occupied_date_ranges(site) do
|
|
site
|
|
|> Imported.list_all_imports(Imported.SiteImport.completed())
|
|
|> Enum.reject(&(Date.diff(&1.end_date, &1.start_date) < 2))
|
|
|> Enum.map(&Date.range(&1.start_date, &1.end_date))
|
|
|> Enum.sort_by(& &1.first, Date)
|
|
end
|
|
|
|
@spec get_cutoff_date(Site.t()) :: Date.t()
|
|
def get_cutoff_date(site) do
|
|
Plausible.Sites.native_stats_start_date(site) ||
|
|
DateTime.to_date(DateTime.now!(site.timezone))
|
|
end
|
|
|
|
defp find_free_ranges(start_date, end_date, occupied_ranges) do
|
|
Date.range(start_date, end_date)
|
|
|> free_ranges(start_date, occupied_ranges, [])
|
|
end
|
|
|
|
# This function recursively finds open ranges that are not yet occupied
|
|
# by existing imported data. The idea is that we keep moving a dynamic
|
|
# date index `d` from start until the end of `imported_range`, hopping
|
|
# over each occupied range, and capturing the open ranges step-by-step
|
|
# in the `result` array.
|
|
defp free_ranges(import_range, d, [occupied_range | rest_of_occupied_ranges], result) do
|
|
cond do
|
|
Date.diff(occupied_range.last, d) <= 0 ->
|
|
free_ranges(import_range, d, rest_of_occupied_ranges, result)
|
|
|
|
in_range?(d, occupied_range) || Date.diff(occupied_range.first, d) < 2 ->
|
|
d = occupied_range.last
|
|
free_ranges(import_range, d, rest_of_occupied_ranges, result)
|
|
|
|
true ->
|
|
free_range = Date.range(d, occupied_range.first)
|
|
result = result ++ [free_range]
|
|
d = occupied_range.last
|
|
free_ranges(import_range, d, rest_of_occupied_ranges, result)
|
|
end
|
|
end
|
|
|
|
defp free_ranges(import_range, d, [], result) do
|
|
if Date.diff(import_range.last, d) < 2 do
|
|
result
|
|
else
|
|
result ++ [Date.range(d, import_range.last)]
|
|
end
|
|
end
|
|
|
|
defp in_range?(date, range) do
|
|
Date.before?(range.first, date) && Date.after?(range.last, date)
|
|
end
|
|
|
|
defp get_completed_imports(site) do
|
|
site
|
|
|> Repo.preload(:completed_imports)
|
|
|> Map.fetch!(:completed_imports)
|
|
end
|
|
end
|