代码之家  ›  专栏  ›  技术社区  ›  w0051977

处理聚合的所有事件

  •  2
  • w0051977  · 技术社区  · 6 年前

     namespace PersistentSubscription
        {
            /// <summary>
            /// Entry point of the subscriber console application.
            /// </summary>
            internal class Program
            {
                /// <summary>
                /// Creates the persistent-subscription client and runs it until it returns.
                /// </summary>
                private static void Main()
                {
                    new PersistentSubscriptionClient().Start();
                }
            }
    
            /// <summary>
            /// Console client that creates (when missing) a persistent subscription group on
            /// <c>STREAM</c> and prints every <c>CustomerCreated</c> event it receives.
            /// </summary>
            public class PersistentSubscriptionClient
            {
                private IEventStoreConnection _conn;
                private const string STREAM = "$ce-customer";
                private const string GROUP = "a_test_group";
                private const int DEFAULTPORT = 1113;
                private static readonly UserCredentials User = new UserCredentials("admin", "changeit");
                private EventStorePersistentSubscriptionBase _subscription;

                /// <summary>
                /// Connects to the local Event Store node, ensures the subscription group exists,
                /// subscribes to it and blocks until the user presses enter.
                /// </summary>
                public void Start()
                {
                    var settings = ConnectionSettings.Create();
                    var endPoint = new IPEndPoint(IPAddress.Loopback, DEFAULTPORT);

                    using (_conn = EventStoreConnection.Create(settings, endPoint))
                    {
                        _conn.ConnectAsync().Wait();

                        CreateSubscription();
                        ConnectToSubscription();

                        Console.WriteLine("waiting for events. press enter to exit");
                        Console.ReadLine();
                    }
                }

                /// <summary>
                /// Subscribes to the persistent subscription group and stores the handle in
                /// <c>_subscription</c>.
                /// </summary>
                private void ConnectToSubscription()
                {
                    const int bufferSize = 10;
                    const bool autoAck = true;

                    // The explicit delegate type keeps the method-group conversion unambiguous.
                    Action<EventStorePersistentSubscriptionBase, ResolvedEvent> eventAppeared = EventAppeared;
                    _subscription = _conn.ConnectToPersistentSubscription(
                        STREAM, GROUP, eventAppeared, SubscriptionDropped, User, bufferSize, autoAck);
                }

                /// <summary>
                /// Reports why the subscription was dropped and re-subscribes, unless the drop was
                /// requested by the client itself.
                /// </summary>
                /// <param name="eventStorePersistentSubscriptionBase">The subscription that was dropped.</param>
                /// <param name="subscriptionDropReason">Reason reported by the client library.</param>
                /// <param name="ex">Exception associated with the drop, if any.</param>
                private void SubscriptionDropped(EventStorePersistentSubscriptionBase eventStorePersistentSubscriptionBase,
                    SubscriptionDropReason subscriptionDropReason, Exception ex)
                {
                    // Surface the cause instead of silently discarding it.
                    Console.Error.WriteLine($"Subscription dropped ({subscriptionDropReason}): {ex?.Message}");

                    // A drop requested by this client must not trigger a reconnect.
                    if (subscriptionDropReason == SubscriptionDropReason.UserInitiated)
                    {
                        return;
                    }

                    ConnectToSubscription();
                }

                /// <summary>
                /// Deserializes the event payload as a <c>CustomerCreated</c> and prints it.
                /// Failures are reported on standard error so that one bad event does not
                /// terminate the subscription.
                /// </summary>
                /// <param name="eventStorePersistentSubscriptionBase">The subscription that delivered the event.</param>
                /// <param name="resolvedEvent">The delivered event.</param>
                private static void EventAppeared(EventStorePersistentSubscriptionBase eventStorePersistentSubscriptionBase,
                    ResolvedEvent resolvedEvent)
                {
                    try
                    {
                        // SECURITY: BinaryFormatter is unsafe on untrusted data; it is kept only
                        // because the writer side of this sample serializes with it.
                        IFormatter formatter = new BinaryFormatter();

                        // A MemoryStream created over a byte array already starts at position 0.
                        using (var stream = new MemoryStream(resolvedEvent.Event.Data))
                        {
                            var customerCreated = (CustomerCreated)formatter.Deserialize(stream);
                            Console.WriteLine(customerCreated);
                        }
                    }
                    catch (Exception e)
                    {
                        // Previously swallowed; report it so deserialization problems are visible.
                        Console.Error.WriteLine($"Could not deserialize event as {nameof(CustomerCreated)}: {e}");
                    }
                }

                /// <summary>
                /// Creates the persistent subscription group, tolerating the case where it
                /// already exists.
                /// </summary>
                /// <exception cref="AggregateException">
                /// Creation failed for a reason other than the group already existing.
                /// </exception>
                private void CreateSubscription()
                {
                    // NOTE: with DoNotResolveLinkTos() a category stream such as "$ce-customer"
                    // delivers link events rather than the original events, so their Data is not
                    // a serialized CustomerCreated. Use ResolveLinkTos() to receive the originals.
                    PersistentSubscriptionSettings settings = PersistentSubscriptionSettings.Create()
                        .DoNotResolveLinkTos()
                        .StartFromCurrent();

                    try
                    {
                        _conn.CreatePersistentSubscriptionAsync(STREAM, GROUP, settings, User).Wait();
                    }
                    catch (AggregateException ex) when (
                        ex.InnerException?.GetType() == typeof(InvalidOperationException)
                        || ex.InnerException?.Message == $"Subscription group {GROUP} on stream {STREAM} already exists")
                    {
                        // The group already exists: nothing to create. Any other failure,
                        // including one without an inner exception, is not caught here.
                    }
                }
            }
        }
    

    using System;
    using System.IO;
    using System.Net;
    using System.Runtime.Serialization;
    using System.Runtime.Serialization.Formatters.Binary;
    using System.Text;
    using EventStore.ClientAPI;
    
    namespace WritingEvents
    {
        /// <summary>
        /// Console application that appends one <see cref="CustomerCreated"/> event to the
        /// "customer-100" stream of the local Event Store node.
        /// </summary>
        class Program
        {
            /// <summary>
            /// Connects to Event Store on the loopback address and writes a single event.
            /// </summary>
            /// <param name="args">Command-line arguments (unused).</param>
            static void Main(string[] args)
            {
                const int DEFAULTPORT = 1113;
                var settings = ConnectionSettings.Create();
                using (var conn = EventStoreConnection.Create(settings, new IPEndPoint(IPAddress.Loopback, DEFAULTPORT)))
                {
                    conn.ConnectAsync().Wait();
                    CustomerCreated c1 = new CustomerCreated { Id = Guid.NewGuid(), Name = "Maria" };
                    EventData customerCreated1 = GetEventDataFor(c1);
                    conn.AppendToStreamAsync("customer-100", ExpectedVersion.Any, customerCreated1).Wait();
                }
            }

            /// <summary>
            /// Serializes <paramref name="customerCreated"/> with <see cref="BinaryFormatter"/>
            /// and wraps the bytes in an <see cref="EventData"/> with a fresh event id.
            /// </summary>
            /// <param name="customerCreated">The event payload to serialize.</param>
            /// <returns>The event ready to be appended to a stream.</returns>
            private static EventData GetEventDataFor(CustomerCreated customerCreated)
            {
                // SECURITY: BinaryFormatter output must only be deserialized from trusted
                // sources; a text format such as JSON is the safer choice for event payloads.
                IFormatter formatter = new BinaryFormatter();
                byte[] customerCreatedEventByteArray;
                using (var stream = new MemoryStream())
                {
                    formatter.Serialize(stream, customerCreated);
                    customerCreatedEventByteArray = stream.ToArray();
                }

                return new EventData(
                    Guid.NewGuid(),
                    "eventType",
                    false, // the payload is BinaryFormatter output, not JSON
                    customerCreatedEventByteArray,
                    null
                    );
            }
        }
    
        /// <summary>
        /// Event payload describing a newly created customer. Marked
        /// <see cref="SerializableAttribute"/> so that it can be written with BinaryFormatter.
        /// </summary>
        [Serializable]
        public class CustomerCreated
        {
            /// <summary>Identifier of the customer.</summary>
            public Guid Id { get; set; }

            /// <summary>Name of the customer.</summary>
            public string Name { get; set; }

            /// <summary>
            /// Returns a readable representation, so that writing the event to the console
            /// shows its contents instead of only the type name.
            /// </summary>
            public override string ToString()
            {
                return $"{nameof(CustomerCreated)} {{ Id = {Id}, Name = {Name} }}";
            }
        }
    }
    

    如果我改变这行:

    private const string STREAM = "$ce-customer";
    

    对此:

    private const string STREAM = "customer-100";
    

    然后反序列化在服务器端正常工作。

    我在启动事件存储(Event Store)时使用了 --run-projections=all 参数,并且还启用了所有投影:

    enter image description here

    1 回复  |  直到 6 年前
        1
  •  1
  •   w0051977    6 年前

    这个问题帮助了我: Using the Event Store Client API (.NET), how to I write to a stream and link one event to another?

    我只需要改变一下:

    PersistentSubscriptionSettings settings = PersistentSubscriptionSettings.Create()
                    .DoNotResolveLinkTos() //Specifically this line
                    .StartFromCurrent();
    

    对此:

    PersistentSubscriptionSettings settings = PersistentSubscriptionSettings.Create()
                    .ResolveLinkTos() //Specifically this line
                    .StartFromCurrent();
    

    DoNotResolveLinkTos 获取的是指向原始事件的链接,而 ResolveLinkTos 获取的是实际事件本身。因此,我之前是在尝试反序列化链接对象,这才导致了异常。