代码之家  ›  专栏  ›  技术社区  ›  category

python-死机时重新连接stomp连接

  •  0
  • category  · 技术社区  · 8 年前

    根据 documentation

    >>> import stomp
    >>> c = stomp.Connection([('127.0.0.1', 62615)])
    >>> c.start()
    >>> c.connect('admin', 'password', wait=True)
    

    如何监视它,使其重新连接 c.is_connected == False ?

    >>> reconnect_on_dead_connection(c)
    ...
    >>> [1479749503] reconnected dead connection
    
    2 回复  |  直到 6 年前
        1
  •  1
  •   grzgrzgrz3    8 年前

    您可以包装您的连接,并检查它是否连接了每个呼叫。

    import stomp
    
    
    def reconnect(connection):
        """reconnect here"""
    
    
    class ReconnectWrapper(object):
        def __init__(self, connection):
            self.__connection = connection
    
        def __getattr__(self, item):
            if not self.__connection.is_connected:
                reconnect(self.__connection)
            return getattr(self.__connection, item)
    
    
    if __name__ == '__main__':
        c = stomp.Connection([('127.0.0.1', 62615)])
        c.start()
        c.connect('admin', 'password', wait=True)
        magic_connection = ReconnectWrapper(c)
    

    测试:

    from scratch_35 import ReconnectWrapper
    import unittest
    import mock
    
    
    class TestReconnection(unittest.TestCase):
    
        def setUp(self):
            self.connection = mock.MagicMock()
            self.reconnect_patcher = mock.patch("scratch_35.reconnect")
            self.reconnect = self.reconnect_patcher.start()
    
        def tearDown(self):
            self.reconnect_patcher.stop()
    
        def test_pass_call_to_warapped_connection(self):
            connection = ReconnectWrapper(self.connection)
            connection.send("abc")
            self.reconnect.assert_not_called()
            self.connection.send.assert_called_once_with("abc")
    
        def test_reconnect_when_disconnected(self):
            self.connection.is_connected = False
            connection = ReconnectWrapper(self.connection)
            connection.send("abc")
            self.reconnect.assert_called_once_with(self.connection)
            self.connection.send.assert_called_once_with("abc")
    
    
    if __name__ == '__main__':
        unittest.main()
    

    结果:

    ..
    ----------------------------------------------------------------------
    Ran 2 tests in 0.004s
    
    OK
    

    关键是魔术方法 __getatter__ 每次您尝试访问对象未提供的属性时,都会调用它。更多关于方法 __getattr__ 你可以在doucmentation中找到 https://docs.python.org/2/reference/datamodel.html#object. getattr .

        2
  •  0
  •   Mirko    2 年前

    如果您使用Stomp作为发件人:

    def reconnect_on_dead_connection(c):
        if c.is_connected():
            c.connect('admin', 'password', wait=True)
    

    并在发送Stomp消息之前调用它。

    如果您正在使用ConnectionListener类扩展,请使用 on_disconnected 回调以重新连接。