mycouch
mycouch copied to clipboard
Support streaming for bulk requests
Hi there!
I'd like to insert 10k rows to couchdb, and therefore I am using the bulk API:
// Placeholder: the collection of entities to insert is elided in this snippet.
var entities = /**/;
// Every entity is serialized up front and all results are materialized in a single array
// before the request is built.
var docs = entities.Select(myCouchClient.DocumentSerializer.Serialize).ToArray();
var bulkRequest = new BulkRequest();
bulkRequest.Include(docs);
// Sends all included documents in one bulk call.
var bulkResponse = await myCouchClient.Documents.BulkAsync(
bulkRequest,
cancellationToken).ConfigureAwait(false);
The problem I am encountering here is "memory usage" (besides the GC-pressure due to usage of Array and List - Collections.Pooled might be an option, though).
Is there any chance to implement streaming? I.e., instead of including a string array, why not include the entities
directly and write them to the request stream (e.g. via HttpWebRequest.GetRequestStream) to minimize memory usage?
Best Andreas
I'm not actively using CouchDB or doing any active development with MyCouch.
If you can add support for it, without breaking the existing API etc. then I don't see why it couldn't exist in MyCouch.
Thanks for your reply, I'll analyze the possibilities.
Do you mind if I ping you again for a final summary?
Best Andreas
I'll do my best to provide valid feedback
😓 ... a long way down
Due to the extra layer of abstraction over System.Net.HttpClient and json (de)serialization, I did not try to mess with your design. Instead, I based my idea on the "native" classes.
I am trying to store the following entities:
/// <summary>
/// Sample document type used for the bulk-insert experiment.
/// </summary>
public record Entity
{
/// <summary>Identifier of the entity.</summary>
public string Id { get; init; }
/// <summary>Payload value of the entity.</summary>
public string Value { get; init; }
}
To extract the System.Net.HttpClient, I used Reflection 💀
using System;
using System.Net.Http;
using MyCouch;
/// <summary>
/// Helper that digs the <see cref="HttpClient"/> out of a MyCouch connection via reflection.
/// </summary>
public static class HttpClientEx
{
    /// <summary>
    /// Returns the <see cref="HttpClient"/> stored in the non-public instance property named
    /// <c>HttpClient</c> of <paramref name="connection"/>. Despite its name, this method does
    /// not create a new client; it hands out the instance the connection already holds.
    /// </summary>
    /// <typeparam name="T">Concrete connection type.</typeparam>
    /// <param name="connection">Connection whose client is extracted.</param>
    /// <returns>The client instance held by the connection.</returns>
    /// <exception cref="ArgumentNullException"/>
    /// <exception cref="InvalidOperationException">
    /// The property does not exist on the connection type (or any base type), or its value
    /// is not an <see cref="HttpClient"/> (e.g. it has not been initialised yet).
    /// </exception>
    public static HttpClient CreateHttpClient<T>(T connection)
        where T : IConnection
    {
        if (connection is null)
        {
            throw new ArgumentNullException(nameof(connection));
        }

        // TODO: This expects that Connection.CreateHttpClient has already
        // run. Replace Reflection - but how?

        // Fully qualified so that the block does not depend on a "using System.Reflection;".
        // DeclaredOnly + walking the base types also finds a property that is declared
        // private on a base class, which a single lookup on the runtime type would miss.
        const System.Reflection.BindingFlags Flags =
            System.Reflection.BindingFlags.Instance
            | System.Reflection.BindingFlags.NonPublic
            | System.Reflection.BindingFlags.DeclaredOnly;

        System.Reflection.PropertyInfo? propertyInfo = null;
        for (var type = connection.GetType(); type is not null && propertyInfo is null; type = type.BaseType)
        {
            propertyInfo = type.GetProperty("HttpClient", Flags);
        }

        if (propertyInfo is null)
        {
            throw new InvalidOperationException(
                $"Type '{connection.GetType().FullName}' has no non-public instance property named 'HttpClient'.");
        }

        if (propertyInfo.GetValue(connection) is HttpClient result)
        {
            return result;
        }

        throw new InvalidOperationException(
            "The 'HttpClient' property of the connection does not hold an HttpClient instance.");
    }
}
After that, I need a tool for creating the request uri:
using System;
using System.IO;
using MyCouch;
/// <summary>
/// Extension methods for building request URIs from a MyCouch connection.
/// </summary>
public static class ConnectionEx
{
    /// <summary>
    /// Builds a request path by appending <paramref name="relativeUrl"/> to the absolute path
    /// of the connection's address, joined by exactly one forward slash.
    /// </summary>
    /// <param name="connection">Connection whose <c>Address.AbsolutePath</c> is the base path.</param>
    /// <param name="relativeUrl">Relative segment to append, e.g. <c>_bulk_docs</c>.</param>
    /// <returns>The combined path, e.g. <c>/foobar/_bulk_docs</c>.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="connection"/> is null, or <paramref name="relativeUrl"/> is null,
    /// empty or whitespace.
    /// </exception>
    public static string GetRequestUri(
        this IConnection connection,
        string relativeUrl)
    {
        if (connection is null)
        {
            throw new ArgumentNullException(nameof(connection));
        }

        if (string.IsNullOrWhiteSpace(relativeUrl))
        {
            throw new ArgumentNullException(nameof(relativeUrl));
        }

        // Path.Combine is not an option because it would use the platform's directory
        // separator. Trim the slashes at the seam instead, so that a base path of "/" or
        // "/db/" does not yield "//_bulk_docs" or "/db//_bulk_docs".
        var basePath = connection.Address.AbsolutePath.TrimEnd('/');
        var relativePath = relativeUrl.TrimStart('/');

        return string.Concat(basePath, "/", relativePath);
    }
}
Now, the fun part:
using System;
using System.Collections.Generic;
using System.IO;
using System.Net.Http;
using System.Net.Http.Headers;
using MyCouch;
using MyCouch.Serialization;
using Newtonsoft.Json;
#if NETSTANDARD2_1_OR_GREATER || NET5_0_OR_GREATER
using System.Net.Mime;
#else
using MyCouch.Net;
#endif
/// <summary>
/// Settings shared by every <c>BulkRequestEx&lt;T&gt;</c> instance.
/// </summary>
public static class BulkRequestEx
{
    private const string BulkDocsEndpoint = "_bulk_docs";

    private static int bufferSize = 1024;

    /// <summary>Relative URL of the bulk endpoint.</summary>
    public static string RelativeUrl => BulkDocsEndpoint;

    /// <summary>Buffer size handed to the stream content of a bulk request; defaults to 1024.</summary>
    public static int BufferSize
    {
        get => bufferSize;
        set => bufferSize = value;
    }
}
/// <summary>
/// Bulk request whose body is produced lazily: the included documents are serialized one at
/// a time while the HTTP request content is being read, instead of being turned into one
/// large JSON string up front.
/// </summary>
/// <typeparam name="T">Type of the documents to send.</typeparam>
public sealed class BulkRequestEx<T>
where T : class
{
    // Single place for the target-framework switch; used for both Accept and Content-Type.
#if NETSTANDARD2_1_OR_GREATER || NET5_0_OR_GREATER
    private static readonly string JsonMediaType = MediaTypeNames.Application.Json;
#else
    private static readonly string JsonMediaType = HttpContentTypes.Json;
#endif

    public BulkRequestEx()
    {
        this.Documents = new List<T>();
    }

    private List<T> Documents { get; }

    /// <summary>When true, <c>"all_or_nothing": true</c> is written to the request body.</summary>
    public bool AllOrNothing { get; set; } = true;

    /// <summary>When false, <c>"new_edits": false</c> is written to the request body.</summary>
    public bool NewEdits { get; set; } = true;

    /// <summary>Adds documents to the request.</summary>
    /// <param name="documents">Documents to append to the ones already included.</param>
    /// <exception cref="ArgumentNullException"/>
    public void Include(IEnumerable<T> documents)
    {
        this.Documents.AddRange(documents ?? throw new ArgumentNullException(nameof(documents)));
    }

    /// <summary>
    /// Creates a POST request for the bulk endpoint whose content streams the included
    /// documents. The returned message owns the stream; disposing the message releases it.
    /// </summary>
    /// <param name="dbConnection">Connection that supplies the request path and the Host header.</param>
    /// <param name="serializer">Serializer used to turn each document into JSON.</param>
    /// <returns>The request message, ready to be sent.</returns>
    /// <exception cref="ArgumentNullException"/>
    public HttpRequestMessage ToHttpRequestMessage(
        IDbConnection dbConnection,
        ISerializer serializer)
    {
        if (dbConnection is null)
        {
            throw new ArgumentNullException(nameof(dbConnection));
        }

        if (serializer is null)
        {
            throw new ArgumentNullException(nameof(serializer));
        }

        var result =
            new HttpRequestMessage(
                HttpMethod.Post,
                dbConnection.GetRequestUri(BulkRequestEx.RelativeUrl))
            {
                Headers =
                {
                    Accept =
                    {
                        new MediaTypeWithQualityHeaderValue(JsonMediaType)
                    },
                    Host = dbConnection.Address.Authority
                },
                Content =
                    new StreamContent(
                        new BulkRequestStream(
                            this.AllOrNothing,
                            this.NewEdits,
                            this.Documents,
                            serializer),
                        bufferSize: BulkRequestEx.BufferSize)
                    {
                        Headers =
                        {
                            ContentType = new MediaTypeHeaderValue(JsonMediaType)
                        }
                    }
            };

        return result;
    }

    /// <summary>
    /// Forward-only, read-only stream that emits the bulk request JSON chunk by chunk:
    /// first the header together with the first document, then one document per chunk,
    /// and finally the closing tokens.
    /// </summary>
    private sealed class BulkRequestStream : Stream
    {
        /// <exception cref="ArgumentNullException"/>
        public BulkRequestStream(
            bool allOrNothing,
            bool newEdits,
            IEnumerable<T> enumerable,
            ISerializer serializer)
        {
            if (serializer is null)
            {
                throw new ArgumentNullException(nameof(serializer));
            }

            this.SerializeFnWritesNatively = false;
            this.AllOrNothing = allOrNothing;
            this.NewEdits = newEdits;
            this.Enumerator = (enumerable ?? throw new ArgumentNullException(nameof(enumerable))).GetEnumerator();
            this.SerializeFn =
                (textWriter, obj) =>
                {
                    var json = serializer.Serialize(obj);
                    textWriter.Write(json);
                };

            // TODO: Instead of using the serializer, one could easily
            // use JsonSerializer here. Just set SerializeFnWritesNatively to
            // true.
        }

        private bool SerializeFnWritesNatively { get; }

        private bool AllOrNothing { get; }

        private bool NewEdits { get; }

        private IEnumerator<T>? Enumerator { get; set; }

        private Action<TextWriter, T>? SerializeFn { get; set; }

        private MemoryStream? MemoryStream { get; set; }

        private StreamWriter? StreamWriter { get; set; }

        private JsonTextWriter? JsonTextWriter { get; set; }

        // True once the opening object, the options and the "docs" array start were written.
        private bool HeaderWritten { get; set; }

        /// <summary>True once the closing tokens of the request body have been produced.</summary>
        public bool EndOfStream { get; private set; }

        /// <inheritdoc/>
        public override bool CanRead => true;

        /// <inheritdoc/>
        public override bool CanSeek => false;

        /// <inheritdoc/>
        public override bool CanWrite => false;

        /// <inheritdoc/>
        public override long Length => throw new NotSupportedException();

        /// <inheritdoc/>
        public override long Position
        {
            get => throw new NotSupportedException();
            set => throw new NotSupportedException();
        }

        /// <inheritdoc/>
        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                this.Enumerator?.Dispose();
                this.Enumerator = null;
                this.SerializeFn = null;
                this.JsonTextWriter = null;

                // The writer goes first: disposing it flushes into the memory stream,
                // which therefore must still be open at that point.
                this.StreamWriter?.Dispose();
                this.StreamWriter = null;
                this.MemoryStream?.Dispose();
                this.MemoryStream = null;
            }

            base.Dispose(disposing);
        }

        /// <inheritdoc/>
        public override void Flush()
        {
            // Nothing to flush on a read-only stream; must not throw.
        }

        /// <inheritdoc/>
        public override int Read(byte[] buffer, int offset, int count)
        {
            var enumerator = this.Enumerator ?? throw new ObjectDisposedException(nameof(this.Enumerator));
            var serializeFn = this.SerializeFn ?? throw new ObjectDisposedException(nameof(this.SerializeFn));

            if (buffer is null)
            {
                throw new ArgumentNullException(nameof(buffer));
            }

            if (offset < 0)
            {
                throw new ArgumentOutOfRangeException(nameof(offset));
            }

            if (count < 0)
            {
                throw new ArgumentOutOfRangeException(nameof(count));
            }

            if (buffer.Length - offset < count)
            {
                throw new ArgumentException("Offset and count exceed the buffer length.", nameof(buffer));
            }

            if (count == 0)
            {
                // A zero-byte read must not advance the enumerator.
                return 0;
            }

            var memoryStream = this.MemoryStream ??= new MemoryStream(count);
            var streamWriter = this.StreamWriter ??= new StreamWriter(memoryStream);
            var jsonTextWriter = this.JsonTextWriter ??= new JsonTextWriter(streamWriter);

            // The previous chunk has been handed out completely: produce the next one.
            if (memoryStream.Position == memoryStream.Length)
            {
                if (this.EndOfStream)
                {
                    return 0;
                }

                memoryStream.SetLength(0L);

                if (enumerator.MoveNext())
                {
                    if (!this.HeaderWritten)
                    {
                        this.WriteHeader(jsonTextWriter);
                    }
                    else if (!this.SerializeFnWritesNatively)
                    {
                        // The documents are written as raw text behind the JSON writer's
                        // back, so the separator has to be written manually as well.
                        streamWriter.Write(",");
                    }

                    serializeFn.Invoke(
                        streamWriter,
                        enumerator.Current);
                }
                else
                {
                    if (!this.HeaderWritten)
                    {
                        // No documents at all: still emit a well-formed body with an
                        // empty "docs" array instead of closing an array never opened.
                        this.WriteHeader(jsonTextWriter);
                    }

                    jsonTextWriter.WriteEndArray();
                    jsonTextWriter.WriteEndObject();
                    this.EndOfStream = true;
                }

                jsonTextWriter.Flush();
                memoryStream.Position = 0L;
            }

            // TODO: This could be optimized to fully read {count}
            // bytes beyond the current element.
            // Currently, the logic fully reads an object in the
            // list, and returns the remaining bytes of the current
            // element. In the next call - in opposite to the suggested
            // optimization - the cursors moves to the next element,
            // and then continues serialization. This results in
            // non-equalized reads.
            var result = memoryStream.Read(
                buffer,
                offset,
                count);

            Console.Write(
                System.Text.Encoding.UTF8.GetString(
                    buffer,
                    offset,
                    result)); // for demo purposes

            return result;
        }

        /// <inheritdoc/>
        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotSupportedException();
        }

        /// <inheritdoc/>
        public override void SetLength(long value)
        {
            throw new NotSupportedException();
        }

        /// <inheritdoc/>
        public override void Write(byte[] buffer, int offset, int count)
        {
            throw new NotSupportedException();
        }

        // Writes the opening object, the optional flags and the start of the "docs" array.
        private void WriteHeader(JsonTextWriter jsonTextWriter)
        {
            jsonTextWriter.WriteStartObject();

            if (this.AllOrNothing)
            {
                jsonTextWriter.WritePropertyName("all_or_nothing");
                jsonTextWriter.WriteValue(true);
            }

            if (!this.NewEdits)
            {
                jsonTextWriter.WritePropertyName("new_edits");
                jsonTextWriter.WriteValue(false);
            }

            jsonTextWriter.WritePropertyName("docs");
            jsonTextWriter.WriteStartArray();
            this.HeaderWritten = true;
        }
    }
}
And, finally the usage:
// Demo: push two entities through the streaming bulk request against a local CouchDB.
using var myCouchClient = new MyCouchClient("http://localhost:5984", "foobar");
var httpClient = HttpClientEx.CreateHttpClient(myCouchClient.Connection);
var timeout = httpClient.Timeout;
bool result;
try
{
    // Bulk uploads can take a while, hence the generous timeout.
    httpClient.Timeout = TimeSpan.FromMinutes(10d);

    var first = new Entity { Id = Guid.NewGuid().ToString("N"), Value = "Test1" };
    var second = new Entity { Id = Guid.NewGuid().ToString("N"), Value = "Test2" };

    var request = new BulkRequestEx<Entity>();
    request.AllOrNothing = true;
    request.Include(new[] { first, second });

    using var message = request.ToHttpRequestMessage(
        myCouchClient.Connection,
        myCouchClient.DocumentSerializer);
    Console.WriteLine(message); // for demo purposes

    using var response = await httpClient
        .SendAsync(message, HttpCompletionOption.ResponseContentRead, CancellationToken.None)
        .ConfigureAwait(false);
    result = response.IsSuccessStatusCode;
}
finally
{
    // TODO: Storing the previous timeout in a temporary variable does
    // not make the usage of HttpClient.Timeout "thread-safe" (ie other
    // callers will also use 00:10:00 while the bulk request is in
    // progress).
    //httpClient.Timeout = timeout;
}
I know, not the prettiest lines, but enough for experiments.
On the console, I got the following output:
Method: POST, RequestUri: '/foobar/_bulk_docs', Version: 1.1, Content: System.Net.Http.StreamContent, Headers:
{
Accept: application/json
Host: localhost:5984
Content-Type: application/json
}
{"all_or_nothing":true,"docs":[{"$doctype":"entity","_id":"b1a2f4c149d140d1840ebcc4c079c297","value":"Test1"},{"$doctype":"entity","_id":"1848e4b73fd6424284f3c98cea7df888","value":"Test2"}]}
🍻