AggregatorWrapperWithValue.java

  1. /*
  2.  * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
  3.  *
  4.  * Licensed under the Apache License, Version 2.0 (the "License");
  5.  * you may not use this file except in compliance with the License.
  6.  * You may obtain a copy of the License at
  7.  *
  8.  *     http://www.apache.org/licenses/LICENSE-2.0
  9.  *
  10.  * Unless required by applicable law or agreed to in writing, software
  11.  * distributed under the License is distributed on an "AS IS" BASIS,
  12.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13.  * See the License for the specific language governing permissions and
  14.  * limitations under the License.
  15.  */
  16. package org.gradoop.flink.model.impl.operators.keyedgrouping.labelspecific;

  17. import org.gradoop.common.model.impl.properties.PropertyValue;
  18. import org.gradoop.flink.model.api.functions.AggregateDefaultValue;
  19. import org.gradoop.flink.model.api.functions.AggregateFunction;

  20. import java.util.Arrays;
  21. import java.util.List;
  22. import java.util.Objects;

  23. /**
  24.  * Wraps an aggregate function where the internal representation of a {@link PropertyValue} is a {@link List}
  25.  * containing some property value and the original aggregate value.
  26.  * The additional value is used to identify aggregate values used by this function.
  27.  */
  28. abstract class AggregatorWrapperWithValue implements AggregateFunction, AggregateDefaultValue {

  29.   /**
  30.    * The actual aggregate function.
  31.    */
  32.   protected final AggregateFunction wrappedFunction;

  33.   /**
  34.    * The additional value stored with the property value.
  35.    * This value is used to identify values to be aggregated.
  36.    */
  37.   private final PropertyValue identifyingValue;

  38.   /**
  39.    * Create a new instance of this wrapper.
  40.    *
  41.    * @param wrappedFunction The wrapped aggregate function.
  42.    * @param identifyingValue The additional value used to identify values to be aggregated.
  43.    */
  44.   AggregatorWrapperWithValue(AggregateFunction wrappedFunction, PropertyValue identifyingValue) {
  45.     this.wrappedFunction = Objects.requireNonNull(wrappedFunction);
  46.     this.identifyingValue = Objects.requireNonNull(identifyingValue);
  47.   }

  48.   /**
  49.      * Unwrap a {@link PropertyValue} wrapped by this class.
  50.      *
  51.      * @param wrappedValue The wrapped property value.
  52.      * @return The unwrapped (raw) property value.
  53.      * @see #wrap(PropertyValue)
  54.      */
  55.   protected PropertyValue unwrap(PropertyValue wrappedValue) {
  56.     if (wrappedValue.isNull()) {
  57.       return wrappedValue;
  58.     }
  59.     return wrappedValue.getList().get(1);
  60.   }

  61.   /**
  62.      * Wrap a {@link PropertyValue} into an internal representation used by this class.
  63.      *
  64.      * @param rawValue The raw property value.
  65.      * @return The wrapped property value.
  66.      * @see #unwrap(PropertyValue)
  67.      */
  68.   protected PropertyValue wrap(PropertyValue rawValue) {
  69.     return PropertyValue.create(Arrays.asList(identifyingValue, rawValue));
  70.   }

  71.   /**
  72.      * Check if a {@link PropertyValue} should be considered by this function.
  73.      *
  74.      * @param value The property value.
  75.      * @return {@code true}, if the value should be aggregated by this function.
  76.      */
  77.   protected boolean isAggregated(PropertyValue value) {
  78.     if (!value.isList()) {
  79.       return false;
  80.     }
  81.     final List<PropertyValue> values = value.getList();
  82.     if (values.size() != 2) {
  83.       return false;
  84.     }
  85.     return identifyingValue.equals(values.get(0));
  86.   }

  87.   @Override
  88.   public PropertyValue aggregate(PropertyValue aggregate, PropertyValue increment) {
  89.     if (isAggregated(increment)) {
  90.       return wrap(wrappedFunction.aggregate(unwrap(aggregate), unwrap(increment)));
  91.     } else {
  92.       return aggregate;
  93.     }
  94.   }

  95.   @Override
  96.   public String getAggregatePropertyKey() {
  97.     return wrappedFunction.getAggregatePropertyKey();
  98.   }

  99.   @Override
  100.   public PropertyValue postAggregate(PropertyValue result) {
  101.     if (isAggregated(result)) {
  102.       return wrappedFunction.postAggregate(unwrap(result));
  103.     } else {
  104.       return null;
  105.     }
  106.   }

  107.   @Override
  108.   public PropertyValue getDefaultValue() {
  109.     return wrappedFunction instanceof AggregateDefaultValue ?
  110.       ((AggregateDefaultValue) wrappedFunction).getDefaultValue() : PropertyValue.NULL_VALUE;
  111.   }
  112. }