JavaSpliterator不断拆分并行流

我发现 Java 并行流有一些令人惊讶的行为。我制作了自己的Spliterator,并且生成的并行流被分割,直到每个流中只有一个元素。这似乎太小了,我想知道我做错了什么。我希望我可以设置一些特征来纠正这个问题。

这是我的测试代码。在Float这里仅仅是一个虚拟的有效载荷,我真正的流类稍微复杂一些。

   public static void main( String[] args ) {
      TestingSpliterator splits = new TestingSpliterator( 10 );
      Stream<Float> test = StreamSupport.stream( splits, true );
      double total = test.mapToDouble( Float::doubleValue ).sum();
      System.out.println( "Total: " + total );
   }

此代码将不断拆分此流,直到每个流Spliterator都只有一个元素。这似乎太多了,效率不高。

输出:

run:
Split on count: 10
Split on count: 5
Split on count: 3
Split on count: 5
Split on count: 2
Split on count: 2
Split on count: 3
Split on count: 2
Split on count: 2
Total: 5.164293184876442
BUILD SUCCESSFUL (total time: 0 seconds)

这是Spliterator. 我主要关心的是我应该使用哪些特征,但也许其他地方有问题?

public class TestingSpliterator implements Spliterator<Float> {
   int count;
   int splits;

   public TestingSpliterator( int count ) {
      this.count = count;
   }

   @Override
   public boolean tryAdvance( Consumer<? super Float> cnsmr ) {
      if( count > 0 ) {
         cnsmr.accept( (float)Math.random() );
         count--;
         return true;
      } else
         return false;
   }

   @Override
   public Spliterator<Float> trySplit() {
      System.err.println( "Split on count: " + count );
      if( count > 1 ) {
         splits++;
         int half = count / 2;
         TestingSpliterator newSplit = new TestingSpliterator( count - half );
         count = half;
         return newSplit;
      } else
         return null;
   }

   @Override
   public long estimateSize() {
      return count;
   }

   @Override
   public int characteristics() {
      return IMMUTABLE | SIZED;
   }
}

那么我怎样才能把流分成更大的块呢?我希望在 10,000 到 50,000 附近会更好。

我知道我可以null从该trySplit()方法返回,但这似乎是一种倒退的方法。系统似乎应该对内核数量、当前负载以及使用流的代码的复杂程度有所了解,并相应地调整自身。换句话说,我希望流块大小在外部配置,而不是由流本身在内部固定。

编辑:重新。Holger 在下面的回答中,当我增加原始流中的元素数量时,流拆分会稍微减少,因此StreamSupport最终会停止拆分。

初始流大小为 100 个元素时,StreamSupport当流大小达到 2 时停止拆分(我在屏幕上看到的最后一行是Split on count: 4)。

对于 1000 个元素的初始流大小,各个流块的最终大小约为 32 个元素。


编辑部分 deux:在查看了上面的输出后,我更改了我的代码以列出Spliterator创建的单个s。以下是变化:

   public static void main( String[] args ) {
      TestingSpliterator splits = new TestingSpliterator( 100 );
      Stream<Float> test = StreamSupport.stream( splits, true );
      double total = test.mapToDouble( Float::doubleValue ).sum();
      System.out.println( "Total Spliterators: " + testers.size() );
      for( TestingSpliterator t : testers ) {
         System.out.println( "Splits: " + t.splits );
      }
   } 

TestingSpliterator's ctor:

   static Queue<TestingSpliterator> testers = new ConcurrentLinkedQueue<>();

   public TestingSpliterator( int count ) {
      this.count = count;
      testers.add( this ); // OUCH! 'this' escape
   }

这段代码的结果是第一个Spliterator被拆分了 5 次。下一个Spliterator被拆分 4 次。下一组Spliteratorsget 拆分 3 次。等等。结果是制作了 36 个Spliterators,并且流被分成了尽可能多的部分。在典型的桌面系统上,这似乎是 API 认为最适合并行操作的方式。

我将在下面接受 Holger 的回答,这基本上是StreamSupport班级在做正确的事情,别担心,要开心。对我来说,部分问题是我正在对非常小的流大小进行早期测试,我对拆分的数量感到惊讶。不要自己犯同样的错误。

以上是JavaSpliterator不断拆分并行流的全部内容。
THE END
分享
二维码
< <上一篇
下一篇>>