...
Tawesoft Logo

Package queue

import "tawesoft.co.uk/go/queue"
Overview
Index
Subdirectories

Overview ▾

Package queue implements simple, durable/ACID, same-process message queues with best-effort ordering by priority and/or time.

Examples

See examples folder.

Package Information

License: MIT (see LICENSE.txt)

Stable: candidate

For more information, documentation, source code, examples, support, links, etc. please see https://www.tawesoft.co.uk/go and https://www.tawesoft.co.uk/go/queue

2021-07-06

    * The Queue RetryItem method now takes an Attempt parameter.

    * Calling the Delete() method on a Queue now attempts to avoid deleting
      an in-memory database opened as ":memory:".

type Item

type Item struct {
    // ID uniquely identifies the item in a single queue
    ID ItemID

    // Priority orders items in the queue so that due items with a higher
    // priority come before other due items, even if they were due sooner.
    Priority int // default 0, limit +/- math.MaxInt16

    // Message is any user-supplied string of bytes
    Message string

    // Attempt records how many times the given item has been unsuccessfully
    // processed and put back in the queue
    Attempt int

    // Created is the time the item was first added to the queue
    // (use `time.Now().UTC()`)
    Created time.Time // UTC

    // RetryAfter is the earliest time the queue will attempt to process the
    // item
    RetryAfter time.Time // UTC
}

func (Item) String

func (i Item) String() string

type ItemID

type ItemID int64

type NewItem

type NewItem struct {
    Message    string
    Priority   int
    Created    time.Time
    RetryAfter time.Time
}

type Queue

type Queue interface {
    // CreateItem places a new Item in the queue
    CreateItem(item NewItem) error

    // PeekItems returns up to `n` items with a priority >= `minPriority`,
    // and with a due time >= `due`. Returned items are ordered by (highest
    // priority, earliest due, earliest created, lowest ID). Items with IDs
    // in `excluding` (which may be nil or empty) are not included.
    PeekItems(n int, minPriority int, due time.Time, excluding []ItemID) ([]Item, error)

    // RetryItem reorders an item in the queue at a later `due` time and a
    // given priority. Also updates its attempt number (e.g. on the case of
    // temporary failure, this will be set to the item's Attempt field plus
    // one. In other cases, such as rescheduling to a better time, it might
    // be kept at the current Attempt value)
    RetryItem(item ItemID, priority int, due time.Time, attempt int) error

    // DeleteItem removes an item from the queue.
    DeleteItem(ItemID) error

    // Close any resources such as database handles.
    Close() error

    // Delete removes a queue database from disk. If you've opened an in-memory
    // database then don't try deleting it!
    Delete() error
}

type QueueService

QueueService defines an interface for the creation of queues.

One such implementation is the NewQueueSqliteService which provides a reliable persistent queues backed by SQLite databases.

type QueueService interface {
    // OpenQueue opens (or creates) a new queue with a given name, backed
    // by a file at the given path (the SQLite backend also supports
    // ":memory:" as a target path)
    OpenQueue(name string, path string) (Queue, error)

    // Close any resources such as database handles. You should individually
    // close any open queues first.
    Close() error
}

func NewQueueSqliteService

func NewQueueSqliteService() (QueueService, error)

NewQueueSqliteService creates a new QueueService implemented by a SQLite backend that persists queues to individual database files.

The SQLite backend may place a limit on the number of attached queue databases per connection (default 7).

SQLite is used in SecureDelete mode so that deleted items are overwritten by zeros on disk to protect possibly sensitive data.

A queue databases is VACUUMed when first attached by OpenQueue

type UUIDService

type UUIDService interface {
    Generate() ([]byte, error)
}

Subdirectories

Name Synopsis
examples
simple Simple example of creating queues, adding items, peeking at due items.