influxdb-client-csharp
influxdb-client-csharp copied to clipboard
The library hangs on the Flush method
I tried to upload many records via WriteAPI. When I try to send to many records (10000-100000) they are doesn't be received all. When I try to split to 1000 records per request the library hangs on Flush method. If I doesn't use Flush before disposing WriteAPI some data is lost.
The sample code:
var start = 0;
for (;;)
{
var historyBars = bars.Skip(start).Take(MaxBarsPerRequest).ToArray();
if (historyBars.Length == 0)
{
return;
}
if (start != 0)
await Task.Delay(100, token);
start += MaxBarsPerRequest;
m_logger.LogDebug(
$"Write bars for {historyBars.First().Security}. From: {historyBars.First().Date}, To: {historyBars.Last().Date}");
using var api = m_client.GetWriteApi();
api.WriteMeasurements(HistoryBarConstant.Bucket, HistoryBarConstant.OrgId, WritePrecision.S,
historyBars);
//api.Flush();
}
@NektoDron thanks! we will take a look.
Hi @NektoDron,
thanks again for the issue.
We recognized that our write buffer use incorrect batch size during heavy load. I merged #69 that fix it.
The issue is fixed in 1.7.0 milestone. If you would like to use a preview version use our nightly build: InfluxDB.Client 1.7.0-dev.512.
One note to your code... write_api should be used as a singleton. Try to use something like that:
var m_client = InfluxDBClientFactory.Create("http://localhost:9999", "my-token".ToCharArray());
var api = m_client.GetWriteApi(WriteOptions.CreateNew().BatchSize(MaxBarsPerRequest).FlushInterval(10_000).Build());
var start = 0;
for (;;)
{
var historyBars = bars.Skip(start).Take(MaxBarsPerRequest).ToArray();
if (historyBars.Length == 0)
{
break;
}
if (start != 0) {
Trace.WriteLine("Delaying...");
await Task.Delay(100);
}
start += MaxBarsPerRequest;
Trace.WriteLine(
$"Add bars to buffer From: {historyBars.First().Date}, To: {historyBars.Last().Date}. Remaining {CountToWrite-start}");
api.WriteMeasurements(HistoryBarConstant.Bucket, HistoryBarConstant.OrgId, WritePrecision.S,
historyBars);
}
Trace.WriteLine("Flushing data...");
m_client.Dispose();
Trace.WriteLine("Finished");
I am also working on improve a serialization of measurements into LineProtocol... stay tuned ;)
Regards
Can I use WriteApi for multi threads?
Yes, It's support multithreading.
Hi @NektoDron,
The improved serialization of measurements into LineProtocol - #70 - is merged.
If you would like to use a preview version use our nightly build: InfluxDB.Client 1.7.0-dev.528.
Regards
No, it is not worked. I've removed the old bucket and made the new one. After I have reloaded the all of data with new library and modified code like your sample. In the logs I see what all data has been success written up to 2 April. But in the DB I see only half or less.
You could enable detail logging by: Client.SetLogLevel(LogLevel.Body) and you will see output requests to InfuxDB.
Are your data a unique points?
https://docs.influxdata.com/influxdb/v1.7/troubleshooting/frequently-asked-questions/#how-does-influxdb-handle-duplicate-points
I see the library is locked here: https://www.screencast.com/t/JpQgrB61
I will try with loglevel
This lock means that the library waiting to flush all data into InfluxDB.
Anyway thanks for cooperation!
How many it should be uploaded? I have wait some minutes.
It depends how much data you want to save into InfluxDB, as you see in log:
<-- Header: Date=Mon, 06 Apr 2020 10:43:56 GMT
<-- END
<-- 204
<-- Header: Date=Mon, 06 Apr 2020 10:43:56 GMT
<-- END
<-- 204
<-- Header: Date=Mon, 06 Apr 2020 10:43:56 GMT
<-- END
you are able to do 3 requests per seconds...
The best approach will be directly prepare LineProtocol to avoid type overhead in MeasurementMapper and PointData.
Regards
I have waited for 15 minutes. There are no new records in the log and library is stuck at the WaitToCondition method.
https://www.screencast.com/t/o32qAhK5LH There are no active threads with data uploading but thread is locked.
I made some experiments. All works fine if I don't call Dispose. All data is loaded fine. But garbage collector can't Dispose WriteApi's objects in the finalize thread.
Got any news? I can help if necessary.
I am out of an office, but next week I will will take a look.
Thanks for good cooperation
Hi @NektoDron,
I tried simulate it by import large amout of data as a simple program with main method and everything was fine... all threads was disposed :(
Could you please describe your environment? Which version of dotnet do you use, which platform?
It's strange behaviour because InfluxDB.Client.WriteApi.Dispose - WaitToCondition is limited by 30seconds.
Regards
Hi @NektoDron,
The problem was wrong implementation of WaitToCondition. It is fixed by #81.
If you would like to use a preview version use our nightly build: InfluxDB.Client 1.8.0-dev.639.
Regards
It's not worked. The problem is same. There is deadlock at Dispose.
It's work, but write freeze thread for 30sec. It' not a good solution.
Flushing batches before shutdown. Exception thrown: 'System.ArgumentNullException' in System.Private.CoreLib.dll TSCloud.TSSignal.Agent Error: 0 : The WriteApi can't be gracefully dispose! - 30000ms elapsed. Flushing batches before shutdown. Exception thrown: 'System.ArgumentNullException' in System.Private.CoreLib.dll TSCloud.TSSignal.Agent Error: 0 : The WriteApi can't be gracefully dispose! - 30000ms elapsed. Flushing batches before shutdown.
I think bug is at the disposing logic.

It looks like that is related to handling an exception. Could you share an exception log from your debug output?
I modified code to this and now it's works fine for me

PS. The exceptions are not from this part. .
@NektoDron Are you running in UI Thread. ( WPF or WinForm If running in UI Thread, the asynchronous method will hang.
I'm running net core console application.
I can't reproduce your bug, but I am very interested in this question, can you upload the code where you have a problem? My test code is like this
async static void Write()
{
var bars = Enumerable.Range(1, 100000).Select(
(n) =>
{
return new Bar() { Security = "Safe", Value = n, Date = DateTime.UtcNow.AddMinutes(n) };
}).ToList();
CancellationToken token;
var MaxBarsPerRequest = 1000;
var start = 0;
var m_client = InfluxDB.Client.InfluxDBClientFactory.CreateV1("http://localhost:8086", "admin", "admin".ToCharArray(), "SCTech", "autogen");
for (; ; )
{
var historyBars = bars.Skip(start).Take(MaxBarsPerRequest).ToArray();
if (historyBars.Length == 0)
{
return;
}
if (start != 0)
await Task.Delay(100, token);
start += MaxBarsPerRequest;
Console.WriteLine(
$"Write bars for {historyBars.First().Security}. From: {historyBars.First().Date}, To: {historyBars.Last().Date}");
using var api = m_client.GetWriteApi();
api.WriteMeasurements("SCTech", "-", WritePrecision.S,
historyBars);
api.Flush();
}
}
[Measurement("History")]
class Bar
{
[Column("Security", IsTag = true)]
public string Security { get; set; }
[Column("Time", IsTimestamp = true)]
public DateTime Date { get; set; }
[Column("Value")]
public int Value { get; set; }
}
My code is a part of big application. It can't work separately. May be problem is what I have about 10 field per history bar (different parameters). Now I'm using WriteRecord method but result is same.
public static string GetInfluxRecord(this IDataBar bar, string broker, string security, ChartInterval interval, bool convertTime)
{
var b = new StringBuilder();
void Append(string field, double v, bool isFirst = false)
{
if (!isFirst && DoubleUtil.IsZero(v))
return;
if (!isFirst)
b.Append(',');
b.Append(field);
b.Append('=');
b.Append(v.ToString(CultureInfo.InvariantCulture));
}
void AppendInt(string field, int v)
{
if (v == 0)
return;
b.Append(',');
b.Append(field);
b.Append('=');
b.Append(v);
}
b.Append(HistoryBarConstant.Measurement);
b.Append(",broker=");
b.Append(broker);
b.Append(",security=");
b.Append(security);
b.Append(",interval=");
b.Append(interval);
b.Append(' ');
Append("open", bar.Open, true);
Append("low", bar.Low);
Append("high", bar.High);
Append("close", bar.Close);
Append("volume", bar.Volume);
Append("interest", bar.Interest);
AppendInt("ticks_count", bar.TicksCount);
if (bar is IBar br)
{
Append("bid", br.Bid);
Append("bid_qty", br.BidQty);
Append("ask", br.Ask);
Append("ask_qty", br.AskQty);
Append("step_price", br.StepPrice);
}
var date = convertTime ? bar.Date.ToUniversalTime() : new DateTime(bar.Date.Ticks, DateTimeKind.Utc);
b.Append(' ');
b.Append(date.Subtract(new DateTime(1970, 1, 1)).TotalSeconds);
return b.ToString();
}
And I written later what fix in the WriteApi constructor helps me:
_subject
//
// Batching
//
.Publish(connectedSource =>
{
return connectedSource
.Window(tempBoundary)
.Merge(Observable.Defer(() =>
{
connectedSource
.Window(TimeSpan.FromMilliseconds(writeOptions.FlushInterval), writeOptions.BatchSize,
writeOptions.WriteScheduler)
.Merge(_flush)
.Subscribe(t => {},
exception =>
{
_disposed = true;
Console.WriteLine($"The unhandled exception occurs: {exception}");
},
() =>
{
_disposed = true;
Console.WriteLine("The WriteApi was disposed.");
});
return Observable.Empty<IObservable<BatchWriteData>>();
}));
})
It happens because Dispose at "if (!_subject.IsDisposed) _subject.OnCompleted();" doesn't set "_disposed" for me. But my correction set it at "if (!_flush.IsDisposed) _flush.OnCompleted()"
@NektoDron hi, I try to use 10 or more fields, but it works fine. I think this exception may be caused when the asynchronous task hangs, so after onComplete, the thread does not return. You can track the calling chains, for example
return Observable
.Defer(() =>
service.PostWriteAsyncWithIRestResponse(org, bucket,
Encoding.UTF8.GetBytes(lineProtocol), null,
"identity", "text/plain; charset=utf-8", null, "application/json", null, precision)
.ToObservable())
.RetryWhen(f => f.SelectMany(e =>
Track PostWriteAsyncWithIRestResponse
// make the HTTP request
IRestResponse localVarResponse = (IRestResponse) await this.Configuration.ApiClient.CallApiAsync(localVarPath,
Method.POST, localVarQueryParams, localVarPostBody, localVarHeaderParams, localVarFormParams, localVarFileParams,
localVarPathParams, localVarHttpContentType);
Track CallApiAsync
When I run in UI Thread, the asynchronous method will deadlock, it looks like you.
Where I can track it? I'm using WriteApi only at this simple code and I don't want to go down to such a low level.