Golang笔记-Fan-in
来源:https://go.dev/talks/2012/concurrency.slide
The boring function runs, like a boring party guest.
1
2
3
4
5
6
7
8
9
10
11
12
13func boring(msg string) {
for i := 0; ; i++ {
fmt.Println(msg, i)
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
}
}
func main() {
go boring("boring!")
fmt.Println("I'm listening.")
time.Sleep(2 * time.Second)
fmt.Println("You're boring; I'm leaving.")
}A channel connects the main and boring goroutines so they can communicate.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16func boring(msg string, c chan<- string) {
for i := 0; ; i++ {
// Expression to be sent can be any suitable value.
c <- fmt.Sprintf("%s %d", msg, i)
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
}
}
func main() {
c := make(chan string)
go boring("boring!", c)
for i := 0; i < 5; i++{
fmt.Printf("You say: %q\n", <-c)
}
fmt.Println("You're boring; I'm leaving.")
}Generator Pattern: function that returns a channel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21// Returns receive-only channel of strings.
func boring(msg string) <-chan string {
c := make(chan string)
// We launch the goroutine from inside the function.
go func() {
for i := 0; ; i++ {
c <- fmt.Sprintf("%s %d", msg, i)
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
}
}()
return c // Return the channel to the caller.
}
func main() {
c := boring("boring!") // Function returning a channel.
for i := 0; i < 5; i++{
fmt.Printf("You say: %q\n", <-c)
}
fmt.Println("You're boring; I'm leaving.")
}Multiplexing: Fan-in
1
2
3
4
5
6
7
8
9
10
11
12
13
14func fanIn(input1, input2 <-chan string) <-chan string {
c := make(chan string)
go func() { for { c <- <-input1 } }()
go func() { for { c <- <-input2 } }()
return c
}
func main() {
c := fanIn(boring("Joe"), boring("Ann"))
for i := 0; i < 10; i++{
fmt.Println(<-c)
}
fmt.Println("You're both boring; I'm leaving.")
}Fan-in using select
1
2
3
4
5
6
7
8
9
10
11
12func fanIn(input1, input2 <-chan string) <-chan string {
c := make(chan string)
go func() {
for {
select {
case s := <-input1: c <- s
case s := <-input2: c <- s
}
}
}()
return c
}keep sequencing: Each speaker must wait for a go-ahead.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31type Message struct {
msg string
wait chan bool
}
func boring(msg string) <-chan Message {
rand.Seed(time.Now().UnixNano())
waitForIt := make(chan bool) // Shared between all messages.
c := make(chan Message)
go func() {
for i := 0; ; i++ {
c <- Message{ fmt.Sprintf("%s: %d", msg, i), waitForIt }
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
<-waitForIt
}
}()
return c
}
func main() {
c := fanIn(boring("Joe"), boring("Ann"))
for i := 0; i < 5; i++{
msg1 := <-c
fmt.Println(msg1.msg)
msg2 := <-c
fmt.Println(msg2.msg)
msg1.wait <- true
msg2.wait <- true
}
fmt.Println("You're both boring; I'm leaving.")
}Timeout:
1
2
3
4
5
6
7
8
9
10
11func main() {
c := fanIn(boring("Joe"), boring("Ann"))
for{
select {
case s := <-c: fmt.Println(s.msg)
case <-time.After(1*time.Second):
fmt.Println("You're too slow.")
return
}
}
}Example -Google Search:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43var (
Web = fakeSearch("web")
Image = fakeSearch("image")
Video = fakeSearch("video")
)
type Result string
type Search func(query string) Result
func fakeSearch(kind string) Search {
return func(query string) Result {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
return Result(fmt.Sprintf("%s result for %q\n", kind, query))
}
}
func Google(query string) (results []Result) {
c := make(chan Result)
go func() { c <- Web(query) } ()
go func() { c <- Image(query) } ()
go func() { c <- Video(query) } ()
timeout := time.After(80 * time.Millisecond)
for i := 0; i < 3; i++ {
select {
case result := <-c:
results = append(results, result)
case <-timeout: //Don't wait for slow servers.
fmt.Println("timed out")
return
}
}
return
}
func main() {
rand.Seed(time.Now().UnixNano())
start := time.Now()
results := Google("golang")
elapsed := time.Since(start)
fmt.Println(results)
fmt.Println(elapsed)
}- Avoid discarding results from slow servers: Replicate the servers. Send requests to multiple replicas, and use the first response. More
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27func First(query string, replicas ...Search) Result {
c := make(chan Result)
searchReplica := func(i int) { c <- replicas[i](query) }
for i := range replicas {
go searchReplica(i)
}
return <-c
}
func Google(query string) (results []Result) {
c := make(chan Result)
go func() { c <- First(query, fakeSearch("web"), fakeSearch("web")) } ()
go func() { c <- First(query, fakeSearch("image"), fakeSearch("image")) } ()
go func() { c <- First(query, fakeSearch("video"), fakeSearch("video")) } ()
timeout := time.After(80 * time.Millisecond)
for i := 0; i < 3; i++ {
select {
case result := <-c:
results = append(results, result)
case <-timeout: //Don't wait for slow servers.
fmt.Println("timed out")
return
}
}
return
}
- Avoid discarding results from slow servers: Replicate the servers. Send requests to multiple replicas, and use the first response. More
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 GreenHatHGのBlog!
评论