文章目录
- 前言
- 一、订单数据更新
- 1. 领域模型更新服务工厂
- 2. 聚合创建工厂
- 2.1 数据库更新服务
- 2.2 聚合创建工厂
- 二、限流渠道入队
- 三、异步通知
- 1. 判断是否需要通知
- 2. 组装异步通知报文
- 3. 获取异步通知协议类型
- 3. 异步通知
- 四、交易结果通知payrouter
- 总结
前言
本篇将继业务处理《支付系统设计三:渠道网关设计06-业务处理》之后的后置处理逻辑进行介绍,在请求过支付渠道并将支付渠道响应的报文通过解析脚本转化为系统所需的统一对象了,接下来就是订单数据更新、订单入队(配置限流)和结果通知(同步/异步标识)流程了。
一、订单数据更新
int count = domainDBUpdateServiceFactory.getDomainDBUpdateService(context.getClientTransCode()).update(context);
1. 领域模型更新服务工厂
领域模型数据库更新服务工厂定义,根据transCode+DomainDBSaveServiceImpl获取到领域模型数据库更新服务,获取不到则采用默认domainDBUpdateService服务实现。
@Componentpublic class DomainDBUpdateServiceFactory { private static final Logger logger = LoggerFactory.getLogger(DomainDBUpdateServiceFactory.class); private final static String DOMAINDBUPDATESERVICE_SUFF = "DomainDBUpdateServiceImpl"; @Autowired private MapdomainDBUpdateServiceMap; @Resource(name = "domainDBUpdateService") private DomainDBUpdateService defaultDomainDBUpdateService; public DomainDBUpdateService getDomainDBUpdateService(String transCode) { String dbTransCode = TransactionManager.getTransCode(transCode); StringBuilder key = new StringBuilder(); if (StringUtils.isNotBlank(dbTransCode)) { key.append(dbTransCode).append(DOMAINDBUPDATESERVICE_SUFF); } else { key.append(transCode).append(DOMAINDBUPDATESERVICE_SUFF); } DomainDBUpdateService domainDBUpdateService = domainDBUpdateServiceMap.get(key.toString()); if (domainDBUpdateService == null) { LoggerUtil.info(logger, "交易({})-未获取到交易数据补全服务-采用默认服务", transCode); domainDBUpdateService = defaultDomainDBUpdateService; } return domainDBUpdateService; }}
2. 聚合创建工厂
数据更新相关服务类关系如下:
2.1 数据库更新服务
域模型工厂的bean名称为域模型类名简称+BuildFactory。
默认数据库更新服务
@Service("domainDBUpdateService")public class DomainDBUpdateServiceImpl extends AbstractDomainDBUpdateService{ private static final Logger logger = LoggerFactory.getLogger(DomainDBUpdateServiceImpl.class); @Override public void execute(PayGwContext context) { super.update(context); }}
数据库更新服务抽象层
@Servicepublic abstract class AbstractDomainDBUpdateService extends AbstractTransactionService implements DomainDBUpdateService { private static final Logger logger = LoggerFactory.getLogger(AbstractDomainDBUpdateService.class); @Resource(name = "defaultAggregateBuildFactory") private DefaultAggregateBuildFactory defaultAggregateBuildFactory; @Override public int update(PayGwContext context) { LoggerUtil.info(logger, "交易({})-数据库状态更新-开始", context.getClientTransCode()); int count = defaultAggregateBuildFactory.modify(context.getMessageDescription().getDatas()); LoggerUtil.info(logger, "交易({})-数据库状态更新-更新记录数({})-结束", context.getClientTransCode(), count); return count; }}
2.2 聚合创建工厂
默认聚合创建工厂
@Component("defaultAggregateBuildFactory")public class DefaultAggregateBuildFactory { private static final Logger logger = LoggerFactory.getLogger(DefaultAggregateBuildFactory.class); private static final String DOMAIN_FACTORY_SUFF = "BuildFactory";//域模型构建工厂后缀 private static final String DOMAIN_SUFF = "Domain";//domain后缀 private static final String ENTITY_SUFF = "Entity";//entity后缀 private static final String DOMAIN_REPOSITORY_SIMPLE_SUFF = "RepositoryImpl";//简略的域模型仓储后缀 private static final String DOMAIN_REPOSITORY_SUFF = "DomainRepositoryImpl";//域模型仓储后缀 public void save(Mapdata) { LoggerUtil.info(logger, "交易({})-交易入库-开始", data.get(PayGwConstant.PAYGW_TRANS_CODE)); //1、模型创建 BusinessModel model = (BusinessModel) build(data); //2、模型持久化 if (model != null) { store(model); } LoggerUtil.info(logger, "交易({})-交易入库-结束", data.get(PayGwConstant.PAYGW_TRANS_CODE)); } public AggregateBase build(Map data) { //1、获取域模型Class String transCode = StringUtils.valueOf(data.get(PayGwConstant.PAYGW_TRANS_CODE)); Class extends AggregateBase> domainClass = getDomainClassByTransCode(transCode); //2、新建模型 AggregateBase domain = null; try { domain = domainClass.newInstance(); } catch (Exception e) { LoggerUtil.error(logger, "交易({})-模型构建-构建模型异常", transCode); throw new PayGwException(SystemErrorCode.SYSTEM_ERROR, e); } //3、模型数据填充 domain.fill(data); return domain; } public Class extends AggregateBase> getDomainClassByTransCode(String transCode) { return TransactionManager.getDomainClass(transCode); } public void store(AggregateBase domain) { //1、获取模型对应的仓储; BusinessModelRepository businessModelRepository = getBusinessModelRepository(domain.getClass()); //2、调用仓储进行持久化; businessModelRepository.store(domain); } public int modify(Map data) { //1、获取域模型 String transCode = StringUtils.valueOf(data.get(PayGwConstant.PAYGW_TRANS_CODE)); Class extends AggregateBase> domainClass = getDomainClassByTransCode(transCode); //2、获取模型对应的仓储 BusinessModelRepository businessModelRepository = getBusinessModelRepository(domainClass); //3、调用仓储进行修改 return businessModelRepository.modify(data); } public BusinessModelRepository getBusinessModelRepository(Class domainClass) { String domainClassName = StringUtils.convertFirstCharToLower(domainClass.getSimpleName()); String beanName = ""; if (domainClassName.endsWith(DOMAIN_SUFF) || domainClassName.endsWith(ENTITY_SUFF)) { beanName = domainClassName + DOMAIN_REPOSITORY_SIMPLE_SUFF; } else { beanName = domainClassName + DOMAIN_REPOSITORY_SUFF; } return ApplicationContextUtil.getBean(beanName, BusinessModelRepository.class); } // ... ...}
模型修改操作如下:
public int modify(Mapdata) { //1、获取域模型 String transCode = StringUtils.valueOf(data.get(PayGwConstant.PAYGW_TRANS_CODE)); Class extends AggregateBase> domainClass = getDomainClassByTransCode(transCode); //2、获取模型对应的仓储 BusinessModelRepository businessModelRepository = getBusinessModelRepository(domainClass); //3、调用仓储进行修改 return businessModelRepository.modify(data); }
模型修改:
//1、获取域模型 //2、获取模型对应的仓储 //3、调用仓储进行修改
二、限流渠道入队
String processStatus = StringUtils.valueOf(data.get(PayGwConstant.PAYGW_PROCESS_STATUS)); if (ProcessStatusEnum.QUEUEING.getCode().equals(processStatus)) { String instCode = StringUtils.valueOf(data.get(PayGwConstant.INST_CODE)); String instTransId = StringUtils.valueOf(data.get(PayGwConstant.INST_TRANS_ID)); String channelCode = StringUtils.valueOf(data.get(PayGwConstant.CHANNEL_CODE)); String channelTransNo = StringUtils.valueOf(data.get(PayGwConstant.CHANNEL_TRANS_NO)); String rateFlagRedisVal = channelCode + "|" + channelTransNo; rateLimitService.enqueueHeadForSend(instCode, instTransId, rateFlagRedisVal); }
判断processStatus状态是否为QUEUEING排队中,如果是则入队:
@Override public void enqueueHeadForSend(String instCode, String instTransId, String val) { //1、获取redisKey String rateFlagRedisKey = "rateLimit_" + instCode + "_" + instTransId; LoggerUtil.info(logger, "限流渠道入队列首部:{}-{}", rateFlagRedisKey, val); //进行排队队列 try { jedisCluster.lpush(rateFlagRedisKey, val); } catch (Exception e) { LoggerUtil.error(logger, "限流渠道入队列异常:{}-{}", rateFlagRedisKey, val); } }
入队后等待消费方从Redis队列中中获取到任务实体,然后组装请求支付渠道报文,进行请求。
三、异步通知
1. 判断是否需要通知
首先判断交易上送的同步/异步标识,如果为异步并且交易为终态,则通知上游系统,即paygw–>paycore的通知。
2. 组装异步通知报文
根据上送的支付渠道和交易类型信息获取到通讯信息,从通讯信息获取到关联的模板,如上paycore–>paygw通讯的报文模板,然后使用报文组装引擎进行模板填充。
#set($map ={ "header": { "transCode": "$!data.transCode", "transType": "$!data.transType", "channelCode": "$!data.channelCode", "channelTransNo": "$!data.channelTransNo", "channelDateTime": "$!DateUtil.format($data.channelDateTime , 'yyyyMMddHHmmss')", "success": "$!data.success", "errorCode": "$!data.errorCode", "errorMsg": "$!data.errorMsg"}, "body": { "transStatus": "$!data.transStatus", "respCode": "$!data.respCode", "respMsg": "$!data.respMsg", "transNo": "$!data.transNo", "transDateTime": "$!DateUtil.format($data.transDateTime , 'yyyyMMddHHmmss')", "transAmount": "$!data.transAmount", "acctNo": "$!data.acctNo", "uuid": "$!data.uuid", "content": "$!data.content", "instCode": "$!data.instCode", "instRespNo": "$!data.instRespNo", "instTransStatus": "$!data.instTransStatus", "instTransDate": "$!DateUtil.format($data.instTransDate , 'yyyyMMdd')", "instMerCode": "$!data.instMerCode", "instAcctNo": "$!data.instAcctNo"}})#set($json = $map.get("body").put("content", $!data.content))$!MapUtils.toJsonStr($map)
3. 获取异步通知协议类型
根据上送的支付渠道和交易类型信息获取到通讯信息,从通讯信息获取到异步通知协议,即配置通讯信息时的异步通知协议:
3. 异步通知
NotifyFactory.getNotify(notifyProtocolType).notify(notifyKey, notifyMessage);
从通知器工厂根据配置的通知协议获取到通知客户端进行结果通知。
四、交易结果通知payrouter
将交易结果上送到支付路由系统,支付路由需求交易结果进行一些指标累计,如单日交易量、单日交易金额累计等。
//交易结果通知payrouter String transStatus = StringUtils.valueOf(data.get(PayGwConstant.PAYGW_TRANS_STATUS)); if (StringUtils.equals(TransStatusEnum.SUCCESS.getCode(), transStatus)|| StringUtils.equals(TransStatusEnum.FAIL.getCode(), transStatus)) {//交易失败 Object notifyMessage = getNotifyMessage(transId, data); NotifyFactory.getNotify(ProtocolTypeEnum.MQ).notify(MqConstant.PAYROUTER_RESULT_NOTIFY_KEY, notifyMessage); }
总结
本篇简略介绍了订单数据更新、订单入队和结果通知实现流程。