FilterAndProjectTriple.java

/*
 * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gradoop.flink.model.impl.operators.matching.single.cypher.operators.filter.functions;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.util.Collector;
import org.gradoop.flink.model.impl.operators.matching.common.MatchStrategy;
import org.gradoop.flink.model.impl.operators.matching.common.query.predicates.CNF;
import org.gradoop.flink.model.impl.operators.matching.single.cypher.pojos.Embedding;
import org.gradoop.flink.model.impl.operators.matching.single.cypher.pojos.EmbeddingFactory;
import org.gradoop.flink.model.impl.operators.matching.single.cypher.pojos.EmbeddingMetaData;
import org.gradoop.flink.model.impl.operators.matching.single.cypher.pojos.Triple;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Applies a given predicate on a {@link Triple} and projects specified property values to the
 * output embedding.
 */
public class FilterAndProjectTriple extends RichFlatMapFunction<Triple, Embedding> {
  /**
   * Predicates used for filtering
   */
  private final CNF predicates;
  /**
   * variable of the source vertex
   */
  private final String sourceVariable;
  /**
   * variable of the target vertex
   */
  private final String targetVariable;
  /**
   * Property keys used for value projection of the source vertex
   */
  private final List<String> sourceProjectionPropertyKeys;
  /**
   * Property keys used for value projection of the edge
   */
  private final List<String> edgeProjectionPropertyKeys;
  /**
   * Property keys used for value projection of the target vertex
   */
  private final List<String> targetProjectionPropertyKeys;
  /**
   * Meta data describing the vertex embedding used for filtering
   */
  private final EmbeddingMetaData filterMetaData;
  /**
   * Source vertex propertyKeys of the embedding used for filtering
   */
  private final List<String> sourceFilterPropertyKeys;
  /**
   * Edge propertyKeys of the embedding used for filtering
   */
  private final List<String> edgeFilterPropertyKeys;
  /**
   * Target vertex propertyKeys of the embedding used for filtering
   */
  private final List<String> targetFilterPropertyKeys;

  /**
   * True if vertex and target variable are the same
   */
  private final boolean isLoop;

  /**
   * Set to true if vertex matching strategy is isomorphism
   */
  private final boolean isVertexIso;

  /**
   * New FilterAndProjectTriples
   * @param sourceVariable the source variable
   * @param edgeVariable edge variabe
   * @param targetVariable target variable
   * @param predicates filter predicates
   * @param projectionPropertyKeys property keys used for projection
   * @param vertexMatchStrategy vertex match strategy
   */
  public FilterAndProjectTriple(String sourceVariable, String edgeVariable, String targetVariable,
    CNF predicates, Map<String, List<String>> projectionPropertyKeys,
    MatchStrategy vertexMatchStrategy) {

    this.predicates = predicates;
    this.sourceVariable = sourceVariable;
    this.targetVariable = targetVariable;

    this.sourceProjectionPropertyKeys =
      projectionPropertyKeys.getOrDefault(sourceVariable, new ArrayList<>());
    this.edgeProjectionPropertyKeys =
      projectionPropertyKeys.getOrDefault(edgeVariable, new ArrayList<>());
    this.targetProjectionPropertyKeys =
      projectionPropertyKeys.getOrDefault(targetVariable, new ArrayList<>());

    this.isLoop = sourceVariable.equals(targetVariable);
    this.isVertexIso = vertexMatchStrategy.equals(MatchStrategy.ISOMORPHISM);

    filterMetaData = createFilterMetaData(predicates, sourceVariable, edgeVariable, targetVariable);
    sourceFilterPropertyKeys = filterMetaData.getPropertyKeys(sourceVariable);
    edgeFilterPropertyKeys = filterMetaData.getPropertyKeys(edgeVariable);
    targetFilterPropertyKeys = filterMetaData.getPropertyKeys(targetVariable);

  }

  @Override
  public void flatMap(Triple triple, Collector<Embedding> out) throws Exception {
    boolean isValid = true;

    if (isLoop) {
      if (!(triple.getSourceId().equals(triple.getTargetId()))) {
        isValid = false;
      }
    } else if (isVertexIso && triple.getSourceId().equals(triple.getTargetId())) {
      isValid = false;
    }

    if (isValid && filter(triple)) {
      out.collect(
        EmbeddingFactory.fromTriple(
          triple,
          sourceProjectionPropertyKeys, edgeProjectionPropertyKeys, targetProjectionPropertyKeys,
          sourceVariable, targetVariable
        )
      );
    }
  }

  /**
   * Checks if the the triple holds for the predicate
   * @param triple triple to be filtered
   * @return True if the triple holds for the predicate
   */
  private boolean filter(Triple triple) {
    return predicates.evaluate(
      EmbeddingFactory.fromTriple(triple,
        sourceFilterPropertyKeys,  edgeFilterPropertyKeys, targetFilterPropertyKeys,
        sourceVariable, targetVariable
      ),
      filterMetaData
    );
  }

  /**
   * Creates the {@code EmbeddingMetaData} of the embedding used for filtering
   * @param predicates filter predicates
   * @param sourceVariable source variable
   * @param edgeVariable edge variable
   * @param targetVariable target variable
   * @return filter embedding meta data
   */
  private static EmbeddingMetaData createFilterMetaData(CNF predicates, String sourceVariable,
    String edgeVariable, String targetVariable) {

    EmbeddingMetaData metaData = new EmbeddingMetaData();
    metaData.setEntryColumn(sourceVariable, EmbeddingMetaData.EntryType.VERTEX, 0);
    metaData.setEntryColumn(edgeVariable, EmbeddingMetaData.EntryType.EDGE, 1);
    metaData.setEntryColumn(targetVariable, EmbeddingMetaData.EntryType.VERTEX, 2);

    int i = 0;
    for (String variable : new String[] {sourceVariable, edgeVariable, targetVariable}) {
      for (String propertyKey : predicates.getPropertyKeys(variable)) {
        metaData.setPropertyColumn(variable, propertyKey, i++);
      }
    }

    return metaData;
  }

}