TemporalGraphFactory.java

/*
 * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gradoop.temporal.model.impl;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.util.Preconditions;
import org.gradoop.common.model.api.entities.Edge;
import org.gradoop.common.model.api.entities.EdgeFactory;
import org.gradoop.common.model.api.entities.GraphHead;
import org.gradoop.common.model.api.entities.GraphHeadFactory;
import org.gradoop.common.model.api.entities.Vertex;
import org.gradoop.common.model.api.entities.VertexFactory;
import org.gradoop.flink.model.api.epgm.BaseGraph;
import org.gradoop.flink.model.api.epgm.BaseGraphFactory;
import org.gradoop.flink.model.api.layouts.LogicalGraphLayout;
import org.gradoop.flink.model.api.layouts.LogicalGraphLayoutFactory;
import org.gradoop.temporal.model.api.functions.TimeIntervalExtractor;
import org.gradoop.temporal.model.impl.functions.tpgm.EdgeToTemporalEdge;
import org.gradoop.temporal.model.impl.functions.tpgm.GraphHeadToTemporalGraphHead;
import org.gradoop.temporal.model.impl.functions.tpgm.VertexToTemporalVertex;
import org.gradoop.temporal.model.impl.layout.TemporalGraphLayoutFactory;
import org.gradoop.temporal.model.impl.pojo.TemporalEdge;
import org.gradoop.temporal.model.impl.pojo.TemporalGraphHead;
import org.gradoop.temporal.model.impl.pojo.TemporalVertex;
import org.gradoop.temporal.util.TemporalGradoopConfig;

import java.util.Collection;
import java.util.Map;

/**
 * Responsible for creating instances of {@link TemporalGraph} based on a specific layout.
 */
public class TemporalGraphFactory implements
  BaseGraphFactory<TemporalGraphHead, TemporalVertex, TemporalEdge, TemporalGraph, TemporalGraphCollection> {
  /**
   * The factory to create a temporal layout.
   */
  private LogicalGraphLayoutFactory<TemporalGraphHead, TemporalVertex, TemporalEdge> layoutFactory;
  /**
   * Temporal Gradoop configuration.
   */
  private final TemporalGradoopConfig config;

  /**
   * Creates a new temporal graph factory instance.
   *
   * @param config the temporal Gradoop config
   */
  public TemporalGraphFactory(TemporalGradoopConfig config) {
    this.config = Preconditions.checkNotNull(config);
    this.layoutFactory = new TemporalGraphLayoutFactory();
    this.layoutFactory.setGradoopFlinkConfig(config);
  }

  @Override
  public LogicalGraphLayoutFactory<TemporalGraphHead, TemporalVertex, TemporalEdge> getLayoutFactory() {
    return layoutFactory;
  }

  @Override
  public void setLayoutFactory(
    LogicalGraphLayoutFactory<TemporalGraphHead, TemporalVertex, TemporalEdge> layoutFactory) {
    this.layoutFactory = layoutFactory;
  }

  /**
   * Creates a temporal graph instance from an existing layout.
   *
   * @param layout the layout that is used to store the graph data of this temporal graph instance
   * @return a temporal graph
   */
  public TemporalGraph fromLayout(
    LogicalGraphLayout<TemporalGraphHead, TemporalVertex, TemporalEdge> layout) {
    return new TemporalGraph(layout, config);
  }

  @Override
  public TemporalGraph fromDataSets(DataSet<TemporalVertex> vertices) {
    return new TemporalGraph(this.layoutFactory.fromDataSets(vertices), config);
  }

  @Override
  public TemporalGraph fromDataSets(DataSet<TemporalVertex> vertices, DataSet<TemporalEdge> edges) {
    return new TemporalGraph(this.layoutFactory.fromDataSets(vertices, edges), config);
  }

  @Override
  public TemporalGraph fromDataSets(DataSet<TemporalGraphHead> graphHead,
    DataSet<TemporalVertex> vertices, DataSet<TemporalEdge> edges) {
    return new TemporalGraph(this.layoutFactory.fromDataSets(graphHead, vertices, edges), config);
  }

  @Override
  public TemporalGraph fromIndexedDataSets(Map<String, DataSet<TemporalGraphHead>> graphHead,
    Map<String, DataSet<TemporalVertex>> vertices, Map<String, DataSet<TemporalEdge>> edges) {
    return new TemporalGraph(this.layoutFactory.fromIndexedDataSets(graphHead, vertices, edges),
      config);
  }

  @Override
  public TemporalGraph fromCollections(TemporalGraphHead graphHead,
    Collection<TemporalVertex> vertices, Collection<TemporalEdge> edges) {
    return new TemporalGraph(this.layoutFactory.fromCollections(graphHead, vertices, edges),
      config);
  }

  @Override
  public TemporalGraph fromCollections(Collection<TemporalVertex> vertices,
    Collection<TemporalEdge> edges) {
    return new TemporalGraph(this.layoutFactory.fromCollections(vertices, edges), config);
  }

  @Override
  public TemporalGraph createEmptyGraph() {
    return new TemporalGraph(this.layoutFactory.createEmptyGraph(), config);
  }

  @Override
  public GraphHeadFactory<TemporalGraphHead> getGraphHeadFactory() {
    return layoutFactory.getGraphHeadFactory();
  }

  @Override
  public VertexFactory<TemporalVertex> getVertexFactory() {
    return layoutFactory.getVertexFactory();
  }

  @Override
  public EdgeFactory<TemporalEdge> getEdgeFactory() {
    return layoutFactory.getEdgeFactory();
  }

  /**
   * Creates a {@link TemporalGraph} instance by the given EPGM graph head, vertex and edge datasets.
   *
   * The method assumes that the given vertices and edges are already assigned to the given
   * graph head.
   *
   * @param graphHead   1-element graph head DataSet
   * @param vertices    vertex DataSet
   * @param edges       edge DataSet
   * @return a temporal graph with default temporal data
   */
  public TemporalGraph fromNonTemporalDataSets(
    DataSet<? extends GraphHead> graphHead,
    DataSet<? extends Vertex> vertices,
    DataSet<? extends Edge> edges) {
    return new TemporalGraph(this.layoutFactory.fromDataSets(
      graphHead.map(new GraphHeadToTemporalGraphHead<>(getGraphHeadFactory())),
      vertices.map(new VertexToTemporalVertex<>(getVertexFactory())),
      edges.map(new EdgeToTemporalEdge<>(getEdgeFactory()))), config);
  }

  /**
   * Creates a {@link TemporalGraph} instance. By the provided timestamp extractors, it is possible
   * to extract temporal information from the data to define a timestamp or time interval that
   * represents the beginning and end of the element's validity (valid time).
   *
   * The method assumes that the given vertices and edges are already assigned to the given graph head.
   *
   * @param graphHead 1-element graph head DataSet
   * @param graphHeadTimeIntervalExtractor extractor to pick the time interval from graph heads
   * @param vertices vertex DataSet
   * @param vertexTimeIntervalExtractor extractor to pick the time interval from vertices
   * @param edges edge DataSet
   * @param edgeTimeIntervalExtractor extractor to pick the time interval from edges
   * @param <G> The graph head type.
   * @param <V> The vertex type.
   * @param <E> The edge type.
   * @return a temporal graph with temporal data extracted using extractor functions
   */
  public <G extends GraphHead, V extends Vertex, E extends Edge> TemporalGraph fromNonTemporalDataSets(
    DataSet<G> graphHead,
    TimeIntervalExtractor<G> graphHeadTimeIntervalExtractor,
    DataSet<V> vertices,
    TimeIntervalExtractor<V> vertexTimeIntervalExtractor,
    DataSet<E> edges,
    TimeIntervalExtractor<E> edgeTimeIntervalExtractor) {

    return new TemporalGraph(this.layoutFactory.fromDataSets(
      graphHead.map(new GraphHeadToTemporalGraphHead<>(getGraphHeadFactory(),
        graphHeadTimeIntervalExtractor)),
      vertices.map(new VertexToTemporalVertex<>(getVertexFactory(),
        vertexTimeIntervalExtractor)),
      edges.map(new EdgeToTemporalEdge<>(getEdgeFactory(),
        edgeTimeIntervalExtractor))), config);
  }

  /**
   * Creates a {@link TemporalGraph} instance from a (non-temporal) base graph.
   *
   * This method calls {@link #fromNonTemporalDataSets(DataSet, DataSet, DataSet)} on the graphs element
   * data sets.
   *
   * @param graph Some base graph.
   * @return The resulting temporal graph.
   */
  public TemporalGraph fromNonTemporalGraph(BaseGraph<?, ?, ?, ?, ?> graph) {
    return fromNonTemporalDataSets(graph.getGraphHead(), graph.getVertices(), graph.getEdges());
  }
}