博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
scala中hdfs文件的操作
阅读量:6360 次
发布时间:2019-06-23

本文共 6799 字,大约阅读时间需要 22 分钟。

hot3.png

对于org.apache.hadoop.fs.Path来说, 

    path.getName只是文件名,不包括路径 

    path.getParent 返回的是父目录对应的 Path 对象(其 toString 同样是完整路径);若只需要父目录的名字,应再调用 path.getParent.getName 

    path.toString才是文件的全路径名

创建文件

hdfs.createNewFile(new Path(fileName))

#hdfs文件系统在创建filename对应的文件时,如果相关的文件夹不存在,会自动创建相关的文件夹
new File(fileName).createNewFile
#本地文件系统在创建filename对应的文件时,如果相关的文件夹不存在,程序会终止,报错:“java.io.IOException: 没有那个文件或目录”

以下是hdfs文件操作的工具类

package util

 
import java.io.{FileSystem => _, _}
 
import org.apache.hadoop.fs._
 
import scala.collection.mutable.ListBuffer
 
/**
  * Created by zls on 16-11-24.
  */
object HDFSHelper {

  /** Returns true when `name` exists on `hdfs` and is a directory. */
  def isDir(hdfs : FileSystem, name : String) : Boolean = {
    hdfs.isDirectory(new Path(name))
  }
  /** Overload of `isDir` taking an already-built [[Path]]. */
  def isDir(hdfs : FileSystem, name : Path) : Boolean = {
    hdfs.isDirectory(name)
  }
  /** Returns true when `name` exists on `hdfs` and is a regular file. */
  def isFile(hdfs : FileSystem, name : String) : Boolean = {
    hdfs.isFile(new Path(name))
  }
  /** Overload of `isFile` taking an already-built [[Path]]. */
  def isFile(hdfs : FileSystem, name : Path) : Boolean = {
    hdfs.isFile(name)
  }
  /**
    * Creates an empty file on `hdfs`, auto-creating missing parent folders.
    * Returns false if the file already exists.
    */
  def createFile(hdfs : FileSystem, name : String) : Boolean = {
    hdfs.createNewFile(new Path(name))
  }
  /** Overload of `createFile` taking an already-built [[Path]]. */
  def createFile(hdfs : FileSystem, name : Path) : Boolean = {
    hdfs.createNewFile(name)
  }
  /** Creates a folder (and any missing parents) on `hdfs`. */
  def createFolder(hdfs : FileSystem, name : String) : Boolean = {
    hdfs.mkdirs(new Path(name))
  }
  /** Overload of `createFolder` taking an already-built [[Path]]. */
  def createFolder(hdfs : FileSystem, name : Path) : Boolean = {
    hdfs.mkdirs(name)
  }
  /** Returns true when `name` (file or folder) exists on `hdfs`. */
  def exists(hdfs : FileSystem, name : String) : Boolean = {
    hdfs.exists(new Path(name))
  }
  /** Overload of `exists` taking an already-built [[Path]]. */
  def exists(hdfs : FileSystem, name : Path) : Boolean = {
    hdfs.exists(name)
  }

  /**
    * Copies every byte from `inputStream` to `outputStream`, then closes
    * both streams — even when copying fails part-way through.
    */
  def transport(inputStream : InputStream, outputStream : OutputStream): Unit = {
    try {
      val buffer = new Array[Byte](64 * 1024)
      var len = inputStream.read(buffer)
      while (len != -1) {
        // Write exactly `len` bytes. The previous version wrote `len - 1`,
        // silently dropping the last byte of every chunk read and thereby
        // corrupting every file copied through this helper.
        outputStream.write(buffer, 0, len)
        len = inputStream.read(buffer)
      }
      outputStream.flush()
    } finally {
      // Close in `finally` so neither stream leaks on an I/O error.
      inputStream.close()
      outputStream.close()
    }
  }

  /** Accept-everything filter used by [[listChildren]]. */
  class MyPathFilter extends PathFilter {
    override def accept(path: Path): Boolean = true
  }

  /**
    * Creates a local file at `fullName`, creating parent folders first if
    * necessary, and returns the corresponding [[File]] (existing or new).
    */
  def createLocalFile(fullName : String) : File = {
    val target : File = new File(fullName)
    if(!target.exists){
      // getParentFile is null when `fullName` has no parent component; the
      // old substring(0, lastIndexOf(separator)) crashed with a negative
      // index in that case.
      val parent : File = target.getParentFile
      if(parent != null && !parent.exists)
        parent.mkdirs

      target.createNewFile
    }
    target
  }

  /**
    * Deletes `path` on `hdfs`.
    * Directories are removed recursively; plain files non-recursively.
    * Returns true on success, false on failure.
    */
  def deleteFile(hdfs : FileSystem, path: String) : Boolean = {
    if (isDir(hdfs, path))
      hdfs.delete(new Path(path), true)//true: delete files recursively
    else
      hdfs.delete(new Path(path), false)
  }

  /**
    * Recursively collects the full names of all *file* descendants of the
    * HDFS directory `fullName` into `holder` (directories themselves are
    * traversed but not collected) and returns `holder`.
    */
  def listChildren(hdfs : FileSystem, fullName : String, holder : ListBuffer[String]) : ListBuffer[String] = {
    val filesStatus = hdfs.listStatus(new Path(fullName), new MyPathFilter)
    for(status <- filesStatus){
      val filePath : Path = status.getPath
      if(isFile(hdfs,filePath))
        holder += filePath.toString
      else
        listChildren(hdfs, filePath.toString, holder)
    }
    holder
  }

  /** Copies a single HDFS file from `source` to `target` (overwriting). */
  def copyFile(hdfs : FileSystem, source: String, target: String): Unit = {

    val sourcePath = new Path(source)
    val targetPath = new Path(target)

    // Ensure the target (and its parent folders) exist; hdfs.create below
    // overwrites the empty file with the real content.
    if(!exists(hdfs, targetPath))
      createFile(hdfs, targetPath)

    val inputStream : FSDataInputStream = hdfs.open(sourcePath)
    val outputStream : FSDataOutputStream = hdfs.create(targetPath)
    transport(inputStream, outputStream)
  }

  /**
    * Copies every file under `sourceFolder` to the same relative location
    * under `targetFolder` (both on HDFS).
    */
  def copyFolder(hdfs : FileSystem, sourceFolder: String, targetFolder: String): Unit = {
    val holder : ListBuffer[String] = new ListBuffer[String]
    val children : List[String] = listChildren(hdfs, sourceFolder, holder).toList
    for(child <- children)
      copyFile(hdfs, child, child.replaceFirst(sourceFolder, targetFolder))
  }

  /** Uploads the local file `localSource` to `hdfsTarget` (overwriting). */
  def copyFileFromLocal(hdfs : FileSystem, localSource: String, hdfsTarget: String): Unit = {
    val targetPath = new Path(hdfsTarget)
    if(!exists(hdfs, targetPath))
      createFile(hdfs, targetPath)

    val inputStream : FileInputStream = new FileInputStream(localSource)
    val outputStream : FSDataOutputStream = hdfs.create(targetPath)
    transport(inputStream, outputStream)
  }

  /** Downloads the HDFS file `hdfsSource` to the local path `localTarget`. */
  def copyFileToLocal(hdfs : FileSystem, hdfsSource: String, localTarget: String): Unit = {
    val localFile : File = createLocalFile(localTarget)

    val inputStream : FSDataInputStream = hdfs.open(new Path(hdfsSource))
    val outputStream : FileOutputStream = new FileOutputStream(localFile)
    transport(inputStream, outputStream)
  }

  /**
    * Recursively uploads the local folder `localSource` to `hdfsTarget`,
    * preserving the relative layout.
    */
  def copyFolderFromLocal(hdfs : FileSystem, localSource: String, hdfsTarget: String): Unit = {
    val localFolder : File = new File(localSource)
    // listFiles returns null when `localSource` is not a directory (or is
    // unreadable); treat that as "no children" instead of a NPE.
    val allChildren : Array[File] = Option(localFolder.listFiles).getOrElse(Array.empty[File])
    for(child <- allChildren){
      val fullName = child.getAbsolutePath
      val nameExcludeSource : String = fullName.substring(localSource.length)
      val targetFileFullName : String = hdfsTarget + Path.SEPARATOR + nameExcludeSource
      if(child.isFile)
        copyFileFromLocal(hdfs, fullName, targetFileFullName)
      else
        copyFolderFromLocal(hdfs, fullName, targetFileFullName)
    }
  }

  /**
    * Recursively downloads the HDFS folder `hdfsSource` to the local folder
    * `localTarget`, preserving the relative layout.
    */
  def copyFolderToLocal(hdfs : FileSystem, hdfsSource: String, localTarget: String): Unit = {
    val holder : ListBuffer[String] = new ListBuffer[String]
    val children : List[String] = listChildren(hdfs, hdfsSource, holder).toList
    // Use the fully-qualified source name so the prefix length matches the
    // fully-qualified names returned by listChildren.
    val hdfsSourceFullName = hdfs.getFileStatus(new Path(hdfsSource)).getPath.toString
    val index = hdfsSourceFullName.length
    for(child <- children){
      val nameExcludeSource : String = child.substring(index + 1)
      val targetFileFullName : String = localTarget + File.separator + nameExcludeSource
      copyFileToLocal(hdfs, child, targetFileFullName)
    }
  }

}

以下是工具类的测试类

package util

 
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
 
import scala.collection.mutable.ListBuffer
 
/**
  * Created by zls on 16-11-24.
  */
object HDFSOperator {

  /**
    * Command-line style dispatcher for [[HDFSHelper]] operations.
    * args(0) selects the operation; args(1) (and args(2) for copies) supply
    * the paths. An unrecognised operation raises a MatchError.
    */
  def start(args: Array[String]): Unit = {
    val fs : FileSystem = FileSystem.get(new Configuration)
    args(0) match {
      case "list"           => traverse(fs, args(1))
      case "createFile"     => HDFSHelper.createFile(fs, args(1))
      case "createFolder"   => HDFSHelper.createFolder(fs, args(1))
      case "copyfile"       => HDFSHelper.copyFile(fs, args(1), args(2))
      case "copyfolder"     => HDFSHelper.copyFolder(fs, args(1), args(2))
      case "delete"         => HDFSHelper.deleteFile(fs, args(1))
      case "copyfilefrom"   => HDFSHelper.copyFileFromLocal(fs, args(1), args(2))
      case "copyfileto"     => HDFSHelper.copyFileToLocal(fs, args(1), args(2))
      case "copyfolderfrom" => HDFSHelper.copyFolderFromLocal(fs, args(1), args(2))
      case "copyfolderto"   => HDFSHelper.copyFolderToLocal(fs, args(1), args(2))
    }
  }

  /** Prints the full path and base name of every file under `hdfsPath`. */
  def traverse(hdfs : FileSystem, hdfsPath : String) = {
    val collected : List[String] =
      HDFSHelper.listChildren(hdfs, hdfsPath, new ListBuffer[String]).toList
    collected.foreach { p =>
      println(s"--------- path = $p")
      println(s"--------- Path.getname = ${new Path(p).getName}")
    }
  }

}

转载于:https://my.oschina.net/hblt147/blog/2885804

你可能感兴趣的文章
SQL注入测试工具:Pangolin(穿山甲)
查看>>
在html 的img属性里只显示图片的部分区域(矩形,给出开始点和结束点),其他部份不显示,也不要拉伸...
查看>>
程序员第二定律:量化管理在程序员身上永无可能
查看>>
ubuntu一些脚本的执行顺序
查看>>
类继承的结构
查看>>
Intel 被 ARM 逼急了
查看>>
testng + reportng 测试结果邮件发送
查看>>
百度亮相iDASH,推动隐私保护在人类基因组分析领域的应用
查看>>
Python「八宗罪」
查看>>
你的隐私还安全吗?社交网络中浏览历史的去匿名化
查看>>
NeurIPS 2018|如何用循环关系网络解决数独类关系推理任务?
查看>>
Windows 10 份额突破 40%,Windows 7 连跌四月终回升
查看>>
怎么把Maven项目转为动态Web项目?
查看>>
Arm发布Cortex-A76AE自动驾驶芯片架构,宣示车载系统市场主权
查看>>
Hibernate入门教程
查看>>
Java支付宝扫码支付[新]
查看>>
SpringMVC 拦截器 筛选
查看>>
第十八章:MVVM(八)
查看>>
点击表头切换升降序排序方式
查看>>
第26天,Django之include本质
查看>>