LPUpdateFunction.java

/*
 * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gradoop.flink.algorithms.gelly.labelpropagation.functions;

import com.google.common.collect.Lists;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.spargel.GatherFunction;
import org.apache.flink.graph.spargel.MessageIterator;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.common.model.impl.properties.PropertyValue;

import java.util.Collections;
import java.util.List;

/**
 * Updates the value of a vertex by picking the most frequent value out of
 * all incoming values.
 */
public class LPUpdateFunction
  extends GatherFunction<GradoopId, PropertyValue, PropertyValue> {
  /**
   * Computes the new label for the vertex from all received messages and
   * stores it only if it differs from the current vertex value.
   *
   * @param vertex  vertex to be updated
   * @param msg     iterator over all messages (labels) sent to the vertex
   */
  @Override
  public void updateVertex(Vertex<GradoopId, PropertyValue> vertex,
    MessageIterator<PropertyValue> msg) {
    // materialize the messages, they need to be sorted to count frequencies
    PropertyValue value = getNewValue(vertex,
      Lists.newArrayList(msg.iterator()));
    if (!vertex.getValue().equals(value)) {
      setNewVertexValue(value);
    }
  }

  /**
   * Returns the new value based on all incoming messages. Depending on the
   * messages sent to the vertex, the method returns:
   * <ul>
   * <li>0 messages: the current vertex value</li>
   * <li>1 message: the minimum of the message and the current vertex
   * value</li>
   * <li>&gt;1 messages, one value occurs most often: that most frequent
   * value</li>
   * <li>&gt;1 messages, several values share the highest frequency (&gt;1):
   * the minimum of those most frequent values</li>
   * <li>&gt;1 messages, all values distinct: the minimum of the current
   * vertex value and the smallest message value</li>
   * </ul>
   * Note that the given list is sorted in place.
   *
   * @param vertex      the current vertex
   * @param allMessages all received messages
   * @return the new value for the vertex as described above
   */
  private PropertyValue getNewValue(Vertex<GradoopId, PropertyValue> vertex,
    List<PropertyValue> allMessages) {
    // without any message there is nothing to decide, keep the current value
    if (allMessages.isEmpty()) {
      return vertex.getValue();
    }

    // after sorting, equal values are adjacent and can be counted in one pass
    Collections.sort(allMessages);

    int currentCounter = 1;
    PropertyValue currentValue = allMessages.get(0);
    int maxCounter = 1;
    PropertyValue maxValue = currentValue;
    for (int i = 1; i < allMessages.size(); i++) {
      PropertyValue nextValue = allMessages.get(i);
      // compare by value: messages are distinct object instances, so a
      // reference comparison would never detect repeated labels
      if (currentValue.equals(nextValue)) {
        currentCounter++;
        // strict comparison keeps the smaller value if frequencies are tied,
        // because values are visited in ascending order
        if (maxCounter < currentCounter) {
          maxCounter = currentCounter;
          maxValue = currentValue;
        }
      } else {
        currentCounter = 1;
        currentValue = nextValue;
      }
    }

    // if each label has a frequency of one
    if (maxCounter == 1) {
      // to avoid an oscillating state of the calculation we will just use
      // the smaller value
      return getMinimum(vertex.getValue(), allMessages.get(0));
    }
    return maxValue;
  }

  /**
   * Compares two PropertyValues and returns the smaller one. If both values
   * compare as equal, the first one is returned.
   *
   * @param v1 PropertyValue 1
   * @param v2 PropertyValue 2
   * @return returns the smaller PropertyValue
   */
  private PropertyValue getMinimum(PropertyValue v1, PropertyValue v2) {
    return v1.compareTo(v2) <= 0 ? v1 : v2;
  }
}