HBaseDataSource.java

/*
 * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gradoop.storage.hbase.impl.io;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.gradoop.common.model.impl.pojo.EPGMEdge;
import org.gradoop.common.model.impl.pojo.EPGMGraphHead;
import org.gradoop.common.model.impl.pojo.EPGMVertex;
import org.gradoop.flink.model.impl.epgm.GraphCollection;
import org.gradoop.flink.model.impl.epgm.GraphCollectionFactory;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.functions.tuple.ValueOf1;
import org.gradoop.flink.model.impl.operators.combination.ReduceCombination;
import org.gradoop.flink.util.GradoopFlinkConfig;
import org.gradoop.storage.common.io.FilterableDataSource;
import org.gradoop.storage.common.predicate.query.ElementQuery;
import org.gradoop.storage.hbase.impl.io.inputformats.EdgeTableInputFormat;
import org.gradoop.storage.hbase.impl.io.inputformats.GraphHeadTableInputFormat;
import org.gradoop.storage.hbase.impl.io.inputformats.VertexTableInputFormat;
import org.gradoop.storage.hbase.impl.predicate.filter.api.HBaseElementFilter;
import org.gradoop.storage.hbase.impl.HBaseEPGMStore;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
 * Data source that builds EPGM graph collections and logical graphs from the tables of an
 * HBase store. Optional element queries for graph heads, vertices and edges are handed to the
 * corresponding HBase element handlers before the tables are read.
 *
 * Instances are not modified after construction: every {@code apply*Predicate} call returns a
 * new data source carrying the given query.
 *
 * @see FilterableDataSource
 */
public class HBaseDataSource extends HBaseBase
  implements FilterableDataSource<
  ElementQuery<HBaseElementFilter<EPGMGraphHead>>,
  ElementQuery<HBaseElementFilter<EPGMVertex>>,
  ElementQuery<HBaseElementFilter<EPGMEdge>>> {

  /**
   * Query used when reading graph heads, {@code null} if none was set
   */
  private final ElementQuery<HBaseElementFilter<EPGMGraphHead>> graphHeadQuery;

  /**
   * Query used when reading vertices, {@code null} if none was set
   */
  private final ElementQuery<HBaseElementFilter<EPGMVertex>> vertexQuery;

  /**
   * Query used when reading edges, {@code null} if none was set
   */
  private final ElementQuery<HBaseElementFilter<EPGMEdge>> edgeQuery;

  /**
   * Creates a new HBase data source without any element queries.
   *
   * @param epgmStore HBase store
   * @param flinkConfig gradoop flink execute config
   */
  public HBaseDataSource(
    @Nonnull HBaseEPGMStore epgmStore,
    @Nonnull GradoopFlinkConfig flinkConfig
  ) {
    this(epgmStore, flinkConfig, null, null, null);
  }

  /**
   * Internal constructor that additionally stores the element queries of the new instance.
   *
   * @param epgmStore HBase store
   * @param flinkConfig gradoop flink execute config
   * @param headQuery query for graph head elements, may be {@code null}
   * @param vertexElementQuery query for vertices, may be {@code null}
   * @param edgeElementQuery query for edges, may be {@code null}
   */
  private HBaseDataSource(
    @Nonnull HBaseEPGMStore epgmStore,
    @Nonnull GradoopFlinkConfig flinkConfig,
    @Nullable ElementQuery<HBaseElementFilter<EPGMGraphHead>> headQuery,
    @Nullable ElementQuery<HBaseElementFilter<EPGMVertex>> vertexElementQuery,
    @Nullable ElementQuery<HBaseElementFilter<EPGMEdge>> edgeElementQuery
  ) {
    super(epgmStore, flinkConfig);
    graphHeadQuery = headQuery;
    vertexQuery = vertexElementQuery;
    edgeQuery = edgeElementQuery;
  }

  /**
   * Reads the graph collection and reduces it to one logical graph using
   * {@link ReduceCombination}.
   *
   * @return logical graph resulting from the reduction
   */
  @Override
  public LogicalGraph getLogicalGraph() {
    GraphCollection collection = getGraphCollection();
    return collection.reduce(new ReduceCombination<>());
  }

  /**
   * Creates one input per element table (graph heads, vertices, edges) and assembles the
   * resulting data sets into a graph collection.
   *
   * @return graph collection built from the three data sets
   */
  @Override
  public GraphCollection getGraphCollection() {
    GradoopFlinkConfig flinkConfig = getFlinkConfig();
    GraphCollectionFactory collectionFactory = flinkConfig.getGraphCollectionFactory();
    HBaseEPGMStore epgmStore = getStore();

    // Inputs are created in the order graph heads, vertices, edges.
    DataSet<EPGMGraphHead> heads = readGraphHeads(flinkConfig, collectionFactory, epgmStore);
    DataSet<EPGMVertex> vertexSet = readVertices(flinkConfig, collectionFactory, epgmStore);
    DataSet<EPGMEdge> edgeSet = readEdges(flinkConfig, collectionFactory, epgmStore);

    return collectionFactory.fromDataSets(heads, vertexSet, edgeSet);
  }

  /**
   * Creates the graph head data set from the graph head table of the given store.
   *
   * @param config gradoop flink config providing the execution environment
   * @param factory collection factory providing the graph head type
   * @param store HBase store providing the table name
   * @return graph heads unwrapped from their single-field tuples
   */
  private DataSet<EPGMGraphHead> readGraphHeads(
    GradoopFlinkConfig config,
    GraphCollectionFactory factory,
    HBaseEPGMStore store
  ) {
    return config.getExecutionEnvironment()
      .createInput(
        new GraphHeadTableInputFormat(
          getHBaseConfig().getGraphHeadHandler().applyQuery(graphHeadQuery),
          store.getGraphHeadName()),
        new TupleTypeInfo<>(
          TypeExtractor.createTypeInfo(factory.getGraphHeadFactory().getType())))
      .map(new ValueOf1<>());
  }

  /**
   * Creates the vertex data set from the vertex table of the given store.
   *
   * @param config gradoop flink config providing the execution environment
   * @param factory collection factory providing the vertex type
   * @param store HBase store providing the table name
   * @return vertices unwrapped from their single-field tuples
   */
  private DataSet<EPGMVertex> readVertices(
    GradoopFlinkConfig config,
    GraphCollectionFactory factory,
    HBaseEPGMStore store
  ) {
    return config.getExecutionEnvironment()
      .createInput(
        new VertexTableInputFormat(
          getHBaseConfig().getVertexHandler().applyQuery(vertexQuery),
          store.getVertexTableName()),
        new TupleTypeInfo<>(
          TypeExtractor.createTypeInfo(factory.getVertexFactory().getType())))
      .map(new ValueOf1<>());
  }

  /**
   * Creates the edge data set from the edge table of the given store.
   *
   * @param config gradoop flink config providing the execution environment
   * @param factory collection factory providing the edge type
   * @param store HBase store providing the table name
   * @return edges unwrapped from their single-field tuples
   */
  private DataSet<EPGMEdge> readEdges(
    GradoopFlinkConfig config,
    GraphCollectionFactory factory,
    HBaseEPGMStore store
  ) {
    return config.getExecutionEnvironment()
      .createInput(
        new EdgeTableInputFormat(
          getHBaseConfig().getEdgeHandler().applyQuery(edgeQuery),
          store.getEdgeTableName()),
        new TupleTypeInfo<>(
          TypeExtractor.createTypeInfo(factory.getEdgeFactory().getType())))
      .map(new ValueOf1<>());
  }

  /**
   * Returns a copy of this data source that uses the given graph head query; the vertex and
   * edge queries are carried over unchanged.
   *
   * @param query query for graph head elements
   * @return new data source instance
   */
  @Nonnull
  @Override
  public HBaseDataSource applyGraphPredicate(
    @Nonnull ElementQuery<HBaseElementFilter<EPGMGraphHead>> query
  ) {
    HBaseDataSource filtered = new HBaseDataSource(
      getStore(), getFlinkConfig(), query, this.vertexQuery, this.edgeQuery);
    return filtered;
  }

  /**
   * Returns a copy of this data source that uses the given vertex query; the graph head and
   * edge queries are carried over unchanged.
   *
   * @param query query for vertices
   * @return new data source instance
   */
  @Nonnull
  @Override
  public HBaseDataSource applyVertexPredicate(
    @Nonnull ElementQuery<HBaseElementFilter<EPGMVertex>> query
  ) {
    HBaseDataSource filtered = new HBaseDataSource(
      getStore(), getFlinkConfig(), this.graphHeadQuery, query, this.edgeQuery);
    return filtered;
  }

  /**
   * Returns a copy of this data source that uses the given edge query; the graph head and
   * vertex queries are carried over unchanged.
   *
   * @param query query for edges
   * @return new data source instance
   */
  @Nonnull
  @Override
  public HBaseDataSource applyEdgePredicate(
    @Nonnull ElementQuery<HBaseElementFilter<EPGMEdge>> query
  ) {
    HBaseDataSource filtered = new HBaseDataSource(
      getStore(), getFlinkConfig(), this.graphHeadQuery, this.vertexQuery, query);
    return filtered;
  }

  /**
   * Tells whether at least one of the three element queries has been set on this instance.
   *
   * @return {@code false} only if the graph head, vertex and edge queries are all {@code null}
   */
  @Override
  public boolean isFilterPushedDown() {
    boolean noQuerySet = graphHeadQuery == null && vertexQuery == null && edgeQuery == null;
    return !noQuerySet;
  }
}