161 lines
4.6 KiB
Elixir
161 lines
4.6 KiB
Elixir
defmodule Plausible.DataMigration.NumericIDs do
|
|
@moduledoc """
|
|
Numeric IDs migration, SQL files available at:
|
|
priv/data_migrations/NumericIDs/sql
|
|
"""
|
|
use Plausible.DataMigration, dir: "NumericIDs"
|
|
|
|
import Ecto.Query
|
|
|
|
defmodule DomainsLookup do
|
|
@moduledoc false
|
|
use Ecto.Schema
|
|
|
|
@primary_key false
|
|
schema "domains_lookup" do
|
|
field :site_id, Ch, type: "UInt64"
|
|
field :domain, :string
|
|
end
|
|
end
|
|
|
|
def run(opts \\ []) do
|
|
interactive? = Keyword.get(opts, :interactive?, true)
|
|
|
|
db_url =
|
|
System.get_env(
|
|
"NUMERIC_IDS_MIGRATION_DB_URL",
|
|
Application.get_env(:plausible, Plausible.IngestRepo)[:url]
|
|
)
|
|
|
|
max_threads =
|
|
"NUMERIC_IDS_MIGRATION_MAX_THREADS" |> System.get_env("16") |> String.to_integer()
|
|
|
|
table_settings =
|
|
Keyword.get(opts, :table_settings) || System.get_env("NUMERIC_IDS_TABLE_SETTINGS") ||
|
|
Plausible.MigrationUtils.table_settings_expr()
|
|
|
|
start_from =
|
|
Keyword.get(opts, :start_from) || System.get_env("NUMERIC_IDS_PARTITION_START_FROM")
|
|
|
|
stop_at = Keyword.get(opts, :stop_at) || System.get_env("NUMERIC_IDS_PARTITION_STOP_AT")
|
|
|
|
@repo.start(db_url, max_threads)
|
|
|
|
cluster? =
|
|
case run_sql("check-replicas") do
|
|
{:ok, %{num_rows: 0}} -> false
|
|
{:ok, %{num_rows: 1}} -> true
|
|
end
|
|
|
|
{:ok, %{rows: partitions}} =
|
|
run_sql("list-partitions", start_from: start_from, stop_at: stop_at)
|
|
|
|
partitions = Enum.map(partitions, fn [part] -> part end)
|
|
|
|
start_from = start_from || List.first(partitions)
|
|
|
|
IO.puts("""
|
|
Got the following migration settings:
|
|
|
|
- max_threads: #{max_threads}
|
|
- table_settings: #{table_settings}
|
|
- db url: #{db_url}
|
|
- cluster?: #{cluster?}
|
|
- partitions to do: #{inspect(partitions, pretty: true, limit: :infinity, width: 80)}
|
|
- start from: #{start_from}
|
|
- stop at: #{stop_at}
|
|
""")
|
|
|
|
run_sql_fn =
|
|
if interactive? do
|
|
&run_sql_confirm/3
|
|
else
|
|
&run_sql/3
|
|
end
|
|
|
|
confirm_fn =
|
|
if interactive? do
|
|
&confirm/2
|
|
else
|
|
fn _, run_fn ->
|
|
run_fn.()
|
|
end
|
|
end
|
|
|
|
drop_v2_extra_opts = fn table ->
|
|
case @repo.query("select count(*) from {$0:Identifier}", [table]) do
|
|
{:ok, %{rows: [[count]]}} when count > 0 ->
|
|
[
|
|
prompt_message: "The table contains #{count} rows. Execute?",
|
|
prompt_default_choice: :no
|
|
]
|
|
|
|
{:ok, _} ->
|
|
[prompt_default_choice: :no]
|
|
|
|
{:error, _} ->
|
|
[]
|
|
end
|
|
end
|
|
|
|
{:ok, _} =
|
|
run_sql_fn.("drop-events-v2", [cluster?: cluster?], drop_v2_extra_opts.("events_v2"))
|
|
|
|
{:ok, _} =
|
|
run_sql_fn.("drop-sessions-v2", [cluster?: cluster?], drop_v2_extra_opts.("sessions_v2"))
|
|
|
|
{:ok, _} = run_sql_fn.("drop-tmp-events-v2", [], [])
|
|
{:ok, _} = run_sql_fn.("drop-tmp-sessions-v2", [], [])
|
|
{:ok, _} = run_sql_fn.("drop-domains-lookup", [], [])
|
|
|
|
{:ok, _} =
|
|
run_sql_fn.("create-events-v2", [table_settings: table_settings, cluster?: cluster?], [])
|
|
|
|
{:ok, _} =
|
|
run_sql_fn.("create-sessions-v2", [table_settings: table_settings, cluster?: cluster?], [])
|
|
|
|
{:ok, _} = run_sql_fn.("create-tmp-events-v2", [table_settings: table_settings], [])
|
|
{:ok, _} = run_sql_fn.("create-tmp-sessions-v2", [table_settings: table_settings], [])
|
|
|
|
case run_sql_fn.("create-domains-lookup", [table_settings: table_settings], []) do
|
|
{:ok, _} ->
|
|
confirm_fn.("Populate domains-lookup with postgres sites", fn ->
|
|
mappings =
|
|
Plausible.Site
|
|
|> select([s], %{site_id: s.id, domain: s.domain})
|
|
|> Plausible.Repo.all()
|
|
|
|
@repo.insert_all(DomainsLookup, mappings)
|
|
end)
|
|
|
|
_ ->
|
|
:ignore
|
|
end
|
|
|
|
confirm_fn.("Start migration? (starting from partition: #{start_from})", fn ->
|
|
IO.puts("start.. #{DateTime.utc_now()}")
|
|
|
|
for part <- partitions do
|
|
part_start = System.monotonic_time()
|
|
|
|
confirm_fn.("Run partition: #{part}?", fn ->
|
|
{:ok, _} = run_sql("insert-into-tmp-events-v2", partition: part)
|
|
{:ok, _} = run_sql("attach-tmp-events-v2", partition: part)
|
|
{:ok, _} = run_sql("truncate-tmp-events-v2", [])
|
|
{:ok, _} = run_sql("insert-into-tmp-sessions-v2", partition: part)
|
|
{:ok, _} = run_sql("attach-tmp-sessions-v2", partition: part)
|
|
{:ok, _} = run_sql("truncate-tmp-sessions-v2", [])
|
|
end)
|
|
|
|
part_end = System.monotonic_time()
|
|
|
|
IO.puts(
|
|
"#{part} took #{System.convert_time_unit(part_end - part_start, :native, :second)} seconds"
|
|
)
|
|
end
|
|
|
|
IO.puts("end.. #{DateTime.utc_now()}")
|
|
end)
|
|
end
|
|
end
|