史上最全的分布式事务教程(上)

2021/01/07

1.基础概念

1.1.什么是事务

什么是事务?举个生活中的例子:你去小卖铺买东西,“一手交钱,一手交货”就是一个事务的例子,交钱和交货必须全部成功,事务才算成功,任一个活动失败,事务将撤销所有已成功的活动。

明白上述例子,再来看事务的定义:

事务可以看做是一次大的活动,它由不同的小活动组成,这些活动要么全部成功,要么全部失败

1.2.本地事务

在计算机系统中,更多的是通过关系型数据库来控制事务,这是利用数据库本身的事务特性来实现的,因此叫数据库事务,由于应用主要靠关系数据库来控制事务,而数据库通常和应用在同一个服务器,所以基于关系型数据库的事务又被称为本地事务。

回顾一下数据库事务的四大特性 ACID:

A(Atomic):原子性,构成事务的所有操作,要么都执行完成,要么全部不执行,不可能出现部分成功部分失败的情况。

C(Consistency):一致性,在事务执行前后,数据库的一致性约束没有被破坏。比如:张三向李四转100元,转账前和转账后的数据是正确状态这叫一致性,如果出现张三转出100元,李四账户没有增加100元这就出现了数据错误,就没有达到一致性。

I(Isolation):隔离性,数据库中的事务一般都是并发的,隔离性是指并发的两个事务的执行互不干扰,一个事务不能看到其他事务运行过程的中间状态。通过配置事务隔离级别可以避脏读、重复读等问题。

D(Durability):持久性,事务完成之后,该事务对数据的更改会被持久化到数据库,且不会被回滚。

数据库事务在实现时会将一次事务涉及的所有操作全部纳入到一个不可分割的执行单元,该执行单元中的所有操作要么都成功,要么都失败,只要其中任一操作执行失败,都将导致整个事务的回滚

1.3.分布式事务

随着互联网的快速发展,软件系统由原来的单体应用转变为分布式应用,下图描述了单体应用向微服务的演变:

分布式系统会把一个应用系统拆分为可独立部署的多个服务,因此需要服务与服务之间远程协作才能完成事务操作,这种分布式系统环境下由不同的服务之间通过网络远程协作完成事务称之为分布式事务,例如用户注册送积分事务、创建订单减库存事务,银行转账事务等都是分布式事务。

我们知道本地事务依赖数据库本身提供的事务特性来实现,因此以下逻辑可以控制本地事务:

begin transaction;
    //1.本地数据库操作:张三减少金额
    //2.本地数据库操作:李四增加金额
commit transation;

但是在分布式环境下,会变成下边这样:

begin transaction;
    //1.本地数据库操作:张三减少金额
    //2.远程调用:让李四增加金额
commit transation;

可以设想,当远程调用让李四增加金额成功了,由于网络问题远程调用并没有返回,此时本地事务提交失败就回滚了张三减少金额的操作,此时张三和李四的数据就不一致了。

因此在分布式架构的基础上,传统数据库事务就无法使用了,张三和李四的账户不在一个数据库中甚至不在一个应用系统里,实现转账事务需要通过远程调用,由于网络问题就会导致分布式事务问题。

1.4 分布式事务产生的场景

  1. 典型的场景就是微服务架构 微服务之间通过远程调用完成事务操作。 比如:订单微服务和库存微服务,下单的同时订单微服务请求库存微服务减库存。 简言之:跨JVM进程产生分布式事务。

  2. 单体系统访问多个数据库实例 当单体系统需要访问多个数据库(实例)时就会产生分布式事务。比如:用户信息和订单信息分别在两个MySQL实例存储,用户管理系统删除用户信息,需要分别删除用户信息及用户的订单信息,由于数据分布在不同的数据实例,需要通过不同的数据库链接去操作数据,此时产生分布式事务。 简言之:跨数据库实例产生分布式事务。

  3. 多服务访问同一个数据库实例 比如:订单微服务和库存微服务即使访问同一个数据库也会产生分布式事务,原因就是跨JVM进程,两个微服务持有了不同的数据库链接进行数据库操作,此时产生分布式事务。

2.分布式事务基础理论

通过前面的学习,我们了解到了分布式事务的基础概念。与本地事务不同的是,分布式系统之所以叫分布式,是因为提供服务的各个节点分布在不同机器上,相互之间通过网络交互。不能因为有一点网络问题就导致整个系统无法提供服务,网络因素成为了分布式事务的考量标准之一。因此,分布式事务需要更进一步的理论支持,接下来,我们先来学习一下分布式事务的CAP理论。 在讲解分布式事务控制解决方案之前需要先学习一些基础理论,通过理论知识指导我们确定分布式事务控制的目标,从而帮助我们理解每个解决方案。

2.1.CAP理论

2.1.1.理解CAP

CAP是 Consistency、Availability、Partition tolerance三个词语的缩写,分别表示一致性、可用性、分区容忍性。

下边我们分别来解释:

为了方便对CAP理论的理解,我们结合电商系统中的一些业务场景来理解CAP。

如下图,是商品信息管理的执行流程:

整体执行流程如下:

  1. 商品服务请求主数据库写入商品信息(添加商品、修改商品、删除商品)

  2. 主数据库向商品服务响应写入成功。

  3. 商品服务请求从数据库读取商品信息。

C - Consistency:

一致性是指写操作后的读操作可以读取到最新的数据状态,当数据分布在多个节点上,从任意结点读取到的数据都是最新的状态。

上图中,商品信息的读写要满足一致性就是要实现如下目标:

1、商品服务写入主数据库成功,则向从数据库查询新数据也成功。 2、商品服务写入主数据库失败,则向从数据库查询新数据也失败。

如何实现一致性?

  1. 写入主数据库后要将数据同步到从数据库。

  2. 写入主数据库后,在向从数据库同步期间要将从数据库锁定,待同步完成后再释放锁,以免在新数据写入成功后,向从数据库查询到旧的数据。

分布式系统一致性的特点:

  1. 由于存在数据同步的过程,写操作的响应会有一定的延迟。

  2. 为了保证数据一致性会对资源暂时锁定,待数据同步完成释放锁定资源。

  3. 如果请求数据同步失败的结点则会返回错误信息,一定不会返回旧数据。

A - Availability :

可用性是指任何事务操作都可以得到响应结果,且不会出现响应超时或响应错误。

上图中,商品信息读取满足可用性就是要实现如下目标:

  1. 从数据库接收到数据查询的请求则立即能够响应数据查询结果。

  2. 从数据库不允许出现响应超时或响应错误。

如何实现可用性?

  1. 写入主数据库后要将数据同步到从数据库。

  2. 由于要保证从数据库的可用性,不可将从数据库中的资源进行锁定。

  3. 即时数据还没有同步过来,从数据库也要返回要查询的数据,哪怕是旧数据,如果连旧数据也没有则可以按照约定返回一个默认信息,但不能返回错误或响应超时。

分布式系统可用性的特点:

1、 所有请求都有响应,且不会出现响应超时或响应错误。

P - Partition tolerance :

通常分布式系统的各各结点部署在不同的子网,这就是网络分区,不可避免的会出现由于网络问题而导致结点之间通信失败,此时仍可对外提供服务,这叫分区容忍性。

上图中,商品信息读写满足分区容忍性就是要实现如下目标:

  1. 主数据库向从数据库同步数据失败不影响读写操作。

  2. 其一个结点挂掉不影响另一个结点对外提供服务。

如何实现分区容忍性?

  1. 尽量使用异步取代同步操作,例如使用异步方式将数据从主数据库同步到从数据,这样结点之间能有效的实现松耦合。

  2. 添加从数据库结点,其中一个从结点挂掉其它从结点提供服务。

分布式分区容忍性的特点:

1、分区容忍性分是布式系统具备的基本能力。

2.1.2.CAP组合方式

1、上边商品管理的例子是否同时具备 CAP呢?

在所有分布式事务场景中不会同时具备CAP三个特性,因为在具备了P的前提下C和A是不能共存的。

比如:

下图满足了P即表示实现分区容忍:

本图分区容忍的含义是:

  1. 主数据库通过网络向从数据同步数据,可以认为主从数据库部署在不同的分区,通过网络进行交互。

  2. 当主数据库和从数据库之间的网络出现问题不影响主数据库和从数据库对外提供服务。

  3. 其一个结点挂掉不影响另一个结点对外提供服务。

如果要实现C则必须保证数据一致性,在数据同步的时候为防止向从数据库查询不一致的数据则需要将从数据库数据锁定,待同步完成后解锁,如果同步失败从数据库要返回错误信息或超时信息。

如果要实现A则必须保证数据可用性,不管任何时候都可以向从数据查询数据,则不会响应超时或返回错误信息。

通过分析发现在满足P的前提下C和A存在矛盾性。

2、CAP有哪些组合方式呢?

所以在生产中对分布式事务处理时要根据需求来确定满足CAP的哪两个方面。

  1. AP:

放弃一致性,追求分区容忍性和可用性。这是很多分布式系统设计时的选择。

例如:

上边的商品管理,完全可以实现AP,前提是只要用户可以接受所查询的到数据在一定时间内不是最新的即可。 通常实现AP都会保证最终一致性,后面讲的BASE理论就是根据AP来扩展的,一些业务场景。比如:订单退款,今日退款成功,明日账户到账,只要用户可以接受在一定时间内到账即可。

  1. CP:

放弃可用性,追求一致性和分区容错性,我们的zookeeper其实就是追求的强一致,又比如跨行转账,一次转账请求要等待双方银行系统都完成整个事务才算完成。

  1. CA:

放弃分区容忍性,即不进行分区,不考虑由于网络不通或结点挂掉的问题,则可以实现一致性和可用性。那么系统将不是一个标准的分布式系统,我们最常用的关系型数据就满足了CA。

上边的商品管理,如果要实现CA则架构如下:

主数据库和从数据库中间不再进行数据同步,数据库可以响应每次的查询请求,通过事务隔离级别实现每个查询请求都可以返回最新的数据。

2.1.3 总结

通过上面我们已经学习了CAP理论的相关知识,CAP是一个已经被证实的理论:一个分布式系统最多只能同时满足一致性(Consistency)、可用性(Availability)和分区容忍性(Partition tolerance)这三项中的两项。它可以作为我们进行架构设计、技术选型的考量标准。对于多数大型互联网应用的场景,结点众多、部署分散,而且现在的集群规模越来越大,所以节点故障、网络故障是常态,而且要保证服务可用性达到N个9(99.99..%),并要达到良好的响应性能来提高用户体验,因此一般都会做出如下选择:保证P和A,舍弃C强一致,保证最终一致性。

2.2.BASE理论

  1. 理解强一致性和最终一致性

CAP理论告诉我们一个分布式系统最多只能同时满足一致性(Consistency)、可用性(Availability)和分区容忍性(Partition tolerance)这三项中的两项,其中AP在实际应用中较多,AP即舍弃一致性,保证可用性和分区容忍性,但是在实际生产中很多场景都要实现一致性,比如前边我们举的例子主数据库向从数据库同步数据,即使不要一致性,但是最终也要将数据同步成功来保证数据一致,这种一致性和CAP中的一致性不同,CAP中的一致性要求在任何时间查询每个结点数据都必须一致,它强调的是强一致性,但是最终一致性是允许可以在一段时间内每个结点的数据不一致,但是经过一段时间每个结点的数据必须一致,它强调的是最终数据的一致性。

  1. Base理论介绍

BASE 是 Basically Available(基本可用)、Soft state(软状态)和 Eventually consistent (最终一致性)三个短语的缩写。BASE理论是对CAP中AP的一个扩展,通过牺牲强一致性来获得可用性,当出现故障允许部分不可用但要保证核心功能可用,允许数据在一段时间内是不一致的,但最终达到一致状态。满足BASE理论的事务,我们称之为“柔性事务”。

  • 基本可用:分布式系统在出现故障时,允许损失部分可用功能,保证核心功能可用。如,电商网站交易付款出 现问题了,商品依然可以正常浏览。

  • 软状态:由于不要求强一致性,所以BASE允许系统中存在中间状态(也叫软状态),这个状态不影响系统可用 性,如订单的”支付中”、“数据同步中”等状态,待数据最终一致后状态改为“成功”状态。

  • 最终一致:最终一致是指经过一段时间后,所有节点数据都将会达到一致。如订单的”支付中”状态,最终会变 为“支付成功”或者”支付失败”,使订单状态与实际交易结果达成一致,但需要一定时间的延迟、等待。

3.分布式事务解决方案之2PC(两阶段提交)

前面已经学习了分布式事务的基础理论,以理论为基础,针对不同的分布式场景业界常见的解决方案有2PC、TCC、可靠消息最终一致性、最大努力通知这几种。

3.1.什么是2PC

2PC即两阶段提交协议,是将整个事务流程分为两个阶段,准备阶段(Prepare phase)、提交阶段(commit phase),2是指两个阶段,P是指准备阶段,C是指提交阶段。

举例:张三和李四好久不见,老友约起聚餐,饭店老板要求先买单,才能出票。这时张三和李四分别抱怨近况不如意,囊中羞涩,都不愿意请客,这时只能AA。只有张三和李四都付款,老板才能出票安排就餐。但由于张三和李四都是铁公鸡,形成了尴尬的一幕:

准备阶段:老板要求张三付款,张三付款。老板要求李四付款,李四付款。

提交阶段:老板出票,两人拿票纷纷落座就餐。

例子中形成了一个事务,若张三或李四其中一人拒绝付款,或钱不够,店老板都不会给出票,并且会把已收款退回。

整个事务过程由事务管理器和参与者组成,店老板就是事务管理器,张三、李四就是事务参与者,事务管理器负责决策整个分布式事务的提交和回滚,事务参与者负责自己本地事务的提交和回滚。

在计算机中部分关系数据库如Oracle、MySQL支持两阶段提交协议,如下图:

  1. 准备阶段(Prepare phase):事务管理器给每个参与者发送Prepare消息,每个数据库参与者在本地执行事务,并写本地的Undo/Redo日志,此时事务没有提交。

    (Undo日志是记录修改前的数据,用于数据库回滚,Redo日志是记录修改后的数据,用于提交事务后写入数据文件)

  2. 提交阶段(commit phase):如果事务管理器收到了参与者的执行失败或者超时消息时,直接给每个参与者发送回滚(Rollback)消息;否则,发送提交(Commit)消息;参与者根据事务管理器的指令执行提交或者回滚操作,并释放事务处理过程中使用的锁资源。注意:必须在最后阶段释放锁资源。

    下图展示了2PC的两个阶段,分成功和失败两个情况说明:成功情况:

失败情况:

3.2.解决方案

3.2.1 XA方案

2PC的传统方案是在数据库层面实现的,如Oracle、MySQL都支持2PC协议,为了统一标准减少行业内不必要的对接成本,需要制定标准化的处理模型及接口标准,国际开放标准组织Open Group定义了分布式事务处理模型DTP(Distributed Transaction Processing Reference Model)。

为了让大家更明确XA方案的内容程,下面新用户注册送积分为例来说明:

执行流程如下:

  1. 应用程序(AP)持有用户库和积分库两个数据源。

  2. 应用程序(AP)通过TM通知用户库RM新增用户,同时通知积分库RM为该用户新增积分,RM此时并未提交事务,此时用户和积分资源锁定。

  3. TM收到执行回复,只要有一方失败则分别向其他RM发起回滚事务,回滚完毕,资源锁释放。

  4. TM收到执行回复,全部成功,此时向所有RM发起提交事务,提交完毕,资源锁释放。

DTP模型定义如下角色:

  • AP(Application Program):即应用程序,可以理解为使用DTP分布式事务的程序。

  • RM(Resource Manager):即资源管理器,可以理解为事务的参与者,一般情况下是指一个数据库实例,通过资源管理器对该数据库进行控制,资源管理器控制着分支事务。

  • TM(Transaction Manager):事务管理器,负责协调和管理事务,事务管理器控制着全局事务,管理事务生命周期,并协调各个RM。全局事务是指分布式事务处理环境中,需要操作多个数据库共同完成一个工作,这个 工作即是一个全局事务。

  • DTP模型定义TM和RM之间通讯的接口规范叫XA,简单理解为数据库提供的2PC接口协议,基于数据库的XA协议来实现2PC又称为XA方案。

  • 以上三个角色之间的交互方式如下: 1)TM向AP提供 应用程序编程接口,AP通过TM提交及回滚事务。 2)TM交易中间件通过XA接口来通知RM数据库事务的开始、结束以及提交、回滚等。

    总结: 整个2PC的事务流程涉及到三个角色AP、RM、TM。AP指的是使用2PC分布式事务的应用程序;RM指的是资 源管理器,它控制着分支事务;TM指的是事务管理器,它控制着整个全局事务。

  1. 在准备阶段RM执行实际的业务操作,但不提交事务,资源锁定;

  2. 在提交阶段TM会接受RM在准备阶段的执行回复,只要有任一个RM执行失败,TM会通知所有RM执行回滚操 作,否则,TM将会通知所有RM提交该事务。提交阶段结束资源锁释放。

    XA方案的问题: 1、需要本地数据库支持XA协议。 2、资源锁需要等到两个阶段结束才释放,性能较差。

3.2.2 Seata方案

Seata是由阿里中间件团队发起的开源项目Fescar,后更名为Seata,它是一个是开源的分布式事务框架。

传统2PC的问题在Seata中得到了解决,它通过对本地关系数据库的分支事务的协调来驱动完成全局事务,是工作在应用层的中间件。主要优点是性能较好,且不长时间占用连接资源,它以高效并且对业务0侵入的方式解决微服务场景下面临的分布式事务问题,它目前提供AT模式(即2PC)及TCC模式的分布式事务解决方案。

Seata的设计思想如下: Seata的设计目标其一是对业务无侵入,因此从业务无侵入的2PC方案着手,在传统2PC的基础上演进,并解决2PC方案面临的问题。

Seata把一个分布式事务理解成一个包含了若干分支事务全局事务。全局事务的职责是协调其下管辖的分支事务达成一致,要么一起成功提交,要么一起失败回滚。此外,通常分支事务本身就是一个关系数据库的本地事务,下图是全局事务与分支事务的关系图:

与 传统2PC 的模型类似,Seata定义了3个组件来协议分布式事务的处理过程:

  • Transaction Coordinator (TC): 事务协调器,它是独立的中间件,需要独立部署运行,它维护全局事务的运行状态,接收TM指令发起全局事务的提交与回滚,负责与RM通信协调各各分支事务的提交或回滚。
  • Transaction Manager (TM): 事务管理器,TM需要嵌入应用程序中工作,它负责开启一个全局事务,并最终向TC发起全局提交或全局回滚的指令。
  • Resource Manager (RM): 控制分支事务,负责分支注册、状态汇报,并接收事务协调器TC的指令,驱动分支(本地)事务的提交和回滚。

还拿新用户注册送积分举例Seata的分布式事务过程:

具体的执行流程如下:

  1. 用户服务的 TM 向 TC申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的XID。

  2. 用户服务的 RM 向 TC 注册分支事务,该分支事务在用户服务执行新增用户逻辑,并将其纳入 XID 对应全局事务的管辖。

  3. 用户服务执行分支事务,向用户表插入一条记录。

  4. 逻辑执行到远程调用积分服务时(XID 在微服务调用链路的上下文中传播)。积分服务的RM 向 TC 注册分支事务,该分支事务执行增加积分的逻辑,并将其纳入 XID 对应全局事务的管辖。

  5. 积分服务执行分支事务,向积分记录表插入一条记录,执行完毕后,返回用户服务。

  6. 用户服务分支事务执行完毕。

  7. TM 向 TC 发起针对 XID 的全局提交或回滚决议。

  8. TC 调度 XID 下管辖的全部分支事务完成提交或回滚请求。

Seata实现2PC与传统2PC的差别:

架构层次方面,传统2PC方案的 RM 实际上是在数据库层,RM 本质上就是数据库自身,通过 XA 协议实现,而Seata的 RM 是以jar包的形式作为中间件层部署在应用程序这一侧的。

两阶段提交方面,传统2PC无论第二阶段的决议是commit还是rollback,事务性资源的锁都要保持到Phase2完成才释放。而Seata的做法是在Phase1 就将本地事务提交,这样就可以省去Phase2持锁的时间,整体提高效率。

3.3.seata实现2PC事务

3.3.1.业务说明

本示例通过Seata中间件实现分布式事务,模拟三个账户的转账交易过程。

两个账户在三个不同的银行(张三在bank1、李四在bank2),bank1和bank2是两个个微服务。交易过程是,张三给李四转账指定金额。

上述交易步骤,要么一起成功,要么一起失败,必须是一个整体性的事务。

3.3.2.程序组成部分

本示例程序组成部分如下:

数据库:MySQL-5.7.25 包括bank1和bank2两个数据库。

JDK:64位 jdk1.8.0_201 微服务框架:spring-boot-2.1.3、spring-cloud-Greenwich.RELEASE seata客户端(RM、TM):spring-cloud-alibaba-seata-2.1.0.RELEASE seata服务端(TC):seata-server-0.7.1

微服务及数据库的关系 :

dtx/dtx-seata-demo/seata-demo-bank1 银行1,操作张三账户, 连接数据库bank1 dtx/dtx-seata-demo/seata-demo-bank2 银行2,操作李四账户,连接数据库bank2 服务注册中心:dtx/discover-server

本示例程序技术架构如下:

交互流程如下:

  1. 请求bank1进行转账,传入转账金额。

  2. bank1减少转账金额,调用bank2,传入转账金额。

3.3.3.创建数据库

导入数据库脚本:资料\sql\bank1.sql、资料\sql\bank2.sql

包括如下数据库:

bank1库,包含张三账户

CREATE DATABASE `bank1` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';
DROP TABLE IF EXISTS `account_info`; CREATE TABLE `account_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`account_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '户主姓名',
`account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '银行卡号',
`account_password` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '帐户密码',
`account_balance` double NULL DEFAULT NULL COMMENT '帐户余额',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT =
Dynamic;
INSERT INTO `account_info` VALUES (2, '张三的账户', '1', '', 10000);

bank2库,包含李四账户

CREATE DATABASE `bank2` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';
CREATE TABLE `account_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`account_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '户主姓名',
`account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '银行卡号',
`account_password` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '帐户密码',
`account_balance` double NULL DEFAULT NULL COMMENT '帐户余额',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT =
Dynamic;
INSERT INTO `account_info` VALUES (3, '李四的账户', '2', NULL, 0);

分别在bank1、bank2库中创建undo_log表,此表为seata框架使用:

CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL, PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

3.3.4.启动TC(事务协调器)

  1. 下载seata服务器

下载地址:https://github.com/seata/seata/releases/download/v0.7.1/seata-server-0.7.1.zip 也可以直接解压:资料\seata-server-0.7.1.zip

  1. 解压并启动

[seata服务端解压路径]/bin/seata-server.bat -p 8888 -m file

注:其中8888为服务端口号;file为启动模式,这里指seata服务将采用文件的方式存储信息。

如上图出现“Server started…”的字样则表示启动成功。

3.3.5 discover-server

discover-server是服务注册中心,测试工程将自己注册至discover-server。

导入:资料\基础代码\dtx父工程,此工程自带了discover-server,discover-server基于Eureka实现。

3.3.6 导入案例工程dtx-seata-demo

dtx-seata-demo是seata的测试工程,根据业务需求需要创建两个dtx-seata-demo工程。

  1. 导入dtx-seata-demo 导入:资料\基础代码\dtx-seata-demo到父工程dtx下。两个测试工程如下:

dtx/dtx-seata-demo/dtx-seata-demo-bank1 ,操作张三账户,连接数据库bank1 dtx/dtx-seata-demo/dtx-seata-demo-bank2 ,操作李四账户,连接数据库bank2

  1. 父工程maven依赖说明

    在dtx父工程中指定了SpringBoot和SpringCloud版本

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring‐boot‐dependencies</artifactId>
    <version>2.1.3.RELEASE</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring‐cloud‐dependencies</artifactId>
    <version>Greenwich.RELEASE</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>

在dtx-seata-demo父工程中指定了spring-cloud-alibaba-dependencies的版本。

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring‐cloud‐alibaba‐dependencies</artifactId>
    <version>2.1.0.RELEASE</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>
  1. 配置seata

在src/main/resource中,新增registry.conf、file.conf文件,内容可拷贝seata-server-0.7.1中的配置文件子。

在registry.conf中registry.type使用file:

在file.conf中更改service.vgroup_mapping.[springcloud服务名]-fescar-service-group = “default”,并修改 service.default.grouplist =[seata服务端地址]

关于vgroup_mapping的配置:

vgroup_mapping.事务分组服务名=Seata Server集群名称(默认名称为default) default.grouplist = Seata Server集群地址

org.springframework.cloud:spring-cloud-starter-alibaba-seataorg.springframework.cloud.alibaba.seata.GlobalTransactionAutoConfiguration类中,默认会使用 ${spring.application.name}-fescar-service-group作为事务分组服务名注册到 Seata Server上,如果和 file.conf中的配置不一致,会提示 no available server to connect 错误

也可以通过配置 spring.cloud.alibaba.seata.tx-service-group 修改后缀,但是必须和 file.conf 中的配置保持一致。

  1. 创建代理数据源

新增DatabaseConfiguration.java,Seata的RM通过DataSourceProxy才能在业务代码的事务提交时,通过这个切入点,与TC进行通信交互、记录undo_log等。

@Configuration
public class DatabaseConfiguration { @Bean
    @ConfigurationProperties(prefix = "spring.datasource.ds0") public DruidDataSource ds0() {
        DruidDataSource druidDataSource = new DruidDataSource();

        return druidDataSource;
    }

@Primary @Bean
    public DataSource dataSource(DruidDataSource ds0) { 
    	DataSourceProxy pds0 = new DataSourceProxy(ds0); return pds0;
    }
}

3.3.7 Seata执行流程

  1. 正常提交流程

  2. 回滚流程

回滚流程省略前的RM注册过程。

要点说明:

  1. 每个RM使用DataSourceProxy连接数据库,其目的是使用ConnectionProxy,使用数据源和数据连接代理的目的就是在第一阶段将undo_log和业务数据放在一个本地事务提交,这样就保存了只要有业务操作就一定有 undo_log。
  2. 在第一阶段undo_log中存放了数据修改前和修改后的值,为事务回滚作好准备,所以第一阶段完成就已经将分支事务提交,也就释放了锁资源。
  3. TM开启全局事务开始,将XID全局事务id放在事务上下文中,通过feign调用也将XID传入下游分支事务,每个分支事务将自己的Branch ID分支事务ID与XID关联。
  4. 第二阶段全局事务提交,TC会通知各各分支参与者提交分支事务,在第一阶段就已经提交了分支事务,这里各各参与者只需要删除undo_log即可,并且可以异步执行,第二阶段很快可以完成。
  5. 第二阶段全局事务回滚,TC会通知各各分支参与者回滚分支事务,通过 XID 和 Branch ID 找到相应的回滚日志,通过回滚日志生成反向的 SQL并执行,以完成分支事务回滚到之前的状态,如果回滚失败则会重试回滚操作。

3.3.8 dtx-seata-demo-bank1

dtx-seata-demo-bank1实现如下功能:

1、张三账户减少金额,开启全局事务。

2、远程调用bank2向李四转账。

  • DAO
@Mapper @Component
public interface AccountInfoDao {

    //更新账户金额
    @Update("update account_info set account_balance = account_balance + #{amount} where account_no = #{accountNo}")
    int updateAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);

}
  • FeignClient

远程调用bank2的客户端

@FeignClient(value = "seata‐demo‐bank2",fallback = Bank2ClientFallback.class) public interface Bank2Client {

    @GetMapping("/bank2/transfer")
    String transfer(@RequestParam("amount") Double amount);
}

@Component
public class Bank2ClientFallback implements Bank2Client{ 
    @Override
    public String transfer(Double amount) {
        return "fallback";
    }
}
  • Service
@Service
public class AccountInfoServiceImpl implements AccountInfoService {
    private Logger logger = LoggerFactory.getLogger(AccountInfoServiceImpl.class); @Autowired
    AccountInfoDao accountInfoDao;

    @Autowired
    Bank2Client bank2Client;

    //张三转账@Override
    @GlobalTransactional @Transactional
        public void updateAccountBalance(String accountNo, Double amount) { 					logger.info("******** Bank1 Service Begin ... xid: {}" , RootContext.getXID());
        //张三扣减金额
        accountInfoDao.updateAccountBalance(accountNo,amount*‐1);
        //向李四转账
        String remoteRst = bank2Client.transfer(amount);
        //远程调用失败if(remoteRst.equals("fallback")){
        throw new RuntimeException("bank1 下游服务异常");
        }
        //人为制造错误if(amount==3){
        throw new RuntimeException("bank1 make exception 3");
        }
    }
}

将@GlobalTransactional注解标注在全局事务发起的Service实现方法上,开启全局事务:

GlobalTransactionalInterceptor会拦截\@GlobalTransactional注解的方法,生成全局事务ID(XID),XID会在整个分布式事务中传递。在远程调用时,spring-cloud-alibaba-seata会拦截Feign调用将XID传递到下游服务。

  • Controller
@RestController
public class Bank1Controller {

    @Autowired
    AccountInfoService accountInfoService;

    //转账@GetMapping("/transfer")
    public String transfer(Double amount){ 				  						accountInfoService.updateAccountBalance("1",amount); return "bank1"+amount;
    }
}

3.3.9 dtx-seata-demo-bank2

dtx-seata-demo-bank2实现如下功能:

1、李四账户增加金额。 dtx-seata-demo-bank2在本账号事务中作为分支事务不使用\@GlobalTransactional。

  • DAO
@Mapper @Component
public interface AccountInfoDao {
    //向李四转账
    @Update("UPDATE account_info SET account_balance = account_balance + #{amount} WHERE account_no = #{accountNo}")
    int updateAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);

}
  • Service
@Service
public class AccountInfoServiceImpl implements AccountInfoService {
    private Logger logger = LoggerFactory.getLogger(AccountInfoServiceImpl.class); @Autowired
    AccountInfoDao accountInfoDao;
    @Override @Transactional
    public void updateAccountBalance(String accountNo, Double amount) {
        logger.info("******** Bank2 Service Begin ... xid: {}" , RootContext.getXID());
        //李四增加金额accountInfoDao.updateAccountBalance(accountNo,amount);
        //制造异常
        if(amount==2){
            throw new RuntimeException("bank1 make exception 2");
        }
    }
}
  • Controller
@RestController
public class Bank2Controller {

    @Autowired
    AccountInfoService accountInfoService;

    @GetMapping("/transfer")
    public String transfer(Double amount){ 	accountInfoService.updateAccountBalance("2",amount); return "bank2"+amount;
    }
}

3.3.10 测试场景

  • 张三向李四转账成功。

  • 李四事务失败,张三事务回滚成功。
  • 张三事务失败,李四事务回滚成功。
  • 分支事务超时测试。

3.4.小结

本节讲解了传统2PC(基于数据库XA协议)和Seata实现2PC的两种2PC方案,由于Seata的0侵入性并且解决了传统2PC长期锁资源的问题,所以推荐采用Seata实现2PC。

Seata实现2PC要点:

  1. 全局事务开始使用 \@GlobalTransactional标识 。

  2. 每个本地事务方案仍然使用\@Transactional标识。

  3. 每个数据都需要创建undo_log表,此表是seata保证本地事务一致性的关键。

4.分布式事务解决方案之TCC

4.1.什么是TCC事务

TCC是Try、Confirm、Cancel三个词语的缩写,TCC要求每个分支事务实现三个操作:预处理Try、确认

Confirm、撤销Cancel。Try操作做业务检查及资源预留,Confirm做业务确认操作,Cancel实现一个与Try相反的操作即回滚操作。TM首先发起所有的分支事务的try操作,任何一个分支事务的try操作执行失败,TM将会发起所有分支事务的Cancel操作,若try操作全部成功,TM将会发起所有分支事务的Confirm操作,其中Confirm/Cancel 操作若执行失败,TM会进行重试。

分支事务失败的情况:

TCC分为三个阶段:

  1. Try 阶段是做业务检查(一致性)及资源预留(隔离),此阶段仅是一个初步操作,它和后续的Confirm一起才能真正构成一个完整的业务逻辑。

  2. Confirm 阶段是做确认提交,Try阶段所有分支事务执行成功后开始执行Confirm。通常情况下,采用TCC则认为Confirm阶段是不会出错的。即:只要Try成功,Confirm一定成功。若Confirm阶段真的出错了,需引入重试机制或人工处理。

  3. Cancel 阶段是在业务执行错误需要回滚的状态下执行分支事务的业务取消,预留资源释放。通常情况下,采用TCC则认为Cancel阶段也是一定成功的。若Cancel阶段真的出错了,需引入重试机制或人工处理。

  4. TM事务管理器

    TM事务管理器可以实现为独立的服务,也可以让全局事务发起方充当TM的角色,TM独立出来是为了成为公用组件,是为了考虑系统结构和软件复用。

TM在发起全局事务时生成全局事务记录,全局事务ID贯穿整个分布式事务调用链条,用来记录事务上下文,追踪和记录状态,由于Confirm和cancel失败需进行重试,因此需要实现为幂等,幂等性是指同一个操作无论请求多少次,其结果都相同。

4.2.TCC 解决方案

目前市面上的TCC框架众多比如下面这几种:

(以下数据采集日为2019年07月11日)

框架名称 Gitbub地址 star数量
tcc-transaction https://github.com/changmingxie/tcc-transaction 3850
Hmily https://github.com/yu199195/hmily 2407
ByteTCC https://github.com/liuyangming/ByteTCC 1947
EasyTransaction https://github.com/QNJR-GROUP/EasyTransaction 1690

上一节所讲的Seata也支持TCC,但Seata的TCC模式对Spring Cloud并没有提供支持。我们的目标是理解TCC的原理以及事务协调运作的过程,因此更请倾向于轻量级易于理解的框架,因此最终确了Hmily。

Hmily是一个高性能分布式事务TCC开源框架。基于Java语言来开发(JDK1.8),支持Dubbo,Spring Cloud等

  • RPC框架进行分布式事务。它目前支持以下特性:支持嵌套事务(Nested transaction support).

  • 采用disruptor框架进行事务日志的异步读写,与RPC框架的性能毫无差别。

  • 支持SpringBoot-starter 项目启动,使用简单。

  • RPC框架支持 : dubbo,motan,springcloud。

  • 本地事务存储支持 : redis,mongodb,zookeeper,file,mysql。

  • 事务日志序列化支持:java,hessian,kryo,protostuff。

  • 采用Aspect AOP 切面思想与Spring无缝集成,天然支持集群。

  • RPC事务恢复,超时异常恢复等。

Hmily利用AOP对参与分布式事务的本地方法与远程方法进行拦截处理,通过多方拦截,事务参与者能透明的调用到另一方的Try、Confirm、Cancel方法;传递事务上下文;并记录事务日志,酌情进行补偿,重试等。

Hmily不需要事务协调服务,但需要提供一个数据库(mysql/mongodb/zookeeper/redis/file)来进行日志存储。

Hmily实现的TCC服务与普通的服务一样,只需要暴露一个接口,也就是它的Try业务。Confirm/Cancel业务逻辑,只是因为全局事务提交/回滚的需要才提供的,因此Confirm/Cancel业务只需要被Hmily TCC事务框架发现即可,不需要被调用它的其他业务服务所感知。

官网介绍:https://dromara.org/website/zh-cn/docs/hmily/index.html

TCC需要注意三种异常处理分别是空回滚、幂等、悬挂:

空回滚:

在没有调用 TCC 资源 Try 方法的情况下,调用了二阶段的 Cancel 方法,Cancel方法需要识别出这是一个空回滚,然后直接返回成功。

出现原因是当一个分支事务所在服务宕机或网络异常,分支事务调用记录为失败,这个时候其实是没有执行Try阶段,当故障恢复后,分布式事务进行回滚则会调用二阶段的Cancel方法,从而形成空回滚。

解决思路是关键就是要识别出这个空回滚。思路很简单就是需要知道一阶段是否执行,如果执行了,那就是正常回滚;如果没执行,那就是空回滚。前面已经说过TM在发起全局事务时生成全局事务记录,全局事务ID贯穿整个分布式事务调用链条。再额外增加一张分支事务记录表,其中有全局事务ID 和分支事务 ID,第一阶段 Try 方法里会插入一条记录,表示一阶段执行了。Cancel接口里读取该记录,如果该记录存在,则正常回滚;如果该记录不存在,则是空回滚。

幂等:

通过前面介绍已经了解到,为了保证TCC二阶段提交重试机制不会引发数据不一致,要求TCC 的二阶段 Try、

Confirm 和 Cancel接口保证幂等,这样不会重复使用或者释放资源。如果幂等控制没有做好,很有可能导致数据不一致等严重问题。

解决思路在上述“分支事务记录”中增加执行状态,每次执行前都查询该状态。

悬挂:

悬挂就是对于一个分布式事务,其二阶段 Cancel 接口比 Try 接口先执行。

出现原因是在 RPC 调用分支事务try时,先注册分支事务,再执行RPC调用,如果此时 RPC调用的网络发生拥堵,通常 RPC 调用是有超时时间的,RPC超时以后,TM就会通知RM回滚该分布式事务,可能回滚完成后,RPC请求才到达参与者真正执行,而一个 Try方法预留的业务资源,只有该分布式事务才能使用,该分布式事务第一阶段预留的业务资源就再也没有人能够处理了,对于这种情况,我们就称为悬挂,即业务资源预留后没法继续处理。 解决思路是如果二阶段执行完成,那一阶段就不能再继续执行。在执行一阶段事务时判断在该全局事务下,“分支事务记录”表中是否已经有二阶段事务记录,如果有则不执行Try。

举例,场景为 A 转账 30 元给 B,A和B账户在不同的服务。

方案1:

账户A

try:
    检查余额是否够30元
    扣减30元
confirm:
	空
cancel:
	增加30元

账户B

try:
    增加30元
confirm: 
	空
cancel:
	减少30元

方案1说明:

  1. 账户A,这里的余额就是所谓的业务资源,按照前面提到的原则,在第一阶段需要检查并预留业务资源,因此,我们在扣钱TCC 资源的 Try 接口里先检查 A 账户余额是否足够,如果足够则扣除 30 元。Confirm 接口表示正式提交,由于业务资源已经在 Try接口里扣除掉了,那么在第二阶段的 Confirm 接口里可以什么都不用做。Cancel接口的执行表示整个事务回滚,账户A回滚则需要把 Try 接口里扣除掉的 30元还给账户。

  2. 账号B,在第一阶段 Try 接口里实现给账户B加钱,Cancel接口的执行表示整个事务回滚,账户B回滚则需要把

    Try 接口里加的 30 元再减去。

方案1的问题分析:

  1. 如果账户A的try没有执行在cancel则就多加了30元。

  2. 由于try,cancel、confirm都是由单独的线程去调用,且会出现重复调用,所以都需要实现幂等。

  3. 账号B在try中增加30元,当try执行完成后可能会其它线程给消费了。

  4. 如果账户B的try没有执行在cancel则就多减了30元。

问题解决:

  1. 账户A的cancel方法需要判断try方法是否执行,正常执行try后方可执行cancel。

  2. try,cancel、confirm方法实现幂等。

  3. 账号B在try方法中不允许更新账户金额,在confirm中更新账户金额。

  4. 账户B的cancel方法需要判断try方法是否执行,正常执行try后方可执行cancel。

优化方案:

账户A

try:
    try幂等校验
    try悬挂处理
    检查余额是否够30元
    扣减30元
confirm: 
	空
cancel:
	cancel幂等校验
	cancel空回滚处理
	增加可用余额30元

账户B

try:
	空
confirm:
    confirm幂等校验
    正式增加30元
cancel:
	空

4.3.Hmily实现TCC事务

4.3.1.业务说明

本实例通过Hmily实现TCC分布式事务,模拟两个账户的转账交易过程。

两个账户分别在不同的银行(张三在bank1、李四在bank2),bank1、bank2是两个微服务。交易过程是,张三给李四转账指定金额。

上述交易步骤,要么一起成功,要么一起失败,必须是一个整体性的事务。

4.3.2.程序组成部分

数据库:MySQL-5.7.25 JDK:64位 jdk1.8.0_201

微服务:spring-boot-2.1.3、spring-cloud-Greenwich.RELEASE

Hmily:hmily-springcloud.2.0.4-RELEASE

微服务及数据库的关系 :

dtx/dtx-tcc-demo/dtx-tcc-demo-bank1 银行1,操作张三账户, 连接数据库bank1 dtx/dtx-tcc-demo/dtx-tcc-demo-bank2 银行2,操作李四账户,连接数据库bank2 服务注册中心:dtx/discover-server

4.3.3.创建数据库

导入数据库脚本:资料\sql\bank1.sql、资料\sql\bank2.sql、已经导过不用重复导入。创建hmily数据库,用于存储hmily框架记录的数据。

CREATE DATABASE `hmily` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';

创建bank1库,并导入以下表结构和数据(包含张三账户)

CREATE DATABASE `bank1` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';
DROP TABLE IF EXISTS `account_info`; CREATE TABLE `account_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`account_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '户主姓名',
`account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '银行卡号',
`account_password` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '帐户密码',
`account_balance` double NULL DEFAULT NULL COMMENT '帐户余额',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT =
Dynamic;
INSERT INTO `account_info` VALUES (2, '张三的账户', '1', '', 10000);

创建bank2库,并导入以下表结构和数据(包含李四账户)

CREATE DATABASE `bank2` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';
CREATE TABLE `account_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`account_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '户主姓名',
`account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '银行卡号',
`account_password` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '帐户密码',
`account_balance` double NULL DEFAULT NULL COMMENT '帐户余额',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT =
Dynamic;
INSERT INTO `account_info` VALUES (3, '李四的账户', '2', NULL, 0);

每个数据库都创建try、confirm、cancel三张日志表:

CREATE TABLE `local_try_log` (
`tx_no` varchar(64) NOT NULL COMMENT '事务id',
`create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
CREATE TABLE `local_confirm_log` (
`tx_no` varchar(64) NOT NULL COMMENT '事务id',
`create_time` datetime DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8
CREATE TABLE `local_cancel_log` (
`tx_no` varchar(64) NOT NULL COMMENT '事务id',
`create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

4.3.4 discover-server

discover-server是服务注册中心,测试工程将自己注册至discover-server。导入:资料\基础代码\dtx父工程,此工程自带了discover-server,discover-server基于Eureka实现。已经导过不用重复导入。

4.3.5 导入案例工程dtx-tcc-demo

dtx-tcc-demo是tcc的测试工程,根据业务需求需要创建两个dtx-tcc-demo工程。

(1)导入dtx-tcc-demo

导入:资料\基础代码\dtx-tcc-demo到父工程dtx下。 两个测试工程如下:

dtx/dtx-tcc-demo/dtx-tcc-demo-bank1 银行1,操作张三账户,连接数据库bank1 dtx/dtx-tcc-demo/dtx-tcc-demo-bank2 银行2,操作李四账户,连接数据库bank2 (2)引入maven依赖

<dependency>
    <groupId>org.dromara</groupId>
    <artifactId>hmily‐springcloud</artifactId>
    <version>2.0.4‐RELEASE</version>
</dependency>

(3)配置hmily

application.yml:

org:
    dromara:
        hmily :
        serializer : kryo recoverDelayTime : 128
        retryMax : 30
        scheduledDelay : 128
        scheduledThreadMax : 10 
        repositorySupport : db 
        started: true 
        hmilyDbConfig :
            driverClassName : com.mysql.jdbc.Driver
            url : jdbc:mysql://localhost:3306/bank?useUnicode=true username : root
            password : root

新增配置类接收application.yml中的Hmily配置信息,并创建HmilyTransactionBootstrap Bean:

@Bean
public HmilyTransactionBootstrap hmilyTransactionBootstrap(HmilyInitService hmilyInitService){ 
    HmilyTransactionBootstrap hmilyTransactionBootstrap = new
    HmilyTransactionBootstrap(hmilyInitService); hmilyTransactionBootstrap.setSerializer(env.getProperty("org.dromara.hmily.serializer"));

    hmilyTransactionBootstrap.setRecoverDelayTime(Integer.parseInt(env.getProperty("org.dromara.hmi ly.recoverDelayTime")));

    hmilyTransactionBootstrap.setRetryMax(Integer.parseInt(env.getProperty("org.dromara.hmily.retry Max")));

    hmilyTransactionBootstrap.setScheduledDelay(Integer.parseInt(env.getProperty("org.dromara.hmily
    .scheduledDelay")));

    hmilyTransactionBootstrap.setScheduledThreadMax(Integer.parseInt(env.getProperty("org.dromara.h mily.scheduledThreadMax")));

    hmilyTransactionBootstrap.setRepositorySupport(env.getProperty("org.dromara.hmily.repositorySup port"));
    hmilyTransactionBootstrap.setStarted(Boolean.parseBoolean(env.getProperty("org.dromara.hmily.st arted")));
    HmilyDbConfig hmilyDbConfig = new HmilyDbConfig();

    hmilyDbConfig.setDriverClassName(env.getProperty("org.dromara.hmily.hmilyDbConfig.driverClassNa me"));
    hmilyDbConfig.setUrl(env.getProperty("org.dromara.hmily.hmilyDbConfig.url")); hmilyDbConfig.setUsername(env.getProperty("org.dromara.hmily.hmilyDbConfig.username")); hmilyDbConfig.setPassword(env.getProperty("org.dromara.hmily.hmilyDbConfig.password")); hmilyTransactionBootstrap.setHmilyDbConfig(hmilyDbConfig);
    return hmilyTransactionBootstrap;
}

启动类增加@EnableAspectJAutoProxy并增加org.dromara.hmily的扫描项:

@SpringBootApplication @EnableDiscoveryClient @EnableHystrix
@EnableFeignClients(basePackages = {"cn.itcast.dtx.tccdemo.bank1.spring"}) @ComponentScan({"cn.itcast.dtx.tccdemo.bank1","org.dromara.hmily"}) public class Bank1HmilyServer {
    public static void main(String[] args) { SpringApplication.run(Bank1HmilyServer.class, args);

    }
}

4.3.6 dtx-tcc-demo-bank1

dtx-tcc-demo-bank1实现try和cancel方法,如下:

try:
    try幂等校验
    try悬挂处理
    检查余额是够扣减金额
    扣减金额
confirm:
	空
cancel:
    cancel幂等校验
    cancel空回滚处理
    增加可用余额
  • Dao
@Mapper @Component
public interface AccountInfoDao {
@Update("update account_info set account_balance=account_balance ‐ #{amount} where account_balance>#{amount} and account_no=#{accountNo} ")
int subtractAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);
@Update("update account_info set account_balance=account_balance + #{amount} where account_no=#{accountNo} ")
int addAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);

/**
*增加某分支事务try执行记录
*@param localTradeNo 本地事务编号
*@return
*/
@Insert("insert into local_try_log values(#{txNo},now());") int addTry(String localTradeNo);

@Insert("insert into local_confirm_log values(#{txNo},now());") int addConfirm(String localTradeNo);

@Insert("insert into local_cancel_log values(#{txNo},now());") int addCancel(String localTradeNo);

/**
*查询分支事务try是否已执行
*@param localTradeNo 本地事务编号
*@return
*/
@Select("select count(1) from local_try_log where tx_no = #{txNo} ") int isExistTry(String localTradeNo);
/**
*查询分支事务confirm是否已执行
*@param localTradeNo 本地事务编号
*@return
*/
@Select("select count(1) from local_confirm_log where tx_no = #{txNo} ") int isExistConfirm(String localTradeNo);

/**
*查询分支事务cancel是否已执行
*@param localTradeNo 本地事务编号
*@return
*/
@Select("select count(1) from local_cancel_log where tx_no = #{txNo} ") int isExistCancel(String localTradeNo);

}
  • try和cancel方法
@Service @Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {
    private Logger logger = LoggerFactory.getLogger(AccountInfoServiceImpl.class);



    @Autowired
    private AccountInfoDao accountInfoDao;

    @Autowired
    private Bank2Client bank2Client;

    @Override @Transactional
    @Hmily(confirmMethod = "commit", cancelMethod = "rollback")
    public void updateAccountBalance(String accountNo, Double amount) {
    //事务id
    String transId = HmilyTransactionContextLocal.getInstance().get().getTransId(); log.info("******** Bank1 Service begin try... "+transId );
    int existTry = accountInfoDao.isExistTry(transId);
    //try幂等校验if(existTry>0){
    log.info("******** Bank1 Service 已经执行try,无需重复执行,事务id:{} "+transId );
    return ;
    }
    //try悬挂处理
    if(accountInfoDao.isExistCancel(transId)>0 || accountInfoDao.isExistConfirm(transId)>0){ log.info("******** Bank1 Service 已经执行confirm或cancel,悬挂处理,事务id:{} "+transId
    );
    return ;
    }
    //从账户扣减
    if(accountInfoDao.subtractAccountBalance(accountNo ,amount )<=0){
    //扣减失败
    throw new HmilyRuntimeException("bank1 exception,扣减失败,事务id:{}"+transId);
    }
    //增加本地事务try成功记录,用于幂等性控制标识accountInfoDao.addTry(transId);

    // 远 程 调 用 bank2 if(!bank2Client.test2(amount,transId)){
    throw new HmilyRuntimeException("bank2Client exception,事务id:{}"+transId);
    }
    if(amount==10){//异常一定要抛在Hmily里面
    throw new RuntimeException("bank1 make exception 10");
    }
    log.info("******** Bank1 Service end try... "+transId  );
    }


    @Transactional
    public void commit( String accountNo, double amount) {
    String localTradeNo = HmilyTransactionContextLocal.getInstance().get().getTransId();

    logger.info("******** Bank1 Service begin commit..."+localTradeNo );
    }
    @Transactional
    public void rollback( String accountNo, double amount) {
    String localTradeNo = HmilyTransactionContextLocal.getInstance().get().getTransId(); log.info("******** Bank1 Service begin rollback... " +localTradeNo); if(accountInfoDao.isExistTry(localTradeNo) == 0){ //空回滚处理,try阶段没有执行什么也不用做
    log.info("******** Bank1 try阶段失败... 无需rollback "+localTradeNo ); return;
    }
    if(accountInfoDao.isExistCancel(localTradeNo) > 0){ //幂等性校验,已经执行过了,什么也不用做log.info("******** Bank1 已经执行过rollback... 无需再次rollback " +localTradeNo); return;
    }
    //再将金额加回账户accountInfoDao.addAccountBalance(accountNo,amount);
    //添加cancel日志,用于幂等性控制标识
    accountInfoDao.addCancel(localTradeNo);
    log.info("******** Bank1 Service end rollback... " +localTradeNo);
    }

}
  • feignClient
@FeignClient(value = "seata‐demo‐bank2", fallback = Bank2Fallback.class) public interface Bank2Client {

    @GetMapping("/bank2/transfer") @Hmily
    	Boolean transfer(@RequestParam("amount") Double amount);
}
  • Controller
@RestController
public class Bank1Controller { 
    @Autowired
    AccountInfoService accountInfoService;

    @RequestMapping("/transfer")
    public String test(@RequestParam("amount") Double amount) { this.accountInfoService.updateAccountBalance("1", amount); return "cn/itcast/dtx/tccdemo/bank1" + amount;
    }

}

4.3.7 dtx-tcc-demo-bank2

dtx-tcc-demo-bank2实现如下功能:

try:
	空
confirm:
	confirm幂等校验
	正式增加金额
cancel:
	空
  • Dao
@Component @Mapper
public interface AccountInfoDao {

    @Update("update account_info set account_balance=account_balance + #{amount} where account_no=#{accountNo} ")
    int addAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);



    /**
    *增加某分支事务try执行记录
    *@param localTradeNo 本地事务编号
    *@return
    */
    @Insert("insert into local_try_log values(#{txNo},now());") int addTry(String localTradeNo);

    @Insert("insert into local_confirm_log values(#{txNo},now());") int addConfirm(String localTradeNo);

    @Insert("insert into local_cancel_log values(#{txNo},now());") int addCancel(String localTradeNo);

    /**
    *查询分支事务try是否已执行
    *@param localTradeNo 本地事务编号
    *@return
    */
    @Select("select count(1) from local_try_log where tx_no = #{txNo} ") int isExistTry(String localTradeNo);
    /**
    *查询分支事务confirm是否已执行
    *@param localTradeNo 本地事务编号
    *@return
    */
    @Select("select count(1) from local_confirm_log where tx_no = #{txNo} ") int isExistConfirm(String localTradeNo);
    /**
    *查询分支事务cancel是否已执行
    *@param localTradeNo 本地事务编号
    *@return
    */
    @Select("select count(1) from local_cancel_log where tx_no = #{txNo} ") int isExistCancel(String localTradeNo);
}
  • 实现confirm方法
@Service @Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {

    @Autowired
    private AccountInfoDao accountInfoDao;

    @Override @Transactional
    @Hmily(confirmMethod = "confirmMethod", cancelMethod = "cancelMethod") public void updateAccountBalance(String accountNo, Double amount) {
    String localTradeNo = HmilyTransactionContextLocal.getInstance().get().getTransId(); log.info("******** Bank2 Service Begin try ..."+localTradeNo);

    }

    @Transactional
    public void confirmMethod(String accountNo, Double amount) {
    String localTradeNo = HmilyTransactionContextLocal.getInstance().get().getTransId(); log.info("******** Bank2 Service commit... " +localTradeNo); if(accountInfoDao.isExistConfirm(localTradeNo) > 0){ //幂等性校验,已经执行过了,什么也不用做
    log.info("******** Bank2 已经执行过confirm... 无需再次confirm "+localTradeNo ); return ;
    }
    //正式增加金额accountInfoDao.addAccountBalance(accountNo,amount);
    //添加confirm日志
    accountInfoDao.addConfirm(localTradeNo);
    }

    @Transactional
    public void cancelMethod(String accountNo, Double amount) {
    String localTradeNo = HmilyTransactionContextLocal.getInstance().get().getTransId(); log.info("******** Bank2 Service begin cancel... "+localTradeNo );

    }
}

  • Controller
@RestController
public class Bank2Controller { 
    @Autowired
    AccountInfoService accountInfoService;

    @RequestMapping("/transfer")
    public Boolean test2(@RequestParam("amount") Double amount) { this.accountInfoService.updateAccountBalance("2", amount); return true;
    }

}

3.3.8 测试场景

  • 张三向李四转账成功。

  • 李四事务失败,张三事务回滚成功。

  • 张三事务失败,李四分支事务回滚成功。

  • 分支事务超时测试。

4.4.小结

如果拿TCC事务的处理流程与2PC两阶段提交做比较,2PC通常都是在跨库的DB层面,而TCC则在应用层面的理,需要通过业务逻辑来实现。这种分布式事务的实现方式的优势在于,可以让应用自己定义数据操作的粒度,使得降低锁冲突、提高吞吐量成为可能。

而不足之处则在于对应用的侵入性非常强,业务逻辑的每个分支都需要实现try、confirm、cancel三个操作。此外,其实现难度也比较大,需要按照网络状态、系统故障等不同的失败原因实现不同的回滚策略。

Post Directory