elasticbulk
                                
                                 elasticbulk copied to clipboard
                                
                                    elasticbulk copied to clipboard
                            
                            
                            
                        Add data in bulk to elasticsearch. It supports data streaming from PostgreSQL or Filesystem
Elastic Bulk
Add data in bulk to Elasticsearch. It supports data streaming from PostgreSQL, MSSQL, MySQL, MariaDB, SQLite3, Filesystem and CSV.
Start
npm install elasticbulk --save
const elasticbulk = require('elasticbulk');
Add JSON data to Elasticsearch
// Example: bulk-import an in-memory array of documents.
const elasticbulk = require('elasticbulk');

// some array data
const data = [];

// Target index/type and the address of the Elasticsearch node.
const options = {
  index: 'movies',
  type: 'movies',
  host: 'http://localhost:9200',
};

// import() returns a promise; print whatever it resolves with.
elasticbulk.import(data, options).then((res) => {
  console.log(res);
});
Add data to ItemsAPI from JSON file
The `movies.json` file is a comma-delimited JSON file.
// Example: stream documents from a JSON file into ItemsAPI.
// `fs` and `JSONStream` must be required explicitly; the snippet uses both.
const fs = require('fs');
const JSONStream = require('JSONStream');
const elasticbulk = require('elasticbulk');

// Read movies.json and pipe it through JSONStream's parser.
const stream = fs.createReadStream('./movies.json')
  .pipe(JSONStream.parse());

// Engine-specific configuration, passed as the third argument to import().
const config = {
  "sorting_fields": ["year", "rating", "votes", "reviews_count"],
  "aggregations": {
    "year": {
      "size": 10,
      "conjunction": true
    },
    "genres": {
      "size": 10,
      "conjunction": false
    },
    "tags": {
      "size": 10,
      "conjunction": true
    },
    "actors": {
      "size": 10,
      "conjunction": true
    },
    "country": {
      "size": 10,
      "conjunction": true
    }
  }
};

elasticbulk.import(stream, {
  engine: 'itemsapi',
  // api_key: '',
  index_name: 'movies',
  host: 'http://localhost:9200',
}, config)
.then(function(res) {
  console.log(res);
});
Add data to Meilisearch from JSON file
The `movies.json` file is a comma-delimited JSON file.
// Example: stream documents from a JSON file into Meilisearch.
// `fs` and `JSONStream` must be required explicitly; the snippet uses both.
const fs = require('fs');
const JSONStream = require('JSONStream');
const elasticbulk = require('elasticbulk');

// Read movies.json and pipe it through JSONStream's parser.
const stream = fs.createReadStream('./movies.json')
  .pipe(JSONStream.parse());

// Meilisearch index settings, passed as the third argument to import().
const config = {
  rankingRules: [
    'typo',
  ],
  distinctAttribute: 'id',
  searchableAttributes: [
    'name'
  ],
  attributesForFaceting: [
    'director',
    'genres'
  ],
  displayedAttributes: [
    'name'
  ],
  stopWords: [
  ],
  synonyms: {
  }
};

elasticbulk.import(stream, {
  chunk_size: 1000,
  timeout: 6000,
  // interval (in ms) between checks of the internal indexing status
  interval: 100,
  primary_key: 'id',
  engine: 'meilisearch',
  api_key: 'API_KEY',
  index_name: 'movies',
  // Meilisearch listens on port 7700 by default (9200 is Elasticsearch's
  // port); change this if your instance runs elsewhere.
  host: 'http://localhost:7700',
}, config)
.then(function(res) {
  console.log(res);
});
Add data to Elasticsearch from JSON file
The `movies.json` file is a comma-delimited JSON file.
// Example: stream documents from a JSON file into Elasticsearch.
// `fs` and `JSONStream` must be required explicitly; the snippet uses both.
const fs = require('fs');
const JSONStream = require('JSONStream');
const elasticbulk = require('elasticbulk');

// Read movies.json and pipe it through JSONStream's parser.
const stream = fs.createReadStream('./movies.json')
  .pipe(JSONStream.parse());

elasticbulk.import(stream, {
  index: 'movies',
  type: 'movies',
  host: 'http://localhost:9200',
})
.then(function(res) {
  console.log(res);
});
Add data to Elasticsearch from CSV
You can also use ElasticBulk to import data from CSV. It has been tested with millions of records.
// Example: stream rows from a CSV file into Elasticsearch.
const fs = require('fs');
const csv = require('fast-csv');
const elasticbulk = require('elasticbulk');

// CSV parser created with the `headers` option switched on.
const parser = csv({ headers: true });

// Read questions.csv, parse it, and run every row through the transform hook.
const stream = fs.createReadStream('questions.csv')
  .pipe(parser)
  .transform((data) => {
    // you can transform your data here
    return data;
  });

const options = {
  index: 'questions',
  type: 'questions',
  host: 'http://localhost:9200',
};

elasticbulk.import(stream, options).then((res) => {
  console.log(res);
});
Add data to Elasticsearch from PostgreSQL
// Example: stream rows from a PostgreSQL table into Elasticsearch.
const Promise = require('bluebird');
const through2 = require('through2');
// `require('knex')` returns a factory, not a query builder: it has to be
// called with a client and connection before `db.select(...)` can be used.
// The connection string below is a placeholder; replace it with your own.
// The 'pg' client requires the `pg` package to be installed.
const db = require('knex')({
  client: 'pg',
  connection: 'postgres://localhost/your_database_name',
});
const elasticbulk = require('elasticbulk');

// Stream every row of `movies` and pass each one through an object-mode
// through2 stage unchanged.
var stream = db.select('*').from('movies')
.stream()
.pipe(through2({ objectMode: true, allowHalfOpen: false }, function (chunk, enc, cb) {
  cb(null, chunk)
}))

elasticbulk.import(stream, {
  index: 'movies',
  type: 'movies',
  host: 'localhost:9200',
})
.then(function(res) {
  console.log(res);
})
Add data to Elasticsearch from MongoDB
// Example: stream documents from a MongoDB collection into Elasticsearch.
// The package name is 'elasticbulk' (the original '.elasticbulk' cannot be resolved).
const elasticbulk = require('elasticbulk');
const mongoose = require('mongoose');
const Promise = require('bluebird');

mongoose.connect('mongodb://localhost/your_database_name', {
  useMongoClient: true
});

// Make mongoose use bluebird for the promises it returns.
mongoose.Promise = Promise;

// Model bound explicitly to the collection 'your_collection_name'.
var Page = mongoose.model('Page', new mongoose.Schema({
  title: String,
  categories: Array
}), 'your_collection_name');

// stream query: project only title and categories (no _id), at most
// 1,500,000 documents starting from the first, fetched in batches of 500
var stream = Page.find({
}, {title: 1, _id: 0, categories: 1}).limit(1500000).skip(0).batchSize(500).stream();

// Third argument is the field mapping for the index.
// NOTE(review): 'string' / 'not_analyzed' look like pre-5.x Elasticsearch
// mapping syntax - confirm against the Elasticsearch version you target.
elasticbulk.import(stream, {
  index: 'my_index_name',
  type: 'my_type_name',
  host: 'localhost:9200',
}, {
  title: {
    type: 'string'
  },
  categories: {
    type: 'string',
    index: 'not_analyzed'
  }
})
.then(function(res) {
  console.log('Importing finished');
})
Configuration
// Second argument to import(): target index and import options.
const importOptions = {
  index: 'movies',
  // optional
  type: 'movies',
  // batch size
  chunk_size: 500,
  debug: true,
  host: 'localhost:9200',
};

// mapping (third argument to import())
const mapping = {
  name: {
    type: 'string',
  },
};

elasticbulk.import(data, importOptions, mapping).then((res) => {
  console.log(res);
});
Tests
# Test ES 1.7
# Start a detached Elasticsearch 1.7.2 container publishing ports 9200 and 9300,
# with host directories under $HOME mounted at /data and /logs, then run the spec.
docker run -it -d  -p 9200:9200 -p 9300:9300 -v $HOME/elasticsearch1.7/data:/data -v $HOME/elasticsearch1.7/logs:/logs barnybug/elasticsearch:1.7.2
mocha --exit -t 15000 tests/elasticitemsSpec.js
# Test ES 7.x
# Start a detached Elasticsearch 7.10.1 container with discovery.type=single-node,
# publishing ports 9200 and 9300, then run the 7.x spec.
docker run -it -d -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch:7.10.1
mocha --exit -t 15000 tests/elasticitems7xSpec.js