EventStore.JVM

Event write ordering not preserved

Open rickardoberg opened this issue 8 years ago • 21 comments

Server: 3.8.1 JVM Client: <artifactId>eventstore-client_2.11</artifactId> 2.4.1

No custom Akka conf, just defaults. BetterOrdering:true on server.

As reported on the mailing list, it seems the JVM client does not preserve write order. A minimal test creates 1000 events with the ordinal as metadata, and on each write ack outputs ordinal + commit position. Here's a run without synchronous ack (i.e. not waiting for the future to complete before sending the next write):

Wrote:695:1650 Wrote:26:1876 Wrote:818:2101 Wrote:690:2327 Wrote:774:2553 Wrote:985:2779 Wrote:594:3005 Wrote:516:3231 Wrote:886:3457 Wrote:453:3683 Wrote:862:3909 Wrote:471:4135 Wrote:553:4361 Wrote:674:4587 Wrote:274:4813 Wrote:845:5039 Wrote:151:5265 Wrote:184:5491 ....

The futures are notified in the order they are written, but the events themselves have been reordered.

Adding a wait on each write future (created through ask(connection, message, writeTimeout), where message is a WriteEvents object) produces the following output, with no other changes:

Wrote:0:1651 Wrote:1:1875 Wrote:2:2099 Wrote:3:2323 Wrote:4:2547 Wrote:5:2771 Wrote:6:2995 Wrote:7:3219 Wrote:8:3443 Wrote:9:3667 Wrote:10:3891 Wrote:11:4116 Wrote:12:4341 Wrote:13:4566 Wrote:14:4791 Wrote:15:5016 Wrote:16:5241 Wrote:17:5466 Wrote:18:5691 Wrote:19:5916 Wrote:20:6141 Wrote:21:6366 Wrote:22:6591 Wrote:23:6816 Wrote:24:7041 Wrote:25:7266 Wrote:26:7491 Wrote:27:7716 Wrote:28:7941 Wrote:29:8166 Wrote:30:8391 Wrote:31:8616 Wrote:32:8841 Wrote:33:9066 Wrote:34:9291 Wrote:35:95 ....

The commit position increases along with ordinal, so write order has been preserved.

For me it is critical to understand what I can do, if anything, to keep the writes in the same order as they were produced. Adding a wait on the future after each event is not really an option.

For reference, the use case is to copy events from one EventStore to another. We currently have 6M events, and with the synchronous write (to make it safe) it looks like it will take almost a week to do this copy, which is really really really bad.

Thanks!

rickardoberg avatar Nov 02 '16 01:11 rickardoberg
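[Editor's note] Since blocking on each future is off the table, one alternative worth sketching is to chain the writes so that the next one starts only when the previous ack arrives, without ever parking a thread on get(). This is a minimal simulation, not EventStore client code: appendToStream below is a hypothetical stub that acks after a random delay, standing in for the real async call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ChainedWrites {
    // Daemon scheduler so the JVM can exit without an explicit shutdown.
    static final ScheduledExecutorService timer =
        Executors.newScheduledThreadPool(4, r -> {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        });
    static final Random rnd = new Random();

    // Hypothetical stand-in for an async client write; acks after a random
    // delay, so if several were in flight they would complete out of order.
    static CompletableFuture<Integer> appendToStream(int ordinal) {
        CompletableFuture<Integer> f = new CompletableFuture<>();
        timer.schedule(() -> f.complete(ordinal), rnd.nextInt(20), TimeUnit.MILLISECONDS);
        return f;
    }

    // Chain each write after the previous one: the next write starts only when
    // the previous ack arrives, so the order seen by the "server" matches the
    // production order, and no thread ever blocks between writes.
    public static List<Integer> writeInOrder(int count) {
        List<Integer> acks = new ArrayList<>();
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (int i = 0; i < count; i++) {
            final int ordinal = i;
            chain = chain.thenCompose(v -> appendToStream(ordinal))
                         .thenAccept(acks::add);
        }
        chain.join(); // wait once at the very end, not once per write
        return acks;
    }

    public static void main(String[] args) {
        // Acks come back in production order: 0, 1, 2, ...
        System.out.println(writeInOrder(50));
    }
}
```

Note the trade-off: this keeps only one write in flight, so throughput is the same as the blocking version; it only frees the calling thread. Raising throughput needs batching, as discussed later in the thread.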

Testing with the msemys/esjc JVM client shows that I can get that to do write reordering as well, BUT with that one I can put in a simple semaphore throttle to have (for example) 10 concurrent writes, and write order will still be preserved. With the Akka client the semaphore throttle doesn't help at all, still get reordering.

rickardoberg avatar Nov 02 '16 01:11 rickardoberg

On its own, the JVM client is sequential like any other actor: if you fire 10 messages from the same thread, they will be processed by the client in the same order. So I suppose tcp + server is the place where reordering may happen.

Anyway, I'd like to help you resolve this problem. Could you please post a simple example reproducing this case?

Do you write to the same stream?

t3hnar avatar Nov 02 '16 07:11 t3hnar

Scratch that: even with the other JVM client I can still get reordering, even with just a 10-write semaphore (i.e. no more than 10 outstanding writes). As of now it doesn't seem possible to do an EventStore copy (subscribe to one, write to another) without waiting for each write to finish, which in practice means it's not possible, because of the time it would take for any reasonably sized store. Are there any tricks to get this to work?

rickardoberg avatar Nov 02 '16 07:11 rickardoberg

@t3hnar no, I write to one stream per aggregate. Does that make a difference?

rickardoberg avatar Nov 02 '16 07:11 rickardoberg

I can't easily post the Akka version code, because it uses too many internal helper classes. However, here's the esjc version of the test, same issue:

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;

import com.github.msemys.esjc.EventData;
import com.github.msemys.esjc.EventStore;
import com.github.msemys.esjc.EventStoreBuilder;
import com.github.msemys.esjc.ExpectedVersion;

import static java.util.Arrays.asList;

public class EventStoreOrderTest
{
    @Test
    public void testOrdering() throws IOException, InterruptedException
    {
        EventStore eventstore = EventStoreBuilder.newBuilder()
                                                 .singleNodeAddress("127.0.0.1", 1113)
                                                 .operationTimeout(Duration.ofSeconds(30))
                                                 .userCredentials("admin", "changeit")
                                                 .build();

        int COUNT = 1_000_000;
        Semaphore throttle = new Semaphore(10); // at most 10 outstanding writes
        CountDownLatch latch = new CountDownLatch(COUNT);
        AtomicInteger next = new AtomicInteger();
        for (int i = 0; i < COUNT; i++)
        {
            final int j = i;
            throttle.acquire();
            eventstore.appendToStream("foo", ExpectedVersion.any(), asList(
                    EventData.newBuilder()
                             .type("bar")
                             .data("i:" + i)
                             .metadata("")
                             .build()))
                      .thenAccept(r ->
                      {
                          // a completion arriving out of submission order is reported here
                          if (j != next.get())
                          {
                              System.out.println("Expected " + next.get() + ", got " + j);
                          }
                          next.incrementAndGet();

                          throttle.release();
                          latch.countDown();
                      });
        }

        latch.await();

        eventstore.disconnect();
    }
}

rickardoberg avatar Nov 02 '16 07:11 rickardoberg

Actually, as you can see in the test above, it writes to the "foo" stream; it makes no difference, I still get reordering.

rickardoberg avatar Nov 02 '16 07:11 rickardoberg

Indeed, you need to write the next batch after the previous one has completed, and please select a proper batch size to increase throughput.

In the meantime I'm going to create a test to verify that there are no such issues in the JVM client itself, and that the reordering we are facing here is in the (tcp + server) part. Or I will ship a fix if an issue is found. Looks like the plan.

t3hnar avatar Nov 02 '16 07:11 t3hnar
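[Editor's note] The batching advice above can be sketched as follows. writeBatch is a hypothetical stub standing in for a single appendToStream call that carries a whole batch of events; the point is one wait per batch instead of one per event.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class BatchedWrites {

    // Pretend-async append of a whole batch: one future per batch, not per event.
    static CompletableFuture<Void> writeBatch(List<Integer> batch, List<Integer> committed) {
        return CompletableFuture.runAsync(() -> committed.addAll(batch));
    }

    // One join per batch instead of one per event: with batchSize events per
    // round trip, the time spent waiting drops by roughly that factor, while
    // order across batches is preserved because batches never overlap.
    public static List<Integer> copy(List<Integer> events, int batchSize) {
        List<Integer> committed = new ArrayList<>();
        for (int i = 0; i < events.size(); i += batchSize) {
            List<Integer> batch = events.subList(i, Math.min(i + batchSize, events.size()));
            writeBatch(batch, committed).join();
        }
        return committed;
    }

    public static void main(String[] args) {
        List<Integer> in = new ArrayList<>();
        for (int i = 0; i < 100; i++) in.add(i);
        System.out.println(copy(in, 25).equals(in)); // prints true: order preserved
    }
}
```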

How do I create a batch? I would be happy to do so; I have repeatedly asked on the list how to do this, but so far no go.

rickardoberg avatar Nov 02 '16 07:11 rickardoberg

asList(
    EventData.newBuilder()
             .type("bar")
             .data("i:"+i)
             .metadata("")
             .build())

Currently you are passing asList with a single element, but it is better to pass many entries in a single write.

t3hnar avatar Nov 02 '16 07:11 t3hnar

@t3hnar true, but if I put many in there (same with Akka JVM client) then it is impossible to use different streams. You have to put all of them into the same one. Right?

rickardoberg avatar Nov 02 '16 08:11 rickardoberg

Yes, so basically you need to group by stream and write sequentially per stream. Different streams can be written concurrently; this should cover most of your needs.

t3hnar avatar Nov 02 '16 08:11 t3hnar
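[Editor's note] The group-by-stream idea can be sketched with a per-stream tail future: each new write for a stream is chained after that stream's previous write, so order holds within a stream while different streams proceed independently. append() here is a hypothetical stub, not the client API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class PerStreamWriter {
    // Tail future per stream: the last write chained for that stream.
    private final Map<String, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();
    private final Map<String, List<Integer>> committed = new ConcurrentHashMap<>();

    // Pretend-async single-event append to one stream (stand-in for appendToStream).
    private CompletableFuture<Void> append(String stream, int ordinal) {
        return CompletableFuture.runAsync(() ->
            committed.computeIfAbsent(stream, s -> Collections.synchronizedList(new ArrayList<>()))
                     .add(ordinal));
    }

    // Chain the write after the stream's current tail; thenCompose defers the
    // actual append until the previous write on the same stream has completed.
    public void write(String stream, int ordinal) {
        tails.compute(stream, (s, tail) ->
            (tail == null ? CompletableFuture.<Void>completedFuture(null) : tail)
                .thenCompose(v -> append(stream, ordinal)));
    }

    // Wait for a stream's chain to finish and return what was "committed".
    public List<Integer> drain(String stream) {
        tails.getOrDefault(stream, CompletableFuture.completedFuture(null)).join();
        return committed.getOrDefault(stream, Collections.emptyList());
    }

    public static void main(String[] args) {
        PerStreamWriter w = new PerStreamWriter();
        for (int i = 0; i < 100; i++) {
            w.write("stream-a", i);        // interleaved submissions...
            w.write("stream-b", 1000 + i);
        }
        System.out.println(w.drain("stream-a")); // ...but each stream stays in order
        System.out.println(w.drain("stream-b"));
    }
}
```

As the reporter points out below, this preserves per-stream order but not the global interleaving across streams, which is exactly the limitation that matters for his copy use case.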

@t3hnar that will make it quite complex, and hard to get the exact same output sequence as the input sequence. Not to mention the speed will still suck really badly, as most of the time the batch will only have one event in it (if I see a new stream id I have to flush the batch currently being built), yet I still need to wait for the future to complete before continuing. Really really bad.

Fortunately we don't really use the fact that we have one stream per aggregate, as we load aggregate snapshots from the database rather than from events, so I could try putting them all into one stream and effectively destroy any stream id information that I currently have. That would at least make it sort of work, and not take a week to complete.

rickardoberg avatar Nov 02 '16 08:11 rickardoberg

With the .NET client you can get ordering when writing async from a single thread, if BetterOrdering is set.


gregoryyoung avatar Nov 02 '16 09:11 gregoryyoung

@gregoryyoung that's good to know. Then it should be possible to fix this in the JVM client, I guess? It would be great if you could replicate it locally and see if there's ANYTHING to be done. For now I'm going to try erasing my stream ids and doing batching as per above. Doing a run now, and it seems to have better performance: 200 messages/second, compared to 18 messages/second in our current production run, which I will have to abort because it will just take too long to finish.

rickardoberg avatar Nov 02 '16 09:11 rickardoberg

Just to follow up: we have now completed our blue/green deployment from the broken production cluster (broken due to reordered events) into a new one where the events were put back in order. By doing the copy (subscribe to old ES, write to new ES) using only a single stream, we got adequate performance and copied the 6M events in a few hours.

The conclusion for now seems to be that you can either have the stream id be per aggregate, allowing you to load your aggregates from ES and do concurrency checking based on version, OR you can use a single stream id, allowing you to do b/g deployments using the subscribe/write method. You cannot (practically speaking) get both, at least not with either of the available Java clients.

rickardoberg avatar Nov 03 '16 03:11 rickardoberg

@rickardoberg good to know you restored data.

I think we can close the ticket.

t3hnar avatar Nov 03 '16 05:11 t3hnar

@t3hnar well, so yes and no. I still think it's an issue that the JVM client reorders writes. If you want to open a new ticket more specific to that, and reference this one, that'd be fine.

rickardoberg avatar Nov 03 '16 05:11 rickardoberg

@t3hnar I looked at the test, but I don't read Scala so can't really tell what it's trying to do. Can you explain? Does it test for the case outlined in this issue?

rickardoberg avatar Nov 07 '16 05:11 rickardoberg

Yep, the test just starts 1000 writes asynchronously, then expects 1000 acks, and then verifies that the order of acks matches the order in which the writes were started.

actor ! WriteEvents(streamId, List(newEventData), ExpectedVersion.Exact(x)) this is an asynchronous write

expectMsgAllClassOf(List.fill(n)(classOf[WriteEventsCompleted]): _*) - expect acks

actual shouldEqual expected is self-explanatory :)

t3hnar avatar Nov 07 '16 05:11 t3hnar

@t3hnar and it is checking the number written, not the position in the log (which obviously is correct)? I'm a bit confused, because if that passes, then what am I doing wrong? Or does it fail?

rickardoberg avatar Nov 07 '16 05:11 rickardoberg

  • my test writes to the same stream, thus checking the order of responses + seqNr is totally correct
  • you could switch from using Future to direct message passing; that should improve ordering. To do that you would have to dive into http://akka.io to understand the basic principles and how to implement message passing with Java

t3hnar avatar Nov 07 '16 06:11 t3hnar
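[Editor's note] t3hnar's last suggestion can be sketched without an Akka dependency, as an analogy rather than Akka code: an actor is essentially a mailbox drained by a single thread, which is why messages told to it are handled in submission order. Here a single-threaded executor plays the mailbox, and performWrite is a hypothetical stand-in for sending the WriteEvents message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class MailboxWriter {
    // One thread draining a FIFO queue: the essence of an actor mailbox.
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    private final List<Integer> handled = new ArrayList<>();

    // Hypothetical stand-in for issuing the actual write.
    private void performWrite(int ordinal) { handled.add(ordinal); }

    // Fire-and-forget, like actorRef.tell(msg): no Future to wait on;
    // ordering comes from the single consumer thread.
    public void tell(int ordinal) { mailbox.submit(() -> performWrite(ordinal)); }

    public List<Integer> shutdownAndGet() throws InterruptedException {
        mailbox.shutdown();
        mailbox.awaitTermination(10, TimeUnit.SECONDS);
        return handled;
    }

    public static void main(String[] args) throws InterruptedException {
        MailboxWriter w = new MailboxWriter();
        for (int i = 0; i < 1000; i++) w.tell(i); // told in order...
        List<Integer> out = w.shutdownAndGet();
        // ...handled in order: prints true
        System.out.println(out.size() == 1000 && out.get(999) == 999);
    }
}
```

One caveat, consistent with the rest of the thread: this only shows that ordering survives inside the client. Once writes are pipelined over TCP, the server may still ack or commit them out of order, which is what the reporter observed.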