代码之家  ›  专栏  ›  技术社区  ›  Bercovici Adrian

如何倾听不断变化的馈送

  •  0
  • Bercovici Adrian  · 技术社区  · 6 年前

    我有以下问题: RethinkDB RunChangesAsync 方法运行一次,使用时,它将开始侦听给定查询上的更改。当查询更改时,将为您提供 Cursor<Change<Class>>

    我的问题是我怎样才能连续跑下去?

    while(true)
    {
     code....   //changes happening while program is here
    ....../
    ...RunChangesAsync();
    /......processed buffered items
    code   //new changes here
    }
    

    如果我在代码中指出的地方发生了更改,它们将不会被 RunChanges . 唯一能捕捉到的变化是 运行更改 在倾听。不是在..之前或者在检索结果之后。

    所以我试着把 在一个 observable 但它并不像我所期望的那样持续地监听更改…它只是检索2个空项(我想是垃圾)并结束。

    可观察的

    public  IObservable<Cursor<Change<UserStatus?>>> GetObservable() =>
        r.Db(Constants.DB_NAME).Table(Constants.CLIENT_TABLE).RunChangesAsync<UserStatus?>(this.con,CancellationToken.None).ToObservable();
    

    class PlayerSubscriber : IObserver<Cursor<Change<UserStatus?>>>
    {
        public void OnCompleted() => Console.WriteLine("Finished");
        public void OnError(Exception error) => Console.WriteLine("error");
        public void OnNext(Cursor<Change<UserStatus?>> value)
        {
            foreach (var item in value.BufferedItems)
                Console.WriteLine(item);
        }
    }
    

    程序

    class Program
    {
        public static RethinkDB r = RethinkDB.R;
        public static bool End = false;
        static async Task Main(string[] args)
        {
            var address = new Address { Host = "127.0.0.1", Port = 28015 };
            var con = await r.Connection().Hostname(address.Host).Port(address.Port).ConnectAsync();
            var database = new Database(r, con);
            var obs  = database.GetObservable();
            var sub  = new PlayerSubscriber();
            var disp = obs.Subscribe(sub);
    
            Console.ReadKey();
            Console.WriteLine("Hello World!");
        }
    }
    

    当我调试时,如您所见 OnNext Observer 只执行一次(返回两个空对象),然后关闭。

    enter image description here

    附笔: Database GetObservable 是我贴的。这个 UserStatus 是一个 POCO

    2 回复  |  直到 6 年前
        1
  •  1
  •   Brian Chavez    6 年前

    创建更改源时,您需要创建 Cursor<Change<T>> 运行后 .RunChangesAsync();

    这个 cursor 你回来的对象 query.RunChangesAsync() 是要接收更改的整个生存期内使用的更改源对象。

    while(true)
    {
     code....   //changes happening while program is here
    ....../
    ...RunChangesAsync();
    /......processed buffered items
    code   //new changes here
    }
    

    .RunChangesAsync(); while 循环不是正确的方法。您不需要再次运行查询并获取另一个 光标<更改<T>>

    另外,不要使用 cursor.BufferedItems 光标 游标.BufferedItems 游标上的属性不是直接由代码使用的 属性仅在希望在游标对象(客户端)内“向前窥视”特定于更改馈送查询的已准备好使用的项的特殊情况下才公开。

    使用change提要中的项的正确方法是在 对象本身如下所示:

    var cursor = await query.RunChangesAsync(conn);
    foreach (var item in cursor){
        Console.WriteLine(item);
    }
    

    项目用完,它将向 重新思考数据库 服务器以获取更多项目。记住,每次迭代 foreach foreach公司 (一) 客户端没有要消费的项目( .BufferedItems.Count == 0 )以及 根据更改源查询条件,服务器端没有已更改的项。在这种情况下 foreach公司 循环将阻塞,直到 重新思考数据库 服务器向您发送一个可以使用的项目。

    Documentation about using Reactive Extensions and RethinkDB in C#

    here .

    明确地, Lines 31 - 47 in this unit test

    var changes = R.Db(DbName).Table(TableName)
        //.changes()[new {include_states = true, include_initial = true}]
        .Changes()
        .RunChanges<JObject>(conn);
    
    changes.IsFeed.Should().BeTrue();
    
    var observable = changes.ToObservable();
    
    //use a new thread if you want to continue,
    //otherwise, subscription will block.
    observable.SubscribeOn(NewThreadScheduler.Default)
        .Subscribe(
            x => OnNext(x),
            e => OnError(e),
            () => OnCompleted()
        );
    

    另外, here is a good example and explanation 发生了什么以及如何使用C#:

    希望能有所帮助。


    布瑞恩

        2
  •  1
  •   Enigmativity    6 年前

    如果你的手术有签名 Task<int> ReadAsync()

    IObservable<int> PollRead(TimeSpan interval)
    {
        return
            Observable
                .Interval(interval)
                .SelectMany(n => Observable.FromAsync(() => ReadAsync()));
    }
    

    我还要提醒你自己创建 IObservable<T> Observer.Create(...) 如果你正在创建你自己的观察者,你想交给他们。一般来说,你甚至都不会那样做。