AccumulatePropagatedValues.java

/*
 * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gradoop.dataintegration.transformation.functions;

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.gradoop.common.model.api.entities.Vertex;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.common.model.impl.properties.PropertyValue;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;

/**
 * A {@link CoGroupFunction} that gathers every property value that was sent to a vertex and
 * attaches them to that vertex as a single {@link PropertyValue} holding a list.
 *
 * @param <V> The vertex type.
 */
public class AccumulatePropagatedValues<V extends Vertex>
  implements CoGroupFunction<Tuple2<GradoopId, PropertyValue>, V, V> {

  /**
   * Property key under which the accumulated value list is stored on a target vertex.
   */
  private final String targetVertexPropertyKey;

  /**
   * Labels of the vertices that may receive the accumulated property, or {@code null} if
   * vertices with any label are accepted.
   */
  private final Set<String> targetVertexLabels;

  /**
   * Creates a new co group function accumulating propagated property values.
   *
   * @param targetVertexPropertyKey The property key under which the list of values is stored
   *                                on target vertices. Must not be {@code null}.
   * @param targetVertexLabels      The labels of vertices on which the property may be set.
   *                                (Use {@code null} for all vertices.)
   * @throws NullPointerException if {@code targetVertexPropertyKey} is {@code null}.
   */
  public AccumulatePropagatedValues(String targetVertexPropertyKey,
    Set<String> targetVertexLabels) {
    this.targetVertexPropertyKey = Objects.requireNonNull(targetVertexPropertyKey);
    this.targetVertexLabels = targetVertexLabels;
  }

  /**
   * Emits the vertex of this group, enriched with the list of propagated values if the vertex
   * is a target vertex and at least one value was received. Nothing is emitted when the vertex
   * side of the group is empty.
   *
   * @param propertyValues Pairs of vertex id and propagated value; only the value is used.
   * @param elements       The vertex side of the group; only its first element is considered.
   * @param out            The collector receiving the (possibly updated) vertex.
   */
  @Override
  public void coGroup(Iterable<Tuple2<GradoopId, PropertyValue>> propertyValues,
                      Iterable<V> elements, Collector<V> out) {
    // The group is expected to hold a single vertex since gradoop ids are unique,
    // hence only the first element is looked at.
    Iterator<V> vertices = elements.iterator();
    if (vertices.hasNext()) {
      V vertex = vertices.next();

      // Vertices that are not accepted by the label filter are forwarded untouched.
      if (isTargetVertex(vertex)) {
        // Gather the values propagated by the neighbors.
        List<PropertyValue> accumulated = new ArrayList<>();
        for (Tuple2<GradoopId, PropertyValue> propagated : propertyValues) {
          accumulated.add(propagated.f1);
        }

        // The property is only written when there is something to store.
        if (!accumulated.isEmpty()) {
          vertex.setProperty(targetVertexPropertyKey, PropertyValue.create(accumulated));
        }
      }

      out.collect(vertex);
    }
  }

  /**
   * Checks whether the propagated property may be set on the given vertex.
   *
   * @param vertex The vertex to check.
   * @return {@code true} if no label filter is configured or the label of the vertex is part
   *         of the configured labels.
   */
  private boolean isTargetVertex(V vertex) {
    return targetVertexLabels == null || targetVertexLabels.contains(vertex.getLabel());
  }
}