代码之家  ›  专栏  ›  技术社区  ›  fizzer

如何在多线程C++中删除观察者关系?

  •  11
  • fizzer  · 技术社区  · 15 年前

    我有一个主题 Subscribe(Observer*) Unsubscribe(Observer*) 给客户。Subject在自己的线程中运行(从中调用 Notify() 在订阅的观察者上),互斥锁保护其内部观察者列表。

    我希望客户端代码(我无法控制)能够在取消订阅后安全地删除观察者。如何做到这一点?

    • 保持互斥锁-即使是递归 互斥-当我通知观察者时 不是一个选项,因为 僵局风险。
    • 在“取消订阅”通话中,将其删除 从主题线程。然后 客户可以等待一个特殊的 “安全删除”通知。这 看起来很安全,但对我来说很麻烦 客户。

    下面是一些说明性代码。问题是如何防止在“此处有问题”注释处运行时发生退订。然后我可以调用一个已删除的对象。或者,如果我始终保持互斥,而不是复制,我可以使某些客户端死锁。

    #include <set>
    #include <functional>
    #include <boost/thread.hpp>
    #include <boost/bind.hpp>
    
    using namespace std;
    using namespace boost;
    
    class Observer
    {
    public:
        void Notify() {}
    };
    
    class Subject
    {
    public:
        Subject() : t(bind(&Subject::Run, this))
        {
        }
    
        void Subscribe(Observer* o)
        {
            mutex::scoped_lock l(m);
            observers.insert(o);
        }
    
        void Unsubscribe(Observer* o)
        {
            mutex::scoped_lock l(m);
            observers.erase(o);
        }
    
        void Run()
        {
            for (;;)
            {
                WaitForSomethingInterestingToHappen();
                set<Observer*> notifyList;
                {
                    mutex::scoped_lock l(m);
                    notifyList = observers;
                }
                // Problem here
                for_each(notifyList.begin(), notifyList.end(), 
                         mem_fun(&Observer::Notify));
            }
        }
    
    private:
        set<Observer*> observers;
        thread t;
        mutex m;
    };
    

    编辑

    由于存在死锁风险,我无法在保持互斥时通知观察者。发生这种情况的最明显的方式——客户端从通知内部调用Subscribe或Unsubscribe——可以通过使互斥体递归来轻松解决。更危险的是不同线程上出现间歇性死锁的风险。

    9 回复  |  直到 15 年前
        1
  •  7
  •   Rob K    15 年前

    Unsubscribe()应该是同步的,这样在保证观察者不再在主题列表中之前,它不会返回。这是唯一安全的方法。

    ETA(将我的评论移至答案):

    因为时间似乎不是问题,所以在通知每个观察者之间获取并释放互斥。您将无法按现在的方式使用for_,您必须检查迭代器以确保它仍然有效。

    for ( ... )
    {
        take mutex
        check iterator validity
        notify
        release mutex
    }
    

    那会做你想做的。

        2
  •  3
  •   Éric Malenfant    15 年前

    你能把Subscribe()的签名改成Unsubscribe()吗?将观察者*替换为类似共享的内容<观察家>会让事情变得更容易。

    编辑:将上面的“轻松”替换为“轻松”。 有关如何“纠正”这一点的示例,请参阅 Boost.Signals 以及 adopted -但还没有在发行中 Boost.Signals2 (以前是Boost.ThreadSafeSignals)库。

        3
  •  1
  •   Matthieu M.    9 年前

    “理想”解决方案包括使用 shared_ptr weak_ptr . 然而,为了通用,它还必须考虑到 Subject Observer (是的,这种情况也可能发生)。

    class Subject {
    public:
        void Subscribe(std::weak_ptr<Observer> o);
        void Unsubscribe(std::weak_ptr<Observer> o);
    
    private:
        std::mutex mutex;
        std::set< std::weak_ptr<Observer> > observers;
    };
    
    class Observer: boost::noncopyable {
    public:
        ~Observer();
    
        void Notify();
    
    private:
        std::mutex;
        std::weak_ptr<Subject> subject;
    };
    

    弱ptr 所以两者 观察者 主题 可以在没有协调的情况下销毁。

    注意:为了简单起见,我假设 观察单个 主题 一次,但它可以很容易地观察多个对象。


    异步的 Unsubscribe . 或者至少,是对 退订 将从外部同步,但将异步实现。

    想法很简单:我们将使用事件队列来实现同步。即:

    • 呼吁 在队列中发布事件(有效负载 Observer*
    • 主题 退订 事件,它将唤醒等待的线程

    您可以使用忙等待或条件变量,我建议使用条件变量,除非性能另有要求。

    注意:此解决方案完全无法解释 主题

        4
  •  1
  •   anon anon    15 年前

    与其让客户机收到“SafeToDelete”通知,不如向他们提供IssuSubscribed(Observer*)方法。然后,客户端代码变为:

    subject.Unsubscribe( obsever );l
    while( subject.IsSubscribed( observer ) ) {
       sleep_some_short_time;   // OS specific sleep stuff
    }
    delete observer;
    

    这并不太繁重。

        5
  •  1
  •   m-sharp    15 年前

    您可以在CSubject类型中创建“删除队列”。移除观察者时,可以调用PSObject->QueueForDelete(pObserver)。然后,当主题线程处于两个通知之间时,它可以安全地从队列中删除观察者。

        6
  •  1
  •   Diego Sevilla    15 年前

    嗯。。。我真的不理解你的问题,因为如果客户打电话取消订阅,你应该可以让客户删除它(你没有使用它)。但是,如果由于某种原因,在客户端取消订阅观察者后无法关闭关系,则可以添加“Subject”新操作以安全删除观察者,或者只是让客户端发出信号,表示他们不再对观察者感兴趣。

    重新思考编辑 :好的,现在我想我明白你的问题了。我认为解决您问题的最佳方法是:

    1. 使每个存储的观察者元素具有“有效”标志。当您处于通知循环中时,此标志将用于通知它或不通知它。
    2. 您需要一个互斥来保护对该“有效”标志的访问。然后,unsubscribe操作为“valid”标志锁定互斥锁,为所选观察者将其设置为false。
    3. 通知循环还必须锁定和解锁有效标志的互斥体,并且只对“有效”的观察者进行操作。

    考虑到取消订阅操作将阻塞互斥锁以重置有效标志(并且该特定观察者将不再在线程中使用),代码是线程安全的,客户机可以在取消订阅返回后立即删除任何观察者。

        7
  •  1
  •   Greg Rogers    15 年前

    这样的东西令人满意吗?在收到通知的同时取消订阅一个观察者仍然是不安全的,因为你需要一个你提到的界面(据我所知)。

    Subscribe(Observer *x)
    {
        mutex.lock();
        // add x to the list
        mutex.unlock();
    }
    
    Unsubscribe(Observer *x)
    {
        mutex.lock();
        while (!ok_to_delete)
            cond.wait(mutex);
        // remove x from list
        mutex.unlock();
    }
    
    NotifyLoop()
    {
        while (true) {
            // wait for something to trigger a notify
    
            mutex.lock();
            ok_to_delete = false;
            // build a list of observers to notify
            mutex.unlock();
    
            // notify all observers from the list saved earlier
    
            mutex.lock();
            ok_to_delete = true;
            cond.notify_all();
            mutex.unlock();
        }
    }
    

    如果您希望能够在Notify()中取消订阅()- (客户IMO的错误设计决策…) 您可以将通知程序线程的线程id添加到数据结构中。在Unsubscribe函数中,您可以根据当前线程的id检查线程id(大多数线程库都提供了这个功能-例如pthread_self)。如果它们相同,则无需等待条件变量即可继续。

    注意:如果客户端负责删除观察者,这意味着您会遇到这样的情况:在Notify回调中,您将取消订阅并删除观察者,但仍在使用废弃的此指针执行某些操作。这是客户端必须知道的,并且只在Notify()结束时删除它。

        8
  •  0
  •   Patrick    15 年前

    我认为这即使不是很优雅,也能起到作用:

    class Subject {
    public:
    Subject() : t(bind(&Subject::Run, this)),m_key(0)    {    }
    void Subscribe(Observer* o) {
        mutex::scoped_lock l(m);
        InternalObserver io( o );
        boost::shared_ptr<InternalObserver> sp(&io);
        observers.insert(pair<int,boost::shared_ptr<InternalObserver>> (MakeKey(o),sp));
    }
    
    void Unsubscribe(Observer* o) {
        mutex::scoped_lock l(m);
        observers.find( MakeKey(o) )->second->exists = false;    }
    
    void WaitForSomethingInterestingToHappen() {}
    void Run()
    {
        for (;;)
        {
            WaitForSomethingInterestingToHappen();
            for( unsigned int i = 0; i < observers.size(); ++ i )
            {
                mutex::scoped_lock l(m);
                if( observers[i]->exists )
                {
                    mem_fun(&Observer::Notify);//needs changing
                }
                else
                {
                    observers.erase(i);
                    --i;
                }
            }
        }
    }
    private:
    
    int MakeKey(Observer* o) {
        return ++m_key;//needs changeing, sha of the object?
    }
    class InternalObserver {
    public:
        InternalObserver(Observer* o) : m_o( o ), exists( true ) {}
        Observer* m_o;
        bool exists;
    };
    
    map< int, boost::shared_ptr<InternalObserver> > observers;
    thread t;
    mutex m;
    int m_key;
    };
    
        9
  •  0
  •   Sameer    9 年前

    改变 observers map 带钥匙 Observer* 和价值的包装 Observer . 包装器包括一个 volatile 布尔值,指示 这是有效的。在里面 subscribe 有效的 状态在里面 unsubscribe 方法,包装器标记为 无效的 . Notify 被称为 包装纸 而不是 实际观察者 . 包装器将调用 通知 如果有效(仍然订阅)

    #include <map>
    #include <functional>
    #include <boost/thread.hpp>
    #include <boost/bind.hpp>
    
    using namespace std;
    using namespace boost;
    
    class Observer
    {
    public:
        void Notify() {}
    };
    
    class ObserverWrapper : public Observer
    {
    public:
        Observer* wrappee;
        volatile bool valid;
        ObserverWrapper(Observer* o) 
        {
            wrappee = o;
            valid = true;
        }
    
        void Notify() 
        {
            if (valid) wrappee->Notify();
        }
    }
    class Subject
    {
    public:
        Subject() : t(bind(&Subject::Run, this))
        {
        }
    
        void Subscribe(Observer* o)
        {
            mutex::scoped_lock l(m);
            boost::shared_ptr<ObserverWrapper> sptr(new ObserverWrapper(o));
            observers.insert(pair<Observer*, sptr));
        }
    
        void Unsubscribe(Observer* o)
        {
            mutex::scoped_lock l(m);
            observers.find(o)->second->valid = false;
            observers.erase(o);
        }
    
        void Run()
        {
            for (;;)
            {
                WaitForSomethingInterestingToHappen();
                vector<ObserverWrapper*> notifyList;
                {
                    mutex::scoped_lock l(m);
                    boost::copy(observers | boost::adaptors::map_values, std::back_inserter(notifyList));
                }
                // Should be no problem here
                for_each(notifyList.begin(), notifyList.end(), 
                         mem_fun(&ObserverWrapper::Notify));
            }
        }
    
    private:
        map<Observer*, ObserverWrapper*> observers;
        thread t;
        mutex m;
    };