Patterns and Hints for Concurrency in Go -- Replicated service client
ReplicatedClient
ReplicatedClient interface
1
2
3
4
5
6
7
8
9
10
11
12
13type ReplicatedClient interface {
// Init initializes the client to use the given servers.
// To make a particular request later,
// the client can use callOne(srv, args),
// where srv is one of the servers from the list.
Init(servers []string, callOne func(string, Args) Reply)
// Call makes a request an available server.
// Multiple goroutines may call Call concurrently.
// Find an available server and keep using the same server
// until it becomes unavailable.
Call(args Args) Reply
}Init and Call function implementation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26type Client struct {
servers []string
callOne func(string, Args) Reply
mu sync.Mutex
//server id of the last successful request
prefer int
}
// Init records the server list and the per-server request function that
// subsequent calls will use. The servers slice is stored as given; it is
// not copied.
func (c *Client) Init(servers []string, callOne func(string, Args) Reply) {
	c.servers, c.callOne = servers, callOne
}
func (c *Client) Call(args Args) Reply{
type result struct {
serverID int
reply Reply
}
done := make(chan result, 1)
id := ...
go func() {
done <- result{id, c.callOne(c.servers[id], args)}
}()
}Hint: Use a mutex if that is the clearest way to write the code.
Call function failed
If a call to a server takes too long, we treat it as failed. When that timeout fires, we need to try a different server.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36func (c *Client) Call(args Args) Reply{
type result struct {
serverID int
reply Reply
}
const timeout = 1 * time.Second
t := time.NewTimer(timeout)
defer t.Stop()
// 可能会同时发出去多个请求,但是只要一个请求成功ok就行
// 这里使用buffer channel的原因接收其他的返回请求
// 避免一直阻塞在goroutine
done := make(chan result, len(c.servers))
for id := 0; id < len(c.servers); id++{
id := id
log.Println("try to call", id)
go func() {
res := <- result{id, c.callOne(c.servers[id], args)}
done <- res
defer log.Println("goroutine call return", res)
}()
select{
case r := <-done:
return r.reply
case <-t.C:
//timeout
t.Reset(timeout)
}
}
//如果全部超时的话则会阻塞住
r := <-done
return r.reply
}- Hint: Stop timers you don’t need. Otherwise you might end up with a lot of timers in a timer queue.
prefer last available server
- Find an available server and keep using the same server until it becomes unavailable.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42func (c *Client) Call(args Args) Reply{
type result struct {
serverID int
reply Reply
}
const timeout = 1 * time.Second
t := time.NewTimer(timeout)
defer t.Stop()
c.mu.Lock()
prefer := c.prefer
c.mu.Unlock()
done := make(chan result, len(c.servers))
var r result
for id := 0; id < len(c.servers); id++{
id := (prefer + id) % len(c.servers)
log.Println("try to call", id)
go func() {
res := result{id, c.callOne(c.servers[id], args)}
done <- res
defer log.Println("goroutine call finish, return", res)
}()
select{
case r = <-done:
goto Done
case <-t.C:
//timeout
t.Reset(timeout)
}
}
r = <-done
Done:
c.mu.Lock()
c.prefer = r.serverID
c.mu.Unlock()
return r.reply
}
使用示例
- 使用示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14func main() {
servers := []string{"1", "2", "3", "4", "5"}
callOne := func(server string, args Args) Reply{
if server != "5"{
time.Sleep(2*time.Second)
}
return "yes"
}
var c Client
c.Init(servers, callOne)
log.Println("call return:", c.Call("args"))
log.Println("finish")
}
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 GreenHatHGのBlog!
评论