lemmy icon indicating copy to clipboard operation
lemmy copied to clipboard

Persistent, performant, reliable federation queue

Open phiresky opened this issue 2 years ago • 5 comments

This PR implements a new outgoing federation queue. The end goal is to create a queue that can scale to reddit scale (that is 100-1000 activities per second, sent to each federated instance).

The queue works as follows:

  • The main lemmy_server process with its send_lemmy_activity function only stores the activity in the db (like currently), with an addition of the send targets. https://github.com/LemmyNet/lemmy/pull/3583 would be helpful to reduce overhead.

  • There is a new table federation_queue_state (domain, last_successful_id, retries) that tracks the state of outgoing federation per instance

  • One or more lemmy_federate processes pick up the activities from the db and send them out. lemmy_federate works as follows:

    1. All known allow/non-blocklisted instances are read from the database every 60s

    2. A worker tokio task is started / stopped per federated instance. It does the following

      1. It fetches a list of local communities that at least one person on the remote instance is subscribe to (refreshed every 10-60 seconds)
      2. It loops through the activities table starting from last_successful_id up until the max(id). For every activity:
        1. It checks if this activity should be sent to this instance using (a) the list of communities from above and (b) a list of other inboxes stored in the activities tab
        2. It sends out the activity to the correct shared or individual inbox. If there's a retryable failure, it waits with exponential back off and tries again. I had to expose a raw non-retrying send_activity function from the activitypub-federation crate since the state of the retry has to be in the db. The activity_queue from there is not used at all.
      3. The updated federation_queue_state is stored to the database.
    3. A separate task logs the current progress of each domain once per minute. Example output:

       2023-07-12T21:43:15.642275Z  INFO lemmy_federate::worker: https://lemmy.phiresky.xyz/activities/announce/c06d2a17-4df9-43d3-b142-7a596b310d74 to https://toad.work/inbox failed: Queueing activity for retry after failure with status 502 Bad Gateway: <html><head><title>502 Bad Gateway</title></head>...
      2023-07-12T21:43:15.016121Z  INFO lemmy_federate: Federation state as of 2023-07-12T21:43:15+00:00:
      2023-07-12T21:43:15.024684Z  INFO lemmy_federate: lemmy.serverfail.party: Ok. 6 behind
      
      2023-07-12T21:43:15.025516Z  INFO lemmy_federate: toad.work: Warning. 3437 behind, 5 consecutive fails, current retry delay 320.00s
      2023-07-12T21:43:15.016220Z  INFO lemmy_federate: lemmy.byteunion.com: Ok. 0 behind
      2023-07-12T21:43:15.016228Z  INFO lemmy_federate: swiss-talk.net: Ok. 0 behind
      ...
      
    4. If a signal is received (ctrl+c, SIGINT, SIGTERM), all the workers are gracefully stopped, their most current state stored in the db.

This implementation has the following advantages:

  1. The queue is persistent and reliable - if any or all lemmy processes are stopped, the federation will continue without activity loss afterwards. If any process is killed or crashes, worst case is 100 activities or 10s of activity is resent. No losses.
  2. Low memory usage: The memory usage scales linearily with the number of instances but is constant for each instance and independent of the number of activities per second. It is also mostly uncoupled from the reliability of the other servers. No more unbounded memory use.
  3. The queue is performant - Each domain has a separate serialized queue, which means there is only ever a single request waiting for a server response per federated instance. Performance of each instance doesn't affect the others.
  4. Horizontally scalable - this outgoing federation can run on multiple separate processes (split by outgoing domain) or multiple servers - they just need access to the same PG database.
  5. No more DOSing other instances.
  6. If a remote instance goes down, activities will be replayed reliably and in order from the time it goes down.
  7. It's easier to find out what the state of federation is and where bottlenecks are.

It has the following disadvantages:

  1. It assumes that every inbox in the same instance/domain has the same reliability. If the instance fails to respond to one activity, it will not receive any other activities either. (failure is decided the same as before, http 2xx and 4xx count as success, everything else as retryable). This is not relevant to lemmy instances since they use shared inbox for everything.
  2. It's optimized for large instances and has more overhead if there's very little activity.
    • Also I inverted the logic for figuring out which remote instance care about which local community. Instead of getting the remote inboxes for one activity every remote inbox has a set of communities it cares about. The reason is that this exact thing is the most expensive federation activity type so optimizing it is important. The inversion optimizes for most instances caring about many communities. This is not intrinsic to the general approach though and could be changed.
  3. There's a time delay of up to 10 seconds for outgoing activities and follower changes and up to 60 seconds for instance blacklist changes.
  4. Right now, every http request waits for the response to arrive before the next one is sent (per instance). this could be changed by adding a FuturesOrdered or similar limited to N=e.g. 10 concurrent in-flight requests and adding more complicated retry / backoff logic but right now I don't think it's necessary.
  5. There's overhead in the way I load every activity for every remote instance. Caching is used to make sure this doesn't really cause redundant DB queries, just CPU work. This is a tradeoff to allow for different remote instances to be in different positions in time.

The approach of one worker per remote instance should scale to reddit scale imo (~100 - 1000 activities per second). The details will of course need tweaking.

I've tested this so far only with my own very low activity instance and the basics work as expected.


Here's an example of how the federation_queue_state table looks:

domain last_successful_id fail_count last_retry
toad.work 6832351 14 2023-07-12 21:42:22.642379+00
lemmy.deltaa.xyz 6837196 0 1970-01-01 00:00:00+00
battleangels.net 6837196 0 1970-01-01 00:00:00+00
social.fbxl.net 6837196 0 1970-01-01 00:00:00+00
mastodon.coloradocrest.net 6837196 0 1970-01-01 00:00:00+00

And here's an example of how the activity table looks (for sendable activities):

id            | 6817007
data          | {"cc": ["https://lemmy.phiresky.xyz/c/localtest/followers"], "id": "https://lemmy.phiresky.xyz/activities/announce/c06d2a17-4df9-43d3-b142-7a596b310d74", "to": ["https://www.w3.org/ns/activitystreams#Public"], "type": "Announce", ...
local         | t
published     | 2023-07-12 21:36:31.749541
updated       |
ap_id         | https://lemmy.phiresky.xyz/activities/announce/c06d2a17-4df9-43d3-b142-7a596b310d74
sensitive     | f
send_targets  | {"persons": [], "all_instances": false, "extra_inboxes": [], "community_followers_of": [54]}
actor_type    | community
actor_apub_id | https://lemmy.phiresky.xyz/c/localtest

phiresky avatar Jul 12 '23 23:07 phiresky

The general approach looks very good. Among other things it also gives us the per-instance failure limit which I tried to implement (https://github.com/LemmyNet/activitypub-federation-rust/pull/60). The main question for me is why you decided to implement this in Lemmy, and not in the federation library. It seems to make more sense to keep it encapsulated there, and only add a trait so that Lemmy can handle db storage.

The federate crate looks like it needs to run as an entirely separate process. Thats too complicated especially for small instances. Better run this logic from the lemmy_server crate, and provide a command line option to enable/disable activity sending.

cc @cetra3

Nutomic avatar Jul 14 '23 15:07 Nutomic

The main question for me is why you decided to implement this in Lemmy, and not in the federation library

Mainly for simplicity. I don't think the activitypub-federation crate should depend on postgresql specifically, and the way I fetch updates for instances and for communities is kind of tightly coupled with how lemmy stores that in the database. I can try to see how the trait would look if most of this code was in the ap crate, but it might be pretty hard to make it generic enough to actually work for other use cases (like @colatkinson wants to build here)

The federate crate looks like it needs to run as an entirely separate process. Thats too complicated especially for small instances. Better run this logic from the lemmy_server crate, and provide a command line option to enable/disable activity sending.

It would of course be possible to allow this to be run from the main process but I kind of disagree: Most admins, especially small instance admins, do the absolute minimum effort to set up an instance and then are confused if it runs badly. They don't understand postgresql tuning, configuration changes etc. Also I've seen multiple people use a script like lemmony to subscribe to every single community in existence and then are confused why their 2GB RAM server can't handle the federation traffic (I know that's just incoming traffic but still).

Also, the way it's implemented is "optimized" for running in a separate tokio pool.. Similar to the issues we have with the existing in-memory queue, this code spawns instance-count tokio tasks (e.g. 1000) which i think will dominate scheduling against the 1-100 max other / api query tasks against the if the process is at load. tokio doesn't have any task prioritization so I don't know how this could be prevented.

So IMO the default setup should be a very good and performant one - because admins don't understand or want to bother with optional tweaks. Lemmy already needs two processes (ui and server), I don't think adding a third one would increase the effort much? It can be in the same container as the other one by default.

If you're adamant about this I can make it integratable into the main process but it should definitely be an option to have it separate, one that all instance admins that federate with 100+ other instances should take.

phiresky avatar Jul 14 '23 21:07 phiresky

@phiresky I think it would be good if we had the option here. It wouldn't be too hard to spawn some sort of background tokio task in the embedded case and then have a thin wrapper in the "separate" process case.

cetra3 avatar Jul 15 '23 01:07 cetra3

Ah, another reason is that I add signal handlers for clean shutdown, since that can take multiple seconds (up to the http timeout). So if it was in the same process that would conflict with the actix shutdown handlers (no idea if it's possible to merge those) and also cause more downtime when updating / restarting processes.

phiresky avatar Jul 15 '23 01:07 phiresky

@phiresky You can't really inject your own listener for the shutdown in actix web, however you can do the reverse: signal the actix http server to shut down in your own custom signal handler.

The way you do this is roughly:

  • Disable the signals so it won't override tokio's signal handlers: https://docs.rs/actix-web/4.3.1/actix_web/struct.HttpServer.html#method.disable_signals
  • Grab a handle to the server: https://docs.rs/actix-web/4.3.1/actix_web/dev/struct.Server.html#method.handle
  • Call the stop method on the handle: https://docs.rs/actix-web/4.3.1/actix_web/dev/struct.ServerHandle.html#method.stop

cetra3 avatar Jul 15 '23 11:07 cetra3

One option might be to have the Lemmy process spawn the activity sender as a child process by default. Then its a separate process with separate tokio, but doesnt require any changes from instance admins. And both binaries can be included in the same dockerfile.

I suppose having the queue in Lemmy is fine, but I dont want to maintain two different queues. So if we use this approach, then I would get rid of the queue in the federation library, and only provide a simple method for sign + send on the current thread (no workers nor retry). Then that logic doesnt have to be reimplemented here. Later the queue could still be upstreamed.

Nutomic avatar Jul 17 '23 11:07 Nutomic

Real efficiency/performance would require federation to be done using an efficient binary format like speedy.

But I understand why diverging from apub, even if only optionally, and only with Lemmy-Lemmy instance communication, is something the project may not want to support.

AppleSheeple avatar Jul 21 '23 13:07 AppleSheeple

@AppleSheeple honestly JSON serialization is a tiny sliver of the perf issues that relate to apub comms.

The biggest contributing factor when I've benchmarked this before is HTTP signatures. It takes up about 70% of the processing time

cetra3 avatar Jul 22 '23 02:07 cetra3

@cetra3

A small difference becomes bigger at scale. And the small difference here covers all three of size, memory, and processing power needed.

And more relevantly, if the project were to be open to non-apub Lemmy-to-Lemmy federation, then the sky would be the limit. You can do batching however you want. You can even turn the architecture into a fully pull-based one. You can...

Creating a queue that can truly scale to reddit scale while sticking to apub is an unobtainable goal, was my point. The message format was just the easiest clear source of inefficiency to point out.


Lemmy can obviously still do much better than the status quo. And @phiresky's efforts are greatly appreciated, not that they need to hear it from a random githubber.

AppleSheeple avatar Jul 22 '23 09:07 AppleSheeple

Updated:

  • rebased on latest main branch (specifically the sent_activity split from #3583 )

  • added dead instance detection from #3427 to the federation worker start / stop logic

  • added logs to show how many instances it's federating to (e.g.:

    INFO lemmy_federate: Federating to 451/490 instances (34 dead, 5 disallowed)

  • Expose a start_stop_federation_workers_cancellable() function from the lemmy_federate crate that could be called from lemmy_server to spawn the federation workers in the same process and tokio runtime as the lemmy server.

  • Added cmd-args parsing to allow starting multiple processes (e.g. on different servers altogether)

phiresky avatar Jul 23 '23 22:07 phiresky

I've made the following changes

  • restructured worker.rs into a struct with member methods, no more nested loops
  • other nutomic comments
  • removed the lemmy_federate entry point. instead, it's now started through the main lemmy_server binary.

Example configurations:

  • basic: as before, one lemmy_server process, no args

  • separate fed process:

    lemmy_server --http-server=true --send-activities=false # http server
    lemmy_server --http-server=false --send-activities=true --disable-scheduled-tasks # federation server
    
  • horizontally scaled

    lemmy_server --http-server=false --send-activities=false # scheduled tasks
    lemmy_server --http-server=true --send-activities=false --disable-scheduled-task # http server 1
    lemmy_server --http-server=true --send-activities=false --disable-scheduled-task # http server 2
    lemmy_server --http-server=true --send-activities=false --disable-scheduled-task # http server 3
    
    lemmy_server --http-server=false --send-activities=true --activity-worker-index=1 --activity-worker-count=1 --disable-scheduled-tasks # federation server 1/3
    lemmy_server --http-server=false --send-activities=true --activity-worker-index=1 --activity-worker-count=2 --disable-scheduled-tasks # federation server 2/3
    lemmy_server --http-server=false --send-activities=true --activity-worker-index=1 --activity-worker-count=3 --disable-scheduled-tasks # federation server 3/3
    

phiresky avatar Aug 03 '23 14:08 phiresky

needs another cargo +nightly fmt

dessalines avatar Aug 04 '23 15:08 dessalines

Needs another sql and cargo fmt / run of lint.sh

dessalines avatar Aug 22 '23 13:08 dessalines

I've updated this PR to use cleaner / simpler code changes to activitypub-federation https://github.com/LemmyNet/activitypub-federation-rust/pull/75

I've also fixed the CI.

phiresky avatar Aug 23 '23 22:08 phiresky

I've updated this to accommodate the timezone PR merged into master.

phiresky avatar Aug 26 '23 12:08 phiresky

More merge conflicts.

dessalines avatar Aug 29 '23 14:08 dessalines

fixed

phiresky avatar Aug 29 '23 14:08 phiresky

Sweet thx. @Nutomic should look at this before we merge.

dessalines avatar Aug 29 '23 14:08 dessalines

Have you tested if this new code for activity sending works? Easiest way would be with federation/docker setup (make sure that LEMMY_SYNCHRONOUS_FEDERATION is not set). Then send a few federated actions like follow, create post, vote etc to see if it goes through.

Nutomic avatar Sep 01 '23 10:09 Nutomic

Have you tested if this new code for activity sending works

I have tested the basic federation functionality manually and it did work. I did not test extensively whether due to the changes all specific actions are still sent to the right places.

The federation tests were failing again, I'll see if I can fix those (possibly due to asynchronicity)

The scalability is also hard to test.

phiresky avatar Sep 01 '23 12:09 phiresky

@Nutomic I don't think the ActivityChannel does much anymore with activity sending now doing nothing except storing a value in the database. Could you say what you think of this commit: https://github.com/LemmyNet/lemmy/pull/3605/commits/2767ab4a6fed9aa8d197eda0c6a25a1d617d192d ?

phiresky avatar Sep 01 '23 13:09 phiresky

The synchronous federation is needed for api tests, to ensure that api calls are fully completed by the same that asserts are executed. If we make api tests async, we need to add explicit sleep or wait calls in lots of places. So I think its much easier to keep the sync sending.

Nutomic avatar Sep 04 '23 09:09 Nutomic

The synchronous federation is needed for api tests, to ensure that api calls are fully completed by the same that asserts are executed. If we make api tests async, we need to add explicit sleep or wait calls in lots of places. So I think its much easier to keep the sync sending.

I already just added that yesterday: https://github.com/LemmyNet/lemmy/pull/3605/commits/77b8adba1dbac4ab07838a289dae3a27af7bfd17 The API tests are mostly succeeding now (need a few fixes). They take a bit longer though. But I think it makes sense to remove code that only exists for testing since you'll never get that behaviour (federation guaranteed to happen instantly) on production.

phiresky avatar Sep 04 '23 09:09 phiresky

True. If you can get it working consistently like that it would make for better testing.

Nutomic avatar Sep 04 '23 10:09 Nutomic

There was one more issue in the federation tests due to a race condition in the initialization (that i think only happens in testing) . I've fixed it.

I've still got to fix a few more federation tests.

phiresky avatar Sep 05 '23 23:09 phiresky

There's one more issue where if a user is deleted it is no longer found and the federation worker can error out. I'll fix that by both skipping internal errors when sending activities so the queue doesn't get stuck as well as restarting workers when they exit.

then I'll merge (when tests pass).

phiresky avatar Sep 09 '23 02:09 phiresky