CanonicalLabeler.java

/*
 * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gradoop.flink.algorithms.fsm.transactional.tle.canonicalization;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import org.gradoop.flink.algorithms.fsm.transactional.tle.pojos.Embedding;
import org.gradoop.flink.algorithms.fsm.transactional.tle.pojos.FSMEdge;

import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Creates a canonical label for a given graph embedding that represents its
 * adjacency matrix.
 */
public class CanonicalLabeler implements Serializable {

  /**
   * separator among adjacency lists
   */
  private static final char NEW_LIST = '|';
  /**
   * separator among vertex label and adjacency list entries
   */
  private static final char LIST_START = ':';
  /**
   * separator among adjacency list entries
   */
  private static final char NEW_ENTRY = ',';
  /**
   * edge separator indicating an outgoing edge
   */
  private static final char OUTGOING_EDGE = '>';
  /**
   * edge separator indicating an incoming edge
   */
  private static final char INCOMING_EDGE = '<';
  /**
   * edge separator indicating an undirected edge
   */
  private static final char UNDIRECTED_EDGE = '-';

  /**
   * true, if the label should represent directed graphs,
   * false, otherwise
   */
  private final boolean directed;

  /**
   * Constructor.
   *
   * @param directed true for directed mode
   */
  public CanonicalLabeler(boolean directed) {
    this.directed = directed;
  }

  /**
   * Creates a canonical subgraph graph label
   *
   * @param embedding subgraph
   *
   * @return canonical label
   */
  public String label(Embedding embedding) {

    Map<Integer, String> vertices = embedding.getVertices();
    Map<Integer, FSMEdge> edges = embedding.getEdges();

    Map<Integer, Map<Integer, Set<Integer>>> adjacencyMatrix =
      createAdjacencyMatrix(vertices, edges);

    List<String> adjacencyListLabels =
      Lists.newArrayListWithCapacity(vertices.size());

    // for each vertex
    for (Map.Entry<Integer, String> vertex : vertices.entrySet()) {

      int vertexId = vertex.getKey();
      String adjacencyListLabel = vertex.getValue() + LIST_START;

      Map<Integer, Set<Integer>> adjacencyList = adjacencyMatrix.get(vertexId);

      List<String> entryLabels =
        Lists.newArrayListWithCapacity(adjacencyList.size());

      // for each adjacent vertex
      for (Map.Entry<Integer, Set<Integer>> entry : adjacencyList.entrySet()) {

        int adjacentVertexId = entry.getKey();
        String entryLabel = vertices.get(adjacentVertexId);

        // for each edge
        Set<Integer> incidentEdgeIds = entry.getValue();

        if (incidentEdgeIds.size() == 1) {
          FSMEdge incidentEdge = edges.get(incidentEdgeIds.iterator().next());

          entryLabel += format(incidentEdge, vertexId);

        } else {

          List<String> incidentEdges =
            Lists.newArrayListWithExpectedSize(incidentEdgeIds.size());

          for (int incidentEdgeId : incidentEdgeIds) {
            incidentEdges.add(format(edges.get(incidentEdgeId), vertexId));
          }

          Collections.sort(incidentEdges);
          entryLabel += StringUtils.join(incidentEdges, "");
        }

        entryLabels.add(entryLabel);
      }

      Collections.sort(entryLabels);
      adjacencyListLabel += StringUtils.join(entryLabels, NEW_ENTRY);
      adjacencyListLabels.add(adjacencyListLabel);
    }

    Collections.sort(adjacencyListLabels);
    return StringUtils.join(adjacencyListLabels, NEW_LIST);
  }

  /**
   * Creates an adjacency matrix.
   *
   * @param vertices vertices
   * @param edges edges
   * @return adjacency matrix
   */
  private Map<Integer, Map<Integer, Set<Integer>>> createAdjacencyMatrix(
    Map<Integer, String> vertices, Map<Integer, FSMEdge> edges) {

    Map<Integer, Map<Integer, Set<Integer>>> adjacencyMatrix =
      Maps.newHashMapWithExpectedSize(vertices.size());

    for (Map.Entry<Integer, FSMEdge> edgeEntry : edges.entrySet()) {

      int edgeId = edgeEntry.getKey();
      FSMEdge edge = edgeEntry.getValue();

      int sourceId = edge.getSourceId();
      int targetId = edge.getTargetId();

      addAdjacencyMatrixEntry(adjacencyMatrix, sourceId, edgeId, targetId);

      if (sourceId != targetId) {
        addAdjacencyMatrixEntry(adjacencyMatrix, targetId, edgeId, sourceId);
      }
    }

    return adjacencyMatrix;
  }

  /**
   * Adds an entry to an adjacency matrix.
   *
   * @param adjacencyMatrix adjacency matrix
   * @param vertexId id of the vertex the entry should be added to
   * @param incidentEdgeId incident edge id
   * @param adjacentVertexId adjacent vertex id
   */
  private void addAdjacencyMatrixEntry(
    Map<Integer, Map<Integer, Set<Integer>>> adjacencyMatrix,
    int vertexId, int incidentEdgeId, int adjacentVertexId) {

    // create entry for target to source
    Map<Integer, Set<Integer>> adjacencyList = adjacencyMatrix.get(vertexId);

    // first visit of this vertex
    if (adjacencyList == null) {

      // create target map
      adjacencyList = Maps.newHashMap();
      adjacencyMatrix.put(vertexId, adjacencyList);
    }

    Set<Integer> incidentEdgeIds = adjacencyList.get(adjacentVertexId);

    // first edge connecting to other vertex
    if (incidentEdgeIds == null) {
      adjacencyList.put(adjacentVertexId, Sets.newHashSet(incidentEdgeId));
    } else {
      incidentEdgeIds.add(incidentEdgeId);
    }
  }

  /**
   * Formats an edge in an adjacency list entry.
   *
   * @param edge edge
   * @param adjacencyListVertexId vertex id of the related adjacency list
   *
   * @return string representation
   */
  private String format(FSMEdge edge, int adjacencyListVertexId) {

    char edgeChar = directed ?
      (edge.getSourceId() == adjacencyListVertexId ?
        OUTGOING_EDGE : INCOMING_EDGE
      ) :
      UNDIRECTED_EDGE;

    return edgeChar + edge.getLabel();
  }
}