...
Tawesoft Logo

Source file src/tawesoft.co.uk/go/loader/loader.go

Documentation: src/tawesoft.co.uk/go/loader/loader.go

     1  package loader
     2  
     3  import (
     4      "fmt"
     5      "runtime"
     6      "time"
     7  )
     8  
     9  type defaultStrategyT struct {}
    10  var defaultStrategy *defaultStrategyT
    11  func (s *defaultStrategyT) Start(_ interface{}) bool { return true }
    12  func (s *defaultStrategyT) End(_ interface{}) {}
    13  
    14  // Progress holds the result of a Loader's Load or LoadAll methods and
    15  // represents progress in completing all Task items.
    16  type Progress struct {
    17      Completed int
    18      Remaining int
    19      Total     int
    20      Done bool
    21  }
    22  
    23  type workResult struct {
    24      value interface{}
    25      err error
    26  }
    27  
    28  // Loader is used to manage a graph of Task items to be completed synchronously
    29  // or concurrently with different types of work divided among a set of
    30  // Consumer objects.
    31  type Loader struct {
    32      namedResults map[string]workResult // todo index instead
    33      progress Progress
    34      consumers []consumer // all consumers
    35      seqConsumers []*seqConsumer // subset
    36      dag         dag
    37      resultsChan chan consumerWorkResult
    38  }
    39  
    40  // Returns a new Loader
    41  func New() *Loader {
    42  
    43      // Maximum number of tasks that can complete without blocking between
    44      // successive calls to Load. Assume as low as 16 Hz between calls (~64ms),
    45      // and all tasks taking 0.25ms, assume ideal fully linear multithreaded
    46      // speedup by adding logical CPUs, and assume no other bottlenecks.
    47      resultBufferSize := 64 * 4 * runtime.NumCPU();
    48  
    49      l := &Loader{
    50          namedResults: make(map[string]workResult),
    51          resultsChan:  make(chan consumerWorkResult, resultBufferSize),
    52          //results: make(chan consumerWorkResult), // unbuffered
    53      }
    54  
    55      // Default sequential consumer with zero-value ConsumerID
    56      l.NewConsumer(0, nil)
    57  
    58      return l
    59  }
    60  
    61  // Returns a named result from a Task where Keep is true and the Name is unique
    62  // across all Tasks where Keep is True.
    63  func (l *Loader) Result(name string) (workResult, bool) {
    64      value, exists := l.namedResults[name]
    65      return value, exists
    66  }
    67  
    68  func (l *Loader) MustResult(name string) interface{} {
    69      return l.namedResults[name].value
    70  }
    71  
    72  // NewConsumer creates a task consumer that performs the (possibly
    73  // asynchronous) completion of tasks at a given level of concurrency (e.g.
    74  // number of goroutines) and returns an opaque ID that uniquely identifies that
    75  // consumer with the active Loader.
    76  //
    77  // A concurrency of zero means that the consumer's tasks will be performed
    78  // sequentially on the same thread as the Loader's Load() or LoadAll() methods.
    79  //
    80  // The strategy argument allows control over temporarily delaying a task.
    81  // Strategy may be nil to always accept.
    82  //
    83  // The special ConsumerID of zero corresponds to a default builtin consumer
    84  // that has a concurrency of zero and a nil strategy.
    85  func (l *Loader) NewConsumer(concurrency int, strategy Strategy) ConsumerID {
    86      if strategy == nil {
    87          strategy = defaultStrategy
    88      }
    89  
    90      var c consumer
    91      id := ConsumerID(len(l.consumers))
    92  
    93      if concurrency > 0 {
    94          mtc := &mtConsumer{
    95              id:          id,
    96              concurrency: concurrency,
    97              strategy:    strategy,
    98              pending:     make([]int, 0),
    99              current:     -1,
   100  
   101              jobs:       make(chan int), // maybe buffer of size c.concurrency
   102          }
   103  
   104          go mtc.manager(&l.dag, l.resultsChan)
   105          c = mtc
   106      } else {
   107          seqc := &seqConsumer{
   108              id: id,
   109              strategy:    strategy,
   110              pending:     make([]int, 0),
   111              current:     -1,
   112          }
   113          c = seqc
   114          l.seqConsumers = append(l.seqConsumers, seqc)
   115      }
   116  
   117      l.consumers = append(l.consumers, c)
   118      return id
   119  }
   120  
   121  func (l *Loader) sendPendingTasksToConsumers() []consumerWorkResult {
   122      var results []consumerWorkResult
   123  
   124      for _, idx := range l.dag.pending {
   125          task := l.dag.nodes[idx]
   126          consumer := l.consumers[task.Consumer]
   127          rs := consumer.sendTask(idx, l.resultsChan)
   128          if rs != nil {
   129              results = append(results, rs...)
   130          }
   131      }
   132  
   133      // empty pending tasks
   134      l.dag.pending = l.dag.pending[:0]
   135  
   136      return results
   137  }
   138  
   139  // TODO callers need to check this for error
   140  func (l *Loader) registerResult(idx int, result interface{}, err error) error {
   141      l.progress.Remaining--
   142      l.progress.Completed++
   143  
   144      if err == nil {
   145          l.dag.registerResult(idx, result)
   146      } else {
   147          // if err is not fatal, this can be a nil input for a parent task
   148          l.dag.registerResult(idx, nil)
   149      }
   150  
   151      // stash the result if its a kept task result with a unique name
   152      // TODO store the index instead
   153      task := l.dag.nodes[idx]
   154      if task.Keep {
   155          name := task.Name
   156          _, alreadyExists := l.namedResults[name]
   157          if alreadyExists {
   158              return fmt.Errorf("kept named result %q already exists (but must be unique)", name)
   159          }
   160          l.namedResults[name] = workResult{
   161              value: result,
   162              err:   err,
   163          }
   164      }
   165  
   166      return nil
   167  }
   168  
   169  // Load completes as many loading tasks as possible within the time budget. If
   170  // idle while waiting for concurrent results, it may return early.
   171  //
   172  // See also the LoadAll() method.
   173  func (l *Loader) Load(budget time.Duration) (Progress, error) {
   174  
   175      start := time.Now()
   176  
   177      outer:
   178      for {
   179          earlyResults := l.sendPendingTasksToConsumers()
   180  
   181          // while there is sequential work to be done...
   182          for _, c := range l.seqConsumers {
   183              c.manage(&l.dag, &l.dag.nodes, l.registerResult)
   184          }
   185  
   186          if earlyResults != nil {
   187              for _, result := range earlyResults {
   188                  l.registerResult(result.idx, result.result, result.err)
   189              }
   190              earlyResults = nil
   191          }
   192  
   193          // pick up any completed jobs from any consumer, without blocking
   194          select {
   195              case result := <- l.resultsChan:
   196                  l.registerResult(result.idx, result.result, result.err)
   197              default:
   198                  // no progress possible yet, so break early
   199                  break outer
   200          }
   201  
   202          // continue unless budget exceeded...
   203          elapsed := time.Now().Sub(start)
   204          if elapsed >= budget { break }
   205      }
   206  
   207      l.progress.Done = (l.progress.Remaining == 0)
   208  
   209      return l.progress, nil
   210  }
   211  
   212  // LoadAll completes all loading tasks and blocks until finished
   213  func (l *Loader) LoadAll() (Progress, error) {
   214  
   215      for l.progress.Remaining > 0 {
   216          earlyResults := l.sendPendingTasksToConsumers()
   217  
   218          // while there is sequential work to be done...
   219          for _, c := range l.seqConsumers {
   220              c.manage(&l.dag, &l.dag.nodes, l.registerResult)
   221          }
   222  
   223          if earlyResults != nil {
   224              for _, result := range earlyResults {
   225                  l.registerResult(result.idx, result.result, result.err)
   226              }
   227              earlyResults = nil
   228          }
   229  
   230          // if there are still concurrent tasks left...
   231          if l.progress.Remaining == 0 { break }
   232  
   233          // then any further progress is blocked on a concurrent task
   234          select {
   235              case result := <- l.resultsChan:
   236                  l.registerResult(result.idx, result.result, result.err)
   237          }
   238      }
   239  
   240      l.progress.Done = true
   241      return l.progress, nil
   242  }
   243  
   244  // Close TODO
   245  func (l *Loader) Close() {
   246      // TODO
   247  }
   248  
   249  // identify returns a fully qualified string representation of a task's
   250  // position in the hierarchy of tasks e.g. for locating errors.
   251  //
   252  // For example, "parentTask1.parentTask2.<anonymous-task>.taskName"
   253  //
   254  // The string should be considered opaque but human-readable. Do not rely on
   255  // the format being fixed or parsable.
   256  func (l *Loader) identify(idx int) {
   257  
   258  }
   259  
   260  // Add adds a graph of Tasks to the Loader to later process with its Load() or
   261  // LoadAll() methods. Tasks may be added even during or after a Load() loop.
   262  //
   263  // Because Task names, even at the top level of the array, are scoped to this
   264  // function, Two Tasks across a Loader Add() boundary cannot refer to each
   265  // other by name. If this behaviour is desired, append to a Task array and send
   266  // the combined array in one call to Add.
   267  //
   268  // An error is generated if a named Task requirement is not in scope. In this
   269  // event, the state of the task dependency graph is undefined and no methods
   270  // on the Loader, other than Close, may be called.
   271  func (l *Loader) Add(tasks []Task) error {
   272      err := l.dag.add(tasks)
   273      l.progress.Remaining += len(l.dag.nodes)
   274      l.progress.Total     += len(l.dag.nodes)
   275      l.progress.Done = false
   276      return err
   277  }
   278  
   279  // Task is a (possibly recursively nested) unit of work that is to be performed
   280  // (possibly concurrently), subject to dependencies and limits.
   281  type Task struct {
   282      // Optional name of the task. Used to reference a task as a dependency of
   283      // another and to retrieve its result, if kept. Does not have to be unique
   284      // (it is scoped to its subtasks and successor siblings in one call to a
   285      // Loader Add method), unless it is kept (see the Keep field, below).
   286      Name string
   287  
   288      // Keep indicates that the task's Load() result will be available from
   289      // the Loader Result() method by the task Name. If Keep is true, the
   290      // task's Name must be globally unique across all Tasks kept by the loader.
   291      // If Keep is false, the task's name does not have to be unique, even if a
   292      // kept Task has the same name.
   293      Keep bool
   294  
   295      // Load performs the (possibly asynchronous) completion of the task e.g.
   296      // reading a file from disk, a unit of computation, etc.
   297      //
   298      // The results argument is the ordered results of the tasks in
   299      // RequiresNamed (if any) followed by the ordered results of the tasks in
   300      // RequiresDirect (if any).
   301      Load func(results ... interface{}) (interface{}, error)
   302  
   303      // Free performs the (possibly asynchronous) removal of a task's Load()
   304      // result e.g. releasing memory. May be nil.
   305      Free func(i interface{})
   306  
   307      // RequiresNamed is an array of names of tasks required to complete first
   308      // as a dependency of this task. May be nil.
   309      //
   310      // Note that a required named task must be defined before a task can depend
   311      // on it (e.g. by appearing earlier in the array passed to Loader Add()).
   312      RequiresNamed []string
   313  
   314      // RequiresDirect is an array of tasks required to complete first as a
   315      // dependency of this task. May be nil. These "subtasks" are in a new scope
   316      // for naming purposes.
   317      RequiresDirect []Task
   318  
   319      // Consumer performs the asynchronous completion of tasks at a given level
   320      // of concurrency. Use an ID returned by a Loader Consumer() method. May be
   321      // zero, in which case the task is completed in the same thread as the
   322      // caller Loader's Load() or LoadAll() methods.
   323      Consumer ConsumerID
   324  
   325      // Info returns some value for a task understood by the given Strategy.
   326      // May be nil. Must be a constant function i.e. always return the same
   327      // value for a given Task.
   328      Info func() interface{}
   329  }
   330  
   331  // info() calls a Task's Info() method, or if not defined acts as if the
   332  // Info() method does nothing except return nil.
   333  func (t *Task) info() interface{} {
   334      if t.Info != nil { return t.Info() }
   335      return nil
   336  }
   337  
   338  // NamedTask is used to reference another task by name as a subtask i.e. in
   339  // a Task's RequiresDirect instead of RequiresNamed.
   340  //
   341  // TODO NamedTasks should be simplified in the DAG to remove the node entirely.
   342  func NamedTask(name string) Task {
   343      return Task{
   344          Name: fmt.Sprintf("<loader NamedTask(%q) passthrough>", name),
   345          RequiresNamed: []string{name},
   346          Load: func(results ... interface{}) (interface{}, error) {
   347              if len(results) < 1 { return "<NamedTask TODO>", nil } // panic("oops") }
   348              return results[0], nil
   349          },
   350          Consumer: 0, // special maybe -1 for high priority
   351      }
   352  }
   353  
   354  

View as plain text