...
Tawesoft Logo

Source file src/tawesoft.co.uk/go/queue/itemSqliteService.go

Documentation: src/tawesoft.co.uk/go/queue/itemSqliteService.go

     1  package queue
     2  
     3  import (
     4      "database/sql"
     5      "fmt"
     6      "os"
     7      "strings"
     8      "time"
     9  
    10      "tawesoft.co.uk/go/sqlp"
    11      "tawesoft.co.uk/go/sqlp/sqlite3"
    12      "tawesoft.co.uk/go/variadic"
    13  )
    14  
    15  type itemSqliteService struct {
    16      dbname  string
    17      dbpath  string
    18      db      *sql.DB
    19      msgSvc  messageSqliteService
    20  }
    21  var nilItemSqliteService = itemSqliteService{}
    22  
    23  type itemSqliteNew struct {
    24      Priority   int // default 0
    25      Created    int64 // time.Time // UTC
    26      RetryAfter int64 // time.Time // UTC
    27  }
    28  func (p itemSqliteNew) fields() string {
    29      return "priority, created, retryAfter"
    30  }
    31  func (p itemSqliteNew) placeholders() string {
    32      return "?, ?, ?"
    33  }
    34  func (p itemSqliteNew) values() []interface{} {
    35      return []interface{}{p.Priority, p.Created, p.RetryAfter}
    36  }
    37  
    38  type itemSqlite struct {
    39      ID ItemID
    40      Priority int
    41      Message string
    42      Attempt int
    43      Created int64 // time.Time // UTC
    44      RetryAfter int64 // time.Time // UTC
    45  }
    46  func (p itemSqlite) fields() string {
    47      return "items.id, items.priority, messages.data, items.attempt, items.created, items.retryAfter"
    48  }
    49  func (p itemSqlite) placeholders() string {
    50      return "?, ?, ?, ?, ?, ?"
    51  }
    52  func (p itemSqlite) values() []interface{} {
    53      return []interface{}{p.ID, p.Priority, p.Message,p.Attempt, p.Created, p.RetryAfter}
    54  }
    55  func (p *itemSqlite) pointers() []interface{} {
    56      return []interface{}{&p.ID, &p.Priority, &p.Message, &p.Attempt, &p.Created, &p.RetryAfter}
    57  }
    58  func (p itemSqlite) toItem() Item {
    59      return Item{
    60          ID:         p.ID,
    61          Priority:   p.Priority,
    62          Message:    p.Message,
    63          Attempt:    p.Attempt,
    64          Created:    time.Unix(p.Created, 0),
    65          RetryAfter: time.Unix(p.RetryAfter, 0),
    66      }
    67  }
    68  
    69  func initItemSqliteService(db *sql.DB, dbname string, file string) error {
    70      _, err := db.Exec(`
    71          ATTACH DATABASE `+file+` as `+ dbname +`;
    72          VACUUM `+ dbname +`;
    73  
    74          CREATE TABLE IF NOT EXISTS `+ dbname +`.items (
    75              id         INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
    76              priority   INTEGER NOT NULL DEFAULT 0,
    77              attempt    INTEGER NOT NULL DEFAULT 0,
    78              created    INTEGER NOT NULL, -- epoch seconds
    79              retryAfter INTEGER NOT NULL  -- epoch seconds
    80          );
    81  
    82          CREATE INDEX IF NOT EXISTS `+ dbname +`.items_idx_sort ON items(priority DESC, retryAfter, created, id);
    83      `)
    84      if err != nil {
    85          return fmt.Errorf("error initialising item table: %+v", err)
    86      }
    87      return nil
    88  }
    89  
    90  // newItemSqliteService creates a new itemService implemented by a
    91  // itemSqliteService
    92  func newItemSqliteService(db *sql.DB, name string, rawpath string) (itemSqliteService, error) {
    93      var err error
    94  
    95      name, err = sqlite3.Features.EscapeIdentifier(name)
    96      if err != nil { return nilItemSqliteService, err }
    97      path, err := sqlite3.Features.EscapeString(rawpath)
    98      if err != nil { return nilItemSqliteService, err }
    99  
   100      err = initItemSqliteService(db, name, path)
   101      if err != nil { return nilItemSqliteService, err }
   102  
   103      msgSvc, err := newMessageSqliteService(db, name)
   104      if err != nil { return nilItemSqliteService, err }
   105  
   106      return itemSqliteService{
   107          dbname:  name,
   108          dbpath:  rawpath,
   109          db:      db,
   110          msgSvc:  msgSvc,
   111      }, nil
   112  }
   113  
   114  func (s itemSqliteService) CreateItem(newItem NewItem) error {
   115      err := func() error {
   116          tx, err := s.db.Begin()
   117          if err != nil { return err }
   118          defer tx.Rollback()
   119  
   120          i := itemSqliteNew{
   121              Priority:   newItem.Priority,
   122              Created:    newItem.Created.Unix(),
   123              RetryAfter: newItem.RetryAfter.Unix(),
   124          }
   125  
   126          query := `INSERT INTO `+ s.dbname +`.items(`+i.fields()+`) VALUES (`+i.placeholders()+`)`
   127  
   128          result, err := tx.Exec(query, i.values()...)
   129          if err != nil { return err }
   130  
   131          if n, ok := sqlp.RowsAffectedBetween(result, 1, 1); !ok {
   132              return fmt.Errorf("rows affected %d != 1", n)
   133          }
   134  
   135          itemID, err := result.LastInsertId()
   136          if err != nil { return err }
   137  
   138          err = s.msgSvc.Create(tx, ItemID(itemID), newItem.Message)
   139          if err != nil { return err }
   140  
   141          err = tx.Commit()
   142          if err != nil { return err }
   143  
   144          return nil
   145      }()
   146  
   147      if err != nil {
   148          return fmt.Errorf("error inserting new item: %+v", err)
   149      }
   150  
   151      return nil
   152  }
   153  
   154  func (s itemSqliteService) PeekItems(
   155      n int,
   156      minPriority int,
   157      due time.Time,
   158      excluding []ItemID,
   159  ) ([]Item, error) {
   160      items, err := func () ([]Item, error) {
   161          var i itemSqlite
   162  
   163          query := `
   164          SELECT
   165              `+ i.fields() +`
   166          FROM
   167              `+ s.dbname +`.items,
   168              `+ s.dbname +`.messages
   169          WHERE
   170              items.id = messages.id
   171              AND
   172                  items.priority >= ?
   173              AND
   174                  items.retryAfter <= ?
   175              AND
   176                  items.id NOT IN (`+ sqlp.RepeatString("?", len(excluding)) +`)
   177          ORDER BY
   178              items.priority DESC,
   179              items.retryAfter,
   180              items.created,
   181              items.id
   182          LIMIT ?`
   183  
   184          args := variadic.FlattenExcludingNils(minPriority, due.Unix(), excluding, n)
   185          rows, err := s.db.Query(query, args...)
   186          if err != nil { return nil, err }
   187          defer rows.Close()
   188  
   189          items := make([]Item, 0, n)
   190  
   191          for rows.Next() {
   192              if err := rows.Scan(i.pointers()...); err != nil {
   193                  return nil, err
   194              }
   195              items = append(items, i.toItem())
   196          }
   197          if err := rows.Err(); err != nil {
   198              return nil, err
   199          }
   200  
   201          return items, nil
   202      }()
   203  
   204      if err != nil {
   205          return nil, fmt.Errorf("error selecting items: %+v", err)
   206      }
   207  
   208      return items, nil
   209  }
   210  
   211  func (s itemSqliteService) RetryItem(id ItemID, priority int, due time.Time, attempt int) error {
   212      query := `
   213          UPDATE
   214              `+ s.dbname +`.items
   215          SET
   216              priority = ?,
   217              retryAfter = ?,
   218              attempts = ?,
   219          WHERE
   220              id = ?
   221      `
   222  
   223      result, err := s.db.Exec(query, priority, due.Unix(), attempt, id)
   224      if err != nil {
   225          return fmt.Errorf("error updating %s item %d: %+v", s.dbname, id, err)
   226      }
   227      if _, ok := sqlp.RowsAffectedBetween(result, 1, 1); !ok {
   228          return fmt.Errorf("error updating %s item %d: rows affected != 1", s.dbname, id)
   229      }
   230      return nil
   231  }
   232  
   233  func (s itemSqliteService) DeleteItem(id ItemID) error {
   234      query := `
   235          DELETE FROM
   236              `+ s.dbname +`.items
   237          WHERE
   238              id = ?
   239      `
   240  
   241      result, err := s.db.Exec(query, id)
   242      if err != nil {
   243          return fmt.Errorf("error deleting %s item %d: %+v", s.dbname, id, err)
   244      }
   245      if _, ok := sqlp.RowsAffectedBetween(result, 1, 1); !ok {
   246          return fmt.Errorf("error deleting %s item %d: rows affected != 1", s.dbname, id)
   247      }
   248      return nil
   249  }
   250  
   251  func (s itemSqliteService) Close() error {
   252      _, err := s.db.Exec(`DETACH DATABASE `+ s.dbname)
   253      if err != nil {
   254          return fmt.Errorf("error closing %s.item table: %+v", s.dbname, err)
   255      }
   256      return nil
   257  }
   258  
   259  func (s itemSqliteService) Delete() error {
   260      if !strings.HasPrefix(s.dbpath, ":memory:") {
   261          return os.Remove(s.dbpath)
   262      }
   263      return nil
   264  }
   265  

View as plain text