import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DedupingQueue<E> implements QueueWrapper<E> {
    private static final Logger LOGGER = LoggerFactory.getLogger(DedupingQueue.class);
private final Map<E, Future<E>> itemsBeingWorkedOn = new ConcurrentHashMap<>();
private final AsyncWorker<E> asyncWorker;
public DedupingQueue(AsyncWorker<E> asyncWorker) {
    this.asyncWorker = asyncWorker;
}
@Override
public Future<E> submit(E e) {
    if (!itemsBeingWorkedOn.containsKey(e)) {
        itemsBeingWorkedOn.put(e, asyncWorker.executeWorkAsync(e, this));
    } else {
        LOGGER.debug("Rejected [{}] as it's already being worked on", e);
    }
    return itemsBeingWorkedOn.get(e);
}
@Override
public void complete(E e) {
    LOGGER.debug("Completed [{}]", e);
    itemsBeingWorkedOn.remove(e);
}
@Override
public void rejectAndRetry(E e) {
    itemsBeingWorkedOn.putIfAbsent(e, asyncWorker.executeWorkAsync(e, this));
}
}
I am having some difficulty reasoning the thread safeness of the above code.
I reckon complete and rejectAndretry are completely thread-safe as the map is thread safe. But what about submit? Also, how can I make it thread safe in the most efficient way?