DFSCodeToEPGMGraphTransaction.java

/*
 * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gradoop.flink.algorithms.fsm.dimspan.functions.conversion;

import com.google.common.collect.Sets;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.common.model.impl.id.GradoopIdSet;
import org.gradoop.common.model.impl.pojo.EPGMEdge;
import org.gradoop.common.model.impl.pojo.EPGMGraphHead;
import org.gradoop.common.model.impl.pojo.EPGMVertex;
import org.gradoop.flink.algorithms.fsm.dimspan.config.DIMSpanConfig;
import org.gradoop.flink.algorithms.fsm.dimspan.config.DIMSpanConstants;
import org.gradoop.flink.algorithms.fsm.dimspan.config.DataflowStep;
import org.gradoop.flink.algorithms.fsm.dimspan.model.GraphUtils;
import org.gradoop.flink.algorithms.fsm.dimspan.model.GraphUtilsBase;
import org.gradoop.flink.algorithms.fsm.dimspan.model.Simple16Compressor;
import org.gradoop.flink.model.impl.tuples.WithCount;
import org.gradoop.flink.model.impl.layouts.transactional.tuples.GraphTransaction;

import java.util.Set;

/**
 * {@code int-array encoded graph => Gradoop Graph Transaction}
 */
public class DFSCodeToEPGMGraphTransaction extends RichMapFunction<WithCount<int[]>, GraphTransaction> {

  /**
   * flag to enable decompression of patterns (true=enabled)
   */
  private final boolean uncompressPatterns;
  /**
   * frequent vertex labels
   */
  private String[] vertexDictionary;

  /**
   * frequent vertex labels
   */
  private String[] edgeDictionary;

  /**
   * utils to interpret and manipulate integer encoded graphs
   */
  private final GraphUtils graphUtils = new GraphUtilsBase();

  /**
   * Input graph collection size
   */
  private long graphCount;

  /**
   * Constructor.
   *
   * @param fsmConfig FSM configuration.
   */
  public DFSCodeToEPGMGraphTransaction(DIMSpanConfig fsmConfig) {
    this.uncompressPatterns = !fsmConfig.getPatternCompressionInStep().equals(DataflowStep.WITHOUT);
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    super.open(parameters);

    vertexDictionary = getRuntimeContext()
      .<String[]>getBroadcastVariable(DIMSpanConstants.VERTEX_DICTIONARY).get(0);

    edgeDictionary = getRuntimeContext()
      .<String[]>getBroadcastVariable(DIMSpanConstants.EDGE_DICTIONARY).get(0);


    graphCount = getRuntimeContext()
      .<Long>getBroadcastVariable(DIMSpanConstants.GRAPH_COUNT).get(0);
  }

  @Override
  public GraphTransaction map(WithCount<int[]> patternWithCount) throws Exception {

    int[] pattern = patternWithCount.getObject();

    if (uncompressPatterns) {
      pattern = Simple16Compressor.uncompress(pattern);
    }

    long frequency = patternWithCount.getCount();


    // GRAPH HEAD
    EPGMGraphHead graphHead = new EPGMGraphHead(GradoopId.get(), "", null);
    graphHead.setLabel(DIMSpanConstants.FREQUENT_PATTERN_LABEL);
    graphHead.setProperty(DIMSpanConstants.SUPPORT_KEY, (float) frequency / graphCount);

    GradoopIdSet graphIds = GradoopIdSet.fromExisting(graphHead.getId());

    // VERTICES
    int[] vertexLabels = graphUtils.getVertexLabels(pattern);

    GradoopId[] vertexIds = new GradoopId[vertexLabels.length];

    Set<EPGMVertex> vertices = Sets.newHashSetWithExpectedSize(vertexLabels.length);

    int intId = 0;
    for (int intLabel : vertexLabels) {
      String label = vertexDictionary[intLabel];

      GradoopId gradoopId = GradoopId.get();
      vertices.add(new EPGMVertex(gradoopId, label, null, graphIds));

      vertexIds[intId] = gradoopId;
      intId++;
    }

    // EDGES
    Set<EPGMEdge> edges = Sets.newHashSet();

    for (int edgeId = 0; edgeId < graphUtils.getEdgeCount(pattern); edgeId++) {
      String label = edgeDictionary[graphUtils.getEdgeLabel(pattern, edgeId)];

      GradoopId sourceId;
      GradoopId targetId;

      if (graphUtils.isOutgoing(pattern, edgeId)) {
        sourceId = vertexIds[graphUtils.getFromId(pattern, edgeId)];
        targetId = vertexIds[graphUtils.getToId(pattern, edgeId)];
      } else {
        sourceId = vertexIds[graphUtils.getToId(pattern, edgeId)];
        targetId = vertexIds[graphUtils.getFromId(pattern, edgeId)];
      }

      edges.add(new EPGMEdge(GradoopId.get(), label, sourceId, targetId, null, graphIds));
    }

    return new GraphTransaction(graphHead, vertices, edges);
  }
}