Search Tips   |   Advanced Search

WebSphere eXtreme Scale v8.6 Overview

  1. Elastic scalability
  2. WXS with databases
  3. What's new in Version 8.6
  4. Release notes
  5. Hardware and software requirements
  6. Directory conventions
  7. WXS technical overview
  8. Caching overview
  9. Cache integration overview
  10. Database integration
  11. Serialization overview
  12. Scalability overview
  13. Availability overview


WebSphere eXtreme Scale (WXS) is a data grid that caches application data across multiple servers, performing massive volumes of transaction processing with high efficiency and linear scalability.

WXS can be used...

eXtreme Scale splits data set into partitions. Each partition exists as a primary copy, or shard. A partition also contains replica shards for backing up the data.

WebSphere DataPower XC10 can be used with WebSphere Commerce Suite as an alternative caching mechanism for Dynamic Cache to reduce local memory. Commerce nodes can use the WebSphere eXtreme Scale dynamic cache provider to off load caching from local memory to XC10. All of the cached data is stored in on the XC10 Collective, providing availability of cache data and improved performance.

Elastic scalability

Elastic scalability is enabled using distributed object caching. The data grid monitors and manages itself, adding or removing servers from the topology, which increases memory, network throughput, and processing capacity as needed. Capacity is added to the data grid while it is running without requiring a restart. The data grid self-heals by automatically recovering from failures.

WXS with databases

Using the write-behind cache feature, eXtreme Scale can serve as a front-end cache for a database. WXS clients send and receive data from the data grid, which can be synchronized with a backend data store. The cache is coherent because all of the clients see the same data in the cache. Each piece of data is stored on exactly one writable server in the cache. Having one copy of each piece of data prevents wasteful copies of records that might contain different versions of the data. A coherent cache holds more data as more servers are added to the data grid, and scales linearly as the data grid grows in size. The data can also be optionally replicated for additional fault tolerance.

WXS container servers provide the in-memory data grid. Container servers can run...

The data grid is not limited by, and does not have an impact on, the memory or address space of the application or the application server. The memory can be the sum of the memory of several hundred, or thousand, JVMs, running on many different physical servers.

As an in-memory database processing space, WXS can be backed by disk, database, or both.

While eXtreme Scale provides several Java APIs, many use cases require no user programming, just configuration and deployment in your WebSphere infrastructure.

Data grid overview

The simplest eXtreme Scale programming interface is the ObjectMap interface, a map interface that includes:

The fundamental data grid paradigm is a key-value pair, where the data grid stores values (Java objects), with an associated key (another Java object). The key is later used to retrieve the value. In eXtreme Scale, a map consists of entries of such key-value pairs.

WXS offers a number of data grid configurations, from a single, simple local cache, to a large distributed cache, using multiple JVMs or servers.

In addition to storing simple Java objects, we can store objects with relationships. Use a query language that is like SQL, with SELECT - FROM - WHERE statements to retrieve these objects. For example, an order object might have a customer object and multiple item objects associated with it. WXS supports one-to-one, one-to-many, many-to-one, and many-to-many relationships.

WXS also supports an EntityManager programming interface for storing entities in the cache. This programming interface is like entities in Java Enterprise Edition. Entity relationships can be automatically discovered from..

Retrieve entities from the cache by primary key using the find method on the EntityManager interface. Entities can be persisted to or removed from the data grid within a transaction boundary.

Consider a distributed example where the key is a simple alphabetic name. The cache might be split into four partitions by key: partition 1 for keys starting with A-E, partition 2 for keys starting with F-L, and so on. For availability, a partition has a primary shard and a replica shard. Changes to the cache data are made to the primary shard, and replicated to the replica shard. You configure the number of servers that contain the data grid data, and eXtreme Scale distributes the data into shards over these server instances. For availability, replica shards are placed in separate physical servers from primary shards.

WXS uses a catalog service to locate the primary shard for each key. It handles moving shards among eXtreme Scale servers when the physical servers fail and later recover. If the server containing a replica shard fails, eXtreme Scale allocates a new replica shard. If a server containing a primary shard fails, the replica shard is promoted to be the primary shard . As before, a new replica shard is constructed.

Release notes

  1. System Requirements
  2. Detailed system requirements
  3. Technotes
  4. Support Web
  5. Fix packs
  6. Library

Hardware and software requirements

Formally supported hardware and software options are available on the Systems Requirements page.

Install and deploy the product in Java EE and Java SE environments. We can also bundle the client component with Java EE applications directly without integrating with WebSphere Application Server.

Hardware requirements

WXS does not require a specific level of hardware. The hardware requirements are dependent on the supported hardware for the Java SE installation used to run WXS. If we are using eXtreme Scale with WebSphere Application Server or another JEE implementation, the hardware requirements of these platforms are sufficient for WXS.

Operating system requirements

Each Java SE and Java EE implementation requires different operating system levels or fixes for problems that are discovered during the testing of the Java implementation. The levels required by these implementations are sufficient for eXtreme Scale.

Installation Manager requirements

Before installing WXS, install Installation Manager using either...

Web browser requirements

The web console supports the following Web browsers:

WebSphere Application Server requirements

Java requirements

Other Java EE implementations can use the WXS run time as a local instance or as a client to WXS servers. To implement Java SE, use Version 6 or later.

Directory conventions


Tutorials directory....

    wxs_home /samples

    wxs_home \samples


Root directory of the DVD that contains the product.



Root directory of the Eclipse Equinox OSGi framework installation. Example:



User files such as security profiles.

    c:\Documents and Settings\user_name


WXS technical overview

The first steps to deploying a data grid are to...

  1. Start a core group and catalog service
  2. Start WXS server processes for the data grid to store and retrieve data
As servers are started, they automatically register with the core group and catalog service. More servers increase data grid capacity and reliability.

A local data grid is a simple, single-instance grid. To best use WXS as an in-memory database processing space, deploy a distributed data grid. Data is spread out over the various eXtreme Scale servers. Each server contains only some of the data, called a partition.

The catalog service locates the partition for a given datum based on its key. A server contains one or more partitions, limited only by the server's memory space.

Increasing the number of partitions increases the capacity of the data grid. The maximum capacity of a data grid is the number of partitions times the usable memory size of each server.

The data of a partition is stored in a shard. For availability, a data grid can be configured with synchronous or asynchronous replicas. Changes to the grid data are made to the primary shard, and replicated to the replica shards. The total memory used or required by a data grid is thus the size of the data grid times (1 (for the primary) + the number of replicas).

WXS distributes the shards of a data grid over the number of servers comprising the grid. These servers may be on the same or different physical servers. For availability, replica shards are placed in separate physical servers from primary shards.

WXS monitors the status of its servers and moves shards during shard or physical server failure and recovery. If the server containing a replica shard fails, WXS allocates a new replica shard, and replicates data from the primary to the new replica. If a server that contains a primary shard fails, the replica shard is promoted to be the primary shard, and, a new replica shard is constructed. If you start an additional server for the data grid, the shards are balanced over all servers. This rebalancing is called scale-out. Similarly, for scale-in, you might stop one of the servers to reduce the resources used by a data grid. As a result, the shards are balanced over the remaining servers.

Caching overview

Caching architecture: Maps, containers, clients, and catalogs

With WXS, your architecture can use either...

WXS requires minimal additional infrastructure to operate. The infrastructure consists of scripts to install, start, and stop a JEE application on a server. Cached data is stored in the WXS server, and clients remotely connect to the server.

Distributed caches offer increased performance, availability and scalability and can be configured using dynamic topologies, in which servers are automatically balanced. We can also add additional servers without restarting your existing eXtreme Scale servers. We can create either simple deployments or large, terabyte-sized deployments in which thousands of servers are needed.

Catalog service

The catalog service...

The catalog service hosts logic that should be idle and has little influence on scalability. It is built to service hundreds of container servers that become available simultaneously, and run services to manage the container servers.

The catalog server responsibilities consist of the following services:

For high availability, configure a catalog service domain, which consists of multiple JVMs, including a master JVM and a number of backup JVMs.

Container servers, partitions, and shards

The container server stores application data for the data grid. This data is generally broken into parts, which are called partitions. Partitions are hosted across multiple shard containers. Each container server in turn hosts a subset of the complete data. A JVM might host one or more shard containers and each shard container can host multiple shards. Plan out the heap size for the container servers, which host all of your data. Configure heap settings accordingly.

Partitions host a subset of the data in the grid. WXS automatically places multiple partitions in a single shard container and spreads the partitions out as more container servers become available. Choose the number of partitions carefully before final deployment because the number of partitions cannot be changed dynamically. A hash mechanism is used to locate partitions in the network and eXtreme Scale cannot rehash the entire data set after it has been deployed. As a general rule, we can overestimate the number of partitions

Shards are instances of partitions and have one of two roles: primary or replica. The primary shard and its replicas make up the physical manifestation of the partition. Every partition has several shards that each host all of the data contained in that partition. One shard is the primary, and the others are replicas, which are redundant copies of the data in the primary shard . A primary shard is the only partition instance that allows transactions to write to the cache. A replica shard is a "mirrored" instance of the partition. It receives updates synchronously or asynchronously from the primary shard. The replica shard only allows transactions to read from the cache. Replicas are never hosted in the same container server as the primary and are not normally hosted on the same machine as the primary.

To increase the availability of the data, or increase persistence guarantees, replicate the data. However, replication adds cost to the transaction and trades performance in return for availability.

With eXtreme Scale, we can control the cost as both synchronous and asynchronous replication is supported, as well as hybrid replication models using both synchronous and asynchronous replication modes. A synchronous replica shard receives updates as part of the transaction of the primary shard to guarantee data consistency. A synchronous replica can double the response time because the transaction has to commit on both the primary and the synchronous replica before the transaction is complete. An asynchronous replica shard receives updates after the transaction commits to limit impact on performance, but introduces the possibility of data loss as the asynchronous replica can be several transactions behind the primary.


A map is a container for key-value pairs, which allows an application to store a value indexed by a key. Maps support indexes that can be added to index attributes on the key or value. These indexes are automatically used by the query runtime to determine the most efficient way to run a query.

A map set is a collection of maps with a common partitioning algorithm. The data within the maps are replicated based on the policy defined on the map set. A map set is only used for distributed topologies and is not needed for local topologies.

A map set can have a schema associated with it. A schema is the metadata that describes the relationships between each map when using homogeneous Object types or entities.

WXS can store serializable Java objects or XDF serialized objects in each of the maps using either...

A schema can be defined over the maps to identify the relationship between the objects in the maps where each map holds objects of a single type. Defining a schema for maps is required to query the contents of the map objects. WXS can have multiple map schemas defined.

WXS can also store entities using the EntityManager API . Each entity is associated with a map. The schema for an entity map set is automatically discovered using either...

Each entity has a set of key attributes and set of non-key attributes. An entity can also have relationships to other entities. WXS supports one to one, one to many, many to one and many to many relationships. Each entity is physically mapped to a single map in the map set. Entities allow applications to easily have complex object graphs that span multiple Maps. A distributed topology can have multiple entity schemas.



  1. connect to a catalog service
  2. retrieve a description of the server topology
  3. communicate directly to each server as needed

When the server topology changes because new servers are added or existing servers have failed, the dynamic catalog service routes the client to the appropriate server that is hosting the data. Clients must examine the keys of application data to determine which partition to route the request. Clients can read data from multiple partitions in a single transaction. Clients can update only a single partition in a transaction. After the client updates some entries, the client transaction must use that partition for updates.

Use two types of clients:

Java clients

Java client applications run on JVMs and connect to the catalog service and container servers.

.NET clients

NET clients work similarly to Java clients, but do not run in JVMs. .NET clients are installed remotely from the catalog and container servers. You connect to the catalog service from within the application. Use a .NET client application to connect to the same data grid as your Java clients.

Enterprise data grid overview

Enterprise data grids use the eXtremeIO transport mechanism and a new serialization format. With the new transport and serialization format, we can connect both Java and .NET clients to the same data grid.

With the enterprise data grid, we can create multiple types of applications, written in different programming languages, to access the same objects in the data grid. In prior releases, data grid applications had to be written in the Java programming language only. With the enterprise data grid function, we can write .NET applications that can create, retrieve, update, and delete objects from the same data grid as the Java application. data grid high-level overview

Object updates across different applications

  1. The .NET client saves data in its format to the data grid.
  2. The data is stored in a universal format, so that when the Java client requests this data it can be converted to Java format.
  3. The Java client updates and re-saves the data.
  4. The .NET client accesses the updated data, during which the data is converted to .NET format.

Transport mechanism

eXtremeIO (XIO) is a cross-platform transport protocol that replaces the Java-bound ORB. With the ORB, WXS is bound to Java native client applications. The XIO transport mechanism is specifically targeted for data caching, and enables client applications that are in different programming languages to connect to the data grid.


XDF is a cross-platform serialization format that replaces Java serialization. Enabled on maps with a copyMode attribute value of COPY_TO_BYTES in the ObjectGrid descriptor XML file. With XDF, performance is faster and data is more compact. In addition, the introduction of XDF enables client applications that are in different programming languages to connect to the same data grid.

Class evolution

XDF allows for class evolution. We can evolve class definitions used in the data grid without affecting older applications using previous versions of the class. Classes can function together when one of the classes has fewer fields than the other class.

XDF implementation scenarios:

Determination for evolution

XDF attempts to evolve a class when the class names match and the field names do not have conflicting types. Using the ClassAlias and FieldAlias annotations are useful when we are trying to match classes between C# and Java applications where the names of the classes or fields are slightly different. We can put these annotations on either the Java and C# application, or both. However, the lookup for the class in the Java application can be less efficient than defining the ClassAlias on the C# application.

The effect of missing fields in serialized data

The constructor of the class is not invoked during deserialization, so any missing fields have a default that is assigned to it based on the language. The application that is adding new fields must be able to detect the missing fields and react when an older version of class is retrieved.

Updating the data is the only way for older applications to keep the newer fields

An application might run a fetch operation and update the map with an older version of the class that is missing some fields in the serialized value from the client. The server then merges the values on the server and determines whether any fields in the original version are merged into the new record. If an application runs a fetch operation, and then removes and inserts an entry, the fields from the original value are lost.

Merging capabilities

Objects within an array or collection are not merged by XDF. It is not always clear whether an update to an array or collection is intended to change the elements of that array or the type. If a merge occurs based on positioning, when an entry in the array is moved, XDF might merge fields that are not intended to be associated. As a result, XDF does not attempt to merge the contents of arrays or collections. However, if you add an array in a newer version of a class definition, the array gets merged back into the previous version of the class.

IBM eXtremeMemory

IBM eXtremeMemory enables objects to be stored in native memory instead of the Java heap. By moving objects off the Java heap, we can avoid garbage collection pauses, leading to more constant performance and predicable response times.

The JVM relies on usage heuristics to collect, compact, and expand process memory. The garbage collector completes these operations. However, running garbage collection has an associated cost. The cost of running garbage collection increases as the size of the Java heap and number of objects in the data grid increase. The JVM provides different heuristics for different use cases and goals: optimum throughput, optimum pause time, generational, balanced, and real-time garbage collection. No heuristic is perfect. A single heuristic cannot suit all possible configurations.

WXS uses data caching, with distributed maps that have entries with a well-known lifecycle. This lifecycle includes the following operations: GET, INSERT, DELETE, and UPDATE. By using these well-known map lifecycles, eXtremeMemory can manage memory usage for data grid objects in container servers more efficiently than the standard JVM garbage collector.

The following diagram shows how using eXtremeMemory leads to more consistent relative response times in the environment. As the relative response times reach the higher percentiles, the requests that are using eXtremeMemory have lower relative response times. The diagram shows the 95-100 percentiles. .


Zones give you control over shard placement. Zones are user-defined logical groupings of physical servers. The following are examples of different types of zones:

Zones defined between data centers

The classic example and use case for zones is when you have two or more geographically dispersed data centers. Dispersed data centers spread the data grid over different locations for recovery from data center failure. For example, you might want to ensure that you have a full set of asynchronous replica shards for the data grid in a remote data center. With this strategy, we can recover from the failure of the local data center transparently, with no loss of data. Data centers themselves have high speed, low latency networks. However, communication between one data center and another has higher latency. Synchronous replicas are used in each data center where the low latency minimizes the impact of replication on response times. Using asynchronous replication reduces impact on response time. The geographic distance provides availability in case of local data center failure.

In the following example, primary shards for the Chicago zone have replicas in the London zone. Primary shards for the London zone have replicas in the Chicago zone.

Three configuration settings in eXtreme Scale control shard placement:

Disable development mode

To disable development mode, and to activate the first eXtreme Scale shard placement policy, in the deployment XML file, set...

Policy 1: Primary and replica shards for partition are placed in separate physical servers

If a single physical server fails, no data is lost. Most efficient setting for production high availability.

Physical servers are defined by an IP address. The IP addresses used by container servers hosting the shards is set using the start script parameter...

Policy 2: Primary and replica shards for the same partition are placed in separate zones

With this policy, we can control shard placement by defining zones. You choose your physical or logical boundary or grouping of interest. Then, choose a unique zone name for each group, and start the container servers in each of your zones with the name of the appropriate zone. Thus eXtreme Scale places shards so that shards for the same partition are placed in separate zones.

Container servers are assigned to zones with the zone parameter on the start server script. In a WAS environment, zones are defined through node groups with a specific name format:

We can extend the example of a data grid with one replica shard by deploying it across two data centers. Define each data center as an independent zone. Use a zone name of DC1 for the container servers in the first data center, and DC2 for the container servers in the second data center. With this policy, the primary and replica shards for each partition would be in different data centers. If a data center fails, no data is lost. For each partition, either its primary or replica shard is in the other data center.

Zone rules

The finest level of control over shard placement is achieved using zone rules, specified in the zoneMetadata element of the WXS deployment policy descriptor XML. A zone rule defines a set of zones in which shards are placed. A shardMapping element assigns a shard to a zone rule. The shard attribute of the shardMapping element specifies the shard type:

If more than one synchronous or asynchronous replica exist, provide shardMapping elements of the appropriate shard type. The exclusivePlacement attribute of the zoneRule element determines the placement of shards...

Shard placement strategies

Rolling upgrades

Consider a scenario in which we want to apply rolling upgrades to your physical servers, including maintenance that requires restarting the deployment. In this example, assume that you have a data grid spread across 20 physical servers, defined with one synchronous replica. You want to shut down 10 of the physical servers at a time for the maintenance.

When you shut down groups of 10 physical servers, to assure no partition has both its primary and replica shards on the servers we are shutting down, define a third zone. Instead of two zones of 10 physical servers each, use three zones, two with seven physical servers, and one with six. Spreading the data across more zones allows for better failover for availability.

Rather than defining another zone, the other approach is to add a replica.

Upgrade eXtreme Scale

When upgrading eXtreme Scale software in a rolling manner with data grids that contain live data, the catalog service software version must be greater than or equal to the container server software versions. Upgrade all the catalog servers first with a rolling strategy.

Change data model

To change the data model or schema of objects that are stored in the data grid without causing downtime...

  1. Start a new data grid with the new schema
  2. Copy the data from the old data grid to the new data grid
  3. Shut down the old data grid

Each of these processes are disruptive and result in downtime. To change the data model without downtime, store the object in one of these formats:

Write serializers to go from plain old Java object (POJO) to one of these formats on the client side.


eXtreme Scale insures that two shards for the same partition are never placed on the same IP address, as described in Policy 1 . When deploying on virtual images, such as VMware, many server instances, each with a unique IP address, can be run on the same physical server. To ensure that replicas can only be placed on separate physical servers, group physical servers into zones, and use zone placement rules to keep primary and replica shards in separate zones.

Zones for wide-area networks

Multiple buildings or data centers with slower network connections can lead to lower bandwidth and higher latency connections. The WXS catalog service organizes container servers into core groups that exchange heartbeats to detect container server failure. These core groups do not span zones. A leader within each core group pushes membership information to the catalog service which verifies reported failures before responding to membership information by heartbeating the container server in question. If the catalog service sees a false failure detection, the catalog service takes no action. The core group partition heals quickly. The catalog service also heartbeats core group leaders periodically at a slow rate to handle the case of core group isolation.


Evictors remove data from the data grid. We can set either...

Evictor types

A default time to live TTL evictor is created for every backing map.

You may also write your own evictors

Pluggable evictor

Use an optional pluggable evictor to evict entries based on the number of entries in the BackingMap.

The BackingMap informs an evictor as entries are created, modified, or removed in a transaction, keeping track of these entries and choosing when to evict entries.

A BackingMap instance has no configuration information for a maximum size. Instead, evictor properties are set to control the evictor behavior. Both the LRUEvictor and the LFUEvictor have a maximum size property used to cause the evictor to begin to evict entries after the maximum size is exceeded. Like the TTL evictor, the LRU and LFU evictors might not immediately evict an entry when the maximum number of entries is reached to minimize impact on performance. We can write your own custom evictors.

Memory-based eviction

Memory-based eviction is only supported on JEE v5 or later.

All built-in evictors support memory-based eviction that can be enabled on the BackingMap interface by setting the evictionTriggers attribute of BackingMap interface MEMORY_USAGE_THRESHOLD.

Memory-based eviction is based on heap usage threshold. When memory-based eviction is enabled on BackingMap and the BackingMap has any built-in evictor, the usage threshold is set to a default percentage of total memory if the threshold has not been previously set.

When using memory-based eviction, configure the garbage collection threshold to the same value as the target heap utilization. If the memory-based eviction threshold is set at 50 percent and the garbage collection threshold is at the default 70 percent level, the heap utilization can go as high as 70 percent because memory-based eviction is only triggered after a garbage collection cycle.

To change the default usage threshold percentage, set memoryThresholdPercentage. To set the target usage threshold on a client process, we can use the MemoryPoolMXBean.

The memory-based eviction algorithm used by WXS is sensitive to the behavior of the garbage collection algorithm in use. The best algorithm for memory-based eviction is the IBM default throughput collector. Generation garbage collection algorithms can cause undesired behavior, and so you should not use these algorithms with memory-based eviction.

To change the usage threshold percentage, set the memoryThresholdPercentage property on the container and server property files for eXtreme Scale server processes.

During runtime, if the memory usage exceeds the target usage threshold, memory-based evictors start evicting entries and try to keep memory usage below the target usage threshold. However, no guarantee exists that the eviction speed is fast enough to avoid a potential out of memory error if the system runtime continues to quickly consume memory.

OSGi framework overview

WXS OSGi support allows us to deploy WXS v8.6 in the Eclipse Equinox OSGi framework.

With the dynamic update capability that the OSGi framework provides, we can update the plug-in classes without restarting the JVM. These plug-ins are exported by user bundles as services. WXS accesses the service or services by looking them up the OSGi registry.

eXtreme Scale containers can be configured to start more dynamically using either...

To deploy a new data grid with its placement strategy, create an OSGi configuration or deploy a bundle with WXS descriptor XML files. With OSGi support, application bundles containing WXS configuration data can be installed, started, stopped, updated, and uninstalled without restarting the whole system. With this capability, we can upgrade the application without disrupting the data grid.

Plug-in beans and services can be configured with custom shard scopes, allowing sophisticated integration options with other services running in the data grid. Each plug-in can use OSGi Blueprint rankings to verify that every instance of the plug-in is activated is at the correct version. An OSGi-managed bean (MBean) and xscmd utility are provided, allowing you to query the WXS plug-in OSGi services and their rankings.

This capability allows administrators to quickly recognize potential configuration and administration errors and upgrade the plug-in service rankings in use by eXtreme Scale .

OSGi bundles

To interact with and deploy plug-ins in the OSGi framework, use bundles. In the OSGi service platform, a bundle is a JAR file that contains Java code, resources, and a manifest that describes the bundle and its dependencies. The bundle is the unit of deployment for an application. The eXtreme Scale product supports the following bundle types:


We cannot restart the WXS bundle because we cannot restart the object request broker (ORB) or eXtremeIO (XIO). To restart the WXS server, restart the OSGi framework.

Cache integration overview

The crucial element that gives WXS the capability to perform with such versatility and reliability is its application of caching concepts to optimize the persistence and recollection of data in virtually any deployment environment.

Spring cache provider

Spring Framework Version 3.1 introduced a new cache abstraction. With this new abstraction, we can transparently add caching to an existing Spring application. Use WXS as the cache provider for the cache abstraction.

Liberty profile

The Liberty profile is a highly composable, fast-to-start, dynamic application server runtime environment.

You install the Liberty profile when you install WXS with WebSphere Application Server Version 8.5. Because the Liberty profile does not include a Java runtime environment (JRE), you have to install a JRE provided by either Oracle or IBM .

This server supports two models of application deployment:

The Liberty profile supports a subset of the following parts of the full WebSphere Application Server programming model:

Associated services such as transactions and security are only supported as far as is required by these application types and by JPA.

Features are the units of capability by which you control the pieces of the runtime environment that are loaded into a particular server. The Liberty profile includes the following main features:

We can work with the runtime environment directly, or using the WebSphere Application Server Developer Tools for Eclipse.

On distributed platforms, the Liberty profile provides both a development and an operations environment. On the Mac, it provides a development environment.

On z/OS systems, the Liberty profile provides an operations environment. We can work natively with this environment using the MVS. console. For application development, consider using the Eclipse-based developer tools on a separate distributed system, on Mac OS, or in a Linux shell on z/OS.

Run the Liberty profile with a third-party JRE

When you use a JRE that Oracle provides, special considerations must be taken to run WXS with the Liberty profile.

WebSphere eXtreme Scale server features for the Liberty profile

Features are the units of capability by which you control the pieces of the runtime environment that are loaded into a particular server.

The following list contains information about the main available features. Including a feature in the configuration might cause one or more more features to be loaded automatically. Each feature includes a brief description and an example of how the feature is declared

Server feature

The server feature contains the capability for running a WXS server, both catalog and container. Add the server feature when we want to run a catalog server in the Liberty profile or when we want to deploy a grid application into the Liberty profile.

<wlp_install_root>/usr/server/wxsserver/server.xml file

<server description="WXS Server">




<com.ibm.ws.xs.server.config /> </server>

Client feature

The client feature contains most of the programming model for eXtreme Scale. Add the client feature when you have an application running in the Liberty profile that is going to use eXtreme Scale APIs.

<wlp_install_root>/usr/server/wxsclient/server.xml file

<server description="WXS client">




<com.ibm.ws.xs.client.config /> </server>

Web feature

Deprecated. Use the webapp feature when we want to replicate HTTP session data for fault tolerance.

The web feature contains the capability to extend the Liberty profile web application. Add the web feature when we want to replicate HTTP session data for fault tolerance.

<wlp_install_root>/usr/server/wxsweb/server.xml file

<server description="WXS enabled Web Server">




<com.ibm.ws.xs.web.config /> </server>

WebApp feature

The webApp feature contains the capability to extend the Liberty profile web application. Add the webApp feature when we want to replicate HTTP session data for fault tolerance.

<wlp_install_root>/usr/server/wxswebapp/server.xml file

<wlp_install_root>/usr/server/wxswebapp/server.xml file 
<server description="WXS enabled Web Server">


<com.ibm.ws.xs.webapp.config />

WebGrid feature

A Liberty profile server can host a data grid that caches data for applications to replicate HTTP session data for fault tolerance.

<wlp_install_root>/usr/server/wxswebgrid/server.xml file

<wlp_install_root>/usr/server/wxswebgrid/server.xml file 
<server description="WXS enabled Web Server">


<com.ibm.ws.xs.webgrid.config />

Dynamic cache feature

A Liberty profile server can host a data grid that caches data for applications that have dynamic cache enabled.

<wlp_install_root>/usr/server/wxsweb/server.xml file

<server description="WXS enabled Web Server">




<com.ibm.ws.xs.xsDynacacheGrid.config /> </server>

JPA feature

Use the JPA feature for the applications that use JPA in the Liberty profile.

<wlp_install_root>/usr/server/wxsjpa/server.xml file

<wlp_install_root>/usr/server/wxsjpa/server.xml file 
<server description="WXS enabled Web Server">


<com.ibm.ws.xs.jpa.config />

REST feature

Access simple data grids hosted by a collective in the Liberty profile.

<wlp_install_root>/usr/server/wxsrest/server.xml file

<wlp_install_root>/usr/server/wxsrest/server.xml file 
<server description="WXS enabled Web Server">


<com.ibm.ws.xs.rest.config />

OpenJPA level 2 (L2) and Hibernate cache plug-in

WXS includes JPA L2 cache plug-ins for both OpenJPA and Hibernate JPA providers. When you use one of these plug-ins, the application uses the JPA API. A data grid is introduced between the application and the database, improving response times.

Using WXS as an L2 cache provider increases performance when we are reading and querying data and reduces load to the database. WXS has advantages over built-in cache implementations because the cache is automatically replicated between all processes. When one client caches a value, all other clients are able to use the cached value that is locally in-memory.

We can configure the topology and properties for the L2 cache provider in the persistence.xml file.

The JPA L2 cache plug-in requires an application that uses the JPA APIs. To use WXS APIs to access a JPA data source, use the JPA loader.

JPA L2 cache topology considerations

The following factors affect which type of topology to configure:
  1. How much data do you expect to be cached?

    • If the data can fit into a single JVM heap, use the Embedded topology or Intra-domain topology.
    • If the data cannot fit into a single JVM heap, use the Embedded, partitioned topology, or Remote topology

  2. What is the expected read-to-write ratio?

    The read-to-write ratio affects the performance of the L2 cache. Each topology handles read and write operations differently.

      Embedded topology local read, remote write
      Intra-domain topology local read, local write
      Embedded, partitioned topology Partitioned: remote read, remote write
      Remote topology remote read, remote write.

    Applications that are mostly read-only should use embedded and intra-domain topologies when possible. Applications that do more writing should use intra-domain topologies.

  3. What is percentage of data is queried versus found by a key?

    When enabled, query operations make use of the JPA query cache. Enable the JPA query cache for applications with high read to write ratios only, for example when we are approaching 99% read operations. If you use JPA query cache, use the Embedded topology or Intra-domain topology .

    The find-by-key operation fetches a target entity if the target entity does not have any relationship. If the target entity has relationships with the EAGER fetch type, these relationships are fetched along with the target entity. In JPA data cache, fetching these relationships causes a few cache hits to get all the relationship data.

  4. What is the tolerated staleness level of the data?

    In a system with few JVMs, data replication latency exists for write operations. The goal of the cache is to maintain an ultimate synchronized data view across all JVMs. When using the intra-domain topology, a data replication delay exists for write operations. Applications using this topology must be able to tolerate stale reads and simultaneous writes that might overwrite data.

Intra-domain topology

With an intra-domain topology, primary shards are placed on every container server in the topology. These primary shards contain the full set of data for the partition. Any of these primary shards can also complete cache write operations. This configuration eliminates the bottleneck in the embedded topology where all the cache write operations must go through a single primary shard .

In an intra-domain topology, no replica shards are created, even if you have defined replicas in the configuration files. Each redundant primary shard contains a full copy of the data, so each primary shard can also be considered as a replica shard. This configuration uses a single partition, similar to the embedded topology.

Related JPA cache configuration properties for the intra-domain topology:



Embedded topology

Consider using an intra-domain topology for the best performance.

An embedded topology creates a container server within the process space of each application. OpenJPA and Hibernate read the in-memory copy of the cache directly and write to all of the other copies. We can improve the write performance by using asynchronous replication. This default topology performs best when the amount of cached data is small enough to fit in a single process. With an embedded topology, create a single partition for the data.

Related JPA cache configuration properties for the embedded topology:

ObjectGridName=objectgrid_name,ObjectGridType=EMBEDDED,MaxNumberOfReplicas=num_replicas,ReplicaMode=SYNC | ASYNC | NONE



Embedded, partitioned topology

Consider using an intra-domain topology for the best performance.

Do not use the JPA query cache with an embedded partitioned topology. The query cache stores query results that are a collection of entity keys. The query cache fetches all entity data from the data cache. Because the data cache is divided up between multiple processes, these additional calls can negate the benefits of the L2 cache.

When the cached data is too large to fit in a single process, we can use the embedded, partitioned topology. This topology divides the data over multiple processes. The data is divided between the primary shards, so each primary shard contains a subset of the data. We can still use this option when database latency is high.

Related JPA cache configuration properties for the embedded, partitioned topology:

ObjectGridName=objectgrid_name,ObjectGridType=EMBEDDED_PARTITION,ReplicaMode=SYNC | ASYNC | NONE, NumberOfPartitions=num_partitions,ReplicaReadEnabled=TRUE | FALSE



For example, to cache 10 GB of data with a maximum of 1 GB per JVM, 10 JVMs are required. The number of partitions must therefore be set to 10 or more. Ideally, the number of partitions must be set to a prime number where each shard stores a reasonable amount of memory. Usually, the numberOfPartitions setting is equal to the number of JVMs. With this setting, each JVM stores one partition. If you enable replication, you must increase the number of JVMs in the system. Otherwise, each JVM also stores one replica partition, which consumes as much memory as a primary partition.

Read about Sizing memory and partition count calculation to maximize the performance of your chosen configuration.

For example, in a system with four JVMs, and the numberOfPartitions setting value of

4, each JVM hosts a primary partition. A read operation has a 25 percent chance of fetching data from a locally available partition, which is much faster compared to getting data from a remote JVM. If a read operation, such as running a query, needs to fetch a collection of data that involves 4 partitions evenly, 75 percent of the calls are remote and 25 percent of the calls are local. If the ReplicaMode setting is set to either

SYNC or ASYNC and the ReplicaReadEnabled setting is set to true, then four replica partitions are created and spread across four JVMs. Each JVM hosts one primary partition and one replica partition. The chance that the read operation runs locally increases to 50 percent. The read operation that fetches a collection of data that involves four partitions evenly has 50 percent remote calls and 50 percent local calls. Local calls are much faster than remote calls. Whenever remote calls occur, the performance drops.

Remote topology


Do not use the JPA query cache with a remote topology . The query cache stores query results that are a collection of entity keys. The query cache fetches all entity data from the data cache. Because the data cache is remote, these additional calls can negate the benefits of the L2 cache.

Consider using an intra-domain topology for the best performance.

A remote topology stores all of the cached data in one or more separate processes, reducing memory use of the application processes. We can take advantage of distributing your data over separate processes by deploying a partitioned, replicated eXtreme Scale data grid. As opposed to the embedded and embedded partitioned configurations described in the previous sections, if we want to manage the remote data grid, you must do so independent of the application and JPA provider.

Related JPA cache configuration properties for the remote topology :


The REMOTE ObjectGrid type does not require any property settings because the ObjectGrid and deployment policy is defined separately from the JPA application. The JPA cache plug-in remotely connects to an existing remote ObjectGrid.

Because all interaction with the ObjectGrid is remote, this topology has the slowest performance among all ObjectGrid types.



HTTP session management

The session replication manager that is shipped with WXS can work with the default session manager in WebSphere Application Server. Session data is replicated from one process to another process to support user session data high availability.


The session manager has been designed so that it can run in any Java EE v 6 or later container. Because the session manager does not have any dependencies on WebSphere APIs, it can support various versions of WebSphere Application Server, as well as vendor application server environments.

The HTTP session manager provides session replication capabilities for an associated application. The session replication manager works with the session manager for the web container. Together, the session manager and web container create HTTP sessions and manage the life cycles of HTTP sessions that are associated with the application. These life cycle management activities include: the invalidation of sessions based on a timeout or an explicit servlet or JavaServer Pages (JSP) call and the invocation of session listeners that are associated with the session or the web application. The session manager persists its sessions in a fully replicated, clustered and partitioned data grid. The use of the WXS session manager enables the session manager to provide HTTP session failover support when application servers are shut down or end unexpectedly. The session manager can also work in environments that do not support affinity, when affinity is not enforced by a load balancer tier that sprays requests to the application server tier.

Usage scenarios

The session manager can be used in the following scenarios:

How the session manager works

The session replication manager uses a session listener to listen on the changes of session data. The session replication manager persists the session data into an ObjectGrid instance either locally or remotely. We can add the session listener and servlet filter to every web module in the application with tooling that ships with WXS. We can also manually add these listeners and filters to the web deployment descriptor of the application.

This session replication manager works with each vendor web container session manager to replicate session data across JVMs. When the original server dies, users can retrieve session data from other servers.

Deployment topologies

The session manager can be configured using two different dynamic deployment scenarios:

Generic embedded container startup

eXtreme Scale automatically starts an embedded ObjectGrid container inside any application-server process when the web container initializes the session listener or servlet filter, if the objectGridType property is set to EMBEDDED.

You are not required to package an ObjectGrid.xml file and objectGridDeployment.xml file into your web application WAR or EAR file. The default ObjectGrid.xml and objectGridDeployment.xml files are packaged in the product JAR file. Dynamic maps are created for various web application contexts by default. Static eXtreme Scale maps continue to be supported.

This approach for starting embedded ObjectGrid containers applies to any type of application server. The approaches involving a WebSphere Application Server component or WebSphere Application Server Community Edition GBean are deprecated.

Dynamic cache provider overview

The WebSphere Application Server provides a Dynamic Cache service that is available to deployed Java EE applications. This service is used to cache data such as output from servlet, JSP or commands, as well as object data progamatically specified within an enterprise application using the DistributedMap APIs.

Initially, the only service provider for the Dynamic Cache service was the default dynamic cache engine that is built into WebSphere Application Server. Today customers can also specify WXS to be the cache provider for any given cache instance. By setting up this capability, we can enable applications that use the Dynamic Cache service, to use the features and performance capabilities of WXS.

We can install and configure the dynamic cache provider

Deciding how to use WXS

The available features in WXS significantly increase the distributed capabilities of the Dynamic Cache service beyond what is offered by the default dynamic cache provider and data replication service. With eXtreme Scale, we can create caches that are truly distributed between multiple servers, rather than just replicated and synchronized between the servers. Also, eXtreme Scale caches are transactional and highly available, ensuring that each server sees the same contents for the dynamic cache service. WXS offers a higher quality of service for cache replication provided via DRS.

However, these advantages do not mean that the WXS dynamic cache provider is the right choice for every application. Use the decision trees and feature comparison matrix below to determine what technology fits the application best.

Decision tree for migrating existing dynamic cache applications

Decision tree for choosing a cache provider for new applications

Feature comparison

Feature DynaCache WXS provider WXS API
Local, in-memory caching Yes via Near-cache capability via Near-cache capability
Distributed caching via DRS Yes Yes
Linearly scalable No Yes Yes
Reliable replication (synchronous) No Yes Yes
Disk overflow Yes N/A N/A
Eviction LRU/TTL/heap-based LRU/TTL (per partition) LRU/TTL (per partition)
Invalidation Yes Yes Yes
Relationships Dependency / template ID relationships Yes No (other relationships are possible)
Non-key lookups No No via Query and index
Back-end integration No No via Loaders
Transactional No Yes Yes
Key-based storage Yes Yes Yes
Events and listeners Yes No Yes
WebSphere Application Server integration Single cell only Multiple cell Cell independent
Java Standard Edition support No Yes Yes
Monitoring and statistics Yes Yes Yes
Security Yes Yes Yes

An eXtreme Scale distributed cache can only store entries where the key and the value both implement the java.io.Serializable interface.

Topology types

Deprecated: The local, embedded, and embedded-partitioned topology types are deprecated.

A dynamic cache service created with WXS as the provider can be deployed in a remote topology .

Remote topology

The remote topology eliminates the need for a disk cache. All of the cache data is stored outside of WebSphere Application Server processes.

WXS supports standalone container processes for cache data. These container processes have a lower overhead than a WebSphere Application Server process and are also not limited to using a particular JVM. For example, the data for a dynamic cache service being accessed by a 32-bit WebSphere Application Server process could be located in a WXS container process running on a 64-bit JVM. This allows users to use the increased memory capacity of 64-bit processes for caching, without incurring the additional overhead of 64-bit for application server processes. The remote topology is shown in the following image:

Dynamic cache engine and eXtreme Scale functional differences

Users should not notice a functional difference between the two caches except that the WXS backed caches do not support disk offload or statistics and operations related to the size of the cache in memory.

There will be no appreciable difference in the results returned by most Dynamic Cache API calls, regardless of whether the customer is using the default dynamic cache provider or the WXS cache provider. For some operations we cannot emulate the behavior of the dynamic cache engine using eXtreme Scale.

Dynamic cache statistics

Statistical data for a WXS dynamic cache can be retrieved using the WXS monitoring tooling.

MBean calls

The WXS dynamic cache provider does not support disk caching. Any MBean calls relating to disk caching will not work.

Dynamic cache replication policy mapping

The WXS dynamic cache provider's remote topology supports a replication policy that most closely matches the SHARED_PULL and SHARED_PUSH_PULL policy (using the terminology used by the default WebSphere Application Server dynamic cache provider). In a WXS dynamic cache, the distributed state of the cache is completely consistent between all the servers.

Global index invalidation

Use a global index to improve invalidation efficiency in large partitioned environments; for example, more than 40 partitions. Without the global index feature, the dynamic cache template and dependency invalidation processing must send remote agent requests to all partitions, which results in slower performance. When configuring a global index, invalidation agents are sent only to applicable partitions that contain cache entries related to the Template or Dependency ID. The potential performance improvement will be greater in environments with large numbers of partitions configured. We can configure a global index using the Dependency ID and Template ID indexes, which are available in the example dynamic cache objectGrid descriptor XML files.


When a cache is running in a remote topology, it is possible for a standaloneeXtreme Scale client to connect to the cache and affect the contents of the dynamic cache instance. It is therefore important WXS servers containing the dynamic cache instances reside in an internal network, behind what is typically known as the network DMZ.

Near cache

A dynamic cache instance can be configured to create a maintain a near cache, which will reside locally within the application server JVM and will contain a subset of the entries contained within the remote dynamic cache instance. We can configure a near cache instance using file...

Database integration

WXS is used to front a traditional database and eliminate read activity normally pushed to the database. A coherent cache can be used with an application directly or indirectly using an object relational mapper. The coherent cache can then offload the database or backend from reads. In a slightly more complex scenario, such as transactional access to a data set where only some of the data requires traditional persistence guarantees, filtering can be used to offload even write transactions.

We can configure WXS to function as a highly flexible in-memory database processing space. However, WXS is not an object relational mapper (ORM). It does not know where the data in the data grid came from. An application or an ORM can place data in a WXS server.

It is the responsibility of the source of the data to make sure that it stays consistent with the database where data originated. This means WXS cannot invalidate data that is pulled from a database automatically. The application or mapper must provide this function and manage the data stored in WXS.

ObjectGrid as a database buffer

ObjectGrid as a side cache

Sparse and complete cache

WXS can be used as a sparse cache or a complete cache. A sparse cache only keeps a subset of the total data, while a complete cache keeps all of the data. and can be populated lazily, as the data is needed. Sparse caches are normally accessed using keys (instead of indexes or queries) because the data is only partially available.

When a key is not present in a sparse cache, or the data is not available and a cache miss occurs, the next tier is invoked. The data is fetched, from a database for example, and is inserted into the data grid cache tier. If we are using a query or index, only the currently loaded values are accessed and the requests are not forwarded to the other tiers.

A complete cache contains all of the required data and can be accessed using non-key attributes with indexes or queries. A complete cache is preloaded with data from the database before the application tries to access the data. A complete cache can function as a database replacement after data is loaded. Because all of the data is available, queries and indexes can be used to find and aggregate data.

Side cache

With a side cache, WXS is used to temporarily store objects that would normally be retrieved from a back-end database. Applications check to see if the data grid contains the data. If the data is in the data grid, the data is returned to the caller. If the data does not exist, the data is retrieved from the back-end database. The data is then inserted into the data grid so that the next request can use the cached copy.

Side cache plug-ins for Hibernate and OpenJPA

WXS can be used as a side-cache for OpenJPA and Hibernate by installing the bundled plug-ins. Because the cache is automatically replicated between all processes, when one client caches a value, all other clients can use the cached value.

In-line cache

In-line caching uses WXS as the primary means for interacting with the data. We can configure in-line caching for a database back end or as a side cache for a database. When used as an in-line cache, WXS interacts with the back end using a Loader plug-in. Applications can access the WXS APIs directly. Several different caching scenarios are supported in WXS to synchronize the data in the cache and the data in the back end.

In-line caching allows applications to access the WXS APIs directly. WXS supports several in-line caching scenarios...

Write-behind caching

Use write-behind caching to reduce the overhead that occurs when updating a database we are using as a back end.

Write-behind caching asynchronously queues updates to the Loader plug-in. We can improve performance by disconnecting updates, inserts, and removes for a map, the overhead of updating the back-end database. The asynchronous update is performed after a time-based delay (for example, five minutes) or an entry-based delay (1000 entries).

The write-behind configuration on a BackingMap creates a thread between the loader and the map. The loader then delegates data requests through the thread according to the configuration settings in the BackingMap.setWriteBehind method. When a WXS transaction inserts, updates, or removes an entry from a map, a LogElement object is created for each of these records. These elements are sent to the write-behind loader and queued in a special ObjectMap called a queue map. Each backing map with the write-behind setting enabled has its own queue maps. A write-behind thread periodically removes the queued data from the queue maps and pushes them to the real back-end loader.

The write-behind loader only sends insert, update, and delete types of LogElement objects to the real loader. All other types of LogElement objects, for example, EVICT type, are ignored.

Write-behind support is an extension of the Loader plug-in, which you use to integrate WXS with the database.


Enabling write-behind support has the following benefits:

Back end failure isolation Write-behind caching provides an isolation layer from back end failures. When the back-end database fails, updates are queued in the queue map. The applications can continue driving transactions to WXS. When the back end recovers, the data in the queue map is pushed to the back-end.
Reduced back end load The write-behind loader merges the updates on a key basis so only one merged update per key exists in the queue map. This merge decreases the number of updates to the back-end database.
Improved transaction performance Individual WXS transaction times are reduced because the transaction does not need to wait for the data to be synchronized with the back-end.


With a Loader plug-in, a data grid map can behave as a memory cache for data that is typically kept in a persistent store on either the same system or another system. Typically, a database or file system is used as the persistent store. A remote JVM can also be used as the source of data, allowing hub-based caches to be built using eXtreme Scale. A loader has the logic for reading and writing data to and from a persistent store.

Loaders are backing map plug-ins that are invoked when changes are made to the backing map or when the backing map is unable to satisfy a data request (a cache miss). The Loader is invoked when the cache is unable to satisfy a request for a key, providing read-through capability and lazy-population of the cache. A loader also allows updates to the database when cache values change. All changes within a transaction are grouped together to allow the number of database interactions to be minimized.

A TransactionCallback plug-in is used in conjunction with the loader to trigger the demarcation of the backend transaction. Using this plug-in is important when multiple maps are included in a single transaction or when transaction data is flushed to the cache without committing.

The loader can also use overqualified updates to avoid keeping database locks. By storing a version attribute in the cache value, the loader can see the before and after image of the value as it is updated in the cache. This value can then be used when updating the database or back end to verify that the data has not been updated.

A Loader can also be configured to preload the data grid when it is started. When partitioned, a Loader instance is associated with each partition. If the "Company" Map has ten partitions, there are ten Loader instances, one per primary partition. When the primary shard for the Map is activated, the preloadMap method for the loader is invoked synchronously or asynchronously which allows loading the map partition with data from the back-end to occur automatically. When invoked synchronously, all client transactions are blocked, preventing inconsistent access to the data grid. Alternatively, a client preloader can be used to load the entire data grid.

Two built-in loaders can simplify integration with relational database back ends.

The JPA loaders utilize the Object-Relational Mapping (ORM) capabilities of both the OpenJPA and Hibernate implementations of the JPA specification.

If we are using loaders in a multiple data center configuration, you must consider how revision data and cache consistency is maintained between the data grids.

Loader configuration

To add a Loader into the BackingMap configuration, we can use either...

A loader has the following relationship with a backing map.

Data pre-loading and warm-up

We can prepare data grids by pre-loading them with data. Complete cache data grids, must hold all data before any clients can connect to it. Sparse cache data grids can be warmed up so that clients can have immediate access to the data.

Two approaches exist for pre-loading data into the data grid:

Database synchronization techniques

When WXS is used as a cache, applications must be written to tolerate stale data if the database can be updated independently from a WXS transaction. To serve as a synchronized in-memory database processing space, eXtreme Scale provides several ways of keeping the cache updated.

Periodic refresh

The cache can be automatically invalidated or updated periodically using the JPA time-based database updater.The updater periodically queries the database using a JPA provider for any updates or inserts that have occurred since the previous update. Any changes identified are automatically invalidated or updated when used with a sparse cache . If used with a complete cache, the entries can be discovered and inserted into the cache. Entries are never removed from the cache.


Sparse caches can utilize eviction policies to automatically remove data from the cache without affecting the database. There are three built-in policies included in eXtreme Scale: time-to-live, least-recently-used, and least-frequently-used. All three policies can optionally evict data more aggressively as memory becomes constrained by enabling the memory-based eviction option.

Event-based invalidation

Sparse and complete caches can be invalidated or updated using an event generator such as Java Message Service (JMS). Invalidation using JMS can be manually tied to any process that updates the back-end using a database trigger. A JMS ObjectGridEventListener plug-in is provided in eXtreme Scale that can notify clients when the server cache has any changes. This can decrease the amount of time the client can see stale data.

Programmatic invalidation

The eXtreme Scale APIs allow manual interaction of the near and server cache using...

If a client or server process no longer needs a portion of the data, the invalidate methods can be used to remove data from the near or server cache. The beginNoWriteThrough method applies any ObjectMap or EntityManager operation to the local cache without calling the loader. If invoked from a client, the operation applies only to the near cache (the remote loader is not invoked). If invoked on the server, the operation applies only to the server core cache without invoking the loader.

Data invalidation

To remove stale cache data, we can use invalidation mechanisms.

Administrative invalidation

Use the web console or the xscmd utility to invalidate data based on the key. We can filter the cache data with a regular expression and then invalidate the data based on the regular expression.

Event-based invalidation

Sparse and complete caches can be invalidated or updated using an event generator such as Java Message Service (JMS). Invalidation using JMS can be manually tied to any process that updates the back-end using a database trigger. A JMS ObjectGridEventListener plug-in is provided in eXtreme Scale that can notify clients when the server cache changes. This type of notification decreases the amount of time the client can see stale data.

Event-based invalidation normally consists of the following three components.

Event queue Store the data change events. It could be a JMS queue, a database, an in-memory FIFO queue, or any kind of manifest as long as it can manage the data change events.
Event publisher Publish the data change events to the event queue. Usually an application you create or a WXS plug-in implementation. The event publisher knows when the data is changed or it changes the data itself. When a transaction commits, events are generated for the changed data and the event publisher publishes these events to the event queue.
Event consumer Consume data change events. Usually an application to ensure the target grid data is updated with the latest change from other grids. This event consumer interacts with the event queue to get the latest data change and applies the data changes in the target grid. The event consumers can use eXtreme Scale APIs to invalidate stale data or update the grid with the latest data.

For example, JMSObjectGridEventListener has an option for a client-server model, in which the event queue is a designated JMS destination. All server processes are event publishers. When a transaction commits, the server gets the data changes and publishes them to the designated JMS destination. All the client processes are event consumers. They receive the data changes from the designated JMS destination and apply the changes to the client's near cache .

Programmatic invalidation

The WXS APIs allow manual interaction of the near and server cache using...

If a client or server process no longer needs a portion of the data, the invalidate methods can be used to remove data from the near or server cache. The beginNoWriteThrough method applies any ObjectMap or EntityManager operation to the local cache without calling the loader. If invoked from a client, the operation applies only to the near cache (the remote loader is not invoked). If invoked on the server, the operation applies only to the server core cache without invoking the loader.

Use programmatic invalidation with other techniques to determine when to invalidate the data. For example, this invalidation method uses event-based invalidation mechanisms to receive the data change events, and then uses APIs to invalidate the stale data.

Near cache invalidation

If we are using a near cache, we can configure an asynchronous invalidation that is triggered each time an update, delete, invalidation operation is run against the data grid. Because the operation is asynchronous, you might still see stale data in the data grid.

To enable near cache invalidation, set the nearCacheInvalidationEnabled attribute on the backing map in the ObjectGrid descriptor XML file.


Index types and configuration

The indexing feature is represented by the MapIndexPlugin plug-in or Index for short. The Index is a BackingMap plug-in. A BackingMap can have multiple Index plug-ins configured, as long as each one follows the Index configuration rules.

Use the indexing feature to build one or more indexes on a BackingMap. An index is built from an attribute or a list of attributes of an object in the BackingMap. This feature provides a way for applications to find certain objects more quickly. With the indexing feature, applications can find objects with a specific value or within a range of values of indexed attributes.

Two types of indexing are possible: static and dynamic.

With static indexing, configure the index plug-in on the BackingMap before initializing the ObjectGrid instance. We can do this configuration with XML or programmatic configuration of the BackingMap. Static indexing starts building an index during ObjectGrid initialization. The index is always synchronized with the BackingMap and ready for use. After the static indexing process starts, the maintenance of the index is part of the WXS transaction management process. When transactions commit changes, these changes also update the static index, and index changes are rolled back if the transaction is rolled back.

With dynamic indexing, we can create an index on a BackingMap before or after the initialization of the containing ObjectGrid instance. Applications have life cycle control over the dynamic indexing process so that we can remove a dynamic index when it is no longer needed. When an application creates a dynamic index, the index might not be ready for immediate use because of the time it takes to complete the index building process. Because the amount of time depends upon the amount of data indexed, the DynamicIndexCallback interface is provided for applications that want to receive notifications when certain indexing events occur. These events include ready, error, and destroy. Applications can implement this callback interface and register with the dynamic indexing process.

If a BackingMap has an index plug-in configured, we can obtain the application index proxy object from the corresponding ObjectMap. Calling the getIndex method on the ObjectMap and passing in the name of the index plug-in returns the index proxy object. Cast the index proxy object to an appropriate application index interface, such as MapIndex, MapRangeIndex, MapGlobalIndex, or a customized index interface. After obtaining the index proxy object, we can use methods defined in the application index interface to find cached objects.

Indexing setup summary...

  1. Add either static or dynamic index plug-ins into the BackingMap.
  2. Obtain an application index proxy object by issuing the getIndex method of the ObjectMap.
  3. Cast the index proxy object to an appropriate application index interface, such as MapIndex, MapRangeIndex, or a customized index interface.
  4. Use methods that are defined in application index interface to find cached objects.

The HashIndex class is the built-in index plug-in implementation that can support the following built-in application index interfaces:

You also can create your own indexes. We can add HashIndex as either a static or dynamic index into the BackingMap, obtain either the MapIndex, MapRangeIndex, or MapGlobalIndex index proxy object, and use the index proxy object to find cached objects.

Global index

Global index is an extension of the built-in HashIndex class that runs on shards in distributed, partitioned data grid environments. It tracks the location of indexed attributes and provides efficient ways to find partitions, keys, values, or entries using attributes in large, partitioned data grid environments.

If global index is enabled in the built-in HashIndex plug-in, then applications can cast an index proxy object MapGlobalIndex type, and use it to find data.

Default index

To iterate through the keys in a local map, we can use the default index . This index does not require any configuration, but it must be used against the shard, using an agent or an ObjectGrid instance retrieved from the method...

Indexing data quality consideration

The results of index query methods only represent a snapshot of data at a point of time. No locks against data entries are obtained after the results return to the application. Data updates may occur on a returned data set. For example, the application obtains the key of a cached object by running the findAll method of MapIndex. This returned key object is associated with a data entry in the cache. The application should be able to run the get method on ObjectMap to find an object by providing the key object. If another transaction removes the data object from the cache just before the get method is called, the returned result will be null.

Indexing performance considerations

If indexing is not used properly, application performance may suffer. Consider the following factors...

The indexing function can improve BackingMap performance drastically. Ideal cases are when the BackingMap has mostly read operations, the query result set is of a small percentage of the BackingMap entries, and only few indexes are built over the BackingMap.

JPA Loaders

The JPA specification maps Java objects to relational databases.

JPA contains a full object-relational mapping (ORM) specification using Java language metadata annotations, XML descriptors, or both to define the mapping between Java objects and a relational database. A number of open-source and commercial implementations are available.

Use a JPA loader plug-in implementation with WXS to interact with any database supported by your chosen loader. To use JPA, you must have...

The JPALoader plug-in...

...and the JPAEntityLoader plug-in...

...synchronize the ObjectGrid maps with a database. A JPA implementation, such as Hibernate or OpenJPA, is required. The database can be any back end that is supported by the chosen JPA provider.

JPA loader architecture

The JPA Loader is used for eXtreme Scale maps that store plain old Java objects (POJO).

When an ObjectMap.get(Object key) method is called, the WXS run time first checks whether the entry is contained in the ObjectMap layer. If not, the run time delegates the request to the JPA Loader. Upon request of loading the key, to find the data from the JPA layer, the JPALoader calls the method...

If the data is contained in the JPA entity manager, it is returned; otherwise, the JPA provider interacts with the database to get the value.

When an update to ObjectMap occurs, for example, using the method...

...the WXS run time creates a LogElement for this update and sends it to the JPALoader.

To update the value to the database the JPALoader calls the method...

For the JPAEntityLoader, the same four layers are involved. However, because the JPAEntityLoader plug-in is used for maps that store WXS entities, relations among entities could complicate the usage scenario. An WXS entity is distinguished from JPA entity.


Loaders provide three main methods:

Both JPALoader and JPAEntityLoader plug-ins work with the JPATxCallback class to coordinate the WXS transactions and JPA transactions. JPATxCallback must be configured in the ObjectGrid instance to use these two loaders.

Serialization overview

Data is always expressed, but not necessarily stored, as Java objects in the data grid. WXS uses multiple Java processes to serialize the data, by converting the Java object instances to bytes and back to objects again, as needed, to move the data between client and server processes.

Data is serialized, meaning it is converted into a data stream for transmission over a network, in the following situations:

Alternatively, you might decide to forgo the serialization process through WXS and store raw data as byte arrays, which are much cheaper to store in memory since the JVM has fewer objects to search for during garbage collection, and they can be deserialized only when needed. Only use byte arrays if you do not need to access the objects using queries or indexes. Since the data is stored as bytes, WXS has no metadata for describing attributes to query.

To serialize data in WXS, we can use...

To optimize serialization with any of these options, we can use the COPY_TO_BYTES mode to improve performance up to 70 percent because the data is serialized when transactions commit, which means that serialization happens only once. The serialized data is sent unchanged from the client to the server or from the server to replicated server. By using the COPY_TO_BYTES mode, we can reduce the memory footprint that a large graph of objects can consume.

Serialization using Java

Java serialization refers to either...

ObjectTransformer plug-in

With the ObjectTransformer plug-in, we can serialize, deserialize, and copy objects in the cache for increased performance.

The ObjectTransformer interface has been replaced by the DataSerializer plug-ins, which we can use to efficiently store arbitrary data in WXS so that existing product APIs can efficiently interact with your data.

If you see performance issues with processor usage, add an ObjectTransformer plug-in to each map. If you do not provide an ObjectTransformer plug-in, up to 60-70 percent of the total processor time is spent serializing and copying entries.

With the ObjectTransformer plug-in, the applications can provide custom methods for the following operations:

If no ObjectTransformer plug-in is provided, you must be able to serialize the keys and values because the ObjectGrid uses a serialize and deserialize sequence to copy the objects. This method is expensive, so use an ObjectTransformer plug-in when performance is critical. The copying occurs when an application looks up an object in a transaction for the first time. We can avoid this copying by setting the copy mode of the Map to NO_COPY or reduce the copying by setting the copy mode to COPY_ON_READ. Optimize the copy operation when needed by the application by providing a custom copy method on this plug-in. Such a plug-in can reduce the copy overhead from 65.70 percent to 2/3 percent of total processor time.

The default copyKey and copyValue method implementations first attempt to use the clone method, if the method is provided. If no clone method implementation is provided, the implementation defaults to serialization.

Object serialization is also used directly when the WXS is running in distributed mode. The LogSequence uses the ObjectTransformer plug-in to help serialize keys and values before transmitting the changes to peers in the ObjectGrid. You must take care when providing a custom serialization method instead of using the built-in Java developer kit serialization. Object versioning is a complex issue and you might encounter problems with version compatibility if you do not ensure that your custom methods are designed for versioning.

The following list describes how the WXS tries to serialize both keys and values:

Use the ObjectTransformer interface

An ObjectTransformer object must implement the ObjectTransformer interface and follow the common ObjectGrid plug-in conventions.

Two approaches, programmatic configuration and XML configuration, are used to add an ObjectTransformer object into the BackingMap configuration as follows.

Programmatically plug in an ObjectTransformer object

The following code snippet creates the custom ObjectTransformer object and adds it to a BackingMap:

ObjectGridManager objectGridManager = ObjectGridManagerFactory.getObjectGridManager();
ObjectGrid myGrid = objectGridManager.createObjectGrid("myGrid", false);
BackingMap backingMap = myGrid.getMap("myMap");
MyObjectTransformer myObjectTransformer = new MyObjectTransformer();

XML configuration approach to plug in an ObjectTransformer

Assume that the class name of the ObjectTransformer implementation is the com.company.org.MyObjectTransformer class. This class implements the ObjectTransformer interface. An ObjectTransformer implementation can be configured using the following XML:

<?xml version="1.0" encoding="UTF-8"?>
<objectGridConfig xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://ibm.com/ws/objectgrid/config ../objectGrid.xsd"



<objectGrid name="myGrid">

<backingMap name="myMap" pluginCollectionRef="myMap" />




<backingMapPluginCollection id="myMap">

<bean id="ObjectTransformer" className="com.company.org.MyObjectTransformer" />


</backingMapPluginCollections> </objectGridConfig>

ObjectTransformer usage scenarios

We can use the ObjectTransformer plug-in in the following situations:

In the following example, ObjectGrid is used to store the Stock class:

* Stock object for ObjectGrid demo

public class Stock implements Cloneable 
    String ticket;
    double price;
    String company;
    String description;
    int serialNumber;
    long lastTransactionTime;

    * @return Return the description.
    public String getDescription() 
        return description;

    * @param description The description to set.
    public void setDescription(String description) 
        this.description = description;

    * @return Return the lastTransactionTime.
    public long getLastTransactionTime() 
        return lastTransactionTime;

    * @param lastTransactionTime The lastTransactionTime to set.
    public void setLastTransactionTime(long lastTransactionTime) 
        this.lastTransactionTime = lastTransactionTime;

    * @return Return the price.
    public double getPrice() 
        return price;

    * @param price The price to set.
    public void setPrice(double price) 
        this.price = price;

    * @return Return the serialNumber.
    public int getSerialNumber() 
        return serialNumber;

    * @param serialNumber The serialNumber to set.
    public void setSerialNumber(int serialNumber) 
        this.serialNumber = serialNumber;

    * @return Return the ticket.
    public String getTicket() 
        return ticket;

    * @param ticket The ticket to set.
    public void setTicket(String ticket) 
        this.ticket = ticket;

    * @return Return the company.
    public String getCompany() 
        return company;

    * @param company The company to set.
    public void setCompany(String company) 
        this.company = company;

    public Object clone() throws CloneNotSupportedException
        return super.clone();

We can write a custom object transformer class for the Stock class:

* Custom implementation of ObjectGrid ObjectTransformer for stock object
public class MyStockObjectTransformer implements ObjectTransformer 
    /* (non.Javadoc)
    * @see
    * com.ibm.websphere.objectgrid.plugins.ObjectTransformer#serializeKey 
    * (java.lang.Object, * java.io.ObjectOutputStream)

    public void serializeKey(Object key, ObjectOutputStream stream) throws IOException 
        String ticket= (String) key;
    /* (non.Javadoc)
    * @see com.ibm.websphere.objectgrid.plugins.
      ObjectTransformer#serializeValue(java.lang.Object, java.io.ObjectOutputStream)

    public void serializeValue(Object value, ObjectOutputStream stream) throws IOException 
        Stock stock= (Stock) value;
    /* (non.Javadoc)
    * @see com.ibm.websphere.objectgrid.plugins.

    public Object inflateKey(ObjectInputStream stream) throws IOException, ClassNotFoundException 
        String ticket=stream.readUTF();
        return ticket;}
    /* (non.Javadoc)
    * @see com.ibm.websphere.objectgrid.plugins.
    public Object inflateValue(ObjectInputStream stream) throws IOException, ClassNotFoundException 
        Stock stock=new Stock();
        return stock;}
    /* (non.Javadoc)
    * @see com.ibm.websphere.objectgrid.plugins.

    public Object copyValue(Object value) 
        Stock stock = (Stock) value;
            return stock.clone();
        catch (CloneNotSupportedException e)
            // display exception message    
    /* (non.Javadoc)
    *  @see com.ibm.websphere.objectgrid.plugins.

    public Object copyKey(Object key) 
        String ticket=(String) key;
        String ticketCopy= new String (ticket);
        return ticketCopy;

Then, plug in this custom MyStockObjectTransformer class into the BackingMap:

Serialization using the DataSerializer plug-ins

Use the DataSerializer plug-ins to efficiently store arbitrary data in WebSphere eXtreme Scale so that existing product APIs can efficiently interact with your data.

Serialization methods such as Java serialization and the ObjectTransformer plug-in allow data to be marshalled over the network. In addition, when you use these serialization options with the COPY_TO_BYTES copy mode, moving data between clients and servers becomes less expensive and performance is improved. However, these options do not solve the following issues that can exist:

The DataSerializer plug-ins introduce an efficient way of solving these problems. Specifically, the DataSerializer plug-in gives you a way to describe your serialization format, or byte array, to WXS so that the product can interrogate the byte array without requiring a specific object format. The public DataSerializer plug-in classes and interfaces are in the package...

Entity Java objects are not stored directly into the BackingMaps when you use the EntityManager API. The EntityManager API converts the entity object to Tuple objects. Entity maps are automatically associated with a highly optimized ObjectTransformer. Whenever the ObjectMap API or EntityManager API is used to interact with entity maps, the ObjectTransformer entity is invoked. Therefore, when you use entities, no work is required for serialization because the product automatically completes this process for you.

See also Oracle Java Serialization API

Scalability overview

WXS is scalable through the use of partitioned data, and can scale to thousands of containers if required because each container is independent from other containers.

WXS divides data sets into distinct partitions that can be moved between processes or even between physical servers at run time. We can, for example, start with a deployment of four servers and then expand to a deployment with 10 servers as the demands on the cache grow. Just as we can add more physical servers and processing units for vertical scalability, we can extend the elastic scaling capability horizontally with partitioning. Horizontal scaling is a major advantage to using WXS over an in-memory database. In-memory databases can only scale vertically.

With WXS, we can also use a set of APIs to gain transactional access this partitioned and distributed data. The choices you make for interacting with the cache are as significant as the functions to manage the cache for availability from a performance perspective.

Scalability is not available when containers communicate with one another. The availability management, or core grouping, protocol is an O(N2) heartbeat and view maintenance algorithm, but is mitigated by keeping the number of core group members under 20. Only peer to peer replication between shards exists.

Distributed clients

The WXS client protocol supports very large numbers of clients. The partitioning strategy offers assistance by assuming that all clients are not always interested in all partitions because connections can be spread across multiple containers. Clients are connected directly to the partitions so latency is limited to one transferred connection.

Data grids, partitions, and shards

A data grid is divided into partitions. A partition holds an exclusive subset of the data. A partition contains one or more shards: a primary shard and replica shards. Replica shards are not necessary in a partition, but we can use replica shards to provide high availability. Whether the deployment is an independent in-memory data grid or an in-memory database processing space, data access in WXS relies heavily on shards.

The data for a partition is stored in a set of shards at run time. This set of shards includes a primary shared and possibly one or more replica shards. A shard is the smallest unit that WXS can add or remove from a JVM.

Two placement strategies exist: fixed partition placement (default) and per container placement. The following discussion focuses on the usage of the fixed partition placement strategy.

Total number of shards

If your environment includes 10 partitions that hold 1 million objects with no replicas, then 10 shards would exist that each store 100,000 objects. If you add a replica to this scenario, then an extra shard exists in each partition. In this case, 20 shards exist: 10 primary shards and 10 replica shards. Each one of these shards store 100,000 objects. Each partition consists of a primary shard and one or more (N) replica shards. Determining the optimal shard count is critical. If you configure few shards, data is not distributed evenly among the shards, resulting in out of memory errors and processor overloading issues. You must have at least 10 shards for each JVM as you scale. When initially deploying the data grid, you would potentially use many partitions.

Number of shards per JVM scenarios

Scenario: Small number of shards for each JVM

Data is added and removed from a JVM using shard units. Shards are never split into pieces. If 10 GB of data existed, and 20 shards exist to hold this data, then each shard holds 500 MB of data on average. If nine JVMs host the data grid, then on average each JVM has two shards. Because 20 is not evenly divisible by 9, a few JVMs have three shards, in the following distribution:

Because each shard holds 500 MB of data, the distribution of data is unequal. The seven JVMs with two shards each host 1 GB of data. The two JVMs with three shards have 50% more data, or 1.5 GB, which is a much larger memory burden. Because the two JVMs are hosting three shards, they also receive 50% more requests for their data. As a result, having few shards for each JVM causes imbalance. To increase the performance, you increase the number of shards for each JVM.

Scenario: Increased number of shards per JVM

In this scenario, consider a much larger number of shards. In this scenario, there are 101 shards with nine JVMs hosting 10 GB of data. In this case, each shard holds 99 MB of data. The JVMs have the following distribution of shards:

The two JVMs with 12 shards now have just 99 MB more data than the other shards, which is a 9% difference. This scenario is much more evenly distributed than the 50% difference in the scenario with few shards. From a processor use perspective, only 9% more work exists for the two JVMs with the 12 shards compared to the seven JVMs that have 11 shards. By increasing the number of shards in each JVM, the data and processor use is distributed in a fair and even way.

When creating your system, use 10 shards for each JVM in its maximally sized scenario, or when the system is running its maximum number of JVMs in your planning horizon.

Additional placement factors

The number of partitions, the placement strategy, and number and type of replicas are set in the deployment policy. The number of shards placed depend on the deployment policy that you define. The minSyncReplicas, developmentMode, maxSyncReplicas, and maxAsyncReplicas attributes affect where partitions and replicas are placed.

The following factors affect when shards can be placed:

If the maximum number of replicas are not placed during the initial startup, additional replicas might be placed if you start additional servers later. When planning the number of shards per JVM, the maximum number of primary and replica shards is dependent on having enough JVMs started to support the configured maximum number of replicas. A replica is never placed in the same process as its primary. If a process is lost, both the primary and the replica are lost. When the developmentMode attribute is set to false, the primary and replicas are not placed on the same physical server.


Use partitioning to scale out an application. We can define the number of partitions in the deployment policy.

Partitioning is not like Redundant Array of Independent Disks (RAID) striping, which slices each instance across all stripes. Each partition hosts the complete data for individual entries. Partitioning is a very effective means for scaling, but is not applicable to all applications. Applications that require transactional guarantees across large sets of data do not scale and cannot be partitioned effectively. WXS does not currently support two-phase commit across partitions.

The number of partitions defined in the deployment policy affects the number of container servers to which an application can scale. Each partition is made up of a primary shard and the configured number of replica shards.

For a data grid with four containers, an unreasonable configuration is 2000 partitions, each with two shards, running on four containers. This configuration results in 4000 shards placed on four containers or 1000 shards per container.

A better configuration would be under 5 partitions (10 shards) for each expected container. This configuration still gives the possibility of allowing for elastic scaling that is ten times the initial configuration while keeping a reasonable number of shards per container.

Consider this scaling example: 60 containers on 30 physical servers. With four shards per container, total shards would be 240. If partitions contain a primary shard and one replica shard, you would want 120 partitions.

ObjectMap and partitioning

With the default FIXED_PARTITION placement strategy, maps are split across partitions and keys hash to different partitions. The client does not need to know to which partition the keys belong. If a mapSet has multiple maps, the maps should be committed in separate transactions.

Entities and partitioning

Entity manager entities have an optimization that helps clients working with entities on a server. The entity schema on the server for the map set can specify a single root entity. The client accesses all entities through the root entity. The entity manager can then find related entities from that root in the same partition without requiring the related maps to have a common key. The root entity establishes affinity with a single partition. This partition is used for all entity fetches within the transaction after affinity is established. This affinity can save memory because the related maps do not require a common key. The root entity must be specified with a modified entity annotation as shown in the following example:

Use the entity to find the root of the object graph. The object graph defines the relationships between one or more entities. Each linked entity must resolve to the same partition. All child entities are assumed to be in the same partition as the root. The child entities in the object graph are only accessible from a client from the root entity. Root entities are always required in partitioned environments when using a WXS client to communicate to the server. Only one root entity type can be defined per client. Root entities are not required when using Extreme Transaction Processing (XTP) style ObjectGrids, because all communication to the partition is accomplished through direct, local access and not through the client and server mechanism.

Placement and partitions

You have two placement strategies available for WXS: fixed partition and per-container.

Fixed partition placement

The default placement strategy is fixed-partition placement, enabled with the FIXED_PARTITION setting in the deployment policy XML file. The number of primary shards placed across the available containers is equal to the number of partitions configured (numberOfPartitions).

If you have configured replicas, the minimum total number of shards placed...

The maximum total number of shards placed...

Your WXS deployment spreads these shards over the available containers. The keys of each map are hashed into assigned partitions based on the total partitions you have defined. They keys hash to the same partition even if the partition moves because of failover or server changes.

If the numberPartitions value is 6 and the minSync value is 1 for MapSet1, the total shards for that map set is 12 because each of the 6 partitions requires a synchronous replica. If three containers are started, WXS places four shards per container for MapSet1.

Per-container placement

The alternate placement strategy is per-container placement, which is enabled with the PER_CONTAINER setting for the placementStrategy attribute in the map set element in the deployment XML file. With this strategy, the number of primary shards placed on each new container is equal to the number of partitions, P, configured. The WXS deployment environment places P replicas of each partition for each remaining container. The numInitialContainers setting is ignored when we are using per-container placement. The partitions get larger as the containers grow. The keys for maps are not fixed to a certain partition in this strategy. The client routes to a partition and uses a random primary. If a client wants to reconnect to the same session that it used to find a key again, it must use a session handle.

For failover or stopped servers, the WXS environment moves the primary shards in the per-container placement strategy if they still contain data. If the shards are empty, they are discarded. In the per-container strategy, old primary shards are not kept because new primary shards are placed for every container.

WXS allows per-container placement as an alternative to what could be termed the "typical" placement strategy, a fixed-partition approach with the key of a Map hashed to one of those partitions. In a per-container case (which you set with PER_CONTAINER), the deployment places the partitions on the set of online container servers and automatically scales them out or in as containers are added or removed from the server data grid. A data grid with the fixed-partition approach works well for key-based grids, where the application uses a key object to locate data in the grid. The following discusses the alternative.

Example of a per-container data grid

PER_CONTAINER data grids are different. You specify that the data grid uses the PER_CONTAINER placement strategy with the placementStrategy attribute in the deployment XML file. Instead of configuring how many partitions total we want in the data grid, you specify how many partitions we want per container that you start.

For example, if you set five partitions per container, five new anonymous partition primaries are created when you start that container server, and the necessary replicas are created on the other deployed container servers.

The following is a potential sequence in a per-container environment as the data grid grows.

  1. Start container C0 hosting 5 primaries (P0 - P4).

    • C0 hosts: P0, P1, P2, P3, P4.

  2. Start container C1 hosting 5 more primaries (P5 - P9). Replicas are balanced on the containers.

    • C0 hosts: P0, P1, P2, P3, P4, R5, R6, R7, R8, R9.
    • C1 hosts: P5, P6, P7, P8, P9, R0, R1, R2, R3, R4.

  3. Start container C2 hosting 5 more primaries (P10 - P14). Replicas are balanced further.

    • C0 hosts: P0, P1, P2, P3, P4, R7, R8, R9, R10, R11, R12.
    • C1 hosts: P5, P6, P7, P8, P9, R2, R3, R4, R13, R14.
    • C2 hosts: P10, P11, P12, P13, P14, R5, R6, R0, R1.

The pattern continues as more containers are started, creating five new primary partitions each time and rebalancing replicas on the available containers in the data grid.

WXS does not move primary shards when using the PER_CONTAINER strategy, only replicas.

Remember that the partition numbers are arbitrary and have nothing to do with keys, so we cannot use key-based routing. If a container stops then the partition IDs created for that container are no longer used, so there is a gap in the partition IDs. In the example, there would no longer be partitions P5 - P9 if the container C1 failed, leaving only P0 - P4 and P10 - P14, so key-based hashing is impossible.

Use numbers like five or even more likely 10 for how many partitions per container works best if you consider the consequences of a container failure. To spread the load of hosting shards evenly across the data grid, you need more than just one partition for each container. If we had a single partition per container, then when a container fails, only one container (the one hosting the corresponding replica shard) must bear the full load of the lost primary. In this case, the load is immediately doubled for the container. However, if you have five partitions per container, then five containers pick up the load of the lost container, lowering impact on each by 80 percent. Using multiple partitions per container generally lowers the potential impact on each container substantially. More directly, consider a case in which a container spikes unexpectedly.the replication load of that container is spread over 5 containers rather than only one.

Use the per-container policy

Several scenarios make the per-container strategy an ideal configuration, such as with HTTP session replication or application session state. In such a case, an HTTP router assigns a session to a servlet container. The servlet container needs to create an HTTP session and chooses one of the 5 local partition primaries for the session. The "ID" of the partition chosen is then stored in a cookie. The servlet container now has local access to the session state which means zero latency access to the data for this request as long as you maintain session affinity. And WXS replicates any changes to the partition.

In practice, remember the repercussions of a case in which you have multiple partitions per container (say 5 again). Of course, with each new container started, you have 5 more partition primaries and 5 more replicas. Over time, more partitions should be created and they should not move or be destroyed. But this is not how the containers would actually behave. When a container starts, it hosts 5 primary shards, which can be called "home" primaries, existing on the respective containers that created them. If the container fails, the replicas become primaries and WXS creates 5 more replicas to maintain high availability (unless you disabled auto repair). The new primaries are in a different container than the one that created them, which can be called "foreign" primaries. The application should never place new state or sessions in a foreign primary. Eventually, the foreign primary has no entries and WXS automatically deletes it and its associated replicas. The foreign primaries' purpose is to allow existing sessions to still be available (but not new sessions).

A client can still interact with a data grid that does not rely on keys. The client just begins a transaction and stores data in the data grid independent of any keys. It asks the Session for a SessionHandle object, a serializable handle allowing the client to interact with the same partition when necessary. WXS chooses a partition for the client from the list of home partition primaries. It does not return a foreign primary partition. The SessionHandle can be serialized in an HTTP cookie, for example, and later convert the cookie back into a SessionHandle. Then the WXS APIs can obtain a Session bound to the same partition again, using the SessionHandle.

We cannot use agents to interact with a PER_CONTAINER data grid.


The previous description is different from a normal FIXED_PARTITION or hash data grid because the per-container client stores data in a place in the grid, gets a handle to it and uses the handle to access it again. There is no application-supplied key as there is in the fixed-partition case.

Your deployment does not make a new partition for each Session. So in a per-container deployment, the keys used to store data in the partition must be unique within that partition. For example, you may have your client generate a unique SessionID and then use it as the key to find information in Maps in that partition. Multiple client sessions then interact with the same partition so the application needs to use unique keys to store session data in each given partition.

The previous examples used 5 partitions, but the numberOfPartitions parameter in the objectgrid XML file can be used to specify the partitions as required. Instead of per data grid, the setting is per container. (The number of replicas is specified in the same way as with the fixed-partition policy.)

The per-container policy can also be used with multiple zones. If possible, WXS returns a SessionHandle to a partition whose primary is located in the same zone as that client. The client can specify the zone as a parameter to the container or by using an API. The client zone ID can be set using serverproperties or clientproperties.

The PER_CONTAINER strategy for a data grid suits applications storing conversational type state rather than database-oriented data. The key to access the data would be a conversation ID and is not related to a specific database record. It provides higher performance (because the partition primaries can be collocated with the servlets for example) and easier configuration (without having to calculate partitions and containers).

Single-partition and cross-data-grid transactions

The major distinction between WXS and traditional data storage solutions like relational databases or in-memory databases is the use of partitioning, which allows the cache to scale linearly. The important types of transactions to consider are single-partition and every-partition (cross-data-grid) transactions.

In general, interactions with the cache can be categorized as single-partition transactions or cross-data-grid transactions, as discussed in the following section.

Single-partition transactions

Single-partition transactions are the preferable method for interacting with caches hosted by WXS.

When a transaction is limited to a single partition, then by default it is limited to a single JVM, and therefore a single server computer. A server can complete M number of these transactions per second, and if you have N computers, we can complete M*N transactions per second. If your business increases and you need to perform twice as many of these transactions per second, we can double N by buying more computers. Then we can meet capacity demands without changing the application, upgrading hardware, or even taking the application offline.

In addition to letting the cache scale so significantly, single-partition transactions also maximize the availability of the cache. Each transaction only depends on one computer. Any of the other (N-1) computers can fail without affecting the success or response time of the transaction. So if we are running 100 computers and one of them fails, only 1 percent of the transactions in flight at the moment that server failed are rolled back. After the server fails, WXS relocates the partitions hosted by the failed server to the other 99 computers. During this brief period, before the operation completes, the other 99 computers can still complete transactions. Only the transactions that would involve the partitions that are being relocated are blocked. After the failover process is complete, the cache can continue running, fully operational, at 99 percent of its original throughput capacity. After the failed server is replaced and returned to the data grid, the cache returns to 100 percent throughput capacity.

Cross-data-grid transactions

In terms of performance, availability and scalability, cross-data-grid transactions are the opposite of single-partition transactions . Cross-data-grid transactions access every partition and therefore every computer in the configuration. Each computer in the data grid is asked to look up some data and then return the result. The transaction cannot complete until every computer has responded, and therefore the throughput of the entire data grid is limited by the slowest computer. Adding computers does not make the slowest computer faster and therefore does not improve the throughput of the cache.

Cross-data-grid transactions have a similar effect on availability. Extending the previous example, if we are running 100 servers and one server fails, then 100 percent of the transactions that are in progress at the moment that server failed are rolled back. After the server fails, WXS starts to relocate the partitions hosted by that server to the other 99 computers.

During this time, before the failover process completes, the data grid cannot process any of these transactions. After the failover process is complete, the cache can continue running, but at reduced capacity. If each computer in the data grid serviced 10 partitions, then 10 of the remaining 99 computers receive at least one extra partition as part of the failover process. Adding an extra partition increases the workload of that computer by at least 10 percent. Because the throughput of the data grid is limited to the throughput of the slowest computer in a cross-data-grid transaction, on average, the throughput is reduced by 10 percent.

Single-partition transactions are preferable to cross-data-grid transactions for scaling out with a distributed, highly available, object cache like WXS. Maximizing the performance of these kinds of systems requires the use of techniques that are different from traditional relational methodologies, but we can turn cross-data-grid transactions into scalable single-partition transactions .

Best practices for building scalable data models

The best practices for building scalable applications with products like WXS include two categories: foundational principles and implementation tips. Foundational principles are core ideas that need to be captured in the design of the data itself. An application that does not observe these principles is unlikely to scale well, even for its mainline transactions. Implementation tips are applied for problematic transactions in an otherwise well-designed application that observes the general principles for scalable data models.

Foundational principles

Some of the important means of optimizing scalability are basic concepts or principles to keep in mind.

Implementation tips

We can also consider the following tips to achieve the best scalability.

Scaling in units or pods

Although we can deploy a data grid over thousands of JVMs, you might consider splitting the data grid into units or pods to increase the reliability and ease of testing of the configuration. A pod is a group of servers running the same set of applications.

Deploy a large single data grid

Testing has verified that eXtreme Scale can scale out to over 1000 JVMs. Such testing encourages building applications to deploy single data grids on large numbers of boxes. Although it is possible to do this, it is not recommended, for several reasons:

  1. Budget concerns: Your environment cannot realistically test a 1000-server data grid. However, it can test a much smaller data grid considering budget reasons, so you do not need to buy twice the hardware, especially for such a large number of servers.

  2. Different application versions: Requiring a large number of boxes for each testing thread is not practical. The risk is that we are not testing the same factors as you would in a production environment.

  3. Data loss: Running a database on a single hard drive is unreliable. Any problem with the hard drive causes you to lose data. Running a growing application on a single data grid is similar. You will likely have bugs in your environment and in the applications. So placing all of the data on a single large system will often lead to a loss of large amounts of data.

Splitting the data grid

Splitting the application data grid into pods (units) is a more reliable option. A pod is a group of servers running a homogenous application stack. Pods can be of any size, but ideally they should consist of about 20 physical servers. Instead of having 500 physical servers in a single data grid, we can have 25 pods of 20 physical servers. A single version of an application stack should run on a given pod, but different pods can have their own versions of an application stack.

Generally, an application stack considers levels of the following components.

A pod is a conveniently sized deployment unit for testing. Instead of having hundreds of servers for testing, it is more practical to have 20 servers. In this case, we are still testing the same configuration as you would have in production. Production uses grids with a maximum size of 20 servers, constituting a pod. We can stress-test a single pod and determine its capacity, number of users, amount of data, and transaction throughput. This makes planning easier and follows the standard of having predictable scaling at predictable cost.

Set up a pod-based environment

In different cases, the pod does not necessarily have to have 20 servers. The purpose of the pod size is for practical testing. The size of a pod should be small enough that if a pod encounters problems in production, the fraction of transactions affected is tolerable.

Ideally, any bug impacts a single pod. A bug would only have an impact on four percent of the application transactions rather than 100 percent. In addition, upgrades are easier because they can be rolled out one pod at a time. As a result, if an upgrade to a pod creates problems, the user can switch that pod back to the prior level. Upgrades include any changes to the application, the application stack, or system updates. As much as possible, upgrades should only change a single element of the stack at a time to make problem diagnosis more precise.

To implement an environment with pods, you need a routing layer above the pods that is forwards and backwards compatible if pods get software upgrades. Also, you should create a directory that includes information about which pod has what data. Use another WXS data grid for this with a database behind it, preferably using the write-behind scenario.) This yields a two-tier solution. Tier 1 is the directory and is used to locate which pod handles a specific transaction. Tier 2 is composed of the pods themselves. When tier 1 identifies a pod, the setup routes each transaction to the correct server in the pod, which is usually the server holding the partition for the data used by the transaction. Optionally, we can also use a near cache on tier 1 to lower the impact associated with looking up the correct pod.

Use pods is slightly more complex than having a single data grid, but the operational, testing, and reliability improvements make it a crucial part of scalability testing.

High availability

With high availability, WXS provides reliable data redundancy and detection of failures.

WXS self-organizes data grids of JVMs into a loosely federated tree. The catalog service at the root and core groups holding containers are at the leaves of the tree.

Each core group is automatically created by the catalog service into groups of about 20 servers. The core group members provide health monitoring for other members of the group. Also, each core group elects a member to be the leader for communicating group information to the catalog service. Limiting the core group size allows for good health monitoring and a highly scalable environment.

In a WebSphere Application Server environment, in which core group size can be altered, WXS does not support more than 50 members per core group.

Heart beating

  1. Sockets are kept open between JVMs, and if a socket closes unexpectedly, this unexpected closure is detected as a failure of the peer JVM. This detection catches failure cases such as the JVM exiting very quickly. Such detection also allows recovery from these types of failures typically in less than a second.

  2. Other types of failures include: an operating system panic, physical server failure, or network failure. These failures are discovered through heart beating .

Heartbeats are sent periodically between pairs of processes: When a fixed number of heartbeats are missed, a failure is assumed. This approach detects failures in N*M seconds. N is the number of missed heart beats and M is the heartbeat interval. Directly specifying M and N is not supported. A slider mechanism is used to allow a range of tested M and N combinations to be used.


There are several ways that a process can fail. The process could fail because some resource limit was reached, such as maximum heap size, or some process control logic terminated a process. The operating system could fail, causing all of the processes running on the system to be lost. Hardware can fail, though less frequently, like the network interface card (NIC), causing the operating system to be disconnected from the network. Many more points of failure can occur, causing the process to be unavailable. In this context, all of these failures can be categorized into one of two types: process failure and loss of connectivity.

Process failure

WXS reacts to process failures quickly. When a process fails, the operating system is responsible for cleaning up any left over resources that the process was using. This cleanup includes port allocation and connectivity. When a process fails, a signal is sent over the connections that were being used by that process to close each connection. With these signals, a process failure can be instantly detected by any other process connected to the failed process.

Loss of connectivity

Loss of connectivity occurs when the operating system becomes disconnected. As a result, the operating system cannot send signals to other processes. There are several reasons that loss of connectivity can occur, but they can be split into two categories: host failure and islanding .

Host failure

If the machine is unplugged from the power outlet, then it is gone instantly.


This scenario presents the most complicated failure condition for software to handle correctly because the process is presumed to be unavailable, though it is not. Essentially, a server or other process appears to the system to have failed while it is actually running properly.

Container failures

Container failures are generally discovered by peer containers through the core group mechanism. When a container or set of containers fails, the catalog service migrates the shards that were hosted on that container or containers. The catalog service looks for a synchronous replica first before migrating to an asynchronous replica. After the primary shards are migrated to new host containers, the catalog service looks for new host containers for the replicas that are now missing.

Container islanding - The catalog service migrates shards off containers when the container is discovered to be unavailable. If those containers then become available, the catalog service considers the containers eligible for placement just like in the normal startup flow.
Container failure detection latency

Failures can be categorized into soft and hard failures. Soft failures are typically caused when a process fails. Such failures are detected by the operating system, which can recover used resources, such as network sockets, quickly. Typical failure detection for soft failures is less than one second. Hard failures might take up to 200 seconds to detect with the default heart beat tuning. Such failures include: physical machine crashes, network cable disconnects, or operating system failures. The run time relies on heart beating to detect hard failures which can be configured.

Catalog service failure

Because the catalog service grid is a WXS grid, it also uses the core grouping mechanism in the same way as the container failure process. The primary difference is that the catalog service domain uses a peer election process for defining the primary shard instead of the catalog service algorithm used for the containers.

The placement service and the core grouping service are One of N services. A One of N service runs in one member of the high availability group. The location service and administration run in all of the members of the high availability group. The placement service and core grouping service are singletons because they are responsible for laying out the system. The location service and administration are read-only services and exist everywhere to provide scalability.

The catalog service uses replication to make itself fault tolerant. If a catalog service process fails, then the service restarts to restore the system to the wanted level of availability. If all of the processes that are hosting the catalog service fail, the data grid has a loss of critical data. This failure results in a required restart of all the container servers. Because the catalog service can run on many processes, this failure is an unlikely event. However, if we are running all of the processes on a single box, within a single blade chassis, or from a single network switch, a failure is more likely to occur. Try to remove common failure modes from boxes that are hosting the catalog service to reduce the possibility of failure.

Multiple container failures

A replica is never placed in the same process as its primary because if the process is lost, it would result in a loss of both the primary and the replica. In a development environment on a single machine, you might want to have two containers and replicate between them. We can define the development mode attribute in the deployment policy to configure a replica to be placed on the same machine as a primary. However, in production, using a single machine is not sufficient because loss of that host results in the loss of both container servers. To change between development mode on a single machine and a production mode with multiple machines, disable development mode in the deployment policy configuration file.

Discovery detection mechanism Recovery method
Process loss I/O Restart
Server loss Heartbeat Restart
Network outage Heartbeat Reestablish network and connection
Server-side hang Heartbeat Stop and restart server
Server busy Heartbeat Wait until server is available

Replication for availability

Replication provides fault tolerance and increases performance for a distributed WXS topology. Replication is enabled by associating backing maps with a map set.

About map sets

A map set is a collection of maps that are categorized by a partition-key. This partition-key is derived from the key on the individual map by taking its hash modulo the number of partitions. If one group of maps within the map set has partition-key X, those maps are stored in a corresponding partition X in the data grid. If another group has partition-key Y, all of the maps are stored in partition Y, and so on. The data within the maps is replicated based on the policy defined on the map set. Replication occurs on distributed topologies.

Map sets are assigned the number of partitions and a replication policy. The map set replication configuration identifies the number of synchronous and asynchronous replica shards for the map set must in addition to the primary shard . For example, if one synchronous and one asynchronous replica exist, all of the BackingMaps that are assigned to the map set each have a replica shard distributed automatically within the set of available container server s for the data grid. The replication configuration can also enable clients to read data from synchronously replicated servers. This can spread the load for read requests over additional servers in the WXS. Replication has a programming model impact only when preloading the backing maps.

Map preloading

Maps can be associated with Loaders. A loader is used to fetch objects when they cannot be found in the map (a cache miss) and also to write changes to a back-end when a transaction commits. Loaders can also be used for preloading data into a map. The preloadMap method of the Loader interface is called on each map when its corresponding partition in the map set becomes a primary. The preloadMap method is not called on replicas. It attempts to load all the intended referenced data from the back-end into the map using the provided session. The relevant map is identified by the BackingMap argument that is passed to the preloadMap method.

void preloadMap(Session session, BackingMap backingMap) throws LoaderException;

Preloading in partitioned map set

Maps can be partitioned into N partitions. Maps can therefore be striped across multiple servers, with each entry identified by a key that is stored only on one of those servers. Very large maps can be held in a WXS because the application is no longer limited by the heap size of a single JVM to hold all the entries of a Map. Applications that want to preload with the preloadMap method of the Loader interface must identify the subset of the data that it preloads. A fixed number of partitions always exists. We can determine this number using the following code example:

int numPartitions = backingMap.getPartitionManager().getNumOfPartitions();
int myPartition = backingMap.getPartitionId();
This code example shows that an application can identify the subset of the data to preload from the database. Applications must always use these methods even when the map is not initially partitioned. These methods allow flexibility: If the map is later partitioned by the administrators, then the loader continues to work correctly.

The application must issue queries to retrieve the myPartition subset from the backend. If a database is used, then it might be easier to have a column with the partition identifier for a given record unless there is some natural query that allows the data in the table to partition easily.


The preload implementation copies data from the back-end into the map by storing multiple objects in the map in a single transaction. The optimal number of records to store per transaction depends on several factors, including complexity and size. For example, after the transaction includes blocks of more than 100 entries, the performance benefit decreases as you increase the number of entries. To determine the optimal number, begin with 100 entries and then increase the number until the performance benefit decreases to none. Larger transactions result in better replication performance. Remember, only the primary runs the preload code. The preloaded data is replicated from the primary to any replicas that are online.

Preloading map sets

If the application uses a map set with multiple maps then each map has its own loader. Each loader has a preload method. Each map is loaded serially by the WXS. It might be more efficient to preload all the maps by designating a single map as the preloading map. This process is an application convention. For example, two maps, department and employee, might use the department Loader to preload both the department and the employee maps. This procedure ensures that, transactionally, if an application wants a department then the employees for that department are in the cache. When the department Loader preloads a department from the back-end, it also fetches the employees for that department. The department object and its associated employee objects are then added to the map using a single transaction.

Recoverable preloading

Some customers have very large data sets that need caching. Preloading this data can be very time consuming. Sometimes, the preloading must complete before the application can go online. We can benefit from making preloading recoverable. Suppose there are a million records to preload. The primary is preloading them and fails at the 800,000th record. Normally, the replica chosen to be the new primary clears any replicated state and starts from the beginning. WXS can use a ReplicaPreloadController interface. The loader for the application would also need to implement the ReplicaPreloadController interface.

This example adds a single method to the Loader:

<>ul> Status checkPreloadStatus(Session session, BackingMap bmap);

This method is called by the WXS run time before the preload method of the Loader interface is normally called. WXS tests the result of this method (Status) to determine its behavior whenever a replica is promoted to a primary.

WXS response
Status.PRELOADED_ALREADY WXS does not call the preload method at all because this status value indicates that the map is fully preloaded.
Status.FULL_PRELOAD_NEEDED WXS clears the map and calls the preload method normally.
Status.PARTIAL_PRELOAD_NEEDED WXS leaves the map as-is and calls preload. This strategy allows the application loader to continue preloading from that point onwards.

Clearly, while a primary is preloading the map, it must leave some state in a map in the map set that is being replicated so that the replica determines what status to return. Use an extra map named, for example, RecoveryMap. This RecoveryMap must be part of the same map set that is being preloaded to ensure that the map is replicated consistently with the data being preloaded. A suggested implementation follows.

As the preload commits each block of records, the process also updates a counter or value in the RecoveryMap as part of that transaction. The preloaded data and the RecoveryMap data are replicated atomically to the replicas. When the replica is promoted to primary, it can now check the RecoveryMap to see what has happened.

The RecoveryMap can hold a single entry with the state key. If no object exists for this key then you need a full preload (checkPreloadStatus returns FULL_PRELOAD_NEEDED). If an object exists for this state key and the value is COMPLETE, the preload completes, and the checkPreloadStatus method returns PRELOADED_ALREADY. Otherwise, the value object indicates where the preload restarts and the checkPreloadStatus method returns: PARTIAL_PRELOAD_NEEDED. The loader can store the recovery point in an instance variable for the loader so that when preload is called, the loader knows the starting point. The RecoveryMap can also hold an entry per map if each map is preloaded independently.

Handling recovery in synchronous replication mode with a Loader

The WXS run time is designed not to lose committed data when the primary fails. The following section shows the algorithms used. These algorithms apply only when a replication group uses synchronous replication. A loader is optional.

The WXS run time can be configured to replicate all changes from a primary to the replicas synchronously. When a synchronous replica is placed, it receives a copy of the existing data on the primary shard . During this time, the primary continues to receive transactions and copies them to the replica asynchronously. The replica is not considered to be online at this time.

After the replica catches up the primary, the replica enters peer mode and synchronous replication begins. Every transaction committed on the primary is sent to the synchronous replicas and the primary waits for a response from each replica. A synchronous commit sequence with a Loader on the primary looks like the following set of steps:

Step with loader Step without loader
Get locks for entries same
Flush changes to the loader no-op
Save changes to the cache same
Send changes to replicas and wait for acknowledgment same
Commit to the loader through the TransactionCallback plug-in Plug-in commit called, but does nothing
Release locks for entries same

Notice that the changes are sent to the replica before they are committed to the loader. To determine when the changes are committed on the replica, revise this sequence: At initialize time, initialize the tx lists on the primary as below.

CommitedTx = {}, RolledBackTx = {}

During synchronous commit processing, use the following sequence:

Step with loader Step without loader
Get locks for entries same
Flush changes to the loader no-op
Save changes to the cache same
Send changes with a committed transaction, roll back transaction to replica, and wait for acknowledgment same
Clear list of committed transactions and rolled back transactions same
Commit the loader through the TransactionCallBack plug-in TransactionCallBack plug-in commit is still called, but typically does not do anything
If commit succeeds, add the transaction to the committed transactions, otherwise add to the rolled back transactions no-op
Release locks for entries same

For replica processing, use the following sequence:

  1. Receive changes
  2. Commit all received transactions in the committed transaction list
  3. Roll back all received transactions in the rolled back transaction list
  4. Start a transaction or session
  5. Apply changes to the transaction or session
  6. Save the transaction or session to the pending list
  7. Send back reply

Notice that on the replica, no loader interactions occur while the replica is in replica mode. The primary must push all changes through the Loader. The replica does not push any changes. A side effect of this algorithm is that the replica always has the transactions, but they are not committed until the next primary transaction sends the commit status of those transactions. The transactions are then committed or rolled back on the replica. Until then, the transactions are not committed. We can add a timer on the primary that sends the transaction outcome after a small period (a few seconds). This timer limits, but does not eliminate, any staleness to that time window. This staleness is only a problem when using replica read mode. Otherwise, the staleness does not have an impact on the application.

When the primary fails, it is likely that a few transactions were committed or rolled back on the primary, but the message never made it to the replica with these outcomes. When a replica is promoted to the new primary, one of the first actions is to handle this condition. Each pending transaction is reprocessed against the new primary's set of maps. If there is a Loader, then each transaction is given to the Loader. These transactions are applied in strict first in first out (FIFO) order. If a transaction fails, it is ignored. If three transactions are pending, A, B, and C, then A might commit, B might rollback, and C might also commit. No one transaction has any impact on the others. Assume that they are independent.

A loader might want to use slightly different logic when it is in failover recovery mode versus normal mode. The loader can easily know when it is in failover recovery mode by implementing the ReplicaPreloadController interface. The checkPreloadStatus method is only called when failover recovery completes. Therefore, if the apply method of the Loader interface is called before the checkPreloadStatus method, then it is a recovery transaction. After the checkPreloadStatus method is called, the failover recovery is complete.

Load balancing across replicas

The WXS, unless configured otherwise, sends all read and write requests to the primary server for a given replication group. The primary must service all requests from clients. You might want to allow read requests to be sent to replicas of the primary. Sending read requests to the replicas allows the load of the read requests to be shared by multiple JVMs. However, using replicas for read requests can result in inconsistent responses.

Load balancing across replicas is typically used only when clients are caching data that is changing all the time or when the clients are using pessimistic locking .

If the data is continually changing and then being invalidated in client near caches, the primary should see a relatively high get request rate from clients as a result. Likewise, in pessimistic locking mode, no local cache exists, so all requests are sent to the primary.

If the data is relatively static or if pessimistic mode is not used, then sending read requests to the replica does not have a large impact on performance. The frequency of get requests from clients with caches that are full of data is not high.

When a client first starts, its near cache is empty. Cache requests to the empty cache are forwarded to the primary. The client cache gets data over time, causing the request load to drop. If many clients start concurrently, then the load might be significant and replica read might be an appropriate performance choice.

Client-side replication

With WXS, we can replicate a server map to one or more clients by using asynchronous replication. A client can request a local read-only copy of a server side map using the ClientReplicableMap.enableClientReplication method.

The first parameter is the replication mode. This mode can be a continuous replication or a snapshot replication. The second parameter is an array of partition IDs that represent the partitions from which to replicate the data. If the value is null or an empty array, the data is replicated from all the partitions. The last parameter is a listener to receive client replication events. See ClientReplicableMap and ReplicationMapListener in the API documentation for details.

After the replication is enabled, then the server starts to replicate the map to the client. The client is eventually only a few transactions behind the server at any point in time.

High availability catalog service

A catalog service domain is the data grid of catalog servers we are using, which retain topology information for all of the container servers in your WXS environment. The catalog service controls balancing and routing for all clients.

Figure 1. Catalog service domain

When multiple catalog servers start, one of the servers is elected as the master catalog server that accepts heartbeats and handles system data changes in response to any catalog service or container changes.

Configure at least three catalog servers in the catalog service domain. Catalog servers must be installed on separate nodes or separate installation images from your container servers to ensure that we can seamlessly upgrade your servers at a later date. If the configuration has zones, we can configure one catalog server per zone.

When a container server contacts one of the catalog servers, the routing table for the catalog service domain is also propagated to the catalog server and container server through the CORBA service context. Furthermore, if the contacted catalog server is not currently the master catalog server, the request is automatically rerouted to the current master catalog server and the routing table for the catalog server is updated.

A catalog service domain and the container server data grid are very different. The catalog service domain is for high availability of your system data. The container server data grid is for your data high availability, scalability, and workload management. Therefore, two different routing tables exist: the routing table for the catalog service domain and the routing table for the container server data grid shards.

Catalog server quorums

When the quorum mechanism is enabled, all the catalog servers in the quorum must be available for placement operations to occur in the data grid.

Heartbeats and failure detection

Container servers and core groups

The catalog service places container servers into core groups of a limited size. A core group tries to detect the failure of its members. A single member of a core group is elected to be the core group leader. The core group leader periodically tells the catalog service that the core group is alive and reports any membership changes to the catalog service. A membership change can be a JVM failing or a newly added JVM that joins the core group.

If a JVM socket is closed, that JVM is regarded as being no longer available. Each core group member also heart beats over these sockets at a rate determined by configuration. If a JVM does not respond to these heartbeats within a configured maximum time period, then the JVM is considered to be no longer available, which triggers a failure detection.

If the catalog service marks a container (JVM) as failed and the container server is later reported as being available, the container is told to shut down the WXS container servers. A in this state is not visible in xscmd utility command queries. Messages in the logs of the container indicate that the container has failed. You must manually restart these JVMs.

If the core group leader cannot contact any member, it continues to retry contacting the member.

The complete failure of all members of a core group is also a possibility. If the entire core group has failed, it is the responsibility of the catalog service to detect this loss.

Catalog service domain heart-beating

The catalog service domain looks like a private core group with a static membership and a quorum mechanism. It detects failures the same way as a normal core group. However, the behavior is modified to include quorum logic. The catalog service also uses a less aggressive heart-beating configuration.

Failure detection

WXS detects when processes terminate through abnormal socket closure events. The catalog service is notified immediately when a process terminates.

Quorum behavior

Normally, the members of the catalog service have full connectivity. The catalog service domain is a static set of JVMs. WXS expects all members of the catalog service to be online. When all the members are online, the catalog service has quorum. The catalog service responds to container events only while the catalog service has quorum.

Reasons for quorum loss

WXS expects to lose quorum for the following scenarios:

WXS does not lose quorum in the following scenario:

If the catalog service loses a quorum, it waits for quorum to be reestablished. While the catalog service does not have a quorum, it ignores events from container servers. Container servers continue to try any requests that are rejected by the catalog server during this time. Heart-beating is suspended until a quorum is reestablished.

Quorum loss from JVM failure

A catalog server that fails causes quorum to be lost. If a JVM fails, quorum can be reestablished by either overriding quorum or by restarting the failed catalog server.

loss from network brownout

WXS is designed to expect the possibility of brownouts. A brownout is when a temporary loss of connectivity occurs between data centers. Brown outs are usually transient and clear within seconds or minutes. While WXS tries to maintain normal operation during the brownout period, a brownout is regarded as a single failure event. The failure is expected to be fixed and then normal operation resumes with no actions necessary.

A long duration brown out can be classified as a blackout only through user intervention.

Overriding quorum on one side of the brownout is required in order for the event to be classified as a blackout.

Catalog service JVM cycling

If a catalog server is stopped using the stopOgServer command, then the quorum drops to one less server. The remaining servers still have quorum. Restarting the catalog server sets quorum back to the previous number.

Consequences of lost quorum

If a container JVM was to fail while quorum is lost, recovery does not occur until the brownout recovers. In a blackout scenario, the recovery does not occur until we run the override quorum command. Quorum loss and a container failure as are considered a double failure, which is a rare event. Because of the double failure, applications might lose write access to data that was stored on the failed JVM. When quorum is restored, the normal recovery occurs.

Similarly, if you attempt to start a container during a quorum loss event, the container does not start.

Full client connectivity is allowed during quorum loss. If no container failures or connectivity issues happen during the quorum loss event then clients can still fully interact with the container servers.

If a brownout occurs, then some clients might not have access to primary or replica copies of the data until the brownout clears.

New clients can be started because a catalog service JVM must exist in each data center. Therefore, at least one catalog server can be reached by a client even during a brownout event.

Quorum recovery

If quorum is lost for any reason, when quorum is reestablished, a recovery protocol is run. When the quorum loss event occurs, all liveness checking for core groups is suspended and failure reports are also ignored. After quorum is back, then the catalog service checks all the core groups to immediately determine their membership. Any shards previously hosted on container JVMs reported as failed are recovered. If primary shards were lost, then surviving replicas are promoted to being primary shards. If replica shards were lost then additional replicas shards are created on the survivors.

Overriding quorum

Override quorum only when a data center failure has occurred. Quorum loss due to a catalog service JVM failure or a network brownout recovers automatically after the catalog service JVM is restarted or the network brownout ends.

Administrators are the only ones with knowledge of a data center failure. WXS treats a brownout and a blackout similarly. Inform the WXS environment of such failures with the xscmd -c overrideQuorum command. This command tells the catalog service to assume that quorum is achieved with the current membership, and full recovery takes place. When issuing an override quorum command, we are guaranteeing that the JVMs in the failed data center have truly failed and do not have a chance of recovering.

The following list considers some scenarios for overriding quorum. In this scenario, you have three catalog servers: A, B, and C.

Container behavior during quorum loss

Containers host one or more shards. Shards are either primaries or replicas for a specific partition. The catalog service assigns shards to a container and the container server uses that assignment until new instructions arrive from the catalog service. For example, a primary shard continues to try communication with its replica shards during network brownouts, until the catalog service provides further instructions to the primary shard .

Synchronous replica behavior

The primary shard can accept new transactions while the connection is broken if the number of replicas online are at least at the minsync property value for the map set. If any new transactions are processed on the primary shard while the link to the synchronous replica is broken, the replica is resynchronized with the current state of the primary when the link is reestablished.

Do not configure synchronous replication between data centers or over a WAN-style link.

Asynchronous replica behavior

While the connection is broken, the primary shard can accept new transactions. The primary shard buffers the changes up to a limit. If the connection with the replica is reestablished before that limit is reached then the replica is updated with the buffered changes. If the limit is reached, then the primary destroys the buffered list and when the replica reattaches then it is cleared and resynchronized.

Client behavior during quorum loss

Clients are always able to connect to the catalog server to bootstrap to the data grid whether the catalog service domain has quorum or not. The client tries to connect to any catalog server instance to obtain a route table and then interact with the data grid. Network connectivity might prevent the client from interacting with some partitions due to network setup. The client might connect to local replicas for remote data if it has been configured to do so. Clients cannot update data if the primary partition for that data is not available.

Replicas and shards

With WXS, an in-memory database or shard can be replicated from one JVM to another. A shard represents a partition that is placed on a container. Multiple shards that represent different partitions can exist on a single container. Each partition has an instance that is a primary shard and a configurable number of replica shards. The replica shards are either synchronous or asynchronous. The types and placement of replica shards are determined by WXS using a deployment policy, which specifies the minimum and maximum number of synchronous and asynchronous shards.

Shard types

Replication uses three types of shards:

The primary shard receives all insert, update and remove operations. The primary shard adds and removes replicas, replicates data to the replicas, and manages commits and rollbacks of transactions.

Synchronous replicas maintain the same state as the primary. When a primary replicates data to a synchronous replica, the transaction is not committed until it commits on the synchronous replica.

Asynchronous replicas might or might not be at the same state as the primary. When a primary replicates data to an asynchronous replica, the primary does not wait for the asynchronous replica to commit.

Figure 1. Communication path between a primary shard and replica shards

Minimum synchronous replica shards

When a primary prepares to commit data, it checks how many synchronous replica shards voted to commit the transaction. If the transaction processes normally on the replica, it votes to commit. If something went wrong on the synchronous replica, it votes not to commit. Before a primary commits, the number of synchronous replica shards that are voting to commit must meet the minSyncReplica setting from the deployment policy. When the number of synchronous replica shards that are voting to commit is too low, the primary does not commit the transaction and an error results. This action ensures that the required number of synchronous replicas are available with the correct data. Synchronous replicas that encountered errors reregister to fix their state.

The primary throws a ReplicationVotedToRollbackTransactionException error if too few synchronous replicas voted to commit.

Replication and Loaders

Normally, a primary shard writes changes synchronously through the Loader to a database. The Loader and database are always in sync. When the primary fails over to a replica shard, the database and Loader might not be in synch. For example:

Either approach leads to either the replica being one transaction in front of or behind the database. This situation is not acceptable. WXS uses a special protocol and a contract with the Loader implementation to solve this issue without two phase commit. The protocol follows:

Primary side

Replica side

Replica side

on failover

This protocol ensures that the database is at the same level as the new primary state.

Shard placement

The catalog service is responsible for placing shards. Each ObjectGrid has a number of partitions, each of which has a primary shard and an optional set of replica shards. The catalog service allocates the shards by balancing them so that they are evenly distributed over the available container servers. Replica and primary shards for the same partition are never placed on the same container server or the same IP address, unless the configuration is in development mode.

If a new container server starts, then WXS retrieves shards from relatively overloaded container servers to the new empty container server. This movement of shards enables horizontal scaling.

Scaling out

Scaling out means that when extra container servers are added to a data grid, WXS tries to move existing shards, primaries or replicas, from the old set of container servers to the new set. This movement expands the data grid to take advantage of the processor, network and memory of the newly added container servers. The movement also balances the data grid and tries to ensure that each JVM in the data grid hosts the same amount of data. As the data grid expands, each server hosts a smaller subset of the total grid. WXS assumes that data is distributed evenly among the partitions. This expansion enables scaling out .

Scaling in

Scaling in means that if a JVM fails, then WXS tries to repair the damage. If the failed JVM had a replica, then WXS replaces the lost replica by creating a new replica on a surviving JVM. If the failed JVM had a primary, then WXS finds the best replica on the survivors and promotes the replica to be the new primary. WXS then replaces the promoted replica with a new replica that is created on the remaining servers. To maintain scalability, WXS preserves the replica count for partitions as servers fail.

Figure 1. Placement of an ObjectGrid map set with a deployment policy of 3 partitions with a minSyncReplicas value of 1, a maxSyncReplicas value of 1, and a maxAsyncReplicas value of 1

Reading from replicas

We can configure map sets such that a client is permitted to read from a replica rather than being restricted to primary shards only.

It can often be advantageous to allow replicas to serve as more than simply potential primaries in the case of failures. For example, map sets can be configured to allow read operations to be routed to replicas by setting the replicaReadEnabled option on the MapSet to true. The default setting is false.

Enable reading of replicas can improve performance by spreading read requests to more JVMs. If the option is not enabled, all read requests such as the ObjectMap.get or the Query.getResultIterator methods are routed to the primary. When replicaReadEnabled is set to true, some get requests might return stale data, so an application using this option must be able to tolerate this possibility. However, a cache miss will not occur. If the data is not on the replica, the get request is redirected to the primary and tried again.

The replicaReadEnabled option can be used with both synchronous and asynchronous replication.

Load balancing across replicas

Load balancing across replicas is typically used only when clients are caching data that is changing all the time or when the clients are using pessimistic locking .

The WXS, unless configured otherwise, sends all read and write requests to the primary server for a given replication group. The primary must service all requests from clients. You might want to allow read requests to be sent to replicas of the primary. Sending read requests to the replicas allows the load of the read requests to be shared by multiple JVM. However, using replicas for read requests can result in inconsistent responses.

Load balancing across replicas is typically used only when clients are caching data that is changing all the time or when the clients are using pessimistic locking .

If the data is continually changing and then being invalidated in client near caches, the primary should see a relatively high get request rate from clients as a result. Likewise, in pessimistic locking mode, no local cache exists, so all requests are sent to the primary.

If the data is relatively static or if pessimistic mode is not used, then sending read requests to the replica does not have a big impact on performance. The frequency of get requests from clients with caches that are full of data is not high.

When a client first starts, its near cache is empty. Cache requests to the empty cache are forwarded to the primary. The client cache gets data over time, causing the request load to drop. If a large number of clients start concurrently, then the load might be significant and replica read might be an appropriate performance choice.

Shard lifecycles

Shards go through different states and events to support replication. The lifecycle of a shard includes coming online, run time, shut down, fail over and error handling. Shards can be promoted from a replica shard to a primary shard to handle server state changes.

Lifecycle events

When primary and replica shards are placed and started, they go through a series of events to bring themselves online and into listening mode.

Primary shard

The catalog service places a primary shard for a partition. The catalog service also does the work of balancing primary shard locations and initiating failover for primary shards.

When a shard becomes a primary shard, it receives a list of replicas from the catalog service. The new primary shard creates a replica group and registers all the replicas.

When the primary is ready, an open for business message displays in the SystemOut.log file for the container on which it is running. The open message, or the CWOBJ1511I message, lists the map name, map set name, and partition number of the primary shard that started.

Replica shard

Replica shards are mainly controlled by the primary shard unless the replica shard detects a problem. During a normal lifecycle, the primary shard places, registers, and de-registers a replica shard.

When the primary shard initializes a replica shard, a message displays the log that describes where the replica runs to indicate that the replica shard is available. The open message, or the CWOBJ1511I message, lists the map name, map set name, and partition number of the replica shard. This message follows:


Asynchronous replica shard

An asynchronous replica shard polls the primary for data. The replica automatically will adjust the poll timing if it does not receive data from the primary, which indicates that it is caught up with the primary. It also will adjust if it receives an error that might indicate that the primary has failed, or if there is a network problem.

When the asynchronous replica starts replicating, it prints the following message to the SystemOut.log file for the replica. This message might print more than one time per CWOBJ1511 message. It will print again if the replica connects to a different primary or if template maps are added.

CWOBJ1543I: The asynchronous replica objectGridName:mapsetName:partitionNumber started or 
continued replicating from the primary. Replicating for maps: [mapName]

Synchronous replica shard

When the synchronous replica shard first starts, it is not yet in peer mode. When a replica shard is in peer mode, it receives data from the primary as data comes into the primary. Before entering peer mode, the replica shard needs a copy of all of the existing data on the primary shard .

The synchronous replica copies data from the primary shard similar to an asynchronous replica by polling for data. When it copies the existing data from the primary, it switches to peer mode and begins to receive data as the primary receives the data.

When a replica shard reaches peer mode, it prints a message to the SystemOut.log file for the replica. The time refers to the amount of time that it took the replica shard to get all of its initial data from the primary shard . The time might display as zero or very low if the primary shard does not have any existing data to replicate. This message may print more than one time per CWOBJ1511 message. It will print again if the replica connects to a different primary or if template maps are added.

When the synchronous replica shard is in peer mode, the primary shard must replicate transactions to all peer mode synchronous replicas. The synchronous replica shard data remains at the same level as the primary shard data. If a minimum number of synchronous replicas or minSync is set in the deployment policy, that number of synchronous replicas must vote to commit before the transaction can successfully commit on the primary.

Recovery events

Replication is designed to recover from failure and error events. If a primary shard fails, another replica takes over. If errors are on the replica shards, the replica shard attempts to recover. The catalog service controls the placement and transactions of new primary shards or new replica shards.

Replica shard becomes a primary shard

A replica shard becomes a primary shard for two reasons. Either the primary shard stopped or failed, or a balance decision was made to move the previous primary shard to a new location.

The catalog service selects a new primary shard from the existing synchronous replica shards. If a primary move needs to take place and there are no replicas, a temporary replica will be placed to complete the transition. The new primary shard registers all of the existing replicas and accepts transactions as the new primary shard . If the existing replica shards have the correct level of data, the current data is preserved as the replica shards register with the new primary shard. Asynchronous replicas will poll against the new primary.

Figure 1. Example placement of an ObjectGrid map set for the partition0 partition. The deployment policy has a minSyncReplicas value of 1, a maxSyncReplicas value of 2, and a maxAsyncReplicas value of 1.

Figure 2. The container for the primary shard fails

Figure 3. The synchronous replica shard on ObjectGrid container 2 becomes the primary shard

Figure 4. Machine B contains the primary shard . Depending on how automatic repair mode is set and the availability of the containers, a new synchronous replica shard might or might not be placed on a machine.

Replica shard recovery

A synchronous replica shard is controlled by the primary shard . However, if a replica shard detects a problem, it can trigger a reregister event to correct the state of the data. The replica clears the current data and gets a fresh copy from the primary.

When a replica shard initiates a reregister event, the replica prints a log message.

If a transaction causes an error on a replica shard during processing, then the replica shard is in an unknown state. The transaction successfully processed on the primary shard, but something went wrong on the replica. To correct this situation, the replica initiates a reregister event. With a new copy of data from the primary, the replica shard can continue. If the same problem reoccurs, the replica shard does not continuously reregister.

Failure events

A replica can stop replicating data if it encounters error situations for which the replica cannot recover.

Too many register attempts

If a replica triggers a reregister multiple times without successfully committing data, the replica stops. Stopping prevents a replica from entering an endless reregister loop. By default, a replica shard tries to reregister three times in a row before stopping.

If a replica shard reregisters too many times, it prints the following message to the log.

If the replica is unable to recover by reregistering, a pervasive problem might exist with the transactions that are relative to the replica shard. A possible problem could be missing resources on the classpath if an error occurs while inflating the keys or values from the transaction.

Failure while entering peer mode

If a replica attempts to enter peer mode and experiences an error processing the bulk existing data from the primary (the checkpoint data), the replica shuts down. Shutting down prevents a replica from starting with incorrect initial data. Because it receives the same data from the primary if it reregisters, the replica does not retry.

If a replica shard fails to enter peer mode, it prints the following message to the log:

CWOBJ1527W Replica objectGridName:mapSetName:partition:mapName failed to enter peer mode after numSeconds seconds.
An additional message displays in the log that explains why the replica failed to enter peer mode.

Recovery after re-register or peer mode failure

If a replica fails to re-register or enter peer mode, the replica is in an inactive state until a new placement event occurs. A new placement event might be a new server starting or stopping. We can also start a placement event using the triggerPlacement method on the PlacementServiceMBean Mbean.

Map sets for replication

Replication is enabled by associating BackingMaps with a map set.

A map set is a collection of maps that are categorized by partition-key. This partition-key is derived from the individual map's key by taking its hash modulo the number of partitions. If one group of maps within the map set has partition-key X, those maps will be stored in a corresponding partition X in the data grid. If another group has partition-key Y, all of the maps will be stored in partition Y, and so on. Also, the data within the maps is replicated based on the policy defined on the map set, which is only used for distributed WXS topologies (unnecessary for local instances).

Map sets are assigned what number of partitions they will have and a replication policy. The map set replication configuration simply identifies the number of synchronous and asynchronous replica shards a map set should have in addition to the primary shard . If there is to be 1 synchronous and 1 asynchronous replica, all of the BackingMaps assigned to the map set will each have a replica shard distributed automatically within the set of available containers for the WXS. The replication configuration can also enable clients to read data from synchronously replicated servers. This can spread the load for read requests over additional servers in the WXS.

Replication only has a programming model impact when preloading the BackingMaps.

Transaction processing

WXS uses transactions as its mechanism for interaction with data.

To interact with data, the thread in the application needs its own session. To obtain a session, call...

With the session, the application can work with data that is stored in the ObjectGrid maps.

A transaction begins and commits or begins and rolls back using the begin, commit, and rollback methods on the Session object. Applications can also work in auto-commit mode, in which the Session automatically begins and commits a transaction whenever an operation is performed on the map. Auto-commit mode cannot group multiple operations into a single transaction, so it is the slower option for grouping multiple operations into a single transaction. However, for transactions that contain only one operation, auto-commit is faster.

When the application is finished with the Session, use the optional Session.close() method to close the session. Closing the Session releases it from the heap and allows subsequent calls to the getSession() method to be reused, improving performance.


Use transactions to...

When a transaction starts, WXS allocates a difference map to hold copies of key/value pairs used by the transaction. The difference map tracks changes for operations such as insert, update, get, remove, and so on. If an ObjectTransformer object is specified, this object is used for copying the value. If the transaction is using optimistic locking, then "before" images of the values are tracked for comparison when the transaction commits.

If a transaction is rolled back, the difference map information is discarded, and locks on entries are released. When a transaction commits, the changes are applied to the maps and locks are released. If optimistic locking is being used, then WXS compares the before image versions of the values with the values that are in the map. These values must match for the transaction to commit. This comparison enables a multiple version locking scheme, but at a cost of two copies being made when the transaction accesses the entry. All values are copied again and the new copy is stored in the map. WXS performs this copy to protect itself against the application changing the application reference to the value after a commit.

We can avoid using several copies of the information. The application can save a copy by using pessimistic locking instead of optimistic locking as the cost of limiting concurrency. The copy of the value at commit time can also be avoided if the application agrees not to change a value after a commit.

Advantages of transactions

By using transactions, we can:

Transaction size

Larger transactions are more efficient, especially for replication. However, larger transactions can adversely impact concurrency because the locks on entries are held for a longer period of time. If you use larger transactions, we can increase replication performance. This performance increase is important when we are pre-loading a Map. Experiment with different batch sizes to determine what works best for your scenario.

Larger transactions also help with loaders. If a loader is being used that can perform SQL batching, then significant performance gains are possible depending on the transaction and significant load reductions on the database side. This performance gain depends on the Loader implementation.

Automatic commit mode

If no transaction is actively started, then when an application interacts with an ObjectMap object, an automatic begin and commit operation is done on behalf of the application. This automatic begin and commit operation works, but prevents rollback and locking from working effectively. Synchronous replication speed is impacted because of the very small transaction size. If we are using an entity manager application, then do not use automatic commit mode because objects that are looked up with the EntityManager.find method immediately become unmanaged on the method return and become unusable.

External transaction coordinators

Typically, transactions begin with the session.begin method and end with the session.commit method. However, when WXS is embedded, the transactions might be started and ended by an external transaction coordinator. If we are using an external transaction coordinator, you do not need to call the session.begin method and end with the session.commit method. If we are using WebSphere Application Server, we can use the WebSphereTranscationCallback plug-in.

Java EE transaction integration

WXS includes a Java Connector Architecture (JCA) 1.5 compliant resource adapter that supports both client connections to a remote data grid and local transaction management. JEE (Java EE) applications such as servlets, JavaServer Pages (JSP) files and Enterprise JavaBeans (EJB) components can demarcate WXS transactions using the standard javax.resource.cci.LocalTransaction interface or the WXS session interface.

When the running in WebSphere Application Server with last participant support enabled in the application, we can enlist the WXS transaction in a global transaction with other two-phase commit transactional resources.

Transaction processing in Java EE applications

WXS provides its own resource adapter that we can use to connect applications to the data grid and process local transactions.

Through support from the WXS resource adapter, Java EE applications can look up WXS client connections and demarcate local transactions using Java EE local transactions or using the WXS APIs. When the resource adapter is configured, we can complete the following actions with your Java EE applications:

The following additional capabilities are available with WebSphere Application Server:

Administrator responsibilities

The WXS resource adapter is installed into the Java EE application server or embedded with the application. After you install the resource adapter, the administrator creates one or more resource adapter connection factories for each catalog service domain and optionally each data grid instance. The connection factory identifies the properties that are required to communicate with the data grid.

Applications reference the connection factory, which establishes the connection to the remote data grid. Each connection factory hosts a single WXS client connection that is reused for all application components.

Because the WXS client connection might include a near cache, applications must not share a connection. A connection factory must exist for a single application instance to avoid problems sharing objects between applications.

The connection factory hosts a WXS client connection, which is shared between all referencing application components. Use a managed bean (MBean) to access information about the client connection or to reset the connection when it is no longer needed.

Application developer responsibilities

An application developer creates resource references for managed connection factories in the application deployment descriptor or with annotations. Each resource reference includes a local reference for the WXS connection factory, as well as the resource-sharing scope.

Enabling resource sharing is important because it allows the local transaction to be shared between application components.

Applications can inject the connection factory into the Java EE application component, or it can look up the connection factory using Java Naming Directory Interface (JNDI). The connection factory is used to obtain connection handles to the WXS client connection.

The eXtreme Scale client connection is managed independently from the resource adapter connection and is established on first use, and reused for all subsequent connections.

After finding the connection, the application retrieves a WXS session reference. With the WXS session reference, the application can use the entire eXtreme Scale client APIs and features.

We can demarcate transactions in one of the following ways:

Application deployer responsibilities

The application deployer binds the local reference to the resource adapter connection factory that the application developer defines to the resource adapter connection factories that the administrator defines. The application deployer must assign the correct connection factory type and scope to the application and ensure that the connection factory is not shared between applications to avoid Java object sharing. The application deployer is also responsible for configuring and mapping other appropriate configuration information that is common to all connection factories.

CopyMode attribute

We can tune the number of copies by defining the CopyMode attribute of the BackingMap or ObjectMap objects. The copy mode has the following values:

The COPY_ON_READ_AND_COMMIT value is the default. The COPY_ON_READ value copies on the initial data retrieved, but does not copy at commit time. This mode is safe if the application does not modify a value after committing a transaction. The NO_COPY value does not copy data, which is only safe for read-only data. If the data never changes then you do not need to copy it for isolation reasons.

Be careful when you use the NO_COPY attribute value with maps that can be updated. WXS uses the copy on first touch to allow the transaction rollback. The application only changed the copy, and as a result, eXtreme Scale discards the copy. If the NO_COPY attribute value is used, and the application modifies the committed value, completing a rollback is not possible. Modifying the committed value leads to problems with indexes, replication, and so on because the indexes and replicas update when the transaction commits. If you modify committed data and then roll back the transaction, which does not actually roll back at all, then the indexes are not updated and replication does not take place. Other threads can see the uncommitted changes immediately, even if they have locks. Use the NO_COPY attribute value for read-only maps or for applications that complete the appropriate copy before modifying the value. If you use the NO_COPY attribute value and call IBM support with a data integrity problem, we are asked to reproduce the problem with the copy mode set to COPY_ON_READ_AND_COMMIT.

The COPY_TO_BYTES value stores values in the map in a serialized form. At read time, eXtreme Scale inflates the value from a serialized form and at commit time it stores the value to a serialized form. With this method, a copy occurs at both read and commit time.


When you use optimistic locking with COPY_TO_BYTES, you might experience ClassNotFoundException exceptions during common operations, such as invalidating cache entries.

These exceptions occur because the optimistic locking mechanism must call the "equals(...)" method of the cache object to detect any changes before the transaction is committed. To call the equals(...) method, the WXS server must be able to deserialize the cached object, which means that eXtreme Scale must load the object class.

To resolve these exceptions, we can package the cached object classes so that the WXS server can load the classes in stand-alone environments. Therefore, you must put the classes in the classpath.

If your environment includes the OSGi framework, then package the classes into a fragment of the objectgrid.jar bundle. If we are running eXtreme Scale servers in the Liberty profile, package the classes as an OSGi bundle, and export the Java packages for those classes. Then, install the bundle by copying it into the grids directory.

In WebSphere Application Server, package the classes in the application or in a shared library that the application can access.

Alternatively, we can use custom serializers that can compare the byte arrays that are stored in eXtreme Scale to detect any changes.

The default copy mode for a map can be configured on the BackingMap object. We can also change the copy mode on maps before you start a transaction using the ObjectMap.setCopyMode method.

An example of a backing map snippet from an objectgrid.xml file that shows how to set the copy mode for a given backing map follows. This example assumes that we are using cc as the objectgrid/config namespace.

<cc:backingMap name="RuntimeLifespan" copyMode="NO_COPY"/>

Lock manager

When you configure a locking strategy, a lock manager is created for the backing map to maintain cache entry consistency.

Lock manager configuration

When either a PESSIMISTIC or an OPTIMISTIC lock strategy is used, a lock manager is created for the BackingMap. The lock manager uses a hash map to track entries that are locked by one or more transactions. If many map entries exist in the hash map, more lock buckets can result in better performance. The risk of Java synchronization collisions is lower as the number of buckets grows. More lock buckets also lead to more concurrency. The previous examples show how an application can set the number of lock buckets to use for a given BackingMap instance.

To avoid a java.lang.IllegalStateException exception, the setNumberOfLockBuckets method must be called before calling the initialize or getSession methods on the ObjectGrid instance. The setNumberOfLockBuckets method parameter is a Java primitive integer that specifies the number of lock buckets to use. Using a prime number can allow for a uniform distribution of map entries over the lock buckets. A good starting point for best performance is to set the number of lock buckets to about 10 percent of the expected number of BackingMap entries.

Locking strategies

Locking strategies include pessimistic, optimistic and none. To choose a locking strategy, you must consider issues such as the percentage of each type of operations you have, whether or not you use a loader and so on.

Locks are bound by transactions. We can specify the following locking settings:

Set the locking strategy on the BackingMap. We cannot change the locking strategy for each transaction. An example XML snippet that shows how to set the locking mode on a map using the XML file follows, assuming

cc is the namespace for the objectgrid/config namespace:

<cc:backingMap name="RuntimeLifespan" lockStrategy="PESSIMISTIC" />

Pessimistic locking

When an ObjectGrid map is configured to use the pessimistic locking strategy, a pessimistic transaction lock for a map entry is obtained when a transaction first gets the entry from the BackingMap. The pessimistic lock is held until the application completes the transaction. Typically, the pessimistic locking strategy is used in the following situations:

Because the pessimistic locking strategy has the greatest impact on performance and scalability, this strategy should only be used for read and write maps when other locking strategies are not viable. For example, these situations might include when optimistic update failures occur frequently, or when recovery from optimistic failure is difficult for an application to handle.

When using pessimistic locking, we can use the lock method to lock data, or keys, without returning any data values. With the lock method, we can lock the key in the grid or lock the key and determine whether the value exists in the grid. In previous releases, you used the get and getForUpdate APIs to lock keys in the data grid. However, if you did not need data from the client, performance is degraded retrieving potentially large value objects to the client. Additionally, containsKey does not currently hold any locks, so you were forced do use get and getForUpdate to get appropriate locks when using pessimistic locking.

The lock API now gives you a containsKey semantics while holding the lock.

LockMode is an enum with possible values SHARED, UPGRADABLE, and EXCLUSIVE, where we can specify the keys to lock. See the following table to understand the relationship between these lock mode values and the behavior of existing methods:

LockMode values and existing method equivalents...

See the following example code of the LockMode parameter:

map.lock(key, LockMode.UPGRADABLE);

Optimistic locking

The optimistic locking strategy assumes that no two transactions might attempt to update the same map entry while running concurrently. Because of this belief, the lock mode does not need to be held for the life cycle of the transaction because it is unlikely that more than one transaction might update the map entry concurrently.

The optimistic locking strategy is typically used in the following situations:

Like the pessimistic locking strategy, the methods on the ObjectMap interface determine how eXtreme Scale automatically attempts to acquire a lock mode for the map entry that is being accessed. However, the following differences between the pessimistic and optimistic strategies exist:

All other ObjectMap methods are handled exactly like they are handled for the pessimistic locking strategy. That is, when the commit method is invoked, an X lock mode is obtained for any map entry that is inserted, updated, removed, touched, or invalidated and the X lock mode is held until the transaction completes commit processing. The optimistic locking strategy assumes that no concurrently running transactions attempt to update the same map entry. Because of this assumption, the lock mode does not need to be held for the life of the transaction because it is unlikely that more than one transaction might update the map entry concurrently. However, because a lock mode was not held, another concurrent transaction might potentially update the map entry after the current transaction has released its S lock mode.

To handle this possibility, eXtreme Scale gets an X lock at commit time and performs an optimistic versioning check to verify that no other transaction has changed the map entry after the current transaction read the map entry from the BackingMap. If another transaction changes the map entry, the version check fails and an OptimisticCollisionException exception occurs. This exception forces the current transaction to be rolled back and the application must try the entire transaction again. The optimistic locking strategy is very useful when a map is mostly read and it is unlikely that updates for the same map entry might occur.


When you use optimistic locking with COPY_TO_BYTES, you might experience ClassNotFoundException exceptions during common operations, such as invalidating cache entries. These exceptions occur because the optimistic locking mechanism must call the "equals(...)" method of the cache object to detect any changes before the transaction is committed.

To call the equals(...) method, the WXS server must be able to deserialize the cached object, which means that eXtreme Scale must load the object class.

To resolve these exceptions, we can package the cached object classes so that the WXS server can load the classes in stand-alone environments. Therefore, you must put the classes in the classpath.

If your environment includes the OSGi framework, then package the classes into a fragment of the objectgrid.jar bundle. If we are running eXtreme Scale servers in the Liberty profile, package the classes as an OSGi bundle, and export the Java packages for those classes. Then, install the bundle by copying it into the grids directory.

In WebSphere Application Server, package the classes in the application or in a shared library that the application can access.

Alternatively, we can use custom serializers that can compare the byte arrays that are stored in eXtreme Scale to detect any changes.

No locking

When a BackingMap is configured to use no locking strategy, no transaction locks for a map entry are obtained. BackingMaps configured to use a no locking strategy cannot participate in a multi-partition transaction.

Use no locking strategy is useful when an application is a persistence manager such as an Enterprise JavaBeans (EJB) container or when an application uses Hibernate to obtain persistent data. In this scenario, the BackingMap is configured without a loader and the persistence manager uses the BackingMap as a data cache. In this scenario, the persistence manager provides concurrency control between transactions that are accessing the same Map entries.

WXS does not need to obtain any transaction locks for the purpose of concurrency control. This situation assumes that the persistence manager does not release its transaction locks before updating the ObjectGrid map with committed changes. If the persistence manager releases its locks, then a pessimistic or optimistic lock strategy must be used. For example, suppose that the persistence manager of an EJB container is updating an ObjectGrid map with data that was committed in the EJB container-managed transaction. If the update of the ObjectGrid map occurs before the persistence manager transaction locks are released, then we can use the no lock strategy. If the ObjectGrid map update occurs after the persistence manager transaction locks are released, then use either the optimistic or pessimistic lock strategy.

Another scenario where no locking strategy can be used is when the application uses a BackingMap directly and a Loader is configured for the map. In this scenario, the loader uses the concurrency control support that is provided by a relational database management system (RDBMS) by using either...

...to access data in a relational database. The loader implementation can use either an optimistic or pessimistic approach. A loader that uses an optimistic locking or versioning approach helps to achieve the greatest amount of concurrency and performance. See OptimisticCallback

If we are using a loader that uses pessimistic locking support of an underlying backend, you might want to use the forUpdate parameter that is passed on the get method of the Loader interface. Set this parameter to true if the getForUpdate method of the ObjectMap interface was used by the application to get the data. The loader can use this parameter to determine whether to request an upgradeable lock on the row that is being read. For example, DB2 obtains an upgradeable lock when an SQL select statement contains a FOR UPDATE clause. This approach offers the same deadlock prevention that is described in Pessimistic locking.

Distributing transactions

Use Java Message Service (JMS) for distributed transaction changes between different tiers or in environments on mixed platforms.

JMS is an ideal protocol for distributed changes between different tiers or in environments on mixed platforms. For example, some applications that use eXtreme Scale might be deployed on IBM WebSphere Application Server Community Edition, Apache Geronimo, or Apache Tomcat, whereas other applications might run on WebSphere Application Server Version 6.x. JMS is ideal for distributed changes between eXtreme Scale peers in these different environments. The high availability manager message transport is very fast, but can only distribute changes to JVMs that are in a single core group. JMS is slower, but allows larger and more diverse sets of application clients to share an ObjectGrid. JMS is ideal when sharing data in an ObjectGrid between a fat Swing client and an application deployed on WebSphere Extended Deployment.

The built-in Client Invalidation Mechanism and Peer-to-Peer Replication are examples of JMS-based transactional changes distribution.

Implementing JMS

JMS is implemented for distributing transaction changes by using a Java object that behaves as an ObjectGridEventListener. This object can propagate the state in the following four ways:

Listen for changes for publishing

The plug-in implements the ObjectGridEventListener interface to intercept the transactionEnd event. When eXtreme Scale invokes this method, the plug-in attempts to convert the LogSequence list for each map that is touched by the transaction to a JMS message and then publish it. The plug-in can be configured to publish changes for all maps or a subset of maps. LogSequence objects are processed for the maps that have publishing enabled. The LogSequenceTransformer ObjectGrid class serializes a filtered LogSequence for each map to a stream. After all LogSequences are serialized to the stream, then a JMS ObjectMessage is created and published to a well-known topic.

Listen for JMS messages and apply them to the local ObjectGrid

The same plug-in also starts a thread that spins in a loop, receiving all messages that are published to the well known topic. When a message arrives, it passes the message contents to the LogSequenceTransformer class where it is converted to a set of LogSequence objects. Then, a no-write-through transaction is started. Each LogSequence object is provided to the Session.processLogSequence method, which updates the local Maps with the changes. The processLogSequence method understands the distribution mode. The transaction is committed and the local cache now reflects the changes.

Develop applications that update multiple partitions in a single transaction

If your data is distributed across multiple partitions in the data grid, we can read and update several partitions in a single transaction. This type of transaction is called a multi-partition transaction and uses the two-phase commit protocol to coordinate and recover the transaction in case of failure.

Security overview

WXS can secure data access, including allowing for integration with external security providers.

In an existing non-cached data store such as a database, you likely have built-in security features that you might not need to actively configure or enable. However, after you have cached your data with WXS, you must consider the important resulting situation that your backend security features are no longer in effect. We can configureeXtreme Scale security on necessary levels so that your new cached architecture for your data is also secured. A brief summary of eXtreme Scale security features follows. For more detailed information about configuring security see the Administration Guide and the Programming Guide.

Distributed security basics

Distributed eXtreme Scale security is based on three key concepts:


WXS supports a distributed client server framework. A client server security infrastructure is in place to secure access to WXS servers. For example, when authentication is required by the WXS server, a WXS client must provide credentials to authenticate to the server. These credentials can be a user name and password pair, a client certificate, a Kerberos ticket, or data that is presented in a format that is agreed upon by client and server.


WXS authorizations are based on subjects and permissions. Use the Java Authentication and Authorization Services (JAAS) to authorize the access, or we can plug in a custom approach, such as Tivoli Access Manager (TAM), to handle the authorizations. The following authorizations can be given to a client or group:

Transport security

To secure the client server communication, WXS supports TLS/SSL. These protocols provide transport layer security with authenticity, integrity, and confidentiality for a secure connection between a WXS client and server.

Grid security

In a secure environment, a server must be able to check the authenticity of another server. WXS uses a shared secret key string mechanism for this purpose. This secret key mechanism is similar to a shared password. All the WXS servers agree on a shared secret string. When a server joins the data grid, the server is challenged to present the secret string. If the secret string of the joining server matches the one in the master server, then the joining server can join the grid. Otherwise, the join request is rejected.

Sending a clear text secret is not secure. The eXtreme Scale security infrastructure provides a SecureTokenManager plug-in to allow the server to secure this secret before sending it. We can choose how you implement the secure operation. WXS provides an implementation, in which the secure operation is implemented to encrypt and sign the secret.

JMX security in a dynamic deployment topology

JMX MBean security is supported in all versions of eXtreme Scale. Clients of catalog server MBeans and container server MBeans can be authenticated, and access to MBean operations can be enforced.

Local eXtreme Scale security

Local eXtreme Scale security is different from the distributed eXtreme Scale model because the application directly instantiates and uses an ObjectGrid instance. Your application and eXtreme Scale instances are in the same JVM. Because no client-server concept exists in this model, authentication is not supported. Your applications must manage their own authentication, and then pass the authenticated Subject object to the WXS. However, the authorization mechanism used for the local eXtreme Scale programming model is the same as what is used for the client-server model.

REST data services overview

The WXS REST data service is a Java HTTP service that is compatible with Microsoft WCF Data Services (formally ADO.NET Data Services) and implements the Open Data Protocol (OData). Microsoft WCF Data Services is compatible with this specification when using Visual Studio 2008 SP1 and the .NET Framework 3.5 SP1.

Compatibility requirements

The REST data service allows any HTTP client to access a data grid. The REST data service is compatible with the WCF Data Services support supplied with the Microsoft .NET Framework 3.5 SP1. RESTful applications can be developed with the rich tooling provided by Microsoft Visual Studio 2008 SP1.

Microsoft WCF Data Services

WXS includes a function-rich API set for Java clients. As shown in the following figure, the REST data service is a gateway between HTTP clients and the WXS data grid, communicating with the grid through an WXS client. The REST data service is a Java servlet, which allows flexible deployments for common JEE (JEE) platforms, such as WebSphere Application Server. The REST data service communicates with the WXS data grid using the WXS Java APIs. It allows WCF Data Services clients or any other client that can communicate with HTTP and XML.

Figure 2. WXS REST data service


This version of the WXS REST data service supports the following features:

Known problems and limitations