EOS Low-Code Platform 8 EOS Low-Code Platform 8
产品简介
安装部署
应用开发
专题场景实战案例
低代码(Low-Code)开发参考手册
高开开发参考手册
流程开发参考手册
AFCenter 使用指南
Governor 使用指南
FAQ
  • bps分布式事务
  • 功能描述
  • 接口描述
  • 支持分布式事务的API
  • 示例
  • bps分布式事务接口提交方式
  • 分布式事务的超时
  • 超时配置

# bps分布式事务

# 功能描述

BPS提供了客户端的流程操作API,通过客户端API操作流程时,每一次调用都会立即在引擎端执行并使流程数据入库。客户端分布式事务是为了保证流程批量操作的原子性。

# 接口描述

方法名 功能描述
begin() 开启分布式事务
commit() 提交分布式事务
rollback() 回滚分布式事务
beginWithUT(javax.transaction.UserTransaction userTransaction) 开启分布式事务,同时将该事务与用户事务关联
rollbackWithCompensate(com.primeton.workflow.api2.client.gtx.
IWFClientTransactionCompensate compensate)
回滚分布式事务,并回调补偿接口
getStatus() 获取当前分布式事务的状态
beginWithUTAndTimeout(javax.transaction.UserTransaction userTransaction, int timeoutSeconds) 开启分布式事务,同时将该事务与用户事务关联,并指定过期时间
beginWithTimeout(int timeoutSeconds) 开启分布式事务,同时指定过期时间

# 支持分布式事务的API

只有流程操作相关的API支持分布式事务,具体接口如下表:

接口名 接口描述
com.eos.workflow.api.IWFProcessInstManager 流程实例管理
com.eos.workflow.api.IWFActivityInstManager 活动实例管理
com.eos.workflow.api.IWFWorkItemManager 工作项管理
com.eos.workflow.api.IWFRelativeDataManager 相关数据管理
com.eos.workflow.api.IWFWorkItemDrawbackManager 工作项撤回管理

说明: 即使使用了列表中的接口,也不是所有的方法都支持分布式事务,以get、find、query、generate开头的查询类方法就不支持分布式事务,如IWFWorkItemManager.queryWorkItemUrl()、IWFRelativeDataManager.getRelativeData()等。

# 示例

业务系统需要自己管理业务事务,在业务操作和流程操作都做完之后,需要先提交业务事务,再提交分布式事务。如果在提交业务事务时失败,应该立即回滚业务事务,并同时回滚分布式事务。

// 获取BPS流程服务客户端
IWFOprApiClient oprClient = getOprApiClient(bpsClientID, tenantId);
// 获取分布式全局事务管理器
IWFClientTransactionManager transactionManager = oprClient.getClientTransactionManager();        
Connection conn = null;
try {
    // 获取用户业务系统的数据库连接并设置为不自动提交事务,即自己管理事务
    conn = DBUtil.getConnection();
    conn.setAutoCommit(false);
    // 开启分布式事务
    transactionManager.begin();
    // 流程操作1,创建并启动流程实例
    long procInstId = oprClient.getWFProcessInstApi().createProcessInstance("test-tx1", "test-tx1-8", "test");
    // 业务操作1
    conn.createStatement().executeUpdate(sql1);
    // 流程操作2,创建并启动活动实例
    oprClient.getWFProcessInstApi().createProcessInstance("test-tx1", "test-tx1-9", "test");
    // 业务操作2
    conn.createStatement().executeUpdate(sql2);
    try {
        conn.commit(); // 提交业务系统事务
    } catch (Exception e) {
        conn.rollback(); // 回滚业务系统事务
        throw new Exception(e); // 抛出异常以便阻止分布式事务的提交
    }
    // 提交分布式事务,批量执行流程操作
    transactionManager.commit();
    System.out.println("业务系统事务和流程分布式事务全部提交成功");
} catch (Throwable t) { // 出现异常,可能是业务提交失败,也可能是业务提交成功但流程提交失败
    if(transactionManager.getStatus() == Status.STATUS_ROLLING_BACK) {
        // 如果分布式事务的状态为STATUS_ROLLING_BACK,说明业务提交成功但流程提交失败,则需要回滚时做业务补偿
        transactionManager.rollbackWithCompensate(() -> {
            // 当分布式事务提交失败时,由于业务事务已提交,这里需要做业务补偿动作
            // doCompensate();
        });
    } else {
        // 其他状态,说明是业务提交失败,只需要回滚分布式事务,不需要做业务补偿
        transactionManager.rollback();
    }
} finally {
    // 关闭业务系统数据库资源
    DBUtil.close(conn);
}
//获取BPS操作客户端
public static IWFOprApiClient getOprApiClient(String clientID, String tenantId) {
   try {
      WFApiServerUri serverUri = new WFApiServerUri("http://" + clientID + "/bps");
      String userId = "";   //客户端自定义获取userId
      String userName = ""; //客户端自定义获取userName
      WFApiClientRequestUser requestUser = null;
      if (isMultiTenantMode(clientID)) {
         if (StringUtils.isEmpty(tenantId)) {
            tenantId = "default";   //默认为 default
         }
         String token = getToken(clientID, tenantId);
         requestUser = new WFApiClientRequestUser(clientID, userId, userName, tenantId, token);
      } else {
         requestUser = new WFApiClientRequestUser(clientID, userId, userName);
      }
      WFApiClientFactory.getRequestUserManager().setCurrentUser(requestUser);
      oprApiClient = WFApiClientFactory.getOprApiClient(serverUri);
 
   } catch (WFApiException e) {
      throw new RuntimeException("Bps Client[" + clientID + "] is not existed!");
   }
   return oprApiClient;
}
//获取token
public static String getToken(String clientID, String tenantId) {
   WFApiServerUri serverUri = new WFApiServerUri("http://" + clientID + "/bps");
   String userId = "";  //客户端自定义获取userId
   String userName = "";    //客户端自定义获取userName
   WFApiClientRequestUser requestUser = new WFApiClientRequestUser(clientID, userId, userName);
   WFApiClientFactory.getRequestUserManager().setCurrentUser(requestUser);
   return WFApiClientFactory.getMgrApiClient(serverUri).getWFMultiTenancyApi().getTenantIdentityToken(tenantId);
}

说明:

客户端需要用到bps分布式事务功能,需在客户端pom文件引入bps相关的依赖才能实现功能,如下所示:

<dependency>
        <groupId>com.primeton.bps</groupId>
        <artifactId>bps-server-api2-client-resttemplate</artifactId>
        <version>${bps-version}</version>
    </dependency>
    <dependency>
        <groupId>com.primeton.bps</groupId>
        <artifactId>bps-server-api2-client-localjvm</artifactId>
        <version>${bps-version}</version>
    </dependency>
<dependency>

# bps分布式事务接口提交方式

业务系统在调用分布式全局事务管理器的commit方法时,bps会调用com.primeton.workflow.api2.client.gtx.impl.WFClientTransaction#commit,该方法中会针对业务系统调用的多个支持事务的bps方法批量执行调用提交。

public void commit() {
        try {
            status = Status.STATUS_COMMITTING;
            if (userTransaction != null) {
                userTransaction.commit();
            }
            if (invokeInfos != null && invokeInfos.size() > 0) {
                wfClientTxManager.getOprApiClient().getWFCommonApi().batchExecuteApis(invokeInfos);
            }
            status = Status.STATUS_COMMITTED;
        } catch (Throwable e) {
            status = Status.STATUS_MARKED_ROLLBACK;
            throw WFApiExceptionUtils.wrapException(e);
        }
    }

其中batchExecuteApis方法是对bps拦截到支持事务的api做批量调用提交。

# 分布式事务的超时

当业务系统开启了BPS分布式事务后,必须在一定的时间范围内(默认为5分钟)提交事务,否则事务会超时,事务中所有的流程调用都会自动回滚。

# 超时配置

客户端分布式事务的超时机制默认是开启的,业务系统可以根据实际情况手动调整超时时间。

  • 客户端分布式事务超时配置

目前超时配置不支持客户端自定义配置,需手动调用相关接口并传入超时时间,具体示例代码如下:

transactionManager.beginWithTimout(10); //参数为超时时间
  
//或者调用这个接口
transactionManager.beginWithUTAndTimeout(UserTransaction userTransaction, int timeoutSeconds);

← 客户端API 组织机构 →