GroupingGroupCombine.java

/*
 * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gradoop.flink.model.impl.operators.grouping;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.gradoop.common.model.api.entities.Edge;
import org.gradoop.common.model.api.entities.GraphHead;
import org.gradoop.common.model.api.entities.Vertex;
import org.gradoop.flink.model.api.epgm.BaseGraph;
import org.gradoop.flink.model.api.epgm.BaseGraphCollection;
import org.gradoop.flink.model.impl.functions.tuple.Value0Of2;
import org.gradoop.flink.model.impl.functions.tuple.Value1Of2;
import org.gradoop.flink.model.impl.operators.grouping.functions.BuildSuperVertex;
import org.gradoop.flink.model.impl.operators.grouping.functions.BuildVertexGroupItem;
import org.gradoop.flink.model.impl.operators.grouping.functions.BuildVertexWithSuperVertexBC;
import org.gradoop.flink.model.impl.operators.grouping.functions.CombineVertexGroupItems;
import org.gradoop.flink.model.impl.operators.grouping.functions.FilterRegularVertices;
import org.gradoop.flink.model.impl.operators.grouping.functions.FilterSuperVertices;
import org.gradoop.flink.model.impl.operators.grouping.functions.LabelGroupFilter;
import org.gradoop.flink.model.impl.operators.grouping.functions.TransposeVertexGroupItems;
import org.gradoop.flink.model.impl.operators.grouping.tuples.EdgeGroupItem;
import org.gradoop.flink.model.impl.operators.grouping.tuples.LabelGroup;
import org.gradoop.flink.model.impl.operators.grouping.tuples.VertexGroupItem;
import org.gradoop.flink.model.impl.operators.grouping.tuples.VertexWithSuperVertex;
import org.gradoop.flink.model.impl.tuples.IdWithIdSet;

import java.util.List;

/**
 * Grouping implementation that uses group + groupCombine + groupReduce for
 * building super vertices and updating the original vertices.
 *
 * Algorithmic idea:
 *
 * 1) Map vertices to a minimal representation, i.e. {@link VertexGroupItem}.
 *    If vertices without a group are retained, only the vertices accepted by a
 *    {@link LabelGroupFilter} over the vertex label groups take part in grouping.
 * 2) Group vertices on label and/or property.
 * 3) Use groupCombine to process the grouped partitions. Creates a super vertex
 *    tuple for each group partition, including the local aggregates.
 *    Update each vertex tuple with their super vertex id and forward them.
 * 4) Filter output of 3)
 *    a) super vertex tuples are filtered, grouped and merged via groupReduce to
 *       create a final super vertex representing the group. An additional
 *       mapping from the final super vertex id to the super vertex ids of the
 *       original partitions is also created.
 *    b) regular (non super vertex) tuples are mapped to {@link VertexWithSuperVertex}
 *       using the broadcast mapping output of 4a).
 * 5) Map edges to a minimal representation, i.e. {@link EdgeGroupItem}.
 * 6) Join edges with output of 4b) and replace source/target id with super
 *    vertex id.
 * 7) Updated edges are grouped by source and target id and optionally by label
 *    and/or edge property.
 * 8) Group combine on the workers and compute aggregate.
 * 9) Group reduce globally and create final super edges.
 *
 * If vertices without a group are retained, they act as their own representatives
 * when super edges are built. Edges between two retained vertices are not grouped.
 * The retained vertices and the edges between them are added unchanged to the result.
 *
 * @param <G>  The graph head type.
 * @param <V>  The vertex type.
 * @param <E>  The edge type.
 * @param <LG> The type of the graph.
 * @param <GC> The type of the graph collection.
 */
public class GroupingGroupCombine<
  G extends GraphHead,
  V extends Vertex,
  E extends Edge,
  LG extends BaseGraph<G, V, E, LG, GC>,
  GC extends BaseGraphCollection<G, V, E, LG, GC>> extends Grouping<G, V, E, LG, GC> {

  /**
   * Creates grouping operator instance.
   *
   * @param useVertexLabels             group on vertex label true/false
   * @param useEdgeLabels               group on edge label true/false
   * @param vertexLabelGroups           stores grouping properties for vertex labels
   * @param edgeLabelGroups             stores grouping properties for edge labels
   * @param retainVerticesWithoutGroup  a flag to retain vertices that are not affected by the
   *                                    grouping
   */
  GroupingGroupCombine(
    boolean useVertexLabels,
    boolean useEdgeLabels,
    List<LabelGroup> vertexLabelGroups,
    List<LabelGroup> edgeLabelGroups,
    boolean retainVerticesWithoutGroup) {
    super(useVertexLabels, useEdgeLabels, vertexLabelGroups, edgeLabelGroups,
      retainVerticesWithoutGroup);
  }

  /**
   * Groups the vertices and edges of the given graph into super vertices and super edges.
   *
   * @param graph input graph
   * @return graph built from the super vertices and super edges. When
   *         {@code isRetainingVerticesWithoutGroup()} is set, it also contains the retained
   *         vertices and the edges between them.
   */
  @Override
  protected LG groupInternal(LG graph) {
    final boolean retainUngrouped = isRetainingVerticesWithoutGroup();

    // when retaining ungrouped vertices, only vertices accepted by the label group filter
    // take part in the grouping
    DataSet<V> vertices = retainUngrouped ?
      graph.getVertices()
        .filter(new LabelGroupFilter<>(getVertexLabelGroups(), useVertexLabels())) :
      graph.getVertices();

    // map vertex to vertex group item
    DataSet<VertexGroupItem> verticesForGrouping = vertices.flatMap(
      new BuildVertexGroupItem<>(useVertexLabels(), getVertexLabelGroups()));

    // group vertices by label / properties / both
    DataSet<VertexGroupItem> combinedVertexGroupItems = groupVertices(verticesForGrouping)
      // apply aggregate function per combined partition
      .combineGroup(new CombineVertexGroupItems(useVertexLabels()));

    // filter super vertex tuples (1..n per partition/group)
    // group super vertex tuples
    // create super vertex tuple (1 per group) + previous super vertex ids
    DataSet<Tuple2<VertexGroupItem, IdWithIdSet>> superVertexTuples =
      groupVertices(combinedVertexGroupItems.filter(new FilterSuperVertices()))
        .reduceGroup(new TransposeVertexGroupItems(useVertexLabels()));

    // build super vertices from super vertex tuples
    DataSet<V> superVertices = superVertexTuples
      .map(new Value0Of2<>())
      .map(new BuildSuperVertex<>(
        useVertexLabels(), graph.getFactory().getVertexFactory()));

    // extract mapping: final super vertex id -> per-partition super vertex ids
    DataSet<IdWithIdSet> mapping = superVertexTuples
      .map(new Value1Of2<>());

    // filter regular vertices from combiner output
    // update their vertex representative according to the broadcast mapping
    DataSet<VertexWithSuperVertex> vertexToRepresentativeMap = combinedVertexGroupItems
      .filter(new FilterRegularVertices())
      .map(new BuildVertexWithSuperVertexBC())
      .withBroadcastSet(mapping, BuildVertexWithSuperVertexBC.BC_MAPPING);

    DataSet<E> superEdges;

    if (retainUngrouped) {
      // Build the retained-vertices subgraph once and reuse it below. Before this change it
      // was built twice, which constructed it twice in the dataflow plan.
      LG retainedVerticesSubgraph = getSubgraphOfRetainedVertices(graph);

      // To add support for grouped edges between retained vertices and super vertices,
      // retained vertices are their own group representatives
      vertexToRepresentativeMap =
        updateVertexRepresentatives(vertexToRepresentativeMap,
          retainedVerticesSubgraph.getVertices());

      // don't execute grouping on edges between retained vertices
      // but execute on edges between retained vertices and grouped vertices
      //   graph.getEdges() - retainedVerticesSubgraph.getEdges()
      DataSet<E> edgesToGroup =
        subtractEdges(graph.getEdges(), retainedVerticesSubgraph.getEdges());

      // grouped edges plus the untouched edges between retained vertices
      superEdges =
        buildSuperEdges(graph.getFactory().getEdgeFactory(), edgesToGroup,
          vertexToRepresentativeMap)
          .union(retainedVerticesSubgraph.getEdges());

      // super vertices plus the untouched retained vertices
      superVertices = superVertices.union(retainedVerticesSubgraph.getVertices());
    } else {
      superEdges =
        buildSuperEdges(graph.getFactory().getEdgeFactory(), graph.getEdges(),
          vertexToRepresentativeMap);
    }

    return graph.getFactory().fromDataSets(superVertices, superEdges);
  }

}