MatchingPairs.java
- /*
- * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.gradoop.flink.model.impl.operators.matching.common.functions;
- import org.apache.flink.api.common.functions.RichFlatJoinFunction;
- import org.apache.flink.api.java.functions.FunctionAnnotation;
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.util.Collector;
- import org.gradoop.common.model.impl.id.GradoopId;
- import org.gradoop.flink.model.impl.operators.matching.common.query.QueryHandler;
- import org.gradoop.flink.model.impl.operators.matching.common.tuples.IdWithCandidates;
- import org.gradoop.flink.model.impl.operators.matching.common.tuples.TripleWithCandidates;
- import org.gradoop.flink.model.impl.operators.matching.common.tuples.TripleWithSourceEdgeCandidates;
- /**
- * Filters vertex-edge pairs based on their corresponding candidates.
- * <p>
- * Forwarded Fields Second:
- * <ul>
- * <li>edge id: {@code f0}</li>
- * <li>source vertex id: {@code f1}</li>
- * <li>source vertex id: {@code f2->f3}</li>
- * </ul>
- */
- @FunctionAnnotation.ForwardedFieldsSecond("f0;f1;f2->f3")
- public class MatchingPairs extends RichFlatJoinFunction
- <IdWithCandidates<GradoopId>, TripleWithCandidates<GradoopId>, TripleWithSourceEdgeCandidates<GradoopId>> {
- /**
- * serial version uid
- */
- private static final long serialVersionUID = 42L;
- /**
- * GDL query
- */
- private final String query;
- /**
- * Query handler
- */
- private transient QueryHandler queryHandler;
- /**
- * Reduce instantiations
- */
- private final TripleWithSourceEdgeCandidates<GradoopId> reuseTuple;
- /**
- * Constructor
- *
- * @param query GDL query
- */
- public MatchingPairs(final String query) {
- this.query = query;
- this.reuseTuple = new TripleWithSourceEdgeCandidates<>();
- }
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- queryHandler = new QueryHandler(query);
- }
- @Override
- public void join(IdWithCandidates<GradoopId> sourceVertex,
- TripleWithCandidates<GradoopId> edge,
- Collector<TripleWithSourceEdgeCandidates<GradoopId>> collector)
- throws Exception {
- boolean[] sourceCandidates = sourceVertex.getCandidates();
- boolean[] edgeCandidates = edge.getCandidates();
- boolean[] newSourceCandidates = new boolean[sourceCandidates.length];
- boolean[] newEdgeCandidates = new boolean[edgeCandidates.length];
- boolean pairStillValid = false;
- for (int eQ = 0; eQ < edgeCandidates.length; eQ++) {
- if (edgeCandidates[eQ]) {
- int vQ = queryHandler
- .getEdgeById((long) eQ).getSourceVertexId().intValue();
- if (sourceCandidates[vQ]) {
- newSourceCandidates[vQ] = true;
- newEdgeCandidates[eQ] = true;
- pairStillValid = true;
- }
- }
- }
- if (pairStillValid) {
- reuseTuple.setEdgeId(edge.getEdgeId());
- reuseTuple.setSourceId(edge.getSourceId());
- reuseTuple.setSourceCandidates(newSourceCandidates);
- reuseTuple.setTargetId(edge.getTargetId());
- reuseTuple.setEdgeCandidates(newEdgeCandidates);
- collector.collect(reuseTuple);
- }
- }
- }