Technical rants on distributed computing, high performance data management, etc. You are warned! A lot will be shameless promotion for VMware products

Thursday, July 19, 2007

High performance data sharing between C#, Java and C++

SOAP and High Performance: an oxymoron?

XML messaging using the SOAP protocol has become the lingua franca for interoperability. Though loose coupling and simplicity through a standard text-based protocol have their appeal, it is no secret that SOAP isn't suitable for high-performance messaging. Still, the simplicity and extensibility of XML/SOAP have generated a great deal of interest and resulted in pervasive support across many languages and scripting environments.

This paper evaluates the performance of SOAP for real-time trading and compares it to a compact native protocol such as FIX (Financial Information eXchange).

Here is a simple example comparing two text based protocols, one that uses simple 'tag=value' pairs and the other a SOAP envelope. The price you pay for XML and SOAP is obvious.

A FIX message
8=FIX.4.3 9=00000098 35=X 49=ABC 56=XYZ 34=1
52=20021116-10:15:28 262=MYREQ 268=1 279=1
278=FOO.last 270=13.42 271=1200 10=185

Equivalent message using FIXML (FIX Markup Language)

The benchmark concluded that SOAP messages are 3.5-4.5 times larger than FIX, latency is 2-3 times worse, and encoding/decoding costs increase by up to nearly 9 times.

Web services based on REST offer an efficient alternative to SOAP-based web services. REST is a simple HTTP-based approach that exposes resources through CRUD (Create, Read, Update and Delete) operations. However, it is mostly geared towards simple request-response style synchronous messaging between services. Products that offer reliable, asynchronous messaging semantics on top of REST could be something to watch out for.

Traditional messaging might be popular, but it has its limitations

High performance applications commonly use message oriented middleware products such as IBM MQ or TIBCO Rendezvous for asynchronous sharing of events and data across heterogeneous applications.

However, messaging solutions, as the name implies, package, move and deliver messages, one at a time. The receiving application often needs sufficient context to act on an incoming message. Typically the publisher provides this contextual information through message headers, increasing the overhead for each message sent. So, for instance, an incoming Order may have to contain enough information to identify the associated customer. With the related contextual data arriving in encoded form, such as a Customer ID, the application can only process the Order after it fetches all the related data (the entire customer record with credit information, for instance). This may require a round trip to the database. So, whenever processing an incoming message requires access to an external data source, the processing speed can only go as fast as the weakest link - the database in most circumstances.
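The lookup pattern above can be sketched in a few lines of Java. All class and field names here are illustrative; the in-memory map stands in for the backend database, which in practice is a network round trip:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the "weakest link" pattern: the message carries only a
// Customer ID, so the consumer must fetch the full customer record
// from an external store before it can act on the Order.
public class OrderConsumer {
    // Stand-in for the backend database; in a real system this lookup
    // dominates the per-message processing time.
    static final Map<String, String> customerDb = new HashMap<>();

    static String processOrder(String orderId, String customerId) {
        String customer = customerDb.get(customerId); // the slow, external hop
        if (customer == null) {
            throw new IllegalStateException("customer record not available");
        }
        return "processed " + orderId + " for " + customer;
    }
}
```

However fast the queue delivers messages, every order still pays for that `customerDb` round trip before it can be processed.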

Application environments with a high sustained message rate are much more exposed to this "weakest link" problem, because enqueuing in the messaging system doesn't really help. At some point, either messages have to be discarded or the publisher rate throttled.

To support heterogeneous applications most messaging solutions require the application to construct text based self-describing messages. It becomes the responsibility of the application to use an appropriate encoding format for data such that it can be decoded by the receiving application. Should you use XML for the data format you will experience performance problems similar to those outlined above.

In addition, a highly concurrent application environment with multiple publishers has an increased probability of race conditions causing data integrity issues. Messaging systems have no inherent capability to deal with relationships between messages or the ordering of messages across multiple messaging destinations (queues or topics). So, for instance, if Orders (parent) and corresponding LineItems (children) are delivered on two different queues, it is possible for the child data item to be delivered before the parent. Applications normally have to be aware of these constraints and route related messages over the same messaging endpoint. It would be advantageous, though, if the infrastructure used for transferring messages was aware of this relationship.
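The ordering hazard above forces the consumer to carry reconciliation logic of its own. A minimal sketch (class names are illustrative): buffer orphaned LineItems until their parent Order arrives, then replay them.

```java
import java.util.*;

// Illustrative sketch: LineItems (children) may be delivered before their
// parent Order, so the consumer buffers orphans and replays them once the
// parent shows up - logic the messaging layer itself knows nothing about.
public class OrderAssembler {
    final Set<String> knownOrders = new HashSet<>();
    final Map<String, List<String>> orphanItems = new HashMap<>();
    final List<String> processed = new ArrayList<>();

    void onOrder(String orderId) {
        knownOrders.add(orderId);
        // Replay any line items that arrived ahead of this parent.
        for (String item : orphanItems.getOrDefault(orderId, List.of())) {
            processed.add(orderId + "/" + item);
        }
        orphanItems.remove(orderId);
    }

    void onLineItem(String orderId, String item) {
        if (knownOrders.contains(orderId)) {
            processed.add(orderId + "/" + item);
        } else {
            orphanItems.computeIfAbsent(orderId, k -> new ArrayList<>()).add(item);
        }
    }
}
```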

Most messaging solutions are really designed to support diverse platforms, multiple language bindings, multiple protocols, flexibility in terms of message reliability, and more. But when it comes to object oriented applications such as C++, Java or .NET applications that want to share data, it can be cumbersome constructing or interpreting message headers, encoding/decoding to/from text based payloads, configuring message delivery options, correlating messages, looking up related data from backend databases before taking action, etc.

Take this MQ example:

import com.ibm.mq.*;

MQQueueManager qMgr = new MQQueueManager("QMGR");   // queue manager name

/* Open the queue, build the message, and put the message. */
int openOptions = MQC.MQOO_OUTPUT;
MQQueue myQueue = qMgr.accessQueue(args[0], openOptions,
                                   null, null, null);

MQMessage myMessage = new MQMessage();
myMessage.format = MQC.MQFMT_STRING;
myMessage.writeString(payload);   // application data, already encoded as text

MQPutMessageOptions pmo = new MQPutMessageOptions();
myQueue.put(myMessage, pmo);

/* Close the queue and disconnect from the queue manager. */
myQueue.close();
qMgr.disconnect();

The application developer has to create a QueueManager, fetch queues, define the options to write into the queue, encode their application objects into some text based format, construct Message objects, configure message format, publish into the queue, close the queue and disconnect from the QueueManager. Quite cumbersome, right?

Introducing a Data Fabric

Shared objects/events across heterogeneous applications through main memory caching

The basic idea is as follows:

Instead of requiring application developers to think of messages as the mechanism for sharing information, why not let them use the same paradigm they use when communicating between components within a single application: simply share domain objects in common data structures like a Map, and use native thread-based notification services. Extend this concept so that these data structures are distributed and visible to disparate applications. Essentially, applications share data and events with each other through a shared object database that is distributed in nature. Applications perform CRUD operations on the database and receive notifications when data they are interested in changes.
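As a local, single-process analogue of this idea, here is a minimal sketch of a Map that pushes change notifications to registered listeners. A distributed data fabric extends exactly this put/get/notify contract across processes and languages (all names here are illustrative, not a vendor API):

```java
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;

// Minimal sketch: a Map that notifies subscribers on every put.
// A data fabric distributes this same contract across nodes.
public class SharedRegion<K, V> {
    private final Map<K, V> data = new ConcurrentHashMap<>();
    private final List<BiConsumer<K, V>> listeners = new CopyOnWriteArrayList<>();

    public void subscribe(BiConsumer<K, V> listener) {
        listeners.add(listener);
    }

    public void put(K key, V value) {
        data.put(key, value);
        for (BiConsumer<K, V> l : listeners) {
            l.accept(key, value);   // push the change to interested parties
        }
    }

    public V get(K key) {
        return data.get(key);
    }
}
```

An order processing application would `put` an Order object; a strategy engine subscribed to the same region receives it, without either side constructing or parsing a message.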

There are some important characteristics that differentiate this database compared to a regular disk-based relational database:

1) It is distributed in nature - applications publish data objects and the database copies/moves these to multiple nodes. Sounds like database replication, except that where the data lives, how many copies are made, and how the data is transported are all different
2) It is primarily memory based and hence fast
3) It is active in nature - it pays attention to complex expressions of interest from subscribing applications and, when the data changes, instantaneously pushes the change to the application.

This kind of data management system is referred to as a data fabric or data grid. A Google search on “Enterprise Data Fabric” will provide an idea of various vendor offerings.

A data fabric or data grid combines the important features and semantics seen in database technology and messaging for high performance applications.

A true data fabric includes the following comprehensive capabilities:

Ø Designed to offer in-memory data access speeds: Data is managed in concurrent data structures, primarily in memory, with minimal contention.
Ø Flexibility in data storage: Data can be stored locally in process, replicated to multiple nodes, partitioned across multiple nodes, maintained both in memory and on disk, or simply fetched lazily from a back-end data store. Where the data is located and how many copies are maintained become configuration decisions, driven by the requirements around performance, high availability and the volume of data being managed, yet completely abstracted away from the developer. Data locality is virtualized in a data fabric.

Ø Simple development model: A data fabric offers a simple Map like interface to applications. Applications can simply fetch by key or put domain objects directly into the cache. There is no need to worry about headers, encoding, decoding to some intermediate format, etc.

Ø Scalable: By distributing data, the fabric harnesses resources across multiple nodes. Deployments can simply add capacity on the fly, have the data automatically rebalanced, and handle increasing load (concurrent activity or data volume).

Ø Transactional: The fabric can automatically participate in any ongoing transactions and ensure consistency of data across all the applications sharing the data. For instance, if two applications concurrently decide to create the same customer order and publish this to others, the conflict will be detected and handled appropriately. This is a big difference compared to messaging - with no notion of identity that can be associated with data inherent within the messaging system, it is non trivial to detect conflicts when two applications decide to make the same change at the same time.

Ø Reliable Publish-Subscribe semantics: Applications perform CRUD operations on a local cache and the corresponding event is routed to nodes that subscribe to the data. Data objects can either be synchronously or asynchronously pushed to subscribing applications. Events are pushed to subscribers that contain the new, changed or deleted object(s). The data fabric is intelligent enough to only propagate changes to data objects or its relationships, keeping the underlying network traffic to a minimum.

Ø Querying: Similar to a regular database, the data in-memory can be indexed and queried using SQL like syntax.

Ø Continuous querying: Applications register complex queries - queries with complex predicates, joins, etc. - which, unlike a regular query, are not just executed once. They become resident in the database and give the impression that the query is continuously running. As the data changes, the continuous query engine calculates how the result set has changed, pushes the "delta" to the application, and merges it with a cached result set on the application node.
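A toy sketch of the continuous-query mechanic (purely illustrative, not any vendor's API): a predicate is registered once, stays resident, and only matching changes are pushed into the subscriber's cached result set.

```java
import java.util.*;
import java.util.function.Predicate;

// Toy continuous query: the registered predicate stays resident; each
// update is evaluated against it, and only matching changes ("deltas")
// are merged into the subscriber-side result set.
public class ContinuousQuery<V> {
    private final Predicate<V> predicate;
    final List<V> resultSet = new ArrayList<>();   // subscriber-side cache

    public ContinuousQuery(Predicate<V> predicate) {
        this.predicate = predicate;
    }

    // Called by the fabric whenever an object is created or updated.
    public void onUpdate(V value) {
        if (predicate.test(value)) {
            resultSet.add(value);   // push the delta into the cached result set
        }
    }
}
```

For example, a strategy engine might register `qty -> qty > 1000` once and thereafter receive only the large orders, never re-running the query itself.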

Ø Heterogeneous language support: Application objects are automatically serialized in a neutral format within the fabric such that the same object can be de-serialized into an instance of a class in another application written using a different language (with a similar class structure).

For additional information, read more on the architecture of the GemFire Data Fabric.

Understanding the data fabric through a use case

Let us look at a financial trade order processing system that can route orders to the exchange offering the best liquidity.

Here is how an order flows through the system:

- Orders arrive as FIX messages from trading partners and clients to an Order processing application
- The incoming FIX message is validated (authentication and authorization), normalized (combined with other related information such as Trader information, etc) and written into the data fabric as an Order object
- If the incoming order rate is very high, the validation and normalization process itself can be load balanced across multiple nodes
- Orders are then routed to a Trading Strategy engine that uses different algorithms to determine the best time, quantity and exchange to route the order. Execution of the algorithms requires current market data from multiple exchanges (these are the prices for the different securities traded in the exchanges) and also uses various risk metrics in its calculations. The incoming market data is being pushed typically at a very high rate (20,000 or more ticks per second)
- Orders are batched and then routed to an Exchange router, the application responsible for reliably getting the trade order executed on a market exchange
- Finally, there is the entire process of post trade information exchange that we will ignore as it is not relevant for this discussion.

Let us assume that the order processing application and the exchange router application are written in Java and the computation intensive strategy engine is written in C++. These three applications are sharing data and events in real-time.

While it is beyond the scope of this article to explain all the aspects of the data fabric, we will focus on how the fabric enables objects and event sharing.

Shared Object model: With the data fabric, the domain model classes are designed such that they are more or less equivalent for each language in use. One might use an Object modeling tool and generate these classes for both Java and C++.

Serialization framework: Developers implement callbacks similar to Java Externalizable where object fields are written to a stream managed by the serialization framework provided by the data fabric. The framework serializes data into a language neutral wire format that can be consumed by any application that uses the serialization framework at the consuming end.

In this case, the order processing application knows that an Order is a complex object comprised of several fields and sub-fields. Using the serialization framework, the application writes the relevant fields out to the stream and puts them on the wire.

The framework handles translation of primitives across languages and across processor architectures, removing that burden from the end user application. It also preserves complex object relationships across the serialization boundary.
At the receiving end, the incoming bytes are identified as an Order object (because the type id for the payload is the same across all languages). Once the object type is identified, the rest of the payload is streamed into the Order object on the receiving end (let's say this is the strategy engine).
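A rough sketch of these callbacks, using plain `DataOutput`/`DataInput` as a stand-in for the fabric's stream (the `toData`/`fromData` method names and the Order fields are illustrative):

```java
import java.io.*;

// Sketch of Externalizable-style callbacks writing fields to a neutral,
// language-independent wire format. A real fabric would also prefix a
// type id so the receiver knows which class to instantiate.
public class Order {
    String symbol;
    int quantity;
    double limitPrice;

    Order(String symbol, int quantity, double limitPrice) {
        this.symbol = symbol;
        this.quantity = quantity;
        this.limitPrice = limitPrice;
    }

    // Called by the serialization framework on the publishing side.
    void toData(DataOutput out) throws IOException {
        out.writeUTF(symbol);
        out.writeInt(quantity);
        out.writeDouble(limitPrice);
    }

    // Called on the receiving side; the C++ equivalent would stream the
    // same fields, in the same order, into a C++ Order object.
    static Order fromData(DataInput in) throws IOException {
        return new Order(in.readUTF(), in.readInt(), in.readDouble());
    }
}
```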

Another advantage of this mechanism is that the de-serialization of the payload from the wire (the other half of the serialization framework) results in a ready to use object in the language of the consuming application, which receives a notification about the change and can act on it.

For our strategy engine, the data fabric then fires a notification and hands off the order object (now represented in C++) to the application which then acts on it.

Much of this work can be automated using tools that make it almost trivial to define a data model that is inherently faster and more efficient than traditional serialization mechanisms using XML marshalling, Java or C# serialization.

“Delta” Propagation: When a new incoming FIX message is a “change request” to a pending order, the application merely fetches the Order from the fabric and applies the change to one or more fields – for example, the customer wants to change the “buy” or “sell” volume of the trade, which would be updating a single field in the object. Given that the fabric maintains identity for all objects in the distributed system much like a database, it is now able to calculate the exact “delta” and just transmit that to the strategy engine or any number of connected applications that are listening for the incoming Order requests. By dramatically cutting down on the network and serialization overhead, “Delta” propagation allows the system to scale much better and push much more data through the system than would otherwise be possible.
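The delta idea can be sketched as serializing only the id of the changed field plus its new value, rather than the whole object (the field ids and wire layout here are invented for illustration; a real fabric derives them from the registered data model):

```java
import java.io.*;

// Illustrative delta propagation: only the changed field travels on
// the wire. Field ids are invented for this sketch.
public class OrderDelta {
    static final byte FIELD_QUANTITY = 1;

    // Publisher side: encode "quantity changed to newQty" as a tiny delta.
    static byte[] encodeQuantityChange(int newQty) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeByte(FIELD_QUANTITY);
        out.writeInt(newQty);
        return bos.toByteArray();   // 5 bytes instead of the whole Order
    }

    // Subscriber side: apply the delta to the locally cached quantity.
    static int applyDelta(byte[] delta, int cachedQty) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(delta));
        if (in.readByte() == FIELD_QUANTITY) {
            return in.readInt();
        }
        return cachedQty;
    }
}
```

A quantity change thus costs 5 bytes on the wire regardless of how large the Order object itself is.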

XML based interoperability works well for a wide class of applications and is very well suited to loosely coupled applications, but it isn't well suited to eXtreme Transaction Processing applications. Messaging systems lack sufficient context and hence are prone to data integrity and consistency issues.

Distributed main-memory based architectures such as distributed caching or data fabric (grid) technologies, which combine the functions and semantics of database technology and reliable messaging technology, may be a better fit.

Heterogeneous applications share a common object domain model - objects published or altered by one application are shared with other applications at memory speeds. The keys to fast object sharing are domain classes that are more or less similar across these heterogeneous applications, a native serialization protocol that can detect and dispatch object change "deltas", and an optimized, neutral object wire format.


Monday, April 09, 2007

Reliable Pub-Sub is an integral part of a Data Fabric

Can the data fabric (or a distributed cache with strong reliability semantics in message distribution) be used as a replacement for traditional messaging? This is a question we frequently get asked, and it has been a sweet spot for us at GemStone. Here is some rant on the rationale:

Instead of a distributed main-memory based data management solution that merely manages key-value pairs, imagine a solution that can manage objects along with their relationships. In other words, one that ensures data integrity through knowledge of the entire data model. Ahh! Like a database.
Now, combine this attribute with the ability to distribute data (replicate, partition, whatever) to many nodes reliably and provide notifications to subscribing applications. What you get is an ACTIVE, distributed data management system - a system that inherently provides reliable pub-sub along with the key semantics of a traditional data management system. Applications simply update the database (objects and relationships), express interest through complex query expressions on the data model, and get delivered notifications based on that interest.

In a traditional messaging solution:

  1. The publishing application has to explicitly construct messages, add header information for message identification, and add enough contextual information to allow subscribers to make sense of the message. Often, this contextual information takes the form of identifiers (keys that point to the real data in some database).
  2. Most messaging solutions use hub-spoke mechanisms to queue and relay messages to subscribers, i.e. messages hop from the application process to a server process managed by the messaging provider and then to the receiving application. Look at many practical applications and you will find that the cost of messaging is not necessarily in the network, but rather in the CPU costs associated with (de)serialization and all the copying of byte[] that goes on in each process space and in the kernel layers.
  3. The receiving application again has to do the reverse - parse headers, deserialize the message body, and then make sense of the message. To make any decision, the application often has to look up related data from other enterprise repositories such as a relational database, slowing the entire flow down to the speed of the slowest link - often the relational database query.
Now, with a data fabric, all applications share a single data model and express interest through simple, intuitive queries. The underlying fabric constantly detects what data and relationships are changing and simply sends notifications of the changes to the consuming applications.

Note the following advantages with a data fabric:

  1. Application processes are connected to each other in a p2p fashion with direct connections between each. This allows the fabric to avoid unnecessary network hops, dramatically reducing latency and CPU costs associated with message transfer.
  2. As the data is typically held in multiple locations and often replicated to the process space of the consuming application, the publisher doesn't have to send obese messages - applications merely change the data fabric and, under the hood, the right "delta" event gets propagated to the consuming applications.
  3. The receiving application, when notified, can take immediate business decisions as the contextual information it needs is cached right there.

More later ....