How to compute the current state of a resource in an event-sourced Go application?

Recently, I have been working on an event-sourced application built around Command-Query Responsibility Segregation (CQRS). My job was to figure out how to implement a new command in Go.

In order to act on a given object, we have to build the current state of that object out of the events that created and modified it. Here I want to show how I have come up with the applier interface to represent the Event logic.

The Event Store

Let's keep the architecture as simple as it can be: a Go service responds to the external call and directly operates on the events.

The collection of events, namely the Event Store, can be a SQL table that looks like this:

id handle aggregate_id payload
1043 user.created user_123 {"first_name": "Pierre", "fav_ide": "vim" }
1044 user.updated user_123 {"fav_ide": "neovim"}

The property aggregate_id uniquely identifies every resource in the store.

How to build an aggregate

Martin Fowler, in his article on Event Sourcing, models two different responsibilities in handling events:

  • Processing domain logic: the logic that implements the change (e.g. func UpdateUser(userAggregateId, newUser))
  • Processing selection logic: the logic that maps the event named user.updated to the function UpdateUser

According to Fowler, a good place to put the Processing domain logic in is the Domain Model.

The event data would be fetched from the persistence and sent to a matcher function (selection logic), that would itself call the Processing domain logic coded as a User method.

Let's focus on the Processing domain logic: I suspected there had to be a more idiomatic shape for the event logic in Go. I started my investigation by asking myself:

What is an event?

An event is a state change. The event, in our Event Store, is no more and no less than the flattened representation of a behaviour, waiting to be applied to a specific object.

What would be the most scalable way to define a behaviour in Go? Interfaces!

type applier interface {
    Apply(*user.User)
}

This applier interface defines the behaviour of an event: it is defined as a function that changes the state of the domain object it refers to.

We can have separate logic handling the connection to the Event Store and the Processing selection logic, fetching events from the Event Store and returning them as types implementing the applier interface.

This is how simple the Aggregate builder can look like:

type applier interface {
	Apply(*user.User)
}

func buildUserAggregate(appliers []applier) user.User {
	// Initialise an empty User on which all the events will be applied.
	var u user.User

	// Every event in the slice carries a state change that has to be applied
	// on our User, in order to rebuild its current state.
	for _, a := range appliers {
		a.Apply(&u)
	}

	return u
}

Validation

How do we validate the change request?

I can think of 3 different types of validation:

  • payload validation. Rules I can enforce just by looking at the request payload: required fields, value types, ranges, enums, and payload internal consistency. Example: the favourite IDE has to be picked from a list.
  • request validation. Rules that refer to the state of the domain object before the change. If a user has been deactivated, he should not be able to change his favourite editor.
  • domain object validation. Is the resulting domain object in a legit state after the change is applied? Example: a user can only have Xcode as his favourite editor if he's using a Mac.

In the applier-featured model, the most obvious piece of code that has access to the three states (the change payload, the User before, the User after) is the Applier itself. The Apply function will run all the required validations, and possibly return an error.

// CustomerUpdate is an event as represented in the Event Store.
type CustomerUpdate struct {
    handle      string
    aggregateID string
    payload     []byte
}

// Apply is the actual event logic. CustomerUpdate now implements `applier`.
func (cu CustomerUpdate) Apply(u *user.User) error {

    // updatedFields is the parsed JSON payload of the event.
    var updatedFields map[string]interface{}    
    if err := json.Unmarshal(cu.payload, &updatedFields); err != nil {
        return fmt.Errorf("unmarshaling JSON: %v", err)
    }
    
    for k, v := range updatedFields {
        switch k {
        case "fav_ide":
            // payload validation: `fav_ide` must be string
            newFavIDE, ok := v.(string)
            if !ok {
                return fmt.Errorf("fav_ide expected to be string, found %v", v)
            }
            
            // request validation: is the requested change compatible with
            // the state of `u`?
            if !u.Active {
                return fmt.Errorf("cannot change the favorite IDE of a deactivated user")
            }
        
            // the actual change
            u.FavIDE = newFavIDE
            
        default:
            // payload validation: only some fields can be updated
            return fmt.Errorf("invalid field to update: %s", k)
        }
    }
    
    // domain object validation: check the internal consistency
    // of `u` after the change
    return validateUser(u)
}

The event would then be applied carefully handling the returned errors.

// ChangeUserFavIDE, provided an `events` connection to the Event Store
// and the `aggregateID` of a User, sources a new event if the change
// successfully passes the validation.
func ChangeUserFavIDE(events fetchsourcer, aggregateID, newFavIDE string) error {

	// Fetch all the events that refer to that specific resource.
	// They are returned as a slice of objects that implement
	// `Apply(*user.User) error`.
    appliers, err := events.FetchByAggregateID(aggregateID)
    if err != nil {
        return fmt.Errorf("fetching the events: %v", err)
    }
    
    var u user.User
    for _, a := range appliers {
        // An error here would mean that an event in the store failed to validate:
        // `err!=nil` has to be treated like a failure.
        if err := a.Apply(&u); err != nil {
            return fmt.Errorf("building the aggregate: %v", err)
        }
    }
    
    // Create the new event that represents the requested change.
    ev := NewUpdateFavIDEEvent(newFavIDE)
    
    // Finally: apply the "new favourite IDE" logic, and see if it's a legit change.
    // The validation logic, when embedded in the `applier`, has access
    // to the payload, to the previous state of the User, and to the
    // resulting new state.
    if err := ev.Apply(&u); err != nil {
        return fmt.Errorf("applying the new event: %v", err)
    }
    
    // Emit the new event to be persisted in the Event Store.
    if err := events.Source(ev); err != nil {
        return fmt.Errorf("sourcing the new event: %v", err)
    }
    
    return nil
}

This code shows how decoupled the event logic handling can be from the data model, and how it offers pretty nice boundaries for our juicy unit tests.

A further iteration could make Apply accept an interface as an argument, instead of a pointer. For this, I would need to add getter and setter methods on the User type. This is a painful step I am trying to avoid by keeping the User type very simple, and therefore not giving it too much reasons to change. As long as it only is a collection of properties, I don't think I will need this new layer of abstraction.