\$\begingroup\$

This streams lines of information over the internet using BufferedReader::lines.

However, what makes this special (and thus, extraordinarily complicated imo) is that ALL resource management is done internally -- the end user does not need to handle the resources at all.

So, no try-with-resources, no try-catch-finally, none of that. The user can fearlessly use this stream (or at least, as fearlessly as they can use any other non-resource stream).
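
To make the idea concrete without the full class, here is a minimal sketch of the pattern, assuming an in-memory `StringReader` in place of the URL connection. The names `DeferredLinesDemo`, `linesOf`, `terminate`, `OPENED` and `CLOSED` are made up for this illustration and are not part of the code under review. The resource is only opened when the `Supplier` is invoked, and the terminal helper owns the try-with-resources so the caller never sees it.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class DeferredLinesDemo {

    // Counters standing in for "connection opened" and "connection closed".
    static final AtomicInteger OPENED = new AtomicInteger();
    static final AtomicInteger CLOSED = new AtomicInteger();

    // Hypothetical stand-in for url.openStream(): nothing is opened until get() is called.
    static Supplier<Stream<String>> linesOf(final String text) {
        return () -> {
            OPENED.incrementAndGet();
            final BufferedReader reader = new BufferedReader(new StringReader(text));
            return reader.lines().onClose(() -> {
                try {
                    reader.close();
                } catch (final IOException e) {
                    throw new UncheckedIOException(e);
                }
                CLOSED.incrementAndGet();
            });
        };
    }

    // The terminal operation owns the try-with-resources, so the caller does not have to.
    static <T, R> R terminate(final Supplier<Stream<T>> source, final Function<Stream<T>, R> operation) {
        try (Stream<T> stream = source.get()) {
            return operation.apply(stream);
        }
    }

    public static void main(final String[] args) {
        final Supplier<Stream<String>> source = linesOf("a\nb\nc\nd");

        // Intermediate operations only wrap the supplier; the reader is still unopened here.
        final Supplier<Stream<String>> firstTwo = () -> source.get().limit(2);
        System.out.println("opened before terminal op: " + OPENED.get());

        final List<String> result = terminate(firstTwo, s -> s.collect(Collectors.toList()));
        System.out.println(result + " opened=" + OPENED.get() + " closed=" + CLOSED.get());
    }
}
```

Running `main` should print `opened before terminal op: 0` followed by `[a, b] opened=1 closed=1`. Closing the stream returned by `limit` is enough, because close handlers are shared along the whole pipeline.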


As a result, I would like this review to focus on making sure my claim is as bulletproof as I make it out to be. Priority #1 is to make sure that this implementation cannot leak resources.

Other than that, I want the obvious things like correctness/efficiency/readability/maintainability/etc.

One potential pain point that I want to highlight -- I chose to open a connection to the URL at the last possible moment -- during terminal operations. However, that technically makes certain introspective operations a little dubious. For example, isParallel. As is, I am opening a connection to the internet just to check if my stream is parallel. I don't know how terrible that is, but I also don't see a better way to work around that pain point. Special attention to this method would be appreciated.
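
One possible workaround, sketched here purely as an assumption and not something the class below does, is to track the parallel flag in the wrapper itself so that `isParallel` never has to invoke the supplier. This relies on the source stream starting out sequential, which is the case for `BufferedReader.lines()`, and on the flag only changing through `parallel()` and `sequential()`. `ParallelFlagDemo`, `LazyStream` and `demoSource` are hypothetical names for this illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Stream;

public class ParallelFlagDemo {

    // Hypothetical, trimmed-down wrapper: only the pieces needed to show the idea.
    static final class LazyStream<T> {

        private final Supplier<Stream<T>> source;
        private final boolean parallel; // mirrors the flag the real pipeline will have

        LazyStream(final Supplier<Stream<T>> source, final boolean parallel) {
            this.source = source;
            this.parallel = parallel;
        }

        LazyStream<T> parallel() {
            return new LazyStream<>(() -> this.source.get().parallel(), true);
        }

        LazyStream<T> sequential() {
            return new LazyStream<>(() -> this.source.get().sequential(), false);
        }

        // Answers from the tracked flag; the supplier is never invoked.
        boolean isParallel() {
            return this.parallel;
        }

        // A terminal operation still opens and closes the underlying stream.
        long count() {
            try (Stream<T> stream = this.source.get()) {
                return stream.count();
            }
        }
    }

    // Counts how often the (pretend) connection is opened.
    static final AtomicInteger OPENED = new AtomicInteger();

    static LazyStream<String> demoSource() {
        final Supplier<Stream<String>> supplier = () -> {
            OPENED.incrementAndGet();
            return Stream.of("a", "b", "c");
        };
        return new LazyStream<>(supplier, false); // assumption: the source starts sequential
    }

    public static void main(final String[] args) {
        final LazyStream<String> stream = demoSource().parallel();
        System.out.println("isParallel=" + stream.isParallel() + " opened=" + OPENED.get());
        System.out.println("count=" + stream.count() + " opened=" + OPENED.get());
    }
}
```

Running `main` should print `isParallel=true opened=0` and then `count=3 opened=1`. The trade-off is that every intermediate operation in the full wrapper would have to carry the flag along.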

And finally, you may be wondering what maniac would go to such lengths when we have TWR (try with resources).

The reason is that forgetting TWR produces neither a compiler error nor a compiler warning. And I am leading a team of (entry-level) devs on a fairly lofty personal project. To make their lives a little easier, I wanted to build something that wouldn't leak. Hence, this monstrosity was created.
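
For anyone who has not hit this before, here is a small self-contained illustration of the failure mode, using an in-memory stream with a close handler as a stand-in for a real resource. `ForgottenCloseDemo` and its two methods are names invented for this example. The point is that a terminal operation on its own does not run the close handlers of a plain `Stream`; only `close()` does.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

public class ForgottenCloseDemo {

    // Reports whether the close handler ran when the caller skipped try-with-resources.
    static boolean closedWithoutTwr() {
        final AtomicBoolean closed = new AtomicBoolean(false);
        final Stream<String> stream = Stream.of("a", "b").onClose(() -> closed.set(true));
        stream.forEach(line -> { }); // terminal operation finishes, but nothing calls close()
        return closed.get();
    }

    // Same pipeline, but wrapped in try-with-resources.
    static boolean closedWithTwr() {
        final AtomicBoolean closed = new AtomicBoolean(false);
        try (Stream<String> stream = Stream.of("a", "b").onClose(() -> closed.set(true))) {
            stream.forEach(line -> { });
        }
        return closed.get();
    }

    public static void main(final String[] args) {
        System.out.println("closed without TWR: " + closedWithoutTwr());
        System.out.println("closed with TWR: " + closedWithTwr());
    }
}
```

Running `main` should print `closed without TWR: false` and `closed with TWR: true`.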

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URL;
import java.util.Arrays;
import java.util.Comparator;
import java.util.DoubleSummaryStatistics;
import java.util.IntSummaryStatistics;
import java.util.Iterator;
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.PrimitiveIterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.*;
import java.util.stream.*;

public class StreamLinesOverInternet<T>
   implements Stream<T>
{

   private final Supplier<Stream<T>> encapsulatedStream;
   
   public static void main(final String[] args)
   {
   
      final String url = 
         //THIS IS A >5 GIGABYTE FILE
         "https://raw.githubusercontent.com/nytimes/covid-19-data/master/us.csv"
         ;
   
      final Stream<String> linesFromCsv = StreamLinesOverInternet.stream(url);
      
      //Grabs the first 10 lines from the CSV File
      linesFromCsv
         .limit(10)
         .forEach(System.out::println)
         ;
      
      //Normally, a file this size would take several seconds, if not minutes, to download, and then process.
      //But, because we are processing data as soon as we fetch it, we can short-circuit once we have as much
      //as we need. This is thanks to java.util.stream.Stream, java.io.InputStream, and java.io.BufferedReader.
      
      //In the above example, we are streaming lines from the CSV File. So, once the Stream has determined
      //it can terminate early because it has enough info to correctly evaluate (called short-circuiting), 
      //the Stream closes the BufferedReader, which in turn, closes the other resources.
      
   }
   
   private StreamLinesOverInternet(final Supplier<Stream<T>> encapsulatedStream)
   {
   
      Objects.requireNonNull(encapsulatedStream);
   
      this.encapsulatedStream = encapsulatedStream;
   
   }

   public static StreamLinesOverInternet<String> stream(final URI uri)
   {
   
      Objects.requireNonNull(uri);
   
      final URL url;
   
      try
      {
      
         url = uri.toURL();
      
      }
      
      catch (final Exception exception)
      {
      
         throw new IllegalStateException(exception);
      
      }
      
      final Supplier<Stream<String>> stream =
         () ->
         {
         
            try
            {
            
               final InputStream inputStream = url.openStream();
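               //Explicit charset (assumes the remote content is UTF-8); the one-argument constructor uses the platform default.
               final InputStreamReader inputStreamReader = new InputStreamReader(inputStream, java.nio.charset.StandardCharsets.UTF_8);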
               final BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
               
               final Stream<String> encapsulatedStream = 
                  bufferedReader
                     .lines()
                     .onClose
                     (
                        () ->
                        {
                        
                           try
                           {
                           
                              bufferedReader.close();

                              System.out.println("CLOSED THE BUFFERED READER");
                           
                           }
                           
                           catch (final Exception exception)
                           {
                           
                              throw new IllegalStateException(exception);
                           
                           }
                        
                        }
                     )
                     ;
            
               return encapsulatedStream;
            
            }
            
            catch (final Exception exception)
            {
            
               throw new IllegalStateException(exception);
            
            }
         
         }
         ;
      
      return new StreamLinesOverInternet<>(stream);
      
   }

   public static StreamLinesOverInternet<String> stream(final String uriString)
   {
   
      Objects.requireNonNull(uriString);
      
      if (uriString.isBlank())
      {
      
         throw new IllegalArgumentException("uri cannot be blank!");
      
      }
      
      try
      {
      
         final URI uri = new URI(uriString);
      
         return stream(uri);
      
      }
      
      catch (final Exception exception)
      {
      
         throw new RuntimeException(exception);
      
      }
   
   }

   private 
      static 
         <A, B, C> 
         C
         convertStream
         (
            final Supplier<A> currentStreamSupplier, 
            final Function<A, ? extends B> function, 
            final Function<Supplier<B>, C> constructor
         )
   {
   
      Objects.requireNonNull(currentStreamSupplier);
      Objects.requireNonNull(function);
      Objects.requireNonNull(constructor);
   
      final Supplier<B> nextStreamSupplier =
         () ->
         {
         
            final A currentStream = currentStreamSupplier.get();
            
            final B nextStream = function.apply(currentStream);
            
            return nextStream;
         
         }
         ;
   
      return constructor.apply(nextStreamSupplier);
   
   }

   private <U> StreamLinesOverInternet<U> continueStreamSafely(final Function<Stream<T>, Stream<U>> function)
   {
   
      return
         convertStream
         (
            this.encapsulatedStream,
            function,
            StreamLinesOverInternet::new
         )
         ;
   
   }

   private <U> U terminateWithValueSafely(final Function<Stream<T>, U> function)
   {
   
      try
      (
         final Stream<T> stream = this.encapsulatedStream.get();   
      )
      {
      
         Objects.requireNonNull(function);
      
         return function.apply(stream);
      
      }
      
      catch (final Exception exception)
      {
      
         throw new RuntimeException(exception);
      
      }
   
   }

   private void terminateSafely(final Consumer<Stream<T>> consumer)
   {
   
      try
      (
         final Stream<T> stream = this.encapsulatedStream.get();   
      )
      {
      
         Objects.requireNonNull(consumer);
      
         consumer.accept(stream);
      
      }
      
      catch (final Exception exception)
      {
      
         throw new RuntimeException(exception);
      
      }
   
   }

   @Override
   public Optional<T> findAny()
   {
   
      return this.terminateWithValueSafely(Stream::findAny);
   
   }

   @Override
   public Optional<T> findFirst()
   {
   
      return this.terminateWithValueSafely(Stream::findFirst);
   
   }

   @Override
   public boolean noneMatch(final Predicate<? super T> predicate)
   {
   
      return this.terminateWithValueSafely(stream -> stream.noneMatch(predicate));
   
   }

   @Override
   public boolean allMatch(final Predicate<? super T> predicate)
   {
   
      return this.terminateWithValueSafely(stream -> stream.allMatch(predicate));
   
   }

   @Override
   public boolean anyMatch(final Predicate<? super T> predicate)
   {
   
      return this.terminateWithValueSafely(stream -> stream.anyMatch(predicate));
   
   }

   @Override
   public long count()
   {
   
      return this.terminateWithValueSafely(Stream::count);
   
   }

   @Override
   public Optional<T> max(final Comparator<? super T> comparator)
   {
   
      return this.terminateWithValueSafely(stream -> stream.max(comparator));
   
   }

   @Override
   public Optional<T> min(final Comparator<? super T> comparator)
   {
   
      return this.terminateWithValueSafely(stream -> stream.min(comparator));
   
   }

   @Override
   public <R, A> R collect(final Collector<? super T, A, R> collector)
   {
   
      return this.terminateWithValueSafely(stream -> stream.collect(collector));
   
   }

   @Override
   public <R> R collect(final Supplier<R> supplier, final BiConsumer<R, ? super T> accumulator, final BiConsumer<R, R> combiner)
   {
   
      return this.terminateWithValueSafely(stream -> stream.collect(supplier, accumulator, combiner));
   
   }

   @Override
   public <U> U reduce(final U identity, final BiFunction<U,? super T,U> accumulator, final BinaryOperator<U> combiner)
   {
   
      return this.terminateWithValueSafely(stream -> stream.reduce(identity, accumulator, combiner));
   
   }

   @Override
   public Optional<T> reduce(final BinaryOperator<T> accumulator)
   {
   
      return this.terminateWithValueSafely(stream -> stream.reduce(accumulator));
   
   }

   @Override
   public T reduce(final T identity, final BinaryOperator<T> accumulator)
   {
   
      return this.terminateWithValueSafely(stream -> stream.reduce(identity, accumulator));
   
   }

   @Override
   public <A> A[] toArray(final IntFunction<A[]> generator)
   {
   
      return this.terminateWithValueSafely(stream -> stream.toArray(generator));
   
   }

   @Override
   public Object[] toArray()
   {
   
      return this.terminateWithValueSafely(Stream::toArray);
   
   }

   @Override
   public void forEachOrdered(final Consumer<? super T> action)
   {
   
      this.terminateSafely(stream -> stream.forEachOrdered(action));
   
   }

   @Override
   public void forEach(final Consumer<? super T> action)
   {
   
      this.terminateSafely(stream -> stream.forEach(action));
   
   }

   @Override
   public StreamLinesOverInternet<T> skip(final long n)
   {
   
      return this.continueStreamSafely(stream -> stream.skip(n));
   
   }

   @Override
   public StreamLinesOverInternet<T> limit(final long maxSize)
   {
   
      return this.continueStreamSafely(stream -> stream.limit(maxSize));
   
   }

   @Override
   public StreamLinesOverInternet<T> peek(final Consumer<? super T> action)
   {
   
      return this.continueStreamSafely(stream -> stream.peek(action));
   
   }

   @Override
   public StreamLinesOverInternet<T> sorted(final Comparator<? super T> comparator)
   {
   
      return this.continueStreamSafely(stream -> stream.sorted(comparator));
   
   }

   @Override
   public StreamLinesOverInternet<T> sorted()
   {
   
      return this.continueStreamSafely(Stream::sorted);
   
   }

   @Override
   public StreamLinesOverInternet<T> distinct()
   {
   
      return this.continueStreamSafely(Stream::distinct);
   
   }

   @Override
   public DoubleVersion flatMapToDouble(final Function<? super T, ? extends DoubleStream> function)
   {
   
      return
         convertStream
         (
            this.encapsulatedStream,
            stream -> stream.flatMapToDouble(function),
            DoubleVersion::new
         )
         ;
   
   }

   @Override
   public LongVersion flatMapToLong(final Function<? super T, ? extends LongStream> function)
   {
   
      return
         convertStream
         (
            this.encapsulatedStream,
            stream -> stream.flatMapToLong(function),
            LongVersion::new
         )
         ;
   
   }

   @Override
   public IntVersion flatMapToInt(final Function<? super T, ? extends IntStream> function)
   {
   
      return
         convertStream
         (
            this.encapsulatedStream,
            stream -> stream.flatMapToInt(function),
            IntVersion::new
         )
         ;
   
   }

   @Override
   public <R> StreamLinesOverInternet<R> flatMap(final Function<? super T, ? extends Stream<? extends R>> function)
   {
   
      Objects.requireNonNull(function);
   
      return this.continueStreamSafely(stream -> stream.flatMap(function));
   
   }

   @Override
   public DoubleVersion mapToDouble(final ToDoubleFunction<? super T> toDoubleFunction)
   {
   
      return
         convertStream
         (
            this.encapsulatedStream,
            stream -> stream.mapToDouble(toDoubleFunction),
            DoubleVersion::new
         )
         ;
   
   }

   @Override
   public LongVersion mapToLong(final ToLongFunction<? super T> toLongFunction)
   {
   
      return
         convertStream
         (
            this.encapsulatedStream,
            stream -> stream.mapToLong(toLongFunction),
            LongVersion::new
         )
         ;
   
   }

   @Override
   public IntVersion mapToInt(final ToIntFunction<? super T> toIntFunction)
   {
   
      return
         convertStream
         (
            this.encapsulatedStream,
            stream -> stream.mapToInt(toIntFunction),
            IntVersion::new
         )
         ;
   
   }

   @Override
   public <R> StreamLinesOverInternet<R> map(final Function<? super T, ? extends R> function)
   {
   
      Objects.requireNonNull(function);
   
      return this.continueStreamSafely(stream -> stream.map(function));
   
   }

   @Override
   public StreamLinesOverInternet<T> filter(final Predicate<? super T> predicate)
   {
   
      Objects.requireNonNull(predicate);
   
      return this.continueStreamSafely(stream -> stream.filter(predicate));
   
   }

   @Override
   public void close()
   {
   
      this.terminateSafely(Stream::close);
   
      System.out.println("closed " + this);
   
   }

   @Override
   public StreamLinesOverInternet<T> onClose(final Runnable runnable)
   {
   
      Objects.requireNonNull(runnable);
   
      return this.continueStreamSafely(stream -> stream.onClose(runnable));
   
   }

   @Override
   public StreamLinesOverInternet<T> unordered()
   {
   
      return this.continueStreamSafely(Stream::unordered);
   
   }

   @Override
   public StreamLinesOverInternet<T> parallel()
   {
   
      return this.continueStreamSafely(Stream::parallel);
   
   }

   @Override
   public StreamLinesOverInternet<T> sequential()
   {
   
      return this.continueStreamSafely(Stream::sequential);
   
   }

   @Override
   //THIS FEELS LIKE A HORRIBLE IDEA, and yet, it doesn't seem that bad.
   public boolean isParallel()
   {
   
      return this.terminateWithValueSafely(Stream::isParallel);
   
   }

   @Override
   public Spliterator<T> spliterator()
   {
   
      final List<T> list = this.toList();
   
      return list.spliterator();
   
   }

   @Override
   public Iterator<T> iterator()
   {
   
      final Spliterator<T> spliterator = this.spliterator();
   
      return Spliterators.iterator(spliterator);
   
   }

   public static class IntVersion
      implements IntStream
   {
   
      private final Supplier<IntStream> encapsulatedStream;
      
      private IntVersion(final Supplier<IntStream> encapsulatedStream)
      {
      
         Objects.requireNonNull(encapsulatedStream);
      
         this.encapsulatedStream = encapsulatedStream;
      
      }
   
      private IntVersion continueStreamSafely(final UnaryOperator<IntStream> function)
      {
      
         return
            convertStream
            (
               this.encapsulatedStream,
               function,
               IntVersion::new
            )
            ;
      
      }
   
      private <U> U terminateWithValueSafely(final Function<IntStream, U> function)
      {
      
         try
         (
            final IntStream stream = this.encapsulatedStream.get();
         )
         {
         
            Objects.requireNonNull(function);
         
            return function.apply(stream);
         
         }
         
         catch (final Exception exception)
         {
         
            throw new RuntimeException(exception);
         
         }
      
      }
   
      private void terminateSafely(final Consumer<IntStream> consumer)
      {
      
         try
         (
            final IntStream stream = this.encapsulatedStream.get();
         )
         {
         
            Objects.requireNonNull(consumer);
         
            consumer.accept(stream);
         
         }
         
         catch (final Exception exception)
         {
         
            throw new RuntimeException(exception);
         
         }
      
      }
   
      @Override
      public Spliterator.OfInt spliterator()
      {
      
         final int[] array = this.terminateWithValueSafely(IntStream::toArray);
      
         return Arrays.spliterator(array);
      
      }
   
      @Override
      public PrimitiveIterator.OfInt iterator()
      {
      
         final Spliterator.OfInt spliterator = this.spliterator();
      
         return Spliterators.iterator(spliterator);
      
      }
   
      @Override
      public StreamLinesOverInternet.IntVersion parallel()
      {
      
         return this.continueStreamSafely(IntStream::parallel);
      
      }
   
      @Override
      public StreamLinesOverInternet.IntVersion sequential()
      {
      
         return this.continueStreamSafely(IntStream::sequential);
      
      }
   
      @Override
      public StreamLinesOverInternet<Integer> boxed()
      {
      
         return
            convertStream
            (
               this.encapsulatedStream,
               IntStream::boxed,
               StreamLinesOverInternet::new
            )
            ;
      
      }
   
      @Override
      public OptionalInt findAny()
      {
      
         return this.terminateWithValueSafely(IntStream::findAny);
      
      }
   
      @Override
      public OptionalInt findFirst()
      {
      
         return this.terminateWithValueSafely(IntStream::findFirst);
      
      }
   
      @Override
      public boolean noneMatch(final IntPredicate intPredicate)
      {
      
         Objects.requireNonNull(intPredicate);
      
         return this.terminateWithValueSafely(intStream -> intStream.noneMatch(intPredicate));
      
      }
   
      @Override
      public boolean allMatch(final IntPredicate intPredicate)
      {
      
         Objects.requireNonNull(intPredicate);
      
         return this.terminateWithValueSafely(intStream -> intStream.allMatch(intPredicate));
      
      }
   
      @Override
      public boolean anyMatch(final IntPredicate intPredicate)
      {
      
         Objects.requireNonNull(intPredicate);
      
         return this.terminateWithValueSafely(intStream -> intStream.anyMatch(intPredicate));
      
      }
   
      @Override
      public IntSummaryStatistics summaryStatistics()
      {
      
         return this.terminateWithValueSafely(IntStream::summaryStatistics);
      
      }
   
      @Override
      public OptionalDouble average()
      {
      
         return this.terminateWithValueSafely(IntStream::average);
      
      }
   
      @Override
      public long count()
      {
      
         return this.terminateWithValueSafely(IntStream::count);
      
      }
   
      @Override
      public OptionalInt max()
      {
      
         return this.terminateWithValueSafely(IntStream::max);
      
      }
   
      @Override
      public OptionalInt min()
      {
      
         return this.terminateWithValueSafely(IntStream::min);
      
      }
   
      @Override
      public int sum()
      {
      
         return this.terminateWithValueSafely(IntStream::sum);
      
      }
   
      @Override
      public <R> R collect(final Supplier<R> supplier, final ObjIntConsumer<R> accumulator, final BiConsumer<R, R> combiner)
      {
      
         Objects.requireNonNull(supplier);
         Objects.requireNonNull(accumulator);
         Objects.requireNonNull(combiner);
      
         return this.terminateWithValueSafely(intStream -> intStream.collect(supplier, accumulator, combiner));
      
      }
   
      @Override
      public OptionalInt reduce(final IntBinaryOperator intBinaryOperator)
      {
      
         Objects.requireNonNull(intBinaryOperator);
      
         return this.terminateWithValueSafely(intStream -> intStream.reduce(intBinaryOperator));
      
      }
   
      @Override
      public int reduce(final int identity, final IntBinaryOperator intBinaryOperator)
      {
      
         Objects.requireNonNull(intBinaryOperator);
      
         return this.terminateWithValueSafely(intStream -> intStream.reduce(identity, intBinaryOperator));
      
      }
   
      @Override
      public int[] toArray()
      {
      
         return this.terminateWithValueSafely(IntStream::toArray);
      
      }
   
      @Override
      public void forEachOrdered(final IntConsumer intConsumer)
      {
      
         Objects.requireNonNull(intConsumer);
      
         this.terminateSafely(intStream -> intStream.forEachOrdered(intConsumer));
      
      }
   
      @Override
      public void forEach(final IntConsumer intConsumer)
      {
      
         Objects.requireNonNull(intConsumer);
      
         this.terminateSafely(intStream -> intStream.forEach(intConsumer));
      
      }
   
      @Override
      public IntVersion skip(final long n)
      {
      
         return this.continueStreamSafely(intStream -> intStream.skip(n));
      
      }
   
      @Override
      public IntVersion limit(final long n)
      {
      
         return this.continueStreamSafely(intStream -> intStream.limit(n));
      
      }
   
      @Override
      public IntVersion peek(final IntConsumer intConsumer)
      {
      
         Objects.requireNonNull(intConsumer);
      
         return this.continueStreamSafely(intStream -> intStream.peek(intConsumer));
      
      }
   
      @Override
      public IntVersion sorted()
      {
      
         return this.continueStreamSafely(IntStream::sorted);
      
      }
   
      @Override
      public IntVersion distinct()
      {
      
         return this.continueStreamSafely(IntStream::distinct);
      
      }
   
      @Override
      public IntVersion flatMap(final IntFunction<? extends IntStream> intFunction)
      {
      
         Objects.requireNonNull(intFunction);
      
         return this.continueStreamSafely(intStream -> intStream.flatMap(intFunction));
      
      }
   
      @Override
      public DoubleVersion asDoubleStream()
      {
      
         final Supplier<DoubleStream> nextStreamSupplier =
            () ->
            {
            
               final IntStream currentStream = this.encapsulatedStream.get();
               
               final DoubleStream nextStream = currentStream.asDoubleStream();
               
               return nextStream;
            
            }
            ;
      
         return new DoubleVersion(nextStreamSupplier);
      
      }
   
      @Override
      public LongVersion asLongStream()
      {
      
         final Supplier<LongStream> nextStreamSupplier =
            () ->
            {
            
               final IntStream currentStream = this.encapsulatedStream.get();
               
               final LongStream nextStream = currentStream.asLongStream();
               
               return nextStream;
            
            }
            ;
      
         return new LongVersion(nextStreamSupplier);
      
      }
   
      @Override
      public DoubleVersion mapToDouble(final IntToDoubleFunction intToDoubleFunction)
      {
      
         Objects.requireNonNull(intToDoubleFunction);
      
         final Supplier<DoubleStream> nextStreamSupplier =
            () ->
            {
            
               final IntStream currentStream = this.encapsulatedStream.get();
            
               final DoubleStream nextStream = currentStream.mapToDouble(intToDoubleFunction);
            
               return nextStream;
            
            }
            ;
      
         return new DoubleVersion(nextStreamSupplier);
      
      }
   
      @Override
      public LongVersion mapToLong(final IntToLongFunction intToLongFunction)
      {
      
         Objects.requireNonNull(intToLongFunction);
      
         final Supplier<LongStream> nextStreamSupplier =
            () ->
            {
            
               final IntStream currentStream = this.encapsulatedStream.get();
            
               final LongStream nextStream = currentStream.mapToLong(intToLongFunction);
            
               return nextStream;
            
            }
            ;
      
         return new LongVersion(nextStreamSupplier);
      
      }
   
      @Override
      public <U> StreamLinesOverInternet<U> mapToObj(final IntFunction<? extends U> intFunction)
      {
      
         Objects.requireNonNull(intFunction);
      
         final Supplier<Stream<U>> nextStreamSupplier =
            () ->
            {
            
               final IntStream currentStream = this.encapsulatedStream.get();
            
               final Stream<U> nextStream = currentStream.mapToObj(intFunction);
            
               return nextStream;
            
            }
            ;
      
         return new StreamLinesOverInternet<>(nextStreamSupplier);
      
      }
   
      @Override
      public IntVersion map(final IntUnaryOperator intUnaryOperator)
      {
      
         Objects.requireNonNull(intUnaryOperator);
      
         return this.continueStreamSafely(intStream -> intStream.map(intUnaryOperator));
      
      }
   
      @Override
      public IntVersion filter(final IntPredicate intPredicate)
      {
      
         Objects.requireNonNull(intPredicate);
      
         return this.continueStreamSafely(intStream -> intStream.filter(intPredicate));
      
      }
   
      @Override
      public void close()
      {
      
         this.terminateSafely(IntStream::close);
      
         System.out.println("closed " + this);
      
      }
   
      @Override
      public IntVersion onClose(final Runnable runnable)
      {
      
         Objects.requireNonNull(runnable);
      
         return this.continueStreamSafely(intStream -> intStream.onClose(runnable));
      
      }
   
      @Override
      public IntVersion unordered()
      {
      
         return this.continueStreamSafely(IntStream::unordered);
      
      }
   
      @Override
      public boolean isParallel()
      {
      
         return this.terminateWithValueSafely(IntStream::isParallel);
      
      }
   
   }

   public static class LongVersion
      implements LongStream
   {
   
      private final Supplier<LongStream> encapsulatedStream;
      
      private LongVersion(final Supplier<LongStream> encapsulatedStream)
      {
      
         Objects.requireNonNull(encapsulatedStream);
      
         this.encapsulatedStream = encapsulatedStream;
      
      }
   
      private LongVersion continueStreamSafely(final UnaryOperator<LongStream> function)
      {
      
         Objects.requireNonNull(function);
      
         final Supplier<LongStream> nextStreamSupplier =
            () ->
            {
            
               final LongStream currentStream = this.encapsulatedStream.get();
            
               final LongStream nextStream = function.apply(currentStream);
            
               return nextStream;
            
            }
            ;
      
         return new LongVersion(nextStreamSupplier);
      
      }
   
      private <U> U terminateWithValueSafely(final Function<LongStream, U> function)
      {
      
         try
         (
            final LongStream stream = this.encapsulatedStream.get();
         )
         {
         
            Objects.requireNonNull(function);
         
            return function.apply(stream);
         
         }
         
         catch (final Exception exception)
         {
         
            throw new RuntimeException(exception);
         
         }
      
      }
   
      private void terminateSafely(final Consumer<LongStream> consumer)
      {
      
         try
         (
            final LongStream stream = this.encapsulatedStream.get();
         )
         {
         
            Objects.requireNonNull(consumer);
         
            consumer.accept(stream);
         
         }
         
         catch (final Exception exception)
         {
         
            throw new RuntimeException(exception);
         
         }
      
      }
   
      @Override
      public Spliterator.OfLong spliterator()
      {
      
         final long[] array = this.terminateWithValueSafely(LongStream::toArray);
      
         return Arrays.spliterator(array);
      
      }
   
      @Override
      public PrimitiveIterator.OfLong iterator()
      {
      
         final Spliterator.OfLong spliterator = this.spliterator();
      
         return Spliterators.iterator(spliterator);
      
      }
   
      @Override
      public StreamLinesOverInternet.LongVersion parallel()
      {
      
         return this.continueStreamSafely(LongStream::parallel);
      
      }
   
      @Override
      public StreamLinesOverInternet.LongVersion sequential()
      {
      
         return this.continueStreamSafely(LongStream::sequential);
      
      }
   
      @Override
      public StreamLinesOverInternet<Long> boxed()
      {
      
         final Supplier<Stream<Long>> nextStreamSupplier =
            () ->
            {
            
               final LongStream currentStream = this.encapsulatedStream.get();
               
               final Stream<Long> nextStream = currentStream.boxed();
               
               return nextStream;
            
            }
            ;
      
         return new StreamLinesOverInternet<>(nextStreamSupplier);
      
      }
   
      @Override
      public OptionalLong findAny()
      {
      
         return this.terminateWithValueSafely(LongStream::findAny);
      
      }
   
      @Override
      public OptionalLong findFirst()
      {
      
         return this.terminateWithValueSafely(LongStream::findFirst);
      
      }
   
      @Override
      public boolean noneMatch(final LongPredicate longPredicate)
      {
      
         Objects.requireNonNull(longPredicate);
      
         return this.terminateWithValueSafely(longStream -> longStream.noneMatch(longPredicate));
      
      }
   
      @Override
      public boolean allMatch(final LongPredicate longPredicate)
      {
      
         Objects.requireNonNull(longPredicate);
      
         return this.terminateWithValueSafely(longStream -> longStream.allMatch(longPredicate));
      
      }
   
      @Override
      public boolean anyMatch(final LongPredicate longPredicate)
      {
      
         Objects.requireNonNull(longPredicate);
      
         return this.terminateWithValueSafely(longStream -> longStream.anyMatch(longPredicate));
      
      }
   
      @Override
      public LongSummaryStatistics summaryStatistics()
      {
      
         return this.terminateWithValueSafely(LongStream::summaryStatistics);
      
      }
   
      @Override
      public OptionalDouble average()
      {
      
         return this.terminateWithValueSafely(LongStream::average);
      
      }
   
      @Override
      public long count()
      {
      
         return this.terminateWithValueSafely(LongStream::count);
      
      }
   
      @Override
      public OptionalLong max()
      {
      
         return this.terminateWithValueSafely(LongStream::max);
      
      }
   
      @Override
      public OptionalLong min()
      {
      
         return this.terminateWithValueSafely(LongStream::min);
      
      }
   
      @Override
      public long sum()
      {
      
         return this.terminateWithValueSafely(LongStream::sum);
      
      }
   
      @Override
      public <R> R collect(final Supplier<R> supplier, final ObjLongConsumer<R> accumulator, BiConsumer<R, R> combiner)
      {
      
         Objects.requireNonNull(supplier);
         Objects.requireNonNull(accumulator);
         Objects.requireNonNull(combiner);
      
         return this.terminateWithValueSafely(longStream -> longStream.collect(supplier, accumulator, combiner));
      
      }
   
      @Override
      public OptionalLong reduce(final LongBinaryOperator longBinaryOperator)
      {
      
         Objects.requireNonNull(longBinaryOperator);
      
         return this.terminateWithValueSafely(longStream -> longStream.reduce(longBinaryOperator));
      
      }
   
      @Override
      public long reduce(final long identity, final LongBinaryOperator longBinaryOperator)
      {
      
         Objects.requireNonNull(longBinaryOperator);
      
         return this.terminateWithValueSafely(longStream -> longStream.reduce(identity, longBinaryOperator));
      
      }
   
      @Override
      public long[] toArray()
      {
      
         return this.terminateWithValueSafely(LongStream::toArray);
      
      }
   
      @Override
      public void forEachOrdered(final LongConsumer longConsumer)
      {
      
         Objects.requireNonNull(longConsumer);
      
         this.terminateSafely(longStream -> longStream.forEachOrdered(longConsumer));
      
      }
   
      @Override
      public void forEach(final LongConsumer longConsumer)
      {
      
         Objects.requireNonNull(longConsumer);
      
         this.terminateSafely(longStream -> longStream.forEach(longConsumer));
      
      }
   
      @Override
      public LongVersion skip(final long n)
      {
      
         return this.continueStreamSafely(longStream -> longStream.skip(n));
      
      }
   
      @Override
      public LongVersion limit(final long n)
      {
      
         return this.continueStreamSafely(longStream -> longStream.limit(n));
      
      }
   
      @Override
      public LongVersion peek(final LongConsumer longConsumer)
      {
      
         Objects.requireNonNull(longConsumer);
      
         return this.continueStreamSafely(longStream -> longStream.peek(longConsumer));
      
      }
   
      @Override
      public LongVersion sorted()
      {
      
         return this.continueStreamSafely(LongStream::sorted);
      
      }
   
      @Override
      public LongVersion distinct()
      {
      
         return this.continueStreamSafely(LongStream::distinct);
      
      }
   
      @Override
      public LongVersion flatMap(final LongFunction<? extends LongStream> longFunction)
      {
      
         Objects.requireNonNull(longFunction);
      
         return this.continueStreamSafely(longStream -> longStream.flatMap(longFunction));
      
      }
   
      @Override
      public DoubleVersion asDoubleStream()
      {
      
         final Supplier<DoubleStream> nextStreamSupplier =
            () ->
            {
            
               final LongStream currentStream = this.encapsulatedStream.get();
               
               final DoubleStream nextStream = currentStream.asDoubleStream();
               
               return nextStream;
            
            }
            ;
      
         return new DoubleVersion(nextStreamSupplier);
         
      }
   
      @Override
      public DoubleVersion mapToDouble(final LongToDoubleFunction longToDoubleFunction)
      {
      
         Objects.requireNonNull(longToDoubleFunction);
      
         return
            convertStream
            (
               this.encapsulatedStream,
               stream -> stream.mapToDouble(longToDoubleFunction),
               DoubleVersion::new
            )
            ;
      
      }
   
      @Override
      public IntVersion mapToInt(final LongToIntFunction longToIntFunction)
      {
      
         Objects.requireNonNull(longToIntFunction);
      
         return
            convertStream
            (
               this.encapsulatedStream,
               stream -> stream.mapToInt(longToIntFunction),
               IntVersion::new
            )
            ;
      
      }
   
      @Override
      public <U> StreamLinesOverInternet<U> mapToObj(final LongFunction<? extends U> longFunction)
      {
      
         Objects.requireNonNull(longFunction);
      
         return
            convertStream
            (
               this.encapsulatedStream,
               stream -> stream.mapToObj(longFunction),
               StreamLinesOverInternet<U>::new
            )
            ;
      
      }
   
      @Override
      public LongVersion map(final LongUnaryOperator longUnaryOperator)
      {
      
         Objects.requireNonNull(longUnaryOperator);
      
         return this.continueStreamSafely(longStream -> longStream.map(longUnaryOperator));
      
      }
   
      @Override
      public LongVersion filter(final LongPredicate longPredicate)
      {
      
         Objects.requireNonNull(longPredicate);
      
         return this.continueStreamSafely(longStream -> longStream.filter(longPredicate));
      
      }
   
      @Override
      public void close()
      {
      
         this.terminateSafely(LongStream::close);
      
      }
   
      @Override
      public LongVersion onClose(final Runnable runnable)
      {
      
         Objects.requireNonNull(runnable);
      
         return this.continueStreamSafely(longStream -> longStream.onClose(runnable));
      
      }
   
      @Override
      public LongVersion unordered()
      {
      
         return this.continueStreamSafely(LongStream::unordered);
      
      }
   
      @Override
      public boolean isParallel()
      {
      
         return this.terminateWithValueSafely(LongStream::isParallel);
      
      }
   
   }

   public static class DoubleVersion
      implements DoubleStream
   {
   
      private final Supplier<DoubleStream> encapsulatedStream;
      
      private DoubleVersion(final Supplier<DoubleStream> encapsulatedStream)
      {
      
         Objects.requireNonNull(encapsulatedStream);
      
         this.encapsulatedStream = encapsulatedStream;
      
      }
   
      private DoubleVersion continueStreamSafely(final UnaryOperator<DoubleStream> function)
      {
      
         return
            convertStream
            (
               this.encapsulatedStream,
               function,
               DoubleVersion::new
            )
            ;
      
      }
   
      private <U> U terminateWithValueSafely(final Function<DoubleStream, U> function)
      {
      
         try
         (
            final DoubleStream stream = this.encapsulatedStream.get();   
         )
         {
         
            Objects.requireNonNull(function);
         
            return function.apply(stream);
         
         }
         
         catch (final Exception exception)
         {
         
            throw new RuntimeException(exception);
         
         }
      
      }
   
      private void terminateSafely(final Consumer<DoubleStream> consumer)
      {
      
         try
         (
            final DoubleStream stream = this.encapsulatedStream.get();   
         )
         {
         
            Objects.requireNonNull(consumer);
         
            consumer.accept(stream);
         
         }
         
         catch (final Exception exception)
         {
         
            throw new RuntimeException(exception);
         
         }
      
      }
   
      @Override
      public Spliterator.OfDouble spliterator()
      {
      
         final double[] array = this.terminateWithValueSafely(DoubleStream::toArray);
      
         return Arrays.spliterator(array);
      
      }
   
      @Override
      public PrimitiveIterator.OfDouble iterator()
      {
      
         final Spliterator.OfDouble spliterator = this.spliterator();
      
         return Spliterators.iterator(spliterator);
      
      }
   
      @Override
      public StreamLinesOverInternet.DoubleVersion parallel()
      {
      
         return this.continueStreamSafely(DoubleStream::parallel);
      
      }
   
      @Override
      public StreamLinesOverInternet.DoubleVersion sequential()
      {
      
         return this.continueStreamSafely(DoubleStream::sequential);
      
      }
   
      @Override
      public StreamLinesOverInternet<Double> boxed()
      {
      
         return
            convertStream
            (
               this.encapsulatedStream,
               DoubleStream::boxed,
               StreamLinesOverInternet<Double>::new
            )
            ;
      }
   
      @Override
      public OptionalDouble findAny()
      {
      
         return this.terminateWithValueSafely(DoubleStream::findAny);
      
      }
   
      @Override
      public OptionalDouble findFirst()
      {
      
         return this.terminateWithValueSafely(DoubleStream::findFirst);
      
      }
   
      @Override
      public boolean noneMatch(final DoublePredicate doublePredicate)
      {
      
         Objects.requireNonNull(doublePredicate);
      
         return this.terminateWithValueSafely(doubleStream -> doubleStream.noneMatch(doublePredicate));
      
      }
   
      @Override
      public boolean allMatch(final DoublePredicate doublePredicate)
      {
      
         Objects.requireNonNull(doublePredicate);
      
         return this.terminateWithValueSafely(doubleStream -> doubleStream.allMatch(doublePredicate));
      
      }
   
      @Override
      public boolean anyMatch(final DoublePredicate doublePredicate)
      {
      
         Objects.requireNonNull(doublePredicate);
      
         return this.terminateWithValueSafely(doubleStream -> doubleStream.anyMatch(doublePredicate));
      
      }
   
      @Override
      public DoubleSummaryStatistics summaryStatistics()
      {
      
         return this.terminateWithValueSafely(DoubleStream::summaryStatistics);
      
      }
   
      @Override
      public OptionalDouble average()
      {
      
         return this.terminateWithValueSafely(DoubleStream::average);
      
      }
   
      @Override
      public long count()
      {
      
         return this.terminateWithValueSafely(DoubleStream::count);
      
      }
   
      @Override
      public OptionalDouble max()
      {
      
         return this.terminateWithValueSafely(DoubleStream::max);
      
      }
   
      @Override
      public OptionalDouble min()
      {
      
         return this.terminateWithValueSafely(DoubleStream::min);
      
      }
   
      @Override
      public double sum()
      {
      
         return this.terminateWithValueSafely(DoubleStream::sum);
      
      }
   
      @Override
      public <R> R collect(final Supplier<R> supplier, final ObjDoubleConsumer<R> accumulator, BiConsumer<R, R> combiner)
      {
      
         Objects.requireNonNull(supplier);
         Objects.requireNonNull(accumulator);
         Objects.requireNonNull(combiner);
      
         return this.terminateWithValueSafely(doubleStream -> doubleStream.collect(supplier, accumulator, combiner));
      
      }
   
      @Override
      public OptionalDouble reduce(final DoubleBinaryOperator doubleBinaryOperator)
      {
      
         Objects.requireNonNull(doubleBinaryOperator);
      
         return this.terminateWithValueSafely(doubleStream -> doubleStream.reduce(doubleBinaryOperator));
      
      }
   
      @Override
      public double reduce(final double identity, final DoubleBinaryOperator doubleBinaryOperator)
      {
      
         Objects.requireNonNull(doubleBinaryOperator);
      
         return this.terminateWithValueSafely(doubleStream -> doubleStream.reduce(identity, doubleBinaryOperator));
      
      }
   
      @Override
      public double[] toArray()
      {
      
         return this.terminateWithValueSafely(DoubleStream::toArray);
      
      }
   
      @Override
      public void forEachOrdered(final DoubleConsumer doubleConsumer)
      {
      
         Objects.requireNonNull(doubleConsumer);
      
         this.terminateSafely(doubleStream -> doubleStream.forEachOrdered(doubleConsumer));
      
      }
   
      @Override
      public void forEach(final DoubleConsumer doubleConsumer)
      {
      
         Objects.requireNonNull(doubleConsumer);
      
         this.terminateSafely(doubleStream -> doubleStream.forEach(doubleConsumer));
      
      }
   
      @Override
      public DoubleVersion skip(final long n)
      {
      
         return this.continueStreamSafely(doubleStream -> doubleStream.skip(n));
      
      }
   
      @Override
      public DoubleVersion limit(final long n)
      {
      
         return this.continueStreamSafely(doubleStream -> doubleStream.limit(n));
      
      }
   
      @Override
      public DoubleVersion peek(final DoubleConsumer doubleConsumer)
      {
      
         Objects.requireNonNull(doubleConsumer);
      
         return this.continueStreamSafely(doubleStream -> doubleStream.peek(doubleConsumer));
      
      }
   
      @Override
      public DoubleVersion sorted()
      {
      
         return this.continueStreamSafely(DoubleStream::sorted);
      
      }
   
      @Override
      public DoubleVersion distinct()
      {
      
         return this.continueStreamSafely(DoubleStream::distinct);
      
      }
   
      @Override
      public DoubleVersion flatMap(final DoubleFunction<? extends DoubleStream> doubleFunction)
      {
      
         Objects.requireNonNull(doubleFunction);
      
         return this.continueStreamSafely(doubleStream -> doubleStream.flatMap(doubleFunction));
      
      }
   
      @Override
      public LongVersion mapToLong(final DoubleToLongFunction doubleToLongFunction)
      {
      
         Objects.requireNonNull(doubleToLongFunction);
      
         return
            convertStream
            (
               this.encapsulatedStream,
               stream -> stream.mapToLong(doubleToLongFunction),
               LongVersion::new
            )
            ;
      
      }
   
      @Override
      public IntVersion mapToInt(final DoubleToIntFunction doubleToIntFunction)
      {
      
         Objects.requireNonNull(doubleToIntFunction);
      
         return
            convertStream
            (
               this.encapsulatedStream,
               stream -> stream.mapToInt(doubleToIntFunction),
               IntVersion::new
            )
            ;
         
      }
   
      @Override
      public <U> StreamLinesOverInternet<U> mapToObj(final DoubleFunction<? extends U> doubleFunction)
      {
      
         Objects.requireNonNull(doubleFunction);
      
         return
            convertStream
            (
               this.encapsulatedStream,
               stream -> stream.mapToObj(doubleFunction),
               StreamLinesOverInternet<U>::new
            )
            ;
         
      }
   
      @Override
      public DoubleVersion map(final DoubleUnaryOperator doubleUnaryOperator)
      {
      
         Objects.requireNonNull(doubleUnaryOperator);
      
         return this.continueStreamSafely(doubleStream -> doubleStream.map(doubleUnaryOperator));
      
      }
   
      @Override
      public DoubleVersion filter(final DoublePredicate doublePredicate)
      {
      
         Objects.requireNonNull(doublePredicate);
      
         return this.continueStreamSafely(doubleStream -> doubleStream.filter(doublePredicate));
      
      }
   
      @Override
      public void close()
      {
      
         this.terminateSafely(DoubleStream::close);
      
      }
   
      @Override
      public DoubleVersion onClose(final Runnable runnable)
      {
      
         Objects.requireNonNull(runnable);
      
         return this.continueStreamSafely(doubleStream -> doubleStream.onClose(runnable));
      
      }
   
      @Override
      public DoubleVersion unordered()
      {
      
         return this.continueStreamSafely(DoubleStream::unordered);
      
      }
   
      @Override
      public boolean isParallel()
      {
      
         return this.terminateWithValueSafely(DoubleStream::isParallel);
      
      }
   
   }

}

```
\$\endgroup\$
1
  • \$\begingroup\$ Could someone create a tag for resource management, or something similar? That's the entire point of my post, so I would like it if the tags reflected that fact. \$\endgroup\$ Commented Mar 7, 2024 at 9:22

1 Answer

5
\$\begingroup\$

try-with-resources and exception handling are features, not bugs. The vastly less complicated approach is to use Java as Java was intended - write an AutoCloseable utility class. For a very typical example of a situation where the JDK itself operates this way, see DirectoryStream. This is how Java is designed to work. Attempting otherwise is going to be some mix of non-constructive, difficult, non-idiomatic, or "the bad kind of surprise".
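As a rough illustration of the DirectoryStream pattern mentioned above, here is a minimal sketch. The class name `DirectoryListing` and the method `listNames` are made up for this example; only `Files.newDirectoryStream` and `DirectoryStream` come from the JDK.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; not part of the question's code.
public class DirectoryListing {

    // Collects the entry names of a directory. The DirectoryStream is closed
    // by try-with-resources, even if the loop body throws.
    public static List<String> listNames(Path directory) throws IOException {
        List<String> names = new ArrayList<>();
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(directory)) {
            for (Path entry : entries) {
                names.add(entry.getFileName().toString());
            }
        }
        return names;
    }

    public static void main(String[] args) throws IOException {
        // Build a throwaway directory so the example is self-contained.
        Path directory = Files.createTempDirectory("twr-demo");
        Files.createFile(directory.resolve("a.txt"));
        Files.createFile(directory.resolve("b.txt"));
        System.out.println(listNames(directory));
    }
}
```

The caller of `listNames` never sees the open handle, which is about as much hiding as the language supports without giving up deterministic cleanup.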

I am opening a connection to the internet just to check if my stream is parallel. I don't know how terrible that is

Very.

TWR keeps getting forgotten.

If someone keeps forgetting to hold the steering wheel, the solution is not to remove the steering wheel and build a more complicated, less accessible steering wheel behind the dashboard. Your entry-level developers should be learning how to write idiomatic Java, rather than learning (or - as the case may be - not learning) a custom framework that attempts to be too clever. There are standard tools, some mentioned in the comments, that flag when resources are mismanaged. Learn to use these tools.

To quote a separate commenter,

I would like to consume a stream shared between threads. The stream should autodetect when it is not used any more

It's only safe to close the stream once we can guarantee that all threads are done (i.e. after a join). This can still use AutoCloseable. And what does it even mean to auto-detect when a stream isn't used any more? When it hits EOF? No: because then we break the case where a user wants to perform special logic on EOF, or seek away from EOF. How about when it "goes out of scope"? Other than TWR, this is not possible, since (unlike, say, C++) Java scope is not directly coupled to memory management. The clearest and least-surprising mechanism to declare that we're done with a stream is not automatic, but explicit.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.stream.Stream;

public class Main {
    /** Owns the HTTP connection; the caller closes it via try-with-resources. */
    public static class HTTPLineStream implements AutoCloseable {
        public final URI uri;
        private final BufferedReader reader;

        public HTTPLineStream(URI uri) throws IOException {
            this.uri = uri;
            // Assumes the remote text is UTF-8, rather than relying on the
            // platform default charset
            reader = new BufferedReader(
                new InputStreamReader(
                    uri.toURL().openStream(),
                    StandardCharsets.UTF_8
                )
            );
        }
        public HTTPLineStream(String uri) throws URISyntaxException, IOException {
            this(new URI(uri));
        }

        // The returned stream is only valid until close() is called
        public Stream<String> lines() {
            return reader.lines();
        }

        @Override
        public void close() throws IOException {
            // also closes the inner stream
            reader.close();
        }
    }


    public static void main(String[] args) {
        try (HTTPLineStream stream = new HTTPLineStream(
            "https://raw.githubusercontent.com/nytimes/covid-19-data/master/us.csv"
        )) {
            stream.lines()
                .limit(10)
                .forEach(System.out::println);
        }
        catch (IOException | URISyntaxException e) {
            e.printStackTrace();
        }
    }
}
date,cases,deaths
2020-01-21,1,0
2020-01-22,1,0
2020-01-23,1,0
2020-01-24,2,0
2020-01-25,3,0
2020-01-26,5,0
2020-01-27,5,0
2020-01-28,5,0
2020-01-29,5,0
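
For the multi-threaded case discussed above, a minimal sketch of "close only after the join" might look like the following. It reads from an in-memory string instead of a network connection to stay self-contained; `SharedReaderDemo` and `readWithTwoThreads` are hypothetical names, and the explicit `synchronized` block is one assumed way to share the reader safely.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo class; not part of the answer's HTTPLineStream.
public class SharedReaderDemo {

    // Two worker threads drain one shared reader. The reader is closed by
    // try-with-resources only after both join() calls have returned.
    public static Queue<String> readWithTwoThreads(String text)
            throws IOException, InterruptedException {
        Queue<String> lines = new ConcurrentLinkedQueue<>();
        try (BufferedReader reader = new BufferedReader(new StringReader(text))) {
            Runnable worker = () -> {
                try {
                    while (true) {
                        String line;
                        // Serialize access so each line goes to exactly one worker.
                        synchronized (reader) {
                            line = reader.readLine();
                        }
                        if (line == null) {
                            break;
                        }
                        lines.add(line);
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            };
            Thread first = new Thread(worker);
            Thread second = new Thread(worker);
            first.start();
            second.start();
            first.join();
            second.join();
        } // reader.close() runs here, after both workers are done
        return lines;
    }

    public static void main(String[] args) throws Exception {
        // Line order may vary between runs because two threads are reading.
        System.out.println(readWithTwoThreads("a\nb\nc"));
    }
}
```

The point of the sketch is that the lifetime is still declared explicitly by the try block; the threads only borrow the resource inside it.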
\$\endgroup\$
13
  • \$\begingroup\$ You are absolutely right that neither try nor exception handling are bugs. I certainly don't see them that way. And I also understand if the answers I receive will align under "Use the tools that were meant to handle this problem". And since opening a connection to check for parallelism is as bad as you say, that may be enough to require a complete redesign of my approach. But just to give some context: I do some coding in my off time, and I help a bunch of entry-level (and below) devs build a project. In preparation for upcoming work, I built this because TWR keeps getting forgotten. \$\endgroup\$ Commented Mar 7, 2024 at 13:57
  • 4
    \$\begingroup\$ @davidalayachew Most software development solutions can detect an AutoCloseable that is not being closed. Eclipse, for example, can be configured to make it a compile-time error not to close an AutoCloseable object. \$\endgroup\$ Commented Mar 7, 2024 at 18:35
  • 1
    \$\begingroup\$ David specifically said he wants a stream that does not require an explicit close (and try-with-resources is syntactic sugar to do exactly that). For instance, I would like to consume a stream shared between threads. The stream should autodetect when it is not used any more. \$\endgroup\$ Commented Mar 7, 2024 at 21:07
  • 1
    \$\begingroup\$ Just an update -- we finally shipped this feature to PROD just recently. I went with what you mentioned -- try-with-resources. As expected, they kept forgetting to use it, but we hammered most of the bugs. Thanks again for pushing for the idiomatic way -- it ended up being the better choice in the end. \$\endgroup\$ Commented Feb 13 at 11:24
  • 1
    \$\begingroup\$ @davidalayachew If you want to detect it more easily, wrap your managed resource in something that dumps a warning in the finalizer if close() isn't called. Bonus points for capturing the thread on allocation (in non-production) to make it even easier to find. \$\endgroup\$ Commented Oct 21 at 5:29
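
A minimal sketch of the leak-warning idea from the last comment, under some assumptions: the comment suggests a finalizer, but finalization has been deprecated for removal since Java 18, so this swaps in `java.lang.ref.Cleaner`. The names `LeakWarningResource` and `State` are hypothetical, and the warning only appears if and when the garbage collector notices the unreachable object, so it is a debugging aid rather than a guarantee.

```java
import java.lang.ref.Cleaner;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical wrapper; a real one would also hold the managed resource.
public class LeakWarningResource implements AutoCloseable {

    private static final Cleaner CLEANER = Cleaner.create();

    // Static nested class: it must not reference the outer instance,
    // otherwise the instance could never become unreachable.
    private static final class State implements Runnable {
        private final AtomicBoolean closed = new AtomicBoolean(false);
        // Captures the allocation stack trace to make leaks easier to find.
        private final Throwable allocationSite = new Throwable("resource allocated here");

        @Override
        public void run() {
            if (!closed.get()) {
                System.err.println("WARNING: resource was never closed");
                allocationSite.printStackTrace();
            }
        }
    }

    private final State state = new State();
    private final Cleaner.Cleanable cleanable = CLEANER.register(this, state);

    public boolean isClosed() {
        return state.closed.get();
    }

    @Override
    public void close() {
        state.closed.set(true);
        // Runs the cleaning action at most once; it stays silent because
        // the closed flag is already set.
        cleanable.clean();
    }

    public static void main(String[] args) {
        try (LeakWarningResource resource = new LeakWarningResource()) {
            System.out.println("closed inside try: " + resource.isClosed());
        }
    }
}
```

Capturing a `Throwable` per allocation is not free, which is presumably why the comment suggests doing it only outside production.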
