Patterns and Hints for Concurrency in Go -- Work scheduler
use a channel as a synchronized queue
Scheduling m tasks on n machines (servers), where each task may be different:

```go
func Schedule(servers []string, numTask int,
	call func(srv string, task int))
```

We can use a channel instead of a stack or queue, because a channel is a good synchronized queue.
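Here is a small sketch of what "good synchronized queue" means in practice. The helper names `fifoOrder` and `drainConcurrently` are made up for illustration. Values come out of a buffered channel in the order they went in. When several goroutines receive from the same channel, each value is delivered to exactly one of them, and the queue itself needs no extra locking.

```go
package main

import "fmt"

// fifoOrder enqueues items on a buffered channel and dequeues them again,
// showing that a channel hands values back in FIFO order.
func fifoOrder(items []string) []string {
	q := make(chan string, len(items)) // capacity = queue size
	for _, it := range items {
		q <- it // enqueue; blocks only if the buffer is full
	}
	close(q) // nothing more will be enqueued
	var out []string
	for it := range q { // dequeue until the channel is empty and closed
		out = append(out, it)
	}
	return out
}

// drainConcurrently lets several goroutines dequeue from one channel.
// Each value is received by exactly one goroutine, so the per-worker
// counts add up to n without any lock around the queue.
func drainConcurrently(n, workers int) int {
	q := make(chan int, n)
	for i := 0; i < n; i++ {
		q <- i
	}
	close(q)
	counts := make(chan int)
	for w := 0; w < workers; w++ {
		go func() {
			c := 0
			for range q {
				c++
			}
			counts <- c
		}()
	}
	total := 0
	for w := 0; w < workers; w++ {
		total += <-counts
	}
	return total
}

func main() {
	fmt.Println(fifoOrder([]string{"a", "b", "c"})) // prints [a b c]
	fmt.Println(drainConcurrently(1000, 8))         // prints 1000
}
```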
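```go
func Schedule(servers []string, numTask int, call func(srv string, task int)) {
	// idle holds the servers that are not currently running a task.
	idle := make(chan string, len(servers))
	for _, srv := range servers {
		idle <- srv
	}
	for task := 0; task < numTask; task++ {
		task := task // give each goroutine its own copy (needed before Go 1.22)
		go func() {
			srv := <-idle // wait for a free server
			call(srv, task)
			idle <- srv // hand the server back
		}()
	}
}
```

- The for loop starts all numTask goroutines at once, and each one then waits for an idle server. With only a few servers this wastes resources. We can move the code that takes an idle server out of the goroutine, so that a goroutine is only created once a server is free.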
```go
for task := 0; task < numTask; task++ {
	task := task
	srv := <-idle // block here until a server is free
	go func() {
		call(srv, task)
		idle <- srv
	}()
}
```
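Waiting for the tasks to finish
Schedule should not return while tasks are still running. Every running task holds one server, so once all len(servers) servers are back in idle, every task has finished:

```go
for i := 0; i < len(servers); i++ {
	<-idle
}
```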
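Putting the idle-server pool and the wait loop together gives a runnable sketch like the one below. The `runDemo` harness, the server names and the 1 ms sleep are assumptions made for illustration. The harness checks that every task runs exactly once and that no more tasks run at the same time than there are servers.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Schedule runs tasks 0..numTask-1 on the given servers, at most one task
// per server at a time, and returns only after every task has finished.
func Schedule(servers []string, numTask int, call func(srv string, task int)) {
	// The buffered channel is the pool of idle servers.
	idle := make(chan string, len(servers))
	for _, srv := range servers {
		idle <- srv
	}
	for task := 0; task < numTask; task++ {
		task := task  // per-iteration copy (needed before Go 1.22)
		srv := <-idle // block until some server is free
		go func() {
			call(srv, task)
			idle <- srv // return the server to the pool
		}()
	}
	// All servers back in the pool means no task is still running.
	for i := 0; i < len(servers); i++ {
		<-idle
	}
}

// runDemo is a hypothetical harness: it records how often each task ran
// and the peak number of tasks running at once.
func runDemo(numTask int, servers []string) (ran map[int]int, peak int) {
	var mu sync.Mutex
	ran = make(map[int]int)
	running := 0
	Schedule(servers, numTask, func(srv string, task int) {
		mu.Lock()
		running++
		if running > peak {
			peak = running
		}
		ran[task]++
		mu.Unlock()

		time.Sleep(time.Millisecond) // pretend to do some work

		mu.Lock()
		running--
		mu.Unlock()
	})
	return ran, peak
}

func main() {
	ran, peak := runDemo(20, []string{"srv-0", "srv-1", "srv-2"})
	fmt.Println(len(ran), peak <= 3) // prints 20 true
}
```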
one goroutine per server
At any given time, only one goroutine is making requests to a particular server. So instead of having one goroutine per task, we can have one goroutine per server, since there will probably be fewer servers than tasks.
```go
func Schedule(servers []string, numTask int, call func(srv string, task int)) {
	work := make(chan int)  // task numbers waiting to be run
	done := make(chan bool) // one message per server once it runs out of work
	runTasks := func(srv string) {
		for task := range work {
			call(srv, task)
		}
		done <- true
	}
	for _, srv := range servers {
		go runTasks(srv)
	}
	for task := 0; task < numTask; task++ {
		work <- task
	}
	// Close work before waiting: runTasks only sends on done after its
	// range loop ends, and that loop only ends once work is closed.
	close(work)
	for i := 0; i < len(servers); i++ {
		<-done
	}
}
```
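Below is a runnable sketch of the per-server version. `countTasks` and the server names are hypothetical. The order of the last two steps matters. Each worker sends on done only after its `range work` loop ends, and that loop ends only when work is closed. So `close(work)` has to come before the loop that waits on done.

```go
package main

import (
	"fmt"
	"sync"
)

// Schedule starts one worker goroutine per server; the workers pull task
// numbers from the shared work channel until it is closed.
func Schedule(servers []string, numTask int, call func(srv string, task int)) {
	work := make(chan int)
	done := make(chan bool)
	runTasks := func(srv string) {
		for task := range work { // ends once work is closed and empty
			call(srv, task)
		}
		done <- true // this server has run out of work
	}
	for _, srv := range servers {
		go runTasks(srv)
	}
	for task := 0; task < numTask; task++ {
		work <- task
	}
	close(work) // before waiting, or no worker ever reaches done <- true
	for i := 0; i < len(servers); i++ {
		<-done
	}
}

// countTasks is a hypothetical harness that records how often each task ran
// and how many tasks each server handled.
func countTasks(servers []string, numTask int) (perTask map[int]int, perServer map[string]int) {
	var mu sync.Mutex
	perTask = map[int]int{}
	perServer = map[string]int{}
	Schedule(servers, numTask, func(srv string, task int) {
		mu.Lock()
		perTask[task]++
		perServer[srv]++
		mu.Unlock()
	})
	return perTask, perServer
}

func main() {
	perTask, perServer := countTasks([]string{"srv-0", "srv-1", "srv-2"}, 30)
	fmt.Println(len(perTask)) // prints 30
	fmt.Println(perServer)    // the split across servers varies from run to run
}
```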
get new servers at any given time
In some cases, new servers can show up at any time. We can pass in a channel of servers instead of a slice. Then we move the loop that starts runTasks into its own goroutine, so we can keep accepting new servers while we're sending tasks to the existing ones.
```go
func Schedule(servers chan string, numTask int,
	call func(srv string, task int)) {
	// ...
	go func() {
		for srv := range servers {
			go runTasks(srv)
		}
	}()
	// ...
}
```

Now we don't know when all the servers are done, because we don't know how many servers there are. Counting the servers as they come in is a little tricky. Instead, we count the number of tasks that have finished.
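```go
runTasks := func(srv string) {
	for task := range work {
		call(srv, task)
		done <- true // report each finished task
	}
}
for i := 0; i < numTask; i++ {
	<-done
}
```

But this runs into a deadlock. When a server finishes a task, it executes `done <- true`. Nothing is receiving from done yet, because Schedule is still in the `work <- task` loop. work is unbuffered and every worker is blocked on done, so that loop never finishes. The fix is simple: move the loop into its own goroutine.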
```go
go func() {
	for task := 0; task < numTask; task++ {
		work <- task
	}
	close(work)
}()
for i := 0; i < numTask; i++ {
	<-done
}
```

- But the simplest possible fix is to make the work channel big enough that you never run out of space:
```go
work := make(chan int, numTask)
```
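Assembled into one function, the dynamic-server version might look like the sketch below. It uses the buffered work channel and counts finished tasks instead of servers. The hypothetical `countTasks` harness assumes the caller closes servers after sending the last one, which lets the watcher goroutine exit.

```go
package main

import (
	"fmt"
	"sync"
)

// Schedule hands out tasks while accepting new servers from a channel, and
// detects completion by counting finished tasks rather than servers.
func Schedule(servers chan string, numTask int, call func(srv string, task int)) {
	work := make(chan int, numTask) // big enough that sends never block
	done := make(chan bool)
	runTasks := func(srv string) {
		for task := range work {
			call(srv, task)
			done <- true // one message per finished task
		}
	}
	go func() {
		for srv := range servers { // start a worker for each new server
			go runTasks(srv)
		}
	}()
	for task := 0; task < numTask; task++ {
		work <- task
	}
	for i := 0; i < numTask; i++ {
		<-done
	}
	close(work) // let the workers' range loops finish
}

// countTasks is a hypothetical harness: it feeds numServer servers in from
// another goroutine, closes servers afterwards, and records how many times
// each task ran.
func countTasks(numServer, numTask int) map[int]int {
	servers := make(chan string)
	go func() {
		for i := 0; i < numServer; i++ {
			servers <- fmt.Sprintf("srv-%d", i)
		}
		close(servers) // no more servers: ends the watcher loop
	}()
	var mu sync.Mutex
	ran := map[int]int{}
	Schedule(servers, numTask, func(srv string, task int) {
		mu.Lock()
		ran[task]++
		mu.Unlock()
	})
	return ran
}

func main() {
	fmt.Println(len(countTasks(5, 100))) // prints 100
}
```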
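failed tasks and shutdown
If call returns false, the task failed, so we just put it back on the work channel. With the buffered work channel (capacity numTask), this send never blocks. The task being retried was just taken out of the channel, so there is always room for it.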
```go
func Schedule(servers chan string, numTask int,
	call func(srv string, task int) bool) {
	// ...
	runTasks := func(srv string) {
		for task := range work {
			if call(srv, task) {
				done <- true
			} else {
				work <- task // failed: re-queue it for another attempt
			}
		}
	}
	// ...
}
```

The last thing we have to do is shut down the loop that's watching for new servers. Alternatively, the caller can close(servers) once Schedule is done.
```go
exit := make(chan struct{})
go func() {
	for {
		select {
		case srv := <-servers:
			go runTasks(srv)
		case <-exit:
			log.Println("loop runTask finish")
			return
		}
	}
}()
// ... send the tasks and wait for numTask done messages, as before ...
close(work)
exit <- struct{}{}
```
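There is one caveat if both shutdown options are combined. Once servers is closed, `<-servers` succeeds immediately with the zero value "" every time, so the select loop above would keep starting workers for an empty server name. The sketch below checks the `ok` flag and treats a closed servers channel as a stop signal. `watchServers` and the demo functions are hypothetical names. `start` is called synchronously only so the demo can record what it saw; in Schedule it would be `go runTasks(srv)`. The demo also closes exit instead of sending on it. A send would block forever if the loop had already returned because servers was closed.

```go
package main

import "fmt"

// watchServers calls start for every server received, and returns when
// either exit is closed or servers is closed by the caller.
func watchServers(servers <-chan string, exit <-chan struct{}, start func(string)) {
	for {
		select {
		case srv, ok := <-servers:
			if !ok {
				return // servers closed: no more servers will arrive
			}
			start(srv) // in Schedule this would be: go runTasks(srv)
		case <-exit:
			return
		}
	}
}

// demoClosedServers stops the loop by closing servers.
func demoClosedServers() []string {
	servers := make(chan string)
	exit := make(chan struct{})
	finished := make(chan struct{})
	var seen []string
	go func() {
		watchServers(servers, exit, func(srv string) { seen = append(seen, srv) })
		close(finished)
	}()
	servers <- "srv-0"
	servers <- "srv-1"
	close(servers)
	<-finished  // the loop has returned on its own
	close(exit) // still safe: closing never blocks, unlike exit <- struct{}{}
	return seen
}

// demoExit stops the loop through exit while servers stays open.
func demoExit() bool {
	servers := make(chan string)
	exit := make(chan struct{})
	finished := make(chan struct{})
	go func() {
		watchServers(servers, exit, func(string) {})
		close(finished)
	}()
	close(exit)
	<-finished
	return true
}

func main() {
	fmt.Println(demoClosedServers(), demoExit()) // prints [srv-0 srv-1] true
}
```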
Running example
```go
package main

import (
	"fmt"
	"log"
)

// Schedule runs tasks 0..numTask-1 on servers that arrive over the servers
// channel, retrying any task for which call returns false.
func Schedule(servers chan string, numTask int,
	call func(srv string, task int) bool) {
	work := make(chan int, numTask) // room for every task, so sends never block
	done := make(chan bool)         // one message per successful task
	exit := make(chan struct{})     // tells the server-watching loop to stop

	runTasks := func(srv string) {
		for task := range work {
			if call(srv, task) {
				done <- true
			} else {
				work <- task // failed: put it back for a retry
			}
		}
	}

	// Start a worker for every server that shows up, until told to exit.
	go func() {
		for {
			select {
			case srv := <-servers:
				go runTasks(srv)
			case <-exit:
				log.Println("loop runTask finish")
				return
			}
		}
	}()

	for task := 0; task < numTask; task++ {
		work <- task
	}
	// Wait until every task has succeeded.
	for i := 0; i < numTask; i++ {
		<-done
	}
	close(work)        // the workers' range loops end
	exit <- struct{}{} // stop the server-watching loop
}

func main() {
	log.SetFlags(log.LstdFlags | log.Lmicroseconds)
	numTask := 1000
	numServer := 10
	servers := make(chan string)
	call := func(srv string, task int) bool {
		log.Println(srv, task)
		return true
	}
	// Servers arrive asynchronously while Schedule is running.
	go func() {
		for i := 0; i < numServer; i++ {
			srv := fmt.Sprintf("srv-%d", i)
			servers <- srv
		}
	}()
	Schedule(servers, numTask, call)
}
```
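The `call` in this example always returns true, so the retry branch never runs. The sketch below exercises it with a hypothetical `runFlaky` harness, in which every task whose number is divisible by 3 fails on its first attempt. The sketch makes two small changes to the example. First, servers is buffered, so no sender goroutine is left blocked after Schedule returns. Second, Schedule closes exit instead of sending on it, and the log line is dropped.

```go
package main

import (
	"fmt"
	"sync"
)

// Schedule follows the full example above, except that it closes exit
// instead of sending on it and does not log.
func Schedule(servers chan string, numTask int,
	call func(srv string, task int) bool) {
	work := make(chan int, numTask) // room for every task, so re-queueing never blocks
	done := make(chan bool)
	exit := make(chan struct{})
	runTasks := func(srv string) {
		for task := range work {
			if call(srv, task) {
				done <- true
			} else {
				work <- task // failed: put it back for another attempt
			}
		}
	}
	go func() {
		for {
			select {
			case srv := <-servers:
				go runTasks(srv)
			case <-exit:
				return
			}
		}
	}()
	for task := 0; task < numTask; task++ {
		work <- task
	}
	for i := 0; i < numTask; i++ {
		<-done
	}
	close(work)
	close(exit)
}

// runFlaky is a hypothetical harness: tasks divisible by 3 fail on their
// first attempt and succeed on the retry. It returns attempts per task.
func runFlaky(numServer, numTask int) map[int]int {
	servers := make(chan string, numServer) // buffered: sends never block
	for i := 0; i < numServer; i++ {
		servers <- fmt.Sprintf("srv-%d", i)
	}
	var mu sync.Mutex
	attempts := map[int]int{}
	Schedule(servers, numTask, func(srv string, task int) bool {
		mu.Lock()
		defer mu.Unlock()
		attempts[task]++
		return task%3 != 0 || attempts[task] > 1
	})
	return attempts
}

func main() {
	attempts := runFlaky(4, 30)
	retried := 0
	for _, n := range attempts {
		if n > 1 {
			retried++
		}
	}
	fmt.Println(len(attempts), retried) // prints 30 10
}
```

Each task divisible by 3 should take exactly two attempts, and every other task one. This shows a failed task being picked up again by whichever server is free next.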