One of the things I've been doing in my toy "study" problem has been to implement an in-memory event store. That means no persistence, per se, but all of the block and tackle of getting data to move from the "write model" to the "read model".
In particular, I've been taking pains to ensure that the asynchronous points in the data transfer are modeled that way -- I'm using a DirectExecutorService to run the asynchronous tasks, but I want to make sure that I'm getting them "right".
So, for this toy event store, I use the streamIds as keys to a hash; the object that comes out is a description of the stream, including a complete list of the events in that stream. Each write is implemented as a task submitted to the executor service, which uses a lock to ensure that only one thread writes to the event store at a time. The commit method replaces a volatile reference to the hash with a reference to an updated copy, producing an atomic commit. As the toy problem has very forgiving SLAs, writes are not merely appends to the stream, but actually check for conflicts, duplication, and so on.
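As a rough sketch of that copy-on-write commit, assuming a Java setting (the class and method names here are my own illustration, not from the toy project):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class InMemoryEventStore {
    // The whole store lives behind one volatile reference; a commit swaps
    // in an updated copy, so readers always see a consistent snapshot.
    private volatile Map<String, List<String>> streams = new HashMap<>();
    private final Object writeLock = new Object();

    List<String> read(String streamId) {
        // No locking needed: the volatile read observes a complete snapshot.
        return streams.getOrDefault(streamId, List.of());
    }

    void append(String streamId, String event) {
        synchronized (writeLock) {  // only one writer at a time
            Map<String, List<String>> copy = new HashMap<>(streams);
            List<String> stream =
                new ArrayList<>(copy.getOrDefault(streamId, List.of()));
            stream.add(event);      // conflict/duplication checks would go here
            copy.put(streamId, stream);
            streams = copy;         // the atomic commit: one volatile write
        }
    }
}
```

The point of the volatile swap is that readers never see a half-applied write: they hold either the old snapshot or the new one.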
Riddle: how to now update the read model. The transaction is the write to the volatile memory location, and if that part succeeds the client should be informed. So we really can't do any sort of synchronous notification of the read model. Instead, another task is scheduled to perform the update.
What should that task do? Pub/sub! Which is right, but deceptively so. The basic idea is fine: we asynchronously dispatch a message to an event queue, and all the subscribers pick up that update and react.
What's the message, though? I had been thinking that we could just enumerate the events, or possibly the collection of events, but that makes a mess on the downstream side. The two basic issues are (a) the broadcast is asynchronous, so the message handling really needs to be idempotent, and (b) being asynchronous, the messages can arrive out of order.
Which means that simply publishing each of the domain events onto an asynchronous bus is going to make a mess of the event handlers, which all need a bunch of sequencing logic to repair the inevitable ordering edge cases.
Too much work.
The key clue is that the event sourced projections, process managers, and so on aren't really interested in a stream of individual event messages, so much as they are interested in the ordered sequence of events. That sequence already exists in the write model, so the key idea is to not screw it up; we should be pushing/polling for updates to the sequence, rather than trying to track things at the level of individual domain events.
The answer is to think in terms of publishing the cursor position for each stream.
In the write model, we push the events to the store as before. But we keep track of the positions in the stream that we have just written. After the transaction has been committed, we schedule an asynchronous task to push an event describing the new cursor position to the pub/sub system. Each event handler subscribes to that queue, and on each message compares the cursor position to its own high water mark; if there is further progress to be made, the handler fetches an ordered subsequence of the events from the stream.
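A minimal sketch of that high-water-mark check, with the stream passed in as an array for simplicity (CursorAdvanced and Projection are illustrative names of my own, not from the post):

```java
// The published message carries only the stream identity and the new
// cursor position, never the events themselves.
record CursorAdvanced(String streamId, long position) {}

class Projection {
    private long highWaterMark = 0;  // last position this handler has applied

    // Returns the number of events actually applied. Duplicate and
    // out-of-order cursor messages fall through the guard harmlessly.
    long onCursorAdvanced(CursorAdvanced msg, String[] stream) {
        if (msg.position() <= highWaterMark) {
            return 0;  // stale or duplicate message: idempotently dropped
        }
        long applied = 0;
        // Fetch and apply the ordered subsequence (highWaterMark, position]
        for (long i = highWaterMark; i < msg.position(); i++) {
            String event = stream[(int) i];
            // ...apply event to the read model here...
            applied++;
        }
        highWaterMark = msg.position();
        return applied;
    }
}
```

Notice that the handler never needs sequencing logic per domain event; the single high-water-mark comparison subsumes all of it.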
A potentially interesting byproduct of this idea: the write can return the cursor position to the caller, which can then use that position to rebuild its next view. A reader that knows the specific position that it is waiting for can block until the read model has been updated to that point.
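That read-your-writes behavior might be sketched with a simple monitor, assuming the projection calls advanceTo as it applies updates (ReadModel and awaitPosition are hypothetical names, not from the post):

```java
class ReadModel {
    private long appliedPosition = 0;  // how far this read model has caught up

    // Called by the projection as it applies each cursor update.
    synchronized void advanceTo(long position) {
        if (position > appliedPosition) {
            appliedPosition = position;
            notifyAll();  // wake any readers blocked on this position
        }
    }

    // A reader that knows the cursor position its write returned can
    // block here until the read model has been updated to that point.
    synchronized void awaitPosition(long position) {
        boolean interrupted = false;
        while (appliedPosition < position) {
            try {
                wait();
            } catch (InterruptedException e) {
                interrupted = true;  // keep waiting; restore the flag on exit
            }
        }
        if (interrupted) Thread.currentThread().interrupt();
    }
}
```

The caller does a write, gets back position N, and calls awaitPosition(N) before its next read, so it is guaranteed to see its own write.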
Because each of the event handlers is tracking its own high water mark, the cursor update messages are trivial to handle idempotently, and out-of-order update messages are trivial to recognize and drop.