数据推送服务

This commit is contained in:
钱涛 2024-12-02 14:08:03 +08:00
parent 4606e11754
commit 186473f9f6
6 changed files with 45 additions and 82 deletions

View File

@ -23,11 +23,6 @@ public class PushRecordDTO {
@ElogTransform(name = "")
private Long id;
/**
* 批次号
*/
private Long batchId;
/**
* 记录名

View File

@ -22,12 +22,6 @@ public class PushRecordPO {
@ElogTransform(name = "id")
private Long id;
/**
* 批次号
*/
private Long batchId;
/**
* 记录名
*/

View File

@ -14,12 +14,13 @@ import java.util.Objects;
* @version 1.0
**/
public enum PushRecordStatusEnum implements BaseEnum<Integer> {
PREPARE(0, "准备中", 87625),
WAITING(1, "等待中", 87625),
PROGRESS(2, "执行中", 85393),
COMPLETE(3, "执行完毕", 85393),
SUCCESS(4, "执行成功", 85393),
FAIL(5, "执行失败", 85393);
DATA_PREPARE(0, "数据构建", 87625),
DATA_FINISH(1, "构建完毕", 87625),
RUN_WAITING(2, "等待执行", 87625),
RUN_PROGRESS(3, "执行中", 85393),
RUN_COMPLETE(4, "执行完毕", 85393),
RUN_SUCCESS(5, "执行成功", 85393),
RUN_FAIL(6, "执行失败", 85393);
private int value;

View File

@ -3,7 +3,6 @@
<mapper namespace="com.engine.salary.mapper.push.PushRecordMapper">
<resultMap id="BaseResultMap" type="com.engine.salary.entity.push.po.PushRecordPO">
<result column="id" property="id"/>
<result column="batch_id" property="batchId"/>
<result column="mode_id" property="modeId"/>
<result column="table_name" property="tableName"/>
<result column="name" property="name"/>
@ -23,7 +22,6 @@
t
.
acct_record_id
, t.batch_id
, t.mode_id
, t.table_name
, t.create_time
@ -64,9 +62,6 @@
<if test="acctRecordId != null">
AND acct_record_id = #{acctRecordId}
</if>
<if test="batchId != null">
AND batch_id = #{batchId}
</if>
<if test="modeId != null">
AND mode_id = #{modeId}
</if>
@ -118,9 +113,6 @@
INSERT INTO hrsa_push_record
<trim prefix="(" suffix=")" suffixOverrides=",">
<if test="batchId != null">
batch_id,
</if>
<if test="modeId != null">
mode_id,
</if>
@ -162,9 +154,6 @@
</if>
</trim>
<trim prefix="VALUES (" suffix=")" suffixOverrides=",">
<if test="batchId != null">
#{batchId},
</if>
<if test="modeId != null">
#{modeId},
</if>
@ -211,7 +200,6 @@
<update id="update" parameterType="com.engine.salary.entity.push.po.PushRecordPO">
UPDATE hrsa_push_record
<set>
batch_id=#{batchId},
mode_id=#{modeId},
table_name=#{tableName},
acct_record_id=#{acctRecordId},
@ -233,9 +221,6 @@
<update id="updateIgnoreNull" parameterType="com.engine.salary.entity.push.po.PushRecordPO">
UPDATE hrsa_push_record
<set>
<if test="batchId != null">
batch_id=#{batchId},
</if>
<if test="modeId != null">
mode_id=#{modeId},
</if>
@ -299,7 +284,6 @@
SELECT count(0)
FROM hrsa_push_record t
WHERE delete_type = 0
and batch_id <![CDATA[ < ]]> #{batchId}
and status <![CDATA[ < ]]> 3
</select>

View File

@ -9,6 +9,8 @@ import com.engine.salary.entity.push.po.PushSettingItemPO;
import com.engine.salary.entity.push.po.PushSettingPO;
import com.engine.salary.util.page.PageInfo;
import java.util.List;
public interface PushService {
/**
@ -66,7 +68,7 @@ public interface PushService {
void createPushRecord(Long salaryAcctRecordId);
void removeBatch(Long batchId);
void removeRecords(List<Long> records);
PageInfo<PushRecordDTO> recordList(RecordListQueryParam param);

View File

@ -408,23 +408,24 @@ public class PushServiceImpl extends Service implements PushService {
List<PushSettingPO> pushSettingPOS = getPushSettingMapper().listSome(PushSettingPO.builder().able(1).build());
//批次号
long batchId = IdGenerator.generate();
//推送记录id
List<Long> recordIds = new ArrayList<>();
try {
pushSettingPOS.stream()
.filter(setting -> setting.getSalarySobIds().contains(salaryAcctRecordPO.getSalarySobId()))
.forEach(setting -> {
long recordId = IdGenerator.generate();
recordIds.add(recordId);
PushRecordPO record = PushRecordPO.builder()
.id(IdGenerator.generate())
.batchId(batchId)
.id(recordId)
.name(setting.getName())
.settingId(setting.getId())
.modeId(setting.getModeId())
.tableName(setting.getTableName())
.acctRecordId(salaryAcctRecordId)
.type(PushRecordTypeEnum.PUSH.getValue())
.status(PushRecordStatusEnum.PREPARE.getValue())
.status(PushRecordStatusEnum.DATA_PREPARE.getValue())
.createTime(now)
.updateTime(now)
.creator((long) user.getUID())
@ -437,7 +438,7 @@ public class PushServiceImpl extends Service implements PushService {
Long id = setting.getId();
List<PushSettingItemPO> pushSettingItemPOS = getPushSettingItemMapper().listSome(PushSettingItemPO.builder().settingId(id).build());
//每个人员生成一天明细
//构建数据每个人员生成一天明细
salaryAcctEmployeePOS.forEach(emp -> {
//1 获取当前薪资核算人员的公式中的变量的值
List<CalculateFormulaVarBO.FormulaVarValue> formulaVarValues = formulaVarMap.get(emp.getEmployeeId() + "_" + emp.getTaxAgentId());
@ -479,7 +480,7 @@ public class PushServiceImpl extends Service implements PushService {
String sql = String.format("insert into %s (%s) values (%s)", tableName, String.join(",", fields), values.stream().map(Object::toString).collect(Collectors.joining(",")));
PushRecordDetailPO pushRecordDetailPO = PushRecordDetailPO.builder()
.id(IdGenerator.generate())
.id(recordId)
.acctEmpId(emp.getId())
.recordId(record.getId())
.status(PushRecordDetailStatusEnum.PREPARE.getValue())
@ -493,53 +494,39 @@ public class PushServiceImpl extends Service implements PushService {
getPushRecordDetailMapper().insertIgnoreNull(pushRecordDetailPO);
});
//数据构建完毕
record.setUpdateTime(new Date());
record.setStatus(PushRecordStatusEnum.DATA_FINISH.getValue());
getPushRecordMapper().updateIgnoreNull(record);
}
);
} catch (Exception e) {
removeBatch(batchId);
removeRecords(recordIds);
log.error("推送失败", e);
throw new SalaryRunTimeException("推送失败");
}
//开始
startBatchPush(batchId);
startBatchPush(recordIds);
}
/**
* 启动推送
*
* @param batchId
* @param recordIds
*/
private void startBatchPush(Long batchId) {
List<PushRecordPO> pushRecordPOS = getPushRecordMapper().listSome(PushRecordPO.builder().batchId(batchId).build());
pushRecordPOS.forEach(
pushRecord -> {
pushRecord.setStatus(PushRecordStatusEnum.WAITING.getValue());
getPushRecordMapper().updateIgnoreNull(pushRecord);
}
);
private void startBatchPush(List<Long> recordIds) {
//先查看是否还有前置批次没执行完
while (true) {
int countBeforeBatch = getPushRecordMapper().countBeforeBatch(batchId);
if (countBeforeBatch > 0) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
break;
}
}
recordIds.forEach(recordId -> {
//待推送
PushRecordPO pushRecordPO = getPushRecordMapper().getById(recordId);
pushRecordPO.setStatus(PushRecordStatusEnum.RUN_WAITING.getValue());
getPushRecordMapper().updateIgnoreNull(pushRecordPO);
pushRecordPOS.forEach(pushRecordPO -> {
try {
pushRecordPO.setStartTime(new Date());
pushRecordPO.setStatus(PushRecordStatusEnum.PROGRESS.getValue());
pushRecordPO.setStatus(PushRecordStatusEnum.RUN_PROGRESS.getValue());
getPushRecordMapper().updateIgnoreNull(pushRecordPO);
List<PushRecordDetailPO> pushRecordDetailPOS = getPushRecordDetailMapper().listSome(PushRecordDetailPO.builder().recordId(pushRecordPO.getId()).build());
pushRecordDetailPOS.forEach(pushRecordDetailPO -> {
@ -567,27 +554,28 @@ public class PushServiceImpl extends Service implements PushService {
pushRecordDetailPO.setFail_reason(e.getMessage());
pushRecordDetailPO.setStatus(PushRecordDetailStatusEnum.FAIL.getValue());
}
});
pushRecordPO.setEndTime(new Date());
pushRecordPO.setStatus(PushRecordStatusEnum.SUCCESS.getValue());
pushRecordPO.setStatus(PushRecordStatusEnum.RUN_SUCCESS.getValue());
} catch (Exception e) {
pushRecordPO.setStatus(PushRecordStatusEnum.FAIL.getValue());
pushRecordPO.setFail_reason(e.getMessage());
pushRecordPO.setStatus(PushRecordStatusEnum.RUN_FAIL.getValue());
}
getPushRecordMapper().updateIgnoreNull(pushRecordPO);
});
}
@Override
public void removeBatch(Long batchId) {
List<PushRecordPO> pushRecordPOS = getPushRecordMapper().listSome(PushRecordPO.builder().batchId(batchId).build());
pushRecordPOS.forEach(pushRecordPO -> {
getPushRecordDetailMapper().deleteByRecordId(pushRecordPO.getId());
getPushRecordMapper().delete(pushRecordPO);
public void removeRecords(List<Long> recordIds) {
recordIds.forEach(recordId -> {
PushRecordPO recordPO = getPushRecordMapper().getById(recordId);
if (recordPO == null){
throw new SalaryRunTimeException("推送记录不存在");
}
getPushRecordDetailMapper().deleteByRecordId(recordId);
getPushRecordMapper().delete(recordPO);
});
}
@ -598,14 +586,13 @@ public class PushServiceImpl extends Service implements PushService {
.stream()
.map(po -> PushRecordDTO.builder()
.id(po.getId())
.batchId(po.getBatchId())
.name(po.getName())
.settingId(po.getId())
.modeId(po.getModeId())
.tableName(po.getTableName())
.acctRecordId(po.getAcctRecordId())
.type(PushRecordTypeEnum.PUSH.getValue())
.status(PushRecordStatusEnum.PREPARE.getValue())
.type(po.getType())
.status(po.getStatus())
.startTime(po.getStartTime())
.endTime(po.getEndTime())
.build())