analytics/lib/plausible/data_migration/numeric_ids.ex

161 lines
4.6 KiB
Elixir

defmodule Plausible.DataMigration.NumericIDs do
@moduledoc """
Numeric IDs migration, SQL files available at:
priv/data_migrations/NumericIDs/sql
"""
use Plausible.DataMigration, dir: "NumericIDs"
import Ecto.Query
# Ecto schema for rows of the "domains_lookup" table. `run/1` in the enclosing
# module fills it with one `%{site_id: ..., domain: ...}` entry per
# `Plausible.Site` record.
defmodule DomainsLookup do
@moduledoc false
use Ecto.Schema
# The table is declared without a primary key column.
@primary_key false
schema "domains_lookup" do
# Numeric site identifier, stored through the `Ch` Ecto type as "UInt64".
field :site_id, Ch, type: "UInt64"
# Domain string that the numeric id stands for.
field :domain, :string
end
end
@doc """
Runs the numeric IDs data migration.

The migration proceeds in stages:

1. Resolves the settings (see below) and starts `@repo` with the database URL
   and the maximum number of threads.
2. Runs the `check-replicas` script to decide whether cluster mode applies.
3. Runs the `list-partitions` script, bounded by `start_from` / `stop_at`.
4. Prints the resolved settings.
5. Runs the `drop-*` and `create-*` scripts for the v2 tables, the temporary
   tables and the domains lookup table.
6. Populates the domains lookup table from `Plausible.Site`.
7. For every listed partition, runs the insert / attach / truncate scripts,
   first for events and then for sessions, printing how long each took.

## Options

  * `:interactive?` - when truthy (the default is `true`), every script and
    every stage goes through a confirmation prompt; otherwise everything runs
    without prompting.
  * `:table_settings` - falls back to the `NUMERIC_IDS_TABLE_SETTINGS`
    environment variable, then to
    `Plausible.MigrationUtils.table_settings_expr/0`.
  * `:start_from` - falls back to `NUMERIC_IDS_PARTITION_START_FROM`.
  * `:stop_at` - falls back to `NUMERIC_IDS_PARTITION_STOP_AT`.

## Environment variables

  * `NUMERIC_IDS_MIGRATION_DB_URL` - database URL; defaults to the `:url`
    configured for `Plausible.IngestRepo`.
  * `NUMERIC_IDS_MIGRATION_MAX_THREADS` - integer, defaults to `16`.

## Return value

Returns whatever the final "Start migration?" confirmation step returns. In
non-interactive mode that is the `:ok` produced by the last `IO.puts/1`.

Raises `MatchError` when a mandatory script does not return `{:ok, _}`,
`CaseClauseError` when `check-replicas` yields neither zero nor one row, and
`ArgumentError` when `NUMERIC_IDS_MIGRATION_MAX_THREADS` is not an integer.
"""
@spec run(keyword()) :: term()
def run(opts \\ []) do
  interactive? = Keyword.get(opts, :interactive?, true)

  db_url =
    System.get_env(
      "NUMERIC_IDS_MIGRATION_DB_URL",
      Application.get_env(:plausible, Plausible.IngestRepo)[:url]
    )

  max_threads =
    "NUMERIC_IDS_MIGRATION_MAX_THREADS" |> System.get_env("16") |> String.to_integer()

  # Explicit option wins over the environment, which wins over the default.
  table_settings =
    Keyword.get(opts, :table_settings) || System.get_env("NUMERIC_IDS_TABLE_SETTINGS") ||
      Plausible.MigrationUtils.table_settings_expr()

  start_from =
    Keyword.get(opts, :start_from) || System.get_env("NUMERIC_IDS_PARTITION_START_FROM")

  stop_at = Keyword.get(opts, :stop_at) || System.get_env("NUMERIC_IDS_PARTITION_STOP_AT")

  @repo.start(db_url, max_threads)

  cluster? = replicas_present?()
  partitions = list_partitions(start_from, stop_at)

  # Only used for display from here on: the partition list itself was already
  # bounded by the original `start_from` value.
  start_from = start_from || List.first(partitions)

  # NOTE(review): the database URL is printed verbatim, including any
  # credentials embedded in it - confirm this is acceptable for the operator
  # console / logs this runs in.
  IO.puts("""
  Got the following migration settings:
  - max_threads: #{max_threads}
  - table_settings: #{table_settings}
  - db url: #{db_url}
  - cluster?: #{cluster?}
  - partitions to do: #{inspect(partitions, pretty: true, limit: :infinity, width: 80)}
  - start from: #{start_from}
  - stop at: #{stop_at}
  """)

  # Interactive mode prompts before each script / stage; non-interactive mode
  # runs scripts directly and executes every stage unconditionally.
  {run_sql_fn, confirm_fn} =
    if interactive? do
      {&run_sql_confirm/3, &confirm/2}
    else
      {&run_sql/3, fn _message, run_fn -> run_fn.() end}
    end

  recreate_tables(run_sql_fn, table_settings, cluster?)
  setup_domains_lookup(run_sql_fn, confirm_fn, table_settings)
  migrate_partitions(partitions, start_from, confirm_fn)
end

# Reports whether the `check-replicas` script returned a row.
# NOTE(review): any result other than exactly zero or one row (including an
# error tuple) raises CaseClauseError - confirm the script can never return more.
defp replicas_present? do
  case run_sql("check-replicas") do
    {:ok, %{num_rows: 0}} -> false
    {:ok, %{num_rows: 1}} -> true
  end
end

# Returns the partitions reported by `list-partitions`, one value per result row.
defp list_partitions(start_from, stop_at) do
  {:ok, %{rows: rows}} = run_sql("list-partitions", start_from: start_from, stop_at: stop_at)
  Enum.map(rows, fn [part] -> part end)
end

# Builds the extra prompt options for dropping a v2 table, based on its row count.
defp drop_v2_extra_opts(table) do
  case @repo.query("select count(*) from {$0:Identifier}", [table]) do
    {:ok, %{rows: [[count]]}} when count > 0 ->
      # Non-empty table: warn about the row count and default the prompt to "no".
      [
        prompt_message: "The table contains #{count} rows. Execute?",
        prompt_default_choice: :no
      ]

    {:ok, _} ->
      [prompt_default_choice: :no]

    {:error, _} ->
      # Presumably the table does not exist yet - verify; no extra options then.
      []
  end
end

# Drops and re-creates the v2 tables and temporary tables, then drops the lookup table.
defp recreate_tables(run_sql_fn, table_settings, cluster?) do
  {:ok, _} =
    run_sql_fn.("drop-events-v2", [cluster?: cluster?], drop_v2_extra_opts("events_v2"))

  {:ok, _} =
    run_sql_fn.("drop-sessions-v2", [cluster?: cluster?], drop_v2_extra_opts("sessions_v2"))

  {:ok, _} = run_sql_fn.("drop-tmp-events-v2", [], [])
  {:ok, _} = run_sql_fn.("drop-tmp-sessions-v2", [], [])
  {:ok, _} = run_sql_fn.("drop-domains-lookup", [], [])

  {:ok, _} =
    run_sql_fn.("create-events-v2", [table_settings: table_settings, cluster?: cluster?], [])

  {:ok, _} =
    run_sql_fn.("create-sessions-v2", [table_settings: table_settings, cluster?: cluster?], [])

  {:ok, _} = run_sql_fn.("create-tmp-events-v2", [table_settings: table_settings], [])
  {:ok, _} = run_sql_fn.("create-tmp-sessions-v2", [table_settings: table_settings], [])
end

# Creates the domains lookup table and, on success, offers to populate it.
defp setup_domains_lookup(run_sql_fn, confirm_fn, table_settings) do
  case run_sql_fn.("create-domains-lookup", [table_settings: table_settings], []) do
    {:ok, _} ->
      confirm_fn.("Populate domains-lookup with postgres sites", &populate_domains_lookup/0)

    _ ->
      # NOTE(review): unlike every other script, a failure here is ignored and
      # the migration continues without a populated lookup table - confirm
      # that this is deliberate.
      :ignore
  end
end

# Copies every site's id and domain from `Plausible.Repo` into the lookup table.
defp populate_domains_lookup do
  mappings =
    Plausible.Site
    |> select([s], %{site_id: s.id, domain: s.domain})
    |> Plausible.Repo.all()

  @repo.insert_all(DomainsLookup, mappings)
end

# Migrates the given partitions one by one, printing start/end timestamps and timings.
defp migrate_partitions(partitions, start_from, confirm_fn) do
  confirm_fn.("Start migration? (starting from partition: #{start_from})", fn ->
    IO.puts("start.. #{DateTime.utc_now()}")

    for part <- partitions do
      # The measured span includes any time spent waiting on the confirmation.
      part_start = System.monotonic_time()
      confirm_fn.("Run partition: #{part}?", fn -> migrate_partition(part) end)
      part_end = System.monotonic_time()

      IO.puts(
        "#{part} took #{System.convert_time_unit(part_end - part_start, :native, :second)} seconds"
      )
    end

    IO.puts("end.. #{DateTime.utc_now()}")
  end)
end

# Runs the insert / attach / truncate scripts for one partition: events, then sessions.
defp migrate_partition(part) do
  {:ok, _} = run_sql("insert-into-tmp-events-v2", partition: part)
  {:ok, _} = run_sql("attach-tmp-events-v2", partition: part)
  {:ok, _} = run_sql("truncate-tmp-events-v2", [])
  {:ok, _} = run_sql("insert-into-tmp-sessions-v2", partition: part)
  {:ok, _} = run_sql("attach-tmp-sessions-v2", partition: part)
  {:ok, _} = run_sql("truncate-tmp-sessions-v2", [])
end
end