代码之家  ›  专栏  ›  技术社区  ›  user584018

具有流大小而不是流数量的缓冲区响应扩展C[重复]

  •  0
  • user584018  · 技术社区  · 5 年前
    1. 我有一个 Producer() 将数据推送到 blocking 收集。
    2. Consumer() ,我订阅了 舞台调度 集合为 Observable 使用 System.Reactive (4.1.2)。
    3. 我在用 Buffer ,但只能缓冲流的数目。

    问题-我能用吗 buffer 流大小而不是流数量的运算符?

    当缓冲区大小交叉时(例如1024 KB或1 MB),是否创建新的缓冲区?

        class Program
        {
            private static readonly BlockingCollection<Message> MessagesBlockingCollection = new BlockingCollection<Message>();
    
        private static void Producer()
        {
            int ctr = 1;
            while (ctr <= 11)
            {
                MessagesBlockingCollection.Add(new Message { Id = ctr, Name = $"Name-{ctr}" });
                Thread.Sleep(1000);
                ctr++;
            }
        }
    
        private static void Consumer()
        {
            var observable = MessagesBlockingCollection.GetConsumingEnumerable().ToObservable();
    
            var bufferedNumberStream = observable.BufferWithThrottle(TimeSpan.FromSeconds(60), 5)
                                        .Subscribe(ts =>
                                        {
                                            WriteToFile(ts.ToList());
                                        });
        }
    
        private static void WriteToFile(List<Message> listToWrite)
        {
            using (StreamWriter outFile = System.IO.File.CreateText(Path.Combine(@"C:\TEMP", $"{DateTime.Now.ToString("yyyyMMddHHmmssfff")}.json")))
            {
                outFile.Write(JsonConvert.SerializeObject(listToWrite));
            }
        }
    
        static void Main(string[] args)
        {
            var producer = Task.Factory.StartNew(() => Producer());
            var consumer = Task.Factory.StartNew(() => Consumer());
            Console.Read();
         }
        }
    

    可观测延伸法,

    public static IObservable<IList<TSource>> BufferWithThrottle<TSource>(this IObservable<TSource> source,
                                                                                TimeSpan threshold, int noOfStream)
        {
            return Observable.Create<IList<TSource>>((obs) =>
            {
                return source.GroupByUntil(_ => true,
                                           g => g.Throttle(threshold).Select(_ => Unit.Default)
                                                 .Merge(g.Buffer(noOfStream).Select(_ => Unit.Default)))
                             .SelectMany(i => i.ToList())
                             .Subscribe(obs);
            });
        }
    
    1 回复  |  直到 5 年前
        1
  •  1
  •   supertopi Steve Harrison    5 年前

    很高兴看到扩展方法在使用中:)

    你可以稍微修改一下 Scan 的运行计数 Message 尺寸。这样我们就失去了类型泛型。

    public class Message
    {
        public string Payload { get; set; }
        public int Size { get; set; }
    }
    
    public static IObservable<IList<Message>> BufferWithThrottle(this IObservable<Message> source,
                                                         TimeSpan threshold, int maxSize)
    {
        return Observable.Create<IList<Message>>((obs) =>
        {
            return source.GroupByUntil(_ => true,
                                       g => g.Throttle(threshold).Select(_ => Unit.Default)
                                             .Merge(g.Select( i => i.Size)
                                                     .Scan(0, (a, b) => a + b)
                                                     .Where(a => a >= maxSize)
                                                     .Select(_ => Unit.Default)))
                         .SelectMany(i => i.ToList())
                         .Subscribe(obs);
        });
    }