lateral view用法,analysis核心流程

flink

CREATE legacy function opt_log_from_json AS 'com.bytedance.ad.dataplatform.hiveudf.flink.OptLogUDTF';

CREATE LEGACY FUNCTION TIMESTAMP_TO_LONG AS 'com.bytedance.flink.udf.udf.time.TimestampToLong';

CREATE legacy function FROM_UNIXTIME AS 'com.bytedance.flink.udf.udf.time.FromUnixTime';

CREATE LEGACY FUNCTION json_str_to_array AS 'com.bytedance.flink.udf.udf.JSONStringToArray';

CREATE LEGACY FUNCTION string_array_to_long_array AS 'com.bytedance.ad.dataplatform.hiveudf.flink.StringArrayToLongArray';

CREATE LEGACY FUNCTION array_to_rows AS 'com.bytedance.flink.udf.udtf.StringArrayExplode';

-- ADD resources dim_middleware_jar_11;

ADD resources dim_middleware_service;

CREATE TABLE source (

id ROW<before_value BIGINT, after_value BIGINT, after_updated BOOLEAN>,

create_time ROW<before_value TIMESTAMP, after_value TIMESTAMP, after_updated BOOLEAN>,

operator_id ROW<before_value BIGINT, after_value BIGINT, after_updated BOOLEAN>,

advertiser_id ROW<before_value BIGINT, after_value BIGINT, after_updated BOOLEAN>,

object_type ROW<before_value TINYINT, after_value TINYINT, after_updated BOOLEAN>,

opt_type ROW<before_value TINYINT, after_value TINYINT, after_updated BOOLEAN>,

object_id ROW<before_value BIGINT, after_value BIGINT, after_updated BOOLEAN>,

old_value ROW<before_value VARCHAR, after_value VARCHAR, after_updated BOOLEAN>,

new_value ROW<before_value VARCHAR, after_value VARCHAR, after_updated BOOLEAN>,

operator_staff_id ROW<before_value BIGINT, after_value BIGINT, after_updated BOOLEAN>,

system_origin ROW<before_value TINYINT, after_value TINYINT, after_updated BOOLEAN>,

opt_ip ROW<before_value VARCHAR, after_value VARCHAR, after_updated BOOLEAN>,

status ROW<before_value TINYINT, after_value TINYINT, after_updated BOOLEAN>,

proctime AS PROCTIME()

)

WITH (

'connector' = 'rocketmq',

'cluster' = 'dbus_realtime_big',

'topic' = 'binlog_ad_data_log',

'group' = 'ad_stats_opt_log_20230705_v1',

'format' = 'binlog',

'binlog.target-table' = 'opt_log',

'tag' = 'opt_log',

'scan.consumer-offset-reset-to' = 'latest'

);

CREATE TABLE rpc_dim

WITH(

'connector' = 'rpc',

'consul' = 'ad.stats.thanos_dim_middleware',

'thrift.service-class' = 'com.bytedance.ad.dataplatform.thanos.dim.middleware.spi.DimMiddlewareService',

'thrift.method' = 'GetDimInfosBatch',

'cluster' = 'default',

'connection.timeout' = '20000ms',

'consul.update-interval' = '60s',

'lookup.max-retries' = '3',

'lookup.cache.max-rows' = '30000',

'lookup.cache.ttl' = '1200000ms',

'lookup.failure-handle-strategy' = 'EMIT_EMPTY',

'thrift.transport' = 'Buffered',

'lookup.batch-mode.enabled' = 'true',

'lookup.batch-request-field-name' = 'requests',

'lookup.batch-response-field-name' = 'responses',

'lookup.batch-size' = '500'

);

LEFT JOIN

LATERAL TABLE(

array_to_rows(

json_str_to_array(

COALESCE(

dim_fifth.GetDimInfosResponse.dimensions ['ad_id_array'],

'[]'

)

)

)

) AS ad_id_array_table(promotion_ad_id)

ON TRUE

spark hive sql

ad_dm.dm_search_adengine_stats_daily lateral view explode(split(query_industry, ',')) ind as industry

analysis

@Override

@Async(value = "main-executor")

public void run(SceneTaskExecutionEntity entity, LanguageRangeEnum language, Boolean isUnion) {

long startTime = System.();

StopWatch stopWatch = new StopWatch("SceneTaskExecutionRun");

SceneTaskExecutionStatusEnum sceneTaskExecutionStatusEnum =

SceneTaskExecutionStatusEnum.;

List<ConclusionDataNode> conclusionData = null;

AnalysisChartConclusion analysisChartConclusion = null;

// 注意重分析 extraConfig 字段可能为null需要进行兼容处理否则NPE

ExtraConfig extraConfig = entity.getExtraConfig();

if (extraConfig == null) {

extraConfig = new ExtraConfig();

}

try {

stopWatch.start("get_tree_node");

.info(String.("SceneTaskExecutionId %s starts", entity.getId()));

taskExecutionDao.updateStatus(entity.getId(), SceneTaskExecutionStatusEnum.);

ProjectEnum project = ProjectEnum.(entity.getProjectId());

// get and etl tree node info

List<CompassAnalysisSceneTreeNodeEntity> treeNodeEntities =

treeDao.getSceneTreeNodesByTreeIdAndVersion(entity.getTreeId(), entity.getVersion());

List<NodeTemplate> treeNodes =

treeNodeEntities

.stream()

.map(

o ->

JsonUtil.(

o.getModuleAlgorithmConfigJson(), new TypeReference<NodeTemplate>() {}))

.filter(Objects::)

.collect(Collectors.());

stopWatch.stop();

stopWatch.start("get_metric_and_dimension");

List<Integer> metricIds = NodeTemplateConverter.(treeNodes);

List<Integer> dimensionIds =

NodeTemplateConverter.(treeNodes);

Map<Integer, String> metricIdToName =

analysisQueryClient

.getMetricV2MetaByIds(null, metricIds, language, metricIds.size(), 1, project)

.stream()

.collect(

Collectors.(

MetricsV2GetResponse.Metric::getId, MetricsV2GetResponse.Metric::getName));

Map<Integer, String> dimensionIdToName =

analysisQueryClient

.getDimensionsSimple(dimensionIds, language, project)

.stream()

.collect(Collectors.(Dimension::getId, Dimension::getName));

stopWatch.stop();

stopWatch.start("get_node_execution_config");

ModuleExecutions moduleExecutions = new ModuleExecutions();

Map<Integer, CompassAnalysisSceneTreeNodeEntity> nodeIdToNodeEntity = new HashMap<>();

for (CompassAnalysisSceneTreeNodeEntity nodeEntity : treeNodeEntities) {

nodeIdToNodeEntity.put(nodeEntity.getTreeNodeId(), nodeEntity);

}

CompassAnalysisSceneTreeNodeEntity rootNode = nodeIdToNodeEntity.get(0);

Filter baselineFilter = entity.getBaselineFilter();

Filter testFilter = entity.getTestFilter();

// 实验分组事实上是一个特殊filter.假设实验组vid为1000,则baselineFilter是原来的则baselineFilter是原来的与vid=1000作and

if (entity.getVidConfig() != null) {

SceneTaskUtil.BaselineAndTestFilter baselineAndTestLibraFilter =

SceneTaskUtil.(

baselineFilter, testFilter, vidDimensionId, entity.getVidConfig());

baselineFilter = baselineAndTestLibraFilter.getBaselineFilter();

testFilter = baselineAndTestLibraFilter.getTestFilter();

}

SceneTreeEntity sceneTreeEntity =

treeDao.getSceneTreeByTreeIdAndVersion(entity.getTreeId(), entity.getVersion());

TreeAttributes treeAttributes =

JsonUtil.(

sceneTreeEntity.getTreeAttributesJson(), new TypeReference<TreeAttributes>() {});

metricIdToName.putAll(SceneTaskUtil.(treeAttributes, language));

dimensionIdToName.putAll(SceneTaskUtil.(treeAttributes, language));

ExecutionDataInfo executionDataInfo =

ExecutionDataInfo.()

.metricIdToName(metricIdToName)

.dimensionIdToName(dimensionIdToName)

.extraConfig(extraConfig)

.build();

stopWatch.stop();

stopWatch.start("init_root_node_execution");

// construct middle data struct

Integer comparisonTypeId = entity.getComparisonTypeId();

initRootNodeExecution(

entity,

moduleExecutions,

nodeIdToNodeEntity,

rootNode,

baselineFilter,

testFilter,

project,

comparisonTypeId,

executionDataInfo);

stopWatch.stop();

// get accident strategy abtest data

String baseStartTime = entity.getBaselineTime().getStartTime();

String baseEndTime = entity.getBaselineTime().getEndTime();

String testStartTime = entity.getTestTime().getStartTime();

String testEndTime = entity.getTestTime().getEndTime();

Future<String> baseAccidentFuture =

moduleExecutor.getAccidentData(baseStartTime, baseEndTime, language, region);

Future<String> testAccidentFuture =

moduleExecutor.getAccidentData(testStartTime, testEndTime, language, region);

Future<String> baseStrategyFuture =

moduleExecutor.getStrategyData(baseStartTime, baseEndTime, language, region);

Future<String> testStrategyFuture =

moduleExecutor.getStrategyData(testStartTime, testEndTime, language, region);

boolean fullSuccess = true;

Integer retryTimes = 1;

if (entity.getExtraConfig() != null && entity.getExtraConfig().getRetryTimes() != null) {

retryTimes = entity.getExtraConfig().getRetryTimes();

}

stopWatch.start("execution_main");

for (int i = 0; i < retryTimes; i++) {

.info("Task id {}, run {}th time", entity.getId(), i);

// start run

long start = System.();

while (!moduleExecutions.toRunModules.isEmpty()

|| !moduleExecutions.runningModules.isEmpty()) {

long time = System.();

// 如果距离上一次检查已经过了5秒那就再检查一次

if (time - start > ) {

SceneTaskStatusEntity statusEntity =

sceneTaskDao.getSceneTaskStatus(Collections.(entity.getId())).get(0);

SceneTaskExecutionStatusEnum statusEnum =

SceneTaskExecutionStatusEnum.(statusEntity.getSceneTaskStatus());

// 当前任务可能已经被标记取消或者超时,如果是这样就不再继续执行.

if (statusEnum != SceneTaskExecutionStatusEnum.) {

.info(String.("[SUCCESS] interrupted task execution %s", entity.getId()));

try {

SceneTaskExecutionEntity newEntity = taskExecutionDao.findById(entity.getId());

Integer taskId = newEntity.getCompassAnalysisSceneTaskId();

String taskName =

sceneTaskDao.getSceneTaskName(taskId == null ? null : taskId.longValue());

// 如果任务是主动取消的不发送通知

if (!SceneTaskExecutionStatusEnum..equals(statusEnum)) {

larkNotification.send(

entity.getCreator(),

taskName,

entity.getCompassAnalysisSceneTaskId(),

entity.getId(),

statusEnum,

language,

project,

extraConfig);

}

} catch (Exception e) {

String errorMessage =

String.(

"Failed to send message to %s, execution id:%d",

entity.getCreator(), entity.getId());

.error(errorMessage, e);

this.metricsClient.emitCounter(

Metric.(MetricsKey.)

.putTag(

MetricsTagKey., MetricsTagValue.)

.build(),

1);

}

// 如果任务被faas标记为超时,将处理中的任务数-1然后返回.

.info(

String.(

"SceneTaskExecutionId %s status is not processing, abort", entity.getId()));

SceneTaskConsumer..getAndDecrement();

SceneTaskConsumer..remove(entity.getId());

return;

}

start = time;

}

// iterator runningModules

for (Iterator<ModuleExecutionContext> iterator =

moduleExecutions.runningModules.iterator();

iterator.hasNext(); ) {

ModuleExecutionContext runningModule = iterator.next();

if (runningModule.outputFuture.isDone()) {

try {

ModuleExecutorOutput output = runningModule.outputFuture.get();

.info(

String.(

"[FINISHED][Task %s / Module %s] is finished, output %s",

runningModule.input.getTaskExecutionId(),

runningModule.input.getModuleExecutionId(),

output));

iterator.remove();

// generate drill down node, add it to toRunModules

List<CompassAnalysisSceneModuleExecutionEntity> moduleExecutionEntities =

moduleExecutionDao.findByIds(output.getDrillDownModuleExecutionIds());

for (CompassAnalysisSceneModuleExecutionEntity executionEntity :

moduleExecutionEntities) {

ModuleExecutorInput executorInput =

new ModuleExecutorInput(

entity.getId(),

executionEntity.getId(),

executionEntity.getCompassAnalysisSceneTreeNodeId(),

executionEntity.getAnalysisMetricId(),

executionEntity.getAggregationFunctionEnum(),

executionEntity.getBaselineFilter(),

executionEntity.getTestFilter(),

executionEntity.getBaselineInterval(),

executionEntity.getTestInterval(),

entity.getBaselinePeriodsIntervals(),

executionEntity.getAnalysisModule(),

ComparisonTypeEnum.(comparisonTypeId),

nodeIdToNodeEntity,

project,

executionDataInfo);

moduleExecutions.toRunModules.add(

new ModuleExecutionContext(executorInput, null));

.info(

String.(

"[ADD TO CAN]Add task %s / module %s to toRunModules",

entity.getId(), executionEntity.getId()));

}

} catch (AccessThirdAPIException e) {

.error("第三方api调用失败,请检查是否starling平台文案拉取失败", e);

this.metricsClient.emitCounter(

Metric.(MetricsKey.)

.putTag(MetricsTagKey., MetricsTagValue.)

.build(),

1);

} catch (Throwable t) {

.error("Failed to run task. executor input:" + runningModule.input, t);

this.metricsClient.emitCounter(

Metric.(MetricsKey.)

.putTag(

MetricsTagKey., MetricsTagValue.)

.putTag(

MetricsTagKey.,

runningModule.input.getAnalysisModule().getSceneModuleType().getName())

.putTag(

MetricsTagKey.,

SceneTaskUtil.())

.build(),

1);

}

}

}

// run module in toRunModules, update status

if (!moduleExecutions.toRunModules.isEmpty()) {

ModuleExecutionContext toRunModule = moduleExecutions.toRunModules.remove(0);

.info(

String.(

"[START][Task %s/ Module %s] starts to run",

toRunModule.input.getTaskExecutionId(),

toRunModule.input.getModuleExecutionId()));

Future<ModuleExecutorOutput> outputFuture =

moduleExecutor.execute(toRunModule.input, language);

moduleExecutions.runningModules.add(

new ModuleExecutionContext(toRunModule.input, outputFuture));

}

}

// 确定是否所有的节点都执行成功

List<CompassAnalysisSceneModuleExecutionEntity> moduleExecutionEntities =

moduleExecutionDao.findByTaskExecutionId(entity.getId());

List<CompassAnalysisSceneModuleExecutionEntity> failedModuleExecutionEntities =

new ArrayList<>();

for (CompassAnalysisSceneModuleExecutionEntity moduleExecution : moduleExecutionEntities) {

Integer status = moduleExecution.getStatus();

if (!TaskExecutionStateEnum..getId().equals(status)) {

failedModuleExecutionEntities.add(moduleExecution);

}

}

// 如果有一个节点失败就算失败

fullSuccess = failedModuleExecutionEntities.size() == 0;

// 如果不是最后一次尝试,并且失败了,需要初始化一下任务的执行并且等待一段时间

if (i < retryTimes - 1 && !fullSuccess) {

// 需要重试,软删除当前 execution 对应的 moduleExecution

moduleExecutionDao.deleteModuleExecutionEntitiesByTaskExecutionId(entity.getId());

// 重新 init 根节点

initRootNodeExecution(

entity,

moduleExecutions,

nodeIdToNodeEntity,

rootNode,

baselineFilter,

testFilter,

project,

comparisonTypeId,

executionDataInfo);

Thread.(60_000L * 3);

} else {

break;

}

}

stopWatch.stop();

stopWatch.start("get_accident_strategy_data");

String baseAccident =

(

baseAccidentFuture,

MultiLingualUtil.(

SceneTaskKeywords., language));

String testAccident =

(

testAccidentFuture,

MultiLingualUtil.(

SceneTaskKeywords., language));

String baseStrategy =

(

baseStrategyFuture,

MultiLingualUtil.(

SceneTaskKeywords., language));

String testStrategy =

(

testStrategyFuture,

MultiLingualUtil.(

SceneTaskKeywords., language));

stopWatch.stop();

stopWatch.start("get_conclusion_data");

// 生成简化版结论

ComparisonTypeEnum comparisonTypeEnum = ComparisonTypeEnum.(comparisonTypeId);

List<NodeTemplate> nodeTemplates =

treeNodeEntities

.stream()

.map(

o ->

JsonUtil.(

o.getModuleAlgorithmConfigJson(), new TypeReference<NodeTemplate>() {}))

.collect(Collectors.());

List<Integer> showReplacedMetricNameNodeIds =

nodeTemplates

.stream()

.filter(Objects::)

.filter(o -> !BooleanUtils.(o.getShowReplacedMetricName()))

.map(NodeTemplate::getNodeId)

.collect(Collectors.());

ConclusionDataMixture conclusionDataMixture =

getConclusionDataMixture(

entity.getId(),

language,

comparisonTypeEnum,

project,

extraConfig.getHideFirstLayerFilters(),

showReplacedMetricNameNodeIds,

executionDataInfo.getMetricIdToName(),

executionDataInfo.getDimensionIdToName(),

extraConfig);

// 在根节点的运行过程中,当前executionId的base和test根指标值会存下来.这里重新获取一下

// 另外异常检测相关结论也会读取到

entity =

sceneTaskDao.getSceneTaskExecutionByIds(Collections.(entity.getId())).get(0);

// 指南针/幸福里时间对比任务需要生成图表版结论

if (AnalysisHardCode..contains(project)

// && ComparisonTypeEnum.TIME_COMPARISON_TASK.equals(

// ComparisonTypeEnum.fromId(comparisonTypeId))

&& entity.getBaselineAggregationValue() != null

&& entity.getTestAggregationValue() != null) {

try {

String rootFilteredMetricName;

if (comparisonTypeEnum == ComparisonTypeEnum.) {

rootFilteredMetricName =

executorUtil.getFilteredMetricName(

entity.getAnalysisMetricId(),

entity.getBaselineFilter(),

entity.getTestFilter(),

comparisonTypeEnum,

language,

project,

executionDataInfo.getMetricIdToName(),

executionDataInfo.getDimensionIdToName());

} else {

rootFilteredMetricName = metricIdToName.get(entity.getAnalysisMetricId());

}

conclusionDataMixture.setFilteredRootMetricName(rootFilteredMetricName);

conclusionDataMixture.setBaseRootMetricValue(

Double.(entity.getBaselineAggregationValue()));

conclusionDataMixture.setTestRootMetricValue(

Double.(entity.getTestAggregationValue()));

.info("sceneTaskExecutionEntity is {}", JsonUtil.(entity));

analysisChartConclusion =

chartConclusionGeneration.createChartConclusion(

entity,

conclusionDataMixture,

project,

language,

executionDataInfo.getMetricIdToName(),

executionDataInfo.getDimensionIdToName(),

extraConfig.getSpanTableMinNodes());

} catch (Throwable t) {

.error(

String.("executionId %s generate chart conclusion failed", entity.getId()), t);

}

}

conclusionData = conclusionDataMixture.getConclusionDataNodes();

stopWatch.stop();

String getEventFailed =

MultiLingualUtil.(SceneTaskKeywords., language);

EventDataMixture eventDataMixture =

new EventDataMixture(

getEventFailed,

getEventFailed,

null,

new ArrayList<>(),

new ArrayList<>(),

new ArrayList<>());

try {

stopWatch.start("get_metric");

MetricsV2GetResponse.Metric metric =

analysisQueryClient.getMetricV2MetaById(

entity.getAnalysisMetricId(), language, project);

stopWatch.stop();

stopWatch.start("get_event");

eventDataMixture =

eventServiceClient.getEventDataMixture(

entity, metric.getName(), conclusionDataMixture, language);

stopWatch.stop();

} catch (Exception e) {

if (stopWatch.isRunning()) {

stopWatch.stop();

}

.warn(String.("entity %s get event failed", entity), e);

}

// 结论以列表形式存储,因为存在用户编辑,一个任务可能有多版结论

stopWatch.start("scene_conclusion");

SceneTaskConclusion conclusionDistinct =

new SceneTaskConclusion(

conclusionData,

String.(

StringEscapeUtils.(

MultiLingualUtil.(

"ANALYSIS_COMPARISON_TEMPLATE", language)),

eventDataMixture.getTestEvent(),

eventDataMixture.getBaseEvent()),

String.(

StringEscapeUtils.(

MultiLingualUtil.(

"ANALYSIS_COMPARISON_TEMPLATE", language)),

testAccident,

baseAccident),

String.(

StringEscapeUtils.(

MultiLingualUtil.(

"ANALYSIS_COMPARISON_TEMPLATE", language)),

testStrategy,

baseStrategy),

"",

"system",

ZonedDateTime.().format(TimeConstant.),

eventDataMixture.getBaselineEventAnnotations(),

eventDataMixture.getTestEventAnnotations(),

eventDataMixture.getOverallEventAnnotations(),

eventDataMixture.getEventTitle());

List<SceneTaskConclusion> conclusions = Collections.(conclusionDistinct);

// 对于需要重试的任务,如果不是所有节点都运行成功,就标记为失败

if (retryTimes > 1 && !fullSuccess) {

.error("executionId {} failed for {} times", entity.getId(), retryTimes);

taskExecutionDao.updateConclusion(

conclusions, entity.getId().intValue(), analysisChartConclusion);

sceneTaskExecutionStatusEnum = SceneTaskExecutionStatusEnum.;

taskExecutionDao.updateStatus(entity.getId(), SceneTaskExecutionStatusEnum.);

this.metricsClient.emitCounter(

Metric.(MetricsKey.)

.putTag(MetricsTagKey., String.(entity.getId()))

.build(),

1);

} else {

// update task conclusion

taskExecutionDao.updateConclusion(

conclusions, entity.getId().intValue(), analysisChartConclusion);

// 是否完全产出和缓存问题怎么解决

ConclusionCache cache =

new ConclusionCache(

SceneTaskExecutionStatusEnum..getStatus(),

entity.getId(),

conclusions.get(0).getData());

analysisRedis.set(entity.getId(), JsonUtil.(cache), 86400, TimeUnit.);

// 刷新驾驶舱执行任务状态缓存

sceneTaskDao.getCockpitHistoryExecution(

entity.getTreeId(),

JsonUtil.(FilterUtil.(entity.getTestFilters())),

JsonUtil.(entity.getBaselineTime()),

JsonUtil.(entity.getTestTime()),

entity.getMetricAggregateFunctionId(),

language.getIdentifier(),

false);

.info(String.("[SUCCESS] finished task execution %s", entity.getId()));

}

stopWatch.stop();

} catch (Exception e) {

if (stopWatch.isRunning()) {

stopWatch.stop();

}

String errorMessage =

String.("Failed to run task execution, execution id: %s.", entity.getId());

.error(errorMessage, e);

this.metricsClient.emitCounter(

Metric.(MetricsKey.)

.putTag(MetricsTagKey., MetricsTagValue.)

.putTag(MetricsTagKey., SceneTaskUtil.())

.build(),

1);

// 发生异常,任务失败

taskExecutionDao.updateStatus(entity.getId(), SceneTaskExecutionStatusEnum.);

sceneTaskExecutionStatusEnum = SceneTaskExecutionStatusEnum.;

}

stopWatch.start("send_lark");

try {

boolean larkNotify = true;

// 如果是预览任务,则不发送lark通知

if (entity.getTreeId() != null && entity.getVersion() != null) {

Boolean isPreview = treeDao.isSceneTreePreview(entity.getTreeId(), entity.getVersion());

larkNotify = !isPreview;

}

// 实验分析创建的子任务或者来自订阅平台,报警平台的任务不发送通知.

if (BooleanUtils.(extraConfig.getSkipLarkNotification())) {

larkNotify = false;

}

if (larkNotify) {

SceneTaskExecutionEntity newEntity = taskExecutionDao.findById(entity.getId());

Integer taskId = newEntity.getCompassAnalysisSceneTaskId();

String taskName = sceneTaskDao.getSceneTaskName(taskId == null ? null : taskId.longValue());

larkNotification.send(

entity.getCreator(),

taskName,

entity.getCompassAnalysisSceneTaskId(),

entity.getId(),

sceneTaskExecutionStatusEnum,

language,

ProjectEnum.(entity.getProjectId()),

extraConfig);

}

} catch (Exception e) {

String errorMessage =

String.(

"Failed to send message to %s, execution id:%s", entity.getCreator(), entity.getId());

.error(errorMessage, e);

this.metricsClient.emitCounter(

Metric.(MetricsKey.)

.putTag(MetricsTagKey., MetricsTagValue.)

.build(),

1);

}

stopWatch.stop();

stopWatch.start("send_mq");

try {

if (BooleanUtils.(extraConfig.getNeedPushMQ())) {

List<String> firstLayerConclusions = Collections.();

String detailUrl = null;

if (SceneTaskExecutionStatusEnum..equals(sceneTaskExecutionStatusEnum)) {

if (!CollectionUtils.(conclusionData)) {

firstLayerConclusions =

conclusionData

.stream()

.map(ConclusionDataNode::getText)

.collect(Collectors.());

detailUrl =

larkNotification.getTaskExecutionUrl(

ProjectEnum.(entity.getProjectId()),

entity.getCompassAnalysisSceneTaskId(),

entity.getId());

}

RocketMQMessage rocketMQMessage =

RocketMQMessage.()

.conclusions(firstLayerConclusions)

.detailUrl(detailUrl)

.executionId(entity.getId())

.executionStatus(sceneTaskExecutionStatusEnum.getStatus())

.taskTypeId(OneSiteTaskTypeEnum..getId())

.build();

rocketMQProducer.produce(

rocketMQMessage,

rocketMQConfig.getAlarmTopic(),

rocketMQConfig.getAlarmTags(),

entity.getId());

}

}

} catch (Exception e) {

String errorMessage =

String.("Failed to send message to mq, execution id %s", entity.getId());

.error(errorMessage, e);

}

// 回调bff发送日报

if (extraConfig.getDailyReportConfig() != null

&& extraConfig.getDailyReportConfig().getPushConfig() != null

&& BooleanUtils.(extraConfig.getDailyReportConfig().getPushConfig().getEnable())) {

.info(

"executionId {} push daily report, extraConfig {}",

entity.getId(),

JsonUtil.(extraConfig));

try {

dailyReportGeneration.pushDailyReport(entity, extraConfig);

} catch (Exception e) {

String errorMessage =

String.(

"Failed to push dailyReport, execution id %s extraConfig %s",

entity.getId(), JsonUtil.(extraConfig));

.error(errorMessage, e);

}

}

stopWatch.stop();

Duration duration = Duration.(System.() - startTime);

.info(

"SceneTaskExecutionId {} duration is {}, cost {}",

entity.getId(),

duration.toMillis(),

stopWatch.prettyPrint());

this.metricsClient.emitTimer(

Metric.(MetricsKey.)

.putTag(MetricsTagKey., String.(entity.getId()))

.putTag(MetricsTagKey., String.(entity.getTreeId()))

.build(),

duration);

SceneTaskConsumer..remove(entity.getId());

SceneTaskConsumer..getAndDecrement();

}

全部评论

相关推荐

不放弃的小鱼干很洒脱:好可爱的离职理由
点赞 评论 收藏
分享
点赞 评论 收藏
分享
评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客企业服务