CanonicalAdjacencyMatrixBuilder.java

/*
 * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gradoop.flink.model.impl.operators.tostring;

import org.apache.flink.api.java.DataSet;
import org.gradoop.common.model.api.entities.Edge;
import org.gradoop.common.model.api.entities.GraphHead;
import org.gradoop.common.model.api.entities.Vertex;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.flink.model.api.epgm.BaseGraph;
import org.gradoop.flink.model.api.epgm.BaseGraphCollection;
import org.gradoop.flink.model.api.operators.UnaryBaseGraphCollectionToValueOperator;
import org.gradoop.flink.model.impl.functions.epgm.LabelCombiner;
import org.gradoop.flink.model.impl.operators.tostring.api.EdgeToString;
import org.gradoop.flink.model.impl.operators.tostring.api.GraphHeadToString;
import org.gradoop.flink.model.impl.operators.tostring.api.VertexToString;
import org.gradoop.flink.model.impl.operators.tostring.functions.AdjacencyMatrix;
import org.gradoop.flink.model.impl.operators.tostring.functions.ConcatGraphHeadStrings;
import org.gradoop.flink.model.impl.operators.tostring.functions.IncomingAdjacencyList;
import org.gradoop.flink.model.impl.operators.tostring.functions.MultiEdgeStringCombiner;
import org.gradoop.flink.model.impl.operators.tostring.functions.OutgoingAdjacencyList;
import org.gradoop.flink.model.impl.operators.tostring.functions.SourceStringUpdater;
import org.gradoop.flink.model.impl.operators.tostring.functions.SwitchSourceTargetIds;
import org.gradoop.flink.model.impl.operators.tostring.functions.TargetStringUpdater;
import org.gradoop.flink.model.impl.operators.tostring.functions.UndirectedAdjacencyList;
import org.gradoop.flink.model.impl.operators.tostring.tuples.EdgeString;
import org.gradoop.flink.model.impl.operators.tostring.tuples.GraphHeadString;
import org.gradoop.flink.model.impl.operators.tostring.tuples.VertexString;

/**
 * Operator deriving a string representation from a graph collection.
 * The representation follows the concept of a canonical adjacency matrix.
 *
 * @param <G> type of the graph head
 * @param <V> the vertex type
 * @param <E> the edge type
 * @param <LG> type of the base graph instance
 * @param <GC> type of the graph collection
 */
public class CanonicalAdjacencyMatrixBuilder<
  G extends GraphHead,
  V extends Vertex,
  E extends Edge,
  LG extends BaseGraph<G, V, E, LG, GC>,
  GC extends BaseGraphCollection<G, V, E, LG, GC>>
  implements UnaryBaseGraphCollectionToValueOperator<GC, DataSet<String>> {

  /**
   * Character used to separate lines. Should be the same regardless of OS.
   */
  public static final Character LINE_SEPARATOR = '\n';

  /**
   * function describing string representation of graph heads
   */
  private final GraphHeadToString<G> graphHeadToString;
  /**
   * function describing string representation of vertices
   */
  private final VertexToString<V> vertexToString;
  /**
   * function describing string representation of edges
   */
  private final EdgeToString<E> egeLabelingFunction;
  /**
   * sets mode for either directed or undirected graph
   */
  private final boolean directed;

  /**
   * Creates a new MatrixBuilder instance.
   *
   * @param graphHeadToString representation of graph heads
   * @param vertexToString representation of vertices
   * @param edgeLabelingFunction representation of edges
   * @param directed sets mode for either directed or undirected graph
   */
  public CanonicalAdjacencyMatrixBuilder(
    GraphHeadToString<G> graphHeadToString,
    VertexToString<V> vertexToString,
    EdgeToString<E> edgeLabelingFunction,
    boolean directed
  ) {
    this.graphHeadToString = graphHeadToString;
    this.vertexToString = vertexToString;
    this.egeLabelingFunction = edgeLabelingFunction;
    this.directed = directed;
  }

  @Override
  public DataSet<String> execute(GC collection) {

    // 1-10.
    DataSet<GraphHeadString> graphHeadLabels = getGraphHeadStrings(collection);

    // 11. add empty head to prevent empty result for empty collection

    graphHeadLabels = graphHeadLabels
      .union(collection
        .getConfig()
        .getExecutionEnvironment()
        .fromElements(new GraphHeadString(GradoopId.get(), "")));

    // 12. label collection

    return graphHeadLabels
      .reduceGroup(new ConcatGraphHeadStrings());
  }

  /**
   * Created a dataset of (graph id, canonical label) pairs.
   *
   * @param collection input collection
   * @return (graph id, canonical label) pairs
   */
  public DataSet<GraphHeadString> getGraphHeadStrings(GC collection) {
    // 1. label graph heads
    DataSet<GraphHeadString> graphHeadLabels = collection.getGraphHeads()
      .map(graphHeadToString);

    // 2. label vertices
    DataSet<VertexString> vertexLabels = collection.getVertices()
      .flatMap(vertexToString);

    // 3. label edges
    DataSet<EdgeString> edgeLabels = collection.getEdges()
      .flatMap(egeLabelingFunction);

    if (directed) {
      // 4. combine labels of parallel edges
      edgeLabels = edgeLabels
        .groupBy(0, 1, 2)
        .reduceGroup(new MultiEdgeStringCombiner());

      // 5. extend edge labels by vertex labels

      edgeLabels = edgeLabels
        .join(vertexLabels)
        .where(0, 1).equalTo(0, 1) // graphId,sourceId = graphId,vertexId
        .with(new SourceStringUpdater())
        .join(vertexLabels)
        .where(0, 2).equalTo(0, 1) // graphId,targetId = graphId,vertexId
        .with(new TargetStringUpdater());

      // 6. extend vertex labels by outgoing vertex+edge labels

      DataSet<VertexString> outgoingAdjacencyListLabels =
        edgeLabels.groupBy(0, 1) // graphId, sourceId
          .reduceGroup(new OutgoingAdjacencyList());

      // 7. extend vertex labels by outgoing vertex+edge labels

      DataSet<VertexString> incomingAdjacencyListLabels =
        edgeLabels.groupBy(0, 2) // graphId, targetId
          .reduceGroup(new IncomingAdjacencyList());

      // 8. combine vertex labels

      vertexLabels = vertexLabels
        .leftOuterJoin(outgoingAdjacencyListLabels)
        .where(0, 1).equalTo(0, 1)
        .with(new LabelCombiner<VertexString>())
        .leftOuterJoin(incomingAdjacencyListLabels)
        .where(0, 1).equalTo(0, 1)
        .with(new LabelCombiner<VertexString>());
    } else {
    // undirected graph

      // 4. union edges with flipped edges and combine labels of parallel edges

      edgeLabels = edgeLabels
        .union(edgeLabels
          .map(new SwitchSourceTargetIds()))
        .groupBy(0, 1, 2)
        .reduceGroup(new MultiEdgeStringCombiner());

      // 5. extend edge labels by vertex labels

      edgeLabels = edgeLabels
        .join(vertexLabels)
        .where(0, 2).equalTo(0, 1) // graphId,targetId = graphId,vertexId
        .with(new TargetStringUpdater());

      // 6/7. extend vertex labels by vertex+edge labels

      DataSet<VertexString> adjacencyListLabels =
        edgeLabels.groupBy(0, 1) // graphId, sourceId
          .reduceGroup(new UndirectedAdjacencyList());

      // 8. combine vertex labels

      vertexLabels = vertexLabels
        .leftOuterJoin(adjacencyListLabels)
        .where(0, 1).equalTo(0, 1)
        .with(new LabelCombiner<VertexString>());
    }

    // 9. create adjacency matrix labels

    DataSet<GraphHeadString> adjacencyMatrixLabels = vertexLabels
      .groupBy(0)
      .reduceGroup(new AdjacencyMatrix());

    // 10. combine graph labels

    graphHeadLabels = graphHeadLabels
      .leftOuterJoin(adjacencyMatrixLabels)
      .where(0).equalTo(0)
      .with(new LabelCombiner<GraphHeadString>());
    return graphHeadLabels;
  }
}