Create a user:

rabbitmqctl add_user admin EMR123456emr
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

Change the password:

rabbitmqctl change_password admin EMR123456emr

Enable STOMP:

rabbitmq-plugins enable rabbitmq_stomp

When integrating with Spring, the following exception can occur:

Caused by: org.springframework.amqp.rabbit.listener.adapter.ReplyFailureException: Failed to send reply with payload 'OK'
    at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.handleResult(AbstractAdaptableMessageListener.java:285)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:108)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:822)
    ... 10 common frames omitted
Caused by: org.springframework.amqp.AmqpException: Cannot determine ReplyTo message property value: Request message does not contain reply-to property, and no default response Exchange was set.
    at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.getReplyToAddress(AbstractAdaptableMessageListener.java:373)
    at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.handleResult(AbstractAdaptableMessageListener.java:281)
    ... 12 common frames omitted

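For a Spring listener method annotated with:

@RabbitListener(queues = Constants.POST_PUSH_QUEUE_THIRD_REDIS)
@RabbitHandler

the method should return void. If it returns a value, Spring AMQP tries to send that value back as a reply message, and the send fails when the reply has no destination: the request carries no reply-to property and no default reply address (for example via @SendTo) is configured. The source code shows this: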

protected void handleResult(Object resultArg, Message request, Channel channel, Object source) throws Exception {
    if (channel != null) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Listener method returned result [" + resultArg
                    + "] - generating response message for it");
        }
        try {
            Object result = resultArg instanceof ResultHolder ? ((ResultHolder) resultArg).result : resultArg;
            Message response = buildMessage(channel, result);
            postProcessResponse(request, response);
            Address replyTo = getReplyToAddress(request, source, resultArg);
            sendResponse(channel, replyTo, response);
        }
        catch (Exception ex) {
            throw new ReplyFailureException("Failed to send reply with payload '" + resultArg + "'", ex);
        }
    }
    else if (this.logger.isWarnEnabled()) {
        this.logger.warn("Listener method returned result [" + resultArg
                + "]: not generating response message for it because no Rabbit Channel given");
    }
}

The getReplyToAddress() method it calls:

protected Address getReplyToAddress(Message request, Object source, Object result) throws Exception {
    Address replyTo = request.getMessageProperties().getReplyToAddress();
    if (replyTo == null) {
        if (this.responseAddress == null && this.responseExchange != null) {
            this.responseAddress = new Address(this.responseExchange, this.responseRoutingKey);
        }
        if (result instanceof ResultHolder) {
            replyTo = evaluateReplyTo(request, source, result, ((ResultHolder) result).sendTo);
        }
        else if (this.responseExpression != null) {
            replyTo = evaluateReplyTo(request, source, result, this.responseExpression);
        }
        else if (this.responseAddress == null) {
            throw new AmqpException(
                    "Cannot determine ReplyTo message property value: " +
                    "Request message does not contain reply-to property, " +
                    "and no default response Exchange was set.");
        }
        else {
            replyTo = this.responseAddress;
        }
    }
    return replyTo;
}
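
The lookup order in the two excerpts above can be illustrated with a small model that has no Spring dependency. This is only a sketch: ReplyPathModel, resolveReplyTo, handleResult and both exception classes are hypothetical names, not Spring AMQP API. The two expression-based branches (ResultHolder.sendTo and responseExpression) are merged into a single sendTo argument, and the model assumes that a null result (a void listener method) never reaches the reply step.

```java
/**
 * Dependency-free model of the reply path described above.
 * Not Spring AMQP code: every class and method name here is hypothetical.
 */
class ReplyPathModel {

    /** Stands in for the AmqpException raised when no destination is known. */
    static class NoReplyAddressException extends RuntimeException {
        NoReplyAddressException(String message) {
            super(message);
        }
    }

    /** Stands in for ReplyFailureException, which wraps the root cause. */
    static class ReplyFailedException extends RuntimeException {
        ReplyFailedException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /**
     * Picks a reply destination in the same priority order as the excerpt:
     * the request's reply-to first, then an expression-based destination,
     * then a configured default address. Throws if all three are missing.
     */
    static String resolveReplyTo(String requestReplyTo, String sendTo, String defaultAddress) {
        if (requestReplyTo != null) {
            return requestReplyTo;
        }
        if (sendTo != null) {
            return sendTo;
        }
        if (defaultAddress != null) {
            return defaultAddress;
        }
        throw new NoReplyAddressException("Cannot determine ReplyTo message property value");
    }

    /**
     * Models the listener adapter. Assumption: a null result (void method)
     * skips the reply step; any other result needs a destination.
     * Returns a description of the reply, or null if nothing is sent.
     */
    static String handleResult(Object result, String requestReplyTo, String sendTo, String defaultAddress) {
        if (result == null) {
            return null;
        }
        try {
            String replyTo = resolveReplyTo(requestReplyTo, sendTo, defaultAddress);
            return result + " -> " + replyTo;
        } catch (RuntimeException ex) {
            // Same wrapping as in the stack trace: outer failure, inner root cause.
            throw new ReplyFailedException("Failed to send reply with payload '" + result + "'", ex);
        }
    }

    public static void main(String[] args) {
        // Void listener: nothing to reply with, so no destination is needed.
        System.out.println(handleResult(null, null, null, null));
        // Listener returns "OK" and the request carries a reply-to.
        System.out.println(handleResult("OK", "reply.queue", null, null));
        // Listener returns "OK" but no destination exists anywhere.
        try {
            handleResult("OK", null, null, null);
        } catch (ReplyFailedException ex) {
            System.out.println(ex.getMessage() + " / cause: " + ex.getCause().getMessage());
        }
    }
}
```

The last call in main reproduces the shape of the stack trace shown earlier: an outer "Failed to send reply" failure whose cause is the missing reply-to destination.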
