共 122 篇文章 |
|
/** * 99讲:论坛数据自动生成代码,该生成的数据会作为Producer的方式发送给Kafka,然后SparkStreaming程序会从 * Kafka中在线Pull到论坛或者网站的用户在线行为信息,进而进行多维度的在线分析 * 这里产生数据,就会发送给kafka,kafka那边启动消费者,就会接收到数据,这一步是用来测试生成数据和消费数据没有问题的,确定没问题后要关闭消... 阅24 转0 评0 公众公开 19-02-25 15:19 |
1:kafka直接也可以监控一个文件夹,但是为什么我们采用flume的方式,通过flume把文件传给kafka,而不直接通过kafka去监控一个文件夹呢?3)用户生成的日志或者不同服务器生成的日志一般是比较零散的,企业中一般都会有Crontab等定时工具来通过日志整理工具来把当天的日志采集、合并和初步的处理成为一份日志文件,当然可以多份,分布式建议1份... 阅16 转0 评0 公众公开 19-02-25 15:19 |
第102讲: 动手实战Spark Streaming自定义Receiver并进行调试和测试。 阅184 转0 评0 公众公开 19-02-25 15:18 |
第103讲: 动手实战联合使用Spark Streaming、Broadcast、Accumulator实现在线黑名单过滤和计数。黑名单数据可以写在广播里面。代表总共过滤了2次黑名单 System.out.println(" BlackList appeared : "+ accumulator.value() + " times"); return null; } }); /* * Spark Streaming执行引擎也就是Driver开始运行,Driver启... 阅173 转0 评0 公众公开 19-02-25 15:18 |
/* * 第三步:创建Spark Streaming输入数据来源input Stream: * 1,数据输入来源可以基于File、HDFS、Flume、Kafka、Socket等 * 2, 在这里我们指定数据来源于网络Socket端口,Spark Streaming连接上该端口并在运行的时候一直监听该端口 * 的数据(当然该端口服务首先必须存在),并且在后续会根据业务需要不断的有数据产生(当然对于Spark Strea... 阅82 转0 评0 公众公开 19-02-25 15:16 |
* 插入的用户信息可以只包含:useID、adID、clickedCount、time * 这里面有一个问题:可能出现两条记录的Key是一样的,此时就需要更新累加操作 * Batch里面这个放进去的就是这个用户对这个广告点击的次数,如果发现数据库中有这个数据,,就累加,每10秒更新一次 */ } }); return null; } }); //上面更新数据库,这是过滤历史上累计下来的所有... 阅97 转0 评0 公众公开 19-02-25 15:16 |
/* * 第三步:创建Spark Streaming输入数据来源input Stream: * 1,数据输入来源可以基于File、HDFS、Flume、Kafka、Socket等 * 2, 在这里我们指定数据来源于网络Socket端口,Spark Streaming连接上该端口并在运行的时候一直监听该端口 * 的数据(当然该端口服务首先必须存在),并且在后续会根据业务需要不断的有数据产生(当然对于Spark Strea... 阅22 转0 评0 公众公开 19-02-25 15:16 |
/* * 第三步:创建Spark Streaming输入数据来源input Stream: * 1,数据输入来源可以基于File、HDFS、Flume、Kafka、Socket等 * 2, 在这里我们指定数据来源于网络Socket端口,Spark Streaming连接上该端口并在运行的时候一直监听该端口 * 的数据(当然该端口服务首先必须存在),并且在后续会根据业务需要不断的有数据产生(当然对于Spark Strea... 阅143 转0 评0 公众公开 19-02-25 15:15 |
*/ JavaPairDStream<String, String> filteredadClickedStreaming = adClickedStreaming.transformToPair(new Function<JavaPairRDD<String,String>, JavaPairRDD<String,String>>() { @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception { /** * 在... 阅31 转0 评0 公众公开 19-02-25 15:15 |
*/ JavaPairDStream<String, String> filteredadClickedStreaming = adClickedStreaming.transformToPair(new Function<JavaPairRDD<String,String>, JavaPairRDD<String,String>>() { @Override public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception { /** * 在... 阅32 转0 评0 公众公开 19-02-25 15:14 |