分享

使用Apache Flink开始批处理

 码农书馆 2019-05-05

如果您最近一直在关注软件开发新闻,那么您可能听说过名为Apache Flink的新项目。我已经在这里这里写了一些内容,但如果您不熟悉它,Apache Flink是新一代大数据处理工具,可以处理有限的数据集(这也称为批处理)或潜在的无限的数据流(流处理)。在新功能方面,许多人认为Apache Flink是一款游戏规则改变者,甚至可以在未来取代Apache Spark。

在本文中,我将向您介绍如何使用Apache Flink实现简单的批处理算法。我们将从设置开发环境开始,然后我们将看到如何加载数据,处理数据集以及将数据写回外部系统。

 

为什么批处理?

您可能听说过流处理是“现在新的热点”,Apache Flink是一个流处理工具。这可能会提出一个问题,为什么我们需要学习如何实现批处理应用程序。

虽然确实如此,但流处理变得越来越普遍; 许多任务仍然需要批量处理。另外,如果您刚开始使用Apache Flink,我认为最好从批处理开始,因为它更简单,并且在某种程度上类似于使用数据库。一旦您完成了批量处理,您就可以了解Apache Flink真正发挥作用的流处理!

如何遵循示例

如果您想自己实现一些Apache Flink应用程序,首先需要创建一个Flink项目。在本文中,我们将用Java编写应用程序,但您也可以在Scala,Python或R中编写Flink应用程序。

要创建Flink Java项目,请执行以下命令:

  1. mvn archetype:generate \
  2. -DarchetypeGroupId=org.apache.flink \
  3. -DarchetypeArtifactId=flink-quickstart-java \
  4. -DarchetypeVersion=1.3.2

输入组ID,工件ID和项目版本后,此命令将创建以下项目结构:

  1. .
  2. ├── pom.xml
  3. └── src
  4. └── main
  5. ├── java
  6. │   └── flinkProject
  7. │   ├── BatchJob.java
  8. │   ├── SocketTextStreamWordCount.java
  9. │   ├── StreamingJob.java
  10. │   └── WordCount.java
  11. └── resources
  12. └── log4j.properties

这里最重要的是pom.xml指定所有必需依赖项的大量内容。自动创建的Java类是您可以查看的一些简单Flink应用程序的示例,但我们并不需要它们用于我们的目的。

要开始开发您的第一个Flink应用程序,请使用如下main方法创建一个类:

  1. public class FilterMovies {

  2. public static void main(String[] args) throws Exception {
  3. // Create Flink execution environment
  4. final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

  5. // We will write our code here

  6. // Start Flink application
  7. env.execute();
  8. }
  9. }

这种main方法没什么特别之处。我们所要做的就是添加一些样板代码。

首先,我们需要创建一个Flink执行环境,如果在本地计算机或Flink集群中运行它,它的行为会有所不同:

  • 在本地计算机上,它将创建一个包含多个本地节点的完整Flink集群。这是测试应用程序在实际环境中如何工作的好方法
  • 在Flink集群上,它不会创建任何内容,而是使用现有的集群资源

或者,您可以创建一个这样的集合环境:

 ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();

这将创建一个Flink执行环境,而不是在本地群集上运行Flink应用程序将在单个Java进程中使用内存中集合模拟所有操作。您的应用程序将运行得更快,但此环境与具有多个节点的本地群集有一些细微差别。

我们从哪里开始?

在我们可以做任何事情之前,我们需要将数据读入Apache Flink。我们可以从众多系统中读取数据,包括:本地文件系统,S3,HDFS,HBase,Cassandra等。无论我们从何处读取数据集,Apache Flink都允许我们使用DataSet类以统一的方式处理数据:

DataSet<Integer> numbers = ...

数据集中的所有项应具有相同的类型。single generics参数指定存储在数据集中的数据类型。

要从文件中读取数据,我们可以使用readTextFile逐行读取文件中的行并返回类型的数据集的方法String

DataSet<String> lines = env.readTextFile("path/to/file.txt");

如果指定这样的文件路径,Flink将尝试读取本地文件。如果要从HDFS读取文件,则需要指定hdfs://协议:

env.readCsvFile("hdfs:///path/to/file.txt")

Flink还支持CSV文件,但在这种情况下,它不会返回字符串数据集。它将尝试解析每一行并返回Tuple实例的数据集:

  1. DataSet<Tuple2<Long, String>> lines = env.readCsvFile("data.csv")
  2. .types(Long.class, String.class);

Tuple2是存储不可改变的一对两个场中的一类,但也有其他类似Tuple0Tuple1Tuple3,高达Tuple25该存储从零到25的字段。稍后我们将看到如何使用这些类。

types方法指定CSV文件中的列类型和数量,因此Flink可以读取它们进行解析。

我们还可以创建非常适合小型实验和单元测试的小型数据集:

  1. / Create from a list
  2. DataSet<String> letters = env.fromCollection(Arrays.asList("a", "b", "c"));
  3. // Create from an array
  4. DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);

您可能会问的问题是我们可以在DataSet中存储哪些数据?并非每种Java类型都可以在数据集中使用,并且您可以使用四种不同类型的类型:

  • 内置Java类型和POJO类
  • Flink Tuples和Scala案例类
  • 值 - 这些是Java原始类型的特殊可变包装器,您可以使用它来提高性能(我将在下一篇文章中写到这一点)
  • Hadoop可写接口的实现

使用Apache Flink处理数据

现在到数据处理部分!如何实现处理数据的算法?为此,您可以使用许多类似于Java 8流操作的操作,例如:

  • map - 使用用户定义的函数转换数据集中的项目。每个输入元素都转换为一个输出元素
  • filter - 根据用户定义的函数过滤数据集中的项目
  • flatMap - 类似于map运算符,但允许返回零个,一个或多个元素
  • groupBy - 按键分组元素。类似于GROUP BYSQL中的运算符
  • project - 选择元组数据集中的指定字段,类似于SELECTSQL中的运算符
  • reduce - 使用用户定义的函数将数据集中的元素组合成单个值

请记住,Java流与这些操作之间的最大区别在于,Java 8可以处理内存中的数据并可以访问本地数据,而Flink可以处理分布式环境中群集上的数据。

我们来看一个使用这些操作的简单示例。以下示例非常简单。它创建一个数字数据集,对每个数字进行平方并过滤掉所有奇数。

  1. // Create a dataset of numbers
  2. DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7);

  3. // Square every number
  4. DataSet<Integer> result = numbers.map(new MapFunction<Integer, Integer>() {
  5. @Override
  6. public Integer map(Integer integer) throws Exception {
  7. return integer * integer;
  8. }
  9. })
  10. // Leave only even numbers
  11. .filter(new FilterFunction<Integer>() {
  12. @Override
  13. public boolean filter(Integer integer) throws Exception {
  14. return integer % 2 == 0;
  15. }
  16. });

如果你对Java 8有任何经验,你可能想知道我为什么不在这里使用lambdas。我们可以在这里使用lambdas,但它可能会导致一些并发症,正如我在这里写的那样。

保存数据

在我们完成数据处理之后,保存我们辛勤工作的结果是有意义的。Flink可以将数据存储到许多第三方系统,如HDFS,S3,Cassandra等。

例如,要将数据写入文件,我们需要使用类中的writeAsText方法DataSet

  1. DataSet<Integer> ds = ...

  2. ds.writeAsText("path/to/file");

出于调试/测试目的,Flink可以将数据写入标准输出或标准输出:

  1. DataSet<Integer> ds = ...

  2. // Output dataset to the standard output
  3. ds.print();

  4. // Output dataset to the standard err
  5. ds.printToErr()

更复杂的例子

要实现一些有意义的算法,我们首先需要下载Grouplens电影数据集。它包含多个CSV文件,其中包含有关电影和电影评级的信息。我们将使用movies.csv此数据集中的文件,其中包含所有电影的列表,如下所示:

  1. movieId,title,genres
  2. 1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
  3. 2,Jumanji (1995),Adventure|Children|Fantasy
  4. 3,Grumpier Old Men (1995),Comedy|Romance
  5. 4,Waiting to Exhale (1995),Comedy|Drama|Romance
  6. 5,Father of the Bride Part II (1995),Comedy
  7. 6,Heat (1995),Action|Crime|Thriller
  8. 7,Sabrina (1995),Comedy|Romance
  9. 8,Tom and Huck (1995),Adventure|Children
  10. 9,Sudden Death (1995),Action
  11. 10,GoldenEye (1995),Action|Adventure|Thriller

它有三列:

  • movieId - 此数据集中电影的唯一电影ID
  • 标题 - 电影的标题
  • 流派 - 每部电影的“|”分类列表

我们现在可以在Apache Flink中加载此CSV文件并执行一些有意义的处理。在这里,我们将从本地文件系统加载文件,而在现实环境中,您将读取更大的数据集,它可能驻留在分布式系统中,例如S3或HDFS。

在这个演示中,让我们找到所有“动作”类型的电影。这是一个执行此操作的代码段:

  1. // Load dataset of movies
  2. DataSet<Tuple3<Long, String, String>> lines = env.readCsvFile("movies.csv")
  3. .ignoreFirstLine()
  4. .parseQuotedStrings('"')
  5. .ignoreInvalidLines()
  6. .types(Long.class, String.class, String.class);


  7. DataSet<Movie> movies = lines.map(new MapFunction<Tuple3<Long,String,String>, Movie>() {
  8. @Override
  9. public Movie map(Tuple3<Long, String, String> csvLine) throws Exception {
  10. String movieName = csvLine.f1;
  11. String[] genres = csvLine.f2.split("\\|");
  12. return new Movie(movieName, new HashSet<>(Arrays.asList(genres)));
  13. }
  14. });
  15. DataSet<Movie> filteredMovies = movies.filter(new FilterFunction<Movie>() {
  16. @Override
  17. public boolean filter(Movie movie) throws Exception {
  18. return movie.getGenres().contains("Action");
  19. }
  20. });

  21. filteredMovies.writeAsText("output.txt");

让我们分解吧。首先,我们使用以下readCsvFile方法读取CSV文件:

  1. DataSet<Tuple3<Long, String, String>> lines = env.readCsvFile("movies.csv")
  2. // ignore CSV header
  3. .ignoreFirstLine()
  4. // Set strings quotes character
  5. .parseQuotedStrings('"')
  6. // Ignore invalid lines in the CSV file
  7. .ignoreInvalidLines()
  8. // Specify types of columns in the CSV file
  9. .types(Long.class, String.class, String.class);

使用辅助方法,我们指定如何解析CSV文件中的字符串以及我们需要跳过第一行。在最后一行中,我们指定CSV文件中每列的类型,Flink将为我们解析数据。

现在,当我们在Flink集群中加载数据集时,我们可以进行一些数据处理。首先,我们使用以下map方法解析每部电影的流派列表:

  1. DataSet<Movie> movies = lines.map(new MapFunction<Tuple3<Long,String,String>, Movie>() {
  2. @Override
  3. public Movie map(Tuple3<Long, String, String> csvLine) throws Exception {
  4. String movieName = csvLine.f1;
  5. String[] genres = csvLine.f2.split("\\|");
  6. return new Movie(movieName, new HashSet<>(Arrays.asList(genres)));
  7. }
  8. });

要转换我们需要实现的每部电影MapFunction,它将接收每个CSV记录作为Tuple3实例,并将其转换为MoviePOJO类:

  1. class Movie {
  2. private String name;
  3. private Set<String> genres;

  4. public Movie(String name, Set<String> genres) {
  5. this.name = name;
  6. this.genres = genres;
  7. }

  8. public String getName() {
  9. return name;
  10. }

  11. public Set<String> getGenres() {
  12. return genres;
  13. }
  14. }

如果您回想起CSV文件的结构,则第二列包含电影的名称,第三列包含类型列表。因此,我们分别使用字段f1和列来访问这些列f2

现在,当我们有一个电影数据集时,我们可以实现算法的核心部分并过滤所有动作电影:

  1. DataSet<Movie> filteredMovies = movies.filter(new FilterFunction<Movie>() {
  2. @Override
  3. public boolean filter(Movie movie) throws Exception {
  4. return movie.getGenres().contains("Action");
  5. }
  6. });

这将仅返回在集合类型中包含“Action”的电影。

现在最后一步非常简单; 我们将结果数据存储到一个文件中:

filteredMovies.writeAsText("output.txt");

这只是将结果数据存储到本地文本文件中,但与readTextFile方法一样,我们可以通过指定协议来将此文件写入HDFS或S3 hdfs://

更多信息

这是一篇介绍性文章,Apache Flink还有很多内容。我会在不久的将来写更多关于Flink的文章,敬请期待!您可以在这里阅读我的其他文章,或者您可以查看我的Pluralsight课程,其中更详细地介绍了Apache Flink:了解Apache Flink。这是本课程的简短预览

原文博客链接:https://brewing.codes/2017/10/01/start-flink-batch/

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多