分享

SQOOP、FLUME

 BIGDATA云 2018-07-13
Hadoop
海量数据的分布式的存储
海量数据的分布式的计算
计算是(MapReduce)的完成是基于yarn(资源调度框架)来完成
Hive
是一个数据仓库的基础框架和一个SQL的解析引擎,使用SQL的方式统计/分析存储在HDFS上面的结构化数据,可以将传统
数据分析业务迁移到Hive中来执行。
HBase
是一款能够支撑随机高速读写的NoSQL,设计目标是用来操作十亿级别的行和百万级别的列的大表BigTable
-------------------------------------------------------------------------------------------------------------
数据的搬运工——SQOOP(sqoop.apache.org)
----SQOOP简介
SQOOP就是传统关系型数据库(RDBMS)和HDFS/Hive/HBase之间数据转换的一个搬运工。
Apache Sqoop是一种专门为hadoop和比如关系型数据库等结构化数据库之间的高效数据转换一种工具。
   Sqoop在大多数自动化数据转换的过程中,依托于数据库相关的Schema描述信息,转换的过程是使用MapReduce来进行的。
   Sqoop目前有两个版本,完全不兼容,Sqoop和Sqoop2.可以通过版本号来进行简单的区分,1.4.x为sqoop或sqoop1,1.99.x为sqoop2。
主要学习SQOOP
import:rdbms---->hdfs
从传统数据库获取元数据信息(schema、table、field、field type),把导入功能转换为只有Map的Mapreduce作业,
在mapreduce中有很多map,每个map读一片数据,进而并行的完成数据的拷贝。
涉及到的就是InputFormat的子类DBInputFormat
export:hdfs---->rdbms
  获取导出表的schema、meta信息,和Hadoop中的字段match;多个map only作业同时运行,完成hdfs中数据导出到关系型数据库中。
涉及到的就是OutputFormat的子类DBOutputFormat
这二者在执行的过程中都只生产了只有map阶段的mr(map only),默认会生成4个maptask任务,因为不要对数据进行聚合操作。
----SQOOP安装和配置
约定:安装目录为/opt/
下载地址:/apache/sqoop/1.4.6/
下载之后进行解压:
  tar -zxvf sqoop-1.4.6.bin__hadoop-2.0.4-alpha.tar.gz -C /opt/
重命名:
  mv sqoop-1.4.6.bin__hadoop-2.0.4-alpha sqoop
配置SQOOP_HOME到环境变量中
vim /etc/profile.d/hadoop-etc.sh
 export SQOOP_HOME=/opt/sqoop
 export PATH=$PATH:$SQOOP_HOME/bin
配置$SQOOP_HOME/conf/sqoop-env.sh
  export HADOOP_COMMON_HOME=/opt/hadoop
  export HBASE_HOME=/opt/hbase
  export HIVE_HOME=/opt/hive
  export ZOOCFGDIR=/opt/zookeeper/conf
注意:因为SQOOP要连接rdbms,所以需要将该数据的驱动jar包添加进sqoop的classpath中,所以将mysql的jar包上传至$SQOOP_HONE/lib,
mysql需要mysql-connector-java-5.1.32.jar以上版本支持。
jdk1.7以上
----SQOOP安装验证
bin/sqoop list-databases --connect jdbc:mysql://172.16.180.246:3306/mydb1 --username root --password root
information_schema
exam
mydb1
mydb2
mydb3
mydb4
mydb5
mysql
performance_schema
test
bin/sqoop list-tables --connect jdbc:mysql://172.16.180.246:3306/mydb1 --username root --password root
employee
student
user
----SQOOP的操作
--------import
将mysql中的数据导入到hdfs默认的路径下面去:
bin/sqoop import --connect jdbc:mysql://172.16.180.246:3306/test --username root --password root --table student
在执行导入的过程中,一定要确定表中有pk
将mysql中的数据导入到hdfs的指定的路径下去(--target-dir <hdfs_path>):
bin/sqoop import --connect jdbc:mysql://172.16.180.246:3306/test --username root --password root --table student --target-dir /input/sqoop/student
将mysql中的追加进hdfs的指定的路径下去(--append):
bin/sqoop import --connect jdbc:mysql://172.16.180.246:3306/test --username root --password root --table student --target-dir /input/sqoop/student --append
指定执行import和export的map的task数目(-m <NUM>)
bin/sqoop import --connect jdbc:mysql://172.16.180.246:3306/test --username root --password root --table student --target-dir /input/sqoop/student --append -m 1
删除执行目录重新上传数据(覆盖源目录中的数据--delete-target-dir )
bin/sqoop import --connect jdbc:mysql://172.16.180.246:3306/test --username root --password root --table student --target-dir /input/sqoop/student --delete-target-dir -m 1
执行特定的追加操作(--check-column 'id' --incremental append --last-value 2):
--check-column <column> 执行增长追加操作作用的列
--incremental append    执行增长追加操作的模式/append/lastmodified
--last-value 2 跳过几行数据,追加其余所有的数据
bin/sqoop import --connect jdbc:mysql://172.16.180.246:3306/test --username root --password root --table student --target-dir hdfs://ns1/input/sqoop/student/ --append --check-column 'id' --incremental append --last-value 2 -m 1 
条件查询(--where "条件语句")
bin/sqoop import --connect jdbc:mysql://172.16.180.246:3306/test --username root --password root --table student --target-dir hdfs://ns1/input/sqoop/student/ --append --where "name like 'wang%'" -m 1 
SQL导入(--query "select语句",如果有where条件需要将where查询条件和and \$CONDITIONS连接起来)
bin/sqoop import --connect jdbc:mysql://172.16.180.246:3306/test --username root --password root --query "select id, name, chinese, english, math from student where chinese>80 and \$CONDITIONS" --target-dir hdfs://ns1/input/sqoop/student/ -m 1 --append --fields-terminated-by "," --split-by ","
Hive操作
--------export
将hdfs上面特定的目录下的数据导出到mysql中
sqoop export --connect jdbc:mysql://172.16.180.246:3306/test --username root --password root --table student --export-dir /user/root/student
解决导入导出过程中的中文乱码问题(在jdbc_url连接地址后面加上useUnicode=true&characterEncoding=utf-8)
sqoop export --connect "jdbc:mysql://172.16.180.246:3306/test?useUnicode=true&characterEncoding=utf-8" --username root --password root --table student --export-dir /user/root/student
插入更新(如果存在,更新之,不存在,插入之 --update-key id --update-mode allowinsert)
sqoop export --connect "jdbc:mysql://172.16.180.246:3306/test?useUnicode=true&characterEncoding=utf-8" --username root --password root --table student --export-dir /user/root/student --update-key id --update-mode allowinsert
=================================================================================================================
数据的采集工——FLUME()
埋点:就是企业为网站的建设而在某些特殊的位置被用户点击之后,收集用户的行为,那么把这些特殊的位置称之为埋点。
专业的分布式的日志采集工具——Flume
Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、Hbase等)的能力。
名词介绍:
Flume OG:Flume original generation,即Flume0.9x版本
Flume NG:Flume next generation,即Flume1.x版本
官网:
Flume的数据流结构
A Flume event is defined as a unit of data flow having a byte payload and an optional set of string attributes. A Flume agent is a (JVM) process that hosts the components through which events flow from an external source to the next destination (hop).
A Flume source consumes events delivered to it by an external source like a web server. The external source sends events to Flume in a format that is recognized by the target Flume source. For example, an Avro Flume source can be used to receive Avro events from Avro clients or other Flume agents in the flow that send events from an Avro sink. A similar flow can be defined using a Thrift Flume Source to receive events from a Thrift Sink or a Flume Thrift Rpc Client or Thrift clients written in any language generated from the Flume thrift protocol.When a Flume source receives an event, it stores it into one or more channels. The channel is a passive store that keeps the event until it’s consumed by a Flume sink. The file channel is one example – it is backed by the local filesystem. The sink removes the event from the channel and puts it into an external repository like HDFS (via Flume HDFS sink) or forwards it to the Flume source of the next Flume agent (next hop) in the flow. The source and sink within the given agent run asynchronously with the events staged in the channel.
Flume的重要组件:
Source:接入外部的数据源,根据指定的数据源特征,将采集到的数据转化为event,打入到channel中做搭建的存储
Channel:就是被动的接受/存储Source传递过来的数据,直到Sink将数据取走,那么channel扮演的角色就是临时的存储
Sink:将Channel中的数据移除到下游系统,目的地(可以是文件系统,当然也可以是下一个Flume的Source)
Flume-ng的两种操作方式:
avro-client:一次性将采集到数据打入到avro的服务端
agent:持续不断的采集数据
----------------------------------------------------------------------------------------
Flume的操作案例:
监听网络中的数据,将监听到底的数据输出值控制台

# flume-nc.conf: 用于监听网络数据的flume agent实例的配置文件
############################################
# 对各个组件的描述说明
# 其中a1为agent的名字
# r1是a1的source的代号名字
# c1是a1的channel的代号名字
# k1是a1的sink的代号名字
############################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 用于描述source的,类型是netcat网络
a1.sources.r1.type = netcat
# source监听的网络ip地址和端口号
a1.sources.r1.bind = uplooking01
a1.sources.r1.port = 44444

# 用于描述sink,类型是日志格式
a1.sinks.k1.type = logger

# 用于描述channel,在内存中做数据的临时的存储
a1.channels.c1.type = memory
# 该内存中最大的存储容量,1000个events事件
a1.channels.c1.capacity = 1000
# 能够同时对100个events事件监管事务
a1.channels.c1.transactionCapacity = 100

# 将a1中的各个组件建立关联关系,将source和sink都指向了同一个channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
运行方式:
一:telnet
需要在centOS上面安装telnet,
启动flumn-agent
启动telnet:
telnet uplooking01 44444
二:netcat
安装发给大家的nc.xx.rpm
rpm -ivh nc.xx.rpm-path
启动flumn-agent
启动nc进程
nc uplooking01 44444
----------------------------------------------------------------------------------------
# flume-nc.conf: 用于监听文件中的新增数据的flume agent实例的配置文件
############################################
# 对各个组件的描述说明
# 其中a1为agent的名字
# r1是a1的source的代号名字
# c1是a1的channel的代号名字
# k1是a1的sink的代号名字
############################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 用于描述source的,类型是linux命令
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/logs/flume/nb-http.2017-07-24.log

# 用于描述sink,类型是日志格式
a1.sinks.k1.type = logger

# 用于描述channel,在内存中做数据的临时的存储
a1.channels.c1.type = memory
# 该内存中最大的存储容量,1000个events事件
a1.channels.c1.capacity = 1000
# 能够同时对100个events事件监管事务
a1.channels.c1.transactionCapacity = 100

# 将a1中的各个组件建立关联关系,将source和sink都指向了同一个channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
运行:flume]# bin/flume-ng agent --conf conf --name a1 --conf-file conf/flume-nc.conf -Dflume.root.logger=INFO,console
----------------------------------------------------------------------------------------
# flume-exec.conf: 用于监听文件中的新增数据的flume agent实例的配置文件
############################################
# 对各个组件的描述说明
# 其中a1为agent的名字
# r1是a1的source的代号名字
# c1是a1的channel的代号名字
# k1是a1的sink的代号名字
############################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 用于描述source的,类型是linux命令
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/logs/flume/nb-http.2017-07-24.log

# 用于描述sink,类型是日志格式
a1.sinks.k1.type = logger

# 用于描述channel,在内存中做数据的临时的存储
a1.channels.c1.type = memory
# 该内存中最大的存储容量,1000个events事件
a1.channels.c1.capacity = 1000
# 能够同时对100个events事件监管事务
a1.channels.c1.transactionCapacity = 100

# 将a1中的各个组件建立关联关系,将source和sink都指向了同一个channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
运行:flume]# bin/flume-ng agent --conf conf --name a1 --conf-file conf/flume-exec.conf -Dflume.root.logger=INFO,console
----------------------------------------------------------------------------------------
# flume-dir.conf: 用于监听目录中的新增文件的flume agent实例的配置文件
############################################
# 对各个组件的描述说明
# 其中a1为agent的名字
# r1是a1的source的代号名字
# c1是a1的channel的代号名字
# k1是a1的sink的代号名字
############################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 用于描述source的,监听的是一个目录
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /opt/logs/flume/
a1.sources.r1.fileSuffix = .OK
a1.sources.r1.deletePolicy = never
a1.sources.r1.fileHeader = true

# 用于描述sink,类型是日志格式
a1.sinks.k1.type = logger

# 用于描述channel,在内存中做数据的临时的存储
a1.channels.c1.type = memory
# 该内存中最大的存储容量,1000个events事件
a1.channels.c1.capacity = 1000
# 能够同时对100个events事件监管事务
a1.channels.c1.transactionCapacity = 100

# 将a1中的各个组件建立关联关系,将source和sink都指向了同一个channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
运行:flume]# bin/flume-ng agent --conf conf --name a1 --conf-file conf/flume-dir.conf -Dflume.root.logger=INFO,console
----------------------------------------------------------------------------------------
# flume-hdfs.conf: 用于监听目录中的新增文件传送到hdfs的flume agent实例的配置文件
############################################
# 对各个组件的描述说明
# 其中a1为agent的名字
# r1是a1的source的代号名字
# c1是a1的channel的代号名字
# k1是a1的sink的代号名字
############################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 用于描述source的,监听的是一个目录
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /opt/logs/flume/
a1.sources.r1.fileSuffix = .OK
a1.sources.r1.deletePolicy = never
a1.sources.r1.fileHeader = true

# 用于描述sink,类型是hdfs
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://ns1/input/flume/%Y/%m/
a1.sinks.k1.hdfs.filePrefix = uplooking-
a1.sinks.k1.hdfs.fileSuffix = .com
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = second
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# 用于描述channel,在内存中做数据的临时的存储
a1.channels.c1.type = memory
# 该内存中最大的存储容量,1000个events事件
a1.channels.c1.capacity = 1000
# 能够同时对100个events事件监管事务
a1.channels.c1.transactionCapacity = 100

# 将a1中的各个组件建立关联关系,将source和sink都指向了同一个channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

运行:flume]# bin/flume-ng agent --conf conf --name a1 --conf-file conf/flume-hdfs.conf

flume]# nohup bin/flume-ng agent --conf conf --name a1 --conf-file conf/flume-hdfs.conf > nohup.log 2>&1 &


    转藏 全屏 打印 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多