GradoopRootConverter.java

/*
 * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gradoop.flink.io.impl.parquet.plain.common;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.Converter;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.io.api.PrimitiveConverter;
import org.apache.parquet.schema.MessageType;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.common.model.impl.id.GradoopIdSet;
import org.gradoop.common.model.impl.properties.Properties;
import org.gradoop.common.model.impl.properties.PropertyValue;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.LongConsumer;

/**
 * Root parquet group converter for EPGM elements.
 *
 * @param <R> the record type
 */
public abstract class GradoopRootConverter<R> extends GroupConverter {

  /**
   * The record currently being read; it is replaced by a fresh instance whenever {@link #start()} is called.
   */
  protected R record;

  /**
   * Contains every registered converter, keyed by the name of the field it is responsible for.
   */
  protected final Map<String, Converter> converterMap = new HashMap<>();
  /**
   * Array of all converters, in the order of the fields of the {@link MessageType} that was given during
   * converter creation. The array index equals the field index of the requested schema.
   */
  private final Converter[] converters;

  /**
   * Creates a new root converter for the given record type ({@link MessageType}).
   * <p>
   * {@link #initializeConverters()} is invoked first, before the converter array is built, so that every
   * available field converter is registered. Afterwards each field of the requested schema is resolved,
   * by name, to its registered converter.
   *
   * @param requestedSchema the record type
   * @throws NullPointerException if no converter is registered for one of the requested fields
   */
  public GradoopRootConverter(MessageType requestedSchema) {
    // registration has to happen first, the lookups below rely on the populated converter map
    initializeConverters();

    final int fieldCount = requestedSchema.getFieldCount();
    final Converter[] orderedConverters = new Converter[fieldCount];

    for (int fieldIndex = 0; fieldIndex < fieldCount; fieldIndex++) {
      final String fieldName = requestedSchema.getFieldName(fieldIndex);
      final Converter fieldConverter = converterMap.get(fieldName);

      if (fieldConverter == null) {
        throw new NullPointerException("can't find converter for field: " + fieldName);
      }
      orderedConverters[fieldIndex] = fieldConverter;
    }

    this.converters = orderedConverters;
  }

  /**
   * Gets called once during construction, before the fields of the requested schema are resolved, to
   * allow for the registration of field converters via {@link #registerConverter(String, Converter)}.
   * <p>
   * Since this runs inside the base class constructor, instance fields declared by the subclass have not
   * been initialized yet when it is invoked.
   */
  protected abstract void initializeConverters();

  /**
   * Creates a new record instance. Invoked from {@link #start()}; the returned instance becomes the
   * current record.
   *
   * @return the new record instance
   */
  protected abstract R createRecord();

  /**
   * Associates the given converter with the given field name. A converter registered earlier under the
   * same name is replaced.
   *
   * @param name the field name
   * @param converter the converter handling fields of that name
   */
  protected void registerConverter(String name, Converter converter) {
    converterMap.put(name, converter);
  }

  /**
   * Provides access to the record that is currently held by this converter.
   *
   * @return the current record
   */
  public final R getCurrentRecord() {
    return record;
  }

  @Override
  public Converter getConverter(int fieldIndex) {
    // the array was filled in requested-schema field order by the constructor
    final Converter fieldConverter = converters[fieldIndex];
    return fieldConverter;
  }

  @Override
  public void start() {
    // a fresh record instance is created whenever a new record begins
    record = createRecord();
  }

  @Override
  public void end() {
    // NOOP: nothing is done when a record ends, it stays available through getCurrentRecord()
  }

  /**
   * Primitive parquet converter that turns binary values into {@link GradoopId}s.
   */
  protected static class GradoopIdConverter extends PrimitiveConverter {

    /**
     * Receives every gradoop id that has been read
     */
    private final Consumer<GradoopId> consumer;

    /**
     * Creates a converter for gradoop ids.
     *
     * @param consumer gets handed each gradoop id after it has been decoded
     */
    public GradoopIdConverter(Consumer<GradoopId> consumer) {
      this.consumer = consumer;
    }

    @Override
    public void addBinary(Binary value) {
      // decode the id from the bytes of the binary and hand it over directly
      consumer.accept(GradoopId.fromByteArray(value.getBytes()));
    }
  }

  /**
   * Primitive parquet converter that turns binary values into UTF-8 decoded strings.
   */
  protected static class StringConverter extends PrimitiveConverter {

    /**
     * Receives every string that has been read
     */
    private final Consumer<String> consumer;

    /**
     * Creates a converter for strings.
     *
     * @param consumer gets handed each string after it has been decoded
     */
    public StringConverter(Consumer<String> consumer) {
      this.consumer = consumer;
    }

    @Override
    public void addBinary(Binary value) {
      consumer.accept(value.toStringUsingUTF8());
    }
  }

  /**
   * Primitive parquet converter that turns binary values into {@link PropertyValue}s.
   */
  protected static class PropertyValueConverter extends PrimitiveConverter {

    /**
     * Receives every {@link PropertyValue} that has been read
     */
    private final Consumer<PropertyValue> consumer;

    /**
     * Creates a converter for {@link PropertyValue}
     *
     * @param consumer gets handed each property value after it has been decoded
     */
    public PropertyValueConverter(Consumer<PropertyValue> consumer) {
      this.consumer = consumer;
    }

    @Override
    public void addBinary(Binary value) {
      // the raw bytes are obtained through the binary's "unsafe" accessor and decoded right away
      consumer.accept(PropertyValue.fromRawBytes(value.getBytesUnsafe()));
    }
  }

  /**
   * Primitive parquet converter that forwards long values.
   */
  private static class LongValueConverter extends PrimitiveConverter {

    /**
     * Receives every long that has been read
     */
    private final LongConsumer consumer;

    /**
     * Creates a converter for longs.
     *
     * @param consumer gets handed each long value as it is read
     */
    LongValueConverter(LongConsumer consumer) {
      this.consumer = consumer;
    }

    @Override
    public void addLong(long value) {
      consumer.accept(value);
    }
  }

  /**
   * Parquet group converter for a single key-value pair of a parquet spec compliant map. Field index
   * {@code 0} addresses the key and field index {@code 1} the value.
   */
  private static class KeyValueConverter extends GroupConverter {

    /**
     * Gets run at the end of each key-value group to tell the parent converter that a pair got read.
     */
    private final Runnable notifier;

    /**
     * Converter handling the key field
     */
    private final Converter keyConverter;
    /**
     * Converter handling the value field
     */
    private final Converter valueConverter;

    /**
     * Creates a group converter for parquet spec compliant map key-value pairs.
     *
     * @param notifier callback that is run once a key-value group has ended
     * @param keyConverter converter for the key field
     * @param valueConverter converter for the value field
     */
    KeyValueConverter(Runnable notifier, Converter keyConverter, Converter valueConverter) {
      this.notifier = notifier;
      this.keyConverter = keyConverter;
      this.valueConverter = valueConverter;
    }

    @Override
    public Converter getConverter(int fieldIndex) {
      switch (fieldIndex) {
        case 0:
          return this.keyConverter;
        case 1:
          return this.valueConverter;
        default:
          throw new IndexOutOfBoundsException("key_value only consists of two fields: 'key', 'value'");
      }
    }

    @Override
    public void start() {
      // nothing to prepare, key and value are delivered through their own converters
    }

    @Override
    public void end() {
      notifier.run();
    }
  }

  /**
   * Parquet group converter that assembles {@link Properties} from a group of key-value pairs.
   */
  protected static class PropertiesConverter extends GroupConverter {

    /**
     * Receives the {@link Properties} once the group has ended
     */
    private final Consumer<Properties> consumer;
    /**
     * Converter for the single 'key_value' field
     */
    private final Converter keyValueConverter;

    /**
     * The properties instance that is currently being filled
     */
    private Properties properties;
    /**
     * The most recently read key
     */
    private String key;
    /**
     * The most recently read value
     */
    private PropertyValue value;

    /**
     * Creates a group converter for {@link Properties}.
     *
     * @param consumer gets handed the properties once they have been read completely
     */
    public PropertiesConverter(Consumer<Properties> consumer) {
      this.consumer = consumer;

      // once a pair is complete, the latest key and value are stored in the current properties
      Runnable pairCompleted = () -> this.properties.set(this.key, this.value);
      Converter keyReader = new StringConverter(readKey -> this.key = readKey);
      Converter valueReader = new PropertyValueConverter(readValue -> this.value = readValue);

      this.keyValueConverter = new KeyValueConverter(pairCompleted, keyReader, valueReader);
    }

    @Override
    public Converter getConverter(int fieldIndex) {
      if (fieldIndex == 0) {
        return keyValueConverter;
      }
      throw new IndexOutOfBoundsException("properties only consists of single field: 'key_value'");
    }

    @Override
    public void start() {
      // every properties group starts out with an empty instance
      properties = new Properties();
    }

    @Override
    public void end() {
      consumer.accept(properties);
    }
  }

  /**
   * Parquet group converter for a single element group of a parquet spec compliant list. The group only
   * has one field (index {@code 0}), which is handled by the wrapped converter.
   */
  private static class ListElementConverter extends GroupConverter {

    /**
     * Converter handling the element's value
     */
    private final Converter converter;

    /**
     * Creates a group converter for parquet spec compliant list elements
     *
     * @param converter converter for the value of each element
     */
    ListElementConverter(Converter converter) {
      this.converter = converter;
    }

    @Override
    public Converter getConverter(int fieldIndex) {
      if (fieldIndex == 0) {
        return converter;
      }
      throw new IndexOutOfBoundsException("list only consists of single field: 'element'");
    }

    @Override
    public void start() {
      // nothing to prepare, the value is delivered through the wrapped converter
    }

    @Override
    public void end() {
      // nothing to finish
    }
  }

  /**
   * Parquet group converter that collects the gradoop ids of a list into a {@link GradoopIdSet}.
   */
  protected static class GradoopIdSetConverter extends GroupConverter {

    /**
     * Receives the {@link GradoopIdSet} once the group has ended
     */
    private final Consumer<GradoopIdSet> consumer;
    /**
     * Converter for the single 'list' field
     */
    private final Converter converter;

    /**
     * The gradoop id set that is currently being filled
     */
    private GradoopIdSet gradoopIdSet;

    /**
     * Creates a group converter for {@link GradoopIdSet}.
     *
     * @param consumer gets handed the id set once it has been read completely
     */
    public GradoopIdSetConverter(Consumer<GradoopIdSet> consumer) {
      this.consumer = consumer;

      // a lambda is required here: the target set is replaced on every start() and must be looked up
      // at the time an id is read, not at construction time
      Converter idConverter = new GradoopIdConverter(readId -> this.gradoopIdSet.add(readId));
      this.converter = new ListElementConverter(idConverter);
    }

    @Override
    public Converter getConverter(int fieldIndex) {
      if (fieldIndex == 0) {
        return converter;
      }
      throw new IndexOutOfBoundsException("graph_ids only consists of single field: 'list'");
    }

    @Override
    public void start() {
      // every group starts out with an empty id set
      gradoopIdSet = new GradoopIdSet();
    }

    @Override
    public void end() {
      consumer.accept(gradoopIdSet);
    }
  }

  /**
   * Parquet group converter that reads a gradoop time interval into a {@link Tuple2}. Field index
   * {@code 0} ('from') is stored in {@code f0} and field index {@code 1} ('to') in {@code f1}.
   */
  protected static class TimeIntervalConverter extends GroupConverter {

    /**
     * Receives the time interval once the group has ended
     */
    private final Consumer<Tuple2<Long, Long>> consumer;

    /**
     * Converter handling the 'from' field
     */
    private final Converter fromConverter;
    /**
     * Converter handling the 'to' field
     */
    private final Converter toConverter;

    /**
     * The time interval that is currently being filled
     */
    private Tuple2<Long, Long> timeInterval;

    /**
     * Creates a group converter for gradoop time intervals
     *
     * @param consumer gets handed the time interval once it has been read completely
     */
    public TimeIntervalConverter(Consumer<Tuple2<Long, Long>> consumer) {
      this.consumer = consumer;

      // the tuple is replaced on every start(), hence it is resolved when a value is read
      this.fromConverter = new LongValueConverter(from -> this.timeInterval.f0 = from);
      this.toConverter = new LongValueConverter(to -> this.timeInterval.f1 = to);
    }

    @Override
    public Converter getConverter(int fieldIndex) {
      switch (fieldIndex) {
        case 0:
          return this.fromConverter;
        case 1:
          return this.toConverter;
        default:
          throw new IndexOutOfBoundsException("time interval only consists of two fields: 'from', 'to'");
      }
    }

    @Override
    public void start() {
      // every group starts out with an empty tuple
      timeInterval = new Tuple2<>();
    }

    @Override
    public void end() {
      consumer.accept(this.timeInterval);
    }
  }
}