Example 3: Unmanaged MQ subscriber
The unmanaged subscriber is an important class of subscriber application. With it, you combine the benefits of publish/subscribe with control of queuing and consumption of publications. The example demonstrates different ways of combining subscriptions and queues.
The unmanaged pattern is more commonly associated with durable subscriptions than non-durable. Typically the lifecycle of a subscription created by an unmanaged subscriber is independent of the lifecycle of the subscribing application itself. By making the subscription durable the subscription receives publications even when no subscribing application is active.
We can create durable managed subscriptions to achieve the same result, but some applications require more flexibility and control over queues and messages than is possible with a managed subscription. For a durable managed subscription, the queue manager creates a permanent queue for the publications that match the subscription topic. It deletes the queue and associated publications when the subscription is deleted.
Typically durable managed subscriptions are used if the lifecycle of the application and the subscription is essentially the same, but hard to guarantee. By making the subscription durable, and having the publisher create persistent publications, there are no lost messages should the queue manager or subscriber terminate prematurely and need to be recovered.
The queue manager implicitly opens the durable managed subscription queue for a subscriber in such a way that shared processing of the queue is not possible. In addition, we cannot create more than one subscription for each managed queue and you may find the queues harder to manage because we have less control over the names of the queues. For these reasons, consider whether the unmanaged MQ subscriber is a better fit for applications requiring durable subscriptions than the managed MQ subscriber.
The code in Figure 3 demonstrates an unmanaged durable subscription pattern. For illustration the code also creates unmanaged, non-durable subscriptions. This example illustrates the following pattern facets:- On demand subscriptions: the subscription topic strings are dynamic. They are provided by the application when it runs.
- Simplified subscription topic management: subscription topic management is simplified by defining the root part of the subscription topic string using an administratively defined topic. This hides the root part of the topic tree from the application. By hiding the root part a subscriber can be deployed to different topic trees.
- Flexible subscription management: we can define a subscription either administratively, or create it on-demand in a subscriber program. There is no difference between administratively and programmatically created subscriptions, except an attribute that shows how the subscription was created. There is a third type of subscription that is created automatically by the queue manager for distribution of subscriptions. All subscriptions are displayed in the IBM MQ Explorer.
- Flexible association of subscriptions with queues: a predefined local queue is associated with a subscription by the MQSUB function. There are different ways to use MQSUB to associate subscriptions with queues:
- Associate a subscription with a queue having no existing subscriptions, MQSO_CREATE + (Hobj from MQOPEN).
- Associate a new subscription with a queue having existing subscriptions, MQSO_CREATE + (Hobj from MQOPEN).
- Move a existing subscription to a different queue, MQSO_ALTER + (Hobj from MQOPEN).
- Resume an existing subscription associated with an existing queue, MQSO_RESUME + (Hobj = MQHO_NONE), or MQSO_RESUME + (Hobj = from MQOPEN of queue with existing subscription).
- By combining MQSO_CREATE | MQSO_RESUME | MQSO_ALTER in different combinations, we can cater for different input states of the subscription and the queue without having to code multiple versions of MQSUB with different sd.Options values.
- Alternatively, by coding a specific choice of MQSO_CREATE | MQSO_RESUME | MQSO_ALTER the queue manager returns an error ( Table 1 ) if the states of the subscription and queue provided as input to MQSUB are inconsistent with the value of sd.Options. Figure 9 shows the results of issuing MQSUB for Subscription X with different individual settings of the sd.Options flag, and passing it three different object handles.
Explore different inputs to the example program in Figure 2 to become familiar with these different kinds of errors. One common error, RC = 2440, that is not included in the cases listed in the table, is a subscription name error. it is commonly caused by passing a null or invalid subscription name with MQSO_RESUME or MQSO_ALTER.
- Multiprocessing: We can share among many consumers the work of reading publications. The publications all go onto the single queue associated with the subscription topic. Consumers have a choice of opening the queue directly using MQOPEN or resuming the subscription using MQSUB.
- Subscription concentration: multiple subscriptions can be created on the same queue. Be cautious with this capability as it can lead to overlapping subscriptions, and receiving the same publication multiple times. The MQSO_GROUP_SUB option eliminates duplicate publications caused by overlapping subscriptions.
- Subscriber and consumer separation: As well as the three consumer models illustrated in the examples, another model is to separate the consumer from the subscriber. It is a variation of the unmanaged MQ Subscriber, but rather than issue the MQOPEN and MQSUB in the same program, one program subscribes to publications, and another program consumes them. For example, the subscriber might be part of a publish/subscribe cluster and the consumer attached to a queue manager outside the queue manager cluster. The consumer receives publications through standard distributed queuing by defining the subscription queue as a remote queue definition.
Understanding the behavior of MQSO_CREATE | MQSO_RESUME | MQSO_ALTER is important, especially if you plan to simplify your code by using combinations of these options. Study the table Table 1 that shows the results of passing different queue handles to MQSUB, and the results of running the example program shown in Figure 4 to Figure 9.
The scenario used to construct the table has one subscription X and two queues, A and B. The subscription name parameter sd.SubName is set to X, the name of a subscription attached to queue A. Queue B has no subscription attached to it.
In Table 1, MQSUB is passed subscription X and the queue handle to queue A. The results from subscription options are as follows:- MQSO_CREATE fails because the queue handle corresponds to the queue A which already has a subscription to X. Contrast this behavior to the successful call. That call succeeds because queue B does not have a subscription to X attached to it.
- MQSO_RESUME succeeds because the queue handle corresponds to the queue A which already has a subscription to X. In contrast, the call fails where the subscription X does not exist on queue A.
- MQSO_ALTER behaves in a similar way to MQSO_RESUME with respect to opening the subscription and queue. However if the attributes contained within the subscription descriptor passed to MQSUB differ from the attributes of the subscription, MQSO_RESUME fails, whereas MQSO_ALTER succeeds as long as the program instance has permission to alter the attributes. Note that we can never change the topic string in a subscription; but rather than return an error, MQSUB ignores the topic name and topic string values in the subscription descriptor and uses the values in the existing subscription.
- MQSO_CREATE succeeds and creates subscription X on queue B because this is a new subscription on queue B.
- MQSO_RESUME fails. MQSUB looks for subscription X on queue B and does not find it, but rather than returning RC = 2428 - subscription X does not exist, it returns RC = 2019 - Subscription queue does not match queue object handle. The behavior of the third option MQSO_ALTER suggests the reason for this unexpected error. MQSUB expects the queue handle to point to a queue with a subscription. It checks this first before checking whether the subscription named in sd.SubName exists.
- MQSO_ALTER succeeds, and moves the subscription from queue A to queue B.
A case that is not shown in the table is if the subscription name of the subscription on queue A does not match the subscription name in sd.SubName. That call fails with a RC = 2428 - subscription X does not exist on Queue A.
|
|
|
---|---|---|
Hobj for Queue A passed to MQSUB |
|
|
Hobj for Queue B passed to MQSUB |
|
|
MQHO_NONE passed to MQSUB |
|
|
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <cmqc.h> void inquireQname(MQHCONN HConn, MQHOBJ Hobj, MQCHAR48 qName); int main(int argc, char **argv) { MQCHAR48 topicNameDefault = "STOCKS"; char topicStringDefault[] = "IBM/PRICE"; char subscriptionNameDefault[] = "IBMSTOCKPRICESUB"; char subscriptionQueueDefault[] = "STOCKTICKER"; char publicationBuffer[101]; /* Allocate to receive messages */ char resTopicStrBuffer[151]; /* Allocate to resolve topic string */ MQCHAR48 qmName = ""; /* Default queue manager */ MQCHAR48 qName = ""; /* Allocate storage for MQINQ */ MQHCONN Hconn = MQHC_UNUSABLE_HCONN; /* connection handle */ MQHOBJ Hobj = MQHO_NONE; /* subscription queue handle */ MQHOBJ Hsub = MQSO_NONE; /* subscription handle */ MQLONG CompCode = MQCC_OK; /* completion code */ MQLONG Reason = MQRC_NONE; /* reason code */ MQLONG messlen = 0; MQOD od = {MQOD_DEFAULT}; /* Unmanaged subscription queue */ MQSD sd = {MQSD_DEFAULT}; /* Subscription Descriptor */ MQMD md = {MQMD_DEFAULT}; /* Message Descriptor */ MQGMO gmo = {MQGMO_DEFAULT}; /* get message options */ MQLONG sdOptions = MQSO_CREATE | MQSO_RESUME | MQSO_DURABLE | MQSO_FAIL_IF_QUIESCING; char * topicName = topicNameDefault; char * topicString = topicStringDefault; char * subscriptionName = subscriptionNameDefault; char * subscriptionQueue = subscriptionQueueDefault; char * publication = publicationBuffer; char * resTopicStr = resTopicStrBuffer; memset(resTopicStrBuffer, 0, sizeof(resTopicStrBuffer));
switch(argc){ /* Replace defaults with args if provided */ default: switch((argv[5][0])) { case('A'): sdOptions = MQSO_ALTER | MQSO_DURABLE | MQSO_FAIL_IF_QUIESCING; break; case('C'): sdOptions = MQSO_CREATE | MQSO_DURABLE | MQSO_FAIL_IF_QUIESCING; break; case('R'): sdOptions = MQSO_RESUME | MQSO_DURABLE | MQSO_FAIL_IF_QUIESCING; break; default: ; } case(5): if (strcmp(argv[4],"/")) /* "/" invalid = No subscription */ subscriptionQueue = argv[4]; else { *subscriptionQueue = '\0'; if (argc > 5) { if (argv[5][0] == 'C') { sdOptions = sdOptions + MQSO_MANAGED; } } else sdOptions = sdOptions + MQSO_MANAGED; } case(4): if (strcmp(argv[3],"/")) /* "/" invalid = No subscription */ subscriptionName = argv[3]; else { *subscriptionName = '\0'; sdOptions = sdOptions - MQSO_DURABLE; } case(3): if (strcmp(argv[2],"/")) /* "/" invalid = No topic string */ topicString = argv[2]; else *topicString = '\0'; case(2): if (strcmp(argv[1],"/")) /* "/" invalid = No topic object */ topicName = argv[1]; else *topicName = '\0'; case(1): sd.Options = sdOptions; printf("Optional parameters: " printf("topicName, topicString, subscriptionName, subscriptionQueue, A(lter)|C(reate)|R(esume)\n"); printf("Values \"%-.48s\" \"%s\" \"%s\" \"%-.48s\" sd.Options=%d\n", topicName, topicString, subscriptionName, subscriptionQueue, sd.Options); }
- switch((argv[5][0]))
- You have the choice of entering A lter | C reate | R esume in parameter 5, to test the effect of overriding part of the MQSUB option setting used by default in the example. The default setting used by the example is MQSO_CREATE | MQSO_RESUME | MQSO_DURABLE. Note: Setting MQSO_ALTER or MQSO_RESUME without setting MQSO_DURABLE is an error, and sd.SubName must be set and refer to a subscription that can be resumed or altered.
- *subscriptionQueue = '\0';
- sdOptions = sdOptions + MQSO_MANAGED;
- If the default subscription queue, STOCKTICKER is replaced by a null string then as long as MQSO_CREATE is set, the example sets the MQSO_MANAGED flag and creates a dynamic subscription queue. If Alter or Resume are set in the fifth parameter the behavior of the example will depend on the value of subscriptionName.
- *subscriptionName = '\0';
- sdOptions = sdOptions - MQSO_DURABLE;
- If the default subscription, IBMSTOCKPRICESUB, is replaced by a null string then the example removes the MQSO_DURABLE flag. If you run the example providing the default values for the other parameters an additional temporary subscription destined to STOCKTICKER is created and receives duplicate publications. Next time you run the example, without any parameters, you receive just one publication again.
do {
MQCONN(qmName, &Hconn, &CompCode, &Reason);
if (CompCode != MQCC_OK) break;
if (strlen(subscriptionQueue)) {
strncpy(od.ObjectName, subscriptionQueue, MQ_Q_NAME_LENGTH);
MQOPEN(Hconn, &od, MQOO_INPUT_AS_Q_DEF | MQOO_FAIL_IF_QUIESCING | MQOO_INQUIRE,
&Hobj, &CompCode, &Reason);
if (CompCode != MQCC_OK) break;
}
strncpy(sd.ObjectName, topicName, MQ_TOPIC_NAME_LENGTH);
sd.ObjectString.VSPtr = topicString;
sd.ObjectString.VSLength = MQVS_NULL_TERMINATED;
sd.SubName.VSPtr = subscriptionName;
sd.SubName.VSLength = MQVS_NULL_TERMINATED;
sd.ResObjectString.VSPtr = resTopicStr;
sd.ResObjectString.VSBufSize = sizeof(resTopicStrBuffer)-1;
MQSUB(Hconn, &sd, &Hobj, &Hsub, &CompCode, &Reason);
if (CompCode != MQCC_OK) break;
gmo.Options = MQGMO_WAIT | MQGMO_NO_SYNCPOINT | MQGMO_CONVERT;
gmo.WaitInterval = 10000;
gmo.MatchOptions = MQMO_MATCH_CORREL_ID;
memcpy(md.CorrelId, sd.SubCorrelId, MQ_CORREL_ID_LENGTH);
inquireQname(Hconn, Hobj, qName);
printf("Waiting %d seconds for publications matching \"%s\" from %-0.48s\n",
gmo.WaitInterval/1000, resTopicStr, qName);
do {
memcpy(md.MsgId, MQMI_NONE, sizeof(md.MsgId));
memcpy(md.CorrelId, MQCI_NONE, sizeof(md.CorrelId));
md.Encoding = MQENC_NATIVE;
md.CodedCharSetId = MQCCSI_Q_MGR;
MQGET(Hconn, Hobj, &md, &gmo, sizeof(publication), publication, &messlen, &CompCode, &Reason);
if (Reason == MQRC_NONE)
printf("Received publication \"%s\"\n", publication);
}
while (CompCode == MQCC_OK);
if (CompCode != MQCC_OK && Reason != MQRC_NO_MSG_AVAILABLE) break;
MQCLOSE(Hconn, &Hsub, MQCO_NONE, &CompCode, &Reason);
if (CompCode != MQCC_OK) break;
MQCLOSE(Hconn, &Hobj, MQCO_NONE, &CompCode, &Reason);
if (CompCode != MQCC_OK) break;
MQDISC(&Hconn, &CompCode, &Reason);
} while (0);
printf("Completion code %d and Return code %d\n", CompCode, Reason);
}
void inquireQname(MQHCONN Hconn, MQHOBJ Hobj, MQCHAR48 qName) {
#define _selectors 1
#define _intAttrs 1
MQLONG select[_selectors] = {MQCA_Q_NAME}; /* Array of attribute selectors */
MQLONG intAttrs[_intAttrs]; /* Array of integer attributes */
MQLONG CompCode, Reason;
MQINQ(Hconn, Hobj, _selectors, select, _intAttrs, intAttrs, MQ_Q_NAME_LENGTH, qName, &CompCode, &Reason);
if (CompCode != MQCC_OK) {
printf("MQINQ failed with Condition code %d and Reason %d\n", CompCode, Reason);
strncpy(qName, "unknown queue", MQ_Q_NAME_LENGTH);
}
return;
}
- if (strlen(subscriptionQueue))
- If there is no subscription queue name then the example uses MQHO_NONE as the value of Hobj.
- MQOPEN(...);
- The subscription queue is opened and the queue handle saved in Hobj.
- MQSUB(Hconn, &sd, &Hobj, &Hsub, &CompCode, &Reason);
- The subscription is opened using the Hobj passed from MQOPEN (or MQHO_NONE if there is no subscription queue name). An unmanaged queue can be resumed without explicitly opening it with an MQOPEN.
- MQCLOSE(Hconn, &Hsub, MQCO_NONE, &CompCode, &Reason);
- The subscription is closed using the subscription handle. Depending on whether the subscription is durable or not, the subscription is closed with an implicit MQCO_KEEP_SUB or MQCO_REMOVE_SUB. We can close a durable subscription with MQCO_REMOVE_SUB, but we cannot close a non-durable subscription with MQCO_KEEP_SUB. The action of MQCO_REMOVE_SUB is to remove the subscription which stops any further publications being sent to the subscription queue.
- MQCLOSE(Hconn, &Hobj, MQCO_NONE, &CompCode, &Reason);
- No special action is taken if the subscription is unmanaged. If the queue is managed and the subscription closed with either an explicit or implicit MQCO_REMOVE_SUB, then all publications are purged from the queue and queue deleted at this point.
- gmo.MatchOptions = MQMO_MATCH_CORREL_ID;
- memcpy(md.CorrelId, sd.SubCorrelId, MQ_CORREL_ID_LENGTH);
- Ensure that the messages received are those for our subscription.
Results from the example illustrate aspects of publish/subscribe:
In Figure 4 the example starts by publishing 130 on the NYSE/IBM/PRICE topic.W:\Subscribe3\Debug>..\..\Publish2\Debug\publishstock Provide parameters: TopicObject TopicString Publication Publish "130" to topic "STOCKS" and topic string "IBM/PRICE" Published "130" to topic string "NYSE/IBM/PRICE" Completion code 0 and Return code 0
W:\Subscribe3\Debug>solution3 Optional parameters: topicName, topicString, subscriptionName, subscriptionQueue, A(lter)|C(reate)|R(esume) Values "STOCKS" "IBM/PRICE" "IBMSTOCKPRICESUB" "STOCKTICKER" sd.Options=8206 Waiting 10 seconds for publications matching "NYSE/IBM/PRICE" from STOCKTICKER Received publication "130" Completion code 0 and Return code 0
W:\Subscribe3\Debug>solution3 STOCKS IBM/PRICE IBMSTOCKPRICESUB / Resume Optional parameters: topicName, topicString, subscriptionName, subscriptionQueue, A(lter)|C(reate)|R(esume) Values "STOCKS" "IBM/PRICE" "IBMSTOCKPRICESUB" "" sd.Options=8204 MQINQ failed with Condition code 2 and Reason 2038 Waiting 10 seconds for publications matching "NYSE/IBM/PRICE" from unknown queue Completion code 0 and Return code 0
W:\Subscribe3\Debug>solution3 STOCKS IBM/PRICE / STOCKTICKER Create Optional parameters: topicName, topicString, subscriptionName, subscriptionQueue, A(lter)|C(reate)|R(esume) Values "STOCKS" "IBM/PRICE" "" "STOCKTICKER" sd.Options=8194 Waiting 10 seconds for publications matching "NYSE/IBM/PRICE" from STOCKTICKER Received publication "130" Completion code 0 and Return code 0
W:\Subscribe3\Debug>..\..\Publish2\Debug\publishstock Provide parameters: TopicObject TopicString Publication Publish "130" to topic "STOCKS" and topic string "IBM/PRICE" Published "130" to topic string "NYSE/IBM/PRICE" Completion code 0 and Return code 0 W:\Subscribe3\Debug>solution3 STOCKS IBM/PRICE / STOCKTICKER Create Optional parameters: topicName, topicString, subscriptionName, subscriptionQueue, A(lter)|C(reate)|R(esume) Values "STOCKS" "IBM/PRICE" "" "STOCKTICKER" sd.Options=8194 Waiting 10 seconds for publications matching "NYSE/IBM/PRICE" from STOCKTICKER Received publication "130" Received publication "130" Completion code 0 and Return code 0
- In the first case, Resume resumes the existing subscription, as you might expect, and ignores the changed topic string.
- In the second case, Alter causes an error, RC = 2510, Topic not alterable.
- In the third example, Create causes an error RC = 2432, Sub already exists.
W:\Subscribe3\Debug>solution3 "" NASDAC/IBM/PRICE IBMSTOCKPRICESUB STOCKTICKER Resume Optional parameters: topicName, topicString, subscriptionName, subscriptionQueue, A(lter)|C(reate)|R(esume) Values "" "NASDAC/IBM/PRICE" "IBMSTOCKPRICESUB" "STOCKTICKER" sd.Options=8204 Waiting 10 seconds for publications matching "NYSE/IBM/PRICE" from STOCKTICKER Received publication "130" Completion code 0 and Return code 0 W:\Subscribe3\Debug>solution3 "" NASDAC/IBM/PRICE IBMSTOCKPRICESUB STOCKTICKER Alter Optional parameters: topicName, topicString, subscriptionName, subscriptionQueue, A(lter)|C(reate)|R(esume) Values "" "NASDAC/IBM/PRICE" "IBMSTOCKPRICESUB" "STOCKTICKER" sd.Options=8201 Completion code 2 and Return code 2510 W:\Subscribe3\Debug>solution3 "" NASDAC/IBM/PRICE IBMSTOCKPRICESUB STOCKTICKER Create Optional parameters: topicName, topicString, subscriptionName, subscriptionQueue, A(lter)|C(reate)|R(esume) Values "" "NASDAC/IBM/PRICE" "IBMSTOCKPRICESUB" "STOCKTICKER" sd.Options=8202 Completion code 2 and Return code 2432