// thinkbeforecoding

Event Sourcing and CQRS, Now !

2009-10-30T14:53:50 / jeremie chassaing

Enough talking, Action !

Today, we will build a basic event sourcing infrastructure. Get the beta 2 of Visual Studio 2010, we’ll be using C# dynamic features to go straight to our goal.

The Event Storage

Let’s hide the ugly details of the event storage behind two simple interfaces :

    public interface IEventStorage : IDisposable

    {

        IAggregateRootStorage<TId> GetAggregateRootStorage<TAggregateRoot, TId>()

            where TAggregateRoot : AggregateRoot<TId>;

    }

 

    public interface IAggregateRootStorage<in TId>

    {

        void Append(TId id, IEnumerable<object> events);

        IEnumerable<object> this[TId id] { get; }

    }

And we start with a minimal in-memory implementation, the event storage first :

    public class EventStorage : IEventStorage

    {

        private readonly Dictionary<Type, dynamic> stores = new Dictionary<Type, dynamic>();

 

        public IAggregateRootStorage<TId> GetAggregateRootStorage<TAggregateRoot, TId>()

           where TAggregateRoot : AggregateRoot<TId>

        {

 

            dynamic store;

            if (!stores.TryGetValue(typeof(TAggregateRoot), out store))

            {

                store = new AggregateRootStorage<TId>();

                stores.Add(typeof (TAggregateRoot), store);

            }

            return store;

        }

 

        public void Dispose()

        {

            stores.Clear();

        }

    }

Here I could replace dynamic with object and cast to the requested type on return. I use dynamic because this kind of code is not compile-time safe anyway. Each Aggregate Root type gets its own storage, typed by its identifier type, for type safety.
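
For comparison, here is a minimal sketch of the object-and-cast variant mentioned above. The `TypedStores` class and its `GetOrCreate` method are hypothetical names introduced for this illustration; they are not part of the infrastructure in this post.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the stores dictionary, using object instead of dynamic.
public class TypedStores
{
    private readonly Dictionary<Type, object> stores = new Dictionary<Type, object>();

    // Returns the store registered for TKey, creating it on first request.
    public TStore GetOrCreate<TKey, TStore>() where TStore : new()
    {
        object store;
        if (!stores.TryGetValue(typeof(TKey), out store))
        {
            store = new TStore();
            stores.Add(typeof(TKey), store);
        }

        // The cast is checked at run time: asking for the same key with a
        // different TStore throws InvalidCastException.
        return (TStore)store;
    }
}
```

Either way, a mismatch between the requested type and the stored one only shows up at run time, which is why the choice between object and dynamic is mostly a matter of taste here.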

Then the AggregateRootStorage :

    class AggregateRootStorage<TId> : IAggregateRootStorage<TId>

    {

        private readonly Dictionary<TId, List<object>> store = new Dictionary<TId, List<object>>();

 

        public void Append(TId id, IEnumerable<object> events)

        {

            List<object> aggregateRootEvents;

            if (!store.TryGetValue(id, out aggregateRootEvents))

            {

                aggregateRootEvents = new List<object>();

                store.Add(id, aggregateRootEvents);

            }

 

            aggregateRootEvents.AddRange(events);

        }

 

        public IEnumerable<object> this[TId id]

        {

            get { return store[id]; }

        }

    }

It simply stores the list of events associated with each aggregate root identifier.

The Aggregate Root

Aggregate roots manage uncommitted events :

    public interface IUncommittedEvents : IEnumerable<object>

    {

        bool HasEvents { get; }

        void Commit();

    }

The interface indicates whether there are events, returns the events, and clears the uncommitted events by committing.

Quick implementation :

    internal class UncommittedEvents : IUncommittedEvents

    {

        private readonly List<object> events = new List<object>();

 

        public void Append(object @event)

        {

            events.Add(@event);

        }

 

        IEnumerator<object> IEnumerable<object>.GetEnumerator()

        {

            return events.GetEnumerator();

        }

 

        public bool HasEvents

        {

            get { return events.Count != 0; }

        }

 

        void IUncommittedEvents.Commit()

        {

            events.Clear();

        }

 

        IEnumerator IEnumerable.GetEnumerator()

        {

            return events.GetEnumerator();

        }

    }

Nothing tricky here either.

Now, the IAggregateRoot interface used by the repository gives access to the uncommitted events:

    public interface IAggregateRoot<out TId>

    {

        TId Id { get; }

        IUncommittedEvents UncommittedEvents { get; }

    }

The AggregateRoot class will maintain the uncommitted events :

    public abstract class AggregateRoot<TId> : IAggregateRoot<TId>

    {

        private readonly UncommittedEvents uncommittedEvents = new UncommittedEvents();

 

        protected void Replay(IEnumerable<object> events)

        {

            dynamic me = this;

            foreach (dynamic @event in events) // dynamic, so the binder uses each event's runtime type

                me.Apply(@event);

        }

 

        protected void Append(object @event)

        {

            uncommittedEvents.Append(@event);

        }

 

        public abstract TId Id { get; }

 

        IUncommittedEvents IAggregateRoot<TId>.UncommittedEvents

        {

            get { return uncommittedEvents; }

        }

    }

The Append method will be used by child classes to append events after they are applied.

The Replay method is used in the child class constructor to rebuild the Aggregate Root state from events.

Here I use a dynamic me variable to dispatch events to the Apply methods of the specific child class. A .NET 2 or 3.5 implementation would use reflection to dispatch events to the Apply methods.
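
As a rough illustration of that reflection-based alternative, the sketch below looks up a matching Apply method on the runtime type for each event. `ReflectiveAggregateRoot`, `Counter`, and the `Incremented` and `Reset` events are hypothetical names invented for this example; it is one possible way to do it under those assumptions, not the code used in this post.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;

// Hypothetical base class: Replay finds Apply methods by reflection
// instead of relying on C# 4 dynamic dispatch.
public abstract class ReflectiveAggregateRoot
{
    protected void Replay(IEnumerable<object> events)
    {
        foreach (object @event in events)
        {
            // Look for an instance method named Apply, public or not,
            // that accepts an argument of the event's runtime type.
            MethodInfo apply = GetType().GetMethod(
                "Apply",
                BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic,
                null,
                new[] { @event.GetType() },
                null);

            if (apply == null)
                throw new InvalidOperationException(
                    "No Apply method for " + @event.GetType().Name);

            apply.Invoke(this, new[] { @event });
        }
    }
}

// Hypothetical events and aggregate, for illustration only.
public class Incremented { public int By; }
public class Reset { }

public class Counter : ReflectiveAggregateRoot
{
    public int Value { get; private set; }

    // State is rebuilt from past events in the constructor.
    public Counter(IEnumerable<object> events)
    {
        Replay(events);
    }

    private void Apply(Incremented e) { Value += e.By; }
    private void Apply(Reset e) { Value = 0; }
}
```

Replaying an `Incremented` by 2, a `Reset`, then an `Incremented` by 5 leaves `Value` at 5. The reflection lookup runs for every event, so a real implementation would probably cache the MethodInfo per event type.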

The UncommittedEvents property is implemented explicitly so that it does not appear in standard class use.
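
To make the effect of the explicit implementation concrete, here is a small sketch. `IHasSecret`, `Widget` and `ExplicitDemo` are made-up names, unrelated to the infrastructure above.

```csharp
using System;

public interface IHasSecret
{
    string Secret { get; }
}

public class Widget : IHasSecret
{
    // Explicit implementation: Secret is not a public member of Widget itself.
    string IHasSecret.Secret
    {
        get { return "only via the interface"; }
    }
}

public static class ExplicitDemo
{
    public static string Read(Widget widget)
    {
        // widget.Secret would not compile here.
        // Going through the interface makes the member reachable.
        IHasSecret view = widget;
        return view.Secret;
    }
}
```

Code that works with the aggregate root class never sees the property, while infrastructure code that holds an IAggregateRoot<TId> reference can still read it.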

The Repository

The repository is just very slightly longer. I added a session concept so that several repositories can submit changes in a single transaction :

    internal interface ISessionItem

    {

        void SubmitChanges();

    }

 

    public abstract class Repository<TId, TAggregateRoot> : ISessionItem

        where TAggregateRoot : AggregateRoot<TId>

    {

        private readonly Dictionary<TId, TAggregateRoot> users = new Dictionary<TId, TAggregateRoot>();

        private readonly IAggregateRootStorage<TId> aggregateRootStorage;

 

        protected Repository()

        {

            aggregateRootStorage = Session.Enlist(this);

        }

 

        public void Add(TAggregateRoot user)

        {

            users.Add(user.Id, user);

        }

 

        public TAggregateRoot this[TId id]

        {

            get { return Find(id) ?? Load(id); }

        }

 

        private TAggregateRoot Find(TId id)

        {

            TAggregateRoot user;

            return users.TryGetValue(id, out user) ? user : null;

        }

 

        private TAggregateRoot Load(TId id)

        {

            var events = aggregateRootStorage[id];

            var user = CreateInstance(id, events);

 

            users.Add(id, user);

 

            return user;

        }

 

        protected abstract TAggregateRoot CreateInstance(TId id, IEnumerable<object> events);

 

        public void SubmitChanges()

        {

            foreach (IAggregateRoot<TId> user in users.Values)

            {

                var uncommittedEvents = user.UncommittedEvents;

                if (uncommittedEvents.HasEvents)

                {

                    aggregateRootStorage.Append(user.Id, uncommittedEvents);

                    PublishEvents(uncommittedEvents);

                    uncommittedEvents.Commit();

                }

            }

            users.Clear();

        }

 

        protected void PublishEvents(IUncommittedEvents uncommittedEvents)

        {

            foreach (dynamic @event in uncommittedEvents)

                DomainEvents.Raise(@event);

        }

 

    }

The constructor enlists the repository in the current session.

The Add method registers the aggregate root in the repository; its events will be persisted in SubmitChanges().

The indexer finds an entity already in memory or loads it from the event store. The abstract CreateInstance method implementation will be responsible for instantiation.

SubmitChanges does what is expected, and also publishes the committed events. We’ll see the trick with dynamic @event when we analyze domain events.

The Session and its Factory

The session is just there to group the SubmitChanges calls of several repositories :

    public interface ISessionFactory : IDisposable

    {

        ISession OpenSession();

    }

 

    public class SessionFactory : ISessionFactory

    {

        private readonly IEventStorage eventStorage;

 

        public SessionFactory(IEventStorage eventStorage)

        {

            this.eventStorage = eventStorage;

        }

 

        public ISession OpenSession()

        {

            return new Session(eventStorage);

        }

 

        public void Dispose()

        {

            eventStorage.Dispose();

        }

    }

 

    public interface ISession : IDisposable

    {

        void SubmitChanges();

    }

 

    public class Session : ISession

    {

        private readonly IEventStorage eventStorage;

        private readonly HashSet<ISessionItem> enlistedItems = new HashSet<ISessionItem>();

 

        [ThreadStatic] private static Session current;

 

        internal Session(IEventStorage eventStorage)

        {

            this.eventStorage = eventStorage;

            if (current != null)

                throw new InvalidOperationException("Cannot nest unit of work");

 

            current = this;

        }

 

        private static Session Current

        {

            get { return current; }

        }

 

        public void SubmitChanges()

        {

            foreach (var enlisted in enlistedItems)

                enlisted.SubmitChanges();

 

            enlistedItems.Clear();

        }

 

        public void Dispose()

        {

            current = null;

        }

 

        internal static IAggregateRootStorage<TId> Enlist<TId, TAggregateRoot>

                        (Repository<TId, TAggregateRoot> repository)

            where TAggregateRoot : AggregateRoot<TId>

        {

            var unitOfWork = Current;

            unitOfWork.enlistedItems.Add(repository);

            return unitOfWork.eventStorage.GetAggregateRootStorage<TAggregateRoot, TId>();

        }

    }

OK, almost everything is here. The last part lets other parts of the system use the events, which is what we need to go CQRS.

Domain Events

Here, I made a minor variation on Udi Dahan’s DomainEvents implementation :

    public static class DomainEvents

    {

        [ThreadStatic] private static List<Delegate> actions;

 

        private static List<Handler> handlers;

 

        public static void Register<T>(Action<T> callback)

        {

            if (actions == null) actions = new List<Delegate>();

            actions.Add(callback);

        }

 

        public static void RegisterHandler<T>(Func<T> factory)

        {

            if (handlers == null) handlers = new List<Handler>();

            handlers.Add(new Handler<T>(factory));

        }

 

        //Raises the given domain event      

        public static void Raise<T>(T @event)

        {

            if (actions != null)

                foreach (Delegate action in actions)

                    if (action is Action<T>)

                        ((Action<T>) action)(@event);

 

            if (handlers != null)

                foreach (var h in handlers)

                {

                    if (h.Handles<T>())

                    {

                        var handler= h.CreateInstance<T>();

                        handler.Handle(@event);

                    }

                }

        }

 

        private abstract class Handler

        {

            public abstract bool Handles<E>();

            public abstract Handles<E> CreateInstance<E>();

        }

 

        private class Handler<T> : Handler

        {

 

            private readonly Func<T> factory;

 

            public Handler(Func<T> factory)

            {

                this.factory = factory;

            }

 

            public override bool Handles<E>()

            {

                return typeof (Handles<E>)

                   .IsAssignableFrom(typeof (T));

            }

 

            public override Handles<E> CreateInstance<E>()

            {

                return (Handles<E>)factory();

            }

        }

    }

 

    public interface Handles<in T>

    {

        void Handle(T @event);

    }

Edit : Changed the Handles<E> code, T should be cast to Handles<E>, not to E, of course.

Event handlers can be registered as Action<T> delegates or as classes that implement one or more Handles<T> interfaces.

The private Handler and Handler<T> classes are used to find the handlers that handle a specific message and dispatch it to them, without using a Dependency Injection Container as Udi’s implementation does.
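
The Handles<E>() check above boils down to Type.IsAssignableFrom. The sketch below isolates that test. `HandlerCheck`, `AuditHandler`, `CatchAllHandler` and the two event classes are hypothetical names used only for this illustration, and the Handles<in T> interface is repeated so the snippet compiles on its own.

```csharp
using System;

public interface Handles<in T>
{
    void Handle(T @event);
}

// Hypothetical events, for illustration only.
public class UserCreated { }
public class UserRenamed { }

// One class can handle several event types.
public class AuditHandler : Handles<UserCreated>, Handles<UserRenamed>
{
    public void Handle(UserCreated @event) { }
    public void Handle(UserRenamed @event) { }
}

// Because T is contravariant (in), a Handles<object> can also be
// used as a Handles<UserCreated>.
public class CatchAllHandler : Handles<object>
{
    public void Handle(object @event) { }
}

public static class HandlerCheck
{
    // Same test as Handler<T>.Handles<E>() in the post.
    public static bool CanHandle<THandler, TEvent>()
    {
        return typeof(Handles<TEvent>).IsAssignableFrom(typeof(THandler));
    }
}
```

Here `HandlerCheck.CanHandle<AuditHandler, UserCreated>()` is true and `HandlerCheck.CanHandle<AuditHandler, string>()` is false. `HandlerCheck.CanHandle<CatchAllHandler, UserCreated>()` is also true, since IsAssignableFrom takes the variance of the interface into account, so a handler for a base type would receive derived events as well.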

The simple dynamic-fu in the repository calls DomainEvents.Raise<T> through dynamic dispatch. This way, Raise is always called with the actual event type as T, and no tricky reflection is needed for the dispatch. Inside Raise<T>, we can then rely on T being the actual event type. Funky !
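
The difference between the two kinds of dispatch can be seen in isolation with a small sketch. `DispatchDemo` and its methods are hypothetical names; the Raise<T> here only reports which T was chosen and is not the DomainEvents.Raise<T> shown above.

```csharp
using System;

public static class DispatchDemo
{
    // Reports the type argument that was picked for T.
    public static string Raise<T>(T @event)
    {
        return typeof(T).Name;
    }

    public static string StaticCall(object @event)
    {
        // Bound at compile time: T is the static type, object.
        return Raise(@event);
    }

    public static string DynamicCall(object @event)
    {
        // Bound at run time: T is inferred from the actual event type.
        dynamic d = @event;
        return Raise(d);
    }
}
```

With a string argument, `DispatchDemo.StaticCall("hello")` returns "Object" while `DispatchDemo.DynamicCall("hello")` returns "String", which is exactly what lets the `action is Action<T>` test in Raise<T> match handlers registered for the concrete event type.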

Next Time…

There’s already a lot of code for a single post, and everything is in place at the infrastructure level. You can already try it for yourself if you can figure out how.

The sample will come in the next post, stay tuned.