Asynchronous Communication
This page has been copied from a merge request's documentation.
It is not meant to be the final documentation yet and contains implementation details that probably do not belong here.
Primer
Before starting with the implementation details, some fundamentals need to be understood.
Domain Events
A domain event is a change of interest that happened in our domain. What exactly is of interest differs from domain to domain and from context to context. Note that a domain event represents something that has already happened in the past. Typically, a domain event is used to notify parts of a software system about changes - especially parts that lie outside of the event's own bounded context or service boundaries - but it is also feasible to use domain events to inform other aggregates within the same service boundaries about those changes. The informed parts can then react to the event in a specific manner, for example to enforce consistency.
Figure 7-14 shows how consistency between aggregates is achieved by domain events. When the user initiates an order, the Order Aggregate sends an OrderStarted domain event. The OrderStarted domain event is handled by the Buyer Aggregate to create a Buyer object in the ordering microservice, based on the original user info from the identity microservice (with information provided in the CreateOrder command).
Alternatively, you can have the aggregate root subscribe to events raised by members of its aggregate (child entities). For instance, each OrderItem child entity can raise an event when the item price is higher than a specific amount, or when the product item amount is too high. The aggregate root can then receive those events and perform a global calculation or aggregation.
It is important to note that a domain event is itself a DTO as it transfers data from the aggregate to other layers or systems. Therefore it must be meaningful and immutable.
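For illustration, the OrderStarted event from the example above could be modeled as a small immutable Kotlin data class; this is only a sketch, and the field names are assumptions, not taken from an actual codebase:

```kotlin
import java.time.Instant
import java.util.UUID

// Immutable domain event DTO: it states a fact in the past tense and carries
// only the data other parts of the system need in order to react.
data class OrderStarted(
    val orderId: UUID,
    val buyerId: UUID,
    val occurredAt: Instant = Instant.now()
)
```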
Event Driven Architecture
An Event-Driven Architecture (EDA for short) focuses its "view of the reality" on events. Note that an EDA is not bound to domain events alone, although they will probably make up a big part of it. For example, application status events like ApplicationStarted, ApplicationStopped or InitiatedDataImport can also be used.
There are different flavors of EDA around, like "Event Notification", "Event-Carried State Transfer" and "Event Sourcing / CQRS". Martin Fowler has an excellent talk and blog post about this, explaining the differences and semantics between them. In simple words: Event Notification simply notifies consumers that something happened, and it is the consumer's responsibility to work out a strategy for obtaining more information about the event, while Event-Carried State Transfer propagates the whole state with the event. Event Sourcing, on the other hand, keeps absolutely everything as an event, so you can rebuild the whole application state from these events. Greg Young has an excellent talk about this.
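To make the difference between the first two concrete, here is a hypothetical sketch of the same change modeled once as a pure event notification and once as event-carried state transfer (all names are made up for illustration):

```kotlin
import java.util.UUID

// Event notification: only states *that* something happened.
// Consumers have to call back (e.g. via REST) if they need the details.
data class CustomerAddressChanged(val customerId: UUID)

// Event-carried state transfer: the event carries the new state itself,
// so consumers can update their own copy without calling back.
data class CustomerAddressChangedWithState(
    val customerId: UUID,
    val newStreet: String,
    val newCity: String
)
```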
For this project (and this concept) we will just focus on “Event Notification” and “Event-Carried State Transfer”.
An EDA typically consists of two kinds of components, producers and consumers. To keep it simple: a producer produces events while a consumer consumes events. In a bit more detail, a producer receives or detects an event and parses this particular event into a message. The producer then sends this message into a channel (so-called "producing"), which is where the consumer comes into play. The consumer can consume messages coming from this channel, parse them and act on the event. The interesting part is that producers and consumers SHOULD not know about each other. While it can be feasible under particular circumstances to couple producers and consumers together, this decoupling is really important, especially in a microservice architecture or, in general, any loosely coupled system. You will see in a minute why.
Apache Kafka
The previous paragraph mentioned that producers and consumers communicate over a channel; Apache Kafka is one type of such a channel. When looking up what exactly Apache Kafka is, different terms like "Event Streaming Platform", "Event Broker" or "Event Log" will come up. In order to use Kafka and get the most out of it, an understanding of the concepts behind it is really important.
What is “Event Streaming”?
Within an EDA there are basically two architectural styles for such a channel: Pub/Sub and event streaming. In a Pub/Sub architecture the messaging infrastructure keeps track of subscriptions from consumers. That means a message published into a Pub/Sub-style channel will be propagated to every consumer that is currently subscribed to the channel. In the Pub/Sub model there is no history of messages; every new consumer will only receive messages from that point in time onwards. Popular implementations of the Pub/Sub model are Redis or MQTT. The second architectural style is event streaming. Here every message is written to a durable log. Consumers are not forced to subscribe to the message log; each consumer can read from any point of this log at any point in time. Implementations are Apache Kafka or Redpanda.
I heard about Redpanda before in this project and that we are using it, so why are you still talking about Kafka all the time?
Redpanda is an event streaming platform that is meant to be compatible with the Kafka API and can act as a drop-in replacement for Apache Kafka. Therefore everything the Kafka API offers is implemented in Redpanda as well; only administrative details differ.
What is an “Event Broker”?
This one is relatively easy. An event broker is just a piece of software or infrastructure that connects publishers and consumers. In general this is just a fancier term for the "channel" from the previous paragraph. However, the term event broker is also used in another context, to differentiate between two more concepts: orchestration and choreography. In an orchestration-based infrastructure there is an orchestration service that is responsible for routing messages to particular consumers and maybe even ensuring delivery order. A choreography-based infrastructure is built around an event broker, and consumers are autonomous in receiving their messages. The consumers are their own orchestration service, so to speak.
Concepts and Principles
Before explaining some of the key concepts it is worth mentioning that Kafka is built from the ground up in a distributed and scalable manner, which leads to most of the concepts we will see in a moment. To put that into numbers: LinkedIn (the company that initially developed Kafka) published some of their own scale figures in 2015. At LinkedIn, their Kafka clusters have to handle up to 13 million messages per second, spread over 1100 Kafka brokers.
Typically Kafka instances are distributed and organized within a cluster of one or more brokers. Because of this, some kind of infrastructural software is required that is capable of managing the collaboration of the brokers in the cluster. Until 2021 the only way was to rely on an additional system called ZooKeeper that manages the cluster. Kafka 2.8.0 removed the strict requirement for ZooKeeper and introduced a self-managed quorum (KRaft, based on the Raft consensus algorithm). Redpanda also relies on the Raft algorithm and is therefore self-managed as well.
Messages produced to and consumed from Kafka are called records. Technically the API differentiates between ConsumerRecord and ProducerRecord, but for now it is enough to know that Kafka messages are called records. A record basically consists of five components: topic, partition, key, value and headers. The semantics of each component are well-defined; for now we focus on topic, value and headers. The topic is a string-based representation of the affiliation of a message. A topic can be seen like a directory on a filesystem: it does not have any semantics or enforced rules itself, but describes the content in a meaningful manner and gives some structure. It is purely logical. Each topic consists of at least one partition, which is what really makes the magic happen, but we will come to that later on. Next up, a record has a value. The Kafka broker does not care about the structure of the value and simply treats it as a stream of bytes that is stored on the log. That means you can publish any kind of data to Kafka: JSON, Protobuf or even unstructured content like images. In addition to the value, a Kafka record can contain various headers (introduced in 0.11.0.0). Record headers can be used to carry additional metadata between producers and consumers that is not part of the value itself, for example the message type or a timestamp. Headers consist of a string-based key and arbitrary bytes as a value. That means headers are as flexible in their content as the message value itself.
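As a rough sketch using the plain Kafka producer API (the topic, key and value here are made up purely for illustration), a record with all of these components can be built like this:

```kotlin
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.header.internals.RecordHeader

fun buildExampleRecord(): ProducerRecord<String, String> {
    // Topic "orders", no explicit partition (null lets the partitioner decide based on the key),
    // key "order-4711" and a JSON payload which Kafka treats as opaque bytes.
    val record = ProducerRecord<String, String>(
        "orders",
        null,
        "order-4711",
        """{"status":"STARTED"}"""
    )
    // Headers carry metadata alongside the value, e.g. the message type.
    record.headers().add(RecordHeader("type", "OrderStarted".toByteArray()))
    return record
}
```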
One of the key aspects Kafka offers is that it retains the ordering of events within one partition. What does that mean in particular? As already mentioned, a topic consists of at least one partition. A partition is a storage unit in Kafka that contains records. Partitions are the magical things with which Kafka enables redundancy, scalability and reliability, so it is really important to understand how partitions behave. As mentioned, Kafka is built from the ground up in a distributed manner, and therefore all partitions of a topic are distributed across all brokers. In addition, Kafka keeps redundant copies of each partition, so each partition is available on multiple brokers in the cluster; if a broker fails and crashes, Kafka can still serve consumers and producers. Because of this partition structure, Kafka not only allows scalability of the brokers, but also scalability on the consumer side. It is possible to have one consumer that consumes records from all partitions, multiple consumers where each serves a subset of partitions, or even multiple consumers for a single partition. This is controlled by the semantics of a consumer group. Because of these characteristics, Kafka comes with several guarantees affecting the consumption and production process within a partition. The most crucial one for us is that all records within a partition are strictly ordered and will be served to consumers in the same order they were written into the log. So how is the partition selected? That is where the key and partition record components come into play. The most obvious way to produce a record into a specific partition is to set the desired partition identifier on the record. The more appropriate and flexible way, however, is to make use of the key: every record with the same key is guaranteed to be stored in the same partition. The partitioning is based on a hash algorithm, which is not really of interest at the moment.
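To illustrate the idea (this is a simplification; Kafka's actual default partitioner hashes the serialized key bytes with murmur2), key-based partition selection boils down to something like this:

```kotlin
// Simplified illustration of key-based partitioning: the same key always maps to the
// same partition, which is exactly what preserves per-key ordering.
fun partitionFor(key: String, numPartitions: Int): Int =
    Math.floorMod(key.hashCode(), numPartitions)

fun main() {
    // "robot-42" always ends up in the same partition of a 10-partition topic.
    println(partitionFor("robot-42", 10))
    println(partitionFor("robot-42", 10)) // same result every time
}
```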
The last bit of conceptual information about Kafka concerns the question: how does Kafka know which consumer is at which position in the log? The question is relatively easy to answer but brings deeper semantics with it when viewed in detail. The simple answer is: each consumer tells Kafka which records it has consumed (so-called offset committing), and Kafka associates a so-called consumer offset with the consumer. Easy. But the offset behaviour is configurable, so we can choose between various consumption semantics. I will not go into detail now, but it is for example possible to have consumers that re-read the whole log every time they start up, or consumers that are guaranteed to consume every record at most/at least once.
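As a hedged sketch using the plain Kafka consumer API (broker address, topic and group id are assumptions), the two most relevant knobs are auto.offset.reset, which decides where a consumer without a committed offset starts reading, and how offsets are committed:

```kotlin
import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer

fun main() {
    val props = Properties().apply {
        put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        put(ConsumerConfig.GROUP_ID_CONFIG, "example-consumer")
        put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java)
        put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java)
        // Start at the beginning of the log if this group has no committed offset yet.
        put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        // Commit offsets manually after processing for at-least-once semantics.
        put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    }
    KafkaConsumer<String, String>(props).use { consumer ->
        consumer.subscribe(listOf("example-topic"))
        val records = consumer.poll(Duration.ofSeconds(1))
        records.forEach { println("${it.offset()}: ${it.value()}") }
        consumer.commitSync() // tell Kafka which offsets have been consumed
    }
}
```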
Here are two videos from Confluent that explain everything in a visual and simple form:
- What is Apache Kafka®?
- Apache Kafka® 101: Partitioning
Why do we need this?
As already discussed several times, our eventing system as it is at the moment is far from perfect, and in my personal opinion not even deterministic. The most critical part is the fact that we are obfuscating events and spreading the actual state across different topics and messages. This makes it really hard for a player to catch up with its own state changes. Besides the player implementations, we also have many synchronous dependencies within our core services (i.e. game, map, robot, trading) and are mainly using the event notification characteristic, which might not be ideal in every case. Personally, I don't think we will get rid of all of these synchronous dependencies without making too many changes to the game design itself or introducing more complexity, but we really should revisit the implementation details. However, a game is not really a game without players, so we really should focus on the ability to implement players in our infrastructure.
The following example should illustrate the difficulties a player currently has with our system. I won't go into the dirty details, but it should be self-explanatory.
Once a game has been started, a player can issue a command to buy a robot. With the issued command the player receives a transactionId. Using this transactionId the player can consume messages from the trades topic and identify its trade by filtering for messages with the transactionId as a key. In the payload the player finds details about the bought robot, for example the planet it is on. In the meantime two more messages about the planet are published to Kafka, one in the movement topic and one in the neighbours topic. With the movement message the player can identify its planet and knows which resources are on it; with the neighbours message the player knows to which other planets it can move. However, these semantics can only be resolved if and only if the trade event has been consumed before and the player knows on which planet its robot is located. Race conditions are bound to happen.
Implementation
The implementation is based on the assumption that we will drop the whole security / obfuscation details (also called "transactionId bullshit"). In the following sections, I will go into the relevant implementation details.
This is not to say that this implementation is perfect and should be carried over as-is; it is just meant to give some insight into the strategy behind it. It will probably change in the future as things evolve naturally.
Events
The domain events are part of the domain layer and marked with a marker interface called DomainEvent. Each aggregate the robot service takes care of has its own marker interface for aggregate-scoped events, which inherits from the core DomainEvent interface. Because the aggregate-scoped marker interface is sealed, every event is known at compile time. As mentioned in Domain Events, domain events are DTOs and immutable, which has been taken care of.
The following excerpt shows how this works:
import java.util.UUID

// Global marker interface
interface DomainEvent

// Aggregate-scoped marker interface
sealed interface RobotDomainEvent : DomainEvent {
    // This accessor must be implemented in every robot event, as it is used to identify the aggregate affected by the event
    fun robotId(): UUID
}

data class RobotEnergyReduced(
    val robot: UUID,
    val energyReducedAmount: Int,
    val remainingEnergy: Int
) : RobotDomainEvent {
    override fun robotId(): UUID = robot
}
All events are developed in this fashion.
Aggregates
As domain events are controlled by the aggregate root and should reflect a state change within a single aggregate, a common base class has been introduced. To store the events, a list property is used: every time an aggregate's state changes, an event is added to this list. Note that this list is transient both in the JPA sense (not persisted to the database) and in the JVM sense (not serialized).
abstract class AbstractAggregateRoot(
    @Transient
    @kotlin.jvm.Transient
    @JsonIgnore
    val events: MutableList<DomainEvent> = mutableListOf()
) {
    fun domainEvents(): Collection<DomainEvent> {
        return events.toList()
    }

    fun clearEvents() {
        this.events.clear()
    }
}
This AbstractAggregateRoot can now be inherited by each aggregate root, for example Robot. Note that Robot in this case has a private constructor and a static factory method. The reasoning for this is that we should consider the creation of a Robot as a domain event itself. To do so, we must ensure that the event is only added on creation and not when Hibernate or another third-party tool creates a new instance of a robot. Therefore, it is not feasible to use Kotlin's init {} block. If this were Java, we could also not rely on the constructor, as Hibernate proxies the class and therefore invokes the constructor. This way we ensure that the event is only appended on a call to the factory method. It also conforms to the Factory pattern described by Eric Evans in his big blue book about Domain-Driven Design.
@Entity
class Robot private constructor(
    @Type(type = "uuid-char")
    val player: UUID,
    planet: Planet
) : AbstractAggregateRoot() {

    companion object {
        // Factory method
        fun of(player: UUID, planet: Planet): Robot {
            val robot = Robot(player, planet)
            robot.events.add(RobotCreated(robot)) // add creation event to the event list
            return robot
        }
    }
}
This approach is applied throughout the entire aggregate: every time the state changes, a suitable domain event is added to the event list.
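As a hypothetical example (the method name and the id and energy properties are assumptions for illustration, not necessarily the real Robot code), a state-changing method inside the Robot aggregate could look like this:

```kotlin
// Inside the Robot aggregate root; `id` and `energy` are assumed properties for this sketch.
fun reduceEnergy(amount: Int) {
    require(amount <= energy) { "Cannot reduce energy by more than is available" }
    energy -= amount
    // Record the state change as a domain event; it is published later by the application layer.
    events.add(RobotEnergyReduced(robot = id, energyReducedAmount = amount, remainingEnergy = energy))
}
```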
Messaging
As domain events need to be published, an interface with exactly this responsibility is introduced:
interface DomainEventPublisher {
    fun publish(domainEvent: DomainEvent)
    fun publish(domainEvents: Iterable<DomainEvent>)
}
Spring Event Bus
I decided to publish events first over the Spring event bus. This decouples Kafka from the event publication process and gives some more flexibility and extensibility. Note that the Spring event bus is synchronous by default.
@Component
class SpringEventBusPublisher(
    val publisher: ApplicationEventPublisher
) : DomainEventPublisher {

    override fun publish(domainEvent: DomainEvent) {
        publisher.publishEvent(domainEvent)
    }

    override fun publish(domainEvents: Iterable<DomainEvent>) {
        domainEvents.forEach {
            publisher.publishEvent(it)
        }
    }
}
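To connect this with the aggregates above, an application service could drain and publish the collected events after a use case has run. The following is only a sketch; the service, repository and method names are assumptions:

```kotlin
import java.util.UUID
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional

// Hypothetical application service showing how the pieces fit together:
// run the use case, save the aggregate, publish its events and clear them afterwards.
@Service
class RobotApplicationService(
    private val robotRepository: RobotRepository,       // assumed Spring Data repository for Robot
    private val domainEventPublisher: DomainEventPublisher
) {
    @Transactional
    fun buyRobot(player: UUID, planet: Planet): Robot {
        val robot = Robot.of(player, planet)               // factory method records the RobotCreated event
        robotRepository.save(robot)
        domainEventPublisher.publish(robot.domainEvents()) // a) publish over the Spring event bus
        robot.clearEvents()                                // b) clear after publication
        return robot
    }
}
```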
Kafka Publisher
To publish the domain events to Kafka, a listener on the Spring event bus is implemented:
@Component
class KafkaPublishingDomainEventListener(
    val kafkaTemplate: KafkaTemplate<String, String>
) {

    companion object {
        private const val robotTopic = "robot"
    }

    // Events are published before any changes are committed to the database; this ensures that we don't have an inconsistent state in our event log.
    // If publication fails, an exception is thrown, so we don't have an inconsistent state in our database either.
    @TransactionalEventListener(fallbackExecution = true, phase = TransactionPhase.BEFORE_COMMIT)
    fun publishEventToKafka(robotDomainEvent: RobotDomainEvent) {
        val type = robotDomainEvent::class.simpleName
        val id = robotDomainEvent.robotId()

        // Add metadata like timestamp, eventId and type to the domain event
        val enrichedDomainEvent = DomainEventEnricher.from(robotDomainEvent)
            .eventId(UUID.randomUUID().toString())
            .type(type)
            .build()

        // Build a ProducerRecord (the key is the robotId to ensure ordering)
        val record = enrichedDomainEvent.toProducerRecord()

        // Send the ProducerRecord to Kafka
        kafkaTemplate.send(record)
    }
}
Configuration
One notable detail is that in this implementation the creation and configuration of topics is the responsibility of the service itself. Therefore, additional configuration properties have been introduced. The following configuration sets up the robot topic with 1 replica, 10 partitions and uncompressed records:
topics:
  config:
    robot:
      replicas: 1
      partitions: 10
      properties:
        "compression.type": uncompressed
These configuration attributes are resolved during application context initialization, and a Spring bean takes care of the topic creation:
@Bean
fun robotTopics(config: TopicConfigurationProperties): NewTopics {
    return NewTopics(
        *config.config.entries.map { (key, value) ->
            TopicBuilder.name(key)
                .partitions(value.partitions)
                .replicas(value.replicas)
                .configs(value.properties)
                .build()
        }.toTypedArray()
    )
}
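For completeness, the properties class backing this configuration could look roughly like the following; the real class in the code base may differ, this is a sketch assuming Spring Boot's @ConfigurationProperties binding:

```kotlin
import org.springframework.boot.context.properties.ConfigurationProperties

// Sketch: binds the "topics.config" map from the YAML above, keyed by topic name (e.g. "robot").
// It still needs to be registered, e.g. via @EnableConfigurationProperties or @ConfigurationPropertiesScan.
@ConfigurationProperties(prefix = "topics")
class TopicConfigurationProperties {
    var config: Map<String, TopicConfig> = emptyMap()

    class TopicConfig {
        var replicas: Int = 1
        var partitions: Int = 1
        var properties: Map<String, String> = emptyMap()
    }
}
```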
Consequences
- The developer must make sure that the aggregate's domain events are
  - a) published over the Spring event bus
  - b) cleared after publication
- All aggregate-related events are published into a single topic. Events are distributed across partitions using the aggregate's identifier as the record key.
- Ensures ordering, scalability and reliability (as explained in Concepts and Principles)
- Brings a little bit of complexity to the consumer side, as it has to resolve the event type for deserialization (see the sketch after this list). However, this should be WAAAAAY less complex than the transactionId shenanigans.
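A consumer-side sketch of this type resolution could look like the following; the listener name, group id and the consumer-side event classes are assumptions, the point is reading the type header and choosing the deserialization target from it:

```kotlin
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

// Hypothetical listener that resolves the concrete event class via the "type" header.
@Component
class RobotEventListener {

    private val objectMapper = ObjectMapper().registerKotlinModule()

    // The consumer's own representations of the events it cares about, keyed by type name.
    private val eventTypes: Map<String, Class<out Any>> = mapOf(
        "RobotEnergyReduced" to RobotEnergyReduced::class.java
    )

    @KafkaListener(topics = ["robot"], groupId = "player-example")
    fun onRobotEvent(record: ConsumerRecord<String, String>) {
        val type = record.headers().lastHeader("type")?.value()?.decodeToString() ?: return
        val targetClass = eventTypes[type] ?: return // ignore event types this consumer does not care about
        val event: Any = objectMapper.readValue(record.value(), targetClass)
        // React to the event, e.g. update the player's own view of its robots.
        println("Received $type for robot ${record.key()}: $event")
    }
}
```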
Notable Potential Improvements
- I think it might be nice to have some kind of Hibernate interceptor to simply publish database creations, deletions and updates of aggregates to Kafka. This would give us a bit more safety. However, care must be taken not to drastically reduce the performance even more.
- I have done way too many changes for this PR already, and there are still things to rethink and refactor. For example, what is the role of Item and Planet in the domain? Are these aggregates of their own, or entities? What about Player? At the moment there is just a weak reference to Player and Planet within the robot.
Sources
Relevant sources that this concept is based on:
- .NET Microservices: Architecture for Containerized .NET Applications
- Should You Put Several Event Types in the Same Kafka Topic?
- Apache Kafka Reference Documentation
- A better domain events pattern
- What do you mean by “Event-Driven”?
- What is event-driven architecture
- Event-driven architecture style
- How To Orchestrate Event-Driven Microservices
- Domain-Driven Design: Tackling Complexity in the Heart of Software, Eric Evans (ISBN: 978-0321125217)
- Implementing Domain-Driven Design, Vaughn Vernon (ISBN: 978-0321834577)