From fb2249428fc55b207bcc5c61de75ee3c35995b13 Mon Sep 17 00:00:00 2001 From: Parker Selbert Date: Sat, 11 Jan 2025 11:46:52 +0100 Subject: [PATCH] Simplify and enhance dolphin engine insert_all The dolphin engine now does a single bulk insert for multiple jobs. That improves overall insert speed, reduces calls over the wire, and matches the approach taken in other engines. The only downside is that jobs inserted by the dolphin engine lack database-generated values such as the primary key. Closes #1220 --- lib/oban.ex | 13 ++++++++++--- lib/oban/engines/dolphin.ex | 20 +++----------------- test/oban/engine_test.exs | 27 ++++++++++++++------------- test/oban/plugins/lifeline_test.exs | 10 +++++----- 4 files changed, 32 insertions(+), 38 deletions(-) diff --git a/lib/oban.ex b/lib/oban.ex index 5038b154..2f7fc0d0 100644 --- a/lib/oban.ex +++ b/lib/oban.ex @@ -667,11 +667,18 @@ defmodule Oban do `* (Ecto.InvalidChangesetError) could not perform insert because changeset is invalid.` - > #### 🌟 Unique Jobs and Batching {: .warning} + #### Dolphin Engine and Generated Values + + MySQL doesn't return anything on insertion into the database. That means any values generated by + the database, namely the primary key and timestamps, aren't included in the job structs returned + from `insert_all`. + + > #### 🌟 Unique Jobs and Batching {: .tip} > > Only the [Smart Engine](https://oban.pro/docs/pro/Oban.Pro.Engines.Smart.html) in [Oban - > Pro](https://oban.pro) supports bulk unique jobs and automatic batching. With the basic - > engine, you must use `insert/3` for unique support. + > Pro](https://oban.pro) supports bulk unique jobs, automatic insert batching, and minimizes + > parameters sent over the wire. With the basic engine, you must use `insert/3` to insert unique + > jobs one at a time. 
## Options diff --git a/lib/oban/engines/dolphin.ex b/lib/oban/engines/dolphin.ex index ae747502..669e6342 100644 --- a/lib/oban/engines/dolphin.ex +++ b/lib/oban/engines/dolphin.ex @@ -67,25 +67,11 @@ defmodule Oban.Engines.Dolphin do @impl Engine def insert_all_jobs(%Config{} = conf, changesets, opts) do - # MySQL doesn't return a primary key from a bulk insert, which violates the insert_all_jobs - # contract. Inserting one at a time is far less efficient, but it does what's required. - {:ok, jobs} = - Repo.transaction(conf, fn -> - Enum.map(changesets, fn changeset -> - case insert_job(conf, changeset, opts) do - {:ok, job} -> - job - - {:error, %Changeset{} = changeset} -> - raise Ecto.InvalidChangesetError, action: :insert, changeset: changeset + jobs = Enum.map(changesets, &Job.to_map/1) - {:error, reason} -> - raise RuntimeError, inspect(reason) - end - end) - end) + {_count, _jobs} = Repo.insert_all(conf, Job, jobs, opts) - jobs + Enum.map(changesets, &Ecto.Changeset.apply_action!(&1, :insert)) end @impl Engine diff --git a/test/oban/engine_test.exs b/test/oban/engine_test.exs index 702a6d71..f358f903 100644 --- a/test/oban/engine_test.exs +++ b/test/oban/engine_test.exs @@ -659,12 +659,14 @@ for engine <- [Oban.Engines.Basic, Oban.Engines.Lite, Oban.Engines.Dolphin] do test "inserting and executing jobs", %{name: name} do TelemetryHandler.attach_events() - changesets = + [job_1, job_2, job_3, job_4, job_5] = ~w(OK CANCEL DISCARD ERROR SNOOZE) |> Enum.with_index(1) - |> Enum.map(fn {act, ref} -> Worker.new(%{action: act, ref: ref}) end) - - [job_1, job_2, job_3, job_4, job_5] = Oban.insert_all(name, changesets) + |> Enum.map(fn {act, ref} -> + %{action: act, ref: ref} + |> Worker.new() + |> then(&Oban.insert!(name, &1)) + end) assert_receive {:event, [:fetch_jobs, :stop], _, %{jobs: _}} @@ -685,12 +687,14 @@ for engine <- [Oban.Engines.Basic, Oban.Engines.Lite, Oban.Engines.Dolphin] do @tag :capture_log test "safely executing jobs with any type of exit", 
%{name: name} do - changesets = + jobs = ~w(EXIT KILL TASK_ERROR TASK_EXIT) |> Enum.with_index(1) - |> Enum.map(fn {act, ref} -> Worker.new(%{action: act, ref: ref}) end) - - jobs = Oban.insert_all(name, changesets) + |> Enum.map(fn {act, ref} -> + %{action: act, ref: ref} + |> Worker.new() + |> then(&Oban.insert!(name, &1)) + end) assert_receive {:exit, 1} assert_receive {:kill, 2} @@ -732,11 +736,8 @@ for engine <- [Oban.Engines.Basic, Oban.Engines.Lite, Oban.Engines.Dolphin] do end test "discarding jobs that exceed max attempts", %{name: name} do - [job_1, job_2] = - Oban.insert_all(name, [ - Worker.new(%{action: "ERROR", ref: 1}, max_attempts: 1), - Worker.new(%{action: "ERROR", ref: 2}, max_attempts: 2) - ]) + job_1 = Oban.insert!(name, Worker.new(%{action: "ERROR", ref: 1}, max_attempts: 1)) + job_2 = Oban.insert!(name, Worker.new(%{action: "ERROR", ref: 2}, max_attempts: 2)) assert_receive {:error, 1} assert_receive {:error, 2} diff --git a/test/oban/plugins/lifeline_test.exs b/test/oban/plugins/lifeline_test.exs index 067b1658..2ae88398 100644 --- a/test/oban/plugins/lifeline_test.exs +++ b/test/oban/plugins/lifeline_test.exs @@ -78,11 +78,11 @@ defmodule Oban.Plugins.LifelineTest do repo: DolphinRepo ) - [job_1, job_2] = - Oban.insert_all(name, [ - Worker.new(%{}, state: "executing", attempted_at: seconds_ago(3)), - Worker.new(%{}, state: "executing", attempted_at: seconds_ago(8)) - ]) + job_1 = + Oban.insert!(name, Worker.new(%{}, state: "executing", attempted_at: seconds_ago(3))) + + job_2 = + Oban.insert!(name, Worker.new(%{}, state: "executing", attempted_at: seconds_ago(8))) send_rescue(name)