Vaughn Vernon's Blog, page 2

October 18, 2015

Java-Native Cloud and Reactive Realities

I recently read an article about Cloud-Native vs Reactive development, in which the statement is made that Cloud-Native looks a lot like Reactive. The article asserts several points that could make the pursuit of both Cloud-Native and Reactive seem unreachably complex.


I think most of the message that I read is targeted at C-level executives, with a lot of sweeping statements that are quite inaccurate. It seems appropriate to clarify what the actual technical realities are around Cloud-Native and Reactive but stated more in terms of business drivers, goals, and general concerns.



I hope my input will help C-level executives and the technical experts supporting them to avoid the danger zone of "streaming all the things," "NoSQL all the things," and "reactive all the things." Those approaches are certainly important, and possibly worth using in any given service, but don't believe that you will miss out on the benefits of the Cloud if not everything in your digital assets portfolio works that way.


Addressing Reality

In an effort to bring balance and accuracy to an otherwise super-hyped technology stack, I address each of the statements that most concerned me. Here are the assertions made about Cloud-Native and Reactive:



1. In the cloud, information is a live stream of datasets in raw format, unstructured by definition. Developers no longer have anyone helping them parse it.
2. The skill set that you're talking about at the traditional Java shop is often not conducive to the architecture models of modern, cloud, and data-centric applications.
3. You're dealing with data that are not persisted; it's on the fly.
4. That data is a stream—it's not a SQL query that gets re-run. Clearly, the application gets more complex with these stream-based systems.
5. Reactive is how you design a modern application to take advantage of unlimited scalability in the cloud. If your application is monolithic, you get none of the advantages of the cloud.
6. Java, if it's to have a future, needs to understand that you need to do some things differently and in a Reactive manner to take advantage of the cloud, and certainly to be successful with streaming and real-time data-driven use cases.

I address each of these in the next subsections.


1. In the cloud, information is a live stream of datasets in raw format, unstructured by definition. Developers no longer have anyone helping them parse it.

Streaming data has become quite important and useful. It can make the difference between acting late on slow business intelligence and acting in near real time on fast business intelligence. Where it is critical to move rapidly on data just in time, there is currently no better way to do so than with streaming.


Yet, to state that in the cloud it's always streaming is just not accurate. There is still persistent state in most business systems and subsystems (services). Call this persistent state data at rest. To garner more value from each subsystem that persists data, it's critical to inform other subsystems of what has occurred in a given subsystem that could cause the others to react. That's important, because otherwise how would the other subsystems know about the happenings in another?


Informing other subsystems of happenings in your subsystem can be accomplished in several ways. One of those is through streaming. When data is streaming, call it data in motion. Yet, what streaming data is can be interpreted in different ways. Generally, we think of streaming as a push-based approach, where other subsystems are proactively notified of various occurrences while the consumer itself is mostly passive until the notification arrives. Some of the most popular “streaming” solutions are in reality poll-based, where the interested subsystems ask the streaming topic for new happenings that are currently available.
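To make the push/poll distinction concrete, here is a minimal Java sketch of the two consumption styles. The interface and class names (StreamListener, StreamTopic, PollingConsumer) are hypothetical and only illustrate the shape of each approach, not any particular product's API.

// Push-based: the infrastructure invokes the consumer whenever a new record arrives.
interface StreamListener {
    void onRecord(String record);
}

// Poll-based: the consumer periodically asks the topic for whatever is currently available.
interface StreamTopic {
    java.util.List<String> pollNewRecords(int maxRecords);
}

class PollingConsumer implements Runnable {
    private final StreamTopic topic;

    PollingConsumer(StreamTopic topic) {
        this.topic = topic;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            // The consumer drives the interaction; nothing is pushed to it.
            for (String record : topic.pollNewRecords(100)) {
                handle(record);
            }
        }
    }

    private void handle(String record) {
        System.out.println("consumed: " + record);
    }
}

Either way, the consuming subsystem learns of happenings elsewhere; the difference is only in who initiates the delivery.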


Another important point to grasp is that, in fact, there are very intelligent and type-safe ways to help the developers of consuming subsystems to “parse it,” where “it” is the streaming data. Beware of technology stacks that have largely ignored this essential developer need.


2. The skill set that you’re talking about at the traditional Java shop is often not conducive to the architecture models of modern, cloud, and data-centric applications.

This is a sad commentary on the part of those who promote Reactive, as if "your developers don't cut it." Perhaps such messages explain why many enterprises have resisted adopting Reactive even when it makes so much sense to do so. After all, if you must purge much of your existing expertise and then rebuild in an industry that is already strapped for software developers and architects, why would you purposely do so and endanger your core business to follow a hard-to-reach technology stack? C-level executives, team managers and leads, and even developers with hard-won experience should rest assured that existing Java skills can be leveraged in a Reactive architecture and software development environment.


What is more, Java developers should use toolsets that purposely preserve their current experience. They should be able to define Java interfaces as messaging protocols and then implement objects that fulfill the defined protocols. Yet, along with this, these experienced developers should be afforded modern tooling that supports the vastly concurrent and parallel hardware that has become a commodity since approximately 2003.


If someone tells you that "the skill set at the traditional Java shop is not conducive to the architecture models of modern, cloud, and data-centric applications," challenge it. They have over-complicated things.


3. You’re dealing with data that are not persisted; it’s on the fly.

Referring back to the first statement, this is simply not so. Data is persisted identically or similarly to how it has been for decades. To make statements to the contrary is misleading and even dangerous. There are still databases, even relational databases (RDBMSes), in the Cloud. There are also what are known as NoSQL stores. Each of these is leveraged for different reasons.


You should not consider it inferior to use an RDBMS when circumstances call for one. In some cases, an RDBMS may not be able to meet the scalability needs of a specific set of use cases. At other times it is just perfect for the task at hand, and arbitrarily ignoring it as a technology choice could lead to much unnecessary complexity.


Think for a moment about this assertion that “you’re dealing with data that are not persisted.” This makes my eye twitch. If data in the Cloud is not persisted, what happens when there is an outage of any kind in the system or subsystems? Of course, data is persisted, because persistence is the only way to protect one of your most valuable assets, your data.


Yes, even data in streams is very often persisted, possibly even mostly so. Again, if a stream of data is not persisted, how will your business intelligence analytics continue processing data after some outage somewhere in the critical path? Quite often, even most of the time, streaming data is persisted with a retention period that may range from "forever" down to weeks, days, hours, or minutes.


This, of course, is not to say that all streaming data is persistent. Persistence is sometimes unnecessary when the stream's consumers need only samples of data every so often. It may also not be possible to persist some streams of data due to throughput and performance requirements. Although having all the data helps any given consumer be more accurate about its calculations and other processing goals, other processing pipelines won't be negatively impacted by losing some data. In such cases, when an outage causes a relatively minimal amount of data to be lost, the overall consumer processing goals are not compromised.


The important lesson to take from this is to understand when stream persistence is necessary and when it is not, or when it is even detrimental to the overall throughput and scalability goals. Most business stakeholders and technologists can reason about this and make sound decisions that achieve the best business outcome.


4. That data is a stream—it’s not a SQL query that gets re-run. Clearly, the application gets more complex with these stream-based systems.

True, streams are streams. If you think of a stream of water and yourself being in a fixed position with a fixed line of sight, you get only one opportunity to see a specific section of the stream. What you just saw is now downstream and out of your visibility scope.


However, as noted in the previous subsection, it is common for the strongest streaming options to offer retention periods, such that if for some reason a critical portion of data was not seen, it may be recalled for later consumption. That's why you need to think about the properties of the consumer itself and whether or not it needs persisted data.


This leads to another point about “a SQL query” being unavailable. Well, that may be true for the stream itself, but it is possible and even common for a stream of data to be projected into a store that enables developers to query it using SQL. One such means of querying live streaming data is known as a continuous query and has been available for quite a while.
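As a hedged illustration of the projection side of this, the following minimal Java sketch writes each stream event into an ordinary relational table so downstream readers can query it with plain SQL. The table and column names (order_status_view, order_id, status, occurred_on) are hypothetical; true continuous-query support is product specific and not shown here.

import java.sql.Connection;
import java.sql.PreparedStatement;

// A simple projection: every event seen on the stream is recorded in a
// relational table, which can then be queried with ordinary SQL.
public class OrderEventProjection {
    private final Connection connection; // obtained from your connection pool

    public OrderEventProjection(Connection connection) {
        this.connection = connection;
    }

    // Invoked by the stream consumer for each event it receives.
    public void when(String orderId, String status, long occurredOnMillis) throws Exception {
        try (PreparedStatement statement = connection.prepareStatement(
                "insert into order_status_view (order_id, status, occurred_on) values (?, ?, ?)")) {
            statement.setString(1, orderId);
            statement.setString(2, status);
            statement.setTimestamp(3, new java.sql.Timestamp(occurredOnMillis));
            statement.executeUpdate();
        }
    }
}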


5. Reactive is how you design a modern application to take advantage of unlimited scalability in the cloud. If your application is monolithic, you get none of the advantages of the cloud.

This is often the message that emanates from product companies that don't offer any other kind of solution. Unfortunately, this is true of several leading technology stacks. Yet, there are offerings known as "lift and shift" that give you the benefit of preserving your legacy while still getting Cloud benefits.


Although I won't first and foremost direct others to create monoliths, creating a monolith is not the worst thing that your development teams could do. A well-modularized monolith is far better than a large array of strongly coupled microservices. The real peril here comes not from monolithic architectures and designs, but from those with weak architecture and poor modularity, which constitute what has become known as the Big Ball of Mud. This name should be self-describing.


Even so, one of the biggest risks that teams face is having experience only with creating Big Balls of Mud, and then imagining that developing with microservices will somehow "fix" their architecture and designs.


As a guideline, use microservices when the rates of change across several business functionalities are quite different. Does trying to coordinate changes across various software updates cause contention across functional teams? What about scalability and throughput? If some business components vary in their scalability and throughput requirements, it can work out best to break them off into smaller microservices, because those requirements can then be addressed independently of the components that do not have the same stringent demands.


Just as it is important to understand when streaming data should be persistent and when it need not be, it's important to understand when to use microservices and when to use monoliths, or to preserve preexisting ones. Both can be very useful, depending on the service-level agreements around each and the experience of the teams available to develop the various parts of the whole system solution.


6. Java, if it's to have a future, needs to understand that you need to do some things differently and in a Reactive manner to take advantage of the cloud, and certainly to be successful with streaming and real-time data-driven use cases.

Depending on the poll, Java is either the most popular programming language in the world or the second most popular. Java is not going away any time soon. Java SE releases are rolling out at a formerly unheard-of rate. Improvements are being made on a continual basis. Does Java as a programming language have issues? Of course, but as it has been said:


There are only two kinds of [programming] languages: the ones people complain about and the ones nobody uses.


Java is the first kind, and it has been proven time and again that Java is the language used when software must be delivered. Even if you choose to use an alternative to Java, it will very likely be a JVM language—one that is hosted by the Java Virtual Machine. That is a testimony to the fact that the worldwide investment in Java is extremely high, and that it is a reliable delivery platform.


Java-Native for Reactive

Java not only has a future but a very good one. With Java add-on tools that support a Java-Native Reactive programming model, businesses building on the quality of Java can continue to make strides in the direction of Cloud-Native and Reactive.



I hope you take a few moments to educate yourself on our open source Java-Native Reactive Platform. We assure you that our Reactive toolset does not require you to abandon hard-won experience with the Java language and platform. We have made it simple to use standard Java interfaces as protocols implemented in message-driven objects. Have your Java architects and developers go through this tutorial to jump start Reactive in just minutes.


Related Reading

Pivotal Software: What Is “Cloud-Native” Data and Why Does It Matter?


RedHat: Making Old Applications New Again; RedHat’s Lift and Shift Story



March 26, 2015

Summary of CRDTs

In this article I am going to introduce you to CRDTs, or Conflict-Free Replicated Data Types. You will see under what conditions CRDTs might be useful, why you might want to use such a data type, what kinds of CRDTs are available, and how you can use them.


Think of having a Shopping Cart persisted by a multi-node replicated store, and then having one of the nodes become unavailable. If you have at least 3 nodes in the store, which of the remaining nodes contains the most recent version of a given user’s Shopping Cart? You need to display the most recent one. How would you know? What can you do to resolve this problem when the user is just ready to check out and pay?


Semantic Resolution

One typical approach is to use what is called Semantic Resolution. In this case you take all copies of the Shopping Cart from each of the remaining storage replicas and apply some "business logic" to the situation. The business says that we want to make the maximum sale, so the "logical" thing to do is merge the two remaining Shopping Carts so that the "official" Shopping Cart has the maximum number of products, each with the maximum count of items, from both. However, if the user has just recently removed an item from their Shopping Cart, then merging that item back into their possession looks like an error, and is at least annoying. In the worst case the user ends up not noticing the results of the merge, buys the extra item, and then returns it for their money back. This is probably much more annoying than noticing the extra item in the Shopping Cart and removing it before the final check out.


Isn’t there a better way?


Introducing CRDTs

A CRDT is literally a data type, and there are two basic ways to implement them. Both of these approaches adhere to the CRDT acronym because both are named starting with the letter C:



Convergent Replicated Data Types: This is where data converges, or merges. You focus on the data on each of the storage replicas and merge the data changes between each of the replicas in order to bring each replica up to date. Obviously, to make this work well you must in some way timestamp the data so you can merge it in a proper way. You could use a simple timestamp, but you may have to use Vector Clocks or some similar approach to get this to work out well.
Commutative Replicated Data Types: Maybe you remember the Commutative Property from mathematics. For example, you can take two numbers, such as 2 and 3, and whichever way you add them together, the result is always 5. That is, 2+3 yields the same value as 3+2. In this case we are more focused on the operations rather than the data, and we communicate the operations to each of the replicas rather than merging data. Regardless of the data they already have, applying the operations they receive will yield the correct state. Commutative types work well when your messaging supports at-least-once delivery, such as is the case with Akka Persistence.

In the following examples we will discuss both Convergent and Commutative types.


One Convergent-Only Replicated Data Type

Here I will discuss only one Convergent-only type, the Register. A Register type has only two operations that you can perform on it: Set and Get. Because the only modification that you can make to a Register is to set its current value, it cannot be managed concurrently as a Commutative type.


There are two ways to implement a Register:



Last-Write-Wins (LWW): Here you keep the value and a timestamp of when the value was set on a given replica. The timestamp allows the value to be merged on any given replica by comparing the timestamp of the local value with that of the one to be merged. If the timestamp of the local value is older than that of the sent value to be merged, then the local value is updated.
Multi-Value: Here you may keep multiple copies of the same conceptual value, but maintain information that indicates when the value came to be set. It’s possible to hand to a client multiple values of the same thing and allow the client to apply conflict resolution to determine which of the multiple values it shall choose as “current.”

There are advantages to both of these approaches. Multi-Value is the most versatile, but also the most complex, approach. Although a Multi-Value Register may seem to have the same nature as the poor merging example given under Semantic Resolution, the data that indicates the differences can be based on alternative criteria such as Version Vectors or a Vector Clock.
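For the simpler of the two, here is a minimal Java sketch of a Last-Write-Wins Register. The class and method names are mine, and a production implementation would order writes with something more robust than a wall-clock timestamp (a Vector Clock, for example).

// Last-Write-Wins Register: keeps a value plus the timestamp of its latest set.
public class LWWRegister<T> {
    private T value;
    private long timestamp; // when the current value was set

    public synchronized void set(T newValue, long newTimestamp) {
        // The newer write wins; an older or equal timestamp is ignored.
        if (newTimestamp > this.timestamp) {
            this.value = newValue;
            this.timestamp = newTimestamp;
        }
    }

    public synchronized T get() {
        return value;
    }

    // Merging a replica's state is just another conditional set.
    public synchronized void merge(LWWRegister<T> other) {
        this.set(other.value, other.timestamp);
    }
}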


Some Commutative Replicated Data Types

Here we are going to discuss several CRDTs:



Counters: G-Counters and PN-Counters
Sets: G-Sets, 2P-Sets, U-Set, LWW-Element-Sets, and OR-Sets
Graphs

Let’s take a look at these one by one.


Counters

There are two types of counters:



G-Counter: This is a grow-only counter, one that only understands addition. The implementation of a G-Counter basically implements a Version Vector or Vector Clock.
PN-Counter: This is a positive-negative counter, one that understands how to both add and subtract. To support this there are actually two G-Counters that are maintained: a Positive G-Counter and a Negative G-Counter.

To explain how G-Counter works, let’s consider its operations on two replicas:



R1 increments the G-Counter A, yielding a value of 1
R1 increments the G-Counter A again, yielding a value of 2
R2 increments the G-Counter A for the first time, yielding a value of 1

Now the increment operations are sent from R1 to R2, and from R2 to R1:



R2 receives an increment notification from R1 and increments G-Counter A from 1 to 2
R2 receives an increment notification from R1 and increments G-Counter A from 2 to 3
R1 receives an increment notification from R2 and increments G-Counter A from 2 to 3
Both R1 and R2 are now up to date with an A value of 3
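The steps above can be captured in a minimal Java sketch of a G-Counter. For brevity this shows the state-merging (convergent) form, in which replicas exchange their per-replica counts and take the element-wise maximum; an operation-based version would instead broadcast each increment, as in the R1/R2 steps. The names are illustrative only.

import java.util.HashMap;
import java.util.Map;

// Grow-only counter: one monotonically increasing count per replica.
public class GCounter {
    private final String replicaId;
    private final Map<String, Long> counts = new HashMap<>();

    public GCounter(String replicaId) {
        this.replicaId = replicaId;
    }

    // Each replica only ever increments its own entry.
    public void increment() {
        counts.merge(replicaId, 1L, Long::sum);
    }

    // The counter's value is the sum of all per-replica counts.
    public long value() {
        return counts.values().stream().mapToLong(Long::longValue).sum();
    }

    // Converge with another replica's state by taking the per-replica maximum.
    public void merge(GCounter other) {
        other.counts.forEach((replica, count) -> counts.merge(replica, count, Math::max));
    }
}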

Now consider an example of how the PN-Counter works between two replicas:



R1 increments PN-Counter B, yielding a PN value of { 1, 0 }
R1 increments PN-Counter B, yielding a PN value of { 2, 0 }
R2 increments PN-Counter B, yielding a PN value of { 1, 0 }
R2 decrements PN-Counter B, yielding a PN value of { 1, 1 }

Now the increment and decrement operations are sent from R1 to R2 and from R2 to R1:



R2 receives an increment notification from R1 and increments its P counter from 1 to 2, yielding { 2, 1 }
R2 receives an increment notification from R1 and increments its P counter from 2 to 3, yielding { 3, 1 }
R1 receives an increment notification from R2 and increments its P counter from 2 to 3, yielding { 3, 0 }
R1 receives a decrement notification from R2 and increments its N counter from 0 to 1, yielding { 3, 1 }
Both R1 and R2 are now up to date with a B value of { 3, 1 }. The actual value of the PN-Counter B is yielded by subtracting the N value from the P value. So 3-1 is 2, which is the actual value of PN-Counter B.

You can use a PN-Counter to hold the count of each of the product items that are currently in the Shopping Cart. In case the counter yields a value less than 0, you can always evaluate it as 0 and not show the product in the Shopping Cart.
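Building on the GCounter sketched above, a PN-Counter can be expressed as a pair of grow-only counters; the clamp at zero reflects the Shopping Cart guidance just mentioned and is not part of the general definition.

// Positive-negative counter: one G-Counter of increments and one of decrements.
public class PNCounter {
    private final GCounter positive;
    private final GCounter negative;

    public PNCounter(String replicaId) {
        this.positive = new GCounter(replicaId);
        this.negative = new GCounter(replicaId);
    }

    public void increment() { positive.increment(); }

    public void decrement() { negative.increment(); }

    // Actual value is P minus N; clamp at zero for something like an item count.
    public long value() {
        return Math.max(0, positive.value() - negative.value());
    }

    public void merge(PNCounter other) {
        positive.merge(other.positive);
        negative.merge(other.negative);
    }
}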


There are other types of Counters available, but not covered here.


Sets

A G-Set, or a grow-only set, is implemented very much like a G-Counter. Again, we will examine the use of a G-Set on two replicas:



R1 adds the value 1 to G-Set C, yielding a set of { 1 }
R1 adds the value 2 to G-Set C, yielding a set of { 1, 2 }
R2 adds the value 3 to G-Set C, yielding a set of { 3 }

Now the add operations are sent from R1 to R2, and from R2 to R1:



R2 receives an [ add 1 ] notification from R1 and R2 adds the value 1 to G-Set C, yielding { 1, 3 }
R2 receives an [ add 2 ] notification from R1 and R2 adds the value 2 to G-Set C, yielding { 1, 2, 3 }
R1 receives an [ add 3 ] notification from R2 and R1 adds the value 3 to G-Set C, yielding { 1, 2, 3 }
Both R1 and R2 are now up to date with a set of C as { 1, 2, 3 }

There are other kinds of sets supported, but I don’t cover them extensively here.


There is a 2P-Set, which is a 2-phase set. It is managed much the same way as a PN-Counter; that is, there is a G-Set of added elements and a G-Set of removed elements, which are merged on each replica to yield the actual up-to-date set. You cannot add an element to the removed set that does not exist locally in your added set, which effectively means you cannot remove an element that has not already been added. The removed set functions as a tombstone set, which prevents removed items from ever being added back into the merged sets. Be careful what you remove, because it can never come back.
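A minimal Java sketch of a 2P-Set, built from two grow-only sets, might look like this (names are illustrative):

import java.util.HashSet;
import java.util.Set;

// Two-phase set: a grow-only set of additions plus a grow-only tombstone set of removals.
public class TwoPhaseSet<E> {
    private final Set<E> added = new HashSet<>();
    private final Set<E> removed = new HashSet<>();

    public void add(E element) {
        added.add(element);
    }

    // An element may only be removed after it has been added, and removal is permanent.
    public void remove(E element) {
        if (added.contains(element)) {
            removed.add(element);
        }
    }

    public boolean contains(E element) {
        return added.contains(element) && !removed.contains(element);
    }

    // Converge with another replica by unioning both grow-only sets.
    public void merge(TwoPhaseSet<E> other) {
        added.addAll(other.added);
        removed.addAll(other.removed);
    }
}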


In a U-Set each element is added with a unique identity, and you can only remove an element by using its unique identity. A remove must be performed as a causal operation, because it can only follow the add of the element with the same identity.


There is also the LWW-Element-Set, where each item added to the set and each item removed from the set carries a timestamp. The final current set is yielded by adding all the most recently added items and removing the most recently removed items. This allows removed elements to be added back to the LWW-Element-Set, which is not possible with the 2P-Set.
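Here is a minimal Java sketch of an LWW-Element-Set along those lines; as with the other sketches the names are mine, and this variant is biased toward remove when an add and a remove carry the same timestamp.

import java.util.HashMap;
import java.util.Map;

// LWW-Element-Set: adds and removes each carry a timestamp; the latest operation wins.
public class LWWElementSet<E> {
    private final Map<E, Long> addTimes = new HashMap<>();
    private final Map<E, Long> removeTimes = new HashMap<>();

    public void add(E element, long timestamp) {
        addTimes.merge(element, timestamp, Math::max);
    }

    public void remove(E element, long timestamp) {
        removeTimes.merge(element, timestamp, Math::max);
    }

    // Present if its most recent add is newer than its most recent remove (if any).
    public boolean contains(E element) {
        Long added = addTimes.get(element);
        if (added == null) {
            return false;
        }
        Long removed = removeTimes.get(element);
        return removed == null || added > removed;
    }

    public void merge(LWWElementSet<E> other) {
        other.addTimes.forEach((element, time) -> addTimes.merge(element, time, Math::max));
        other.removeTimes.forEach((element, time) -> removeTimes.merge(element, time, Math::max));
    }
}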


An OR-Set is an observed-remove set, where each element added to the set is assigned a unique identity. This set is kept up to date by applying causal consistency. If two of the same elements, yet with different identities, are added to the set, both will be added, although a get (look up) of the element will yield only one element, because the second is masked as required by the definition of a set. When a remove of a specific element is requested, all identities of the same element are matched at the given source replica. Those identities are then used to remove the same element from all replicas. A remove only takes effect if it historically and causally follows an add of the same element (with the same identity), which conforms to the sequential specification of a set. In other words, there must be an element matched at the source replica for this operation to work across all replicas.


You could use a 2P-Set to hold the products that are currently in the Shopping Cart, but the problem here is that if the user removed a product and then wanted to add it back, it would not be possible. For that reason a U-Set, LWW-Element-Set, or an OR-Set would be better.


Graphs

Although you might think of graphs as being very powerful, there are problems with applying Graph CRDTs in a distributed system. For this reason I suggest referring to the paper below if you think you are interested in using the Graph type.


Yeah, But…

You ask: "Why wouldn't you use a Value Object with a built-in timestamp for the Shopping Cart scenario described above? After all, it's just a matter of figuring out which replicated Shopping Cart has the most recent data."


First of all, modeling this as a Value Object with a timestamp would work. It's just that there may be advantages in using CRDTs. Consider some of the following reasons.


In the case of the Shopping Cart the main advantage is in redundant data suppression. Only the changes made to the Shopping Cart must be copied around to other nodes, and because most of the data does not make each notification journey, bringing all nodes up to date is much faster and more likely to succeed quickly. Then, if the node hosting the most up-to-date Shopping Cart is lost, chances are better that the smaller amount of data to replicate will allow the other nodes to be brought up to date sooner.


On the other hand, the Shopping Cart is probably not the best example for reasoning about CRDTs. The limitation of the Shopping Cart example is that it is owned by only one user, and most of the changes will probably be saved to the same node first and then propagated to the other nodes. However, if you think of an Aggregate that can be used and modified by multiple users simultaneously, it's more likely that multiple nodes will be modified separately at the same time. Chances are good that if non-conflicting changes are made by each user, then each of the changes can be replicated to multiple nodes without any of the users being dinged for a conflict (the typical optimistic concurrency failure). Incidentally, the same thing can be accomplished with Aggregates and Event Sourcing when events are exchanged between nodes.


The problem is easier to reason about if you simply think of many users hitting the same URL, with a counter for that URL maintained and partitioned across many nodes. As each of many users hits the URL, the counter is incremented on any one of the multiple nodes. What's the actual total? Well, each time the counter is incremented on any given node, it will take some time before the other nodes are also incremented for that same URL access. Yet, within some period of time it is possible that one or more nodes will reflect the true total of the number of hits, that is, if the URL isn't pounded every few milliseconds 24×7. In any case, any one of the nodes will show at least a close approximation of the actual total.
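Using the hypothetical GCounter sketched earlier, that URL-hit scenario reduces to a few lines per node:

// Each node keeps its own G-Counter instance for the same URL, keyed by replica id.
public class UrlHitCounterExample {
    public static void main(String[] args) {
        GCounter hitsOnNodeA = new GCounter("node-a");
        GCounter hitsOnNodeB = new GCounter("node-b");

        hitsOnNodeA.increment();   // a hit routed to node A
        hitsOnNodeA.increment();
        hitsOnNodeB.increment();   // a hit routed to node B

        // Replication is just exchanging and merging state; the order does not matter.
        hitsOnNodeA.merge(hitsOnNodeB);
        hitsOnNodeB.merge(hitsOnNodeA);

        System.out.println(hitsOnNodeA.value()); // 3
        System.out.println(hitsOnNodeB.value()); // 3
    }
}

Until the merges happen, each node still reports a close approximation rather than the exact total, which is the trade-off described above.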


References

The official paper on CRDTs is provided by Marc Shapiro and his colleagues at INRIA. You can also see a good overview presentation by Sean Cribbs.



March 10, 2015

Building a Reactive Process Manager, Twice

On February 24, 2015, I delivered a presentation by this title to a meetup in the Netherlands. The group name is DomCode, and we had a packed house of more than 80 attendees. This topic not only drew a lot of attendees but also held their rapt attention for more than an hour.


The first half of the presentation introduces you to the Actor Model with Scala and Akka, and also includes a bit of discussion on using the Actor Model with C# and Dotsero. Dotsero is my open source toolkit that supports using the Actor Model on the .NET platform.


The second half of the presentation explains what a Process Manager is and how to build one. I walk through both the Scala/Akka and the C#/Dotsero implementations.


Now you can watch the recorded video.


The Process Manager is one of the patterns I discuss in my upcoming book, Reactive Enterprise with Actor Model.



December 8, 2014

The Ideal Domain-Driven Design Aggregate Store?

At the 2014 DDD eXchange in NYC, a park bench discussion developed around storing Aggregates. The consensus among the DDD leadership was against Object-Relational Mapping (ORM) and the desire to come up with a better way to store Aggregates. There were comments about ORM in general being an antiquated approach. While some developers are still new to ORMs, the technology of shoehorning objects into relational databases is more than 20 years old. In 20+ years, why haven’t we found a better way to store Aggregates?


During the park bench discussion I promoted the idea of serializing Aggregates as JSON and storing them in that object notation in a document store. A JSON-based store would enable you to query the object’s fields. Central to the discussion, there would be no need to use an ORM. This would help to keep the Domain Model pure and save days or weeks of time generally spent fiddling with mapping details. Even more, your objects could be designed in just the way your Ubiquitous Language is developed, and without any object-relational impedance mismatch whatsoever. Anyone who has used ORM with DDD knows that the limitations of mapping options regularly impede your modeling efforts.


When thinking of a JSON-based store, no doubt your mind is immediately drawn to MongoDB. That’s just how MongoDB works. While true, MongoDB still falls short of filling the needs of DDD Aggregates in one very important way. In our park bench discussion I noted how MongoDB was close to what I wanted, but that you could not use MongoDB to both update an Aggregate’s state to one collection in the store and append one or more new Domain Events to a different collection in the same operation. In short, MongoDB doesn’t support ACID transactions. This is a big problem when you want to use Domain Events along with your Aggregates, but you don’t want to use Event Sourcing. That is, your Domain Events are an adjunct to your Aggregate state, not its left fold. Hopefully I don’t have to explain the problems that would occur if we successfully saved an Aggregate’s state to MongoDB, but failed to append a new Domain Event to the same storage. That would simply make the state of the application completely wrong, and no doubt would lead to inconsistencies in dependent parts of our own Domain Model and/or those in one or more other Bounded Contexts.


Rumor has it that MongoDB will at some future time support ACID transactions. In fact there is now a branch of MongoDB that supports ACID transactions. It’s the TokuMX project. Although you may personally feel comfortable using this product, it didn’t excite me. Frankly, it could be a huge challenge to get a given enterprise to support MongoDB in the first place, let alone trying to convince every stakeholder to support a branch of MongoDB that is delivered by a lesser known third party. It seems to me that the best chance to use MongoDB with ACID transactions in your project is when you can finally download it from MongoDB.org.


For me this meant looking elsewhere, and boy, I am glad I did. I believe that I have found the truly ideal DDD Aggregate store in PostgreSQL 9.4. Here are the main reasons why I think this is so:



PostgreSQL 9.4 supports both text-based JSON (json datatype) and binary JSON (jsonb datatype). The binary JSON type is a higher performing datatype than the text-based datatype.
You can query directly against the JSON, and create indexes on specific JSON object fields/attributes.
PostgreSQL is, of course, a relational database and supports ACID transactions.
PostgreSQL is a very mature open source product and comes with support tools such as the Postgres Enterprise Manager and the like.
You can get both community and commercial support for PostgreSQL, and you have a choice among multiple support vendors.
PostgreSQL is fast. I mean, PostgreSQL is seriously fast. In benchmarks around version 9.4, PostgreSQL can perform database writes at or near 14,000 transactions per second. You will be hard pressed to find many projects that need to perform anywhere near that fast or faster. I don't have the comparison benchmarks handy, but I believe that is significantly faster than MongoDB (without ACID transactions). In my experience, PostgreSQL 9.4 (and later versions) could most likely address the performance needs of something like 97% of all enterprise projects globally. Of course your mileage may vary, but I regularly poll developers for performance numbers. The majority need (far) less than 1,000 transactions per second, and only a few require anywhere near 10,000 transactions per second.
Using PostgreSQL’s JSON support is just plain easy.

What I will do next is step through how easy it is to use PostgreSQL to create DDD Aggregate storage.


Developing a PostgreSQL JSON Repository

If you are familiar with my book, Implementing Domain-Driven Design, you recall the Core Domain named the Agile Project Management Context. In that Bounded Context we model a project management application for Scrum-based Products. A Product is an Entity that serves as the Root of the Aggregate:


public class Product extends Entity {

private Set<ProductBacklogItem> backlogItems;
private String description;
private ProductDiscussion discussion;
private String discussionInitiationId;
private String name;
private ProductId productId;
private ProductOwnerId productOwnerId;
private TenantId tenantId;
...
}

I am going to create a Repository to persist Product instances and find them again. Let’s first take a look at the basic means for persisting Product instances, and then we will look at querying for them. Here is the Repository declaration and the methods used to save and remove Product instances:


public class PostgreSQLJSONProductRepository
extends AbstractPostgreSQLJSONRepository
implements ProductRepository {
...
@Override
public ProductId nextIdentity() {
return new ProductId(UUID.randomUUID().toString().toUpperCase());
}
...
@Override
public void remove(Product aProduct) {
this.deleteJSON(aProduct);
}

@Override
public void removeAll(Collection<Product> aProductCollection) {
this.deleteJSON(aProductCollection);
}

@Override
public void save(Product aProduct) {
this.saveAsJSON(aProduct);
}

@Override
public void saveAll(Collection<Product> aProductCollection) {
this.saveAsJSON(aProductCollection);
}
...
}

That’s pretty simple. The bulk of the work is in the abstract base class, AbstractPostgreSQLJSONRepository. The only method that must be overridden and implemented by the concrete sub-class is the tableName(), which allows the abstract base class to know the name of the table in which the concrete type is stored:


public class PostgreSQLJSONProductRepository
extends AbstractPostgreSQLJSONRepository
implements ProductRepository {
...
@Override
protected String tableName() {
return "tbl_products";
}
...
}

Let’s take a look inside that base class:


public abstract class AbstractPostgreSQLJSONRepository {

private ObjectSerializer serializer;
...
protected AbstractPostgreSQLJSONRepository() {
super();

this.serializer = ObjectSerializer.instance();
}

protected void close(ResultSet aResultSet) {
if (aResultSet != null) {
try {
aResultSet.close();
} catch (Exception e) {
// ignore
}
}
}

protected void close(Statement aStatement) {
if (aStatement != null) {
try {
aStatement.close();
} catch (Exception e) {
// ignore
}
}
}

protected Connection connection() throws SQLException {
Connection connection =
PostgreSQLPooledConnectionProvider
.instance()
.connection();

return connection;
}

protected void deleteJSON(Identifiable<Long> anAggregateRoot) {
try {
Connection connection = this.connection();

this.deleteJSON(connection, anAggregateRoot);

} catch (Exception e) {
throw new RuntimeException("Cannot delete: " + anAggregateRoot + " because: " + e.getMessage());
}
}

protected void deleteJSON(
Collection<? extends Identifiable<Long>> anAggregateRoots) {

try {
Connection connection = this.connection();

for (Identifiable<Long> root : anAggregateRoots) {
this.deleteJSON(connection, root);
}

} catch (Exception e) {
throw new RuntimeException("Cannot delete: " + anAggregateRoots + " because: " + e.getMessage());
}
}

protected <T> T deserialize(String aSerialization, final Class<T> aType) {
return this.serializer.deserialize(aSerialization, aType);
}

...

protected String serialize(Object anAggregate) {
return this.serializer.serialize(anAggregate);
}

protected abstract String tableName();

protected void saveAsJSON(Identifiable<Long> anAggregateRoot) {
if (anAggregateRoot.isUnidentified()) {
this.insertAsJSON(anAggregateRoot);
} else {
this.updateAsJSON(anAggregateRoot);
}
}

protected void saveAsJSON(Collection<? extends Identifiable<Long>> anAggregateRoots) {
try {
Connection connection = this.connection();

for (Identifiable<Long> aggregateRoot : anAggregateRoots) {
if (aggregateRoot.isUnidentified()) {
this.insertAsJSON(connection, aggregateRoot);
} else {
this.updateAsJSON(connection, aggregateRoot);
}
}

} catch (Exception e) {
throw new RuntimeException("Cannot save: " + anAggregateRoots + " because: " + e.getMessage());
}
}

private void deleteJSON(
Connection aConnection,
Identifiable<Long> anAggregateRoot)
throws SQLException {

PreparedStatement statement = null;

try {
statement = aConnection.prepareStatement(
"delete from "
+ this.tableName()
+ " where id = ?");

statement.setLong(1, anAggregateRoot.identity());
statement.executeUpdate();

} finally {
this.close(statement);
}
}

private void insertAsJSON(Identifiable<Long> anAggregateRoot) {
try {
Connection connection = this.connection();

this.insertAsJSON(connection, anAggregateRoot);

} catch (Exception e) {
throw new RuntimeException("Cannot save: " + anAggregateRoot + " because: " + e.getMessage());
}
}

private void insertAsJSON(
Connection aConnection,
Identifiable<Long> anAggregateRoot)
throws Exception {

PreparedStatement statement = null;

try {
String json = this.serialize(anAggregateRoot);

PGobject jsonObject = new PGobject();
jsonObject.setType("json");
jsonObject.setValue(json);

statement = aConnection.prepareStatement(
"insert into "
+ this.tableName()
+ " (data) values (?)");

statement.setObject(1, jsonObject);
statement.executeUpdate();

} finally {
this.close(statement);
}
}

private void updateAsJSON(Identifiable<Long> anAggregateRoot) {
try {
Connection connection = this.connection();

this.updateAsJSON(connection, anAggregateRoot);

} catch (Exception e) {
throw new RuntimeException("Cannot update: " + anAggregateRoot + " because: " + e.getMessage());
}
}

private void updateAsJSON(
Connection aConnection,
Identifiable<Long> anAggregateRoot)
throws SQLException {

PreparedStatement statement = null;

try {
String json = this.serialize(anAggregateRoot);

PGobject jsonObject = new PGobject();
jsonObject.setType("json");
jsonObject.setValue(json);

statement = aConnection.prepareStatement(
"update "
+ this.tableName()
+ " set data = ?"
+ " where id = ?");

statement.setObject(1, jsonObject);
statement.setLong(2, anAggregateRoot.identity());
statement.executeUpdate();

} finally {
this.close(statement);
}
}
}

Here are the highlights from the abstract base class with regard to saving and removing Aggregates to and from the store:



We use an ObjectSerializer to serialize Aggregate instances to JSON, and to deserialize them from JSON back to their Aggregate instance state. This ObjectSerializer is the same one I used in my book, which is based on the Google Gson parser. The biggest reason I use this JSON parser is that it works by introspection and reflection on object fields rather than requiring objects to support the JavaBean specification (yuk!).
There are special methods that help close ResultSet and PreparedStatement instances.
Each Repository gets a JDBC Connection to the database using PostgreSQLPooledConnectionProvider. All of the operations are simple, lightweight JDBC operations. As indicated by its name, the PostgreSQLPooledConnectionProvider provides pooled Connections that are thread bound using a ThreadLocal.
You can delete and insert one or many Aggregate instances in one operation. This supports remove(), removeAll(), save(), and saveAll() in the concrete sub-classes.
All communication via JDBC uses the PGobject type to carry the JSON payload to and from the database. The PGobject type in this code is “json” and the value is a JSON String object. You can easily switch the code to the more efficient “jsonb” type.

Note another detail. All Aggregate Root Entities are passed into the abstract base class as Identifiable instances. This enables the base class Repository to determine whether the instances have already been saved to the data store on prior operations, or if this is the first time. For first time persistence the Repository uses an INSERT operation. For subsequent saves after having read the Aggregate instances from the store the operation will be an UPDATE. The Entity type in the Agile Project Management code base implements the Identifiable interface:


public interface Identifiable<T> {
public T identity();
public void identity(T aValue);
public boolean isIdentified();
public boolean isUnidentified();
}

public abstract class Entity implements Identifiable<Long> {
...
private Long surrogateIdentity;

public Entity() {
super();

this.identity(0L);
}
...
@Override
public Long identity() {
return this.surrogateIdentity == null ? 0:this.surrogateIdentity;
}

@Override
public void identity(Long aValue) {
this.surrogateIdentity = aValue;
}

@Override
public boolean isIdentified() {
return identity() > 0;
}

@Override
public boolean isUnidentified() {
return identity() == 0;
}
}

Supporting this interface enables the various saveAsJSON() methods to interrogate each Aggregate instance for its surrogate identity. If the surrogate identity is not yet set, it knows that the Aggregate instance is new and must be inserted. If the surrogate identity is set, the Repository knows that it is a preexisting instance that must be updated to the data store. The surrogate identity is stored as the row’s primary key in the table.



Follow the Aggregate Rule of Thumb: Reference Other Aggregates By Identity Only

Following this rule is very important as it makes your Aggregate instance simple to serialize. If instead you use a graph of Aggregate instances, don’t expect fabulous things from the JSON serializer.



Speaking of the database, here is a simple SQL script used to create the database and tables used by the solution:


drop database if exists agilepm;
create database agilepm owner postgres;

create table tbl_events
(
id bigserial primary key,
data json not null
);

create table tbl_publishednotificationtracker
(
id bigserial primary key,
data json not null
);

create table tbl_timeconstrainedprocesstrackers
(
id bigserial primary key,
data json not null
);

create table tbl_backlogitems
(
id bigserial primary key,
data json not null
);

create table tbl_productowners
(
id bigserial primary key,
data json not null
);

create table tbl_products
(
id bigserial primary key,
data json not null
);

create table tbl_releases
(
id bigserial primary key,
data json not null
);

create table tbl_sprints
(
id bigserial primary key,
data json not null
);

create table tbl_teammembers
(
id bigserial primary key,
data json not null
);

create table tbl_teams
(
id bigserial primary key,
data json not null
);

As you can see, these are all very simple tables. The JSON is stored in the column named data. The bigserial column type is a bigint (8 bytes) backed by a sequence. As you insert new rows into one of the tables, its sequence is used to auto-increment the primary key. Note also that tbl_events, the table that holds each Domain Event published by the Bounded Context (see Chapter 8 of my book), has a primary key as well. This serial bigint primary key serves as the unique notification identity for messaging notifications that are published inside and outside the Bounded Context.


Finally let’s take a look at how Aggregate instances stored as JSON inside the database are found. Note that we will be querying inside the data column of each database table. We use simple -> and ->> notation to navigate from data down into each JSON object. For example, here are the three finder methods found in the Repository for Products, the PostgreSQLJSONProductRepository:


public class PostgreSQLJSONProductRepository
extends AbstractPostgreSQLJSONRepository
implements ProductRepository {
...
@Override
public Collection<Product> allProductsOfTenant(TenantId aTenantId) {
String filter = "data->'tenantId'->>'id' = ?";

return this.findAll(Product.class, filter, "", aTenantId.id());
}

@Override
public Product productOfDiscussionInitiationId(
TenantId aTenantId,
String aDiscussionInitiationId) {

String filter = "data->'tenantId'->>'id' = ? and data->>'discussionInitiationId' = ?";

return this.findExact(Product.class, filter, aTenantId.id(), aDiscussionInitiationId);
}

@Override
public Product productOfId(TenantId aTenantId, ProductId aProductId) {
String filter = "data->'tenantId'->>'id' = ? and data->'productId'->>'id' = ?";

return this.findExact(Product.class, filter, aTenantId.id(), aProductId.id());
}
...
}

From the data column we filter using a WHERE clause. The full SELECT statement is found in the abstract base class, which we will examine in a moment. To keep the finder interfaces very simple I only require the client Repository to provide the actual matching parts, as seen in the code snippet above. There are several tokens in each filter. The data token refers to the data column in the given row. The other tokens, such as 'tenantId', 'id', and 'productId', are the JSON field names. So, to match on the tenant identity in the JSON you use data->'tenantId'->>'id' = ? as part of the WHERE clause. Note that -> navigates to the JSON object containing the target field, while ->> points to the final target field and yields it as text.


You can findAll() or findExact(), which find a Collection of a specific type or find a single instance of a specific type, respectively:


public abstract class AbstractPostgreSQLJSONRepository {
...
protected <T extends Identifiable<Long>> List<T> findAll(
Class<T> aType,
String aFilterExpression,
String anOrderBy,
Object ... anArguments) {

List<T> aggregates = new ArrayList<T>();
PreparedStatement statement = null;
ResultSet result = null;

String query =
"select id, data from "
+ this.tableName()
+ " where "
+ aFilterExpression
+ " "
+ anOrderBy;

try {
Connection connection = this.connection();

statement = connection.prepareStatement(query);

this.setStatementArguments(statement, anArguments);

result = statement.executeQuery();

while (result.next()) {
Long identity = result.getLong(1);

String serialized = result.getObject(2).toString();

T aggregate = this.deserialize(serialized, aType);

aggregate.identity(identity);

aggregates.add(aggregate);
}

} catch (Exception e) {
throw new RuntimeException("Cannot find: " + query + " because: " + e.getMessage());
} finally {
this.close(statement);
this.close(result);
}

return aggregates;
}

protected <T extends Identifiable<Long>> T findExact(
Class<T> aType,
String aFilterExpression,
Object ... anArguments) {

T aggregate = null;

List<T> aggregates = this.findAll(aType, aFilterExpression, "", anArguments);

if (!aggregates.isEmpty()) {
aggregate = aggregates.get(0);
}

return aggregate;
}
...
private void setStatementArguments(
PreparedStatement aStatement,
Object[] anArguments)
throws SQLException {

for (int idx = 0; idx < anArguments.length; ++idx) {
Object argument = anArguments[idx];
Class<?> argumentType = argument.getClass();

if (argumentType == String.class) {
aStatement.setString(idx+1, (String) argument);
} else if (argumentType == Integer.class) {
aStatement.setInt(idx+1, (Integer) argument);
} else if (argumentType == Long.class) {
aStatement.setLong(idx+1, (Long) argument);
} else if (argumentType == Boolean.class) {
aStatement.setBoolean(idx+1, (Boolean) argument);
} else if (argumentType == Date.class) {
java.sql.Date sqlDate = new java.sql.Date(((Date) argument).getTime());
aStatement.setDate(idx+1, sqlDate);
} else if (argumentType == Double.class) {
aStatement.setDouble(idx+1, (Double) argument);
} else if (argumentType == Float.class) {
aStatement.setFloat(idx+1, (Float) argument);
}
}
}
...
}

The backbone of the finders is implemented in findAll(), which findExact() reuses. Note that when the ResultSet is obtained we iterate over each entry. Using findAll() you can both filter and order the outcome by a specific column or JSON field.


We obtain both the surrogate identity and the JSON serialization payload. Once the JSON is used to deserialize to the Aggregate instance, we set the surrogate identity as the identity of the Identifiable. This prepares the Aggregate instance for updating should the client decide to modify the instance and call save() on the Product Repository.


Well, that's pretty much it. Every concrete Repository implemented using the AbstractPostgreSQLJSONRepository is very simple and straightforward. I intend to push the implementation to its GitHub repository as soon as possible. That should give you everything you need to implement this in your own project.



November 21, 2014

An Approach to Composing Aggregate Boundaries

For complete coverage of this topic, you should see my book: Domain-Driven Design Distilled



October 13, 2014

Modeling Aggregates with DDD and Entity Framework

For everyone who has read my book and/or Effective Aggregate Design but has been left wondering how to implement Aggregates with Domain-Driven Design (DDD) on the .NET platform using C# and Entity Framework, this post is for you.


[NOTE: As expected, this article has within hours of posting received some criticism for the approach used to O-R mapping with Entity Framework. Actually the article received much more praise than criticism, but… I want to just point out that I am purposely not attempting to win any guru award in Entity Framework mapping. If you browse through this post too quickly some of the key words of wisdom and my intent may be lost on your speed reading. I am purposely avoiding some of the expert guidance that is typically given with a view to deep understanding of Entity Framework mappings. In fact, you may not realize the purpose of the article unless you begin reading with the assumed attitude that “I hate O-R mapping.” The O-R mapping tooling is actually something like 20+ years old, and it is time that we come up with more practical solutions to storing objects as objects. In the meantime we should just do as little O-R mapping as we can get away with. So, thanks for your words of advice, but I have done everything below with precise intent.]


Definition of Aggregate

To start off, let’s recap the basic definition of DDD Aggregate. First and foremost the Aggregate pattern is about transactional consistency. At the end of a committed database transaction, a single Aggregate should be completely up to date. That means that any business rules regarding data consistency must be met and the persistence store should hold that consistent state, leaving the Aggregate correct and ready to use by the next use case. Figure 1 illustrates two such consistency boundaries, with two different Aggregates.




Figure 1. Two Aggregates, which represent two transactional consistency boundaries.


The problem that many have with designing Aggregates is that they don’t consider the true business constraints that require data to be transactionally consistent and instead design Aggregates in large clusters as shown in Figure 2. Designing Aggregates in this way is a big mistake if you expect them (1) to be used by many thousands of users, (2) to perform well, and (3) to scale to the demands of the Internet.




Figure 2. A poorly designed Aggregate that is not conceived according to true business consistency constraints.


Using an example from my book, a set of well-designed Aggregates are shown in Figure 3. These are based on true business rules that require specific data to be up-to-date at the end of a successful database transaction. These follow the rules of Aggregate, including designing small Aggregates.




Figure 3. Some well-designed Aggregates that adhere to true consistency rules.


Still, the question arises: if BacklogItem and Product have some data dependencies, how do we update both of them? This points to another rule of Aggregate design: use eventual consistency, as shown in Figure 4. Of course, there's a bit more involved when you consider the overall architecture, but the foregoing points out the high-level composition guidance of Aggregate design.




Figure 4. When two or more Aggregates have at least some dependencies on updates, use eventual consistency.


Now with this brief refresher on the basics of Aggregate design, let’s see how we might map the Product to a database using Entity Framework.


KISS with Entity Framework

So, we have four prominent Aggregates in our Scrum project management application: Product, BacklogItem, Release, and Sprint. We need to persist the state of these four small Aggregates and we want to use Entity Framework to do so. Here’s a possible surprise for you. I am not going to recommend that you need to become an Entity Framework guru. Nope, just the opposite in fact. I am going to suggest that you allow the Entity Framework development team to be the gurus, and you just focus on your specific application. After all, your Core Domain is where you want to put your creative energies, not in becoming an expert in Entity Framework.


What I am recommending is that you allow Entity Framework to take control of doing what it does best and we just stay out of its way. Entity Framework has a certain way of mapping entities into the database, and that’s just how it works. As soon as you try to step outside the basics and go to some extremes of esoteric mapping techniques in ways that Entity Framework was not meant to be used, you are going to experience a lot of pain. Still, we can get quite a bit of mileage out of Entity Framework in the midst of DDD and be quite happy with the way it all works out. To do so we are going to use just a few basic mapping techniques. If you follow my KISS guidance you can mostly ignore your Entity Framework documentation and how-to books. Just allow Entity Framework to map entities and get back to what will make a difference in this competitive world: your market-distinguishing application.


We are going to implement the Product Aggregate using two approaches. One approach uses a Separated Interface with an implementation class, and the other uses a domain object backed by a state object. The whole point of these examples is to stay as far out of Entity Framework’s way as possible.


Using a Separated Interface and Implementation Class

For the first example I create a Separated Interface that is implemented by a concrete domain object. Figure 5 shows you the basic intention of this approach.




Figure 5. The Separated Interface named IProduct is implemented by a concrete domain object. Clients directly use only IProduct.


It is pretty typical when programming with C# and .NET to name your interfaces with an “I” prefix, so we will use IProduct:


interface IProduct
{
ICollection<IProductBacklogItem> AllBacklogItems();
IProductBacklogItem BacklogItem(BacklogItemId backlogItemId);
string Description { get; }
string Name { get; }
IBacklogItem PlanBacklogItem(BacklogItemId newBacklogItemId, string summary,
string story, string category, BacklogItemType type, StoryPoints storyPoints);
void PlannedProductBacklogItem(IBacklogItem backlogItem);
...
ProductId ProductId { get; }
ProductOwnerId ProductOwnerId { get; }
void ReorderFrom(BacklogItemId id, int ordering);
TenantId TenantId { get; }
}

With this interface we can create a concrete implementation class. Let’s call it Product:


public class Product : IProduct
{
[Key]
public string ProductKey { get; set; }
...
}

The point of the concrete class Product is to implement the business interface declared by IProduct and to also provide the accessors that are needed by Entity Framework to map the object into and out of the database. Note the ProductKey property. This is technically the kind of primary key that Entity Framework wants to work with. However, it is different from the ProductId, which when combined with the TenantId is the business identity. Therefore, internally the ProductKey must be set to a composite of TenantId as a string and ProductId as a string:


ProductKey = TenantId.Id + ":" + ProductId.Id;

I think you get the idea. We create an interface that we want our client to see and we hide the implementation details inside the implementing class. We make the implementation match up to really basic Entity Framework mappings. We purposely try to keep our special mappings, as with ProductKey, to a minimum. This helps keep the DbContext very simple by registering the implementation classes:


public class AgilePMContext : DbContext
{
public DbSet<Product> Products { get; set; }
public DbSet<ProductBacklogItem> ProductBacklogItems { get; set; }
public DbSet<BacklogItem> BacklogItems { get; set; }
public DbSet<Task> Tasks { get; set; }
...
}

There is no need to flesh out the details of this approach any further, because there is already enough here to make some judgments. I’d like to discuss the fundamental flaws that I see in it:



The Ubiquitous Language is not really reinforced by using interfaces such as IProduct, IBacklogItem, etc. IProduct and IBacklogItem are not in our Ubiquitous Language, but Product and BacklogItem are. Thus, the client-facing names should be Product, BacklogItem, and the like. We could accomplish this simply by naming the interfaces Product, BacklogItem, Release, and Sprint, but that would mean we would have to come up with sensible names for the implementation classes. Let’s just pause there and move on to the second and related issue.
There is really no good reason to create a Separated Interface. It would be very unlikely that we would ever create two or more implementations of IProduct or any of the other interfaces. The best reason we have for creating a Separated Interface is when there could be or are multiple implementations, and that is just not going to happen in this Core Domain.

Based on these two points alone I would personally choose to abandon this approach before going any further with it. When using Domain-Driven Design the most important and overarching principle is to adhere to the Ubiquitous Language, and from the get-go this approach is driving us away from business terminology rather than toward it.


Domain Object Backed By a State Object

The second approach uses a domain object backed by state objects. As shown in Figure 6, the domain object defines and implements the domain-driven model using the Ubiquitous Language, and the state objects hold the state of the Aggregate.




Figure 6. The domain object that models the Aggregate behavior is backed by a state object that holds the model’s state.


Keeping the state objects separate from the domain-driven implementation objects enables very simple mappings. We let Entity Framework do what it knows how to do by default to map entities to and from the database. Consider Product, which is backed by the ProductState object. We have two Product constructors: a public business constructor for normal clients and a second, internal constructor that is used only by internal implementation components:


public class Product
{
public Product(
TenantId tenantId,
ProductId productId,
ProductOwnerId productOwnerId,
string name,
string description)
{
State = new ProductState();
State.ProductKey = tenantId.Id + ":" + productId.Id;
State.ProductOwnerId = productOwnerId;
State.Name = name;
State.Description = description;
State.BacklogItems = new List<ProductBacklogItemState>();
}

internal Product(ProductState state)
{
State = state;
}
...
}

When the business constructor is invoked we create a new ProductState object and initialize it. The state object has a simple string-based identity:


public class ProductState
{
[Key]
public string ProductKey { get; set; }

public ProductOwnerId ProductOwnerId { get; set; }

public string Name { get; set; }

public string Description { get; set; }

public List<ProductBacklogItemState> BacklogItems { get; set; }
...
}

The ProductKey is actually encoded from two values, the TenantId as a string and the ProductId as a string, with the two separated by a ‘:’ character. Including the TenantId in the ProductKey ensures that all data stored in the database is segregated by tenant. We must still support client requests for TenantId and ProductId from the Product:


public class Product
{
...
public ProductId ProductId { get { return new ProductId(State.DecodeProductId()); } }
...
public TenantId TenantId { get { return new TenantId(State.DecodeTenantId()); } }
...
}
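Here is a minimal sketch of what those two decode methods might look like on ProductState, assuming the ‘:’ separator used when the ProductKey was composed (the method bodies are my assumption, not part of the original model):


public class ProductState
{
    ...
    public string DecodeTenantId()
    {
        // The ProductKey is composed as "tenantId:productId",
        // so the tenant identity is the segment before the first ':'
        return ProductKey.Substring(0, ProductKey.IndexOf(':'));
    }

    public string DecodeProductId()
    {
        // The product identity is everything after the first ':'
        return ProductKey.Substring(ProductKey.IndexOf(':') + 1);
    }
    ...
}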

As an alternative to decoding, we could design the state object to redundantly hold the whole identities separately from the ProductKey:


public class ProductState
{
[Key]
public string ProductKey { get; set; }

public ProductId ProductId { get; set; }

public ProductOwnerId ProductOwnerId { get; set; }

public string Name { get; set; }

public string Description { get; set; }

public List<ProductBacklogItemState> BacklogItems { get; set; }

public TenantId TenantId { get; set; }
...
}

This could be well worth the slight memory overhead if decoding the identities from the ProductKey turned out to have a noticeable performance cost. All of the identity types, including ProductOwnerId, are Value Objects and are flattened and mapped into the same database row that ProductState occupies:


[ComplexType]
public class ProductOwnerId : Identity
{
public ProductOwnerId()
: base()
{
}

public ProductOwnerId(string id)
: base(id)
{
}
}

The [ComplexType] attribute marks the Value Object as a complex type, which is different from an entity. Complex types are non-scalar values that do not have keys and cannot be managed apart from their containing entity, or the complex type within which they are nested. Marking a Value Object with the Entity Framework [ComplexType] causes the data of the Value Object to be saved to the same database row as the entity. In this case, ProductOwnerId would be saved to the same database row as the ProductState entity.
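If you would rather not place Entity Framework attributes on the identity types, the same mapping can be declared in the DbContext instead. This is only a sketch, assuming Entity Framework 6 and its DbModelBuilder Fluent API:


public class AgilePMContext : DbContext
{
    protected override void OnModelCreating(DbModelBuilder modelBuilder)
    {
        // Equivalent to marking ProductOwnerId with [ComplexType]:
        // its data is flattened into the row of the containing entity.
        modelBuilder.ComplexType<ProductOwnerId>();

        base.OnModelCreating(modelBuilder);
    }
    ...
}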


Here are the base types for all Identity types of Value Objects:


public abstract class Identity : IEquatable<Identity>, IIdentity
{
public Identity()
{
this.Id = Guid.NewGuid().ToString();
}

public Identity(string id)
{
this.Id = id;
}

public string Id { get; set; }

public bool Equals(Identity id)
{
if (object.ReferenceEquals(this, id)) return true;
if (object.ReferenceEquals(null, id)) return false;
return this.Id.Equals(id.Id);
}

public override bool Equals(object anotherObject)
{
return Equals(anotherObject as Identity);
}

public override int GetHashCode()
{
return (this.GetType().GetHashCode() * 907) + this.Id.GetHashCode();
}

public override string ToString()
{
return this.GetType().Name + " [Id=" + Id + "]";
}
}

public interface IIdentity
{
string Id { get; set; }
}
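The remaining identity types are presumably defined the same way as ProductOwnerId. Here is a sketch of ProductId and TenantId (not shown in the original), each a Value Object extending Identity and, if persisted directly on a state object as in the redundant design above, marked as a complex type:


[ComplexType]
public class ProductId : Identity
{
    public ProductId() : base() { }

    public ProductId(string id) : base(id) { }
}

[ComplexType]
public class TenantId : Identity
{
    public TenantId() : base() { }

    public TenantId(string id) : base(id) { }
}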

So, the ProductState object stands on its own when it comes to persisting the state of the Product. However, the ProductState also holds another collection of entities; that is, the List of ProductBacklogItemState:


public class ProductState
{
[Key]
public string ProductKey { get; set; }
...
public List<ProductBacklogItemState> BacklogItems { get; set; }
...
}

This is all well and good because we keep the database mappings really simple. Yet, how do we get a ProductBacklogItemState object, or the entire List collection for that matter, into a format that we can allow clients to consume? The ProductBacklogItemState is an internal implementation detail, just a data holder. This points to the need for a few simple converters, which are used by the Product Aggregate root:


public class Product
{
...
public ICollection<ProductBacklogItem> AllBacklogItems()
{
List<ProductBacklogItem> all =
State.BacklogItems.ConvertAll(
new Converter<ProductBacklogItemState, ProductBacklogItem>(
ProductBacklogItemState.ToProductBacklogItem));

return new ReadOnlyCollection<ProductBacklogItem>(all);
}

public ProductBacklogItem BacklogItem(BacklogItemId backlogItemId)
{
ProductBacklogItemState state =
State.BacklogItems.FirstOrDefault(
x => x.BacklogItemKey.Equals(backlogItemId.Id));

return new ProductBacklogItem(state);
}
...
}

Here we convert a collection of ProductBacklogItemState instances to a collection of ProductBacklogItem instances. And when the client requests just one ProductBacklogItem, we convert to one from a single ProductBacklogItemState with the matching identity. The ProductBacklogItemState object must only support a few simple conversion methods:


public class ProductBacklogItemState
{
[Key]
public string BacklogItemKey { get; set; }
...
public ProductBacklogItem ToProductBacklogItem()
{
return new ProductBacklogItem(this);
}

public static ProductBacklogItem ToProductBacklogItem(
ProductBacklogItemState state)
{
return new ProductBacklogItem(state);
}
...
}

Should the client ask repeatedly for a collection of ProductBacklogItem instances the Product could cache the collection after the first time it is generated.
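Here is a rough sketch of that caching idea; the cachedBacklogItems field is my own assumption, and it would need to be cleared whenever the backlog item collection changes (for example, in PlannedProductBacklogItem()):


public class Product
{
    ...
    // Assumed cache of the converted collection; null until first requested.
    private ICollection<ProductBacklogItem> cachedBacklogItems;

    public ICollection<ProductBacklogItem> AllBacklogItems()
    {
        if (cachedBacklogItems == null)
        {
            List<ProductBacklogItem> all =
                State.BacklogItems.ConvertAll(
                    new Converter<ProductBacklogItemState, ProductBacklogItem>(
                        ProductBacklogItemState.ToProductBacklogItem));

            cachedBacklogItems = new ReadOnlyCollection<ProductBacklogItem>(all);
        }

        return cachedBacklogItems;
    }
    ...
}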


In the end our goal is to stay out of the way of Entity Framework and make it super simple to map state objects in and out of the database. I think when you consider the DbContext for this solution you will conclude that we have a really simple approach:


public class AgilePMContext : DbContext
{
public DbSet<ProductState> Products { get; set; }
public DbSet<ProductBacklogItemState> ProductBacklogItems { get; set; }
public DbSet<BacklogItemState> BacklogItems { get; set; }
public DbSet<TaskState> Tasks { get; set; }
public DbSet<ReleaseState> Releases { get; set; }
public DbSet<ScheduledBacklogItemState> ScheduledBacklogItems { get; set; }
public DbSet<SprintState> Sprints { get; set; }
public DbSet<CommittedBacklogItemState> CommittedBacklogItems { get; set; }
...
}

Creating and using a ProductRepository is easy as well:


public interface ProductRepository
{
void Add(Product product);

Product ProductOfId(TenantId tenantId, ProductId productId);
}

public class EFProductRepository : ProductRepository
{
private AgilePMContext context;

public EFProductRepository(AgilePMContext context)
{
this.context = context;
}

public void Add(Product product)
{
try
{
context.Products.Add(product.State);
}
catch (Exception e)
{
Console.WriteLine("Add() Unexpected: " + e);
}
}

public Product ProductOfId(TenantId tenantId, ProductId productId)
{
string key = tenantId.Id + ":" + productId.Id;
var state = default(ProductState);

try
{
state = (from p in context.Products
where p.ProductKey == key
select p).FirstOrDefault();
}
catch (Exception e)
{
Console.WriteLine("ProductOfId() Unexpected: " + e);
}

if (EqualityComparer<ProductState>.Default.Equals(state, default(ProductState)))
{
return null;
}
else
{
return new Product(state);
}
}
}

// Using the repository
using (var context = new AgilePMContext())
{
ProductRepository productRepository = new EFProductRepository(context);

var tenantId = new TenantId();

var product =
new Product(
tenantId,
new ProductId(),
new ProductOwnerId(),
"Test",
"A test product.");

productRepository.Add(product);

context.SaveChanges();
...
var foundProduct = productRepository.ProductOfId(tenantId, product.ProductId);
}

Taking this approach will help us to stay focused on what really counts the most, our Core Domain and its Ubiquitous Language.


Published on October 13, 2014 12:46

October 9, 2014

Effective Aggregate Design

This is a three-part series about using Domain-Driven Design (DDD) to implement Aggregates. Clustering Entities and Value Objects into an Aggregate with a carefully crafted consistency boundary may at first seem like quick work, but among all DDD tactical guidance, this pattern is one of the least well understood. This essay is the basis for Chapter 10 of my book, Implementing Domain-Driven Design.


The documents are available for download as three PDFs and are licensed under the Creative Commons Attribution-NoDerivs 3.0 Unported License.


Original English Edition

Effective Aggregate Design: Part 1

Effective Aggregate Design: Part 2

Effective Aggregate Design: Part 3


French Translation

Conception Efficace des Aggregates 1ère Partie


 


Published on October 09, 2014 18:49

April 11, 2014

Dealing with Device Reads Using Akka

You have a device monitor actor. It must poll the device every time an interval expires. When data starts flowing, it flows a lot. Sometimes, however, the device will not have readable data for long periods of time (several seconds), and continuing to read the device on short intervals will take CPU core cycles away from other parts of the actor system that are busy processing the data that has already been taken in. How can we make the device monitor actor be more in tune with the way the device operates?


Have the device monitor actor create, as part of its construction, an instance of CappedBackOffScheduler. This scheduler is used to send the “try read” messages to the monitor actor on intervals. The interval starts out at a minimal 500 milliseconds. On any given probe of the device that results in a successful read, the following interval will be 500 milliseconds. Otherwise, each successive failed read doubles the next interval, but only until a time cap is reached. Here is the CappedBackOffScheduler implementation:


import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration
import akka.actor.{ ActorRef, ActorSystem }

class CappedBackOffScheduler(
minimumInterval: Int,
maximumInterval: Int,
system: ActorSystem,
receiver: ActorRef,
message: Any) {

// scheduleOnce requires an implicit ExecutionContext; use the system dispatcher
private implicit val executor = system.dispatcher

var interval = minimumInterval

def backOff = {
interval = interval * 2
if (interval > maximumInterval) interval = maximumInterval
schedule
}

def reset = {
interval = minimumInterval
schedule
}

private def schedule = {
val duration = Duration.create(interval, TimeUnit.MILLISECONDS)
system.scheduler.scheduleOnce(duration, receiver, message)
}
}

Each time the scheduler is told to backOff, it calculates a new interval, at least until it reaches 15 seconds. When the scheduler is told to reset following each successful read, the interval is (re)set to half a second. Either way, the interval is used to schedule a new “try read” message. The message will be sent to the device monitor once the interval expires.


Here’s how to use the CappedBackOffScheduler:


class DeviceMonitor(device: Device) extends Actor {
val scheduler =
new CappedBackOffScheduler(
500,
15000,
context.system,
self,
TryRead())

def tryRead = {
val data = device.readData(500)
if (data.isDefined) {
println(s"HIT: ${data.get}")
scheduler.reset
} else {
println(s"MISS")
scheduler.backOff
}
}

def receive = {
case request: TryRead =>
tryRead
}
}

// The message the scheduler sends to trigger a read attempt
case class TryRead()

Note that the device readData() attempt itself has a timeout; in this case we allow half a second for readData() to succeed. This timeout must be carefully considered, because the thread currently processing the TryRead message will block until the readData() attempt returns. And here’s some sample output:


HIT: ...
HIT: ...
HIT: ...
MISS
MISS
HIT: ...
HIT: ...
HIT: ...
MISS
HIT: ...
HIT: ...
HIT: ...
HIT: ...
MISS
MISS
MISS
MISS
HIT: ...

Published on April 11, 2014 07:46

October 26, 2013

DDD with Scala and Akka Revisited

There were a number of issues with the code I wrote in the post Using Scala and Akka with Domain-Driven Design. I am addressing those problems in this post and cleaning up the code. The good thing about revisiting this topic is that you can learn from the problems found in the previous post, and how to fix them.


Share Nothing

A major problem in the previous post is that I shared part of an Actor’s internal state with the outside. Recall that one of the basic rules of Actor Model is to share nothing. Oops.


The AggregateCache used a helper named AggregateCacheWorker. The existence of this class in itself is not a problem, especially when it is acting on behalf of the AggregateCache. The problem is that the DomainModel can send a ProvideWorker message to the AggregateCache to request a reference to its internal AggregateCacheWorker. That’s just wrong. The DomainModel should not be able to obtain a reference to any part of the state of AggregateCache.


So how can this problem be corrected? The main clue is to ask: Why did the DomainModel imagine it needed to use the AggregateCacheWorker? The reason is that the DomainModel needed to delegate Aggregate Actor creation to the AggregateCacheWorker because the AggregateCacheWorker holds the ActorContext needed to create a child of the AggregateCache. Yet, this is clearly not the way we should attempt to create new Aggregate Actors.


The solution to the problem actually already exists in the AggregateCache itself. Recall that when a message is sent from a client to an Aggregate Actor, the AggregateCache will check to see if that specific child Actor is currently in memory or not. If the Actor is not currently in memory, the AggregateCache creates the child Aggregate Actor dynamically and then dispatches the message being sent to it by the client. Thus, we can just leverage this AggregateCache behavior to lazily create an Aggregate Actor when the first message is sent to it.


But wait. Isn’t it a problem for the Aggregate Actor not to exist at all until a message is sent to it? Well, not if you consider that an Actor’s state is mutated by its handling of various messages. Therefore, the Actor really has an empty initial state until it receives its first command message. Here’s how it will work:


object DomainModelPrototype extends CompletableApp(1) {

val model = DomainModel("OrderProcessing")

model.registerAggregateType("co.vaughnvernon.orderprocessing.domain.model.Order")

val order = model.aggregateOf("co.vaughnvernon.orderprocessing.domain.model.Order", "123")

order ! InitializeOrder(249.95)

...

Did you notice some fundamental changes to the client code compared to the previous example? First of all, now when registering an Aggregate type we use the fully-qualified class name of the Actor. Then when the Aggregate is created using aggregateOf(), we pass in only the type and the globally unique id. We no longer pass in the Props, because there will purposely be no initial state in the Actor until it receives its first message, which you can see is sent as InitializeOrder just following its creation by aggregateOf().


Improved Solution

So, allow me to reintroduce the Scala classes from the top down, starting with the new DomainModel and companion object:


object DomainModel {
def apply(name: String): DomainModel = {
new DomainModel(name)
}
}

class DomainModel(name: String) {
val aggregateTypeRegistry = scala.collection.mutable.Map[String, AggregateType]()
val system = ActorSystem(name)

def aggregateOf(typeName: String, id: String): AggregateRef = {
if (aggregateTypeRegistry.contains(typeName)) {
val aggregateType = aggregateTypeRegistry(typeName)
aggregateType.cacheActor ! RegisterAggregateId(id)
AggregateRef(id, aggregateType.cacheActor)
} else {
throw new IllegalStateException(s"DomainModel type registry does not have a $typeName")
}
}

def registerAggregateType(typeName: String): Unit = {
if (!aggregateTypeRegistry.contains(typeName)) {
val actorRef = system.actorOf(Props(new AggregateCache(typeName)), typeName)
aggregateTypeRegistry(typeName) = AggregateType(actorRef)
}
}

def shutdown() = {
system.shutdown()
}
}

Note that the DomainModel no longer attempts to use the AggregateCacheWorker. In fact, there is no longer such a worker class. Instead aggregateOf() now sends a message to the AggregateCache under which the Aggregate Actor is to exist:


aggregateType.cacheActor ! RegisterAggregateId(id)

This leads to the new implementation of the AggregateCache:


class AggregateCache(typeName: String) extends Actor {
val aggregateClass: Class[Actor] = Class.forName(typeName).asInstanceOf[Class[Actor]]
val aggregateIds = scala.collection.mutable.Set[String]()

def receive = {
case message: CacheMessage =>
val aggregate = context.child(message.id).getOrElse {
if (!aggregateIds.contains(message.id)) {
throw new IllegalStateException(s"No aggregate of type $typeName and id ${message.id}")
} else {
context.actorOf(Props(aggregateClass), message.id)
}
}
aggregate.tell(message.actualMessage, message.sender)

case register: RegisterAggregateId =>
this.aggregateIds.add(register.id)
}
}

The AggregateCache no longer holds a Map of type names to Props. Instead, it now contains a Set of unique identities for each Aggregate that has been registered. You can see that each identity is registered when a RegisterAggregateId message is sent from the DomainModel and received by the AggregateCache.


Even so, what would happen if the RegisterAggregateId message is not received by the AggregateCache until after the first message is sent from the client to the yet-to-be-created Aggregate Actor? Actually this is impossible because of a simple rule of Actor Model: When a message is sent to an Actor and the Actor has a default FIFO mailbox, that message is guaranteed to be received by the Actor before any subsequently sent messages. Thus, when the DomainModel sends RegisterAggregateId to the AggregateCache, there is no way that a subsequent CacheMessage sent to the Aggregate Actor in question will be received by the AggregateCache before the RegisterAggregateId is received.


Now back to the state of the AggregateCache. It also has an aggregateClass instance value, which is created from the typeName. This is possible because the typeName now must be the fully-qualified class name of the Aggregate Actor type. The aggregateClass is passed as the only Props argument to the actorOf() function of the ActorContext. This allows the Actor to be dynamically created using the specific type for which each specific AggregateCache exists.


The support classes are only slightly different from the previous example:


case class AggregateRef(id: String, cache: ActorRef) {
def tell(message: Any)(implicit sender: ActorRef = null): Unit = {
cache ! CacheMessage(id, message, sender)
}

def !(message: Any)(implicit sender: ActorRef = null): Unit = {
cache ! CacheMessage(id, message, sender)
}
}

case class AggregateType(cacheActor: ActorRef)

case class CacheMessage(id: String, actualMessage: Any, sender: ActorRef)

case class RegisterAggregateId(id: String)

Finally, here are the changes to the Order Aggregate:


class Order extends Actor {
var amount: Double = _

def receive = {
case init: InitializeOrder =>
println(s"Initializing Order with $init")
this.amount = init.amount
case processOrder: ProcessOrder =>
println(s"Processing Order is $processOrder")
DomainModelPrototype.completedStep()
}
}

case class InitializeOrder(amount: Double)
case class ProcessOrder()

I think this addresses all the issues that were apparent from the original post. Hopefully it has reinforced the basic rule of Actor Model: Share Nothing.


There are still several other pieces of the DDD with Scala and Akka puzzle to snap into place. I’ll be introducing those over the next few weeks. Well, I can only promise that it will be ASAP.


Published on October 26, 2013 22:07

October 25, 2013

Using Scala and Akka with Domain-Driven Design

If you’ve been following my theme for a while, you know I’ve spent a considerable amount of effort promoting Scala and Akka for use when Implementing Domain-Driven Design. Few seem to share my vision of a completely explicit use of Actors as Aggregates, with direct message sending between clients and the Aggregates (in which case the “client” may be, for example, the User Interface or another Aggregate).


NOTE: There is a follow up to this post where I address the problems found herein.


More recently there have been numerous suggestions to place various wrappers around the domain model’s Aggregates. This smells of an Application Layer and its Application Services, or at least like a named cache/region such as is used in Coherence or GemFire. That’s absolutely not what I want. Sure, those mechanisms could exist behind the scenes, but not between a client and the Aggregate instance (an Actor) that it wants to communicate with. The latter makes me really unhappy. The main point of my emphasis on the use of Actor Model in the first place was to get rid of all the extraneous architecture layers that cloud our vision of what the core of the application (its domain model) is really doing.


Rather than abandon Akka as a means to employ Actor Model with DDD, what I did over the past few hours was prototype a few useful abstractions as a really simple set of Scala classes that could be used to implement my vision. So, here you go.


It seems appropriate to me that the basic interface to the desired functionality should be a DomainModel. Here’s a simple class and companion object:


object DomainModel {
def apply(name: String): DomainModel = {
new DomainModel(name)
}
}

class DomainModel(name: String) {
val aggregateTypeRegistry = scala.collection.mutable.Map[String, AggregateType]()
val system = ActorSystem(name)

def aggregateOf(typeName: String, props: Props, id: String) = {
if (aggregateTypeRegistry.contains(typeName)) {
val aggregateType = aggregateTypeRegistry(typeName)
val worker = aggregateType.worker
val actorRef = worker.aggregateOf(props, id)
val cacheActor = aggregateType.actor
AggregateRef(id, cacheActor)
} else {
AggregateRef(null, null)
}
}

def registerAggregateType(typeName: String): Unit = {
if (!aggregateTypeRegistry.contains(typeName)) {
val actorRef = system.actorOf(Props(new AggregateCache(typeName)), typeName)
implicit val timeout = Timeout(5 seconds)
val future = actorRef ? ProvideWorker
val worker = Await.result(future, timeout.duration).asInstanceOf[AggregateCacheWorker]
aggregateTypeRegistry(typeName) = AggregateType(worker, actorRef)
}
}

def shutdown() = {
system.shutdown()
}
}

After your DomainModel is initialized, the idea is to allow for the registration of various Aggregate types using registerAggregateType(). After you have registered any number of types that are appropriate for your Bounded Context, you can start creating instances of them using aggregateOf(). Here you pass the type name of the Aggregate that you want to create, as well as the Props of the Akka Actor, and the id of the Aggregate. The id is both the name of the Actor and the globally unique identity of the Aggregate instance being created. Finally, if your application needs to be shut down, you must call the DomainModel shutdown() function.


Now, what is the reason for registering Aggregate types? This serves a few different purposes. First of all, there is an underlying Actor that implements a special node-specific cache. This cache Actor will serve as the parent of all Aggregate Actors within the registered type. All messages that will be sent to the Actor that implements a specific Aggregate instance must pass through the cache Actor. Here’s the class that serves as the parent of all Aggregate instances within a specified type, or named cache:


class AggregateCache(typeName: String) extends Actor {
val worker = new AggregateCacheWorker(context)

def receive = {
case message: CacheMessage =>
val aggregate = context.child(message.id).getOrElse {
context.actorOf(worker.propsFor(message.id), message.id)
}
aggregate.tell(message.actualMessage, message.sender)

case ProvideWorker =>
sender ! worker
}
}

As you can see, the main functionality of the AggregateCache is to look up child Aggregate Actors and dispatch messages to them. If the lookup fails, it means that the Aggregate Actor instance is no longer in memory. This implies that Aggregates can be transient; that is, they may exist in memory for some period of time, and then after some degree of inactivity they can be closed and removed from memory. However, if a message is sent to the Aggregate Actor by a client, the client needs to be assured that the Aggregate will receive the message no matter what. This is where the getOrElse expression comes into play. The Aggregate Actor will be dynamically reloaded, and then the AggregateCache can dispatch the message to it.


The AggregateCache Actor uses an internal worker object:


class AggregateCacheWorker(context: ActorContext) {
val aggregateInfo = scala.collection.mutable.Map[String, Props]()

def aggregateOf(props: Props, id: String): ActorRef = {
if (!aggregateInfo.contains(id)) {
aggregateInfo(id) = props
context.actorOf(props, id)
} else {
throw new IllegalStateException(s"Aggregate with id $id already exists")
}
}

def propsFor(id: String): Props = {
if (aggregateInfo.contains(id)) {
aggregateInfo(id)
} else {
throw new IllegalStateException(s"No Props for aggregate of id $id")
}
}
}

The AggregateCacheWorker handles support work that is needed by the DomainModel and AggregateCache. This includes creating new Aggregate instances by way of aggregateOf(), and also providing the Props instance, via propsFor(), for each Aggregate Actor that may need to be dynamically reloaded.


Finally, there are a few simple support classes:


case class AggregateRef(id: String, cache: ActorRef) {
def tell(message: Any)(implicit sender: ActorRef = null): Unit = {
cache ! CacheMessage(id, message, sender)
}

def !(message: Any)(implicit sender: ActorRef = null): Unit = {
cache ! CacheMessage(id, message, sender)
}
}

case class AggregateType(worker: AggregateCacheWorker, actor: ActorRef)

case class CacheMessage(id: String, actualMessage: Any, sender: ActorRef)

case object ProvideWorker

The AggregateRef is a really important abstraction. Rather than returning an ActorRef when an Aggregate Actor is created, the DomainModel instead returns an AggregateRef. As you can see from its two functions, the AggregateRef knows how to send messages to the underlying Aggregate Actor. However, it’s not a “straight shot,” so to speak, as it is with ActorRef. Instead, all messages sent to Aggregates via AggregateRef’s tell() are first sent to the AggregateCache. This allows for look up and any necessary dynamic loading of the Aggregate Actors into the cache. Messages are sent to AggregateCache as a CacheMessage instance.


So, how does it work? Here’s a runner for the prototype:


class Order(id: String, amount: Double) extends Actor {
def receive = {
case p: ProcessOrder =>
println(s"Processing Order is $p")
DomainModelPrototype.completedStep()
case a: Any =>
println(s"message is $a")
DomainModelPrototype.completedStep()
}
}

case class ProcessOrder()

object DomainModelPrototype extends CompletableApp(1) {

val model = DomainModel("prototype")

model.registerAggregateType("Order")

val order = model.aggregateOf("Order", Props(new Order("123", 249.95)), "123")

order ! ProcessOrder()

awaitCompletion()

model.shutdown()

println("DomainModelPrototype: is completed.")
}

This way the Order Aggregate Actor can, in effect, receive messages directly from a client. This is not actually a direct message send in the sense that ActorRef manages. Even so, ActorRef’s message sends are also not as “direct” as you might like to think. So by means of DomainModel and AggregateRef, the only abstractions necessarily exposed to the client, we get a smooth and seamless interface to Aggregate Actors. Even better, these few abstractions address the way Akka needs to work, but cover over the particulars.


NOTE: If you think the above code is a little hacky, you would be correct. I made some tradeoffs just to express my vision in Scala and Akka without getting bogged down in laborious details. This little bit of code should be revisited for some cleanup, and it still needs a bit more detail for handling distributed caches, etc. Right now it’s just a prototype. What’s good about revisiting the code is that the corrections end up being a good learning example.


Thanks to the Akka Team, including Roland Kuhn and Patrik Nordwall, for ideas about making Akka happy. Their advice was to use some sort of intermediary to dynamically look up child Aggregate Actors, which I implemented as AggregateCache. Yet, it is my use of AggregateRef that allows the intermediary to be removed from the view of clients.


Anyway, I’ve been beating this drum for a while. I feel good that I finally had a few hours to hammer out a prototype solution for this specific set of problems. There is still a large body of work that I have accomplished around DDD with Scala and Akka that I will be presenting ASAP.


Published on October 25, 2013 14:15
