CypherTemporalPatternMatching.java

  1. /*
  2.  * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
  3.  *
  4.  * Licensed under the Apache License, Version 2.0 (the "License");
  5.  * you may not use this file except in compliance with the License.
  6.  * You may obtain a copy of the License at
  7.  *
  8.  *     http://www.apache.org/licenses/LICENSE-2.0
  9.  *
  10.  * Unless required by applicable law or agreed to in writing, software
  11.  * distributed under the License is distributed on an "AS IS" BASIS,
  12.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13.  * See the License for the specific language governing permissions and
  14.  * limitations under the License.
  15.  */
  16. package org.gradoop.temporal.model.impl.operators.matching.single.cypher;

  17. import com.google.common.collect.Sets;
  18. import org.apache.flink.api.java.DataSet;
  19. import org.apache.log4j.Logger;
  20. import org.gradoop.common.model.api.entities.Element;
  21. import org.gradoop.flink.model.impl.operators.matching.common.MatchStrategy;
  22. import org.gradoop.flink.model.impl.operators.matching.common.PostProcessor;
  23. import org.gradoop.flink.model.impl.operators.matching.single.cypher.debug.PrintEmbedding;
  24. import org.gradoop.flink.model.impl.operators.matching.single.cypher.operators.add.AddEmbeddingsElements;
  25. import org.gradoop.flink.model.impl.operators.matching.single.cypher.operators.project.ProjectEmbeddingsElements;
  26. import org.gradoop.flink.model.impl.operators.matching.single.cypher.planning.queryplan.QueryPlan;
  27. import org.gradoop.flink.model.impl.operators.matching.single.cypher.pojos.Embedding;
  28. import org.gradoop.flink.model.impl.operators.matching.single.cypher.pojos.EmbeddingMetaData;
  29. import org.gradoop.temporal.model.impl.TemporalGraph;
  30. import org.gradoop.temporal.model.impl.TemporalGraphCollection;
  31. import org.gradoop.temporal.model.impl.operators.matching.common.query.TemporalQueryHandler;
  32. import org.gradoop.temporal.model.impl.operators.matching.common.query.postprocessing.CNFPostProcessing;
  33. import org.gradoop.temporal.model.impl.operators.matching.common.query.postprocessing.exceptions.QueryContradictoryException;
  34. import org.gradoop.temporal.model.impl.operators.matching.common.statistics.TemporalGraphStatistics;
  35. import org.gradoop.temporal.model.impl.operators.matching.single.TemporalPatternMatching;
  36. import org.gradoop.temporal.model.impl.operators.matching.single.cypher.functions.ElementsFromEmbeddingTPGM;
  37. import org.gradoop.temporal.model.impl.operators.matching.single.cypher.planning.planner.greedy.GreedyPlanner;
  38. import org.gradoop.temporal.model.impl.pojo.TemporalEdge;
  39. import org.gradoop.temporal.model.impl.pojo.TemporalGraphHead;
  40. import org.gradoop.temporal.model.impl.pojo.TemporalVertex;

  41. import java.util.ArrayList;
  42. import java.util.Objects;
  43. import java.util.Set;

  44. import static com.google.common.collect.Sets.difference;
  45. import static com.google.common.collect.Sets.intersection;
  46. import static org.gradoop.flink.model.impl.operators.matching.common.debug.Printer.log;

  47. /**
  48.  * Implementation of a query engine based on the Cypher graph query language.
  49.  */
  50. public class CypherTemporalPatternMatching
  51.   extends TemporalPatternMatching<TemporalGraphHead, TemporalGraph, TemporalGraphCollection> {

  52.   /**
  53.    * Logger
  54.    */
  55.   private static final Logger LOG = Logger.getLogger(CypherTemporalPatternMatching.class);
  56.   /**
  57.    * Construction pattern for result transformation.
  58.    */
  59.   private final String constructionPattern;
  60.   /**
  61.    * Morphism strategy for vertex mappings
  62.    */
  63.   private final MatchStrategy vertexStrategy;
  64.   /**
  65.    * Morphism strategy for edge mappings
  66.    */
  67.   private final MatchStrategy edgeStrategy;
  68.   /**
  69.    * Statistics about the data graph
  70.    */
  71.   private final TemporalGraphStatistics graphStatistics;

  72.   /**
  73.    * Instantiates a new operator.
  74.    *
  75.    * @param query           Cypher query string
  76.    * @param attachData      true, if original data shall be attached to the result
  77.    * @param vertexStrategy  morphism strategy for vertex mappings
  78.    * @param edgeStrategy    morphism strategy for edge mappings
  79.    * @param graphStatistics statistics about the data graph
  80.    * @param postprocessor   postprocessing pipeline for the query CNF
  81.    */
  82.   public CypherTemporalPatternMatching(String query, boolean attachData,
  83.                                        MatchStrategy vertexStrategy, MatchStrategy edgeStrategy,
  84.                                        TemporalGraphStatistics graphStatistics,
  85.                                        CNFPostProcessing postprocessor) {
  86.     this(query, null, attachData, vertexStrategy, edgeStrategy, graphStatistics, postprocessor);
  87.   }

  88.   /**
  89.    * Instantiates a new operator.
  90.    *
  91.    * @param query               Cypher query string
  92.    * @param constructionPattern Construction pattern
  93.    * @param attachData          true, if original data shall be attached to the result
  94.    * @param vertexStrategy      morphism strategy for vertex mappings
  95.    * @param edgeStrategy        morphism strategy for edge mappings
  96.    * @param graphStatistics     statistics about the data graph
  97.    * @param postprocessor       postprocessing pipeline for the query CNF
  98.    */
  99.   public CypherTemporalPatternMatching(String query, String constructionPattern, boolean attachData,
  100.                                        MatchStrategy vertexStrategy, MatchStrategy edgeStrategy,
  101.                                        TemporalGraphStatistics graphStatistics,
  102.                                        CNFPostProcessing postprocessor) {
  103.     super(query, attachData, postprocessor, LOG);
  104.     this.constructionPattern = constructionPattern;
  105.     this.vertexStrategy = vertexStrategy;
  106.     this.edgeStrategy = edgeStrategy;
  107.     this.graphStatistics = graphStatistics;
  108.   }

  109.   @Override
  110.   protected TemporalGraphCollection executeForVertex(TemporalGraph graph) {
  111.     return executeForPattern(graph);
  112.   }

  113.   @Override
  114.   protected TemporalGraphCollection executeForPattern(TemporalGraph graph) {
  115.     // Query planning
  116.     TemporalQueryHandler queryHandler = getQueryHandler();
  117.     QueryPlan plan =
  118.       new GreedyPlanner<>(graph, queryHandler, graphStatistics, vertexStrategy, edgeStrategy).plan()
  119.         .getQueryPlan();

  120.     // Query execution
  121.     DataSet<Embedding> embeddings = plan.execute();

  122.     EmbeddingMetaData embeddingMetaData = plan.getRoot().getEmbeddingMetaData();

  123.     embeddings =
  124.       log(embeddings, new PrintEmbedding(embeddingMetaData),
  125.         getVertexMapping(), getEdgeMapping());

  126.     // Pattern construction (if necessary)
  127.     DataSet<Element> finalElements = this.constructionPattern != null ?
  128.       constructFinalElements(graph, embeddings, embeddingMetaData) :
  129.       embeddings.flatMap(
  130.         new ElementsFromEmbeddingTPGM<TemporalGraphHead, TemporalVertex, TemporalEdge>(
  131.           graph.getFactory().getGraphHeadFactory(),
  132.           graph.getFactory().getVertexFactory(),
  133.           graph.getFactory().getEdgeFactory(),
  134.           embeddingMetaData,
  135.           queryHandler.getSourceTargetVariables()));

  136.     // Post processing
  137.     TemporalGraphCollection graphCollection = doAttachData() ?
  138.       PostProcessor.extractGraphCollectionWithData(finalElements, graph, true) :
  139.       PostProcessor.extractGraphCollection(finalElements, graph.getCollectionFactory(), true);

  140.     return graphCollection;
  141.   }

  142.   @Override
  143.   protected TemporalGraphCollection emptyCollection(TemporalGraph graph) {
  144.     return graph.getCollectionFactory().createEmptyCollection();
  145.   }

  146.   /**
  147.    * Method to construct final embedded elements
  148.    *
  149.    * @param graph             Used logical graph
  150.    * @param embeddings        embeddings
  151.    * @param embeddingMetaData Meta information
  152.    * @return New set of EmbeddingElements
  153.    */
  154.   private DataSet<Element> constructFinalElements(TemporalGraph graph, DataSet<Embedding> embeddings,
  155.                                                   EmbeddingMetaData embeddingMetaData) {

  156.     TemporalQueryHandler constructionPatternHandler = null;
  157.     try {
  158.       // no postprocessing needed, construction pattern is not a query
  159.       constructionPatternHandler = new TemporalQueryHandler(
  160.         this.constructionPattern, new CNFPostProcessing(new ArrayList<>()));
  161.       // will never happen, as the construction pattern does not contain conditions
  162.     } catch (QueryContradictoryException e) {
  163.       e.printStackTrace();
  164.     }
  165.     Objects.requireNonNull(constructionPatternHandler).updateGeneratedVariableNames(n -> "_" + n);

  166.     Set<String> queryVars = Sets.newHashSet(embeddingMetaData.getVariables());
  167.     Set<String> constructionVars = constructionPatternHandler.getAllVariables();
  168.     Set<String> existingVars = intersection(queryVars, constructionVars).immutableCopy();
  169.     Set<String> newVars = difference(constructionVars, queryVars).immutableCopy();

  170.     EmbeddingMetaData newMetaData = computeNewMetaData(
  171.       embeddingMetaData, constructionPatternHandler, existingVars, newVars);

  172.     // project existing embedding elements to new embeddings
  173.     ProjectEmbeddingsElements projectedEmbeddings =
  174.       new ProjectEmbeddingsElements(embeddings, existingVars, embeddingMetaData, newMetaData);
  175.     // add new embedding elements
  176.     AddEmbeddingsElements addEmbeddingsElements =
  177.       new AddEmbeddingsElements(projectedEmbeddings.evaluate(), newVars.size());

  178.     return addEmbeddingsElements.evaluate().flatMap(
  179.       new ElementsFromEmbeddingTPGM<>(
  180.         graph.getFactory().getGraphHeadFactory(),
  181.         graph.getFactory().getVertexFactory(),
  182.         graph.getFactory().getEdgeFactory(),
  183.         newMetaData,
  184.         constructionPatternHandler.getSourceTargetVariables(),
  185.         constructionPatternHandler.getLabelsForVariables(newVars)));
  186.   }

  187.   /**
  188.    * Compute new meta information
  189.    *
  190.    * @param metaData             old meta information
  191.    * @param returnPatternHandler pattern handler
  192.    * @param existingVariables    old variables
  193.    * @param newVariables         new variables
  194.    * @return new EmbeddingMetaData
  195.    */
  196.   private EmbeddingMetaData computeNewMetaData(EmbeddingMetaData metaData,
  197.                                                    TemporalQueryHandler returnPatternHandler,
  198.                                                    Set<String> existingVariables, Set<String> newVariables) {
  199.     // update meta data
  200.     EmbeddingMetaData newMetaData = new EmbeddingMetaData();

  201.     // case 1: Filter existing embeddings based on return pattern
  202.     for (String var : existingVariables) {
  203.       newMetaData.setEntryColumn(var, metaData.getEntryType(var), newMetaData.getEntryCount());
  204.     }

  205.     // case 2: Add new vertices and edges
  206.     for (String var : newVariables) {
  207.       EmbeddingMetaData.EntryType type = returnPatternHandler.isEdge(var) ?
  208.         EmbeddingMetaData.EntryType.EDGE :
  209.         EmbeddingMetaData.EntryType.VERTEX;

  210.       newMetaData.setEntryColumn(var, type, newMetaData.getEntryCount());
  211.     }
  212.     return newMetaData;
  213.   }
  214. }