代码之家  ›  专栏  ›  技术社区  ›  Vaccano

限制在给定时间开放的承诺量

  •  11
  • Vaccano  · 技术社区  · 8 年前

    doSomething(action) 一次一个。(这意味着列表中的第二项在第一项完成之前不会接到呼叫)。

    async performActionsOneAtATime() {
        for (let action of listOfActions) {
            const actionResult = await doSomethingOnServer(action);
            console.log(`Action Done: ${actionResult}`);
        }
     }
    

    这将立即向服务器发送所有请求(无需等待任何响应):

    async performActionsInParallel() {
        for (let action of listOfActions) {
            const actionResultPromise = doSomething(action);
            actionResultPromise.then((actionResult) => {
                console.log(`Action Done: ${actionResult}`);
            });
        }
    }
    

    但我真正需要的是一种控制它们的方法。可能一次打10或20个电话。(一次一个太慢,但所有600个都会使服务器过载。)

    但我很难理解这一点。

    关于如何减少每次打给X的电话数量,有什么建议吗?

    (这个问题使用TypeScript,但我可以用ES6 JavaScript回答。)

    7 回复  |  直到 3 年前
        1
  •  17
  •   Jeff Bowman    2 年前

    您可以在一个简短的函数中完成此操作。(根据naomik的建议按顺序返回值。谢谢!)

    /**
     * Performs a list of callable actions (promise factories) so
     * that only a limited number of promises are pending at any
     * given time.
     *
     * @param listOfCallableActions An array of callable functions,
     *     which should return promises.
     * @param limit The maximum number of promises to have pending
     *     at once.
     * @returns A Promise that resolves to the full list of values
     *     when everything is done.
     */
    function throttleActions(listOfCallableActions, limit) {
      // We'll need to store which is the next promise in the list.
      let i = 0;
      let resultArray = new Array(listOfCallableActions.length);
    
      // Now define what happens when any of the actions completes.
      // Javascript is (mostly) single-threaded, so only one
      // completion handler will call at a given time. Because we
      // return doNextAction, the Promise chain continues as long as
      // there's an action left in the list.
      function doNextAction() {
        if (i < listOfCallableActions.length) {
          // Save the current value of i, so we can put the result
          // in the right place
          let actionIndex = i++;
          let nextAction = listOfCallableActions[actionIndex];
          return Promise.resolve(nextAction()).then(result => {
            // Save results to the correct array index.
            resultArray[actionIndex] = result;
          }).then(doNextAction);
        }
      }
    
      // Now start up the original <limit> number of promises.
      // i advances in calls to doNextAction.
      let listOfPromises = [];
      while (i < limit && i < listOfCallableActions.length) {
        listOfPromises.push(doNextAction());
      }
      return Promise.all(listOfPromises).then(() => resultArray);
    }
    
    // Test harness:
    
    function delay(name, ms) {
      return new Promise((resolve, reject) => setTimeout(() => {
        console.log(name);
        resolve(name);
      }, ms));
    }
    
    var ps = [];
    for (let i = 0; i < 10; i++) {
      ps.push(() => delay("promise " + i, Math.random() * 3000));
    }
    
    throttleActions(ps, 3).then(result => console.log(result));
        2
  •  3
  •   Mulan    8 年前

    编辑

    杰夫·鲍曼(Jeff Bowman)极大地改进了他的答案,以解决有意义的价值观。请随意查看此答案的历史,以了解为什么解析值如此重要/有用。


    节流阀

    此解决方案非常类似于本机 Promise.all

    这是怎么回事…

    • 尽快解决承诺
    • 遇到一次拒绝时立即拒绝

    有什么不同…

    • 数组输入接受承诺 创建者 (沙沙声);非实际承诺

    // throttlep :: Number -> [(* -> Promise)]
    const throttlep = n=> Ps=>
      new Promise ((pass, fail)=> {
        // r is the number of promises, xs is final resolved value
        let r = Ps.length, xs = []
        // decrement r, save the resolved value in position i, run the next promise
        let next = i=> x=> (r--, xs[i] = x, run(Ps[n], n++))
        // if r is 0, we can resolve the final value xs, otherwise chain next
        let run = (P,i)=> r === 0 ? pass(xs) : P().then(next(i), fail)
        // initialize by running the first n promises
        Ps.slice(0,n).forEach(run)
      })
    
    // -----------------------------------------------------
    // make sure it works
    
    // delay :: (String, Number) -> (* -> Promise)
    const delay = (id, ms)=>
      new Promise (pass=> {
        console.log (`running: ${id}`)
        setTimeout(pass, ms, id)
      })
    
    // ps :: [(* -> Promise)]
    let ps = new Array(10)
    for (let i = 0; i < 10; i++) {
      ps[i] = () => delay(i, Math.random() * 3000)
    }
    
    // run a limit of 3 promises in parallel
    // the first error will reject the entire pool
    throttlep (3) (ps) .then (
      xs => console.log ('result:', xs),
      err=> console.log ('error:', err.message)
    )

    控制台输出

    输入按顺序运行;解析结果与输入的顺序相同

    running: 0
    running: 1
    running: 2
    => Promise {}
    running: 3
    running: 4
    running: 5
    running: 6
    running: 7
    running: 8
    running: 9
    result: [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    

    实际使用

    让我们看一个更实用的代码示例。这段代码的任务是从服务器获取一组图像。这就是我们可以使用的 throttlep 将同时请求的数量限制为每次3个

    // getImage :: String -> Promise<base64>
    let getImage = url=> makeRequest(url).then(data => data.base64, reqErrorHandler)
    
    // actions :: [(* -> Promise<base64>)]
    let actions = [
      ()=> getImage('one.jpg'),
      ()=> getImage('two.jpg'),
      ()=> getImage('three.jpg'),
      ()=> getImage('four.jpg'),
      ()=> getImage('five.jpg')
    ]
    
    // throttle the actions then do something...
    throttlep (3) (actions) .then(results => {
      // results are guaranteed to be ordered the same as the input array
      console.log(results)
      // [<base64>, <base64>, <base64>, <base64>, <base64>]
    })
    
        3
  •  2
  •   Stephen Cleary    8 年前

    没有任何内置的功能,因此您必须构建自己的。AFAIK,也没有这个库。

    首先,从“延期”开始——一种允许外部代码解决问题的承诺:

    class Deferral<T> {
        constructor() {
            this.promise = new Promise<T>((resolve, reject) => {
                this.resolve = resolve;
                this.reject = reject;
            });
        }
    
        promise: Promise<T>;
        resolve: (thenableOrResult?: T | PromiseLike<T>) => void;
        reject: (error: any) => void;
    }
    

    然后您可以定义一个“等待队列”,它表示等待进入关键部分的所有代码块:

    class WaitQueue<T> {
        private deferrals: Deferral<T>[];
    
        constructor() {
            this.deferrals = [];
        }
    
        get isEmpty(): boolean {
            return this.deferrals.length === 0;
        }
    
        enqueue(): Promise<T> {
            const deferral = new Deferral<T>();
            this.deferrals.push(deferral);
            return deferral.promise;
        }
    
        dequeue(result?: T) {
            const deferral = this.deferrals.shift();
            deferral.resolve(result);
        }
    }
    

    最后,您可以定义异步信号量,如下所示:

    export class AsyncSemaphore {
        private queue: WaitQueue<void>;
        private _count: number;
    
        constructor(count: number = 0) {
            this.queue = new WaitQueue<void>();
            this._count = count;
        }
    
        get count(): number { return this._count; }
    
        waitAsync(): Promise<void> {
            if (this._count !== 0) {
                --this._count;
                return Promise.resolve();
            }
            return this.queue.enqueue();
        }
    
        release(value: number = 1) {
            while (value !== 0 && !this.queue.isEmpty) {
                this.queue.dequeue();
                --value;
            }
            this._count += value;
        }
    }
    

    示例用法:

    async function performActionsInParallel() {
        const semaphore = new AsyncSemaphore(10);
        const listOfActions = [...];
        const promises = listOfActions.map(async (action) => {
            await semaphore.waitAsync();
            try {
                await doSomething(action);
            }
            finally {
                semaphore.release();
            }
        });
        const results = await Promise.all(promises);
    }
    

    此方法首先创建一个节流器,然后立即启动所有异步操作。每个异步操作将首先(异步)等待信号量释放,然后执行操作,最后释放信号量(允许另一个信号量进入)。当所有异步操作完成后,将检索所有结果。

        4
  •  0
  •   kennasoft    8 年前

    您可以使用pub-sub模式来实现这一点。我也不熟悉typescipt,我不知道这是发生在浏览器还是后端。我将为此编写伪代码(假设它是后端):

    //I'm assuming required packages are included e.g. events = require("events");
    let limit = 10;
    let emitter = new events.EventEmitter();
    
    for(let i=0; i<limit; i++){
        fetchNext(listOfActions.pop());
    }
    
    function fetchNext(action){
        const actionResultPromise = doSomething(action);
        actionResultPromise.then((actionResult) => {
            console.log(`Action Done: ${actionResult}`);
            emitter.emit('grabTheNextOne', listOfActions.pop());
        });
    }
    
    emitter.on('grabTheNextOne', fetchNext);
    

    如果您在Node中工作,EventEmitter是NodeJS的一部分。如果在浏览器中,则可以使用普通事件模型。这里的关键思想是发布订阅模式。

        5
  •  0
  •   alextanhongpin    5 年前

    可以用发电机限制承诺。在下面的示例中,我们对它们进行节流,以便

    function asyncTask(duration = 1000) {
      return new Promise(resolve => {
        setTimeout(resolve, duration, duration)
      })
    }
    
    
    async function main() {
      const items = Array(10).fill(() => asyncTask()) {
        const generator = batchThrottle(3, ...items)
        console.log('batch', (await generator.next()).value)
        for await (let result of generator) {
          console.log('remaining batch', result)
        }
      }
    
      {
        const generator = streamThrottle(3, ...items)
        console.log('stream', await generator.next())
        for await (let result of generator) {
          console.log('remaining stream', result)
        }
      }
    
    }
    
    async function* batchThrottle(n = 5, ...items) {
      while (items.length) {
        const tasks = items.splice(0, n).map(fn => fn())
        yield Promise.all(tasks)
      }
    }
    
    async function* streamThrottle(n = 5, ...items) {
      while (items.length) {
        const tasks = items.splice(0, n).map(fn => fn())
        yield* await Promise.all(tasks)
      }
    }
    main().catch()
    
        6
  •  0
  •   trincot    3 年前

    下面是一个节流功能的版本,使用 async await 语法:

    async function throttle(tasks, max) {
        async function run(_, i) {
            values[i] = await tasks[i]();
            if (max < tasks.length) return run(_, max++);
        };
        const values = [];
        try {
            await Promise.all(tasks.slice(0, max).map(run));
        } catch (error) {
            max = tasks.length; // don't allow new tasks to start
            throw error;
        }
        return values;
    }
    
    // Demo
    const delay = ms => new Promise(resolve => setTimeout(resolve, ms));
    
    const tasks = Array.from({length: 10}, (_, i) => 
        async () => {
            console.log(`task ${i} starts`);
            await delay((1 + i % 3)*1000);
            console.log(`task ${i} ends with ${i*10}`);
            return i*10;
        }
    );
    
    throttle(tasks, 4).then(console.log);
        7
  •  0
  •   Maz T    2 年前

    下面是我使用TypeScript的看法:

    function ParallelMap<T, U>(array: U[], callbackFn: (element: U, index?: number, array?: U[]) => Promise<T>, maxDegreeOfParallelism: number = -1) {
        if (maxDegreeOfParallelism < -1 || maxDegreeOfParallelism == 0) return Promise.reject(`'maxDegreeOfParallelism' must be either -1 or greater than 0`);
    
        return new Promise<T[]>((resolve, reject) => {
            const inputArraySize = array.length;
    
            let indexTracker = 0;
            let completedTracker = 0;
    
            const output = new Array<T>(inputArraySize);
            const errors = new Array<{ index: number, error: any }>();
    
            const processNext = () => {
                const elementIndex = indexTracker++;
                const element = array[elementIndex];
    
                callbackFn(element, elementIndex, array).then(
                    value => output[elementIndex] = value,
                    reason => errors.push({ index: elementIndex, error: reason })
                ).finally(() => {
                    ++completedTracker;
    
                    if (completedTracker == inputArraySize) {
                        if (errors.length > 0) reject(errors);
    
                        else resolve(output);
                    }
    
                    else if (indexTracker < inputArraySize) processNext();
                });
            };
    
            for (let index = 0, count = maxDegreeOfParallelism < 0 ? inputArraySize : Math.min(maxDegreeOfParallelism, inputArraySize); index < count; ++index) {
                processNext();
            }
        });
    }
    

    用法:

    const maxDegreeOfParallelism = 3; // Number of concurrent tasks
    const result = await ParallelMap(
        inputArray,
        async (value, index, array) => { /* Do something */ }, // Some async function to process each element
        maxDegreeOfParallelism
    );
    

    在JavaScript中也是如此:

    function ParallelMap(array, callbackFn, maxDegreeOfParallelism = -1) {
      if (maxDegreeOfParallelism < -1 || maxDegreeOfParallelism == 0) return Promise.reject(`'maxDegreeOfParallelism' must be either -1 or greater than 0`);
    
      return new Promise((resolve, reject) => {
        const inputArraySize = array.length;
    
        let indexTracker = 0;
        let completedTracker = 0;
    
        const output = new Array(inputArraySize);
        const errors = new Array();
    
        const processNext = () => {
          const elementIndex = indexTracker++;
          const element = array[elementIndex];
    
          callbackFn(element, elementIndex, array).then(
            value => output[elementIndex] = value,
            reason => errors.push({
              index: elementIndex,
              error: reason
            })
          ).finally(() => {
            ++completedTracker;
    
            if (completedTracker == inputArraySize) {
              if (errors.length > 0) reject(errors);
    
              else resolve(output);
            } else if (indexTracker < inputArraySize) processNext();
          });
        };
    
        for (let index = 0, count = maxDegreeOfParallelism < 0 ? inputArraySize : Math.min(maxDegreeOfParallelism, inputArraySize); index < count; ++index) {
          processNext();
        }
      });
    }
    
    
    
    // Usage
    
    (async() => {
      const input = new Array(10).fill(1); // Array containing 10 '1' values
    
      const oneSecondTask = (value, index) => {
        return new Promise(resolve => {
          setTimeout(() => {
            resolve(value + index); // Extremely complex calculation of adding index to value 1
          }, 1000);
        });
      };
    
      console.log(`const input = [${input.join(', ')}];`);
      console.log(`---------------------------------------------`);
      console.log(`... wait for 10s ...`);
      console.log(`---------------------------------------------`);
    
      let start = Date.now();
      let maxDegreeOfParallelism = 1;
      let result = await ParallelMap(input, oneSecondTask, maxDegreeOfParallelism);
      console.log(`const result = [${result.join(', ')}];`);
      console.log(`${(Date.now() - start) / 1000}s to process ${input.length} items (taking 1s each) one at a time`);
    
      console.log(`---------------------------------------------`);
    
      start = Date.now();
      maxDegreeOfParallelism = 2;
      result = await ParallelMap(input, oneSecondTask, maxDegreeOfParallelism);
      console.log(`const result = [${result.join(', ')}];`);
      console.log(`${(Date.now() - start) / 1000}s to process ${input.length} items (taking 1s each) in parallel using ${maxDegreeOfParallelism} concurrent tasks`);
    
      console.log(`---------------------------------------------`);
    
      start = Date.now();
      maxDegreeOfParallelism = 5;
      result = await ParallelMap(input, oneSecondTask, maxDegreeOfParallelism);
      console.log(`const result = [${result.join(', ')}];`);
      console.log(`${(Date.now() - start) / 1000}s to process ${input.length} items (taking 1s each) in parallel using ${maxDegreeOfParallelism} concurrent tasks`);
    
      console.log(`---------------------------------------------`);
    
      start = Date.now();
      maxDegreeOfParallelism = 10;
      result = await ParallelMap(input, oneSecondTask, maxDegreeOfParallelism);
      console.log(`const result = [${result.join(', ')}];`);
      console.log(`${(Date.now() - start) / 1000}s to process ${input.length} items (taking 1s each) in parallel using ${maxDegreeOfParallelism} concurrent tasks`);
    })();