ex_aws_sqs
ex_aws_sqs copied to clipboard
Saxy.ParseError: unexpected byte "H", expected token: :lt
Environment
- Elixir & Erlang versions (elixir --version):
ELIXIR_VERSION=1.14.0
OTP_VERSION=24.3.4
ALPINE_VERSION=3.15.3
- ExAws version
mix deps |grep ex_aws
* ex_aws 2.4.0 (Hex package) (mix)
locked at 2.4.0 (ex_aws) 66dd0bac
* ex_aws_s3 2.3.3 (Hex package) (mix)
locked at 2.3.3 (ex_aws_s3) 0044f0b6
* ex_aws_sqs 3.3.1 (Hex package) (mix)
locked at 3.3.1 (ex_aws_sqs) 47d8fc29
- HTTP client version. IE for hackney do
mix deps | grep hackney
* hackney 1.18.1 (Hex package) (rebar3)
locked at 1.18.1 (hackney) a4ecdaff
Current behavior
Saxy.ParseError: unexpected byte "H", expected token: :lt File "lib/ex_aws/sqs/saxy_collector.ex", line 93, in ExAws.SQS.SaxyCollector.parse_string!/2 File "lib/ex_aws/sqs/saxy_parser.ex", line 261, in ExAws.SQS.SaxyParser.parse/2 File "lib/my_app/kafka_to_sqs_sink_connector.ex", line 46, in MyApp.KafkaToSqsSinkConnector.handle_message/2 File "/opt/app/deps/brod/src/brod_group_subscriber_worker.erl", line 79, in :brod_group_subscriber_worker.handle_message/3 File "/opt/app/deps/brod/src/brod_topic_subscriber.erl", line 463, in :brod_topic_subscriber.handle_message_set/2 File "/opt/app/deps/brod/src/brod_topic_subscriber.erl", line 299, in :brod_topic_subscriber.handle_info/2 File "gen_server.erl", line 695, in :gen_server.try_dispatch/4 File "gen_server.erl", line 771, in :gen_server.handle_msg/6
Expected behavior
We have the following module
defmodule MyApp.KafkaToSqsSinkConnector do
@behaviour :brod_group_subscriber_v2
@prefetch_count 5
def child_spec(args \\ []) do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, [args]},
type: :worker,
restart: :permanent,
shutdown: 5000
}
end
def start_link(args) do
args =
:my_app
|> Application.get_env(__MODULE__, [])
|> Keyword.get(:kafka, [])
|> Keyword.merge(Keyword.get(args, :kafka, []))
|> Keyword.update(
:consumer_config,
[prefetch_count: @prefetch_count],
&Keyword.put(&1, :prefetch_count, @prefetch_count)
)
|> Keyword.update!(:group_id, &MyApp.Kafka.consumer_group_id/1)
|> Keyword.put(:message_type, :message_set)
|> Keyword.put(:cb_module, __MODULE__)
|> Enum.into(%{})
:brod_group_subscriber_v2.start_link(args)
end
@impl :brod_group_subscriber_v2
def init(_group_id, _init_data) do
{:ok, []}
end
@impl :brod_group_subscriber_v2
def handle_message(message, state) do
{:kafka_message_set, _topic_name, _, _, messages} = message
message_batch = Enum.map(messages, &to_sqs_message_body/1)
case ExAws.request(ExAws.SQS.send_message_batch(queue_url, message_batch), ExAws.Config.new(:sqs)) do
{:ok, _} -> {:ok, :commit, state}
{:error, reason} -> raise_error(reason)
end
end
defp to_sqs_message_body({:kafka_message, _offset, _key, data, _action, _timestamp, _headers}) do
[id: :erlang.phash2(data), message_body: data]
end
defp raise_error(error_message) do
raise "#{__MODULE__} failed due to #{inspect(error_message)}"
end
defp queue_url do
config()
|> Keyword.fetch!(:sqs)
|> Keyword.fetch!(:queue_url)
end
defp config do
Application.get_env(:my_app, __MODULE__, [])
end
end
An SQS sink connector for Kafka.
We have moved some messages just fine so far, but we started getting the error.
I switch the parser
config :ex_aws_sqs, parser: ExAws.SQS.SweetXmlParser
So now I am getting
** (exit) {:fatal, {:expected_element_start_tag, {:file, :file_name_unknown}, {:line, 1}, {:col, 2}}}
File "xmerl_scan.erl", line 4127, in :xmerl_scan.fatal/2
File "xmerl_scan.erl", line 575, in :xmerl_scan.scan_document/2
File "xmerl_scan.erl", line 294, in :xmerl_scan.string/2
File "lib/sweet_xml.ex", line 292, in SweetXml.do_parse/2
File "lib/sweet_xml.ex", line 281, in SweetXml.parse/2
File "lib/ex_aws/sqs/sweet_xml_parser.ex", line 11, in ExAws.SQS.SweetXmlParser.parse/2
So something is broken with the client. Maybe something weird responding from the API. I can't tell.
The issue was
MyApp.KafkaToSqsSinkConnector failed due to {:http_error, 413, "HTTP content length exceeded 1662976 bytes."}\n
Which #30 will allow people to know what happened
Closed by: https://github.com/ex-aws/ex_aws_sqs/pull/31