[DISCUSS]PIP-211: Introduce offload throttling
Motivation
Pulsar currently has no offload throttling. With offload, Pulsar can move backlog data from BookKeeper to cheaper long-term storage (AWS S3, Aliyun OSS, GCS, etc.).
An offload task can use up all of the broker's resources (CPU, network).
When offload tasks consume too many broker resources, messaging latency may increase and the broker can become unstable.
To manage broker resources and improve broker stability, we would like to implement “Offload throttling” in the Pulsar brokers. This mechanism enforces an upper limit on the offload rate.
Goal
- Provide broker-level configuration for limiting offload.
- Provide a relatively accurate rate-limiting algorithm to protect the broker.
API Changes
1. ServiceConfiguration
a. Long managedLedgerOffloadBrokerFlowPermits: the broker-level flow permits per second
2. ManagedLedgerConfig
a. setManagedLedgerOffloadBrokerFlowPermits(long brokerFlowPermits);
b. Long getManagedLedgerOffloadBrokerFlowPermits();
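To illustrate how the two API changes above might fit together, here is a minimal sketch in which the broker-level value is copied into the managed-ledger configuration. The classes ServiceConfigurationSketch, ManagedLedgerConfigSketch and OffloadConfigWiring are hypothetical stand-ins, not the real Pulsar classes, and treating a null value as "not configured" is an assumption made only for this example.

```java
// Hypothetical stand-in for the broker's ServiceConfiguration; only the
// proposed setting is modelled.
final class ServiceConfigurationSketch {
    // Broker-level flow permits per second. null = not configured (assumption).
    Long managedLedgerOffloadBrokerFlowPermits;
}

// Hypothetical stand-in for ManagedLedgerConfig with the proposed accessor pair.
final class ManagedLedgerConfigSketch {
    private Long managedLedgerOffloadBrokerFlowPermits;

    void setManagedLedgerOffloadBrokerFlowPermits(long brokerFlowPermits) {
        this.managedLedgerOffloadBrokerFlowPermits = brokerFlowPermits;
    }

    Long getManagedLedgerOffloadBrokerFlowPermits() {
        return managedLedgerOffloadBrokerFlowPermits;
    }
}

final class OffloadConfigWiring {
    // Copies the broker setting into a new managed-ledger config when it is set.
    static ManagedLedgerConfigSketch apply(ServiceConfigurationSketch broker) {
        ManagedLedgerConfigSketch ml = new ManagedLedgerConfigSketch();
        if (broker.managedLedgerOffloadBrokerFlowPermits != null) {
            ml.setManagedLedgerOffloadBrokerFlowPermits(
                    broker.managedLedgerOffloadBrokerFlowPermits);
        }
        return ml;
    }

    public static void main(String[] args) {
        ServiceConfigurationSketch broker = new ServiceConfigurationSketch();
        broker.managedLedgerOffloadBrokerFlowPermits = 1_048_576L; // example value only
        System.out.println(apply(broker).getManagedLedgerOffloadBrokerFlowPermits());
    }
}
```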
Implementation
An offload task is usually started in ManagedLedgerImpl#offloadLoop(Args …) by calling:
prepareLedgerInfoForOffloaded(ledgerId, uuid, driverName, driverMetadata)
    .thenCompose((ignore) -> getLedgerHandle(ledgerId))
    // start to offload
    .thenCompose(readHandle -> config.getLedgerOffloader().offload(readHandle, uuid, extraMetadata))
    .thenCompose((ignore) -> {
        // ignore
    })
    .whenComplete((ignore, exception) -> {
        // ignore
    });
We can add a step here so that the offloader uses a decorated ReadHandle that contains the rate-limiting logic for reading entries. I named the decorated ReadHandle OffloadReadHandle.
prepareLedgerInfoForOffloaded(ledgerId, uuid, driverName, driverMetadata)
    .thenCompose((ignore) -> getLedgerHandle(ledgerId))
    // decorate the ReadHandle to OffloadReadHandle
    .thenCompose(readHandle -> OffloadReadHandle.create(readHandle, flowPermits, scheduler))
    .thenCompose(readHandle -> config.getLedgerOffloader().offload(readHandle, uuid, extraMetadata))
    .thenCompose((ignore) -> {
        // ignore
    })
    .whenComplete((ignore, exception) -> {
        // ignore
    });
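The decoration step can be tried out in isolation with a small, self-contained sketch. EntryReader is a hypothetical stand-in for BookKeeper's ReadHandle (it returns a plain byte array instead of LedgerEntries), and ThrottledEntryReader with its injected delay supplier is an illustrative name, not part of the proposal. The sketch only shows the decorator shape: same interface, with the read deferred on a scheduler while a delay is reported.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical stand-in for ReadHandle: returns raw bytes for an entry range.
interface EntryReader {
    CompletableFuture<byte[]> readAsync(long firstEntry, long lastEntry);
}

// Decorator that defers the read while the delay supplier reports a wait time.
final class ThrottledEntryReader implements EntryReader {
    private final EntryReader delegate;
    private final LongSupplier delayMillis;           // > 0 means "wait this long"
    private final ScheduledExecutorService scheduler;
    final AtomicLong bytesRead = new AtomicLong();    // bytes recorded after reads

    ThrottledEntryReader(EntryReader delegate, LongSupplier delayMillis,
                         ScheduledExecutorService scheduler) {
        this.delegate = delegate;
        this.delayMillis = delayMillis;
        this.scheduler = scheduler;
    }

    @Override
    public CompletableFuture<byte[]> readAsync(long firstEntry, long lastEntry) {
        CompletableFuture<byte[]> result = new CompletableFuture<>();
        attempt(firstEntry, lastEntry, result);
        return result;
    }

    private void attempt(long first, long last, CompletableFuture<byte[]> result) {
        long delay = delayMillis.getAsLong();
        if (delay > 0) {
            // Still restricted: check again after the reported delay.
            scheduler.schedule(() -> attempt(first, last, result), delay, TimeUnit.MILLISECONDS);
            return;
        }
        delegate.readAsync(first, last).whenComplete((data, error) -> {
            if (error != null) {
                result.completeExceptionally(error);
            } else {
                bytesRead.addAndGet(data.length);     // record before handing data out
                result.complete(data);
            }
        });
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            AtomicLong checks = new AtomicLong();
            // Fake source: one byte per entry in the requested range.
            EntryReader base = (f, l) ->
                    CompletableFuture.completedFuture(new byte[(int) (l - f + 1)]);
            // Report a 20 ms wait on the first check only.
            ThrottledEntryReader reader = new ThrottledEntryReader(
                    base, () -> checks.getAndIncrement() == 0 ? 20L : 0L, scheduler);
            byte[] data = reader.readAsync(0, 9).join();
            System.out.println(data.length + " bytes after " + checks.get() + " checks");
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```

Because the decorator exposes the same interface as the object it wraps, the offloader code itself would not need to change. Only the handle passed into it differs.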
1. OffloadReadHandle
FileSystemManagedLedgerOffloader and BlobStoreManagedLedgerOffloader both read entries by calling ReadHandle.readAsync(startEntry, endEntry).
To implement throttling, we can decorate ReadHandle and override the readAsync(startEntry, endEntry) method. Before reading entries from the ledger, we check whether the ledger is restricted. If it is, we submit a read task to a scheduler with a delay in milliseconds; if not, we read the entries from the underlying ReadHandle directly. After the entries are read successfully, we record their size in bytes.
This method cannot strictly limit the read traffic, because we don’t know the size of entries before reading, but it provides a relatively accurate limit.
public class OffloadReadHandle implements ReadHandle {
    // static fields, ignore
    .....
    // ignore some fields
    .....
    private final ReadHandle delegator;
    private final OrderedScheduler scheduler;

    public OffloadReadHandle(ReadHandle handle, String ledgerName, ManagedLedgerConfig config,
                             OrderedScheduler scheduler) {
        // init fields, ignore
        .....
    }

    @Override
    public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) {
        final long delayMillis = calculateDelayMillis();
        if (delayMillis > 0) {
            // restricted: run the read later and complete f from the scheduled task
            CompletableFuture<LedgerEntries> f = new CompletableFuture<>();
            Runnable cmd = new ReadAsyncCommand(firstEntry, lastEntry, f);
            scheduler.schedule(cmd, delayMillis, TimeUnit.MILLISECONDS);
            return f;
        }
        return this.delegator
            .readAsync(firstEntry, lastEntry)
            .whenComplete((v, t) -> {
                if (t == null) {
                    recordReadBytes(v);
                }
            });
    }

    //ignore unimportant methods
    .....

    // calculate the millis need to delay
    private long calculateDelayMillis() {
        // ignore
        ....
        return 0;
    }

    // record entries size after reading
    private void recordReadBytes(LedgerEntries entries) {
        // ignore
        .....
    }

    private final class ReadAsyncCommand implements Runnable {
        private final long firstEntry;
        private final long lastEntry;
        private final CompletableFuture<LedgerEntries> f;

        ReadAsyncCommand(long firstEntry, long lastEntry, CompletableFuture<LedgerEntries> f) {
            // init fields
            ....
        }

        @Override
        public void run() {
            // if it still needs to wait, submit the next task.
            long delayMillis = calculateDelayMillis();
            if (delayMillis > 0) {
                scheduler.schedule(this, delayMillis, TimeUnit.MILLISECONDS);
                return;
            }
            delegator.readAsync(firstEntry, lastEntry)
                .whenComplete((entries, e) -> {
                    if (e != null) {
                        f.completeExceptionally(e);
                    } else {
                        // record before completing: the caller may close the entries
                        recordReadBytes(entries);
                        f.complete(entries);
                    }
                });
        }
    }
}
This class is based on a SlidingWindow. If the ledger is restricted, the read task is submitted to run in the next window.
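The elided calculateDelayMillis() and recordReadBytes() methods could be backed by window accounting along the following lines. This is only a sketch under stated assumptions: it uses a simple fixed window rather than a true sliding window, it assumes the permits are counted in bytes per window, and the class name OffloadWindowLimiter and the explicit nowMillis parameters (used to keep the example deterministic) are hypothetical.

```java
// Hypothetical fixed-window byte limiter; a simplification of the
// window-based limiting described above, not the proposal's actual code.
final class OffloadWindowLimiter {
    private final long permitsPerWindow; // assumed unit: bytes per window
    private final long windowMillis;     // window length in milliseconds
    private long windowStart;            // start time of the current window
    private long bytesInWindow;          // bytes recorded in the current window

    OffloadWindowLimiter(long permitsPerWindow, long windowMillis, long startMillis) {
        if (permitsPerWindow <= 0 || windowMillis <= 0) {
            throw new IllegalArgumentException("permits and window must be positive");
        }
        this.permitsPerWindow = permitsPerWindow;
        this.windowMillis = windowMillis;
        this.windowStart = startMillis;
    }

    // Returns 0 if a read may start now, otherwise the milliseconds left
    // until the next window opens.
    synchronized long calculateDelayMillis(long nowMillis) {
        rollWindow(nowMillis);
        if (bytesInWindow < permitsPerWindow) {
            return 0;
        }
        return windowStart + windowMillis - nowMillis;
    }

    // Called after a read completes, because entry sizes are unknown beforehand.
    synchronized void recordReadBytes(long nowMillis, long bytes) {
        rollWindow(nowMillis);
        bytesInWindow += bytes;
    }

    // Starts a fresh window (aligned to the window length) once the old one expires.
    private void rollWindow(long nowMillis) {
        long elapsed = nowMillis - windowStart;
        if (elapsed >= windowMillis) {
            windowStart = nowMillis - (elapsed % windowMillis);
            bytesInWindow = 0;
        }
    }

    public static void main(String[] args) {
        OffloadWindowLimiter limiter = new OffloadWindowLimiter(100, 1000, 0);
        System.out.println(limiter.calculateDelayMillis(0));    // 0: budget is unused
        limiter.recordReadBytes(10, 250);                       // one read overshoots the budget
        System.out.println(limiter.calculateDelayMillis(400));  // 600: wait for the next window
        System.out.println(limiter.calculateDelayMillis(1000)); // 0: a new window has started
    }
}
```

The 250-byte read in the example is admitted even though the budget is 100 bytes, because its size is only known after the read. This is the reason the limit is approximate rather than strict.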
2. Metrics
a. Existing metrics
| Name | Type | Description |
|---|---|---|
| brk_ledgeroffloader_offload_error | Counter | The number of failed operations to offload. |
| brk_ledgeroffloader_offload_rate | Gauge | The rate of offloading (bytes per second). |
| brk_ledgeroffloader_read_offload_error | Counter | The number of failed operations to read offload ledgers. |
| brk_ledgeroffloader_read_offload_rate | Gauge | The rate of reading entries from offload ledgers (bytes per second). |
| brk_ledgeroffloader_write_storage_error | Counter | The number of failed operations to write to storage. |
| brk_ledgeroffloader_read_offload_index_latency | Summary | The latency of reading index from offload ledgers. |
| brk_ledgeroffloader_read_offload_data_latency | Summary | The latency of reading data from offload ledgers. |
| brk_ledgeroffloader_read_ledger_latency | Summary | The latency of reading entries from BookKeeper. |
| brk_ledgeroffloader_delete_offload_ops | Counter | The total number of successful and failed operations to delete offload ledgers. |
b. New Metrics
| Name | Type | Description |
|---|---|---|
| brk_ledgeroffloader_ledger_offload_limited | Counter | The number of times ledger reads were restricted. |
| brk_ledgeroffloader_ledger_offload_time | Counter | The total time spent reading all the entries from the ledger. |
| brk_ledgeroffloader_ledger_reading_max_bytes | Gauge | The maximum number of bytes read from the ledger within a window. |
Alternatives
Anything else?
Google doc link: https://docs.google.com/document/d/1HiVyhFpkifODUXEpZF0UYPJGeXtQpI9tbeie_IkDXqk/edit#