DEV Community

Cover image for How to Create a Event Bus in Go
Leapcell
Leapcell

Posted on

How to Create a Event Bus in Go

Cover

Preface

In today’s landscape where microservices and distributed systems are prevalent, Event-Driven Architecture (EDA) plays a critical role. This architectural design allows services to communicate via events, either synchronously or asynchronously, instead of traditional direct interface calls. The event-based interaction mode promotes loose coupling between services and improves the system’s scalability.

The publish-subscribe pattern is one way to implement event-driven architecture. It allows different components or services in the system to publish events, while other components or services can subscribe to these events and respond based on the event content. Most developers are likely familiar with this pattern; common technical implementations include message queues (MQ) and Redis’ publish/subscribe (PUB/SUB) features.

In Go, we can leverage its powerful channels and concurrency mechanisms to implement the publish-subscribe pattern. This article will delve into how to implement a simple event bus in Go, which is a concrete realization of the publish-subscribe pattern.

Event Bus

The event bus is a concrete implementation of the publish-subscribe pattern. As middleware between publishers and subscribers, it manages event delivery and distribution, ensuring events are smoothly transmitted from publishers to subscribers.

The main advantages of the event bus include:

  • Decoupling: Services do not need to communicate directly, but instead interact through events, reducing dependencies between services.
  • Asynchronous processing: Events can be handled asynchronously, improving the system’s responsiveness and performance.
  • Scalability: New subscribers can easily subscribe to events without modifying the existing publisher code.
  • Fault isolation: Failures in event handling do not directly affect the normal operation of other services.

Code Implementation of the Event Bus

Next, we will introduce how to implement a simple event bus in Go, which includes the following key functionalities:

  • Publish: Allows various services in the system to send events.
  • Subscribe: Allows interested services to subscribe to and receive specific types of events.
  • Unsubscribe: Allows services to remove events they have previously subscribed to.

Event Data Structure Definition

type Event struct {
    Payload any
}
Enter fullscreen mode Exit fullscreen mode

Event is a structure that encapsulates an event, where Payload represents the contextual information of the event, and its type is any.

Event Bus Definition

type (
    EventChan chan Event
)

type EventBus struct {
    mu    sync.RWMutex
    subscribers map[string][]EventChan
}

func NewEventBus() *EventBus {
    return &EventBus{
        subscribers: make(map[string][]EventChan),
    }
}
Enter fullscreen mode Exit fullscreen mode

EventChan is a type alias, defined as a channel for passing Event structs: chan Event.

EventBus is the definition of the event bus. It contains two properties:

  • mu: A read-write mutex (sync.RWMutex), used to ensure concurrent read and write safety for the subscribers below.
  • subscribers: A map where the key is a string representing the subscription topic, and the value is a slice of EventChan. This property is used to store all subscribers for each topic. Each subscriber receives events through its own EventChan.

The NewEventBus function is used to create a new EventBus instance.

Event Bus Method Implementation

The event bus implements three methods: publishing events (Publish), subscribing to events (Subscribe), and unsubscribing from events (Unsubscribe).

Publish

func (eb *EventBus) Publish(topic string, event Event) {
  eb.mu.RLock()
  defer eb.mu.RUnlock()
  // Copy a new subscriber list to avoid modifying the list while publishing
  subscribers := append([]EventChan{}, eb.subscribers[topic]...)
  go func() {
    for _, subscriber := range subscribers {
      subscriber <- event
    }
  }()
}
Enter fullscreen mode Exit fullscreen mode

The Publish method is used to publish events. This method receives two parameters: topic (the subject) and event (the encapsulated event object).

In the implementation of Publish, a read lock is first obtained via the mu property to ensure that the following operations on subscribers are safe in concurrent routines. Then, a copy of the current subscriber list for the topic is made. A new goroutine is started, which iterates through the copied subscriber list and sends the event to each subscriber through their channel. After these operations are complete, the read lock is released.

Why make a copy of the subscriber list?

Answer: Copying the subscriber list ensures data consistency and stability while sending events. Since sending data to the channels is done in a new goroutine, by the time the data is sent, the read lock has already been released, and the original subscriber list might have changed due to adding or removing subscribers. If you use the original subscriber list directly, unexpected errors may occur (for example, sending data to a closed channel can cause a panic).

Subscribe

func (eb *EventBus) Subscribe(topic string) EventChan {
  eb.mu.Lock()
  defer eb.mu.Unlock()
  ch := make(EventChan)
  eb.subscribers[topic] = append(eb.subscribers[topic], ch)
  return ch
}
Enter fullscreen mode Exit fullscreen mode

The Subscribe method is used to subscribe to events for a specific topic. It accepts a topic parameter, which specifies the topic to subscribe to. Through this method, you get an EventChan channel to receive events for the topic.

In the implementation of Subscribe, a write lock is first obtained via the mu property to ensure that the upcoming read and write operations on subscribers are safe in concurrent routines. Then, a new EventChan channel ch is created and appended to the relevant topic’s subscriber slice. After these operations are complete, the write lock is released.

Unsubscribe

func (eb *EventBus) Unsubscribe(topic string, ch EventChan) {
  eb.mu.Lock()
  defer eb.mu.Unlock()
  if subscribers, ok := eb.subscribers[topic]; ok {
    for i, subscriber := range subscribers {
      if ch == subscriber {
        eb.subscribers[topic] = append(subscribers[:i], subscribers[i+1:]...)
        close(ch)
        // Drain the channel
        for range ch {
        }
        return
      }
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

The Unsubscribe method is used to unsubscribe from events. It receives two parameters: topic (the topic subscribed to) and ch (the issued channel).

Inside the Unsubscribe method, a write lock is first obtained via the mu property to ensure concurrent read and write safety for the upcoming operations on subscribers. Then, it checks whether the topic has corresponding subscribers. If it does, it traverses the subscriber slice for that topic, finds the channel matching ch, removes it from the subscriber slice, and closes the channel. Then the channel is drained. After these operations, the write lock is released.

Usage Example

package main

import (
  "fmt"
  "time"

  "demo-eventbus"
)

func main() {
  eventBus := eventbus.NewEventBus()

  // Subscribe to the "post" topic event
  subscribe := eventBus.Subscribe("post")

  go func() {
    for event := range subscribe {
      fmt.Println(event.Payload)
    }
  }()

  eventBus.Publish("post", eventbus.Event{Payload: map[string]any{
    "postId": 1,
    "title":  "Welcome to Leapcell",
    "author": "Leapcell",
  }})
  // Topic with no subscribers
  eventBus.Publish("pay", eventbus.Event{Payload: "pay"})

  time.Sleep(time.Second * 2)
  // Unsubscribe from the "post" topic event
  eventBus.Unsubscribe("post", subscribe)
}
Enter fullscreen mode Exit fullscreen mode

Suggestions for Extensions

The event bus implemented in this article is relatively simple. If you want to enhance the flexibility, reliability, and usability of the event bus, you can consider extending it in the following ways:

  • Event persistence: Implement persistent storage for events to ensure that unprocessed events can be recovered after a system crash.
  • Wildcard and pattern-matching subscriptions: Allow the use of wildcards or regular expressions to subscribe to a group of related topics, rather than just a single specific topic.
  • Load balancing and message distribution strategies: Distribute events among multiple subscribers to achieve load balancing.
  • Plugin support: Enable functionality extensions through plugins, such as logging, message filtering, transformation, etc.

Conclusion

This article thoroughly explores the process of implementing a simple event bus in Go. By utilizing Go's powerful features such as channels and concurrency mechanisms, we can easily implement the publish-subscribe pattern.

The article starts by introducing the advantages of the event bus, including decoupling, asynchronous processing, scalability, and fault isolation. It then explains in detail how to define the event data structure and the event bus structure, and how to implement the methods for publishing, subscribing, and unsubscribing events. Finally, it proposes several potential directions for extension, such as event persistence, wildcard subscriptions, load balancing, and plugin support, to enhance the flexibility and functionality of the event bus.

By reading this article, you can learn how to implement a simple yet powerful event bus in Go, and extend it according to possible requirements.


We are Leapcell, your top choice for hosting Go projects.

Leapcell

Leapcell is the Next-Gen Serverless Platform for Web Hosting, Async Tasks, and Redis:

Multi-Language Support

  • Develop with Node.js, Python, Go, or Rust.

Deploy unlimited projects for free

  • pay only for usage — no requests, no charges.

Unbeatable Cost Efficiency

  • Pay-as-you-go with no idle charges.
  • Example: $25 supports 6.94M requests at a 60ms average response time.

Streamlined Developer Experience

  • Intuitive UI for effortless setup.
  • Fully automated CI/CD pipelines and GitOps integration.
  • Real-time metrics and logging for actionable insights.

Effortless Scalability and High Performance

  • Auto-scaling to handle high concurrency with ease.
  • Zero operational overhead — just focus on building.

Explore more in the Documentation!

Try Leapcell

Follow us on X: @LeapcellHQ


Read on our blog

Top comments (0)