VertexFusion.java

/*
 * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gradoop.flink.model.impl.operators.fusion;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.common.model.impl.pojo.EPGMEdge;
import org.gradoop.common.model.impl.pojo.EPGMGraphHead;
import org.gradoop.common.model.impl.pojo.EPGMVertex;
import org.gradoop.flink.model.api.operators.BinaryBaseGraphToBaseGraphOperator;
import org.gradoop.flink.model.impl.epgm.GraphCollection;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.functions.epgm.Id;
import org.gradoop.flink.model.impl.functions.epgm.IdNotInBroadcast;
import org.gradoop.flink.model.impl.functions.epgm.SourceId;
import org.gradoop.flink.model.impl.functions.epgm.TargetId;
import org.gradoop.flink.model.impl.functions.tuple.Value0Of2;
import org.gradoop.flink.model.impl.functions.tuple.Value1Of2;
import org.gradoop.flink.model.impl.functions.utils.LeftSide;
import org.gradoop.flink.model.impl.operators.fusion.functions.CoGroupAssociateOldVerticesWithNewIds;
import org.gradoop.flink.model.impl.operators.fusion.functions.CoGroupGraphHeadToVertex;
import org.gradoop.flink.model.impl.operators.fusion.functions.FlatJoinSourceEdgeReference;
import org.gradoop.flink.model.impl.operators.fusion.functions.LeftElementId;
import org.gradoop.flink.model.impl.operators.fusion.functions.MapFunctionAddGraphElementToGraph2;
import org.gradoop.flink.model.impl.operators.fusion.functions.MapGraphHeadForNewGraph;
import org.gradoop.flink.model.impl.operators.fusion.functions.MapVertexToPairWithGraphId;
import org.gradoop.flink.model.impl.operators.fusion.functions.MapVerticesAsTuplesWithNullId;


/**
 * Fusion is a binary operator taking two graphs: a search graph (first parameter) and a
 * pattern graph (second parameter). The operator is therefore not symmetric:
 * {@code F(a, b) != F(b, a)} in general. The general idea of this operator is that everything
 * that appears in a pattern graph is fused into a single vertex within the search graph.
 * A new logical graph is returned.
 *
 * <ol>
 *   <li>If the search graph is empty, an empty result is always returned.</li>
 *   <li>If the pattern graph is empty, the search graph is always returned (as a new logical
 *   graph carrying a freshly generated graph id).</li>
 *   <li>The vertices of the pattern graph that appear in the search graph are replaced by a
 *   new vertex. The remaining edges of the search graph are updated in the final result so that
 *   each edge pointing to (or starting from) a to-be-fused vertex now points to (or starts from)
 *   the fused vertex, respectively.</li>
 *   <li>The pattern graph's edges are also taken into account: edges expressed in the pattern
 *   graph are removed from the result, while any search-graph edge between to-be-fused vertices
 *   that is not expressed in the pattern graph is rendered as a hook (self-loop) over the fused
 *   vertex.</li>
 * </ol>
 */
public class VertexFusion implements BinaryBaseGraphToBaseGraphOperator<LogicalGraph> {

  /**
   * Fuses a single pattern graph into the search graph by wrapping the pattern graph into a
   * graph collection and delegating to {@link #execute(LogicalGraph, GraphCollection)}.
   *
   * @param searchGraph   Logical Graph defining the data lake
   * @param graphPatterns Logical graph whose vertices will be merged into a single vertex
   * @return              A single merged graph
   */
  @Override
  public LogicalGraph execute(LogicalGraph searchGraph, LogicalGraph graphPatterns) {
    return execute(searchGraph,
      graphPatterns.getCollectionFactory()
        .fromDataSets(
          graphPatterns.getGraphHead(),
          graphPatterns.getVertices(),
          graphPatterns.getEdges()));
  }

  /**
   * Fusing the already-combined sources: every pattern graph of {@code graphPatterns} is
   * fused into one vertex of the resulting graph.
   *
   * @param searchGraph   Logical Graph defining the data lake
   * @param graphPatterns Collection of elements representing which vertices will be merged into
   *                      a vertex (one fused vertex per pattern graph head)
   * @return              A single merged graph
   */
  public LogicalGraph execute(LogicalGraph searchGraph, GraphCollection graphPatterns) {

    // Not part of the theoretical definition: the result gets a brand-new graph head. The id is
    // generated once, while the dataflow is being built, and shared by all elements below.
    GradoopId newGraphId = GradoopId.get();

    DataSet<EPGMGraphHead> graphHead = searchGraph.getGraphHead()
      .map(new MapGraphHeadForNewGraph(newGraphId));

    DataSet<GradoopId> patternVertexIds = graphPatterns.getVertices()
      .map(new Id<>());
    DataSet<GradoopId> patternEdgeIds = graphPatterns.getEdges()
      .map(new Id<>());

    // PHASE 1: Induced subgraphs
    DataSet<Tuple2<EPGMVertex, GradoopId>> patternVerticesWithGraphIds =
      associatePatternVerticesWithGraphIds(searchGraph, graphPatterns);

    // One fused vertex per pattern graph head; its second field is the key matched against the
    // graph ids of PHASE 1 in the coGroups below.
    DataSet<Tuple2<EPGMVertex, GradoopId>> fusedVertices = graphPatterns.getGraphHeads()
      .map(new CoGroupGraphHeadToVertex());

    // PHASE 2: Recreating the vertices
    // Search-graph vertices that do not occur in any pattern graph are carried over.
    DataSet<EPGMVertex> untouchedVertices = searchGraph.getVertices()
      .filter(new IdNotInBroadcast<>())
      .withBroadcastSet(patternVertexIds, IdNotInBroadcast.IDS);

    DataSet<Tuple2<EPGMVertex, GradoopId>> vertexIdMapping =
      mapVerticesToFusedIds(patternVerticesWithGraphIds, fusedVertices, untouchedVertices);

    // NOTE(review): whether fused vertices of patterns with no match in the search graph are
    // dropped here depends on LeftSide's coGroup semantics — confirm.
    DataSet<EPGMVertex> vertices = fusedVertices
      .coGroup(patternVerticesWithGraphIds)
      .where(new Value1Of2<>()).equalTo(new Value1Of2<>())
      .with(new LeftSide<>())
      .map(new Value0Of2<>())
      .union(untouchedVertices)
      .map(new MapFunctionAddGraphElementToGraph2<>(newGraphId));

    // PHASE 3: Recreating the edges
    DataSet<EPGMEdge> edges =
      rewireEdges(searchGraph.getEdges(), patternEdgeIds, vertexIdMapping, newGraphId);

    return searchGraph.getFactory().fromDataSets(graphHead, vertices, edges);
  }

  /**
   * PHASE 1: associates each pattern vertex with its pattern graph id(s), restricted through a
   * coGroup with the search graph's vertices.
   *
   * @param searchGraph   graph whose vertices are matched against the pattern vertices
   * @param graphPatterns pattern graphs providing the vertices to associate
   * @return pairs of (pattern vertex, pattern graph id)
   */
  private static DataSet<Tuple2<EPGMVertex, GradoopId>> associatePatternVerticesWithGraphIds(
    LogicalGraph searchGraph, GraphCollection graphPatterns) {
    return graphPatterns.getVertices()
      .coGroup(searchGraph.getVertices())
      .where(new Id<>()).equalTo(new Id<>())
      .with(new LeftSide<>())
      .flatMap(new MapVertexToPairWithGraphId());
  }

  /**
   * PHASE 2: builds the (old vertex, id) lookup used when rewiring edges. Pattern vertices are
   * coGrouped with the fused vertex of their pattern graph, untouched vertices are appended via
   * {@code MapVerticesAsTuplesWithNullId}.
   *
   * @param patternVerticesWithGraphIds pattern vertices paired with their pattern graph id
   * @param fusedVertices               fused vertices paired with their pattern graph id
   * @param untouchedVertices           search-graph vertices not occurring in any pattern
   * @return the lookup, keyed by the old vertex
   */
  private static DataSet<Tuple2<EPGMVertex, GradoopId>> mapVerticesToFusedIds(
    DataSet<Tuple2<EPGMVertex, GradoopId>> patternVerticesWithGraphIds,
    DataSet<Tuple2<EPGMVertex, GradoopId>> fusedVertices,
    DataSet<EPGMVertex> untouchedVertices) {
    return patternVerticesWithGraphIds
      .coGroup(fusedVertices)
      .where(new Value1Of2<>()).equalTo(new Value1Of2<>())
      .with(new CoGroupAssociateOldVerticesWithNewIds())
      .union(untouchedVertices.map(new MapVerticesAsTuplesWithNullId()));
  }

  /**
   * PHASE 3: drops pattern edges, redirects the source and then the target of every remaining
   * edge through the vertex lookup, de-duplicates by edge id and assigns the edges to the new
   * graph.
   *
   * @param searchEdges     edges of the search graph
   * @param patternEdgeIds  ids of the edges expressed in the pattern graphs (removed)
   * @param vertexIdMapping lookup produced by {@link #mapVerticesToFusedIds}
   * @param newGraphId      id of the resulting graph
   * @return the rewired edges of the resulting graph
   */
  private static DataSet<EPGMEdge> rewireEdges(
    DataSet<EPGMEdge> searchEdges,
    DataSet<GradoopId> patternEdgeIds,
    DataSet<Tuple2<EPGMVertex, GradoopId>> vertexIdMapping,
    GradoopId newGraphId) {
    return searchEdges
      .filter(new IdNotInBroadcast<>())
      .withBroadcastSet(patternEdgeIds, IdNotInBroadcast.IDS)
      // first pass rewrites the source reference
      .leftOuterJoin(vertexIdMapping)
      .where(new SourceId<>()).equalTo(new LeftElementId<>())
      .with(new FlatJoinSourceEdgeReference(true))
      // second pass rewrites the target reference
      .leftOuterJoin(vertexIdMapping)
      .where(new TargetId<>()).equalTo(new LeftElementId<>())
      .with(new FlatJoinSourceEdgeReference(false))
      // edges sharing an id after the joins are handled by AddNewIdToDuplicatedEdge
      // (presumably giving duplicates fresh ids — confirm)
      .groupBy(new Id<>())
      .reduceGroup(new AddNewIdToDuplicatedEdge())
      .map(new MapFunctionAddGraphElementToGraph2<>(newGraphId));
  }
}