LabelGroup.java

/*
 * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gradoop.flink.model.impl.operators.grouping.tuples;

import org.apache.flink.api.java.tuple.Tuple5;
import org.gradoop.common.model.api.entities.Element;
import org.gradoop.common.model.api.entities.Edge;
import org.gradoop.common.model.api.entities.Vertex;
import org.gradoop.common.model.impl.properties.PropertyValue;
import org.gradoop.common.model.impl.properties.PropertyValueList;
import org.gradoop.flink.model.api.functions.AggregateFunction;
import org.gradoop.flink.model.impl.operators.aggregation.functions.AggregateUtil;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.stream.Collectors;

/**
 * Stores grouping keys and aggregates for a specific label.
 *
 * <pre>
 * f0: grouping label
 * f1: group label
 * f2: property keys
 * f3: aggregate functions
 * f4: aggregate values
 * </pre>
 *
 * The aggregate values in {@code f4} are positionally aligned with the aggregate functions in
 * {@code f3}: the value at index {@code i} belongs to the function at index {@code i}.
 */
public class LabelGroup
  extends Tuple5<String, String, List<String>, List<AggregateFunction>, List<PropertyValue>> {

  /**
   * Default constructor. Creates a label group with {@code null} labels and empty lists for
   * property keys, aggregate functions and aggregate values.
   */
  public LabelGroup() {
    this(null, null);
  }

  /**
   * Constructor to only define the labels. Property keys and aggregate functions start out as
   * empty, mutable lists.
   *
   * @param groupingLabel label used for grouping
   * @param groupLabel label used after grouping
   */
  public LabelGroup(String groupingLabel, String groupLabel) {
    this(groupingLabel, groupLabel, new ArrayList<>(), new ArrayList<>());
  }

  /**
   * Constructor defining labels, grouping keys and aggregate functions. The aggregate values
   * are initialized with an empty list.
   *
   * @param groupingLabel label used for grouping
   * @param groupLabel label used after grouping
   * @param propertyKeys grouping keys for the label
   * @param aggregators aggregate functions
   */
  public LabelGroup(
    String groupingLabel, String groupLabel,
    List<String> propertyKeys,
    List<AggregateFunction> aggregators) {
    super(groupingLabel, groupLabel, propertyKeys, aggregators, new ArrayList<>());
  }

  /**
   * Returns the grouping label
   *
   * @return grouping label
   */
  public String getGroupingLabel() {
    return f0;
  }

  /**
   * Sets the grouping label
   *
   * @param label grouping label
   */
  public void setGroupingLabel(String label) {
    f0 = label;
  }

  /**
   * Returns the group label
   *
   * @return group label
   */
  public String getGroupLabel() {
    return f1;
  }

  /**
   * Sets the group label
   *
   * @param label group label
   */
  public void setGroupLabel(String label) {
    f1 = label;
  }

  /**
   * Returns the property keys
   *
   * @return list of property keys
   */
  public List<String> getPropertyKeys() {
    return f2;
  }

  /**
   * Sets the property keys
   *
   * @param propertyKeys list of property keys
   */
  public void setPropertyKeys(List<String> propertyKeys) {
    f2 = propertyKeys;
  }

  /**
   * Adds a property key to the current list of keys.
   *
   * @param propertyKey property key as string
   */
  public void addPropertyKey(String propertyKey) {
    f2.add(propertyKey);
  }

  /**
   * Returns the aggregate functions as list
   *
   * @return list of aggregate functions
   */
  public List<AggregateFunction> getAggregateFunctions() {
    return f3;
  }

  /**
   * Sets the aggregate functions
   *
   * @param aggregateFunctions aggregate functions
   */
  public void setAggregateFunctions(List<AggregateFunction> aggregateFunctions) {
    f3 = aggregateFunctions;
  }

  /**
   * Adds an aggregate function to the current list of aggregators.
   *
   * @param aggregateFunction property value aggregate function
   */
  public void addAggregateFunction(AggregateFunction aggregateFunction) {
    f3.add(aggregateFunction);
  }

  /**
   * Returns the aggregate values as list.
   * <p>
   * If fewer values are stored than aggregate functions are registered, a newly created list
   * holding the result of {@link AggregateUtil#getDefaultAggregate} for every aggregate function
   * is returned instead. That list is not stored in this tuple, so modifying it does not affect
   * the internal state.
   *
   * @return aggregate values
   */
  public List<PropertyValue> getAggregateValues() {
    if (f4.size() < f3.size()) {
      return f3.stream().map(AggregateUtil::getDefaultAggregate).collect(Collectors.toList());
    }
    return f4;
  }

  /**
   * Sets the aggregate values
   *
   * @param aggregateValues list of aggregate values
   */
  public void setAggregateValues(List<PropertyValue> aggregateValues) {
    f4 = aggregateValues;
  }

  /**
   * Sets the aggregate values by copying all entries of the given property value list into a
   * new list.
   *
   * @param aggregateValues aggregate value list
   */
  public void setAggregateValues(PropertyValueList aggregateValues) {
    List<PropertyValue> aggregate = new ArrayList<>();
    aggregateValues.iterator().forEachRemaining(aggregate::add);
    setAggregateValues(aggregate);
  }

  /**
   * Resets the current aggregate values
   */
  public void resetAggregateValues() {
    f4.clear();
  }

  /**
   * Returns the aggregate values as property value list
   *
   * @return aggregate values
   * @throws IOException on failure
   */
  public PropertyValueList getAggregateValueList() throws IOException {
    return PropertyValueList.fromPropertyValues(getAggregateValues());
  }

  /**
   * Aggregates the aggregate values with {@code values} using the aggregate functions.
   * <p>
   * If no aggregate values are stored yet, {@code values} is copied and becomes the current
   * aggregate. Otherwise the i-th entry of {@code values} is combined with the i-th stored
   * aggregate using the i-th aggregate function. An incoming {@code PropertyValue.NULL_VALUE}
   * leaves the stored aggregate at that position untouched; a stored
   * {@code PropertyValue.NULL_VALUE} is replaced by the incoming value.
   *
   * @param values values to aggregate with
   */
  public void aggregate(PropertyValueList values) {
    if (f4.isEmpty()) {
      setAggregateValues(values);
      return;
    }

    Iterator<PropertyValue> valueIt = values.iterator();
    ListIterator<PropertyValue> aggregateIt = f4.listIterator();
    PropertyValue value;
    PropertyValue aggregate;
    for (AggregateFunction valueAggregator : getAggregateFunctions()) {
      value = valueIt.next();
      // Both iterators must advance exactly once per aggregate function, even when the incoming
      // value is null. Otherwise all following values would be merged into the wrong aggregate.
      aggregate = aggregateIt.next();
      if (PropertyValue.NULL_VALUE.equals(value)) {
        // nothing to add for this aggregate function
        continue;
      }
      if (PropertyValue.NULL_VALUE.equals(aggregate)) {
        aggregateIt.set(value);
      } else {
        aggregateIt.set(valueAggregator.aggregate(aggregate, value));
      }
    }
  }

  /**
   * Returns the property values of the given element which are used for
   * aggregation. If the EPGM element does not provide an increment for an aggregate function,
   * the default aggregate of that function is used instead.
   *
   * @param element attributed EPGM element
   * @return property values for aggregation, or an empty list if no aggregate functions are set
   * @throws IOException on failure
   */
  public PropertyValueList getIncrementValues(Element element) throws IOException {
    if (f3.isEmpty()) {
      return PropertyValueList.createEmptyList();
    }
    List<PropertyValue> propertyValues = getAggregateFunctions().stream()
      .map(f -> getIncrement(f, element))
      .collect(Collectors.toList());
    return PropertyValueList.fromPropertyValues(propertyValues);
  }

  /**
   * Returns the increment value for an aggregate function and an element.
   * <p>
   * The function is only asked for an increment if the element is a vertex and the function is
   * a vertex aggregation, or the element is an edge and the function is an edge aggregation.
   * In every other case, and when the function returns {@code null}, the result of
   * {@link AggregateUtil#getDefaultAggregate} is returned.
   *
   * @param aggregateFunction aggregate function to create the increment
   * @param element element used to create the increment
   * @return increment value
   */
  private static PropertyValue getIncrement(AggregateFunction aggregateFunction,
                                            Element element) {
    PropertyValue increment = null;
    if ((element instanceof Vertex && aggregateFunction.isVertexAggregation()) ||
      (element instanceof Edge && aggregateFunction.isEdgeAggregation())) {
      increment = aggregateFunction.getIncrement(element);
    }
    return increment == null ? AggregateUtil.getDefaultAggregate(aggregateFunction) : increment;
  }
}