
Support streaming for bulk requests

Open · dittodhole opened this issue 3 years ago · 4 comments

Hi there!

I'd like to insert 10k rows into CouchDB, so I am using the bulk API:

var entities = /**/;
var docs = entities.Select(myCouchClient.DocumentSerializer.Serialize).ToArray();

var bulkRequest = new BulkRequest();
bulkRequest.Include(docs);

var bulkResponse = await myCouchClient.Documents.BulkAsync(
  bulkRequest,
  cancellationToken).ConfigureAwait(false);

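The problem I am running into is memory usage: every document is serialized to a string up front, and all of those strings are held in memory at once (on top of the GC pressure from the intermediate arrays and lists, although Collections.Pooled might help with that).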

Is there any chance to implement streaming? I.e., instead of including an array of pre-serialized strings, why not include the entities directly and serialize them straight into the request stream (e.g. via HttpWebRequest.GetRequestStream) to minimize memory usage?
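For comparison, the streaming idea can be sketched with a custom HttpContent instead of a custom Stream: HttpClient calls SerializeToStreamAsync with the request stream, so the body can be written document by document and flushed after each one, keeping only about one serialized document in memory at a time. This is a minimal sketch under assumptions, not MyCouch API: BulkDocsContent is a hypothetical name, it targets .NET Core 3.0 or later, and it uses System.Text.Json instead of MyCouch's DocumentSerializer (so MyCouch conventions such as $doctype are not applied).

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text.Json;
using System.Threading.Tasks;

// Hypothetical HttpContent that streams a CouchDB _bulk_docs body:
// {"all_or_nothing":true,"new_edits":false,"docs":[...]}
public sealed class BulkDocsContent<T> : HttpContent
{
    private readonly IEnumerable<T> _docs;
    private readonly bool _allOrNothing;
    private readonly bool _newEdits;

    public BulkDocsContent(IEnumerable<T> docs, bool allOrNothing = false, bool newEdits = true)
    {
        _docs = docs ?? throw new ArgumentNullException(nameof(docs));
        _allOrNothing = allOrNothing;
        _newEdits = newEdits;
        Headers.ContentType = new MediaTypeHeaderValue("application/json");
    }

    // Called by HttpClient with the request stream (or with a buffer, e.g. for ReadAsStringAsync).
    protected override async Task SerializeToStreamAsync(Stream stream, TransportContext? context)
    {
        using var writer = new Utf8JsonWriter(stream);

        writer.WriteStartObject();
        if (_allOrNothing) writer.WriteBoolean("all_or_nothing", true);
        if (!_newEdits) writer.WriteBoolean("new_edits", false);
        writer.WriteStartArray("docs");

        foreach (var doc in _docs)
        {
            JsonSerializer.Serialize(writer, doc);
            // Push this document to the stream so the writer does not accumulate the whole body.
            await writer.FlushAsync().ConfigureAwait(false);
        }

        writer.WriteEndArray();
        writer.WriteEndObject();
        await writer.FlushAsync().ConfigureAwait(false);
    }

    // The length is unknown without serializing everything first.
    protected override bool TryComputeLength(out long length)
    {
        length = -1;
        return false;
    }
}
```

It would be posted against a client whose BaseAddress points at the CouchDB server, roughly httpClient.PostAsync("foobar/_bulk_docs", new BulkDocsContent<Entity>(entities)). Because TryComputeLength reports an unknown length, .NET Core's HttpClient sends the body with chunked transfer encoding over HTTP/1.1 instead of buffering it to compute Content-Length.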

Best Andreas

dittodhole · Dec 06 '21 12:12

I'm not actively using CouchDB or doing any active development with MyCouch.

If you can add support for it without breaking the existing API etc., then I don't see why it couldn't exist in MyCouch.

danielwertheim · Dec 06 '21 16:12

Thanks for your reply, I'll analyze the possibilities.

Do you mind if I ping you again for a final sum-up?

Best Andreas

dittodhole · Dec 06 '21 17:12

I'll do my best to provide valid feedback.

danielwertheim · Dec 06 '21 19:12

😓 ... a long way down

Due to the extra layer of abstraction over System.Net.Http.HttpClient and JSON (de)serialization, I did not try to mess with your design. Instead, I based my idea on the "native" classes.

I am trying to store the following entities:

public record Entity
{
  public string Id { get; init; }
  public string Value { get; init; }
}

To extract the System.Net.Http.HttpClient, I used Reflection 💀

using System;
using System.Net.Http;
using System.Reflection;
using MyCouch;

public static class HttpClientEx
{
  /// <exception cref="ArgumentNullException"/>
  /// <exception cref="Exception"/>
  public static HttpClient CreateHttpClient<T>(T connection)
    where T : IConnection
  {
    if (connection is null)
    {
      throw new ArgumentNullException(nameof(connection));
    }

    // TODO: This expects that Connection.CreateHttpClient has already
    // run. Replace Reflection - but how?

    var type = connection.GetType();
    var propertyInfo = type.GetProperty(
      "HttpClient",
      BindingFlags.Instance | BindingFlags.NonPublic) ?? throw new InvalidOperationException();

    var result = (propertyInfo.GetValue(connection) as HttpClient) ?? throw new InvalidOperationException();

    return result;
  }
}
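Why the reflection lookup above can work at all: Type.GetProperty with BindingFlags.NonPublic also returns protected and internal properties inherited from base classes, but never private properties declared on a base class. The trick therefore assumes that MyCouch's HttpClient property is at least protected on a base class (not verified here against every MyCouch version). The sketch below demonstrates the rule with hypothetical stand-in types, ConnectionBase and DerivedConnection.

```csharp
#nullable enable
using System;
using System.Reflection;

// Hypothetical stand-ins for a MyCouch-like connection hierarchy.
public class ConnectionBase
{
    protected string Handle { get; } = "protected-handle";
    private string Secret { get; } = "private-secret";
}

public sealed class DerivedConnection : ConnectionBase
{
}

public static class ReflectionLookup
{
    // Reads a non-public instance property starting from the runtime type,
    // as HttpClientEx does; returns null if no such property is found.
    public static object? GetNonPublicProperty(object target, string name)
    {
        if (target is null) throw new ArgumentNullException(nameof(target));

        var property = target.GetType().GetProperty(
            name,
            BindingFlags.Instance | BindingFlags.NonPublic);

        return property?.GetValue(target);
    }
}
```

If the property were private on a base class, the lookup would have to start from the declaring type (typeof(ConnectionBase) here) instead of connection.GetType().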

After that, I need a helper for building the request URI:

using System;
using System.IO;
using MyCouch;

public static class ConnectionEx
{
  /// <exception cref="ArgumentNullException"/>
  public static string GetRequestUri(
    this IConnection connection,
    string relativeUrl)
  {
    if (connection is null)
    {
      throw new ArgumentNullException(nameof(connection));
    }
    if (string.IsNullOrWhiteSpace(relativeUrl))
    {
      throw new ArgumentNullException(nameof(relativeUrl));
    }

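    // TODO: Rather use Path.Combine, but it is meant for file-system paths
    // and mixes the slashes :/ Trim both sides to avoid doubled slashes.
    var result = string.Concat(
      connection.Address.AbsolutePath.TrimEnd('/'),
      "/",
      relativeUrl.TrimStart('/'));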

    return result;
  }
}
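On the Path.Combine TODO: Path.Combine targets file-system paths, whereas System.Uri resolves relative references per RFC 3986. The one catch is that the base must end with a slash, otherwise its last segment (the database name) is replaced instead of extended. A minimal sketch, with RequestUris.Combine as a hypothetical helper; note that it returns an absolute URI rather than the path-only string GetRequestUri produces.

```csharp
using System;

public static class RequestUris
{
    // Hypothetical helper: resolves relativeUrl against a database address with
    // System.Uri instead of string concatenation or Path.Combine.
    public static Uri Combine(Uri databaseAddress, string relativeUrl)
    {
        if (databaseAddress is null) throw new ArgumentNullException(nameof(databaseAddress));
        if (string.IsNullOrWhiteSpace(relativeUrl)) throw new ArgumentNullException(nameof(relativeUrl));

        // Relative resolution replaces the last path segment of the base unless
        // the base ends with '/', so "http://host/foobar" becomes "http://host/foobar/".
        var baseText = databaseAddress.AbsoluteUri;
        var normalizedBase = baseText.EndsWith('/') ? databaseAddress : new Uri(baseText + "/");

        // A leading '/' would resolve against the server root, so strip it.
        return new Uri(normalizedBase, relativeUrl.TrimStart('/'));
    }
}
```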

Now, the fun part:

using System;
using System.Collections.Generic;
using System.IO;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using MyCouch;
using MyCouch.Serialization;
using Newtonsoft.Json;

#if NETSTANDARD2_1_OR_GREATER || NET5_0_OR_GREATER
using System.Net.Mime;
#else
using MyCouch.Net;
#endif

public static class BulkRequestEx
{
  public static string RelativeUrl { get; } = "_bulk_docs";
  public static int BufferSize { get; set; } = 1024;
}

public sealed class BulkRequestEx<T>
  where T : class
{
  public BulkRequestEx()
  {
    this.Documents = new List<T>();
  }

  private List<T> Documents { get; }

  public bool AllOrNothing { get; set; } = true;
  public bool NewEdits { get; set; } = true;

  /// <exception cref="ArgumentNullException"/>
  /// <exception cref="Exception"/>
  public void Include(IEnumerable<T> documents)
  {
    this.Documents.AddRange(documents ?? throw new ArgumentNullException(nameof(documents)));
  }

  /// <exception cref="ArgumentNullException"/>
  public HttpRequestMessage ToHttpRequestMessage(
    IDbConnection dbConnection,
    ISerializer serializer)
  {
    if (dbConnection is null)
    {
      throw new ArgumentNullException(nameof(dbConnection));
    }
    if (serializer is null)
    {
      throw new ArgumentNullException(nameof(serializer));
    }

    var result =
      new HttpRequestMessage(
        HttpMethod.Post,
        dbConnection.GetRequestUri(BulkRequestEx.RelativeUrl))
      {
        Headers =
        {
          Accept =
          {
            new MediaTypeWithQualityHeaderValue(
#if NETSTANDARD2_1_OR_GREATER || NET5_0_OR_GREATER
              MediaTypeNames.Application.Json)
#else
              HttpContentTypes.Json)
#endif
          },
          Host = dbConnection.Address.Authority
        },
        Content =
          new StreamContent(
            new BulkRequestStream(
              this.AllOrNothing,
              this.NewEdits,
              this.Documents,
              serializer),
            bufferSize: BulkRequestEx.BufferSize)
          {
            Headers =
            {
              ContentType = new MediaTypeHeaderValue(
#if NETSTANDARD2_1_OR_GREATER || NET5_0_OR_GREATER
                MediaTypeNames.Application.Json)
#else
                HttpContentTypes.Json)
#endif
            }
          }
      };

    return result;
  }

  private sealed class BulkRequestStream : Stream
  {
    /// <exception cref="ArgumentNullException"/>
    public BulkRequestStream(
      bool allOrNothing,
      bool newEdits,
      IEnumerable<T> enumerable,
      ISerializer serializer)
    {
      if (serializer is null)
      {
        throw new ArgumentNullException(nameof(serializer));
      }

      this.SerializeFnWritesNatively = false;
      this.AllOrNothing = allOrNothing;
      this.NewEdits = newEdits;
      this.Enumerator = (enumerable ?? throw new ArgumentNullException(nameof(enumerable))).GetEnumerator();
      this.SerializeFn =
        (
          textWriter,
          obj) =>
        {
          var json = serializer.Serialize(obj);

          textWriter.Write(json);
        };

      // TODO: Instead of using the serializer, one could easily
      // use JsonSerializer here. Just set SerializeFnWritesNatively to
      // true.
    }

    private bool SerializeFnWritesNatively { get; }
    private bool AllOrNothing { get; }
    private bool NewEdits { get; }
    private IEnumerator<T>? Enumerator { get; set; }
    private Action<TextWriter, T>? SerializeFn { get; set; }

    /// <inheritdoc/>
    protected override void Dispose(bool disposing)
    {
      base.Dispose(disposing);

      this.Enumerator?.Dispose();
      this.Enumerator = null;
      this.SerializeFn = null;
      // Release the writers before the stream they write into:
      // StreamWriter.Dispose flushes into, then closes, the MemoryStream.
      this.JsonTextWriter = null;
      this.StreamWriter?.Dispose();
      this.StreamWriter = null;
      this.MemoryStream?.Dispose();
      this.MemoryStream = null;
    }

    /// <inheritdoc/>
    public override bool CanRead => true;

    /// <inheritdoc/>
    public override bool CanSeek => false;

    /// <inheritdoc/>
    public override bool CanWrite => false;

    /// <inheritdoc/>
    public override long Length => throw new NotSupportedException();

    /// <inheritdoc/>
    public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }

    public bool EndOfStream { get; private set; }

    /// <inheritdoc/>
    public override void Flush()
    {
      // Nothing to flush: the stream is read-only.
    }

    private MemoryStream? MemoryStream { get; set; }
    private StreamWriter? StreamWriter { get; set; }
    private JsonTextWriter? JsonTextWriter { get; set; }
    private bool HeaderWritten { get; set; }
    private bool AnyDocumentWritten { get; set; }

    /// <inheritdoc/>
    public override int Read(byte[] buffer, int offset, int count)
    {
      var enumerator = this.Enumerator ?? throw new ObjectDisposedException(nameof(BulkRequestStream));
      var serializeFn = this.SerializeFn ?? throw new ObjectDisposedException(nameof(BulkRequestStream));

      var memoryStream = this.MemoryStream ??= new MemoryStream(count);
      var streamWriter = this.StreamWriter ??= new StreamWriter(memoryStream);

      // Produce the next chunk only once the previous one has been fully consumed.
      if (memoryStream.Length == memoryStream.Position)
      {
        if (this.EndOfStream)
        {
          return 0;
        }

        // Discard the bytes that have already been handed out.
        memoryStream.SetLength(0L);

        var jsonTextWriter = this.JsonTextWriter ??= new JsonTextWriter(streamWriter);

        if (!this.HeaderWritten)
        {
          // Written independently of the first document, so an empty
          // enumerable still yields a valid body such as {"docs":[]}.
          jsonTextWriter.WriteStartObject();

          if (this.AllOrNothing)
          {
            jsonTextWriter.WritePropertyName("all_or_nothing");
            jsonTextWriter.WriteValue(true);
          }

          if (!this.NewEdits)
          {
            jsonTextWriter.WritePropertyName("new_edits");
            jsonTextWriter.WriteValue(false);
          }

          jsonTextWriter.WritePropertyName("docs");

          jsonTextWriter.WriteStartArray();

          this.HeaderWritten = true;
        }

        if (enumerator.MoveNext())
        {
          // The raw JSON bypasses jsonTextWriter, which therefore cannot
          // insert the separators itself.
          if (this.AnyDocumentWritten && !this.SerializeFnWritesNatively)
          {
            streamWriter.Write(",");
          }

          serializeFn.Invoke(
            streamWriter,
            enumerator.Current);

          this.AnyDocumentWritten = true;
        }
        else
        {
          jsonTextWriter.WriteEndArray();

          jsonTextWriter.WriteEndObject();

          this.EndOfStream = true;
        }

        // Pushes everything buffered in the writers into memoryStream.
        jsonTextWriter.Flush();

        memoryStream.Position = 0L;
      }

      // TODO: This could be optimized to always fill {count} bytes,
      // reading beyond the current element if necessary.
      // Currently, the logic serializes one element of the list and
      // returns only the remaining bytes of that element. The next
      // call then moves the cursor to the next element and continues
      // serialization. In contrast to the suggested optimization,
      // this results in uneven read sizes.

      var result = memoryStream.Read(
        buffer,
        offset,
        count);

      Console.Write(
        Encoding.UTF8.GetString(
          buffer,
          offset,
          result)); // for demo purposes

      return result;
    }

    /// <inheritdoc/>
    public override long Seek(long offset, SeekOrigin origin)
    {
      throw new NotSupportedException();
    }

    /// <inheritdoc/>
    public override void SetLength(long value)
    {
      throw new NotSupportedException();
    }

    /// <inheritdoc/>
    public override void Write(byte[] buffer, int offset, int count)
    {
      throw new NotSupportedException();
    }
  }
}
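The TODO in Read describes returning each element's bytes separately instead of filling count bytes. A hedged sketch of the suggested optimization: keep serializing elements into a pending buffer and copying them out until the caller's buffer is full. To stay short and self-contained it writes a bare JSON array (no _bulk_docs envelope) and uses System.Text.Json rather than MyCouch's serializer; JsonArrayReadStream is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text.Json;

// Hypothetical read-only stream that lazily serializes items into a JSON array and
// fills each Read buffer completely, crossing element boundaries when needed.
public sealed class JsonArrayReadStream<T> : Stream
{
    private readonly IEnumerator<T> _items;
    private byte[] _pending = { (byte)'[' }; // bytes produced but not yet handed out
    private int _pendingOffset;
    private bool _first = true;
    private bool _finished;

    public JsonArrayReadStream(IEnumerable<T> items) =>
        _items = (items ?? throw new ArgumentNullException(nameof(items))).GetEnumerator();

    public override int Read(byte[] buffer, int offset, int count)
    {
        var written = 0;
        while (written < count)
        {
            if (_pendingOffset == _pending.Length && !TryProduceNextChunk())
            {
                break; // everything, including the closing ']', has been handed out
            }

            var n = Math.Min(count - written, _pending.Length - _pendingOffset);
            Buffer.BlockCopy(_pending, _pendingOffset, buffer, offset + written, n);
            _pendingOffset += n;
            written += n;
        }
        return written;
    }

    // Serializes the next element (prefixed with ',' if needed) or the closing ']'.
    private bool TryProduceNextChunk()
    {
        if (_finished) return false;

        if (_items.MoveNext())
        {
            var json = JsonSerializer.SerializeToUtf8Bytes(_items.Current);
            if (_first)
            {
                _pending = json;
                _first = false;
            }
            else
            {
                _pending = new byte[json.Length + 1];
                _pending[0] = (byte)',';
                json.CopyTo(_pending, 1);
            }
        }
        else
        {
            _pending = new[] { (byte)']' };
            _finished = true;
        }

        _pendingOffset = 0;
        return true;
    }

    protected override void Dispose(bool disposing)
    {
        if (disposing) _items.Dispose();
        base.Dispose(disposing);
    }

    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;
    public override long Length => throw new NotSupportedException();
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
}
```

With the items 1, 2, 3 and a 5-byte buffer, the first Read returns "[1,2," even though that crosses two element boundaries.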

And finally, the usage:

using System;
using System.Net.Http;
using System.Threading;
using MyCouch;

using var myCouchClient = new MyCouchClient(
  "http://localhost:5984",
  "foobar");

var httpClient = HttpClientEx.CreateHttpClient(myCouchClient.Connection);

var timeout = httpClient.Timeout;

bool result;
try
{
  httpClient.Timeout = TimeSpan.FromMinutes(10d);

  var bulkRequestEx =
    new BulkRequestEx<Entity>
    {
      AllOrNothing = true
    };

  bulkRequestEx.Include(
    new[]
    {
      new Entity
      {
        Id = Guid.NewGuid().ToString("N"),
        Value = "Test1"
      },
      new Entity
      {
        Id = Guid.NewGuid().ToString("N"),
        Value = "Test2"
      }
    });

  using var httpRequestMessage = bulkRequestEx.ToHttpRequestMessage(
    myCouchClient.Connection,
    myCouchClient.DocumentSerializer);

  Console.WriteLine(httpRequestMessage); // for demo purposes

  using var httpResponseMessage = await httpClient.SendAsync(
    httpRequestMessage,
    HttpCompletionOption.ResponseContentRead,
    CancellationToken.None).ConfigureAwait(false);

  result = httpResponseMessage.IsSuccessStatusCode;
}
finally
{
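  // TODO: Storing the previous timeout in a temporary variable does
  // not make the usage of HttpClient.Timeout "thread-safe" (ie other
  // callers will also use 00:10:00 while the bulk request is in
  // progress). Restoring it is not possible anyway: HttpClient only
  // allows changing Timeout before its first request, so the line
  // below would throw an InvalidOperationException. The assignment
  // above only works because this client has not sent anything yet.

  //httpClient.Timeout = timeout;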
}

I know, not the prettiest lines, but enough for experiments.

On the console, I got the following output:

Method: POST, RequestUri: '/foobar/_bulk_docs', Version: 1.1, Content: System.Net.Http.StreamContent, Headers:
{
  Accept: application/json
  Host: localhost:5984
  Content-Type: application/json
}
{"all_or_nothing":true,"docs":[{"$doctype":"entity","_id":"b1a2f4c149d140d1840ebcc4c079c297","value":"Test1"},{"$doctype":"entity","_id":"1848e4b73fd6424284f3c98cea7df888","value":"Test2"}]}
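On the Timeout TODO in the finally block: one way to avoid mutating the shared HttpClient.Timeout is to bound only this request with a CancellationTokenSource. Caveat: HttpClient.Timeout (100 seconds by default) still applies on top of the token, so a ten-minute bulk request would still need a longer client-wide timeout, set once before the client's first request. SendWithTimeoutAsync is a hypothetical helper, sketched under those assumptions.

```csharp
using System;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

public static class PerRequestTimeout
{
    // Hypothetical helper: cancels this one request after `timeout` without touching
    // the shared HttpClient.Timeout. The effective limit is the shorter of the two.
    public static async Task<HttpResponseMessage> SendWithTimeoutAsync(
        HttpClient client,
        HttpRequestMessage request,
        TimeSpan timeout,
        CancellationToken cancellationToken = default)
    {
        if (client is null) throw new ArgumentNullException(nameof(client));
        if (request is null) throw new ArgumentNullException(nameof(request));

        // Linked so that both the caller's token and the deadline can cancel the request.
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        cts.CancelAfter(timeout);

        // ResponseContentRead buffers the body before returning, so the deadline
        // covers the whole exchange, not just the response headers.
        return await client
            .SendAsync(request, HttpCompletionOption.ResponseContentRead, cts.Token)
            .ConfigureAwait(false);
    }
}
```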

🍻

dittodhole · Dec 07 '21 14:12