ejabberd
Optionally queue outgoing data
Support queueing outgoing stanzas and stream management elements for up to a configurable number of milliseconds (with a configurable queue size limit). This allows for batching up multiple XML elements into a single TCP packet in order to reduce the TCP/IP overhead.
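To make the batching idea concrete, here is a minimal toy model in Python. It is not the code from this PR: the `SendQueue` class, its method names, and the exact flush rules (flush when the size limit is reached or when the oldest queued element has waited for the full delay, and send immediately when the delay is 0) are assumptions made for illustration only.

```python
from typing import List, Optional


class SendQueue:
    """Toy model of a per-connection send queue (hypothetical, not ejabberd code)."""

    def __init__(self, max_delay_ms: int = 0, max_size: int = 10) -> None:
        self.max_delay_ms = max_delay_ms
        self.max_size = max_size
        self.pending: List[str] = []
        self.deadline_ms: Optional[int] = None  # flush time for the oldest element

    def send(self, element: str, now_ms: int) -> Optional[str]:
        """Queue an element; return the data written to the socket, if any."""
        if self.max_delay_ms == 0:
            return element  # assumed: a delay of 0 disables queueing
        if not self.pending:
            self.deadline_ms = now_ms + self.max_delay_ms
        self.pending.append(element)
        if len(self.pending) >= self.max_size:
            return self.flush()  # size limit reached: flush early
        return None

    def tick(self, now_ms: int) -> Optional[str]:
        """Timer callback; flush once the oldest element has waited long enough."""
        if self.pending and self.deadline_ms is not None and now_ms >= self.deadline_ms:
            return self.flush()
        return None

    def flush(self) -> str:
        """Concatenate all queued elements into a single write."""
        data = "".join(self.pending)
        self.pending = []
        self.deadline_ms = None
        return data


queue = SendQueue(max_delay_ms=50, max_size=10)
writes = []
for t, stanza in [(0, "<message/>"), (10, "<presence/>"), (20, "<r/>")]:
    out = queue.send(stanza, now_ms=t)
    if out is not None:
        writes.append(out)
out = queue.tick(now_ms=50)
if out is not None:
    writes.append(out)
print(writes)  # ['<message/><presence/><r/>']
```

In this model, three elements produced within 50 ms end up in one socket write instead of three, at the cost of delaying the first element by up to the configured delay.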
The feature is supported by ejabberd_c2s, ejabberd_s2s_out, and ejabberd_service. It can be enabled by configuring the maximum number of milliseconds to queue an element (default: 0), and optionally the maximum number of elements to queue (default: 10). This can be done by using the following new ejabberd_c2s/ejabberd_service listener options:
- max_send_queue_size
- max_send_queue_delay
For ejabberd_c2s, the following global options can be specified instead:
- c2s_max_send_queue_size
- c2s_max_send_queue_delay
For ejabberd_s2s_out, the following global options can be specified:
- s2s_max_send_queue_size
- s2s_max_send_queue_delay
This PR depends on processone/xmpp#63.
Coverage increased (+0.05%) to 33.66% when pulling abfb62f27ac410e3fd4e609c7a7738c32a2a7fdb on weiss:feature/send-queue into a89b1f332d279da3c134b074a797fda4b6818409 on processone:master.
We are running this in large-scale production now and it works well.
@mzealey any experience you can share, e.g. values for those two settings?
50 ms / batch size 10 works just fine for us to reduce the number of packets flowing. There is a slight increase in latency, obviously, but we only need near-realtime and would prefer to save on packets.
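As a rough back-of-the-envelope illustration of that trade-off, the Python sketch below compares bytes on the wire with and without batching. The 100-byte element size is a hypothetical figure, and the calculation assumes minimal IPv4 and TCP headers (20 bytes each, no options), that each unbatched element would travel in its own packet, and that a full batch fits into a single packet. TLS, ACKs, and link-layer framing are ignored.

```python
IP_HEADER = 20      # bytes, IPv4 header without options
TCP_HEADER = 20     # bytes, TCP header without options
STANZA_SIZE = 100   # bytes, hypothetical average element size
BATCH_SIZE = 10     # queue size limit mentioned above
MAX_DELAY_MS = 50   # queue delay mentioned above


def bytes_on_wire(elements: int, per_packet: int) -> int:
    """Total bytes sent for `elements` elements, `per_packet` elements per packet."""
    packets = -(-elements // per_packet)  # ceiling division
    return packets * (IP_HEADER + TCP_HEADER) + elements * STANZA_SIZE


unbatched = bytes_on_wire(BATCH_SIZE, per_packet=1)
batched = bytes_on_wire(BATCH_SIZE, per_packet=BATCH_SIZE)
print(unbatched, batched)  # 1400 1040
print(f"saved: {1 - batched / unbatched:.0%}, worst-case added delay: {MAX_DELAY_MS} ms")
```

Under these assumptions the packet count drops from 10 to 1 and roughly a quarter of the bytes are saved, while an element may wait up to 50 ms before it is sent. Smaller elements make the relative saving larger.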
I have applied those changes: https://github.com/processone/ejabberd/commit/abfb62f27ac410e3fd4e609c7a7738c32a2a7fdb
But there is no improvement: message distribution is still slow in a group of 1000 members. Please advise. Thanks.
@iPPLE two ideas, maybe this is not the right approach for you, maybe look at mucsub or disable presence in those groups
But, you applied this PR and also used the corresponding xmpp lib PR https://github.com/processone/xmpp/pull/63 yes?
I have these error logs:
2023-11-15 15:34:25.267498+07:00 [error] <0.8281.0> ** Generic server <0.8281.0> terminating ** Last message in was timeout ** When Server state == #{mgmt_state => inactive, lserver => <<"dev.abcd.com">>, mgmt_stanzas_req => 0,access => c2s, auth_module => ejabberd_oauth, stream_direction => in, stream_version => {1,0}, stream_id => <<"1374550741427560919">>, stream_queue_max => 10,csi_state => active, mgmt_stanzas_in => 0, csi_queue => {0,#{}}, stream_authenticated => true, stream_compressed => false,owner => <0.8281.0>, tls_options => [compression_none], codec_options => [ignore_els], stream_timeout => infinity,mgmt_timeout => 1000, socket_monitor => #Ref<0.1300238487.1738801153.57971>, pres_a => {0,nil}, stream_restarted => true,shaper => c2s_shaper, zlib => false, stream_queue => [{iq,<<"_bind_auth_2">>,result,<<>>,undefined, undefined, [{bind, {jid,<<"123456780">>,<<"dev.abcd.com">>, <<"0c4f0930-72fe-11ee-a8a0-15a5da1b1f4b">>, <<"123456780">>,<<"dev.abcd.com">>, <<"0c4f0930-72fe-11ee-a8a0-15a5da1b1f4b">>}, <<>>}], #{}}], resource => <<"0c4f0930-72fe-11ee-a8a0-15a5da1b1f4b">>, user => <<"123456780">>,tls_verify => false, server => <<"dev.abcd.com">>, xmlns => <<"jabber:client">>,mgmt_resend => false, tls_enabled => false,mgmt_max_queue => 5000, mod => ejabberd_c2s, sid => {{1700,37265,48776},<0.8281.0>}, mgmt_ack_timeout => 60000,tls_required => false, conn => websocket, ip => {{0,0,0,0,0,65535,49320,3014},57814}, jid => {jid,<<"123456780">>,<<"dev.abcd.com">>, <<"0c4f0930-72fe-11ee-a8a0-15a5da1b1f4b">>, <<"123456780">>,<<"dev.abcd.com">>, <<"0c4f0930-72fe-11ee-a8a0-15a5da1b1f4b">>}, stream_state => established, stream_encrypted => false, socket => {socket_state,ejabberd_http_ws, {http_ws,<0.8280.0>, {{0,0,0,0,0,65535,49320,3014},57814}}, 262144,undefined,none,none}, lang => <<"en">>, stream_queue_timeout => {50,-576460031879}, mgmt_queue_type => ram,mgmt_stanzas_out => 0, mgmt_max_timeout => 1000,stream_header_sent => true} ** Reason for termination == ** 
{not_implemented,[{xmpp_socket,send_elements,2, [{file,"src/xmpp_socket.erl"},{line,202}]}, {xmpp_stream_in,flush_queue,1, [{file,"src/xmpp_stream_in.erl"}, {line,1364}]}, {xmpp_stream_in,handle_info,2, [{file,"src/xmpp_stream_in.erl"}, {line,447}]}, {p1_server,handle_msg,8, [{file,"src/p1_server.erl"},{line,696}]}, {proc_lib,init_p_do_apply,3, [{file,"proc_lib.erl"},{line,226}]}]}
2023-11-15 15:34:25.271062+07:00 [error] <0.8281.0>@proc_lib:crash_report/4:525 CRASH REPORT: crasher: initial call: xmpp_stream_in:init/1 pid: <0.8281.0> registered_name: [] exception exit: {not_implemented, [{xmpp_socket,send_elements,2, [{file,"src/xmpp_socket.erl"},{line,202}]}, {xmpp_stream_in,flush_queue,1, [{file,"src/xmpp_stream_in.erl"},{line,1364}]}, {xmpp_stream_in,handle_info,2, [{file,"src/xmpp_stream_in.erl"},{line,447}]}, {p1_server,handle_msg,8, [{file,"src/p1_server.erl"},{line,696}]}, {proc_lib,init_p_do_apply,3, [{file,"proc_lib.erl"},{line,226}]}]} in function p1_server:terminate/7 (src/p1_server.erl, line 878) ancestors: [<0.8280.0>,<0.8279.0>,ejabberd_http_sup,ejabberd_sup, <0.116.0>] message_queue_len: 2 messages: [{tcp_closed, {http_ws,<0.8280.0>, {{0,0,0,0,0,65535,49320,3014},57814}}}, {'DOWN',#Ref<0.1300238487.1738801153.57971>,process, <0.8280.0>,normal}] links: [] dictionary: [{'$internal_queue_len',0}, {rand_seed,{#{bits => 58,jump => #Fun<rand.3.47293030>, next => #Fun<rand.0.47293030>,type => exsss, uniform => #Fun<rand.1.47293030>, uniform_n => #Fun<rand.2.47293030>}, [17302275869646053|270078345859518411]}}, {already_terminated,true}] trap_exit: false status: running heap_size: 4185 stack_size: 28 reductions: 101593 neighbours:
- Yes, I'm using the xmpp dependency from:
{xmpp, ".*", {git, "https://github.com/weiss/xmpp", {branch, "feature/send-queue"}}},
- Anyhow, I tested with only 2 members online, so presence broadcasting is not the cause. It feels like those code changes are not working at all, but the code is there, because I added a log line that outputs
c2s_max_send_queue_delay, s2s_max_send_queue_delay, c2s_max_send_queue_size, s2s_max_send_queue_size
I'm running ejabberd 21.01.7.
cat rebar.config
%%%----------------------------------------------------------------------
%%%
%%% ejabberd, Copyright (C) 2002-2021 ProcessOne
%%%
%%% This program is free software; you can redistribute it and/or
%%% modify it under the terms of the GNU General Public License as
%%% published by the Free Software Foundation; either version 2 of the
%%% License, or (at your option) any later version.
%%%
%%% This program is distributed in the hope that it will be useful,
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
%%% General Public License for more details.
%%%
%%% You should have received a copy of the GNU General Public License along
%%% with this program; if not, write to the Free Software Foundation, Inc.,
%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
%%%
%%%----------------------------------------------------------------------
{deps, [
{gun, ".*", {git, "https://github.com/ninenines/gun", {tag, "1.3.3"}}},
{jsx, ".*", {git, "https://github.com/talentdeficit/jsx", {tag, "2.9.0"}}},
{base64url, ".*", {git, "https://github.com/dvv/base64url", {tag, "1.0.1"}}},
{cache_tab, ".*", {git, "https://github.com/processone/cache_tab", {tag, "1.0.27"}}},
{eimp, ".*", {git, "https://github.com/processone/eimp", {tag, "1.0.19"}}},
{if_var_true, tools,
{ejabberd_po, ".*", {git, "https://github.com/processone/ejabberd-po", {branch, "main"}}}},
{if_var_true, elixir,
{elixir, ".*", {git, "https://github.com/elixir-lang/elixir", {tag, "v1.4.4"}}}},
{if_var_true, pam,
{epam, ".*", {git, "https://github.com/processone/epam", {tag, "1.0.10"}}}},
{if_var_true, redis,
{eredis, ".*", {git, "https://github.com/wooga/eredis", {tag, "v1.0.8"}}}},
{if_var_true, sip,
{esip, ".*", {git, "https://github.com/processone/esip", {tag, "1.0.41"}}}},
{if_var_true, zlib,
{ezlib, ".*", {git, "https://github.com/processone/ezlib", {tag, "1.0.9"}}}},
{fast_tls, ".*", {git, "https://github.com/processone/fast_tls", {tag, "1.1.11"}}},
{fast_xml, ".*", {git, "https://github.com/processone/fast_xml", {tag, "1.1.45"}}},
{fast_yaml, ".*", {git, "https://github.com/processone/fast_yaml", {tag, "1.0.30"}}},
{idna, ".*", {git, "https://github.com/benoitc/erlang-idna", {tag, "6.0.0"}}},
{jiffy, ".*", {git, "https://github.com/davisp/jiffy", {tag, "1.0.5"}}},
{jose, ".*", {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.9.0"}}},
{lager, ".*", {git, "https://github.com/erlang-lager/lager", {tag, "3.6.10"}}},
{if_var_true, tools,
{luerl, ".*", {git, "https://github.com/rvirding/luerl", {tag, "v0.3"}}}},
{mqtree, ".*", {git, "https://github.com/processone/mqtree", {tag, "1.0.12"}}},
{p1_acme, ".*", {git, "https://github.com/processone/p1_acme", {tag, "1.0.11"}}},
{if_var_true, mysql,
{p1_mysql, ".*", {git, "https://github.com/processone/p1_mysql", {tag, "1.0.17"}}}},
{p1_oauth2, ".*", {git, "https://github.com/processone/p1_oauth2", {tag, "0.6.8"}}},
{if_var_true, pgsql,
{p1_pgsql, ".*", {git, "https://github.com/processone/p1_pgsql", {tag, "1.1.10"}}}},
{p1_utils, ".*", {git, "https://github.com/processone/p1_utils", {tag, "1.0.21"}}},
{pkix, ".*", {git, "https://github.com/processone/pkix", {tag, "1.0.7"}}},
{if_not_rebar3, %% Needed because modules are not fully migrated to new structure and mix
{if_var_true, elixir,
{rebar_elixir_plugin, ".*", {git, "https://github.com/processone/rebar_elixir_plugin", "0.1.0"}}}},
{if_var_true, sqlite,
{sqlite3, ".*", {git, "https://github.com/processone/erlang-sqlite3", {tag, "1.1.11"}}}},
{stringprep, ".*", {git, "https://github.com/processone/stringprep", {tag, "1.0.24"}}},
{if_var_true, stun,
{stun, ".*", {git, "https://github.com/processone/stun", {tag, "1.0.42"}}}},
{xmpp, ".*", {git, "https://github.com/weiss/xmpp", {branch, "feature/send-queue"}}},
{yconf, ".*", {git, "https://github.com/processone/yconf", {tag, "1.0.10"}}}
]}.
{gitonly_deps, [elixir, luerl]}.
{if_var_true, latest_deps,
{floating_deps, [cache_tab,
eimp,
epam,
esip,
ezlib,
fast_tls,
fast_xml,
fast_yaml,
mqtree,
p1_acme,
p1_mysql,
p1_oauth2,
p1_pgsql,
p1_utils,
pkix,
sqlite3,
stringprep,
stun,
xmpp,
yconf]}}.
{erl_first_files, ["src/ejabberd_sql_pt.erl", "src/ejabberd_config.erl",
"src/gen_mod.erl", "src/mod_muc_room.erl",
"src/mod_push.erl", "src/xmpp_socket.erl"]}.
{erl_opts, [nowarn_deprecated_function,
{i, "include"},
{if_version_above, "20", {d, 'DEPRECATED_GET_STACKTRACE'}},
{if_version_below, "21", {d, 'USE_OLD_HTTP_URI'}},
{if_version_below, "22", {d, 'LAGER'}},
{if_version_below, "23", {d, 'USE_OLD_CRYPTO_HMAC'}},
{if_version_below, "23", {d, 'USE_OLD_PG2'}},
{if_var_match, db_type, mssql, {d, 'mssql'}},
{if_var_false, debug, no_debug_info},
{if_var_true, debug, debug_info},
{if_var_true, elixir, {d, 'ELIXIR_ENABLED'}},
{if_var_true, hipe, native},
{if_var_true, new_sql_schema, {d, 'NEW_SQL_SCHEMA'}},
{if_var_true, roster_gateway_workaround, {d, 'ROSTER_GATWAY_WORKAROUND'}},
{if_var_true, sip, {d, 'SIP'}},
{if_var_true, stun, {d, 'STUN'}},
{if_have_fun, {erl_error, format_exception, 6}, {d, 'HAVE_ERL_ERROR'}},
{if_rebar3, {extra_src_dirs, [sql]}},
{src_dirs, [src,
{if_var_true, tools, tools},
{if_var_true, elixir, include}]}]}.
{deps_erl_opts, [{if_var_true, hipe, native}]}.
{if_rebar3, {plugins, [rebar3_hex, {provider_asn1, "0.2.0"}]}}.
{if_rebar3, {project_plugins, [configure_deps]}}.
{if_not_rebar3, {plugins, [
deps_erl_opts, override_deps_versions2, override_opts, configure_deps,
{if_var_true, elixir, rebar_elixir_compiler},
{if_var_true, elixir, rebar_exunit}
]}}.
{if_rebar3, {if_var_true, elixir,
{project_app_dirs, [".", "elixir/lib"]}}}.
{if_not_rebar3, {if_var_true, elixir,
{lib_dirs, ["deps/elixir/lib"]}}}.
{if_var_true, elixir,
{src_dirs, ["include"]}}.
{sub_dirs, ["rel"]}.
{keep_build_info, true}.
{xref_warnings, false}.
{xref_checks, [deprecated_function_calls]}.
{xref_exclusions, [
"(\"gen_transport\":_/_)",
"(\"eprof\":_/_)",
{if_var_false, elixir, "(\"Elixir.*\":_/_)"},
{if_var_false, http, "(\"lhttpc\":_/_)"},
{if_var_false, mysql, "(\".*mysql.*\":_/_)"},
{if_var_false, odbc, "(\"odbc\":_/_)"},
{if_var_false, pam, "(\"epam\":_/_)"},
{if_var_false, pgsql, "(\".*pgsql.*\":_/_)"},
{if_var_false, redis, "(\"eredis\":_/_)"},
{if_var_false, sqlite, "(\"sqlite3\":_/_)"},
{if_var_false, zlib, "(\"ezlib\":_/_)"}]}.
{eunit_compile_opts, [{i, "tools"},
{i, "include"}]}.
{cover_enabled, true}.
{cover_export_enabled, true}.
{recursive_cmds, ['configure-deps']}.
{overrides, [
{del, [{erl_opts, [warnings_as_errors]}]}]}.
{post_hook_configure, [{"eimp", []},
{if_var_true, pam, {"epam", []}},
{if_var_true, sip, {"esip", []}},
{if_var_true, zlib, {"ezlib", []}},
{"fast_tls", []},
{"fast_xml", [{if_var_true, full_xml, "--enable-full-xml"}]},
{"fast_yaml", []},
{"stringprep", []}]}.
%% Local Variables:
%% mode: erlang
%% End:
%% vim: set filetype=erlang tabstop=8:
cat deps/xmpp/src/xmpp.app.src
%%%-------------------------------------------------------------------
%%% @author Evgeny Khramtsov <[email protected]>
%%% @doc
%%%
%%% @end
%%% Created : 18 Nov 2016 by Evgeny Khramtsov <[email protected]>
%%%
%%%
%%% Copyright (C) 2002-2022 ProcessOne, SARL. All Rights Reserved.
%%%
%%% Licensed under the Apache License, Version 2.0 (the "License");
%%% you may not use this file except in compliance with the License.
%%% You may obtain a copy of the License at
%%%
%%% http://www.apache.org/licenses/LICENSE-2.0
%%%
%%% Unless required by applicable law or agreed to in writing, software
%%% distributed under the License is distributed on an "AS IS" BASIS,
%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%%% See the License for the specific language governing permissions and
%%% limitations under the License.
%%%
%%%-------------------------------------------------------------------
{application, xmpp,
[{description, "Erlang/Elixir XMPP parsing and serialization library"},
{vsn, "1.5.8"},
{modules, []},
{registered, []},
{applications, [kernel, stdlib, ezlib, fast_tls, fast_xml, idna, p1_utils, stringprep]},
{mod, {xmpp, []}},
{env, []},
%% hex.pm packaging:
{files, ["src/", "specs/", "asn1/", "include/", "c_src/jid.c", "c_src/xmpp_uri.c", "c_src/xmpp_lang.c", "rebar.config", "rebar.config.script", "README.md", "LICENSE.txt"]},
{exclude_files, ["src/XmppAddr.erl", "src/XmppAddr.asn1db", "include/XmppAddr.hrl"]},
{licenses, ["Apache 2.0"]},
{links, [{"Github", "https://github.com/processone/xmpp"}]}]}.
%% Local Variables:
%% mode: erlang
%% End:
%% vim: set filetype=erlang tabstop=8:
@weiss: Have you seen comments from @iPPLE?
Yes. As I've submitted this PR, I'm notified on comments. Actually, I've enabled all notifications for the ejabberd repository, so there's no point in pinging me.
I didn't respond to @iPPLE since @licaon-kter already mentioned that "this is not the right approach" for solving the issue of "message distribution" being "slow" in groupchats. I'd rather not spam the discussion of this PR with unrelated topics and would therefore suggest @iPPLE to back out this patch and open a new issue to describe the actual problem.
As for the not_implemented error, it seems @iPPLE is using WebSocket access and this patch doesn't handle that yet (I'll convert it to a draft PR).
Yes, you are right: some of our clients are using WebSocket.
weiss & Neustradamus, thanks for the reply. I thought this PR would somehow help improve group message distribution for 1k members, as mzealey reported that it is working, but it is not working for me. I've been wondering whether I missed something in that PR, because the messages don't seem to be batched into one with that configuration.
Anyway, is there any approach for message distribution in such big groups? Is it improved or fixed in the latest version of ejabberd? I ask because this PR is not included in the latest official ejabberd release.
Thanks.
@weiss please keep me posted when the patch will be done for WebSocket.
Thanks.
@iPPLE
keep me posted when the patch will be done for WebSocket
Actually, I didn't plan to support WebSocket (as it's not super-trivial and I don't need it myself). The new options are documented to work for c2s, s2s, and ejabberd_service components. If they're specified for a WebSocket/BOSH listener, ejabberd would log an error message and abort startup. I guess you're using the global c2s_max_send_queue_delay option instead, and this does affect WebSocket (and then yield the not_implemented errors you've seen). This was unintended, and the thing I'm going to fix.
I might have a quick look into supporting WebSocket after all, but as I said above, this patch wouldn't help with the issues you mentioned.
- Thank you @weiss for kindly replying. I'm actually using this configuration:
c2s_max_send_queue_delay: 50
c2s_max_send_queue_size: 10
s2s_max_send_queue_delay: 50
s2s_max_send_queue_size: 10
mod_muc:
  access:
    - allow
  access_admin:
    - allow: admin
  access_create: muc_create
  access_persistent: muc_create
  access_mam:
    - allow
  default_room_options:
    allow_subscription: true
    allow_change_subj: true
    allow_private_messages: true
    allow_query_users: true
    allow_user_invites: true
    allow_visitor_nickchange: true
    anonymous: false
    members_by_default: true
    members_only: true
    public: false
    mam: true
    max_users: 1000
    persistent: false
Still, I don't see any improvement in message distribution. I tried to check the code to see where the messages are batched, but it seems everything ends up here:
mod_muc_room.erl
%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------
-spec route(pid(), stanza()) -> ok.
route(Pid, Packet) ->
    ?DEBUG("Routing to MUC room ~p:~n~ts", [Pid, xmpp:pp(Packet)]),
    #jid{lresource = Nick} = xmpp:get_to(Packet),
    p1_fsm:send_event(Pid, {route, Nick, Packet}).
p1_fsm.erl
send_event(Name, Event) ->
    Name ! {'$gen_event', Event},
    ok.
and the server still routes the messages one by one, I guess.
I'm wondering what scenario https://github.com/processone/xmpp/pull/63 is for?
- Secondly, the not_implemented error only happens when web clients try to authenticate. I've noticed that if I disable the two options, the web client can log in; if I then enable the two options and reload ejabberd, the web client can still send and receive messages.
My scenario is that I managed to have a room of 1000 members logged in, and after that only 2 members are online (they sent presence to the room to receive live messages); the rest are offline.
NOT all members are joined to the room at the same time while the 2 members are sending messages.
Please kindly explain and advise on these two points. Best regards.