Skip to main content

Event Sourcing: Patterns and Pitfalls

A practical guide to event sourcing - when to use it, how to implement it, and common mistakes to avoid.

Event sourcing stores state as a sequence of events rather than current values. It’s powerful but not a silver bullet.

The Core Idea #

Instead of storing:

{"accountId": "123", "balance": 150}

Store the events:

{"type": "AccountCreated", "accountId": "123", "timestamp": "..."}
{"type": "MoneyDeposited", "accountId": "123", "amount": 200, "timestamp": "..."}
{"type": "MoneyWithdrawn", "accountId": "123", "amount": 50, "timestamp": "..."}

Current state = replay all events.

When to Use Event Sourcing #

Good fit:

  • Audit requirements (financial, healthcare)
  • Complex domain with business rule evolution
  • Need to answer “what happened” questions
  • Event-driven architectures

Not ideal:

  • Simple CRUD applications
  • When you’ll never need event history
  • Performance-critical read paths without projections

Implementation Patterns #

Event Store #

type Event struct {
    ID          string
    AggregateID string
    Type        string
    Data        json.RawMessage
    Version     int
    Timestamp   time.Time
}

type EventStore interface {
    Append(aggregateID string, events []Event, expectedVersion int) error
    Load(aggregateID string) ([]Event, error)
    LoadFrom(aggregateID string, fromVersion int) ([]Event, error)
}

Aggregate Root #

type Account struct {
    id      string
    balance int
    version int
    changes []Event
}

func (a *Account) Deposit(amount int) error {
    if amount <= 0 {
        return errors.New("amount must be positive")
    }
    a.apply(MoneyDeposited{AccountID: a.id, Amount: amount})
    return nil
}

func (a *Account) apply(event interface{}) {
    a.when(event)
    a.changes = append(a.changes, Event{
        AggregateID: a.id,
        Type:        reflect.TypeOf(event).Name(),
        Data:        marshal(event),
        Version:     a.version + 1,
    })
    a.version++
}

func (a *Account) when(event interface{}) {
    switch e := event.(type) {
    case AccountCreated:
        a.id = e.AccountID
        a.balance = 0
    case MoneyDeposited:
        a.balance += e.Amount
    case MoneyWithdrawn:
        a.balance -= e.Amount
    }
}

Rehydrating from Events #

func LoadAccount(store EventStore, id string) (*Account, error) {
    events, err := store.Load(id)
    if err != nil {
        return nil, err
    }
    
    account := &Account{}
    for _, event := range events {
        account.when(unmarshal(event))
        account.version = event.Version
    }
    return account, nil
}

Projections (Read Models) #

Events are great for writes, but reads need optimized views:

type AccountProjection struct {
    ID      string
    Balance int
    Owner   string
}

type AccountProjector struct {
    db *sql.DB
}

func (p *AccountProjector) Handle(event Event) error {
    switch event.Type {
    case "AccountCreated":
        var e AccountCreated
        json.Unmarshal(event.Data, &e)
        _, err := p.db.Exec(
            "INSERT INTO accounts (id, balance, owner) VALUES ($1, 0, $2)",
            e.AccountID, e.Owner,
        )
        return err
        
    case "MoneyDeposited":
        var e MoneyDeposited
        json.Unmarshal(event.Data, &e)
        _, err := p.db.Exec(
            "UPDATE accounts SET balance = balance + $1 WHERE id = $2",
            e.Amount, e.AccountID,
        )
        return err
    }
    return nil
}

Snapshotting #

For aggregates with many events:

type Snapshot struct {
    AggregateID string
    Version     int
    State       json.RawMessage
    Timestamp   time.Time
}

func LoadAccountWithSnapshot(store EventStore, snapStore SnapshotStore, id string) (*Account, error) {
    snapshot, _ := snapStore.Load(id)
    
    var account *Account
    var fromVersion int
    
    if snapshot != nil {
        account = unmarshalAccount(snapshot.State)
        fromVersion = snapshot.Version
    } else {
        account = &Account{}
    }
    
    events, _ := store.LoadFrom(id, fromVersion)
    for _, event := range events {
        account.when(unmarshal(event))
        account.version = event.Version
    }
    
    // Create snapshot every 100 events
    if account.version - fromVersion > 100 {
        snapStore.Save(Snapshot{
            AggregateID: id,
            Version:     account.version,
            State:       marshal(account),
        })
    }
    
    return account, nil
}

Common Pitfalls #

1. Changing Event Schemas #

Events are immutable. Never change their structure:

// V1 - Original
type OrderPlaced struct {
    OrderID string
    Amount  int  // in cents
}

// V2 - Need currency? Add new field, don't change Amount
type OrderPlacedV2 struct {
    OrderID  string
    Amount   int
    Currency string // New field, default to "USD" if empty
}

2. Large Aggregates #

Keep aggregates small. If you have 100k events, something’s wrong:

// Bad: One aggregate for all orders
type AllOrders struct {
    orders map[string]Order
}

// Good: One aggregate per order
type Order struct {
    id    string
    items []OrderItem
}

3. Querying Events Directly #

Don’t query the event store for reads. Use projections:

// Bad
events, _ := store.LoadByType("OrderPlaced")
for _, e := range events {
    // Process...
}

// Good
orders, _ := db.Query("SELECT * FROM order_summary WHERE status = 'pending'")

4. Forgetting Idempotency #

Projections may process events multiple times:

func (p *Projector) Handle(event Event) error {
    // Use event ID for idempotency
    _, err := p.db.Exec(`
        INSERT INTO projections (event_id, ...) 
        VALUES ($1, ...) 
        ON CONFLICT (event_id) DO NOTHING
    `, event.ID)
    return err
}

Key Takeaways #

  1. Event sourcing is about capturing intent, not just state
  2. Projections are essential for readable queries
  3. Keep aggregates small and focused
  4. Never modify historical events
  5. Consider the operational complexity

Event sourcing shines when you need complete audit trails and the ability to rebuild state from history.