The code below is a publisher base class. I've never really liked the way I implemented this class. Are there any improvements/simplifications I can make to it?
Background: The execute(...) method is called periodically from ServiceImpl. ServiceImpl is just a generic service class.
package me.munyengm.core.publish;
import static akka.dispatch.Futures.future;
import java.io.ByteArrayOutputStream;
import java.io.PrintWriter;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
import akka.actor.TypedActor;
import com.google.common.collect.FluentIterable;
import com.loudsight.utilities.Cast;
import com.loudsight.utilities.service.ServiceImpl;
import com.loudsight.utilities.service.core.subscribe.Subscriber;
public abstract class PublisherImpl<T> extends ServiceImpl implements Publisher<T> {
private static final Logger logger = Logger.getLogger(PublisherImpl.class);
protected static class SubscriberInfoImpl<T> implements Subscription<T> {
private static final AtomicLong nextId = new AtomicLong();
private final Subscriber subscriber;
private final long subscriberId;
private final T subscriptionData;
private long updateInterval;
private long lastRefresh;
SubscriberInfoImpl(Subscriber subscriber, T subscriptionData, long updateInterval, long lastRefresh) {
this.subscriber = subscriber;
this.subscriberId = nextId.incrementAndGet();
this.subscriptionData = subscriptionData;
this.updateInterval = updateInterval;
this.lastRefresh = lastRefresh;
}
@Override public long refreshInterval() { return updateInterval; }
@Override public void setRefreshInterval(long updateInterval) { this.updateInterval = updateInterval; }
@Override public long getSubscriberId() { return subscriberId;}
@Override public T getSubscriptionData() { return subscriptionData; }
@Override public Subscriber getSubscriber() { return subscriber; }
@Override public long getLastRefresh() { return lastRefresh; }
@Override public void setLastRefresh(long lastRefresh) { this.lastRefresh = lastRefresh; }
@Override
public boolean equals(Object obj) {
if ((obj != null) && (obj instanceof Subscription)) {
Subscription<T> info = Cast.uncheckedCast(obj);
return getSubscriberId() == info.getSubscriberId();
}
return false;
}
@Override
public int hashCode() {
return Long.valueOf(getSubscriberId()).hashCode();
}
}
Comparator<Subscription<T>> c = (o1, o2) -> {
if (o2.getSubscriberId() != o1.getSubscriberId()) {
long timeToWait1 = calculateTimeToWait(o1);
long timeToWait2 = calculateTimeToWait(o2);
return Long.compare(timeToWait1, timeToWait2);
}
return 0;
};
private final Map<Long, Subscription<T>> subscriptionMap = new HashMap<>();
private final long startTime = System.currentTimeMillis();
protected PublisherImpl() {
}
@Override public void onSubscribe(Subscription<T> subscription) { }
@Override public void onUnsubscribe(final long subscriptionId) { }
@Override
public final long subscribe(final Subscriber subscriber, final T eventItem, final long updateInterval) {
Subscription<T> subscription = new SubscriberInfoImpl<>(subscriber, eventItem, updateInterval, 0);
Publisher<T> this_ = TypedActor.self();
subscriptionMap.put(subscription.getSubscriberId(), subscription);
asynchronouslyExecute(() -> {
this_.onSubscribe(subscription);
return null;
});
return subscription.getSubscriberId();
}
@Override
public void resubscribe(long subscriptionId, long updateInterval) {
Subscription<T> subscription = getSubscriberBySubscriptionId(subscriptionId);
subscription.setRefreshInterval(updateInterval);
}
protected Subscription<T> getSubscriberBySubscriptionId(final long subscriptionId) {
return subscriptionMap.get(subscriptionId);
}
@Override
public final void unsubscribe(final long subscriptionId) {
onUnsubscribe(subscriptionId);
subscriptionMap.remove(subscriptionId);
}
@Override
public List<Subscription<T>> getSubscribers() {
return FluentIterable.from(subscriptionMap.values()).toList();
}
@Override
public
long calculateTimeToWait(Subscription<T> info) {
long currentTimeMillis = System.currentTimeMillis();
long nextExecutionTime = info.getLastRefresh() + info.refreshInterval();
return Math.abs(currentTimeMillis - nextExecutionTime) * (nextExecutionTime / currentTimeMillis);
}
@Override
public long getStartTime() {
return startTime;
}
@Override
public boolean execute() throws Throwable {
try {
List<Subscription<T>> readySubscriptions = FluentIterable.from(getSubscribers())
.filter(s -> (calculateTimeToWait(s) == 0))
.toList();
execute(readySubscriptions);
long currentTimeMillis = System.currentTimeMillis();
readySubscriptions.forEach( s -> {
long refreshTime = getExecutionTime(currentTimeMillis, s.refreshInterval());
s.setLastRefresh(refreshTime);
});
return true;
} catch (Exception t) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
t.printStackTrace(new PrintWriter(baos, true));
t = null;
throw new ServiceException(baos.toString("UTF-8"));
}
}
long getExecutionTime(long currentTimeMillis, long updateInterval) {
long numberOfPeriods = (currentTimeMillis - startTime) / updateInterval;
return startTime + (numberOfPeriods * updateInterval);
}
protected void asynchronouslyExecute(Callable<Void> job) {
future(job, TypedActor.dispatcher());
}
}