// DOTDataSink.java

/*
 * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gradoop.flink.io.impl.dot;

import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.gradoop.flink.io.api.DataSink;
import org.gradoop.flink.io.impl.dot.functions.AbstractDotFileFormat;
import org.gradoop.flink.io.impl.dot.functions.DotFileFormatHtml;
import org.gradoop.flink.io.impl.dot.functions.DotFileFormatSimple;
import org.gradoop.flink.model.impl.epgm.GraphCollection;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.layouts.transactional.tuples.GraphTransaction;

import java.io.IOException;
import java.util.Objects;

/**
 * Writes an EPGM representation into one DOT file. The format
 * is documented at {@link DotFileFormatHtml}.
 *
 * Every graph transaction of the written collection is converted to a string by the
 * configured {@link DotFormat}; the resulting strings are enclosed by a single
 * <code>digraph</code> block that is opened and closed by the internal writer.
 *
 * For more information see:
 * https://en.wikipedia.org/wiki/DOT_(graph_description_language)
 */
public class DOTDataSink implements DataSink {
  /**
   * Destination path of the dot file
   */
  private final String path;
  /**
   * Flag to print graph head information
   */
  private final boolean graphInformation;

  /**
   * The format in which the graph elements should be written.
   */
  private final DotFormat format;

  /**
   * Creates a new data sink. Path can be local (file://) or HDFS (hdfs://).
   * The output is written using {@link DotFormat#HTML}.
   *
   * @param path             dot data file
   * @param graphInformation flag to print graph head information
   * @throws NullPointerException if {@code path} is {@code null}
   */
  public DOTDataSink(String path, boolean graphInformation) {
    this(path, graphInformation, DotFormat.HTML);
  }

  /**
   * Creates a new data sink that uses the specified dot format for output.
   *
   * @param path             dot data file
   * @param graphInformation flag to print graph head information
   * @param format           output format
   * @throws NullPointerException if {@code path} or {@code format} is {@code null}
   */
  public DOTDataSink(String path, boolean graphInformation, DotFormat format) {
    this.path = Objects.requireNonNull(path, "path must not be null");
    this.graphInformation = graphInformation;
    this.format = Objects.requireNonNull(format, "format must not be null");
  }

  /**
   * Writes the given logical graph without overwriting an existing target.
   *
   * @param logicalGraph graph to write
   * @throws IOException if delegating to {@link #write(LogicalGraph, boolean)} fails
   */
  @Override
  public void write(LogicalGraph logicalGraph) throws IOException {
    write(logicalGraph, false);
  }

  /**
   * Writes the given graph collection without overwriting an existing target.
   *
   * @param graphCollection collection to write
   * @throws IOException if delegating to {@link #write(GraphCollection, boolean)} fails
   */
  @Override
  public void write(GraphCollection graphCollection) throws IOException {
    write(graphCollection, false);
  }

  /**
   * Writes the given logical graph by wrapping it into a graph collection first.
   *
   * @param graph     graph to write
   * @param overwrite true, if an existing target may be overwritten
   * @throws IOException if writing the wrapping collection fails
   */
  @Override
  public void write(LogicalGraph graph, boolean overwrite) throws IOException {
    write(graph.getCollectionFactory().fromGraph(graph), overwrite);
  }

  /**
   * Attaches a sink to the graph transactions of the collection that formats each
   * transaction with the configured dot format and writes it to the target path.
   *
   * @param graphCollection collection to write
   * @param overwrite       true, if an existing target may be overwritten
   * @throws IOException declared by the {@link DataSink} contract
   */
  @Override
  public void write(GraphCollection graphCollection, boolean overwrite) throws IOException {
    FileSystem.WriteMode writeMode =
      overwrite ? FileSystem.WriteMode.OVERWRITE : FileSystem.WriteMode.NO_OVERWRITE;

    AbstractDotFileFormat dotFileFormat = format.getDotFileFormat(graphInformation);
    GraphvizWriter graphvizWriter = new GraphvizWriter(new Path(path));
    graphvizWriter.setWriteMode(writeMode);

    // The writer emits the enclosing "digraph" lines once per sink task in open/close,
    // hence the sink runs with a parallelism of 1.
    graphCollection
      .getGraphTransactions()
      .map(dotFileFormat::format)
      .output(graphvizWriter)
      .setParallelism(1);
  }

  /**
   * Write opening and closing lines around strings
   * representing individual {@link GraphTransaction}s in graphviz.
   */
  private static class GraphvizWriter extends TextOutputFormat<String> {

    /**
     * Default class version for serialization.
     */
    private static final long serialVersionUID = 1;

    /**
     * see super constructor.
     *
     * @param outputPath graphviz dot file name
     */
    GraphvizWriter(Path outputPath) {
      super(outputPath);
    }

    /**
     * Opens the underlying output and writes the opening line of the enclosing graph.
     *
     * @param taskNumber number of the sink task, passed to the super implementation
     * @param numTasks   number of sink tasks, passed to the super implementation
     * @throws IOException if opening the output or writing the opening line fails
     */
    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
      super.open(taskNumber, numTasks);
      super.writeRecord("digraph {\n");
    }

    /**
     * Writes the closing line of the enclosing graph and closes the underlying output.
     *
     * @throws IOException if writing the closing line or closing the output fails
     */
    @Override
    public void close() throws IOException {
      try {
        super.writeRecord("}");
      } finally {
        // always release the underlying output, even if the closing line could not be written
        super.close();
      }
    }
  }

  /**
   * Enumeration of supported dot formats.
   */
  public enum DotFormat {
    /**
     * Format that uses HTML tables to display element data.
     */
    HTML,
    /**
     * Format that uses plain dot formatting.
     */
    SIMPLE;

    /**
     * Hex code of the color black, handed to the HTML format.
     */
    private static final String X11_BLACK = "#000000";

    /**
     * Returns a subclass of {@link AbstractDotFileFormat} that implements the specified formatting.
     *
     * @param printGraphHeadInformation flag that indicates that the graph head is included
     * @return a subclass of AbstractDotFileFormat
     */
    public AbstractDotFileFormat getDotFileFormat(boolean printGraphHeadInformation) {
      switch (this) {
      case SIMPLE:
        return new DotFileFormatSimple(printGraphHeadInformation);
      case HTML:
      default:
        // HTML is also the fallback for any constant without a dedicated branch
        return new DotFileFormatHtml(printGraphHeadInformation, X11_BLACK);
      }
    }
  }
}