AMQP操作文档

一、发送消息:(示例:GoodsIndexSendManager.java)

  1. 声明注入
     @Autowired 
      private AmqpTemplate amqpTemplate;
    
  2. 发送消息

    /**
      发送索引生成消息   
    */
    @Override
     public boolean startCreate() {
         if (progressManager.getProgress(INDEXID) != null) {
             return false;
         } 
         /** 发送索引生成消息 */
         this.amqpTemplate.convertAndSend(AmqpExchange.INDEX_CREATE.name(), "GoodsIndexMsg","");
         return true;
     }
    

    发送消息,第一个参数为枚举,String类型,代表接收的交换器名字。
    第二个参数为字符串,指定接收路由,String类型,自行指定一个有意义的字串
    第三个参数为发送的对象,object类型,接收器处理所需要的参数。无参需要写空字符串

二、接收消息(示例:GoodsIndexCreateListener.java)

@Configuration                        //注入spring容器,加载amqp配置
public class GoodsIndexCreateListener {
      //定义一个自己的队列的名字
    String queue = "goodsIndex-create-queue";
    /**
     * 消息监听
     * 
     * @param connectionFactory
     * @param listenerAdapter
     * @return
     */
    @Bean                            
    //生成消息监听器容器 代码不需要处理,说明一下这里方法的名,代表bean的beanid,名字要有唯一性,
    //不能和其它业务类名字相同,命名规则为:listenerContainer+业务
    public SimpleMessageListenerContainer listenerContainerGoodsIndex (ConnectionFactory connectionFactory,
            ChannelAwareMessageListener listenerAdapterGoodsIndex) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queue); 
        container.setMessageListener(listenerAdapterGoodsIndex);
        return container;
    }

    /**
     * 消息监听代理
     * 
     * @param receiver
     * @return
     */
    @Bean                    
    //消息代理,这里方法的名,代表bean的beanid,名字要有唯一性,不能和其它业务类名字相同 命名规则为:listenerAdapter+业务,要和消息监听第二个参数名称相对应.
    MessageListenerAdapter listenerAdapterGoodsIndex(GoodsIndexCreateConsumer receiver) {
        return new MessageListenerAdapter(receiver, "createGoodsIndex");//createGoodsIndex方法名要和以下处理消息中Consumer类种的方法名称对应 

    }

    @Bean    //绑定 交换器和队列 命名为:binding+业务
    Binding bindingGoodsIndexCreate(Queue queueGoodsIndex, FanoutExchange exchangeGoodsIndex) {
        return BindingBuilder.bind(queueGoodsIndex).to(exchangeGoodsIndex);
    }

    @Bean //队列 命名:quene+业务,要和以上绑定交换器和队列中第一个参数名称对应
    Queue queueGoodsIndex() {
        return new Queue(queue, false);
    }

    @Bean    //交换器 命名为exchange+业务,要和以上绑定交换器和队列中第二个参数名称对应
    FanoutExchange exchangeGoodsIndex() {
        return new FanoutExchange(AmqpExchange.INDEX_CREATE.name()); 
        //需要修改这里,参数推荐为枚举,对应上方消息发送的第一个参数
    }
}

三、处理消息(事例:GoodsIndexCreateConsumer.java)

普通java类,注入到spring容器,执行消息接收处声明的调用方法,设置参数为消息发送出传递的第三个object类型的参数,进行业务处理即可

/**
 * 索引生成消费者
 * @author zh
 * @version v1.0
 * @since v6.4.0
 * 2017年4月12日 下午4:33:14
 */
@Component
public class GoodsIndexCreateConsumer {

    @Autowired
    private IDaoSupport daoSupport;
    @Autowired
    private IProgressManager progressManager;
    @Autowired
    private IGoodsIndexManager goodsIndexManager;
    protected final Logger logger = Logger.getLogger(getClass());


     /**
     * 订阅生成商品索引消息
     */
      public void createGoodsIndex(){
          //createGoodsIndex 方法名称要和消息监听中消息代理中方法名对应    
      //此处为处理具体的业务,省略具体内容
    }
}

四、消息路由名称的定义

  • 路径 com.enation.app.base.AmqpExchange

  • 内容

    PC_INDEX_CHANGE("PC首页变化消息"),
    MOBILE_INDEX_CHANGE("移动端首页变化消息"),
    GOODS_CHANGE("商品变化消息"),
    HELP_CHANGE("帮助变化消息"),
    PAGE_CREATE("页面生成消息"),
    INDEX_CREATE("索引生成消息"),
    ORDER_CREATE("订单创建消息"), //没有入库
    ORDER_STATUS_CHANGE("订单状态变化消息"), //带入库的
    MEMEBER_LOGIN("会员登录消息"),
    MEMEBER_REGISTER("会员注册消息"),
    SHOP_CHANGE_REGISTER("店铺变更消息"),
    GOODS_CATEGORY_CHANGE("分类变更消息"),
    REFUND_PASS("退款审核通过"),
    MEMBER_MESSAGE("发送站内信息"), 
    SMS_SEND_MESSAGE("发送手机短信消息"),
    EMAIL_SEND_MESSAGE("邮件发送消息"),

results matching ""

    No results matching ""