lateral view usage, and the core analysis flow
Flink SQL
CREATE LEGACY FUNCTION opt_log_from_json AS 'com.bytedance.ad.dataplatform.hiveudf.flink.OptLogUDTF';
CREATE LEGACY FUNCTION TIMESTAMP_TO_LONG AS 'com.bytedance.flink.udf.udf.time.TimestampToLong';
CREATE LEGACY FUNCTION FROM_UNIXTIME AS 'com.bytedance.flink.udf.udf.time.FromUnixTime';
CREATE LEGACY FUNCTION json_str_to_array AS 'com.bytedance.flink.udf.udf.JSONStringToArray';
CREATE LEGACY FUNCTION string_array_to_long_array AS 'com.bytedance.ad.dataplatform.hiveudf.flink.StringArrayToLongArray';
CREATE LEGACY FUNCTION array_to_rows AS 'com.bytedance.flink.udf.udtf.StringArrayExplode';
-- ADD resources dim_middleware_jar_11;
ADD resources dim_middleware_service;
CREATE TABLE source (
id ROW<before_value BIGINT, after_value BIGINT, after_updated BOOLEAN>,
create_time ROW<before_value TIMESTAMP, after_value TIMESTAMP, after_updated BOOLEAN>,
operator_id ROW<before_value BIGINT, after_value BIGINT, after_updated BOOLEAN>,
advertiser_id ROW<before_value BIGINT, after_value BIGINT, after_updated BOOLEAN>,
object_type ROW<before_value TINYINT, after_value TINYINT, after_updated BOOLEAN>,
opt_type ROW<before_value TINYINT, after_value TINYINT, after_updated BOOLEAN>,
object_id ROW<before_value BIGINT, after_value BIGINT, after_updated BOOLEAN>,
old_value ROW<before_value VARCHAR, after_value VARCHAR, after_updated BOOLEAN>,
new_value ROW<before_value VARCHAR, after_value VARCHAR, after_updated BOOLEAN>,
operator_staff_id ROW<before_value BIGINT, after_value BIGINT, after_updated BOOLEAN>,
system_origin ROW<before_value TINYINT, after_value TINYINT, after_updated BOOLEAN>,
opt_ip ROW<before_value VARCHAR, after_value VARCHAR, after_updated BOOLEAN>,
status ROW<before_value TINYINT, after_value TINYINT, after_updated BOOLEAN>,
proctime AS PROCTIME()
)
WITH (
'connector' = 'rocketmq',
'cluster' = 'dbus_realtime_big',
'topic' = 'binlog_ad_data_log',
'group' = 'ad_stats_opt_log_20230705_v1',
'format' = 'binlog',
'binlog.target-table' = 'opt_log',
'tag' = 'opt_log',
'scan.consumer-offset-reset-to' = 'latest'
);
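Each binlog column above arrives as ROW<before_value, after_value, after_updated>, so downstream SQL projects the after-image. A minimal sketch of that access pattern; the column choice and filter are illustrative, not from the original job:
-- sketch: read the after-image of each change event
SELECT
id.after_value AS id,
advertiser_id.after_value AS advertiser_id
FROM source
WHERE status.after_updated;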
CREATE TABLE rpc_dim
WITH (
'connector' = 'rpc',
'consul' = 'ad.stats.thanos_dim_middleware',
'thrift.service-class' = 'com.bytedance.ad.dataplatform.thanos.dim.middleware.spi.DimMiddlewareService',
'thrift.method' = 'GetDimInfosBatch',
'cluster' = 'default',
'connection.timeout' = '20000ms',
'consul.update-interval' = '60s',
'lookup.max-retries' = '3',
'lookup.cache.max-rows' = '30000',
'lookup.cache.ttl' = '1200000ms',
'lookup.failure-handle-strategy' = 'EMIT_EMPTY',
'thrift.transport' = 'Buffered',
'lookup.batch-mode.enabled' = 'true',
'lookup.batch-request-field-name' = 'requests',
'lookup.batch-response-field-name' = 'responses',
'lookup.batch-size' = '500'
);
-- excerpt: the lateral expansion below attaches to a larger SELECT that is omitted here
LEFT JOIN
LATERAL TABLE(
array_to_rows(
json_str_to_array(
COALESCE(
dim_fifth.GetDimInfosResponse.dimensions['ad_id_array'],
'[]'
)
)
)
) AS ad_id_array_table(promotion_ad_id)
ON TRUE
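For context, a minimal sketch of how the pieces fit together as one query. The SELECT list, the lookup-join key, and the dim_fifth alias wiring are assumptions, since the original excerpt omits the surrounding SELECT:
-- sketch (assumed query shape): lookup-join the binlog stream against rpc_dim,
-- then explode the returned ad_id_array into one row per promotion_ad_id
SELECT
s.advertiser_id.after_value AS advertiser_id,
ad_id_array_table.promotion_ad_id
FROM source AS s
LEFT JOIN rpc_dim FOR SYSTEM_TIME AS OF s.proctime AS dim_fifth
ON ... -- lookup key omitted in the original excerpt
LEFT JOIN
LATERAL TABLE(
array_to_rows(
json_str_to_array(
COALESCE(dim_fifth.GetDimInfosResponse.dimensions['ad_id_array'], '[]')
)
)
) AS ad_id_array_table(promotion_ad_id)
ON TRUE;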
Spark / Hive SQL
-- FROM-clause fragment: explode the comma-separated query_industry into one row per industry
ad_dm.dm_search_adengine_stats_daily LATERAL VIEW explode(split(query_industry, ',')) ind AS industry
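A complete, runnable form of the same pattern; the aggregate is illustrative, not from the source:
-- count rows per exploded industry value
SELECT industry, COUNT(1) AS cnt
FROM ad_dm.dm_search_adengine_stats_daily
LATERAL VIEW explode(split(query_industry, ',')) ind AS industry
GROUP BY industry;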
analysis: core flow. run() loads the scene tree, resolves metric/dimension names, executes the module queue with drill-down and retries, then assembles conclusions, caches them, and sends notifications.
@Override
@Async(value = "main-executor")
public void run(SceneTaskExecutionEntity entity, LanguageRangeEnum language, Boolean isUnion) {
long startTime = System.currentTimeMillis();
StopWatch stopWatch = new StopWatch("SceneTaskExecutionRun");
// status defaults to SUCCESS and is flipped to FAILED below on error
// (static method names and enum constants were lost in extraction; reconstructed throughout)
SceneTaskExecutionStatusEnum sceneTaskExecutionStatusEnum =
SceneTaskExecutionStatusEnum.SUCCESS;
List<ConclusionDataNode> conclusionData = null;
AnalysisChartConclusion analysisChartConclusion = null;
// Note: on re-analysis the extraConfig field may be null; guard against an NPE.
ExtraConfig extraConfig = entity.getExtraConfig();
if (extraConfig == null) {
extraConfig = new ExtraConfig();
}
try {
stopWatch.start("get_tree_node");
log.info(String.format("SceneTaskExecutionId %s starts", entity.getId()));
taskExecutionDao.updateStatus(entity.getId(), SceneTaskExecutionStatusEnum.PROCESSING);
ProjectEnum project = ProjectEnum.fromId(entity.getProjectId());
// get and etl tree node info
List<CompassAnalysisSceneTreeNodeEntity> treeNodeEntities =
treeDao.getSceneTreeNodesByTreeIdAndVersion(entity.getTreeId(), entity.getVersion());
List<NodeTemplate> treeNodes =
treeNodeEntities
.stream()
.map(
o ->
JsonUtil.fromJson(
o.getModuleAlgorithmConfigJson(), new TypeReference<NodeTemplate>() {}))
.filter(Objects::nonNull)
.collect(Collectors.toList());
stopWatch.stop();
stopWatch.start("get_metric_and_dimension");
List<Integer> metricIds = NodeTemplateConverter.getMetricIds(treeNodes); // method names reconstructed
List<Integer> dimensionIds =
NodeTemplateConverter.getDimensionIds(treeNodes);
Map<Integer, String> metricIdToName =
analysisQueryClient
.getMetricV2MetaByIds(null, metricIds, language, metricIds.size(), 1, project)
.stream()
.collect(
Collectors.toMap(
MetricsV2GetResponse.Metric::getId, MetricsV2GetResponse.Metric::getName));
Map<Integer, String> dimensionIdToName =
analysisQueryClient
.getDimensionsSimple(dimensionIds, language, project)
.stream()
.collect(Collectors.toMap(Dimension::getId, Dimension::getName));
stopWatch.stop();
stopWatch.start("get_node_execution_config");
ModuleExecutions moduleExecutions = new ModuleExecutions();
Map<Integer, CompassAnalysisSceneTreeNodeEntity> nodeIdToNodeEntity = new HashMap<>();
for (CompassAnalysisSceneTreeNodeEntity nodeEntity : treeNodeEntities) {
nodeIdToNodeEntity.put(nodeEntity.getTreeNodeId(), nodeEntity);
}
CompassAnalysisSceneTreeNodeEntity rootNode = nodeIdToNodeEntity.get(0);
Filter baselineFilter = entity.getBaselineFilter();
Filter testFilter = entity.getTestFilter();
// The experiment group is effectively a special filter: if the test-group vid is 1000,
// baselineFilter becomes the original baselineFilter AND'ed with vid = 1000.
if (entity.getVidConfig() != null) {
SceneTaskUtil.BaselineAndTestFilter baselineAndTestLibraFilter =
SceneTaskUtil.buildBaselineAndTestFilter( // method name reconstructed
baselineFilter, testFilter, vidDimensionId, entity.getVidConfig());
baselineFilter = baselineAndTestLibraFilter.getBaselineFilter();
testFilter = baselineAndTestLibraFilter.getTestFilter();
}
SceneTreeEntity sceneTreeEntity =
treeDao.getSceneTreeByTreeIdAndVersion(entity.getTreeId(), entity.getVersion());
TreeAttributes treeAttributes =
JsonUtil.fromJson(
sceneTreeEntity.getTreeAttributesJson(), new TypeReference<TreeAttributes>() {});
metricIdToName.putAll(SceneTaskUtil.getMetricIdToName(treeAttributes, language)); // method names reconstructed
dimensionIdToName.putAll(SceneTaskUtil.getDimensionIdToName(treeAttributes, language));
ExecutionDataInfo executionDataInfo =
ExecutionDataInfo.builder()
.metricIdToName(metricIdToName)
.dimensionIdToName(dimensionIdToName)
.extraConfig(extraConfig)
.build();
stopWatch.stop();
stopWatch.start("init_root_node_execution");
// construct middle data struct
Integer comparisonTypeId = entity.getComparisonTypeId();
initRootNodeExecution(
entity,
moduleExecutions,
nodeIdToNodeEntity,
rootNode,
baselineFilter,
testFilter,
project,
comparisonTypeId,
executionDataInfo);
stopWatch.stop();
// get accident strategy abtest data
String baseStartTime = entity.getBaselineTime().getStartTime();
String baseEndTime = entity.getBaselineTime().getEndTime();
String testStartTime = entity.getTestTime().getStartTime();
String testEndTime = entity.getTestTime().getEndTime();
Future<String> baseAccidentFuture =
moduleExecutor.getAccidentData(baseStartTime, baseEndTime, language, region);
Future<String> testAccidentFuture =
moduleExecutor.getAccidentData(testStartTime, testEndTime, language, region);
Future<String> baseStrategyFuture =
moduleExecutor.getStrategyData(baseStartTime, baseEndTime, language, region);
Future<String> testStrategyFuture =
moduleExecutor.getStrategyData(testStartTime, testEndTime, language, region);
boolean fullSuccess = true;
Integer retryTimes = 1;
if (entity.getExtraConfig() != null && entity.getExtraConfig().getRetryTimes() != null) {
retryTimes = entity.getExtraConfig().getRetryTimes();
}
stopWatch.start("execution_main");
for (int i = 0; i < retryTimes; i++) {
.info("Task id {}, run {}th time", entity.getId(), i);
// start run
long start = System.();
while (!moduleExecutions.toRunModules.isEmpty()
|| !moduleExecutions.runningModules.isEmpty()) {
long time = System.();
// 如果距离上一次检查已经过了5秒那就再检查一次
if (time - start > ) {
SceneTaskStatusEntity statusEntity =
sceneTaskDao.getSceneTaskStatus(Collections.singletonList(entity.getId())).get(0);
SceneTaskExecutionStatusEnum statusEnum =
SceneTaskExecutionStatusEnum.fromStatus(statusEntity.getSceneTaskStatus()); // factory name reconstructed
// The task may already be marked cancelled or timed out; if so, stop executing.
if (statusEnum != SceneTaskExecutionStatusEnum.PROCESSING) {
log.info(String.format("[SUCCESS] interrupted task execution %s", entity.getId()));
try {
SceneTaskExecutionEntity newEntity = taskExecutionDao.findById(entity.getId());
Integer taskId = newEntity.getCompassAnalysisSceneTaskId();
String taskName =
sceneTaskDao.getSceneTaskName(taskId == null ? null : taskId.longValue());
// Do not send a notification if the task was cancelled by the user.
if (!SceneTaskExecutionStatusEnum.CANCELED.equals(statusEnum)) {
larkNotification.send(
entity.getCreator(),
taskName,
entity.getCompassAnalysisSceneTaskId(),
entity.getId(),
statusEnum,
language,
project,
extraConfig);
}
} catch (Exception e) {
String errorMessage =
String.format(
"Failed to send message to %s, execution id:%d",
entity.getCreator(), entity.getId());
log.error(errorMessage, e);
this.metricsClient.emitCounter(
Metric.builder(MetricsKey.SEND_LARK_FAILED) // metric key/tag constants reconstructed
.putTag(
MetricsTagKey.TYPE, MetricsTagValue.SEND_LARK_FAILED)
.build(),
1);
}
// If faas marked the task as timed out, decrement the in-flight count and return.
log.info(
String.format(
"SceneTaskExecutionId %s status is not processing, abort", entity.getId()));
SceneTaskConsumer.PROCESSING_TASK_COUNT.getAndDecrement(); // static field names reconstructed
SceneTaskConsumer.RUNNING_EXECUTION_IDS.remove(entity.getId());
return;
}
start = time;
}
// iterator runningModules
for (Iterator<ModuleExecutionContext> iterator =
moduleExecutions.runningModules.iterator();
iterator.hasNext(); ) {
ModuleExecutionContext runningModule = iterator.next();
if (runningModule.outputFuture.isDone()) {
try {
ModuleExecutorOutput output = runningModule.outputFuture.get();
log.info(
String.format(
"[FINISHED][Task %s / Module %s] is finished, output %s",
runningModule.input.getTaskExecutionId(),
runningModule.input.getModuleExecutionId(),
output));
iterator.remove();
// generate drill down node, add it to toRunModules
List<CompassAnalysisSceneModuleExecutionEntity> moduleExecutionEntities =
moduleExecutionDao.findByIds(output.getDrillDownModuleExecutionIds());
for (CompassAnalysisSceneModuleExecutionEntity executionEntity :
moduleExecutionEntities) {
ModuleExecutorInput executorInput =
new ModuleExecutorInput(
entity.getId(),
executionEntity.getId(),
executionEntity.getCompassAnalysisSceneTreeNodeId(),
executionEntity.getAnalysisMetricId(),
executionEntity.getAggregationFunctionEnum(),
executionEntity.getBaselineFilter(),
executionEntity.getTestFilter(),
executionEntity.getBaselineInterval(),
executionEntity.getTestInterval(),
entity.getBaselinePeriodsIntervals(),
executionEntity.getAnalysisModule(),
ComparisonTypeEnum.fromId(comparisonTypeId),
nodeIdToNodeEntity,
project,
executionDataInfo);
moduleExecutions.toRunModules.add(
new ModuleExecutionContext(executorInput, null));
log.info(
String.format(
"[ADD TO CAN]Add task %s / module %s to toRunModules",
entity.getId(), executionEntity.getId()));
}
} catch (AccessThirdAPIException e) {
.error("第三方api调用失败,请检查是否starling平台文案拉取失败", e);
this.metricsClient.emitCounter(
Metric.(MetricsKey.)
.putTag(MetricsTagKey., MetricsTagValue.)
.build(),
1);
} catch (Throwable t) {
.error("Failed to run task. executor input:" + runningModule.input, t);
this.metricsClient.emitCounter(
Metric.(MetricsKey.)
.putTag(
MetricsTagKey., MetricsTagValue.)
.putTag(
MetricsTagKey.,
runningModule.input.getAnalysisModule().getSceneModuleType().getName())
.putTag(
MetricsTagKey.,
SceneTaskUtil.())
.build(),
1);
}
}
}
// run module in toRunModules, update status
if (!moduleExecutions.toRunModules.isEmpty()) {
ModuleExecutionContext toRunModule = moduleExecutions.toRunModules.remove(0);
log.info(
String.format(
"[START][Task %s/ Module %s] starts to run",
toRunModule.input.getTaskExecutionId(),
toRunModule.input.getModuleExecutionId()));
Future<ModuleExecutorOutput> outputFuture =
moduleExecutor.execute(toRunModule.input, language);
moduleExecutions.runningModules.add(
new ModuleExecutionContext(toRunModule.input, outputFuture));
}
}
// Check whether every node executed successfully.
List<CompassAnalysisSceneModuleExecutionEntity> moduleExecutionEntities =
moduleExecutionDao.findByTaskExecutionId(entity.getId());
List<CompassAnalysisSceneModuleExecutionEntity> failedModuleExecutionEntities =
new ArrayList<>();
for (CompassAnalysisSceneModuleExecutionEntity moduleExecution : moduleExecutionEntities) {
Integer status = moduleExecution.getStatus();
if (!TaskExecutionStateEnum.SUCCESS.getId().equals(status)) { // constant reconstructed
failedModuleExecutionEntities.add(moduleExecution);
}
}
// A single failed node fails the whole run.
fullSuccess = failedModuleExecutionEntities.size() == 0;
// If this is not the last attempt and it failed, re-init the execution and wait a while.
if (i < retryTimes - 1 && !fullSuccess) {
// Retrying: soft-delete the moduleExecutions of the current execution,
moduleExecutionDao.deleteModuleExecutionEntitiesByTaskExecutionId(entity.getId());
// then re-init the root node.
initRootNodeExecution(
entity,
moduleExecutions,
nodeIdToNodeEntity,
rootNode,
baselineFilter,
testFilter,
project,
comparisonTypeId,
executionDataInfo);
Thread.sleep(60_000L * 3);
} else {
break;
}
}
stopWatch.stop();
stopWatch.start("get_accident_strategy_data");
String baseAccident =
resolveFuture( // helper name reconstructed: wait on the future, fall back to the i18n error text
baseAccidentFuture,
MultiLingualUtil.translate( // i18n helper and keyword constants reconstructed
SceneTaskKeywords.GET_ACCIDENT_FAILED, language));
String testAccident =
resolveFuture(
testAccidentFuture,
MultiLingualUtil.translate(
SceneTaskKeywords.GET_ACCIDENT_FAILED, language));
String baseStrategy =
resolveFuture(
baseStrategyFuture,
MultiLingualUtil.translate(
SceneTaskKeywords.GET_STRATEGY_FAILED, language));
String testStrategy =
resolveFuture(
testStrategyFuture,
MultiLingualUtil.translate(
SceneTaskKeywords.GET_STRATEGY_FAILED, language));
stopWatch.stop();
stopWatch.start("get_conclusion_data");
// Generate the simplified conclusion.
ComparisonTypeEnum comparisonTypeEnum = ComparisonTypeEnum.fromId(comparisonTypeId);
List<NodeTemplate> nodeTemplates =
treeNodeEntities
.stream()
.map(
o ->
JsonUtil.fromJson(
o.getModuleAlgorithmConfigJson(), new TypeReference<NodeTemplate>() {}))
.collect(Collectors.toList());
List<Integer> showReplacedMetricNameNodeIds =
nodeTemplates
.stream()
.filter(Objects::nonNull)
.filter(o -> !BooleanUtils.isTrue(o.getShowReplacedMetricName()))
.map(NodeTemplate::getNodeId)
.collect(Collectors.toList());
ConclusionDataMixture conclusionDataMixture =
getConclusionDataMixture(
entity.getId(),
language,
comparisonTypeEnum,
project,
extraConfig.getHideFirstLayerFilters(),
showReplacedMetricNameNodeIds,
executionDataInfo.getMetricIdToName(),
executionDataInfo.getDimensionIdToName(),
extraConfig);
// While the root node runs, the base/test root metric values for this executionId are
// persisted, so re-fetch the entity here; anomaly-detection conclusions are read back too.
entity =
sceneTaskDao.getSceneTaskExecutionByIds(Collections.singletonList(entity.getId())).get(0);
// Compass / Xingfuli time-comparison tasks need a chart-style conclusion.
if (AnalysisHardCode.CHART_CONCLUSION_PROJECTS.contains(project) // constant reconstructed
// && ComparisonTypeEnum.TIME_COMPARISON_TASK.equals(
// ComparisonTypeEnum.fromId(comparisonTypeId))
&& entity.getBaselineAggregationValue() != null
&& entity.getTestAggregationValue() != null) {
try {
String rootFilteredMetricName;
if (comparisonTypeEnum == ComparisonTypeEnum.FILTER_COMPARISON) { // constant reconstructed
rootFilteredMetricName =
executorUtil.getFilteredMetricName(
entity.getAnalysisMetricId(),
entity.getBaselineFilter(),
entity.getTestFilter(),
comparisonTypeEnum,
language,
project,
executionDataInfo.getMetricIdToName(),
executionDataInfo.getDimensionIdToName());
} else {
rootFilteredMetricName = metricIdToName.get(entity.getAnalysisMetricId());
}
conclusionDataMixture.setFilteredRootMetricName(rootFilteredMetricName);
conclusionDataMixture.setBaseRootMetricValue(
Double.parseDouble(entity.getBaselineAggregationValue()));
conclusionDataMixture.setTestRootMetricValue(
Double.parseDouble(entity.getTestAggregationValue()));
log.info("sceneTaskExecutionEntity is {}", JsonUtil.toJson(entity));
analysisChartConclusion =
chartConclusionGeneration.createChartConclusion(
entity,
conclusionDataMixture,
project,
language,
executionDataInfo.getMetricIdToName(),
executionDataInfo.getDimensionIdToName(),
extraConfig.getSpanTableMinNodes());
} catch (Throwable t) {
log.error(
String.format("executionId %s generate chart conclusion failed", entity.getId()), t);
}
}
conclusionData = conclusionDataMixture.getConclusionDataNodes();
stopWatch.stop();
String getEventFailed =
MultiLingualUtil.translate(SceneTaskKeywords.GET_EVENT_FAILED, language); // names reconstructed
EventDataMixture eventDataMixture =
new EventDataMixture(
getEventFailed,
getEventFailed,
null,
new ArrayList<>(),
new ArrayList<>(),
new ArrayList<>());
try {
stopWatch.start("get_metric");
MetricsV2GetResponse.Metric metric =
analysisQueryClient.getMetricV2MetaById(
entity.getAnalysisMetricId(), language, project);
stopWatch.stop();
stopWatch.start("get_event");
eventDataMixture =
eventServiceClient.getEventDataMixture(
entity, metric.getName(), conclusionDataMixture, language);
stopWatch.stop();
} catch (Exception e) {
if (stopWatch.isRunning()) {
stopWatch.stop();
}
log.warn(String.format("entity %s get event failed", entity), e);
}
// Conclusions are stored as a list: users can edit them, so one task may have several versions.
stopWatch.start("scene_conclusion");
SceneTaskConclusion conclusionDistinct =
new SceneTaskConclusion(
conclusionData,
String.format(
StringEscapeUtils.unescapeJava( // unescape variant reconstructed
MultiLingualUtil.translate(
"ANALYSIS_COMPARISON_TEMPLATE", language)),
eventDataMixture.getTestEvent(),
eventDataMixture.getBaseEvent()),
String.format(
StringEscapeUtils.unescapeJava(
MultiLingualUtil.translate(
"ANALYSIS_COMPARISON_TEMPLATE", language)),
testAccident,
baseAccident),
String.format(
StringEscapeUtils.unescapeJava(
MultiLingualUtil.translate(
"ANALYSIS_COMPARISON_TEMPLATE", language)),
testStrategy,
baseStrategy),
"",
"system",
ZonedDateTime.now().format(TimeConstant.DATE_TIME_FORMATTER), // formatter constant reconstructed
eventDataMixture.getBaselineEventAnnotations(),
eventDataMixture.getTestEventAnnotations(),
eventDataMixture.getOverallEventAnnotations(),
eventDataMixture.getEventTitle());
List<SceneTaskConclusion> conclusions = Collections.singletonList(conclusionDistinct);
// For tasks with retries enabled, mark the run failed unless every node succeeded.
if (retryTimes > 1 && !fullSuccess) {
log.error("executionId {} failed for {} times", entity.getId(), retryTimes);
taskExecutionDao.updateConclusion(
conclusions, entity.getId().intValue(), analysisChartConclusion);
sceneTaskExecutionStatusEnum = SceneTaskExecutionStatusEnum.FAILED;
taskExecutionDao.updateStatus(entity.getId(), SceneTaskExecutionStatusEnum.FAILED);
this.metricsClient.emitCounter(
Metric.builder(MetricsKey.TASK_RETRY_EXHAUSTED) // constants reconstructed
.putTag(MetricsTagKey.EXECUTION_ID, String.valueOf(entity.getId()))
.build(),
1);
} else {
// update task conclusion
taskExecutionDao.updateConclusion(
conclusions, entity.getId().intValue(), analysisChartConclusion);
// Open question: how to handle partially produced output together with the cache.
ConclusionCache cache =
new ConclusionCache(
SceneTaskExecutionStatusEnum.SUCCESS.getStatus(),
entity.getId(),
conclusions.get(0).getData());
analysisRedis.set(entity.getId(), JsonUtil.toJson(cache), 86400, TimeUnit.SECONDS);
// Refresh the cockpit's cached execution status.
sceneTaskDao.getCockpitHistoryExecution(
entity.getTreeId(),
JsonUtil.toJson(FilterUtil.normalizeFilters(entity.getTestFilters())), // helper name reconstructed
JsonUtil.toJson(entity.getBaselineTime()),
JsonUtil.toJson(entity.getTestTime()),
entity.getMetricAggregateFunctionId(),
language.getIdentifier(),
false);
log.info(String.format("[SUCCESS] finished task execution %s", entity.getId()));
}
stopWatch.stop();
} catch (Exception e) {
if (stopWatch.isRunning()) {
stopWatch.stop();
}
String errorMessage =
String.format("Failed to run task execution, execution id: %s.", entity.getId());
log.error(errorMessage, e);
this.metricsClient.emitCounter(
Metric.builder(MetricsKey.TASK_EXECUTION_FAILED) // constants reconstructed
.putTag(MetricsTagKey.TYPE, MetricsTagValue.RUN_FAILED)
.putTag(MetricsTagKey.ENV, SceneTaskUtil.getEnv()) // helper name reconstructed
.build(),
1);
// The exception fails the task.
taskExecutionDao.updateStatus(entity.getId(), SceneTaskExecutionStatusEnum.FAILED);
sceneTaskExecutionStatusEnum = SceneTaskExecutionStatusEnum.FAILED;
}
stopWatch.start("send_lark");
try {
boolean larkNotify = true;
// Preview tasks do not send lark notifications.
if (entity.getTreeId() != null && entity.getVersion() != null) {
Boolean isPreview = treeDao.isSceneTreePreview(entity.getTreeId(), entity.getVersion());
larkNotify = !isPreview;
}
// Sub-tasks created by experiment analysis, and tasks from the subscription or alarm platforms, skip the notification.
if (BooleanUtils.isTrue(extraConfig.getSkipLarkNotification())) {
larkNotify = false;
}
if (larkNotify) {
SceneTaskExecutionEntity newEntity = taskExecutionDao.findById(entity.getId());
Integer taskId = newEntity.getCompassAnalysisSceneTaskId();
String taskName = sceneTaskDao.getSceneTaskName(taskId == null ? null : taskId.longValue());
larkNotification.send(
entity.getCreator(),
taskName,
entity.getCompassAnalysisSceneTaskId(),
entity.getId(),
sceneTaskExecutionStatusEnum,
language,
ProjectEnum.fromId(entity.getProjectId()),
extraConfig);
}
} catch (Exception e) {
String errorMessage =
String.format(
"Failed to send message to %s, execution id:%s", entity.getCreator(), entity.getId());
log.error(errorMessage, e);
this.metricsClient.emitCounter(
Metric.builder(MetricsKey.SEND_LARK_FAILED) // constants reconstructed
.putTag(MetricsTagKey.TYPE, MetricsTagValue.SEND_LARK_FAILED)
.build(),
1);
}
stopWatch.stop();
stopWatch.start("send_mq");
try {
if (BooleanUtils.isTrue(extraConfig.getNeedPushMQ())) {
List<String> firstLayerConclusions = Collections.emptyList();
String detailUrl = null;
if (SceneTaskExecutionStatusEnum.SUCCESS.equals(sceneTaskExecutionStatusEnum)) {
if (!CollectionUtils.isEmpty(conclusionData)) {
firstLayerConclusions =
conclusionData
.stream()
.map(ConclusionDataNode::getText)
.collect(Collectors.toList());
detailUrl =
larkNotification.getTaskExecutionUrl(
ProjectEnum.fromId(entity.getProjectId()),
entity.getCompassAnalysisSceneTaskId(),
entity.getId());
}
RocketMQMessage rocketMQMessage =
RocketMQMessage.builder()
.conclusions(firstLayerConclusions)
.detailUrl(detailUrl)
.executionId(entity.getId())
.executionStatus(sceneTaskExecutionStatusEnum.getStatus())
.taskTypeId(OneSiteTaskTypeEnum.SCENE_TASK.getId()) // constant reconstructed
.build();
.build();
rocketMQProducer.produce(
rocketMQMessage,
rocketMQConfig.getAlarmTopic(),
rocketMQConfig.getAlarmTags(),
entity.getId());
}
}
} catch (Exception e) {
String errorMessage =
String.("Failed to send message to mq, execution id %s", entity.getId());
.error(errorMessage, e);
}
// 回调bff发送日报
if (extraConfig.getDailyReportConfig() != null
&& extraConfig.getDailyReportConfig().getPushConfig() != null
&& BooleanUtils.isTrue(extraConfig.getDailyReportConfig().getPushConfig().getEnable())) {
log.info(
"executionId {} push daily report, extraConfig {}",
entity.getId(),
JsonUtil.toJson(extraConfig));
try {
dailyReportGeneration.pushDailyReport(entity, extraConfig);
} catch (Exception e) {
String errorMessage =
String.format(
"Failed to push dailyReport, execution id %s extraConfig %s",
entity.getId(), JsonUtil.toJson(extraConfig));
log.error(errorMessage, e);
}
}
stopWatch.stop();
Duration duration = Duration.ofMillis(System.currentTimeMillis() - startTime);
log.info(
"SceneTaskExecutionId {} duration is {}, cost {}",
entity.getId(),
duration.toMillis(),
stopWatch.prettyPrint());
this.metricsClient.emitTimer(
Metric.builder(MetricsKey.TASK_EXECUTION_DURATION) // constants reconstructed
.putTag(MetricsTagKey.EXECUTION_ID, String.valueOf(entity.getId()))
.putTag(MetricsTagKey.TREE_ID, String.valueOf(entity.getTreeId()))
.build(),
duration);
SceneTaskConsumer.RUNNING_EXECUTION_IDS.remove(entity.getId()); // field names reconstructed
SceneTaskConsumer.PROCESSING_TASK_COUNT.getAndDecrement();
}