requests.exceptions.ConnectionError: ('Connection aborted.', BrokenPipeError(32, 'Broken pipe'))

Open creslinux opened this issue 7 years ago • 4 comments

Summary. When uploading a large amount of plaintext data to the InfluxDB HTTP API via requests, the connection dies with the error requests.exceptions.ConnectionError: ('Connection aborted.', BrokenPipeError(32, 'Broken pipe')) after around 30 MB of the upload; the file being loaded is 54.3 MB.

I'm batching input to 5,000 candles per write.

The script processes around 340,000 "candles" read from a JSON flat file and loads them into InfluxDB over the HTTP API; each candle is a timestamp, open, high, low, close, and volume. After around 200,000 candles have been sent in batches of 5,000, requests dies, with roughly 150,000 remaining.
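
For reference, each candle in the JSON file is a flat array; the values below are illustrative only, but the field order matches how the reproduction script below indexes candle[0] through candle[5]:

# [timestamp_ms, open, high, low, close, volume]
candle = [1514764800000, 0.0523, 0.0531, 0.0519, 0.0527, 1423.5]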

Expected Result

The file uploads without any problems.

Actual Result

The upload fails and requests reports a BrokenPipeError, with around 150,000 candles remaining.

Traceback (most recent call last):
  File "/Users/creslin/PycharmProjects/rtmtb/env/lib/python3.6/site-packages/urllib3/connectionpool.py", line 601, in urlopen
    chunked=chunked)
  File "/Users/creslin/PycharmProjects/rtmtb/env/lib/python3.6/site-packages/urllib3/connectionpool.py", line 357, in _make_request
    conn.request(method, url, **httplib_request_kw)
  File "/usr/local/Cellar/python/3.6.5/Frameworks/Python.framework/Versions/3.6/lib/python3.6/http/client.py", line 1239, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/usr/local/Cellar/python/3.6.5/Frameworks/Python.framework/Versions/3.6/lib/python3.6/http/client.py", line 1285, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/usr/local/Cellar/python/3.6.5/Frameworks/Python.framework/Versions/3.6/lib/python3.6/http/client.py", line 1234, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/usr/local/Cellar/python/3.6.5/Frameworks/Python.framework/Versions/3.6/lib/python3.6/http/client.py", line 1065, in _send_output
    self.send(chunk)
  File "/usr/local/Cellar/python/3.6.5/Frameworks/Python.framework/Versions/3.6/lib/python3.6/http/client.py", line 986, in send
    self.sock.sendall(data)
BrokenPipeError: [Errno 32] Broken pipe

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/creslin/PycharmProjects/rtmtb/env/lib/python3.6/site-packages/requests/adapters.py", line 445, in send
    timeout=timeout
  File "/Users/creslin/PycharmProjects/rtmtb/env/lib/python3.6/site-packages/urllib3/connectionpool.py", line 639, in urlopen
    _stacktrace=sys.exc_info()[2])
  File "/Users/creslin/PycharmProjects/rtmtb/env/lib/python3.6/site-packages/urllib3/util/retry.py", line 357, in increment
    raise six.reraise(type(error), error, _stacktrace)
  File "/Users/creslin/PycharmProjects/rtmtb/env/lib/python3.6/site-packages/urllib3/packages/six.py", line 685, in reraise
    raise value.with_traceback(tb)
  File "/Users/creslin/PycharmProjects/rtmtb/env/lib/python3.6/site-packages/urllib3/connectionpool.py", line 601, in urlopen
    chunked=chunked)
  File "/Users/creslin/PycharmProjects/rtmtb/env/lib/python3.6/site-packages/urllib3/connectionpool.py", line 357, in _make_request
    conn.request(method, url, **httplib_request_kw)
  File "/usr/local/Cellar/python/3.6.5/Frameworks/Python.framework/Versions/3.6/lib/python3.6/http/client.py", line 1239, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/usr/local/Cellar/python/3.6.5/Frameworks/Python.framework/Versions/3.6/lib/python3.6/http/client.py", line 1285, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/usr/local/Cellar/python/3.6.5/Frameworks/Python.framework/Versions/3.6/lib/python3.6/http/client.py", line 1234, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/usr/local/Cellar/python/3.6.5/Frameworks/Python.framework/Versions/3.6/lib/python3.6/http/client.py", line 1065, in _send_output
    self.send(chunk)
  File "/usr/local/Cellar/python/3.6.5/Frameworks/Python.framework/Versions/3.6/lib/python3.6/http/client.py", line 986, in send
    self.sock.sendall(data)
urllib3.exceptions.ProtocolError: ('Connection aborted.', BrokenPipeError(32, 'Broken pipe'))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/creslin/PycharmProjects/rtmtb/influxdb/creslin_influx_scratch/write_read_toy.py", line 179, in <module>
    "high": float(candle[2]),
  File "/Users/creslin/PycharmProjects/rtmtb/env/lib/python3.6/site-packages/influxdb/client.py", line 490, in write_points
    tags=tags, protocol=protocol)
  File "/Users/creslin/PycharmProjects/rtmtb/env/lib/python3.6/site-packages/influxdb/client.py", line 551, in _write_points
    protocol=protocol
  File "/Users/creslin/PycharmProjects/rtmtb/env/lib/python3.6/site-packages/influxdb/client.py", line 327, in write
    headers=headers
  File "/Users/creslin/PycharmProjects/rtmtb/env/lib/python3.6/site-packages/influxdb/client.py", line 267, in request
    timeout=self._timeout
  File "/Users/creslin/PycharmProjects/rtmtb/env/lib/python3.6/site-packages/requests/sessions.py", line 512, in request
    resp = self.send(prep, **send_kwargs)
  File "/Users/creslin/PycharmProjects/rtmtb/env/lib/python3.6/site-packages/requests/sessions.py", line 622, in send
    r = adapter.send(request, **kwargs)
  File "/Users/creslin/PycharmProjects/rtmtb/env/lib/python3.6/site-packages/requests/adapters.py", line 495, in send
    raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: ('Connection aborted.', BrokenPipeError(32, 'Broken pipe'))

Reproduction Steps

#######################
# Load candles - batches of 5,000: error
# requests.exceptions.ConnectionError: ('Connection aborted.', BrokenPipeError(32, 'Broken pipe'))

from influxdb import InfluxDBClient
from influxdb import DataFrameClient

import json

'''
create a client
'''
dbname   = 'crypto'
host     = 'localhost'
port     = '8086'
user     = ''
password = ''

client = InfluxDBClient(host, port, user, password, dbname)
df_client = DataFrameClient(host, port, user, password, dbname)

##### load 9 months of 1-minute ETH candles (342,000 items)
f = open('/Users/creslin/PycharmProjects/rtmtb/influxdb/json_candles/Binance_ETHBTC_1m_1514764800000-1535760000000.json','r')
candles = json.loads(f.read())

total_len  = len(candles)

candle_records = []
exchange = 'binance'
symbol = 'ETH/USDT'

count = 0
for candle in candles:
    candle_item = {
            "measurement": "crypto_1m",
            "tags": {
                "exchange": exchange,
                "symbol": symbol
            },
            "time": candle[0],
            "fields": {
                "open": float(candle[1]),
                "high": float(candle[2]),
                "low": float(candle[3]),
                "close": float(candle[4]),
                "volume": float(candle[5])
            }
        }
    candle_records.append(candle_item)
    count = count + 1

    #Write candles in batches of 5000,
    if count >= 5000:
        '''
        Write Eth/btc candles in batches of 5000
        '''
        #print("Write points: {0}".format(candle_records))
        client.write_points(candle_records, time_precision='ms')

        count = 0
        total_len = int(total_len) - 5000
        print(total_len)

System Information

 python3 -m requests.help
{
  "chardet": {
    "version": "3.0.4"
  },
  "cryptography": {
    "version": ""
  },
  "idna": {
    "version": "2.6"
  },
  "implementation": {
    "name": "CPython",
    "version": "3.6.5"
  },
  "platform": {
    "release": "17.7.0",
    "system": "Darwin"
  },
  "pyOpenSSL": {
    "openssl_version": "",
    "version": null
  },
  "requests": {
    "version": "2.18.4"
  },
  "system_ssl": {
    "version": "100020ff"
  },
  "urllib3": {
    "version": "1.22"
  },
  "using_pyopenssl": false
}

The InfluxDB instance is the standard vanilla Docker image: https://hub.docker.com/_/influxdb/ 1.6-alpine, 1.6.2-alpine, alpine (influxdb/1.6/alpine/Dockerfile).
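
As a quick sanity check that the container is reachable before the bulk load, something like this can be used (a minimal sketch; it assumes the installed influxdb client exposes ping(), which returns the server's reported version):

from influxdb import InfluxDBClient

client = InfluxDBClient('localhost', 8086, '', '', 'crypto')
# Should print the InfluxDB version reported by the server, e.g. '1.6.2'
print(client.ping())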

I can share the JSON input file if helpful; it p7zips to around 10 MB.

creslinux avatar Sep 08 '18 08:09 creslinux

Using a SeriesHelper avoids the above problem.

I'm unsure whether this is a bug that I'm working around, or whether the error is expected behaviour given how I was inserting the data.

For future reference, this code works, inserting in chunks of 5,000.

from influxdb import InfluxDBClient
from influxdb import DataFrameClient
from influxdb import SeriesHelper

import pandas as pd
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)

import json

'''
create a client
'''
dbname   = 'crypto'
host     = 'ibm'
port     = '8086'
user     = ''
password = ''

client = InfluxDBClient(host, port, user, password, dbname)
df_client = DataFrameClient(host, port, user, password, dbname)


### Useful in test - nuke it make it
print("Drop database: " + dbname)
client.drop_database(dbname)

print("Create database: " + dbname)
client.create_database(dbname)


class MySeriesHelper(SeriesHelper):
    '''
    Instantiate SeriesHelper to write points to the backend.
    This is to write streams of records / bulk inserts
    '''

    class Meta:
        """Meta class stores time series helper configuration."""

        # The client should be an instance of InfluxDBClient.
        client = client

        # The series name must be a string. Add dependent fields/tags
        # in curly brackets.
        #series_name = 'crypto_1.{exchange}.{symbol}'
        series_name = 'crypto_1'

        # Defines all the fields in this time series.
        fields = ['open', 'high', 'low', 'close', 'volume']

        # Defines all the tags for the series.
        tags = ['exchange', 'symbol']

        # Defines the number of data points to store prior to writing
        # on the wire.
        bulk_size = 5000

        # set the time precision to ms ("s"|"ns"|"ms"|"u") - make sure you use the right one!
        time_precision = 'ms'

        # autocommit must be set to True when using bulk_size
        autocommit = True


# ##### load 9 months of 1-minute ETH candles (342,000 items)
f = open('/Users/creslin/PycharmProjects/rtmtb/influxdb/json_candles/Binance_ETHBTC_1m_1514764800000-1535760000000.json','r')
candles = json.loads(f.read())

exchange = 'binance'
symbol = 'ETH/USDT'

# Since bulk_size is set to 5000, upon the 5,000th construction call, *all* data
# points will be written / committed on the wire via MySeriesHelper.Meta.client.
count = 0
for candle in candles:
    MySeriesHelper(exchange=exchange, symbol=symbol,
                   open  =float(candle[1]),
                   high  =float(candle[2]),
                   low   =float(candle[3]),
                   close =float(candle[4]),
                   volume=float(candle[5]),
                   time  =candle[0])

    count = count + 1
    print(count)

# To manually submit data points which are not yet written, call commit:
MySeriesHelper.commit()

# To inspect the JSON which will be written, call _json_body_():
MySeriesHelper._json_body_()


'''
Read last 10,000 candles into a dataframe
'''
query = 'select time, open, high, low, close, volume from crypto_1 ORDER by time DESC LIMIT 10000'
print("Querying data to a dataframe: " + query)
print('')
df_result = df_client.query(query)
#df_result = df_result['crypto_1m'].describe()
print(df_result)

creslinux avatar Sep 08 '18 12:09 creslinux

I had the same issue loading 38,000+ records. However, adding batch_size=1000 to the write_points call solved the problem. I did experiment with the number, so bigger batches may also work.
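
For example, in the original script above the write call becomes something like this (a minimal sketch; candle_records is the list of point dicts built in the loop, and 1000 is just the value that worked for me):

# Let the client split the payload into HTTP requests of 1,000 points each
# instead of sending everything in one request.
client.write_points(candle_records, time_precision='ms', batch_size=1000)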

dirkjankrijnders avatar Nov 20 '18 10:11 dirkjankrijnders

I had the same issue loading 38,000+ records. However, adding batch_size=1000 to the write_points call solved the problem. I did experiment with the number, so bigger batches may also work.

Thank you so much, it works for me! :)

sonejostudios avatar Mar 17 '20 16:03 sonejostudios

I had the same issue loading 38,000+ records. However, adding batch_size=1000 to the write_points call solved the problem. I did experiment with the number, so bigger batches may also work.

Thank you very much, it worked like a charm.

koutaiba-alabd avatar Sep 04 '21 12:09 koutaiba-alabd