Subscription levels

Set the subscription level of a subscription to intercept a publication before it reaches its final subscribers. An intercepting subscriber subscribes at a higher subscription level, and republishes at a lower publication level. Build a chain of intercepting subscribers to perform message processing on a publication before it is delivered to final subscribers.

Figure 1. Sequence of intercepting subscribers

To intercept a publication, use the MQSD SubLevel attribute. After a message has been intercepted, it can be transformed and then republished at a lower publication level by changing the MQPMO PubLevel attribute. The message then goes to the final subscribers, or it is intercepted again by an intermediate subscriber at a lower subscription level.

The intercepting subscriber typically transforms a message before republishing it. A sequence of intercepting subscribers forms a message flow. Alternatively, you might not republish the intercepted publication, in which case subscribers at lower subscription levels do not receive the message.

Ensure that the interceptor receives publications before any other subscribers. Set the subscription level of the interceptor higher than other subscribers. By default, subscribers have a SubLevel of 1. The highest value is 9. A publication must start with a PubLevel at least as high as the highest SubLevel. Publish initially with the default PubLevel of 9.

  • If you have one intercepting subscriber on a topic, set its SubLevel to 9.
  • For multiple intercepting applications on a topic, set a lower SubLevel for each successive intercepting subscriber.
  • You can implement a maximum of 8 intercepting applications, with subscription levels from 9 down to 2 inclusive. The final recipient of the message has a SubLevel of 1.
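
The level assignment in the list above can be sketched as a small helper. This is only an illustration in plain C: the function name assign_sublevels and the constants are hypothetical, and no IBM MQ call is involved.

```c
#include <assert.h>
#include <stddef.h>

#define HIGHEST_SUBLEVEL  9   /* highest SubLevel, per the text above */
#define FINAL_SUBLEVEL    1   /* default SubLevel of final subscribers */
#define MAX_INTERCEPTORS  (HIGHEST_SUBLEVEL - FINAL_SUBLEVEL)  /* 8 */

/*
 * Hypothetical helper: fill levels[0..count-1] with SubLevel values for a
 * chain of intercepting subscribers, first interceptor first.
 * Returns 0 on success, -1 if count is outside 1..MAX_INTERCEPTORS.
 */
int assign_sublevels(int count, int levels[])
{
    int i;

    if (count < 1 || count > MAX_INTERCEPTORS)
        return -1;
    assert(levels != NULL);
    for (i = 0; i < count; i++)
        levels[i] = HIGHEST_SUBLEVEL - i;   /* 9, 8, 7, ... */
    return 0;
}
```

Assigning consecutive levels is one possible choice. Any strictly descending sequence between 9 and 2 satisfies the rules above.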

The interceptor with the highest subscription level that is equal to, or lower than, the PubLevel of the publication receives the publication first. Configure only one intercepting subscriber for a topic at a particular subscription level. Having multiple subscribers at a particular subscription level results in multiple copies of the publication being sent to the final set of subscribing applications.

A subscriber with a SubLevel of 0 is used as a catchall. It receives the publication if no final subscriber gets the message. A subscriber with SubLevel of 0 might be used to monitor the publications that no other subscribers received.
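
The selection rule described above can be modelled in a few lines of plain C. The sketch below is not queue manager code. It assumes that subscription levels are given as an array of integers from 0 to 9, and the name receiving_level is hypothetical. It returns the level that receives a publication and counts how many subscriptions share that level, which is the situation to avoid for intercepting subscribers.

```c
#include <assert.h>
#include <stddef.h>

/*
 * Model of the selection rule: return the SubLevel that receives a
 * publication put at pub_level, that is, the highest level in sub_levels[]
 * that is less than or equal to pub_level. Returns -1 if none qualifies.
 * If count is not NULL, it receives the number of subscriptions at the
 * returned level (0 when nothing qualifies).
 */
int receiving_level(const int sub_levels[], size_t n, int pub_level, int *count)
{
    int best = -1;
    int matches = 0;
    size_t i;

    assert(sub_levels != NULL || n == 0);
    for (i = 0; i < n; i++) {
        int lvl = sub_levels[i];
        if (lvl > pub_level)
            continue;               /* subscribed above the publication level */
        if (lvl > best) {
            best = lvl;             /* new highest qualifying level */
            matches = 1;
        } else if (lvl == best) {
            matches++;              /* another subscription at the same level */
        }
    }
    if (count != NULL)
        *count = matches;
    return best;
}
```

In this model a SubLevel 0 subscription is selected only when no subscription at level 1 or higher qualifies, which matches the catchall behavior described above.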


Programming an intercepting subscriber

Use the subscription options described in Table 1.

Table 1. Subscription options

Subscription option                                   Notes
----------------------------------------------------  -----
MQSO_SET_CORREL_ID and SubCorrelId set to MQCI_NONE   Keep the CorrelId of the intercepted publication the same as the original publication. Note: You cannot pass the correlation identifier of a publication in a hierarchy. The field is used by the queue manager.
PubPriority set to MQPRI_PRIORITY_AS_PUBLISHED        Keep the priority of the intercepted publication the same as the original publication.

The options in Table 1 must be used by all the intercepting subscribers. The result is that the correlation identifier and message priority are not modified from the setting of the original publisher.

When the intercepting subscriber has processed the publication, it republishes the message to the same topic at a PubLevel one lower than the SubLevel of its own subscription. If the intercepting subscriber set a SubLevel of 9, it republishes the message with a PubLevel of 8.
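
The republishing rule can be traced with a short model. The sketch below is plain C with hypothetical names (republish_level, trace_chain). It assumes the interceptor levels are listed in descending order and that each interceptor always republishes.

```c
#include <assert.h>
#include <stddef.h>

/* PubLevel an interceptor uses when republishing: one below its SubLevel. */
int republish_level(int sub_level)
{
    assert(sub_level >= 2 && sub_level <= 9);   /* interceptor range */
    return sub_level - 1;
}

/*
 * Hypothetical trace: interceptors[] holds interceptor SubLevels in
 * descending order. Starting from pub_level, each interceptor at or below
 * the current publication level handles the message and republishes it.
 * Returns the PubLevel of the last republication and stores the number
 * of interceptors that handled the message in *hops.
 */
int trace_chain(const int interceptors[], int n, int pub_level, int *hops)
{
    int i;

    assert(hops != NULL);
    *hops = 0;
    for (i = 0; i < n; i++) {
        if (interceptors[i] > pub_level)
            continue;                   /* above the publication: skipped */
        pub_level = republish_level(interceptors[i]);
        (*hops)++;
    }
    return pub_level;
}
```

For interceptors at levels 9, 5, and 2 and an initial PubLevel of 9, the trace passes through all three and ends with a republication at PubLevel 1, which reaches the final subscribers.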

To republish the message correctly, several pieces of information from the original publication are required. Reuse the same MQMD as in the original message and set MQPMO_PASS_ALL_CONTEXT to ensure all information in the MQMD is passed on to the next subscriber. Copy the values from the message properties shown in Table 2 into the corresponding fields of the republished message. The intercepting subscriber can change these values. Use the OR operator to add additional values to the MQPMO.Options field, combining them with the original put message options.

You must open the publication queue explicitly rather than use a managed publication queue. You cannot set MQSO_SET_CORREL_ID for a managed queue, and you cannot set MQOO_SAVE_ALL_CONTEXT on a managed queue. See the code fragments listed in Examples.

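Table 2. Republishing the message using MQPUT

Republish message using MQPUT   Information in publication message
------------------------------  -----------------------------------
MQOD.ObjectString               Message property MQTopicString
MQPMO.Options                   Message property MQPubOptions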
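
Combining the original put message options with additional ones is a bitwise OR. The sketch below uses placeholder flag values, DEMO_PMO_RETAIN and DEMO_PMO_PASS_ALL_CONTEXT, which are assumptions for illustration and are not the values defined in cmqc.h. A real application would use MQPMO_RETAIN and MQPMO_PASS_ALL_CONTEXT from that header.

```c
#include <assert.h>

/* Placeholder bit values for illustration only. Real code includes cmqc.h
 * and uses MQPMO_RETAIN and MQPMO_PASS_ALL_CONTEXT instead. */
#define DEMO_PMO_RETAIN            0x01L
#define DEMO_PMO_PASS_ALL_CONTEXT  0x02L

/*
 * Combine the options read from the MQPubOptions property with extra
 * options chosen by the intercepting subscriber. OR-ing keeps every flag
 * that the original publisher set.
 */
long combine_put_options(long original_pub_options, long extra_options)
{
    long combined = original_pub_options | extra_options;

    /* No original flag may be lost by the combination. */
    assert((combined & original_pub_options) == original_pub_options);
    return combined;
}

/* Nonzero if the (placeholder) retain flag is set in options. */
int is_retained(long options)
{
    return (options & DEMO_PMO_RETAIN) != 0;
}
```

Assigning the extra options with = instead of OR-ing them would discard the publisher's flags, including the one that marks the publication as retained.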

The final subscriber has the choice of setting its subscription options differently. For example, it might set the publication priority explicitly rather than to MQPRI_PRIORITY_AS_PUBLISHED. The settings of a final subscriber affect only the publication from the final intercepting subscriber in the chain.


Retained publications

A retained publication must be preserved after it has been intercepted, by copying the original put-message options into the republished message.

The MQPMO_RETAIN option is set by the publisher. Each intercepting subscriber must transfer the MQPubOptions to the put-message options of the republished message as shown in Table 2. Copying the put-message options preserves the options set by the original publisher, including whether to retain the publication.

When a publication finishes its passage down the chain of intercepting subscribers, and is delivered to final subscribers, it is finally retained. New subscribers, at SubLevel 1, requesting the retained publication, receive it without any further interception. Subscribers at a SubLevel greater than 1 are not sent the retained publication. As a result, the retained publication is not modified by the chain of intercepting subscribers a second time round.


Examples

The examples are code fragments that can be combined to build an intercepting subscriber. The code is written to be brief, rather than of production quality.

The preprocessor directives in Figure 2 define the two properties to be extracted from the publication messages that are required by the MQINQMP MQI call.

Figure 2. Preprocessor directives
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <cmqc.h>
#define      MQPUBOPTIONS        (MQPTR)(char*) "MQPubOptions",\
                                 0,\
                                 12,\
                                 MQVS_NULL_TERMINATED,\
                                 MQCCSI_APPL
#define      MQTOPICSTRING       (MQPTR)(char*) "MQTopicString",\
                                 0,\
                                 13,\
                                 MQVS_NULL_TERMINATED,\
                                 MQCCSI_APPL
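
The literal lengths 12 and 13 in Figure 2 must match the property names exactly. One way to avoid counting by hand is to derive the length from the string literal. The sketch below uses a stand-in structure, DemoCharV, which is an assumption for illustration and models only the first three fields of MQCHARV. It is not the real type from cmqc.h.

```c
#include <assert.h>
#include <string.h>

/* Stand-in for the first three MQCHARV fields (illustration only). */
typedef struct {
    const char *ptr;      /* corresponds to VSPtr     */
    long        offset;   /* corresponds to VSOffset  */
    long        bufsize;  /* corresponds to VSBufSize */
} DemoCharV;

/* Initializer that computes the length from the literal itself. */
#define DEMO_NAME(s)  { (s), 0L, (long)(sizeof(s) - 1) }

/* Nonzero if the recorded size equals the actual length of the name. */
int name_length_matches(const DemoCharV *v)
{
    assert(v != NULL && v->ptr != NULL);
    return (long)strlen(v->ptr) == v->bufsize;
}
```

With this approach, "MQPubOptions" yields 12 and "MQTopicString" yields 13, the same values that Figure 2 writes out explicitly.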



Figure 3 lists the declarations used in the code fragments. Except for the highlighted terms, the declarations are standard for an IBM MQ application.

The Get and Put open options, GO_Options and PO_Options, are initialized to save all context and to pass all context respectively. MQTOPICSTRING and MQPUBOPTIONS are MQCHARV initializers for property names that are defined in the preprocessor directives. The names are passed to MQINQMP.

Figure 3. Declarations
int main(int argc, char **argv) {
    MQLONG   Reason            = MQRC_NONE;
    MQLONG   CompCode          = MQCC_OK;
    MQHCONN  Hcon              = MQHC_UNUSABLE_HCONN;
    MQCHAR   QMName[49]        = " ";
    MQCMHO   CrtMsgHOpts       = {MQCMHO_DEFAULT};
    MQHMSG   Hmsg              = MQHM_NONE;
    MQMD     md                = {MQMD_DEFAULT};
    MQHOBJ   gHobj             = MQHO_NONE;  
    MQOD     getOD             = {MQOD_DEFAULT}; 
    MQGMO    gmo               = {MQGMO_DEFAULT};
    MQLONG   GO_Options        = MQOO_INPUT_AS_Q_DEF
                               | MQOO_FAIL_IF_QUIESCING 
                               | MQOO_SAVE_ALL_CONTEXT;
    MQLONG   GC_Options        = MQCO_DELETE_PURGE;
    MQHOBJ   Hsub              = MQHO_NONE; 
    MQSD     sd                = {MQSD_DEFAULT}; 
    MQLONG   SC_Options        = MQCO_NONE;
    MQHOBJ   pHobj             = MQHO_NONE;
    MQOD     putOD             = {MQOD_DEFAULT}; 
    MQLONG   PO_Options        = MQOO_OUTPUT 
                               | MQOO_FAIL_IF_QUIESCING 
                               | MQOO_PASS_ALL_CONTEXT;
    MQLONG   PC_Options        = MQCO_NONE;
    MQPMO    pmo               = {MQPMO_DEFAULT};
    MQIMPO   InqPropOpts       = {MQIMPO_DEFAULT};
    MQPD     PropDesc          = {MQPD_DEFAULT};
    MQLONG   Type              = MQTYPE_AS_SET;
    MQCHARV  TopStrProp        = {MQTOPICSTRING};	
    MQCHARV  PubOptProp        = {MQPUBOPTIONS};
    MQLONG   DataLength        = 0;
    MQBYTE   buffer[256]       = "";
    MQLONG   buflen            = sizeof(buffer) - 1;
    MQLONG   messlen           = 0;
    char     TopStrBuf[256]    = "Initial value";
    int      i                 = 0;

Initializations that are not easily performed in declarations are shown in Figure 4. The following values require explanation.

    SYSTEM.NDURABLE.MODEL.QUEUE
    In this example, instead of using MQSUB to open a managed non-durable subscription, the model queue, SYSTEM.NDURABLE.MODEL.QUEUE, is used to create a temporary dynamic queue. Its handle is passed to MQSUB. By opening the queue directly we are able to save all message context and set the subscription option, MQSO_SET_CORREL_ID.

    MQGMO_CURRENT_VERSION
    It is important to use the current version of most of the IBM MQ structures. Fields such as gmo.MsgHandle are only available in the latest version of the control structures.

    MQGMO_PROPERTIES_IN_HANDLE
    The topic string and put message options set in the original publication are to be retrieved by the intercepting subscriber using message properties. An alternative would be to read the MQRFH2 structure in the message directly.

    MQSO_SET_CORREL_ID
    Use MQSO_SET_CORREL_ID in combination with,
    memcpy(sd.SubCorrelId, MQCI_NONE, sizeof(sd.SubCorrelId));
    
    The effect of these options is to pass on the correlation identifier. The correlation identifier set by the original publisher is placed in the correlation identifier field of the publication that is received by the intercepting subscriber. Each intercepting subscriber passes on the same correlation identifier. The final subscriber then has the option of receiving the same correlation identifier. Note: If the publication is passed through a publish/subscribe hierarchy, the correlation identifier is never retained.

    MQPRI_PRIORITY_AS_PUBLISHED
    The publication is placed on the publication queue with the same message priority as it was published with.

Figure 4. Initializations
    strncpy(getOD.ObjectName,    "SYSTEM.NDURABLE.MODEL.QUEUE", 
            sizeof(getOD.ObjectName));
    gmo.Version                = MQGMO_CURRENT_VERSION;
    gmo.Options                = MQGMO_WAIT
                               | MQGMO_PROPERTIES_IN_HANDLE
                               | MQGMO_CONVERT;
    gmo.WaitInterval           = 30000;
    sd.Options                 = MQSO_CREATE
                               | MQSO_FAIL_IF_QUIESCING
                               | MQSO_SET_CORREL_ID;
    sd.PubPriority             = MQPRI_PRIORITY_AS_PUBLISHED;
    sd.Version                 = MQSD_CURRENT_VERSION;
    memcpy(sd.SubCorrelId, MQCI_NONE, sizeof(sd.SubCorrelId));
    putOD.ObjectType             = MQOT_TOPIC;
    putOD.ObjectString.VSPtr     = TopStrBuf;
    /* Leave one byte free so the topic string can be null-terminated. */
    putOD.ObjectString.VSBufSize = sizeof(TopStrBuf) - 1;
    putOD.ObjectString.VSLength  = MQVS_NULL_TERMINATED;
    putOD.ObjectString.VSCCSID   = MQCCSI_APPL;
    putOD.Version                = MQOD_CURRENT_VERSION;
    pmo.Version                  = MQPMO_CURRENT_VERSION;

Figure 5 shows the code fragment to read command-line parameters, complete the initialization, and create the intercepting subscription.

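Run the program with the command, InterSub TopicString SubLevel QmgrName. TopicString is required. If omitted, SubLevel defaults to 1 and QmgrName defaults to " ", the default queue manager.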

To make error handling as unobtrusive as possible, the completion code and reason code from each MQI call are stored in the CompCode and Reason variables. After each call the completion code is tested, and if the value is MQCC_FAILED, control exits the do { } while(0) code block.

The two noteworthy lines of code are,

    pmo.PubLevel = sd.SubLevel - 1;
    Sets the publication level for the republished message to one less than the subscription level of the intercepting subscriber.

    gmo.MsgHandle = Hmsg;
    Provides a message handle for MQGET to return the message properties.
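
The do { } while(0) error-handling pattern described above can be shown in isolation. The sketch below does not call IBM MQ. The function demo_call and the DEMO_ constants are hypothetical stand-ins for an MQI call and its completion codes.

```c
#include <assert.h>
#include <stddef.h>

#define DEMO_OK      0
#define DEMO_FAILED  2

/* Hypothetical stand-in for an MQI call: fails when step == fail_at. */
static void demo_call(int step, int fail_at, int *comp_code, int *reason)
{
    if (step == fail_at) {
        *comp_code = DEMO_FAILED;
        *reason    = 1000 + step;   /* made-up reason code */
    } else {
        *comp_code = DEMO_OK;
        *reason    = 0;
    }
}

/* Runs three calls in sequence; returns how many completed successfully. */
int run_steps(int fail_at, int *reason)
{
    int comp_code = DEMO_OK;
    int done = 0;

    assert(reason != NULL);
    *reason = 0;
    do {
        demo_call(1, fail_at, &comp_code, reason);
        if (comp_code == DEMO_FAILED)
            break;                  /* skip the remaining calls */
        done++;
        demo_call(2, fail_at, &comp_code, reason);
        if (comp_code == DEMO_FAILED)
            break;
        done++;
        demo_call(3, fail_at, &comp_code, reason);
        if (comp_code == DEMO_FAILED)
            break;
        done++;
    } while (0);
    /* Cleanup code placed here runs whichever call failed. */
    return done;
}
```

The block runs once. A break leaves it at the first failure, so the reason code of the failing call is still available to the code that follows the block.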

Figure 5. Preparing to intercept publications
do {
   printf("Intercepting subscriber start\n");
   if (argc < 2) {
       printf("Required parameter missing - topic string\n");
       exit(99);
   } else {
       sd.ObjectString.VSPtr    = argv[1];
       sd.ObjectString.VSLength = MQVS_NULL_TERMINATED;
       printf("TopicString = %s\n", (char *)sd.ObjectString.VSPtr);
   }
   if (argc > 2) {
       sd.SubLevel  = atoi(argv[2]);
       pmo.PubLevel = sd.SubLevel - 1;
       printf("SubLevel is %d, PubLevel is %d\n", sd.SubLevel, pmo.PubLevel);
   }  
   if (argc > 3)
       strncpy(QMName, argv[3], sizeof(QMName) - 1);
   MQCONN(QMName, &Hcon, &CompCode, &Reason);   
   if (CompCode == MQCC_FAILED) 
       break;
   MQOPEN(Hcon, &getOD, GO_Options, &gHobj, &CompCode, &Reason); 
   if (CompCode == MQCC_FAILED) 
       break;
   MQSUB(Hcon, &sd, &gHobj, &Hsub, &CompCode, &Reason);
   if (CompCode == MQCC_FAILED) 
       break;
   MQCRTMH(Hcon, &CrtMsgHOpts, &Hmsg, &CompCode, &Reason);
   if (CompCode == MQCC_FAILED) 
       break;
   gmo.MsgHandle = Hmsg;

The main code fragment, Figure 6, gets messages from the publication queue. It queries the message properties and republishes the messages using the topic string and the original MQPMO.Options property of the publication.

In this example, no transformation is performed on the publication. The topic string of the republished publication always matches the topic string the intercepting subscriber subscribed on. If the intercepting subscriber is responsible for intercepting multiple subscriptions sent to the same publication queue, it might be necessary to query the topic string to distinguish publications that match different subscriptions.

The calls to MQINQMP are highlighted. The topic string and publication put message options properties are written directly into the output control structures. The only reason for altering the MQCHARV length field of putOD.ObjectString from an explicit length to a null terminated string is to use printf to output the string.
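
Terminating a string at a returned length, as described above, is safe only if one byte remains free in the buffer. The helper below is a minimal sketch in plain C. The name terminate_at is hypothetical, and it assumes the length comes from a call such as MQINQMP.

```c
#include <assert.h>
#include <stddef.h>

/*
 * Hypothetical helper: null-terminate buf after len bytes of data.
 * Returns 0 on success, -1 if len is negative or leaves no room for
 * the terminator within bufsize bytes.
 */
int terminate_at(char *buf, size_t bufsize, long len)
{
    assert(buf != NULL);
    if (len < 0 || (size_t)len >= bufsize)
        return -1;
    buf[len] = '\0';    /* index through char *, no pointer-to-integer cast */
    return 0;
}
```

Indexing through a char pointer avoids converting the address to an integer type, which can truncate the pointer on platforms where pointers are wider than the integer.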

Figure 6. Intercept publication and republish
while (CompCode != MQCC_FAILED) {
   memcpy(md.MsgId, MQMI_NONE, sizeof(md.MsgId));
   memcpy(md.CorrelId, MQCI_NONE, sizeof(md.CorrelId));
   md.Encoding       = MQENC_NATIVE;
   md.CodedCharSetId = MQCCSI_Q_MGR;
   printf("MQGET : %d seconds wait time\n", gmo.WaitInterval/1000);
   MQGET(Hcon, gHobj, &md, &gmo, buflen, buffer, &messlen, 
       &CompCode, &Reason); 
   if (CompCode == MQCC_FAILED) 
       break;
   buffer[messlen] = '\0'; 
   MQINQMP(Hcon, Hmsg, &InqPropOpts, &TopStrProp, &PropDesc, &Type,
       putOD.ObjectString.VSBufSize, putOD.ObjectString.VSPtr, 
       &(putOD.ObjectString.VSLength), &CompCode, &Reason);
   if (CompCode == MQCC_FAILED) 
       break;
   /* Null-terminate the topic string. Index through char * rather than
      casting the pointer to MQLONG, which truncates 64-bit addresses. */
   ((char *)putOD.ObjectString.VSPtr)[putOD.ObjectString.VSLength] = '\0';
   putOD.ObjectString.VSLength = MQVS_NULL_TERMINATED;
   MQINQMP(Hcon, Hmsg, &InqPropOpts, &PubOptProp, &PropDesc, &Type,
       sizeof(pmo.Options), &(pmo.Options), &DataLength, 
       &CompCode, &Reason);
   if (CompCode == MQCC_FAILED) 
       break;
   MQOPEN(Hcon, &putOD, PO_Options, &pHobj, &CompCode, &Reason);				
   if (CompCode == MQCC_FAILED) 
       break;
   printf("Republish message <%s> on topic <%s> with options %d\n", 
       (char *)buffer, (char *)putOD.ObjectString.VSPtr, pmo.Options);
   MQPUT(Hcon, pHobj, &md, &pmo, messlen, buffer, &CompCode, &Reason);
   if (CompCode == MQCC_FAILED)
       break;
   MQCLOSE(Hcon, &pHobj, PC_Options, &CompCode, &Reason);
   if (CompCode == MQCC_FAILED) 
       break;
}

The final code fragment is shown in Figure 7.

Figure 7. Completion
    } while (0);
    if (CompCode == MQCC_FAILED && Reason != MQRC_NO_MSG_AVAILABLE) 
        printf("MQI Call failed with reason code %d\n", Reason);
    if (Hsub!= MQHO_NONE) 
        MQCLOSE(Hcon, &Hsub, SC_Options, &CompCode, &Reason);     
    if (Hcon!= MQHC_UNUSABLE_HCONN)
        MQDISC(&Hcon, &CompCode, &Reason);
}