Skip to main content
deleted 12 characters in body
Source Link
package com.loudsightme.eventrademunyengm.core.publish;

import static akka.dispatch.Futures.future;

import java.io.ByteArrayOutputStream;
import java.io.PrintWriter;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.log4j.Logger;

import akka.actor.TypedActor;

import com.google.common.collect.FluentIterable;
import com.loudsight.utilities.Cast;
import com.loudsight.utilities.service.ServiceImpl;
import com.loudsight.utilities.service.core.subscribe.Subscriber;

public abstract class PublisherImpl<T> extends ServiceImpl implements Publisher<T> {
    private static final Logger logger = Logger.getLogger(PublisherImpl.class);

    protected static class SubscriberInfoImpl<T> implements Subscription<T> {
        private static final AtomicLong nextId = new AtomicLong();
        private final Subscriber subscriber;
        private final long subscriberId;
        private final T subscriptionData;
        private long updateInterval;
        private long lastRefresh;

        SubscriberInfoImpl(Subscriber subscriber, T subscriptionData, long updateInterval, long lastRefresh) {
            this.subscriber = subscriber;
            this.subscriberId = nextId.incrementAndGet();
            this.subscriptionData = subscriptionData;
            this.updateInterval = updateInterval;
            this.lastRefresh = lastRefresh;
        }

        @Override public long refreshInterval() { return updateInterval; }
        @Override public void setRefreshInterval(long updateInterval) { this.updateInterval = updateInterval; }
        @Override public long getSubscriberId() { return subscriberId;}
        @Override public T getSubscriptionData() { return subscriptionData; }
        @Override public Subscriber getSubscriber() { return subscriber; }
        @Override public long getLastRefresh() { return lastRefresh; }
        @Override public void setLastRefresh(long lastRefresh) { this.lastRefresh = lastRefresh; }

        @Override
        public boolean equals(Object obj) {
            if ((obj != null) && (obj instanceof Subscription)) {
                Subscription<T> info = Cast.uncheckedCast(obj);

                return getSubscriberId() == info.getSubscriberId();
            }
            return false;
        }

        @Override
        public int hashCode() {
            return Long.valueOf(getSubscriberId()).hashCode();
        }
    }

    Comparator<Subscription<T>> c = (o1, o2) -> {
        if (o2.getSubscriberId() != o1.getSubscriberId()) {
            long timeToWait1 = calculateTimeToWait(o1);
            long timeToWait2 = calculateTimeToWait(o2);

            return Long.compare(timeToWait1, timeToWait2);
        }
        return 0;
    };
    
    private final Map<Long, Subscription<T>> subscriptionMap = new HashMap<>();
    private final long startTime = System.currentTimeMillis();

    protected PublisherImpl() {
    }

    @Override public void onSubscribe(Subscription<T> subscription) { }

    @Override public void onUnsubscribe(final long subscriptionId) { }
    
    @Override
    public final long subscribe(final Subscriber subscriber, final T eventItem, final long updateInterval) {

        Subscription<T> subscription = new SubscriberInfoImpl<>(subscriber, eventItem, updateInterval, 0);
        Publisher<T> this_ = TypedActor.self();
        subscriptionMap.put(subscription.getSubscriberId(), subscription);
        asynchronouslyExecute(() -> {
            this_.onSubscribe(subscription);
            return null;
        });

        return subscription.getSubscriberId();
    }

    @Override
    public void resubscribe(long subscriptionId, long updateInterval) {
        Subscription<T> subscription = getSubscriberBySubscriptionId(subscriptionId);
        subscription.setRefreshInterval(updateInterval);
    }

    protected Subscription<T> getSubscriberBySubscriptionId(final long subscriptionId) {
        return subscriptionMap.get(subscriptionId);
    }

    @Override
    public final void unsubscribe(final long subscriptionId) {
        onUnsubscribe(subscriptionId);
        subscriptionMap.remove(subscriptionId);
    }

    @Override
    public List<Subscription<T>> getSubscribers() {
        return FluentIterable.from(subscriptionMap.values()).toList();
    }

    @Override
    public
    long calculateTimeToWait(Subscription<T> info) {
        long currentTimeMillis = System.currentTimeMillis();
        long nextExecutionTime = info.getLastRefresh() + info.refreshInterval();

        return Math.abs(currentTimeMillis - nextExecutionTime) * (nextExecutionTime / currentTimeMillis);
    }

    @Override
    public long getStartTime() {
        return startTime;
    }
    
    @Override
    public boolean execute() throws  Throwable {
        
        try {
            List<Subscription<T>> readySubscriptions = FluentIterable.from(getSubscribers())
                    .filter(s -> (calculateTimeToWait(s) == 0))
                    .toList();
            execute(readySubscriptions);
            long currentTimeMillis = System.currentTimeMillis();

            readySubscriptions.forEach( s -> {
                long refreshTime = getExecutionTime(currentTimeMillis, s.refreshInterval());
                s.setLastRefresh(refreshTime);
            });
            return true;
        } catch (Exception t) {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            t.printStackTrace(new PrintWriter(baos, true));
            t = null;

            throw new ServiceException(baos.toString("UTF-8"));
        }
    }

    long getExecutionTime(long currentTimeMillis, long updateInterval) {
        long numberOfPeriods = (currentTimeMillis - startTime) / updateInterval;

        return startTime + (numberOfPeriods * updateInterval);
    }
    
    protected void asynchronouslyExecute(Callable<Void> job) {
        future(job, TypedActor.dispatcher());
    }
}
package com.loudsight.eventrade.core.publish;

import static akka.dispatch.Futures.future;

import java.io.ByteArrayOutputStream;
import java.io.PrintWriter;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.log4j.Logger;

import akka.actor.TypedActor;

import com.google.common.collect.FluentIterable;
import com.loudsight.utilities.Cast;
import com.loudsight.utilities.service.ServiceImpl;
import com.loudsight.utilities.service.core.subscribe.Subscriber;

public abstract class PublisherImpl<T> extends ServiceImpl implements Publisher<T> {
    private static final Logger logger = Logger.getLogger(PublisherImpl.class);

    protected static class SubscriberInfoImpl<T> implements Subscription<T> {
        private static final AtomicLong nextId = new AtomicLong();
        private final Subscriber subscriber;
        private final long subscriberId;
        private final T subscriptionData;
        private long updateInterval;
        private long lastRefresh;

        SubscriberInfoImpl(Subscriber subscriber, T subscriptionData, long updateInterval, long lastRefresh) {
            this.subscriber = subscriber;
            this.subscriberId = nextId.incrementAndGet();
            this.subscriptionData = subscriptionData;
            this.updateInterval = updateInterval;
            this.lastRefresh = lastRefresh;
        }

        @Override public long refreshInterval() { return updateInterval; }
        @Override public void setRefreshInterval(long updateInterval) { this.updateInterval = updateInterval; }
        @Override public long getSubscriberId() { return subscriberId;}
        @Override public T getSubscriptionData() { return subscriptionData; }
        @Override public Subscriber getSubscriber() { return subscriber; }
        @Override public long getLastRefresh() { return lastRefresh; }
        @Override public void setLastRefresh(long lastRefresh) { this.lastRefresh = lastRefresh; }

        @Override
        public boolean equals(Object obj) {
            if ((obj != null) && (obj instanceof Subscription)) {
                Subscription<T> info = Cast.uncheckedCast(obj);

                return getSubscriberId() == info.getSubscriberId();
            }
            return false;
        }

        @Override
        public int hashCode() {
            return Long.valueOf(getSubscriberId()).hashCode();
        }
    }

    Comparator<Subscription<T>> c = (o1, o2) -> {
        if (o2.getSubscriberId() != o1.getSubscriberId()) {
            long timeToWait1 = calculateTimeToWait(o1);
            long timeToWait2 = calculateTimeToWait(o2);

            return Long.compare(timeToWait1, timeToWait2);
        }
        return 0;
    };
    
    private final Map<Long, Subscription<T>> subscriptionMap = new HashMap<>();
    private final long startTime = System.currentTimeMillis();

    protected PublisherImpl() {
    }

    @Override public void onSubscribe(Subscription<T> subscription) { }

    @Override public void onUnsubscribe(final long subscriptionId) { }
    
    @Override
    public final long subscribe(final Subscriber subscriber, final T eventItem, final long updateInterval) {

        Subscription<T> subscription = new SubscriberInfoImpl<>(subscriber, eventItem, updateInterval, 0);
        Publisher<T> this_ = TypedActor.self();
        subscriptionMap.put(subscription.getSubscriberId(), subscription);
        asynchronouslyExecute(() -> {
            this_.onSubscribe(subscription);
            return null;
        });

        return subscription.getSubscriberId();
    }

    @Override
    public void resubscribe(long subscriptionId, long updateInterval) {
        Subscription<T> subscription = getSubscriberBySubscriptionId(subscriptionId);
        subscription.setRefreshInterval(updateInterval);
    }

    protected Subscription<T> getSubscriberBySubscriptionId(final long subscriptionId) {
        return subscriptionMap.get(subscriptionId);
    }

    @Override
    public final void unsubscribe(final long subscriptionId) {
        onUnsubscribe(subscriptionId);
        subscriptionMap.remove(subscriptionId);
    }

    @Override
    public List<Subscription<T>> getSubscribers() {
        return FluentIterable.from(subscriptionMap.values()).toList();
    }

    @Override
    public
    long calculateTimeToWait(Subscription<T> info) {
        long currentTimeMillis = System.currentTimeMillis();
        long nextExecutionTime = info.getLastRefresh() + info.refreshInterval();

        return Math.abs(currentTimeMillis - nextExecutionTime) * (nextExecutionTime / currentTimeMillis);
    }

    @Override
    public long getStartTime() {
        return startTime;
    }
    
    @Override
    public boolean execute() throws  Throwable {
        
        try {
            List<Subscription<T>> readySubscriptions = FluentIterable.from(getSubscribers())
                    .filter(s -> (calculateTimeToWait(s) == 0))
                    .toList();
            execute(readySubscriptions);
            long currentTimeMillis = System.currentTimeMillis();

            readySubscriptions.forEach( s -> {
                long refreshTime = getExecutionTime(currentTimeMillis, s.refreshInterval());
                s.setLastRefresh(refreshTime);
            });
            return true;
        } catch (Exception t) {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            t.printStackTrace(new PrintWriter(baos, true));
            t = null;

            throw new ServiceException(baos.toString("UTF-8"));
        }
    }

    long getExecutionTime(long currentTimeMillis, long updateInterval) {
        long numberOfPeriods = (currentTimeMillis - startTime) / updateInterval;

        return startTime + (numberOfPeriods * updateInterval);
    }
    
    protected void asynchronouslyExecute(Callable<Void> job) {
        future(job, TypedActor.dispatcher());
    }
}
package me.munyengm.core.publish;

import static akka.dispatch.Futures.future;

import java.io.ByteArrayOutputStream;
import java.io.PrintWriter;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.log4j.Logger;

import akka.actor.TypedActor;

import com.google.common.collect.FluentIterable;
import com.loudsight.utilities.Cast;
import com.loudsight.utilities.service.ServiceImpl;
import com.loudsight.utilities.service.core.subscribe.Subscriber;

public abstract class PublisherImpl<T> extends ServiceImpl implements Publisher<T> {
    private static final Logger logger = Logger.getLogger(PublisherImpl.class);

    protected static class SubscriberInfoImpl<T> implements Subscription<T> {
        private static final AtomicLong nextId = new AtomicLong();
        private final Subscriber subscriber;
        private final long subscriberId;
        private final T subscriptionData;
        private long updateInterval;
        private long lastRefresh;

        SubscriberInfoImpl(Subscriber subscriber, T subscriptionData, long updateInterval, long lastRefresh) {
            this.subscriber = subscriber;
            this.subscriberId = nextId.incrementAndGet();
            this.subscriptionData = subscriptionData;
            this.updateInterval = updateInterval;
            this.lastRefresh = lastRefresh;
        }

        @Override public long refreshInterval() { return updateInterval; }
        @Override public void setRefreshInterval(long updateInterval) { this.updateInterval = updateInterval; }
        @Override public long getSubscriberId() { return subscriberId;}
        @Override public T getSubscriptionData() { return subscriptionData; }
        @Override public Subscriber getSubscriber() { return subscriber; }
        @Override public long getLastRefresh() { return lastRefresh; }
        @Override public void setLastRefresh(long lastRefresh) { this.lastRefresh = lastRefresh; }

        @Override
        public boolean equals(Object obj) {
            if ((obj != null) && (obj instanceof Subscription)) {
                Subscription<T> info = Cast.uncheckedCast(obj);

                return getSubscriberId() == info.getSubscriberId();
            }
            return false;
        }

        @Override
        public int hashCode() {
            return Long.valueOf(getSubscriberId()).hashCode();
        }
    }

    Comparator<Subscription<T>> c = (o1, o2) -> {
        if (o2.getSubscriberId() != o1.getSubscriberId()) {
            long timeToWait1 = calculateTimeToWait(o1);
            long timeToWait2 = calculateTimeToWait(o2);

            return Long.compare(timeToWait1, timeToWait2);
        }
        return 0;
    };
    
    private final Map<Long, Subscription<T>> subscriptionMap = new HashMap<>();
    private final long startTime = System.currentTimeMillis();

    protected PublisherImpl() {
    }

    @Override public void onSubscribe(Subscription<T> subscription) { }

    @Override public void onUnsubscribe(final long subscriptionId) { }
    
    @Override
    public final long subscribe(final Subscriber subscriber, final T eventItem, final long updateInterval) {

        Subscription<T> subscription = new SubscriberInfoImpl<>(subscriber, eventItem, updateInterval, 0);
        Publisher<T> this_ = TypedActor.self();
        subscriptionMap.put(subscription.getSubscriberId(), subscription);
        asynchronouslyExecute(() -> {
            this_.onSubscribe(subscription);
            return null;
        });

        return subscription.getSubscriberId();
    }

    @Override
    public void resubscribe(long subscriptionId, long updateInterval) {
        Subscription<T> subscription = getSubscriberBySubscriptionId(subscriptionId);
        subscription.setRefreshInterval(updateInterval);
    }

    protected Subscription<T> getSubscriberBySubscriptionId(final long subscriptionId) {
        return subscriptionMap.get(subscriptionId);
    }

    @Override
    public final void unsubscribe(final long subscriptionId) {
        onUnsubscribe(subscriptionId);
        subscriptionMap.remove(subscriptionId);
    }

    @Override
    public List<Subscription<T>> getSubscribers() {
        return FluentIterable.from(subscriptionMap.values()).toList();
    }

    @Override
    public
    long calculateTimeToWait(Subscription<T> info) {
        long currentTimeMillis = System.currentTimeMillis();
        long nextExecutionTime = info.getLastRefresh() + info.refreshInterval();

        return Math.abs(currentTimeMillis - nextExecutionTime) * (nextExecutionTime / currentTimeMillis);
    }

    @Override
    public long getStartTime() {
        return startTime;
    }
    
    @Override
    public boolean execute() throws  Throwable {
        
        try {
            List<Subscription<T>> readySubscriptions = FluentIterable.from(getSubscribers())
                    .filter(s -> (calculateTimeToWait(s) == 0))
                    .toList();
            execute(readySubscriptions);
            long currentTimeMillis = System.currentTimeMillis();

            readySubscriptions.forEach( s -> {
                long refreshTime = getExecutionTime(currentTimeMillis, s.refreshInterval());
                s.setLastRefresh(refreshTime);
            });
            return true;
        } catch (Exception t) {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            t.printStackTrace(new PrintWriter(baos, true));
            t = null;

            throw new ServiceException(baos.toString("UTF-8"));
        }
    }

    long getExecutionTime(long currentTimeMillis, long updateInterval) {
        long numberOfPeriods = (currentTimeMillis - startTime) / updateInterval;

        return startTime + (numberOfPeriods * updateInterval);
    }
    
    protected void asynchronouslyExecute(Callable<Void> job) {
        future(job, TypedActor.dispatcher());
    }
}
deleted 1018 characters in body
Source Link
package com.loudsight.eventrade.core.publish;

import static akka.dispatch.Futures.future;

import java.io.ByteArrayOutputStream;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.log4j.Logger;

import akka.actor.ActorSystem;TypedActor;

import com.google.common.collect.FluentIterable;
import com.loudsight.utilities.Cast;
import com.loudsight.utilities.service.ServiceImpl;
import com.loudsight.utilities.service.core.subscribe.Subscriber;

public abstract class PublisherImpl<T> extends ServiceImpl implements Publisher<T> {
    private static final Logger logger = Logger.getLogger(PublisherImpl.class);

    protected static class SubscriberInfoImpl<T> implements Subscription<T> {
        private static final AtomicLong nextId = new AtomicLong();
        private final Subscriber subscriber;
        private final long subscriberId;
        private final T subscriptionData;
        private long updateInterval;
        private long lastRefresh;

        SubscriberInfoImpl(Subscriber subscriber, T subscriptionData, long updateInterval, long lastRefresh) {
            this.subscriber = subscriber;
            this.subscriberId = nextId.incrementAndGet();
            this.subscriptionData = subscriptionData;
            this.updateInterval = updateInterval;
            this.lastRefresh = lastRefresh;
        }

        @Override public long refreshInterval() { return updateInterval; }
        @Override public void setRefreshInterval(long updateInterval) { this.updateInterval = updateInterval; }
        @Override public long getSubscriberId() { return subscriberId;}
        @Override public T getSubscriptionData() { return subscriptionData; }
        @Override public Subscriber getSubscriber() { return subscriber; }
        @Override public long getLastRefresh() { return lastRefresh; }
        @Override public void setLastRefresh(long lastRefresh) { this.lastRefresh = lastRefresh; }

        @Override
        public boolean equals(Object obj) {
            if ((obj != null) && (obj instanceof Subscription)) {
                Subscription<T> info = Cast.uncheckedCast(obj);

                return getSubscriberId() == info.getSubscriberId();
            }
            return false;
        }

        @Override
        public int hashCode() {
            return Long.valueOf(getSubscriberId()).hashCode();
        }
    }

    Comparator<Subscription<T>> c = (o1, o2) -> {
        if (o2.getSubscriberId() != o1.getSubscriberId()) {
            long timeToWait1 = calculateTimeToWait(o1);
            long timeToWait2 = calculateTimeToWait(o2);

            return Long.compare(timeToWait1, timeToWait2);
        }
        return 0;
    };
    
    private final ActorSystem system;
    private final List<Subscription<T>> sls = new ArrayList<>(); // should rename this to orderedSubscriberList
    private final Map<Long, Subscription<T>> subscriptionMap = new HashMap<>();

    private final List<Subscription<T>> pendingSubscription = new ArrayList<>();

    private final long startTime = System.currentTimeMillis();

    protected PublisherImpl(ActorSystem system) {
        this.system = system;
    }

    protected@Override public void onSubscribe(Subscription<T> subscription) { }

    protected@Override public void onUnsubscribe(final long subscriptionId) { }
    
    @Override
    public final long subscribe(final Subscriber subscriber, final T eventItem, final long updateInterval) {

        Subscription<T> subscription = new SubscriberInfoImpl<>(subscriber, eventItem, updateInterval, 0);
        pendingSubscriptionPublisher<T> this_ = TypedActor.addself();
        subscriptionMap.put(subscription.getSubscriberId(), subscription);
        asynchronouslyExecute(() -> {
            this_.onSubscribe(subscription);
            return null;
        });

        return subscription.getSubscriberId();
    }

    @Override
    public void resubscribe(long subscriptionId, long updateInterval) {
        Subscription<T> subscription = getSubscriberBySubscriptionId(subscriptionId);
        sls.remove(subscription);
        subscription.setRefreshInterval(updateInterval);
        sls.add(subscription);
        sls.sort(c);
    }

 
    protected Subscription<T> getSubscriberBySubscriptionId(final long subscriptionId) {
        return subscriptionMap.get(subscriptionId);
    }

    @Override
    public final void unsubscribe(final long subscriptionId) {
        onUnsubscribe(subscriptionId);
        Subscription<T> subscription = subscriptionMap.get(subscriptionId);
        sls.remove(subscription);
        subscriptionMap.remove(subscriptionId);
    }

    @Override
    public List<Subscription<T>> getSubscribers() {
        return FluentIterable.from(sls)
                .filter(s->subscriptionMap.containsKey(ssubscriptionMap.getSubscriberIdvalues()))
                .toList();
    }

    @Override
    public
    long calculateTimeToWait(Subscription<T> info) {
        long currentTimeMillis = System.currentTimeMillis();
        long nextExecutionTime = info.getLastRefresh() + info.refreshInterval();

        return Math.abs(currentTimeMillis - nextExecutionTime) * (nextExecutionTime / currentTimeMillis);
    }

    @Override
    public long getStartTime() {
        return startTime;
    }
    
    @Override
    public boolean execute() throws  Throwable {
        Iterator<Subscription<T>> pendingSubscriptionIt = pendingSubscription.iterator();

        while (pendingSubscriptionIt.hasNext()) {
            Subscription<T> subscription = pendingSubscriptionIt.next();
            try {
                sls.add(subscription);
                sls.sort(c);
                onSubscribe(subscription);
                subscriptionMap.put(subscription.getSubscriberId(), subscription);
            } catch (Exception e) {
                logger.error(e);
                continue;
            }
            pendingSubscriptionIt.remove();
        }
        
        try {
            List<Subscription<T>> readySubscriptions = FluentIterable.from(getSubscribers())
                    .filter(s -> (calculateTimeToWait(s) == 0))
                    .toList();
            execute(readySubscriptions);
            long currentTimeMillis = System.currentTimeMillis();

            readySubscriptions.forEach( s -> {
                long refreshTime = getExecutionTime(currentTimeMillis, s.refreshInterval());
                s.setLastRefresh(refreshTime);
            });
            return true;
        } catch (Exception t) {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            t.printStackTrace(new PrintWriter(baos, true));
            t = null;

            throw new ServiceException(baos.toString("UTF-8"));
        }
    }

    long getExecutionTime(long currentTimeMillis, long updateInterval) {
        long numberOfPeriods = (currentTimeMillis - startTime) / updateInterval;

        return startTime + (numberOfPeriods * updateInterval);
    }
    
    protected void asyncExecuteasynchronouslyExecute(Callable<Void> job) {
        future(job, systemTypedActor.dispatcher());
    }

 }
package com.loudsight.eventrade.core.publish;

import static akka.dispatch.Futures.future;

import java.io.ByteArrayOutputStream;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.log4j.Logger;

import akka.actor.ActorSystem;

import com.google.common.collect.FluentIterable;
import com.loudsight.utilities.Cast;
import com.loudsight.utilities.service.ServiceImpl;
import com.loudsight.utilities.service.core.subscribe.Subscriber;

public abstract class PublisherImpl<T> extends ServiceImpl implements Publisher<T> {
    private static final Logger logger = Logger.getLogger(PublisherImpl.class);

    protected static class SubscriberInfoImpl<T> implements Subscription<T> {
        private static final AtomicLong nextId = new AtomicLong();
        private final Subscriber subscriber;
        private final long subscriberId;
        private final T subscriptionData;
        private long updateInterval;
        private long lastRefresh;

        SubscriberInfoImpl(Subscriber subscriber, T subscriptionData, long updateInterval, long lastRefresh) {
            this.subscriber = subscriber;
            this.subscriberId = nextId.incrementAndGet();
            this.subscriptionData = subscriptionData;
            this.updateInterval = updateInterval;
            this.lastRefresh = lastRefresh;
        }

        @Override public long refreshInterval() { return updateInterval; }
        @Override public void setRefreshInterval(long updateInterval) { this.updateInterval = updateInterval; }
        @Override public long getSubscriberId() { return subscriberId;}
        @Override public T getSubscriptionData() { return subscriptionData; }
        @Override public Subscriber getSubscriber() { return subscriber; }
        @Override public long getLastRefresh() { return lastRefresh; }
        @Override public void setLastRefresh(long lastRefresh) { this.lastRefresh = lastRefresh; }

        @Override
        public boolean equals(Object obj) {
            if ((obj != null) && (obj instanceof Subscription)) {
                Subscription<T> info = Cast.uncheckedCast(obj);

                return getSubscriberId() == info.getSubscriberId();
            }
            return false;
        }

        @Override
        public int hashCode() {
            return Long.valueOf(getSubscriberId()).hashCode();
        }
    }

    Comparator<Subscription<T>> c = (o1, o2) -> {
        if (o2.getSubscriberId() != o1.getSubscriberId()) {
            long timeToWait1 = calculateTimeToWait(o1);
            long timeToWait2 = calculateTimeToWait(o2);

            return Long.compare(timeToWait1, timeToWait2);
        }
        return 0;
    };
    
    private final ActorSystem system;
    private final List<Subscription<T>> sls = new ArrayList<>(); // should rename this to orderedSubscriberList
    private final Map<Long, Subscription<T>> subscriptionMap = new HashMap<>();

    private final List<Subscription<T>> pendingSubscription = new ArrayList<>();

    private final long startTime = System.currentTimeMillis();

    protected PublisherImpl(ActorSystem system) {
        this.system = system;
    }

    protected void onSubscribe(Subscription<T> subscription) { }

    protected void onUnsubscribe(final long subscriptionId) { }
    
    @Override
    public final long subscribe(final Subscriber subscriber, final T eventItem, final long updateInterval) {

        Subscription<T> subscription = new SubscriberInfoImpl<>(subscriber, eventItem, updateInterval, 0);
        pendingSubscription.add(subscription);
        return subscription.getSubscriberId();
    }

    @Override
    public void resubscribe(long subscriptionId, long updateInterval) {
        Subscription<T> subscription = getSubscriberBySubscriptionId(subscriptionId);
        sls.remove(subscription);
        subscription.setRefreshInterval(updateInterval);
        sls.add(subscription);
        sls.sort(c);
    }

 
    protected Subscription<T> getSubscriberBySubscriptionId(final long subscriptionId) {
        return subscriptionMap.get(subscriptionId);
    }

    @Override
    public final void unsubscribe(final long subscriptionId) {
        onUnsubscribe(subscriptionId);
        Subscription<T> subscription = subscriptionMap.get(subscriptionId);
        sls.remove(subscription);
        subscriptionMap.remove(subscriptionId);
    }

    @Override
    public List<Subscription<T>> getSubscribers() {
        return FluentIterable.from(sls)
                .filter(s->subscriptionMap.containsKey(s.getSubscriberId()))
                .toList();
    }

    @Override
    public
    long calculateTimeToWait(Subscription<T> info) {
        long currentTimeMillis = System.currentTimeMillis();
        long nextExecutionTime = info.getLastRefresh() + info.refreshInterval();

        return Math.abs(currentTimeMillis - nextExecutionTime) * (nextExecutionTime / currentTimeMillis);
    }

    @Override
    public long getStartTime() {
        return startTime;
    }
    
    @Override
    public boolean execute() throws  Throwable {
        Iterator<Subscription<T>> pendingSubscriptionIt = pendingSubscription.iterator();

        while (pendingSubscriptionIt.hasNext()) {
            Subscription<T> subscription = pendingSubscriptionIt.next();
            try {
                sls.add(subscription);
                sls.sort(c);
                onSubscribe(subscription);
                subscriptionMap.put(subscription.getSubscriberId(), subscription);
            } catch (Exception e) {
                logger.error(e);
                continue;
            }
            pendingSubscriptionIt.remove();
        }
        
        try {
            List<Subscription<T>> readySubscriptions = FluentIterable.from(getSubscribers())
                    .filter(s -> (calculateTimeToWait(s) == 0))
                    .toList();
            execute(readySubscriptions);
            long currentTimeMillis = System.currentTimeMillis();

            readySubscriptions.forEach( s -> {
                long refreshTime = getExecutionTime(currentTimeMillis, s.refreshInterval());
                s.setLastRefresh(refreshTime);
            });
            return true;
        } catch (Exception t) {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            t.printStackTrace(new PrintWriter(baos, true));
            t = null;

            throw new ServiceException(baos.toString("UTF-8"));
        }
    }

    long getExecutionTime(long currentTimeMillis, long updateInterval) {
        long numberOfPeriods = (currentTimeMillis - startTime) / updateInterval;

        return startTime + (numberOfPeriods * updateInterval);
    }
    
    protected void asyncExecute(Callable<Void> job) {
        future(job, system.dispatcher());
    }

 }
package com.loudsight.eventrade.core.publish;

import static akka.dispatch.Futures.future;

import java.io.ByteArrayOutputStream;
import java.io.PrintWriter;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.log4j.Logger;

import akka.actor.TypedActor;

import com.google.common.collect.FluentIterable;
import com.loudsight.utilities.Cast;
import com.loudsight.utilities.service.ServiceImpl;
import com.loudsight.utilities.service.core.subscribe.Subscriber;

public abstract class PublisherImpl<T> extends ServiceImpl implements Publisher<T> {
    private static final Logger logger = Logger.getLogger(PublisherImpl.class);

    protected static class SubscriberInfoImpl<T> implements Subscription<T> {
        private static final AtomicLong nextId = new AtomicLong();
        private final Subscriber subscriber;
        private final long subscriberId;
        private final T subscriptionData;
        private long updateInterval;
        private long lastRefresh;

        SubscriberInfoImpl(Subscriber subscriber, T subscriptionData, long updateInterval, long lastRefresh) {
            this.subscriber = subscriber;
            this.subscriberId = nextId.incrementAndGet();
            this.subscriptionData = subscriptionData;
            this.updateInterval = updateInterval;
            this.lastRefresh = lastRefresh;
        }

        @Override public long refreshInterval() { return updateInterval; }
        @Override public void setRefreshInterval(long updateInterval) { this.updateInterval = updateInterval; }
        @Override public long getSubscriberId() { return subscriberId;}
        @Override public T getSubscriptionData() { return subscriptionData; }
        @Override public Subscriber getSubscriber() { return subscriber; }
        @Override public long getLastRefresh() { return lastRefresh; }
        @Override public void setLastRefresh(long lastRefresh) { this.lastRefresh = lastRefresh; }

        @Override
        public boolean equals(Object obj) {
            if ((obj != null) && (obj instanceof Subscription)) {
                Subscription<T> info = Cast.uncheckedCast(obj);

                return getSubscriberId() == info.getSubscriberId();
            }
            return false;
        }

        @Override
        public int hashCode() {
            return Long.valueOf(getSubscriberId()).hashCode();
        }
    }

    Comparator<Subscription<T>> c = (o1, o2) -> {
        if (o2.getSubscriberId() != o1.getSubscriberId()) {
            long timeToWait1 = calculateTimeToWait(o1);
            long timeToWait2 = calculateTimeToWait(o2);

            return Long.compare(timeToWait1, timeToWait2);
        }
        return 0;
    };
    
    private final Map<Long, Subscription<T>> subscriptionMap = new HashMap<>();
    private final long startTime = System.currentTimeMillis();

    protected PublisherImpl() {
    }

    @Override public void onSubscribe(Subscription<T> subscription) { }

    @Override public void onUnsubscribe(final long subscriptionId) { }
    
    @Override
    public final long subscribe(final Subscriber subscriber, final T eventItem, final long updateInterval) {

        Subscription<T> subscription = new SubscriberInfoImpl<>(subscriber, eventItem, updateInterval, 0);
        Publisher<T> this_ = TypedActor.self();
        subscriptionMap.put(subscription.getSubscriberId(), subscription);
        asynchronouslyExecute(() -> {
            this_.onSubscribe(subscription);
            return null;
        });

        return subscription.getSubscriberId();
    }

    @Override
    public void resubscribe(long subscriptionId, long updateInterval) {
        Subscription<T> subscription = getSubscriberBySubscriptionId(subscriptionId);
        subscription.setRefreshInterval(updateInterval);
    }

    protected Subscription<T> getSubscriberBySubscriptionId(final long subscriptionId) {
        return subscriptionMap.get(subscriptionId);
    }

    @Override
    public final void unsubscribe(final long subscriptionId) {
        onUnsubscribe(subscriptionId);
        subscriptionMap.remove(subscriptionId);
    }

    @Override
    public List<Subscription<T>> getSubscribers() {
        return FluentIterable.from(subscriptionMap.values()).toList();
    }

    @Override
    public
    long calculateTimeToWait(Subscription<T> info) {
        long currentTimeMillis = System.currentTimeMillis();
        long nextExecutionTime = info.getLastRefresh() + info.refreshInterval();

        return Math.abs(currentTimeMillis - nextExecutionTime) * (nextExecutionTime / currentTimeMillis);
    }

    @Override
    public long getStartTime() {
        return startTime;
    }
    
    @Override
    public boolean execute() throws  Throwable {
        
        try {
            List<Subscription<T>> readySubscriptions = FluentIterable.from(getSubscribers())
                    .filter(s -> (calculateTimeToWait(s) == 0))
                    .toList();
            execute(readySubscriptions);
            long currentTimeMillis = System.currentTimeMillis();

            readySubscriptions.forEach( s -> {
                long refreshTime = getExecutionTime(currentTimeMillis, s.refreshInterval());
                s.setLastRefresh(refreshTime);
            });
            return true;
        } catch (Exception t) {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            t.printStackTrace(new PrintWriter(baos, true));
            t = null;

            throw new ServiceException(baos.toString("UTF-8"));
        }
    }

    long getExecutionTime(long currentTimeMillis, long updateInterval) {
        long numberOfPeriods = (currentTimeMillis - startTime) / updateInterval;

        return startTime + (numberOfPeriods * updateInterval);
    }
    
    protected void asynchronouslyExecute(Callable<Void> job) {
        future(job, TypedActor.dispatcher());
    }
}
added 129 characters in body
Source Link

Background: The execute(...) method is called periodically from ServiceImpl. ServiceImpl is just a generic service class.

package com.loudsight.eventrade.core.publish;

import static akka.dispatch.Futures.future;

import java.io.ByteArrayOutputStream;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.log4j.Logger;

import akka.actor.ActorSystem;

import com.google.common.collect.FluentIterable;
import com.loudsight.utilities.Cast;
import com.loudsight.utilities.service.ServiceImpl;
import com.loudsight.utilities.service.core.subscribe.Subscriber;

public abstract class PublisherImpl<T> extends ServiceImpl implements Publisher<T> {
    private static final Logger logger = Logger.getLogger(PublisherImpl.class);

    protected static class SubscriberInfoImpl<T> implements Subscription<T> {
        private static final AtomicLong nextId = new AtomicLong();
        private final Subscriber subscriber;
        private final long subscriberId;
        private final T subscriptionData;
        private long updateInterval;
        private long lastRefresh;

        SubscriberInfoImpl(Subscriber subscriber, T subscriptionData, long updateInterval, long lastRefresh) {
            this.subscriber = subscriber;
            this.subscriberId = nextId.incrementAndGet();
            this.subscriptionData = subscriptionData;
            this.updateInterval = updateInterval;
            this.lastRefresh = lastRefresh;
        }

        @Override public long refreshInterval() { return updateInterval; }
        @Override public void setRefreshInterval(long updateInterval) { this.updateInterval = updateInterval; }
        @Override public long getSubscriberId() { return subscriberId;}
        @Override public T getSubscriptionData() { return subscriptionData; }
        @Override public Subscriber getSubscriber() { return subscriber; }
        @Override public long getLastRefresh() { return lastRefresh; }
        @Override public void setLastRefresh(long lastRefresh) { this.lastRefresh = lastRefresh; }

        @Override
        public boolean equals(Object obj) {
            if ((obj != null) && (obj instanceof Subscription)) {
                Subscription<T> info = Cast.uncheckedCast(obj);

                return getSubscriberId() == info.getSubscriberId();
            }
            return false;
        }

        @Override
        public int hashCode() {
            return Long.valueOf(getSubscriberId()).hashCode();
        }
    }

    Comparator<Subscription<T>> c = (o1, o2) -> {
        if (o2.getSubscriberId() != o1.getSubscriberId()) {
            long timeToWait1 = calculateTimeToWait(o1);
            long timeToWait2 = calculateTimeToWait(o2);

            return Long.compare(timeToWait1, timeToWait2);
        }
        return 0;
    };
    
    private final ActorSystem system;
    private final List<Subscription<T>> sls = new ArrayList<>(); // should rename this to orderedSubscriberList
    private final Map<Long, Subscription<T>> subscriptionMap = new HashMap<>();

    private final List<Subscription<T>> pendingSubscription = new ArrayList<>();

    private final long startTime = System.currentTimeMillis();

    protected PublisherImpl(ActorSystem system) {
        this.system = system;
    }

    protected void onSubscribe(Subscription<T> subscription) { }

    protected void onUnsubscribe(final long subscriptionId) { }
    
    @Override
    public final long subscribe(final Subscriber subscriber, final T eventItem, final long updateInterval) {

        Subscription<T> subscription = new SubscriberInfoImpl<>(subscriber, eventItem, updateInterval, 0);
        pendingSubscription.add(subscription);
        return subscription.getSubscriberId();
    }

    @Override
    public void resubscribe(long subscriptionId, long updateInterval) {
        Subscription<T> subscription = getSubscriberBySubscriptionId(subscriptionId);
        sls.remove(subscription);
        subscription.setRefreshInterval(updateInterval);
        sls.add(subscription);
        sls.sort(c);
    }


    protected Subscription<T> getSubscriberBySubscriptionId(final long subscriptionId) {
        return subscriptionMap.get(subscriptionId);
    }

    @Override
    public final void unsubscribe(final long subscriptionId) {
        onUnsubscribe(subscriptionId);
        Subscription<T> subscription = subscriptionMap.get(subscriptionId);
        sls.remove(subscription);
        subscriptionMap.remove(subscriptionId);
    }

    @Override
    public List<Subscription<T>> getSubscribers() {
        return FluentIterable.from(sls)
                .filter(s->subscriptionMap.containsKey(s.getSubscriberId()))
                .toList();
    }

    @Override
    public
    long calculateTimeToWait(Subscription<T> info) {
        long currentTimeMillis = System.currentTimeMillis();
        long nextExecutionTime = info.getLastRefresh() + info.refreshInterval();

        return Math.abs(currentTimeMillis - nextExecutionTime) * (nextExecutionTime / currentTimeMillis);
    }

    @Override
    public long getStartTime() {
        return startTime;
    }
    
    @Override
    public boolean execute() throws  Throwable {
        Iterator<Subscription<T>> pendingSubscriptionIt = pendingSubscription.iterator();

        while (pendingSubscriptionIt.hasNext()) {
            Subscription<T> subscription = pendingSubscriptionIt.next();
            try {
                sls.add(subscription);
                sls.sort(c);
                onSubscribe(subscription);
                subscriptionMap.put(subscription.getSubscriberId(), subscription);
            } catch (Exception e) {
                logger.error(e);
                continue;
            }
            pendingSubscriptionIt.remove();
        }
        
        try {
            List<Subscription<T>> readySubscriptions = FluentIterable.from(getSubscribers())
                    .filter(s -> (calculateTimeToWait(s) == 0))
                    .toList();
            execute(readySubscriptions);
            long currentTimeMillis = System.currentTimeMillis();

            readySubscriptions.forEach( s -> {
                long refreshTime = getExecutionTime(currentTimeMillis, s.refreshInterval());
                s.setLastRefresh(refreshTime);
            });
            return true;
        } catch (Exception t) {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            t.printStackTrace(new PrintWriter(baos, true));
            t = null;

            throw new ServiceException(baos.toString("UTF-8"));
        }
    }

    long getExecutionTime(long currentTimeMillis, long updateInterval) {
        long numberOfPeriods = (currentTimeMillis - startTime) / updateInterval;

        return startTime + (numberOfPeriods * updateInterval);
    }
    
    protected void asyncExecute(Callable<Void> job) {
        future(job, system.dispatcher());
    }

}
package com.loudsight.eventrade.core.publish;

import static akka.dispatch.Futures.future;

import java.io.ByteArrayOutputStream;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.log4j.Logger;

import akka.actor.ActorSystem;

import com.google.common.collect.FluentIterable;
import com.loudsight.utilities.Cast;
import com.loudsight.utilities.service.ServiceImpl;
import com.loudsight.utilities.service.core.subscribe.Subscriber;

public abstract class PublisherImpl<T> extends ServiceImpl implements Publisher<T> {
    private static final Logger logger = Logger.getLogger(PublisherImpl.class);

    protected static class SubscriberInfoImpl<T> implements Subscription<T> {
        private static final AtomicLong nextId = new AtomicLong();
        private final Subscriber subscriber;
        private final long subscriberId;
        private final T subscriptionData;
        private long updateInterval;
        private long lastRefresh;

        SubscriberInfoImpl(Subscriber subscriber, T subscriptionData, long updateInterval, long lastRefresh) {
            this.subscriber = subscriber;
            this.subscriberId = nextId.incrementAndGet();
            this.subscriptionData = subscriptionData;
            this.updateInterval = updateInterval;
            this.lastRefresh = lastRefresh;
        }

        @Override public long refreshInterval() { return updateInterval; }
        @Override public void setRefreshInterval(long updateInterval) { this.updateInterval = updateInterval; }
        @Override public long getSubscriberId() { return subscriberId;}
        @Override public T getSubscriptionData() { return subscriptionData; }
        @Override public Subscriber getSubscriber() { return subscriber; }
        @Override public long getLastRefresh() { return lastRefresh; }
        @Override public void setLastRefresh(long lastRefresh) { this.lastRefresh = lastRefresh; }

        @Override
        public boolean equals(Object obj) {
            if ((obj != null) && (obj instanceof Subscription)) {
                Subscription<T> info = Cast.uncheckedCast(obj);

                return getSubscriberId() == info.getSubscriberId();
            }
            return false;
        }

        @Override
        public int hashCode() {
            return Long.valueOf(getSubscriberId()).hashCode();
        }
    }

    Comparator<Subscription<T>> c = (o1, o2) -> {
        if (o2.getSubscriberId() != o1.getSubscriberId()) {
            long timeToWait1 = calculateTimeToWait(o1);
            long timeToWait2 = calculateTimeToWait(o2);

            return Long.compare(timeToWait1, timeToWait2);
        }
        return 0;
    };
    
    private final ActorSystem system;
    private final List<Subscription<T>> sls = new ArrayList<>();
    private final Map<Long, Subscription<T>> subscriptionMap = new HashMap<>();

    private final List<Subscription<T>> pendingSubscription = new ArrayList<>();

    private final long startTime = System.currentTimeMillis();

    protected PublisherImpl(ActorSystem system) {
        this.system = system;
    }

    protected void onSubscribe(Subscription<T> subscription) { }

    protected void onUnsubscribe(final long subscriptionId) { }
    
    @Override
    public final long subscribe(final Subscriber subscriber, final T eventItem, final long updateInterval) {

        Subscription<T> subscription = new SubscriberInfoImpl<>(subscriber, eventItem, updateInterval, 0);
        pendingSubscription.add(subscription);
        return subscription.getSubscriberId();
    }

    @Override
    public void resubscribe(long subscriptionId, long updateInterval) {
        Subscription<T> subscription = getSubscriberBySubscriptionId(subscriptionId);
        sls.remove(subscription);
        subscription.setRefreshInterval(updateInterval);
        sls.add(subscription);
        sls.sort(c);
    }


    protected Subscription<T> getSubscriberBySubscriptionId(final long subscriptionId) {
        return subscriptionMap.get(subscriptionId);
    }

    @Override
    public final void unsubscribe(final long subscriptionId) {
        onUnsubscribe(subscriptionId);
        Subscription<T> subscription = subscriptionMap.get(subscriptionId);
        sls.remove(subscription);
        subscriptionMap.remove(subscriptionId);
    }

    @Override
    public List<Subscription<T>> getSubscribers() {
        return FluentIterable.from(sls)
                .filter(s->subscriptionMap.containsKey(s.getSubscriberId()))
                .toList();
    }

    @Override
    public
    long calculateTimeToWait(Subscription<T> info) {
        long currentTimeMillis = System.currentTimeMillis();
        long nextExecutionTime = info.getLastRefresh() + info.refreshInterval();

        return Math.abs(currentTimeMillis - nextExecutionTime) * (nextExecutionTime / currentTimeMillis);
    }

    @Override
    public long getStartTime() {
        return startTime;
    }
    
    @Override
    public boolean execute() throws  Throwable {
        Iterator<Subscription<T>> pendingSubscriptionIt = pendingSubscription.iterator();

        while (pendingSubscriptionIt.hasNext()) {
            Subscription<T> subscription = pendingSubscriptionIt.next();
            try {
                sls.add(subscription);
                sls.sort(c);
                onSubscribe(subscription);
                subscriptionMap.put(subscription.getSubscriberId(), subscription);
            } catch (Exception e) {
                logger.error(e);
                continue;
            }
            pendingSubscriptionIt.remove();
        }
        
        try {
            List<Subscription<T>> readySubscriptions = FluentIterable.from(getSubscribers())
                    .filter(s -> (calculateTimeToWait(s) == 0))
                    .toList();
            execute(readySubscriptions);
            long currentTimeMillis = System.currentTimeMillis();

            readySubscriptions.forEach( s -> {
                long refreshTime = getExecutionTime(currentTimeMillis, s.refreshInterval());
                s.setLastRefresh(refreshTime);
            });
            return true;
        } catch (Exception t) {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            t.printStackTrace(new PrintWriter(baos, true));
            t = null;

            throw new ServiceException(baos.toString("UTF-8"));
        }
    }

    long getExecutionTime(long currentTimeMillis, long updateInterval) {
        long numberOfPeriods = (currentTimeMillis - startTime) / updateInterval;

        return startTime + (numberOfPeriods * updateInterval);
    }
    
    protected void asyncExecute(Callable<Void> job) {
        future(job, system.dispatcher());
    }

}

Background: The execute(...) method is called periodically from ServiceImpl. ServiceImpl is just a generic service class.

package com.loudsight.eventrade.core.publish;

import static akka.dispatch.Futures.future;

import java.io.ByteArrayOutputStream;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.log4j.Logger;

import akka.actor.ActorSystem;

import com.google.common.collect.FluentIterable;
import com.loudsight.utilities.Cast;
import com.loudsight.utilities.service.ServiceImpl;
import com.loudsight.utilities.service.core.subscribe.Subscriber;

public abstract class PublisherImpl<T> extends ServiceImpl implements Publisher<T> {
    private static final Logger logger = Logger.getLogger(PublisherImpl.class);

    protected static class SubscriberInfoImpl<T> implements Subscription<T> {
        private static final AtomicLong nextId = new AtomicLong();
        private final Subscriber subscriber;
        private final long subscriberId;
        private final T subscriptionData;
        private long updateInterval;
        private long lastRefresh;

        SubscriberInfoImpl(Subscriber subscriber, T subscriptionData, long updateInterval, long lastRefresh) {
            this.subscriber = subscriber;
            this.subscriberId = nextId.incrementAndGet();
            this.subscriptionData = subscriptionData;
            this.updateInterval = updateInterval;
            this.lastRefresh = lastRefresh;
        }

        @Override public long refreshInterval() { return updateInterval; }
        @Override public void setRefreshInterval(long updateInterval) { this.updateInterval = updateInterval; }
        @Override public long getSubscriberId() { return subscriberId;}
        @Override public T getSubscriptionData() { return subscriptionData; }
        @Override public Subscriber getSubscriber() { return subscriber; }
        @Override public long getLastRefresh() { return lastRefresh; }
        @Override public void setLastRefresh(long lastRefresh) { this.lastRefresh = lastRefresh; }

        @Override
        public boolean equals(Object obj) {
            if ((obj != null) && (obj instanceof Subscription)) {
                Subscription<T> info = Cast.uncheckedCast(obj);

                return getSubscriberId() == info.getSubscriberId();
            }
            return false;
        }

        @Override
        public int hashCode() {
            return Long.valueOf(getSubscriberId()).hashCode();
        }
    }

    Comparator<Subscription<T>> c = (o1, o2) -> {
        if (o2.getSubscriberId() != o1.getSubscriberId()) {
            long timeToWait1 = calculateTimeToWait(o1);
            long timeToWait2 = calculateTimeToWait(o2);

            return Long.compare(timeToWait1, timeToWait2);
        }
        return 0;
    };
    
    private final ActorSystem system;
    private final List<Subscription<T>> sls = new ArrayList<>(); // should rename this to orderedSubscriberList
    private final Map<Long, Subscription<T>> subscriptionMap = new HashMap<>();

    private final List<Subscription<T>> pendingSubscription = new ArrayList<>();

    private final long startTime = System.currentTimeMillis();

    protected PublisherImpl(ActorSystem system) {
        this.system = system;
    }

    protected void onSubscribe(Subscription<T> subscription) { }

    protected void onUnsubscribe(final long subscriptionId) { }
    
    @Override
    public final long subscribe(final Subscriber subscriber, final T eventItem, final long updateInterval) {

        Subscription<T> subscription = new SubscriberInfoImpl<>(subscriber, eventItem, updateInterval, 0);
        pendingSubscription.add(subscription);
        return subscription.getSubscriberId();
    }

    @Override
    public void resubscribe(long subscriptionId, long updateInterval) {
        Subscription<T> subscription = getSubscriberBySubscriptionId(subscriptionId);
        sls.remove(subscription);
        subscription.setRefreshInterval(updateInterval);
        sls.add(subscription);
        sls.sort(c);
    }


    protected Subscription<T> getSubscriberBySubscriptionId(final long subscriptionId) {
        return subscriptionMap.get(subscriptionId);
    }

    @Override
    public final void unsubscribe(final long subscriptionId) {
        onUnsubscribe(subscriptionId);
        Subscription<T> subscription = subscriptionMap.get(subscriptionId);
        sls.remove(subscription);
        subscriptionMap.remove(subscriptionId);
    }

    @Override
    public List<Subscription<T>> getSubscribers() {
        return FluentIterable.from(sls)
                .filter(s->subscriptionMap.containsKey(s.getSubscriberId()))
                .toList();
    }

    @Override
    public
    long calculateTimeToWait(Subscription<T> info) {
        long currentTimeMillis = System.currentTimeMillis();
        long nextExecutionTime = info.getLastRefresh() + info.refreshInterval();

        return Math.abs(currentTimeMillis - nextExecutionTime) * (nextExecutionTime / currentTimeMillis);
    }

    @Override
    public long getStartTime() {
        return startTime;
    }
    
    @Override
    public boolean execute() throws  Throwable {
        Iterator<Subscription<T>> pendingSubscriptionIt = pendingSubscription.iterator();

        while (pendingSubscriptionIt.hasNext()) {
            Subscription<T> subscription = pendingSubscriptionIt.next();
            try {
                sls.add(subscription);
                sls.sort(c);
                onSubscribe(subscription);
                subscriptionMap.put(subscription.getSubscriberId(), subscription);
            } catch (Exception e) {
                logger.error(e);
                continue;
            }
            pendingSubscriptionIt.remove();
        }
        
        try {
            List<Subscription<T>> readySubscriptions = FluentIterable.from(getSubscribers())
                    .filter(s -> (calculateTimeToWait(s) == 0))
                    .toList();
            execute(readySubscriptions);
            long currentTimeMillis = System.currentTimeMillis();

            readySubscriptions.forEach( s -> {
                long refreshTime = getExecutionTime(currentTimeMillis, s.refreshInterval());
                s.setLastRefresh(refreshTime);
            });
            return true;
        } catch (Exception t) {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            t.printStackTrace(new PrintWriter(baos, true));
            t = null;

            throw new ServiceException(baos.toString("UTF-8"));
        }
    }

    long getExecutionTime(long currentTimeMillis, long updateInterval) {
        long numberOfPeriods = (currentTimeMillis - startTime) / updateInterval;

        return startTime + (numberOfPeriods * updateInterval);
    }
    
    protected void asyncExecute(Callable<Void> job) {
        future(job, system.dispatcher());
    }

}
deleted 11 characters in body; edited title
Source Link
Jamal
  • 35.2k
  • 13
  • 134
  • 238
Loading
Source Link
Loading