Federation介绍

Federation是一个RabbitMQ官方插件。

Federation直译过来是联邦,它的设计目标是使 RabbitMQ 在不同的 Broker 节点之间进行消息传递而无须建立集群。Federation插件能够运行在不同版本的RabbitMq上。Federation每个连接是单向连接,支持双向连接(A -> B , B -> A)。

federation 拓扑图:

  • Pair of federated exchanges
  • Complete Graph
  • Fan-out
  • Ring

拓扑图 查看详情信息介绍

地址:https://www.rabbitmq.com/federated-exchanges.html#topology-diagrams

应用场景

跨区域部署服务,每个区域直接数据全部隔离,但每区域元数据更新需要同步其他区域。

比如:A、B、C三个区域,A区域收到数据更新,需要把A区域的更新同步到B、C区域。

同步方式:A->B, A->C, B->A ,B->C ,C-A,C->B。

下面是官网介绍使用“max-hops”避免出现死循环方式:

1
2
3
4
5
6
7
8
Complex Topologies and Loop Handling
A federated exchange can be "upstream" from another federated exchange. One can even form "loops", for example, exchange A declares exchange B to be upstream from it, and exchange B declares exchange A to be upstream from it. More complex multiply-connected arrangements are allowed.

Such complex topologies will be increasingly difficult to reason about and troubleshoot, however.

To prevent messages being continually copied and re-routed (in a never-ending cycle) there is a limit placed on the number of times a message can be copied over a link max-hops below).

It is recommended that all the exchanges linked by federation are of the same type. Mixing types can and likely will lead to confusing routing behaviours.

根据业务我们确定使用“Complete Graph”方式去部署,并且“max-hops”设置成1,避免出现出现重复消费情况。

Complete Graph (官网图片)

Federation使用

官网版本: https://www.rabbitmq.com/federation.html

官网翻译版: https://developer.aliyun.com/article/42384

开启Federation 功能

1
2
3
4
5
# 开启Federation功能
rabbitmq-plugins enable rabbitmq_federation

# 开启Federation的管理插件
rabbitmq-plugins enable rabbitmq_federation_management

当需要在集群中使用 Federation 功能的时候,集群中所有的节点都应该开启 Federation 插件

创建Exchanges和Queue

每个区域都要配置相同Exchanges名称,每个区域Exchanges都配置监听队列。

###创建upstream

Federation 功能开启成功后,看到图片1显示Federation Upstream、Federation Status 。

  • Add a new upstream 各项参数如下所述:

    • param 配置

      • Name:upstream的名称。
      • URI(uri):upstram的amqp连接。 参考URL配置方式。
      • Prefetch count(prefetch_count):Federation内部缓存的消息条数,即在接收到上游消息之后且在发送到下游之前缓存的消息条数。
      • Reconnect delay(reconnect-delay):Federation link连接断开后,需要等待多少秒开始连接。
      • Acknowledgement Mode(ack-mode):Federation link消息确认方式,一共有三种,on-confirm表示在接收到下游的确认消息之后再向上游发送消息确认;on-publish表示消息发送到下游后,再向上游发送消息确认;no-ack表示不发送消息确认。默认使用on-confirm。
      • Trust User-ID(trust-user-id):设置Federation是否使用Validated User-ID这个功能,如果设置为false,表示不使用Validated User-ID功能,Federation会忽略消息的user-id这个属性,如果设置为true,则Federation只会转发user_id为上游任意有效的用户的消息。
    • 只适用Federated Exchange的属性:

      • Exchange(exchange):指定上游交换器(upstream exchange),默认与federated exchange同名。
      • Max hops(max-hops):指定消息在丢弃前在federation link中流转的次数,默认为1。
      • Expires(expires):设置federation link断开后,上游队列(upstream queue)的过期时间,相当于普通队列的x-expires属性,默认为none,表示不删除队列。这个属性可以预防因federation link断开,生产者推送到上游队列的消息无法被转发消费而造成上游队列消息堆积的现象。
      • Message TTL(message-ttl):设置上游队列(upstram queue)中消息的过期时间,相当于x-message-ttl属性,默认为none表示不过期。
      • HA Policy(ha-policy):为上游队列(upstream queue)设置,相当于普通队列的x-ha-policy属性。
    • 只适用Federated Queue的属性:

      • Queue(queue):设置上游队列(upstream queue)的名称,默认与federated queue同名。
  • 配置上游集合数据

    Add a new upstream执行后,会出现236、75、test服务器配置信息。

    配置多个federation-upstream集合使用federation-upstream-set执行命令行。

    参考 文档 https://www.rabbitmq.com/federation-reference.html#upstreams

    1
    2
    # rabbitmqctl set_parameter federation-upstream-set [name] '[object1, object2, ...]'
    rabbitmqctl set_parameter federation-upstream-set test '[{"upstream":"236"},{"upstream":"238"}]'

配置Policies信息

  • Add / update a policy各项参数如下所述:
    • Name: 根据项目去配置,可以任意配置。
    • pattern表示匹配的表达式,用法是正则表达式
    • apply to表示应用在exchange还是queue上面,亦或两者都使用
    • priority表示优先级,值越大,优先级越高
    • definition用于定义使用的配置,federation有federation upstream set和 federation upstream两种方式,
      • set表示集合,定义需要该策略的所有上游名称 (创建upstream#配置上游集合数据 配置名称),取值为all是接受全部upstream。
      • 没有set的表示单个定义上游名称。

查看Federation Status

配置policy完成后,查看Federation Status。会出现(创建upstream#配置上游集合数据)配置上游数据。

正常连接后,Exchanges会有两个faderation两个连接。

本区域Exchanges 存在本区域queue队列、两个federation连接。

测试验证

接下来自己就可以测试了。

​ 在每个区域exchanges发送消息,然后在每个区域都能接收到另外区域和自己发送的消息就算成功。