Key Takeaways
- Event sourcing provides complete audit trail and time-travel debugging capabilities
- CQRS isolation enables independent scaling of reads and writes
- Snapshots are essential for performance of large event streams
- Proper event versioning and migration strategies prevent production disasters
- Event streaming with Kafka enables real-time projections and system integration
Why Event Sourcing?
Your database shows the current state. But how did it get there? Who changed what? When? Why?
-- Traditional: Current state only
SELECT balance FROM accounts WHERE id = 123;
-- Result: 1000
-- Event sourced: Complete history
SELECT * FROM events WHERE aggregate_id = 123;
-- Shows every deposit, withdrawal, fee, interest
We needed an audit trail for financial compliance. Event sourcing gave us time travel, debugging superpowers, and true scalability.
Key Concepts in 5 Minutes
Event sourcing stores state changes as a sequence of events rather than overwriting data. Instead of UPDATE statements that destroy history, we append immutable events that tell the whole story.
Traditional systems show what exists. Event sourcing shows what happened. This difference transforms debugging, auditing, and analytics. When a bug corrupts data, we can replay the events to find exactly when and how it happened.
Events are facts about the past – they cannot be changed or removed. This immutability provides a natural audit log and enables powerful patterns such as temporal queries and retroactive corrections.
State is a left fold over events. The current balance is not stored; it is computed by replaying every deposit and withdrawal. This sounds slow, but with snapshots and projections it is actually faster than traditional systems for many use cases.
// Events capture business intent
type AccountOpened struct {
	AccountID string
	Currency  string
	Timestamp time.Time
}
type MoneyDeposited struct {
	AccountID string
	Amount    decimal.Decimal
	Timestamp time.Time
}
type MoneyWithdrawn struct {
	AccountID string
	Amount    decimal.Decimal
	Timestamp time.Time
}
// State derived from event history
func (a *Account) Apply(event Event) {
	// Rebuild state by replaying events (full implementation below)
}
Production Event Store
A production event store needs to handle millions of events efficiently. Our PostgreSQL-based implementation processes 10K events/second with proper indexing and partitioning. The append-only nature makes it very fast – no updates, no deletes, just inserts.
Event ordering matters for consistency. We use a per-aggregate version number with a unique constraint so events are applied in the correct order. This prevents race conditions where concurrent operations could corrupt state.
Schema design balances normalization with query performance. Event data is stored as JSONB for flexibility, while frequently queried fields (aggregate_id, event_type) are indexed columns. This hybrid approach enables both fast queries and schema evolution.
Metadata tracks important context: user ID, correlation ID, causation ID. This audit trail proves invaluable for debugging and compliance. Every state change can be traced back to its origin.
type EventStore struct {
db *sql.DB
}
type StoredEvent struct {
	ID            uuid.UUID
	AggregateID   string
	AggregateType string
	EventType     string
	EventVersion  int
	EventData     json.RawMessage
	Metadata      json.RawMessage
	OccurredAt    time.Time
	RecordedAt    time.Time
}
// Append-only schema with proper indexes
const schema = `
-- requires PostgreSQL 13+ (or the pgcrypto extension) for gen_random_uuid()
CREATE TABLE events (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_id VARCHAR(255) NOT NULL,
    aggregate_type VARCHAR(255) NOT NULL,
    event_type VARCHAR(255) NOT NULL,
    event_version INT NOT NULL,
    event_data JSONB NOT NULL,
    metadata JSONB,
    occurred_at TIMESTAMP NOT NULL,
    recorded_at TIMESTAMP NOT NULL DEFAULT NOW(),
    -- Ensure events are ordered per aggregate
    UNIQUE(aggregate_id, event_version)
);
-- Indexes for queries (PostgreSQL needs separate CREATE INDEX statements;
-- inline INDEX clauses are MySQL syntax)
CREATE INDEX idx_aggregate ON events(aggregate_id, event_version);
CREATE INDEX idx_event_type ON events(event_type);
CREATE INDEX idx_occurred_at ON events(occurred_at);
-- Global event sequence for ordering
CREATE SEQUENCE IF NOT EXISTS global_event_sequence;
ALTER TABLE events ADD COLUMN global_sequence BIGINT DEFAULT nextval('global_event_sequence');
CREATE INDEX idx_global_sequence ON events(global_sequence);
`
func (es *EventStore) SaveEvents(ctx context.Context, aggregateID, aggregateType string, events []Event, expectedVersion int) error {
tx, err := es.db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
// Check optimistic concurrency
var currentVersion int
err = tx.QueryRow(`
SELECT COALESCE(MAX(event_version), 0)
FROM events
WHERE aggregate_id = $1`,
aggregateID,
).Scan(&currentVersion)
if err != nil {
return err
}
if currentVersion != expectedVersion {
return fmt.Errorf("concurrency conflict: expected version %d, got %d",
expectedVersion, currentVersion)
}
// Save events
version := expectedVersion
for _, event := range events {
version++
eventData, err := json.Marshal(event)
if err != nil {
return err
}
metadata := map[string]interface{}{
"user_id": ctx.Value("user_id"),
"trace_id": ctx.Value("trace_id"),
"source": ctx.Value("source"),
}
metadataJSON, _ := json.Marshal(metadata)
_, err = tx.Exec(`
INSERT INTO events (
aggregate_id, aggregate_type, event_type,
event_version, event_data, metadata, occurred_at
) VALUES ($1, $2, $3, $4, $5, $6, $7)`,
aggregateID,
aggregateType,
event.EventType(),
version,
eventData,
metadataJSON,
event.OccurredAt(),
)
if err != nil {
return err
}
}
return tx.Commit()
}
func (es *EventStore) GetEvents(ctx context.Context, aggregateID string, fromVersion int) ([]StoredEvent, error) {
rows, err := es.db.QueryContext(ctx, `
SELECT
id, aggregate_id, aggregate_type, event_type,
event_version, event_data, metadata,
occurred_at, recorded_at
FROM events
WHERE aggregate_id = $1 AND event_version > $2
ORDER BY event_version`,
aggregateID, fromVersion,
)
if err != nil {
return nil, err
}
defer rows.Close()
var events []StoredEvent
for rows.Next() {
var e StoredEvent
err := rows.Scan(
&e.ID, &e.AggregateID, &e.AggregateType,
&e.EventType, &e.EventVersion, &e.EventData,
&e.Metadata, &e.OccurredAt, &e.RecordedAt,
)
if err != nil {
return nil, err
}
events = append(events, e)
}
return events, nil
}
Aggregate Root Pattern
type AggregateRoot struct {
ID string
Version int
uncommittedEvents []Event
}
func (a *AggregateRoot) RecordEvent(event Event) {
a.uncommittedEvents = append(a.uncommittedEvents, event)
a.Version++
}
func (a *AggregateRoot) GetUncommittedEvents() []Event {
return a.uncommittedEvents
}
func (a *AggregateRoot) MarkEventsAsCommitted() {
a.uncommittedEvents = []Event{}
}
// Example: Account aggregate
type Account struct {
AggregateRoot
Balance decimal.Decimal
Currency string
Status string
}
func (a *Account) Deposit(amount decimal.Decimal) error {
if amount.LessThanOrEqual(decimal.Zero) {
return fmt.Errorf("invalid deposit amount: %v must be positive", amount)
}
event := MoneyDeposited{
AccountID: a.ID,
Amount: amount,
Timestamp: time.Now(),
}
a.Apply(event)
a.RecordEvent(event)
return nil
}
func (a *Account) Withdraw(amount decimal.Decimal) error {
	if amount.LessThanOrEqual(decimal.Zero) {
		return fmt.Errorf("invalid withdrawal amount: %v must be positive", amount)
	}
	if amount.GreaterThan(a.Balance) {
		return fmt.Errorf("insufficient funds: attempting to withdraw %v from balance %v", amount, a.Balance)
	}
event := MoneyWithdrawn{
AccountID: a.ID,
Amount: amount,
Timestamp: time.Now(),
}
a.Apply(event)
a.RecordEvent(event)
return nil
}
func (a *Account) Apply(event Event) {
switch e := event.(type) {
case MoneyDeposited:
a.Balance = a.Balance.Add(e.Amount)
case MoneyWithdrawn:
a.Balance = a.Balance.Sub(e.Amount)
}
}
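To make "state is derived from events" concrete, here is a small rehydration sketch: replaying the same history always yields the same balance. The account ID and amounts are illustrative.
func ExampleRehydration() {
	account := &Account{}
	account.ID = "acc-123"
	// Replay history in order; Apply mutates state deterministically
	for _, e := range []Event{
		MoneyDeposited{AccountID: "acc-123", Amount: decimal.NewFromInt(100)},
		MoneyWithdrawn{AccountID: "acc-123", Amount: decimal.NewFromInt(30)},
	} {
		account.Apply(e)
	}
	fmt.Println(account.Balance) // 70; no balance column was ever read
}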
CQRS: Command and Query Separation
// Write side: Commands modify aggregates
type CommandHandler struct {
eventStore *EventStore
eventBus *EventBus
}
func (h *CommandHandler) Handle(ctx context.Context, cmd Command) error {
	switch c := cmd.(type) {
	case DepositMoney:
		return h.handleDeposit(ctx, c)
	case WithdrawMoney:
		return h.handleWithdraw(ctx, c)
	}
	return errors.New("unknown command")
}
func (h *CommandHandler) handleDeposit(ctx context.Context, cmd DepositMoney) error {
	// Load aggregate by replaying its event history
	account := &Account{}
	account.ID = cmd.AccountID
	stored, err := h.eventStore.GetEvents(ctx, cmd.AccountID, 0)
	if err != nil {
		return err
	}
	for _, se := range stored {
		account.Apply(deserializeEvent(se.EventType, se.EventData))
	}
	loadedVersion := len(stored) // version before any new events
	// Execute business logic
	if err := account.Deposit(cmd.Amount); err != nil {
		return err
	}
	// Save new events, checking optimistic concurrency against the loaded version
	err = h.eventStore.SaveEvents(
		ctx,
		account.ID,
		"Account",
		account.GetUncommittedEvents(),
		loadedVersion,
	)
	if err != nil {
		return err
	}
	// Publish for projections
	for _, event := range account.GetUncommittedEvents() {
		h.eventBus.Publish(event)
	}
	account.MarkEventsAsCommitted()
	return nil
}
// Read side: Projections for queries
type AccountProjection struct {
db *sql.DB
}
func (p *AccountProjection) Handle(event Event) error {
switch e := event.(type) {
case MoneyDeposited:
_, err := p.db.Exec(`
UPDATE account_projections
SET balance = balance + $1, updated_at = NOW()
WHERE account_id = $2`,
e.Amount, e.AccountID,
)
return err
case MoneyWithdrawn:
_, err := p.db.Exec(`
UPDATE account_projections
SET balance = balance - $1, updated_at = NOW()
WHERE account_id = $2`,
e.Amount, e.AccountID,
)
return err
}
return nil
}
// Query handler reads from projections
type QueryHandler struct {
db *sql.DB
}
func (q *QueryHandler) GetAccountBalance(accountID string) (decimal.Decimal, error) {
var balance decimal.Decimal
err := q.db.QueryRow(`
SELECT balance FROM account_projections WHERE account_id = $1`,
accountID,
).Scan(&balance)
return balance, err
}
⚠️ Eventual Consistency Tradeoff
CQRS introduces eventual consistency between the write and read models:
- Events are written to the event store immediately
- Projections are updated asynchronously (typically milliseconds to seconds)
- Queries may return stale data until projections catch up
- Design your UX to handle this: optimistic UI updates, a "processing" state, or read-your-writes guarantees where it matters (see the sketch below)
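One way to get read-your-writes where it matters is to block briefly until the projection catches up to the global sequence assigned to the write. A minimal sketch, assuming a hypothetical projection_checkpoints table with a last_sequence column per projection:
func (q *QueryHandler) WaitForProjection(ctx context.Context, minSeq int64) error {
	ticker := time.NewTicker(20 * time.Millisecond)
	defer ticker.Stop()
	for {
		var seq int64
		// last_sequence is an assumed column; adapt to your checkpoint schema
		err := q.db.QueryRowContext(ctx, `
			SELECT last_sequence FROM projection_checkpoints
			WHERE projection = 'accounts'`,
		).Scan(&seq)
		if err != nil && err != sql.ErrNoRows {
			return err
		}
		if seq >= minSeq {
			return nil // projection has caught up; safe to query
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}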
Snapshots for Performance
type Snapshot struct {
AggregateID string
Version int
Data []byte
CreatedAt time.Time
}
func (es *EventStore) SaveSnapshot(ctx context.Context, snapshot Snapshot) error {
_, err := es.db.ExecContext(ctx, `
INSERT INTO snapshots (aggregate_id, version, data, created_at)
VALUES ($1, $2, $3, $4)
ON CONFLICT (aggregate_id)
DO UPDATE SET version = $2, data = $3, created_at = $4`,
snapshot.AggregateID,
snapshot.Version,
snapshot.Data,
snapshot.CreatedAt,
)
return err
}
func (es *EventStore) GetSnapshot(ctx context.Context, aggregateID string) (*Snapshot, error) {
var s Snapshot
err := es.db.QueryRowContext(ctx, `
SELECT aggregate_id, version, data, created_at
FROM snapshots
WHERE aggregate_id = $1`,
aggregateID,
).Scan(&s.AggregateID, &s.Version, &s.Data, &s.CreatedAt)
if err == sql.ErrNoRows {
return nil, nil
}
return &s, err
}
// Load aggregate with snapshot optimization
func LoadAccount(ctx context.Context, es *EventStore, accountID string) (*Account, error) {
	account := &Account{}
	account.ID = accountID
	// Try to load snapshot
	snapshot, err := es.GetSnapshot(ctx, accountID)
	if err != nil {
		return nil, err
	}
	fromVersion := 0
	if snapshot != nil {
		// Restore from snapshot
		if err := json.Unmarshal(snapshot.Data, account); err != nil {
			return nil, err
		}
		fromVersion = snapshot.Version
	}
	// Apply events recorded after the snapshot
	events, err := es.GetEvents(ctx, accountID, fromVersion)
	if err != nil {
		return nil, err
	}
	for _, se := range events {
		account.Apply(deserializeEvent(se.EventType, se.EventData))
	}
	account.Version = fromVersion + len(events) // track the rehydrated version
	// Create a new snapshot once enough events accumulate since the last one
	if len(events) > 100 {
		if snapshotData, err := json.Marshal(account); err == nil {
			es.SaveSnapshot(ctx, Snapshot{
				AggregateID: accountID,
				Version:     account.Version,
				Data:        snapshotData,
				CreatedAt:   time.Now(),
			})
		}
	}
	return account, nil
}
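The ON CONFLICT upsert in SaveSnapshot assumes a single row per aggregate. The snapshot schema is not shown above; a matching table might look like this:
CREATE TABLE snapshots (
    aggregate_id VARCHAR(255) PRIMARY KEY, -- one snapshot per aggregate
    version INT NOT NULL,                  -- last event version included
    data JSONB NOT NULL,                   -- serialized aggregate state
    created_at TIMESTAMP NOT NULL
);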
Event Streaming with Kafka
type EventStreamer struct {
eventStore *EventStore
producer *kafka.Writer
lastSeq int64
}
func (s *EventStreamer) StreamEvents(ctx context.Context) {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s.publishNewEvents(ctx)
}
}
}
func (s *EventStreamer) publishNewEvents(ctx context.Context) {
rows, err := s.eventStore.db.QueryContext(ctx, `
SELECT
global_sequence, aggregate_id, event_type,
event_data, occurred_at
FROM events
WHERE global_sequence > $1
ORDER BY global_sequence
LIMIT 1000`,
s.lastSeq,
)
if err != nil {
return // transient error; retry on the next tick
}
defer rows.Close()
var messages []kafka.Message
var maxSeq int64
for rows.Next() {
var seq int64
var aggregateID, eventType string
var eventData json.RawMessage
var occurredAt time.Time
if err := rows.Scan(&seq, &aggregateID, &eventType, &eventData, &occurredAt); err != nil {
	return
}
messages = append(messages, kafka.Message{
Topic: fmt.Sprintf("events.%s", eventType),
Key: []byte(aggregateID),
Value: eventData,
Headers: []kafka.Header{
{Key: "event_type", Value: []byte(eventType)},
{Key: "occurred_at", Value: []byte(occurredAt.Format(time.RFC3339))},
},
})
maxSeq = seq
}
if len(messages) > 0 {
err := s.producer.WriteMessages(ctx, messages...)
if err == nil {
s.lastSeq = maxSeq
}
}
}
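On the consuming side, a projection service reads these topics and applies events to its read models. A sketch using the same kafka-go style as the writer above; the broker address, group ID, and topic name are assumptions:
func consumeDeposits(ctx context.Context, projection *AccountProjection) {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"}, // assumed broker address
		GroupID: "account-projections",      // assumed consumer group
		Topic:   "events.MoneyDeposited",
	})
	defer r.Close()
	for {
		msg, err := r.ReadMessage(ctx)
		if err != nil {
			return // reader closed or context cancelled
		}
		event := deserializeEvent("MoneyDeposited", msg.Value)
		if err := projection.Handle(event); err != nil {
			log.Printf("projection update failed: %v", err)
		}
	}
}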
Temporal Queries (Time Travel)
// Get account state at specific point in time
func (es *EventStore) GetAggregateAtTime(ctx context.Context, aggregateID string, pointInTime time.Time) (*Account, error) {
events, err := es.db.QueryContext(ctx, `
SELECT event_type, event_data
FROM events
WHERE aggregate_id = $1 AND occurred_at <= $2
ORDER BY event_version`,
aggregateID, pointInTime,
)
if err != nil {
return nil, err
}
defer events.Close()
account := &Account{}
for events.Next() {
var eventType string
var eventData json.RawMessage
if err := events.Scan(&eventType, &eventData); err != nil {
	return nil, err
}
event := deserializeEvent(eventType, eventData)
account.Apply(event)
}
return account, nil
}
// Replay events for debugging
func ReplayEvents(es *EventStore, from, to time.Time, handler func(Event)) error {
rows, err := es.db.Query(`
SELECT event_type, event_data, occurred_at
FROM events
WHERE occurred_at BETWEEN $1 AND $2
ORDER BY global_sequence`,
from, to,
)
if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
var eventType string
var eventData json.RawMessage
var occurredAt time.Time
if err := rows.Scan(&eventType, &eventData, &occurredAt); err != nil {
	return err
}
event := deserializeEvent(eventType, eventData)
handler(event)
}
return nil
}
Saga Pattern for Distributed Transactions
type TransferSaga struct {
ID string
FromAccount string
ToAccount string
Amount decimal.Decimal
State string
CompletedSteps []string
}
func (s *TransferSaga) Handle(event Event) ([]Command, error) {
switch e := event.(type) {
case TransferInitiated:
return []Command{
WithdrawMoney{AccountID: e.FromAccount, Amount: e.Amount},
}, nil
case MoneyWithdrawn:
if e.AccountID == s.FromAccount {
s.CompletedSteps = append(s.CompletedSteps, "withdrawn")
return []Command{
DepositMoney{AccountID: s.ToAccount, Amount: s.Amount},
}, nil
}
case MoneyDeposited:
if e.AccountID == s.ToAccount {
s.State = "completed"
return []Command{
MarkTransferComplete{TransferID: s.ID},
}, nil
}
case WithdrawFailed:
s.State = "failed"
return nil, nil
case DepositFailed:
// Compensate - refund the withdrawal
return []Command{
DepositMoney{AccountID: s.FromAccount, Amount: s.Amount},
}, nil
}
return nil, nil
}
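Driving the saga is simple: feed it each incoming event and dispatch whatever commands come back. A usage sketch; dispatch is an assumed helper that routes commands to the CommandHandler, and the TransferInitiated fields follow the handler above:
func startTransfer() error {
	saga := &TransferSaga{
		ID:          "tx-1",
		FromAccount: "acc-1",
		ToAccount:   "acc-2",
		Amount:      decimal.NewFromInt(50),
		State:       "started",
	}
	cmds, err := saga.Handle(TransferInitiated{
		FromAccount: "acc-1",
		Amount:      decimal.NewFromInt(50),
	})
	if err != nil {
		return err
	}
	for _, cmd := range cmds {
		dispatch(cmd) // first step: WithdrawMoney from acc-1
	}
	return nil
}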
Event Store Consistency Warning
Event stores require careful attention to:
- Optimistic concurrency control to prevent data corruption
- Guaranteeing event ordering within aggregates
- Backup and recovery procedures for event streams
- Event schema evolution and versioning strategies (see the upcasting sketch below)
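A common technique for that last point is upcasting: translating old payload shapes into the current one at read time. A minimal sketch, assuming a schema version is carried in event metadata (the store above does not track one) and a hypothetical v1 payload that stored amounts as integer cents:
func upcastMoneyDeposited(schemaVersion int, data json.RawMessage) (json.RawMessage, error) {
	if schemaVersion >= 2 {
		return data, nil // already the current shape
	}
	// Hypothetical v1 shape: amount stored as integer cents
	var v1 struct {
		AccountID   string `json:"account_id"`
		AmountCents int64  `json:"amount_cents"`
	}
	if err := json.Unmarshal(data, &v1); err != nil {
		return nil, err
	}
	// Re-emit in the current shape with a decimal amount
	return json.Marshal(map[string]interface{}{
		"account_id": v1.AccountID,
		"amount":     decimal.NewFromInt(v1.AmountCents).Div(decimal.NewFromInt(100)),
	})
}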
Security Considerations
Event Sourcing Security Best Practices
- Event encryption: encrypt sensitive data in event payloads
- Access control: role-based access to event streams and projections
- Auditing: include user context and authorization in event metadata
- Data privacy: enforce the “right to be forgotten” through cryptographic erasure
- Replay protection: ensure event replay does not bypass current authorization rules
// Secure event with encryption
type SecureEvent struct {
BaseEvent
EncryptedPayload []byte
KeyID string
Nonce []byte
}
// GDPR-compliant cryptographic erasure
type GDPREventStore struct {
*EventStore
keyManager *KeyManager
}
func (ges *GDPREventStore) ForgetUser(ctx context.Context, userID string) error {
events, err := ges.GetEventsByUser(ctx, userID)
if err != nil {
return fmt.Errorf("failed to find user events: %w", err)
}
for _, event := range events {
if err := ges.keyManager.RevokeKey(event.KeyID); err != nil {
return fmt.Errorf("failed to revoke key %s: %w", event.KeyID, err)
}
}
return ges.MarkUserForgotten(ctx, userID)
}
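The SecureEvent payload implies an encryption step before the event is appended. A minimal AES-GCM sketch using only the standard library (crypto/aes, crypto/cipher, crypto/rand, io); key lookup by KeyID through the KeyManager is assumed and elided:
func encryptPayload(key, plaintext []byte) (ciphertext, nonce []byte, err error) {
	block, err := aes.NewCipher(key) // key must be 16, 24, or 32 bytes
	if err != nil {
		return nil, nil, err
	}
	gcm, err := cipher.NewGCM(block)
	if err != nil {
		return nil, nil, err
	}
	nonce = make([]byte, gcm.NonceSize())
	if _, err := io.ReadFull(rand.Reader, nonce); err != nil {
		return nil, nil, err
	}
	// Seal authenticates as well as encrypts
	return gcm.Seal(nil, nonce, plaintext, nil), nonce, nil
}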
Testing Strategy
📊 Event Sourcing Testing Framework
A comprehensive testing approach for event-sourced systems:
- Event store tests: verify consistency, concurrency, and durability
- Aggregate tests: unit-test business logic and invariants
- Projection tests: verify read model consistency (see the sketch below)
- Integration tests: end-to-end command/query flows
- Schema tests: exercise event evolution and migration
// Event store integration test
func TestEventStore(t *testing.T) {
es := setupTestEventStore(t) // test helper: fresh store backed by a test database
defer es.Close()
t.Run("ConcurrencyControl", func(t *testing.T) {
aggregateID := uuid.New().String()
// First save succeeds
err := es.SaveEvents(context.Background(), aggregateID, "Account",
[]Event{&AccountOpened{AccountID: aggregateID}}, 0)
require.NoError(t, err)
// Second save with wrong version fails
err = es.SaveEvents(context.Background(), aggregateID, "Account",
[]Event{&MoneyDeposited{AccountID: aggregateID}}, 0)
require.Error(t, err)
require.Contains(t, err.Error(), "concurrency conflict")
})
}
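Projection tests follow the same feed-an-event, assert-the-read-model pattern. A hypothetical sketch; setupTestProjection is an assumed helper that seeds an account_projections row for the test account:
func TestAccountProjection(t *testing.T) {
	p := setupTestProjection(t)
	err := p.Handle(MoneyDeposited{
		AccountID: "acc-1",
		Amount:    decimal.NewFromInt(100),
	})
	require.NoError(t, err)
	var balance decimal.Decimal
	err = p.db.QueryRow(
		`SELECT balance FROM account_projections WHERE account_id = $1`,
		"acc-1",
	).Scan(&balance)
	require.NoError(t, err)
	require.True(t, balance.Equal(decimal.NewFromInt(100)))
}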
Production Monitoring
// Event store metrics
type Metrics struct {
EventsWritten prometheus.Counter
EventsRead prometheus.Counter
SnapshotCreated prometheus.Counter
WriteLatency prometheus.Histogram
ReadLatency prometheus.Histogram
}
// Health checks
func (es *EventStore) HealthCheck(ctx context.Context) error {
	// Read the current version first so repeated checks don't trip
	// the optimistic concurrency guard
	existing, err := es.GetEvents(ctx, "health", 0)
	if err != nil {
		return fmt.Errorf("read check failed: %w", err)
	}
	// Check write capability
	testEvent := HealthCheckEvent{
		ID:        uuid.New().String(),
		Timestamp: time.Now(),
	}
	err = es.SaveEvents(ctx, "health", "HealthCheck", []Event{testEvent}, len(existing))
	if err != nil {
		return fmt.Errorf("write check failed: %w", err)
	}
	return nil
}
// Lag monitoring
func MonitorProjectionLag(db *sql.DB) {
	ticker := time.NewTicker(10 * time.Second)
	for range ticker.C {
		// EXTRACT(EPOCH ...) converts the interval to seconds,
		// which scans cleanly into a float64
		var lagSeconds float64
		if err := db.QueryRow(`
			SELECT COALESCE(EXTRACT(EPOCH FROM MAX(NOW() - updated_at)), 0)
			FROM projection_checkpoints`,
		).Scan(&lagSeconds); err != nil {
			continue
		}
		projectionLag.Set(lagSeconds)
		if lagSeconds > (5 * time.Minute).Seconds() {
			alert("Projection lag exceeds 5 minutes")
		}
	}
}
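The Metrics struct can be wired in with a thin wrapper around the store. A sketch of one possible approach (the wrapper name is illustrative):
func (es *EventStore) SaveEventsWithMetrics(ctx context.Context, m *Metrics,
	aggregateID, aggregateType string, events []Event, expectedVersion int) error {
	// Time the write and count events on success
	timer := prometheus.NewTimer(m.WriteLatency)
	defer timer.ObserveDuration()
	err := es.SaveEvents(ctx, aggregateID, aggregateType, events, expectedVersion)
	if err == nil {
		m.EventsWritten.Add(float64(len(events)))
	}
	return err
}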
Performance Optimization
// 1. Batch event writes
func (es *EventStore) SaveEventsBatch(events []EventWithAggregate) error {
	// Use COPY for bulk insert; note this bypasses the optimistic
	// concurrency check, so reserve it for migrations and imports
	stmt, err := es.db.Prepare(pq.CopyIn("events",
		"aggregate_id", "aggregate_type", "event_type",
		"event_version", "event_data", "occurred_at"))
	if err != nil {
		return err
	}
	for _, e := range events {
		_, err = stmt.Exec(e.AggregateID, e.AggregateType,
			e.EventType, e.Version, e.Data, e.OccurredAt)
		if err != nil {
			return err
		}
	}
	// A final Exec with no arguments flushes the COPY buffer
	if _, err = stmt.Exec(); err != nil {
		return err
	}
	return stmt.Close()
}
// 2. Parallel projection updates
func UpdateProjectionsParallel(events []Event) {
var wg sync.WaitGroup
ch := make(chan Event, 100)
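	// NOTE: fanning events out to a worker pool loses per-aggregate
	// ordering; in production, partition the work by aggregate ID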
// Start workers
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for event := range ch {
updateProjection(event)
}
}()
}
// Send events
for _, e := range events {
ch <- e
}
close(ch)
wg.Wait()
}
// 3. Cache aggregates
var aggregateCache = cache.New(5*time.Minute, 10*time.Minute)
func LoadAccountCached(ctx context.Context, es *EventStore, accountID string) (*Account, error) {
	if cached, found := aggregateCache.Get(accountID); found {
		return cached.(*Account), nil
	}
	account, err := LoadAccount(ctx, es, accountID)
if err != nil {
return nil, err
}
aggregateCache.Set(accountID, account, cache.DefaultExpiration)
return account, nil
}
Migrating from a Traditional System
// Generate events from existing state
func MigrateToEventSourcing(ctx context.Context, db *sql.DB, es *EventStore) error {
	rows, err := db.QueryContext(ctx, `
		SELECT id, balance, created_at, updated_at
		FROM accounts`)
	if err != nil {
		return err
	}
	defer rows.Close()
	for rows.Next() {
		var id string
		var balance decimal.Decimal
		var createdAt, updatedAt time.Time
		if err := rows.Scan(&id, &balance, &createdAt, &updatedAt); err != nil {
			return err
		}
		// Create initial event
		events := []Event{
			AccountOpened{
				AccountID: id,
				Timestamp: createdAt,
			},
		}
		// Infer a deposit event from the current balance
		if balance.GreaterThan(decimal.Zero) {
			events = append(events, MoneyDeposited{
				AccountID: id,
				Amount:    balance,
				Timestamp: updatedAt,
			})
		}
		if err := es.SaveEvents(ctx, id, "Account", events, 0); err != nil {
			return err
		}
	}
	return rows.Err()
}
Lessons from Production
| Metric | Before (CRUD) | After (Event Sourcing) |
|---|---|---|
| Write throughput | 1K/s | 10K/s |
| Read latency (p99) | 5ms | 2ms (from projections) |
| Audit completeness | 60% | 100% |
| Debug time | hours | minutes (via replay) |
| Storage cost | $1K/month | $3-5K/month |
When Not to Use Event Sourcing
- CRUD is enough (most apps)
- No audit trail required
- Simple domain logic
- Team unfamiliar with the patterns
- Storage costs are a major concern
The Verdict
Event sourcing is not free: expect 3-5x storage costs (events + projections + snapshots), a more complex implementation, and a shift in mental model for the team.
But for financial systems, audit-heavy domains, or complex business logic? It is transformative: complete history, a perfect audit trail, time-travel debugging, and horizontal scalability.
Start small: event-source a single aggregate. See the benefits. Then expand. Don’t go all-in right away.