代码之家  ›  专栏  ›  技术社区  ›  luca

Linux下用pexpect读取管道

  •  2
  • luca  · 技术社区  · 6 年前

    我在从队列中获取(get)消息时遇到了问题。这是代码(感谢 Martijn Pieters):

    import asyncio
    import sys
    import json
    import os
    import websockets
    
    
    async def socket_consumer(socket, outgoing):
        """Forward every message received on the web socket into a queue.

        Each message is put on ``outgoing`` and then appended, in ``ascii()``
        form, to a debug log file. The coroutine returns once the socket's
        async iterator is exhausted.

        Args:
            socket: Async-iterable connection yielding incoming messages.
            outgoing: Queue with an awaitable ``put`` that receives them.
        """
        async for message in socket:
            await outgoing.put(message)
            # ``with`` closes the log handle even if the write itself fails.
            with open(r"/home/host/Desktop/FromSocket.txt", "a") as log:
                log.write("From socket: " + ascii(message) + "\n")
    
    
    async def socket_producer(socket, incoming):
        """Send every message taken from a queue to the web socket.

        Runs until cancelled (or until ``socket.send`` raises). Each message
        is appended, in ``ascii()`` form, to a debug log file before it is
        sent.

        Args:
            socket: Connection object exposing an awaitable ``send``.
            incoming: Queue with an awaitable ``get`` supplying the messages.
        """
        while True:
            message = await incoming.get()
            # ``with`` closes the log handle even if the write itself fails.
            with open(r"/home/host/Desktop/ToSocket.txt", "a") as log:
                log.write("To socket: " + ascii(message) + "\n")
            await socket.send(message)
    
    
    async def connect_socket(incoming, outgoing, loop=None):
        """Open the web socket and pump messages in both directions.

        Starts ``socket_consumer`` (socket -> ``outgoing``) and
        ``socket_producer`` (``incoming`` -> socket) as tasks, waits until the
        first of them finishes, cancels the other one and re-raises any
        exception the finished task ended with.

        Args:
            incoming: Queue of messages to send to the socket.
            outgoing: Queue that receives messages read from the socket.
            loop: Event loop passed to ``asyncio.ensure_future``; ``None``
                lets asyncio pick the loop.
        """
        header = {"Authorization": r"Basic XXX="}
        uri = 'XXXXXX'
        async with websockets.connect(uri, extra_headers=header) as web_socket:
            # create tasks for the consumer and producer. The asyncio loop will
            # manage these independently
            consumer_task = asyncio.ensure_future(
                socket_consumer(web_socket, outgoing), loop=loop)
            producer_task = asyncio.ensure_future(
                socket_producer(web_socket, incoming), loop=loop)

            # start both tasks, but have the loop return to us when one of them
            # has ended. We can then cancel the remainder
            done, pending = await asyncio.wait(
                [consumer_task, producer_task], return_when=asyncio.FIRST_COMPLETED)
            for task in pending:
                task.cancel()
            # force a result check; without it an exception raised inside the
            # finished task would be silently dropped
            for task in done:
                task.result()
    
    
    # pipe support
    async def stdio(loop=None):
        """Wrap this process's stdin and stdout in asyncio stream objects.

        Args:
            loop: Event loop to attach the pipes to; when ``None`` the result
                of ``asyncio.get_event_loop()`` is used.

        Returns:
            A ``(reader, writer)`` tuple: an ``asyncio.StreamReader`` connected
            to ``sys.stdin`` and a ``StreamWriter`` connected to a binary
            file object opened on stdout's file descriptor.
        """
        event_loop = asyncio.get_event_loop() if loop is None else loop

        stdin_reader = asyncio.StreamReader()

        def make_reader_protocol():
            # protocol factory handed to connect_read_pipe
            return asyncio.StreamReaderProtocol(stdin_reader)

        await event_loop.connect_read_pipe(make_reader_protocol, sys.stdin)

        # re-open stdout's descriptor in binary mode for the write pipe
        raw_stdout = os.fdopen(sys.stdout.fileno(), 'wb')
        transport, protocol = await event_loop.connect_write_pipe(
            asyncio.streams.FlowControlMixin, raw_stdout)
        stdout_writer = asyncio.streams.StreamWriter(
            transport, protocol, None, event_loop)

        return stdin_reader, stdout_writer
    
    
    async def pipe_consumer(pipe_reader, outgoing):
        """Read lines from the pipe and push them, decoded, into a queue.

        Stops when ``readline`` returns an empty value (end of input). Each
        line is decoded as UTF-8 once, appended in ``ascii()`` form to a debug
        log file, and then put on ``outgoing``.

        Args:
            pipe_reader: Object with an awaitable ``readline`` returning bytes.
            outgoing: Queue with an awaitable ``put`` that receives the text.
        """
        while True:
            raw_line = await pipe_reader.readline()
            if not raw_line:
                break
            # decode once and reuse the text for both the log and the queue
            message = raw_line.decode('utf8')
            # ``with`` closes the log handle even if the write itself fails.
            with open(r"/home/host/Desktop/FromPipe.txt", "a") as log:
                log.write("From pipe: " + ascii(message) + "\n")

            await outgoing.put(message)
    
    
    async def pipe_producer(pipe_writer, incoming):
        """Forward selected JSON messages from a queue to the pipe.

        Runs until cancelled. Every message taken from ``incoming`` is logged,
        parsed as JSON, and classified by ``header.messageID``. Only messages
        whose ID is 1 or 2 are written to ``pipe_writer`` (newline-terminated,
        UTF-8 encoded); everything else is dropped after being logged.

        Args:
            pipe_writer: Stream writer exposing ``write`` and awaitable ``drain``.
            incoming: Queue with an awaitable ``get`` supplying JSON text.
        """
        while True:
            json_message = await incoming.get()
            with open(r"/home/host/Desktop/ToPipe.txt", "a") as log:
                log.write("Send to pipe message: " + ascii(json_message) + "\n")
            try:
                message = json.loads(json_message)
                message_type = int(message.get('header', {}).get('messageID', -1))
            except (ValueError, TypeError, AttributeError):
                # failed to decode the message, or the message was not
                # a dictionary, or the messageID was not convertible to an integer
                message_type = None
                with open(r"/home/host/Desktop/Error.txt", "a") as error_log:
                    error_log.write(" Error \n")

            # 1 is DENM message, 2 is CAM message
            should_forward = message_type in {1, 2}
            # The log is re-opened here: the earlier handle is already closed,
            # and the value logged is the parsed message type (not the builtin
            # ``type``), converted to text so the concatenation cannot fail.
            with open(r"/home/host/Desktop/ToPipe.txt", "a") as log:
                log.write("Send to pipe type: " + str(message_type) + "\n")
                if should_forward:
                    log.write("Send to pipe: " + json_message + "\n")
            if should_forward:
                pipe_writer.write(json_message.encode('utf8') + b'\n')
                await pipe_writer.drain()
    
    
    async def connect_pipe(incoming, outgoing, loop=None):
        """Attach to stdin/stdout and pump messages in both directions.

        Starts ``pipe_consumer`` (stdin -> ``outgoing``) and ``pipe_producer``
        (``incoming`` -> stdout) as tasks, waits until the first of them
        finishes, cancels the other one and re-raises any exception the
        finished task ended with.

        Args:
            incoming: Queue of messages to write to the pipe.
            outgoing: Queue that receives lines read from the pipe.
            loop: Event loop used for the pipes and the tasks; ``None`` lets
                asyncio pick the loop.
        """
        # forward the caller's loop so the pipes and the tasks share one loop
        reader, writer = await stdio(loop)
        # create tasks for the consumer and producer. The asyncio loop will
        # manage these independently
        consumer_task = asyncio.ensure_future(
            pipe_consumer(reader, outgoing), loop=loop)
        producer_task = asyncio.ensure_future(
            pipe_producer(writer, incoming), loop=loop)

        # start both tasks, but have the loop return to us when one of them
        # has ended. We can then cancel the remainder
        done, pending = await asyncio.wait(
            [consumer_task, producer_task], return_when=asyncio.FIRST_COMPLETED)
        for task in pending:
            task.cancel()
        # force a result check; if there was an exception it'll be re-raised
        for task in done:
            task.result()
    
    
    def main():
        """Run the socket bridge and the pipe bridge until both have finished.

        Two queues connect the halves: lines read from the pipe go to the web
        socket, and messages read from the web socket go to the pipe.
        """
        loop = asyncio.get_event_loop()
        # asyncio.Queue no longer accepts a ``loop`` argument (removed in
        # Python 3.10); the queues bind to the loop that uses them.
        pipe_to_socket = asyncio.Queue()
        socket_to_pipe = asyncio.Queue()

        socket_coro = connect_socket(pipe_to_socket, socket_to_pipe, loop=loop)
        pipe_coro = connect_pipe(socket_to_pipe, pipe_to_socket, loop=loop)

        loop.run_until_complete(asyncio.gather(socket_coro, pipe_coro))


    # only start the bridge when executed as a script, not when imported
    if __name__ == "__main__":
        main()
    

    要发送到管道,我使用以下代码:

    import pexpect


    # Spawn the bridge script under a pty and send it one CAM message on stdin.
    test = r"/home/host/PycharmProjects/Tim/Tim.py"
    # Pass the script path as a separate argument so a path containing spaces
    # is not split into several arguments.
    process = pexpect.spawn("python3", [test])
    # The payload must be complete JSON; the original literal was missing the
    # two closing braces for "cam" and for the outer object.
    message = '{"header":{"protocolVersion":1,"messageID":2,"stationID":400},"cam":{"generationDeltaTime":1,"camParameters":{"basicContainer":{"stationType":5}}}}'
    # sendline() appends the line terminator the child's readline() waits for.
    process.sendline(message)
    process.wait()
    

    # Spawn the bridge script and append every line it prints to a log file.
    test = r"/home/host/PycharmProjects/Tim/Tim.py"
    # timeout=None makes readline() block until a full line is available.
    p = pexpect.spawn("python3", [test], timeout=None)
    with open(r"/home/host/Desktop/OpeListening.txt", "a") as log:
        while True:
            # read() only returns at EOF and afterwards keeps returning an
            # empty value, which made the original loop spin forever;
            # readline() hands back one message at a time instead.
            m = p.readline()
            if not m:
                # empty result means the child closed its output (EOF)
                break
            # the spawn is in bytes mode: decode rather than logging "b'...'"
            log.write(m.decode("utf-8", errors="replace"))
            log.flush()
    p.wait()
    

    但是读取会立即进入下一步,没有任何消息。我的错误是什么?

    1 回复  |  直到 6 年前
        1
  •  0
  •   luca    6 年前

    目前我改用 subprocess.Popen,可以成功读取:

    import subprocess

    # NOTE(review): ``test`` is assumed to hold the script path assigned in the
    # earlier snippets - confirm it is defined before this runs.
    process = subprocess.Popen(['python3', test], shell=False, stdout=subprocess.PIPE,
                               stdin=subprocess.PIPE, stderr=subprocess.STDOUT)
    while True:
        result = process.stdout.readline()
        if not result:
            # readline() returns b'' once the child closes its output; without
            # this check the loop never ends and wait() is never reached
            break
        result = result.decode("utf-8")
        print(result)
    # the original called ``proc.wait()``, but the variable is ``process``
    process.wait()