示例应用程序

Spring AMQP 示例项目包含两个示例应用程序。第一个是简单的“Hello World”示例,演示了同步和异步消息接收。它为理解基本组件提供了极好的起点。第二个示例基于股票交易用例,演示了在实际应用程序中常见的交互类型。在本章中,我们将对每个示例进行快速演练,以便您能够专注于最重要的组件。这两个示例都是基于 Maven 的,因此您应该能够将它们直接导入到任何支持 Maven 的 IDE(例如 SpringSource Tool Suite)。

“Hello World”示例

“Hello World”示例演示了同步和异步消息接收。您可以将 spring-rabbit-helloworld 示例导入 IDE,然后按照下面的讨论进行操作。

同步示例

src/main/java 目录中,导航到 org.springframework.amqp.helloworld 包。打开 HelloWorldConfiguration 类,注意它在类级别包含 @Configuration 注解,并且在方法级别包含一些 @Bean 注解。这是 Spring 基于 Java 的配置示例。您可以在此处了解更多信息。

以下清单显示了连接工厂的创建方式

@Bean
public CachingConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory =
        new CachingConnectionFactory("localhost");
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    return connectionFactory;
}

配置中还包含一个 RabbitAdmin 实例,它默认查找任何类型为 exchange、queue 或 binding 的 bean,然后将它们声明到代理上。事实上,在 HelloWorldConfiguration 中生成的 helloWorldQueue bean 就是一个示例,因为它是一个 Queue 实例。

以下清单显示了 helloWorldQueue bean 定义

@Bean
public Queue helloWorldQueue() {
    return new Queue(this.helloWorldQueueName);
}

回顾 rabbitTemplate bean 配置,您可以看到它将 helloWorldQueue 的名称设置为其 queue 属性(用于接收消息)和 routingKey 属性(用于发送消息)。

现在我们已经探索了配置,接下来可以查看实际使用这些组件的代码。首先,打开同一个包中的 Producer 类。它包含一个 main() 方法,其中创建了 Spring ApplicationContext

以下清单显示了 main 方法

public static void main(String[] args) {
    ApplicationContext context =
        new AnnotationConfigApplicationContext(RabbitConfiguration.class);
    AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
    amqpTemplate.convertAndSend("Hello World");
    System.out.println("Sent: Hello World");
}

在前面的示例中,AmqpTemplate bean 被检索并用于发送 Message。由于客户端代码应尽可能依赖接口,因此类型为 AmqpTemplate 而不是 RabbitTemplate。即使在 HelloWorldConfiguration 中创建的 bean 是 RabbitTemplate 的实例,依赖接口意味着此代码更具可移植性(您可以独立于代码更改配置)。由于调用了 convertAndSend() 方法,模板将其委托给 MessageConverter 实例。在此示例中,它使用默认的 SimpleMessageConverter,但可以为 HelloWorldConfiguration 中定义的 rabbitTemplate bean 提供不同的实现。

现在打开 Consumer 类。它实际上共享相同的配置基类,这意味着它共享 rabbitTemplate bean。这就是为什么我们将该模板配置为同时具有 routingKey(用于发送)和 queue(用于接收)。正如我们在 AmqpTemplate 中描述的那样,您可以将“routingKey”参数传递给发送方法,将“queue”参数传递给接收方法。Consumer 代码基本上是 Producer 的镜像,调用 receiveAndConvert() 而不是 convertAndSend()

以下清单显示了 Consumer 的 main 方法

public static void main(String[] args) {
    ApplicationContext context =
        new AnnotationConfigApplicationContext(RabbitConfiguration.class);
    AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
    System.out.println("Received: " + amqpTemplate.receiveAndConvert());
}

如果您运行 Producer,然后运行 Consumer,您应该会在控制台输出中看到 Received: Hello World

异步示例

同步示例 演练了同步的 Hello World 示例。本节描述了一个稍微高级但功能更强大的选项。只需进行少量修改,Hello World 示例就可以提供异步接收的示例,也称为消息驱动 POJO。事实上,有一个子包正好提供了这一点:org.springframework.amqp.samples.helloworld.async

同样,我们从发送端开始。打开 ProducerConfiguration 类,注意它创建了一个 connectionFactory 和一个 rabbitTemplate bean。这次,由于配置专门用于消息发送端,我们甚至不需要任何队列定义,并且 RabbitTemplate 只设置了“routingKey”属性。回想一下,消息是发送到交换机而不是直接发送到队列的。AMQP 默认交换机是一个没有名称的直连交换机。所有队列都以其名称作为路由键绑定到该默认交换机。这就是为什么我们在这里只需要提供路由键。

以下清单显示了 rabbitTemplate 定义

public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory());
    template.setRoutingKey(this.helloWorldQueueName);
    return template;
}

由于此示例演示了异步消息接收,因此生产端设计为持续发送消息(如果它是像同步版本那样的每次执行一条消息的模型,那么它实际上是消息驱动的消费者就不那么明显了)。负责持续发送消息的组件被定义为 ProducerConfiguration 中的一个内部类。它被配置为每三秒运行一次。

以下清单显示了该组件

static class ScheduledProducer {

    @Autowired
    private volatile RabbitTemplate rabbitTemplate;

    private final AtomicInteger counter = new AtomicInteger();

    @Scheduled(fixedRate = 3000)
    public void sendMessage() {
        rabbitTemplate.convertAndSend("Hello World " + counter.incrementAndGet());
    }
}

您不需要理解所有细节,因为真正的重点应该放在接收端(我们接下来会介绍)。但是,如果您还不熟悉 Spring 任务调度支持,可以在这里了解更多。简而言之,ProducerConfiguration 中的 postProcessor bean 将任务注册到调度程序。

现在我们可以转向接收端。为了强调消息驱动的 POJO 行为,我们从对消息作出反应的组件开始。该类名为 HelloWorldHandler,在以下清单中显示

public class HelloWorldHandler {

    public void handleMessage(String text) {
        System.out.println("Received: " + text);
    }

}

该类是一个 POJO。它不扩展任何基类,不实现任何接口,甚至不包含任何导入。它通过 Spring AMQP MessageListenerAdapter 被“适配”到 MessageListener 接口。然后,您可以将该适配器配置在 SimpleMessageListenerContainer 上。对于此示例,容器是在 ConsumerConfiguration 类中创建的。您可以在那里看到被适配器包装的 POJO。

以下清单显示了 listenerContainer 的定义方式

@Bean
public SimpleMessageListenerContainer listenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory());
    container.setQueueName(this.helloWorldQueueName);
    container.setMessageListener(new MessageListenerAdapter(new HelloWorldHandler()));
    return container;
}

SimpleMessageListenerContainer 是一个 Spring 生命周期组件,默认情况下会自动启动。如果您查看 Consumer 类,可以看到它的 main() 方法只包含一行引导代码来创建 ApplicationContext。Producer 的 main() 方法也是一行引导代码,因为其方法带有 @Scheduled 注解的组件也会自动启动。您可以按任意顺序启动 ProducerConsumer,您应该会看到每隔三秒发送和接收消息。

股票交易

股票交易示例演示了比 Hello World 示例 更高级的消息传递场景。然而,配置非常相似,只是稍微复杂一些。由于我们已经详细介绍了 Hello World 配置,这里我们重点介绍此示例的不同之处。有一个服务器将市场数据(股票报价)推送到主题交换机。然后,客户端可以通过将队列与路由模式(例如 app.stock.quotes.nasdaq.*)绑定来订阅市场数据源。此演示的另一个主要功能是客户端发起并由服务器处理的请求-回复“股票交易”交互。这涉及客户端在订单请求消息本身中发送的私有 replyTo 队列。

服务器的核心配置位于 org.springframework.amqp.rabbit.stocks.config.server 包中的 RabbitServerConfiguration 类中。它扩展了 AbstractStockAppRabbitConfiguration。这是定义服务器和客户端共用资源的地方,包括市场数据主题交换机(其名称为 'app.stock.marketdata')和服务器公开用于股票交易的队列(其名称为 'app.stock.request')。在该通用配置文件中,您还可以看到 JacksonJsonMessageConverter 配置在 RabbitTemplate 上。

服务器特定配置包括两件事。首先,它在 RabbitTemplate 上配置了市场数据交换机,这样它就不需要每次发送 Message 时都提供该交换机名称。它通过基配置类中定义的抽象回调方法完成此操作。以下清单显示了该方法

public void configureRabbitTemplate(RabbitTemplate rabbitTemplate) {
    rabbitTemplate.setExchange(MARKET_DATA_EXCHANGE_NAME);
}

其次,声明了股票请求队列。在这种情况下,它不需要任何显式绑定,因为它以其自身的名称作为路由键绑定到默认的无名交换机。如前所述,AMQP 规范定义了这种行为。以下清单显示了 stockRequestQueue bean 的定义

@Bean
public Queue stockRequestQueue() {
    return new Queue(STOCK_REQUEST_QUEUE_NAME);
}

现在您已经看到了服务器 AMQP 资源的配置,请导航到 src/test/java 目录下的 org.springframework.amqp.rabbit.stocks 包。在那里,您可以看到提供了 main() 方法的实际 Server 类。它基于 server-bootstrap.xml 配置文件创建了一个 ApplicationContext。在那里,您可以看到发布虚拟市场数据的计划任务。该配置依赖于 Spring 的 task 命名空间支持。引导配置文件还导入了其他一些文件。最有趣的是 server-messaging.xml,它直接位于 src/main/resources 下。在那里,您可以看到负责处理股票交易请求的 messageListenerContainer bean。最后,查看 server-handlers.xml 中定义的 serverHandler bean(也位于 'src/main/resources')。该 bean 是 ServerHandler 类的一个实例,是消息驱动 POJO 的一个很好的例子,它也可以发送回复消息。请注意,它本身不与框架或任何 AMQP 概念耦合。它接受一个 TradeRequest 并返回一个 TradeResponse。以下清单显示了 handleMessage 方法的定义

public TradeResponse handleMessage(TradeRequest tradeRequest) { ...
}

现在我们已经了解了服务器最重要的配置和代码,接下来我们转向客户端。最好的起点可能是 org.springframework.amqp.rabbit.stocks.config.client 包中的 RabbitClientConfiguration。注意它声明了两个没有提供显式名称的队列。以下清单显示了两个队列的 bean 定义

@Bean
public Queue marketDataQueue() {
    return amqpAdmin().declareQueue();
}

@Bean
public Queue traderJoeQueue() {
    return amqpAdmin().declareQueue();
}

这些是私有队列,会自动生成唯一的名称。第一个生成的队列由客户端用于绑定到服务器公开的市场数据交换。回想一下,在 AMQP 中,消费者与队列交互,而生产者与交换机交互。将队列“绑定”到交换机就是告诉代理将消息从给定交换机传递(或路由)到队列。由于市场数据交换机是主题交换机,因此绑定可以用路由模式表示。RabbitClientConfiguration 使用 Binding 对象执行此操作,并且该对象是使用 BindingBuilder 流式 API 生成的。以下清单显示了 Binding

@Value("${stocks.quote.pattern}")
private String marketDataRoutingKey;

@Bean
public Binding marketDataBinding() {
    return BindingBuilder.bind(
        marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
}

请注意,实际值已外部化到属性文件(src/main/resources 下的 client.properties)中,并且我们使用 Spring 的 @Value 注解来注入该值。这通常是一个好主意。否则,该值将被硬编码到类中,并且在不重新编译的情况下无法修改。在这种情况下,在更改用于绑定的路由模式时,运行多个客户端版本要容易得多。我们现在可以尝试一下。

首先运行 org.springframework.amqp.rabbit.stocks.Server,然后运行 org.springframework.amqp.rabbit.stocks.Client。您应该会看到 NASDAQ 股票的虚拟报价,因为 client.properties 中 'stocks.quote.pattern' 键的当前值为 'app.stock.quotes.nasdaq.'。现在,在保持现有 ServerClient 运行的情况下,将该属性值更改为 'app.stock.quotes.nyse.' 并启动第二个 Client 实例。您应该会看到第一个客户端仍然接收 NASDAQ 报价,而第二个客户端接收 NYSE 报价。您可以更改模式以获取所有股票,甚至单个股票代码。

我们探索的最后一个功能是从客户端角度的请求-回复交互。回想一下,我们已经看到了接受 TradeRequest 对象并返回 TradeResponse 对象的 ServerHandler。客户端对应代码是 org.springframework.amqp.rabbit.stocks.gateway 包中的 RabbitStockServiceGateway。它委托给 RabbitTemplate 来发送消息。以下清单显示了 send 方法

public void send(TradeRequest tradeRequest) {
    getRabbitTemplate().convertAndSend(tradeRequest, new MessagePostProcessor() {
        public Message postProcessMessage(Message message) throws AmqpException {
            message.getMessageProperties().setReplyTo(new Address(defaultReplyToQueue));
            try {
                message.getMessageProperties().setCorrelationId(
                    UUID.randomUUID().toString().getBytes("UTF-8"));
            }
            catch (UnsupportedEncodingException e) {
                throw new AmqpException(e);
            }
            return message;
        }
    });
}

请注意,在发送消息之前,它设置了 replyTo 地址。它提供了由 traderJoeQueue bean 定义(前面显示)生成的队列。以下清单显示了 StockServiceGateway 类本身的 @Bean 定义

@Bean
public StockServiceGateway stockServiceGateway() {
    RabbitStockServiceGateway gateway = new RabbitStockServiceGateway();
    gateway.setRabbitTemplate(rabbitTemplate());
    gateway.setDefaultReplyToQueue(traderJoeQueue());
    return gateway;
}

如果您不再运行服务器和客户端,请立即启动它们。尝试发送格式为 '100 TCKR' 的请求。在模拟请求“处理”的短暂人工延迟之后,您应该会看到客户端上出现确认消息。

从非 Spring 应用程序接收 JSON

Spring 应用程序在发送 JSON 时,会将 TypeId 头设置为完全限定的类名,以帮助接收应用程序将 JSON 转换回 Java 对象。

spring-rabbit-json 示例探讨了几种从非 Spring 应用程序转换 JSON 的技术。

© . This site is unofficial and not affiliated with VMware.