JavaSpliterator不断拆分并行流
我发现 Java 并行流有一些令人惊讶的行为。我制作了自己的Spliterator,并且生成的并行流被分割,直到每个流中只有一个元素。这似乎太小了,我想知道我做错了什么。我希望我可以设置一些特征来纠正这个问题。
这是我的测试代码。在Float这里仅仅是一个虚拟的有效载荷,我真正的流类稍微复杂一些。
public static void main( String[] args ) {
TestingSpliterator splits = new TestingSpliterator( 10 );
Stream<Float> test = StreamSupport.stream( splits, true );
double total = test.mapToDouble( Float::doubleValue ).sum();
System.out.println( "Total: " + total );
}
此代码将不断拆分此流,直到每个流Spliterator都只有一个元素。这似乎太多了,效率不高。
输出:
run:
Split on count: 10
Split on count: 5
Split on count: 3
Split on count: 5
Split on count: 2
Split on count: 2
Split on count: 3
Split on count: 2
Split on count: 2
Total: 5.164293184876442
BUILD SUCCESSFUL (total time: 0 seconds)
这是Spliterator. 我主要关心的是我应该使用哪些特征,但也许其他地方有问题?
public class TestingSpliterator implements Spliterator<Float> {
int count;
int splits;
public TestingSpliterator( int count ) {
this.count = count;
}
@Override
public boolean tryAdvance( Consumer<? super Float> cnsmr ) {
if( count > 0 ) {
cnsmr.accept( (float)Math.random() );
count--;
return true;
} else
return false;
}
@Override
public Spliterator<Float> trySplit() {
System.err.println( "Split on count: " + count );
if( count > 1 ) {
splits++;
int half = count / 2;
TestingSpliterator newSplit = new TestingSpliterator( count - half );
count = half;
return newSplit;
} else
return null;
}
@Override
public long estimateSize() {
return count;
}
@Override
public int characteristics() {
return IMMUTABLE | SIZED;
}
}
那么我怎样才能把流分成更大的块呢?我希望在 10,000 到 50,000 附近会更好。
我知道我可以null从该trySplit()方法返回,但这似乎是一种倒退的方法。系统似乎应该对内核数量、当前负载以及使用流的代码的复杂程度有所了解,并相应地调整自身。换句话说,我希望流块大小在外部配置,而不是由流本身在内部固定。
编辑:重新。Holger 在下面的回答中,当我增加原始流中的元素数量时,流拆分会稍微减少,因此StreamSupport最终会停止拆分。
初始流大小为 100 个元素时,StreamSupport当流大小达到 2 时停止拆分(我在屏幕上看到的最后一行是Split on count: 4)。
对于 1000 个元素的初始流大小,各个流块的最终大小约为 32 个元素。
编辑部分 deux:在查看了上面的输出后,我更改了我的代码以列出Spliterator创建的单个s。以下是变化:
public static void main( String[] args ) {
TestingSpliterator splits = new TestingSpliterator( 100 );
Stream<Float> test = StreamSupport.stream( splits, true );
double total = test.mapToDouble( Float::doubleValue ).sum();
System.out.println( "Total Spliterators: " + testers.size() );
for( TestingSpliterator t : testers ) {
System.out.println( "Splits: " + t.splits );
}
}
和TestingSpliterator's ctor:
static Queue<TestingSpliterator> testers = new ConcurrentLinkedQueue<>();
public TestingSpliterator( int count ) {
this.count = count;
testers.add( this ); // OUCH! 'this' escape
}
这段代码的结果是第一个Spliterator被拆分了 5 次。下一个Spliterator被拆分 4 次。下一组Spliteratorsget 拆分 3 次。等等。结果是制作了 36 个Spliterators,并且流被分成了尽可能多的部分。在典型的桌面系统上,这似乎是 API 认为最适合并行操作的方式。
我将在下面接受 Holger 的回答,这基本上是StreamSupport班级在做正确的事情,别担心,要开心。对我来说,部分问题是我正在对非常小的流大小进行早期测试,我对拆分的数量感到惊讶。不要自己犯同样的错误。