Commit 95bba76f authored by iwakeh's avatar iwakeh 🌴 Committed by Karsten Loesing
Browse files

Implements task-19895 run-once functionality.

CollecTorMain implements Callable and scheduler waits for termination
of all started Callables.
parent fe6fbb55
Loading
Loading
Loading
Loading
+10 −1
Original line number Diff line number Diff line
@@ -14,9 +14,11 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Observable;
import java.util.Observer;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;

public abstract class CollecTorMain implements Observer, Runnable {
public abstract class CollecTorMain implements Callable<Object>, Observer,
    Runnable {

  private static final Logger logger = LoggerFactory.getLogger(
      CollecTorMain.class);
@@ -58,6 +60,13 @@ public abstract class CollecTorMain implements Observer, Runnable {
    logger.info("Terminating {} module of CollecTor.", module());
  }

  /** Wrapper for <code>run</code>. */
  @Override
  public final Object call() {
    run();
    return null;
  }

  @Override
  public synchronized void update(Observable obs, Object obj) {
    newConfigAvailable.set(true);
+34 −23
Original line number Diff line number Diff line
@@ -11,8 +11,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
@@ -51,6 +53,7 @@ public final class Scheduler implements ThreadFactory {
   */
  public void scheduleModuleRuns(Map<Key,
      Class<? extends CollecTorMain>> collecTorMains, Configuration conf) {
    List<Callable<Object>> runOnceMains = new ArrayList<>();
    for (Map.Entry<Key, Class<? extends CollecTorMain>> ctmEntry
        : collecTorMains.entrySet()) {
      try {
@@ -58,10 +61,16 @@ public final class Scheduler implements ThreadFactory {
          String prefix = ctmEntry.getKey().name().replace(ACTIVATED, "");
          CollecTorMain ctm = ctmEntry.getValue()
              .getConstructor(Configuration.class).newInstance(conf);
          scheduleExecutions(conf.getBool(Key.RunOnce), ctm,
          if (conf.getBool(Key.RunOnce)) {
            logger.info("Prepare single run for " + ctm.getClass().getName()
                + ".");
            runOnceMains.add(Executors.callable(ctm));
          } else {
            scheduleExecutions(ctm,
                conf.getInt(Key.valueOf(prefix + OFFSETMIN)),
                conf.getInt(Key.valueOf(prefix + PERIODMIN)));
          }
        }
      } catch (ConfigurationException | IllegalAccessException
          | InstantiationException | InvocationTargetException
          | NoSuchMethodException | RejectedExecutionException
@@ -70,16 +79,19 @@ public final class Scheduler implements ThreadFactory {
            + ". Reason: " + ex.getMessage(), ex);
      }
    }
    try {
      if (conf.getBool(Key.RunOnce)) {
        scheduler.invokeAll(runOnceMains);
      }
    } catch (ConfigurationException | InterruptedException
        | RejectedExecutionException | NullPointerException ex) {
      logger.error("Cannot schedule run-once: " + ex.getMessage(), ex);
    }
  }

  private static final long MILLIS_IN_A_MINUTE = 60_000L;

  private void scheduleExecutions(boolean runOnce, CollecTorMain ctm,
      int offset, int period) {
    if (runOnce) {
      logger.info("Single run for " + ctm.getClass().getName() + ".");
      this.scheduler.execute(ctm);
    } else {
  private void scheduleExecutions(CollecTorMain ctm, int offset, int period) {
    logger.info("Periodic updater started for " + ctm.getClass().getName()
        + "; offset=" + offset + ", period=" + period + ".");
    long periodMillis = period * MILLIS_IN_A_MINUTE;
@@ -94,7 +106,6 @@ public final class Scheduler implements ThreadFactory {
    this.scheduler.scheduleAtFixedRate(ctm, initialDelayMillis, periodMillis,
        TimeUnit.MILLISECONDS);
  }
  }

  protected static long computeInitialDelayMillis(long currentMillis,
      long offsetMillis, long periodMillis) {