
Sponge: Fast Reactive Scaling for Stream Processing with Serverless Frameworks


https://www.usenix.org/conference/atc23/presentation/song


summary

key problem

workload

single streaming job with dynamic rate.

optimization goal

Latency

  • maintain low query latencies while keeping CPU utilization stable across all active cloud instances. [ch4.3]

configurations to tune

when to trigger offloading, how many SF instances to use, and how many events to offload. [ch4.3]

scenario

cloud. VM + Lambda

technique

hybrid scaling across VMs and serverless functions (SFs): redirect-and-merge compile-time graph rewriting + a dynamic offloading policy

dynamic workload?

yes

multi-tenant?

no. single job with multiple operators

implementation

Apache Nemo

  • (comment: why not Flink?)

Problem and motivation

what is the problem this paper is solving?
why is it important?
why is it challenging?

challenge: [ch3]

  • even if we can set up SF instances quickly, the task state migration overhead is inevitable with existing systems
  • Indirect data communication between SF instances

Main ideas and insights

describe the paper gist in 1-2 sentences
what is important to remember? What did we learn?

[ch1]

  • uses VMs to process stable streaming loads over longer periods of time
  • quickly invokes SF instances and uses them for short periods of time to handle bursty loads, hiding the launch overhead of new VM instances while they are being prepared
  • if the bursty input load persists, new VM instances can be launched to permanently offload the tasks with existing state migration techniques

Solution description

explain how the solution work

1. Redirect-and-merge by Compile-time Graph Rewriting [ch4.1, 4.2]

In order to rapidly forward increased input load to available resources on SF instances, Sponge adds 3 types of operators in advance when compiling the job. These operators are activated only upon offloading actions (see the sketch after the list below).

  • transient operators (TOs)
    • operator logic prepared on SF instances in advance, and can receive events immediately when needed
    • TOs are cloned stream operators with additional features so they can run on SF instances; a TO is created for every operator.
  • router operators (ROs)
    • forward data to downstream tasks running on VM or SF instances.
    • enable data communications between VM and SF
  • merge operators (MOs)
    • enable the system to later merge the corresponding states of offloaded workload created on SFs with the states on the original VMs for any stateful operators
    • insert an MO after each stateful operator for every edge, so that the partially aggregated states on the TOs can be merged back into the original states on VMs
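
A minimal sketch of the compile-time rewriting, assuming a toy DAG representation (the `Operator` class and `rewrite_for_offloading` are hypothetical names for illustration; Sponge performs this rewriting inside Apache Nemo's compiler passes):

```python
from dataclasses import dataclass, field
from typing import List

@dataclass
class Operator:
    name: str
    stateful: bool = False
    downstream: List["Operator"] = field(default_factory=list)

def rewrite_for_offloading(operators: List[Operator]) -> List[Operator]:
    """Insert router (RO), transient (TO), and merge (MO) operators at compile
    time; they stay dormant until an offloading action activates them."""
    rewritten = []
    for op in operators:
        # TO: clone of the operator, prepared to run on SF instances
        to = Operator(f"TO_{op.name}", stateful=op.stateful)
        # RO: routes incoming events to the original VM task or to its TO
        ro = Operator(f"RO_{op.name}", downstream=[op, to])
        rewritten += [ro, op, to]
        if op.stateful:
            # MO: merges partial state produced on the TO back into the VM-side state
            mo = Operator(f"MO_{op.name}")
            op.downstream.append(mo)
            to.downstream.append(mo)
            rewritten.append(mo)
    return rewritten
```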

comments:

  • how do ROs achieve direct communication between SF instances?

2. Dynamic Offloading Policy [ch4.3]

decide when to trigger offloading, how many SF instances to use, and how many events to offload.

overview

  • [ch4.1]
  • suppose for task $p_i$, its target input rate is $r_i$, its maximum throughput (depends on worker CPU capacity, could be profiled in advance) is $m_i$.
  • latency spike often happens when $r_i>m_i$, which means we need to offload some input load.
  • Based on input rate and CPU usage observations, Sponge estimates the amount of CPU resources needed to increase operator throughput and meet our SLOs under increased input loads.
  • goal: relieving CPU pressure when $r_i>m_i$

details (a sketch of the full computation follows this list):

  • keep monitoring CPU utilization of each worker.
  • when a spike happens and backlog accumulates, estimate the number of backlog events accumulated in the queue between times $t_p$ and $t_{p+1}$. ①: $\int_{t_p}^{t_{p+1}} [r_i(t)-m_i(t)] \, dt$
  • in order to clear these backlogs within a time interval (deadline) $t_o$ to $t_{o+1}$, we want to add additional resources to achieve a higher throughput $m_{i_o}$. ②: $\int_{t_o}^{t_{o+1}} [m_{i_o}-r_i(t)] dt$
  • estimate $m_{i_o}$ to guarantee that $② \geq ①$
  • True proc rate of a VM: if a task $p_i$ is running on a single VM with CPU usage $u_{cpu_i}^{VM}$ and observed task input rate $r_i$, then the estimated throughput of this task when the VM is at 100% CPU utilization is $rpc_i^{VM} = \frac{r_i}{u_{cpu_i}^{VM} / 100}$ (similar to computing the true processing rate in Flink)
  • True proc rate of an SF: $rpc_i^{SF} = \rho \cdot rpc_i^{VM}$, where $\rho$ is the estimated fraction of SF CPU capacity relative to a VM core.
  • the number of SF cores to add is $c = \frac{m_{i_o}}{0.7 \cdot rpc_i^{SF}}$.
    • The number of SF instances to add is $\frac{c}{k}$, where $k$ is the number of CPU cores per SF; here they assume $k=1$.
    • The factor 0.7 reflects the goal of keeping CPU utilization around 70% at all times. [ch4.3.1]
  • evenly redirects data or redistributes keys to the added $\frac{c}{k}$ SF instances.
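
A minimal sketch of the scaling computation in Python (the function name, the default $\rho = 0.8$, and the example numbers are assumptions for illustration; the formulas follow the bullets above):

```python
import math

def plan_offloading(backlog_events: float, deadline_sec: float,
                    input_rate: float, cpu_util_vm: float,
                    rho: float = 0.8, target_util: float = 0.7,
                    cores_per_sf: int = 1) -> int:
    """Estimate how many SF instances to add so that the backlog (①) is cleared
    within the deadline (② >= ①) while keeping CPU utilization around 70%."""
    # True processing rate of the VM task, scaled to 100% CPU utilization
    rpc_vm = input_rate / (cpu_util_vm / 100.0)
    # Estimated processing rate of the same task on one SF core
    rpc_sf = rho * rpc_vm
    # Target throughput m_{i_o}: absorb the ongoing input rate and drain the
    # backlog within the deadline, i.e. (m_{i_o} - r_i) * deadline >= backlog
    target_throughput = input_rate + backlog_events / deadline_sec
    # Number of SF cores c = m_{i_o} / (0.7 * rpc_i^SF), then instances = c / k
    cores = target_throughput / (target_util * rpc_sf)
    return math.ceil(cores / cores_per_sf)

# Example: 50,000 backlogged events, a 10 s deadline, 8,000 events/s input,
# and a VM at 80% CPU -> number of single-core Lambdas to add:
# plan_offloading(50_000, 10, 8_000, 80)
```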

comments: the main idea makes sense, but it still lacks many details, e.g.:

  • what is the state migration cost when merging the states of tasks on SFs and VMs? Or is no state migration needed at all? What if a task on an SF needs the state of a task on a VM?
  • when a latency spike happens, we can detect it by checking the backlog size, latency markers, and backpressure at the source. But how do we know the target input rate $r_i$ at that time in a production environment? (Maybe it is a good idea to use backpressure/backlog size instead.)
  • how to decide the deadline for clearing the backlog?

Important results

describe the experimental setup
summarize the main results

setup:

  • VM: r5.xlarge instances (32GB of memory and 4 vCores, 10Gbps network)
    • During stable load, 5 VM workers are used; the input rate keeps all 5 VMs at 60%-80% CPU utilization
    • During spikes, up to 200 SFs are spawned for Sponge and SFBase / up to 50 new VMs for VMBase
  • SF: AWS Lambda instances. one vCPU, 1769MB memory, enough network bandwidth
  • workload:
    • query: Nexmark
    • burst traffic: 3 patterns in Fig9
  • baseline:
    • NoScaling: no scaling at all. static parallelism
    • VMBase: dynamic scaling using only VMs. When scaling, creates new VMs and migrates tasks to them
    • SFBase: dynamic scaling using only SFs. When scaling, creates new SFs and migrates tasks to them
    • VMInit: initializes new VMs in advance and migrates tasks to the new VMs for scaling
    • Over: over-provisions VMs and already has enough resources to cover input loads

results

  • Fig.10, 11, 12 : p99 tail latency and CPU utilization over time under spike query.
    • Sponge reduces the tail latency on average by 88% compared to VMBase and 70% compared to SFBase and performs comparably to Over. Sponge also keeps the CPU utilization relatively stable across time, as shown in Fig. 11.
    • VMBase: up to 44s of extra latency to cold-start a VM; too much backlog accumulates during the cold start to process promptly
    • SFBase: start-up time is only a few hundred ms. But for complex queries with N-to-N shuffle data communications, the performance gain of SFBase over VMBase declines, since operators with shuffle edges cannot be redistributed to SFs and the VMs become the bottleneck.
    • VMInit: no cold start and can offload N-to-N shuffle tasks, but still has task and state migration overheads, which lead to short latency spikes
    • Over: best performance (though similar to Sponge), but at a much higher cost!

Limitations and opportunities for improvement

when doesn't it work?
what assumptions does the paper make and when are they valid?

assumptions

  • the bottleneck of operators is mainly caused by CPU [ch4.1, 4.3]. So the throughput of a task is linearly related to the CPU capacity of the worker running this task.
  • No contention between different tasks co-located on the same VM; each task uses one core.
  • no data skew between tasks under the same operator
  • Bottlenecks often occur individually on VMs, so it is sufficient to mitigate them locally within each VM. [ch4.1]

Closely related work

list of main competitors and how they differ

Follow-up research ideas (Optional)

If you were to base your next research project on this paper, what would you do?
Propose concrete ways to achieve one or more of the following:

Build a better (faster, more efficient, more user-friendly...) system to solve the same problem
Solve a generalization of the problem
Address one of the work's limitations
Solve the same problem in a different context
Solve the problem in a much larger scale
Apply the paper's methods to a different (but similar) problem
Solve a new problem created by this work

some background information about Serverless Functions: [ch2.2]

  • todo: summarize from all serverless related papers
