MatchingPairs.java

  1. /*
  2.  * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
  3.  *
  4.  * Licensed under the Apache License, Version 2.0 (the "License");
  5.  * you may not use this file except in compliance with the License.
  6.  * You may obtain a copy of the License at
  7.  *
  8.  *     http://www.apache.org/licenses/LICENSE-2.0
  9.  *
  10.  * Unless required by applicable law or agreed to in writing, software
  11.  * distributed under the License is distributed on an "AS IS" BASIS,
  12.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13.  * See the License for the specific language governing permissions and
  14.  * limitations under the License.
  15.  */
  16. package org.gradoop.flink.model.impl.operators.matching.common.functions;

  17. import org.apache.flink.api.common.functions.RichFlatJoinFunction;
  18. import org.apache.flink.api.java.functions.FunctionAnnotation;
  19. import org.apache.flink.configuration.Configuration;
  20. import org.apache.flink.util.Collector;
  21. import org.gradoop.common.model.impl.id.GradoopId;
  22. import org.gradoop.flink.model.impl.operators.matching.common.query.QueryHandler;
  23. import org.gradoop.flink.model.impl.operators.matching.common.tuples.IdWithCandidates;
  24. import org.gradoop.flink.model.impl.operators.matching.common.tuples.TripleWithCandidates;
  25. import org.gradoop.flink.model.impl.operators.matching.common.tuples.TripleWithSourceEdgeCandidates;

  26. /**
  27.  * Filters vertex-edge pairs based on their corresponding candidates.
  28.  * <p>
  29.  * Forwarded Fields Second:
  30.  * <ul>
  31.  * <li>edge id: {@code f0}</li>
  32.  * <li>source vertex id: {@code f1}</li>
  33.  * <li>source vertex id: {@code f2->f3}</li>
  34.  * </ul>
  35.  */
  36. @FunctionAnnotation.ForwardedFieldsSecond("f0;f1;f2->f3")
  37. public class MatchingPairs extends RichFlatJoinFunction
  38.   <IdWithCandidates<GradoopId>, TripleWithCandidates<GradoopId>, TripleWithSourceEdgeCandidates<GradoopId>> {

  39.   /**
  40.    * serial version uid
  41.    */
  42.   private static final long serialVersionUID = 42L;
  43.   /**
  44.    * GDL query
  45.    */
  46.   private final String query;
  47.   /**
  48.    * Query handler
  49.    */
  50.   private transient QueryHandler queryHandler;

  51.   /**
  52.    * Reduce instantiations
  53.    */
  54.   private final TripleWithSourceEdgeCandidates<GradoopId> reuseTuple;

  55.   /**
  56.    * Constructor
  57.    *
  58.    * @param query GDL query
  59.    */
  60.   public MatchingPairs(final String query) {
  61.     this.query = query;
  62.     this.reuseTuple = new TripleWithSourceEdgeCandidates<>();
  63.   }

  64.   @Override
  65.   public void open(Configuration parameters) throws Exception {
  66.     super.open(parameters);
  67.     queryHandler = new QueryHandler(query);
  68.   }

  69.   @Override
  70.   public void join(IdWithCandidates<GradoopId> sourceVertex,
  71.     TripleWithCandidates<GradoopId> edge,
  72.     Collector<TripleWithSourceEdgeCandidates<GradoopId>> collector)
  73.       throws Exception {

  74.     boolean[] sourceCandidates = sourceVertex.getCandidates();
  75.     boolean[] edgeCandidates   = edge.getCandidates();

  76.     boolean[] newSourceCandidates = new boolean[sourceCandidates.length];
  77.     boolean[] newEdgeCandidates   = new boolean[edgeCandidates.length];

  78.     boolean pairStillValid = false;

  79.     for (int eQ = 0; eQ < edgeCandidates.length; eQ++) {
  80.       if (edgeCandidates[eQ]) {
  81.         int vQ = queryHandler
  82.           .getEdgeById((long) eQ).getSourceVertexId().intValue();
  83.         if (sourceCandidates[vQ]) {
  84.           newSourceCandidates[vQ] = true;
  85.           newEdgeCandidates[eQ]   = true;
  86.           pairStillValid          = true;
  87.         }
  88.       }
  89.     }

  90.     if (pairStillValid) {
  91.       reuseTuple.setEdgeId(edge.getEdgeId());
  92.       reuseTuple.setSourceId(edge.getSourceId());
  93.       reuseTuple.setSourceCandidates(newSourceCandidates);
  94.       reuseTuple.setTargetId(edge.getTargetId());
  95.       reuseTuple.setEdgeCandidates(newEdgeCandidates);
  96.       collector.collect(reuseTuple);
  97.     }
  98.   }
  99. }