riverqueue
源码
- GitHub - riverqueue/river: Fast and reliable background jobs in Go
- River: a Fast, Robust Job Queue for Go + Postgres — brandur.org
架构图
flowchart TD
%% Client Layer
subgraph "Client Layer"
A["Client API"]:::client
end
%% Job & Worker Layer
subgraph "Job & Worker Layer"
B["Job Logic"]:::job
C["Worker Logic"]:::job
end
%% Database Layer
subgraph "Database Layer"
X["Driver Interface"]:::database
D["SQL Driver (riverdatabasesql)"]:::database
E["PGX Driver (riverpgxv5)"]:::database
F["Postgres Database"]:::database
end
%% Maintenance Layer
subgraph "Maintenance Layer"
G["Maintenance Services"]:::maintenance
end
%% Execution Layer
subgraph "Execution Layer"
H["Job Executor"]:::execution
I["Job Completer"]:::execution
J["Leadership Election"]:::execution
end
%% CLI & Tools Layer
subgraph "CLI & Tools Layer"
K["Command Line Tools"]:::cli
end
%% Utilities Layer
subgraph "Utilities Layer"
L["Internal Utilities"]:::utility
N["Rivershared Utilities"]:::utility
end
%% External Interfaces
subgraph "External Interfaces"
M["External Integrations"]:::external
end
%% Connections
A -->|"configures"| X
A -->|"enqueue"| F
X -->|"uses"| D
X -->|"uses"| E
F -->|"dispatches"| H
H -->|"executes"| B
H -->|"executes"| C
F <-->|"maintenance"| G
G -->|"monitors"| H
A -->|"registers"| H
K -->|"triggers"| A
M -->|"inspects"| F
F -->|"reports"| M
L -->|"supports"| A
L -->|"supports"| H
L -->|"supports"| F
N -->|"supports"| A
N -->|"supports"| H
N -->|"supports"| F
%% Click Events
click A "https://github.com/riverqueue/river/blob/master/client.go"
click B "https://github.com/riverqueue/river/blob/master/job.go"
click C "https://github.com/riverqueue/river/blob/master/worker.go"
click X "https://github.com/riverqueue/river/blob/master/riverdriver/river_driver_interface.go"
click D "https://github.com/riverqueue/river/tree/master/riverdriver/riverdatabasesql"
click E "https://github.com/riverqueue/river/tree/master/riverdriver/riverpgxv5"
click G "https://github.com/riverqueue/river/tree/master/internal/maintenance"
click H "https://github.com/riverqueue/river/blob/master/internal/jobexecutor/job_executor.go"
click I "https://github.com/riverqueue/river/blob/master/internal/jobcompleter/job_completer.go"
click J "https://github.com/riverqueue/river/blob/master/internal/leadership/elector.go"
click K "https://github.com/riverqueue/river/tree/master/cmd/river"
click L "https://github.com/riverqueue/river/tree/master/internal/util"
click N "https://github.com/riverqueue/river/tree/master/rivershared/util"
%% Styles
classDef client fill:#bbdefb,stroke:#0d47a1,stroke-width:2px;
classDef job fill:#c8e6c9,stroke:#1b5e20,stroke-width:2px;
classDef database fill:#dcedc8,stroke:#33691e,stroke-width:2px;
classDef maintenance fill:#ffe0b2,stroke:#e65100,stroke-width:2px;
classDef execution fill:#f8bbd0,stroke:#880e4f,stroke-width:2px;
classDef cli fill:#d1c4e9,stroke:#512da8,stroke-width:2px;
classDef utility fill:#f0f4c3,stroke:#827717,stroke-width:2px;
classDef external fill:#b2ebf2,stroke:#006064,stroke-width:2px;
后台维护
NewClient
会创建以下后台线程NewJobCleaner
NewRescuer
NewJobScheduler
NewPeriodicJobEnqueuer
NewQueueCleaner
NewReindexer
PluginMaintenanceServices
JobCleaner
定时(默认
JobCleanerTimeoutDefault
30秒)删除满足任意条件的任务CancelledJobRetentionPeriodDefault
:默认1天CompletedJobRetentionPeriodDefault
:默认1天DiscardedJobRetentionPeriodDefault
:默认7天
func NewJobCleaner(archetype *baseservice.Archetype, config *JobCleanerConfig, exec riverdriver.Executor) *JobCleaner {
return baseservice.Init(archetype, &JobCleaner{
Config: (&JobCleanerConfig{
CancelledJobRetentionPeriod: valutil.ValOrDefault(config.CancelledJobRetentionPeriod, CancelledJobRetentionPeriodDefault),
CompletedJobRetentionPeriod: valutil.ValOrDefault(config.CompletedJobRetentionPeriod, CompletedJobRetentionPeriodDefault),
DiscardedJobRetentionPeriod: valutil.ValOrDefault(config.DiscardedJobRetentionPeriod, DiscardedJobRetentionPeriodDefault),
Interval: valutil.ValOrDefault(config.Interval, JobCleanerIntervalDefault),
Timeout: valutil.ValOrDefault(config.Timeout, JobCleanerTimeoutDefault),
}).mustValidate(),
batchSize: BatchSizeDefault,
exec: exec,
})
}
func (s *JobCleaner) Start(ctx context.Context) error { //nolint:dupl
// ...
go func() {
ticker := timeutil.NewTickerWithInitialTick(ctx, s.Config.Interval)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
}
res, err := s.runOnce(ctx)
// ...
}
}()
return nil
}
func (s *JobCleaner) runOnce(ctx context.Context) (*jobCleanerRunOnceResult, error) {
// ...
for {
numDeleted, err := func() (int, error) {
numDeleted, err := s.exec.JobDeleteBefore(ctx, &riverdriver.JobDeleteBeforeParams{
CancelledFinalizedAtHorizon: time.Now().Add(-s.Config.CancelledJobRetentionPeriod),
CompletedFinalizedAtHorizon: time.Now().Add(-s.Config.CompletedJobRetentionPeriod),
DiscardedFinalizedAtHorizon: time.Now().Add(-s.Config.DiscardedJobRetentionPeriod),
Max: s.batchSize,
})
if err != nil {
return 0, fmt.Errorf("error deleting completed jobs: %w", err)
}
return numDeleted, nil
}()
// ...
}
return res, nil
}