代码之家  ›  专栏  ›  技术社区  ›  Chris H

当尝试频繁发送数据包时,asio数据包传输失败

  •  2
  • Chris H  · 技术社区  · 14 年前

    最终,我试图将缓冲区从一台机器转移到另一台机器。下面的代码采用 <id><size><data with size bytes> 并读取handleReadHeader函数中的部分,然后读取 <size> <id><size> 一对。 我粘贴了很多代码,但实际上我唯一怀疑的功能是:
    下行::addMsgToQueue

    下行::startWrites() 下行::handleReadHeader
    下行::HandlerReadFrameDataBGR

    using namespace std;
    using namespace boost;
    using namespace boost::asio;
    
    Downlink::Downlink() :
      socket(nIO),
      headerSize(sizeof(unsigned int)+1),
      connected(false),
      isWriting(false),
      readHeaderBuffer(headerSize)
    {}
    
    Downlink::~Downlink() {
      disconnect();
    }
    
    bool Downlink::connect(const std::string &robotHost, unsigned int port) {
      disconnect();
    
      ip::tcp::resolver resolver(nIO);
      ip::tcp::resolver::query query(robotHost, lexical_cast<string>(port));
      ip::tcp::resolver::iterator iterator = resolver.resolve(query);
    
      ip::tcp::resolver::iterator end;
      boost::system::error_code ec;
      for(;iterator!=end;++iterator) {
        socket.connect(*iterator, ec);
        if(!ec)
          break;
        socket.close();
      }
      if(!socket.is_open())
        return false;
    
      async_read(socket, buffer(readHeaderBuffer), 
          bind(&Downlink::handleReadHeader, this, _1, _2));
    
      //start network thread.
      lock_guard<mutex> l(msgMutex);
      outgoingMessages = queue<vector<char> >();
      nIO.reset();
      t = thread(bind(&boost::asio::io_service::run, &nIO));
      connected = true;
      return true;
    }
    
    bool Downlink::isConnected() const {
      return connected;
    }
    
    void Downlink::disconnect() {
      nIO.stop();
      t.join();
      socket.close();
      connected = false;
      isWriting = false;
      nIO.reset();
      nIO.run();
    }
    
    void Downlink::writeToLogs(const std::string &logMsg) {
      vector<char> newMsg(logMsg.length()+headerSize);
      newMsg[0] = MSG_WRITE_LOG;
      const unsigned int msgLen(logMsg.length());
      memcpy(&newMsg[1], &msgLen, sizeof(unsigned int));
      vector<char>::iterator dataBegin = newMsg.begin();
      advance(dataBegin, headerSize);
      copy(logMsg.begin(), logMsg.end(), dataBegin);
      assert(newMsg.size()==(headerSize+logMsg.length()));
      addMsgToQueue(newMsg);
    }
    
    void Downlink::addMsgToQueue(const std::vector<char> &newMsg) {
      lock_guard<mutex> l(msgMutex);
      outgoingMessages.push(newMsg);
      lock_guard<mutex> l2(outMutex);
      if(!isWriting) {
        nIO.post(bind(&Downlink::startWrites, this));
      }
    }
    
    void Downlink::writeCallback(const boost::system::error_code& error,
            std::size_t bytes_transferred) {
      if(error) {
        disconnect();
        lock_guard<mutex> l(msgMutex);
        outgoingMessages = queue<vector<char> >();
        return;
      }
      {
        lock_guard<mutex> l2(outMutex);
        isWriting = false;
      }
      startWrites();
    }
    
    
    void Downlink::startWrites() {
      lock_guard<mutex> l(msgMutex);
      lock_guard<mutex> l2(outMutex);
      if(outgoingMessages.empty()) {
        isWriting = false;
        return;
      }
    
      if(!isWriting) {
        currentOutgoing = outgoingMessages.front();
        outgoingMessages.pop();
        async_write(socket, buffer(currentOutgoing),
      bind(&Downlink::writeCallback, this, _1, _2));
        isWriting = true;
      }
    }
    
    void Downlink::handleReadHeader(const boost::system::error_code& error,
        std::size_t bytes_transferred) {
      //TODO: how to handle disconnect on errors?
      cout<<"handleReadHeader"<<endl;
      if(error) {
        return;
      }
      assert(bytes_transferred==headerSize);
      if(bytes_transferred!=headerSize) {
        cout<<"got "<<bytes_transferred<<" while waiting for a header."<<endl;
      }
      currentPacketID = readHeaderBuffer[0];
    
      memcpy(&currentPacketLength, &readHeaderBuffer[1], sizeof(unsigned int));
      dataStream.resize(currentPacketLength);
      switch(currentPacketID) {
      case MSG_FRAME_BGR: {
        cout<<"- >> gone to read frame. ("<<currentPacketLength<<")"<<endl;
        async_read(socket, asio::buffer(dataStream), 
            boost::asio::transfer_at_least(currentPacketLength),
            bind(&Downlink::handleReadFrameDataBGR, this, _1, _2));    
      } break;
      default: {
        cout<<"->>> gone to read other. ("<<currentPacketLength<<")"<<endl;
        cout<<"      "<<(int)currentPacketID<<endl;
        async_read(socket, asio::buffer(dataStream), 
            boost::asio::transfer_at_least(currentPacketLength),
            bind(&Downlink::handleReadData, this, _1, _2));
      } break;
      }
    }
    
    void Downlink::handleReadData(const boost::system::error_code& error,
        std::size_t bytes_transferred) {
      cout<<"handleReadData"<<endl;
      if(error) {
        return;
      }
      if(bytes_transferred!=currentPacketLength) {
        cout<<"Got "<<bytes_transferred<<" wanted "<<currentPacketLength<<endl;
      }
      assert(bytes_transferred==currentPacketLength);
    
      switch(currentPacketID) {
      case MSG_ASCII: {
        string msg(dataStream.begin(), dataStream.end());
        textCallback(&msg);
      } break;
      case MSG_IMU: {
        Eigen::Vector3d a,g,m;
        unsigned int stamp;
        memcpy(a.data(), &dataStream[0], sizeof(double)*3);
        memcpy(m.data(), &dataStream[0]+sizeof(double)*3, sizeof(double)*3);
        memcpy(g.data(), &dataStream[0]+sizeof(double)*6, sizeof(double)*3);
        memcpy(&stamp, &dataStream[0]+sizeof(double)*9, sizeof(unsigned int));
        imuCallback(a,m,g,stamp);
      } break;
      default:
        //TODO: handle this better?
        cout<<"Unknown packet ID."<<endl;
      }
    
      async_read(socket, buffer(readHeaderBuffer), 
          boost::asio::transfer_at_least(headerSize),
          bind(&Downlink::handleReadHeader, this, _1, _2));
    }
    
    void Downlink::handleReadFrameDataBGR(const boost::system::error_code& error,
              std::size_t bytes_transferred) {
      cout<<"Got a frame"<<endl;
      if(error) {
        return;
      }
      if(bytes_transferred!=currentPacketLength) {
        cout<<"Got "<<bytes_transferred<<" wanted "<<currentPacketLength<<endl;
      }
      assert(bytes_transferred==currentPacketLength);
      unsigned int imageWidth, imageHeight, cameraID;
    
      unsigned char *readOffset = (unsigned char*)&dataStream[0];
      memcpy(&imageWidth, readOffset, sizeof(unsigned int)); 
      readOffset += sizeof(unsigned int);
      memcpy(&imageHeight, readOffset, sizeof(unsigned int)); 
      readOffset += sizeof(unsigned int);
      memcpy(&cameraID, readOffset, sizeof(unsigned int)); 
      readOffset += sizeof(unsigned int);
    
      cout<<"("<<imageWidth<<"x"<<imageHeight<<") ID = "<<cameraID<<endl;
    
      frameCallback(readOffset, imageWidth, imageHeight, cameraID);
    
      async_read(socket, buffer(readHeaderBuffer), 
          boost::asio::transfer_at_least(headerSize),
          bind(&Downlink::handleReadHeader, this, _1, _2));
    }
    
    
    boost::signals2::connection Downlink::connectTextDataCallback(boost::signals2::signal<void (std::string *)>::slot_type s) {
      return textCallback.connect(s);
    }
    
    boost::signals2::connection Downlink::connectIMUDataCallback(boost::signals2::signal<void (Eigen::Vector3d, Eigen::Vector3d, Eigen::Vector3d, unsigned int)>::slot_type s) {
      return imuCallback.connect(s);
    }
    
    boost::signals2::connection Downlink::connectVideoFrameCallback(boost::signals2::signal<void (unsigned char *, unsigned int, unsigned int, unsigned int)>::slot_type s) {
      return frameCallback.connect(s);
    }
    

    using namespace std;
    using namespace boost;
    using namespace boost::asio;
    
    Uplink::Uplink(unsigned int port) :
      socket(nIO),
      acceptor(nIO),
      endpoint(ip::tcp::v4(), port),
      headerSize(sizeof(unsigned int)+1), //id + data size
      headerBuffer(headerSize)
    {
      //move socket into accept state.
      acceptor.open(endpoint.protocol());
      acceptor.set_option(ip::tcp::acceptor::reuse_address(true));
      acceptor.bind(endpoint);
      acceptor.listen(1);  //1 means only one client in connect queue.
      acceptor.async_accept(socket, bind(&Uplink::accept_handler, this, _1));
      //start network thread.
      nIO.reset();
      t = thread(boost::bind(&boost::asio::io_service::run, &nIO));
    }
    
    Uplink::~Uplink() {
      nIO.stop();  //tell the network thread to stop.
      t.join();  //wait for the network thread to stop.
      acceptor.close(); //close listen port.
      socket.close();   //close active connections.
      nIO.reset();
      nIO.run(); //let clients know that we're disconnecting.
    }
    
    void Uplink::parse_header(const boost::system::error_code& error,
         std::size_t bytes_transferred) {
      if(error || bytes_transferred!=headerSize) {
        disconnect();
        return;
      }
      currentPacketID = headerBuffer[0];
      memcpy(&currentPacketLength, &headerBuffer[1], sizeof(unsigned int));
      //move to read data state
    
      //TODO: move to different states to parse various packet types.
    
      async_read(socket, asio::buffer(dataStream), transfer_at_least(currentPacketLength),
          bind(&Uplink::parse_data, this, _1, _2));
    }
    
    void Uplink::parse_data(const boost::system::error_code& error,
       std::size_t bytes_transferred) {
      if(error) {
        disconnect();
        return;
      }
    
      if(bytes_transferred != currentPacketLength) {
        cout<<"bytes_transferred != currentPacketLength"<<endl;
        disconnect();
        return;
      }
    
      //move back into the header reading state
      async_read(socket, buffer(headerBuffer), 
          bind(&Uplink::parse_header, this, _1, _2));
    }
    
    void Uplink::disconnect() {
      acceptor.close();
      socket.close();
      acceptor.open(endpoint.protocol());
      acceptor.set_option(ip::tcp::acceptor::reuse_address(true));
      acceptor.bind(endpoint);
      acceptor.listen(1);  //1 means only one client in connect queue.
      acceptor.async_accept(socket, bind(&Uplink::accept_handler, this, _1));
    }
    
    void Uplink::accept_handler(const boost::system::error_code& error)
    {
      if (!error) {
        //no more clents.
        acceptor.close();
        //move to read header state.
        async_read(socket, buffer(headerBuffer), 
            bind(&Uplink::parse_header, this, _1, _2));
      }
    }
    
    void Uplink::sendASCIIMessage(const std::string &m) {
      //Format the message
      unsigned int msgLength(m.length());
      vector<char> outBuffer(msgLength+headerSize);
      outBuffer[0] = MSG_ASCII;
      memcpy(&outBuffer[1], &msgLength, sizeof(unsigned int));
      vector<char>::iterator dataBegin(outBuffer.begin());
      advance(dataBegin, headerSize);
      copy(m.begin(), m.end(), dataBegin);
      //queue the message
      addToQueue(outBuffer);
    }
    
    void Uplink::sendIMUDataBlock(const nIMUDataBlock *d) {
      //Format the message.
        //a,g,m, 3 components each plus a stamp
      const unsigned int msgLength(3*3*sizeof(double)+sizeof(unsigned int)); 
      vector<char> outBuffer(msgLength+headerSize);
      outBuffer[0] = MSG_IMU;
      memcpy(&outBuffer[1], &msgLength, sizeof(unsigned int));
    
      const Eigen::Vector3d a(d->getAccel());
      const Eigen::Vector3d m(d->getMag());
      const Eigen::Vector3d g(d->getGyro());
      const unsigned int s(d->getUpdateStamp());
    
      memcpy(&outBuffer[headerSize], a.data(), sizeof(double)*3);
      memcpy(&outBuffer[headerSize+3*sizeof(double)], m.data(), sizeof(double)*3);
      memcpy(&outBuffer[headerSize+6*sizeof(double)], g.data(), sizeof(double)*3);
      memcpy(&outBuffer[headerSize+9*sizeof(double)], &s, sizeof(unsigned int));
    
      /*
      cout<<"----------------------------------------"<<endl;
      cout<<"Accel = ("<<a[0]<<","<<a[1]<<","<<a[2]<<")"<<endl;
      cout<<"Mag   = ("<<m[0]<<","<<m[1]<<","<<m[2]<<")"<<endl;
      cout<<"Gyro  = ("<<g[0]<<","<<g[1]<<","<<g[2]<<")"<<endl;
      cout<<"Stamp = "<<s<<endl;
      cout<<"----------------------------------------"<<endl;
      */
    
      //queue the message
      addToQueue(outBuffer);
    }
    
    void Uplink::send_handler(const boost::system::error_code& error,
         std::size_t bytes_transferred) {
      {
        lock_guard<mutex> l(queueLock);
        lock_guard<mutex> l2(sendingLock);
        if(outQueue.empty()) {
          currentlySending = false;
          return;
        }
      }
      startSend();
    }
    
    void Uplink::addToQueue(const std::vector<char> &out) {
      bool needsRestart = false;
      {
        lock_guard<mutex> l(queueLock);
        lock_guard<mutex> l2(sendingLock);
        outQueue.push(out);
        needsRestart = !currentlySending;
      }
      if(needsRestart)
        nIO.post(bind(&Uplink::startSend, this));
    }
    
    void Uplink::startSend() {
      lock_guard<mutex> l(queueLock);
      lock_guard<mutex> l2(sendingLock);
      if(outQueue.empty())
        return;
      currentlySending = true;
      currentWrite = outQueue.front();
      outQueue.pop();
      async_write(socket, buffer(currentWrite), bind(&Uplink::send_handler, 
           this, _1, _2));
    }
    
    void Uplink::sendVideoFrameBGR(const unsigned int width, const unsigned int height, 
              const unsigned int cameraID, const unsigned char *frameData) {
      //                             image data            image metadata        header
      const unsigned int packetSize(width*height*3   +   sizeof(unsigned int)*3 + headerSize);
      const unsigned int dataSize(width*height*3   +   sizeof(unsigned int)*3);
      vector<char> outgoingBuffer(packetSize);
      outgoingBuffer[0] = MSG_FRAME_BGR;
      memcpy(&outgoingBuffer[1], &dataSize, sizeof(unsigned int));
      char *writePtr = &outgoingBuffer[headerSize];
      memcpy(writePtr, &width, sizeof(unsigned int));
      writePtr += sizeof(unsigned int);
      memcpy(writePtr, &height, sizeof(unsigned int));
      writePtr += sizeof(unsigned int);
      memcpy(writePtr, &cameraID, sizeof(unsigned int));
      writePtr += sizeof(unsigned int);
      memcpy(writePtr, frameData, width*height*3*sizeof(char));
    
      //TODO: can we avoid the whole image copy here?
      //TODO: should come up with a better packet buffer build system.
      //IDEA!: maybe have a "request buffer" funxction so the Uplink
      //class can have sole ownership, rather than do the copy in "addtoQueue"
      addToQueue(outgoingBuffer);
    }
    

    这个程序大部分时间都工作,但很少,当发送大量数据包之间没有延迟时,它会失败。 例如:

    sendVideoFrameBGR(...);  //occasional fail
    sendASCIIMessage("...");
    
    sendVideoFrameBGR(...);  //never fails.
    sleep(1); 
    sendASCIIMessage("...");
    

    在下行链路中处理一个视频帧之后,它返回到hadleHeaderData并等待一个数兆字节长的数据包和一个不存在的数据包ID。不知怎么的,这条流被破坏了。我不知道为什么。

    我不太喜欢我现在写的代码,所以如果有人知道一个好的类或库,可以通过TCP将流解析为缓冲块,我宁愿使用它。

    编辑:

        if(frontImage) {
          uplink.sendVideoFrameBGR(frontImage->width, frontImage->height, 0,
                       (unsigned char*)frontImage->imageData);
          cout<<"Sent"<<endl;
          //sleep(1);   //works fine if this is uncommented !
        }
    
        uplink.sendASCIIMessage("Alive...");
        sleep(1);
        uplink.sendIMUDataBlock(imuDataBlock.get());
        cout<<"Loop"<<endl;
        sleep(1);
      }
    
    1 回复  |  直到 10 年前
        1
  •  3
  •   Collin Dauphinee    14 年前

    问题很可能是ioservice对象有多个线程处理工作。

    即使您使用bool来检查它当前是否正在发送,但在ioservice线程之一处理该函数之前,不会检查bool。如果两个ioservice线程在您发布这两件工作时处于活动状态,那么它可能会同时调度两个发送,从而导致两个发送函数在不同的线程上异步发生。“正在发送”检查可能在两个调用中都返回false,因为这两个发送是并行运行的。