源码 - GitHub - riverqueue/river: Fast and reliable background jobs in Go - River: a Fast, Robust Job Queue for Go + Postgres — brandur.org

架构图

gitdiagram.com/riverqueue/river

flowchart TD
    %% Client Layer
    subgraph "Client Layer"
        A["Client API"]:::client
    end

    %% Job & Worker Layer
    subgraph "Job & Worker Layer"
        B["Job Logic"]:::job
        C["Worker Logic"]:::job
    end

    %% Database Layer
    subgraph "Database Layer"
        X["Driver Interface"]:::database
        D["SQL Driver (riverdatabasesql)"]:::database
        E["PGX Driver (riverpgxv5)"]:::database
        F["Postgres Database"]:::database
    end

    %% Maintenance Layer
    subgraph "Maintenance Layer"
        G["Maintenance Services"]:::maintenance
    end

    %% Execution Layer
    subgraph "Execution Layer"
        H["Job Executor"]:::execution
        I["Job Completer"]:::execution
        J["Leadership Election"]:::execution
    end

    %% CLI & Tools Layer
    subgraph "CLI & Tools Layer"
        K["Command Line Tools"]:::cli
    end

    %% Utilities Layer
    subgraph "Utilities Layer"
        L["Internal Utilities"]:::utility
        N["Rivershared Utilities"]:::utility
    end

    %% External Interfaces
    subgraph "External Interfaces"
        M["External Integrations"]:::external
    end

    %% Connections
    A -->|"configures"| X
    A -->|"enqueue"| F
    X -->|"uses"| D
    X -->|"uses"| E
    F -->|"dispatches"| H
    H -->|"executes"| B
    H -->|"executes"| C
    F <-->|"maintenance"| G
    G -->|"monitors"| H
    A -->|"registers"| H
    K -->|"triggers"| A
    M -->|"inspects"| F
    F -->|"reports"| M
    L -->|"supports"| A
    L -->|"supports"| H
    L -->|"supports"| F
    N -->|"supports"| A
    N -->|"supports"| H
    N -->|"supports"| F

    %% Click Events
    click A "https://github.com/riverqueue/river/blob/master/client.go"
    click B "https://github.com/riverqueue/river/blob/master/job.go"
    click C "https://github.com/riverqueue/river/blob/master/worker.go"
    click X "https://github.com/riverqueue/river/blob/master/riverdriver/river_driver_interface.go"
    click D "https://github.com/riverqueue/river/tree/master/riverdriver/riverdatabasesql"
    click E "https://github.com/riverqueue/river/tree/master/riverdriver/riverpgxv5"
    click G "https://github.com/riverqueue/river/tree/master/internal/maintenance"
    click H "https://github.com/riverqueue/river/blob/master/internal/jobexecutor/job_executor.go"
    click I "https://github.com/riverqueue/river/blob/master/internal/jobcompleter/job_completer.go"
    click J "https://github.com/riverqueue/river/blob/master/internal/leadership/elector.go"
    click K "https://github.com/riverqueue/river/tree/master/cmd/river"
    click L "https://github.com/riverqueue/river/tree/master/internal/util"
    click N "https://github.com/riverqueue/river/tree/master/rivershared/util"

    %% Styles
    classDef client fill:#bbdefb,stroke:#0d47a1,stroke-width:2px;
    classDef job fill:#c8e6c9,stroke:#1b5e20,stroke-width:2px;
    classDef database fill:#dcedc8,stroke:#33691e,stroke-width:2px;
    classDef maintenance fill:#ffe0b2,stroke:#e65100,stroke-width:2px;
    classDef execution fill:#f8bbd0,stroke:#880e4f,stroke-width:2px;
    classDef cli fill:#d1c4e9,stroke:#512da8,stroke-width:2px;
    classDef utility fill:#f0f4c3,stroke:#827717,stroke-width:2px;
    classDef external fill:#b2ebf2,stroke:#006064,stroke-width:2px;

后台维护

NewClient会创建以下后台线程

  • NewJobCleaner
  • NewRescuer
  • NewJobScheduler
  • NewPeriodicJobEnqueuer
  • NewQueueCleaner
  • NewReindexer
  • PluginMaintenanceServices
JobCleaner

定时(默认JobCleanerTimeoutDefault30秒)删除满足任意条件的任务

  • CancelledJobRetentionPeriodDefault:默认1天
  • CompletedJobRetentionPeriodDefault:默认1天
  • DiscardedJobRetentionPeriodDefault:默认7天
func NewJobCleaner(archetype *baseservice.Archetype, config *JobCleanerConfig, exec riverdriver.Executor) *JobCleaner {
    return baseservice.Init(archetype, &JobCleaner{
        Config: (&JobCleanerConfig{
            CancelledJobRetentionPeriod: valutil.ValOrDefault(config.CancelledJobRetentionPeriod, CancelledJobRetentionPeriodDefault),
            CompletedJobRetentionPeriod: valutil.ValOrDefault(config.CompletedJobRetentionPeriod, CompletedJobRetentionPeriodDefault),
            DiscardedJobRetentionPeriod: valutil.ValOrDefault(config.DiscardedJobRetentionPeriod, DiscardedJobRetentionPeriodDefault),
            Interval:                    valutil.ValOrDefault(config.Interval, JobCleanerIntervalDefault),
            Timeout:                     valutil.ValOrDefault(config.Timeout, JobCleanerTimeoutDefault),
        }).mustValidate(),

        batchSize: BatchSizeDefault,
        exec:      exec,
    })
}

func (s *JobCleaner) Start(ctx context.Context) error { //nolint:dupl
    // ...

    go func() {
        ticker := timeutil.NewTickerWithInitialTick(ctx, s.Config.Interval)
        for {
            select {
            case <-ctx.Done():
                return
            case <-ticker.C:
            }

            res, err := s.runOnce(ctx)
            // ...
        }
    }()

    return nil
}

func (s *JobCleaner) runOnce(ctx context.Context) (*jobCleanerRunOnceResult, error) {
    // ...

    for {
        numDeleted, err := func() (int, error) {
            numDeleted, err := s.exec.JobDeleteBefore(ctx, &riverdriver.JobDeleteBeforeParams{
                CancelledFinalizedAtHorizon: time.Now().Add(-s.Config.CancelledJobRetentionPeriod),
                CompletedFinalizedAtHorizon: time.Now().Add(-s.Config.CompletedJobRetentionPeriod),
                DiscardedFinalizedAtHorizon: time.Now().Add(-s.Config.DiscardedJobRetentionPeriod),
                Max:                         s.batchSize,
            })
            if err != nil {
                return 0, fmt.Errorf("error deleting completed jobs: %w", err)
            }

            return numDeleted, nil
        }()
        // ...
    }

    return res, nil
}