代码之家  ›  专栏  ›  技术社区  ›  daniel

Rxjs订阅队列

  •  3
  • daniel  · 技术社区  · 6 年前

    我的angular应用程序中有一个firebase订阅,可以多次启动。 ich如何实现将任务作为队列处理,以便我可以同步运行每个任务一次?

    this.tasks.subscribe(async tasks => {
       for (const x of tasks) 
          await dolongtask(x); // has to be sync
          await removetask(x);
       });
    

    问题是subribe事件在长任务仍在处理时激发。

    3 回复  |  直到 6 年前
        1
  •  3
  •   Richard Matsen    6 年前

    • firebase查询返回所有未处理的任务(在集合中),并且每次添加新任务时都会发出完整的列表。

    • 查询仅在 removeTask() 已运行

    为了便于说明,我用一个主题(重命名为tasksQuery$)模拟了firebase查询,并在脚本的底部模拟了一系列firebase事件。 希望不要太混乱!

    console.clear()
    const { mergeMap, filter } = rxjs.operators;
    
    // Simulate tasks query  
    const tasksQuery$ = new rxjs.Subject();
    
    // Simulate dolongtask and removetask (assume both return promises that can be awaited)
    const dolongtask = (task) => {
      console.log( `Processing: ${task.id}`);
      return new Promise(resolve => {
        setTimeout(() => {
          console.log( `Processed: ${task.id}`);
          resolve('done')
        }, 1000);
      });
    }
    const removeTask = (task) => {
      console.log( `Removing: ${task.id}`);
      return new Promise(resolve => {
        setTimeout(() => {
          console.log( `Removed: ${task.id}`);
          resolve('done')
        }, 200);
      });
    }
    
    // Set up queue (this block could be a class in Typescript)
    let tasks = [];
    const queue$ = new rxjs.Subject();
    const addToQueue = (task) => {
      tasks = [...tasks, task];
      queue$.next(task);
    }
    const removeFromQueue = () => tasks = tasks.slice(1);
    const queueContains = (task) => tasks.map(t => t.id).includes(task.id)
    
    // Dedupe and enqueue
    tasksQuery$.pipe(
      mergeMap(tasks => tasks), // flatten the incoming task array 
      filter(task => task && !queueContains(task)) // check not in queue
    ).subscribe(task => addToQueue(task) );
    
    //Process the queue
    queue$.subscribe(async task => {
      await dolongtask(task);
      await removeTask(task); // Assume this sends 'delete' to firebase
      removeFromQueue();
    });
    
    // Run simulation
    tasksQuery$.next([{id:1},{id:2}]);
    // Add after delay to show repeated items in firebase
    setTimeout(() => {
      tasksQuery$.next([{id:1},{id:2},{id:3}]); 
    }, 500);
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.3.2/rxjs.umd.js"></script>
        2
  •  2
  •   Ingo Bürk    6 年前

    IMHO,我会尝试利用rxjs的强大功能,因为我们在这里已经在使用它了,并且避免实现另一个答案所建议的定制排队概念(尽管你当然可以做到)。

    按顺序 . rxjs允许通过 concatMap 基本上是开箱即用的操作员:

    $data.pipe(concatMap(item => processItem(item))).subscribe();
    

    这只是假设 processItem 返回可观察的。自从你用过 await ,我假设您的函数当前返回承诺。这些可以通过使用 from .

    从OP中可以看到的唯一细节是,observable实际上发射了一个项目数组,我们希望对每个发射的每个项目执行操作。为此,我们只需使用 mergeMap .


    让我们把它放在一起。请注意,如果您不准备一些存根数据和日志记录,那么实际的实现仅限于 代码行(使用mergeMap+concatMap)。

    const { from, interval } = rxjs;
    const { mergeMap, concatMap, take, bufferCount, tap } = rxjs.operators;
    
    // Stub for the long-running operation
    function processTask(task) {
      console.log("Processing task: ", task);
      return new Promise(resolve => {
        setTimeout(() => {
          console.log("Finished task: ", task);
          resolve(task);
        }, 500 * Math.random() + 300);
      });
    }
    
    // Turn processTask into a function returning an observable
    const processTask$ = item => from(processTask(item));
    
    // Some stubbed data stream
    const tasks$ = interval(250).pipe(
      take(9),
      bufferCount(3),
    );
    
    tasks$.pipe(
      tap(task => console.log("Received task: ", task)),
      // Flatten the tasks array since we want to work in sequence anyway
      mergeMap(tasks => tasks),
      // Process each task, but do so consecutively
      concatMap(task => processTask$(task)),
    ).subscribe(null, null, () => console.log("Done"));
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.3.2/rxjs.umd.js"></script>
        3
  •  1
  •   Hiram K. Hackenbacker    6 年前

    撇开标题“Rxjs subscription queue”不谈,实际上可以修复异步/等待代码。

    Using async/await with a forEach loop .

    例如,您可以根据@Bergi的答案替换For循环,

    具有 Promise.all()

    console.clear();
    const { interval } = rxjs;
    const { take, bufferCount } = rxjs.operators;
    
    function processTask(task) {
      console.log(`Processing task ${task}`);
      return new Promise(resolve => {
        setTimeout(() => {
          resolve(task);
        }, 500 * Math.random() + 300);
      });
    }
    function removeTask(task) {
      console.log(`Removing task ${task}`);
      return new Promise(resolve => {
        setTimeout(() => {
          resolve(task);
        }, 50);
      });
    }
    
    const tasks$ = interval(250).pipe(
      take(10),
      bufferCount(3),
    );
    
    tasks$.subscribe(async tasks => {
      await Promise.all(
        tasks.map(async task => {
          await processTask(task); // has to be sync
          await removeTask(task);
          console.log(`Finished task ${task}`);
        })
      );
    });
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.3.2/rxjs.umd.js"></script>

    更好的是,您可以调整查询的形状,以避免使用for循环,

    具有 mergeMap()

    console.clear();
    const { interval } = rxjs;
    const { mergeMap, take, bufferCount } = rxjs.operators;
    
    function processTask(task) {
      console.log(`Processing task ${task}`);
      return new Promise(resolve => {
        setTimeout(() => {
          resolve(task);
        }, 500 * Math.random() + 300);
      });
    }
    function removeTask(task) {
      console.log(`Removing task ${task}`);
      return new Promise(resolve => {
        setTimeout(() => {
          resolve(task);
        }, 50);
      });
    }
    
    const tasks$ = interval(250).pipe(
      take(10),
      bufferCount(3),
    );
    
    tasks$
    .pipe(mergeMap(tasks => tasks))
    .subscribe(
      async task => {
        await processTask(task); // has to be sync
        await removeTask(task);
        console.log(`Finished task ${task}`);
      }
    );
    <