BuildEmbeddingFromTriple.java

/*
 * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gradoop.flink.model.impl.operators.matching.single.preserving.explorative.functions;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import org.gradoop.flink.model.impl.operators.matching.common.MatchStrategy;
import org.gradoop.flink.model.impl.operators.matching.common.query.Step;
import org.gradoop.flink.model.impl.operators.matching.common.query.TraversalCode;
import org.gradoop.flink.model.impl.operators.matching.common.tuples.TripleWithCandidates;
import org.gradoop.flink.model.impl.operators.matching.single.preserving.explorative.tuples.EmbeddingWithTiePoint;

/**
 * Initializes an {@link EmbeddingWithTiePoint} from the given edge triple.
 *
 * @param <K> key type
 */
public class BuildEmbeddingFromTriple<K>
  extends BuildEmbedding<K>
  implements FlatMapFunction<TripleWithCandidates<K>, EmbeddingWithTiePoint<K>> {
  /**
   * Match strategy for the traversal.
   */
  private final MatchStrategy matchStrategy;
  /**
   * Index of the source vertex in the vertex embedding.
   */
  private final int sourceIndex;
  /**
   * Index of the edge in the edge embedding
   */
  private final int edgeIndex;
  /**
   * Index of the target vertex in the vertex mapping.
   */
  private final int targetIndex;
  /**
   * Vertex candidate to continue traversal from
   */
  private int nextFrom;
  /**
   * True, iff the initial step is a loop.
   */
  private boolean isQueryLoop;

  /**
   * Constructor
   *
   * @param keyClazz    key type is needed for array initialization
   * @param traversalCode traversal code for the current query
   * @param matchStrategy strategy used for morphism checks
   * @param vertexCount number of vertices in the query graph
   * @param edgeCount   number of edges in the query graph
   */
  public BuildEmbeddingFromTriple(Class<K> keyClazz, TraversalCode traversalCode,
    MatchStrategy matchStrategy, long vertexCount, long edgeCount) {
    super(keyClazz, vertexCount, edgeCount);

    this.matchStrategy = matchStrategy;
    Step step = traversalCode.getStep(0);

    boolean isOutgoing = step.isOutgoing();
    this.edgeIndex = (int) step.getVia();
    // set source and target index in case of incoming or outgoing traversal
    this.sourceIndex = isOutgoing ? (int) step.getFrom() : (int) step.getTo();
    this.targetIndex = isOutgoing ? (int) step.getTo() : (int) step.getFrom();
    this.isQueryLoop = sourceIndex == targetIndex;

    if (traversalCode.getSteps().size() > 1) {
      nextFrom = (int) traversalCode.getStep(1).getFrom();
    }
  }

  @Override
  public void flatMap(TripleWithCandidates<K> triple, Collector<EmbeddingWithTiePoint<K>> out)
    throws Exception {
    if (isValidTriple(triple)) {
      reuseEmbedding.getEdgeMapping()[edgeIndex] = triple.getEdgeId();
      reuseEmbedding.getVertexMapping()[sourceIndex] = triple.getSourceId();
      reuseEmbedding.getVertexMapping()[targetIndex] = triple.getTargetId();
      reuseEmbeddingWithTiePoint.setTiePointId(reuseEmbedding.getVertexMapping()[nextFrom]);
      out.collect(reuseEmbeddingWithTiePoint);
    }
  }

  /**
   * Checks if the given triple is valid according to the match strategy and query characteristics.
   *
   * @param triple triple to check validity for
   * @return true, iff the triple is a valid initial candidate
   */
  private boolean isValidTriple(TripleWithCandidates<K> triple) {
    return (matchStrategy == MatchStrategy.HOMOMORPHISM) ||
      (isQueryLoop && triple.getSourceId().equals(triple.getTargetId())) ||
      (!isQueryLoop && !triple.getSourceId().equals(triple.getTargetId()));
  }
}