CSVLineToElement.java

/*
 * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gradoop.flink.io.impl.csv.functions;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.gradoop.common.model.api.entities.Element;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.common.model.impl.id.GradoopIdSet;
import org.gradoop.common.model.impl.metadata.MetaData;
import org.gradoop.common.model.impl.metadata.PropertyMetaData;
import org.gradoop.common.model.impl.properties.Properties;
import org.gradoop.flink.io.impl.csv.CSVConstants;
import org.gradoop.flink.io.impl.csv.CSVDataSource;
import org.gradoop.flink.io.impl.csv.metadata.CSVMetaData;
import org.gradoop.flink.io.impl.csv.metadata.CSVMetaDataSource;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

import static org.gradoop.flink.io.impl.csv.CSVConstants.LIST_TEMPLATE;

/**
 * Base class for reading an {@link Element} from CSV. Handles the {@link MetaData} which is
 * required to parse the property values.
 *
 * @param <E> element type
 */
public abstract class CSVLineToElement<E extends Element> extends RichMapFunction<String, E> {
  /**
   * Stores the properties for the {@link Element} to be parsed.
   */
  private final Properties properties;
  /**
   * Meta data that provides parsers for a specific {@link Element}.
   */
  private CSVMetaData metaData;

  /**
   * Constructor
   */
  public CSVLineToElement() {
    this.properties = Properties.create();
  }


  @Override
  public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    this.metaData = new CSVMetaDataSource().fromTuples(getRuntimeContext()
      .getBroadcastVariable(CSVDataSource.BC_METADATA));
  }

  /**
   * Parses the given property values according to the meta data associated with the specified
   * label.
   *
   * @param type                element type
   * @param label               element label
   * @param propertyValueString string representation of elements' property values
   * @return parsed properties
   */
  protected Properties parseProperties(String type, String label, String propertyValueString) {
    String[] propertyValues = StringEscaper
      .split(propertyValueString, CSVConstants.VALUE_DELIMITER);
    List<PropertyMetaData> metaDataList = metaData.getPropertyMetaData(type, label);
    properties.clear();
    for (int i = 0; i < propertyValues.length; i++) {
      if (propertyValues[i].length() > 0) {
        properties.set(metaDataList.get(i).getKey(),
          metaDataList.get(i).getValueParser().apply(propertyValues[i]));
      }
    }
    return properties;
  }

  /**
   * Parses the CSV string that contains EPGMGraphHead ids.
   * The csv string is formatted as a list: {@code "[id0,id1,id3,...]"}
   *
   * @param gradoopIdsString The csv token string.
   * @return gradoop ids contained in the string
   */
  protected GradoopIdSet parseGradoopIds(String gradoopIdsString) {
    // Check for empty id list string and return empty list
    // Splitting the empty list string would result in 1 empty string instead
    // If list string is not empty, parse as GradoopIds
    List<GradoopId> gradoopIds = gradoopIdsString.equals(String.format(LIST_TEMPLATE, "")) ? Collections
      .emptyList() : Arrays.stream(gradoopIdsString
        .substring(1, gradoopIdsString.length() - 1)
        .split(CSVConstants.LIST_DELIMITER))
      .map(String::trim)
      .map(GradoopId::fromString)
      .collect(Collectors.toList());

    return GradoopIdSet.fromExisting(gradoopIds);
  }

  /**
   * Splits the specified string.
   *
   * @param s     string
   * @param limit resulting array length
   * @return tokens
   */
  public String[] split(String s, int limit) {
    return StringEscaper.split(s, CSVConstants.TOKEN_DELIMITER, limit);
  }
}