Source file
src/tawesoft.co.uk/go/loader/loader.go
Documentation:
src/tawesoft.co.uk/go/loader/loader.go
1 package loader
2
3 import (
4 "fmt"
5 "runtime"
6 "time"
7 )
8
// defaultStrategyT is the strategy used when a consumer is created without
// one: it accepts every task and does nothing when a task ends.
type defaultStrategyT struct{}

// defaultStrategy is the shared instance of the default strategy. It is a nil
// pointer, which is safe to call because neither method touches its receiver.
var defaultStrategy *defaultStrategyT

// Start always reports that the task may start.
func (*defaultStrategyT) Start(interface{}) bool {
	return true
}

// End is a no-op.
func (*defaultStrategyT) End(interface{}) {
}
13
14
15
// Progress is a snapshot of a Loader's task counters, as returned by Load and
// LoadAll.
type Progress struct {
	// Completed counts the results that have been registered so far.
	Completed int
	// Remaining counts the tasks that have been added but whose results have
	// not been registered yet.
	Remaining int
	// Total counts every task that has been added.
	Total int
	// Done is set by Load when Remaining is zero, set to true by LoadAll, and
	// reset to false by Add.
	Done bool
}
22
// workResult pairs the value produced by a task with the error it ended
// with, as stored for kept named results.
type workResult struct {
	value interface{} // value returned by the task
	err   error       // error returned by the task, nil on success
}
27
28
29
30
// Loader schedules tasks onto consumers and collects their results.
// Construct one with New.
type Loader struct {
	// namedResults holds the results of tasks whose Keep flag is set, keyed
	// by task name.
	namedResults map[string]workResult
	// progress holds the counters reported by Load and LoadAll.
	progress Progress
	// consumers holds every consumer; a ConsumerID is an index into it.
	consumers []consumer
	// seqConsumers holds the sequential (concurrency <= 0) consumers, which
	// Load and LoadAll drive directly by calling their manage method.
	seqConsumers []*seqConsumer
	// dag holds the task nodes and the list of nodes pending dispatch.
	dag dag
	// resultsChan is the buffered channel from which Load and LoadAll receive
	// results; it is handed to consumers when tasks are sent to them.
	resultsChan chan consumerWorkResult
}
39
40
41 func New() *Loader {
42
43
44
45
46
47 resultBufferSize := 64 * 4 * runtime.NumCPU();
48
49 l := &Loader{
50 namedResults: make(map[string]workResult),
51 resultsChan: make(chan consumerWorkResult, resultBufferSize),
52
53 }
54
55
56 l.NewConsumer(0, nil)
57
58 return l
59 }
60
61
62
63 func (l *Loader) Result(name string) (workResult, bool) {
64 value, exists := l.namedResults[name]
65 return value, exists
66 }
67
68 func (l *Loader) MustResult(name string) interface{} {
69 return l.namedResults[name].value
70 }
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85 func (l *Loader) NewConsumer(concurrency int, strategy Strategy) ConsumerID {
86 if strategy == nil {
87 strategy = defaultStrategy
88 }
89
90 var c consumer
91 id := ConsumerID(len(l.consumers))
92
93 if concurrency > 0 {
94 mtc := &mtConsumer{
95 id: id,
96 concurrency: concurrency,
97 strategy: strategy,
98 pending: make([]int, 0),
99 current: -1,
100
101 jobs: make(chan int),
102 }
103
104 go mtc.manager(&l.dag, l.resultsChan)
105 c = mtc
106 } else {
107 seqc := &seqConsumer{
108 id: id,
109 strategy: strategy,
110 pending: make([]int, 0),
111 current: -1,
112 }
113 c = seqc
114 l.seqConsumers = append(l.seqConsumers, seqc)
115 }
116
117 l.consumers = append(l.consumers, c)
118 return id
119 }
120
121 func (l *Loader) sendPendingTasksToConsumers() []consumerWorkResult {
122 var results []consumerWorkResult
123
124 for _, idx := range l.dag.pending {
125 task := l.dag.nodes[idx]
126 consumer := l.consumers[task.Consumer]
127 rs := consumer.sendTask(idx, l.resultsChan)
128 if rs != nil {
129 results = append(results, rs...)
130 }
131 }
132
133
134 l.dag.pending = l.dag.pending[:0]
135
136 return results
137 }
138
139
140 func (l *Loader) registerResult(idx int, result interface{}, err error) error {
141 l.progress.Remaining--
142 l.progress.Completed++
143
144 if err == nil {
145 l.dag.registerResult(idx, result)
146 } else {
147
148 l.dag.registerResult(idx, nil)
149 }
150
151
152
153 task := l.dag.nodes[idx]
154 if task.Keep {
155 name := task.Name
156 _, alreadyExists := l.namedResults[name]
157 if alreadyExists {
158 return fmt.Errorf("kept named result %q already exists (but must be unique)", name)
159 }
160 l.namedResults[name] = workResult{
161 value: result,
162 err: err,
163 }
164 }
165
166 return nil
167 }
168
169
170
171
172
173 func (l *Loader) Load(budget time.Duration) (Progress, error) {
174
175 start := time.Now()
176
177 outer:
178 for {
179 earlyResults := l.sendPendingTasksToConsumers()
180
181
182 for _, c := range l.seqConsumers {
183 c.manage(&l.dag, &l.dag.nodes, l.registerResult)
184 }
185
186 if earlyResults != nil {
187 for _, result := range earlyResults {
188 l.registerResult(result.idx, result.result, result.err)
189 }
190 earlyResults = nil
191 }
192
193
194 select {
195 case result := <- l.resultsChan:
196 l.registerResult(result.idx, result.result, result.err)
197 default:
198
199 break outer
200 }
201
202
203 elapsed := time.Now().Sub(start)
204 if elapsed >= budget { break }
205 }
206
207 l.progress.Done = (l.progress.Remaining == 0)
208
209 return l.progress, nil
210 }
211
212
213 func (l *Loader) LoadAll() (Progress, error) {
214
215 for l.progress.Remaining > 0 {
216 earlyResults := l.sendPendingTasksToConsumers()
217
218
219 for _, c := range l.seqConsumers {
220 c.manage(&l.dag, &l.dag.nodes, l.registerResult)
221 }
222
223 if earlyResults != nil {
224 for _, result := range earlyResults {
225 l.registerResult(result.idx, result.result, result.err)
226 }
227 earlyResults = nil
228 }
229
230
231 if l.progress.Remaining == 0 { break }
232
233
234 select {
235 case result := <- l.resultsChan:
236 l.registerResult(result.idx, result.result, result.err)
237 }
238 }
239
240 l.progress.Done = true
241 return l.progress, nil
242 }
243
244
// Close currently does nothing.
//
// NOTE(review): NewConsumer starts a manager goroutine for each concurrent
// consumer and nothing here stops them — presumably still to be implemented.
func (l *Loader) Close() {

}
248
249
250
251
252
253
254
255
// identify is an unimplemented placeholder: it accepts a node index and does
// nothing with it.
func (l *Loader) identify(idx int) {

}
259
260
261
262
263
264
265
266
267
268
269
270
271 func (l *Loader) Add(tasks []Task) error {
272 err := l.dag.add(tasks)
273 l.progress.Remaining += len(l.dag.nodes)
274 l.progress.Total += len(l.dag.nodes)
275 l.progress.Done = false
276 return err
277 }
278
279
280
// Task describes one unit of loading work that can be submitted to a Loader
// with Add.
type Task struct {
	// Name identifies the task. When Keep is true the Loader stores the
	// task's result under this name, and the name must be unique among kept
	// results.
	Name string

	// Keep, when true, makes the Loader retain the task's result so that it
	// can be looked up by Name with Result or MustResult.
	Keep bool

	// Load performs the work and returns the task's result or an error.
	// NOTE(review): results presumably carries the results of the tasks this
	// one requires — confirm the exact contents and order against the dag
	// implementation.
	Load func(results ...interface{}) (interface{}, error)

	// Free is a hook that takes a single value.
	// NOTE(review): presumably called with this task's result once it is no
	// longer needed — not visible in this file; confirm.
	Free func(i interface{})

	// RequiresNamed names other tasks that this task depends on.
	RequiresNamed []string

	// RequiresDirect lists tasks that this task depends on, given inline
	// rather than by name.
	RequiresDirect []Task

	// Consumer selects the consumer that runs the task; the Loader uses it as
	// an index into its consumer list. The zero value selects the sequential
	// consumer that New registers.
	Consumer ConsumerID

	// Info is optional. When set, its return value is what the task's info
	// method reports; otherwise that method reports nil.
	// NOTE(review): presumably the value handed to a consumer's Strategy —
	// confirm.
	Info func() interface{}
}
330
331
332
333 func (t *Task) info() interface{} {
334 if t.Info != nil { return t.Info() }
335 return nil
336 }
337
338
339
340
341
342 func NamedTask(name string) Task {
343 return Task{
344 Name: fmt.Sprintf("<loader NamedTask(%q) passthrough>", name),
345 RequiresNamed: []string{name},
346 Load: func(results ... interface{}) (interface{}, error) {
347 if len(results) < 1 { return "<NamedTask TODO>", nil }
348 return results[0], nil
349 },
350 Consumer: 0,
351 }
352 }
353
354
View as plain text