TransactionalFSMBase.java

  1. /*
  2.  * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
  3.  *
  4.  * Licensed under the Apache License, Version 2.0 (the "License");
  5.  * you may not use this file except in compliance with the License.
  6.  * You may obtain a copy of the License at
  7.  *
  8.  *     http://www.apache.org/licenses/LICENSE-2.0
  9.  *
  10.  * Unless required by applicable law or agreed to in writing, software
  11.  * distributed under the License is distributed on an "AS IS" BASIS,
  12.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13.  * See the License for the specific language governing permissions and
  14.  * limitations under the License.
  15.  */
  16. package org.gradoop.flink.algorithms.fsm.transactional.tle;

  17. import org.apache.flink.api.java.DataSet;
  18. import org.gradoop.flink.algorithms.fsm.dimspan.config.DIMSpanConstants;
  19. import org.gradoop.flink.algorithms.fsm.dimspan.functions.mining.Frequent;
  20. import org.gradoop.flink.algorithms.fsm.transactional.common.FSMConfig;
  21. import org.gradoop.flink.algorithms.fsm.transactional.common.TFSMConstants;
  22. import org.gradoop.flink.algorithms.fsm.transactional.common.functions.DropPropertiesAndGraphContainment;
  23. import org.gradoop.flink.algorithms.fsm.transactional.common.functions.EdgeLabels;
  24. import org.gradoop.flink.algorithms.fsm.transactional.common.functions.FilterEdgesByLabel;
  25. import org.gradoop.flink.algorithms.fsm.transactional.common.functions.FilterVerticesByLabel;
  26. import org.gradoop.flink.algorithms.fsm.transactional.common.functions.NotEmpty;
  27. import org.gradoop.flink.algorithms.fsm.transactional.common.functions.VertexLabels;
  28. import org.gradoop.flink.algorithms.fsm.transactional.tle.functions.MinFrequency;
  29. import org.gradoop.flink.model.impl.epgm.GraphCollection;
  30. import org.gradoop.flink.model.api.operators.UnaryCollectionToCollectionOperator;
  31. import org.gradoop.flink.model.impl.functions.tuple.ValueOfWithCount;
  32. import org.gradoop.flink.model.impl.layouts.transactional.tuples.GraphTransaction;
  33. import org.gradoop.flink.model.impl.operators.count.Count;
  34. import org.gradoop.flink.util.GradoopFlinkConfig;

  35. /**
  36.  * Superclass of frequent subgraph implementations in the graph transaction setting.
  37.  */
  38. public abstract class TransactionalFSMBase implements UnaryCollectionToCollectionOperator {

  39.   /**
  40.    * FSM configuration
  41.    */
  42.   protected final FSMConfig fsmConfig;

  43.   /**
  44.    * search space size
  45.    */
  46.   protected DataSet<Long> graphCount;

  47.   /**
  48.    * minimum frequency for patterns to be considered to be frequent
  49.    */
  50.   protected DataSet<Long> minFrequency;

  51.   /**
  52.    * Gradoop configuration
  53.    */
  54.   protected GradoopFlinkConfig config;

  55.   /**
  56.    * Constructor.
  57.    *
  58.    * @param fsmConfig FSM configuration
  59.    */
  60.   public TransactionalFSMBase(FSMConfig fsmConfig) {
  61.     this.fsmConfig = fsmConfig;
  62.   }

  63.   @Override
  64.   public GraphCollection execute(GraphCollection collection)  {
  65.     config = collection.getConfig();

  66.     DataSet<GraphTransaction> input = collection
  67.       .getGraphTransactions();

  68.     DataSet<GraphTransaction> output = execute(input);

  69.     return collection.getFactory()
  70.       .fromTransactions(output);
  71.   }

  72.   /**
  73.    * Executes the algorithm for a dataset of graphs in transactional representation.
  74.    *
  75.    * @param transactions dataset of graphs
  76.    *
  77.    * @return frequent patterns as dataset of graphs
  78.    */
  79.   protected abstract DataSet<GraphTransaction> execute(DataSet<GraphTransaction> transactions);

  80.   /**
  81.    * Triggers the label-frequency base preprocessing
  82.    *
  83.    * @param transactions input
  84.    * @return preprocessed input
  85.    */
  86.   protected DataSet<GraphTransaction> preProcess(DataSet<GraphTransaction> transactions) {
  87.     transactions = transactions
  88.       .map(new DropPropertiesAndGraphContainment());

  89.     this.graphCount = Count
  90.       .count(transactions);

  91.     this.minFrequency = graphCount
  92.       .map(new MinFrequency(fsmConfig));

  93.     DataSet<String> frequentVertexLabels = transactions
  94.       .flatMap(new VertexLabels())
  95.       .groupBy(0)
  96.       .sum(1)
  97.       .filter(new Frequent<>())
  98.       .withBroadcastSet(minFrequency, DIMSpanConstants.MIN_FREQUENCY)
  99.       .map(new ValueOfWithCount<>());

  100.     transactions = transactions
  101.       .map(new FilterVerticesByLabel())
  102.       .withBroadcastSet(frequentVertexLabels, TFSMConstants.FREQUENT_VERTEX_LABELS);

  103.     DataSet<String> frequentEdgeLabels = transactions
  104.       .flatMap(new EdgeLabels())
  105.       .groupBy(0)
  106.       .sum(1)
  107.       .filter(new Frequent<>())
  108.       .withBroadcastSet(minFrequency, DIMSpanConstants.MIN_FREQUENCY)
  109.       .map(new ValueOfWithCount<>());

  110.     transactions = transactions
  111.       .map(new FilterEdgesByLabel())
  112.       .withBroadcastSet(frequentEdgeLabels, TFSMConstants.FREQUENT_EDGE_LABELS);

  113.     transactions = transactions
  114.       .filter(new NotEmpty());

  115.     return transactions;
  116.   }
  117. }