PostProcessor.java

/*
 * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gradoop.flink.model.impl.operators.matching.common;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.gradoop.common.model.api.entities.Edge;
import org.gradoop.common.model.api.entities.EdgeFactory;
import org.gradoop.common.model.api.entities.Element;
import org.gradoop.common.model.api.entities.GraphHead;
import org.gradoop.common.model.api.entities.Vertex;
import org.gradoop.common.model.api.entities.VertexFactory;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.flink.model.api.epgm.BaseGraph;
import org.gradoop.flink.model.api.epgm.BaseGraphCollection;
import org.gradoop.flink.model.api.epgm.BaseGraphCollectionFactory;
import org.gradoop.flink.model.impl.epgm.GraphCollection;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.functions.epgm.EdgeFromIds;
import org.gradoop.flink.model.impl.functions.epgm.Id;
import org.gradoop.flink.model.impl.functions.epgm.MergedGraphIds;
import org.gradoop.flink.model.impl.functions.epgm.VertexFromId;
import org.gradoop.flink.model.impl.functions.utils.Cast;
import org.gradoop.flink.model.impl.functions.utils.IsInstance;
import org.gradoop.flink.model.impl.functions.utils.RightSide;
import org.gradoop.flink.model.impl.operators.matching.single.simulation.dual.functions.EdgeTriple;
import org.gradoop.flink.model.impl.operators.matching.single.simulation.dual.tuples.FatVertex;

/**
 * Provides methods for post-processing query results.
 */
public class PostProcessor {

  /**
   * Forwarded-field expression shared by the data-attaching joins in
   * {@link #extractGraphCollectionWithData}: the id, label and properties of the first join
   * input are passed through unchanged to the output.
   */
  private static final String FORWARDED_ELEMENT_FIELDS = "id;label;properties";

  /**
   * Extracts a {@link GraphCollection} from a set of {@link Element}.
   * <p>
   * Graph heads, vertices and edges are selected from {@code elements} by the element types
   * reported by the factory's graph head, vertex and edge factories.
   *
   * @param elements   mixed set of graph heads, vertices and edges
   * @param factory    graph collection factory that provides the element types and builds the
   *                   resulting collection
   * @param mayOverlap true, if vertices and edges may be contained in multiple graphs; elements
   *                   sharing an id are then grouped and combined with {@link MergedGraphIds}
   * @param <G> The graph head type.
   * @param <V> The vertex type.
   * @param <E> The edge type.
   * @param <LG> The graph type.
   * @param <GC> The graph collection type.
   * @return Graph collection
   */
  public static <G extends GraphHead,
    V extends Vertex,
    E extends Edge,
    LG extends BaseGraph<G, V, E, LG, GC>,
    GC extends BaseGraphCollection<G, V, E, LG, GC>> GC extractGraphCollection(
    DataSet<Element> elements,
    BaseGraphCollectionFactory<G, V, E, LG, GC> factory,
    boolean mayOverlap) {

    Class<G> graphHeadType = factory.getGraphHeadFactory().getType();
    Class<V> vertexType = factory.getVertexFactory().getType();
    Class<E> edgeType = factory.getEdgeFactory().getType();
    return factory.fromDataSets(
      extractGraphHeads(elements, graphHeadType),
      extractVertices(elements, vertexType, mayOverlap),
      extractEdges(elements, edgeType, mayOverlap)
    );
  }

  /**
   * Extracts a {@link GraphCollection} from a set of {@link Element} and
   * attaches the original data from the input {@link LogicalGraph}.
   * <p>
   * The collection is first built without data via
   * {@link #extractGraphCollection(DataSet, BaseGraphCollectionFactory, boolean)}. Its
   * vertices and edges are then joined by id with the vertices and edges of
   * {@code inputGraph}, forwarding id, label and properties from the input graph.
   *
   * @param elements      elements
   * @param inputGraph    original input graph
   * @param mayOverlap    true, if elements may be contained in multiple graphs
   * @param <G> The graph head type.
   * @param <V> The vertex type.
   * @param <E> The edge type.
   * @param <LG> The graph type.
   * @param <GC> The graph collection type.
   * @return Graph collection
   */
  public static <G extends GraphHead,
    V extends Vertex,
    E extends Edge,
    LG extends BaseGraph<G, V, E, LG, GC>,
    GC extends BaseGraphCollection<G, V, E, LG, GC>> GC extractGraphCollectionWithData(
    DataSet<Element> elements, LG inputGraph, boolean mayOverlap) {

    BaseGraphCollectionFactory<G, V, E, LG, GC> factory = inputGraph.getCollectionFactory();

    // get result collection without data
    GC collection = extractGraphCollection(elements, factory, mayOverlap);

    // Attach data by joining with the input graph elements and merging the graph head ids.
    // NOTE(review): as a right outer join, the left (input graph) side is absent for result
    // elements that do not exist in inputGraph; presumably that cannot happen because the
    // results are derived from inputGraph - confirm MergedGraphIds would tolerate it otherwise.
    DataSet<V> newVertices = inputGraph.getVertices()
      .rightOuterJoin(collection.getVertices())
      .where(new Id<>()).equalTo(new Id<>())
      .with(new MergedGraphIds<>())
      .withForwardedFieldsFirst(FORWARDED_ELEMENT_FIELDS);

    DataSet<E> newEdges = inputGraph.getEdges()
      .rightOuterJoin(collection.getEdges())
      .where(new Id<>()).equalTo(new Id<>())
      .with(new MergedGraphIds<>())
      .withForwardedFieldsFirst(FORWARDED_ELEMENT_FIELDS);

    return factory.fromDataSets(collection.getGraphHeads(), newVertices, newEdges);
  }

  /**
   * Extracts vertex ids from the given pattern matching result.
   *
   * @param result pattern matching result
   * @return dataset with (vertexId) tuples, projected from field 0 of each {@link FatVertex}
   */
  private static DataSet<Tuple1<GradoopId>> extractVertexIds(
    DataSet<FatVertex> result) {
    return result.project(0);
  }

  /**
   * Extracts edge-source-target id triples from the given pattern matching
   * result.
   *
   * @param result pattern matching result
   * @return dataset with (edgeId, sourceId, targetId) tuples
   */
  private static DataSet<Tuple3<GradoopId, GradoopId, GradoopId>> extractEdgeIds(
    DataSet<FatVertex> result) {
    return result.flatMap(new EdgeTriple());
  }

  /**
   * Keeps only the elements that are instances of {@code type} and casts them to it.
   * <p>
   * The result type information is set explicitly because it cannot be derived from the
   * generic cast function alone.
   *
   * @param elements elements of mixed types
   * @param type     runtime type to select
   * @param <T> The selected element type.
   * @return elements of the requested type
   */
  private static <T extends Element> DataSet<T> filterByType(DataSet<Element> elements,
    Class<T> type) {
    return elements
      .filter(new IsInstance<>(type))
      .map(new Cast<>(type))
      .returns(TypeExtractor.createTypeInfo(type));
  }

  /**
   * Filters and casts graph heads from a given set of {@link Element}.
   *
   * @param elements      elements
   * @param graphHeadType graph head type
   * @param <G> The graph head type
   * @return graph heads
   */
  private static <G extends GraphHead> DataSet<G> extractGraphHeads(DataSet<Element> elements,
    Class<G> graphHeadType) {
    return filterByType(elements, graphHeadType);
  }

  /**
   * Initializes vertices from the given pattern matching result.
   *
   * @param result        pattern matching result
   * @param vertexFactory vertex factory used to create a vertex for each matched id
   * @param <V> The produced vertex type.
   * @return vertices
   */
  public static <V extends Vertex> DataSet<V> extractVertices(DataSet<FatVertex> result,
    VertexFactory<V> vertexFactory) {
    return extractVertexIds(result).map(new VertexFromId<>(vertexFactory));
  }

  /**
   * Filters and casts vertices from a given set of {@link Element}.
   *
   * @param elements      elements
   * @param vertexType    vertex type
   * @param mayOverlap    vertices may be contained in multiple graphs; if true, vertices
   *                      sharing an id are grouped and combined with {@link MergedGraphIds}
   * @param <V> The produced vertex type.
   * @return vertices
   */
  private static <V extends Vertex> DataSet<V> extractVertices(DataSet<Element> elements,
    Class<V> vertexType, boolean mayOverlap) {
    DataSet<V> vertices = filterByType(elements, vertexType);
    if (!mayOverlap) {
      return vertices;
    }
    // Local pre-combine followed by the final per-id reduce.
    // TODO: Replace two group-by statements with a combinable reduce function.
    return vertices
      .groupBy(new Id<>())
      .combineGroup(new MergedGraphIds<>())
      .groupBy(new Id<>())
      .reduceGroup(new MergedGraphIds<>());
  }

  /**
   * Initializes edges from the given pattern matching result.
   *
   * @param result      pattern matching result
   * @param edgeFactory edge factory used to create an edge for each (edge, source, target) triple
   * @param <E> The produced edge type.
   * @return edges
   */
  public static <E extends Edge> DataSet<E> extractEdges(DataSet<FatVertex> result,
    EdgeFactory<E> edgeFactory) {
    return extractEdgeIds(result).map(new EdgeFromIds<>(edgeFactory));
  }

  /**
   * Filters and casts edges from a given set of {@link Element}.
   *
   * @param elements      elements
   * @param edgeType      edge type
   * @param mayOverlap    edges may be contained in multiple graphs; if true, edges sharing an
   *                      id are grouped and combined with {@link MergedGraphIds}
   * @param <E> The produced edge type.
   * @return edges
   */
  private static <E extends Edge> DataSet<E> extractEdges(DataSet<Element> elements,
    Class<E> edgeType, boolean mayOverlap) {
    DataSet<E> edges = filterByType(elements, edgeType);
    if (!mayOverlap) {
      return edges;
    }
    // Local pre-combine followed by the final per-id reduce.
    // TODO: Replace two group-by statements with a combinable reduce function.
    return edges
      .groupBy(new Id<>())
      .combineGroup(new MergedGraphIds<>())
      .groupBy(new Id<>())
      .reduceGroup(new MergedGraphIds<>());
  }

  /**
   * Initializes vertices including their original data from the given
   * pattern matching result.
   * <p>
   * Matched vertex ids are inner-joined with the input vertices; only the input vertex
   * (right side) is kept.
   *
   * @param result        pattern matching result
   * @param inputVertices original data graph vertices
   * @param <V> The vertex type.
   * @return vertices including data
   */
  public static <V extends Vertex> DataSet<V> extractVerticesWithData(
    DataSet<FatVertex> result, DataSet<V> inputVertices) {
    return extractVertexIds(result)
      .join(inputVertices)
      .where(0).equalTo(new Id<>())
      .with(new RightSide<>());
  }

  /**
   * Initializes edges including their original data from the given pattern
   * matching result.
   * <p>
   * Matched edge ids (field 0 of each triple) are inner-joined with the input edges; only the
   * input edge (right side) is kept.
   *
   * @param result      pattern matching result
   * @param inputEdges  original data graph edges
   * @param <E> The edge type.
   * @return edges including data
   */
  public static <E extends Edge> DataSet<E> extractEdgesWithData(DataSet<FatVertex> result,
    DataSet<E> inputEdges) {
    return extractEdgeIds(result)
      .join(inputEdges)
      .where(0).equalTo(new Id<>())
      .with(new RightSide<>());
  }
}