...
Source file
src/tawesoft.co.uk/go/loader/consumer.go
Documentation:
src/tawesoft.co.uk/go/loader/consumer.go
1 package loader
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// Strategy decides when a consumer may begin a task and is told when that
// task has finished.
//
// Within this file a consumer calls Start with the value returned by a
// task's info method before running the task, and calls End with the same
// kind of value once the task's Load call has returned.
type Strategy interface {
	// Start reports whether the task described by info may begin now.
	// When it returns false the consumer leaves the task pending and
	// tries the next pending task instead.
	Start(info interface{}) bool

	// End is called after a task previously accepted by Start has
	// finished loading.
	End(info interface{})
}
31
32
33
34
// ConsumerID is the integer identifier of a consumer. Both consumer
// implementations in this file store one in their id field.
type ConsumerID int
36
37 type consumer interface {
38 sendTask(idx int, resultChan chan consumerWorkResult) []consumerWorkResult
39 }
40
// consumerWorkResult carries the outcome of loading a single task.
type consumerWorkResult struct {
	// idx is the index of the task that was loaded.
	idx int
	// result is the value returned by the task's Load call.
	result interface{}
	// err is the error returned by the task's Load call, if any.
	err error
}
46
47
48 type mtConsumer struct {
49 id ConsumerID
50 concurrency int
51 strategy Strategy
52 pending []int
53 current int
54 jobs chan int
55 }
56
57 func (c *mtConsumer) sendTask(idx int, resultChan chan consumerWorkResult) []consumerWorkResult {
58 var results []consumerWorkResult
59
60
61
62
63
64 for {
65 select {
66 case c.jobs <- idx:
67
68
69 return results
70 case result := <- resultChan:
71 results = append(results, result)
72
73 }
74 }
75 }
76
77 func mtConsumerWorker(dag *dag, consumerID ConsumerID, workerID int, jobs <-chan int, results chan<- consumerWorkResult) {
78 for j := range jobs {
79 if j < 0 { break }
80
81
82
83 result, err := dag.nodes[j].Load(dag.inputs(j)...)
84
85
86 results <- consumerWorkResult{
87 idx: j,
88 result: result,
89 err: err,
90 }
91
92
93 }
94 }
95
96
97 func (c *mtConsumer) manager(dag *dag, results chan<- consumerWorkResult) {
98 workerJobs := make(chan int)
99 workerResults := make(chan consumerWorkResult)
100 availableWorkers := c.concurrency
101
102 for i := 1; i <= c.concurrency; i++ {
103 go mtConsumerWorker(dag, c.id, i, workerJobs, workerResults)
104 }
105
106 for {
107
108
109 select {
110 case idx := <- c.jobs:
111 if idx < 0 {
112
113 for i := 1; i <= c.concurrency; i++ {
114 workerJobs <- 0
115 }
116 return
117 }
118 c.pending = append(c.pending, idx)
119 case result := <- workerResults:
120
121 c.strategy.End(dag.nodes[result.idx].info())
122
123
124
125 availableWorkers++
126 results <- result
127 }
128
129
130
131 if c.current < 0 {
132 for i := 0; i < len(c.pending); i++ {
133 idx := c.pending[i]
134 task := dag.nodes[idx]
135 if c.strategy.Start(task.info()) {
136 c.current = idx
137 c.pending = intArrayDeleteElement(c.pending, i)
138 break
139 }
140 }
141 }
142
143
144 if (c.current >= 0) && (availableWorkers > 0) {
145
146
147
148 workerJobs <- c.current
149 c.current = -1
150 availableWorkers--
151 }
152 }
153 }
154
155
156 type seqConsumer struct {
157 id ConsumerID
158 strategy Strategy
159 pending []int
160 current int
161 }
162
163 func (c *seqConsumer) sendTask(idx int, _ chan consumerWorkResult) []consumerWorkResult {
164 c.pending = append(c.pending, idx)
165 return nil
166 }
167
168 func (c *seqConsumer) manage(dag *dag, tasksByIndex *[]*Task, registerResult func(int, interface{}, error) error) {
169 for len(c.pending) > 0 {
170
171
172 if c.current < 0 {
173 for i := 0; i < len(c.pending); i++ {
174 idx := c.pending[i]
175 task := (*tasksByIndex)[idx]
176 if c.strategy.Start(task.info()) {
177 c.current = idx
178 c.pending = intArrayDeleteElement(c.pending, i)
179 break
180 }
181 }
182 }
183
184
185 if c.current >= 0 {
186 task := (*tasksByIndex)[c.current]
187 result, err := task.Load(dag.inputs(c.current)...)
188 c.strategy.End((*tasksByIndex)[c.current].info())
189
190
191 registerResult(c.current, result, err)
192 c.current = -1
193 }
194 }
195 }
196
View as plain text