AbstractTimeAggregateFunction.java

/*
 * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gradoop.temporal.model.impl.operators.aggregation.functions;

import org.apache.flink.api.java.tuple.Tuple2;
import org.gradoop.common.model.api.entities.Element;
import org.gradoop.common.model.impl.properties.PropertyValue;
import org.gradoop.flink.model.impl.operators.aggregation.functions.BaseAggregateFunction;
import org.gradoop.temporal.model.api.functions.TemporalAggregateFunction;
import org.gradoop.temporal.model.api.TimeDimension;
import org.gradoop.temporal.model.impl.pojo.TemporalElement;

import java.util.Objects;
import java.util.function.BiFunction;

/**
 * An abstract super class for aggregation functions that aggregate a time of a temporal element.
 * Times can be a {@link TimeDimension.Field} of a {@link TimeDimension}.
 * <p>
 * By default the result is applied by the super class. After calling
 * {@link #setAsValidTime(TimeDimension.Field)} with a non-null field, the result is written to that
 * valid time field of the (temporal) element instead.
 */
public abstract class AbstractTimeAggregateFunction extends BaseAggregateFunction
  implements TemporalAggregateFunction {

  /**
   * Selects which time-dimension is considered by this aggregate function.
   */
  private final TimeDimension timeDimension;

  /**
   * Selects the field of the temporal element to consider.
   */
  private final TimeDimension.Field field;

  /**
   * Selects the valid time field the resulting value is written to. If {@code null} (the default),
   * the result is applied by the super class instead of being written to a valid time field.
   */
  private TimeDimension.Field resultValidTimeField = null;

  /**
   * The property value that is considered as the default 'from' value of this aggregate function.
   * It is ignored during aggregation of valid times.
   */
  private final PropertyValue defaultFromValue = PropertyValue.create(TemporalElement.DEFAULT_TIME_FROM);

  /**
   * The property value that is considered as the default 'to' value of this aggregate function.
   * It is ignored during aggregation of valid times.
   */
  private final PropertyValue defaultToValue = PropertyValue.create(TemporalElement.DEFAULT_TIME_TO);

  /**
   * Sets attributes used to initialize this aggregate function.
   *
   * @param timeDimension The time dimension type to consider.
   * @param field The field of the time-dimension to consider.
   * @param aggregatePropertyKey The key of the property where the aggregated result is saved.
   * @throws NullPointerException if {@code timeDimension} or {@code field} is {@code null}.
   */
  public AbstractTimeAggregateFunction(TimeDimension timeDimension, TimeDimension.Field field,
    String aggregatePropertyKey) {
    super(aggregatePropertyKey);
    this.timeDimension = Objects.requireNonNull(timeDimension, "timeDimension");
    this.field = Objects.requireNonNull(field, "field");
  }

  /**
   * Get a time stamp as the aggregate value from a temporal element.
   * The value will be the value of a {@link TimeDimension.Field} of a {@link TimeDimension}.
   *
   * @param element The temporal element.
   * @return The value, as a long-type property value.
   * @throws IllegalArgumentException if the configured field is neither {@code FROM} nor {@code TO}.
   */
  @Override
  public PropertyValue getIncrement(TemporalElement element) {
    final Tuple2<Long, Long> timeInterval = element.getTimeByDimension(timeDimension);
    switch (field) {
    case FROM:
      return PropertyValue.create(timeInterval.f0);
    case TO:
      return PropertyValue.create(timeInterval.f1);
    default:
      throw new IllegalArgumentException("Field [" + field + "] is not supported for time intervals.");
    }
  }

  /**
   * Base aggregate function for min and max aggregations. Handles default behaviour and the
   * logic of the aggregation.
   * <p>
   * Null values and (for valid times) temporal default values are skipped: if only one of the two
   * arguments carries a usable value, that argument is returned; if neither does, the result is
   * a null property value or the (null) increment itself.
   *
   * @param aggregate The aggregate value.
   * @param increment The increment value.
   * @param comparison The function to apply the aggregation of the aggregate and increment value
   * @return the aggregated value
   */
  PropertyValue applyAggregateWithDefaults(PropertyValue aggregate, PropertyValue increment,
    BiFunction<PropertyValue, PropertyValue, PropertyValue> comparison) {
    if (aggregate.isNull() || isDefaultTime(aggregate)) {
      // Nothing usable aggregated so far: take the increment, unless it is a default time itself.
      return isDefaultTime(increment) ? PropertyValue.NULL_VALUE : increment;
    } else if (increment.isNull() || isDefaultTime(increment)) {
      // The increment carries no information, keep the current aggregate.
      return aggregate;
    } else {
      return comparison.apply(aggregate, increment);
    }
  }

  /**
   * Checks if the given property value is a temporal default value (see {@link TemporalElement}).
   * If the temporal attribute is a {@link TimeDimension#TRANSACTION_TIME}, this function
   * will return {@code false} since the transaction time is system maintained and there are no
   * defaults to check.
   *
   * @param value the property value to check
   * @return true, if the time semantic is valid time and the value equals a default temporal value.
   */
  private boolean isDefaultTime(PropertyValue value) {
    return timeDimension == TimeDimension.VALID_TIME &&
      (value.equals(defaultFromValue) || value.equals(defaultToValue));
  }

  /**
   * Sets the valid time field to write the aggregation result to.
   * Passing {@code null} defaults to adding a property value instead.
   *
   * @param field The valid time field that receives the result, or {@code null} to store the
   *              result as a property value.
   * @return This aggregate function, to allow call chaining.
   */
  public AbstractTimeAggregateFunction setAsValidTime(TimeDimension.Field field) {
    this.resultValidTimeField = field;
    return this;
  }

  @Override
  public String toString() {
    return String.format("%s(%s.%s)", getClass().getSimpleName(), timeDimension, field);
  }

  /**
   * Applies the aggregation result to the given element.
   * <p>
   * If no valid time field was selected via {@link #setAsValidTime(TimeDimension.Field)}, the call
   * is delegated to the super class. Otherwise the result is written to the selected valid time
   * field of the element. A null aggregate (which is produced when no non-default time was
   * aggregated) carries no time stamp; in that case the valid time of the element is left untouched.
   *
   * @param element   The element the result is applied to.
   * @param aggregate The aggregated value.
   * @param <E>       The element type.
   * @return The given element, after the result was applied.
   * @throws IllegalArgumentException if a valid time field is selected and the element is not a
   *                                  {@link TemporalElement}.
   */
  @Override
  public <E extends Element> E applyResult(E element, PropertyValue aggregate) {
    if (resultValidTimeField == null) {
      return super.applyResult(element, aggregate);
    }
    if (!(element instanceof TemporalElement)) {
      throw new IllegalArgumentException("Cannot write time value to non-temporal element.");
    }

    // A null aggregate cannot be unwrapped to a long, skip the write instead of failing.
    if (aggregate != null && !aggregate.isNull()) {
      ((TemporalElement) element).setValidTime(aggregate.getLong(), resultValidTimeField);
    }

    // The element is modified in place, so it is returned directly (no unchecked cast needed).
    return element;
  }
}