CategoryCharacteristicSubgraphs.java

/*
 * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gradoop.flink.algorithms.fsm.transactional;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.FilterOperator;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.gradoop.flink.algorithms.fsm.dimspan.config.DIMSpanConstants;
import org.gradoop.flink.algorithms.fsm.transactional.common.TFSMConstants;
import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.CCSSingleEdgeEmbeddings;
import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.CCSSubgraphDecoder;
import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.CCSSubgraphOnly;
import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.CCSWrapInSubgraphEmbeddings;
import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.CategoryEdgeLabels;
import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.CategoryFrequent;
import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.CategoryFrequentAndInteresting;
import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.CategoryGraphCounts;
import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.CategoryMinFrequencies;
import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.CategoryVertexLabels;
import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.CategoryWithCount;
import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.IsCharacteristic;
import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.LabelOnly;
import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.ToCCSGraph;
import org.gradoop.flink.algorithms.fsm.transactional.tle.pojos.CCSGraph;
import org.gradoop.flink.algorithms.fsm.transactional.tle.tuples.CCSSubgraph;
import org.gradoop.flink.algorithms.fsm.transactional.tle.tuples.CCSSubgraphEmbeddings;
import org.gradoop.flink.algorithms.fsm.transactional.tle.ThinkLikeAnEmbeddingFSMBase;
import org.gradoop.flink.algorithms.fsm.transactional.common.FSMConfig;
import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.MinEdgeCount;
import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.WithoutInfrequentEdgeLabels;
import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.WithoutInfrequentVertexLabels;
import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.IsResult;
import org.gradoop.flink.model.impl.functions.utils.First;
import org.gradoop.flink.model.impl.layouts.transactional.tuples.GraphTransaction;

import java.util.Map;

/**
 * Gradoop operator that extracts category characteristic subgraphs from a set
 * of graph transactions. The search starts from single-edge embeddings and
 * grows them inside a Flink bulk iteration that is bounded by the configured
 * maximum edge count; shared steps are inherited from
 * {@link ThinkLikeAnEmbeddingFSMBase}.
 */
public class CategoryCharacteristicSubgraphs
  extends ThinkLikeAnEmbeddingFSMBase<CCSGraph, CCSSubgraph, CCSSubgraphEmbeddings> {

  /**
   * Property key to store a category association.
   */
  public static final String CATEGORY_KEY = "_category";

  /**
   * Interestingness threshold passed on to the frequent-and-interesting filter.
   */
  private final float minInterestingness;

  /**
   * Number of graphs per category; populated at the start of
   * {@link #execute(DataSet)} and broadcast to the subgraph filter.
   */
  private DataSet<Map<String, Long>> categoryCounts;

  /**
   * Minimum frequency per category, derived from {@link #categoryCounts} and
   * the mining configuration.
   */
  private DataSet<Map<String, Long>> categoryMinFrequencies;

  /**
   * Creates the operator.
   *
   * @param fsmConfig frequent subgraph mining configuration
   * @param minInterestingness interestingness threshold
   */
  public CategoryCharacteristicSubgraphs(FSMConfig fsmConfig, float minInterestingness) {
    super(fsmConfig);
    this.minInterestingness = minInterestingness;
  }

  /**
   * Runs the mining workflow on graph transactions.
   *
   * @param transactions search space
   * @return category characteristic subgraphs
   */
  public DataSet<GraphTransaction> execute(DataSet<GraphTransaction> transactions) {

    // encode the input and derive the per-category statistics from it
    DataSet<CCSGraph> searchSpace = transactions.map(new ToCCSGraph());
    setCategoryCounts(searchSpace);
    setMinFrequencies();

    if (fsmConfig.isPreprocessingEnabled()) {
      searchSpace = preProcessCategories(searchSpace);
    }

    DataSet<CCSSubgraphEmbeddings> singleEdgeEmbeddings =
      searchSpace.flatMap(new CCSSingleEdgeEmbeddings(fsmConfig));

    // ITERATION HEAD
    IterativeDataSet<CCSSubgraphEmbeddings> loop =
      singleEdgeEmbeddings.iterate(fsmConfig.getMaxEdgeCount());

    // ITERATION BODY

    // elements of the working set that are embeddings rather than collected results
    DataSet<CCSSubgraphEmbeddings> openEmbeddings = loop.filter(new IsResult<>(false));

    DataSet<CCSSubgraph> frequentPerCategory = getCategoryFrequentSubgraphs(openEmbeddings);

    // keep a single representative per value of field 0
    DataSet<CCSSubgraph> frequent = frequentPerCategory
      .groupBy(0)
      .reduceGroup(new First<>());

    DataSet<CCSSubgraphEmbeddings> frequentEmbeddings =
      filterByFrequentSubgraphs(openEmbeddings, frequent);

    DataSet<CCSSubgraphEmbeddings> grownEmbeddings =
      growEmbeddingsOfFrequentSubgraphs(frequentEmbeddings, frequent);

    DataSet<CCSSubgraphEmbeddings> newResults = getCharacteristicSubgraphs(frequentPerCategory)
      .map(new CCSWrapInSubgraphEmbeddings());

    // results of earlier rounds + results of this round + embeddings for the next round
    DataSet<CCSSubgraphEmbeddings> nextWorkingSet = loop
      .filter(new IsResult<>(true))
      .union(newResults)
      .union(grownEmbeddings);

    // ITERATION FOOTER

    // the grown embeddings double as termination criterion of the iteration
    DataSet<CCSSubgraphEmbeddings> loopOutput = loop.closeWith(nextWorkingSet, grownEmbeddings);

    DataSet<CCSSubgraph> characteristic = loopOutput
      .filter(new IsResult<>(true))
      .map(new CCSSubgraphOnly());

    boolean edgeCountFilterRequired = fsmConfig.getMinEdgeCount() > 1;

    if (!edgeCountFilterRequired) {
      return characteristic.map(new CCSSubgraphDecoder(config));
    }

    return characteristic
      .filter(new MinEdgeCount<>(fsmConfig))
      .map(new CCSSubgraphDecoder(config));
  }

  /**
   * Computes the number of graphs per category and stores the result in
   * {@link #categoryCounts}.
   *
   * @param graphs search space
   */
  private void setCategoryCounts(DataSet<CCSGraph> graphs) {
    this.categoryCounts = graphs
      .map(new CategoryWithCount())
      .groupBy(0)
      .sum(1)
      .reduceGroup(new CategoryGraphCounts());
  }

  /**
   * Derives {@link #categoryMinFrequencies} from {@link #categoryCounts} and
   * the mining configuration.
   */
  private void setMinFrequencies() {
    this.categoryMinFrequencies = this.categoryCounts.map(new CategoryMinFrequencies(fsmConfig));
  }

  /**
   * Drops vertices and edges whose labels are not frequent in any category.
   * Vertex labels are handled first; edge labels are counted on the graphs
   * that result from the vertex step.
   *
   * @param graphs graph collection
   * @return processed graph collection
   */
  private DataSet<CCSGraph> preProcessCategories(DataSet<CCSGraph> graphs) {
    // step 1: vertex labels
    DataSet<String> keptVertexLabels = graphs
      .flatMap(new CategoryVertexLabels())
      .groupBy(0, 1)
      .sum(2)
      .filter(new CategoryFrequent())
      .withBroadcastSet(categoryMinFrequencies, DIMSpanConstants.MIN_FREQUENCY)
      .map(new LabelOnly())
      .distinct();

    DataSet<CCSGraph> vertexPruned = graphs
      .map(new WithoutInfrequentVertexLabels<CCSGraph>())
      .withBroadcastSet(keptVertexLabels, TFSMConstants.FREQUENT_VERTEX_LABELS);

    // step 2: edge labels, based on the vertex-pruned graphs
    DataSet<String> keptEdgeLabels = vertexPruned
      .flatMap(new CategoryEdgeLabels())
      .groupBy(0, 1)
      .sum(2)
      .filter(new CategoryFrequent())
      .withBroadcastSet(categoryMinFrequencies, DIMSpanConstants.MIN_FREQUENCY)
      .map(new LabelOnly())
      .distinct();

    return vertexPruned
      .map(new WithoutInfrequentEdgeLabels<CCSGraph>())
      .withBroadcastSet(keptEdgeLabels, TFSMConstants.FREQUENT_EDGE_LABELS);
  }

  /**
   * Selects the characteristic ones among category frequent subgraphs.
   *
   * @param frequentSubgraphs subgraphs, frequent in at least one category
   * @return characteristic subgraphs
   */
  private FilterOperator<CCSSubgraph> getCharacteristicSubgraphs(
    DataSet<CCSSubgraph> frequentSubgraphs) {
    FilterOperator<CCSSubgraph> characteristicOnly =
      frequentSubgraphs.filter(new IsCharacteristic());
    return characteristicOnly;
  }

  /**
   * Aggregates the subgraphs of a set of embeddings and keeps those accepted
   * by the frequent-and-interesting filter.
   *
   * @param embeddings set of embeddings
   * @return frequent subgraphs
   */
  private DataSet<CCSSubgraph> getCategoryFrequentSubgraphs(
    DataSet<CCSSubgraphEmbeddings> embeddings) {

    DataSet<CCSSubgraph> subgraphs = embeddings.map(new CCSSubgraphOnly());

    // sum field 1 for every combination of fields 0 and 3
    DataSet<CCSSubgraph> summed = subgraphs
      .groupBy(0, 3)
      .sum(1);

    return summed
      .groupBy(0)
      .reduceGroup(new CategoryFrequentAndInteresting(minInterestingness))
      .withBroadcastSet(categoryCounts, TFSMConstants.GRAPH_COUNT)
      .withBroadcastSet(categoryMinFrequencies, DIMSpanConstants.MIN_FREQUENCY);
  }

}