Average.java

/*
 * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gradoop.flink.model.impl.operators.aggregation.functions.average;

import org.gradoop.common.model.api.entities.Element;
import org.gradoop.common.model.impl.properties.PropertyValue;
import org.gradoop.common.model.impl.properties.PropertyValueUtils;
import org.gradoop.flink.model.api.functions.AggregateFunction;

import java.util.Arrays;
import java.util.List;
import java.util.Objects;

/**
 * Base interface for aggregate functions determining the average of some value.<br>
 * This aggregate function uses a list of two property values for aggregation internally.
 * The list will contain two values:
 * <ol start=0>
 *   <li>The sum of all values considered by the average.</li>
 *   <li>The number of values added to the sum.</li>
 * </ol>
 * A post-processing step is necessary after the aggregation, to get the final average value.
 * The final value will be a {@code double} value or {@link PropertyValue#NULL_VALUE null},
 * if there were no elements aggregated (i.e. if the property was not set on any element).<p>
 * <b>Hint: </b> Implementations of this interface have to make sure to use a property value
 * with the correct format, as described above.
 */
public interface Average extends AggregateFunction {

  /**
   * The default value used internally in this aggregation.
   * Implementations of {@link #getIncrement(Element)} should return this value when the
   * element is ignored, i.e. when it does not have the attribute aggregated by this function.
   */
  PropertyValue IGNORED_VALUE = PropertyValue.create(
    Arrays.asList(PropertyValue.create(0L), PropertyValue.create(0L)));

  /**
   * The aggregation logic for calculating the average.
   * This function requires property values to have a certain format, see {@link Average}.
   *
   * @param aggregate previously aggregated value
   * @param increment value that is added to the aggregate
   * @return The new aggregate value.
   */
  @Override
  default PropertyValue aggregate(PropertyValue aggregate, PropertyValue increment) {
    List<PropertyValue> aggregateValue = aggregate.getList();
    List<PropertyValue> incrementValue = increment.getList();
    PropertyValue sum = PropertyValueUtils.Numeric.add(aggregateValue.get(0),
      incrementValue.get(0));
    PropertyValue count = PropertyValueUtils.Numeric.add(aggregateValue.get(1),
      incrementValue.get(1));
    aggregateValue.set(0, sum);
    aggregateValue.set(1, count);
    aggregate.setList(aggregateValue);
    return aggregate;
  }

  /**
   * Calculate the average from the internally used aggregate value.
   *
   * @param result The result of the aggregation step.
   * @return The average value (or null, if there were no elements to get the average of).
   * @throws IllegalArgumentException if the previous result had an invalid format.
   */
  @Override
  default PropertyValue postAggregate(PropertyValue result) {
    if (!Objects.requireNonNull(result).isList()) {
      throw new IllegalArgumentException("The aggregate value is expected to be a List.");
    }
    List<PropertyValue> value = result.getList();
    if (value.size() != 2) {
      throw new IllegalArgumentException("The aggregate value list is expected to have size 2.");
    }
    if (!value.get(0).isNumber() || !value.get(1).isLong()) {
      throw new IllegalArgumentException("The aggregate value list contains unsupported types.");
    }
    // Convert the two list values to a double.
    // The first was some unknown number type, the second a long.
    double sum = ((Number) value.get(0).getObject()).doubleValue();
    long count = value.get(1).getLong();
    if (count < 0) {
      throw new IllegalArgumentException("Invalid number of elements " + count +
        ", expected value greater than zero.");
    } else if (count == 0) {
      return PropertyValue.NULL_VALUE;
    } else {
      result.setDouble(sum / count);
      return result;
    }
  }
}