NameNode FSImage类
前言
对于HDFS分布式文件系统来说,其Namenode会定期将文件系统的命名空间(文件目录树、文件/目录元信息)保存到fsimage文件中,以防止Namenode掉电或者进程崩溃。但如果Namenode实时地将内存中的元数据同步到fsimage文件中,将会非常消耗资源且造成Namenode运行缓慢。所以Namenode会先将命名空间的修改保存在editlog文件中,然后定期合并fsimage和editlog文件
FSImage类
FSImage类主要实现了以下功能。
- 1.保在命名空间——将当前时刻Namenode内存中的命名空间保存到fsimage文件中。
- 2.加载fsimage文件—将磁盘上fsimage文件中保存的命名空间加载到Namenode内存中,这个操作是保存命名空间操作的逆操作。
- 3.加载editlog文件—Namenode加载了fsimage文件后,内存中只包含了命名空间在保存fsimage文件时的信息,Namenode还需要加载后续对命名空间的修改操作,即editlog文件中记录的内容。所以FSImage类还提供了加载 editlog文件到Namenode内存中的功能。
1.保存命名空间
FSImage类最重要的功能之一就是将当前时刻Namenode的命名空间保存到fsimage文件中
FSImage类中这个功能的实现,保存namespace操作的调用顺序,如图所示:
FSNameSystem会调用FSImage.saveNamespace()方法触发命名空间的保存操作,saveNamespace()会调用saveFSImagelnAlIDirs()方法执行具体的保存逻辑。我们从入口方法saveFSImagelnAIDirs()开始介绍。
(1)saveFSImagelnAllDirs()
Namenode 可以定义多个存储路径来保存fsimage文件,对于每一个存储路径,saveFSImagelnAllDirs()方法都会启动一个线程负责在这个路径上保存fsimage文件。同时,为了防止保存过程中出现错误,命名空间信息首先会被保存在一个fsimage.ckpt文件中,当保存操作全部完成之后,才会将fsimage.ckpt 重命名为fsimage文件。之后saveFSImagelnAlIDirs()方*清理Namenode元数据存储文件夹中过期的editlog文件和fsimage文件。
saveFSImagelnAllDirs()方法的代码如下:
private synchronized void saveFSImageInA1lpirs(FSNamesystem source,NameNodeFile nnf,long txid,Canceler canceler)throws IoException{ //构造保存命名空间操作的上下文 SaveNamespaceContext ctx=new SaveNamespaceContext( source,txid,canceler);List<Thread>saveThreads =new Arraylist<Thread>(); //在每一个保存路径上启动一个线程,该线程使用FSImageSaver 类保存fsimage文件for(Iterator<storageDirectory>it =storage.dirIterator(NameNodeDirType.IMAGE);it.hasNext();){ StorageDirectory sd=it.next();FSImageSaver saver=new FSImageSaver(ctx,sd,nnf);Thread saveThread=new Thread(saver,saver.tostring());saveThreads.add(saveThread);saveThread.start(); //等待所有线程执行完毕 waitForThreads(saveThreads);saveThreads.clear();storage.reportErrorsOnDirectories(ctx.getErrorSDs()); //保存文件失败则抛出异常 if(storage.getNumStorageDirs(NameNodeDirType.IMAGE)==0){ throw new IOException( "Failed to save in any storage directories while saving namespace."); //将fsimage.ckpt改名为fsimage renameCheckpoint(txid,NameNodeFile.IMAGE_NEW,nnf,false); //我们已经完成了fsimage的保存,那么可以将存储上的一部分edit1og和fsimage删除purgeoldstorage(nnf); ]finally{ //通知所有等待的线程 ctx.markComplete();ctx=nul1;prog.endPhase(Phase.SAVING_CHECKPOINT);
通过分析saveFSImagelnAIDirs()方法可知,命名空间具体的保存操作是由FSImageSaver这个类来承担的,FSImageSaver是FSlmage中的内部类,也是一个线程类,它的run()方法调用了saveFSImage()方法来保存fsimage文件。我们再看一下saveFSlmage()方法的实现。
saveFSImage()方法 使用一个FSImageFormat.Saver 对象来完成保存操作,FSImageFormat.Saver 类会以fsimage文件定义的格式保存Namenode的命名空间信息,需要注意命名空间信息会先写入fsimage.ckpt文件中。saveFSImage()方法还会生成fsimage文件的md5校验文件,以确保fsimage文件的正确性。saveFSlmage()方法的代码如下:
void saveFSImage(SaveNamespaceContext context,StorageDirectory sd) throws IOException{ 1ong txid=context.getTxId();//获取当前命名空间中记录的最新事务的txid //fsimage文件 File newFile=NNStorage.getstorageFile(sd,NameNodeFile.IMAGE_NEW,txid);File dstFile=NNStorage.getStorageFile(sd,NameNodeFile.IMAGE,txid); //FSImageFormatProtobuf.Saver类负责保存 fsimage FSImageFormatProtobuf.Saver saver=new FSImageFormatProtobuf.Saver(context); FSImageCompression compression =FSImageCompression.createCompression(conf);//压缩类 saver.save(newFile,compression);//调用Saver类保存fsimage文件MD5FileUtils.saveMD5File(dstFile,saver.getSavedDigest());//保存MD5校验值storage.setMostRecentCheckpointInfo(txid,Time.now());
saveFSlmage()方法构造了一个FSImageFormatProtobuf.Saver对象来保存命名空间,FSlmageFormatProtobuf是一个工具类,它提供了以protobuf 格式读取和写入fsimage文件的方法。在HDFS2.4版本的实现中,FSImage使用FSImageFormat 类作为读取和写入fsimage文件的标准格式类,而HDFS2.6版本则使用了FSImageFormatProtobuf替代了FSImageFormat类。
FSImageFormatProtobuf 除了有Saver 内部类用于保存命名空间到fsimage文件外,还提供了一个Loader内部类用于解析和加载fsimage文件。这里我们先学习FSImageFormatProtobuf.Saver类的实现。
(2)FSImageFormatProtobuf.Saver
HDFS2.6版本使用了protobuf作为fsimage文件序列化的工具,fsimage文件的格式也被重新定义了。它包括了4个部分的信息。
- MAGIC:fsimage的文件头,是“HDFSIMG1”这个字符串的二进制形式,MAGIC头标识了当前fsimage文件是使用protobuf格式序列化的。FSImage类在读取fsimage文件时,会先判断fsimage文件是否包含了MAGIC头,如果包含了则使用protobuf格式反序列化fsimage文件。
- SECTIONS:fsimage 文件会将同一类型的Namenode 元信息保存在一个section中,例如将文件系统元信愿保存在NameSystemSection中,将文件系统目录树中的所有INode信息保存在INodeSection中,将快照信息保存在SnapshotSection中等。
- FileSummary:FileSummary记录了fsimage文件的元信息,以及fsimage文件保存的所有section的信息。FileSummary中的ondiskVersion字段记录了fsimage文件的版本号(2.6版本中这个字段的值为1),layout Version字段记录了当前HDFS的文件系统布局版本号,codec字段记录了fsimage文件的压缩编码,sections字段则记录了fsimage文件中各个section字段的元信息,每个fsimage文件中记录的section在FileSummary中都有一个与之对应的section字段。FileSummary的section字段记录了对应的fsimage中section的名称、在fsimage文件中的长度,以及这个section在fsimage中的起始位置。FSImage类在读取fsimage文件时,会先从fsimage中读取出FileSummary部分,然后利用FileSummary记录的元信息指导fsimage文件的反序列化操作。
- EileSummaryLength:FileSummaryLength 记录了FileSummary 在fsimage文件中所占的长度,FSImage类在读取fsimage文件时,会首先读取FileSummaryLength获取FileSummary部分的长度,然后根据这个长度从fsimage中反序列化出FileSummary。
FSImageFormatProtobuf.Saver类就是以protobuf格式将Namenode的命名空间保存至 fsimage文件的工具类,这个类的入口方法是save()方法。save()方法打开 fsimage文件的输出流并且获得文件通道,然后调用savelntemal()方法将命名空间保存到 fsimage文件中。 savelntemal()方法首先构造底层fsimage文件的输出流,构造 fsimage文件的描述类 FileSummary,然后在 FileSummary 中记录 o ndisk Version layout Version codec 等信息。接下来 savelntemal()方法依次向fsimage文件中写入命名空间信息、inode信息、快照信息、安全 信息、缓存信息、StringTable信息等。注意上述信息都是以section为单位写入的,每个section 的格式定义请参考fsimage.proto文件。savelntemal()方法以section为单位写入元数据信息时, 还会在FileSummary中记录这个section的长度,以及section在 fsimage文件中的起始位置等 信息。当完成了所有section的写入后,FileSummary对象也就构造完毕了,savelntemal()最后会将FileSummary对象写入fsimage文件中,savelntemal()方法的代码如下:
private void saveInternal(FileOutputStream fout,FSImageCompression compression,String filepath)throws IOException{ //构造输出流,一边写入数据,一边写入校验值 MessageDigest digester=MD5Hash.getDigester(); underlyingoutputStream=new DigestoutputStream(new BufferedoutputStream(fout),digester); underlyingoutputStream.write(FSImageUtil.MAGIC_HEADER); fileChanne1=fout.getchannel(); //FileSummary为fsimage文件的描述部分,也是protobuf定义的FileSummary.Builderb=FileSummary.newBuilder() .setOndiskVersion(FSImageUtil.FILE_VERSION) .setLayoutVersion(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); codec=compression.getImagecodec();//获取压缩格式,并装饰输出流 if(codec!=nul1){ b.setCodec(codec.getClass().getCanonicalName()); sectionoutputStream=codec.createOutputStream(underlyingoutputStream); }else{ sectionOutputStream=underlyingoutputStream; saveNameSystemsection(b);//保存命名空间信息 context.checkCancelled();//检查是否取消了保存操作 saveINodes(b);//保存命名空间中的inode信息 saveSnapshots(b);//保存快照信息 saveSecretManagerSection(b);//保存安全信息 saveCacheManagerSection(b);//保存缓存信息 savestringTableSection(b);//保存stringTable flushSectionoutputStream();//flush 输出流 FileSummary summary=b.build(); saveFileSummary(underlyingoutputStream,summary);//将FileSummary写入文件 underlyingoutputStream.close();//关闭底层输出流 savedDigest=new MD5Hash(digester.digest());
2.加载fsimage文件
当Namenode启动时,首先会将fsimage文件中记录的命名空间加载到Namenode内存中, 然后再一条一条地将editlog文件中记录的更新操作加载并合并到命名空间中。接下来Namenode会等待各个Datanode向自己汇报数据块信息来组装blockMap,从而离开安全模式。 Namenode每次启动时都会调用FSImage.loadFSImage()方法执行加载fsimage和editlog文件的操作。
我们看一下loadFSImage()方法的代码。
boolean loadFSImage(FSNamesystem target, MetaRecoveryContext recovery) throws IOException { / / 准 备 工 作 ••• //获取editlog文件IO流 initEditLog(); if(LayoutVersion.supports(Feature.TXID_BASED_LAYOUT,getLayoutVersion())){ long toAtLeastTxId=editLog.isopenForWrite()?inspector.getMaxSeenTxId():0;editStreams=editLog.selectInputStreams( imageFiles.get(0).getCheckpointTxId()+1,toAtLeastTxId,recovery,false); }else{ editStreams=FSImagePreTransactionalstorageInspector .getEditLogStreams(storage); //调用1oadFSImageFile()方法,加载fsimage文件 for(inti=0;i<imageFiles.size();i++){ try{ imageFile=imageFiles.get(i); loadFSImageFile(target,recovery,imageFile); break; )catch(IOException ioe){ LoG.error("Failed to load image from"+imageFile,ioe); target.clear(); imageFile=nul1; //如果加载失败,则抛出异常 if(imageFile==null){ FSEditLog.closeA11Streams(editStreams); throw new IOException("Failed to load an FSImage file!"); //调用1oadEdit()方法加载并合并editlog long txnsAdvanced=loadEdits(editStreams,target,recovery); needToSavel=needsResaveBasedOnStaleCheckpoint(imageFile.getFile(),txnsAdvanced); editLog.setNextTxId(lastAppliedTxId +1); return needToSave;
在loadFSImage()方法中加载fsimage文件是通过调用FSImage.loadFSImageFIle()方法实现 的,加 载 editlog文件则是通过调用FSImage.loadEdits()方法实现
FSImage.loadFSImage()方法调用 FSImage.loadFSImageFile()方法执 行加载fsimage文件的操作。loadFSImageFile。方法的调用顺序如图所示。
loadFSImageFile()方法根据当前Namenode的版本调用不同的加载方法,不同版本加载 方法的区别主要是在fsimage文件的校验操作上,例如对于2.6版本的加载操作,每个fsimage 文件都有一个对应的md5校验文件。fsimage文件最终的加载工作是由私有的loadFSImage() 方法实现的,loadFSImage()方法构造一个 FSImageFormat.LoaderDelegator 对象加载 fsimage 文件,这个对象的load()方法判断当前fsimage使用了什么序列化方式,如果在fsimage文 件 中 有MAGIC_HEADER,则 该 fsimage文件使用的是protobuf序列化方式,那么构造 FSImageFormatProtobuf.Loader执行加载操作,否则使用FSImageFormat.Loader类执行加载操作。
load()的代码如下:
public void load(File file,boolean requireSameLayoutVersion) throws IOException{ FileInputStream is=null; try{ is=new FileInputStream(file); byte[]magic=new byte[FSImageUti1.MAGIC_HEADER.1ength];roUtils.readFully(is,magic,0,magic.length); //fsimage文件中包括magicHeader,使用的是protobuf序列化方式if(Arrays.equals(magic,FSImageUtil.MAGIC_HEADER)){ //构造FSImageFormatProtobuf.Loader加载fsimage文件FSImageFormatProtobuf.Loader loader =new FSImageFormatProtobuf.Loader( conf,fsn,requireSameLayoutVersion); imp1=loader; loader.load(file); )else{ //否则构造FSImageFormat.Loader加载fsimage文件 Loader loader=new Loader(conf,fsn); imp1=loader; loader.1oad(file); )finally{ roUtils.cleanup(LOG,is);
HDFS2.6版本使用protobuf作为fsimage的序列化工具。所以在加载fsimage操作中,最终会调用FSImageFormatProtobuf.Loader作为fsimage 文件的加载类。FSImageFormatProtobuf.Loader.loadlntenal()方法执行了加载 fsimage 文件的操作,loadlntenal()方法打开fsimage文件通道,然后读取fsimage文件中的FileSummary对象,FileSummary对象中记录了 fsimage中保存的所有section的信息。loadlntenal()会对 FileSummary对象中保存的section排序,然后遍历每个section并调用对应的方法从fsimage 文件中加载这个section。
loadlntenal()方法的代码如下:
private void loadInternal(RandomAccessFile raFile,FileInputStream fin) throws IOException{ //从fsimage文件末尾加载FileSummary,也就是fsimage文件内容的描述FileSummary summary=FSImageUti1.1oadSummary(raFile); //获取通道 FileChannel channel=fin.getchannel(); //构造FSImageFormatPBINode.Loader和FSImageFormatPBSnapshot.Loader 加载INode 以及Snapshot FSImageFormatPBINode.Loader inodeloader =new FSImageFormatPBINode.Loader( fsn,this); FSImageFormatPBSnapshot.Loader snapshotLoader=new FSImageFormatPBSnapshot.Loader( fsn,this); //对fsimage文件描述中记录的sections进行排序 ArrayList<FileSummary.Section>sections=Lists.newArrayList(summary .getSectionsList()); Collections. sort(sections, new Comparator<FileSummary. Section>(){ @ override public int compare(FileSummary. Section s1, FileSummary. Section s2){ SectionName n1=SectionName. fromString(sl. getName()); SectionName n2=SectionName. fromString(s2. getName()); if (n1==nul1){ return n2==null?0:-1;) else if (n2== null){ return-1; } else{ return n1. ordinal()-n2. ordinal(); }} }); //遍历每个section,并调用对应的方法加载这个section for(FileSummary.Sections:sections){ channel.position(s.getoffset()); //在通道中定位这个section的起始位置 InputStream in=new BufferedInputStream(new LimitInputStream(fin,s.getLength()));in=FSImageUtil.wrapInputStreamForCompression(conf,summary.getCodec(),in); string n=s.getName(); switch(SectionName.fromstring(n)){ //调用对应的方法加载不同的section case NS_INFO: 1oadNamesystemsection(in); break; case STRING_TABLE:loadstringTablesection(in);break; case INODE:{ currentStep=new Step(StepType.INODES);inodeLoader.loadINodeSection(in);break; case INODE REFERENCE:snapshotLoader.loadINodeReferenceSection(in);break; case INODE_DIR:inodeloader.loadINodeDirectorysection(in);break;case FILES UNDERCONSTRUCTION:inodelLoader.loadFilesUnderConstructionsection(in);break; case SNAPSHOT: snapshotLoader. loadSnapshotSection(in); break; case SNAPSHOT DIFF: snapshotLoader. loadSnapshotDiffSection(in); break; case SECRET_MANAGER:{ 1oadSecretManagerSection(in); break; case CACHE_MANAGER:{ 1oadCacheManagerSection(in); break; default: LoG. warn("Unrecognized section"+n); break; )}
对于不同类型的section,loadlnternal()方法调用不同的方法加载这个section,例如对于INodeSection会调用InodeLoader.loadINodeSection()方法加载。load()方法的实现都比较简单,就是按照protobuf格式加载不同的section
3.加载editlog文件
Namenode将fsimage中记录的特定时刻的命名空间镜像加载到内存之后,还需要加载后续对命名空间的修改,也就是editlog中记录的修改命名空间的操作。Namenode会调用FSImage.loadEdits()方法将editlog文件中记录的更新操作与当前Namenode的命名空间进行合并。FSImage.loadEdits()方法构造一个FSEditLogLoader对象,然后遍历Namenode所有存储路径上保存的editlog文件的输入流,并调用FSEditLogLoader.loadFSEdits()方法加载指定路径上的editlog文件。loadEdits()方法的代码如下:
private long loadEdits(Iterable<EditLogInputStream>editStreams,FSNamesystem target,StartupOption startOpt,MetaRecoveryContext recovery) throws IOException{ //记录命名空间中加载的最新的事务id long prevLastAppliedTxId=lastAppliedTxId; try{ //构造FSEditLogLoader对象用于加载editlog文件 FSEditLogLoader loader =new FSEditLogLoader(target,lastAppliedTxId); //遍历所有存储路径上editlog文件对应的输入流 for(EditLogInputStream editIn:editStreams){ //调用FSEditLogLoader.loadFSEdits()从某个存储路径上的edit1og文件加载修改操作 try{ loader.loadFSEdits(editIn,lastAppliedTxId +1,startopt,recovery); }finally{ //lastAppliedTxId记录从editlog加载的最新的事务id lastAppliedTxId=loader.getLastAppliedTxId();if(editIn.getlastTxId()!=HdfsConstants.INVALID_TXID){ 1astAppliedTxId=editIn.getLastTxId(); )finally{ //关闭所有editlog文件的输入流 FSEditLog.closeA11Streams(editStreams);updateCountForQuota(target.dir.rootDir); return lastAppliedTxId-prevLastAppliedTxId;
FSEditLogLoader.loadFSEdits()会使用EditLoglnputStream对象读取并加载 editlog文件,这个方法的操作比较冗长,基本可以归纳为从ediltLog文件中读取一个操作,并使用FSEditLogOp对象封装,然后调用applyEditLogOp()方法修改Namenode的命名空间。
FSEditLogLoader.loadFSEdits()方法的简版代码如下:
//从editlog文件中读取一个操作 FSEditLogop op; try{ op=in.readOp(); if(op==null){ break; } )catch(Throwable e){ //... //.… //在当前命名空间中执行对应的修改操作 1ong inodeId=applyEditLogop(op,fsDir,in.getVersion(),1astINoderd); if(lastINodeId<inodeId){ lastINodeId=inodeId;
4.检查点机制
一个正常大小的editlog文件往往在几十到几百个字节之间,但在某些极端的情况下,editlog文件会变得非常大,甚至将磁盘空间写满。通过上面小节的学习我们知道,在Namenode 启动过程中一个很重要的部分就是逐条读取editlog文件中的记录,之后与Namenode命名空间合并。巨大的editlog文件会导致Namenode的启动时间过长,为了解决这个问题,HDFS 引入了检查点机制(checkpointing)。
如图所示,HDFS的检查点机制会定时将editlog文件与fsimage文件合并以产生新 的fsimage文件。这样Namenode就可以直接从fsimage加载元数据,而不用读取与合并editlog 中的记录了。有了检查点机制,Namenode命名空间的重建效率会大大提高,同时Namenode 的启动时间也会相应减少。
但是合并fsimage与editlog以产生新的fsimage文件是一项非常消耗I/O和 CPU资源的 操作。在执行检查点操作期间,Namenode还需要限制其他用户对HDFS的访问和操作。为了预防这种情况的发生,HDFS将检查点操作从Active Namenode移动到了 Secondary Namenode 或者Standby Namenode 上 , 至于具体是哪一种Namenode,则取决于当前的HDFS是否开启了HA功能。
在以下两种情况下,Namenode会触发一次检查点操作:
① 超过了配置的检查点操作时 长 (dfs.namenode.checkpoint.period配置项配置);
② 从上一次检查点操作后,发生的事务 (transaction) 数超过了配置(dfs.namenode.checkpoint.txns配置项配置)。
(1) Secondary Namenode执行检查点操作
在非HA部署环境下,检查点操作是由Secondary Namenode来执行的。Secondary Namenode是 HDFS1.X版本中一个非热备的Namenode备份节点,整个检查点流程如图所示。
整个检查点流程如下所示。
- Secondary Namenode检查两个触发检查点流程的条件是否满足。由于在非HA状态下,Secondary Namenode和Namenode之间并没有共享的editlog文件目录,所以最新的事务 id (transactionld) 是 Secondary Namenode 通过调用 RPC 方法 Namenode Protocol.getTransactionld ()获取的。
- Secondary Namenode 调用 RPC 方法 NamenodeProtocol.rollEditLog()触发 editlog 重置 操作,将当前正在写的editlog段落结束,并创建新的edit.new文件,这个操作还会 返回当前fsimage以及刚刚重置的editlog的事务id (seen id)o这样当Secondary Namenode从Namenode读取editlog文件时,新的操作就可以写入edit.new文件中, 不影响editlog记录功能。在 HA模式下,并不需要显式地触发editlog的重置操作, 因为 Standby Namenode 会定期重置 editlog
- 有了最新的txid以及seen id, Secondary Namenode就会发起HTTP GET请求到 Namenode 的 GetlmageServlet 以获取新的 fsimage 和 editlog 文件。需要注意,Secondary Namenode在进行上一次的检查点操作时,可能已经获取了部分fsimage和edits文件。
- Secondary Namenode会加载新下载的fsimage文件以重建Secondary Namenode的命 名空间。
- Secondary Namenode读取edits中的记录,并与当前的命名空间合并,这样Secondary Namenode的命名空间和Namenode的命名空间就同步了。
- Secondary Namenode将最新的同步的命名空间写入新的fsimage文件中。
- Secondary Namenode 向 Namenode 的 ImageServlet 发送 HTTP GET 请求/getimage? putimage=l。这个请求的URL中还包含了新的fsimage文件的事务ID,以及Secondary Namenode用于下载的端口和IP地址。
- Namenode 会根据 Secondary Namenode 提供的信息向 Secondary Namenode 的 GetlmageServlet发起HTTP GET请求下载fsimage文件。Namenode首先将下载文件 命名为fsimage.ckpt_,然后创建MD5校验和,最后将fsimage.ckpt_重命名为fsimage_。
至此,一个完整的fsimage的检查点操作就完成了。
(2)Standby Namenode执行检查点操作
HDFS2.X版本中加入了 Namenode HA策略,这使得HA机制下检查点操作的流程与非 HA机制下的完全不同,因为在HA配置下已经没有Secondary Namenode这个节点了,而是 直接通过配置奇数个JoumalNode来实现Namenode热备HA策略。 这里我们首先介绍HDFS2.X版本中的HA策略,在同一个HA HDFS集群中,将会同时运行两个Namenode实例,其中一个为Active Namenode,用于实时处理所有客户端请求;另一个为Standby Namenode, Standby Namenode 的命名空间与ActiveNamenode是完全保持一致的。所以当ActiveNamenode岀现故障时, Standby Namenode可以立即切换成Active状态。 为了让Standby Namenode的命名空间与Active Namenode保持同步,它们都将和 JoumalNodes守护进程通信。当Active Namenode执行任何修改命名空间的操作时,它至少需 要将产生的editlog文件持久化到N- (N-1)/2个JoumalNode节点上才能保证命名空间修改的安 全性。换句话说,如果在HA策略下启动了 N 个JoumalNode进程,那么整个JoumalNode集群最多允许(N-1)/2个进程死掉,这样才能保证editlog成功完整地被写入。比如集群中有3 个 JoumalNode时,最多允许1个JoumalNode挂掉;集群中有5 个JoumalNode时,最多允许2 个JoumalNode挂掉。Standby Namenode则负责观察editlog文件的变化,它能够从JoumalNodes 中读取editlog文件,然后合并更新到它自己的命名空间中。一旦Active Namenode岀现故障, Standby Namenode就会保证从JoumalNodes中读出全部的editlog文件,然后切换成Active状 态。Standby Namenode读取全部的editlog文件可确保在发生故障转移之前,和 Active Namenode拥有完全同步的命名空间状态。
Standby Namenode始终保持着一个最新版本的命名空间,它会不断地将读入的editlog文 件与当前的命名空间合并,所以检查点机制在HA模式下就简单了很多。Standby Namenode 只需判断当前是否满足触发检查点操作的两个条件,如果满足触发条件,则 将 Standby Namenode的命名空间写入一个新的fsimage文件中,并通过HTTP将这个fsimage文件传回 Active Namenode HA状态下的Standby Namenode检查点流程如图:
- Standby Namenode检查是否满足触发检査点操作的两个条件。
- Standby Namenode将当前的命名空间保存到fsimage.ckpt_txid文件中,这里的txid是当前最新的editlog文件中记录的事务ido之后Standby Namenode写入fsimage文 件 的MD5校验和,最后重命名这个fsimage.ckpt_txid文件为fsimage_txido当执行 这个操作时,其他的Standby Namenode操作将会被阻塞,例如Active Namenode发 生错误时,需要进行主备切换或者访问Standby Namenode的Web接口等操作。注意, Active Namenode的操作并不会被影响,例如listing、readingwriting文件等。
- Standby Namenode 向 Active Namenode 的 ImageServlet 发送 HTTP GET 请求 /getimage?putimage=1 这个请求的URL中包含了新的fsimage文件的事务id ,以及 Standby Namenode用于下载的端口和IP地址。
- Active Namenode 会根据 Standby Namenode 提供的信息向 Standby Namenode 的 ImageServlet发起HTTP GET请求下载fsimage文件。Namenode首先将下载文件命 名 为 fsimage,ckpt_* , 然 后 创 建 M D5校验和,最 后 将 fsimage.ckpt_重命名为 fsimage_。
知道了检查点操作的流程之后,我们看 一 下HDFS代码是如何实现检查点功能的。 StandbyNamenode会 持 有 一 个 StandbyCheckpointer类 , 这 个 类 维 护 着 一 个 叫 作 CheckpointerThread 的线程,CheckpointerThread 线程会每隔 1000*Math.min(checkpointCheck Period, checkpointPeriod)秒 检 测 是 否 执 行 一 次 检 查 点 逻 辑 (checkpointCheckPeriod 由 dfs.namenode.checkpoint.period 配置,,checkpointPeriod 则由dfknamenode.checkpoint.check.period 配置)。整个检查点逻辑是在CheckpointeiThread.doWork()方法中实现的,流程与上面介绍的 一 样 。doWork()方法首先会判断是否满足检查点操作的两个条件,如果满足则调用 doCheckpoint()执行检查点操作
private void dowork(){ while(shouldRun){ try{ //间隔时长1000*Math.min(checkpointCheckPeriod,checkpointPeriod) Thread.sleep(1000*checkpointConf.getCheckPeriod()); }catch(InterruptedException ie){ if(!shouldRun){ break; try{ long now=now(); //获得最后一次往JournalNode写入的txid和最近一次做检查点的txid的差值1ong uncheckpointed=countUncheckpointedTxns(); //计算当前时间和上一次检查点操作时间的间隔 long secssinceLast=(now-lastCheckpointTime)/1000; boolean needCheckpoint=false; //第一种符合合并的情况:当最后一次往 JournalNode写入的txid和最近一次做检查点的txid的差值大于或者等于dfs.namenode.checkpoint.txns配置的数量(默认为1000000)时做一次合并if(uncheckpointed>=checkpointConf.getTxncount()){ needCheckpoint=true; //第二种符合合并的情况:当时间间隔大于或者等于dfs.namenode.checkpoint.period配置的时间时做合并 else if(secssincelast>=checkpointConf.getPeriod()){ needCheckpoint=true; //满足检查点执行条件,则调用docheckpoint()方法执行检查点操作if(needcheckpoint){ docheckpoint(); lastCheckpointTime=now; }catch(SaveNamespaceCancelledException ce){ canceledCount++; }catch(InterruptedException ie){ continue; }catch(Throwable t){ )finally{ synchronized(cancelLock){ canceler=nul1; } } } } }
可以看到,整个检查点执行操作的逻辑都是在doCheckpoint方法中实现的。doCheckpoint()方法首先获取当前保存的fsimage的prevCheckpointTxld,然后 获取最近更新的 editlog 的 thisCheckpointTxId, 只有新的 thisCheckpointTxId 大于 prevCheckpointTxId, 也就是当前命名空间有更新,但是并没有保存到新的fsimage文件中时,才有必要进行一次检査点操作。 判断完成后,doCheckpointO会调用saveNamespace()方法将最新的命名空间保存到fsimage文件 中。之后构造一个线程,将新产生的fsimage文件通过HTTP方式上传到AvtiveNamenode中。
private void doCheckpoint()throws InterruptedException,IOException{ //. namesystem.1ongReadLockInterruptibly(); try{ //获取当前Standby Namenode上保存的最新的fsimage对象 FSImage img =namesystem.getFSImage(); //获取fsimage中保存的txid,也就是上一次进行检查点操作时保存的txid long prevCheckpointTxId=img.getStorage().getMostRecentCheckpointTxId(); //获取当前命名空间的最新的txid,也就是收到的editlog的最新的txid 1ong thischeckpointTxId=img.getLastAppliedorwrittenTxId(); //thischeckpointTxId一定大于prevcheckpointTxId assert thisCheckpointTxId >=prevCheckpointTxId; //如果相等则没有必要执行检查点操作,当前fsimage已经是最新的了 if(thischeckpointTxId==prevcheckpointTxId){ return; } if(namesystem.isRollingUpgrade() &&!namesystem.getFSImage().hasRollbackFSImage()){ //如果当前Namenode正在执行升级操作,则创建fsimage rollback文件imageType=NameNodeFile.IMAGE_ROLLBACK; )else{ //在正常情况下创建fsimage文件 imageType=NameNodeFile.IMAGE; ) //调用saveNamespace()将当前命名空间保存到新的文件中img.saveNamespace(namesystem,imageType,canceler); }finally{ namesystem.1ongReadunlock(); } //构造一个线程,通过HTTP将fsimage上传到Active Namenode中 ExecutorService executor =Executors.newSingleThreadExecutor(uploadThreadFactory); Future<Void>upload=executor. submit(new Callable<Void>(){ @ override public Void cal1() throws IOException{ TransferFsImage. uploadImageFromStorage(activeNNAddress, conf, namesystem. getFSImage(). getstorage(), imageType, txid, canceler); return null; }); executor. shutdown(); try{ upload. get(); ) catch(InterruptedException e){ upload. cance1(true); throw e; ) catch(ExecutionException e){ throw new IOException("Exception during image upload:"+e. getMessage(), e. getCause());
doCheckpoint()方法调用了FSImage.saveNamespace()方法将当前命名空间保存到新的fsimage文件中。saveNamespace()方法首先重置(roll)了editlog文件,将当前的edit inprogress文件关闭并重命名,为与fsimage文件合并做准备。之后调用saveFSImagelnAlIDirs()将 fsimage和editlog文件加载到命名空间中,并将更新的命名空间保存到新的fsimage文件中。最后开启新的 editlog inprogress文件,用于记录新的操作。
public synchronized void saveNamespace(FSNamesystem source,NameNodeFile nnf,Canceler canceler)throws IOException{ boolean editLogwasopen=editLog.issegmentopen();if(editLogWasOpen){ //将当前editinprogress文件关闭,并重命名 editLog.endCurrentLogSegment(true); long imageTxId=getLastAppliedorwrittenTxId(); try{ //调用saveFSImageInA11Dirs()方法将当前的命名空间保存到新的fsimage文件中 saveFSImageInA11Dirs(source,nnf,imageTxId,canceler);storage.writeAl1(); }finally{ if(editLogwasOpen){ //开启新的editlog_inprogress文件用于记录请求 editLog.startLogSegment(imageTxId+1,true);storage.writeTransactionIdFileToStorage(imageTxId +1); }
将 fsimage 保存成功后,doCheckpoint()方法就会调用 TransferFsImage.uploadlmageFrom Storage()方法将新生成的fsimage文件上传到Active Namenode中了至此,HDFS中完整的检査点流程就介绍完了。