Hadoop分布式文件系统

在一个经典的数据架构中,Hadoop是处理复杂数据流的核心。数据从各种系统中收集而来,并汇总导入到Hadoop分布式文件系统HDFS中,然后通过MapReduce或者其它基于MapReduce封装的语言如Hive,Pig等进行处理,将处理后的数据导出即可。具体例子而言,如果一个大型网站需要做网站点击率的分析,它将多个服务器采集的页面访问日志汇总,推送至HDFS中,启动MapReduce作业,接下来数据将被解析,汇总以及IP地址进行关联计算,生成的结果可以导入到关系型数据库中。


启动Hadoop

在一台已经安装Hadoop并配置了环境变量的机器中启动hadoop。
- su hadoop#进入hadoop命令模式
- hadoop namenode -format#初始化hadoop:hadoop部署好之后需要进行格式化工作,同时初始化操作日志,因此对于第一次使用HDFS时,需要执行-format命令才可以正常使用namenode节点
- start-all.sh#启动hadoop
- jps#使用jps检查是否启动进程


使用Hadoop shell命令导入导出数据到HDFS

  • HDFS提供shell命令实现访问文件系统的功能,shell脚本名称为hadoop,通常安装在$HADOOP_BIN目录下,将$HADOOP_BIN配置到$PATH环境变量中,这样所有命令都可以通过hadoop fs -command执行,通过hadoop fs -help command获得某个命令的具体说明。
  • hadoop fs -mkdir -p /data/weblogs#在HDFS创建名为weblogs的新文件夹,-p表示级联创建(在创建目录weblogs时,若data不存在,顺带创建data目录)
  • hadoop fs -copyFromLocal <localsrc> URI将文件从本地文件系统复制到HDFS目标文件夹下。e.g.:hadoop fs -copyFromLocal weblogs.txt /data/weblogs
  • hadoop fs -ls <args>args为文件,列出文件状态;args为目录列出目录下的文件
  • 工作原理:Hadoop shell轻量地封装在HDFS FileSystem API之上。在执行hadoop命令时,如果传进去的参数是fs,实际执行的是org.apache.hadoop.fs.FsShell这个类,FsShell实例化了一个org.apache.hadoop.fs.FileSystem对象,并且将命令行参数与类方法映射起来。例如,执行hadoop fs -mkdir /data/weblogs相当与调用FileSystem.mkdirs(new Path("/data/weblogs"))
  • 详细的命令使用参见:hadoop fs

Pig脚本使用getmerge命令

  • 使用上面的hadoop fs命令的getcopyToLocal只能对文件进行复制,无法对文件夹进行复制,当然可以使用其getmerge合并多个文件并下载到本地文件系统中
  • 使用Pig脚本执行getmerge。建立weblogsmd5group_pig.sh脚本:

weblogs = load '/data/weblogs/weblog_entries.txt' as#逐行读取HDFS上weblog_entries.txt文件 (md5:chararray, url:chararray, date:chararray, time:chararray, ip:chararray); md5_grp = group weblogs by md5 parallel 4;#按照md5值进行分组
store md5_grp into '/data/weblogs/weblogs_md5_groups.bcp';#parallel是Pig脚本用来设置reduce个数的方法,由于启动4个reduce任务,所以会在输出目录中生成4个文件


distcp实现集群间数据复制

  • Hadoop分布式复制distcp是Hadoop集群间复制大量数据的高效工作,distcp是通过启动MapReduce实现数据复制的。
  • hadoop distcp hdfs://namenodeA/data/weblogs hdfs://namenodeB/data/weblogs#将集群A的weblogs文件夹复制到集群B上
  • hadoop distcp -overwrite hdfs://namenodeA/data/weblogs hdfs://namenodeB/data/weblogs#将集群A的weblogs文件夹复制到集群B并覆盖已存在文件
  • hadoop distcp -update hdfs://namenodeA/data/weblogs hdfs://namenodeB/data/weblogs#同步集群A和集群B之间的weblogs文件夹
  • 实现原理:在原集群,文件夹的内容被复制为一个临时的大文件,将会启动一个只有map的MapReduce作业实现两个集群之间的数据复制。

使用Sqooq从Mysql数据库导入到HDFS

  • Sqooq和distcp相似,都是构建在MapReduce之上,利用了MapReduce的并行性和容错性,与集群间复制不同,Sqooq设计通过JDBC驱动连接实现Hadoop集群与关系数据库之间的数据复制。
  • 在mysql中创建logs数据库和表weblogs: CREATE DATABASE logs; use logs;
    create table weblogs (
    md5 VARCHAR(32), url VARCHAR(64), request_date DATE, request_time TIME, ip VARCHAR(15) ); show tables;
  • 使用如下命令将logs数据库的表数据导入到HDFS中: ./sqoop import --connect jdbc:mysql://localhost:3306/logs --username root --password 123456 --table weblogs --target-dir /data/weblogs/import
  • 工作原理:Sqooq连接数据库的JDBC驱动在--connect语句中定义,并从$SQOOP_HOME/lib目录中加载相应的包,其中$SQOOP_HOME为Sqooq安装的绝对路径。--username--password用于验证mysql实例的权限,--target-dir选项指定导出数据库的存放位置,-m 1指定选定map的数量。注意:mysql.user表必须包含Hadoop集群每个节点的主机域名和相应的用户名,否则Sqooq会抛出异常。

使用Sqooq从HDFS导出到Mysql

  • 创建表: use logs; create table weblogs_from_hdfs (
    md5 VARCHAR(32), url VARCHAR(64), request_date DATE, request_time TIME, ip VARCHAR(15) );
  • 从HDFS导出到weblog_entries.txt文件到Mysql: ./sqoop export --connect jdbc:mysql://localhost:3306/logs --username root --password 123456 --table weblogs_from_hdfs --export-dir '/data/weblogs/weblog_entries.txt' -m 1 --fields-terminated-by '\t'
  • 上面的这个例子使用--table参数决定HDFS导出的数据被储存在哪张Mysql表中,Sqooq通过表的元数据信息,列数量和列类型来校验HDFS需要导出目录中的数据并生成相应的插入语句。导出作业可以被想象为逐行读取HDFS的文件变每行产生一个INSERT INTO的sql语句进行插入。

本博客部分来源于实验楼