分享

图系统引入Tinkerpop的机制和原理

 新用户0175WbuX 2022-02-13

  一直以来, 我们都在探讨图系统的各种实现和功能, 但是常见的图语言Gremlin, 和包含Gremlin的本体Tinkerpop, 还没有很清晰的认识.

  这次就主要解答一个问题: 如果 要自己动手实现一个图数据库, 如何引入Tinkerpop支持 ? (以及它的工作机制是什么)

  这篇文章不是入门篇, 如果还不知道Gremlin是什么, 也不知道Tinkerpop是什么, 建议可以看看官方文档, 或者我之前写的函数式图语言Gremlin和.图引擎Tinkerpop (当然那时候写的不好, 见谅…)

  直接进入正题, 首先还是搬出引用过的TP 整体结构 图:

  图系统引入Tinkerpop的机制和原理

  不难看出, 对实现一个基本的图DB来说, 最主要的是实现最后一层( Provider API ) , 一旦最后一层最小实现了, 它就自动的实现了上层的大部分功能, ( how? ) 其中最次要的是顶层的命令行/HTTP访问口. 那接下来依次看看如何 接入 Tinkerpop, 接入之后又是怎么在解析Gremlin语句的.

  实现Gremlin的核心API主要包含实现 OLTP + OLAP 两个部分:

  OLTP部分:数据结构 : 最少要实现 Graph , Vertex , Edge , 和 Property , Element 这几个接口, 然后进一步如果你实现了 VertexProperty , Index , Transaction 这些接口, 那就可以说实现了完整的 Structure API .遍历操作 : TraversalStrategy (遍历策略)的实现, 目的是优化图的具体遍历, 完整实现后称之为 Process APIOLAP部分:数据结构 : GraphComputer , Messenger 和 Memory 是最少要实现的 (并且OLAP需要OLTP基于实现)1.数据结构

  首先一个图, 那后面我就以 XGraph 为例子, 简单介绍一下一个图应该有哪些基础OLTP功能:

  //1.初始化一个图

  Graph g=XGraph.open();

  //2.给图添加两个person顶点,按K-V对传入

  Vertex p1=g.addVertex(T.label, "person", T.id, 1, "name", "jin");

  Vertex p2=g.addVertex(T.label, "person", T.id, 2, "name", "tom");

  //3.给这两个点加一条边

  p1.addEdge("likes",p2, T.id, 3, "date", "20190220");

  这里的 T 是Tinkerpop的一个枚举 , 这样就构成了一个最经典的图, 然后具体的数据结构实现, 参考TP自带的 TinkerGraph 模块, 里面实现了一个最简的图. (我在之前也写了 TinkerGraph的源码分析 )

  接下来补充一下实现过程中的注意事项:

  首先实现 structure 包中的接口( 数据结构 ) ,然后实现 process 包中的接口( 遍历 操作)尽量复用Tinkerpop已经提供的各种工具类比如使用 ElementHelper , GraphComputerHelper , VertexProgramHelper 这种来进行很多 通用 的图操作. (封装了大量迭代器和λ)比如使用 StringHelper 的 toString() 确保输出的一致性. 包括异常输出等..如果要实现OLAP, 则 ComputerGraph 会充当一个 Wrapper 的作用, 确保图计算过程中语义解释合理

  实现了上面的基础数据结构, 就可以说最快的实现了一个图的 OLTP , 此时你就可以使用 gremlin 和 gremlin-server 之类的了(这部分是完全不需要你来写代码的), 可以说是非常的简易吧.

  但是实际的应用中, 一般数据不会只存在内存中, 最后都需要落地, 如果要用特定的图序列化器, 那么还需要实现一下 Graph 接口的 的 io 方法. ( 注意, 这是非必须的.)

  2. IO接口 (非必须)

  实现自定义序列化方式的第一步就是要实现 IoRegistry 接口, 相当于注册并使它生效, 但是有些特定的格式是不允许自定义序列化器的, 也就是说如果 XGraph 要支持某些特定数据格式, 就必须符合 Tinkerpop 定义, 当然这种情况很少. 来看看常用的几种序列化方案:

  Gryo : 有几种可选的方式实现:Kryo Function<kryo, serializergraphson="" :="" 实现="" jackson="" 的="" simplemodule="" 模块就行了,

  这里以 HugeGraphIoRegistry 为例, 简单看看怎么注册: (具体实现就不单独搬了, 可参看TinkerGraph的实现)

  public class HugeGraphIoRegistry extends AbstractIoRegistry {

  private static final HugeGraphIoRegistry instance= new HugeGraphIoRegistry();

  public static HugeGraphIoRegistry instance() {

  return instance;

  }

  //这里Huge再单独实现Gryo和GraphSon格式的支持.

  private HugeGraphIoRegistry() {

  HugeGryoModule.register(this);

  HugeGraphSONModule.register(this);

  }

  @SuppressWarnings("rawtypes")

  @Override

  public void register(Class ioClass, Class clazz, Object ser) {

  super.register(ioClass, clazz, ser);

  }

  }

  因为在Tinkerpop中, OLAP的实现必须基于OLTP实现, 而且它的实现其实 较为复杂 , 所以这部分是 可选的 , 现在大部分引入了Tinkerpop支持的图系统对这一块的支持也不够好, 那么不妨来看看如果要使用TP的OLAP具体需要实现的数据结构的作用:

  GraphComputer : 执行管理图计算任务的核心, 不管是 VertexProgram 还是 MR 任务都是从这提交, 并且也提供了任务资源的隔离Memory : 全局的内存数据结构 (用于一些特定K-V对, ANDing, ORing, INCRing, 和SETing )Messenger : 消息分发和汇总的结构, 调度 VertexProgram 到多个顶点.MapReduce.MapEmitter : 汇总由MR任务在 Map阶段 给出的K-V对.MapReduce.ReduceEmitter : 汇总由MR任务在 Reduce阶段 给出的K-V对.

  其中, VertexProgram 和 MapReduce 的接口定义在 process/computer 包内, 类似MR作业一样, 这些接口的实现由具体的业务去写. 图不关心它的实现, 然后为了方便大家上手, TP提供了两个OLAP的整合实现 , 一个是 TinkerGraph .另一个是 SparkGraphCounter (Hadoop)模块, 官方建议是大量参考实现方式… 接下来详细说一下上面的5个核心结构作要实现什么:

  1.GraphComputer

  从 TinkerGraph 实现中可以看出, submit() 方法, 是整个图计算类最复杂/核心的方法, 实现它就差不多实现了GraphComputer的核心 , 它需要做:

  执行OLAP任务之前的检查操作, 包括不限于任务是否被提交, 是否至少存在1个 VertexProgram 或 1个 MR 任务 .如果是VertexProgram, 还需要检查它是否满足要执行的图计算特性 (比如是否支持PageRank ,ShortestPath..)为图计算初始全局的 Memory 结构, 然后执行以下的循环首先执行 VertexProgram.setup() 一次.然后在每个 分发的顶点 上执行 VertexProgram.execute()最后执行 VertexProgram.terminate() , 获得一个bool返回值如果为 true , 开一个新线程开始执行MR任务, 原始线程跳回2.2, 继续执行 execute() ,依次迭代如果为 false ,停止执/行对每个 分发的顶点 , 执行MR任务流程 : map() ---> (若有) combine() --> reduce()上述操作中不断更新 Memory 的数据值.创建一个 暂存 上面图计算后的子图和 Memory 结构的对象, 命名为 ComputerResult2mory

  从上面可以知道, Memory 是在执行 VertexProgram.setup() 的时候初始化的, 在第一轮任务分发中, 它对所有执行任务的顶点都是可见的, 然后每次定点执行任务后可以在它的线程中更新 Memory 的值, 但是这次更新只有在第一轮所有顶点全部执行完任务后, 下一轮其他顶点才能看到, 同理其他顶点也是如此. 然后不断重复这个过程, 直到 VertexProgram 完结.

  图系统引入Tinkerpop的机制和原理

  3ssenger

  消息分发对象和 Memory 对象是比较类似游戏下载的, 所有执行任务的 顶点 也都能在它上面进行 读/写 数据的操作, 那么和内存对象主要的区别在于 :

  读 : Messenger 读取的数据不是实时的, 而是在 上一轮 中发送给顶点的消息写 : 而它写入的数据也不是实时的, 而是在 下一轮 中由接收消息的顶点读取的消息4.MapReduce (Emitters)

  类似原始的 Hadoop-MR 模型, Tinkerpop中也设计了类似的MR接口, 但是还是有些区分, 主要的区别在于: 所有Mappers的工作都在加工图的顶点,而不能处理普通K-V对 , 而且. 访问顶点的时候无法访问其拥有的边 —- 只能访问顶点的属性 ., 这样设计的目的是为了大幅减少MR任务中数据量的 规模 , 以及在多个JVM中分发的实现复杂性. 而边的信息在 VertexProgram.execute() 方法中就能计算得到

  同时需要注意的是, 在MR过程中, 顶点会被暂时设置为只读, 只能读取它的属性和一些标记信息, 最后由之前说的业务来实现具体的MR操作. 这里用 Tinkergraph 为例, 截取一下如何具体使用MR任务:

  //先看看MapEmitter的实现

  public class TinkerMapEmitter implements MapReduce.MapEmitter {

  public Map<k, queue> reduceMap;

  public Queue<keyvalue> mapQueue;

  private final boolean doReduce; //是否有Reduce操作

  /*如果一个MR作业有reduce过程, 那么使用reduceMap结构,否则使用mapQueue的结构(本质是一个map-list)

  *目的是为了减少reduce所需的K-V组结构消耗.

  */

  public TinkerMapEmitter(final boolean doReduce) {

  this.doReduce=doReduce;

  if (this.doReduce)

  this.reduceMap=new ConcurrentHashMap<>();

  else

  this.mapQueue=new ConcurrentLinkedQueue<>();

  }

  @Override

  public void emit(K key, V value) {

  if (this.doReduce)

  //如果有reduce操作,那么给Map的key新增一个value

  this.reduceMapputeIfAbsent(key, k -> new ConcurrentLinkedQueue<>()).add(value);

  else

  //如果没有reduce操作,那么只需在队列尾附加一个K-V对象

  this.mapQueue.add(new KeyValue<>(key, value));

  }

  protected void complete(final MapReduce mapReduce) {

  //当map阶段完成, 就可以对需要的映射结果进行任意的排列组合.

  if (!this.doReduce && mapReduce.getMapKeySort().isPresent()) {

  final Comparator comparator=mapReduce.getMapKeySort().get();

  final List<keyvalue> list=new ArrayList<>(this.mapQueue);

  Collections.sort(list, Comparatorparing(KeyValue::getKey, comparator));

  this.mapQueue.clear();

  this.mapQueue.addAll(list);

  } else if (mapReduce.getMapKeySort().isPresent()) {

  final Comparator comparator=mapReduce.getMapKeySort().get();

  final List<map.entry<k, queue>> list=new ArrayList<>();

  list.addAll(this.reduceMap.entrySet());

  Collections.sort(list, Comparatorparing(Map.Entry::getKey, comparator));

  this.reduceMap=new LinkedHashMap<>();

  list.forEach(entry -> this.reduceMap.put(entry.getKey(), entry.getValue()));

  }

  }

  }

  //再看看ReduceEmitter的实现

  public class TinkerReduceEmitter implements MapReduce.ReduceEmitter {

  protected Queue<keyvalue> reduceQueue=new ConcurrentLinkedQueue<>(); //链队列

  @Override

  public void emit(final OK key, final OV value) {

  this.reduceQueue.add(new KeyValue<>(key, value));

  }

  protected void complete(final MapReduce mapReduce) {

  if (mapReduce.getReduceKeySort().isPresent()) {

  final Comparator comparator=mapReduce.getReduceKeySort().get();

  final List<keyvalue> list=new ArrayList<>(this.reduceQueue);

  Collections.sort(list, Comparatorparing(KeyValue::getKey, comparator));

  this.reduceQueue.clear();

  this.reduceQueue.addAll(list);

  }

  }

  }

  //调用reduce方法是这样的

  public void reduce(final OK key, final Iterator values, final ReduceEmitter emitter) { ... }

  reduce() 执行的时候,传入加工过的 emitter 对象, 那结合Map的操作, 整个过程是什么样的呢? 可以看看接下来截取的一个执行MR任务片段的调用.

  //MR任务整体片段

  for (final MapReduce mapReduce : mapReducers) {

  //先执行Map

  if (mapReduce.doStage(MapReduce.Stage.MAP)) {

  final TinkerMapEmitter mapEmitter=new TinkerMapEmitter<>(mapReduce.doStage(MapReduce.Stage.REDUCE));

  final SynchronizedIterator vertices=new SynchronizedIterator<>(this.graph.vertices());

  workers.setMapReduce(mapReduce);

  workers.mapReduceWorkerStart(MapReduce.Stage.MAP);

  workers.executeMapReduce(workerMapReduce -> {

  while (true) { //一直遍历完所有可达顶点

  final Vertex vertex=vertices.next();

  if (null==vertex) return;

  workerMapReduce.map(ComputerGraph.mapReduce(vertex), mapEmitter);

  }

  });

  workers.mapReduceWorkerEnd(MapReduce.Stage.MAP);

  // sort results if a map output sort is defined

  mapEmitterplete(mapReduce);

  //再执行Reduce. (因为Tinkergraph是单机使用OLAP, 所以无需执行合并操作)

  if (mapReduce.doStage(MapReduce.Stage.REDUCE)) {

  final TinkerReduceEmitter reduceEmitter=new TinkerReduceEmitter<>();

  final SynchronizedIterator<map.entry<?, queue>> keyValues=new SynchronizedIterator((Iterator) mapEmitter.reduceMap.entrySet().iterator());

  workers.mapReduceWorkerStart(MapReduce.Stage.REDUCE);

  workers.executeMapReduce(workerMapReduce -> {

  while (true) {

  final Map.Entry> entry=keyValues.next();

  if (null==entry) return;

  workerMapReduce.reduce(entry.getKey(), entry.getValue().iterator(), reduceEmitter);

  }

  });

  workers.mapReduceWorkerEnd(MapReduce.Stage.REDUCE);

  reduceEmitterplete(mapReduce); // sort results if a reduce output sort is defined

  mapReduce.addResultToMemory(thismory, reduceEmitter.reduceQueue.iterator());

  } else {

  //没有reduce过程的时候,直接把map操作完的结果存入Memory

  mapReduce.addResultToMemory(thismory, mapEmitter.mapQueue.iterator());

  }

  }

  }

  如果完成了OLTP的最基本支持 , 理论上 你不用做其他事就能直接整合Gremlin语法的支持了, 当然实际使用中, 比如 XGraph 希望引入gremlin语法支持, 一般比较好的做法还是借助一个中间层, 引入 GremlinPlugin ,然后注册实现.

  这个操作其实很简单, 以 Tinkergraph 为例 , 其实就是把在OLTP和OLAP自己实现的类都集中绑定一下 ,同理如果是 neo4j 或者 Hugegraph 要用, 最推荐的做法也是这样注册. (当然你不这样注册好像也是可以使用gremlin的, 尚不确定…)

  public final class TinkerGraphGremlinPlugin extends AbstractGremlinPlugin {

  private static final String NAME="tinkerpop.tinkergraph";

  private static final ImportCustomizer imports=DefaultImportCustomizer.build()

  .addClassImports(TinkerEdge.class,

  TinkerElement.class,

  TinkerFactory.class,

  TinkerGraph.class,

  TinkerGraphVariables.class,

  TinkerHelper.class,

  TinkerIoRegistryV1d0.class,

  TinkerIoRegistryV2d0.class,

  TinkerIoRegistryV3d0.class,

  TinkerProperty.class,

  TinkerVertex.class,

  TinkerVertexProperty.class,

  TinkerGraphComputer.class,

  TinkerGraphComputerView.class,

  TinkerMapEmitter.class,

  TinkerMemory.class,

  TinkerMessenger.class,

  TinkerReduceEmitter.class,

  TinkerWorkerPool.class).create();

  private static final TinkerGraphGremlinPlugin instance=new TinkerGraphGremlinPlugin();

  public TinkerGraphGremlinPlugin() {

  super(NAME, imports);

  }

  public static TinkerGraphGremlinPlugin instance() {

  return instance;

  }

  }

  上面说的是常见的图系统可以选择实现的几个重要模块, TP还提供了一些其他模块, 可以选择性的实现 :

  Traversal Strategies(遍历策略): TraversalStrategy 用于在执行gremlin之前改变遍历, 比如 g.V().has("name","jin") 的语句本来要遍历所有顶点的 name 属性, 在有属性索引的时候, 我就能让它调用索引进行二分查找, 这样时间复杂度就由 O(n) 转变为 O(logn) 了, 类似的比如模糊索引, 范围索引都需要指定一些策略, 否则默认都会是全查.. 例如 TinkerGraphStepStrategy 的实现就针对 has() 做了判断 :@Override public void apply(final Traversal.Admin traversal) { if (TraversalHelper.onGraphComputer(traversal)) return; for (final GraphStep originalGraphStep : TraversalHelper.getStepsOfClass(GraphStep.class, traversal)) { final TinkerGraphStep tinkerGraphStep=new TinkerGraphStep<>(originalGraphStep); TraversalHelper.replaceStep(originalGraphStep, tinkerGraphStep, traversal); Step currentStep=tinkerGraphStep.getNextStep(); while (currentStep instanceof HasStep || currentStep instanceof NoOpBarrierStep) { //针对has()步骤 if (currentStep instanceof HasStep) { for (final HasContainer hasContainer : ((HasContainerHolder) currentStep).getHasContainers()) { if (!GraphStepcessHasContainerIds(tinkerGraphStep, hasContainer)) tinkerGraphStep.addHasContainer(hasContainer); } TraversalHelper.copyLabels(currentStep, currentStep.getPreviousStep(), false); traversal.removeStep(currentStep); } currentStep=currentStep.getNextStep(); } } }Step (步骤): 类似gremlin中的 has() , filter() ,这些都是一个单独的 Step ,每条gremlin语句中的step最终都会在 TraversalStrategy 中调用, 那么如果我想新增一个单独的Step, 比如 pagerank() , 那就可以自己实现, 然后注册进Tinkerpop中 ,但是这样最大的缺点就是侵入性很强, 所以Hugegraph 采用了另一种策略, 绕开gremlin实现扩展的API.例子待补充…

  首先, 强烈推荐通读一下 TinkerGraph 的源码, 我也是通读了一遍之后加上官方文档的补充解释, 才能比较好的理解整个流程. 之前写过一个``TinkerGraph` 的源码阅读系列, 不过因为时间原因中间写的比较粗略, 大家可以凑合看, 有问题欢迎及时反馈.

  然后从整篇文章可以看出, 如果你已经实现好了一个图系统, 比如叫 XGraph ,那么你在有类似的图基础之后, 最快可以在 一周内 就能把Tinkerpop基本引入进来(包括读代码+开发+测试+部署), 至于高级特性的实现, 不一定需要, 也不需急于一时.

  最后, 这里没有细节的讲Tinkerpop的各种 Step . 各种 Path 的细节, 只是比较提纲的结合了部分代码讲一下核心实现, 没提 Gremlin 如何转换为图的API调用的, 也没有提及 gremlin-console/server 的实现, 前者以后会单独再说, 后者觉得暂时不是很需要关心.. 就先这样

    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多