分享

基于Java、Kafka、ElasticSearch的搜索框架的设计与实现

 spzproot 2017-12-31

Jkes是一个基于Java、Kafka、ElasticSearch的搜索框架。Jkes提供了注解驱动的JPA风格的对象/文档映射,使用REST API用于文档搜索。

项目主页:https://github.com/chaokunyang/jkes

安装

可以参考jkes-integration-test项目快速掌握jkes框架的使用方法。jkes-integration-test是我们用来测试功能完整性的一个Spring Boot Application。

  • 安装jkes-index-connectorjkes-delete-connector到Kafka Connect类路径
  • 安装 Smart Chinese Analysis Plugin
sudo bin/elasticsearch-plugin install analysis-smartcn

配置

  • 引入jkes-spring-data-jpa依赖
  • 添加配置
@EnableAspectJAutoProxy@EnableJkes@Configurationpublic class JkesConfig { @Bean public PlatformTransactionManager transactionManager(EntityManagerFactory factory, EventSupport eventSupport) { return new SearchPlatformTransactionManager(new JpaTransactionManager(factory), eventSupport); }}

提供JkesProperties Bean

@Component@Configurationpublic class JkesConf extends DefaultJkesPropertiesImpl { @PostConstruct public void setUp() { Config.setJkesProperties(this); } @Override public String getKafkaBootstrapServers() { return ':9292,:9292,:9292'; } @Override public String getKafkaConnectServers() { return 'http://:8084,http://:8084,http://:8084'; } @Override public String getEsBootstrapServers() { return 'http://:9200,http://:9200,http://:9200'; } @Override public String getDocumentBasePackage() { return 'com.timeyang.jkes.integration_test.domain'; } @Override public String getClientId() { return 'integration_test'; }}

这里可以很灵活,如果使用Spring Boot,可以使用@ConfigurationProperties提供配置

增加索引管理端点 因为我们不知道客户端使用的哪种web技术,所以索引端点需要在客户端添加。比如在Spring MVC中,可以按照如下方式添加索引端点

@RestController@RequestMapping('/api/search')public class SearchEndpoint { private Indexer indexer; @Autowired public SearchEndpoint(Indexer indexer) { this.indexer = indexer; } @RequestMapping(value = '/start_all', method = RequestMethod.POST) public void startAll() { indexer.startAll(); } @RequestMapping(value = '/start/{entityClassName:.+}', method = RequestMethod.POST) public void start(@PathVariable('entityClassName') String entityClassName) { indexer.start(entityClassName); } @RequestMapping(value = '/stop_all', method = RequestMethod.PUT) public Map stopAll() { return indexer.stopAll(); } @RequestMapping(value = '/stop/{entityClassName:.+}', method = RequestMethod.PUT) public Boolean stop(@PathVariable('entityClassName') String entityClassName) { return indexer.stop(entityClassName); } @RequestMapping(value = '/progress', method = RequestMethod.GET) public Map getProgress() { return indexer.getProgress(); }}

快速开始

索引API

使用com.timeyang.jkes.core.annotation包下相关注解标记实体

@lombok.Data@Entity@Documentpublic class Person extends AuditedEntity { // @Id will be identified automatically // @Field(type = FieldType.Long) @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Long id; @MultiFields( mainField = @Field(type = FieldType.Text), otherFields = { @InnerField(suffix = 'raw', type = FieldType.Keyword), @InnerField(suffix = 'english', type = FieldType.Text, analyzer = 'english') } ) private String name; @Field(type = FieldType.Keyword) private String gender; @Field(type = FieldType.Integer) private Integer age; // don't add @Field to test whether ignored // @Field(type = FieldType.Text) private String description; @Field(type = FieldType.Object) @ManyToOne(fetch = FetchType.EAGER) @JoinColumn(name = 'group_id') private PersonGroup personGroup;}
@lombok.Data@Entity@Document(type = 'person_group', alias = 'person_group_alias')public class PersonGroup extends AuditedEntity { @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Long id; private String name; private String interests; @OneToMany(fetch = FetchType.EAGER, cascade = CascadeType.ALL, mappedBy = 'personGroup', orphanRemoval = true) private List persons; private String description; @DocumentId @Field(type = FieldType.Long) public Long getId() { return id; } @MultiFields( mainField = @Field(type = FieldType.Text), otherFields = { @InnerField(suffix = 'raw', type = FieldType.Keyword), @InnerField(suffix = 'english', type = FieldType.Text, analyzer = 'english') } ) public String getName() { return name; } @Field(type = FieldType.Text) public String getInterests() { return interests; } @Field(type = FieldType.Nested) public List getPersons() { return persons; } /** * 不加Field注解,测试序列化时是否忽略 */ public String getDescription() { return description; }}

当更新实体时,文档会被自动索引到ElasticSearch;删除实体时,文档会自动从ElasticSearch删除。

搜索API

启动搜索服务jkes-search-service,搜索服务是一个Spring Boot Application,提供rest搜索api,默认运行在9000端口。

URI query

curl -XPOST localhost:9000/api/v1/integration_test_person_group/person_group/_search?from=3&size=10

Nested query

integration_test_person_group/person_group/_search?from=0&size=10{ 'query': { 'nested': { 'path': 'persons', 'score_mode': 'avg', 'query': { 'bool': { 'must': [ { 'range': { 'persons.age': { 'gt': 5 } } } ] } } } }}

match query

integration_test_person_group/person_group/_search?from=0&size=10{ 'query': { 'match': { 'interests': 'Hadoop' } }}

bool query

{ 'query': { 'bool' : { 'must' : { 'match' : { 'interests' : 'Hadoop' } }, 'filter': { 'term' : { 'name.raw' : 'name0' } }, 'should' : [ { 'match' : { 'interests' : 'Flink' } }, { 'nested' : { 'path' : 'persons', 'score_mode' : 'avg', 'query' : { 'bool' : { 'must' : [ { 'match' : {'persons.name' : 'name40'} }, { 'match' : {'persons.interests' : 'interests'} } ], 'must_not' : { 'range' : { 'age' : { 'gte' : 50, 'lte' : 60 } } } } } } } ], 'minimum_should_match' : 1, 'boost' : 1.0 } }}

Source filtering

integration_test_person_group/person_group/_search{ '_source': false, 'query' : { 'match' : { 'name' : 'name17' } }}
integration_test_person_group/person_group/_search{ '_source': { 'includes': [ 'name', 'persons.*' ], 'excludes': [ 'date*', 'version', 'persons.age' ] }, 'query' : { 'match' : { 'name' : 'name17' } }}

prefix

integration_test_person_group/person_group/_search{ 'query': { 'prefix' : { 'name' : 'name' } }}

wildcard

integration_test_person_group/person_group/_search{ 'query': { 'wildcard' : { 'name' : 'name*' } }}

regexp

integration_test_person_group/person_group/_search{ 'query': { 'regexp':{ 'name': 'na.*17' } }}

Jkes工作原理

索引工作原理:

  • 应用启动时,Jkes扫描所有标注@Document注解的实体,为它们构建元数据。
  • 基于构建的元数据,创建indexmappingJson格式的配置,然后通过ElasticSearch Java Rest Client将创建/更新index配置。
  • 为每个文档创建/更新Kafka ElasticSearch Connector,用于创建/更新文档
  • 为整个项目启动/更新Jkes Deleter Connector,用于删除文档
  • 拦截数据操作方法。将* save(*)方法返回的数据包装为SaveEvent保存到EventContainer;使用(* delete*(..)方法的参数,生成一个DeleteEvent/DeleteAllEvent保存到EventContainer
  • 拦截事务。在事务提交后使用JkesKafkaProducer发送SaveEvent中的实体到Kafka,Kafka会使用我们提供的JkesJsonSerializer序列化指定的数据,然后发送到Kafka。
  • SaveEvent不同,DeleteEvent会直接被序列化,然后发送到Kafka,而不是只发送一份数据
  • SaveEventDeleteEvent不同,DeleteAllEvent不会发送数据到Kafka,而是直接通过ElasticSearch Java Rest Client删除相应的index,然后重建该索引,重启Kafka ElasticSearch Connector

查询工作原理:

  • 查询服务通过rest api提供
  • 我们没有直接使用ElasticSearch进行查询,因为我们需要在后续版本使用机器学习进行搜索排序,而直接与ElasticSearch进行耦合,会增加搜索排序API的接入难度
  • 查询服务是一个Spring Boot Application,使用docker打包为镜像
  • 查询服务提供多版本API,用于API进化和兼容
  • 查询服务解析json请求,进行一些预处理后,使用ElasticSearch Java Rest Client转发到ElasticSearch,将得到的响应进行解析,进一步处理后返回到客户端。
  • 为了便于客户端人员开发,查询服务提供了一个查询UI界面,开发人员可以在这个页面得到预期结果后再把json请求体复制到程序中。

流程图

模块介绍

jkes-core

jkes-core是整个jkes的核心部分。主要包括以下功能:

  • annotation包提供了jkes的核心注解
  • elasticsearch包封装了elasticsearch相关的操作,如为所有的文档创建/更新索引,更新mapping
  • kafka包提供了Kafka 生产者,Kafka Json Serializer,Kafka Connect Client
  • metadata包提供了核心的注解元数据的构建与结构化模型
  • event包提供了事件模型与容器
  • exception包提供了常见的Jkes异常
  • http包基于Apache Http Client封装了常见的http json请求
  • support包暴露了Jkes核心配置支持
  • util包提供了一些工具类,便于开发。如:Asserts, ClassUtils, DocumentUtils, IOUtils, JsonUtils, ReflectionUtils, StringUtils

jkes-boot

jkes-boot用于与一些第三方开源框架进行集成。

当前,我们通过jkes-spring-data-jpa,提供了与spring data jpa的集成。通过使用Spring的AOP机制,对Repository方法进行拦截,生成SaveEvent/DeleteEvent/DeleteAllEvent保存到EventContainer。通过使用我们提供的SearchPlatformTransactionManager,对常用的事务管理器(如JpaTransactionManager)进行包装,提供事务拦截功能。

在后续版本,我们会提供与更多框架的集成。

jkes-spring-data-jpa说明:

  • ContextSupport类用于从bean工厂获取Repository Bean
  • @EnableJkes让客户端能够轻松开启Jkes的功能,提供了与Spring一致的配置模型
  • EventSupport处理事件的细节,在保存和删除数据时生成相应事件存放到EventContainer,在事务提交和回滚时处理相应的事件
  • SearchPlatformTransactionManager包装了客户端的事务管理器,在事务提交和回滚时加入了回调hook
  • audit包提供了一个简单的AuditedEntity父类,方便添加审计功能,版本信息可用于结合ElasticSearch的版本机制保证不会索引过期文档数据
  • exception包封装了常见异常
  • intercept包提供了AOP切点和切面
  • index包提供了全量索引功能。当前,我们提供了基于线程池的索引机制和基于ForkJoin的索引机制。在后续版本,我们会重构代码,增加基于阻塞队列生产者-消费者模式,提供并发性能

jkes-services

jkes-services主要用来提供一些服务。 目前,jkes-services提供了以下服务:

  • jkes-delete-connector
    • jkes-delete-connector是一个Kafka Connector,用于从kafka集群获取索引删除事件(DeleteEvent),然后使用Jest Client删除ElasticSearch中相应的文档。
    • 借助于Kafka Connect的rest admin api,我们轻松地实现了多租户平台上的文档删除功能。只要为每个项目启动一个jkes-delete-connector,就可以自动处理该项目的文档删除工作。避免了每启动一个新的项目,我们都得手动启动一个Kafka Consumer来处理该项目的文档删除工作。尽管可以通过正则订阅来减少这样的工作,但是还是非常不灵活
  • jkes-search-service
    • jkes-search-service是一个restful的搜索服务,提供了多版本的rest query api。查询服务提供多版本API,用于API进化和兼容
    • jkes-search-service目前支持URI风格的搜索和JSON请求体风格的搜索。
    • 我们没有直接使用ElasticSearch进行查询,因为我们需要在后续版本使用机器学习进行搜索排序,而直接与ElasticSearch进行耦合,会增加搜索排序的接入难度
    • 查询服务是一个Spring Boot Application,使用docker打包为镜像
    • 查询服务解析json请求,进行一些预处理后,使用ElasticSearch Java Rest Client转发到ElasticSearch,将得到的响应进行解析,进一步处理后返回到客户端。
    • 为了便于客户端人员开发,查询服务提供了一个查询UI界面,开发人员可以在这个页面得到预期结果后再把json请求体复制到程序中。

后续,我们将会基于zookeeper构建索引集群,提供集群索引管理功能

jkes-integration-test

jkes-integration-test是一个基于Spring Boot集成测试项目,用于进行功能测试。同时测量一些常见操作的吞吐率

开发

To build a development version you’ll need a recent version of Kafka. You can build jkes with Maven using the standard lifecycle phases.

Contribute

  • Source Code: https://github.com/chaokunyang/jkes
  • Issue Tracker: https://github.com/chaokunyang/jkes/issues

LICENSE

This project is licensed under Apache License 2.0.

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多