FilterAndProjectTriple.java

  1. /*
  2.  * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
  3.  *
  4.  * Licensed under the Apache License, Version 2.0 (the "License");
  5.  * you may not use this file except in compliance with the License.
  6.  * You may obtain a copy of the License at
  7.  *
  8.  *     http://www.apache.org/licenses/LICENSE-2.0
  9.  *
  10.  * Unless required by applicable law or agreed to in writing, software
  11.  * distributed under the License is distributed on an "AS IS" BASIS,
  12.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13.  * See the License for the specific language governing permissions and
  14.  * limitations under the License.
  15.  */
  16. package org.gradoop.flink.model.impl.operators.matching.single.cypher.operators.filter.functions;

  17. import org.apache.flink.api.common.functions.RichFlatMapFunction;
  18. import org.apache.flink.util.Collector;
  19. import org.gradoop.flink.model.impl.operators.matching.common.MatchStrategy;
  20. import org.gradoop.flink.model.impl.operators.matching.common.query.predicates.CNF;
  21. import org.gradoop.flink.model.impl.operators.matching.single.cypher.pojos.Embedding;
  22. import org.gradoop.flink.model.impl.operators.matching.single.cypher.pojos.EmbeddingFactory;
  23. import org.gradoop.flink.model.impl.operators.matching.single.cypher.pojos.EmbeddingMetaData;
  24. import org.gradoop.flink.model.impl.operators.matching.single.cypher.pojos.Triple;

  25. import java.util.ArrayList;
  26. import java.util.List;
  27. import java.util.Map;

  28. /**
  29.  * Applies a given predicate on a {@link Triple} and projects specified property values to the
  30.  * output embedding.
  31.  */
  32. public class FilterAndProjectTriple extends RichFlatMapFunction<Triple, Embedding> {
  33.   /**
  34.    * Predicates used for filtering
  35.    */
  36.   private final CNF predicates;
  37.   /**
  38.    * variable of the source vertex
  39.    */
  40.   private final String sourceVariable;
  41.   /**
  42.    * variable of the target vertex
  43.    */
  44.   private final String targetVariable;
  45.   /**
  46.    * Property keys used for value projection of the source vertex
  47.    */
  48.   private final List<String> sourceProjectionPropertyKeys;
  49.   /**
  50.    * Property keys used for value projection of the edge
  51.    */
  52.   private final List<String> edgeProjectionPropertyKeys;
  53.   /**
  54.    * Property keys used for value projection of the target vertex
  55.    */
  56.   private final List<String> targetProjectionPropertyKeys;
  57.   /**
  58.    * Meta data describing the vertex embedding used for filtering
  59.    */
  60.   private final EmbeddingMetaData filterMetaData;
  61.   /**
  62.    * Source vertex propertyKeys of the embedding used for filtering
  63.    */
  64.   private final List<String> sourceFilterPropertyKeys;
  65.   /**
  66.    * Edge propertyKeys of the embedding used for filtering
  67.    */
  68.   private final List<String> edgeFilterPropertyKeys;
  69.   /**
  70.    * Target vertex propertyKeys of the embedding used for filtering
  71.    */
  72.   private final List<String> targetFilterPropertyKeys;

  73.   /**
  74.    * True if vertex and target variable are the same
  75.    */
  76.   private final boolean isLoop;

  77.   /**
  78.    * Set to true if vertex matching strategy is isomorphism
  79.    */
  80.   private final boolean isVertexIso;

  81.   /**
  82.    * New FilterAndProjectTriples
  83.    * @param sourceVariable the source variable
  84.    * @param edgeVariable edge variabe
  85.    * @param targetVariable target variable
  86.    * @param predicates filter predicates
  87.    * @param projectionPropertyKeys property keys used for projection
  88.    * @param vertexMatchStrategy vertex match strategy
  89.    */
  90.   public FilterAndProjectTriple(String sourceVariable, String edgeVariable, String targetVariable,
  91.     CNF predicates, Map<String, List<String>> projectionPropertyKeys,
  92.     MatchStrategy vertexMatchStrategy) {

  93.     this.predicates = predicates;
  94.     this.sourceVariable = sourceVariable;
  95.     this.targetVariable = targetVariable;

  96.     this.sourceProjectionPropertyKeys =
  97.       projectionPropertyKeys.getOrDefault(sourceVariable, new ArrayList<>());
  98.     this.edgeProjectionPropertyKeys =
  99.       projectionPropertyKeys.getOrDefault(edgeVariable, new ArrayList<>());
  100.     this.targetProjectionPropertyKeys =
  101.       projectionPropertyKeys.getOrDefault(targetVariable, new ArrayList<>());

  102.     this.isLoop = sourceVariable.equals(targetVariable);
  103.     this.isVertexIso = vertexMatchStrategy.equals(MatchStrategy.ISOMORPHISM);

  104.     filterMetaData = createFilterMetaData(predicates, sourceVariable, edgeVariable, targetVariable);
  105.     sourceFilterPropertyKeys = filterMetaData.getPropertyKeys(sourceVariable);
  106.     edgeFilterPropertyKeys = filterMetaData.getPropertyKeys(edgeVariable);
  107.     targetFilterPropertyKeys = filterMetaData.getPropertyKeys(targetVariable);

  108.   }

  109.   @Override
  110.   public void flatMap(Triple triple, Collector<Embedding> out) throws Exception {
  111.     boolean isValid = true;

  112.     if (isLoop) {
  113.       if (!(triple.getSourceId().equals(triple.getTargetId()))) {
  114.         isValid = false;
  115.       }
  116.     } else if (isVertexIso && triple.getSourceId().equals(triple.getTargetId())) {
  117.       isValid = false;
  118.     }

  119.     if (isValid && filter(triple)) {
  120.       out.collect(
  121.         EmbeddingFactory.fromTriple(
  122.           triple,
  123.           sourceProjectionPropertyKeys, edgeProjectionPropertyKeys, targetProjectionPropertyKeys,
  124.           sourceVariable, targetVariable
  125.         )
  126.       );
  127.     }
  128.   }

  129.   /**
  130.    * Checks if the the triple holds for the predicate
  131.    * @param triple triple to be filtered
  132.    * @return True if the triple holds for the predicate
  133.    */
  134.   private boolean filter(Triple triple) {
  135.     return predicates.evaluate(
  136.       EmbeddingFactory.fromTriple(triple,
  137.         sourceFilterPropertyKeys,  edgeFilterPropertyKeys, targetFilterPropertyKeys,
  138.         sourceVariable, targetVariable
  139.       ),
  140.       filterMetaData
  141.     );
  142.   }

  143.   /**
  144.    * Creates the {@code EmbeddingMetaData} of the embedding used for filtering
  145.    * @param predicates filter predicates
  146.    * @param sourceVariable source variable
  147.    * @param edgeVariable edge variable
  148.    * @param targetVariable target variable
  149.    * @return filter embedding meta data
  150.    */
  151.   private static EmbeddingMetaData createFilterMetaData(CNF predicates, String sourceVariable,
  152.     String edgeVariable, String targetVariable) {

  153.     EmbeddingMetaData metaData = new EmbeddingMetaData();
  154.     metaData.setEntryColumn(sourceVariable, EmbeddingMetaData.EntryType.VERTEX, 0);
  155.     metaData.setEntryColumn(edgeVariable, EmbeddingMetaData.EntryType.EDGE, 1);
  156.     metaData.setEntryColumn(targetVariable, EmbeddingMetaData.EntryType.VERTEX, 2);

  157.     int i = 0;
  158.     for (String variable : new String[] {sourceVariable, edgeVariable, targetVariable}) {
  159.       for (String propertyKey : predicates.getPropertyKeys(variable)) {
  160.         metaData.setPropertyColumn(variable, propertyKey, i++);
  161.       }
  162.     }

  163.     return metaData;
  164.   }

  165. }