marten icon indicating copy to clipboard operation
marten copied to clipboard

Mechanism to Import Events

Open Leh2 opened this issue 4 years ago • 6 comments

When migrating old data into the event store it would be beneficial to be able to control the timestamp on events.

This would enable us to use the functionality with aggregate stream and timestamp: AggregateStreamAsync<T>(Guid streamId, ... DateTime? timestamp = null)

image

Leh2 avatar Mar 05 '20 09:03 Leh2

@Leh2 If it's okay, I'm going to rename this to "Mechanism to Import Events". Like some kind of bulk writer for events? You can always go to the SQL level of course.

jeremydmiller avatar Nov 17 '20 18:11 jeremydmiller

+1 to this idea. We have a few years of data we're migrating from SQL Server to Marten / Postgres and it would be great to have the ability to create the full stream of events from scratch via Marten's API. But, as you mention, we could just go the SQL level too. The latter would probably require extra debugging time though :)

jedidja avatar Mar 26 '21 13:03 jedidja

This is dependent on #780.

jeremydmiller avatar Apr 06 '21 14:04 jeremydmiller

This would be great. I am thinking of migrating from EF core /MS SQL Server to marten and some guidance or some helper api to create events with a specific timestamp (in the past) would be nice.

jannikbuschke avatar Apr 25 '22 15:04 jannikbuschke

Im currently experimenting with the following approach: Using an IDocumentSessionListener to edit event timestamps just before saving. The events are saved with the given timestamp. The streams table will have the current timestamps, I didnt yet find a way to adjust them, not sure if is a problem.

@oskardudycz @jeremydmiller do you think this is an ok approach for importing existing events from some other data source to marten or do you see obvious problems with this approach?

    public class MigrationEventListener: IDocumentSessionListener
    {
        public Task BeforeSaveChangesAsync(IDocumentSession session, CancellationToken token)
        {
            var streams = session.PendingChanges.Streams();
            foreach (var stream in streams)
            {
                foreach (var e in stream.Events)
                {
                    if (e.Headers.TryGetValue("MigrationTimestamp", out object timestamp))
                    {
                        if (timestamp is DateTimeOffset ts)
                        {
                            e.Timestamp = ts;
                        }
                    }
                }
            }

            return Task.CompletedTask;
        }
}

// usage
var x = session.Events.StartStream(id, new SomeEvent { Id = id });
// set header value to some datetimeoffset in the past, depending on our data
var someExistingTimestampFromThePast = DateTimeOffset.UtcNow.AddDays(-150);
x.Events.First().SetHeader("MigrationTimestamp", someExistingTimestampFromThePast );

jannikbuschke avatar Jun 07 '22 20:06 jannikbuschke