上一篇 讲解了nameNode启动的时候,NameNodeHttpServer的启动流程,其实简单来说就是基于hadoop自己实现的HttpServer2服务绑定一个InetSokcetAddress地址,也就是端口号,端口号哪来的?默认配置文件获取呗,最后在将HttpServer2中绑定一些servlet来处理url请求就完成了我们50070端口的请求处理。
那么本篇分析nameNode进程第二个比较核心的,应该说是最核心的组件 FSNamesystem,为什么说是最核心组件呢,因为元数据管理和block的管理都在这个里面进行操作。
直接进入正题,先回顾一下fsimage(全量快照) 文件 以及 edits log(增量事务记录) 文件,自己回忆下这两个文件的重要性。
找到FSNamesystem 的入口
找到之前的nameNode初始化的地方,里面的loadNamesystem(conf)方法,这个就是核心组件FSNamesystem 构建的入口
/**
* Initialize name-node.
*
* @param conf the configuration
*/
protected void initialize(Configuration conf) throws IOException {
//其实这些配置 就是对应着我们配置的 hdfs-site.xml 或者 core-default.xml或者其他文件中一些配置信息
//这些一般也不是很重要
if (conf.get(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS) == null) {
String intervals = conf.get(DFS_METRICS_PERCENTILES_INTERVALS_KEY);
if (intervals != null) {
conf.set(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS,
intervals);
}
}
UserGroupInformation.setConfiguration(conf);
loginAsNameNodeUser(conf);
NameNode.initMetrics(conf, this.getRole());
StartupProgressMetrics.register(startupProgress);
//如果是nameNode 启动一个httpServer
if (NamenodeRole.NAMENODE == role) {
startHttpServer(conf);
}
this.spanReceiverHost = SpanReceiverHost.getInstance(conf);
// 初始化FSNameSystem 核心组件
loadNamesystem(conf);
//初始化rpc server 组件
rpcServer = createRpcServer(conf);
if (clientNamenodeAddress == null) {
// This is expected for MiniDFSCluster. Set it now using
// the RPC server's bind address.
clientNamenodeAddress =
NetUtils.getHostPortString(rpcServer.getRpcAddress());
LOG.info("Clients are to use " clientNamenodeAddress " to access"
" this namenode/service.");
}
//如果是NameNode 设置NameNodeAddress 以及 FsImage
if (NamenodeRole.NAMENODE == role) {
httpServer.setNameNodeAddress(getNameNodeAddress());
httpServer.setFSImage(getFSImage());
}
pauseMonitor = new JvmPauseMonitor(conf);
pauseMonitor.start();
metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
//一些公共服务的初始化
startCommonServices(conf);
}
这里面会做一些什么操作呢,跟进去看一下
protected void loadNamesystem(Configuration conf) throws IOException {
//创建和初始化FSNameSystem
//nameNode启动的时候 会讲磁盘上的fsimange 文件 以及 edits log 文件读取到内存中进行合并,
// 合并形成最新的元数据
//loadFromDisk 就是讲fsimage 和 edits log 文件读取到内存进行合并
//合并成最新的元数据 就会作为FSNamesystem存放在内存中
this.namesystem = FSNamesystem.loadFromDisk(conf);
}
还是原先那样,如果有注释,尽可能的把注释看懂,并且结合自己强大的技术底蕴,大胆的猜测里面做了什么操作。。。
/**
* Instantiates an FSNamesystem loaded from the image and edits
* directories specified in the passed Configuration.
* 实例化一个FSNamesystem,怎么实例化呢?加载配置文件中指定的 image 和 edits 文件目录
* 如果没有指定呢,hadoop肯定自己默认的文件目录
*
* @param conf the Configuration which specifies the storage directories
* from which to load
* 包含了存储目录的Configuration,虽然有默认的,至少也要传递过来把,对吧,肯定是通过可识别方式,难不成用眼神?
*
* @return an FSNamesystem which contains the loaded namespace
* 返回一个包含了加载了 namespace 元数据的 FSNamesystem实例
*
* @throws IOException if loading fails
*/
static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
checkConfiguration(conf);
//构建FSImage,怎么构建?肯定是从磁盘加载
//FSImage 就是一个时间点的元数据快照信息,其实也就是元数据信息
//FSNamesystem.getNamespaceDirs(conf) 获取元数据的目录,肯定是找指定的配置信息,如果没有肯定就是默认值啦
// 默认值在hdfs-default.xml 以及 hadoop common 中的core-default.xml 文件中
//file://${hadoop.tmp.dir}/dfs/name ${hadoop.tmp.dir}:/tmp/hadoop-${user.name}
//可以自己观察下启动的namenode进程的这个目录是否和这个匹配
FSImage fsImage = new FSImage(conf,
FSNamesystem.getNamespaceDirs(conf),
//FSNamesystem.getNamespaceEditsDirs(conf)) 获取edits log 的目录,和上面的一样对不对
//默认情况下 edis log 和namespace 是在同一个目录下,可以进去看下配置信息
FSNamesystem.getNamespaceEditsDirs(conf));
//实例化 FSnamesystem 对象,讲fsImage对象放入到了FSNamesystem中
FSNamesystem namesystem = new FSNamesystem(conf, fsImage, false);
StartupOption startOpt = NameNode.getStartupOption(conf);
if (startOpt == StartupOption.RECOVER) {
namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
}
long loadStart = now();
try {
//这里就是说通过FSNamesystem 将fsimage 以及 edits log 加载到内存中
//然后再内存中合并两个文件,形成新的fsImange 信息
//(注:默认情况下 每隔一段时间 就会有check point 将旧的fsimage 与 edits log
// 就行合并形成新的fsimage文件,启动的时候肯定也需要合并才能形成新的fsimage文件 对吧)
//最后再内存中持有一份完整的元数据信息
namesystem.loadFSImage(startOpt);
} catch (IOException ioe) {
LOG.warn("Encountered exception loading fsimage", ioe);
fsImage.close();
throw ioe;
}
long timeTakenToLoadFSImage = now() - loadStart;
LOG.info("Finished loading FSImage in " timeTakenToLoadFSImage " msecs");
NameNodeMetrics nnMetrics = NameNode.getNameNodeMetrics();
if (nnMetrics != null) {
nnMetrics.setFsImageLoadTime((int) timeTakenToLoadFSImage);
}
return namesystem;
}
大家可以看到里面做了什么操作呢?
1.构建了一个fsImage对象,注意传入的参数名称,可以发现传入了fsImage 以及edits log,这里大概就猜测出新的fsImage 对象肯定是通过两个合并成新的,但是这里并没有加载数据
2.将新的完整的fsImage元数据信息传递给了FSNamesystem
3.最后加载,意思就是前面只是在初始化一些环境以及配置选项信息以及检查,到这里loadFromDisk才是真正的加载合并
构建FSNamesystem其实里面就是获取一些配置信息以及初始化一些选项,我们简单看一下,里面的内容很多我们主要看下注释 ,看到没,看到没
注释明确说了,不会加载数据,只是构建,如果加载数据使用loadFromDisk,所以注意上面的关联
/**
* Create an FSNamesystem associated with the specified image.
* 通过指定的 image 元数据快照创建一个 FSNamesystem对象
* Note that this does not load any data off of disk -- if you would
* like that behavior, use {@link #loadFromDisk(Configuration)}
* 注意 这里不从磁盘加载任何的数据,如果想加载数据,请使用 loadFromDisk(Configuration)
* 看到没,这里明确说了不加载数据对吧
* @param conf configuration
* @param fsImage The FSImage to associate with
* @param ignoreRetryCache Whether or not should ignore the retry cache setup
* step. For Secondary NN this should be set to true.
* @throws IOException on bad configuration
*/
FSNamesystem(Configuration conf, FSImage fsImage, boolean ignoreRetryCache)
throws IOException {
那么我们就直接看下核心的加载数据的方法
loadFSImage()方法
里面大概会有哪些操作?
1.肯定就是加载 fsImage 文件 和 edits log 文件 进行合并为一份完整的内存元数据信息fsImage,
2.将新的fsImage 文件进行覆盖
3.那么edits log 文件呢,肯定也会重新刷新一份。
/**
* 肯定就是加载 fsImage 文件 和 edits log 文件 进行合并为一份完整的内存元数据信息,在进行
* 磁盘回写,同时注意哦,edits log 文件肯定也要重新来一份新的
**/
private void loadFSImage(StartupOption startOpt) throws IOException {
final FSImage fsImage = getFSImage();
//刚启动的时候 namenode 读取fsimage 以及 edits log 两个文件
//然后再内存中合并形成一个新的fsiamge文件 包含了完整的最新的元数据信息
//然后重新存储在磁盘,替换旧的fsimage 文件,同时会打开一个edits log 文件,
// format before starting up if requested
//处理下数据,
if (startOpt == StartupOption.FORMAT) {
fsImage.format(this, fsImage.getStorage().determineClusterId());// reuse current id
startOpt = StartupOption.REGULAR;
}
boolean success = false;
writeLock();
try {
// We shouldn't be calling saveNamespace if we've come up in standby state.
//不会存储元数据,如果我们启动了一个 standby 服务
MetaRecoveryContext recovery = startOpt.createRecoveryContext();
//进行数据加载和 合并
final boolean staleImage
= fsImage.recoverTransitionRead(startOpt, this, recovery);
if (RollingUpgradeStartupOption.ROLLBACK.matches(startOpt) ||
RollingUpgradeStartupOption.DOWNGRADE.matches(startOpt)) {
rollingUpgradeInfo = null;
}
final boolean needToSave = staleImage && !haEnabled && !isRollingUpgrade();
LOG.info("Need to save fs image? " needToSave
" (staleImage=" staleImage ", haEnabled=" haEnabled
", isRollingUpgrade=" isRollingUpgrade() ")");
//如果需要将fsImage 存储到磁盘 就调用
//其实里面就是将最新的
if (needToSave) {
fsImage.saveNamespace(this);
} else {
updateStorageVersionForRollingUpgrade(fsImage.getLayoutVersion(),
startOpt);
// No need to save, so mark the phase done.
StartupProgress prog = NameNode.getStartupProgress();
prog.beginPhase(Phase.SAVING_CHECKPOINT);
prog.endPhase(Phase.SAVING_CHECKPOINT);
}
//打开一个新的edits log 文件 去进行写入
// This will start a new log segment and write to the seen_txid file, so
// we shouldn't do it when coming up in standby state
if (!haEnabled || (haEnabled && startOpt == StartupOption.UPGRADE)
|| (haEnabled && startOpt == StartupOption.UPGRADEONLY)) {
fsImage.openEditLogForWrite();
}
success = true;
} finally {
if (!success) {
fsImage.close();
}
writeUnlock();
}
imageLoadComplete();
}
我们可以看到除了一些常规的check 和 formate 那么比较重要的就是加载fsImage 以及 edits log 文件的方法入口
fsImage.recoverTransitionRead(startOpt, this, recovery);
大致想想里面会做什么操作?就是将fsImage 文件 和 edits log 文件加载并合并
/**
* Analyze storage directories.
* Recover from previous transitions if required.
* Perform fs state transition if necessary depending on the namespace info.
* Read storage info.
* 分析存储的目录,什么存储目录,就是存储fsImage 以及edits log 的目录、
* 从以前的状态恢复
* 根据元信息 判断是否执行fs状态的转换
* 读取存储的信息
* 上面意思是什么呢,大概意思就是如果以前有fsImage 和 edits log 就从文件信息中加载出来 并进行恢复
* @throws IOException
* @return true if the image needs to be saved or false otherwise
*/
boolean recoverTransitionRead(StartupOption startOpt, FSNamesystem target,
MetaRecoveryContext recovery)
throws IOException {
assert startOpt != StartupOption.FORMAT :
"NameNode formatting should be performed before reading the image";
//获取fsImage 文件资源地址 其实也就是目录
Collection<URI> imageDirs = storage.getImageDirectories();
//获取edits log 目录
Collection<URI> editsDirs = editLog.getEditURIs();
// none of the data dirs exist
if((imageDirs.size() == 0 || editsDirs.size() == 0)
&& startOpt != StartupOption.IMPORT)
throw new IOException(
"All specified directories are not accessible or do not exist.");
// 1. For each data directory calculate its state and
// check whether all is consistent before transitioning.
//检查每个数据目录,判断是否状态一致性,什么意思呢,
// 进行数据恢复 里面就是对一些之前停机的时候 更新 回滚 新增数据的恢复操作
Map<StorageDirectory, StorageState> dataDirStates =
new HashMap<StorageDirectory, StorageState>();
boolean isFormatted = recoverStorageDirs(startOpt, storage, dataDirStates);
if (LOG.isTraceEnabled()) {
LOG.trace("Data dir states:\n "
Joiner.on("\n ").withKeyValueSeparator(": ")
.join(dataDirStates));
}
if (!isFormatted && startOpt != StartupOption.ROLLBACK
&& startOpt != StartupOption.IMPORT) {
throw new IOException("NameNode is not formatted.");
}
int layoutVersion = storage.getLayoutVersion();
if (startOpt == StartupOption.METADATAVERSION) {
System.out.println("HDFS Image Version: " layoutVersion);
System.out.println("Software format version: "
HdfsConstants.NAMENODE_LAYOUT_VERSION);
return false;
}
if (layoutVersion < Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION) {
NNStorage.checkVersionUpgradable(storage.getLayoutVersion());
}
if (startOpt != StartupOption.UPGRADE
&& startOpt != StartupOption.UPGRADEONLY
&& !RollingUpgradeStartupOption.STARTED.matches(startOpt)
&& layoutVersion < Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION
&& layoutVersion != HdfsConstants.NAMENODE_LAYOUT_VERSION) {
throw new IOException(
"\nFile system image contains an old layout version "
storage.getLayoutVersion() ".\nAn upgrade to version "
HdfsConstants.NAMENODE_LAYOUT_VERSION " is required.\n"
"Please restart NameNode with the \""
RollingUpgradeStartupOption.STARTED.getOptionString()
"\" option if a rolling upgrade is already started;"
" or restart NameNode with the \""
StartupOption.UPGRADE.getName() "\" option to start"
" a new upgrade.");
}
//执行一些启动选项以及一些二更操作
storage.processStartupOptionsForUpgrade(startOpt, layoutVersion);
// 2. Format unformatted dirs.
for (Iterator<StorageDirectory> it = storage.dirIterator(); it.hasNext();) {
StorageDirectory sd = it.next();
StorageState curState = dataDirStates.get(sd);
switch(curState) {
case NON_EXISTENT:
throw new IOException(StorageState.NON_EXISTENT
" state cannot be here");
case NOT_FORMATTED:
LOG.info("Storage directory " sd.getRoot() " is not formatted.");
LOG.info("Formatting ...");
sd.clearDirectory(); // create empty currrent dir
break;
default:
break;
}
}
// 3. Do transitions
switch(startOpt) {
case UPGRADE:
case UPGRADEONLY:
doUpgrade(target);
return false; // upgrade saved image already
case IMPORT:
doImportCheckpoint(target);
return false; // import checkpoint saved image already
case ROLLBACK:
throw new AssertionError("Rollback is now a standalone command, "
"NameNode should not be starting with this option.");
case REGULAR:
default:
// just load the image
}
//真正的加载fsImage 和 edits log 文件进行合并
return loadFSImage(target, startOpt, recovery);
}
我们其实又看到了一个loadFSImage 操作,这个里面会是真的加载 数据进行合并?
/**
* Choose latest image from one of the directories,
* load it and merge with the edits.
*
* 选择最新的 image 全量快照 和 edits log 文件进行合并
* 哇,终于看到了
*
* Saving and loading fsimage should never trigger symlink resolution.
* The paths that are persisted do not have *intermediate* symlinks
* because intermediate symlinks are resolved at the time files,
* directories, and symlinks are created. All paths accessed while
* loading or saving fsimage should therefore only see symlinks as
* the final path component, and the functions called below do not
* resolve symlinks that are the final path component.
*
* @return whether the image should be saved
* @throws IOException
*/
private boolean loadFSImage(FSNamesystem target, StartupOption startOpt,
MetaRecoveryContext recovery)
throws IOException {
final boolean rollingRollback
= RollingUpgradeStartupOption.ROLLBACK.matches(startOpt);
final EnumSet<NameNodeFile> nnfs;
if (rollingRollback) {
// if it is rollback of rolling upgrade, only load from the rollback image
nnfs = EnumSet.of(NameNodeFile.IMAGE_ROLLBACK);
} else {
// otherwise we can load from both IMAGE and IMAGE_ROLLBACK
nnfs = EnumSet.of(NameNodeFile.IMAGE, NameNodeFile.IMAGE_ROLLBACK);
}
final FSImageStorageInspector inspector = storage
.readAndInspectDirs(nnfs, startOpt);
isUpgradeFinalized = inspector.isUpgradeFinalized();
List<FSImageFile> imageFiles = inspector.getLatestImages();
StartupProgress prog = NameNode.getStartupProgress();
prog.beginPhase(Phase.LOADING_FSIMAGE);
File phaseFile = imageFiles.get(0).getFile();
prog.setFile(Phase.LOADING_FSIMAGE, phaseFile.getAbsolutePath());
prog.setSize(Phase.LOADING_FSIMAGE, phaseFile.length());
boolean needToSave = inspector.needToSave();
Iterable<EditLogInputStream> editStreams = null;
initEditLog(startOpt);
if (NameNodeLayoutVersion.supports(
LayoutVersion.Feature.TXID_BASED_LAYOUT, getLayoutVersion())) {
// If we're open for write, we're either non-HA or we're the active NN, so
// we better be able to load all the edits. If we're the standby NN, it's
// OK to not be able to read all of edits right now.
// In the meanwhile, for HA upgrade, we will still write editlog thus need
// this toAtLeastTxId to be set to the max-seen txid
// For rollback in rolling upgrade, we need to set the toAtLeastTxId to
// the txid right before the upgrade marker.
long toAtLeastTxId = editLog.isOpenForWrite() ? inspector
.getMaxSeenTxId() : 0;
if (rollingRollback) {
// note that the first image in imageFiles is the special checkpoint
// for the rolling upgrade
toAtLeastTxId = imageFiles.get(0).getCheckpointTxId() 2;
}
///加载edits log 文件
editStreams = editLog.selectInputStreams(
imageFiles.get(0).getCheckpointTxId() 1,
toAtLeastTxId, recovery, false);
} else {
editStreams = FSImagePreTransactionalStorageInspector
.getEditLogStreams(storage);
}
int maxOpSize = conf.getInt(DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_KEY,
DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_DEFAULT);
for (EditLogInputStream elis : editStreams) {
elis.setMaxOpSize(maxOpSize);
}
for (EditLogInputStream l : editStreams) {
LOG.debug("Planning to load edit log stream: " l);
}
if (!editStreams.iterator().hasNext()) {
LOG.info("No edit log streams selected.");
}
FSImageFile imageFile = null;
for (int i = 0; i < imageFiles.size(); i ) {
try {
imageFile = imageFiles.get(i);
//记载Image文件
loadFSImageFile(target, recovery, imageFile, startOpt);
break;
} catch (IOException ioe) {
LOG.error("Failed to load image from " imageFile, ioe);
target.clear();
imageFile = null;
}
}
// Failed to load any images, error out
if (imageFile == null) {
FSEditLog.closeAllStreams(editStreams);
throw new IOException("Failed to load an FSImage file!");
}
prog.endPhase(Phase.LOADING_FSIMAGE);
if (!rollingRollback) {
long txnsAdvanced = loadEdits(editStreams, target, startOpt, recovery);
needToSave |= needsResaveBasedOnStaleCheckpoint(imageFile.getFile(),
txnsAdvanced);
if (RollingUpgradeStartupOption.DOWNGRADE.matches(startOpt)) {
// rename rollback image if it is downgrade
renameCheckpoint(NameNodeFile.IMAGE_ROLLBACK, NameNodeFile.IMAGE);
}
} else {
// Trigger the rollback for rolling upgrade. Here lastAppliedTxId equals
// to the last txid in rollback fsimage.
rollingRollback(lastAppliedTxId 1, imageFiles.get(0).getCheckpointTxId());
needToSave = false;
}
editLog.setNextTxId(lastAppliedTxId 1);
return needToSave;
}
/** rollback for rolling upgrade. */
private void rollingRollback(long discardSegmentTxId, long ckptId)
throws IOException {
// discard discard unnecessary editlog segments starting from the given id
this.editLog.discardSegments(discardSegmentTxId);
// rename the special checkpoint
renameCheckpoint(ckptId, NameNodeFile.IMAGE_ROLLBACK, NameNodeFile.IMAGE,
true);
// purge all the checkpoints after the marker
archivalManager.purgeCheckpoinsAfter(NameNodeFile.IMAGE, ckptId);
String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
if (HAUtil.isHAEnabled(conf, nameserviceId)) {
// close the editlog since it is currently open for write
this.editLog.close();
// reopen the editlog for read
this.editLog.initSharedJournalsForRead();
}
}
进行loadFSImanageFile文件的加载,这里就比较枯燥了,为什么?因为我们明显要有大局观,首先我们肯定直到一定是加载文件,对吧?那么文件最后加载到哪里去进行存储?因为我们的FSNamesystem 是核心管理元数据的组件,大家肯定就也想到了,最终数据肯定是加载到其中啦,所以大家注意看这里对啊不,一直有将FSNamesystem 作为参数再进行传递
/**
* 这里面就不仔细去详细的跟到文件加载了,
* loadFSImage()方法就是最终加载文件的方法
*/
void loadFSImageFile(FSNamesystem target, MetaRecoveryContext recovery,
FSImageFile imageFile, StartupOption startupOption) throws IOException {
LOG.debug("Planning to load image :\n" imageFile);
StorageDirectory sdForProperties = imageFile.sd;
storage.readProperties(sdForProperties, startupOption);
if (NameNodeLayoutVersion.supports(
LayoutVersion.Feature.TXID_BASED_LAYOUT, getLayoutVersion())) {
// For txid-based layout, we should have a .md5 file
// next to the image file
boolean isRollingRollback = RollingUpgradeStartupOption.ROLLBACK
.matches(startupOption);
loadFSImage(imageFile.getFile(), target, recovery, isRollingRollback);
} else if (NameNodeLayoutVersion.supports(
LayoutVersion.Feature.FSIMAGE_CHECKSUM, getLayoutVersion())) {
// In 0.22, we have the checksum stored in the VERSION file.
String md5 = storage.getDeprecatedProperty(
NNStorage.DEPRECATED_MESSAGE_DIGEST_PROPERTY);
if (md5 == null) {
throw new InconsistentFSStateException(sdForProperties.getRoot(),
"Message digest property "
NNStorage.DEPRECATED_MESSAGE_DIGEST_PROPERTY
" not set for storage directory " sdForProperties.getRoot());
}
loadFSImage(imageFile.getFile(), new MD5Hash(md5), target, recovery,
false);
} else {
// We don't have any record of the md5sum
loadFSImage(imageFile.getFile(), null, target, recovery, false);
}
}
/**
* Load in the filesystem image from file. It's a big list of
* filenames and blocks.
* 加载 fsimage文件,
*/
private void loadFSImage(File curFile, MD5Hash expectedMd5,
FSNamesystem target, MetaRecoveryContext recovery,
boolean requireSameLayoutVersion) throws IOException {
// BlockPoolId is required when the FsImageLoader loads the rolling upgrade
// information. Make sure the ID is properly set.
target.setBlockPoolId(this.getBlockPoolID());
//一个持有FSNamesystem 以及 conf 对象的loader,加载器嘛,为什么持有FSNamesystem?
//因为FSNamesystem 是管理元数据的核心组件,最终的元数据都是存储在FSNamesystem 中对把
FSImageFormat.LoaderDelegator loader = FSImageFormat.newLoader(conf, target);
loader.load(curFile, requireSameLayoutVersion);
// Check that the image digest we loaded matches up with what
// we expected
MD5Hash readImageMd5 = loader.getLoadedImageMd5();
if (expectedMd5 != null &&
!expectedMd5.equals(readImageMd5)) {
throw new IOException("Image file " curFile
" is corrupt with MD5 checksum of " readImageMd5
" but expecting " expectedMd5);
}
long txId = loader.getLoadedImageTxId();
LOG.info("Loaded image for txid " txId " from " curFile);
lastAppliedTxId = txId;
storage.setMostRecentCheckpointInfo(txId, curFile.lastModified());
}
其实知道这大家也就知道了其实就是实例化了一个loader 组件去加载fsImage文件了,底层就是基于文件流加载咯
接下就是加载edits log 以及进行一个合并了入口就是
//执行加载edits log 其实里面也包含了合并 fsImage 与 edits log 的操作
long txnsAdvanced = loadEdits(editStreams, target, startOpt, recovery);
这里大家也可以发现其实也是将我们的editStream 目录 以及 FSNamesystem 作为了参数传入,底层说不定也是基于loader 加载edits log 然后放入到FSNamesystem中对吧,当然 肯定也会有一些其他的操作,比如检查,筛选对吧
/**
* Load the specified list of edit files into the image.
* 加载指定的edit files 文件到image中
* 怎么加载 就是再跑一边记录的指令咯
*/
public long loadEdits(Iterable<EditLogInputStream> editStreams,
FSNamesystem target) throws IOException {
return loadEdits(editStreams, target, null, null);
}
private long loadEdits(Iterable<EditLogInputStream> editStreams,
FSNamesystem target, StartupOption startOpt, MetaRecoveryContext recovery)
throws IOException {
LOG.debug("About to load edits:\n " Joiner.on("\n ").join(editStreams));
StartupProgress prog = NameNode.getStartupProgress();
prog.beginPhase(Phase.LOADING_EDITS);
long prevLastAppliedTxId = lastAppliedTxId;
try {
//实例化一个loader 加载组件,也是和加载FSImage 文件一样,持有了 FSNamesystem
FSEditLogLoader loader = new FSEditLogLoader(target, lastAppliedTxId);
// Load latest edits
//加载最新的edits log
for (EditLogInputStream editIn : editStreams) {
LOG.info("Reading " editIn " expecting start txid #"
(lastAppliedTxId 1));
try {
//进行文件流的加载对吧,底层不用说 肯定是基于文件流咯
loader.loadFSEdits(editIn, lastAppliedTxId 1, startOpt, recovery);
} finally {
// Update lastAppliedTxId even in case of error, since some ops may
// have been successfully applied before the error.
lastAppliedTxId = loader.getLastAppliedTxId();
}
// If we are in recovery mode, we may have skipped over some txids.
if (editIn.getLastTxId() != HdfsConstants.INVALID_TXID) {
lastAppliedTxId = editIn.getLastTxId();
}
}
} finally {
FSEditLog.closeAllStreams(editStreams);
// update the counts
updateCountForQuota(target.dir.rootDir);
}
prog.endPhase(Phase.LOADING_EDITS);
return lastAppliedTxId - prevLastAppliedTxId;
}
/**
* Load an edit log, and apply the changes to the in-memory structure
* This is where we apply edits that we've been writing to disk all
* along.
* 记载edit log,同时将一些更改变化加入到内存结构中,这里其实就是将我们的记录再
* edits log 中的操作指令再进行一次刷新
*/
long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId,
StartupOption startOpt, MetaRecoveryContext recovery) throws IOException {
StartupProgress prog = NameNode.getStartupProgress();
Step step = createStartupProgressStep(edits);
prog.beginStep(Phase.LOADING_EDITS, step);
fsNamesys.writeLock();
try {
long startTime = now();
FSImage.LOG.info("Start loading edits file " edits.getName());
long numEdits = loadEditRecords(edits, false, expectedStartingTxId,
startOpt, recovery);
FSImage.LOG.info("Edits file " edits.getName()
" of size " edits.length() " edits # " numEdits
" loaded in " (now()-startTime)/1000 " seconds");
return numEdits;
} finally {
edits.close();
fsNamesys.writeUnlock();
prog.endStep(Phase.LOADING_EDITS, step);
}
}
到这其实大家也就知道了逻辑了,最后的时候其实在返回一个needToSave ,做什么的我就不说了
小总结:FSNamesystem 初始化的步骤
1.从配置文件中获取fsimage 以及 edits log 目录
2.构建FSnamesystem,并传入 fsimage 以及 edits log 目录以及其他信息
3.loadfromdisk 加载 fsimage 以及edits log 文件到内存
4.将fsimage 和 edits log 文件进行合并成新的fsimage,也就是完整的元数据信息,在内存中持有一份
5.将新的完整的fsimage 文件替换 旧的fsimage 文件
6.开启一个新的edits log 文件记录增量事务

来源:https://www./content-1-823401.html
|