关于How to skip even lines of a Stream obtained from the Files.lines问题,我遵循了公认的答案方法,基于filterEven()接口(interface)实现了自己的Spliterator<T>方法,例如:
public static <T> Stream<T> filterEven(Stream<T> src) {
Spliterator<T> iter = src.spliterator();
AbstractSpliterator<T> res = new AbstractSpliterator<T>(Long.MAX_VALUE, Spliterator.ORDERED)
{
@Override
public boolean tryAdvance(Consumer<? super T> action) {
iter.tryAdvance(item -> {}); // discard
return iter.tryAdvance(action); // use
}
};
return StreamSupport.stream(res, false);
}
我可以通过以下方式使用:
Stream<DomainObject> res = Files.lines(src)
filterEven(res)
.map(line -> toDomainObject(line))
但是,相对于使用带有副作用的
filter()的下一种方法来衡量这种方法的性能,我注意到下一种方法的效果更好:
final int[] counter = {0};
final Predicate<String> isEvenLine = item -> ++counter[0] % 2 == 0;
Stream<DomainObject> res = Files.lines(src)
.filter(line -> isEvenLine ())
.map(line -> toDomainObject(line))
我使用JMH测试了性能,但没有在基准测试中包含文件负载。我以前将其加载到数组中。然后,每个基准测试都从先前的数组创建
Stream<String>开始,然后过滤偶数行,然后应用
mapToInt()提取
int字段的值,最后是
max()操作。这是基准测试之一(您可以检查整个
Program
here,在这里您有
data file with about 186 lines):
@Benchmark
public int maxTempFilterEven(DataSource src){
Stream<String> content = Arrays.stream(src.data)
.filter(s-> s.charAt(0) != '#') // Filter comments
.skip(1); // Skip line: Not available
return filterEven(content) // Filter daily info and skip hourly
.mapToInt(line -> parseInt(line.substring(14, 16)))
.max()
.getAsInt();
}
我不明白为什么
filter()方法具有比
filterEven()(〜50ops/ms)更好的性能(〜80ops/ms)?
请您参考如下方法:
简介
我想我知道原因,但是不幸的是,我不知道如何提高基于Spliterator的解决方案的性能(至少在不重写整个Streams API功能的情况下)。
旁注1 :设计Stream API时,性能并不是最重要的设计目标。如果性能至关重要,则最有可能在不使用Stream API的情况下重新编写代码将使代码更快。 (例如,Stream API不可避免地会增加内存分配,从而增加GC压力)。另一方面,在大多数情况下,Stream API以相对较小的性能下降为代价提供了更好的高级API。
部分 1 或理论上的简短回答Stream旨在实现一种内部迭代作为消耗的主要方式,而外部迭代(即基于Spliterator的方式)是一种“模拟”的附加手段。因此,外部迭代会涉及一些开销。懒惰对外部迭代的效率增加了一些限制,并且由于需要支持flatMap,因此有必要在此过程中使用某种动态缓冲区。
旁注2 在某些情况下,基于Spliterator的迭代可能与内部迭代一样快(在这种情况下为filter)。在直接从包含数据的Spliterator直接创建Stream的情况下尤其如此。要查看它,可以修改测试以将第一个过滤器具体化为String s数组:
String[] filteredData = Arrays.stream(src.data)
.filter(s-> s.charAt(0) != '#') // Filter comments
.skip(1)
.toArray(String[]::new);
然后比较
maxTempFilter和
maxTempFilterEven的性能,以接受该预先过滤的
String[] filteredData。如果您想知道为什么会这样,您可能应该阅读本长答案的其余部分,或者至少阅读第2部分。
部分 2 或 更长的理论答案:
流的设计主要是通过某些终端操作将其整体消耗掉。尽管被支持,但一个接一个的迭代元素并不是设计使用流的主要方式。
请注意,使用“功能性” Stream API(例如
map,
flatMap,
filter,
reduce和
collect),您不能在某个步骤中说“我已经有足够的数据,不再遍历源并推送值”。您可以丢弃某些传入数据(就像
filter一样),但不能停止迭代。 (
take和
skip转换实际上是在内部使用
Spliterator来实现的;
anyMatch,
allMatch,
noneMatch,
findFirst,
findAny等使用非公共(public)API
j.u.s.Sink.cancellationRequested,它们也更容易使用,因为不能进行多个终端操作。)如果管道中的所有转换都是同步的,则可以将它们组合为一个聚合函数(
Consumer),然后在一个简单的循环中调用它(可以选择将循环执行拆分为多个线程)。这就是我简化的基于状态的过滤器所代表的含义(请参见
中的代码,向我显示一些代码部分)。如果管道中有一个
flatMap,它会变得更加复杂,但是思路仍然相同。
基于
Spliterator的转换从根本上有所不同,因为它向管道增加了异步的,由消费者驱动的步骤。现在,
Spliterator而不是源
Stream驱动了迭代过程。如果您直接在源
Spliterator上请求
Stream,它也许可以返回一些实现,该实现只需对其内部数据结构进行迭代,这就是为什么实现预先过滤的数据应消除性能差异的原因。但是,如果您为某些非空管道创建
Spliterator,则除了要求源将元素逐一推送通过管道,直到某个元素通过所有过滤器之外,没有其他(简单)的选择(另请参见示例2中的第二个示例)。
,向我显示一些部分代码)。源元素被逐个而不是分批推送的事实是使
Stream变得懒惰的基本决定的结果。需要缓冲区而不是仅一个元素是对
flatMap的支持的结果:从源中推送一个元素可以为
Spliterator生成许多元素。
部分 3 或 向我显示一些代码
本部分试图为“理论”部分中所描述的代码提供支持(包括链接到实际代码和模拟代码)。
首先,您应该知道当前的Streams API实现将非终端(中间)操作累积到单个惰性管道中(请参阅 j.u.s.AbstractPipeline及其子类(例如 j.u.s.ReferencePipeline)。然后,当应用了终端操作时,原始操作中的所有元素
Stream通过管道“推送”。
您看到的是两件事的结果:
里面有一个基于
Spliterator的步骤。 OddLines不是具有状态过滤器的代码或多或少类似于以下简单代码:
static int similarToFilter(String[] data)
{
final int[] counter = {0};
final Predicate<String> isEvenLine = item -> ++counter[0] % 2 == 0;
int skip = 1;
boolean reduceEmpty = true;
int reduceState = 0;
for (String outerEl : data)
{
if (outerEl.charAt(0) != '#')
{
if (skip > 0)
skip--;
else
{
if (isEvenLine.test(outerEl))
{
int intEl = parseInt(outerEl.substring(14, 16));
if (reduceEmpty)
{
reduceState = intEl;
reduceEmpty = false;
}
else
{
reduceState = Math.max(reduceState, intEl);
}
}
}
}
}
return reduceState;
}
请注意,这实际上是一个内部包含一些计算(过滤/转换)的循环。
另一方面,当您在管道中添加
Spliterator时,情况会发生很大变化,即使使用与实际发生的情况基本相似的简化代码,它也会变得更大,例如:
interface Sp<T>
{
public boolean tryAdvance(Consumer<? super T> action);
}
static class ArraySp<T> implements Sp<T>
{
private final T[] array;
private int pos;
public ArraySp(T[] array)
{
this.array = array;
}
@Override
public boolean tryAdvance(Consumer<? super T> action)
{
if (pos < array.length)
{
action.accept(array[pos]);
pos++;
return true;
}
else
{
return false;
}
}
}
static class WrappingSp<T> implements Sp<T>, Consumer<T>
{
private final Sp<T> sourceSp;
private final Predicate<T> filter;
private final ArrayList<T> buffer = new ArrayList<T>();
private int pos;
public WrappingSp(Sp<T> sourceSp, Predicate<T> filter)
{
this.sourceSp = sourceSp;
this.filter = filter;
}
@Override
public void accept(T t)
{
buffer.add(t);
}
@Override
public boolean tryAdvance(Consumer<? super T> action)
{
while (true)
{
if (pos >= buffer.size())
{
pos = 0;
buffer.clear();
sourceSp.tryAdvance(this);
}
// failed to fill buffer
if (buffer.size() == 0)
return false;
T nextElem = buffer.get(pos);
pos++;
if (filter.test(nextElem))
{
action.accept(nextElem);
return true;
}
}
}
}
static class OddLineSp<T> implements Sp<T>, Consumer<T>
{
private Sp<T> sourceSp;
public OddLineSp(Sp<T> sourceSp)
{
this.sourceSp = sourceSp;
}
@Override
public boolean tryAdvance(Consumer<? super T> action)
{
if (sourceSp == null)
return false;
sourceSp.tryAdvance(this);
if (!sourceSp.tryAdvance(action))
{
sourceSp = null;
}
return true;
}
@Override
public void accept(T t)
{
}
}
static class ReduceIntMax
{
boolean reduceEmpty = true;
int reduceState = 0;
public int getReduceState()
{
return reduceState;
}
public void accept(int t)
{
if (reduceEmpty)
{
reduceEmpty = false;
reduceState = t;
}
else
{
reduceState = Math.max(reduceState, t);
}
}
}
static int similarToSpliterator(String[] data)
{
ArraySp<String> src = new ArraySp<>(data);
int[] skip = new int[1];
skip[0] = 1;
WrappingSp<String> firstFilter = new WrappingSp<String>(src, (s) ->
{
if (s.charAt(0) == '#')
return false;
if (skip[0] != 0)
{
skip[0]--;
return false;
}
return true;
});
OddLineSp<String> oddLines = new OddLineSp<>(firstFilter);
final ReduceIntMax reduceIntMax = new ReduceIntMax();
while (oddLines.tryAdvance(s ->
{
int intValue = parseInt(s.substring(14, 16));
reduceIntMax.accept(intValue);
})) ; // do nothing in the loop body
return reduceIntMax.getReduceState();
}
此代码较大,因为如果在循环内没有一些非平凡的有状态回调,则无法(或至少很难)表示逻辑。这里的
Sp接口(interface)是
j.u.s.Stream和
j.u.Spliterator接口(interface)的混合体。
ArraySp表示Arrays.stream的结果。 WrappingSp与j.u.s.StreamSpliterators.WrappingSpliterator相似,后者在实际代码中表示针对任何非空管道的Spliterator接口(interface)的实现,即,至少应用了一个中间操作的Stream(请参阅j.u.s.AbstractPipeline.spliterator method)。在我的代码中,我将其与StatelessOp子类合并,并放置负责filter方法实现的逻辑。同样为了简单起见,我使用skip实现了filter。 OddLineSp对应于您的OddLines及其产生的Stream ReduceIntMax表示ReduceOps的Math.max终端操作int 那么在此示例中重要的是什么?这里重要的是,因为您首先过滤了原始流,所以
OddLineSp是根据非空管道(即
WrappingSp)创建的。而且,如果您仔细观察
WrappingSp,您会注意到每次
tryAdvance都被调用时,它将调用委托(delegate)给
sourceSp并将结果累积到
buffer中。而且,由于管道中没有
flatMap,因此
buffer的元素将被一一复制。 IE。每次调用
WrappingSp.tryAdvance时,它将调用
ArraySp.tryAdvance,准确地返回一个元素(通过回调),并将其进一步传递给调用方提供的
consumer(除非该元素与过滤器不匹配,在这种情况下将再次调用
ArraySp.tryAdvance)再一次,但
buffer永远不会一次填充多个元素)。
旁注3 :如果要查看实际代码,则最有趣的地方是 j.u.s.StreamSpliterators.
WrappingSpliterator.tryAdvance ,它调用
j.u.s.StreamSpliterators.
AbstractWrappingSpliterator.doAdvance 依次调用
j.u.s.StreamSpliterators. AbstractWrappingSpliterator.fillBuffer ,
j.u.s.StreamSpliterators. pusher 依次调用在ojit_a初始化的
WrappingSpliterator.initPartialTraversalState
因此,影响性能的主要因素是复制到缓冲区中。
对于我们这些普通的Java开发人员来说,不幸的是,Stream API的当前实现几乎是封闭的,您不能仅使用继承或组合来修改内部行为的某些方面。
您可以使用基于反射的黑客手段,使针对特定情况的复制到缓冲区的效率更高,并获得一定的性能(但是牺牲了
Stream的惰性),但是您不能完全避免这种复制,因此基于
Spliterator的代码将是反正比较慢
回到第2条 旁注的示例,基于
Spliterator的包含具体化
filteredData的测试工作得更快,因为在
WrappingSp之前的管道中没有
OddLineSp,因此不会复制到中间缓冲区中。




