TLFRecordReader.java

/*
 * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gradoop.flink.io.impl.tlf.inputformats;

import org.apache.commons.io.Charsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.gradoop.flink.io.impl.tlf.TLFConstants;

import java.io.IOException;

/**
 * RecordReader that reads through a given TLF document and outputs each
 * graph block, delimited by the start tag and the end tag, as a single
 * record.
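 *
 * <p>Minimal usage sketch (the file split and the Hadoop configuration are
 * assumed to be supplied by the surrounding input format):
 * <pre>{@code
 * TLFRecordReader reader = new TLFRecordReader(fileSplit, conf);
 * while (reader.nextKeyValue()) {
 *   Text graphBlock = reader.getCurrentValue();
 *   // process one TLF graph block
 * }
 * reader.close();
 * }</pre>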
 */
public class TLFRecordReader extends RecordReader<LongWritable, Text> {

  /**
   * The start position of the split.
   */
  private final long start;

  /**
   * The end position of the split.
   */
  private final long end;

  /**
   * Input stream which reads the data from the split file.
   */
  private final FSDataInputStream fsin;

  /**
   * Output buffer which holds only the content that belongs to the current
   * graph block.
   */
  private final DataOutputBuffer buffer = new DataOutputBuffer();

  /**
   * The current key.
   */
  private LongWritable currentKey;

  /**
   * The current value.
   */
  private Text currentValue;

  /**
   * The length of the buffer data to be set to the value.
   */
  private int valueLength;

  /**
   * Constructor for the reader which handles TLF splits and
   * initializes the file input stream.
   *
   * @param split the split of the file containing all TLF content
   * @param conf the configuration of the task attempt context
   * @throws IOException on failure
   */
  public TLFRecordReader(FileSplit split, Configuration conf) throws
    IOException {
    // open the file and seek to the start of the split
    start = split.getStart();
    end = start + split.getLength();
    Path file = split.getPath();
    FileSystem fs = file.getFileSystem(conf);
    fsin = fs.open(file);
    fsin.seek(start);
  }

  /**
   * Reads the next key/value pair from the input for processing.
   *
   * @param key the new key
   * @param value the new value
   * @return true if a key/value pair was found
   * @throws IOException on failure
   */
  private boolean next(LongWritable key, Text value) throws IOException {
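    // skip ahead to the next start tag within this split; bytes read before
    // the tag do not belong to any record and are not buffered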
    if (fsin.getPos() < end &&
      readUntilMatch(TLFConstants.START_TAG.getBytes(Charsets.UTF_8), false)) {
      try {
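        // the start tag itself belongs to the record, so write it to the
        // buffer before collecting the rest of the block up to the end tag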
        buffer.write(TLFConstants.START_TAG.getBytes(Charsets.UTF_8));
        if (readUntilMatch(TLFConstants.END_TAG.getBytes(Charsets.UTF_8), true)) {
          key.set(fsin.getPos());
          if (fsin.getPos() != end) {
            // strip the trailing end tag from the record: it is also the
            // start tag of the next graph and shall not be added here
            valueLength = buffer.getLength() - TLFConstants.END_TAG.getBytes(Charsets.UTF_8).length;
          } else {
            // the end of the split was reached, so there is no new start tag to strip
            valueLength = buffer.getLength();
          }
          value.set(buffer.getData(), 0, valueLength);
          // seek back to the position before the end tag of the old graph,
          // which is the start tag of the new one
          fsin.seek(fsin.getPos() - TLFConstants.END_TAG.getBytes(Charsets.UTF_8).length);
          return true;
        }
      } finally {
        buffer.reset();
      }
    }
    return false;
  }

  /**
   * Reads bytes from the split until the given byte sequence is matched.
   *
   * @param match the byte sequence to be found
   * @param withinBlock specifies whether the bytes read belong to the current
   *                    graph block and shall be written to the buffer
   * @return true if match was found or the end of file was reached, so
   * that the current block can be closed
   * @throws IOException on failure
   */
  private boolean readUntilMatch(byte[] match, boolean withinBlock) throws
    IOException {
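    // number of consecutive bytes of 'match' seen so far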
    int i = 0;
    while (true) {
      int b = fsin.read();
      // end of file:
      if (b == -1) {
        return true;
      }
      // save to buffer:
      if (withinBlock) {
        buffer.write(b);
      }
      // check if we are matching:
      if (b == match[i]) {
        i++;
        if (i >= match.length) {
          return true;
        }
      } else {
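        // mismatch: restart matching from the beginning of the tag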
        i = 0;
      }
      // see if we have passed the stop point:
      if (!withinBlock && i == 0 && fsin.getPos() >= end) {
        return false;
      }
    }
  }

  /**
   * Closes the input stream and the output buffer.
   *
   * @throws IOException on failure
   */
  @Override
  public void close() throws IOException {
    fsin.close();
    buffer.close();
  }

  /**
   * Returns the current progress of input streaming.
   *
   * @return fraction of the split that has been processed
   * @throws IOException on failure
   */
  @Override
  public float getProgress() throws IOException {
    return (fsin.getPos() - start) / (float) (end - start);
  }

  /**
   * Returns the current key.
   *
   * @return the current key.
   * @throws IOException on failure
   * @throws InterruptedException if interrupted
   */
  @Override
  public LongWritable getCurrentKey() throws IOException,
    InterruptedException {
    return currentKey;
  }

  /**
   * Returns the current value.
   *
   * @return the current value
   * @throws IOException on failure
   * @throws InterruptedException if interrupted
   */
  @Override
  public Text getCurrentValue() throws IOException, InterruptedException {
    return currentValue;
  }

  /**
   * Called once for initialization.
   *
   * @param split the split of the file containing all TLF content
   * @param context current task attempt context
   * @throws IOException on failure
   * @throws InterruptedException if interrupted
   */
  @Override
  public void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
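    // all initialization is done in the constructor, so nothing to do here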
  }

  /**
   * Reads the next key/value pair from the input for processing.
   *
   * @return true if a key/value pair was found
   * @throws IOException on failure
   * @throws InterruptedException if interrupted
   */
  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
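    // create fresh writables for each record and try to fill them from the
    // current split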
    currentKey = new LongWritable();
    currentValue = new Text();
    return next(currentKey, currentValue);
  }
}