前言
默认看到这篇文章的人都已经知道RabbitMQ是什么了,文章下列内容不解释具体用法,如果在这之前没有创建交换机和消息队列以及绑定,请看这篇文章–使用AmqpAdmin建立交换机,消息队列和绑定
自定义以Json的方式序列化
众所周知,Rabbit原始的序列化方法是把数据转化为字节数组。
有源码为证:
public final class SerializationUtils { private SerializationUtils() { } public static byte[] serialize(Object object) { if (object == null) { return null; } else { ByteArrayOutputStream stream = new ByteArrayOutputStream(); try { (new ObjectOutputStream(stream)).writeObject(object); } catch (IOException var3) { throw new IllegalArgumentException(“Could not serialize object of type: ” + object.getClass(), var3); } return stream.toByteArray(); } } /** * Creates a newly allocated byte array. Its size is the current * size of this output stream and the valid contents of the buffer * have been copied into it. * * @return the current contents of this output stream, as a byte array. * @see java.io.ByteArrayOutputStream#size() */ public synchronized byte toByteArray()[] { return Arrays.copyOf(buf, count); }
再看自动配置类源码:
@Bean@ConditionalOnMissingBeanpublic RabbitTemplateConfigurer rabbitTemplateConfigurer(RabbitProperties properties,ObjectProvider<MessageConverter> messageConverter,ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers) {RabbitTemplateConfigurer configurer = new RabbitTemplateConfigurer();configurer.setMessageConverter(messageConverter.getIfUnique());configurer.setRetryTemplateCustomizers(retryTemplateCustomizers.orderedStream().collect(Collectors.toList()));configurer.setRabbitProperties(properties);return configurer;}
所以我们只要自定义一个返回json数据类型的MessageConverter就可以了。
代码:
@Configurationpublic class MyMsgConverter { @Bean public MessageConverter messageConverter(){ return new 昏睡的摩托2JsonMessageConverter(); }} 发送消息
下面的demo用了三种交换机方式发送消息,RabbitMQ本身的配置我已经在自己的服务器上配好了,大家自己实验的时候也需要自己先配好交换机和绑定。
@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void sendMsgDirect() {Map<String,Object> map = new HashMap<>();map.put(“mapMsg01″,”mapMsg–string”);map.put(“mapMsg02”, Arrays.asList(“a”,111));//direct模式rabbitTemplate.convertAndSend(“exchange.direct”,”tree”,map);}@Testpublic void sendMsgFanout(){//fanout模式rabbitTemplate.convertAndSend(“exchange.fanout”,””,new Book(“红楼梦”,”hhddx”));}@Testpublic void sendMsgTopic(){//topic模式rabbitTemplate.convertAndSend(“exchange.topic”,”*.news”,”hello rabbit by topic”);} 接收消息 方式一:手动接收
下面的demo作用为手动接收消息队列名为tree的消息,这里的消息队列名根据自己的配置来写。起码不能
//接收消息@Testpublic void receive(){Object tree = rabbitTemplate.receiveAndConvert(“tree”);System.out.println(“类型:”+tree.getClass());System.out.println(“消息:”+tree);} 方式二:设置监听器
下面的两个方法都使用到了@RabbitListener注解,其作用是监听queues参数中的消息队列,一有消息就将其接受,queues参数为数组类型参数,可设置多个消息队列。使用这种方法需要在启动类上加上@EnableRabbit注解,表示开启rabbitmq注解模式。
@Servicepublic class BookService { @RabbitListener(queues = “tree.news”) public void receiveListener01(Book book){ System.out.println(“收到消息”+book); } @RabbitListener(queues = “tree.emps”) public void receiveListener02(Message message){ System.out.println(“message.body:”+message.getBody()); System.out.println(“message.getMessageProperties():”+message.getMessageProperties()); }}