HDFS 命令行与客户端操作

HDFS 命令行操作

基本命令:

1
>>> bin/hadoop fs

参数大全:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
Usage: hadoop fs [generic options]
[-appendToFile <localsrc> ... <dst>]
[-cat [-ignoreCrc] <src> ...]
[-checksum <src> ...]
[-chgrp [-R] GROUP PATH...]
[-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...]
[-chown [-R] [OWNER][:[GROUP]] PATH...]
[-copyFromLocal [-f] [-p] [-l] <localsrc> ... <dst>]
[-copyToLocal [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]
[-count [-q] [-h] <path> ...]
[-cp [-f] [-p | -p[topax]] <src> ... <dst>]
[-createSnapshot <snapshotDir> [<snapshotName>]]
[-deleteSnapshot <snapshotDir> <snapshotName>]
[-df [-h] [<path> ...]]
[-du [-s] [-h] <path> ...]
[-expunge]
[-find <path> ... <expression> ...]
[-get [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]
[-getfacl [-R] <path>]
[-getfattr [-R] {-n name | -d} [-e en] <path>]
[-getmerge [-nl] <src> <localdst>]
[-help [cmd ...]]
[-ls [-d] [-h] [-R] [<path> ...]]
[-mkdir [-p] <path> ...]
[-moveFromLocal <localsrc> ... <dst>]
[-moveToLocal <src> <localdst>]
[-mv <src> ... <dst>]
[-put [-f] [-p] [-l] <localsrc> ... <dst>]
[-renameSnapshot <snapshotDir> <oldName> <newName>]
[-rm [-f] [-r|-R] [-skipTrash] <src> ...]
[-rmdir [--ignore-fail-on-non-empty] <dir> ...]
[-setfacl [-R] [{-b|-k} {-m|-x <acl_spec>} <path>]|[--set <acl_spec> <path>]]
[-setfattr {-n name [-v value] | -x name} <path>]
[-setrep [-R] [-w] <rep> <path> ...]
[-stat [format] <path> ...]
[-tail [-f] <file>]
[-test -[defsz] <path>]
[-text [-ignoreCrc] <src> ...]
[-touchz <path> ...]
[-truncate [-w] <length> <path> ...]
[-usage [cmd ...]]

-help:输出这个命令参数

1
[user@hadoop101 hadoop-2.7.2]$ hadoop fs -help rm

-ls:显示目录信息

1
[user@hadoop101 hadoop-2.7.2]$ hadoop fs -ls /

-mkdir:在 hdfs 上创建目录

1
[user@hadoop101 hadoop-2.7.2]$ hadoop fs -mkdir -p /user/luoteng/

-moveFromLocal:从本地剪切粘贴到 hdfs

1
[user@hadoop101 hadoop-2.7.2]$ hadoop fs -moveFromLocal ./file.txt /user/luoteng

-appendToFile:追加一个文件到已经存在的文件末尾

1
[user@hadoop101 hadoop-2.7.2]$ hadoop fs -appendToFile newfile.txt /user/luoteng/file.txt

-cat:显示文件内容

1
[user@hadoop101 hadoop-2.7.2]$ hadoop fs -cat file.txt

-tail:显示一个文件的末尾

1
[user@hadoop102 hadoop-2.7.2]$ hadoop fs -tail /user/luoteng/file.txt

-chgrp -chmod -chown:修改文件所属权限

1
2
[user@hadoop101 hadoop-2.7.2]$ adoop fs -chmod 666 /user/luoteng/file.txt
[user@hadoop101 hadoop-2.7.2]$ hadoop fs -chown luoteng:luoteng /user/luoteng/file.txt

-copyFromLocal:从本地文件系统中拷贝文件到 hdfs 路径去

1
[user@hadoop101 hadoop-2.7.2]$ hadoop fs -copyFromLocal README.txt /user/luoteng

-copyToLocal:从 hdfs 拷贝到本地

1
[user@hadoop101 hadoop-2.7.2]$ hadoop fs -copyToLocal /user/luoteng/file.txt ./file.txt

-cp:从 hdfs 的一个路径拷贝到 hdfs 的另一个路径

1
[user@hadoop101 hadoop-2.7.2]$ hadoop fs -cp /user/luoteng/file.txt /file.txt

-mv:在 hdfs 目录中移动文件

1
[user@hadoop101 hadoop-2.7.2]$ hadoop fs -mv /file.txt /user/luoteng/

-get:等同于 copyToLocal,就是从 hdfs 下载文件到本地

1
[user@hadoop101 hadoop-2.7.2]$ hadoop fs -get /user/luoteng/file.txt ./

-getmerge:合并下载多个文件

比如 hdfs 的目录 user/luoteng/ 下有多个文件

1
[user@hadoop101 hadoop-2.7.2]$ hadoop fs -getmerge /user/luoteng/* ./mergefile.txt

-put:等同于 copyFromLocal

1
[user@hadoop101 hadoop-2.7.2]$ hadoop fs -put ./mergefile.txt /user/lutoeng/

-rm:删除文件或文件夹

1
[user@hadoop101 hadoop-2.7.2]$ hadoop fs -rm /user/lutoeng/file.txt

-rmdir:删除空目录

1
[user@hadoop101 hadoop-2.7.2]$ hadoop fs -rmdir /test

-df:统计文件系统的可用空间信息

1
[user@hadoop101 hadoop-2.7.2]$ hadoop fs -df -h /

-du:统计文件夹的大小信息

1
2
[user@hadoop101 hadoop-2.7.2]$ hadoop fs -du -s -h /user/luoteng
[user@hadoop101 hadoop-2.7.2]$ hadoop fs -du -h /user/luoteng

-setrep:设置 hdfs 中文件的副本数量

1
[user@hadoop101 hadoop-2.7.2]$ hadoop fs -setrep 10 /user/luoteng/file.txt

这里设置的副本数只是记录在 NameNode 的元数据中,是否真的会有这么多副本,还得看 DataNode 的数量。因为目前只有 3 台设备,最多也就 3 个副本,只有节点数的增加到10 台时,副本数才能达到 10。

HDFS 客户端操作

HDFS 客户端环境准备

  1. 根据自己电脑的操作系统安装 jdk (一般系统自带,没有去官网下一个)
  2. 根据自己电脑的操作系统拷贝对应的编译后的 hadoop jar 包到非中文路径
  3. 配置 HADOOP_HOME 环境变量,windowslinux 操作各有不同。
  4. 下载一个 java 编译器,我用的是 eclipse
  5. 创建一个 Maven 工程,命名为 HDFS-Client
  6. 导入相应的依赖坐标
    pom.xml 文件中添加以下内容:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    <dependencies>
    <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>RELEASE</version>
    </dependency>
    <dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.8.2</version>
    </dependency>
    <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.7.2</version>
    </dependency>
    <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.2</version>
    </dependency>
    <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.7.2</version>
    </dependency>
    </dependencies>

    点击保存后会自动下载相应的东西,这里需要等很久,就慢慢等让他下载吧!
    保存完毕后可能会出现文件头标错,鼠标点击项目目录右键 -> Maven -> Update Project -> Force Update of Snapshots/Releases 打钩 -> OK

  7. 导入日志添加
    src/main/resources 模流下新建 File 文件,命名为 log4j.properties,在其中写入如下内容:

    1
    2
    3
    4
    5
    6
    7
    8
    log4j.rootLogger=INFO, stdout
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
    log4j.appender.logfile=org.apache.log4j.FileAppender
    log4j.appender.logfile.File=target/spring.log
    log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
    log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

    这是解决 Eclipse/Idea 打印不出日志,在控制台上只显示 log4j:WARN 的问题。

  8. src/main/java 目录下创建包名:com.luoteng.hdfs
  9. com.luoteng.hdfs 下创建类 HDFSClient ,写入如下代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    public class HDFSClient {

    public static void main(String[] args) throws IOException, Exception, URISyntaxException {

    Configuration conf = new Configuration();

    // 1 获取hdfs客户端对象(方法一)
    // conf.set("fs.defaultFS", "hdfs://hadoop101:9000");
    // FileSystem fs = FileSystem.get(conf);

    // 1. 获取hdfs客户端对象(方法二)
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), conf, "luoteng");

    // 2 在hdfs上创建路径
    fs.mkdirs(new Path("/Client"));

    // 3 关闭资源
    fs.close();

    System.out.println("Over");
    }
    }

    获取hdfs客户端对象(方法一) 该方法运行时需要配置一下 VM argument,鼠标右键 -> Run Aa -> Run Configurations -> HDFSClient -> Arguments, 在 VM argument 中写入 DHADOOP_USER_NAME=luoteng

  10. 执行代码,web 中查看 HDFS 文件系统以上代码的操作是否成功,如果成功创建路径 /Client ,测试代码成功,客户端环境准备完成。

文件上传

1
2
3
4
5
6
7
8
9
10
11
12
13
public void testCopyFromLocalFile() throws IOException, InterruptedException, URISyntaxException {

// 1 获取fs对象
Configuration conf = new Configuration();
conf.set("dfs.replication", "1"); // 设置副本数量
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), conf, "luoteng");

// 2 执行上传API
fs.copyFromLocalFile(new Path("/root/Client/luoteng.txt"), new Path("/Client/luoteng.txt"));

// 3 关闭资源
fs.close();
}

文件下载

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void testCopyToLocalFile() throws IOException, InterruptedException, URISyntaxException {

// 1 获取对象
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), conf, "luoteng");

// 2 执行下载API(方法一)
// fs.copyToLocalFile(new Path("/Client/luoteng.txt"), new Path("/root/Client"));
// 2 执行下载API(方法二)
fs.copyToLocalFile(false, new Path("/Client/luoteng.txt"), new Path("/root/Client"), true);//第一个参数表示是否删除源文件

// 3 关闭资源
fs.close();
}

文件夹删除

1
2
3
4
5
6
7
8
9
10
11
12
public void testDelete() throws IOException, InterruptedException, URISyntaxException {

// 1 获取对象
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), conf, "luoteng");

// 2 文件删除
fs.delete(new Path("/Client"), true); // 第二个参数表明是否递归删除(适用于文件夹)

// 3 关闭资源
fs.close();
}

文件名更改

1
2
3
4
5
6
7
8
9
10
11
12
public void testRename() throws IOException, InterruptedException, URISyntaxException {

// 1 获取对象
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), conf, "luoteng");

// 2 执行更名操作
fs.rename(new Path("/Clients"), new Path("/Client"));

// 3 关闭资源
fs.close();
}

文件详情查看

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public void testListFiles() throws IOException, InterruptedException, URISyntaxException {

// 1 获取对象
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), conf, "luoteng");

// 2 查看文件详情
RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true); // 第二个参数表明是否递归

while(listFiles.hasNext()) {
LocatedFileStatus fileStatus = listFiles.next();
// 查看文件名称,权限,长度,块信息
System.out.println(fileStatus.getPath().getName()); // 文件名称
System.out.println(fileStatus.getPermission()); // 文件权限
System.out.println(fileStatus.getLen()); // 文件长度

BlockLocation[] blockLocations = fileStatus.getBlockLocations();
for (BlockLocation blockLocation: blockLocations) {
String[] hosts = blockLocation.getHosts();
for (String host: hosts) {
System.out.println(host);
}
}
System.out.println("----------分割线----------");
}

// 3 关闭资源
fs.close();
}

文件和文件夹判断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void testListStatus() throws IOException, InterruptedException, URISyntaxException {

// 1 获取对象
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), conf, "luoteng");

// 2 判断操作
FileStatus[] listStatus = fs.listStatus(new Path("/"));

for (FileStatus fileStatus: listStatus) {
if (fileStatus.isFile()) {
System.out.println("f:"+fileStatus.getPath().getName());
}else {
System.out.println("d:"+fileStatus.getPath().getName());
}
}

// 3 关闭资源
fs.close();
}

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package com.luoteng.hdfs;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.junit.Test;

public class HDFSClient {

public static void main(String[] args) throws IOException, Exception, URISyntaxException {

Configuration conf = new Configuration();

// 1 获取hdfs客户端对象(方法一)
// conf.set("fs.defaultFS", "hdfs://hadoop101:9000");
// FileSystem fs = FileSystem.get(conf);

// 1 获取hdfs客户端对象(方法二)
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), conf, "luoteng");

// 2 在hdfs上创建路径
fs.mkdirs(new Path("/Client"));

// 3 关闭资源
fs.close();

System.out.println("Over");
}

// 1 文件上传
@Test
public void testCopyFromLocalFile() throws IOException, InterruptedException, URISyntaxException {

// 1 获取fs对象
Configuration conf = new Configuration();
conf.set("dfs.replication", "1"); // 设置副本数量
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), conf, "luoteng");

// 2 执行上传API
fs.copyFromLocalFile(new Path("/root/Client/luoteng.txt"), new Path("/Client/luoteng.txt"));

// 3 关闭资源
fs.close();
}

// 2 文件下载
@Test
public void testCopyToLocalFile() throws IOException, InterruptedException, URISyntaxException {

// 1 获取对象
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), conf, "luoteng");

// 2 执行下载API
// fs.copyToLocalFile(new Path("/Client/luoteng.txt"), new Path("/root/Client"));
fs.copyToLocalFile(false, new Path("/Client/luoteng.txt"), new Path("/root/Client"), true);//第一个参数表示是否删除源文件

// 3 关闭资源
fs.close();
}

// 3 文件删除
@Test
public void testDelete() throws IOException, InterruptedException, URISyntaxException {

// 1 获取对象
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), conf, "luoteng");

// 2 文件删除
fs.delete(new Path("/Client"), true); // 第二个参数表明是否递归删除(适用于文件夹)

// 3 关闭资源
fs.close();
}

// 4 文件更名
@Test
public void testRename() throws IOException, InterruptedException, URISyntaxException {

// 1 获取对象
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), conf, "luoteng");

// 2 执行更名操作
fs.rename(new Path("/Clients"), new Path("/Client"));

// 3 关闭资源
fs.close();
}

// 5 文件详情查看
@Test
public void testListFiles() throws IOException, InterruptedException, URISyntaxException {

// 1 获取对象
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), conf, "luoteng");

// 2 查看文件详情
RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true); // 第二个参数表明是否递归

while(listFiles.hasNext()) {
LocatedFileStatus fileStatus = listFiles.next();
// 查看文件名称,权限,长度,块信息
System.out.println(fileStatus.getPath().getName()); // 文件名称
System.out.println(fileStatus.getPermission()); // 文件权限
System.out.println(fileStatus.getLen()); // 文件长度

BlockLocation[] blockLocations = fileStatus.getBlockLocations();
for (BlockLocation blockLocation: blockLocations) {
String[] hosts = blockLocation.getHosts();
for (String host: hosts) {
System.out.println(host);
}
}
System.out.println("----------分割线----------");
}

// 3 关闭资源
fs.close();
}

// 6 判断是文件还是文件夹
@Test
public void testListStatus() throws IOException, InterruptedException, URISyntaxException {

// 1 获取对象
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), conf, "luoteng");

// 2 判断操作
FileStatus[] listStatus = fs.listStatus(new Path("/"));

for (FileStatus fileStatus: listStatus) {
if (fileStatus.isFile()) {
System.out.println("f:"+fileStatus.getPath().getName());
}else {
System.out.println("d:"+fileStatus.getPath().getName());
}
}

// 3 关闭资源
fs.close();
}
}

HDFS的 I/O 流操作

上面我们学的 API 操作 HDFS 系统都是框架封装好的。那么如果我们想自己实现上述 API 的操作该怎么实现呢?

我们可以采用 IO 流的方式实现数据的上传和下载。

文件上传

需求:把本地 /root/Client 盘上的 luoteng.txt 文件上传到 HDFS 根目录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void putFileToHDFS() throws IOException, InterruptedException, URISyntaxException {

// 1 获取文件系统
Configuration conf = new Configuration();
conf.set("dfs.replication", "2"); // 设置副本数量
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), conf, "luoteng");

// 2 获取输入流
FileInputStream fis = new FileInputStream(new File("/root/Client/luoteng.txt"));

// 3 获取输出流
FSDataOutputStream fos = fs.create(new Path("/luoteng.txt"));

// 4 流的对拷
IOUtils.copyBytes(fis, fos, conf);

// 5 关闭资源
IOUtils.closeStream(fos);
IOUtils.closeStream(fis);
fs.close();
}

文件下载

需求:从 HDFS 上下载 luoteng.txt 文件到本地 /root/Client 盘上

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void getFileFromHDFS() throws IOException, InterruptedException, URISyntaxException {

// 1 获取文件系统
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), conf, "luoteng");

// 2 获取输入流
FSDataInputStream fis = fs.open(new Path("/luoteng.txt"));

// 3 获取输出流
FileOutputStream fos = new FileOutputStream(new File("/root/Client/luoteng.txt"));

// 4 流的对拷
IOUtils.copyBytes(fis, fos, conf);

// 5 关闭资源
IOUtils.closeStream(fos);
IOUtils.closeStream(fis);
fs.close();
}

定位文件读取

需求:分块读取 HDFS 上的大文件,比如根目录下的 /hadoop-2.7.2.tar.gz

读取第一块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public void readFileSeek1() throws IOException, InterruptedException, URISyntaxException{

// 1 获取文件系统
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), conf, "luoteng");

// 2 获取输入流
FSDataInputStream fis = fs.open(new Path("/hadoop-2.7.2.tar.gz"));

// 3 创建输出流
FileOutputStream fos = new FileOutputStream(new File("/root/Client/hadoop-2.7.2.tar.gz.part1"));

// 4 流的拷贝
byte[] buf = new byte[1024];
for(int i =0 ; i < 1024 * 128; i++){
fis.read(buf);
fos.write(buf);
}

// 5关闭资源
IOUtils.closeStream(fos);
IOUtils.closeStream(fis);
fs.close();
}

读取第二块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void readFileSeek2() throws IOException, InterruptedException, URISyntaxException{

// 1 获取文件系统
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), conf, "luoteng");

// 2 打开输入流
FSDataInputStream fis = fs.open(new Path("/hadoop-2.7.2.tar.gz"));

// 3 定位输入数据位置
fis.seek(1024*1024*128);

// 4 创建输出流
FileOutputStream fos = new FileOutputStream(new File("/root/Client/hadoop-2.7.2.tar.gz.part2"));

// 5 流的对拷
IOUtils.copyBytes(fis, fos, conf);

// 6 关闭资源
IOUtils.closeStream(fos);
IOUtils.closeStream(fis);
fs.close();
}

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package com.luoteng.hdfs;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;

public class HDFSIO {

// 把本地 /root/Client 盘上的 luoteng.txt文件上传到 HDFS 根目录
// @Test
public void putFileToHDFS() throws IOException, InterruptedException, URISyntaxException {

// 1 获取文件系统
Configuration conf = new Configuration();
conf.set("dfs.replication", "2"); // 设置副本数量
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), conf, "luoteng");

// 2 获取输入流
FileInputStream fis = new FileInputStream(new File("/root/Client/luoteng.txt"));

// 3 获取输出流
FSDataOutputStream fos = fs.create(new Path("/luoteng.txt"));

// 4 流的对拷
IOUtils.copyBytes(fis, fos, conf);

// 5 关闭资源
IOUtils.closeStream(fos);
IOUtils.closeStream(fis);
fs.close();
}

// 从 HDFS 上下载 luoteng.txt 文件到本地 /root/Client 盘上
// @Test
public void getFileFromHDFS() throws IOException, InterruptedException, URISyntaxException {

// 1 获取文件系统
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), conf, "luoteng");

// 2 获取输入流
FSDataInputStream fis = fs.open(new Path("/luoteng.txt"));

// 3 获取输出流
FileOutputStream fos = new FileOutputStream(new File("/root/Client/luoteng.txt"));

// 4 流的对拷
IOUtils.copyBytes(fis, fos, conf);

// 5 关闭资源
IOUtils.closeStream(fos);
IOUtils.closeStream(fis);
fs.close();
}

// 分块读取 HDFS 上的大文件,比如根目录下的 /hadoop-2.7.2.tar.gz
// @Test
public void readFileSeek1() throws IOException, InterruptedException, URISyntaxException{

// 1 获取文件系统
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), conf, "luoteng");

// 2 获取输入流
FSDataInputStream fis = fs.open(new Path("/hadoop-2.7.2.tar.gz"));

// 3 创建输出流
FileOutputStream fos = new FileOutputStream(new File("/root/Client/hadoop-2.7.2.tar.gz.part1"));

// 4 流的拷贝
byte[] buf = new byte[1024];
for(int i =0 ; i < 1024 * 128; i++){
fis.read(buf);
fos.write(buf);
}

// 5关闭资源
IOUtils.closeStream(fos);
IOUtils.closeStream(fis);
fs.close();
}

// 下载第二块
@Test
public void readFileSeek2() throws IOException, InterruptedException, URISyntaxException{

// 1 获取文件系统
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop101:9000"), conf, "luoteng");

// 2 打开输入流
FSDataInputStream fis = fs.open(new Path("/hadoop-2.7.2.tar.gz"));

// 3 定位输入数据位置
fis.seek(1024*1024*128);

// 4 创建输出流
FileOutputStream fos = new FileOutputStream(new File("/root/Client/hadoop-2.7.2.tar.gz.part2"));

// 5 流的对拷
IOUtils.copyBytes(fis, fos, conf);

// 6 关闭资源
IOUtils.closeStream(fos);
IOUtils.closeStream(fis);
fs.close();
}
}
坚持原创技术分享,您的支持将鼓励我继续创作!