Concurrency in the Data Model Server

Owen Taylor <otaylor@redhat.com>
26 November 2007

Review of basic concepts

See my introductory blog entry and the data model design document for full background on the data model concepts. But to review briefly:

The data model consists of a set of resources, with properties on each resource. The data for resource properties may come from the server's database engine, or it may be derived from other sources like web services. Clients connect to the server over a persistent stream connection (like XMPP) and make requests for data. A query for data consists of query parameters that identify a resource or set of resources and a "fetch string" that specifies what properties to fetch from each resource. Requested properties of the resource are serialized and returned over the stream, but the request for data is also a request for future notification of changes. When data in the data model is changed, the new property values are serialized and sent out in "notification messages" to all interested clients.

The client is required to retain all data that it has fetched or been notified of; the server keeps track of what the client has fetched, and uses that information to compress the protocol stream and not send duplicate information.

Computation of resource properties can be expensive, so the data model core also contains a cache; when a property is fetched for the first time, it is stored in the cache as well as being returned to the client; subsequent reads can avoid recomputation by looking first in the cache.
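
To make the cache behavior concrete, here is a rough sketch in Java; the class and method names are invented for illustration and are not the actual server API:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class PropertyCache {
        // Backends (the database engine, web services) that can compute a
        // property value when the cache has nothing for it
        public interface PropertySource {
            Object compute(String resourceId, String propertyName);
        }

        private final Map<String, Object> cache = new ConcurrentHashMap<String, Object>();

        public Object getProperty(String resourceId, String propertyName,
                                  PropertySource source) {
            String key = resourceId + "#" + propertyName; // e.g. "user/123#name"
            Object value = cache.get(key);
            if (value == null) {
                // First fetch: compute the (possibly expensive) value, then
                // store it so subsequent reads skip the recomputation
                value = source.compute(resourceId, propertyName);
                cache.put(key, value);
            }
            return value;
        }
    }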

There may also be other uses of the data model in contexts without a persistent stream, such as when serving a web page. In these cases, the data is read from the data model, and possibly stored in the cache, but registration for notification is skipped.

Consistency model

In general, we don't need strong guarantees about the up-to-dateness of data we read from the data model; if we read data that is a few seconds or even minutes old, we probably won't notice. In particular, up-to-dateness isn't important because of the notification mechanisms built into the data model; if we read stale data, we will then get notification of the newer data.

We do want what we might term "asymptotic correctness" - if the data in the database stops changing, the copy of the data maintained locally by the client will eventually come into sync with the data on the server.

We also, somewhat obviously, need the database property of isolation: changes made in a read-write transaction shouldn't be visible to other transactions until they have been committed. We achieve this by separating read-write and read-only transactions and only ever updating the cache from read-only transactions.

Basic algorithms

For reference, we'll sketch out the basic algorithms for querying data from the data model and for sending notifications:

Reading data:

  1. Compute the result set of resources from the query
  2. For each resource
    1. Compute the union of the new fetch specification and any fetch specification previously stored for that resource/client pair, and store it (registering for future notification)
    2. For each property in the new fetch specification not in the previously stored fetch specification:
      1. If the property is resource-valued, recurse with the appropriate subportion of the fetch string
    3. For each property in the new fetch specification not in the previously stored fetch specification:
      1. Send the property value to the client
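
As a rough illustration of the reading algorithm, here is a sketch in Java; all of the types (Client, Resource, Property, FetchSpec) are hypothetical stand-ins, not the real classes in com.dumbhippo.dm:

    import java.util.Set;

    public class FetchVisitor {
        public interface Property {
            boolean isResourceValued();
        }
        public interface Resource {
            Set<Resource> getResourceValues(Property property);
        }
        public interface FetchSpec {
            FetchSpec union(FetchSpec other);
            FetchSpec minus(FetchSpec other); // properties in this but not in other
            Set<Property> properties();
            FetchSpec childFetch(Property property); // subportion of the fetch string
        }
        public interface Client {
            FetchSpec getStoredFetch(Resource resource);
            void storeFetch(Resource resource, FetchSpec fetch);
            void sendProperty(Resource resource, Property property);
        }

        public void fetch(Client client, Set<Resource> resources, FetchSpec fetch) {
            for (Resource resource : resources) {
                // Step 2.1: union with the previously stored fetch and store it;
                // storing the fetch also registers for future notification
                FetchSpec old = client.getStoredFetch(resource);
                FetchSpec merged = old.union(fetch);
                client.storeFetch(resource, merged);

                FetchSpec added = merged.minus(old); // the newly fetched part

                // Step 2.2: recurse into resource-valued properties first
                for (Property property : added.properties()) {
                    if (property.isResourceValued())
                        fetch(client, resource.getResourceValues(property),
                              added.childFetch(property));
                }

                // Step 2.3: then send the new property values to the client
                for (Property property : added.properties())
                    client.sendProperty(resource, property);
            }
        }
    }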

Making a change:

  1. A read-write session maintains a list of changed property/resource pairs.
  2. At transaction commit we:
    1. Invalidate changed properties in the local cache
    2. Send a message to other cluster members about the changed properties
    3. Asynchronously identify clients of the current machine that need notifications, and send notifications
  3. On receipt of a change notification message from another cluster member, we:
    1. Invalidate changed properties in the local cache
    2. Identify clients of the current machine that need notifications, and send notifications
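
A minimal sketch of this commit path, again with invented names standing in for the real session and cluster machinery:

    import java.util.HashSet;
    import java.util.Set;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class ChangeTracker {
        public interface Cache { void invalidate(String key); }
        public interface Cluster { void broadcastChanges(Set<String> keys); }

        private final Set<String> changed = new HashSet<String>(); // "resourceId#property"
        private final Cache cache;
        private final Cluster cluster;
        private final ExecutorService notifier = Executors.newSingleThreadExecutor();

        public ChangeTracker(Cache cache, Cluster cluster) {
            this.cache = cache;
            this.cluster = cluster;
        }

        // Step 1: the read-write session records each changed pair
        public void recordChange(String resourceId, String property) {
            changed.add(resourceId + "#" + property);
        }

        // Step 2: run at transaction commit
        public void afterCommit() {
            final Set<String> keys = new HashSet<String>(changed);
            for (String key : keys)
                cache.invalidate(key);         // 2.1: invalidate the local cache
            cluster.broadcastChanges(keys);    // 2.2: tell other cluster members
            notifier.execute(new Runnable() {  // 2.3: notify local clients asynchronously
                public void run() { notifyInterestedClients(keys); }
            });
        }

        // Step 3: a member receiving a broadcast invalidates and notifies locally
        public void onClusterMessage(Set<String> keys) {
            for (String key : keys)
                cache.invalidate(key);
            notifyInterestedClients(keys);
        }

        private void notifyInterestedClients(Set<String> keys) {
            // look up clients whose stored fetches cover these keys and send
            // them notification messages (omitted)
        }
    }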

Keeping correct data in the cache

One thing we want to avoid is the situation:

T1                             T2
-----------------------------  -------------------------------
Read data from the database
                               Change the database
                               Commit
                               Invalidate the cache
Store data into the cache

We use the technique of "transaction timestamps" (borrowed from Hibernate) to avoid this. The idea of a transaction timestamp is that we have a global "timestamp source" from which we can allocate monotonically increasing timestamps, and with each node in the cache, we keep an "invalidation timestamp", which is a timestamp after the timestamp of the transaction that changed the data. In a read-only transaction, we don't store data into the cache if the invalidation timestamp of the cache node is newer than the timestamp of the transaction (a timestamp allocated before we read anything from the database).

T1                             T2
-----------------------------  -------------------------------
Allocate txTimestamp M
Read data from the database
                               Change the database
                               Commit
                               Allocate txTimestamp N
                               Invalidate the cache, storing N
Don't store data (M < N)
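
A minimal sketch of this rule, assuming a single global timestamp source (the names here are illustrative):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicLong;

    public class TimestampedCache {
        private static final AtomicLong timestampSource = new AtomicLong();

        // Monotonically increasing; used both for transaction timestamps
        // and for invalidation timestamps
        public static long allocateTimestamp() {
            return timestampSource.incrementAndGet();
        }

        private static class Node {
            Object value;                     // null until something is cached
            long invalidationTimestamp = -1;
        }

        private final Map<String, Node> nodes = new ConcurrentHashMap<String, Node>();

        private Node node(String key) {
            Node n = nodes.get(key);
            if (n == null) {
                nodes.putIfAbsent(key, new Node());
                n = nodes.get(key);
            }
            return n;
        }

        // Called by a read-write transaction at commit; the timestamp we
        // store is allocated after the changing transaction's work
        public synchronized void invalidate(String key) {
            Node n = node(key);
            n.value = null;
            n.invalidationTimestamp = allocateTimestamp();
        }

        // Called by a read-only transaction; txTimestamp was allocated
        // before the transaction read anything from the database
        public synchronized void store(String key, Object value, long txTimestamp) {
            Node n = node(key);
            if (n.invalidationTimestamp > txTimestamp)
                return; // our data may predate the invalidation: don't cache it
            n.value = value;
        }

        public synchronized Object get(String key) {
            Node n = nodes.get(key);
            return n == null ? null : n.value;
        }
    }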

Ordering fetches and notifications sent to each client

We want to ensure that each client has an "asymptotically correct" view of the data in the data model. That is, if no change occurs for a sufficiently long period of time, then the client will have a local view of the data that corresponds to the data on the server. This means avoiding scenarios like:

T1 (fetch)                   T2 (update)            T3 (notify)
---------------------------  ---------------------  --------------------------
Read data from the database
                             Change the database
                             Commit
                             Queue notifications
                                                    Send new data
Send stale data

If we have no control over the ordering of messages sent to the client, then trying to avoid problems like this is pretty much hopeless. So, we introduce the concept of a message serial (a message serial is vaguely similar to the transaction timestamp, but specific to a particular client connection). At any point in time, we can "allocate a message serial"; we then provide that serial when sending a message. Messages will be sent in the order of their serials. (If you allocate a serial, you must either send a message or inform the client that the serial will not be used.) With message serials, the sequence above then becomes:

T1 (fetch)                   T2 (update)            T3 (notify)
---------------------------  ---------------------  --------------------------
Allocate serial M
Read data from the database
                             Change the database
                             Commit
                             Queue notifications
                                                    Allocate serial N
                                                    Send new data (serial=N)
Send stale data (serial=M)                          [...waiting...]
Data sent                                           [...waiting...]
                                                    Data sent
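
A sketch of how message serials might be implemented for a single client connection (again, the names are hypothetical):

    import java.util.SortedMap;
    import java.util.TreeMap;

    public class SerialSender {
        private long nextToAllocate = 0; // next serial to hand out
        private long nextToSend = 0;     // serial the stream is waiting for
        private final SortedMap<Long, String> pending = new TreeMap<Long, String>();

        public synchronized long allocateSerial() {
            return nextToAllocate++;
        }

        // Queue a message under its serial; messages go out in serial order,
        // so a message queued early may have to wait for lower serials
        public synchronized void send(long serial, String message) {
            pending.put(serial, message);
            flush();
        }

        // Per the rule above, an allocated serial must either carry a
        // message or be explicitly given up, so later serials aren't
        // blocked forever
        public synchronized void cancelSerial(long serial) {
            pending.put(serial, null);
            flush();
        }

        private void flush() {
            while (pending.containsKey(nextToSend)) {
                String message = pending.remove(nextToSend);
                if (message != null)
                    writeToStream(message);
                nextToSend++;
            }
        }

        private void writeToStream(String message) {
            // actual write to the persistent stream connection (omitted)
        }
    }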

This doesn't entirely solve the problem, however, because we've omitted a step above - we have to register which clients to notify; if we read data from the database before registering, then it's possible to have the situation:

T1 (fetch)                   T2 (update)            T3 (notify)
---------------------------  ---------------------  --------------------------
Read data from the database
                             Change the database
                             Commit
                                                    Nothing to notify
Register for notification
Send stale data

It seems that registering for change notification before reading data from the database would solve this problem, and indeed we do that (see com.dumbhippo.dm.Fetch.visit()), but it isn't a complete solution because data can be read by side effect. For example, if we fetch properties from a user resource with the fetch string:

userContacts[name];contacters[photoUrl]

and we have some other user in both userContacts and contacters, then reading the name property from the database will cause the photoUrl property to be stored in the Hibernate L1 cache. So if the photoUrl changes between reading (and registering for) name and reading (and registering for) photoUrl, we'll get a stale value of photoUrl with no notification.

This problem exists in the server code at the time of writing; one possible approach to a solution would be to use the invalidation timestamp. If, during a fetch, we are reading data from a resource whose invalidation timestamp is newer than the timestamp of the current transaction, then we may be about to send stale data to the client; so we should queue a notification pass for that client/resource pair after the commit of the current transaction.
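
A rough sketch of what that check could look like, building on the invalidation timestamps above (the notification queue here is hypothetical):

    public class StalenessGuard {
        public interface NotificationQueue {
            // schedule a fresh notification pass for this client/resource
            // pair to run after the current transaction commits
            void queueAfterCommit(String clientId, String resourceId);
        }

        private final NotificationQueue queue;

        public StalenessGuard(NotificationQueue queue) {
            this.queue = queue;
        }

        // Called while visiting each resource during a fetch; txTimestamp
        // was allocated before the transaction read anything
        public void visitResource(String clientId, String resourceId,
                                  long invalidationTimestamp, long txTimestamp) {
            if (invalidationTimestamp > txTimestamp) {
                // The resource changed after our transaction started, so the
                // values we are about to send may be stale; a notification
                // pass after commit will correct them
                queue.queueAfterCommit(clientId, resourceId);
            }
        }
    }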

Extending the above for feeds

Introducing feeds into the data model makes things considerably more challenging: the data we store in the cache is only a subrange of the total data set. When we invalidate, do we try to partially invalidate the cache and reduce the stored range, or do we start over? We want to be able to do incremental notification when items are added to the feed. How does this affect the questions of consistency discussed above?

Some questions about how feeds can be updated affect how we handle things:

  1. Can an existing item be restacked (moved to a newer stack timestamp)?
  2. Can an item be deleted from the feed?
  3. Can an item be stacked in the past (inserted with an older stack timestamp)?

If the answer to all the above were "no", then some major efficiencies would be possible, because all updates would consist of adding a block of items at the end of the feed. Unfortunately, the answer to all of the above likely needs to be "yes".

For caching, it probably makes sense to invalidate the entire cache; the cache is shared among all clients fetching a feed, and fetching the identity of the feed items (not their contents) is in most cases swift.

Compressing data sent to clients is more important; we could do that by recording the last set of data sent to the client and sending a diff. But (as discussed above) keeping track of the set of data sent can't be done by simply recording a range of entry timestamps, because of restacking and item deletion. We'd actually have to keep track of the id of every resource in the feed that the client has seen.

A method that avoids keeping an arbitrary amount of per-client data is to keep a log of changes to the data model, indexed by transaction timestamp. Keeping the actual changes is problematic, because then we need accurate ordering of the log, and that requires locking the log together with the database on any feed change. What works better is to keep a log whose entries are the stack timestamps of stacked or restacked items, ordered by txTimestamp.

txTimestamp  Stack timestamp  Notes
-----------  ---------------  ---------------------
0001         1343             Item stacked
0053         1403             Item stacked
0132         -1               Deletion (start over)
0169         1532             Item stacked
0312         1432             Item stacked in past

For each client, we just record the txTimestamp of the transaction where we last sent data to the client. On notification, we go through the log and find the minimum stack timestamp for a txTimestamp later than the recorded timestamp. We then send all entries in the feed later than this timestamp (subject to constraints specified in the client's recorded fetch on the maximum number of items). A stack timestamp of -1 in the log indicates a deletion; if we see a deletion, we need to send a "clear" update to the client followed by the complete feed contents (again subject to the maximum-number-of-items constraint).
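
A sketch of the log and the per-client catch-up scan (types and names invented for illustration):

    import java.util.ArrayList;
    import java.util.List;

    public class FeedChangeLog {
        public static final long DELETION = -1; // "start over" marker

        private static class Entry {
            final long txTimestamp;
            final long stackTimestamp;
            Entry(long txTimestamp, long stackTimestamp) {
                this.txTimestamp = txTimestamp;
                this.stackTimestamp = stackTimestamp;
            }
        }

        // Entries are appended in txTimestamp order, so no extra locking
        // against the database is needed beyond the append itself
        private final List<Entry> log = new ArrayList<Entry>();

        public synchronized void itemStacked(long txTimestamp, long stackTimestamp) {
            log.add(new Entry(txTimestamp, stackTimestamp));
        }

        public synchronized void itemDeleted(long txTimestamp) {
            log.add(new Entry(txTimestamp, DELETION));
        }

        // Find the minimum stack timestamp among entries newer than the
        // txTimestamp we last recorded for this client. Returns DELETION if
        // the client needs a "clear" plus the complete feed contents, or
        // Long.MAX_VALUE if there is nothing to send.
        public synchronized long catchUp(long clientLastTxTimestamp) {
            long minStackTimestamp = Long.MAX_VALUE;
            for (Entry entry : log) {
                if (entry.txTimestamp <= clientLastTxTimestamp)
                    continue; // the client already saw this change
                if (entry.stackTimestamp == DELETION)
                    return DELETION;
                if (entry.stackTimestamp < minStackTimestamp)
                    minStackTimestamp = entry.stackTimestamp;
            }
            return minStackTimestamp;
        }
    }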