Event Sourcing in Go: From Zero to Production

Key Takeaways

  • Event sourcing provides a complete audit trail and time-travel debugging
  • CQRS separation enables independent scaling of reads and writes
  • Snapshots are essential for acceptable performance on large event streams
  • Proper event versioning and migration strategies prevent production disasters
  • Event streaming with Kafka enables real-time projections and system integration

Why Event Sourcing?

Your database shows the current state. But how did it get there? Who changed what? When? Why?

-- Traditional: Current state only
SELECT balance FROM accounts WHERE id = 123;
-- Result: 1000

-- Event sourced: Complete history
SELECT * FROM events WHERE aggregate_id = 123;
-- Shows every deposit, withdrawal, fee, interest

We needed an audit trail for financial compliance. Event sourcing gave us time travel, debugging superpowers, and true scalability.

Key Concepts in 5 Minutes

Event sourcing stores changes to state as a sequence of events rather than overwriting data. Instead of UPDATE statements that destroy history, we append immutable events that tell the whole story.

Traditional systems show what is there. Event sourcing shows what happened. This difference transforms debugging, auditing, and analytics. When a bug corrupts the data, we can replay the events to find out when and how it happened.

Events are facts about the past – they cannot be changed or removed. This immutability provides natural audit logging and enables powerful patterns such as temporal queries and retroactive fixes.

State is a left fold over events. The current balance is not stored; it is derived by replaying every deposit and withdrawal. This sounds slow, but with snapshots and projections it is actually faster than traditional systems for many use cases.

// Events capture business intent
type AccountOpened struct {
    AccountID string
    Currency  string
    Timestamp time.Time
}

type MoneyDeposited struct {
    AccountID string
    Amount    decimal.Decimal
    Timestamp time.Time
}

// State derived from event history
func (a *Account) Apply(event Event) {
    // Rebuild state by replaying events
}

Production Event Store

A production event store needs to handle millions of events efficiently. Our PostgreSQL-based implementation processes 10K events/second with proper indexing and partitioning. The append-only nature makes it fast: no updates, no deletes, just inserts.

Event ordering matters for correctness. We use a per-aggregate version sequence to ensure events are applied in the order they occurred. This prevents race conditions where concurrent operations could corrupt state.

Schema design balances normalization with query performance. Event data is stored as JSON for flexibility, while frequently queried fields (aggregate_id, event_type) are indexed columns. This hybrid approach enables both fast queries and schema evolution.

Metadata tracks important context: user ID, correlation ID, causation ID. This audit trail proves invaluable for debugging and compliance. Every state change can be traced back to its origin.

type EventStore struct {
    db *sql.DB
}

type StoredEvent struct {
    ID            uuid.UUID
    AggregateID   string
    AggregateType string
    EventType     string
    EventVersion  int
    EventData     json.RawMessage
    Metadata      json.RawMessage
    OccurredAt    time.Time
    RecordedAt    time.Time
}

// Append-only schema with proper indexes
const schema = `
CREATE TABLE events (
    -- gen_random_uuid() requires PostgreSQL 13+ (or the pgcrypto extension)
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_id VARCHAR(255) NOT NULL,
    aggregate_type VARCHAR(255) NOT NULL,
    event_type VARCHAR(255) NOT NULL,
    event_version INT NOT NULL,
    event_data JSONB NOT NULL,
    metadata JSONB,
    occurred_at TIMESTAMP NOT NULL,
    recorded_at TIMESTAMP NOT NULL DEFAULT NOW(),
    
    -- Ensure events are ordered per aggregate
    UNIQUE(aggregate_id, event_version)
);

-- Indexes for queries (PostgreSQL has no inline INDEX syntax)
CREATE INDEX idx_aggregate ON events(aggregate_id, event_version);
CREATE INDEX idx_event_type ON events(event_type);
CREATE INDEX idx_occurred_at ON events(occurred_at);

-- Global event sequence for ordering
CREATE SEQUENCE IF NOT EXISTS global_event_sequence;
ALTER TABLE events ADD COLUMN global_sequence BIGINT DEFAULT nextval('global_event_sequence');
CREATE INDEX idx_global_sequence ON events(global_sequence);
`

func (es *EventStore) SaveEvents(ctx context.Context, aggregateID, aggregateType string, events []Event, expectedVersion int) error {
    tx, err := es.db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback()
    
    // Check optimistic concurrency
    var currentVersion int
    err = tx.QueryRow(`
        SELECT COALESCE(MAX(event_version), 0) 
        FROM events 
        WHERE aggregate_id = $1`,
        aggregateID,
    ).Scan(&currentVersion)
    
    if err != nil {
        return err
    }
    
    if currentVersion != expectedVersion {
        return fmt.Errorf("concurrency conflict: expected version %d, got %d", 
            expectedVersion, currentVersion)
    }
    
    // Save events
    version := expectedVersion
    for _, event := range events {
        version++
        
        eventData, err := json.Marshal(event)
        if err != nil {
            return err
        }
        
        metadata := map[string]interface{}{
            "user_id":  ctx.Value("user_id"),
            "trace_id": ctx.Value("trace_id"),
            "source":   ctx.Value("source"),
        }
        metadataJSON, err := json.Marshal(metadata)
        if err != nil {
            return err
        }
        
        _, err = tx.Exec(`
            INSERT INTO events (
                aggregate_id, aggregate_type, event_type, 
                event_version, event_data, metadata, occurred_at
            ) VALUES ($1, $2, $3, $4, $5, $6, $7)`,
            aggregateID,
            aggregateType,
            event.EventType(),
            version,
            eventData,
            metadataJSON,
            event.OccurredAt(),
        )
        
        if err != nil {
            return err
        }
    }
    
    return tx.Commit()
}

func (es *EventStore) GetEvents(ctx context.Context, aggregateID string, fromVersion int) ([]StoredEvent, error) {
    rows, err := es.db.QueryContext(ctx, `
        SELECT 
            id, aggregate_id, aggregate_type, event_type,
            event_version, event_data, metadata, 
            occurred_at, recorded_at
        FROM events
        WHERE aggregate_id = $1 AND event_version > $2
        ORDER BY event_version`,
        aggregateID, fromVersion,
    )
    if err != nil {
        return nil, err
    }
    defer rows.Close()
    
    var events []StoredEvent
    for rows.Next() {
        var e StoredEvent
        err := rows.Scan(
            &e.ID, &e.AggregateID, &e.AggregateType,
            &e.EventType, &e.EventVersion, &e.EventData,
            &e.Metadata, &e.OccurredAt, &e.RecordedAt,
        )
        if err != nil {
            return nil, err
        }
        events = append(events, e)
    }
    
    return events, nil
}

Aggregate Root Pattern

type AggregateRoot struct {
    ID               string
    Version          int
    uncommittedEvents []Event
}

func (a *AggregateRoot) RecordEvent(event Event) {
    a.uncommittedEvents = append(a.uncommittedEvents, event)
    a.Version++
}

func (a *AggregateRoot) GetUncommittedEvents() []Event {
    return a.uncommittedEvents
}

func (a *AggregateRoot) MarkEventsAsCommitted() {
    a.uncommittedEvents = []Event{}
}

// Example: Account aggregate
type Account struct {
    AggregateRoot
    Balance  decimal.Decimal
    Currency string
    Status   string
}

func (a *Account) Deposit(amount decimal.Decimal) error {
    if amount.LessThanOrEqual(decimal.Zero) {
        return fmt.Errorf("invalid deposit amount: %v must be positive", amount)
    }
    
    event := MoneyDeposited{
        AccountID: a.ID,
        Amount:    amount,
        Timestamp: time.Now(),
    }
    
    a.Apply(event)
    a.RecordEvent(event)
    return nil
}

func (a *Account) Withdraw(amount decimal.Decimal) error {
    if amount.LessThanOrEqual(decimal.Zero) {
        return fmt.Errorf("invalid withdrawal amount: %v must be positive", amount)
    }
    if amount.GreaterThan(a.Balance) {
        return fmt.Errorf("insufficient funds: attempting to withdraw %v from balance %v", amount, a.Balance)
    }
    
    event := MoneyWithdrawn{
        AccountID: a.ID,
        Amount:    amount,
        Timestamp: time.Now(),
    }
    
    a.Apply(event)
    a.RecordEvent(event)
    return nil
}

func (a *Account) Apply(event Event) {
    switch e := event.(type) {
    case MoneyDeposited:
        a.Balance = a.Balance.Add(e.Amount)
    case MoneyWithdrawn:
        a.Balance = a.Balance.Sub(e.Amount)
    }
}
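
Rebuilding state means turning stored rows back into typed events. The code below and the temporal queries later rely on a deserializeEvent helper that never appears in the article; a minimal sketch, assuming JSON payloads and the event structs defined above:

// Minimal sketch of the missing deserializeEvent helper: map a stored
// event_type back to a concrete struct and unmarshal its JSON payload
func deserializeEvent(eventType string, data json.RawMessage) Event {
    switch eventType {
    case "AccountOpened":
        var e AccountOpened
        json.Unmarshal(data, &e)
        return e
    case "MoneyDeposited":
        var e MoneyDeposited
        json.Unmarshal(data, &e)
        return e
    case "MoneyWithdrawn":
        var e MoneyWithdrawn
        json.Unmarshal(data, &e)
        return e
    }
    return nil // unknown type; production code should return an error
}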

CQRS: Command and Query Separation

// Write side: Commands modify aggregates
type CommandHandler struct {
    eventStore *EventStore
    eventBus   *EventBus
}

func (h *CommandHandler) Handle(ctx context.Context, cmd Command) error {
    switch c := cmd.(type) {
    case DepositMoney:
        return h.handleDeposit(ctx, c)
    case WithdrawMoney:
        return h.handleWithdraw(ctx, c)
    }
    return errors.New("unknown command")
}

func (h *CommandHandler) handleDeposit(ctx context.Context, cmd DepositMoney) error {
    // Load aggregate from events
    account := &Account{}
    account.ID = cmd.AccountID
    stored, err := h.eventStore.GetEvents(ctx, cmd.AccountID, 0)
    if err != nil {
        return err
    }
    
    for _, se := range stored {
        account.Apply(deserializeEvent(se.EventType, se.EventData))
    }
    
    // Remember the version we loaded; this is the expected version on save
    loadedVersion := len(stored)
    account.Version = loadedVersion
    
    // Execute business logic
    err = account.Deposit(cmd.Amount)
    if err != nil {
        return err
    }
    
    // Save against the loaded version, not the in-memory version,
    // which already counts the uncommitted events
    err = h.eventStore.SaveEvents(
        ctx,
        account.ID,
        "Account",
        account.GetUncommittedEvents(),
        loadedVersion,
    )
    if err != nil {
        return err
    }
    
    // Publish for projections
    for _, event := range account.GetUncommittedEvents() {
        h.eventBus.Publish(event)
    }
    account.MarkEventsAsCommitted()
    
    return nil
}

// Read side: Projections for queries
type AccountProjection struct {
    db *sql.DB
}

func (p *AccountProjection) Handle(event Event) error {
    switch e := event.(type) {
    case MoneyDeposited:
        _, err := p.db.Exec(`
            UPDATE account_projections 
            SET balance = balance + $1, updated_at = NOW()
            WHERE account_id = $2`,
            e.Amount, e.AccountID,
        )
        return err
        
    case MoneyWithdrawn:
        _, err := p.db.Exec(`
            UPDATE account_projections 
            SET balance = balance - $1, updated_at = NOW()
            WHERE account_id = $2`,
            e.Amount, e.AccountID,
        )
        return err
    }
    return nil
}

// Query handler reads from projections
type QueryHandler struct {
    db *sql.DB
}

func (q *QueryHandler) GetAccountBalance(accountID string) (decimal.Decimal, error) {
    var balance decimal.Decimal
    err := q.db.QueryRow(`
        SELECT balance FROM account_projections WHERE account_id = $1`,
        accountID,
    ).Scan(&balance)
    return balance, err
}

⚠️ Eventual Consistency Tradeoff

CQRS introduces eventual consistency between the write and read models:

  • Events are written to the event store immediately
  • Projections are updated asynchronously (typically within milliseconds to seconds)
  • Queries may return stale data until projections catch up
  • Design your UX to handle this: optimistic UI updates, a “processing” state, or read-your-writes guarantees where it matters (see the sketch below)
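
One way to provide read-your-writes where it matters is to poll the projection until it has caught up to the version a command just produced. A minimal sketch, assuming the projection table carries a version column (not part of the earlier schema):

// Hypothetical read-your-writes helper: block until the projection has
// applied at least minVersion for this account, or the context expires
func WaitForProjection(ctx context.Context, db *sql.DB, accountID string, minVersion int) error {
    ticker := time.NewTicker(25 * time.Millisecond)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-ticker.C:
            var version int
            err := db.QueryRowContext(ctx, `
                SELECT version FROM account_projections WHERE account_id = $1`,
                accountID,
            ).Scan(&version)
            if err != nil && err != sql.ErrNoRows {
                return err
            }
            if version >= minVersion {
                return nil
            }
        }
    }
}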

Snapshots for Performance

type Snapshot struct {
    AggregateID string
    Version     int
    Data        []byte
    CreatedAt   time.Time
}

func (es *EventStore) SaveSnapshot(ctx context.Context, snapshot Snapshot) error {
    _, err := es.db.ExecContext(ctx, `
        INSERT INTO snapshots (aggregate_id, version, data, created_at)
        VALUES ($1, $2, $3, $4)
        ON CONFLICT (aggregate_id) 
        DO UPDATE SET version = $2, data = $3, created_at = $4`,
        snapshot.AggregateID,
        snapshot.Version,
        snapshot.Data,
        snapshot.CreatedAt,
    )
    return err
}

func (es *EventStore) GetSnapshot(ctx context.Context, aggregateID string) (*Snapshot, error) {
    var s Snapshot
    err := es.db.QueryRowContext(ctx, `
        SELECT aggregate_id, version, data, created_at
        FROM snapshots
        WHERE aggregate_id = $1`,
        aggregateID,
    ).Scan(&s.AggregateID, &s.Version, &s.Data, &s.CreatedAt)
    
    if err == sql.ErrNoRows {
        return nil, nil
    }
    return &s, err
}

// Load aggregate with snapshot optimization
func LoadAccount(ctx context.Context, es *EventStore, accountID string) (*Account, error) {
    account := &Account{}
    account.ID = accountID
    
    // Try to load snapshot
    snapshot, err := es.GetSnapshot(ctx, accountID)
    if err != nil {
        return nil, err
    }
    
    fromVersion := 0
    if snapshot != nil {
        // Restore from snapshot
        err = json.Unmarshal(snapshot.Data, account)
        if err != nil {
            return nil, err
        }
        fromVersion = snapshot.Version
    }
    
    // Apply events after snapshot
    events, err := es.GetEvents(ctx, accountID, fromVersion)
    if err != nil {
        return nil, err
    }
    
    for _, e := range events {
        account.Apply(deserializeEvent(e.EventType, e.EventData))
    }
    account.Version = fromVersion + len(events)
    
    // Create new snapshot every 100 events
    if len(events) > 100 {
        snapshotData, _ := json.Marshal(account)
        es.SaveSnapshot(ctx, Snapshot{
            AggregateID: accountID,
            Version:     account.Version,
            Data:        snapshotData,
            CreatedAt:   time.Now(),
        })
    }
    
    return account, nil
}

Event Streaming with Kafka

type EventStreamer struct {
    eventStore *EventStore
    producer   *kafka.Writer
    lastSeq    int64
}

func (s *EventStreamer) StreamEvents(ctx context.Context) {
    ticker := time.NewTicker(100 * time.Millisecond)
    defer ticker.Stop()
    
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            s.publishNewEvents(ctx)
        }
    }
}

func (s *EventStreamer) publishNewEvents(ctx context.Context) {
    rows, err := s.eventStore.db.QueryContext(ctx, `
        SELECT 
            global_sequence, aggregate_id, event_type, 
            event_data, occurred_at
        FROM events
        WHERE global_sequence > $1
        ORDER BY global_sequence
        LIMIT 1000`,
        s.lastSeq,
    )
    if err != nil {
        return
    }
    defer rows.Close()
    
    var messages []kafka.Message
    var maxSeq int64
    
    for rows.Next() {
        var seq int64
        var aggregateID, eventType string
        var eventData json.RawMessage
        var occurredAt time.Time
        
        if err := rows.Scan(&seq, &aggregateID, &eventType, &eventData, &occurredAt); err != nil {
            continue // skip unscannable rows rather than advancing lastSeq
        }
        
        messages = append(messages, kafka.Message{
            Topic: fmt.Sprintf("events.%s", eventType),
            Key:   []byte(aggregateID),
            Value: eventData,
            Headers: []kafka.Header{
                {Key: "event_type", Value: []byte(eventType)},
                {Key: "occurred_at", Value: []byte(occurredAt.Format(time.RFC3339))},
            },
        })
        
        maxSeq = seq
    }
    
    if len(messages) > 0 {
        err := s.producer.WriteMessages(ctx, messages...)
        if err == nil {
            s.lastSeq = maxSeq
        }
    }
}
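
On the consuming side, a downstream service reads these topics and feeds its own projections. A sketch using segmentio/kafka-go, which the producer above appears to be built on; the broker address and group ID are placeholder assumptions:

// Consume deposit events from Kafka and apply them to a projection
func ConsumeDeposits(ctx context.Context, projection *AccountProjection) error {
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{"localhost:9092"}, // assumed broker address
        GroupID: "account-projection",
        Topic:   "events.MoneyDeposited",
    })
    defer reader.Close()
    
    for {
        msg, err := reader.ReadMessage(ctx)
        if err != nil {
            return err // context cancelled or broker failure
        }
        var event MoneyDeposited
        if err := json.Unmarshal(msg.Value, &event); err != nil {
            continue // skip malformed payloads; real code should dead-letter
        }
        if err := projection.Handle(event); err != nil {
            return err
        }
    }
}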

Temporal Queries (Time Travel)

// Get account state at specific point in time
func (es *EventStore) GetAggregateAtTime(ctx context.Context, aggregateID string, pointInTime time.Time) (*Account, error) {
    events, err := es.db.QueryContext(ctx, `
        SELECT event_type, event_data
        FROM events
        WHERE aggregate_id = $1 AND occurred_at <= $2
        ORDER BY event_version`,
        aggregateID, pointInTime,
    )
    if err != nil {
        return nil, err
    }
    defer events.Close()
    
    account := &Account{}
    for events.Next() {
        var eventType string
        var eventData json.RawMessage
        if err := events.Scan(&eventType, &eventData); err != nil {
            return nil, err
        }
        
        event := deserializeEvent(eventType, eventData)
        account.Apply(event)
    }
    
    return account, nil
}

// Replay events for debugging
func ReplayEvents(es *EventStore, from, to time.Time, handler func(Event)) error {
    rows, err := es.db.Query(`
        SELECT event_type, event_data, occurred_at
        FROM events
        WHERE occurred_at BETWEEN $1 AND $2
        ORDER BY global_sequence`,
        from, to,
    )
    if err != nil {
        return err
    }
    defer rows.Close()
    
    for rows.Next() {
        var eventType string
        var eventData json.RawMessage
        var occurredAt time.Time
        
        if err := rows.Scan(&eventType, &eventData, &occurredAt); err != nil {
            return err
        }
        event := deserializeEvent(eventType, eventData)
        handler(event)
    }
    
    return nil
}
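
Usage is straightforward: replay a time window through any handler. For example, logging every event from the last 24 hours while hunting a bug (assuming Event exposes OccurredAt(), as the save path implies):

// Replay the last day of events through a logging handler
func DebugLastDay(es *EventStore) error {
    to := time.Now()
    from := to.Add(-24 * time.Hour)
    return ReplayEvents(es, from, to, func(e Event) {
        log.Printf("%T at %s", e, e.OccurredAt().Format(time.RFC3339))
    })
}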

Saga Pattern for Distributed Transactions

type TransferSaga struct {
    ID          string
    FromAccount string
    ToAccount   string
    Amount      decimal.Decimal
    State       string
    CompletedSteps []string
}

func (s *TransferSaga) Handle(event Event) ([]Command, error) {
    switch e := event.(type) {
    case TransferInitiated:
        return []Command{
            WithdrawMoney{AccountID: e.FromAccount, Amount: e.Amount},
        }, nil
        
    case MoneyWithdrawn:
        if e.AccountID == s.FromAccount {
            s.CompletedSteps = append(s.CompletedSteps, "withdrawn")
            return []Command{
                DepositMoney{AccountID: s.ToAccount, Amount: s.Amount},
            }, nil
        }
        
    case MoneyDeposited:
        if e.AccountID == s.ToAccount {
            s.State = "completed"
            return []Command{
                MarkTransferComplete{TransferID: s.ID},
            }, nil
        }
        
    case WithdrawFailed:
        s.State = "failed"
        return nil, nil
        
    case DepositFailed:
        // Compensate - refund the withdrawal
        return []Command{
            DepositMoney{AccountID: s.FromAccount, Amount: s.Amount},
        }, nil
    }
    
    return nil, nil
}

Event Store Consistency Warning

Event stores demand careful attention to:

  • Optimistic concurrency control to prevent data corruption
  • Guaranteed event ordering within each aggregate stream
  • Backup and recovery procedures for event streams
  • Event schema evolution and versioning strategies (a read-time upcaster sketch follows this list)
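
Schema evolution is usually handled at read time with upcasters: old payloads are migrated to the current shape as they are loaded, so stored events are never rewritten. A sketch, assuming a per-event schema version is stored alongside each payload (distinct from the aggregate's event_version):

// Hypothetical read-time upcaster: migrate a v1 MoneyDeposited payload
// (float amount) to the current v2 shape (decimal string)
func upcastEvent(eventType string, schemaVersion int, data json.RawMessage) (json.RawMessage, error) {
    if eventType == "MoneyDeposited" && schemaVersion == 1 {
        var v1 struct {
            AccountID string  `json:"account_id"`
            Amount    float64 `json:"amount"`
        }
        if err := json.Unmarshal(data, &v1); err != nil {
            return nil, err
        }
        return json.Marshal(map[string]string{
            "account_id": v1.AccountID,
            "amount":     decimal.NewFromFloat(v1.Amount).String(),
        })
    }
    return data, nil // already current
}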

Security Considerations

Event Sourcing Security Best Practices

  • Event encryption: encrypt sensitive data in event payloads
  • Access control: role-based access to event streams and projections
  • Auditing: include user context and authorization in event metadata
  • Data privacy: honor the “right to be forgotten” through cryptographic erasure
  • Replay protection: ensure event replay does not bypass current authorization rules

// Secure event with encryption
type SecureEvent struct {
    BaseEvent
    EncryptedPayload []byte
    KeyID           string
    Nonce           []byte
}

// GDPR-compliant cryptographic erasure
type GDPREventStore struct {
    *EventStore
    keyManager *KeyManager
}

func (ges *GDPREventStore) ForgetUser(ctx context.Context, userID string) error {
    events, err := ges.GetEventsByUser(ctx, userID)
    if err != nil {
        return fmt.Errorf("failed to find user events: %w", err)
    }
    
    for _, event := range events {
        if err := ges.keyManager.RevokeKey(event.KeyID); err != nil {
            return fmt.Errorf("failed to revoke key %s: %w", event.KeyID, err)
        }
    }
    
    return ges.MarkUserForgotten(ctx, userID)
}
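
Encrypting the payload itself is standard-library work. A sketch with AES-GCM, where the key would come from whatever KeyManager implementation backs the store:

import (
    "crypto/aes"
    "crypto/cipher"
    "crypto/rand"
)

// Encrypt an event payload with AES-GCM; the nonce must be stored
// alongside the ciphertext (see SecureEvent above)
func encryptPayload(key, plaintext []byte) (ciphertext, nonce []byte, err error) {
    block, err := aes.NewCipher(key)
    if err != nil {
        return nil, nil, err
    }
    gcm, err := cipher.NewGCM(block)
    if err != nil {
        return nil, nil, err
    }
    nonce = make([]byte, gcm.NonceSize())
    if _, err = rand.Read(nonce); err != nil {
        return nil, nil, err
    }
    return gcm.Seal(nil, nonce, plaintext, nil), nonce, nil
}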

Testing Strategy

📊 Event Sourcing Testing Framework

Comprehensive Testing Approach for Event-Sourced Systems:

  • Event store tests: consistency, concurrency, and durability
  • Aggregate tests: unit-test business logic and invariants
  • Projection tests: verify read-model consistency
  • Integration tests: end-to-end command/query flow
  • Schema tests: event evolution and migration

// Event store integration test
func TestEventStore(t *testing.T) {
    es := setupTestEventStore(t)
    defer es.Close()
    
    t.Run("ConcurrencyControl", func(t *testing.T) {
        aggregateID := uuid.New().String()
        
        // First save succeeds
        err := es.SaveEvents(context.Background(), aggregateID, "Account", 
            []Event{&AccountOpened{AccountID: aggregateID}}, 0)
        require.NoError(t, err)
        
        // Second save with wrong version fails
        err = es.SaveEvents(context.Background(), aggregateID, "Account", 
            []Event{&MoneyDeposited{AccountID: aggregateID}}, 0)
        require.Error(t, err)
        require.Contains(t, err.Error(), "concurrency conflict")
    })
}
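
Aggregate tests need no database at all; the business rules are plain functions over in-memory state. A sketch for the Account rules above:

func TestAccountWithdraw(t *testing.T) {
    account := &Account{Balance: decimal.NewFromInt(100)}
    
    // Overdraft is rejected and records no event
    err := account.Withdraw(decimal.NewFromInt(150))
    require.Error(t, err)
    require.Empty(t, account.GetUncommittedEvents())
    
    // Valid withdrawal updates balance and records one event
    err = account.Withdraw(decimal.NewFromInt(40))
    require.NoError(t, err)
    require.True(t, account.Balance.Equal(decimal.NewFromInt(60)))
    require.Len(t, account.GetUncommittedEvents(), 1)
}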

Production Monitoring

// Event store metrics
type Metrics struct {
    EventsWritten   prometheus.Counter
    EventsRead      prometheus.Counter
    SnapshotCreated prometheus.Counter
    WriteLatency    prometheus.Histogram
    ReadLatency     prometheus.Histogram
}

// Health checks
func (es *EventStore) HealthCheck(ctx context.Context) error {
    // Use a fresh aggregate ID per check so version 0 is always valid
    checkID := "health-" + uuid.New().String()
    
    // Check write capability
    testEvent := HealthCheckEvent{
        ID:        uuid.New().String(),
        Timestamp: time.Now(),
    }
    
    err := es.SaveEvents(ctx, checkID, "HealthCheck", []Event{testEvent}, 0)
    if err != nil {
        return fmt.Errorf("write check failed: %w", err)
    }
    
    // Check read capability
    events, err := es.GetEvents(ctx, checkID, 0)
    if err != nil {
        return fmt.Errorf("read check failed: %w", err)
    }
    
    if len(events) == 0 {
        return errors.New("no events found")
    }
    
    return nil
}

// Lag monitoring
func MonitorProjectionLag(db *sql.DB) {
    ticker := time.NewTicker(10 * time.Second)
    for range ticker.C {
        // database/sql cannot scan a Postgres interval into time.Duration,
        // so extract the lag as epoch seconds
        var lagSeconds float64
        db.QueryRow(`
            SELECT COALESCE(EXTRACT(EPOCH FROM MAX(NOW() - updated_at)), 0)
            FROM projection_checkpoints`,
        ).Scan(&lagSeconds)
        
        projectionLag.Set(lagSeconds)
        
        if lagSeconds > (5 * time.Minute).Seconds() {
            alert("Projection lag exceeds 5 minutes")
        }
        }
    }
}

Performance Optimization

// 1. Batch event writes
func (es *EventStore) SaveEventsBatch(events []EventWithAggregate) error {
    // With lib/pq, COPY must be prepared inside a transaction
    tx, err := es.db.Begin()
    if err != nil {
        return err
    }
    defer tx.Rollback()
    
    stmt, err := tx.Prepare(pq.CopyIn("events",
        "aggregate_id", "aggregate_type", "event_type",
        "event_version", "event_data", "occurred_at"))
    if err != nil {
        return err
    }
    
    for _, e := range events {
        _, err = stmt.Exec(e.AggregateID, e.AggregateType,
            e.EventType, e.Version, e.Data, e.OccurredAt)
        if err != nil {
            return err
        }
    }
    
    // A final Exec with no arguments flushes the buffered rows
    if _, err = stmt.Exec(); err != nil {
        return err
    }
    if err = stmt.Close(); err != nil {
        return err
    }
    return tx.Commit()
}

// 2. Parallel projection updates
// Caveat: a single shared channel can reorder events for the same
// aggregate; in production, shard work by aggregate ID to keep ordering
func UpdateProjectionsParallel(events []Event) {
    var wg sync.WaitGroup
    ch := make(chan Event, 100)
    
    // Start workers
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for event := range ch {
                updateProjection(event)
            }
        }()
    }
    
    // Send events
    for _, e := range events {
        ch <- e
    }
    close(ch)
    wg.Wait()
}

// 3. Cache aggregates
var aggregateCache = cache.New(5*time.Minute, 10*time.Minute)

func LoadAccountCached(ctx context.Context, es *EventStore, accountID string) (*Account, error) {
    if cached, found := aggregateCache.Get(accountID); found {
        return cached.(*Account), nil
    }
    
    account, err := LoadAccount(ctx, es, accountID)
    if err != nil {
        return nil, err
    }
    
    aggregateCache.Set(accountID, account, cache.DefaultExpiration)
    return account, nil
}

Migrating from a Traditional System

// Generate events from existing state
func MigrateToEventSourcing(ctx context.Context, db *sql.DB, es *EventStore) error {
    rows, err := db.Query(`
        SELECT id, balance, created_at, updated_at
        FROM accounts`)
    if err != nil {
        return err
    }
    defer rows.Close()
    
    for rows.Next() {
        var id string
        var balance decimal.Decimal
        var createdAt, updatedAt time.Time
        
        if err := rows.Scan(&id, &balance, &createdAt, &updatedAt); err != nil {
            return err
        }
        
        // Create initial event
        events := []Event{
            AccountOpened{
                AccountID: id,
                Timestamp: createdAt,
            },
        }
        
        // Infer deposit event from balance
        if balance.GreaterThan(decimal.Zero) {
            events = append(events, MoneyDeposited{
                AccountID: id,
                Amount:    balance,
                Timestamp: updatedAt,
            })
        }
        
        if err := es.SaveEvents(ctx, id, "Account", events, 0); err != nil {
            return err
        }
    }
    
    return nil
}

Lessons from Production

Metric                 Before (CRUD)    After (Event Sourcing)
Write throughput       1K/sec           10K/sec
Read latency (p99)     5ms              2ms (projections)
Audit completeness     60%              100%
Debug time             hours            minutes (via replay)
Storage cost           $1K/month        $3-5K/month

When Not to Use Event Sourcing

  • CRUD is enough (most apps)
  • No audit requirements
  • Simple domain logic
  • Team unfamiliar with the patterns
  • Storage costs are a hard constraint

The Verdict

Event sourcing is not free. Expect 3-5x the storage cost (events + projections + snapshots). It is complex to implement. It demands a mental-model change.

But for financial systems, audit-heavy domains, or complex business logic? This is transformative. Complete history, perfect audit trail, time travel debugging and horizontal scalability.

Start small: event-source one aggregate. See the benefits. Then expand. Don’t go all-in right away.


