Administration guide > Configure the deployment environment > Configuring data grids > Configuring write-behind loader support



Example: Writing a write-behind dumper class

This sample source code shows how to write a watcher (dumper) to handle failed write-behind updates.

//
//This sample program is provided AS IS and may be used, executed, copied and
//modified without royalty payment by customer (a) for its own instruction and
//study, (b) in order to develop applications designed to run with an IBM
//WebSphere product, either for customer's own internal use or for redistribution
//by customer, as part of such an application, in customer's own products. "
//
//5724-J34 (C) COPYRIGHT International Business Machines Corp. 2009
//All Rights Reserved * Licensed Materials - Property of IBM
//
package utils;

import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

import com.ibm.websphere.objectgrid.BackingMap;
import com.ibm.websphere.objectgrid.ObjectGrid;
import com.ibm.websphere.objectgrid.ObjectGridException;
import com.ibm.websphere.objectgrid.ObjectGridRuntimeException;
import com.ibm.websphere.objectgrid.ObjectMap;
import com.ibm.websphere.objectgrid.Session;
import com.ibm.websphere.objectgrid.UndefinedMapException;
import com.ibm.websphere.objectgrid.plugins.ObjectGridEventGroup;
import com.ibm.websphere.objectgrid.plugins.ObjectGridEventListener;
import com.ibm.websphere.objectgrid.writebehind.FailedUpdateElement;
import com.ibm.websphere.objectgrid.writebehind.WriteBehindLoaderConstants;

/**
 * Write behind expects transactions to the Loader to succeed. If a transaction for a key fails then
 * it inserts an entry in a Map called PREFIX + mapName. The application should be checking this
 * map for entries to dump out write behind transaction failures. The application is responsible for
 * analyzing and then removing these entries. These entries can be large as they include the key, before
 * and after images of the value and the exception itself. Exceptions can  be 20k on their own.
 * 
 * The class is registered with the grid and an instance is created per primary shard in a JVM. It creates 
 * a single thread
 * and that thread then checks each write behind error map for the shard, prints out the problem and 
 * then removes the entry.
 * 
 * This means there will be one thread per shard. If the shard is moved to another JVM then the deactivate 
 * method stops the thread.
 * @author bnewport
 *
 */
public class WriteBehindDumper implements ObjectGridEventListener, ObjectGridEventGroup.ShardEvents,
 Callable<Boolean>
{
    static Logger logger = Logger.getLogger(WriteBehindDumper.class.getName());
    
    ObjectGrid grid;
    
    /**
     * Thread pool to handle table checkers. If the application has it's own pool
     * then change this to reuse the existing pool
     */
    static ScheduledExecutorService pool = new ScheduledThreadPoolExecutor(2); // two threads to dump records

    // the future for this shard
    ScheduledFuture<Boolean> future;
    
    // true if this shard is active
    volatile boolean isShardActive;

    /**
     * Normal time between checking Maps for write behind errors
     */
    final long BLOCKTIME_SECS = 20L;
    
    /**
     * An allocated session for this shard. No point in allocating them again and again
     */
    Session session;
    /**
     * When a primary shard is activated then schedule the checks to periodically check
     * the write behind error maps and print out any problems
     */
    public void shardActivated(ObjectGrid grid) 
    {
        try
        {
            this.grid = grid;
            session = grid.getSession();
    
            isShardActive = true;
            future = pool.schedule(this, BLOCKTIME_SECS, TimeUnit.SECONDS); // check every BLOCKTIME_SECS seconds initially
        }
        catch(ObjectGridException e)
        {
            throw new ObjectGridRuntimeException("Exception activating write dumper", e);
        }
    }

    /**
     * Mark shard as inactive and then cancel the checker
     */
    public void shardDeactivate(ObjectGrid arg0) 
    {
        isShardActive = false;
        // if it's cancelled then cancel returns true
        if(future.cancel(false) == false)
        {
            // otherwise just block until the checker completes
            while(future.isDone() == false) // wait for the task to finish one way or the other
            {
                try
                {
                    Thread.sleep(1000L); // check every second
                }
                catch(InterruptedException e)
                {
                }
            }
        }
    }

    /**
     * Simple test to see if the map has write behind enabled and if so then return
     * the name of the error map for it.
     * @param mapName The map to test
     * @return The name of the write behind error map if it exists otherwise null
     */
    static public String getWriteBehindNameIfPossible(ObjectGrid grid, String mapName)
    {
        BackingMap map = grid.getMap(mapName);
        if(map != null && map.getWriteBehind() != null)
        {
            return WriteBehindLoaderConstants.WRITE_BEHIND_FAILED_UPDATES_MAP_PREFIX + mapName;
        }
        else
            return null;
    }

    /**
     * This runs for each shard. It checks if each map has write behind enabled and if it does 
   * then it prints out any write behind
     * transaction errors and then removes the record.
     */
    public Boolean call()
    {
        logger.fine("Called for " + grid.toString());
        try
        {
            // while the primary shard is present in this JVM
            // only user defined maps are returned here, no system maps like write behind maps are in
            // this list.
            Iterator<String> iter = grid.getListOfMapNames().iterator();
            boolean foundErrors = false;
            // iterate over all the current Maps
            while(iter.hasNext() && isShardActive)
            {
                String origName = iter.next();
                
                // if it's a write behind error map
                String name = getWriteBehindNameIfPossible(grid, origName);
                if(name != null) 
                {
                    // try to remove blocks of N errors at a time
                    ObjectMap errorMap = null;
                    try
                    {
                        errorMap = session.getMap(name);
                    }
                    catch(UndefinedMapException e)
                    {
                        // at startup, the error maps may not exist yet, patience...
                        continue;
                    }
                    // try to dump out up to N records at once
                    session.begin();
                    for(int counter = 0; counter
< 100; ++counter)
                    {
                        Integer seqKey = (Integer)errorMap.getNextKey(1L);
                        if(seqKey != null)
                        {
                            foundErrors = true;
                            FailedUpdateElement elem = (FailedUpdateElement)errorMap.get(seqKey);
                            //
                            // Your application should log the problem here
                            logger.info("WriteBehindDumper ( " + origName + ") for key (" + elem.getKey() + ") Exception: " + 
                                elem.getThrowable().toString());
                            //
                            //
                            errorMap.remove(seqKey);
                        }
                        else
                            break;
                    }
                    session.commit();
                }
            } // do next map
            // loop faster if there are errors
            if(isShardActive)
            {
                // reschedule after one second if there were bad records
                // otherwise, wait 20 seconds.
                if(foundErrors)
                    future = pool.schedule(this, 1L, TimeUnit.SECONDS);
                else
                    future = pool.schedule(this, BLOCKTIME_SECS, TimeUnit.SECONDS);
            }
        }
        catch(ObjectGridException e)
        {
            logger.fine("Exception in WriteBehindDumper" + e.toString());
            e.printStackTrace();
            
            //don't leave a transaction on the session.
            if(session.isTransactionActive())
            {
                try { session.rollback(); } catch(Exception e2) {}
            }
        }
        return true;
    }
    
    public void destroy() {
        // TODO Auto-generated method stub

    }

    public void initialize(Session arg0) {
        // TODO Auto-generated method stub

    }

    public void transactionBegin(String arg0, boolean arg1) {
        // TODO Auto-generated method stub

    }

    public void transactionEnd(String arg0, boolean arg1, boolean arg2,
            Collection arg3) {
        // TODO Auto-generated method stub

    }
}


Parent topic:

Configure write-behind loader support


Related concepts

Write-behind caching

Write-behind caching support

Handle failed write-behind updates


+

Search Tips   |   Advanced Search