use channel as synchronized queue

  • Scheduling n machines to execute m tasks, and each task may be different

    1
    2
    func Schedule(servers []string, numTask int, 
    call func(srv string, task int))
  • We can use a channel instead of a stack or a queue, because a channel is a good synchronized queue.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    // Schedule distributes numTask tasks across the given servers, using a
    // buffered channel of server names as a synchronized queue of idle servers.
    func Schedule(servers []string, numTask int, call func(srv string, task int)){
    // Seed the pool: every server starts out idle.
    idle := make(chan string, len(servers))
    for _, srv := range servers{
    idle <- srv
    }

    // One goroutine per task: each blocks until it can claim an idle server,
    // runs the call, then returns the server to the pool.
    // NOTE(review): all numTask goroutines start immediately, which is
    // wasteful when there are far fewer servers than tasks (fixed below).
    for task := 0; task < numTask; task++{
    task := task // per-iteration copy so each closure sees its own task (pre-Go 1.22)
    go func() {
    srv := <-idle
    call(srv, task)
    idle <- srv
    }()
    }
    }
    • for循环会一下启动numTask个goroutine,然后等待获取空闲的server,如果只有几个server,那么性能会很浪费,所以我们可以把获取空闲的server的代码放在goroutine外面,只有在有空闲的server的时候才创建goroutine。
      1
      2
      3
      4
      5
      6
      7
      8
      // Claim an idle server *before* starting the goroutine, so at most
      // len(servers) goroutines exist at any moment instead of numTask.
      for task := 0; task < numTask; task++{
      task := task // per-iteration copy for the closure (pre-Go 1.22)
      srv := <-idle
      go func() {
      call(srv, task)
      idle <- srv
      }()
      }
  • Waiting for the task to finish

    1
    2
    3
    // Wait for completion: once every server is back in the idle pool,
    // all outstanding tasks must have finished.
    for i := 0; i < len(servers); i++{
    <-idle
    }

one goroutine per server

  • There’s only one goroutine that makes requests of a server at a particular time. So instead of having one goroutine per task maybe we should have one goroutine per server, because there are probably going to be fewer servers than tasks.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    // Schedule runs numTask tasks using one worker goroutine per server,
    // which is usually far cheaper than one goroutine per task. Tasks are
    // handed out over the unbuffered work channel; each worker signals on
    // done once the work channel has been closed and drained.
    func Schedule(servers []string, numTask int, call func(srv string, task int)){
    	work := make(chan int)
    	done := make(chan bool)

    	// Each worker pulls tasks until work is closed, then reports done.
    	runTasks := func(srv string) {
    		for task := range work {
    			call(srv, task)
    		}
    		done <- true
    	}

    	for _, srv := range servers {
    		go runTasks(srv)
    	}

    	for task := 0; task < numTask; task++ {
    		work <- task
    	}

    	// BUG FIX: work must be closed *before* waiting on done. A worker
    	// only sends on done after its `range work` loop ends, and that loop
    	// only ends when work is closed — waiting first deadlocks forever.
    	close(work)

    	for i := 0; i < len(servers); i++ {
    		<-done
    	}
    }

get new servers at any given time

  • In some cases, you might get new servers at any given time. So we can pass server channel instead of slice and put that loop into its own goroutine so that while we’re sending tasks to servers we can still accept new servers.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    // Schedule now takes servers as a channel instead of a slice, so new
    // servers can join at any time while tasks are being dispatched.
    func Schedule(servers chan string, numTask int, 
    call func(srv string, task int)){
    //...
    // Accept servers in a separate goroutine: each arrival gets its own
    // worker, and the task dispatch below is never blocked by arrivals.
    go func() {
    for srv := range servers {
    go runTasks(srv)
    }
    }()
    //...
    }
  • Now we don’t know when all the servers are done because we don’t know how many servers there are. Instead of counting servers as they come in — which is a little tricky — we can count the number of tasks that have finished.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    // With an unknown number of servers, completion is tracked per task:
    // every finished call sends one signal on done.
    runTasks := func(srv string) {
    for task := range work{
    call(srv, task)
    done <- true
    }
    }

    // Wait for numTask completions rather than counting servers.
    // NOTE(review): as written this deadlocks — the caller is still blocked
    // in its `work <- task` send loop when a worker tries to send on done;
    // the fix follows in the next listing.
    for i := 0; i < numTask; i++{
    <-done
    }
  • But we will run into deadlock. 当server完成一个task时候,执行done <- true,但是没有对应的接收方,因为main此时还处于work <- task循环中,所以修改也很简单,把发送task的循环放到goroutine里面即可。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    // Fix: send the tasks from a separate goroutine, so the main flow can
    // receive the done signals concurrently and the deadlock is broken.
    go func() {
    for task := 0; task < numTask; task++{
    work <- task
    }
    close(work)
    }()

    // One done signal per completed task.
    for i := 0; i < numTask; i++{
    <-done
    }
    • But the simplest possible fix for this is to just make the work channel big enough that you’re never gonna run out of space.
      1
      work := make(chan int, numTask) // buffer holds every task, so sends never block

failed task and shut down

  • If the call returns false, just put the task back on the work list.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    // call now reports success; a failed task is re-queued on work so that
    // another (or the same) server will retry it.
    func Schedule(servers chan string, numTask int,
    call func(srv string, task int) bool){
    //...
    runTasks := func(srv string) {
    for task := range work{
    if call(srv, task){
    done <- true
    }else{
    work <- task
    }
    }
    }
    // NOTE(review): re-queueing relies on work being buffered (size numTask);
    // on an unbuffered work channel the retry send could block the worker,
    // and a retry after work is closed would panic — verify shutdown order.
    //...
    }
  • We also have to shut down the loop that’s watching for new servers; alternatively, you can call close(servers) when the Schedule function is done.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    // exit lets Schedule stop the server-accepting loop once all tasks are
    // done; chan struct{} carries no data, it is purely a signal.
    exit := make(chan struct{})

    go func() {
    for{
    select {
    case srv := <-servers:
    go runTasks(srv)
    case <- exit:
    log.Println("loop runTask finish")
    return
    }
    }
    }()

    // After all tasks finish: close work so the workers' range loops end,
    // then signal the accept loop above to return.
    close(work)
    exit <- struct{}{}

运行示例

  • 运行示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    package main

    import (
    "fmt"
    "log"
    )

    // Schedule dispatches numTask tasks over the work channel to one worker
    // goroutine per server, accepting new servers from the servers channel
    // at any time. call reports success; failed tasks are re-queued.
    // Schedule returns once numTask tasks have completed successfully.
    func Schedule(servers chan string, numTask int,
    call func(srv string, task int) bool){
    // Buffer every task so neither the initial sends nor a worker's retry
    // re-queue can block.
    work := make(chan int, numTask)
    done := make(chan bool)
    exit := make(chan struct{})

    // Worker: run tasks until work is closed; retries go back on work.
    runTasks := func(srv string) {
    for task := range work{
    if call(srv, task){
    done <- true
    }else{
    work <- task
    }
    }
    }

    // Accept loop: spawn a worker per arriving server until told to exit.
    go func() {
    for{
    select {
    case srv := <-servers:
    go runTasks(srv)
    case <- exit:
    log.Println("loop runTask finish")
    return
    }
    }
    }()

    for task := 0; task < numTask; task++ {
    work <- task
    }

    // One done per successful task; reached only after all sends above,
    // which cannot block thanks to the numTask-sized buffer.
    for i := 0; i < numTask; i++{
    <-done
    }
    // Shut down: end the workers' range loops, then stop the accept loop.
    close(work)
    exit <- struct{}{}
    }

    // main wires up a demo run: ten servers are fed in over a channel while
    // Schedule dispatches one thousand tasks across them.
    func main() {
    	log.SetFlags(log.LstdFlags | log.Lmicroseconds)

    	const (
    		taskCount   = 1000
    		serverCount = 10
    	)

    	serverCh := make(chan string)

    	// The callback just logs the assignment and reports success.
    	doCall := func(srv string, task int) bool {
    		log.Println(srv, task)
    		return true
    	}

    	// Feed the servers in concurrently, as if they arrived over time.
    	go func() {
    		for i := 0; i < serverCount; i++ {
    			serverCh <- fmt.Sprintf("srv-%d", i)
    		}
    	}()

    	Schedule(serverCh, taskCount, doCall)
    }