MinimalCSVImporter.java

/*
 * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gradoop.dataintegration.importer.impl.csv;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.io.TupleCsvInputFormat;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.gradoop.common.model.impl.pojo.EPGMVertex;
import org.gradoop.dataintegration.importer.impl.csv.functions.CsvRowToProperties;
import org.gradoop.dataintegration.importer.impl.csv.functions.PropertiesToVertex;
import org.gradoop.flink.io.api.DataSource;
import org.gradoop.flink.model.impl.epgm.GraphCollection;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.util.GradoopFlinkConfig;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.IntStream;

/**
 * Read a csv file and import each row as a vertex in EPGM representation.
 */
public class MinimalCSVImporter implements DataSource {

  /**
   * Token delimiter of the CSV file.
   */
  private final String tokenSeparator;

  /**
   * Path to the csv file.
   */
  private final String path;

  /**
   * The charset used in the csv file, e.g., "UTF-8".
   */
  private String charset = "UTF-8";

  /**
   * The property names for all columns in the file. If {@code null}, the first line will be
   * interpreted as header row.
   */
  private final List<String> columnNames;

  /**
   * Flag to specify if each row of the file should be checked for reoccurring of the column property names.
   */
  private boolean checkReoccurringHeader = false;

  /**
   * Flag to specify if quoted strings should be considered. Will escape delimiters inside quoted strings.
   */
  private boolean parseQuotedStrings = false;

  /**
   * Character used to quote strings.
   */
  private char quoteCharacter = '"';

  /**
   * Gradoop Flink configuration.
   */
  private final GradoopFlinkConfig config;

  /**
   * Create a new MinimalCSVImporter instance by the given parameters.
   *
   * @param path the path to the csv file
   * @param tokenSeparator the token delimiter of the csv file
   * @param config GradoopFlinkConfig configuration
   * @param columnNames property identifiers for each column
   */
  public MinimalCSVImporter(String path, String tokenSeparator, GradoopFlinkConfig config,
                            List<String> columnNames) {
    this.path = Objects.requireNonNull(path);
    this.tokenSeparator = Objects.requireNonNull(tokenSeparator);
    this.config = Objects.requireNonNull(config);
    this.columnNames = columnNames;
  }

  /**
   * Create a new MinimalCSVImporter instance by the given parameters. The first line of the file
   * will set as the property names for each column.
   *
   * @param path the path to the csv file
   * @param tokenSeparator the token delimiter of the csv file
   * @param config GradoopFlinkConfig configuration
   */
  public MinimalCSVImporter(String path, String tokenSeparator, GradoopFlinkConfig config) {
    this(path, tokenSeparator, config, null);
  }

  /**
   * Set charset of CSV file. UTF-8 is used as default.
   *
   * @param charset the charset used in the csv file, e.g., "UTF-8"
   * @return this
   */
  public MinimalCSVImporter setCharset(String charset) {
    this.charset = charset;
    return this;
  }

  /**
   * Set checkReoccurringHeader flag.
   * Each row of the file will be checked for reoccurrence of the column property names.
   *
   * @return this
   */
  public MinimalCSVImporter checkReoccurringHeader() {
    this.checkReoccurringHeader = true;
    return this;
  }

  /**
   * Set parseQuotedStrings flag. Delimiters in quoted fields will be escaped.
   *
   * @return this
   */
  public MinimalCSVImporter parseQuotedStrings() {
    this.parseQuotedStrings = true;
    return this;
  }

  /**
   * Set quoteCharacter.
   *
   * @param quoteCharacter character used to quote fields
   * @return this
   */
  public MinimalCSVImporter quoteCharacter(char quoteCharacter) {
    this.quoteCharacter = quoteCharacter;
    return this;
  }

  /**
   * Import each row of the file as a vertex and create a logical graph from it.
   * If no column property names are set, read the first line of the file as header and set this
   * values as column names.
   *
   * @return a logical graph with a vertex per csv line and no edges
   * @throws IOException on failure
   */
  @Override
  public LogicalGraph getLogicalGraph() throws IOException {
    DataSet<EPGMVertex> vertices;

    if (columnNames == null) {
      vertices = readCSVFile(readHeaderRow(), checkReoccurringHeader);
    } else {
      vertices = readCSVFile(columnNames, checkReoccurringHeader);
    }

    return config.getLogicalGraphFactory().fromDataSets(vertices);
  }

  @Override
  public GraphCollection getGraphCollection() throws IOException {
    LogicalGraph logicalGraph = getLogicalGraph();
    return logicalGraph.getCollectionFactory().fromGraph(logicalGraph);
  }

  /**
   * Reads the csv file specified by {@link MinimalCSVImporter#path} and converts each valid line
   * to a {@link EPGMVertex}.
   *
   * @param propertyNames list of the property identifier names
   * @param checkReoccurringHeader set to true if each row of the file should be checked for
   *                               reoccurring of the column property names
   * @return a {@link DataSet} of all vertices from one specific file
   */
  private DataSet<EPGMVertex> readCSVFile(List<String> propertyNames, boolean checkReoccurringHeader) {
    Class<String>[] types = IntStream.range(0, propertyNames.size())
      .mapToObj(i -> String.class)
      .<Class<String>>toArray(Class[]::new);

    return getCSVReader(config.getExecutionEnvironment(), path, types)
      .flatMap(new CsvRowToProperties<>(propertyNames, checkReoccurringHeader))
      .filter(p -> !p.isEmpty())
      .map(new PropertiesToVertex<>(config.getLogicalGraphFactory().getVertexFactory()))
      .returns(config.getLogicalGraphFactory().getVertexFactory().getType());
  }

  /**
   * Create CSVReader for the provided types.
   * Flinks CSVReader builder does not support tuples of dynamic length.
   *
   * @param env execution environment
   * @param filename path of csv file
   * @param fieldTypes array of types
   * @return csv reader returning a tuple of fieldTypes
   */
  private org.apache.flink.api.java.operators.DataSource<Tuple> getCSVReader(ExecutionEnvironment env,
    String filename, Class<?>[] fieldTypes) {
    TupleTypeInfo<Tuple> typeInfo = TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo(fieldTypes);
    TupleCsvInputFormat<Tuple> inputFormat = new TupleCsvInputFormat<>(
      new org.apache.flink.core.fs.Path(filename), typeInfo);
    inputFormat.setCharset(this.charset);
    inputFormat.setFieldDelimiter(this.tokenSeparator);
    inputFormat.setSkipFirstLineAsHeader(false);
    if (this.parseQuotedStrings) {
      inputFormat.enableQuotedStringParsing(this.quoteCharacter);
    }
    return new org.apache.flink.api.java.operators.DataSource<>(env, inputFormat, typeInfo,
      Utils.getCallLocationName());
  }

  /**
   * Reads the fist row of a csv file and creates a list including all column entries that
   * will be used as property names.
   *
   * @return the property names
   * @throws IOException if an error occurred while open the stream
   */
  private List<String> readHeaderRow() throws IOException {
    Path filePath = new Path(path);
    try (FileSystem fs = FileSystem.get(filePath.toUri(), new Configuration())) {
      FSDataInputStream inputStream = fs.open(filePath);
      BufferedReader lineReader = new BufferedReader(new InputStreamReader(inputStream, charset));
      String headerLine = lineReader.readLine();
      lineReader.close();
      if (headerLine == null || headerLine.isEmpty()) {
        throw new IOException("The csv file '" + path + "' does not contain any rows.");
      }
      return Arrays.asList(headerLine.split(tokenSeparator));
    } catch (IOException ioe) {
      throw new IOException("Error while opening a stream to '" + path + "'.", ioe);
    }
  }
}