ibm websphere mq 7.5Java编程出现这样的问题怎么解决

本帖子已过去太久远了,不再提供回复功能。developerWorks 社区
本文介绍了 WebSphere MQ 中处理大消息的两种方法:消息分片和消息分组,并配以详细的示例代码演示了如何使用 Java API 来实现消息分片和消息分组。
(), 售前工程师, EMC
严国军,目前是 IBM 公司软件部 WebSphere 售前工程师,5年以来一直从事 IBM 业务整合中间件的技术支持工作,拥有丰富的产品经验,产品包括 WebSphere MQ、 Message Broker 等,同时,具有很多大型项目实施经验,曾参与中国人民银行国库系统、建设银行 EAIH 项目、辽宁网通 BOSS 系统、江苏电信 BSS EAI 等项目的实施。
WebSphere MQ 中处理大消息的方法使用过 WebSphere MQ 的读者都知道,WebSphere MQ 对处理的单条消息的大小是有限制的,目前支持的最大消息是100M,而且,随着消息大小的增大,WebSphere MQ 处理的性能也会随之下降。从最佳实践来说,WebSphere MQ 传输大小为几K的消息其效率是最高的。那如何使 WebSphere MQ 能高效的处理大消息呢?WebSphere MQ 提供了处理大消息的两种方法:消息分片和消息分组。下面我们来看在使用 Java API 编写 WebSphere MQ 程序时如何实现消息分片和分组。消息分片的做法是把应用上一个大的逻辑消息分割成一个一个小的片段,每一个片段作为一个 WebSphere MQ 消息独立传输,通过 MQMD 中 GroupId、 MsgSeqNumber 和 Offset 3 个属性来标识,起始消息的 Offset 值为 0,而最后一个消息则会使用如下标记标识这是最后一个片段:MQMF_LAST_SEGMENT。具体从实现上来说,消息分片可以细分为两种模式:一是由队列管理器自动实现消息的分片和组装;二是由应用程序来实现消息的分片和组装。下面我们将详细向您介绍这两种实现方式。队列管理器自动实现的消息分片顾名思义,队列管理器自动实现的消息分片就是由队列管理器来完成消息的分片和组装。对应用程序来说,不管是发送方还是接收方,它所处理的还是一个完整的大消息,只是在程序中通过设置一些标识来指示队列管理器切分消息后再传输。所以,这种方式适用的场合为,出于传输效率的考虑,WebSphere MQ 不适宜传输大消息,而应用程序可以处理大消息,允许占用大块内存。而且,此种方式对编写应用程序来说也比较简单。实际在使用 Java API 编程时,对于发送方,发送消息时需要设置消息的 messageFlags 如下:Msg.messageFlags = MQC.MQMF_SEGMENTATION_ALLOWED;对于接收方,接收消息时需要设置 MQGetMessageOptions:MQGetMessageOptions gmo = new MQGetMessageOptions ();gmo.options = MQC.MQGMO_COMPLETE_MSG;队列管理器自动实现消息分片的部分代码如清单 1,您可以详细的示例程序代码。清单 1 队列管理器自动实现消息分片QMgrSegSender.java:
int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;
myQMgr = new MQQueueManager ("QM1");
myQueue = myQMgr.accessQueue("TESTQ", openOptions);
MQMessage myMsg = new MQMessage ();
myMsg.messageFlags = MQC.MQMF_SEGMENTATION_ALLOWED;
MQPutMessageOptions pmo = new MQPutMessageOptions ();
String strMsg = "";
for (int i=0;i&=100;i++)
strMsg = strMsg + "Hello";
myMsg.write(strMsg.getBytes());
myQueue.put(myMsg,pmo);
System.out.println("Put message:\n" + strMsg);
myQueue.close();
myQMgr.disconnect();
QMgrSegReceiver.java:
int openOptions = MQC.MQOO_INPUT_SHARED | MQC.MQOO_FAIL_IF_QUIESCING;
myQMgr = new MQQueueManager ("QM1");
myQueue = myQMgr.accessQueue("TESTQ", openOptions);
MQMessage myMsg = new MQMessage ();
MQGetMessageOptions gmo = new MQGetMessageOptions ();
gmo.options = MQC.MQGMO_COMPLETE_MSG;
myQueue.get(myMsg, gmo);
byte[] b = new byte[myMsg.getMessageLength()];
myMsg.readFully(b);
String strMsg = new String(b);
System.out.println("Got message:\n" + strMsg);
myQueue.close();
myQMgr.disconnect();程序功能介绍:(1)QMgrSegSender 程序是构造一个长度为505字节的消息并把它写入队列 TESTQ 中。为使 MQ 不能传输505字节的消息,可以修改队列 TESTQ 的属性“最大消息长度(MAXMSGL)”为100。执行结果如下图 1 所示,该消息被队列管理器自动分割成6个片段消息:图 1 在 WebSphere MQ 资源管理器中浏览分片消息(2)QMgrSegReceiver 程序是从队列 TESTQ 中读取一个消息。我们观察执行的结果是它把队列中6个片段消息组成一个完整的大消息取出。使用队列管理器自动实现消息分片对应用开发人员来讲比较简单,但是需要确保程序在内存使用等方面可以处理完整的大消息。应用程序实现的消息分片应用程序实现消息分片是指,在发送方应用程序中事先把大消息切分成多个片段,每一个片段作为一个独立的消息,写入到队列中;在接收方应用程序中,每一个片段作为一个独立的消息被取出,由程序把所有的消息片段组装成一个完整的消息。这种模式适用的场合为,WebSphere MQ 和应用程序两者都不方便处理这么大的单个消息。一般在发送方程序实现中,我们是把所有的消息片段放在一个同步点中发送,所以需要设置 MQPutMessageOptions 为 MQPMO_SYNCPOINT;同时,我们推荐使用选项 MQPMO_LOGICAL_ORDER,这意味着队列管理器自动维护每个消息片段的偏移量(Offset),否则,需要应用程序自身来设置:MQPutMessageOptions pmo = new MQPutMessageOptions ();pmo.options = MQC.MQPMO_LOGICAL_ORDER + MQC.MQPMO_SYNCPOINT;对于每一个消息片段,我们还应标识这是一个消息片段(MQMF_SEGMENT):myMsg.messageFlags = MQC.MQMF_SEGMENT;对于最后一个消息片段,也需要设置特殊标识(MQMF_LAST_SEGMENT):myMsg.messageFlags = MQC.MQMF_LAST_SEGMENT;同样的,在接收方程序中,我们也是把所有的消息片段放在一个同步点中接收,所以需要设置 MQGetMessageOptions 为 MQGMO_SYNCPOINT;同时,我们也设置 MQGMO_LOGICAL_ORDER 来保证所有的消息片段是按逻辑顺序被取出;另外,我们还需设置所有的消息片段都到达后才处理的选项(MQGMO_ALL_SEGMENTS_AVAILABLE),这是为了防止万一由于异常导致消息片段丢失而引起程序无限等待的情形:MQGetMessageOptions gmo = new MQGetMessageOptions ();gmo.options = MQC.MQGMO_LOGICAL_ORDER + MQC.MQGMO_SYNCPOINT + MQC.MQGMO_ALL_SEGMENTS_AVAILABLE;由于我们是按逻辑顺序来取消息片段的,所以设置循环取消息的时候,只要遇到某一个消息片段是最后一个的标识,我们就认为已经取到了完整的消息。如果没有设置按照逻辑顺序来取消息片段,则需要应用程序根据消息序列号、偏移量、是否是最后一个消息片段等标识来判断是否已经取到完整的消息。应用程序实现消息分片的部分代码如清单 2,您可以详细的示例代码:清单 2 应用程序实现消息分片AppSegSender.java
int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;
myQMgr = new MQQueueManager ("QM1");
myQueue = myQMgr.accessQueue("TESTQ", openOptions);
for(int i=0;i&3;i++)
MQMessage myMsg = new MQMessage ();
MQPutMessageOptions pmo = new MQPutMessageOptions ();
pmo.options = MQC.MQPMO_LOGICAL_ORDER + MQC.MQPMO_SYNCPOINT;
myMsg.messageFlags = MQC.MQMF_SEGMENT;
myMsg.messageFlags = MQC.MQMF_LAST_SEGMENT;
String strMsg = "Hello" +
myMsg.write(strMsg.getBytes());
myQueue.put(myMsg,pmo);
System.out.println("Put message '" + strMsg + "'! ");
myQueue.close();
myQMgr.disconnect();
AppSegReceiver.java
int openOptions = MQC.MQOO_INPUT_SHARED | MQC.MQOO_FAIL_IF_QUIESCING;
myQMgr = new MQQueueManager ("QM1");
myQueue = myQMgr.accessQueue("TESTQ", openOptions);
MQMessage myM
MQGetMessageOptions gmo = new MQGetMessageOptions ();
gmo.options =
MQC.MQGMO_LOGICAL_ORDER + MQC.MQGMO_SYNCPOINT + MQC.MQGMO_ALL_SEGMENTS_AVAILABLE;
String strMsg = "";
boolean isLastSegment =
while(!isLastSegment)
myMsg = new MQMessage ();
myQueue.get(myMsg, gmo);
if (myMsg.messageFlags == MQC.MQMF_SEGMENT + MQC.MQMF_LAST_SEGMENT)
isLastSegment =
byte[] b = new byte[myMsg.getMessageLength()];
myMsg.readFully(b);
strMsg += new String(b);
System.out.println("Got message:\n" + strMsg);
myQueue.close();
myQMgr.disconnect();程序功能介绍:
AppSegSender 程序是使用一个 for 循环,构造一个完整消息的三个消息片段,分别写入队列 TESTQ 中。
AppSegReceiver 程序是从队列 TESTQ 中循环读取消息片段,根据其逻辑顺序以及是否是最后一个消息片段来组装完整的消息。相对于队列管理器器自动实现消息分片的方式,应用程序实现消息分片略显复杂,但是它能够处理更大的消息。消息分组从实现手段上来讲,消息分组和消息分片非常类似,但二者有着完全不同的业务意义。在消息分片中,虽然每一个消息片段都作为一个独立的消息进行传输,但只有收集到所有的消息片段组成一个完整的消息之后才有业务意义,单独的一个消息片段是没有任何业务意义的。从这一点上讲,我们是由于技术上处理大消息有困难,才想到把大消息进行切分的。而消息分组则不同,它的每一个成员消息都是一个具有业务意义的独立消息,只是由于某些需要,比如,组内消息有明确的先后顺序,等等,才把这批消息作为一组进行传输。在实际实现中,组内的消息是通过 MQMD 中 GroupId 和 MsgSeqNumber 2个属性来标识,而最后一个消息则会标记这是组内的最后一个消息(MQMF_LAST_MSG_IN_GROUP)。与消息分片类似,一般在发送方程序中,我们是把同一组的所有消息放在一个同步点中发送,所以需要设置 MQPutMessageOptions 为 MQPMO_SYNCPOINT;同时,我们推荐使用选项 MQPMO_LOGICAL_ORDER,这意味着队列管理器自动维护每个消息的序列号(MsgSeqNumber),否则,需要应用程序自身来设置:MQPutMessageOptions pmo = new MQPutMessageOptions ();pmo.options = MQC.MQPMO_LOGICAL_ORDER + MQC.MQPMO_SYNCPOINT;对于每一个消息,我们还应标识这是一个组内的消息(MQMF_MSG_IN_GROUP):myMsg.messageFlags = MQC.MQMF_MSG_IN_GROUP;对于组内的最后一个消息,也需要设置特殊标识(MQMF_LAST_MSG_IN_GROUP):myMsg.messageFlags = MQC.MQMF_LAST_MSG_IN_GROUP;同样的,在接收方程序中,我们也是把同一组的所有消息放在一个同步点中接收,所以需要设置 MQGetMessageOptions 为 MQGMO_SYNCPOINT;同时,我们也设置 MQGMO_LOGICAL_ORDER 来保证同一个组里的所有消息是按逻辑顺序被取出;另外,我们还需设置同一组所有的消息都到达后才处理的选项(MQGMO_ALL_MSGS_AVAILABLE),这是为了防止万一由于异常导致某一成员消息丢失而引起程序无限等待的情形:MQGetMessageOptions gmo = new MQGetMessageOptions ();gmo.options = MQC.MQGMO_LOGICAL_ORDER + MQC.MQGMO_SYNCPOINT + MQC.MQGMO_ALL_MSGS_AVAILABLE;由于我们是按逻辑顺序来取组内成员消息的,所以设置循环取消息的时候,只要遇到某一个消息是组内最后一个的标识,我们就认为已经取到了该组所有的消息。如果没有设置按照逻辑顺序来取消息片段,则需要应用程序根据消息序列号、取到的消息个数、是否是组内最后一个消息等标识来判断是否已经取到该组所有的消息。部分代码如清单 3,您可以详细的示例代码。清单 3 消息分组AppGrpSender.java
int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;
myQMgr = new MQQueueManager ("QM1");
myQueue = myQMgr.accessQueue("TESTQ", openOptions);
for(int i=0;i&3;i++)
MQMessage myMsg = new MQMessage ();
MQPutMessageOptions pmo = new MQPutMessageOptions ();
pmo.options = MQC.MQPMO_LOGICAL_ORDER + MQC.MQPMO_SYNCPOINT;
myMsg.messageFlags = MQC.MQMF_MSG_IN_GROUP;
myMsg.messageFlags = MQC.MQMF_LAST_MSG_IN_GROUP;
String strMsg = "Hello" +
myMsg.write(strMsg.getBytes());
myQueue.put(myMsg,pmo);
System.out.println("Put message" + (i+1) + " '" + strMsg + "'! ");
myQueue.close();
myQMgr.disconnect();
AppGrpReceiver.java
int openOptions = MQC.MQOO_INPUT_SHARED | MQC.MQOO_FAIL_IF_QUIESCING;
myQMgr = new MQQueueManager ("QM1");
myQueue = myQMgr.accessQueue("TESTQ", openOptions);
MQMessage myM
MQGetMessageOptions gmo = new MQGetMessageOptions ();
gmo.options =
MQC.MQGMO_LOGICAL_ORDER + MQC.MQGMO_SYNCPOINT + MQC.MQGMO_ALL_MSGS_AVAILABLE;
String strMsg = "";
boolean isLastMsg =
int seq = 0;
while(!isLastMsg)
myMsg = new MQMessage ();
myQueue.get(myMsg, gmo);
if (myMsg.messageFlags == MQC.MQMF_MSG_IN_GROUP + MQC.MQMF_LAST_MSG_IN_GROUP)
isLastMsg =
byte[] b = new byte[myMsg.getMessageLength()];
myMsg.readFully(b);
strMsg = new String(b);
System.out.println("Got message" + seq + ":\n" + strMsg);
myQueue.close();
myQMgr.disconnect();程序功能介绍:
AppGrpSender 程序是使用一个 for 循环,构造一个组的三个消息,分别写入队列 TESTQ 中。
AppGrpReceiver 程序是从队列 TESTQ 中循环读取消息,根据其逻辑顺序以及是否是组内最后一个消息来判断是否已取完同一组内的所有消息。相对于消息分片,消息分组不仅仅是处理大消息的一种方法,更为重要的是,消息分组还能维护一组业务数据中的逻辑关系。结束语消息分片和消息分组是在 WebSphere MQ 的编程中处理大消息的常用手段,到底采用哪种方式比较合适,需要根据实际的需求而定。如果大消息需要分割成有实际业务意义的一批小消息,那么采用消息分组比较合适;反之,如果大消息无法分割成有实际业务意义的小消息,那么就采用消息分片。甚至在某些复杂的场合下,消息分片和消息分组可以结合起来使用,比如,某批消息传输时由于有先后顺序的要求,被归并到一个组内,同时由于部分消息比较大,又需要分片传输,有兴趣的读者可以自己来实现一下这个复杂的场景。
下载描述名字大小代码示例6 KB
参考资料 :WebSphere MQ 文档资料库,资源丰富,包括信息中心、快速开始系列、红皮书、白皮书,等等。
:介绍 WebSphere MQ 的相关概念,指导设计和编写 WebSphere MQ 程序。
:介绍如何使用 Java 语言编写 WebSphere MQ 程序。
:介绍 WebSphere MQ 系统管理的相关知识。
:为您提供关于 WebSphere MQ 最新的技术文章等资源。
:帮助您更好更快地学习 WebSphere MQ 资源。
:聆听 IBM 专家娄丽军对消息中间件技术的独特理解。
:本文描述有关设计、构建、运行和维护 WebSphere MQ 解决方案的最常见最佳实践,以便实现 WebSphere MQ 的全部好处。
马上下载。
developerWorks: 登录
标有星(*)号的字段是必填字段。
保持登录。
单击提交则表示您同意developerWorks 的条款和条件。 查看条款和条件。
在您首次登录 developerWorks 时,会为您创建一份个人概要。您的个人概要中的信息(您的姓名、国家/地区,以及公司名称)是公开显示的,而且会随着您发布的任何内容一起显示,除非您选择隐藏您的公司名称。您可以随时更新您的 IBM 帐户。
所有提交的信息确保安全。
选择您的昵称
当您初次登录到 developerWorks 时,将会为您创建一份概要信息,您需要指定一个昵称。您的昵称将和您在 developerWorks 发布的内容显示在一起。昵称长度在 3 至 31 个字符之间。
您的昵称在 developerWorks 社区中必须是唯一的,并且出于隐私保护的原因,不能是您的电子邮件地址。
标有星(*)号的字段是必填字段。
(昵称长度在 3 至 31 个字符之间)
单击提交则表示您同意developerWorks 的条款和条件。 .
所有提交的信息确保安全。
文章、教程、演示,帮助您构建、部署和管理云应用。
立即加入来自 IBM 的专业 IT 社交网络。
免费下载、试用软件产品,构建应用并提升技能。
static.content.url=/developerworks/js/artrating/SITE_ID=10Zone=WebSphere, Java technologyArticleID=372495ArticleTitle=使用 Java API 处理 WebSphere MQ 大消息publish-date=WebSphere MQ For JAVA编程实例----请求/回复---样例 - jay.windows - 博客园
Requester.java源码:
import com.ibm.mq.*;
public class Requester {
public static void main(String args[]) {
String hostName = &127.0.0.1&;
String channel = &CHAN1&;
String qManager = &QM1&;
String requestQueue = &QL1&;
String replyToQueue = &REPLYQ&;
String replyToQueueManager = &QM1&;
// Set up the MQEnvironment properties for Client Connections
MQEnvironment.hostname = hostN
MQEnvironment.channel =
MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY,
MQC.TRANSPORT_MQSERIES);
MQEnvironment.CCSID = 1381;
// Connection To the Queue Manager
MQQueueManager qMgr = new MQQueueManager(qManager);
int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;
// Open the queue
MQQueue queue = qMgr.accessQueue(requestQueue, openOptions, null,
null, null);
// Set the put message options , we will use the default setting.
MQPutMessageOptions pmo = new MQPutMessageOptions();
pmo.options = pmo.options + MQC.MQPMO_NEW_MSG_ID;
pmo.options = pmo.options + MQC.MQPMO_SYNCPOINT;
MQMessage outMsg = new MQMessage();
// Create the message buffer
outMsg.format = MQC.MQFMT_STRING;
// Set the MQMD format field.
outMsg.messageFlags = MQC.MQMT_REQUEST;
outMsg.replyToQueueName = replyToQ
outMsg.replyToQueueManagerName = replyToQueueM
// Prepare message with user data
String msgString = &Test Request Message from Requester program &;
outMsg.writeString(msgString);
// Now we put The message on the Queue
queue.put(outMsg, pmo);
// Commit the transaction.
System.out
.println(& The message has been Sussesfully putnn#########&);
// Close the the Request Queue
queue.close();
// Set openOption for response queue
openOptions = MQC.MQOO_INPUT_SHARED | MQC.MQOO_FAIL_IF_QUIESCING;
MQQueue respQueue = qMgr.accessQueue(replyToQueue, openOptions,
null, null, null);
MQMessage respMessage = new MQMessage();
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.options = gmo.options + MQC.MQGMO_SYNCPOINT;
// Get messages under syncpoint control
gmo.options = gmo.options + MQC.MQGMO_WAIT;
// Wait for Response Message
gmo.matchOptions = MQC.MQMO_MATCH_CORREL_ID;
gmo.waitInterval = 10000;
respMessage.correlationId = outMsg.messageId;
System.out.println(&The response message correlID : & + respMessage.correlationId);
// Get the response message.
respQueue.get(respMessage, gmo);
String response = respMessage.readString(respMessage
.getMessageLength());
System.out.println(&The response message is : & + response);
respQueue.close();
qMgr.disconnect();
} catch (MQException ex) {
System.out.println(&An MQ Error Occurred: Completion Code is :t&
+ ex.completionCode + &nn The Reason Code is :t&
+ ex.reasonCode);
ex.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
Responder.java源码如下:
import com.ibm.mq.*;
public class Responder {
public static void main(String args[]) {
String hostName = &127.0.0.1&;
String channel = &CHAN1&;
String qManager = &QM1&;
String qName = &QL1&;
// Set up the MQEnvironment properties for Client
// Connections
MQEnvironment.hostname = hostN
MQEnvironment.channel =
MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY,
MQC.TRANSPORT_MQSERIES);
MQEnvironment.CCSID = 1381;
// Connection To the Queue Manager
MQQueueManager qMgr = new MQQueueManager(qManager);
* Set up the open options to open the queue for out put and
* additionally we have set the option to fail if the queue manager
* is quiescing.
int openOptions = MQC.MQOO_INPUT_SHARED
| MQC.MQOO_FAIL_IF_QUIESCING;
// Open the queue
MQQueue queue = qMgr.accessQueue(qName, openOptions, null, null,
// Set the put message options.
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.options = gmo.options + MQC.MQGMO_SYNCPOINT;
// Get messages under syncpoint control
gmo.options = gmo.options + MQC.MQGMO_WAIT;
// Wait if no messages on the Queue
gmo.options = gmo.options + MQC.MQGMO_FAIL_IF_QUIESCING;
// Fail if QeueManager Quiescing
gmo.waitInterval = 3000;
// Sets the time limit for the wait.
* Next we Build a message The MQMessage class encapsulates the data
* buffer that contains the actual message data, together with all
* the MQMD parameters that describe the message.
* Build a new message, create a new instance of MQMessage class and
* use writxxx (we will be using writeString method). The put()
* method of MQQueue also takes an instance of the
* MQPutMessageOptions class as a parameter.
MQMessage inMsg = new MQMessage();
// Create the message buffer Get the message from the queue on to the message buffer.
queue.get(inMsg, gmo);
// Read the User data from the message.
String msgString = inMsg.readString(inMsg.getMessageLength());
System.out.println(& The Message from the Queue is : & + msgString);
// Check if message if of type request message and reply to the
// request.
if (inMsg.messageFlags == MQC.MQMT_REQUEST) {
System.out.println(&Preparing To Reply To the Request &);
String replyQueueName = inMsg.replyToQueueN
System.out.println(&The reply queue: & + replyQueueName);
openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;
MQQueue respQueue = qMgr.accessQueue(replyQueueName,
openOptions, inMsg.replyToQueueManagerName, null, null);
MQMessage respMessage = new MQMessage();
respMessage.correlationId = inMsg.messageId;
System.out.println(&The response CorrelID & + respMessage.correlationId);
MQPutMessageOptions pmo = new MQPutMessageOptions();
respMessage.format = MQC.MQFMT_STRING;
respMessage.messageFlags = MQC.MQMT_REPLY;
String response = &Reply from the Responder Program &;
respMessage.writeString(response);
respQueue.put(respMessage, pmo);
System.out.println(&The response Successfully send &);
respQueue.close();
queue.close();
qMgr.disconnect();
} catch (MQException ex) {
System.out.println(&An MQ Error Occurred: Completion Code is :t&
+ ex.completionCode + &nn The Reason Code is :t&
+ ex.reasonCode);
ex.printStackTrace();
} catch (Exception e) {
e.printStackTrace();查看: 35221|回复: 21
求救--关于WebSphere MQ异常:MQ 队列管理器立即关闭通道,关闭原因2009
论坛徽章:0
请教各位大虾:
客户端连接到队列管理器所在的服务器进行消息接收时,如果长时间该队列中一直没有消息可以取,但是客户接收程序还继续运行,过一段时间MQ就会报如下异常:
com.ibm.mqservices.MQInternalException: MQJE001:发生 MQException:完成码 2,原因码 2009
MQJE016:连接期间,MQ 队列管理器立即关闭通道
关闭原因 = 2009
导致队列管理器的消息通道关闭,发送方无法再放置消息,除非将导致该通道关闭异常的客户端接收接收程序关闭,这样队列管理器就恢复正常了,但是我还不想让接收程序关闭,不知道怎样避免这种情况的发生,有什么解决方案?导致这种情况的原因是什么?
我的队列管理器是建在UNIX环境下,只定义了一个消息通道类型为SVRCONN:
DEFINE CHANNEL(S_QM_APSIS_TEST) CHLTYPE(SVRCONN)&&MAXMSGL() REPLACE
采用的通讯方式为将消息放置到Server端的本地队列中,客户端从到该Server端的本地队列中取得消息;当客户端需要返回消息给服务器端时,客户端发往服务器端的队列,不知道我的这个通道就是属于MQI通道类型?我看网上好多资料都是建立多个通道:发送方,接收方和服务器端通道,不知道这种通道什么情况下使用,对通道的类型及每种类型什么情况下使用不是很清楚,还望大家多多帮忙指教,谢谢!!:)
论坛徽章:0
大家不要光看不发表意见啊!知道的仁兄帮帮忙啊!!好急的,不要见死不救啊!!
为了方便找出导致通道关闭的原因,现将接收消息的代码贴出来,如下:
import org.dom4j.D
import com.ibm.mq.*;
public class RSVMonitorIndexReturnAction extends Thread {
& &* @function& &以多线程的方式接收信息&br&
& &* @author& &&&hidden_wing&br&
&&public void run() {
& & Document doc =
& & & & & & String qName = QL;
& && &//设置MQ的环境变量
& && &MQEnvironment.hostname = 10.6.12.33;
& && &MQEnvironment.channel = S_QM;
& && &MQEnvironment.CCSID = 819;
& && &MQEnvironment.port = 1414;
& && &MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY,
& && && && && && && && && && && &&&MQC.TRANSPORT_MQSERIES_CLIENT);
& && &// 连接到队列管理器
& && &MQQueueManager qMgr = new MQQueueManager(qManager);
& && &// 设置打开属性
& && &int openOptions = MQC.MQOO_INPUT_SHARED
& && && && && && && && &| MQC.MQOO_FAIL_IF_QUIESCING;
& && &// 打开队列
& && &MQQueue queue = qMgr.accessQueue(qName, openOptions, null, null,
& && && && && && && && && && && && && &null);
& && &// 从消息读取用户数据
& && &MQMessage msg =
& && &//获取队列中的所有消息
& && &while ((msg = fetchMsg(queue)) != null) {
& && &&&doc = (Document) msg.readObject();
& && &&&......
& && &// 提交事务
& && &mit();
& && &// 关闭队列和队列管理器对象
& && &queue.close();
& && &qMgr.disconnect();
& & } catch (MQException ex) {
& && &(&接收MQ出现错误:& + ex.getMessage());
& & } catch (Exception e) {
& && &if(e.getMessage()!=null){
& && &&&(&出现错误:& + e.getMessage());
& &* @function& &获取队列中的消息&br&
& &* @author& &&&hidden_wing&br&
& &* @param& && &MQQueue& &&&q& &&&--定义一个MQQueue类型的对象来获取消息缓冲区中的消息
&&private MQMessage fetchMsg(MQQueue q) throws Exception {
& & MQGetMessageOptions mgo = new MQGetMessageOptions();
& & mgo.options |= MQC.MQGMO_NO_WAIT;
& & MQMessage msg = new MQMessage();
& && &//获取消息
& && &q.get(msg, mgo);
& & } catch (MQException e) {
说明:RSVMonitorIndexReturnAction 类是通过一个多线程的方式每隔6秒钟调用一次,在RSVMonitorIndexReturnAction 类中我通过一个while循环,来将队列中现有的消息一次取出,不知道这种实现方式在逻辑上是否有问题,是否存在一些没有释放的资源?
另外问个问题,q.get(msg, mgo)方法是调用一次就取一条消息吗?例如我的程序中消息是一个document对象,那么是不是每调用一次get方法就取出一个document对象,而不是多个?
论坛徽章:0
下边是MQ服务器端记录的错误日志
AMQ9513: Maximum number of channels reached.
EXPLANATION:
The maximum number of channels that can be in use simultaneously has been
reached. The number of permitted channels is a configurable parameter in the
queue manager configuration file.
Wait for some of the operating channels to close. Retry the operation when some
channels are available.
大家帮帮忙看看啊,这么详尽的资料提供出来,如果还没有人回复的话,真是欲哭无泪了
论坛徽章:16
Re: 求救--关于WebSphere MQ异常:MQ 队列管理器立即关闭通道,关闭原因2009
最初由 hidden_wing 发布
[B]请教各位大虾:
客户端连接到队列管理器所在的服务器进行消息接收时,如果长时间该队列中一直没有消息可以取,但是客户接收程序还继续运行,过一段时间MQ就会报如下异常:
com.ibm.mqservices.MQInternalException: MQJE001:发生 MQException:完成码 2,原因码 2009
MQJE016:连接期间,MQ 队列管理器立即关闭通道
关闭原因 = 2009
导致队列管理器的消息通道关闭,发送方无法再放置消息,除非将导致该通道关闭异常的客户端接收接收程序关闭,这样队列管理器就恢复正常了,但是我还不想让接收程序关闭,不知道怎样避免这种情况的发生,有什么解决方案?导致这种情况的原因是什么?
我的队列管理器是建在UNIX环境下,只定义了一个消息通道类型为SVRCONN:
DEFINE CHANNEL(S_QM_APSIS_TEST) CHLTYPE(SVRCONN)&&MAXMSGL() REPLACE
......不知道我的这个通道就是属于MQI通道类型?我看网上好多资料都是建立多个通道:发送方,接收方和服务器端通道,不知道这种通道什么情况下使用,对通道的类型及每种类型什么情况下使用不是很清楚,还望大家多多帮忙指教,谢谢!!:) [/B]
问题比较多, 先回答后面提到的:
1. 你定义的通道类型为 SVRCONN 服务器连接通道,在WMQ 中称为MQI通道,是供应用程序以客户端的方式连接服务器使用的;
另外在 WMQ 中称为消息通道的是指服务器到服务器之间的连接(准确地说,是队列管理器和队列管理器之间),这时候,会有你提到的发送方,接收方和服务器端通道等类型。
详细的资料,最好参考下WMQ 的信息中心。
2, 从你上面的错误描述来看, 2009 的错误代码为CONNECTION_BROKEN (可以用命令 #mqrc 2009),这表示连接中断,但是连接中断的原因很多,通常为网络通信的原因,如果是这种情况,必须在程序中检查到这样的返回值后,程序重新建立连接;如果程序不重新建立连接,就像你提到的一样,需要中断掉程序。一般,你可以查看 MQ 的 error log 来获得更多的信息,其中可能包含通信错误的原因( /var/mqm/errors and&&/var/mqm/YourQueueManager/errors 目录中), 你可能会看到如下表所列的 tcp/ip 的一些错误。
表 1. Unix TCP/IP errno。 Errno&&Errno 号码&&描述&&
AIX&&HP-UX&&Solaris&&Linux&&
EINTR&&4&&4&&4&&4&&系统调用中断。&&
EAGAIN&&11&&11&&11&&11&&资源临时不可用。&&
EBUSY&&16&&16&&16&&16&&资源正忙。&&
EMFILE&&24&&24&&24&&24&&每进程文件描述符表已满。&&
EPIPE&&32&&32&&32&&32&&管道断开。&&
EADDRINUSE&&67&&226&&125&&98&&已经在使用指定的地址。&&
ENETDOWN&&69&&228&&127&&100&&网络已停止。&&
ENETUNREACH&&70&&229&&128&&101&&没有任何至网络的路由可用。&&
ENETRESET&&71&&230&&129&&102&&网络复位时删除了连接。&&
ECONNRESET&&73&&232&&131&&104&&伙伴已复位连接。&&
ENOBUFS&&74&&233&&132&&105&&系统中没有足够的缓冲区空间资源可用来完成调用。&&
ENOTCONN&&76&&235&&134&&107&&未连接套接字。&&
ETIMEDOUT&&78&&238&&145&&110&&连接超时。&&
ECONNREFUSED&&79&&239&&146&&111&&连接已被拒绝。如果您正在尝试与数据库相连,则检查是否已成功启动了服务器上的数据库管理器和 TCP/IP 协议支持。
如果使用 SOCKS 协议支持,则还要确保在 SOCKS 服务器上已成功启动了 TCP/IP 协议支持。
EHOSTDOWN&&80&&241&&147&&112&&主机已当机。&&
EHOSTUNREACH&&81&&242&&148&&113&&没有任何至主机的可用路由。&&
有关 Unix TCP/IP 通信错误的更多信息,请参阅适当操作系统的技术参考手册
论坛徽章:16
最初由 hidden_wing 发布
[B]大家不要光看不发表意见啊!知道的仁兄帮帮忙啊!!好急的,不要见死不救啊!!
为了方便找出导致通道关闭的原因,现将接收消息的代码贴出来,如下:
import org.dom4j.D
import com.ibm.mq.*;
public class RSVMonitorIndexReturnAction extends Thread {
& &* @function& &以多线程的方式接收信息&br&
& &* @author& &&&hidden_wing&br&
&&public void run() {
& & Document doc =
& & & & & & String qName = QL;
& && &//设置MQ的环境变量
& && &MQEnvironment.hostname = 10.6.12.33;
& && &MQEnvironment.channel = S_QM;
& && &MQEnvironment.CCSID = 819;
& && &MQEnvironment.port = 1414;
& && &MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY,
& && && && && && && && && && && &&&MQC.TRANSPORT_MQSERIES_CLIENT);
& && &// 连接到队列管理器
& && &MQQueueManager qMgr = new MQQueueManager(qManager);
& && &// 设置打开属性
& && &int openOptions = MQC.MQOO_INPUT_SHARED
& && && && && && && && &| MQC.MQOO_FAIL_IF_QUIESCING;
& && &// 打开队列
& && &MQQueue queue = qMgr.accessQueue(qName, openOptions, null, null,
& && && && && && && && && && && && && &null);
& && &// 从消息读取用户数据
& && &MQMessage msg =
& && &//获取队列中的所有消息
& && &while ((msg = fetchMsg(queue)) != null) {
& && &&&doc = (Document) msg.readObject();
& && &&&......
& && &// 提交事务
& && &mit();
& && &// 关闭队列和队列管理器对象
& && &queue.close();
& && &qMgr.disconnect();
& & } catch (MQException ex) {
& && &(&接收MQ出现错误:& + ex.getMessage());
& & } catch (Exception e) {
& && &if(e.getMessage()!=null){
& && &&&(&出现错误:& + e.getMessage());
& &* @function& &获取队列中的消息&br&
& &* @author& &&&hidden_wing&br&
& &* @param& && &MQQueue& &&&q& &&&--定义一个MQQueue类型的对象来获取消息缓冲区中的消息
&&private MQMessage fetchMsg(MQQueue q) throws Exception {
& & MQGetMessageOptions mgo = new MQGetMessageOptions();
& & mgo.options |= MQC.MQGMO_NO_WAIT;
& & MQMessage msg = new MQMessage();
& && &//获取消息
& && &q.get(msg, mgo);
& & } catch (MQException e) {
说明:RSVMonitorIndexReturnAction 类是通过一个多线程的方式每隔6秒钟调用一次,在RSVMonitorIndexReturnAction 类中我通过一个while循环,来将队列中现有的消息一次取出,不知道这种实现方式在逻辑上是否有问题,是否存在一些没有释放的资源?
另外问个问题,q.get(msg, mgo)方法是调用一次就取一条消息吗?例如我的程序中消息是一个document对象,那么是不是每调用一次get方法就取出一个document对象,而不是多个? [/B]
一口气这么多问题,一个个来:
1. 程序中的不妥地方:
没有看到MQGetMessageOptions 中设定 MQC.MQGMO_SYNCPOINT ,你的&&
& && &// 提交事务
& && &mit();
语句应当不能达到你的效果吧? 你本意是想把所有的消息放到一个事务中吗? 需要你检查程序!
2. 你是用的 mq java base class 接口, 其实你的多线程以及while 循环达到的效果, 是否可以考虑使用 mq 的 get 消息时,加上时间等待的方式(通常称为 block read ) 的方式? 这种设计是否可以适应你的需求呢? 因为这样在 block 的时候不需要耗用 cpu, 不像你这样子轮循对 cpu 耗用很大; 另外, 也不会发生,刚好消息到的时候,你却没有读走,只能靠下次才读走的问题。
3. 一次 get 操作,当然只能读取一条消息了, 这是最基本的单元操作。
论坛徽章:16
最初由 hidden_wing 发布
[B]下边是MQ服务器端记录的错误日志
AMQ9513: Maximum number of channels reached.
EXPLANATION:
The maximum number of channels that can be in use simultaneously has been
reached. The number of permitted channels is a configurable parameter in the
queue manager configuration file.
Wait for some of the operating channels to close. Retry the operation when some
channels are available.
大家帮帮忙看看啊,这么详尽的资料提供出来,如果还没有人回复的话,真是欲哭无泪了 [/B]
首先, 不哭,哈,MQ 多么简单好玩的东西;
不知道你看到的这个信息是不是服务器端唯一的错误? 也不知道是不是你测试过,明确对应的你客户段程序发生问题时候,看到的错误,&&我猜测不是,需要你继续查看其它更多的信息。
关于这个信息出现后,解决的办法为:
调整服务器通道的最大连接数等参数,unix 上是通过修改队列管理器的 qm.ini 文件,详细的信息和含义参考如下连接:
(System Admin/Configuring WebSphere MQ/Configuring WebSphere MQ /Changing queue manager configuration information /Channels )
对应中文内容为:
CHANNELS 节(在文件 qm.ini 中)来指定关于通道的信息。
MaxChannels=100|number
允许的通道最大数。缺省值是 100。
MaxActiveChannels=MaxChannels_value
允许在任何时候都活动的通道的最大数。缺省值是在 MaxChannels 属性上指定的值。
阅读后基本步骤:
1. stop QueueManager
2. tune or add the channel paramaters within channel section of qm.ini
&&(& &MaxChannels和 MaxActiveChannels具体设置多大,需要参考你总共到服务器的并发连接实例,比如,统一时间,最大并发建立的队列管理器连接数为 500)
3. start QueueManager,
qm.ini 样例:
(System Admin/Configuring WebSphere MQ/Configuring WebSphere MQ /Changing configuration information on UNIX systems/ Queue manager configuration files, qm.ini )
相关片断如下:
& &MaxChannels=500
& &MaxActiveChannels=500
& &MQIBindType=STANDARD
论坛徽章:16
最后的要求
最后的要求 to hidden_wing:
看在我一口气回复的面子上,你去
这里,下载一下 MQ v6.0 的信息中心(infoCenter), 在其中,能找到所有的你上面提到的关心的内容, 工具书总的有呀, 不然,哭得时候,只能自己面向墙角, 呵呵。
不下载,可对不起我,呵呵
论坛徽章:0
谢谢 websphereli 的热心与耐心的讲解,奉献精神实属伟大,你提供的资料对我很有启发,现在是感动的要哭了 一看您就是个MQ高手,如果方便的话,能给个联系方式就更好了
还有问题需要向大家请教:
1、出现通道关闭2009这个异常时,就有一个客户端连接到队列管理器进行消息接收,有消息接收时还正常,长时间没有消息接收时就报这个错误,所以错误日志里记录了通道达到了最大数这个错误,但是就是一个客户端连接啊,怎么会默认的100通道数都不够,所以我怀疑是接收程序有问题,是不是用过的连接没有关闭或者释放?所以发上来让大家帮忙找找错。
2、既然我的通道是MQI类型的通道,它是一个真是存在的通道吗?我看材料说MQI不是一个通道,类似一个监听,当有客户端程序连接到MQ队列管理器进行消息通讯时才建立一个真实的通道,这种说法对吗?通道到底什么时候建立,是当程序new MQQueueManager()时就建立一个吗?还是说一个队列对应一个通道?建立后什么时候该通道的链接消失?
3、我怎么通过利用MQ本身的机制可以做到,队列中有消息的时候就触发接收程序进行消息接收?而不是通过定时轮循的方式?如果有例子给个例子最好了
问题有些多,因为本人对MQ的机制不是很了解,属于刚上路阶段,希望大家多多指教,我也要加油努力
论坛徽章:16
最初由 hidden_wing 发布
[B]谢谢 websphereli 的热心与耐心的讲解,奉献精神实属伟大,你提供的资料对我很有启发,现在是感动的要哭了 一看您就是个MQ高手,如果方便的话,能给个联系方式就更好了
还有问题需要向大家请教:
1、出现通道关闭2009这个异常时,就有一个客户端连接到队列管理器进行消息接收,有消息接收时还正常,长时间没有消息接收时就报这个错误,所以错误日志里记录了通道达到了最大数这个错误,但是就是一个客户端连接啊,怎么会默认的100通道数都不够,所以我怀疑是接收程序有问题,是不是用过的连接没有关闭或者释放?所以发上来让大家帮忙找找错。
2、既然我的通道是MQI类型的通道,它是一个真是存在的通道吗?我看材料说MQI不是一个通道,类似一个监听,当有客户端程序连接到MQ队列管理器进行消息通讯时才建立一个真实的通道,这种说法对吗?通道到底什么时候建立,是当程序new MQQueueManager()时就建立一个吗?还是说一个队列对应一个通道?建立后什么时候该通道的链接消失?
3、我怎么通过利用MQ本身的机制可以做到,队列中有消息的时候就触发接收程序进行消息接收?而不是通过定时轮循的方式?如果有例子给个例子最好了
问题有些多,因为本人对MQ的机制不是很了解,属于刚上路阶段,希望大家多多指教,我也要加油努力 [/B]
#######################
问题1 的回复:
首先,在你的程序里没看到,主动重建连接的动作, 建议你在 qm.ini&&文件中,channel 段里加上下面的设定,看看结果如何,需要做的动作和前面调整最大数一样。
AdoptNewMCA=ALL
MaxChannels=500
MaxActiveChannels=500
AdoptNewMCA=ALL
MQIBindType=STANDARD
记得要重新启动队列管理器
#######################
问题2 的回复:
这个问题好,在 runmqsc 命令里, 你用 dis chs(*) 可以看到你的MQI 通道在有应用程序连接后实际启动的实例数,也就是当前活动的通道个数;&&可以看出,在产品内部,MQI 就是一种通道(client to server) ,和消息通道(server to server) 一样的。
通道建立的时间,你可以通过自己测试启动你的程序,同时 用 dis chs(*) or dis chs(Your Channel Name) 来监控,就可以知道,是在建立到队列管理器的连接时,建立的。
至于,通过通道连接时,不管 消息通道,MQI 通道,读需要有 监听器 listener 的定义并启动的配合,不然你让别人怎么通过 TCP/IP找到你呀。
队列和通道时两个独立的概念,通道只是在涉及到分布式网络存取
的时候,才需要; 但是队列是你使用 mq 的基础,一定需要;
#######################
问题3 的回复:
通过使用 mq 的触发机制,就可以,
在队列上定义触发(first, deepth, everyone) ,然后触发你的程序就可以,程序几乎可以不用调整,当然,你要确定,程序被触发启动后,是作为 deamon 的方式一直运行,还是读取消息后,就退出结束,等待下一次触发后再被调用启动。
对应资料:
(System Admin/Administration using WebSphere MQ commands /Administering local WebSphere MQ objects /Managing objects for triggering ) 你也可以在下载后的信息中心里找到对应的中文资料
论坛徽章:16
一个简单的trigger 例子:
1. 定义队列上的trigger:
DEFINE QLOCAL (Q1) +
& && & PROCESS (P1) +
& && & INITQ (INITQ.Q1) +
& && & TRIGGER +
& && & TRIGTYPE (FIRST)
ALTER QLOCAL (Q1) +
& && & PROCESS (P1) +
& && & INITQ (INITQ.Q1) +
& && & TRIGGER +
& && & TRIGTYPE (FIRST)
2. 定义Q1 trigger 中要用到的 INIT Queue
DEFINE QLOCAL (INITQ.Q1)
3. 定义Q1 trigger 中要用到的 Process
DEFINE PROCESS (P1) +
& && && && && &APPLTYPE (UNIX) +
& && && && && &APPLICID ('/home/mytest/test1')
4. 启动 trigger monitor deamon 监控服务
unix:&&# runmqtrm -m YourQueueManager -q INITQ.Q1 &
win:&&c:\& runmqtrm -m YourQueueManager -q INITQ.Q1
注意,这个deamon 进程 不可以停掉。
5, 你的 /home/mytest/test1 程序,就是你正常写的 mq 的读取程序,没有特别的变化,只是前面提到的如何驻留逻辑需要考虑清楚,这个和 mq 没有一点关系了。
6, 可以放入测试消息到 Q1 中,测试了。
7, 开心笑吧, 呵呵
下面的 url, 你可以找到一些别人以前遇到的问题的解决办法。可以参考
itpub.net All Right Reserved. 北京盛拓优讯信息技术有限公司版权所有    
 北京市公安局海淀分局网监中心备案编号:10 广播电视节目制作经营许可证:编号(京)字第1149号}

我要回帖

更多关于 ibm websphere mq 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信