BaseElementIterator.java

/*
 * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gradoop.storage.accumulo.impl.iterator.tserver;

import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.util.Pair;
import org.gradoop.common.model.api.entities.Element;
import org.gradoop.storage.accumulo.impl.predicate.filter.api.AccumuloElementFilter;
import org.gradoop.storage.accumulo.impl.constants.AccumuloTables;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;

/**
 * Accumulo Tablet Server Iterator
 * This iterator is created in the accumulo tablet server runtime when a partition range query
 * is executed. A Gradoop element iterator decodes the query options into a query filter,
 * transforms multiple rows into one epgm element and checks whether this element should be
 * returned according to the predicate. Each element that fulfills the predicate is serialized
 * into one row.
 *
 * @param <E> gradoop epgm element
 * @see <a href="https://accumulo.apache.org/1.9/accumulo_user_manual.html#_iterator_design">
 *   accumulo iterator design</a>
 */
public abstract class BaseElementIterator<E extends Element> implements
  SortedKeyValueIterator<Key, Value> {

  /**
   * Origin accumulo data source
   */
  private SortedKeyValueIterator<Key, Value> source;

  /**
   * Mapped element iterator, which handles the "next fulfilled element" look-ahead.
   * It is {@code null} until {@link #seek(Range, Collection, boolean)} has been called.
   */
  private InnerIterator seekIterator;

  /**
   * Serialized top row, {@code null} if there is no top element
   */
  private Pair<Key, Value> top;

  /**
   * EPGMElement filter predicate
   */
  private AccumuloElementFilter<E> filter;

  /**
   * Deserialize from key-value pair
   *
   * @param pair k-v pair from accumulo row
   * @return gradoop element instance
   * @throws IOException on failure
   */
  @Nonnull
  public abstract E fromRow(@Nonnull Map.Entry<Key, Value> pair) throws IOException;

  /**
   * Serialize record to key-value pair
   *
   * @param record gradoop element instance
   * @return k-v pair as accumulo row
   * @throws IOException on failure
   */
  @Nonnull
  public abstract Pair<Key, Value> toRow(@Nonnull E record) throws IOException;

  /**
   * Read the next element definition from the accumulo store.
   * Return null if the next element does not fulfill the filter formula
   *
   * @param source origin store source
   * @return decoded epgm element
   * @throws IOException on failure
   */
  @Nullable
  public abstract E readLine(
    @Nonnull SortedKeyValueIterator<Key, Value> source
  ) throws IOException;

  /**
   * Get element filter predicate
   *
   * @return iterator element filter
   */
  protected AccumuloElementFilter<E> getFilter() {
    return filter;
  }

  /**
   * Store the data source and decode the filter predicate from the iterator options.
   * If the options carry no predicate entry, every element is accepted.
   *
   * @param source origin accumulo data source
   * @param options iterator options, may be null
   * @param env iterator environment (not used)
   */
  @Override
  public void init(
    final SortedKeyValueIterator<Key, Value> source,
    final Map<String, String> options,
    final IteratorEnvironment env
  ) {
    this.source = source;
    // read filter predicate; containsKey is already false for an empty map
    if (options != null && options.containsKey(AccumuloTables.KEY_PREDICATE)) {
      this.filter = AccumuloElementFilter.decode(options.get(AccumuloTables.KEY_PREDICATE));
    } else {
      // no predicate configured, accept every element
      this.filter = element -> true;
    }
  }

  @Override
  public boolean hasTop() {
    return top != null;
  }

  /**
   * Advance to the next element that fulfills the filter and serialize it as the new top row.
   * The top row becomes {@code null} when no further element is available.
   *
   * @throws IOException if the element can not be serialized
   * @throws IllegalStateException if called before
   *                               {@link #seek(Range, Collection, boolean)}
   */
  @Override
  public void next() throws IOException {
    if (seekIterator == null) {
      // fail with a meaningful message instead of a bare NullPointerException
      throw new IllegalStateException("next() must not be called before seek()");
    }
    E topElement = seekIterator.hasNext() ? seekIterator.next() : null;
    top = topElement == null ? null : toRow(topElement);
  }

  /**
   * Seek the underlying source to the given range and load the first fulfilling element.
   *
   * @param range range to seek to
   * @param columnFamilies requested column families (ignored, see comment in body)
   * @param inclusive whether the column families are inclusive (ignored)
   * @throws IOException if seeking the source or serializing the first element fails
   */
  @Override
  public void seek(
    final Range range,
    final Collection<ByteSequence> columnFamilies,
    final boolean inclusive
  ) throws IOException {
    // The requested column families are not forwarded: the source is always sought with an
    // empty, non-inclusive family set, i.e. without any column family restriction.
    // NOTE(review): presumably all columns of a row are needed to rebuild an element - confirm.
    source.seek(range, new ArrayList<>(), false);
    // creating the inner iterator eagerly reads the first fulfilling element
    seekIterator = new InnerIterator();
    next();
  }

  @Override
  public Key getTopKey() {
    return top == null ? null : top.getFirst();
  }

  @Override
  public Value getTopValue() {
    return top == null ? null : top.getSecond();
  }

  @Override
  public SortedKeyValueIterator<Key, Value> deepCopy(IteratorEnvironment env) {
    throw new UnsupportedOperationException("deep copy is not supported!");
  }

  /**
   * Logical iterator for seeking fulfilled elements. It always holds one element of
   * look-ahead, which is read from the enclosing iterator's source.
   */
  private class InnerIterator implements Iterator<E> {

    /**
     * Next element head, {@code null} if no further fulfilling element was found
     */
    private E head;

    /**
     * Create a new inner iterator instance and read the first head element
     */
    private InnerIterator() {
      this.head = readHead();
    }

    /**
     * Read next head element
     *
     * @return next element that fulfills the filter, or null if none was found
     */
    private E readHead() {
      E next;
      // do/while: readLine is invoked at least once, even if the source has no top
      do {
        try {
          next = readLine(source);
          if (next != null && !getFilter().test(next)) {
            // element was decoded but rejected by the filter predicate
            next = null;
          }
        } catch (IOException err) {
          // Iterator methods can not throw checked exceptions, so wrap the failure
          throw new RuntimeException(err);
        }
      } while (source.hasTop() && next == null);
      return next;
    }

    @Override
    public boolean hasNext() {
      return head != null;
    }

    /**
     * Return the current head and read the following one.
     *
     * @return the current head element, null if the iterator is exhausted
     */
    @Override
    public E next() {
      E result = head;
      head = readHead();
      return result;
    }
  }

}