Event Sourcing and CQRS, Now !
By Jérémie Chassaing on Friday, October 30, 2009, 15:53 - Domain Driven Design
Enough talking, Action !
Today, we will build a basic event sourcing infrastructure. Get Visual Studio 2010 Beta 2: we’ll be using the dynamic features of C# to go straight to our goal.
The Event Storage
Let’s hide the ugly details of the event storage behind two simple interfaces :
public interface IEventStorage : IDisposable
{
IAggregateRootStorage<TId> GetAggregateRootStorage<TAggregateRoot, TId>()
where TAggregateRoot : AggregateRoot<TId>;
}
public interface IAggregateRootStorage<in TId>
{
void Append(TId id, IEnumerable<object> events);
IEnumerable<object> this[TId id] { get; }
}
We start with a minimal in-memory implementation, the event storage first :
public class EventStorage : IEventStorage
{
private readonly Dictionary<Type, dynamic> stores = new Dictionary<Type, dynamic>();
public IAggregateRootStorage<TId> GetAggregateRootStorage<TAggregateRoot, TId>()
where TAggregateRoot : AggregateRoot<TId>
{
dynamic store;
if (!stores.TryGetValue(typeof(TAggregateRoot), out store))
{
store = new AggregateRootStorage<TId>();
stores.Add(typeof (TAggregateRoot), store);
}
return store;
}
public void Dispose()
{
stores.Clear();
}
}
Here I could replace dynamic with object, and cast to the requested type on return. I use dynamic because this kind of code is not compile-time safe anyway. For type safety, there is a specific storage for each Aggregate Root type, typed by its identifier type.
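For comparison, here is a minimal sketch of the object-and-cast variant mentioned above. The class name ObjectBackedEventStorage is hypothetical, the storage types are trimmed to empty stand-ins, and the AggregateRoot<TId> constraint is dropped so that the block compiles on its own.

```csharp
using System;
using System.Collections.Generic;

// Trimmed stand-ins for the article's storage types (members omitted).
public interface IAggregateRootStorage<in TId> { }
public class AggregateRootStorage<TId> : IAggregateRootStorage<TId> { }

// Hypothetical variant of EventStorage that stores object instead of dynamic.
public class ObjectBackedEventStorage
{
    private readonly Dictionary<Type, object> stores = new Dictionary<Type, object>();

    public IAggregateRootStorage<TId> GetAggregateRootStorage<TAggregateRoot, TId>()
    {
        object store;
        if (!stores.TryGetValue(typeof(TAggregateRoot), out store))
        {
            store = new AggregateRootStorage<TId>();
            stores.Add(typeof(TAggregateRoot), store);
        }
        // The explicit cast takes the place of the implicit dynamic conversion.
        // Asking for the same aggregate root type with another identifier type
        // fails here, at run time, with an InvalidCastException.
        return (IAggregateRootStorage<TId>)store;
    }
}
```

Either way the mismatch is only detected at run time, which is the point made above: the cast just makes the failure site explicit.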
Then the AggregateRootStorage :
class AggregateRootStorage<TId> : IAggregateRootStorage<TId>
{
private readonly Dictionary<TId, List<object>> store = new Dictionary<TId, List<object>>();
public void Append(TId id, IEnumerable<object> events)
{
List<object> aggregateRootEvents;
if (!store.TryGetValue(id, out aggregateRootEvents))
{
aggregateRootEvents = new List<object>();
store.Add(id, aggregateRootEvents);
}
aggregateRootEvents.AddRange(events);
}
public IEnumerable<object> this[TId id]
{
get { return store[id]; }
}
}
It simply stores the list of events associated with each aggregate root identifier.
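A short usage sketch may help here. The storage types are repeated (made public) so that the block stands alone; StorageDemo and the string events are placeholders of mine, since real events would be domain event objects.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public interface IAggregateRootStorage<in TId>
{
    void Append(TId id, IEnumerable<object> events);
    IEnumerable<object> this[TId id] { get; }
}

public class AggregateRootStorage<TId> : IAggregateRootStorage<TId>
{
    private readonly Dictionary<TId, List<object>> store = new Dictionary<TId, List<object>>();

    public void Append(TId id, IEnumerable<object> events)
    {
        List<object> aggregateRootEvents;
        if (!store.TryGetValue(id, out aggregateRootEvents))
        {
            aggregateRootEvents = new List<object>();
            store.Add(id, aggregateRootEvents);
        }
        aggregateRootEvents.AddRange(events);
    }

    public IEnumerable<object> this[TId id]
    {
        get { return store[id]; }
    }
}

// Hypothetical demo: two appends on the same identifier build one event stream.
public static class StorageDemo
{
    public static int CountAfterTwoAppends()
    {
        var storage = new AggregateRootStorage<Guid>();
        var id = Guid.NewGuid();
        storage.Append(id, new object[] { "UserCreated" });
        storage.Append(id, new object[] { "NameChanged", "AddressChanged" });
        return storage[id].Count(); // the stream now holds the three events, in order
    }
}
```

Note that the indexer reads the dictionary directly, so asking for an identifier that never received events throws a KeyNotFoundException.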
The Aggregate Root
Aggregate roots manage uncommitted events :
public interface IUncommittedEvents : IEnumerable<object>
{
bool HasEvents { get; }
void Commit();
}
The interface indicates whether there are events, enumerates them, and clears the uncommitted events on commit.
Quick implementation :
internal class UncommittedEvents : IUncommittedEvents
{
private readonly List<object> events = new List<object>();
public void Append(object @event)
{
events.Add(@event);
}
IEnumerator<object> IEnumerable<object>.GetEnumerator()
{
return events.GetEnumerator();
}
public bool HasEvents
{
get { return events.Count != 0; }
}
void IUncommittedEvents.Commit()
{
events.Clear();
}
IEnumerator IEnumerable.GetEnumerator()
{
return events.GetEnumerator();
}
}
Nothing tricky here either.
Now, the IAggregateRoot interface used by the repository gives access to the uncommitted events:
public interface IAggregateRoot<out TId>
{
TId Id { get; }
IUncommittedEvents UncommittedEvents { get; }
}
The AggregateRoot class will maintain the uncommitted events :
public abstract class AggregateRoot<TId> : IAggregateRoot<TId>
{
private readonly UncommittedEvents uncommittedEvents = new UncommittedEvents();
protected void Replay(IEnumerable<object> events)
{
dynamic me = this;
foreach (var @event in events)
me.Apply(@event);
}
protected void Append(object @event)
{
uncommittedEvents.Append(@event);
}
public abstract TId Id { get; }
IUncommittedEvents IAggregateRoot<TId>.UncommittedEvents
{
get { return uncommittedEvents; }
}
}
The Append method will be used by child classes to append events after they are applied.
The Replay method is used in the child class constructor to rebuild the Aggregate Root state from events.
Here I use a dynamic me variable to dispatch events to the specific Apply methods of the child class. A .NET 2 or 3.5 implementation would use reflection to dispatch events to the Apply methods.
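As an illustration of the reflection alternative, here is a rough sketch. ReflectiveAggregateRoot, NameChanged and User are hypothetical names, and the choice to also look at non-public Apply methods is mine, not something the dynamic version does.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;

// Hypothetical base class: same Replay contract, but dispatching through reflection.
public abstract class ReflectiveAggregateRoot
{
    protected void Replay(IEnumerable<object> events)
    {
        foreach (var @event in events)
        {
            // Find an instance method Apply(TEvent) matching the event's runtime type.
            MethodInfo apply = GetType().GetMethod(
                "Apply",
                BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic,
                null,
                new[] { @event.GetType() },
                null);
            if (apply == null)
                throw new InvalidOperationException(
                    "No Apply method for " + @event.GetType().Name);
            apply.Invoke(this, new[] { @event });
        }
    }
}

// Hypothetical event and aggregate root, for illustration only.
public class NameChanged
{
    public string Name;
}

public class User : ReflectiveAggregateRoot
{
    public string Name { get; private set; }

    public User(IEnumerable<object> events)
    {
        Replay(events);
    }

    private void Apply(NameChanged e)
    {
        Name = e.Name;
    }
}
```

One difference to keep in mind: an exception thrown inside an Apply method surfaces wrapped in a TargetInvocationException when it goes through MethodInfo.Invoke.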
The UncommittedEvents property is implemented explicitly so that it does not appear in standard class use.
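To make the effect of the explicit implementation concrete, here is a small sketch with simplified, non-generic stand-ins. NoEvents, Account and ExplicitDemo are hypothetical names.

```csharp
// Simplified stand-ins for the article's interfaces.
public interface IUncommittedEvents
{
    bool HasEvents { get; }
}

public interface IAggregateRoot
{
    IUncommittedEvents UncommittedEvents { get; }
}

public class NoEvents : IUncommittedEvents
{
    public bool HasEvents { get { return false; } }
}

// Hypothetical aggregate root exposing UncommittedEvents only through the interface.
public class Account : IAggregateRoot
{
    private readonly IUncommittedEvents uncommitted = new NoEvents();

    IUncommittedEvents IAggregateRoot.UncommittedEvents
    {
        get { return uncommitted; }
    }
}

public static class ExplicitDemo
{
    public static bool HasEvents(Account account)
    {
        // account.UncommittedEvents would not compile here:
        // the member is only reachable through the interface type.
        return ((IAggregateRoot)account).UncommittedEvents.HasEvents;
    }
}
```

Domain code that works with Account never sees the property, while the repository, which iterates over IAggregateRoot references, still does.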
The Repository
The repository is just very slightly longer. I added a session concept so that several repositories can submit changes in a single transaction :
internal interface ISessionItem
{
void SubmitChanges();
}
public abstract class Repository<TId, TAggregateRoot> : ISessionItem
where TAggregateRoot : AggregateRoot<TId>
{
private readonly Dictionary<TId, TAggregateRoot> users = new Dictionary<TId, TAggregateRoot>();
private readonly IAggregateRootStorage<TId> aggregateRootStorage;
protected Repository()
{
aggregateRootStorage = Session.Enlist(this);
}
public void Add(TAggregateRoot user)
{
users.Add(user.Id, user);
}
public TAggregateRoot this[TId id]
{
get { return Find(id) ?? Load(id); }
}
private TAggregateRoot Find(TId id)
{
TAggregateRoot user;
return users.TryGetValue(id, out user) ? user : null;
}
private TAggregateRoot Load(TId id)
{
var events = aggregateRootStorage[id];
var user = CreateInstance(id, events);
users.Add(id, user);
return user;
}
protected abstract TAggregateRoot CreateInstance(TId id, IEnumerable<object> events);
public void SubmitChanges()
{
foreach (IAggregateRoot<TId> user in users.Values)
{
var uncommittedEvents = user.UncommittedEvents;
if (uncommittedEvents.HasEvents)
{
aggregateRootStorage.Append(user.Id, uncommittedEvents);
PublishEvents(uncommittedEvents);
uncommittedEvents.Commit();
}
}
users.Clear();
}
protected void PublishEvents(IUncommittedEvents uncommittedEvents)
{
foreach (dynamic @event in uncommittedEvents)
DomainEvents.Raise(@event);
}
}
The constructor enlists the repository in the current session.
The Add method registers the aggregate root in the repository; its events will be persisted in SubmitChanges().
The indexer finds an entity already in memory or loads it from the event store. The abstract CreateInstance method implementation will be responsible for instantiation.
SubmitChanges does what is expected, and also publishes the committed events. We will see the trick with the dynamic @event when we look at domain events.
The Session and its Factory
Just to group the SubmitChanges on several repositories :
public interface ISessionFactory : IDisposable
{
ISession OpenSession();
}
public class SessionFactory : ISessionFactory
{
private readonly IEventStorage eventStorage;
public SessionFactory(IEventStorage eventStorage)
{
this.eventStorage = eventStorage;
}
public ISession OpenSession()
{
return new Session(eventStorage);
}
public void Dispose()
{
eventStorage.Dispose();
}
}
public interface ISession : IDisposable
{
void SubmitChanges();
}
public class Session : ISession
{
private readonly IEventStorage eventStorage;
private readonly HashSet<ISessionItem> enlistedItems = new HashSet<ISessionItem>();
[ThreadStatic] private static Session current;
internal Session(IEventStorage eventStorage)
{
this.eventStorage = eventStorage;
if (current != null)
throw new InvalidOperationException("Cannot nest unit of work");
current = this;
}
private static Session Current
{
get { return current; }
}
public void SubmitChanges()
{
foreach (var enlisted in enlistedItems)
enlisted.SubmitChanges();
enlistedItems.Clear();
}
public void Dispose()
{
current = null;
}
internal static IAggregateRootStorage<TId> Enlist<TId, TAggregateRoot>
(Repository<TId, TAggregateRoot> repository)
where TAggregateRoot : AggregateRoot<TId>
{
var unitOfWork = Current;
unitOfWork.enlistedItems.Add(repository);
return unitOfWork.eventStorage.GetAggregateRootStorage<TAggregateRoot, TId>();
}
}
OK, almost everything is here. The last part is what lets other parts of the system use the events, which is needed to go CQRS.
Domain Events
Here, I made a minor variation on Udi Dahan’s DomainEvents implementation :
public static class DomainEvents
{
[ThreadStatic] private static List<Delegate> actions;
private static List<Handler> handlers;
public static void Register<T>(Action<T> callback)
{
if (actions == null) actions = new List<Delegate>();
actions.Add(callback);
}
public static void RegisterHandler<T>(Func<T> factory)
{
if (handlers == null) handlers = new List<Handler>();
handlers.Add(new Handler<T>(factory));
}
//Raises the given domain event
public static void Raise<T>(T @event)
{
if (actions != null)
foreach (Delegate action in actions)
if (action is Action<T>)
((Action<T>) action)(@event);
if (handlers != null)
foreach (var h in handlers)
{
if (h.Handles<T>())
{
var handler = h.CreateInstance<T>();
handler.Handle(@event);
}
}
}
private abstract class Handler
{
public abstract bool Handles<E>();
public abstract Handles<E> CreateInstance<E>();
}
private class Handler<T> : Handler
{
private readonly Func<T> factory;
public Handler(Func<T> factory)
{
this.factory = factory;
}
public override bool Handles<E>()
{
return typeof (Handles<E>)
.IsAssignableFrom(typeof (T));
}
public override Handles<E> CreateInstance<E>()
{
return (Handles<E>)factory();
}
}
}
public interface Handles<in T>
{
void Handle(T @event);
}
Edit : Changed the Handles<E>, T should be cast to Handles<E>, not to E, of course.
Event handlers can be registered as Action<T> delegates or as classes that implement one or more Handles<T> interfaces.
The private Handler and Handler<T> classes are used to find the handlers that handle a specific message and dispatch it, without relying on a Dependency Injection container as Udi’s implementation does.
The simple dynamic-fu in the repository was to call DomainEvents.Raise<T> using a dynamic dispatch. This way, Raise is always called with the actual event type in T. No tricky reflection is needed for the dispatch. Inside Raise<T>, we can then rely on T being the actual event type. Funky !
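The difference is easy to see in isolation. The sketch below is not the DomainEvents class itself: RaiseDemo, Describe and UserCreated are hypothetical names, and Describe<T> simply reports which T the compiler or the runtime binder picked.

```csharp
// Hypothetical event type, for illustration only.
public class UserCreated { }

public static class RaiseDemo
{
    // Stands in for Raise<T>: returns the name of the type bound to T.
    private static string Describe<T>(T @event)
    {
        return typeof(T).Name;
    }

    // T is inferred at compile time from the static type of the argument: object.
    public static string StaticDispatch(object @event)
    {
        return Describe(@event);
    }

    // T is inferred at run time from the actual type of the argument.
    public static string DynamicDispatch(object @event)
    {
        return Describe((dynamic)@event);
    }
}
```

With a UserCreated instance, StaticDispatch reports Object while DynamicDispatch reports UserCreated, which is why the foreach over dynamic in PublishEvents lets the Action<T> and Handles<T> checks inside Raise<T> match on the concrete event type.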
Next Time…
There’s already a lot of code for a single post, and everything is in place at the infrastructure level. You can already try it for yourself if you can figure out how.
The sample will come in the next post, stay tuned.
Comments
Love the CQRS concept and looking forward for the next post.
My only problem is that the UI must be task-based to match up with the Commands being given to the domain object. Otherwise, if the UI consists of one large form with myriads of editable textboxes and editable grids, saving everything in one transaction with one single Save button (as most of our users demand), it becomes difficult to map and produce N distinct commands for the domain object to process.
@cqrnoob> For sure, this architecture is made to focus on why the system changes. The events are part of the Ubiquitous Language and must be domain driven.
Form/Grid oriented UIs tend to see the changes at the technical level (a database row is updated).
Here, it makes a difference whether the user's address changed because there was a misspelling or because the user moved.
I wish there's a VS2008 version =)
and a downloadable source code =)
Gr8 post.
I am in same condition as cqrnoob, the first commenter.
We also have same case where in we have large form with grids and user just changes whatever he needs to and clicks on save. This is the easiest method for them to do data entry.
How do I separate them into different commands?
One possibility is even though everything is on single screen we logically divide it into chunks and create commands for the same.
What is the best way?
Again a gr8 post.
Thanks,
Bkak.
@Bkak> There are surely groupings in your form that have a business meaning.
The hard part will be to extract, from what the user entered, why he did it...
After this, you simply create commands using those parameters. Sometimes a command can have many parameters (in this case you can embed them in DTOs).
But if the user's 'updates' carry no domain meaning, I'm not sure domain driven design is the best way to go...
@Bkak>The usual problem with Form/GridView oriented interfaces is that even if the fields capture the Nouns from the Ubiquitous Language, they lose the Verbs...
Is this thread safe?
If two commands are fired on the same instance of an aggregate root at the same time, would this not commit uncommitted events after the first command finished processing, but half way through the second command completing?
Also, I assume including the repositories in a UnitOfWork instead of a Session works just as well?
@Ben> There is no problem of thread safety as long as you use messaging.
Messages are processed sequentially, so no conflict can occur.
@jeremie: Thanks for the reply.
I'm using WCF and NServiceBus and in both cases I have multiple threads reading messages from MS Message Queuing queues. This means "command" messages can be processed out of order (and multiple messages for the same AR can be processed simultaneously).
As long as the AR is locked so only one Command message can access it at a time, and in the case of a roll-back any changes to the "in-memory" copy of the AR are also rolled back, this all seems to work well.
From the event side, the order in which events arrive can be important (such as updating the query database), but the event side doesn't add events to the AR so this isn't much of a problem.
I know it's a brief summary, but have I architected CQRS incorrectly? If not, I'd need to modify the above code to allow for locking of ARs and roll-backs.
@Ben> If you're locking your AR, there's no problem.
In the above implementation I just assume that commands/events are received sequentially for a given AR.
Anyway, in case of failure in an AR, you should drop it from memory and reload it the next time it is used. But that's not a big deal, since a command should be sent only when you're almost sure that it won't fail.
Just a quick question: what is the logic behind implementing the interface method explicitly, as such making it private?
void IUncommittedEvents.Commit()
@Moby> yes, to avoid bloating the class