RxJS:异步变异树

RxJS: Asynchronously mutate tree

本文关键字:变异 异步 RxJS      更新时间:2023-09-26

我有一系列对象,需要通过向每个对象添加一个属性来异步修改:

[{ id: 1 }, { id: 2 }] => [{ id: 1, foo: 'bar' }, { id: 2, foo: 'bar' }]

其同步等价物为:

var xs = [{ id: 1 }, { id: 2 }];
// Warning: mutation!
xs.forEach(function (x) {
    x.foo = 'bar';
});
var newXs = xs;

但是,在我的情况下,我需要异步地附加foo属性。我希望结束值是添加了foo属性的对象序列。

我想出了以下代码来解决这个问题。在本例中,我只是为每个值为bar的对象添加一个属性。

var xs = Rx.Observable.fromArray([{ id: 1 }, { id: 2 }]);
var propertyValues = xs
    // Warning: mutation!
    .flatMap(function (x) {
        return Rx.Observable.return('bar');
    });
var newXs =
    .zip(propertyValues, function (x, propertyValue) {
        // Append the property here
        x.foo = propertyValue;
        return x;
    })
    .toArray();
newXs.subscribe(function (y) { console.log(y); });

这是解决我的问题的最佳方法,还是Rx为异步改变序列中的对象提供了更好的方法?我正在寻找一个更干净的解决方案,因为我有一棵很深的树,我需要对它进行变异,而这段代码很快就会变得异常:

var xs = Rx.Observable.fromArray([{ id: 1, blocks: [ {} ] }, { id: 2, blocks: [ {} ] } ]);
var propertyValues = xs
    // Warning: mutation!
    .flatMap(function (x) {
        return Rx.Observable.fromArray(x.blocks)
            .flatMap(function (block) {
                var blockS = Rx.Observable.return(block);
                var propertyValues = blockS.flatMap(function (block) {
                    return Rx.Observable.return('bar');
                });
                return blockS.zip(propertyValues, function (block, propertyValue) {
                    block.foo = propertyValue;
                    return block;
                });
            })
            .toArray();
    });
xs
    .zip(propertyValues, function (x, propertyValue) {
        // Rewrite the property here
        x.blocks = propertyValue;
        return x;
    })
    .toArray()
    .subscribe(function (newXs) { console.log(newXs); });

也许我一开始就不应该进行这种突变?

是否有理由需要创建两个独立的Observable:一个用于更新的列表,另一个用于生成的值?

如果您只是在原始列表上执行.map(),那么您应该能够异步更新列表并订阅结果:

// This is the function that generates the new property value
function getBlocks(x) { ... }
const updatedList$ = Rx.Observable.fromArray(originalList)
    // What we're essentially doing here is scheduling work
    // to be completed for each item
    .map(x => Object.assign({}, x, { blocks: getBlocks(x)}))
    .toArray();
// Finally we can wait for our updatedList$ observable to emit
// the modified list
updatedList$.subscribe(list => console.log(list));

为了抽象这个功能,我创建了一个助手函数,它将使用setTimeout:显式地为每个项目安排工作

function asyncMap(xs, fn) {
    return Rx.Observable.fromArray(xs)
        .flatMap(x => {
            return new Rx.Observable.create(observer => {
                setTimeout(() => {
                    observer.onNext(fn(x));
                    observer.completed();
                }, 0);
            });
        })
        .toArray();
}

您可以使用此功能为每个项目安排要完成的工作:

function updateItem(x) {
    return Object.assign({}, x, { blocks: getBlocks(x) }
}
var updatedList$ = asyncMap(originalList, updateItem);
updateList$.subscribe(newList => console.log(newList));