1 package queue 2 3 import ( 4 "time" 5 ) 6 7 type UUIDService interface { 8 Generate() ([]byte, error) 9 } 10 11 type ItemID int64 12 13 type Item struct { 14 // ID uniquely identifies the item in a single queue 15 ID ItemID 16 17 // Priority orders items in the queue so that due items with a higher 18 // priority come before other due items, even if they were due sooner. 19 Priority int // default 0, limit +/- math.MaxInt16 20 21 // Message is any user-supplied string of bytes 22 Message string 23 24 // Attempt records how many times the given item has been unsuccessfully 25 // processed and put back in the queue 26 Attempt int 27 28 // Created is the time the item was first added to the queue 29 // (use `time.Now().UTC()`) 30 Created time.Time // UTC 31 32 // RetryAfter is the earliest time the queue will attempt to process the 33 // item 34 RetryAfter time.Time // UTC 35 } 36 37 func (i Item) String() string { 38 return i.Message 39 } 40 41 type NewItem struct { 42 Message string 43 Priority int 44 Created time.Time 45 RetryAfter time.Time 46 } 47 48 // QueueService defines an interface for the creation of queues. 49 // 50 // One such implementation is the NewQueueSqliteService which provides a 51 // reliable persistent queues backed by SQLite databases. 52 type QueueService interface{ 53 // OpenQueue opens (or creates) a new queue with a given name, backed 54 // by a file at the given path (the SQLite backend also supports 55 // ":memory:" as a target path) 56 OpenQueue(name string, path string) (Queue, error) 57 58 // Close any resources such as database handles. You should individually 59 // close any open queues first. 60 Close() error 61 } 62 63 type Queue interface { 64 // CreateItem places a new Item in the queue 65 CreateItem(item NewItem) error 66 67 // PeekItems returns up to `n` items with a priority >= `minPriority`, 68 // and with a due time >= `due`. Returned items are ordered by (highest 69 // priority, earliest due, earliest created, lowest ID). Items with IDs 70 // in `excluding` (which may be nil or empty) are not included. 71 PeekItems(n int, minPriority int, due time.Time, excluding []ItemID) ([]Item, error) 72 73 // RetryItem reorders an item in the queue at a later `due` time and a 74 // given priority. Also updates its attempt number (e.g. on the case of 75 // temporary failure, this will be set to the item's Attempt field plus 76 // one. In other cases, such as rescheduling to a better time, it might 77 // be kept at the current Attempt value) 78 RetryItem(item ItemID, priority int, due time.Time, attempt int) error 79 80 // DeleteItem removes an item from the queue. 81 DeleteItem(ItemID) error 82 83 // Close any resources such as database handles. 84 Close() error 85 86 // Delete removes a queue database from disk. If you've opened an in-memory 87 // database then don't try deleting it! 88 Delete() error 89 } 90