...
Tawesoft Logo

Source file src/tawesoft.co.uk/go/loader/consumer.go

Documentation: src/tawesoft.co.uk/go/loader/consumer.go

     1  package loader
     2  
// Strategy allows a task to be temporarily delayed based on the other tasks
// that are currently in progress e.g. see the examples folder for an
// implementation that avoids multiple concurrent connections to a single host.
//
// Note: do not share the same instance of a Strategy across two consumers
// without making it thread safe e.g. by use of mutexes.
//
// Note: the Strategy must give a constant answer for the same sequence of
// Start and End methods and arguments i.e. it must not depend on an external
// factor such as time or user input but must only depend on some innate
// property of the currently accepted tasks.
//
// Note: if not used carefully, a Strategy, or the interaction between two
// Strategies, may lead to deadlocks. This may be the case if there is any
// construction of a Task dependency graph (considering only those Tasks that
// may be delayed by a Strategy), or any subgraph thereof that can be formed by
// removing leaf nodes, where the Strategy's lower bound on the number of
// simultaneous tasks is less than or equal to the number of leaf nodes minus
// the number of vertex disjoint subgraphs of that graph.
type Strategy interface {
    // Start takes the result of a task Info() and returns true if the task is
    // accepted, or false if the task must be temporarily delayed.
    //
    // The consumers in this file keep a delayed task in their pending list
    // and offer it to Start again on a later pass, so Start may be called
    // several times for the same task before it is accepted.
    Start(info interface{}) bool

    // End takes the result of a task Info() and registers that the task has
    // completed processing.
    //
    // The consumers in this file call End after the task's Load has returned.
    End(info interface{})
}
    31  
// ConsumerID uniquely identifies a consumer in a given Loader. It is a plain
// integer handle.
//
// See the NewConsumer() method on the Loader type.
type ConsumerID int
    36  
// consumer is the internal interface through which a task is handed over to
// a consumer implementation (see mtConsumer and seqConsumer).
type consumer interface {
    // sendTask submits the task with index idx to the consumer.
    //
    // The returned slice holds any results that had to be received from
    // resultChan while the task was being submitted; it may be nil or empty.
    sendTask(idx int, resultChan chan consumerWorkResult) []consumerWorkResult
}
    40  
// consumerWorkResult carries the outcome of running a single task's Load
// function, tagged with the index of the task that produced it.
type consumerWorkResult struct {
    idx int // task index
    result interface{} // value returned by the task's Load() function
    err error // error returned by the task's Load() function
}
    46  
// mtConsumer is a concurrent implementation of the consumer interface. Tasks
// submitted through sendTask travel over the jobs channel to the manager
// method, which hands them to worker goroutines.
type mtConsumer struct {
    id ConsumerID
    concurrency int // number of worker goroutines started by manager
    strategy Strategy // consulted by manager before a pending task is started
    pending []int  // index of tasks to send to workers
    current int    // index of next task to immediately send to first available worker; negative when there is none
    jobs chan int  // index of task sent to workers; < 0 to kill workers and consumer
}
    56  
    57  func (c *mtConsumer) sendTask(idx int, resultChan chan consumerWorkResult) []consumerWorkResult {
    58      var results []consumerWorkResult
    59  
    60      // this thorny code is because a consumer might finish jobs even before
    61      // we're finished sending them, so we have to receive and queue at the
    62      // same time
    63  
    64      for {
    65          select {
    66              case c.jobs <- idx: // send
    67                 // OK
    68                 // fmt.Printf("send %d ok\n", idx)
    69                 return results
    70              case result := <- resultChan: // have to receive first
    71                  results = append(results, result)
    72                  // fmt.Printf("early recieve %d\n", result.idx)
    73          }
    74      }
    75  }
    76  
    77  func mtConsumerWorker(dag *dag, consumerID ConsumerID, workerID int, jobs <-chan int, results chan<- consumerWorkResult) {
    78      for j := range jobs {
    79          if j < 0 { break } // kill signal
    80  
    81          // fmt.Printf("worker (%d, %d) started job %d\n", consumerID, workerID, j)
    82  
    83          result, err := dag.nodes[j].Load(dag.inputs(j)...)
    84          //dag.results[j] = result
    85  
    86          results <- consumerWorkResult{
    87              idx: j,
    88              result: result,
    89              err: err,
    90          }
    91  
    92          // fmt.Printf("worker (%d, %d) returned job %d\n", consumerID, workerID, j)
    93      }
    94  }
    95  
    96  // manager sends and collects work and results from worker routines
    97  func (c *mtConsumer) manager(dag *dag, results chan<- consumerWorkResult) {
    98      workerJobs    := make(chan int) // maybe buffer of size c.concurrency
    99      workerResults := make(chan consumerWorkResult)
   100      availableWorkers := c.concurrency
   101  
   102      for i := 1; i <= c.concurrency; i++ {
   103          go mtConsumerWorker(dag, c.id, i, workerJobs, workerResults)
   104      }
   105  
   106      for {
   107  
   108          // block until a job or a result appears
   109          select {
   110              case idx := <- c.jobs:
   111                  if idx < 0 {
   112                      // kill
   113                      for i := 1; i <= c.concurrency; i++ {
   114                          workerJobs <- 0
   115                      }
   116                      return
   117                  }
   118                  c.pending = append(c.pending, idx)
   119              case result := <- workerResults:
   120                  // notify strategy we're done with this task
   121                  c.strategy.End(dag.nodes[result.idx].info())
   122  
   123                  // pass result back up to loader
   124                  // (we have to capture it first in order to wake)
   125                  availableWorkers++
   126                  results <- result
   127          }
   128  
   129          // nothing immediately queued, find the first acceptable pending task
   130          // according to strategy that is to become immediately queued
   131          if c.current < 0 {
   132              for i := 0; i < len(c.pending); i++ {
   133                  idx := c.pending[i]
   134                  task := dag.nodes[idx]
   135                  if c.strategy.Start(task.info()) {
   136                      c.current = idx
   137                      c.pending = intArrayDeleteElement(c.pending, i)
   138                      break
   139                  }
   140              }
   141          }
   142  
   143          // attempt to send immediately queued task, if any
   144          if (c.current >= 0) && (availableWorkers > 0) {
   145              // a worker is guaranteed to be ready (although in practice may be
   146              // a few nanoseconds away due to loop overhead), so block waiting
   147              // for it rather than use a non-blocking select
   148              workerJobs <- c.current
   149              c.current = -1
   150              availableWorkers--
   151          }
   152      }
   153  }
   154  
// seqConsumer is a non-concurrent / sequential implementation of the consumer
// interface. Tasks submitted through sendTask are only queued; they are run
// one at a time when manage is called.
type seqConsumer struct {
    id ConsumerID
    strategy Strategy // consulted by manage before a pending task is run
    pending []int  // index of tasks queued by sendTask and not yet run
    current int    // index of the task that manage is about to run; negative when there is none
}
   162  
   163  func (c *seqConsumer) sendTask(idx int, _ chan consumerWorkResult) []consumerWorkResult {
   164      c.pending = append(c.pending, idx)
   165      return nil
   166  }
   167  
   168  func (c *seqConsumer) manage(dag *dag, tasksByIndex *[]*Task, registerResult func(int, interface{}, error) error) {
   169      for len(c.pending) > 0 {
   170          // nothing immediately queued, find the first acceptable pending task
   171          // according to strategy that is to become immediately queued
   172          if c.current < 0 {
   173              for i := 0; i < len(c.pending); i++ {
   174                  idx := c.pending[i]
   175                  task := (*tasksByIndex)[idx]
   176                  if c.strategy.Start(task.info()) {
   177                      c.current = idx
   178                      c.pending = intArrayDeleteElement(c.pending, i)
   179                      break
   180                  }
   181              }
   182          }
   183  
   184          // run immediately queued task, if any
   185          if c.current >= 0 {
   186              task := (*tasksByIndex)[c.current]
   187              result, err := task.Load(dag.inputs(c.current)...)
   188              c.strategy.End((*tasksByIndex)[c.current].info())
   189  
   190              // dag.results[c.current] = result
   191              registerResult(c.current, result, err)
   192              c.current = -1
   193          }
   194      }
   195  }
   196  

View as plain text