ex_aws_sqs icon indicating copy to clipboard operation
ex_aws_sqs copied to clipboard

Saxy.ParseError: unexpected byte "H", expected token: :lt

Open yordis opened this issue 3 years ago • 2 comments

Environment

  • Elixir & Erlang versions (elixir --version):
ELIXIR_VERSION=1.14.0
OTP_VERSION=24.3.4
ALPINE_VERSION=3.15.3
  • ExAws version mix deps |grep ex_aws
* ex_aws 2.4.0 (Hex package) (mix)
  locked at 2.4.0 (ex_aws) 66dd0bac
* ex_aws_s3 2.3.3 (Hex package) (mix)
  locked at 2.3.3 (ex_aws_s3) 0044f0b6
* ex_aws_sqs 3.3.1 (Hex package) (mix)
  locked at 3.3.1 (ex_aws_sqs) 47d8fc29
  • HTTP client version. IE for hackney do mix deps | grep hackney
* hackney 1.18.1 (Hex package) (rebar3)
  locked at 1.18.1 (hackney) a4ecdaff

Current behavior

Saxy.ParseError: unexpected byte "H", expected token: :lt File "lib/ex_aws/sqs/saxy_collector.ex", line 93, in ExAws.SQS.SaxyCollector.parse_string!/2 File "lib/ex_aws/sqs/saxy_parser.ex", line 261, in ExAws.SQS.SaxyParser.parse/2 File "lib/my_app/kafka_to_sqs_sink_connector.ex", line 46, in MyApp.KafkaToSqsSinkConnector.handle_message/2 File "/opt/app/deps/brod/src/brod_group_subscriber_worker.erl", line 79, in :brod_group_subscriber_worker.handle_message/3 File "/opt/app/deps/brod/src/brod_topic_subscriber.erl", line 463, in :brod_topic_subscriber.handle_message_set/2 File "/opt/app/deps/brod/src/brod_topic_subscriber.erl", line 299, in :brod_topic_subscriber.handle_info/2 File "gen_server.erl", line 695, in :gen_server.try_dispatch/4 File "gen_server.erl", line 771, in :gen_server.handle_msg/6

Expected behavior

We have the following module

defmodule MyApp.KafkaToSqsSinkConnector do
  @behaviour :brod_group_subscriber_v2

  @prefetch_count 5

  def child_spec(args \\ []) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [args]},
      type: :worker,
      restart: :permanent,
      shutdown: 5000
    }
  end

  def start_link(args) do
    args =
      :my_app
      |> Application.get_env(__MODULE__, [])
      |> Keyword.get(:kafka, [])
      |> Keyword.merge(Keyword.get(args, :kafka, []))
      |> Keyword.update(
        :consumer_config,
        [prefetch_count: @prefetch_count],
        &Keyword.put(&1, :prefetch_count, @prefetch_count)
      )
      |> Keyword.update!(:group_id, &MyApp.Kafka.consumer_group_id/1)
      |> Keyword.put(:message_type, :message_set)
      |> Keyword.put(:cb_module, __MODULE__)
      |> Enum.into(%{})

    :brod_group_subscriber_v2.start_link(args)
  end

  @impl :brod_group_subscriber_v2
  def init(_group_id, _init_data) do
    {:ok, []}
  end

  @impl :brod_group_subscriber_v2
  def handle_message(message, state) do
    {:kafka_message_set, _topic_name, _, _, messages} = message

    message_batch = Enum.map(messages, &to_sqs_message_body/1)
    
    case ExAws.request(ExAws.SQS.send_message_batch(queue_url, message_batch), ExAws.Config.new(:sqs)) do
      {:ok, _} -> {:ok, :commit, state}
      {:error, reason} -> raise_error(reason)
    end
  end

  defp to_sqs_message_body({:kafka_message, _offset, _key, data, _action, _timestamp, _headers}) do
    [id: :erlang.phash2(data), message_body: data]
  end

  defp raise_error(error_message) do
    raise "#{__MODULE__} failed due to #{inspect(error_message)}"
  end

  defp queue_url do
    config()
    |> Keyword.fetch!(:sqs)
    |> Keyword.fetch!(:queue_url)
  end

  defp config do
    Application.get_env(:my_app, __MODULE__, [])
  end
end

An SQS sink connector for Kafka.

We have moved some messages just fine so far, but we started getting the error.

yordis avatar Nov 24 '22 22:11 yordis

I switch the parser

config :ex_aws_sqs, parser: ExAws.SQS.SweetXmlParser

So now I am getting

** (exit) {:fatal, {:expected_element_start_tag, {:file, :file_name_unknown}, {:line, 1}, {:col, 2}}}
  File "xmerl_scan.erl", line 4127, in :xmerl_scan.fatal/2
  File "xmerl_scan.erl", line 575, in :xmerl_scan.scan_document/2
  File "xmerl_scan.erl", line 294, in :xmerl_scan.string/2
  File "lib/sweet_xml.ex", line 292, in SweetXml.do_parse/2
  File "lib/sweet_xml.ex", line 281, in SweetXml.parse/2
  File "lib/ex_aws/sqs/sweet_xml_parser.ex", line 11, in ExAws.SQS.SweetXmlParser.parse/2

So something is broken with the client. Maybe something weird responding from the API. I can't tell.

yordis avatar Nov 24 '22 23:11 yordis

The issue was

MyApp.KafkaToSqsSinkConnector failed due to {:http_error, 413, "HTTP content length exceeded 1662976 bytes."}\n

Which #30 will allow people to know what happened

yordis avatar Nov 25 '22 01:11 yordis

Closed by: https://github.com/ex-aws/ex_aws_sqs/pull/31

TattdCodeMonkey avatar Jan 03 '23 22:01 TattdCodeMonkey