ReplicatedClient

  • ReplicatedClient interface

    type ReplicatedClient interface {
        // Init initializes the client to use the given servers.
        // To make a particular request later,
        // the client can use callOne(srv, args),
        // where srv is one of the servers from the list.
        Init(servers []string, callOne func(string, Args) Reply)

        // Call makes a request to an available server.
        // Multiple goroutines may call Call concurrently.
        // Find an available server and keep using the same server
        // until it becomes unavailable.
        Call(args Args) Reply
    }
  • Init and Call function implementation

    type Client struct {
        servers []string
        callOne func(string, Args) Reply
        mu      sync.Mutex
        // server id of the last successful request
        prefer int
    }

    func (c *Client) Init(servers []string, callOne func(string, Args) Reply) {
        c.servers = servers
        c.callOne = callOne
    }

    func (c *Client) Call(args Args) Reply {
        type result struct {
            serverID int
            reply    Reply
        }

        done := make(chan result, 1)
        id := ...

        go func() {
            done <- result{id, c.callOne(c.servers[id], args)}
        }()
    }

    Hint: Use a mutex if that is the clearest way to write the code.
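
    The hint can be sketched as two small accessors that take the lock around every read and write of prefer. The helper names preferred and setPreferred are hypothetical and do not appear in the original code; the Client here carries only the fields this sketch needs.

```go
package main

import (
	"fmt"
	"sync"
)

// Client holds only the fields needed for this sketch.
type Client struct {
	mu     sync.Mutex
	prefer int // index of the server that answered most recently
}

// preferred returns the current preferred server index (hypothetical helper).
func (c *Client) preferred() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.prefer
}

// setPreferred records the index of the server that just answered (hypothetical helper).
func (c *Client) setPreferred(id int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.prefer = id
}

func main() {
	var c Client
	var wg sync.WaitGroup
	// Several goroutines update the preference concurrently;
	// the mutex keeps every access race-free.
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			c.setPreferred(id)
		}(i)
	}
	wg.Wait()
	fmt.Println("preferred index:", c.preferred())
}
```

    Which index ends up preferred depends on goroutine scheduling; the point is only that concurrent callers never touch prefer without holding the lock.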

Call function failed

  • If a call takes too long, treat it as failed. On a timeout, we need to try a different server.

    func (c *Client) Call(args Args) Reply {
        type result struct {
            serverID int
            reply    Reply
        }

        const timeout = 1 * time.Second
        t := time.NewTimer(timeout)
        defer t.Stop()

        // Several requests may be in flight at once, but only one needs to succeed.
        // The channel is buffered so that every goroutine can deliver its reply,
        // even a late one, without blocking forever.
        done := make(chan result, len(c.servers))

        for id := 0; id < len(c.servers); id++ {
            id := id
            log.Println("try to call", id)
            go func() {
                res := result{id, c.callOne(c.servers[id], args)}
                done <- res
                log.Println("goroutine call return", res)
            }()

            select {
            case r := <-done:
                return r.reply
            case <-t.C:
                // timeout: move on to the next server
                t.Reset(timeout)
            }
        }
        // Every server timed out: block until the first reply arrives.
        r := <-done
        return r.reply
    }
    • Hint: Stop timers you don’t need. Otherwise you might end up with a lot of timers in a timer queue.

Prefer the last available server

  • Find an available server and keep using the same server until it becomes unavailable.
    func (c *Client) Call(args Args) Reply {
        type result struct {
            serverID int
            reply    Reply
        }

        const timeout = 1 * time.Second
        t := time.NewTimer(timeout)
        defer t.Stop()

        c.mu.Lock()
        prefer := c.prefer
        c.mu.Unlock()

        done := make(chan result, len(c.servers))

        var r result
        for id := 0; id < len(c.servers); id++ {
            // start from the preferred server and wrap around the list
            id := (prefer + id) % len(c.servers)
            log.Println("try to call", id)
            go func() {
                res := result{id, c.callOne(c.servers[id], args)}
                done <- res
                log.Println("goroutine call finish, return", res)
            }()

            select {
            case r = <-done:
                goto Done
            case <-t.C:
                // timeout: move on to the next server
                t.Reset(timeout)
            }
        }
        r = <-done
    Done:
        c.mu.Lock()
        c.prefer = r.serverID
        c.mu.Unlock()
        return r.reply
    }

Usage example

  • Usage example
    func main() {
        servers := []string{"1", "2", "3", "4", "5"}
        callOne := func(server string, args Args) Reply {
            if server != "5" {
                time.Sleep(2 * time.Second)
            }
            return "yes"
        }

        var c Client
        c.Init(servers, callOne)
        log.Println("call return:", c.Call("args"))
        log.Println("finish")
    }
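
To see the preferred-server behavior, here is a self-contained sketch that makes two calls and records which servers the second one contacts. It rests on several assumptions that are not in the original: Args and Reply are defined as plain strings, the timeout is shortened to 50 ms so the demo finishes quickly, a boolean flag replaces the goto, and triedServers is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Args and Reply are assumed to be strings; the original leaves them undefined.
type Args string
type Reply string

type Client struct {
	servers []string
	callOne func(string, Args) Reply
	mu      sync.Mutex
	prefer  int // index of the server that answered most recently
}

func (c *Client) Init(servers []string, callOne func(string, Args) Reply) {
	c.servers = servers
	c.callOne = callOne
}

// Call tries servers starting at the preferred one and moves on after each timeout.
func (c *Client) Call(args Args) Reply {
	type result struct {
		serverID int
		reply    Reply
	}
	const timeout = 50 * time.Millisecond // shortened for the demo
	t := time.NewTimer(timeout)
	defer t.Stop()

	c.mu.Lock()
	prefer := c.prefer
	c.mu.Unlock()

	done := make(chan result, len(c.servers))
	var r result
	got := false
	for off := 0; off < len(c.servers) && !got; off++ {
		id := (prefer + off) % len(c.servers)
		go func() { done <- result{id, c.callOne(c.servers[id], args)} }()
		select {
		case r = <-done:
			got = true
		case <-t.C:
			t.Reset(timeout)
		}
	}
	if !got {
		r = <-done // every server timed out: wait for the first reply
	}
	c.mu.Lock()
	c.prefer = r.serverID
	c.mu.Unlock()
	return r.reply
}

// triedServers makes two calls and returns the servers contacted by the second.
func triedServers() []string {
	var mu sync.Mutex
	var tried []string
	callOne := func(server string, args Args) Reply {
		mu.Lock()
		tried = append(tried, server)
		mu.Unlock()
		if server != "3" {
			time.Sleep(500 * time.Millisecond) // simulate an unresponsive server
		}
		return "yes"
	}
	var c Client
	c.Init([]string{"1", "2", "3"}, callOne)
	c.Call("first") // walks through the list until server "3" answers

	mu.Lock()
	tried = nil // forget the first call's attempts
	mu.Unlock()

	c.Call("second")
	mu.Lock()
	defer mu.Unlock()
	return append([]string(nil), tried...)
}

func main() {
	fmt.Println("second call tried:", triedServers())
}
```

The first call has to wait out the timeouts for servers "1" and "2" before reaching "3". The second call should start with "3" directly, because the client remembered the index of the server that answered.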