代码之家  ›  专栏  ›  技术社区  ›  Seki

websocket在长过程中的异步反馈

  •  2
  • Seki  · 技术社区  · 6 年前

    我试图在一个网页中实现一个反馈,让用户从Excel工作表开始一个漫长的过程(见,是的…)。对于每行数据,处理时间约为1秒,公共数据长度在40到100项之间,因此总体处理时间可以超过一分钟。

    我正在显示页面中数据的预览,通过一个websocket开始这个过程,并希望显示来自同一个websocket的进程。

    处理本身是由一个外部包完成的,而且页面的复杂性很小,所以我将它包装在一个 Lite 单个文件。

    我的问题是,在websocket路由中启动的长处理阻塞了反馈,直到它完成,并且所有的进程事件都在最后同时发送。据我所知,它与Mojolicious的事件循环有关,我应该单独启动处理,以避免冻结websocket的处理。

    请注意,我尝试了一个单独的反馈渠道 EventSource 在处理过程中将某些进程推送到客户机,但它在最后一次显示相同的完成。

    这里是我的代码简化,我使用的是 sleep() 模拟长过程。我从

    perl mojo_notify_ws.pl daemon
    

    您能建议如何修改websocket路由以允许实时反馈吗?

    use Mojolicious::Lite;
    use Mojo::JSON qw(encode_json decode_json j);
    
    use Data::Dumper;
    
    $|++;
    
    any '/' => sub {
        my $c = shift;
        $c->render('index');
    };
    
    my $peer;
    websocket '/go' => sub {
        use Carp::Always;
        my $ws = shift;
    
        $peer = $ws->tx;
        app->log->debug(sprintf 'Client connected: %s', Dumper $peer->remote_address);
    
        # do not subscribe to 'text' else 'json' won't work
        #$ws->on(text => sub {
        #    my ($ws, $msg) = @_;
        #    app->log->debug("Received text from websocket: `$msg`");
        #        });
    
        # $peer->send('{"type": "test"}');
        # say 'default inactivity timeout='. (p $ws->inactivity_timeout());
        $ws->inactivity_timeout(120);
    
        $ws->on(json => sub {
            my ($ws, $msg) = @_;
            app->log->debug('Received from websocket:', Dumper(\$msg));
            unless($msg){
                app->log->debug('Received empty message? WTF?!');
                return;
            }
            my $prompt = $msg->{cmd};
            return unless $prompt;
            app->log->debug(sprintf 'Received: `%s`', $prompt // 'empty??');
    
            # simulate
            my $loop = Mojo::IOLoop->singleton;
    
    #        $loop->subprocess( sub {
    #            my $sp = shift;
    
            for my $cell (1..3) {
                # $loop->delay( sub {
                    app->log->debug("sending cell $cell");
                    my $payload = {
                            type => 'ticket',
                            cell => $cell,
                            result => $cell % 2 ? 'OK' : 'NOK'
                    };
                    $ws->send( { json => $payload } );
                    sleep(2);
                    # $loop->timer(2, sub {say 'we have waited 2 secs!';})->wait;
                # });
            };
    
    #        }, sub {} );#subprocess
    
            app->log->debug('sending end of process ->websocket');
            $ws->send({json => { type => 'end' } });
        });
    
        $ws->on(finish => sub {
            my ($ws, $code, $reason) = @_;
            $reason = '' unless defined $reason;
            app->log->debug("Client disconnected: $code ($reason)");
        });
    
        app->log->debug('Reached end of ws route definition');
    };
    
    app->start;
    
    __DATA__
    
    @@ index.html.ep
    <html>
        <head>
        <script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/jquery/3.3.1/jquery.js"></script>
        <script>
    var timerID = 0; 
    function keepAlive(ws) { 
        var timeout = 20000;  
        if (ws.readyState == ws.OPEN) {  
            ws.send('ping');  
        }  
        timerId = setTimeout(function(){keepAlive(ws);}, timeout);  
    }  
    function cancelKeepAlive() {  
        if (timerId) {  
            clearTimeout(timerId);  
        }  
    }
    
    function flagCell(cell, result){
        var id='#CELL_' + cell;
        var cell = $(id);
        if(cell) {
            if (result=='OK') {
                cell.css('color', 'green');
                cell.text('⯲');
            } else {
                cell.css('color','red');
                cell.text('✘');
            }
        }
    }
    
    function process(){
        //debugger;
        console.log('Opening WebSocket');
        var ws = new WebSocket('<%= url_for('go')->to_abs %>');
    
        ws.onopen = function (){
            console.log('Websocket Open');
            //keepAlive(ws);
            ws.send(JSON.stringify({cmd: "let's go Perl"}));
        };
        //incoming
        ws.onmessage = function(evt){
            var data = JSON.parse(evt.data);
            console.log('WS received '+JSON.stringify(data));
            if (data.type == 'ticket') {
                console.log('Server has send a status');
                console.log('Cell:'+data.cell + ' res:' + data.result);
    
                flagCell(data.cell, data.result);
            } else if (data.type == 'end') {
                console.log('Server has finished.');
                //cancelKeepAlive();
                ws.close();
            } else {
                console.log('Unknown message:' + evt.data);
            }
        };
        ws.onerror = function (evt) {
            console.log('ws error:', evt.data);
        }
        ws.onclose = function (evt) {
            if(evt.wasClean) {
                console.log('Connection closed cleanly');
            } else {
                console.log('Connection reseted');
            }
            console.log('Code:'+ evt.code + ' Reason:' + evt.reason);
        }
    }
    
        </script>
        </head>
        <body>
            <button type=button id='upload' onclick="process();">Go</button><br>
            <div style='font-family:sans;'>
                <table border="1px">
                  <tr><td id="CELL_1">&nbsp;</td><td>Foo</td></tr>
                  <tr><td id="CELL_2">&nbsp;</td><td>Bar</td></tr>
                  <tr><td id="CELL_3">&nbsp;</td><td>Baz</td></tr>
                </table>
            </div>
        </body>
    </html>
    

    Mojo::IOLoop::Subprocess 但我没有任何反馈。我正在运行Linux和 Subprocess 好像是叉子, 父进程似乎会立即终止websocket 编辑: 不:我最终发现 $ws->send() sub{} 在父进程中运行,而不是在子进程中运行的第一个进程中运行。这段代码应该被重构为有一个 subprocess

    这是修改后的 on(json)

    $ws->on(json => sub {
        my ($ws, $msg) = @_;
        app->log->debug('Received from websocket:', Dumper(\$msg));
        unless($msg){
            app->log->debug('Received empty message? WTF?!');
            return;
        }
        my $prompt = $msg->{cmd};
        return unless $prompt;
        app->log->debug(sprintf 'Received: `%s`', $prompt // '<empty??>');
    
        # my $loop = Mojo::IOLoop->singleton;
        my $subprocess = Mojo::IOLoop::Subprocess->new;
        app->log->debug("we are pid $$");
        $subprocess->run( 
            sub {
                my $sp = shift;
                for my $cell (1..3) {
                    app->log->debug("starting process for cell $cell in pid $$");     
                    sleep(2);
                    app->log->debug("sending cell $cell to ws");
                    my $payload = {
                        type => 'ticket',
                        cell => $cell,
                        result => $cell % 2 ? 'OK' : 'NOK'
                    };
                    $ws->send( { json => $payload } ); # FIXME: actually this line is in the wrong place
                                                       # and should be in the second sub{}
                };
            },
            sub {
                my ($sp, $err, @results) = @_; 
                $ws->reply->exception($err) and return if $err;
                app->log->debug('sending end of process ->websocket');
                $ws->send({json => { type => 'end' } });
            });  
        # Start event loop if necessary
        $subprocess->ioloop->start unless $subprocess->ioloop->is_running;       
    });
    

    [Wed Oct  3 19:51:58 2018] [debug] Received: `let's go Perl`
    [Wed Oct  3 19:51:58 2018] [debug] we are pid 8898
    [Wed Oct  3 19:51:58 2018] [debug] Client disconnected: 1006 ()
    [Wed Oct  3 19:51:58 2018] [debug] starting process for cell 1 in pid 8915
    [Wed Oct  3 19:52:00 2018] [debug] sending cell 1 to ws
    [Wed Oct  3 19:52:00 2018] [debug] starting process for cell 2 in pid 8915
    [Wed Oct  3 19:52:02 2018] [debug] sending cell 2 to ws
    [Wed Oct  3 19:52:02 2018] [debug] starting process for cell 3 in pid 8915
    [Wed Oct  3 19:52:04 2018] [debug] sending cell 3 to ws
    [Wed Oct  3 19:52:04 2018] [debug] sending end of process ->websocket
    [Wed Oct  3 19:52:04 2018] [debug] Client disconnected: 1005 ()
    

    我还尝试了 Mojo::IOLoop->delay 以类似于 Promise 解决方案,但此解决方案在结尾同时发送所有通知:

    $ws->on(json => sub {
        my ($ws, $msg) = @_;
        app->log->debug('Received from websocket:', Dumper(\$msg));
        unless($msg){
            app->log->debug('Received empty message? WTF?!');
            return;
        }
        my $prompt = $msg->{cmd};
        return unless $prompt;
        app->log->debug(sprintf 'Received: `%s`', $prompt // '<empty??>');
    
        app->log->debug("we are pid $$");
    
        my @steps;
        for my $cell (1..3) {
            push @steps, 
                sub {
                    app->log->debug("subprocess for cell pid $cell");
                    # my $sp = shift;
                    my $delay = shift;
                    sleep(2);
                    app->log->debug("end of sleep for cell $cell");
                    $delay->pass($cell % 2 ? 'OK' : 'NOK');
                },
                sub {
                    my $delay = shift;
                    my $result = shift;
    
                    app->log->debug("sending cell $cell from pid $$ - result was $result");
                    my $payload = {
                        type => 'ticket',
                        cell => $cell,
                        result => $result
                };
                $ws->send( { json => $payload } );
                $delay->pass;    
            };
        }
    
        # add final step to notify end of processing
        push @steps, sub {
            my $delay = shift;
            app->log->debug('sending end of process ->websocket');
            $ws->send({json => { type => 'end' } });
            $delay->pass;
        };
    
        my $delay = Mojo::IOLoop::Delay->new;
        app->log->debug("Starting delay...");
        $delay->steps( @steps );
        app->log->debug("After the delay");
    
    });
    
    2 回复  |  直到 6 年前
        1
  •  3
  •   Grinnz    6 年前

    It is not possible to magically make Perl code non-blocking.

    单个子进程对此不起作用,因为只有处理请求的原始工作进程才能响应websocket,并且子进程只能返回一次。但是,您可以使用子流程来准备要发送的每个响应。然而,您对子流程的使用并不完全正确。

    除此之外的任何代码都将在子进程启动之前执行,因为这是异步代码,所以需要通过回调对逻辑进行排序。你可以用 promises 使复杂的排序更简单。

    use Mojo::Promise;
    
    $ws->on(json => sub {
        my ($ws, $msg) = @_;
        app->log->debug('Received from websocket:', Dumper(\$msg));
        unless($msg){
            app->log->debug('Received empty message? WTF?!');
            return;
        }
        my $prompt = $msg->{cmd};
        return unless $prompt;
        app->log->debug(sprintf 'Received: `%s`', $prompt // 'empty??');
    
        my $promise = Mojo::Promise->new->resolve; # starting point
        # attach follow-up code for each cell, returning a new promise representing the whole chain so far
        for my $cell (1..3) {
            $promise = $promise->then(sub {
                my $promise = Mojo::Promise->new;
                Mojo::IOLoop->subprocess(sub {
                    app->log->debug("sending cell $cell");
                    sleep(2);
                    my $payload = {
                            type => 'ticket',
                            cell => $cell,
                            result => $cell % 2 ? 'OK' : 'NOK'
                    };
                    return $payload;
                }, sub {
                    my ($sp, $err, $payload) = @_;
                    return $promise->reject($err) if $err; # indicates subprocess died
                    $ws->send( { json => $payload }, sub { $promise->resolve } );
                });
    
                # here, the subprocess has not been started yet
                # it will be started when this handler returns to the event loop
                # then the second callback will run once the subprocess exits
                return $promise;
            };
        }
    
        # chain from last promise
        $promise->then(sub {
            app->log->debug('sending end of process ->websocket');
            $ws->send({json => { type => 'end' } });
        })->catch(sub {
            my $err = shift;
            # you can send or log something here to indicate an error occurred in one of the subprocesses
        });
    });
    

    如果合适的话,我可以更详细地介绍一些其他选项: Mojo::IOLoop::ReadWriteFork Postgres , Redis Mercury (也需要序列化)。

        2
  •  3
  •   Theo Ohnsorge    6 年前

    可以使用线程而不是子进程来完成工作。在创建线程之后,您需要一个通过websocket更新进度的循环。

    如果您处理在任何情况下都必须完成的关键工作负载(websocket消失、网络关闭等),您应该将其委托给另一个守护进程,该守护进程将持续存在,并通过文件或套接字通信其状态。

    如果它是一个非关键工作负载,您可以轻松地重新启动它,这可能是一个模板为您。

    # Insert this at module header
    # use threads;
    # use Thread::Queue;
    
    my $queue  = Thread::Queue->new();
    my $worker = threads->create(sub {
      # dummy workload. do your work here
      my $count = 60;
      for (1..$count) {
        sleep 1;
        $queue->enqueue($_/$count);
      }
    
      # undef to signal end of work
      $queue->enqueue(undef);
    
      return;
    });
    
    # blocking dequeuing ends when retrieving an undef'd value
    while(defined(my $item = $queue->dequeue)) {
      # update progress via websocket
      printf("%f %\n", $item);
    }
    
    # join thread
    $worker->join;
    
        3
  •  3
  •   Hollie    5 年前

    我对您更新的示例做了一个小改动,使其按预期工作。你可以用 progress 的特征 Subprocess

    代码现在可以像我预期的那样工作了,每次子流程经过迭代时,客户端上的表状态都会更新。

    源代码的相关部分如下所示:

    $ws->on(
        json => sub {
            my ( $ws, $msg ) = @_;
            app->log->debug( 'Received from websocket:', Dumper( \$msg ) );
            unless ($msg) {
                app->log->debug('Received empty message? WTF?!');
                return;
            }
            my $prompt = $msg->{cmd};
            return unless $prompt;
            app->log->debug( sprintf 'Received: `%s`', $prompt // '<empty??>' );
    
            # my $loop = Mojo::IOLoop->singleton;
            my $subprocess = Mojo::IOLoop::Subprocess->new;
            app->log->debug("we are pid $$");
            $subprocess->run(
                sub {
                    my $sp = shift;
                    for my $cell ( 1 .. 3 ) {
                        app->log->debug(
                            "starting process for cell $cell in pid $$");
                        sleep(2);
                        app->log->debug("sending cell $cell to ws");
                        my $payload = {
                            type   => 'ticket',
                            cell   => $cell,
                            result => $cell % 2 ? 'OK' : 'NOK'
                        };
                        $sp->progress($payload);
                    }
                },
                sub {
                    my ( $sp, $err, @results ) = @_;
    
                    #$ws->send( { json => $payload } );
                    $ws->reply->exception($err) and return if $err;
                    app->log->debug('sending end of process ->websocket');
                    $ws->send( { json => { type => 'end' } } );
                }
            );
    
            # Start event loop if necessary
            $subprocess->on(
                progress => sub {
                    my ( $subprocess, $payload ) = @_;
                    $ws->send( { json => $payload } );
                }
            );
            $subprocess->ioloop->start unless $subprocess->ioloop->is_running;
        }
    );
    
    推荐文章
    Ivan  ·  Mojolique Lite文件上传
    10 年前