Source file
src/tawesoft.co.uk/go/queue/itemSqliteService.go
Documentation:
src/tawesoft.co.uk/go/queue/itemSqliteService.go
1 package queue
2
3 import (
4 "database/sql"
5 "fmt"
6 "os"
7 "strings"
8 "time"
9
10 "tawesoft.co.uk/go/sqlp"
11 "tawesoft.co.uk/go/sqlp/sqlite3"
12 "tawesoft.co.uk/go/variadic"
13 )
14
15 type itemSqliteService struct {
16 dbname string
17 dbpath string
18 db *sql.DB
19 msgSvc messageSqliteService
20 }
21 var nilItemSqliteService = itemSqliteService{}
22
// itemSqliteNew carries the column values supplied when a row is inserted
// into the items table. Created and RetryAfter are epoch seconds.
type itemSqliteNew struct {
	Priority   int
	Created    int64
	RetryAfter int64
}

// fields lists the column names, in the same order as placeholders and values.
func (itemSqliteNew) fields() string {
	const columns = "priority, created, retryAfter"
	return columns
}

// placeholders returns one bind marker per column named by fields.
func (itemSqliteNew) placeholders() string {
	const marks = "?, ?, ?"
	return marks
}

// values returns the bind arguments, ordered to match fields.
func (n itemSqliteNew) values() []interface{} {
	args := make([]interface{}, 0, 3)
	args = append(args, n.Priority, n.Created, n.RetryAfter)
	return args
}
37
38 type itemSqlite struct {
39 ID ItemID
40 Priority int
41 Message string
42 Attempt int
43 Created int64
44 RetryAfter int64
45 }
46 func (p itemSqlite) fields() string {
47 return "items.id, items.priority, messages.data, items.attempt, items.created, items.retryAfter"
48 }
49 func (p itemSqlite) placeholders() string {
50 return "?, ?, ?, ?, ?, ?"
51 }
52 func (p itemSqlite) values() []interface{} {
53 return []interface{}{p.ID, p.Priority, p.Message,p.Attempt, p.Created, p.RetryAfter}
54 }
55 func (p *itemSqlite) pointers() []interface{} {
56 return []interface{}{&p.ID, &p.Priority, &p.Message, &p.Attempt, &p.Created, &p.RetryAfter}
57 }
58 func (p itemSqlite) toItem() Item {
59 return Item{
60 ID: p.ID,
61 Priority: p.Priority,
62 Message: p.Message,
63 Attempt: p.Attempt,
64 Created: time.Unix(p.Created, 0),
65 RetryAfter: time.Unix(p.RetryAfter, 0),
66 }
67 }
68
// initItemSqliteService attaches the database at file to db under the schema
// name dbname, vacuums it, and creates the items table and its sort index if
// they do not exist yet.
//
// Both dbname and file are spliced directly into the SQL text, so they must
// already be escaped by the caller.
func initItemSqliteService(db *sql.DB, dbname string, file string) error {
	schema := `
		ATTACH DATABASE ` + file + ` as ` + dbname + `;
		VACUUM ` + dbname + `;

		CREATE TABLE IF NOT EXISTS ` + dbname + `.items (
			id         INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
			priority   INTEGER NOT NULL DEFAULT 0,
			attempt    INTEGER NOT NULL DEFAULT 0,
			created    INTEGER NOT NULL, -- epoch seconds
			retryAfter INTEGER NOT NULL  -- epoch seconds
		);

		CREATE INDEX IF NOT EXISTS ` + dbname + `.items_idx_sort ON items(priority DESC, retryAfter, created, id);
	`

	if _, err := db.Exec(schema); err != nil {
		return fmt.Errorf("error initialising item table: %+v", err)
	}
	return nil
}
89
90
91
92 func newItemSqliteService(db *sql.DB, name string, rawpath string) (itemSqliteService, error) {
93 var err error
94
95 name, err = sqlite3.Features.EscapeIdentifier(name)
96 if err != nil { return nilItemSqliteService, err }
97 path, err := sqlite3.Features.EscapeString(rawpath)
98 if err != nil { return nilItemSqliteService, err }
99
100 err = initItemSqliteService(db, name, path)
101 if err != nil { return nilItemSqliteService, err }
102
103 msgSvc, err := newMessageSqliteService(db, name)
104 if err != nil { return nilItemSqliteService, err }
105
106 return itemSqliteService{
107 dbname: name,
108 dbpath: rawpath,
109 db: db,
110 msgSvc: msgSvc,
111 }, nil
112 }
113
114 func (s itemSqliteService) CreateItem(newItem NewItem) error {
115 err := func() error {
116 tx, err := s.db.Begin()
117 if err != nil { return err }
118 defer tx.Rollback()
119
120 i := itemSqliteNew{
121 Priority: newItem.Priority,
122 Created: newItem.Created.Unix(),
123 RetryAfter: newItem.RetryAfter.Unix(),
124 }
125
126 query := `INSERT INTO `+ s.dbname +`.items(`+i.fields()+`) VALUES (`+i.placeholders()+`)`
127
128 result, err := tx.Exec(query, i.values()...)
129 if err != nil { return err }
130
131 if n, ok := sqlp.RowsAffectedBetween(result, 1, 1); !ok {
132 return fmt.Errorf("rows affected %d != 1", n)
133 }
134
135 itemID, err := result.LastInsertId()
136 if err != nil { return err }
137
138 err = s.msgSvc.Create(tx, ItemID(itemID), newItem.Message)
139 if err != nil { return err }
140
141 err = tx.Commit()
142 if err != nil { return err }
143
144 return nil
145 }()
146
147 if err != nil {
148 return fmt.Errorf("error inserting new item: %+v", err)
149 }
150
151 return nil
152 }
153
154 func (s itemSqliteService) PeekItems(
155 n int,
156 minPriority int,
157 due time.Time,
158 excluding []ItemID,
159 ) ([]Item, error) {
160 items, err := func () ([]Item, error) {
161 var i itemSqlite
162
163 query := `
164 SELECT
165 `+ i.fields() +`
166 FROM
167 `+ s.dbname +`.items,
168 `+ s.dbname +`.messages
169 WHERE
170 items.id = messages.id
171 AND
172 items.priority >= ?
173 AND
174 items.retryAfter <= ?
175 AND
176 items.id NOT IN (`+ sqlp.RepeatString("?", len(excluding)) +`)
177 ORDER BY
178 items.priority DESC,
179 items.retryAfter,
180 items.created,
181 items.id
182 LIMIT ?`
183
184 args := variadic.FlattenExcludingNils(minPriority, due.Unix(), excluding, n)
185 rows, err := s.db.Query(query, args...)
186 if err != nil { return nil, err }
187 defer rows.Close()
188
189 items := make([]Item, 0, n)
190
191 for rows.Next() {
192 if err := rows.Scan(i.pointers()...); err != nil {
193 return nil, err
194 }
195 items = append(items, i.toItem())
196 }
197 if err := rows.Err(); err != nil {
198 return nil, err
199 }
200
201 return items, nil
202 }()
203
204 if err != nil {
205 return nil, fmt.Errorf("error selecting items: %+v", err)
206 }
207
208 return items, nil
209 }
210
211 func (s itemSqliteService) RetryItem(id ItemID, priority int, due time.Time, attempt int) error {
212 query := `
213 UPDATE
214 `+ s.dbname +`.items
215 SET
216 priority = ?,
217 retryAfter = ?,
218 attempts = ?,
219 WHERE
220 id = ?
221 `
222
223 result, err := s.db.Exec(query, priority, due.Unix(), attempt, id)
224 if err != nil {
225 return fmt.Errorf("error updating %s item %d: %+v", s.dbname, id, err)
226 }
227 if _, ok := sqlp.RowsAffectedBetween(result, 1, 1); !ok {
228 return fmt.Errorf("error updating %s item %d: rows affected != 1", s.dbname, id)
229 }
230 return nil
231 }
232
233 func (s itemSqliteService) DeleteItem(id ItemID) error {
234 query := `
235 DELETE FROM
236 `+ s.dbname +`.items
237 WHERE
238 id = ?
239 `
240
241 result, err := s.db.Exec(query, id)
242 if err != nil {
243 return fmt.Errorf("error deleting %s item %d: %+v", s.dbname, id, err)
244 }
245 if _, ok := sqlp.RowsAffectedBetween(result, 1, 1); !ok {
246 return fmt.Errorf("error deleting %s item %d: rows affected != 1", s.dbname, id)
247 }
248 return nil
249 }
250
251 func (s itemSqliteService) Close() error {
252 _, err := s.db.Exec(`DETACH DATABASE `+ s.dbname)
253 if err != nil {
254 return fmt.Errorf("error closing %s.item table: %+v", s.dbname, err)
255 }
256 return nil
257 }
258
259 func (s itemSqliteService) Delete() error {
260 if !strings.HasPrefix(s.dbpath, ":memory:") {
261 return os.Remove(s.dbpath)
262 }
263 return nil
264 }
265
View as plain text