来源:https://go.dev/talks/2012/concurrency.slide

  • The boring function runs, like a boring party guest.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    // boring prints msg followed by an ever-increasing counter, pausing for a
    // random interval of under one second after each line. It never returns.
    func boring(msg string) {
        count := 0
        for {
            fmt.Println(msg, count)
            pause := time.Duration(rand.Intn(1e3)) * time.Millisecond
            time.Sleep(pause)
            count++
        }
    }

    // main starts boring in its own goroutine, listens for two seconds, and
    // then returns; the program exits without waiting for that goroutine.
    func main() {
        const patience = 2 * time.Second

        go boring("boring!")
        fmt.Println("I'm listening.")
        time.Sleep(patience)
        fmt.Println("You're boring; I'm leaving.")
    }
  • A channel connects the main and boring goroutines so they can communicate.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    // boring sends "msg i" on c for i = 0, 1, 2, ..., sleeping for a random
    // interval of under one second after every send. It never returns.
    func boring(msg string, c chan<- string) {
        for n := 0; ; n++ {
            // Any expression of a suitable type can be sent on the channel.
            text := fmt.Sprintf("%s %d", msg, n)
            c <- text
            delay := time.Duration(rand.Intn(1e3)) * time.Millisecond
            time.Sleep(delay)
        }
    }

    // main receives five messages from the boring goroutine over an unbuffered
    // channel, printing each one, and then leaves.
    func main() {
        const rounds = 5

        c := make(chan string)
        go boring("boring!", c)
        for left := rounds; left > 0; left-- {
            heard := <-c // Blocks until boring has a value to send.
            fmt.Printf("You say: %q\n", heard)
        }
        fmt.Println("You're boring; I'm leaving.")
    }
  • Generator Pattern: function that returns a channel

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    // boring starts a goroutine that sends "msg i" for i = 0, 1, 2, ... and
    // returns the receive-only channel those strings arrive on. The goroutine
    // sleeps for a random interval of under one second after each send.
    func boring(msg string) <-chan string {
        out := make(chan string)
        produce := func() {
            n := 0
            for {
                out <- fmt.Sprintf("%s %d", msg, n)
                time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
                n++
            }
        }
        go produce() // The goroutine is launched from inside the function.
        return out   // Hand the channel back to the caller.
    }

    // main obtains a channel from boring, prints the first five messages
    // received on it, and then leaves.
    func main() {
        const rounds = 5

        c := boring("boring!") // boring returns the channel it sends on.
        for left := rounds; left > 0; left-- {
            heard := <-c
            fmt.Printf("You say: %q\n", heard)
        }
        fmt.Println("You're boring; I'm leaving.")
    }
  • Multiplexing: Fan-in
    image.png

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    // fanIn merges input1 and input2 into a single channel: one goroutine per
    // input forwards every value it receives onto the returned channel.
    func fanIn(input1, input2 <-chan string) <-chan string {
        merged := make(chan string)
        forward := func(in <-chan string) {
            for {
                merged <- <-in
            }
        }
        go forward(input1)
        go forward(input2)
        return merged
    }

    // main fans the Joe and Ann speakers into one channel and prints the first
    // ten messages in whatever order they arrive.
    func main() {
        const rounds = 10

        c := fanIn(boring("Joe"), boring("Ann"))
        for left := rounds; left > 0; left-- {
            heard := <-c
            fmt.Println(heard)
        }
        fmt.Println("You're both boring; I'm leaving.")
    }
  • Fan-in using select

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    // fanIn merges input1 and input2 into a single channel using one goroutine:
    // each pass through the loop takes a value from whichever input is ready
    // and forwards it onto the returned channel.
    func fanIn(input1, input2 <-chan string) <-chan string {
        out := make(chan string)
        go func() {
            for {
                var s string
                select {
                case s = <-input1:
                case s = <-input2:
                }
                out <- s
            }
        }()
        return out
    }
  • Restoring sequencing: each speaker must wait for a go-ahead. (For this version, fanIn must take and return <-chan Message instead of <-chan string.)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    // Message is one line of conversation bundled with the channel its sender
    // waits on before producing the next line.
    type Message struct {
    // msg is the text to print.
    msg string
    // wait is the go-ahead channel: the sender receives from it after each
    // Message, so sending a value on it lets that sender continue.
    wait chan bool
    }

    // boring starts a goroutine that emits numbered Messages built from msg and
    // returns the channel they are delivered on. Every Message carries the same
    // wait channel; after each send and a random pause of under one second, the
    // goroutine blocks until a value arrives on that channel.
    func boring(msg string) <-chan Message {
        rand.Seed(time.Now().UnixNano())
        goAhead := make(chan bool) // One channel, reused by every Message.
        out := make(chan Message)
        go func() {
            n := 0
            for {
                text := fmt.Sprintf("%s: %d", msg, n)
                out <- Message{text, goAhead}
                time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
                <-goAhead // Wait for the receiver's go-ahead.
                n++
            }
        }()
        return out
    }

    // main runs five rounds. In each round it prints one message from each of
    // two receives on the merged channel and only then sends the go-ahead on
    // both messages' wait channels.
    func main() {
        const rounds = 5

        c := fanIn(boring("Joe"), boring("Ann"))
        for round := 0; round < rounds; round++ {
            first := <-c
            fmt.Println(first.msg)
            second := <-c
            fmt.Println(second.msg)
            // Release the senders only after both messages have been printed.
            first.wait <- true
            second.wait <- true
        }
        fmt.Println("You're both boring; I'm leaving.")
    }
  • Timeout:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    // main prints messages from the merged channel until one of them takes a
    // second or longer to arrive, then gives up.
    func main() {
        const patience = 1 * time.Second

        c := fanIn(boring("Joe"), boring("Ann"))
        for {
            // time.After is called on every iteration, so the limit applies to
            // each message individually rather than to the whole conversation.
            select {
            case heard := <-c:
                fmt.Println(heard.msg)
            case <-time.After(patience):
                fmt.Println("You're too slow.")
                return
            }
        }
    }
  • Example - Google Search:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    // Web is the fake search backend for "web" results.
    var Web = fakeSearch("web")

    // Image is the fake search backend for "image" results.
    var Image = fakeSearch("image")

    // Video is the fake search backend for "video" results.
    var Video = fakeSearch("video")

    type (
        // Result is the text produced by one search.
        Result string

        // Search runs a query and returns its Result.
        Search func(query string) Result
    )

    // fakeSearch returns a Search that simulates a backend of the given kind:
    // it sleeps for a random time of under 100 milliseconds and then reports
    // which kind of result it produced for the query.
    func fakeSearch(kind string) Search {
        search := func(query string) Result {
            latency := time.Duration(rand.Intn(100)) * time.Millisecond
            time.Sleep(latency)
            text := fmt.Sprintf("%s result for %q\n", kind, query)
            return Result(text)
        }
        return search
    }

    // Google runs the Web, Image, and Video searches concurrently and returns
    // the results that arrive within 80 milliseconds, in arrival order. If the
    // deadline passes first it prints "timed out" and returns what it has so
    // far, which may be nil.
    func Google(query string) (results []Result) {
        const searches = 3

        // Buffered with one slot per search so that a search finishing after
        // the timeout can still deliver its result and exit, instead of
        // blocking forever on a send nobody will receive.
        c := make(chan Result, searches)
        go func() { c <- Web(query) }()
        go func() { c <- Image(query) }()
        go func() { c <- Video(query) }()

        // One timer for the whole request, not one per result.
        timeout := time.After(80 * time.Millisecond)
        for i := 0; i < searches; i++ {
            select {
            case result := <-c:
                results = append(results, result)
            case <-timeout: // Don't wait for slow servers.
                fmt.Println("timed out")
                return results
            }
        }
        return results
    }

    // main seeds the random generator, runs one search for "golang", and
    // prints the results followed by how long the search took.
    func main() {
        rand.Seed(time.Now().UnixNano())

        began := time.Now()
        hits := Google("golang")
        took := time.Since(began)

        fmt.Println(hits)
        fmt.Println(took)
    }
    • Avoid discarding results from slow servers: Replicate the servers. Send requests to multiple replicas, and use the first response.
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      // First sends query to every replica concurrently and returns whichever
      // result arrives first. With no replicas it returns the zero Result.
      func First(query string, replicas ...Search) Result {
          if len(replicas) == 0 {
              // Nothing will ever be sent, so receiving would block forever.
              var none Result
              return none
          }

          // One buffer slot per replica lets the slower replicas complete
          // their send and exit once the first result has been returned,
          // instead of blocking forever on an unbuffered channel.
          c := make(chan Result, len(replicas))
          for _, replica := range replicas {
              // Pass the replica as an argument so each goroutine gets its own.
              go func(search Search) { c <- search(query) }(replica)
          }
          return <-c
      }

      // Google runs replicated web, image, and video searches concurrently,
      // calling First with two fake replicas for each kind. It returns the
      // results that arrive within 80 milliseconds, in arrival order. If the
      // deadline passes first it prints "timed out" and returns what it has
      // so far, which may be nil.
      func Google(query string) (results []Result) {
          const searches = 3

          // Buffered with one slot per search so that a search finishing
          // after the timeout can still deliver its result and exit, instead
          // of blocking forever on a send nobody will receive.
          c := make(chan Result, searches)
          go func() { c <- First(query, fakeSearch("web"), fakeSearch("web")) }()
          go func() { c <- First(query, fakeSearch("image"), fakeSearch("image")) }()
          go func() { c <- First(query, fakeSearch("video"), fakeSearch("video")) }()

          // One timer for the whole request, not one per result.
          timeout := time.After(80 * time.Millisecond)
          for i := 0; i < searches; i++ {
              select {
              case result := <-c:
                  results = append(results, result)
              case <-timeout: // Don't wait for slow servers.
                  fmt.Println("timed out")
                  return results
              }
          }
          return results
      }