GradoopReadSupport.java

/*
 * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gradoop.flink.io.impl.parquet.plain.common;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.parquet.hadoop.api.InitContext;
import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.hadoop.util.ContextUtil;
import org.apache.parquet.io.api.RecordMaterializer;
import org.apache.parquet.schema.MessageType;

import java.lang.reflect.Constructor;
import java.util.Map;

/**
 * Simple parquet read support backed by a {@link GradoopRootConverter}.
 * <p>
 * The concrete root converter class is looked up in the job configuration under
 * {@link #GRADOOP_ROOT_CONVERTER_CLASS} and instantiated reflectively through its public
 * constructor taking a single {@link MessageType}.
 *
 * @param <R> the record type
 */
public class GradoopReadSupport<R> extends ReadSupport<R> {

  /**
   * The configuration key for the {@link GradoopRootConverter}
   */
  public static final String GRADOOP_ROOT_CONVERTER_CLASS = "gradoop.parquet.rootConverterClass";

  /**
   * The constructor of the root converter. Resolved in {@link #init(InitContext)}, or lazily in
   * {@link #prepareForRead(Configuration, Map, MessageType, ReadContext)} if it is still unset.
   */
  @SuppressWarnings("rawtypes")
  private Constructor<? extends GradoopRootConverter> rootConverterConstructor;

  /**
   * Sets the root converter class in the configuration of a job's {@link JobContext}.
   *
   * @param job the job's context
   * @param rootConverter the root converter class
   */
  public static void setRootConverter(Job job, Class<? extends GradoopRootConverter<?>> rootConverter) {
    ContextUtil.getConfiguration(job)
      .setClass(GRADOOP_ROOT_CONVERTER_CLASS, rootConverter, GradoopRootConverter.class);
  }

  /**
   * Gets the root converter class of a job's {@link Configuration}.
   *
   * @param configuration the job's configuration
   * @return the specified root converter class or defaults to null
   */
  @SuppressWarnings("rawtypes")
  public static Class<? extends GradoopRootConverter> getRootConverter(Configuration configuration) {
    return configuration.getClass(GRADOOP_ROOT_CONVERTER_CLASS, null, GradoopRootConverter.class);
  }

  /**
   * Resolves the root converter constructor from the context's configuration and requests the
   * complete file schema for reading.
   *
   * @param context the init context providing the configuration and the file schema
   * @return a read context requesting the file schema as is
   * @throws IllegalStateException if no root converter class is configured
   * @throws RuntimeException if the root converter's constructor can't be accessed
   */
  @Override
  public ReadContext init(InitContext context) {
    this.rootConverterConstructor = resolveConstructor(context.getConfiguration());
    return new ReadContext(context.getFileSchema());
  }

  /**
   * Instantiates the configured root converter for the requested schema and wraps it in a
   * {@link GradoopRecordMaterializer}.
   *
   * @param configuration the job's configuration
   * @param keyValueMetaData the key-value metadata of the file (unused)
   * @param fileSchema the schema of the file (unused, the requested schema is taken from the
   *                   read context)
   * @param readContext the read context returned by {@link #init(InitContext)}
   * @return the record materializer producing records of type {@code R}
   * @throws IllegalStateException if no root converter class is configured
   * @throws RuntimeException if the root converter can't be instantiated
   */
  @Override
  @SuppressWarnings({"rawtypes", "unchecked"})
  public RecordMaterializer<R> prepareForRead(Configuration configuration,
    Map<String, String> keyValueMetaData,
    MessageType fileSchema, ReadContext readContext) {
    // Fall back to the passed configuration instead of failing with a bare NullPointerException
    // when init(InitContext) has not populated the constructor on this instance.
    if (this.rootConverterConstructor == null) {
      this.rootConverterConstructor = resolveConstructor(configuration);
    }

    try {
      GradoopRootConverter rootConverter = this.rootConverterConstructor
        .newInstance(readContext.getRequestedSchema());
      return new GradoopRecordMaterializer<>(rootConverter);
    } catch (ReflectiveOperationException e) {
      throw new RuntimeException("can't initialize GradoopRecordMaterializer", e);
    }
  }

  /**
   * Looks up the configured root converter class and its {@link MessageType} constructor.
   *
   * @param configuration the job's configuration
   * @return the root converter's constructor accepting a {@link MessageType}
   * @throws IllegalStateException if no root converter class is configured
   * @throws RuntimeException if the constructor is missing or not accessible
   */
  @SuppressWarnings("rawtypes")
  private static Constructor<? extends GradoopRootConverter> resolveConstructor(
    Configuration configuration) {
    Class<? extends GradoopRootConverter> rootConverter = getRootConverter(configuration);
    if (rootConverter == null) {
      // getRootConverter defaults to null when the key is unset; name the key in the error.
      throw new IllegalStateException("can't initialize ReadSupport: no root converter class " +
        "configured under '" + GRADOOP_ROOT_CONVERTER_CLASS + "'");
    }

    try {
      return rootConverter.getConstructor(MessageType.class);
    } catch (ReflectiveOperationException | SecurityException e) {
      throw new RuntimeException("can't initialize ReadSupport", e);
    }
  }
}