AMQP操作文档
一、发送消息:(示例:GoodsIndexSendManager.java)
- 声明注入
@Autowired private AmqpTemplate amqpTemplate;
发送消息
/** 发送索引生成消息 */ @Override public boolean startCreate() { if (progressManager.getProgress(INDEXID) != null) { return false; } /** 发送索引生成消息 */ this.amqpTemplate.convertAndSend(AmqpExchange.INDEX_CREATE.name(), "GoodsIndexMsg",""); return true; }
发送消息,第一个参数为枚举,String类型,代表接收的交换器名字。
第二个参数为字符串,指定接收路由,String类型,自行指定一个有意义的字串
第三个参数为发送的对象,object类型,接收器处理所需要的参数。无参需要写空字符串
二、接收消息(示例:GoodsIndexCreateListener.java)
@Configuration //注入spring容器,加载amqp配置
public class GoodsIndexCreateListener {
//定义一个自己的队列的名字
String queue = "goodsIndex-create-queue";
/**
* 消息监听
*
* @param connectionFactory
* @param listenerAdapter
* @return
*/
@Bean
//生成消息监听器容器 代码不需要处理,说明一下这里方法的名,代表bean的beanid,名字要有唯一性,
//不能和其它业务类名字相同,命名规则为:listenerContainer+业务
public SimpleMessageListenerContainer listenerContainerGoodsIndex (ConnectionFactory connectionFactory,
ChannelAwareMessageListener listenerAdapterGoodsIndex) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queue);
container.setMessageListener(listenerAdapterGoodsIndex);
return container;
}
/**
* 消息监听代理
*
* @param receiver
* @return
*/
@Bean
//消息代理,这里方法的名,代表bean的beanid,名字要有唯一性,不能和其它业务类名字相同 命名规则为:listenerAdapter+业务,要和消息监听第二个参数名称相对应.
MessageListenerAdapter listenerAdapterGoodsIndex(GoodsIndexCreateConsumer receiver) {
return new MessageListenerAdapter(receiver, "createGoodsIndex");//createGoodsIndex方法名要和以下处理消息中Consumer类种的方法名称对应
}
@Bean //绑定 交换器和队列 命名为:binding+业务
Binding bindingGoodsIndexCreate(Queue queueGoodsIndex, FanoutExchange exchangeGoodsIndex) {
return BindingBuilder.bind(queueGoodsIndex).to(exchangeGoodsIndex);
}
@Bean //队列 命名:quene+业务,要和以上绑定交换器和队列中第一个参数名称对应
Queue queueGoodsIndex() {
return new Queue(queue, false);
}
@Bean //交换器 命名为exchange+业务,要和以上绑定交换器和队列中第二个参数名称对应
FanoutExchange exchangeGoodsIndex() {
return new FanoutExchange(AmqpExchange.INDEX_CREATE.name());
//需要修改这里,参数推荐为枚举,对应上方消息发送的第一个参数
}
}
三、处理消息(事例:GoodsIndexCreateConsumer.java)
普通java类,注入到spring容器,执行消息接收处声明的调用方法,设置参数为消息发送出传递的第三个object类型的参数,进行业务处理即可
/**
* 索引生成消费者
* @author zh
* @version v1.0
* @since v6.4.0
* 2017年4月12日 下午4:33:14
*/
@Component
public class GoodsIndexCreateConsumer {
@Autowired
private IDaoSupport daoSupport;
@Autowired
private IProgressManager progressManager;
@Autowired
private IGoodsIndexManager goodsIndexManager;
protected final Logger logger = Logger.getLogger(getClass());
/**
* 订阅生成商品索引消息
*/
public void createGoodsIndex(){
//createGoodsIndex 方法名称要和消息监听中消息代理中方法名对应
//此处为处理具体的业务,省略具体内容
}
}
四、消息路由名称的定义
路径 com.enation.app.base.AmqpExchange
内容
PC_INDEX_CHANGE("PC首页变化消息"),
MOBILE_INDEX_CHANGE("移动端首页变化消息"),
GOODS_CHANGE("商品变化消息"),
HELP_CHANGE("帮助变化消息"),
PAGE_CREATE("页面生成消息"),
INDEX_CREATE("索引生成消息"),
ORDER_CREATE("订单创建消息"), //没有入库
ORDER_STATUS_CHANGE("订单状态变化消息"), //带入库的
MEMEBER_LOGIN("会员登录消息"),
MEMEBER_REGISTER("会员注册消息"),
SHOP_CHANGE_REGISTER("店铺变更消息"),
GOODS_CATEGORY_CHANGE("分类变更消息"),
REFUND_PASS("退款审核通过"),
MEMBER_MESSAGE("发送站内信息"),
SMS_SEND_MESSAGE("发送手机短信消息"),
EMAIL_SEND_MESSAGE("邮件发送消息"),