MinimalJSONImporter.java

/*
 * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gradoop.dataintegration.importer.impl.json;

import org.apache.flink.api.java.DataSet;
import org.gradoop.common.model.impl.pojo.EPGMVertex;
import org.gradoop.dataintegration.importer.impl.json.functions.MinimalJsonToVertex;
import org.gradoop.flink.io.api.DataSource;
import org.gradoop.flink.model.impl.epgm.GraphCollection;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.util.GradoopFlinkConfig;

import java.util.Objects;

/**
 * A data importer that creates a graph from JSON objects.
 * The input is expected to be a text file where every line is a valid JSON object.
 * A vertex will be created for each of those objects, properties will be created from
 * the attributes of the JSON object.
 */
public class MinimalJSONImporter implements DataSource {

  /**
   * The path of the JSON file or directory.
   */
  private final String jsonPath;

  /**
   * The config used to access vertex factories and for reading the JSON file.
   */
  private final GradoopFlinkConfig config;

  /**
   * Create a new simple JSON importer.
   *
   * @param jsonPath The path of the JSON file(s).
   * @param config   The config used to read the file(s) and create the graph.
   */
  public MinimalJSONImporter(String jsonPath, GradoopFlinkConfig config) {
    Objects.requireNonNull(jsonPath);
    Objects.requireNonNull(config);
    this.jsonPath = jsonPath;
    this.config = config;
  }

  @Override
  public LogicalGraph getLogicalGraph() {
    DataSet<EPGMVertex> vertices = config.getExecutionEnvironment().readTextFile(jsonPath)
      .map(new MinimalJsonToVertex(config.getLogicalGraphFactory().getVertexFactory()));
    return config.getLogicalGraphFactory().fromDataSets(vertices);
  }

  @Override
  public GraphCollection getGraphCollection() {
    LogicalGraph logicalGraph = getLogicalGraph();
    return logicalGraph.getCollectionFactory().fromGraph(logicalGraph);
  }
}