Transactions and run_in_transaction

PormG transactions are async-aware and connection-aware, designed to integrate seamlessly with multithreaded Julia code and web frameworks like Genie.jl. The primary entry point is run_in_transaction, which borrows a connection from the pool, executes your block inside a PostgreSQL transaction, and either commits or rolls back automatically.

Because every operation runs through the async core, even synchronous-looking code never blocks the scheduler. This means you can safely wrap database work inside @async tasks, web request handlers, or multithreaded @sync loops without deadlocking.

Table of Contents


Basic Usage

The simplest pattern: pass your "db_2" database key to run_in_transaction, then execute your query block inside the callback.

using PormG
using DataFrames

# Load configuration
PormG.Configuration.load("db_2")

# Load models
include("db_2/models.jl")
import .models as M

# Run a transaction
PormG.run_in_transaction("db_2") do
  # All queries inside this block share the same connection
  query = M.Result.objects
  query.filter(
    "raceid__year" => 2025,
    "positionorder" => 1,
  )
  
  query.values(
    "driverid__forename",
    "driverid__surname",
    "constructorid__name",
    "points",
  )
  
  df = query |> DataFrame
  @info "2025 Champion" df=df
end

The connection is automatically returned to the pool when the block exits. If any exception is raised inside the block, the transaction is rolled back and the exception rethrows.


Why Use Transactions

Atomicity with Multiple Writes

When you update multiple tables and need them to succeed or fail together:

# Real-world scenario: Record a pit stop and update driver stats
PormG.run_in_transaction("db_2") do
  # Record the pit stop
  stop_query = M.Pit_stop.objects
  stop_query.create(
    "raceid" => 900,
    "driverid" => 1,
    "stop" => 1,
    "lap" => 42,
    "time" => 35,
    "duration" => 22.5,
  )
  
  # Update driver pit-stop count
  driver_query = M.Driver.objects
  driver_query.filter("driverid" => 1)
  driver_query.update("pit_stops_count" => F("pit_stops_count") + 1)
  
  # Both changes succeed together, or both roll back
end

If the driver update fails, the pit stop is never recorded. This keeps championship data consistent.

Avoiding Partial States

Bulk operations (inserts, updates, deletes) already use transactions internally, but wrapping them in an explicit transaction ensures all-or-nothing behavior across multiple bulk calls:

# Scenario: Load a season's constructor standings
bulk_data = [
  Dict("year" => 2025, "constructorid" => 1, "points" => 250, "position" => 1),
  Dict("year" => 2025, "constructorid" => 2, "points" => 240, "position" => 2),
  # ... 8 more constructors
]
df = DataFrame(bulk_data)

query = M.Constructor_standing.objects

# If any row fails, all 10 are rolled back
PormG.bulk_insert(query, df, chunk_size=5)

Async Context Propagation

One of PormG's key strengths is that spawned @async tasks automatically inherit the transaction context. This lets you write elegant concurrent code without worrying about connection pools.

Single Async Task

child_saw_context = Atomic{Bool}(false)

PormG.run_in_transaction("db_2") do
  (M.Result.objects).create(
    "raceid" => 900,
    "driverid" => 11,
    "constructorid" => 6,
    "points" => 26,
  )
  
  t = @async begin
    # The async task can detect the transaction context
    child_saw_context[] = PormG.Configuration.get_tx_connection() !== nothing
    
    # Use the same transaction without requesting a new connection
    query = M.Driver.objects
    query.filter("driverid" => 11)
    query.update("code" => "WIN")
  end
  
  wait(t)
end

@test child_saw_context[]  # true: async inherited the context

Tasks created outside a transaction

Creating a Task with Task(...) (or the @task macro) outside a transaction captures the current task-local state. Scheduling that pre-created task later inside a run_in_transaction block does not retroactively install the transaction context into the Task — it keeps the context it captured at creation time.

# Create a task outside any transaction
child_saw_tx = Atomic{Bool}(false)

t = Task(() -> begin
  child_saw_tx[] = PormG.Configuration.get_tx_connection() !== nothing
end)

# Scheduling it inside a transaction does NOT make it transactional
PormG.run_in_transaction(settings) do
  schedule(t)
  wait(t)
end

@info "Task saw tx?" saw_tx=child_saw_tx[]  # expected: false

If you need a task to participate in a transaction, create and start it from inside run_in_transaction (e.g., with @async) or explicitly acquire and install the transaction connection with with_tx_context before running the work.

Multiple Concurrent Tasks

You can spawn many tasks and let Julia's scheduler coordinate them:

PormG.run_in_transaction(settings) do
  # Create base records
  for i in 1:3
    (M.Race.objects).create(
      "year" => 2025,
      "round" => i,
      "name" => "Race $i",
      "circuitid" => i,
      "date" => Date(2025, i, 1),
    )
  end
  
  # Spawn workers that all share the transaction
  worker_count = 5
  @sync for worker in 1:worker_count
    @async begin
      for race_round in 1:3
        # All inserts use the same connection
        (M.Result.objects).create(
          "raceid" => 900 + race_round,
          "driverid" => worker,
          "constructorid" => 1,
          "points" => 25 - worker,
        )
      end
    end
  end
end

When the @sync block finishes, all tasks have completed and the transaction commits.


Bulk Operations in Transactions

PormG provides bulk_insert and bulk_update for efficient batch operations. Always wrap them in a transaction to ensure all-or-nothing semantics.

Transaction with deletion and bulk operations (example)

This example demonstrates a common pattern: delete matching rows, perform a bulk insert, and then run a bulk update — all inside a single transaction so either all changes persist or none do.

# Example: Delete matching rows, then perform bulk insert and bulk update inside a single transaction
# Setup: remove previous data and create seed records
delete(M.Just_a_test_deletion.objects, allow_delete_all = true)

# Pre-insert some records
q = M.Just_a_test_deletion.objects
for i in 1:10
  q.create("name" => "to-be-deleted-$(i)", "test_result" => 800 + i)
end

(M.Just_a_test_deletion.objects).create("name" => "test_update", "test_result" => 456)

q = M.Just_a_test_deletion.objects
q.filter("name" => "test_update")
df_u = q |> DataFrame
# Prepare the DataFrame used for bulk_update
df_u[1, :test_result2] = 457

# Perform delete + bulk-insert + bulk-update inside a single transaction
PormG.run_in_transaction(settings) do
  q = M.Just_a_test_deletion.objects
  q.filter("name__@icontains" => "to-be-deleted")
  df = q |> DataFrame
  delete(q)

  # Bulk insert
  bulk_data = [Dict("name" => "bulk-$(i)", "test_result" => 900 + i) for i in 1:5]
  df_bulk = DataFrame(bulk_data)
  q = M.Just_a_test_deletion.objects
  PormG.bulk_insert(q, df_bulk)

  # Bulk update (apply the single-row update prepared above)
  PormG.bulk_update(q, df_u)
end

# Inspect results (non-test checks)
q = M.Just_a_test_deletion.objects
println("Total rows after transaction: ", q.count())
println("Names: ", sort(q.list() .|> x -> x[:name]))

Note: This mirrors test_transactions.jl coverage — it demonstrates delete + bulk insert + bulk update inside one transaction and shows how a later rollback would revert all operations.


Multithreaded Work

When mixing transactions with Julia's thread pool, run_in_transaction ensures all workers see the same connection, avoiding race conditions on the pool.

using Base.Threads: Atomic, atomic_add!

inserted_count = Atomic{Int}(0)

PormG.run_in_transaction("db_2") do
  @sync for i in 1:Threads.nthreads()
    @async begin
      for j in 1:100
        (M.Result.objects).create(
          "raceid" => 1000 + i,
          "driverid" => j,
          "constructorid" => 1,
          "points" => 10,
        )
        atomic_add!(inserted_count, 1)
      end
    end
  end
end

@test inserted_count[] == Threads.nthreads() * 100

All inserts happen on the same connection, and they either all commit or all roll back.


Error Handling and Rollback

If any exception is raised inside the block, PormG automatically rolls back the transaction and rethrows. Use try/catch for cleanup:

using Logging

try
  PormG.run_in_transaction("db_2") do
    (M.Result.objects).create(
      "raceid" => 1,
      "driverid" => 1,
      "constructorid" => 1,
      "points" => 25,
    )
    
    # Simulate a validation error
    throw(ErrorException("Driver not found"))
  end
catch e
  @error "Transaction failed, rolling back" exception=e
  # Perform cleanup (e.g., release temporary resources)
end

# The record was never inserted because the transaction rolled back
q = M.Result.objects
q.filter("raceid" => 1)
@test q.count() == 0

Nested Exception Handling

You can nest try/catch blocks and still maintain transaction semantics:

PormG.run_in_transaction("db_2") do
  (M.Result.objects).create("raceid" => 1, "driverid" => 1, ...)
  
  try
    (M.Result.objects).create("raceid" => 999, "driverid" => 999, ...)
    throw(ErrorException("Simulated error"))
  catch e
    @warn "Inner block failed, outer transaction will roll back" exception=e
  end
  
  # Even though we caught the error, it was already logged
  # The outer transaction will still roll back when this block exits
end

Lower-Level Helpers

If you need finer control (e.g., manual SAVEPOINT or multi-statement blocks), PormG exposes:

  • with_tx_context(conn_pool, conn::LibPQ.Connection, block): Install a connection in thread-local storage so child tasks inherit it.
  • with_transaction(settings, sql, conn=nothing, release_conn=false): Execute raw SQL inside a transaction context.
  • get_tx_connection(): Check if a transaction context is active and return the connection.

Example: Manual Savepoint

import PormG.Configuration: with_tx_context, with_transaction, get_tx_connection

settings = PormG.Configuration.get_settings("db_2")
result, conn = with_transaction(settings, "BEGIN;")
try
  with_tx_context(settings.connections, conn) do
    # Insert first batch
    for i in 1:5
      (M.Result.objects).create("raceid" => i, "driverid" => 1, ...)
    end
    
    # Create savepoint
    with_transaction(settings, "SAVEPOINT sp1;", conn=conn)
    
    # Insert second batch
    for i in 6:10
      (M.Result.objects).create("raceid" => i, "driverid" => 2, ...)
    end
    
    # If this fails, we can rollback just the second batch
    if some_validation_error
      with_transaction(settings, "ROLLBACK TO sp1;", conn=conn)
    end
  end
  
  # Commit
  with_transaction(settings, "COMMIT;", conn=conn, release_conn=true)
catch e
  with_transaction(settings, "ROLLBACK;", conn=conn, release_conn=true)
  rethrow(e)
end

Important: When manually managing connections, always pair get_tx_connection() calls with release_conn=true so the pool recovers the connection.


Summary

PatternUse Case
run_in_transaction(settings) do ... endMost common; automatic commit/rollback
Async tasks inside transactionSpawn concurrent workers that share the connection
Bulk operations inside transactionEnsure all-or-nothing for large batch inserts/updates
Manual with_tx_context + with_transactionFine-grained control; savepoints; multi-statement workflows

Start with run_in_transaction for all your database work. Drop to the lower-level helpers only when you need explicit savepoints or multi-database coordination.