分享

hdfs file md5 计算,实现本地与hdfs同步文件

 陈永正的图书馆 2017-03-06

需求

本地有文件,hdfs也有文件,如果是同一个文件,则不同步,否则就同步文件
如果本地有的,hdfs无,则上传
如果本地无的,hdfs有,则删除

重构版本:hdfs sync 重构

思考

判断两个文件是否相同,就比较它们的md5值

如何算

本来想用hdfs的checksum,但那个是crc,每次写block会去算一下,最后是一组checksum,而本地文件系统默认不会计算这个值
local
所以最后直接读取文件流来计算md5:


  /**
    * Computes the MD5 digest of an HDFS file by streaming its contents.
    *
    * @param path          HDFS path of the file to hash
    * @param configuration Hadoop configuration used to resolve the file system
    * @return the digest string produced by `DigestUtils.md5Hex`
    * @throws java.io.IOException if the file cannot be opened or read
    */
  def getHdfsFileMd5(path: Path, configuration: Configuration): String = {
    // FileSystem.get hands out (by default) a cached instance shared by every caller
    // using an equivalent configuration, so the file system itself is deliberately
    // NOT closed here; only the stream opened by this method is.
    val in = FileSystem.get(configuration).open(path)
    // A read failure propagates instead of being returned as e.getMessage: an error
    // message must never be compared against, or mistaken for, a real digest.
    try DigestUtils.md5Hex(in)
    finally in.close()
  }


  /**
    * Computes the MD5 digest of a local file by streaming its contents.
    *
    * @param file local file to hash
    * @return the digest string produced by `DigestUtils.md5Hex`
    * @throws java.io.IOException if the file cannot be opened or read
    */
  def getLocalFileMd5(file: File): String = {
    val in = new FileInputStream(file)
    // try/finally closes the stream on both paths; a read failure propagates rather
    // than being returned as e.getMessage, which callers would treat as a digest.
    try DigestUtils.md5Hex(in)
    finally in.close()
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

设计

按照刚才的思路,可以分成下面几种情况

| 本地 | HDFS | 是否相同 |
| --- | --- | --- |
| 文件 | 文件 | 相同 |
| 文件 | 文件 | 不同 |
| 文件 | 文件夹 | 无需比较 |
| 文件夹 | 文件 | 无需比较 |
| 文件夹 | 文件夹 | 无需比较 |

所以设置一个type:

/** Marker trait; it declares no members of its own. */
trait Mode

/** Companion that holds the tuple alias used when a local file is compared with an HDFS path. */
object Mode {

  /**
    * Three flags, in the order `syncHelper` builds them:
    * local side is a file, HDFS side is a file, both sides have the same MD5.
    */
  type isSameFile = Tuple3[Boolean, Boolean, Boolean]
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 1
  • 2
  • 3
  • 4
  • 5

而在模式匹配中也是这样去使用的


  /**
    * Makes `hdfsPath` mirror `localFile`, recursing into local sub-directories.
    *
    * The action depends on what each side is:
    *  - file / file, same MD5: nothing to do
    *  - file / file, different MD5: overwrite the HDFS file
    *  - file / not a file: delete the HDFS path recursively, then upload the file
    *  - directory / file: delete the HDFS file, then upload the whole directory
    *  - directory / directory: delete, upload and update the direct children, then
    *    recurse into every local sub-directory
    *
    * @param localFile     local file or directory that is the source of truth
    * @param hdfsPath      HDFS path that should end up mirroring `localFile`
    * @param configuration Hadoop configuration used to resolve the file system
    * @throws IllegalArgumentException if the HDFS side is a file and `localFile` is
    *                                  neither a file nor a directory
    */
  def syncHelper(localFile: File, hdfsPath: Path, configuration: Configuration): Unit = {
    // Resolved again on every use: sameMd5 is defined outside this view and may close
    // the handle it is given (TODO confirm), so an earlier handle is never reused.
    def hdfs: FileSystem = FileSystem.get(configuration)

    val localIsFile = localFile.isFile
    val hdfsIsFile = hdfs.isFile(hdfsPath)
    // Hash only when both sides are regular files: every other branch ignores the
    // third flag, and streaming a directory through the MD5 helpers cannot succeed.
    val mode: Mode.isSameFile =
      (localIsFile, hdfsIsFile, localIsFile && hdfsIsFile && sameMd5(localFile, hdfsPath, hdfs))
    val localPath = new Path(localFile.getAbsolutePath)

    mode match {
      case (true, true, true) =>
        logger.info(s"the file :${localFile.getName} in local and hdfs are same one")

      case (true, true, false) =>
        logger.debug(s"the file: ${localFile.getName} in local and hdfs have same name,but they are different file")
        // copyFromLocal to hdfs by overwrite (delSrc = false, overwrite = true)
        hdfs.copyFromLocalFile(false, true, localPath, hdfsPath)

      case (true, false, _) =>
        logger.debug(s"the file: ${localFile.getName} in local is file and in hdfs is dir")
        // first delete the path in hdfs (recursively), then copyFromLocal to hdfs
        hdfs.delete(hdfsPath, true)
        hdfs.copyFromLocalFile(false, true, localPath, hdfsPath)

      case (false, true, _) =>
        logger.debug(s"in local this is a dir and in hdfs is a file,upload ${localFile.getName} ")
        // Refuse to delete anything unless there really is a local directory to upload.
        require(localFile.isDirectory, s"local path is not a directory: ${localFile.getPath}")
        // first delete the file in hdfs, then copy the whole local directory in its place
        hdfs.delete(hdfsPath, false)
        hdfs.copyFromLocalFile(false, true, localPath, hdfsPath)

      case (false, false, _) =>
        logger.debug(s"both local and hdfs this is dir:${localFile.getName}")
        // three lists: which need delete, which need upload, which need update
        composeAction(localFile, hdfsPath, hdfs)
        val hdfsParent = hdfsPath.toString
        // listFiles() yields null when the path cannot be listed; treat that as no children
        val childDirs = Option(localFile.listFiles()).getOrElse(Array.empty[File]).filter(_.isDirectory)
        childDirs.foreach(dir => syncHelper(dir, new Path(s"$hdfsParent/${dir.getName}"), configuration))
    }
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

辅助函数

那些需要被删除的:

  /**
    * Names of the direct children of `hdfsPath` that have no same-named entry in `localFile`.
    *
    * @param localFile     local directory to compare against
    * @param hdfsPath      HDFS directory whose children are examined
    * @param configuration Hadoop configuration used to resolve the file system
    * @return child names present in HDFS but missing locally, in HDFS listing order
    * @throws IllegalArgumentException if `localFile` cannot be listed
    */
  def needDelete(localFile: File, hdfsPath: Path, configuration: Configuration): List[String] = {
    val hdfsNames = FileSystem.get(configuration).listStatus(hdfsPath).map(_.getPath.getName)
    // File.listFiles() returns null (not an empty array) when the path is not a
    // directory or cannot be read; fail with a clear message instead of an NPE.
    val localNames = Option(localFile.listFiles())
      .getOrElse(throw new IllegalArgumentException(s"cannot list local directory: ${localFile.getPath}"))
      .map(_.getName)
    hdfsNames.diff(localNames).toList
  }
  • 1
  • 2
  • 3
  • 4
  • 1
  • 2
  • 3
  • 4

那些需要被更新:

  /**
    * Names of local regular files that also exist under `hdfsPath` (same name) but
    * whose (name, MD5) pair is not matched by a regular file on the HDFS side.
    *
    * @param localFile     local directory to compare
    * @param hdfsPath      HDFS directory to compare against
    * @param configuration Hadoop configuration used to resolve the file system
    * @return names of files to re-upload, in local listing order
    * @throws IllegalArgumentException if `localFile` cannot be listed
    */
  def needUpdate(localFile: File, hdfsPath: Path, configuration: Configuration): List[String] = {
    // Each side is listed exactly once. The HDFS handle is not kept around because the
    // MD5 helper called below may close the cached FileSystem.
    val hdfsEntries = FileSystem.get(configuration).listStatus(hdfsPath)
    // File.listFiles() returns null when the path is not a directory or cannot be read.
    val localEntries = Option(localFile.listFiles())
      .getOrElse(throw new IllegalArgumentException(s"cannot list local directory: ${localFile.getPath}"))

    val localNames = localEntries.map(_.getName).toSet
    val hdfsNames = hdfsEntries.map(_.getPath.getName).toSet

    // Digest of every HDFS regular file whose name also exists locally. FileStatus
    // already carries the file/directory flag, so no extra call per entry is needed.
    val hdfsMd5ByName: Map[String, String] = hdfsEntries
      .filter(status => status.isFile && localNames.contains(status.getPath.getName))
      .map(status => status.getPath.getName -> getHdfsFileMd5(status.getPath, configuration))
      .toMap

    localEntries
      .filter(file => file.isFile && hdfsNames.contains(file.getName))
      .map(file => file.getName -> getLocalFileMd5(file))
      .filterNot { case (name, md5) => hdfsMd5ByName.get(name).exists(_ == md5) }
      .map(_._1)
      .toList
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

那些需要被上传的:


  /**
    * Names of the direct children of `localFile` that have no same-named entry under `hdfsPath`.
    *
    * @param localFile     local directory whose children are examined
    * @param hdfsPath      HDFS directory to compare against
    * @param configuration Hadoop configuration used to resolve the file system
    * @return child names present locally but missing in HDFS, in local listing order
    * @throws IllegalArgumentException if `localFile` cannot be listed
    */
  def needUpload(localFile: File, hdfsPath: Path, configuration: Configuration): List[String] = {
    // File.listFiles() returns null (not an empty array) when the path is not a
    // directory or cannot be read; fail with a clear message instead of an NPE.
    val localNames = Option(localFile.listFiles())
      .getOrElse(throw new IllegalArgumentException(s"cannot list local directory: ${localFile.getPath}"))
      .map(_.getName)
    val hdfsNames = FileSystem.get(configuration).listStatus(hdfsPath).map(_.getPath.getName)
    localNames.diff(hdfsNames).toList
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 1
  • 2
  • 3
  • 4
  • 5

组合函数:

  /**
    * Brings the direct children of `hdfsPath` in line with those of `localFile`:
    * deletes what `needDelete` reports, then uploads (with overwrite) what
    * `needUpload` and `needUpdate` report. Sub-directories are not descended into here.
    *
    * @param localFile  local directory that is the source of truth
    * @param hdfsPath   HDFS directory to modify
    * @param fileSystem handle used only to obtain the Hadoop configuration
    */
  def composeAction(localFile: File, hdfsPath: Path, fileSystem: FileSystem): Unit = {
    val configuration = fileSystem.getConf
    val deleteList = needDelete(localFile, hdfsPath, configuration)
    val uploadList = needUpload(localFile, hdfsPath, configuration)
    val updateList = needUpdate(localFile, hdfsPath, configuration)
    val localParent = localFile.getAbsolutePath
    val hdfsParent = hdfsPath.toString

    // Resolve the handle once, AFTER the lists are computed: the MD5 helper used by
    // needUpdate may close the cached FileSystem, which can be the very instance
    // passed in as `fileSystem`.
    val hdfs = FileSystem.get(configuration)

    logger.debug("deleting which file need delete")
    deleteList.foreach(name => hdfs.delete(new Path(s"$hdfsParent/$name"), true))
    logger.debug("deleted")

    logger.debug("uploading which file need upload or update")
    // delSrc = false, overwrite = true
    (uploadList ++ updateList).foreach { name =>
      hdfs.copyFromLocalFile(false, true, new Path(s"$localParent/$name"), new Path(s"$hdfsParent/$name"))
    }
    logger.debug("uploaded")
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

测试

package

/** Shared fixtures for the sync specs: the syncer under test, sample paths and Hadoop config files. */
package object sync {
  import scala.language.implicitConversions

  lazy val sync = PathSyncer
  // This spec's own source file, resolved to an absolute path, used as a sample file to hash.
  lazy val fileMd5Spec = new File("src/test/scala/com/ximalaya/data/sync/Md5Spec.scala").getAbsoluteFile
  lazy val pathHDFS = new Path("/tmp/todd/a")
  lazy val pathLcoal = new File("/Users/cjuexuan/data/testfs/a")
  lazy val resources = Seq("/Users/cjuexuan/conf/hadoop/hadoop/core-site.xml",
    "/Users/cjuexuan/conf/hadoop/hadoop/hdfs-site.xml")

  /**
    * Builds a Hadoop configuration from the given resource file paths.
    *
    * Declared implicit so that a `Seq[String]` can be passed wherever a
    * `Configuration` is expected, as the specs do with `resources`.
    *
    * @param resources paths of Hadoop XML configuration files, added in order
    * @return a new configuration with every resource added
    */
  implicit def getHadoopConf(resources: Seq[String]): Configuration = {
    val conf = new Configuration()
    resources.foreach(path => conf.addResource(new Path(path)))
    conf
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

pathSpec:


import java.io.File

import org.apache.hadoop.fs.Path
import org.scalatest.{FlatSpec, Matchers}

/**
  * Created by todd.chen on 16/3/15.
  * email : todd.chen@ximalaya.com
  */
/**
  * Checks the three listing helpers against a fixture at hard-coded local and HDFS
  * paths. The expected names below describe that fixture.
  */
class PathSpec extends FlatSpec with Matchers{

  val path = new Path("/tmp/todd")
  val file = new File("/Users/cjuexuan/data/testfs")

  // Each helper is called once and the whole result compared, which covers both the
  // length and the head checks without listing the remote directory twice.
  "File in hdfs and not in local" should  "delete" in {
    sync.needDelete(file,path,resources) should be (List("user_info"))
  }

  "File in local and not in hdfs" should "upload" in {
    sync.needUpload(file,path,resources) should be (List("b"))
  }

  "File in local and hdfs have diff md5" should "update" in {
    sync.needUpdate(file,path,resources) should be (List("c"))
  }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

md5Spec:

import org.scalatest.{FlatSpec, Matchers}

import scala.com.ximalaya.data.sync._


/**
  * Created by todd.chen on 16/3/14.
  * email : todd.chen@ximalaya.com
  */
/**
  * Checks the local and HDFS MD5 helpers against fixture files at the paths
  * declared in the `sync` package object.
  */
class Md5Spec  extends FlatSpec with Matchers{

  // Shape of a hex-encoded MD5: 32 lowercase hexadecimal digits.
  private val md5HexPattern = "[0-9a-f]{32}"

  // The static type is already String, so an isInstanceOf[String] check can never
  // fail; the specs assert the shape of the digest instead.
  "file md5Spec" should "get md5 with String type" in {
    sync.getLocalFileMd5(fileMd5Spec) should fullyMatch regex md5HexPattern
  }

  "file md5Spec's md5 " should "format to int type with hex" in{
    // BigInt(_, 16) throws on anything that is not hexadecimal
    assert(BigInt(sync.getLocalFileMd5(fileMd5Spec),16).signum >= 0)
  }

 "hdfs path" should "get md5 with String type " in {
   sync.getHdfsFileMd5(pathHDFS,getHadoopConf(resources)) should fullyMatch regex md5HexPattern
 }

  "Same file in hdfs and lcoalSystem" should "have same md5" in{
    val localMd5 = sync.getLocalFileMd5(pathLcoal)
    val hdfsMd5 = sync.getHdfsFileMd5(pathHDFS,getHadoopConf(resources))
    localMd5  should be (hdfsMd5)
  }


}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

syncSpec:

import java.io.File

import org.apache.hadoop.fs.Path
import org.scalatest.{FlatSpec, Matchers}

/**
  * Created by todd.chen on 16/3/15.
  * email : todd.chen@ximalaya.com
  */
/**
  * End-to-end checks of `PathSyncer.syncHelper` against fixtures at hard-coded local
  * and HDFS paths. `resources` is passed where a Hadoop configuration is expected.
  */
class SyncerSpec extends FlatSpec with Matchers{

  val helper = PathSyncer

  "file in local and in hdfs is same file" should "do nothing" in {
    val localA = new File("/Users/cjuexuan/data/testfs/a")
    val remoteA = new Path("/tmp/todd/a")
    val expected = helper.getLocalFileMd5(localA)
    // digests agree before the sync ...
    val before = helper.getHdfsFileMd5(remoteA,resources)
    expected should be (before)
    helper.syncHelper(localA,remoteA,resources)
    // ... and still agree afterwards
    val after = helper.getHdfsFileMd5(remoteA,resources)
    expected should be (after)
  }

  "file in local and in hdfs with same name and  diff file" should "be update" in {
    val localB = new File("/Users/cjuexuan/data/testfs/b")
    val remoteB = new Path("/tmp/todd/b")
    val expected = helper.getLocalFileMd5(localB)
    // digests differ before the sync ...
    val before = helper.getHdfsFileMd5(remoteB,resources)
    expected should  not be before
    helper.syncHelper(localB,remoteB,resources)
    // ... and agree afterwards
    val after = helper.getHdfsFileMd5(remoteB,resources)
    expected should be (after)
  }

  "in local is file and same path in hdfs is dir" should " be remove hdfs dir and upload local file" in {
    val localB = new File("/Users/cjuexuan/data/testfs/b")
    val remoteB = new Path("/tmp/todd/b")
    val expected = helper.getLocalFileMd5(localB)
    helper.syncHelper(localB,remoteB,resources)
    val uploaded = helper.getHdfsFileMd5(remoteB,resources)
    expected should be (uploaded)
  }

  // The two directory cases only check that the sync completes without throwing.
  "in local is dir and in hdfs is file" should "be remove hdfs file and upload local dir" in {
    val localDir = new File("/Users/cjuexuan/data/testfs")
    val remoteDir = new Path("/tmp/todd/testfs")
    helper.syncHelper(localDir,remoteDir,resources)
  }

  "both in local and hdfs is dir" should "sync to same" in {
    val localDir = new File("/Users/cjuexuan/data/testfs")
    val remoteDir = new Path("/tmp/todd/testfs")
    helper.syncHelper(localDir,remoteDir,resources)
  }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64

my github

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约