BusinessTransactionGraphs.java

/*
 * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gradoop.flink.algorithms.btgs;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Graph;
import org.apache.flink.types.NullValue;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.common.model.impl.id.GradoopIdSet;
import org.gradoop.common.model.impl.pojo.EPGMEdge;
import org.gradoop.common.model.impl.pojo.EPGMGraphHead;
import org.gradoop.common.model.impl.pojo.EPGMVertex;
import org.gradoop.flink.algorithms.btgs.functions.BtgMessenger;
import org.gradoop.flink.algorithms.btgs.functions.BtgUpdater;
import org.gradoop.flink.algorithms.btgs.functions.CollectGradoopIds;
import org.gradoop.flink.algorithms.btgs.functions.ComponentToNewBtgId;
import org.gradoop.flink.algorithms.btgs.functions.MasterData;
import org.gradoop.flink.algorithms.btgs.functions.NewBtgGraphHead;
import org.gradoop.flink.algorithms.btgs.functions.SetBtgId;
import org.gradoop.flink.algorithms.btgs.functions.SetBtgIds;
import org.gradoop.flink.algorithms.btgs.functions.TargetIdBtgId;
import org.gradoop.flink.algorithms.btgs.functions.TransactionalData;
import org.gradoop.flink.algorithms.gelly.functions.VertexToGellyVertexWithGradoopId;
import org.gradoop.flink.model.impl.epgm.GraphCollection;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.api.operators.UnaryGraphToCollectionOperator;
import org.gradoop.flink.model.impl.functions.epgm.ExpandGradoopIds;
import org.gradoop.flink.model.impl.functions.epgm.Id;
import org.gradoop.flink.model.impl.functions.epgm.SourceId;
import org.gradoop.flink.model.impl.functions.epgm.ToGellyEdgeWithNullValue;
import org.gradoop.flink.model.impl.functions.tuple.SwitchPair;
import org.gradoop.flink.model.impl.functions.tuple.Value0Of2;
import org.gradoop.flink.model.impl.functions.utils.LeftSide;

/**
 * Part of the BIIIG approach.
 * Vertex-centric implementation to isolate business transaction graphs.
 */
public class BusinessTransactionGraphs implements
  UnaryGraphToCollectionOperator {

  /**
   * reserved property key referring to master or transactional data
   */
  public static final String SUPERTYPE_KEY = "superType";
  /**
   * reserved property value to mark master data
   */
  public static final String SUPERCLASS_VALUE_MASTER = "M";
  /**
   * reserved property value to mark transactional data
   */
  public static final String SUPERCLASS_VALUE_TRANSACTIONAL = "T";
  /**
   * reserved label to mark business transaction graphs
   */
  public static final String BTG_LABEL = "BusinessTransactionGraph";
  /**
   * reserved property key referring to the source identifier of vertices
   */
  public static final String SOURCEID_KEY = "sid";

  @Override
  public GraphCollection execute(LogicalGraph iig) {

    DataSet<EPGMVertex> masterVertices = iig.getVertices()
      .filter(new MasterData<>());

    LogicalGraph transGraph = iig
      .vertexInducedSubgraph(new TransactionalData<>());

    DataSet<EPGMVertex> transVertices = transGraph
      .getVertices();

    DataSet<org.apache.flink.graph.Edge<GradoopId, NullValue>> transEdges =
      transGraph.getEdges().map(new ToGellyEdgeWithNullValue());

    Graph<GradoopId, GradoopId, NullValue> gellyTransGraph = Graph.fromDataSet(
      transVertices.map(new VertexToGellyVertexWithGradoopId()),
      transEdges,
      iig.getConfig().getExecutionEnvironment()
    );

    gellyTransGraph = gellyTransGraph
      .getUndirected()
      .runScatterGatherIteration(new BtgMessenger(), new BtgUpdater(), 100);


    DataSet<Tuple2<GradoopId, GradoopIdSet>> btgVerticesMap = gellyTransGraph
      .getVerticesAsTuple2()
      .map(new SwitchPair<>())
      .groupBy(0)
      .reduceGroup(new CollectGradoopIds())
      .map(new ComponentToNewBtgId());

    DataSet<Tuple2<GradoopId, GradoopId>> vertexBtgMap = btgVerticesMap
      .flatMap(new ExpandGradoopIds<>())
      .map(new SwitchPair<>());

    DataSet<EPGMGraphHead> graphHeads = btgVerticesMap
      .map(new Value0Of2<>())
      .map(new NewBtgGraphHead<>(iig.getFactory().getGraphHeadFactory()));

    // filter and update edges
    DataSet<EPGMEdge> btgEdges = iig.getEdges()
      .join(vertexBtgMap)
      .where(new SourceId<>()).equalTo(0)
      .with(new SetBtgId<>());

    // update transactional vertices
    transVertices = transVertices
      .join(vertexBtgMap)
      .where(new Id<>()).equalTo(0)
      .with(new SetBtgId<>());

    // create master data BTG map
    vertexBtgMap = btgEdges
      .map(new TargetIdBtgId<>())
      .join(masterVertices)
      .where(0).equalTo(new Id<>())
      .with(new LeftSide<>())
      .distinct();

    DataSet<Tuple2<GradoopId, GradoopIdSet>> vertexBtgsMap = vertexBtgMap
      .groupBy(0)
      //.combineGroup(new CollectGradoopIds())
      .reduceGroup(new CollectGradoopIds());

    masterVertices = masterVertices.join(vertexBtgsMap)
      .where(new Id<>()).equalTo(0)
      .with(new SetBtgIds<>());

    return iig.getCollectionFactory()
      .fromDataSets(graphHeads, transVertices.union(masterVertices), btgEdges);
  }
}