PubSub interface

  • A classic publish/subscribe server supports three operations: subscribe, publish, and cancel.

    type PubSub interface {
        // Publish publishes the event e to all current subscriptions.
        Publish(e Event)

        // Subscribe registers c to receive future events.
        // If Publish(e1) happens before Publish(e2),
        // subscribers receive e1 before e2.
        Subscribe(c chan<- Event)

        // Cancel cancels the prior subscription of channel c.
        Cancel(c chan<- Event)
    }
  • A simple implementation is as follows:

    import "sync"

    type Server struct {
        mu sync.Mutex
        // Each key is a channel that receives published events.
        sub map[chan<- Event]bool
    }

    func (s *Server) Init() {
        s.sub = make(map[chan<- Event]bool)
    }

    func (s *Server) Publish(e Event) {
        s.mu.Lock()
        defer s.mu.Unlock()

        for c := range s.sub {
            c <- e
        }
    }

    func (s *Server) Subscribe(c chan<- Event) {
        s.mu.Lock()
        defer s.mu.Unlock()

        if s.sub[c] {
            panic("pubsub: already subscribed")
        }
        s.sub[c] = true
    }

    func (s *Server) Cancel(c chan<- Event) {
        s.mu.Lock()
        defer s.mu.Unlock() // without defer, the map below would be used unlocked

        if !s.sub[c] {
            panic("pubsub: not subscribed")
        }
        close(c)
        delete(s.sub, c)
    }
  • Hint: Prefer defer for unlocking mutexes
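To check the corrected mutex version end to end, here is a minimal runnable sketch. It assumes Event is a string (the notes never define it), and runDemo is a hypothetical driver: it subscribes one buffered channel, publishes two events, cancels, and collects what the subscriber received. The buffer matters because Publish does a blocking send while holding the lock.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is assumed to be a string for this sketch.
type Event string

type PubSub interface {
	Publish(e Event)
	Subscribe(c chan<- Event)
	Cancel(c chan<- Event)
}

type Server struct {
	mu  sync.Mutex
	sub map[chan<- Event]bool
}

func (s *Server) Init() { s.sub = make(map[chan<- Event]bool) }

func (s *Server) Publish(e Event) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for c := range s.sub {
		c <- e // blocks until this subscriber has room
	}
}

func (s *Server) Subscribe(c chan<- Event) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.sub[c] {
		panic("pubsub: already subscribed")
	}
	s.sub[c] = true
}

func (s *Server) Cancel(c chan<- Event) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.sub[c] {
		panic("pubsub: not subscribed")
	}
	close(c)
	delete(s.sub, c)
}

// Compile-time check that *Server satisfies PubSub.
var _ PubSub = (*Server)(nil)

// runDemo (hypothetical) publishes two events to one subscriber and
// returns everything that subscriber received.
func runDemo() []Event {
	var s Server
	s.Init()
	c := make(chan Event, 2) // buffered: nobody is reading during Publish
	s.Subscribe(c)
	s.Publish("a")
	s.Publish("b")
	s.Cancel(c) // closes c, so the range below ends after draining it
	var got []Event
	for e := range c {
		got = append(got, e)
	}
	return got
}

func main() {
	fmt.Println(runDemo()) // [a b]
}
```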

One slow subscriber can slow down everyone

  • If one subscriber falls behind, the next subscriber doesn't get the event until that slow subscriber wakes up and receives it. So one slow subscriber can slow down everyone.
    • One mitigation is to give each subscriber channel a buffer.
      As long as subscribers are not too far behind, each new event goes into the channel buffer when you publish, so the actual publish won't block for long.
    • In a really big program, though, you want a better way to deal with arbitrarily slow subscribers.
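A minimal sketch of the buffering idea, assuming Event is a string. The publish and backlogAfter functions are hypothetical: publish does what the mutex-based Publish does (one blocking send per subscriber), and backlogAfter publishes n events to a subscriber that never reads. With a buffer of capacity n, all n publishes complete; with an unbuffered channel, the first one would block forever.

```go
package main

import "fmt"

// Event is assumed to be a string for this sketch.
type Event string

// publish mimics the mutex-based Publish: one blocking send per subscriber.
func publish(subs []chan Event, e Event) {
	for _, c := range subs {
		c <- e
	}
}

// backlogAfter (hypothetical) publishes n events to a subscriber that never
// reads and returns how many events are sitting in its buffer.
func backlogAfter(n int) int {
	slow := make(chan Event, n) // room for exactly n unread events
	subs := []chan Event{slow}
	for i := 0; i < n; i++ {
		publish(subs, Event(fmt.Sprintf("e%d", i))) // never blocks here
	}
	return len(slow)
}

func main() {
	fmt.Println(backlogAfter(3)) // 3
}
```

A buffer only postpones the problem: an (n+1)th publish to this subscriber would block again, which is why the notes look for a better approach.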

Options for slow goroutines

  • Slow down event generation
    Publish stops until the subscribers catch up.

  • Coalesce or drop events

    • The subscriber might find out that it missed some events. Nobody knows what those events were, because they weren't saved, but at least the subscriber can see how many events it missed and perhaps do something to try to catch up.
    • Profiler:
      • A signal handler fills in the profile data, and a separate goroutine's job is to read the data back out and write it to disk, send it in response to an HTTP request, or whatever it is you're doing with profile data.
      • There's a buffer in the middle. If the reader of the profile data falls behind and the buffer fills up, we start recording the dropped samples in the final profile as calls to a function runtime.lost.
      • So if you look at the profile and see that the program spent five percent of its time in lost profile data, you know exactly what the error rate of the profile is.
    • OS signal package (os/signal):
      • You have to pass in a channel that will be ready to receive a signal such as SIGHUP or SIGQUIT.
      • When a signal comes in, the runtime tries to send it to each registered channel; if it can't send to a channel without blocking, the signal is simply dropped for that channel.
      • So callers have to pass in a buffered channel of length at least one, and register that channel for only a single signal.
      • If a signal comes in, you are definitely going to be told about it; if it comes in twice, you might only be told once. That's actually the same semantics that Unix gives to processes for signals.
  • Save all the events that a slow subscriber hasn't seen

    • There are always slow computers that have fallen offline or whatever, and they might be gone for a while, so the saved backlog can grow without bound.
    • In general, think very carefully before you do that: how unbounded is the backlog really, and can you tolerate it?
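The "drop events" option can be sketched with a non-blocking send, that is, a select with a default case. The dropSender type and its lost counter are hypothetical, Event is assumed to be a string, and this sketch only drops and counts; it does not coalesce. The os/signal documentation makes the matching point from the caller's side: signal.Notify will not block sending to the channel, so the caller must give it enough buffer space.

```go
package main

import "fmt"

// Event is assumed to be a string for this sketch.
type Event string

// dropSender (hypothetical) never blocks the publisher: if the subscriber's
// buffer is full, the event is dropped and counted instead.
type dropSender struct {
	c    chan Event
	lost int // events dropped so far; the subscriber can inspect this
}

func (d *dropSender) send(e Event) {
	select {
	case d.c <- e:
		// delivered into the buffer
	default:
		d.lost++ // subscriber is behind: drop e, but remember that we did
	}
}

func main() {
	d := &dropSender{c: make(chan Event, 1)}
	for i := 0; i < 3; i++ {
		d.send(Event(fmt.Sprintf("e%d", i)))
	}
	fmt.Println(<-d.c, d.lost) // e0 2
}
```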

Implement the third option

  • Start a new goroutine that handles requests arriving on three channels:

    type Server struct {
        publish   chan Event
        subscribe chan subReq
        cancel    chan subReq
    }

    type subReq struct {
        c  chan<- Event
        ok chan bool
    }

    func (s *Server) Init() {
        s.publish = make(chan Event)
        s.subscribe = make(chan subReq)
        s.cancel = make(chan subReq)
        go s.loop()
    }
  • A publish and a subscribe can never happen at the same time, because the loop is just single-threaded code.

    func (s *Server) loop() {
        sub := make(map[chan<- Event]bool)

        for {
            select {
            case e := <-s.publish:
                for c := range sub {
                    c <- e
                }
            case req := <-s.subscribe:
                if sub[req.c] {
                    req.ok <- false
                    // break exits only the select, not the for loop
                    break
                }
                sub[req.c] = true
                req.ok <- true
            case req := <-s.cancel:
                if !sub[req.c] {
                    req.ok <- false
                    break
                }
                close(req.c)
                delete(sub, req.c)
                req.ok <- true
            }
        }
    }

    func (s *Server) Publish(e Event) {
        s.publish <- e
    }

    func (s *Server) Subscribe(c chan<- Event) {
        req := subReq{
            c:  c,
            ok: make(chan bool),
        }
        s.subscribe <- req
        if !<-req.ok {
            panic("pubsub: already subscribed")
        }
    }

    func (s *Server) Cancel(c chan<- Event) {
        req := subReq{
            c:  c,
            ok: make(chan bool),
        }
        s.cancel <- req
        if !<-req.ok {
            panic("pubsub: not subscribed")
        }
    }
  • Convert mutexes into goroutines when it makes programs clearer. For implementing Raft, though, you will probably prefer to keep the state guarded by a mutex, because Raft is so different from most concurrent programs: each replica is profoundly uncertain of its own state.

  • To deal with slow subscribers and keep the overall program from blocking, give each subscriber a helper goroutine: the main loop goroutine sends events to the helper, which queues them until the subscriber is ready.

    func helper(in <-chan Event, out chan<- Event) {
        var q []Event
        for {
            select {
            case e := <-in:
                q = append(q, e)
            case out <- q[0]: // q[0] is evaluated on every pass, so this panics when q is empty
                q = q[1:] // drop the event just sent
            }
        }
    }
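    • Improvement: if the queue is empty, send on a nil channel instead. A select evaluates q[0] even when that case is not chosen, so the first version panics on an empty queue; a send on a nil channel blocks forever, so the select simply never picks that case.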
      func helper(in <-chan Event, out chan<- Event) {
          var q []Event
          for {
              // Decide whether and what to send.
              var sendOut chan<- Event // stays nil while the queue is empty
              var next Event
              if len(q) > 0 {
                  sendOut = out
                  next = q[0]
              }
              select {
              case e := <-in:
                  q = append(q, e)
              case sendOut <- next: // disabled while sendOut is nil
                  q = q[1:]
              }
          }
      }
    • Improvement: once the input channel is closed and the queue is finally empty, the helper can exit and close out.
      func helper(in <-chan Event, out chan<- Event) {
          var q []Event
          for in != nil || len(q) > 0 {
              // Decide whether and what to send.
              var sendOut chan<- Event
              var next Event
              if len(q) > 0 {
                  sendOut = out
                  next = q[0]
              }
              select {
              case e, ok := <-in:
                  if !ok {
                      in = nil // stop receiving from in
                      break    // exits the select only
                  }
                  q = append(q, e)
              case sendOut <- next:
                  q = q[1:]
              }
          }
          close(out) // in is closed and the queue is drained
      }
  • Every time we get a new subscription, we start a helper goroutine with its own input channel. If you want a different strategy for clients that fall too far behind, you can change the code in the helper.

    func (s *Server) loop() {
        // Map from subscriber channel to that subscriber's helper input channel.
        sub := make(map[chan<- Event]chan<- Event)
        for {
            select {
            case e := <-s.publish:
                for _, helperIn := range sub {
                    helperIn <- e
                }

            case req := <-s.subscribe:
                if sub[req.c] != nil {
                    req.ok <- false
                    break
                }
                helperIn := make(chan Event)
                go helper(helperIn, req.c)
                sub[req.c] = helperIn
                req.ok <- true

            case req := <-s.cancel:
                if sub[req.c] == nil { // not subscribed
                    req.ok <- false
                    break
                }
                // Closing the helper's input makes it drain its queue, then close req.c.
                close(sub[req.c])
                delete(sub, req.c)
                req.ok <- true
            }
        }
    }
  • Hint: you can use goroutines a lot of the time to separate independent concerns.
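The final helper can be exercised on its own. This is a small sketch, assuming Event is a string; relay is a hypothetical driver that pushes events in while nobody reads out, then closes in and drains out. It relies on q = q[1:] to dequeue the head and on the nil-channel trick to disable the send case while the queue is empty.

```go
package main

import "fmt"

// Event is assumed to be a string for this sketch.
type Event string

// helper queues events from in and forwards them, in order, to out, so the
// sender on in never waits for the receiver on out.
func helper(in <-chan Event, out chan<- Event) {
	var q []Event
	for in != nil || len(q) > 0 {
		var sendOut chan<- Event // nil: the send case below is disabled
		var next Event
		if len(q) > 0 {
			sendOut = out
			next = q[0]
		}
		select {
		case e, ok := <-in:
			if !ok {
				in = nil // closed; receiving from a nil channel blocks forever
				break    // exits the select only
			}
			q = append(q, e)
		case sendOut <- next:
			q = q[1:]
		}
	}
	close(out) // input closed and queue drained
}

// relay (hypothetical) sends all events before anyone reads out, then
// drains out. It finishes because the helper buffers events in q.
func relay(events []Event) []Event {
	in := make(chan Event)
	out := make(chan Event)
	go helper(in, out)
	for _, e := range events {
		in <- e // the helper is always ready to receive
	}
	close(in)
	var got []Event
	for e := range out { // ends when helper closes out
		got = append(got, e)
	}
	return got
}

func main() {
	fmt.Println(relay([]Event{"e0", "e1", "e2"})) // [e0 e1 e2]
}
```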

Running the example

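  • In Russ Cox's code above, Subscribe takes a send-only channel (chan<- Event). In practice the subscriber still has to receive and process the events, so the caller creates an ordinary bidirectional chan Event, passes it to Subscribe, and keeps receiving from it. Go converts a bidirectional channel to chan<- Event implicitly, so this works without changing the Server's signatures.
    (On chan<- versus <-chan: the server never needs to receive values from a subscription, only to send events to it, so it takes a chan<- Event, a channel that can only be used for sending.)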

    The optional <- operator specifies the channel direction, send or receive. If no direction is given, the channel is bidirectional.

    chan<- float64 // can only be used to send float64s
    <-chan int // can only be used to receive ints

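  • The code below creates 10 subscriptions and then publishes 100 events. After receiving an event, each consumer sleeps for a random 2 to 15 seconds and then prints it, simulating different consumption speeds.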

    package main

    import (
        "fmt"
        "log"
        "math/rand"
        "time"
    )

    // Assumes Event, Server, loop, and helper from above live in this package,
    // with Event a string-based type (e.g. type Event string).
    func main() {
        log.SetFlags(log.LstdFlags | log.Lmicroseconds)
        pubsub := Server{}
        pubsub.Init()

        subscriptionCount := 10

        log.Println("create subscription channels")
        subscriptionChannels := make([]chan Event, subscriptionCount)
        for i := range subscriptionChannels {
            subscriptionChannels[i] = make(chan Event)
            pubsub.Subscribe(subscriptionChannels[i]) // chan Event converts to chan<- Event
        }

        consumer := func(subscriptionIndex int, c chan Event) {
            // One generator per consumer, seeded differently for each index.
            generator := rand.New(rand.NewSource(time.Now().UnixNano() + int64(subscriptionIndex*1000)))
            for event := range c {
                maxSec, minSec := 15.0, 2.0 // sleep a random 2 to 15 seconds
                n := minSec + generator.Float64()*(maxSec-minSec)
                time.Sleep(time.Duration(n * float64(time.Second)))
                log.Println("consumer", subscriptionIndex, "slept", n, "received", event)
            }
        }
        log.Println("start consumer goroutines")
        for i := range subscriptionChannels {
            go consumer(i, subscriptionChannels[i])
        }

        log.Println("publish events to all subscriptions")
        publish := func(publishCount int) {
            for i := 0; i < publishCount; i++ {
                event := Event(fmt.Sprintf("test-%d", i))
                log.Println("publish", event)
                pubsub.Publish(event)
            }
        }
        go publish(subscriptionCount * 10)

        // Keep main alive long enough for the slow consumers to finish.
        time.Sleep(1 * time.Hour)
    }
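  • A simple outline of the program's flow:

    • Subscribe 10 subscription channels and start 10 goroutines that receive events.
    • Start the publish goroutine, which publishes 100 events in a row.
    • s.loop() handles each publish and passes the event to every subscription's helper.
    • Consumption is now slower than production, but this doesn't hold up publishing, because published events are stored in each helper's event slice until they are consumed.
    • Each consumer receives events from its channel and prints them.
  • So Russ Cox really uses channels to decouple the publisher from the subscribers, and uses a slice as a buffer to solve the slow-consumer problem.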