Introducing River: A High-Performance Job Processing System
River is a job processing system for Go applications that uses PostgreSQL as its database. It is designed to simplify how applications manage background work, so that jobs are processed reliably and efficiently.
Key Features
- Transactional Enqueueing: River allows jobs to be added to the queue in the same transaction as other database changes. This integration reduces complexities often found in distributed systems by ensuring jobs are only queued if their associated transaction is successful.
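To make the guarantee concrete, here is a minimal in-memory sketch of the idea. It uses neither River nor PostgreSQL: the store and tx types and their begin, insertUser, enqueue, commit, and rollback methods are hypothetical stand-ins for a database and its transaction. The point is only that a job staged inside a transaction becomes visible, or vanishes, together with the data change it belongs to.

```go
package main

import "fmt"

// store is a hypothetical stand-in for a database holding two tables:
// application rows and queued jobs.
type store struct {
	users []string
	jobs  []string
}

// tx stages writes; nothing reaches the store until commit is called.
type tx struct {
	s     *store
	users []string
	jobs  []string
}

func (s *store) begin() *tx { return &tx{s: s} }

func (t *tx) insertUser(name string) { t.users = append(t.users, name) }

// enqueue stages a job in the same transaction as the data change.
func (t *tx) enqueue(kind string) { t.jobs = append(t.jobs, kind) }

// commit applies the staged rows and the staged jobs together.
func (t *tx) commit() {
	t.s.users = append(t.s.users, t.users...)
	t.s.jobs = append(t.s.jobs, t.jobs...)
}

// rollback discards the staged rows and the staged jobs together.
func (t *tx) rollback() { t.users, t.jobs = nil, nil }

func main() {
	s := &store{}

	failed := s.begin()
	failed.insertUser("alice")
	failed.enqueue("welcome_email")
	failed.rollback() // the job disappears along with the user row

	ok := s.begin()
	ok.insertUser("bob")
	ok.enqueue("welcome_email")
	ok.commit()

	fmt.Println(len(s.users), len(s.jobs)) // prints "1 1"
}
```

With a real database the same effect comes from the transaction itself: the job row is written on the open transaction, so a rollback removes it and a commit publishes it.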
How It Works
Job Args and Workers
Jobs in River are defined by two primary components: the JobArgs and the Worker.
- JobArgs: These contain the details of the job, including a unique identifier or "kind" for the job. For instance, a job that sorts strings takes the strings to sort as its arguments.
type SortArgs struct {
	Strings []string `json:"strings"`
}

func (SortArgs) Kind() string { return "sort" }
- Worker: This is the type that performs the job through its Work method. For example, a SortWorker takes a SortArgs job and sorts the strings provided.
type SortWorker struct {
	river.WorkerDefaults[SortArgs]
}

func (w *SortWorker) Work(ctx context.Context, job *river.Job[SortArgs]) error {
	sort.Strings(job.Args.Strings)
	fmt.Printf("Sorted strings: %+v\n", job.Args.Strings)
	return nil
}
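The json struct tag on SortArgs suggests that a job's arguments travel as JSON between the code that enqueues a job and the worker that runs it. The sketch below shows that round trip using only the standard library. The encode and decode helpers are hypothetical names introduced for illustration and are not part of River.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
)

// SortArgs mirrors the args type from the article.
type SortArgs struct {
	Strings []string `json:"strings"`
}

func (SortArgs) Kind() string { return "sort" }

// encode is a hypothetical helper: it turns args into the JSON payload
// that would be stored alongside the job's kind.
func encode(args SortArgs) (string, error) {
	b, err := json.Marshal(args)
	return string(b), err
}

// decode is a hypothetical helper: it rebuilds the args on the worker side.
func decode(payload string) (SortArgs, error) {
	var args SortArgs
	err := json.Unmarshal([]byte(payload), &args)
	return args, err
}

func main() {
	payload, err := encode(SortArgs{Strings: []string{"whale", "tiger", "bear"}})
	if err != nil {
		panic(err)
	}
	fmt.Println(SortArgs{}.Kind(), payload) // prints: sort {"strings":["whale","tiger","bear"]}

	args, err := decode(payload)
	if err != nil {
		panic(err)
	}
	sort.Strings(args.Strings)
	fmt.Println(args.Strings) // prints: [bear tiger whale]
}
```

Because the payload is plain JSON keyed by the struct tags, renaming a tag changes the stored format, which is worth keeping in mind once jobs are already queued.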
Registering Workers
Jobs are linked to workers using a "kind" string. When the system starts, workers are registered so River knows how to allocate tasks to them.
workers := river.NewWorkers()
river.AddWorker(workers, &SortWorker{})
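The idea of routing by kind can be sketched without River as a map from kind string to handler. Everything in this block (handler, registry, add, dispatch, errUnknownKind) is a hypothetical name chosen for illustration. It shows the lookup pattern only and makes no claim about how River implements its worker bundle internally.

```go
package main

import (
	"errors"
	"fmt"
)

// handler is a hypothetical function type standing in for a worker.
type handler func(payload string) error

// registry maps a job's kind string to the handler that can run it.
type registry struct {
	handlers map[string]handler
}

func newRegistry() *registry {
	return &registry{handlers: make(map[string]handler)}
}

// add registers a handler for a kind. Rejecting duplicates is this
// sketch's choice, so that each kind resolves to exactly one handler.
func (r *registry) add(kind string, h handler) error {
	if _, exists := r.handlers[kind]; exists {
		return fmt.Errorf("kind %q already registered", kind)
	}
	r.handlers[kind] = h
	return nil
}

var errUnknownKind = errors.New("no worker registered for kind")

// dispatch looks up the handler for a job's kind and runs it.
func (r *registry) dispatch(kind, payload string) error {
	h, ok := r.handlers[kind]
	if !ok {
		return fmt.Errorf("%w: %s", errUnknownKind, kind)
	}
	return h(payload)
}

func main() {
	r := newRegistry()
	err := r.add("sort", func(payload string) error {
		fmt.Println("sorting", payload)
		return nil
	})
	if err != nil {
		panic(err)
	}

	fmt.Println(r.dispatch("sort", `["b","a"]`)) // runs the handler, then prints <nil>
	fmt.Println(r.dispatch("resize", "{}"))      // prints the unknown-kind error
}
```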
Client Setup
River provides a client interface to handle job management. This client requires a database pool and settings that define how many workers can be active at a time. Here is an example of initializing a client:
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
	Queues: map[string]river.QueueConfig{
		river.QueueDefault: {MaxWorkers: 100},
	},
	Workers: workers,
})
if err != nil {
	panic(err)
}
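A setting like MaxWorkers caps how many jobs run at the same time. The sketch below illustrates that kind of cap with a counting semaphore built from a buffered channel. The runBounded function is a hypothetical helper written for this example and is not taken from River, which may enforce its limit differently.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runBounded runs every job but lets at most maxWorkers run at once.
// It returns the highest number of jobs observed running concurrently.
func runBounded(maxWorkers int, jobs []func()) int {
	sem := make(chan struct{}, maxWorkers) // counting semaphore
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		running int
		peak    int
	)
	for _, job := range jobs {
		sem <- struct{}{} // blocks while maxWorkers jobs are in flight
		wg.Add(1)
		go func(job func()) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot when the job ends

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			job()

			mu.Lock()
			running--
			mu.Unlock()
		}(job)
	}
	wg.Wait()
	return peak
}

func main() {
	jobs := make([]func(), 20)
	for i := range jobs {
		jobs[i] = func() { time.Sleep(5 * time.Millisecond) }
	}
	peak := runBounded(3, jobs)
	fmt.Println("peak within limit:", peak <= 3) // prints "peak within limit: true"
}
```

A slot is taken before each goroutine starts and released only after the job returns, so the number of running jobs can never exceed the configured limit.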
Insert-Only Clients
Sometimes a client is needed only for adding jobs, without processing them. This is achieved by omitting the Queues configuration and never calling Start on the client.
Starting and Stopping
- Starting: Start the client to begin processing jobs.
if err := riverClient.Start(ctx); err != nil {
	panic(err)
}
- Stopping: Ensure a clean shutdown by stopping the client properly while allowing active jobs to finish.
if err := riverClient.Stop(ctx); err != nil {
	panic(err)
}
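The start and stop lifecycle described above follows a common Go pattern: stop taking new work, then wait for in-flight work, bounded by a context. The sketch below shows that pattern with the standard library only. The pool type and the runThenStop helper are hypothetical and are meant as an analogy for a graceful shutdown, not as a description of River's implementation.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// pool is a hypothetical miniature of a job client.
type pool struct {
	jobs chan func()
	wg   sync.WaitGroup
}

func newPool() *pool { return &pool{jobs: make(chan func(), 16)} }

// start launches one worker goroutine that runs jobs as they arrive.
func (p *pool) start() {
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		for job := range p.jobs { // exits once jobs is closed and drained
			job()
		}
	}()
}

// stop refuses new jobs and waits for queued and active jobs to finish.
// If ctx ends first, it gives up waiting and returns ctx.Err().
func (p *pool) stop(ctx context.Context) error {
	close(p.jobs)
	done := make(chan struct{})
	go func() {
		p.wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// runThenStop starts a pool, submits the jobs, and stops the pool with
// the given timeout in milliseconds, returning the result of stop.
func runThenStop(timeoutMs int, jobs ...func()) error {
	p := newPool()
	p.start()
	for _, job := range jobs {
		p.jobs <- job
	}
	timeout := time.Duration(timeoutMs) * time.Millisecond
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return p.stop(ctx)
}

func main() {
	finished := false
	err := runThenStop(1000, func() {
		time.Sleep(20 * time.Millisecond)
		finished = true
	})
	fmt.Println(err == nil, finished) // prints "true true"
}
```

Passing a context with a deadline to the stop call gives the caller an upper bound on how long shutdown may take when a job runs longer than expected.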
Advanced Features
River offers a multitude of additional capabilities:
- Batch Job Insertion: Efficiently add multiple jobs using PostgreSQL's COPY FROM.
- Job Cancellation & Error Handling: Cancel jobs or handle errors from within work functions.
- Multiple Queues & Scheduling: Control over job priorities, throughput, and isolation, including cron or scheduled jobs.
- Cross Language Enqueueing: Although River is primarily a Go library, jobs can also be inserted from other languages such as Python and Ruby.
Conclusion
River is inspired by a variety of existing job processing libraries, drawing on their strengths to offer a high-performance, reliable solution for job management in systems using Go and PostgreSQL. With its transactional approach and robust feature set, River reduces the complexity of processing jobs in distributed applications.