
 * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *     http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * See the License for the specific language governing permissions and
 * limitations under the License.
package org.gradoop.temporal.model.impl.operators.matching.single.cypher.functions;

import com.google.common.collect.Maps;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import org.gradoop.common.model.api.entities.EdgeFactory;
import org.gradoop.common.model.api.entities.Element;
import org.gradoop.common.model.api.entities.GraphHeadFactory;
import org.gradoop.common.model.api.entities.VertexFactory;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.common.model.impl.properties.PropertyValue;
import org.gradoop.flink.model.impl.operators.matching.single.PatternMatching;
import org.gradoop.flink.model.impl.operators.matching.single.cypher.pojos.Embedding;
import org.gradoop.flink.model.impl.operators.matching.single.cypher.pojos.EmbeddingMetaData;
import org.gradoop.flink.model.impl.operators.matching.single.cypher.utils.ExpandDirection;
import org.gradoop.temporal.model.impl.pojo.TemporalEdge;
import org.gradoop.temporal.model.impl.pojo.TemporalGraphHead;
import org.gradoop.temporal.model.impl.pojo.TemporalVertex;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

 * Extracts elements from an {@link Embedding}.
 * Is almost identical to
 * {@link org.gradoop.flink.model.impl.operators.matching.single.cypher.functions.ElementsFromEmbedding}
 * extends it for TPGM embeddings.
 * Could be further extended to specify temporal properties like global val/tx in the graph head
 * @param <G> The graph head type.
 * @param <V> The vertex type.
 * @param <E> The edge type.
public class ElementsFromEmbeddingTPGM<
  G extends TemporalGraphHead,
  V extends TemporalVertex,
  E extends TemporalEdge> implements FlatMapFunction<Embedding, Element> {
   * Constructs temporal graph heads
  private final GraphHeadFactory<G> graphHeadFactory;
   * Constructs temporal vertices
  private final VertexFactory<V> vertexFactory;
   * Constructs temporal edges
  private final EdgeFactory<E> edgeFactory;
   * Describes the embedding content
  private final EmbeddingMetaData metaData;
   * Source vertex variables by edge id
  private final Map<String, Pair<String, String>> sourceTargetVariables;
   * Stores the mapping between query variable and element id. The mapping is added as a property
   * to the graph head representing an embedding.
  private final Map<PropertyValue, PropertyValue> variableMapping;
   * Stores the identifiers that have already been processed.
  private final Set<GradoopId> processedIds;
   * Stores the mapping between return pattern variables and its labels
  private final Map<String, String> labelMapping;

   * Constructor.
   * @param graphHeadFactory      temporal graph head factory
   * @param vertexFactory         temporal vertex factory
   * @param edgeFactory           temporal edge factory
   * @param embeddingMetaData     meta data for the TPGM embedding
   * @param sourceTargetVariables source and target vertex variables by edge variable
  public ElementsFromEmbeddingTPGM(GraphHeadFactory<G> graphHeadFactory,
                                   VertexFactory<V> vertexFactory, EdgeFactory<E> edgeFactory,
                                   EmbeddingMetaData embeddingMetaData,
                                   Map<String, Pair<String, String>> sourceTargetVariables) {
    this(graphHeadFactory, vertexFactory, edgeFactory, embeddingMetaData,
      sourceTargetVariables, Maps.newHashMapWithExpectedSize(0));

   * Constructor.
   * @param graphHeadFactory      temporal graph head factory
   * @param vertexFactory         temporal vertex factory
   * @param edgeFactory           temporal edge factory
   * @param embeddingMetaData     meta data for the TPGM embedding
   * @param sourceTargetVariables source and target vertex variables by edge variable
   * @param labelMapping          mapping between newElementVariables and its labels
  public ElementsFromEmbeddingTPGM(GraphHeadFactory<G> graphHeadFactory,
                                   VertexFactory<V> vertexFactory,
                                   EdgeFactory<E> edgeFactory,
                                   EmbeddingMetaData embeddingMetaData,
                                   Map<String, Pair<String, String>> sourceTargetVariables,
                                   Map<String, String> labelMapping) {
    this.graphHeadFactory = graphHeadFactory;
    this.vertexFactory = vertexFactory;
    this.edgeFactory = edgeFactory;
    this.metaData = embeddingMetaData;
    this.sourceTargetVariables = sourceTargetVariables;
    this.labelMapping = labelMapping;
    this.variableMapping = new HashMap<>(embeddingMetaData.getEntryCount());
    this.processedIds = new HashSet<>(embeddingMetaData.getEntryCount());

  public void flatMap(Embedding embedding, Collector<Element> out) throws Exception {
    // clear for each embedding

    // create graph head for this embedding
    TemporalGraphHead graphHead = graphHeadFactory.createGraphHead();

    // vertices
    for (String vertexVariable : metaData.getVertexVariables()) {
      GradoopId id = embedding.getId(metaData.getEntryColumn(vertexVariable));

      if (labelMapping.containsKey(vertexVariable)) {
        String label = labelMapping.get(vertexVariable);
        initVertexWithData(out, graphHead, id, label);
      } else {
        initVertex(out, graphHead, id);
      variableMapping.put(PropertyValue.create(vertexVariable), PropertyValue.create(id));

    // edges
    GradoopId edgeId;
    GradoopId sourceId;
    GradoopId targetId;
    // Long[] timeData;
    for (String edgeVariable : metaData.getEdgeVariables()) {
      edgeId = embedding.getId(metaData.getEntryColumn(edgeVariable));
      sourceId = embedding.getId(
      targetId = embedding.getId(

      if (labelMapping.containsKey(edgeVariable)) {
        String label = labelMapping.get(edgeVariable);
        initEdgeWithData(out, graphHead, edgeId, sourceId, targetId, label);
      } else {
        initEdge(out, graphHead, edgeId, sourceId, targetId);
      variableMapping.put(PropertyValue.create(edgeVariable), PropertyValue.create(edgeId));

    // paths (copied from EPGM)

    for (String pathVariable : metaData.getPathVariables()) {
      ExpandDirection direction = metaData.getDirection(pathVariable);
      List<GradoopId> path = embedding.getIdList(metaData.getEntryColumn(pathVariable));
      List<PropertyValue> mappingValue = new ArrayList<>(path.size());
      for (int i = 0; i < path.size(); i += 2) {
        edgeId = path.get(i);

        if (direction == ExpandDirection.OUT) {
          sourceId = i > 0 ?
            path.get(i - 1) :

          targetId = i < path.size() - 1 ?
            path.get(i + 1) :

          if (i + 2 < path.size()) {
        } else {
          sourceId = i < path.size() - 1 ?
            path.get(i + 1) :

          targetId = i > 0 ?
            path.get(i - 1) :

          if (i > 0) {

        initVertex(out, graphHead, sourceId);
        initVertex(out, graphHead, targetId);
        initEdge(out, graphHead, edgeId, sourceId, targetId);
      variableMapping.put(PropertyValue.create(pathVariable), PropertyValue.create(mappingValue));

    graphHead.setProperty(PatternMatching.VARIABLE_MAPPING_KEY, variableMapping);


   * Initializes an vertex using the specified parameters
   * @param out       flat map collector
   * @param graphHead temporal graph head to assign vertex to
   * @param vertexId  vertex identifier
  private void initVertex(Collector<Element> out, TemporalGraphHead graphHead,
                          GradoopId vertexId) {
    initVertexWithData(out, graphHead, vertexId, null);

   * Initializes an vertex using the specified parameters and adds its label
   * if the given vertex was created for the return pattern.
   * @param out       flat map collector
   * @param graphHead temporal graph head to assign vertex to
   * @param vertexId  vertex identifier
   * @param label     label associated with vertex
  private void initVertexWithData(Collector<Element> out, TemporalGraphHead graphHead,
                                  GradoopId vertexId,
                                  String label) {
    if (!processedIds.contains(vertexId)) {
      TemporalVertex v = vertexFactory.initVertex(vertexId);

   * Initializes an edge using the specified parameters.
   * @param out       flat map collector
   * @param graphHead temporal graph head to assign edge to
   * @param edgeId    edge identifier
   * @param sourceId  source vertex identifier
   * @param targetId  target vertex identifier
  private void initEdge(Collector<Element> out, TemporalGraphHead graphHead,
                        GradoopId edgeId, GradoopId sourceId, GradoopId targetId) {
    initEdgeWithData(out, graphHead, edgeId, sourceId, targetId, null);

   * Initializes an edge using the specified parameters and adds its label
   * if the given edge was created for return pattern
   * @param out       flat map collector
   * @param graphHead temporal graph head to assign edge to
   * @param edgeId    edge identifier
   * @param sourceId  source vertex identifier
   * @param targetId  target vertex identifier
   * @param label     label associated with edge
  private void initEdgeWithData(Collector<Element> out, TemporalGraphHead graphHead,
                                GradoopId edgeId, GradoopId sourceId, GradoopId targetId,
                                String label) {
    if (!processedIds.contains(edgeId)) {
      TemporalEdge e = edgeFactory.initEdge(edgeId, sourceId, targetId);