2 20 数据开发面试题汇总
下面是我在实习中协助面试 然后在牛客上挑选了一些完整的面试问题借助豆包完成的面经答案思路汇总
滴滴数据研发日常实习
一面
- 数据仓库认识
- 维度建模之外还有哪些建模,有什么区别
- 项目中数据仓库分了哪几层,为什么要分层
- Hadoop架构,你这些组件中选一个最熟悉的说(这问题也太友好了,我选了MapReduce,说了一下流程)
- 怎么实现分区,环形缓冲区溢写的分区和后续Reduce的分区一样吗
- 环形缓冲区里的快排是怎么实现的,是对数据快排吗
- 简述了一下快排的过程,时间空间复杂度
- 时间复杂度和空间复杂度的区别,原理,怎么计算。举个例子说明。
- Hive的介绍,对Hive的认识,用处
- 项目的数据流向,数据链路搭建
- 两道sql,共享屏幕手撕。一个是每一天最早登录的三个人的信息,第二个是连续登陆问题。
一面下来感受很好,除了面试时候迟到了20分钟左右。整体面试流程还挺顺畅的,那个时空复杂度的计算说的不好,他还举了个例子给我解释。整体回答下来感觉发挥也还行。正好是周末,第二天约了下周一的二面
二面
二面感觉就问题问的非常宽泛,但有的问题又非常细。会具体到一张表里面一个字段的设计
- 项目的主题设计里面,XX表有哪些字段,你是怎么设计这些字段的,设计过程怎么样的。你说市场调研和业务分析,怎么调研怎么分析的。(说实话上来一连串这样问被问懵了)。表的主键是什么,又举例了哪些字段为什么你不放在这张表里,你是怎么考虑的。给你个新的主题你怎么设计
- 新数据来了怎么更新,比如XX表多了一行数据,你怎么实现更新。调度的话要写多少个任务,怎么写
- Hive的优化经验,为什么会出现数据倾斜
- 学习的一些课程,研究课题。能不能扛得住压力,手上还有没有offer,还面试了其它哪几家公司,表现怎么样(说实话真无力吐槽,你问这个问题,我真不知道怎么回答)
- 看没看过技术类书籍,我回答看过阿里的大数据之路。然后开始问里面的建模理解啊等问题,我说时间太长了,记不太清楚了
面到最后来了句基础比较差,回去商量一下给我结果。让我继续面着其它的,到此是寄了。这是第一次二面,感觉确实和一面风格不一样,很少问八股。一般是找一个他感兴趣的点,然后开始追着问,一连串问下去
作者:懒得低眉链接:https://www.nowcoder.com/experience/733来源:牛客网
详细回答
一面问题回答:
- 数据仓库认识:数据仓库是一个面向主题的、集成的、相对稳定的、反映历史变化的数据集合,用于支持管理决策。它不同于操作型数据库,数据仓库主要用于分析,而不是事务处理。它通过ETL(抽取、转换、加载)过程从多个数据源收集数据,经过清洗、转换和集成后存储在数据仓库中,以便用户进行查询、报表生成和数据分析。例如,在电商领域,数据仓库可以整合订单数据、用户数据、商品数据等,帮助企业分析销售趋势、用户行为等。
- 维度建模之外还有哪些建模,有什么区别:除了维度建模,还有范式建模和Data Vault建模等。范式建模遵循数据库设计的范式原则(如第一范式、第二范式、第三范式等),强调数据的规范化和消除冗余,适用于OLTP(联机事务处理)系统,但在分析场景下可能需要复杂的关联查询。维度建模以事实表和维度表为核心,围绕业务过程构建,易于理解和查询,适合OLAP(联机分析处理)场景。Data Vault建模则是一种混合型建模方法,强调数据的灵活性和可扩展性,通过中心表、链接表和卫星表来存储数据。
- 项目中数据仓库分了哪几层,为什么要分层:一般数据仓库分为ODS(操作数据存储层)、DWD(明细数据层)、DWS(汇总数据层)和ADS(应用数据层)。分层的原因主要有:提高数据的可维护性,每层有明确的职责和功能,便于问题排查和代码管理;减少数据的冗余,不同层可以根据需要对数据进行整合和处理;提高数据的复用性,上层可以直接使用下层处理好的数据;隔离业务逻辑和数据处理逻辑,使得数据仓库的架构更加清晰。
- Hadoop架构,你这些组件中选一个最熟悉的说(选了MapReduce,说了一下流程):MapReduce是Hadoop的核心计算框架,用于处理大规模数据集的并行计算。其流程主要包括:Map阶段,输入数据被分割成多个数据块,每个数据块由一个Mapper处理,Mapper将输入数据转换为键值对形式,并根据业务逻辑进行处理;Shuffle阶段,Mapper输出的键值对会根据键进行排序和分组,然后传递给Reducer;Reduce阶段,Reducer接收相同键的键值对集合,并进行聚合操作,最终输出结果。
- 怎么实现分区,环形缓冲区溢写的分区和后续Reduce的分区一样吗:在MapReduce中,分区是通过Partitioner来实现的,默认的Partitioner是根据键的哈希值进行分区。环形缓冲区溢写的分区和后续Reduce的分区是一样的,都是基于相同的分区逻辑,这样才能保证相同键的数据最终被发送到同一个Reducer进行处理。
- 环形缓冲区里的快排是怎么实现的,是对数据快排吗:环形缓冲区里的快排是对缓冲区中的数据进行排序,一般是按照键进行排序。具体实现是选择一个基准键,将缓冲区中的数据分为两部分,一部分键小于基准键,另一部分键大于基准键,然后对这两部分分别递归进行排序,直到整个缓冲区的数据有序。
- 简述了一下快排的过程,时间空间复杂度:快排的过程是:选择一个基准元素,将数组分为两部分,左边部分的元素都小于基准元素,右边部分的元素都大于基准元素,然后对左右两部分分别递归进行快排,直到整个数组有序。时间复杂度在平均情况下是O(n log n),在最坏情况下是O(n^2),空间复杂度在平均情况下是O(log n),在最坏情况下是O(n)。
- 时间复杂度和空间复杂度的区别,原理,怎么计算。举个例子说明:时间复杂度是衡量算法执行时间随输入规模增长的变化趋势,空间复杂度是衡量算法执行过程中所需额外空间随输入规模增长的变化趋势。计算时间复杂度主要看算法中基本操作的执行次数,空间复杂度主要看算法执行过程中使用的额外空间。例如,对于一个简单的数组遍历求和算法,时间复杂度是O(n),因为需要遍历数组中的每个元素一次,空间复杂度是O(1),因为只使用了有限的额外空间(如一个变量来存储和)。
- Hive的介绍,对Hive的认识,用处:Hive是建立在Hadoop之上的数据仓库基础架构,它提供了一种类似于SQL的查询语言HiveQL,用于处理大规模结构化数据。Hive的主要作用是将结构化的数据映射到一个数据库表中,并提供SQL查询功能,使得不熟悉MapReduce编程的用户也能方便地进行数据分析。它常用于数据仓库的构建、数据分析、报表生成等场景,例如在电商数据分析中,可以使用Hive查询用户的购买记录、商品的销售情况等。
- 项目的数据流向,数据链路搭建:在项目中,数据首先从数据源(如数据库、日志文件等)通过ETL工具抽取到ODS层,然后在ODS层进行简单的清洗和转换后,加载到DWD层。在DWD层对数据进行进一步的清洗和明细处理,生成明细数据。接着,根据业务需求,在DWS层对DWD层的数据进行汇总和聚合,生成汇总数据。最后,ADS层从DWS层获取数据,进行业务逻辑处理和展示,提供给用户使用。数据链路的搭建需要考虑数据的来源、数据的处理逻辑、数据的存储和传输等方面,确保数据的准确性、完整性和及时性。
- 两道sql,共享屏幕手撕。一个是每一天最早登录的三个人的信息,第二个是连续登陆问题:
SELECT user_id, login_time, date FROM ( SELECT user_id, login_time, date, ROW_NUMBER() OVER (PARTITION BY date ORDER BY login_time) AS rn FROM user_login ) AS subquery WHERE rn <= 3;
- 对于连续登陆问题,假设表名为user_login,字段有user_id, login_date,要找出连续登陆天数大于等于3天的用户,可以使用以下SQL:
WITH ranked_dates AS ( SELECT user_id, login_date, DATE_SUB(login_date, INTERVAL ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY login_date) DAY) AS group_date FROM user_login ), counted_dates AS ( SELECT user_id, group_date, COUNT(login_date) AS consecutive_days FROM ranked_dates GROUP BY user_id, group_date ) SELECT user_id FROM counted_dates WHERE consecutive_days >= 3;
二面问题回答:
- 项目的主题设计里面,XX表有哪些字段,你是怎么设计这些字段的,设计过程怎么样的。你说市场调研和业务分析,怎么调研怎么分析的。(说实话上来一连串这样问被问懵了)。表的主键是什么,又举例了哪些字段为什么你不放在这张表里,你是怎么考虑的。给你个新的主题你怎么设计:
- 新数据来了怎么更新,比如XX表多了一行数据,你是怎么实现更新。调度的话要写多少个任务,怎么写:对于新数据的更新,如果是XX表多了一行数据,一般可以使用INSERT INTO语句将新数据插入到表中。如果数据存在更新的情况,可以使用UPDATE语句进行更新。在实现更新时,要确保数据的一致性和完整性,例如可以使用事务来保证数据的原子性。
- Hive的优化经验,为什么会出现数据倾斜:
- 学习的一些课程,研究课题。能不能扛得住压力,手上还有没有offer,还面试了其它哪几家公司,表现怎么样(说实话真无力吐槽,你问这个问题,我真不知道怎么回答):
- 看没看过技术类书籍,我回答看过阿里的大数据之路。然后开始问里面的建模理解啊等问题,我说时间太长了,记不太清楚了:如果看过《阿里的大数据之路》,可以简要回顾一下书中关于建模的主要内容,如维度建模的方法和实践、数据仓库的架构设计等。可以说书中强调了数据仓库在企业数据管理中的重要性,以及如何通过合理的建模方法来提高数据的质量和可用性。同时,可以表达自己对建模的理解,如建模是为了更好地组织和管理数据,满足业务需求,提高数据分析的效率和准确性。
希望以上回答能够帮助你更好地准备面试,祝你面试成功!
Kafka面试重点
- 为什么要使用kafka
- 简述kafka的架构
- 命令行操作
以下是关于Kafka面试重点问题的详细回答:
为什么要使用Kafka
- 解耦系统组件:在一个大型的分布式系统中,通常会有多个不同的组件或服务之间需要进行数据交互。使用Kafka作为消息中间件,可以将这些组件之间的依赖关系解耦。例如,一个电商系统中的订单服务和库存服务,订单服务在生成订单后可以将相关消息发送到Kafka,库存服务从Kafka获取消息来更新库存,这样两个服务之间不需要直接调用,各自可以独立地进行开发、部署和扩展。
- 缓存数据:Kafka可以作为数据的临时缓存区。当数据产生的速度超过了下游系统的处理速度时,Kafka可以将数据暂存起来,让下游系统按照自己的节奏从Kafka中拉取数据进行处理,避免数据丢失或系统过载。比如在日志收集系统中,应用程序产生日志的速度可能很快,Kafka可以先缓存这些日志,然后由日志分析系统慢慢读取和分析。
- 削峰填谷:在某些业务场景下,会出现流量高峰和低谷的情况。例如在电商的促销活动期间,订单量会大幅增加,Kafka可以在流量高峰时接收大量的订单消息,然后在流量低谷时再将这些消息逐步发送给后续的处理系统,使整个系统能够平稳地处理业务,避免因高峰流量导致系统崩溃。
- 异步处理:能够实现异步通信,提高系统的响应速度和性能。比如在用户注册流程中,除了完成基本的注册信息插入数据库操作外,还可能需要发送注册成功邮件、推送新手引导消息等。将这些后续操作的消息发送到Kafka后,注册流程可以快速返回给用户,而不必等待这些后续操作完成,提高了用户体验。
- 数据分发:可以方便地将数据分发给多个不同的消费者,实现数据的多副本分发和多场景使用。例如,一份用户行为数据可以同时分发给数据分析系统进行用户行为分析,分发给推荐系统用于个性化推荐,分发给监控系统用于实时监控用户活动等。
简述Kafka的架构
- 生产者(Producer):负责将数据发送到Kafka集群中的主题(Topic)。生产者可以是各种不同的应用程序,如日志收集器、业务系统中的数据生成模块等。它会根据指定的分区策略,将消息发送到特定的分区中。
- 消费者(Consumer):从Kafka集群中读取消息进行处理。消费者通常以消费者组(Consumer Group)的形式存在,一个消费者组中的多个消费者可以共同消费一个或多个主题中的消息,每个消费者负责消费其中的一部分分区。消费者通过偏移量(Offset)来记录自己已经消费到了哪个位置,以便在故障恢复或重新启动后能够继续从上次的位置开始消费。
- 主题(Topic):是Kafka中消息的逻辑分类,类似于数据库中的表。每个主题可以包含多个分区(Partition),不同的分区可以分布在不同的Broker上,从而实现数据的分布式存储和并行处理。
- 分区(Partition):是Kafka存储消息的基本单位,每个分区是一个有序的、不可变的消息序列。消息在分区内按照顺序存储,并且每个消息都有一个唯一的偏移量。分区的设计使得Kafka能够支持高并发的读写操作,多个生产者可以同时向不同的分区写入消息,多个消费者也可以同时从不同的分区读取消息。
- Broker:是Kafka集群中的节点,负责接收生产者发送的消息,存储消息,并为消费者提供消息读取服务。一个Kafka集群由多个Broker组成,它们之间相互协作,共同完成消息的存储和分发任务。
- Zookeeper:在Kafka架构中起着重要的协调作用。它负责管理Kafka集群的元数据信息,如Broker的注册、主题的创建和删除、分区的分配等。同时,Zookeeper还用于选举Kafka集群中的领导者(Leader)节点,确保集群的高可用性和一致性。
命令行操作
- 创建主题
- 查看主题列表
- 发送消息
- 消费消息
- 查看主题详情
腾讯数开
- 问题:自我介绍 + 项目介绍 答案:一般自我介绍 3 分钟,开头为:面试官你好,我叫 xxx ,目前在 xxx 公司担任大数据开发工程师,组内专注于 xxx 技术,在任职期间,参与过 3 个项目开发,分别为 xxx,我擅长于 xxx 技术,对 xxx 有深入研究,以上就是我的个人介绍,请面试官提问。关注公众号:3 分钟秒懂大数据,获取更多面经、知识点总结。
- 问题:数仓为什么要分层? 答案:数仓进行分层的一个主要原因就是希望在管理数据的时候,能对数据有一个更加清晰的掌控。 主要有以下优点: - 划清层次结构:每一个数据分层都有它的作用域,这样我们在使用表的时候能更方便地定位和理解。 - 数据血缘追踪:简单来讲可以这样理解,我们最终给下游是直接能使用的业务表,但是它的来源有很多,如果有一张来源表出问题了,我们希望能够快速准确地定位到问题,并清楚它的危害范围。 - 减少重复开发:规范数据分层,开发一些通用的中间层数据,能够减少极大的重复计算。 - 把复杂问题简单化。将一个复杂的任务分解成多个步骤来完成,每一层只处理单一的步骤,比较简单和容易理解。而且便于维护数据的准确性,当数据出现问题之后,可以不用修复所有的数据,只需要从有问题的步骤开始修复。 - 屏蔽原始数据的异常。屏蔽业务的影响,不必改一次业务就需要重新接入数据。
- 问题:数据仓库都分哪几层? 答案:如果划分细致,数据仓库总共可以划分为 5 层: - ODS 层:Operation Data Store,数据准备区,贴源层。直接接入源数据的:业务库、埋点日志、消息队列等。ODS 层数数据仓库的准备区。 - DWD 层:Data Warehouse Details,数据明细层,属于业务层和数据仓库层的隔离层,把持和 ODS 层相同颗粒度。进行数据清洗和规范化操作,去空值/脏数据、离群值等。 - DWM 层:Data Warehouse middle,数据中间层,在 DWD 的基础上进行轻微的聚合操作,算出相应的统计指标。 - DWS 层:Data warehouse service,数据服务层,在 DWM 的基础上,整合汇总一个主题的数据服务层。汇总结果一般为宽表,用于 OLAP、数据分发等。 - ADS 层:Application data service, 数据应用层,存放在 ES,Redis、PostgreSql 等系统中,供数据分析和挖掘使用。
- 问题:简单介绍一下 Mapreduce 工作原理? 答案:MapReduce 工作原理分为以下 5 个步骤: - 在客户端启动一个作业。 - 向 JobTracker 请求一个 Job ID。 - 将运行作业所需要的资源文件复制到 HDFS 上,包括 MapReduce 程序打包的 JAR 文件、配置文件和客户端计算所得的输入划分信息。这些文件都存放在 JobTracker 专门为该作业创建的文件夹中。文件夹名为该作业的 Job ID 。JAR 文件默认会有 10 个副本(mapred.submit.replication 属性控制);输入划分信息告诉了 JobTracker 应该为这个作业启动多少个 map 任务等信息。 - JobTracker 接收到作业后,将其放在一个作业队列里,等待作业调度器对其进行调度,当作业调度器根据自己的调度算法调度到该作业时,会根据输入划分信息为每个划分创建一个 map 任务,并将 map 任务分配给 TaskTracker 执行。对于 map 和 reduce 任务,TaskTracker 根据主机核的数量和内存的大小有固定数量的 map 槽和 reduce 槽。这里需要强调的是:map 任务不是随随便便地分配给某个 TaskTracker 的,这里有个概念叫:数据本地化(Data-Local)。意思是:将 map 任务分配给含有该 map 处理的数据块的 TaskTracker 上,同时将程序 JAR 包复制到该 TaskTracker 上来运行,这叫"运算移动,数据不移动"。而分配 reduce 任务时并不考虑数据本地化。 - TaskTracker 每隔一段时间会给 JobTracker 发送一个心跳,告诉 JobTracker 它依然在运行,同时心跳中还携带着很多的信息,比如当前 map 任务完成的进度等信息。当 JobTracker 收到作业的最后一个任务完成信息时,便把该作业设置成"成功"。当 JobClient 查询状态时,它将得知任务已完成,便显示一条消息给用户。
- 问题:Hdfs 的读数据流程了解吗? 答案: - 客户端通过 Distributed FileSystem 向 NameNode 请求下载文件,NameNode 通过查询元数据,找到文件块所在的 DataNode 地址。 - 挑选一台 DataNode(就近原则,然后随机)服务器,请求读取数据。 - DataNode 开始传输数据给客户端(从磁盘里面读取数据输入流,以 Packet 为单位来做校验)。 - 客户端以 Packet 为单位接收,先在本地缓存,然后写入目标文件。
- 问题:Hdfs 的写操作呢? 答案: - 客户端通过 Distributed FileSystem 模块向 NameNode 请求上传文件,NameNode 检查目标文件是否已存在,父目录是否存在。 - NameNode 返回是否可以上传。 - 客户端请求第一个 Block 上传到哪几个 DataNode 服务器上。 - NameNode 返回 3 个 DataNode 节点,分别为 dn1、dn2、dn3。 - 客户端通过 FSDataOutputStream 模块请求 dn1 上传数据,dn1 收到请求会继续调用 dn2,然后 dn2 调用 dn3 ,将这个通信管道建立完成。 - dn1、dn2、dn3 逐级应答客户端。 - 客户端开始往 dn1 上传第一个 Block (先从磁盘读取数据放到一个本地内存缓存),以 Packet 为单位,dn1 收到一个 Packet 就会传给 dn2,dn2 传给 dn3;dn1 每传一个 packet 会放入一个应答队列等待应答。 - 当一个 Block 传输完成之后,客户端再次请求 NameNode 上传第二个 Block 的服务器。(重复执行 3-7 步)。
- 问题:zookeeper 的选举过程 答案:Zookeeper 虽然在配置文件中并没有指定 Master 和 Slave。但是,Zookeeper 工作时,是有一个节点为 Leader,其他则为 Follower,Leader 是通过内部的选举机制临时产生的。选举过程:假设有五台服务器组成的 Zookeeper 集群,它们的 id 从 1-5,同时它们都是最新启动的,也就是没有历史数据,在存放数据量这一点上,都是一样的。假设这些服务器依序启动,来看看会发生什么。 - 服务器 1 启动,发起一次选举。服务器 1 投自己一票。此时服务器 1 票数一票,不够半数以上(3 票),选举无法完成,服务器 1 状态保持为 LOOKING; - 服务器 2 启动,再发起一次选举。服务器 1 和 2 分别投自己一票并交换选票信息:此时服务器 1 发现服务器 2 的 ID 比自己目前投票推举的(服务器 1)大,更改选票为推举服务器 2。此时服务器 1 票数 0 票,服务器 2 票数 2 票,没有半数以上结果,选举无法完成,服务器 1,2 状态保持 LOOKING。 - 服务器 3 启动,发起一次选举。此时服务器 1 和 2 都会更改选票为服务器 3。此次投票结果:服务器 1 为 0 票,服务器 2 为 0 票,服务器 3 为 3 票。此时服务器 3 的票数已经超过半数,服务器 3 当选 Leader。服务器 1,2 更改状态为 FOLLOWING,服务器 3 更改状态为 LEADING; - 服务器 4 启动,发起一次选举。此时服务器 1,2,3 已经不是 LOOKING 状态,不会更改选票信息。交换选票信息结果:服务器 3 为 3 票,服务器 4 为 1 票。此时服务器 4 服从多数,更改选票信息为服务器 3,并更改状态为 FOLLOWING; - 服务器 5 启动,同 4 一样当小弟。
- 问题:Spark 为什么比 MapReduce 快? 答案: - Spark 是基于内存计算,MapReduce 是基于磁盘运算,所以速度快。 - Spark 拥有高效的调度算法,是基于 DAG,形成一系列的有向无环图。 - Spark 是通过 RDD 算子来运算的,它拥有两种操作,一种转换操作,一种动作操作,可以将先运算的结果存储在内存中,随后在计算出来。 - Spark 还拥有容错机制 Linage。
- 问题:Spark 任务执行流程? 答案: - SparkContext 向资源管理器注册并向资源管理器申请运行 Executor。 - 资源管理器分配 Executor,然后资源管理器启动 Executor。 - Executor 发送心跳至资源管理器。 - SparkContext 构建 DAG 有向无环图。 - 将 DAG 分解成 Stage(TaskSet)。 - 把 Stage 发送给 TaskScheduler。 - Executor 向 SparkContext 申请 Task。 - TaskScheduler 将 Task 发送给 Executor 运行。 - 同时 SparkContext 将应用程序代码发放给 Executor。 - Task 在 Executor 上运行,运行完毕释放所有资源。
- 问题:Spark 用过的解决数据倾斜的方案说一下?答案:数据倾斜主要发生在 Shuffle 阶段。指的是并行处理的数据集中,某一部分(如 Spark 或 Kafka 的一个 Partition)的数据显著多于其它部分,从而使得该部分的处理速度成为整个数据集处理的瓶颈。数据倾斜俩大直接致命后果:- 数据倾斜直接会导致一种情况:Out Of Memory。- 运行速度慢。
数据倾斜的处理方法: - 避免数据源的数据倾斜。通过在 Hive 中对倾斜的数据进行预处理,以及在进行 kafka 数据分发时尽量进行平均分配。 - 调整并行度。增加 shuffle read task 的数量,可以让原本分配给一个 task 的多个 key 分配给多个 task ,从而让每个 task 处理比原来更少的数据。 - 为倾斜 key 增加随机前/后缀。 - 随机前缀和扩容 RDD 进行 join。 - 过滤少数倾斜 Key。11. 问题:Flink 的四大基石都有哪些? 答案:Flink 四大基石分别是:Checkpoint(检查点)、State(状态)、Time(时间)、Window(窗口)。12. 问题:watermark 的作用是啥?如何保证数据不丢失? 答案:WaterMark 的作用是用来触发窗口进行计算,解决数据延迟、数据乱序等问题。水印就是一个时间戳(timestamp),Flink 可以给数据流添加水印。水印并不会影响原有 Eventtime 事件时间。当数据流添加水印后,会按照水印时间来触发窗口计算,也就是说 watermark 水印是用来触发窗口计算的。设置水印时间,会比事件时间小几秒钟,表示最大允许数据延迟达到多久。水印时间 = 事件时间 - 允许延迟时间 (例如:10:09:57 = 10:10:00 - 3s )。要保证数据不丢失,需要使用:WaterMark + EventTimeWindow + Allowed Lateness 方案(侧道输出),可以做到数据不丢失。allowedLateness(lateness:Time)---设置允许延迟的时间。该方法传入一个 Time 值,设置允许数据迟到的时间。13. 问题:Flink 如何保证 Exactly Once 语义? 答案:Flink 通过两阶段提交协议来保证 Exactly-Once 语义。对于 Source 端:Source 端严格一次处理比较简单,因为数据要进入 Flink 中,所以 Flink 只需要保存消费数据的偏移量 (offset)即可。如果 Source 端为 kafka,Flink 将 Kafka Consumer 作为 Source,可以将偏移量保存下来,如果后续任务出现了故障,恢复的时候可以由连接器重置偏移量,重新消费数据,保证一致性。对于 Sink 端:Sink 端是最复杂的,因为数据是落地到其他系统上的,数据一旦离开 Flink 之后,Flink 就监控不到这些数据了,所以严格一次处理语义必须也要应用于 Flink 写入数据的外部系统,故这些外部系统必须提供一种手段允许提交或回滚这些写入操作,同时还要保证与 Flink Checkpoint 能够协调使用。以 Kafka - Flink -Kafka 为例 说明如何保证 Exactly-Once 语义:Flink 作业包含以下算子: - 一个 Source 算子,从 Kafka 中读取数据(即 KafkaConsumer)。 - 一个窗口算子,基于时间窗口化的聚合运算(即 window+window 函数)。 - 一个 Sink 算子,将结果会写到 Kafka(即 kafkaProducer)。Flink 使用两阶段提交协议 预提交(Pre-commit)阶段和 提交(Commit)阶段保证端到端严格一次。预提交阶段: - 当 Checkpoint 启动时,进入预提交阶段,JobManager 向 Source Task 注入检查点分界线(CheckpointBarrier),Source Task 将 CheckpointBarrier 插入数据流,向下游广播开启本次快照。 - Source 端:Flink Data Source 负责保存 KafkaTopic 的 offset 偏移量,当 Checkpoint 成功时 Flink 负责提交这些写入,否则就终止取消掉它们,当 Checkpoint 完成位移保存,它会将 checkpoint barrier(检查点分界线) 传给下一个 Operator,然后每个算子会对当前的状态做个快照,保存到状态后端(State Backend)。对于 Source 任务而言,就会把当前的 offset 作为状态保存起来。下次从 Checkpoint 恢复时,Source 任务可以重新提交偏移量,从上次保存的位置开始重新消费数据。 - Slink 端:从 Source 端开始,每个内部的 transformation 任务遇到 checkpoint barrier(检查点分界线)时,都会把状态存到 Checkpoint 里。数据处理完毕到 Sink 端时,Sink 任务首先把数据写入外部 Kafka,这些数据都属于预提交的事务(还不能被消费),此时的 Pre-commit 预提交阶段下 Data Sink 在保存状态到状态后端的同时还必须预提交它的外部事务。提交阶段: - 当所有算子任务的快照完成(所有创建的快照都被视为是 Checkpoint 的一部分),也就是这次的 Checkpoint 完成时,JobManager 会向所有任务发通知,确认这次 Checkpoint 完成,此时 Pre-commit 预提交阶段才算完成。才正式到两阶段提交协议的第二个阶段:commit 阶段。该阶段中 JobManager 会为应用中每个 Operator 发起 Checkpoint 已完成的回调逻辑。本例中的 Data Source 和窗口操作无外部状态,因此在该阶段,这两个 Opeartor 无需执行任何逻辑,但是 Data Sink 是有外部状态的,此时我们必须提交外部事务,当 Sink 任务收到确认通知,就会正式提交之前的事务,Kafka 中未确认的数据就改为"已确认",数据就真正可以被消费了。注:Flink 由 JobManager 协调各个 TaskManager 进行 Checkpoint 存储,Checkpoint 保存在 StateBackend(状态后端) 中,默认 StateBackend 是内存级的,也可以改为文件级的进行持久化保存。14. 问题:Flink 如何实时 topN? 答案:Flink 要实现 TopN 功能,主要做如下操作: - Flink 接收 kafka 数据源。 - 基于 EventTime 处理,指定 Watermark,这里调用 DataStream 的 assignTimestampsAndWatermarks 方法,抽取时间和设置 watermark。 - 将 kafka 的 json 格式数据转为实体类对象。 - 根据用户 Username 进行分组,对于实时统计 TopN 可以使用滑动窗口大小。设置窗口长度取 10s,每次滑动(slide)5s,即 5 秒钟更新一次过去 10s 的排名数据。.keyBy("username").timeWindow(Time.seconds(10), Time.seconds(5)).aggregate(new CountAgg(), new WindowResultFunction())。 - 使用.aggregate(AggregateFunction af, WindowFunction wf) 做增量的聚合操作,它能使用 AggregateFunction 提前聚合掉数据,减少 state 的存储压力。CountAgg 实现了 AggregateFunction 接口,功能是统计窗口中的条数,即遇到一条数据就加一。WindowFunction 将每个 key 每个窗口聚合后的结果带上其他信息进行输出。这里实现的 WindowResultFunction 将用户名,窗口,访问量封装成了 UserViewCount 进行输出。 - TopN:为了统计每个窗口下活跃的用户,我们需要再次按窗口进行分组,根据 UserViewCount 中的 windowEnd 进行 keyBy() 操作。然后使用 ProcessFunction 实现一个自定义的 TopN 函数 TopNHotItems 来计算点击量排名前 3 名的用户,并将排名结果格式化成字符串,便于后续输出。.keyBy("windowEnd").process(new TopNHotUsers(3)).print()。ProcessFunction 是 Flink 提供的一个 low-level API,它主要提供定时器 timer 的功能
字节-商业化部-数据研发面经【附答案】
作者:三石大数据链接:https://www.nowcoder.com/discuss/721088633737838592?sourceSSR=users来源:牛客网
2.八股文
1)RDD是什么
全称(Resilient Distributed Dataset)叫做弹性分布式数据集,是一种数据结构,可以理解成是一个集合。在代码中的话,RDD是一个抽象类。还有一个非常重要的特点:RDD是不保存数据的,仅仅封装了计算逻辑,也就是你直接打印RDD是看不见具体值的。
2)你刚刚提到RDD是弹性分布式数据集,弹性指什么
- 第一、数据容错性,当某个RDD发生故障导致数据丢失时,RDD可以通过其血缘机制重新计算丢失的数据分区,而不需要进行频繁的数据冗余备份和复杂的检查点操作,从而实现数据的自我恢复和容错
- 第二、动态调整性,RDD的数据可动态划分为多个分区,用户可通过 repartition 或 coalesce 调整分区数量,优化并行度以适应资源变化
3)Spark SQL的Join有几种方式
主要有三种方式,分别是broadcast hash join、shuffle hash join、sort merge join
- 先说一下hash join吧,这个算法主要分为三步,首先确定哪张表是build table和哪张表是probe table,这个是由spark决定的,通常情况下,小表会作为build table,大表会作为probe table;然后构建hash table,遍历build table中的数据,对于每一条数据,根据join的字段进行hash,存放到hashtable中;最后遍历probe table中的数据,使用同样的hash函数,在hashtable中寻找join字段相同的数据,如果匹配成功就join到一起。这就是hash join的过程
- broadcast hash join分为broadcast阶段和hash join阶段,broadcast阶段就是将小表广播到所有的executor上,hash join阶段就是在每个executor上执行hash join,小表构建为hash table,大表作为probe table
- shuffle hash join分为shuffle阶段和hash join阶段,shuffle阶段就是对两张表分别按照join字段进行重分区,让相同key的数据进入同一个分区中;hash join阶段就是对每个分区中的数据执行hash join
- sort merge join分为shuffle阶段、sort阶段和merge阶段,shuffle阶段就是对两张表分别按照join字段进行重分区,让相同key的数据进入同一个分区中;sort阶段就是对每个分区内的数据进行排序;merge阶段就是对排好序的分区表进行join,分别遍历两张表,key相同就输出,如果左边小,就继续遍历左边的表,反之遍历右边的表
4)Spark的调优方法有哪些
- 资源分配优化:
- 合理设置 Executor 数量 :num-executors 参数用于设置应用运行时 Executor 的数量,通常可以根据集群资源和任务需求进行设置。一般建议 num-executors = spark.cores.max / spark.executor.cores,在资源允许的情况下,适当增加 Executor 的数量可以提高任务的并行度,从而提升性能。
- 调整每个 Executor 的内存 :executor-memory 参数用于设置每个 Executor 的内存大小,对 Spark 作业运行的性能影响很大,适当增加每个 Executor 的内存量,可以提升性能。
- 设置 Executor 的 CPU 核心数 :executor-cores 参数用于设置每个 Executor 的 CPU 核心数,在资源允许的情况下,增加每个 Executor 的 CPU 核心数可以提高执行 task 的并行度,从而提升性能。
- AQE(自适应查询执行):在运行时,每当shuffle map阶段执行完毕,AQE会结合这个阶段的统计信息,基于既定的规则动态的调整,修改尚未执行的逻辑计划和物理计划,来完成对原始查询语句的运行时优化动态合并分区:可以在任务开始时设置较多的shuffle分区个数,然后在运行时通过查看shuffle文件统计信息将相邻的小分区合并成更大的分区动态切换join策略:由Sort Merge Join切换成Broadcast Hash Join动态优化join倾斜:将倾斜的分区数据拆分成多个分区
5)你用过Flink吗
用过,Flink是一个分布式的计算框架,主要用于对有界和无界数据流进行有状态计算,其中有界数据流就是指离线数据,有明确的开始和结束时间,无界数据流就是指实时数据,源源不断没有界限,有状态计算指的是 在进行当前数据计算的时候,我们可以使用之前数据计算的结果。Flink还有一个优点就是提供了很多高级的API,比如DataSet API、DataStream API、Table API和FlinkSQL
6)Flink的双流Join有哪几种
主要有四种方式,分别是:滚动窗口Join、滑动窗口Join、会话窗口Join、时间区间Join
- Tumbling Window Join:滚动窗口 Join,窗口无重叠,固定大小,以固定的时间间隔滑动。例如,每 5 秒为一个窗口,每隔 5 秒滑动一次。这种 Join 适用于对时间窗口内数据进行固定周期的聚合或关联操作。
- Sliding Window Join:滑动窗口 Join,窗口有重叠,以固定的时间间隔滑动,窗口大小和滑动间隔可配置。比如窗口大小为 10 秒,滑动间隔为 5 秒,这样窗口之间会有 5 秒的重叠部分。它可以在不同的时间窗口上进行数据关联,获取更灵活的分析结果。
- Session Window Join:会话窗口 Join,基于会话的窗口 Join,窗口的开始和结束由数据的活动间隔决定。当数据在一定时间内没有新的事件发生时,会话窗口就会关闭。这种 Join 适合对用户会话等具有自然边界的数据进行关联分析。
- Interval Join :时间区间Join,允许在一个流的时间范围内与另一个流进行 Join。例如,可以根据某个事件的时间范围,将两个流中在这个时间范围内的数据进行关联。这种 Join 类型适用于处理具有时间相关性的数据,如根据某个时间段内的订单数据和库存数据进行关联分析
7)数仓分层的优势是什么
- 第一、复杂需求简单化;我们通过将复杂的问题分解为多个步骤来完成,每一层只处理单一的步骤,比较容易和理解
- 第二、提高数据的复用性;比如在已经得到最终结果之后,又需要中间层的一些数据,我可以直接查询中间层的数据,不必重新进行计算
8)数仓建模的方法有哪些
- ER模型是Inmon提出的,这个模型是符合3NF的,他的出发点就是整合数据,将各个系统中的数据以整个企业角度按主题进行分类,但是不能直接用于分析决策
- 维度模型是Kimball提出的,这个人和Inmon算是数仓的两个流派,他的出发点就是分析决策,为分析需求服务,而现在多数的数仓的搭建都是基于维度模型进行搭建的。
- 区别:ER模型冗余更少,但是在大规模数据跨表分析中,会造成多表关联,这会大大降低执行效率
9)OLAP数据库用过哪些
主要用过Kylin、Druid、ClickHouse等
10)ClickHouse为什么查询快
- 第一、ClickHouse 采用列存储方式,将数据按列存储在磁盘上,分析查询可以大大减少读取数据量,提高查询效率
- 第二、ClickHouse支持分布式部署,数据可以在多个节点之间进行分片存储和并行处理,充分利用集群资源来提高查询性能
- 第三、ClickHouse为每个表构建了稀疏索引,查询时可通过索引快速定位到需要读取的数据块,减少磁盘IO操作
- 第四、ClickHouse在查询处理过程中采用了向量化技术,将数据以向量的形式进行处理,而不是逐行处理,提高数据处理速度
3.项目
1)你做过最复杂的项目是什么,详细介绍一下
2)在项目中遇到过的最大的难点是什么
阿里巴巴大数据开发高频面试题及答案
作者:三石大数据 链接:https://www.nowcoder.com/discuss/519811764528136192 来源:牛客网
以下是提取的问题和答案:
- 问题:MapReduce Shuffle为什么要将数据写入环形缓冲区? 答案:Map的输出结果是由collector处理的,每个Map任务不断地将键值对输出到在内存中构造的一个环形数据结构中。使用环形数据结构是为了更有效地使用内存空间,在内存中放置尽可能多的数据。
- 问题:MapReduce Shuffle为什么容易发生数据倾斜? 答案:因为key分布不均匀,在shuffle的时候,大量的key可能分配到某一个reduce当中,这就会产生数据倾斜。
- 问题:如何用MapReduce实现join? 答案:
public class Job_JoinDriver { // mapper static class Job_JoinMapper extends Mapper<LongWritable, Text, Text, Text> { Text k = new Text(); Text v = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 通过切片获取到当前读取文件的文件名 InputSplit inputSplit = context.getInputSplit(); FileSplit fileSplit = (FileSplit) inputSplit; String path = fileSplit.getPath().getName(); // 定义 sid 用于存放获取的 学生ID String sid; String[] split = value.toString().split("\\s+"); // 判断文件名 if (path.startsWith("student")) { // 学生表的 ID 在第一位 sid = split[0]; // 将整条数据作为 vlaue,并添加 Stu 的标识 v.set("Stu" + value); } else { // 成绩表的 ID 在第二位 sid = split[1]; // 将整条数据作为 vlaue,并添加 Sco 的标识 v.set("Sco" + value); } k.set(sid); context.write(k, v); } } // reducer static class Job_JoinReducer extends Reducer<Text, Text, Text, Text> { @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { // 用于存放获取到的学生信息 String stuContext = ""; // 用于存放学生的各科成绩 LinkedList<String> scoContext = new LinkedList<>(); for (Text value : values) { String res = value.toString(); // 根据添加的标识,来区分学生信息和成绩 if (res.startsWith("Stu")){ stuContext = res.substring(3); } else { scoContext.add(res.substring(3)); } } for (String score : scoContext) { // 将学生成绩与学生信息拼接 Text v = new Text(stuContext + " " + score); context.write(key, v); } } } }
- 问题:Hadoop HA架构是怎样的? 答案:Hadoop的HA应该分为HDFS 的 HA 和 YARN 的 HA,主要是解决NameNode和ResourceManager的单点故障问题,所以HA就是通过配置Active/Standby两个实例来解决单点故障。
- 问题:Hadoop HA当一个namenode挂掉,会有数据丢失吗? 答案:不会丢失,当Active挂了之后,Standby节点会变为Active节点,其中ZKFC即ZKFailoverController,作为独立进程存在,负责控制NameNode的主备切换。
- 问题:Hadoop和Spark的区别,Spark做了哪些优化? 答案:区别这里就不提了,之前文章发过很多次。做的优化有:内存管理中间结果、优化数据格式、优化执行策略...
- 问题:Spark有哪几种运行模式? 答案:Local、standalone、yarn。
- 问题:Spark的stage划分是怎么实现的? 答案:从最后一个RDD往前推,遇到窄依赖的父RDD时,就将这个父RDD加入子RDD所在的stage;遇到宽依赖的父RDD时就断开,父RDD被划分为新的stage。每个Stage里task的数量由Stage最后一个RDD中的分区数决定。如果Stage要生成Result,则该Stage里的Task都是ResultTask,否则是ShuffleMapTask。
- 问题:SQL中on和where的区别是什么? 答案:on和where后都是查询条件,ON 语句用于在进行多表查询的时候确定两个表之间的连接关系,WHERE 语句用于在单表查询或多表查询的时候筛选数据;如果同时存在,on先执行,where后执行。
- 问题:Left join和 Left semi join区别是什么?答案:Left join:主表记录全部有,如果从表多行的话,主表数据就被重复了一次。 Left semi join:不重复,主表找到第一条就返回记录,如果找不到就不显示,说白了就等价于exists或者in。
- 问题:semi join如何去优化?答案:使用in子查询,并且将子查询进行物化。
- 问题:请问你用过哪些HQL函数?答案:get_json_object、collect_set、rank、row_number、lag、lead、first_value....(面试需要解释每个函数的意思)
- 问题:请问你遇到过数据倾斜吗?答案:请看前面的文章,面试必问!!!(未给出具体内容,提示看前文)
- 问题:事实表的设计流程是怎样的?答案:选择业务过程以及确定事实表类型;声明粒度;确定事实;确定维度;冗余维度。
- 问题:在10亿个整数中找出不重复的整数,怎么做?答案:采用2bit的bitmap(00表示不存在,01表示出现1次,10表示出现多次,11表示无意义),共需内存 2^32*2bit=1GB内存,可以接受,然后扫描这10亿个整数,查看bitmap中相对应的位,如果是00变01、01变10或者10保持不变,扫描完后,查看bitmap,把对应为是01的整数输出。
数据开发常见场景题
https://www.nowcoder.com/discuss/535423705904644096
1亿个整数中找出最大的10000个数
- 首先想到的就是全局排序,那么需要判断内存是否能够装的下?1*10^9*4B = 4GB,需要4G的内存,如果机器的内存小于4G,显然是不行的
- 第二种方法就是分治法,将这1亿个数通过hash算法,分为1000份,每份100万个数据,找到每份数据中最大的10000个数,最后在100*10000中找出最大的10000个数,最大占用内存为 1000000*4B = 4MB。从100万个数中找到最大的10000个数的方法是快速排序的方法,但是我们没有必要将这100万个数排序,只用找到前10000个数,大致的思路是:将第一个数字设置为基准元素,然后将这100万个数分为两堆,如果大于基准的堆的个数大于10000个,那么继续对该堆进行一次快速排序,如果此时大的堆的个数n小于10000个,那么在小的堆中找到前10000-n的数字。
- 【重点】第三种方法就是小顶堆,首先读入前10000个数来创建大小为10000的最小堆,建堆的时间复杂度是O(m),然后遍历后续的数字,并与堆顶元素(最小)进行比较,如果比堆顶元素小,则继续遍历后面的数字即可;如果比堆顶元素大,则替换堆顶元素并重新调整堆为最小堆。直至遍历完所有的数字,最后输出当前堆的所有数字就可以了。整体的时间复杂度是O(mn)
在2.5亿个整数中找出不重复的整数
- 采用2bit的bitmap(00表示不存在,01表示出现1次,10表示出现多次,11表示无意义),共需内存 2^32*2bit=1GB内存,可以接受,然后扫描这2.5亿个整数,查看bitmap中相对应的位,如果是00变01,01变10,10保持不变,扫描完后,查看bitmap,把对应为是01的整数输出。
大整数相乘
- 分治法【递归】
- 当我们输入两个大整数num1,num2,长度分别为n,m,计算机无法直接计算其结果,采用分而治之的思想,我们可以分别将两个数均分为四个部分,记作A,B,C,D,其中: A为num1的前n/2, B为num1的后n/2, C为num2的前m/2D为num2的后m/2
- num1 * num2 = (A * 10^(n/2) + B) * (C * 10^(m/2) + D)= AC * 10^(n/2+m/2) + AD * 10^(n/2) + BC * 10^(m/2) + BD
- 此时将一个大整数的乘法分为了四个相对较小的整数相乘,此时,我们就可以采用递推的思想,再将每个相乘的数进行拆分相乘再相加,当递推到相乘的值较小时,我们可以求出相应的值然后对其进行相加,将结果依次返回,最终得到大整数相乘的结果。
- 乘法变加法【遍历】
- 同字符串相乘
总结:类似的题目还有很多,大的思路就是将大数据拆分为小数据
作者:三石大数据链接:https://www.nowcoder.com/discuss/535423705904644096来源:牛客网
#数据人offer决赛圈怎么选##牛客创作赏金赛#