elasticsearch-elixir
Indexing with hot_swap or build always fails with `Malformed content, found extra data after parsing: START_OBJECT`
When creating a new index and loading data, the index is created fine, but uploading the data to Elasticsearch returns:
%Elasticsearch.Exception{
  status: 400,
  line: nil,
  col: nil,
  message: "failed to parse",
  type: "mapper_parsing_exception",
  query: nil,
  raw: %{
    "error" => %{
      "caused_by" => %{
        "reason" => "Malformed content, found extra data after parsing: START_OBJECT",
        "type" => "illegal_argument_exception"
      },
      "reason" => "failed to parse",
      "root_cause" => [
        %{
          "reason" => "failed to parse",
          "type" => "mapper_parsing_exception"
        }
      ],
      "type" => "mapper_parsing_exception"
    },
    "status" => 400
  }
}
From what I can tell, this implies the ndjson payload created for the bulk API is faulty. I'd appreciate any guidance if I'm missing something.
Example payload created for the bulk API by the library:
"{\"create\":{\"_id\":\"140071AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"_index\":\"diagnosis-1684390655188645\"}}\n{\"description\":\"\",\"display_locale\":\"en\",\"display_name\":\"Flail Joint - Paralytic\",\"external_id\":\"140071AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"id\":\"140071AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"standards\":[{\"code\":\"239763002\",\"external_id\":\"54779ABBBBBBBBBBBBBBBBBBBBBBBBBBBBBB\",\"from_concept_name_resolved\":\"Flail Joint - Paralytic\",\"id\":\"175214\",\"retired\":false,\"standard\":\"SNOMED-CT\",\"to_concept_name_resolved\":null}]}\n{\"create\":{\"_id\":\"140054AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"_index\":\"diagnosis-1684390655188645\"}}\n{\"description\":\"\",\"display_locale\":\"en\",\"display_name\":\"Floppy Infant Syndrome\",\"external_id\":\"140054AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"id\":\"140054AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"standards\":[{\"code\":\"33010005\",\"external_id\":\"54794ABBBBBBBBBBBBBBBBBBBBBBBBBBBBBB\",\"from_concept_name_resolved\":\"Floppy Infant Syndrome\",\"id\":\"194010\",\"retired\":false,\"standard\":\"SNOMED-CT\",\"to_concept_name_resolved\":null},{\"code\":\"P94.2\",\"external_id\":\"104024ABBBBBBBBBBBBBBBBBBBBBBBBBBBBB\",\"from_concept_name_resolved\":\"Floppy Infant Syndrome\",\"id\":\"168840\",\"retired\":false,\"standard\":\"ICD-10-WHO\",\"to_concept_name_resolved\":\"Congenital hypotonia\"}]}\n{\"create\":{\"_id\":\"140012AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"_index\":\"diagnosis-1684390655188645\"}}\n{\"description\":\"Cyst due to the occlusion of the duct of a follicle or small gland.\",\"display_locale\":\"en\",\"display_name\":\"Follicular Cyst of Ovary\",\"external_id\":\"140012AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"id\":\"140012AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"standards\":[{\"code\":\"2615004\",\"external_id\":\"54834ABBBBBBBBBBBBBBBBBBBBBBBBBBBBBB\",\"from_concept_name_resolved\":\"Follicular Cyst of Ovary\",\"id\":\"174464\",\"retired\":false,\"standard\":\"SNOMED-CT\",\"to_concept_name_resolved\":null},{\"code\":\"N83.0\",\"external_id\":\"118576ABBBBBBBBBBBBBBBBBBBBBBBBBBBBB\",\"from_concept_name_resolved\":\"Follicular Cyst of Ovary\",\"id\":\"197238\",\"retired\":false,\"standard\":\"ICD-10-WHO\",\"to_concept_name_resolved\":\"Follicular cyst of ovary\"}]}\n{\"create\":{\"_id\":\"140008AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"_index\":\"diagnosis-1684390655188645\"}}\n{\"description\":\"Malignant lymphoma in which the lymphomatous cells are clustered into identifiable nodules within the lymph nodes. The nodules resemble to some extent the germinal centers of lymph node follicles and most likely represent neoplastic proliferation of lymph node-derived follicular center B-lymphocytes. This class of lymphoma usually occurs in older persons, is commonly multinodal, and possibly extranodal. 
Patients whose lymphomas present a follicular or nodular pattern generally have a more indolent course than those presenting with a diffuse pattern.\",\"display_locale\":\"en\",\"display_name\":\"Follicular Lymphoma\",\"external_id\":\"140008AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"id\":\"140008AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"standards\":[{\"code\":\"308121000\",\"external_id\":\"54839ABBBBBBBBBBBBBBBBBBBBBBBBBBBBBB\",\"from_concept_name_resolved\":\"Follicular Lymphoma\",\"id\":\"175378\",\"retired\":false,\"standard\":\"SNOMED-CT\",\"to_concept_name_resolved\":null}]}\n{\"create\":{\"_id\":\"140AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"_index\":\"diagnosis-1684390655188645\"}}\n{\"description\":\"A contagious skin disease caused by a parasitic mite (Sarcoptes scabiei) and characterized by intense itching.\",\"display_locale\":\"en\",\"display_name\":\"Scabies\",\"external_id\":\"140AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"id\":\"140AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\",\"standards\":[{\"code\":\"128869009\",\"external_id\":\"57446ABBBBBBBBBBBBBBBBBBBBBBBBBBBBBB\",\"from_concept_name_resolved\":\"Scabies\",\"id\":\"1070\",\"retired\":false,\"standard\":\"SNOMED-CT\",\"to_concept_name_resolved\":null},{\"code\":\"B86\",\"external_id\":\"87991ABBBBBBBBBBBBBBBBBBBBBBBBBBBBBB\",\"from_concept_name_resolved\":\"Scabies\",\"id\":\"1375\",\"retired\":false,\"standard\":\"ICD-10-WHO\",\"to_concept_name_resolved\":\"Scabies\"}]}\n"
@danielberkompas if you have the chance, please help point me in the right direction. I haven't found much help online and I've had no luck for the past week or so.
@cdvx did you find a solution to this? @danielberkompas please can you help?
No I didn't, I just found a workaround: adding the index to the JSON index file so it's created when loaded.
Hi @cdvx @danielberkompas, I am also facing the same issue on Elasticsearch 8.x. Can you please help in solving this?
@cdvx can you share what you added to the JSON index file, or a sample format of this file?
Anyone have any luck on this? @krezicoder did you find a fix?
Hello everyone - this is because of Elasticsearch 8.x
https://stackoverflow.com/questions/33340153/elasticsearch-bulk-index-json-data
Based on that post, 7.x wants this, which is what the library calls:
curl -XPOST localhost:9200/index_local/_doc/_bulk --data-binary @/home/data1.json
However in 8.x, _doc/ should be removed:
curl -XPOST localhost:9200/index_local/_bulk --data-binary @/home/data1.json
My tests via Kibana confirm this.
The library code needs to change this URL prefix based on some config, along the lines of the sketch below.
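For illustration, here's a minimal sketch of how the path could be switched; the `:es_version` key is made up for this example and is not an existing option of elasticsearch-elixir:

defmodule MyApp.BulkPath do
  @moduledoc """
  Sketch only: choose the bulk endpoint based on a hypothetical
  `:es_version` entry in the cluster config.
  """

  # Elasticsearch 7.x still accepts the typed endpoint; 8.x removed
  # mapping types, so `_doc/` has to be dropped from the URL.
  def bulk_path(config, index_name) do
    case Map.get(config, :es_version, 8) do
      7 -> "/#{index_name}/_doc/_bulk"
      _ -> "/#{index_name}/_bulk"
    end
  end
end

The bulk upload would then call `Elasticsearch.put(config, MyApp.BulkPath.bulk_path(config, index_name), Enum.join(items))` instead of hard-coding the path.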
Not sure when @danielberkompas will be updating this repo... so if anyone is looking, my workaround was to:
- Copy and paste the file @lovebes pasted above and remove the `_doc/` in the URL.
- Since I wanted to upsert and not create (because I am uploading the same documents multiple times and want them to be overwritten rather than error out), I had to change my `Elasticsearch.Document` implementation to follow the `{"doc": { ... }, "doc_as_upsert": true}` format (the resulting bulk lines look roughly like the example right after this list).
Code:
lib/h1bjobs/elasticsearch/bulk.ex
defmodule Elasticsearch.Index.BulkV2 do
  @moduledoc """
  Functions for creating bulk indexing requests.
  """

  alias Elasticsearch.{
    Cluster,
    Document
  }

  require Logger

  @doc """
  Encodes a given variable into an Elasticsearch bulk request. The variable
  must implement `Elasticsearch.Document`.

  ## Examples

      iex> Bulk.encode(Cluster, %Post{id: "my-id"}, "my-index")
      {:ok, \"\"\"
      {"create":{"_index":"my-index","_id":"my-id"}}
      {"doctype":{"name":"post"},"author":null,"title":null}
      \"\"\"}

      iex> Bulk.encode(Cluster, 123, "my-index")
      {:error,
       %Protocol.UndefinedError{description: "",
        protocol: Elasticsearch.Document, value: 123}}
  """
  @spec encode(Cluster.t(), struct, String.t(), String.t()) ::
          {:ok, String.t()}
          | {:error, Error.t()}
  def encode(cluster, struct, index, action \\ "create") do
    {:ok, encode!(cluster, struct, index, action)}
  rescue
    exception ->
      {:error, exception}
  end

  @doc """
  Same as `encode/3`, but returns the request and raises errors.

  ## Example

      iex> Bulk.encode!(Cluster, %Post{id: "my-id"}, "my-index")
      \"\"\"
      {"create":{"_index":"my-index","_id":"my-id"}}
      {"doctype":{"name":"post"},"author":null,"title":null}
      \"\"\"

      iex> Bulk.encode!(Cluster, 123, "my-index")
      ** (Protocol.UndefinedError) protocol Elasticsearch.Document not implemented for 123 of type Integer
  """
  def encode!(cluster, struct, index, action \\ "create") do
    config = Cluster.Config.get(cluster)
    header = header(config, action, index, struct)

    document =
      struct
      |> Document.encode()
      |> config.json_library.encode!()

    "#{header}\n#{document}\n"
  end

  defp header(config, type, index, struct) do
    attrs = %{
      "_index" => index,
      "_id" => Document.id(struct)
    }

    attrs =
      if routing = Document.routing(struct) do
        Map.put(attrs, "_routing", routing)
      else
        attrs
      end

    config.json_library.encode!(%{type => attrs})
  end

  @doc """
  Uploads all the data from the list of `sources` to the given index.

  Data for each `source` will be fetched using the configured `:store`.
  """
  @spec upload(Cluster.t(), index_name :: String.t(), Elasticsearch.Store.t(), list) ::
          :ok | {:error, [map]}
  def upload(cluster, index_name, index_config, errors \\ [])
  def upload(_cluster, _index_name, %{sources: []}, []), do: :ok
  def upload(_cluster, _index_name, %{sources: []}, errors), do: {:error, errors}

  def upload(
        cluster,
        index_name,
        %{store: store, sources: [source | tail]} = index_config,
        errors
      )
      when is_atom(store) do
    config = Cluster.Config.get(cluster)
    bulk_page_size = index_config[:bulk_page_size] || 5000
    bulk_wait_interval = index_config[:bulk_wait_interval] || 0
    action = index_config[:bulk_action] || "create"

    errors =
      store.transaction(fn ->
        source
        |> store.stream()
        |> Stream.map(&encode!(config, &1, index_name, action))
        |> Stream.chunk_every(bulk_page_size)
        |> Stream.intersperse(bulk_wait_interval)
        |> Stream.map(&put_bulk_page(config, index_name, &1))
        |> Enum.reduce(errors, &collect_errors(&1, &2, action))
      end)

    upload(config, index_name, %{index_config | sources: tail}, errors)
  end

  defp put_bulk_page(_config, _index_name, wait_interval) when is_integer(wait_interval) do
    Logger.debug("Pausing #{wait_interval}ms between bulk pages")
    :timer.sleep(wait_interval)
  end

  defp put_bulk_page(config, index_name, items) when is_list(items) do
    Elasticsearch.put(config, "/#{index_name}/_bulk", Enum.join(items))
  end

  defp collect_errors({:ok, %{"errors" => true} = response}, errors, action) do
    new_errors =
      response["items"]
      |> Enum.filter(&(&1[action]["error"] != nil))
      |> Enum.map(& &1[action])
      |> Enum.map(&Elasticsearch.Exception.exception(response: &1))

    new_errors ++ errors
  end

  defp collect_errors({:error, error}, errors, _action) do
    [error | errors]
  end

  defp collect_errors(_response, errors, _action) do
    errors
  end
end
lib/h1bjobs/elasticsearch/models/job_listing.ex
defimpl Elasticsearch.Document, for: H1bjobs.JobListing do
  def id(job_listing), do: job_listing.id
  def routing(_), do: false

  def encode(job_listing) do
    %{
      doc: %{
        description: job_listing.description
      },
      doc_as_upsert: true
    }
  end
end
And then I run it like this:
Elasticsearch.Index.BulkV2.upload(
  MyApp.ElasticsearchCluster,
  :jobs,
  %{store: MyApp.ElasticsearchStore, sources: [MyApp.JobListing], bulk_action: "update"}
)
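The return value matches the `upload/4` spec above (`:ok | {:error, [map]}`), so the result can be pattern matched; a minimal sketch of how I check it:

case Elasticsearch.Index.BulkV2.upload(
       MyApp.ElasticsearchCluster,
       :jobs,
       %{store: MyApp.ElasticsearchStore, sources: [MyApp.JobListing], bulk_action: "update"}
     ) do
  :ok ->
    IO.puts("bulk upsert finished without errors")

  {:error, errors} ->
    # each entry is either an %Elasticsearch.Exception{} built from a failed
    # bulk item, or the error returned by Elasticsearch.put/3
    IO.inspect(errors, label: "bulk upsert errors")
end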