influxdb-python
requests.exceptions.ConnectionError: ('Connection aborted.', BrokenPipeError(32, 'Broken pipe'))
Summary
When uploading a large amount of plaintext data to the InfluxDB HTTP API via requests, the connection dies with the error:
requests.exceptions.ConnectionError: ('Connection aborted.', BrokenPipeError(32, 'Broken pipe'))
after around 30 MB of the upload; the file being loaded is 54.3 MB.
I'm batching input at 5,000 candles per write.
The script processes around 340,000 "candles" read from a JSON flat file and loads them into InfluxDB over its HTTP API; each candle is a timestamp, open, high, low, close, and volume. After around 200,000 candles have been sent in batches of 5,000, requests dies, with roughly 150,000 remaining.
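For reference, each candle in the source file is a six-element array of this shape (the values below are illustrative, not taken from the real file):
# [timestamp_ms, open, high, low, close, volume]
candle = [1514764800000, 0.0521, 0.0525, 0.0519, 0.0523, 1834.7]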
Expected Result
The file uploads without a problem.
Actual Result
Upload fails and requests reports BrokenPipeError, with around 150,000 candles remaining.
Traceback (most recent call last):
File "/Users/creslin/PycharmProjects/rtmtb/env/lib/python3.6/site-packages/urllib3/connectionpool.py", line 601, in urlopen
chunked=chunked)
File "/Users/creslin/PycharmProjects/rtmtb/env/lib/python3.6/site-packages/urllib3/connectionpool.py", line 357, in _make_request
conn.request(method, url, **httplib_request_kw)
File "/usr/local/Cellar/python/3.6.5/Frameworks/Python.framework/Versions/3.6/lib/python3.6/http/client.py", line 1239, in request
self._send_request(method, url, body, headers, encode_chunked)
File "/usr/local/Cellar/python/3.6.5/Frameworks/Python.framework/Versions/3.6/lib/python3.6/http/client.py", line 1285, in _send_request
self.endheaders(body, encode_chunked=encode_chunked)
File "/usr/local/Cellar/python/3.6.5/Frameworks/Python.framework/Versions/3.6/lib/python3.6/http/client.py", line 1234, in endheaders
self._send_output(message_body, encode_chunked=encode_chunked)
File "/usr/local/Cellar/python/3.6.5/Frameworks/Python.framework/Versions/3.6/lib/python3.6/http/client.py", line 1065, in _send_output
self.send(chunk)
File "/usr/local/Cellar/python/3.6.5/Frameworks/Python.framework/Versions/3.6/lib/python3.6/http/client.py", line 986, in send
self.sock.sendall(data)
BrokenPipeError: [Errno 32] Broken pipe
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/creslin/PycharmProjects/rtmtb/env/lib/python3.6/site-packages/requests/adapters.py", line 445, in send
timeout=timeout
File "/Users/creslin/PycharmProjects/rtmtb/env/lib/python3.6/site-packages/urllib3/connectionpool.py", line 639, in urlopen
_stacktrace=sys.exc_info()[2])
File "/Users/creslin/PycharmProjects/rtmtb/env/lib/python3.6/site-packages/urllib3/util/retry.py", line 357, in increment
raise six.reraise(type(error), error, _stacktrace)
File "/Users/creslin/PycharmProjects/rtmtb/env/lib/python3.6/site-packages/urllib3/packages/six.py", line 685, in reraise
raise value.with_traceback(tb)
File "/Users/creslin/PycharmProjects/rtmtb/env/lib/python3.6/site-packages/urllib3/connectionpool.py", line 601, in urlopen
chunked=chunked)
File "/Users/creslin/PycharmProjects/rtmtb/env/lib/python3.6/site-packages/urllib3/connectionpool.py", line 357, in _make_request
conn.request(method, url, **httplib_request_kw)
File "/usr/local/Cellar/python/3.6.5/Frameworks/Python.framework/Versions/3.6/lib/python3.6/http/client.py", line 1239, in request
self._send_request(method, url, body, headers, encode_chunked)
File "/usr/local/Cellar/python/3.6.5/Frameworks/Python.framework/Versions/3.6/lib/python3.6/http/client.py", line 1285, in _send_request
self.endheaders(body, encode_chunked=encode_chunked)
File "/usr/local/Cellar/python/3.6.5/Frameworks/Python.framework/Versions/3.6/lib/python3.6/http/client.py", line 1234, in endheaders
self._send_output(message_body, encode_chunked=encode_chunked)
File "/usr/local/Cellar/python/3.6.5/Frameworks/Python.framework/Versions/3.6/lib/python3.6/http/client.py", line 1065, in _send_output
self.send(chunk)
File "/usr/local/Cellar/python/3.6.5/Frameworks/Python.framework/Versions/3.6/lib/python3.6/http/client.py", line 986, in send
self.sock.sendall(data)
urllib3.exceptions.ProtocolError: ('Connection aborted.', BrokenPipeError(32, 'Broken pipe'))
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/creslin/PycharmProjects/rtmtb/influxdb/creslin_influx_scratch/write_read_toy.py", line 179, in <module>
"high": float(candle[2]),
File "/Users/creslin/PycharmProjects/rtmtb/env/lib/python3.6/site-packages/influxdb/client.py", line 490, in write_points
tags=tags, protocol=protocol)
File "/Users/creslin/PycharmProjects/rtmtb/env/lib/python3.6/site-packages/influxdb/client.py", line 551, in _write_points
protocol=protocol
File "/Users/creslin/PycharmProjects/rtmtb/env/lib/python3.6/site-packages/influxdb/client.py", line 327, in write
headers=headers
File "/Users/creslin/PycharmProjects/rtmtb/env/lib/python3.6/site-packages/influxdb/client.py", line 267, in request
timeout=self._timeout
File "/Users/creslin/PycharmProjects/rtmtb/env/lib/python3.6/site-packages/requests/sessions.py", line 512, in request
resp = self.send(prep, **send_kwargs)
File "/Users/creslin/PycharmProjects/rtmtb/env/lib/python3.6/site-packages/requests/sessions.py", line 622, in send
r = adapter.send(request, **kwargs)
File "/Users/creslin/PycharmProjects/rtmtb/env/lib/python3.6/site-packages/requests/adapters.py", line 495, in send
raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: ('Connection aborted.', BrokenPipeError(32, 'Broken pipe'))
Reproduction Steps
#######################
# Load candles - batches of 5,000: error
# requests.exceptions.ConnectionError: ('Connection aborted.', BrokenPipeError(32, 'Broken pipe'))
from influxdb import InfluxDBClient
from influxdb import DataFrameClient
import json
'''
create a client
'''
dbname = 'crypto'
host = 'localhost'
port = '8086'
user = ''
password = ''
client = InfluxDBClient(host, port, user, password, dbname)
df_client = DataFrameClient(host, port, user, password, dbname)
##### Load 9 months of 1-minute ETH candles (342,000 items)
f = open('/Users/creslin/PycharmProjects/rtmtb/influxdb/json_candles/Binance_ETHBTC_1m_1514764800000-1535760000000.json','r')
candles = json.loads(f.read())
total_len = len(candles)
candle_records = []
exchange = 'binance'
symbol = 'ETH/USDT'
count = 0
for candle in candles:
    candle_item = {
        "measurement": "crypto_1m",
        "tags": {
            "exchange": exchange,
            "symbol": symbol
        },
        "time": candle[0],
        "fields": {
            "open": float(candle[1]),
            "high": float(candle[2]),
            "low": float(candle[3]),
            "close": float(candle[4]),
            "volume": float(candle[5])
        }
    }
    candle_records.append(candle_item)
    count = count + 1
    # Write candles in batches of 5,000
    if count >= 5000:
        '''
        Write ETH/BTC candles in batches of 5,000
        '''
        # print("Write points: {0}".format(candle_records))
        client.write_points(candle_records, time_precision='ms')
        count = 0
        total_len = int(total_len) - 5000
        print(total_len)
System Information
python3 -m requests.help
{
"chardet": {
"version": "3.0.4"
},
"cryptography": {
"version": ""
},
"idna": {
"version": "2.6"
},
"implementation": {
"name": "CPython",
"version": "3.6.5"
},
"platform": {
"release": "17.7.0",
"system": "Darwin"
},
"pyOpenSSL": {
"openssl_version": "",
"version": null
},
"requests": {
"version": "2.18.4"
},
"system_ssl": {
"version": "100020ff"
},
"urllib3": {
"version": "1.22"
},
"using_pyopenssl": false
}
The InfluxDB instance is the standard vanilla Docker image:
https://hub.docker.com/_/influxdb/
1.6-alpine, 1.6.2-alpine, alpine (influxdb/1.6/alpine/Dockerfile)
I can share the JSON input file if helpful; it 7-zips to around 10 MB.
Using a SeriesHelper avoids the above problem.
I'm unsure whether there is a bug that I'm working around, or whether the error is expected behaviour given how I was inserting the data.
For future reference, this code works, inserting in chunks of 5,000.
from influxdb import InfluxDBClient
from influxdb import DataFrameClient
from influxdb import SeriesHelper
import pandas as pd
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)
import json
'''
create a client
'''
dbname = 'crypto'
host = 'ibm'
port = '8086'
user = ''
password = ''
client = InfluxDBClient(host, port, user, password, dbname)
df_client = DataFrameClient(host, port, user, password, dbname)
### Useful in test - nuke it make it
print("Drop database: " + dbname)
client.drop_database(dbname)
print("Create database: " + dbname)
client.create_database(dbname)
class MySeriesHelper(SeriesHelper):
    '''
    Instantiate SeriesHelper to write points to the backend.
    This is to write streams of records / bulk inserts.
    '''
    class Meta:
        """Meta class stores time series helper configuration."""
        # The client should be an instance of InfluxDBClient.
        client = client
        # The series name must be a string. Add dependent fields/tags
        # in curly brackets.
        # series_name = 'crypto_1.{exchange}.{symbol}'
        series_name = 'crypto_1'
        # Defines all the fields in this time series.
        fields = ['open', 'high', 'low', 'close', 'volume']
        # Defines all the tags for the series.
        tags = ['exchange', 'symbol']
        # Defines the number of data points to store prior to writing
        # on the wire.
        bulk_size = 5000
        # Set the time precision to ms ("s"|"ns"|"ms"|"u") - make sure to use the right one!
        time_precision = 'ms'
        # autocommit must be set to True when using bulk_size.
        autocommit = True
##### Load 9 months of 1-minute ETH candles (342,000 items)
f = open('/Users/creslin/PycharmProjects/rtmtb/influxdb/json_candles/Binance_ETHBTC_1m_1514764800000-1535760000000.json','r')
candles = json.loads(f.read())
exchange = 'binance'
symbol = 'ETH/USDT'
# Since bulk_size is set to 5000, upon the 5,000th construction call, *all* data
# points will be written / committed on the wire via MySeriesHelper.Meta.client.
count = 0
for candle in candles:
    MySeriesHelper(exchange=exchange, symbol=symbol,
                   open=float(candle[1]),
                   high=float(candle[2]),
                   low=float(candle[3]),
                   close=float(candle[4]),
                   volume=float(candle[5]),
                   time=candle[0])
    count = count + 1
    print(count)
# To manually submit data points which are not yet written, call commit:
MySeriesHelper.commit()
# To inspect the JSON which will be written, call _json_body_():
MySeriesHelper._json_body_()
'''
Read last 10,000 candles into a dataframe
'''
query = 'select time, open, high, low, close, volume from crypto_1 ORDER by time DESC LIMIT 10000'
print("Querying data to a dataframe: " + query)
print('')
df_result = df_client.query(query)
#df_result = df_result['crypto_1m'].describe()
print(df_result)
I had the same issue loading 38,000+ records. However, adding batch_size=1000 to the write_points call solved the problem. I didn't experiment with the number, so larger batches may also work.
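A minimal sketch of that workaround, reusing the client and candle_records from the reproduction script above (1000 is simply the value reported to work here):
# Hand write_points the full list and let the client split it into
# HTTP writes of 1,000 points each, instead of one huge POST.
client.write_points(candle_records, time_precision='ms', batch_size=1000)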
Thank you so much, it works for me! :)
Thank you very much, worked like a charm.