refactor: PipelineUtil async mode
Analysis of current implementation issues
The isAsync method in PipelineUtil.java currently relies solely on the SERVICE_SHARE_ENABLE configuration key to determine the asynchronous mode. This implementation is overly simple and lacks flexibility.
Refactoring Plan
1. Create an Async Mode Strategy Interface
It is recommended to create an AsyncModeStrategy interface to abstract the asynchronous mode decision logic:
```java
public interface AsyncModeStrategy {
    boolean shouldUseAsyncMode(Configuration config);
    String getStrategyName();
}
```
2. Implement Multiple Decision Strategies
- ConfigBasedStrategy: Configuration-based strategy (maintains compatibility with existing logic)
- ResourceBasedStrategy: Resource-based strategy
- WorkloadBasedStrategy: Workload-based strategy
- HybridStrategy: A strategy that combines multiple factors
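For reference, a minimal sketch of what `ConfigBasedStrategy` could look like. The `Configuration` class below is a simplified stand-in for Geaflow's own, and the `geaflow.service.share.enable` key string is a placeholder for whatever `SERVICE_SHARE_ENABLE` actually resolves to:

```java
import java.util.Map;

// Simplified stand-in for Geaflow's Configuration (illustrative only).
class Configuration {
    private final Map<String, String> values;

    Configuration(Map<String, String> values) {
        this.values = values;
    }

    boolean getBoolean(String key, boolean defaultValue) {
        String v = values.get(key);
        return v == null ? defaultValue : Boolean.parseBoolean(v);
    }
}

interface AsyncModeStrategy {
    boolean shouldUseAsyncMode(Configuration config);
    String getStrategyName();
}

// Keeps the existing behavior: async mode follows SERVICE_SHARE_ENABLE.
class ConfigBasedStrategy implements AsyncModeStrategy {
    // Placeholder key string; the real value lives in FrameworkConfigKeys.
    static final String SERVICE_SHARE_ENABLE = "geaflow.service.share.enable";

    @Override
    public boolean shouldUseAsyncMode(Configuration config) {
        return config.getBoolean(SERVICE_SHARE_ENABLE, false);
    }

    @Override
    public String getStrategyName() {
        return "config-based";
    }
}
```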
3. Refactor the PipelineUtil Class
Refactor the isAsync method in PipelineUtil to:
```java
public class PipelineUtil {

    private static final Map<String, AsyncModeStrategy> strategies = new HashMap<>();
    private static final String DEFAULT_STRATEGY = "config-based";

    public static boolean isAsync(Configuration config) {
        String strategyName = config.getString(
            FrameworkConfigKeys.ASYNC_MODE_STRATEGY, DEFAULT_STRATEGY);
        AsyncModeStrategy strategy =
            strategies.getOrDefault(strategyName, new ConfigBasedStrategy());
        return strategy.shouldUseAsyncMode(config);
    }
}
```
4. Add new configuration key
Add related configuration keys in FrameworkConfigKeys.java:
```java
public static final ConfigKey ASYNC_MODE_STRATEGY = ConfigKeys
    .key("geaflow.pipeline.async.strategy")
    .defaultValue("config-based")
    .description("Strategy for determining async mode");
```
5. Update PipelineClientFactory
Ensure PipelineClientFactory correctly uses the refactored asynchronous mode checking logic.
6. Enhance Validation Logic
Add configuration validation and exception handling:
- Verify configuration validity
- Provide clear error messages
- Support hot configuration updates
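A hedged sketch of what the validation could look like. The strategy names and threshold range below are assumptions for illustration, not existing Geaflow behavior:

```java
import java.util.Set;

// Hypothetical validation helper: fail fast with a clear message instead of
// silently falling back to a default strategy.
class AsyncModeConfigValidator {

    private static final Set<String> KNOWN_STRATEGIES =
        Set.of("config-based", "resource-based", "workload-based", "hybrid");

    static void validate(String strategyName, double cpuThreshold) {
        if (!KNOWN_STRATEGIES.contains(strategyName)) {
            throw new IllegalArgumentException(
                "Unknown async mode strategy '" + strategyName
                    + "', expected one of " + KNOWN_STRATEGIES);
        }
        // Utilization thresholds are ratios, so they must fall in (0, 1].
        if (cpuThreshold <= 0.0 || cpuThreshold > 1.0) {
            throw new IllegalArgumentException(
                "CPU threshold must be in (0, 1], got " + cpuThreshold);
        }
    }
}
```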
hi, can this be assigned to me?
Hi @vamossagar12 , welcome to this question. I wrote some basic implementation a long time ago. You can refer to it.
@cbqiao Would you mind joining in the discussion on this issue?
@vamossagar12 As the initiator of this issue, I feel it's necessary to provide a design document for the workload strategy. Although this may not be very detailed, I hope it can provide you with a reference.
I. Architecture Design Overview
1.1 Overall Architecture
```mermaid
graph TB
    A["WorkloadBasedStrategy"] --> B["WorkloadMetricsCollector"]
    A --> C["ResourceAllocationDecider"]
    A --> D["WorkloadThresholdConfig"]
    B --> E["TaskMetricsAggregator"]
    B --> F["ProcessMetricsCollector"]
    B --> G["HistoricalMetricsStore"]
    C --> H["ParallelismCalculator"]
    C --> I["WorkerCountEstimator"]
    E --> J["EventMetrics"]
    F --> K["ProcessMetrics"]
    G --> L["CycleMetrics History"]
```
1.2 Core Components
Based on Geaflow's existing architecture, we'll leverage existing components for metrics collection and resource management.
II. Detailed Development Plan
2.1 Phase 1: Metrics Collection System
2.1.1 Create WorkloadMetricsCollector
Implementation Steps:
- Create `WorkloadMetricsCollector.java`
  - Location: `geaflow-cluster/src/main/java/org/apache/geaflow/cluster/resourcemanager/workload/`
  - Responsibility: Aggregate process metrics from HeartbeatManager and task metrics from Scheduler
- Define the `WorkloadSnapshot` data model:
```java
public class WorkloadSnapshot {
    // Task-level metrics
    private Map<Integer, TaskWorkloadMetrics> taskMetrics;
    // Process-level metrics
    private Map<String, ProcessMetrics> processMetrics;
    // Operator-level metrics
    private Map<Integer, OperatorWorkloadMetrics> operatorMetrics;
    // Timestamp
    private long timestamp;
}
```
- Integration Points:
  - Integrate with existing scheduler metrics collection
  - Report aggregated workload metrics to Master via RPC periodically
2.1.3 Historical Metrics Storage
Create HistoricalMetricsStore for storing historical execution data:
```java
public class HistoricalMetricsStore {
    // Use a sliding window to store the most recent N cycles' metrics
    private final Deque<CycleMetrics> recentCycles;
    private final int maxHistorySize;

    // Calculate historical averages, P99, P50, etc.
    public WorkloadStatistics calculateStatistics();
}
```
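The sliding-window idea above can be sketched as a runnable, self-contained class (named `SlidingWindowMetricsStore` here to avoid clashing with the outline above; `CycleMetrics` is reduced to a single duration value for illustration):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

// Sketch of the sliding-window store: keeps the most recent N cycle durations
// and derives simple statistics (average and percentiles).
class SlidingWindowMetricsStore {

    private final Deque<Long> recentCycleDurationsMs = new ArrayDeque<>();
    private final int maxHistorySize;

    SlidingWindowMetricsStore(int maxHistorySize) {
        this.maxHistorySize = maxHistorySize;
    }

    void record(long durationMs) {
        if (recentCycleDurationsMs.size() == maxHistorySize) {
            recentCycleDurationsMs.removeFirst(); // evict the oldest cycle
        }
        recentCycleDurationsMs.addLast(durationMs);
    }

    double average() {
        return recentCycleDurationsMs.stream()
            .mapToLong(Long::longValue).average().orElse(0.0);
    }

    // Nearest-rank percentile, p in (0, 100].
    long percentile(double p) {
        List<Long> sorted = new ArrayList<>(recentCycleDurationsMs);
        if (sorted.isEmpty()) {
            return 0L;
        }
        Collections.sort(sorted);
        int idx = (int) Math.ceil(p / 100.0 * sorted.size()) - 1;
        return sorted.get(Math.max(idx, 0));
    }
}
```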
2.2 Phase 2: Workload Analysis Engine
2.2.1 Create WorkloadAnalyzer
Location: geaflow-cluster/src/main/java/org/apache/geaflow/cluster/resourcemanager/workload/analyzer/
Core Functions:
- DataVolumeAnalyzer
  - Analyze based on `inputRecords`, `outputRecords`, `shuffleReadBytes`, `shuffleWriteBytes`
  - Calculate data throughput and expansion rate for each operator
- SkewDetector
  - Analyze differences between `slowestTask` and `avgExecuteTime`
  - Identify data skew and hot tasks
- BottleneckIdentifier
  - Analyze CPU, memory, and GC time ratios
  - Identify resource-constrained operators
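The SkewDetector rule can be sketched as a small self-contained class; the ratio threshold is a tunable parameter (2x matches the skew heuristic used elsewhere in this plan):

```java
// Sketch of the SkewDetector idea: flag skew when the slowest task takes much
// longer than the average. Parameter names mirror the metrics mentioned above.
class SkewDetector {

    private final double skewRatioThreshold;

    SkewDetector(double skewRatioThreshold) {
        this.skewRatioThreshold = skewRatioThreshold;
    }

    boolean isSkewed(long slowestTaskMs, long avgExecuteTimeMs) {
        if (avgExecuteTimeMs <= 0) {
            return false; // no meaningful baseline yet
        }
        return (double) slowestTaskMs / avgExecuteTimeMs > skewRatioThreshold;
    }
}
```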
2.3 Phase 3: Resource Allocation Decision Engine
2.3.1 Create ResourceAllocationDecider
Location: geaflow-cluster/src/main/java/org/apache/geaflow/cluster/resourcemanager/workload/decider/
Decision Algorithm:
```java
public class ResourceAllocationDecider {

    public AllocationDecision decide(WorkloadSnapshot snapshot,
                                     ExecutionGraph executionGraph) {
        // 1. Calculate resource requirements for each operator
        Map<Integer, ResourceRequirement> requirements =
            calculateOperatorRequirements(snapshot);
        // 2. Estimate total parallelism requirement
        int totalParallelism = estimateTotalParallelism(requirements);
        // 3. Decide whether to use async mode
        boolean useAsync = shouldUseAsyncMode(totalParallelism, snapshot);
        return new AllocationDecision(useAsync, requirements);
    }
}
```
Key Decision Metrics:
- Throughput Threshold:
  - If `totalInputRecords / duration > THRESHOLD`, recommend async mode
  - If shuffle data volume is large (`shuffleWriteBytes > THRESHOLD`), recommend async mode
- Parallelism Requirement:

  ```java
  // Estimate parallelism based on data volume
  int estimatedParallelism = (int) Math.ceil(totalDataVolume / TARGET_DATA_PER_TASK);
  ```

- Resource Pressure Assessment:
  - CPU utilization > 80%: increase parallelism
  - Memory utilization > 70%: consider allocating more workers
  - GC time > 20%: high memory pressure, adjust resources
2.4 Phase 4: Implement WorkloadBasedStrategy
2.4.1 Strategy Implementation
Location: geaflow-cluster/src/main/java/org/apache/geaflow/cluster/client/utils/strategy/
```java
public class WorkloadBasedStrategy implements AsyncModeStrategy {

    private final WorkloadMetricsCollector metricsCollector;
    private final WorkloadAnalyzer analyzer;
    private final ResourceAllocationDecider decider;

    @Override
    public boolean shouldUseAsyncMode(Configuration config) {
        // 1. Get latest workload snapshot
        WorkloadSnapshot snapshot = metricsCollector.getLatestSnapshot();
        // 2. If first execution, use default strategy
        if (snapshot == null || snapshot.isEmpty()) {
            return config.getBoolean(FrameworkConfigKeys.SERVICE_SHARE_ENABLE);
        }
        // 3. Analyze workload characteristics
        WorkloadCharacteristics characteristics = analyzer.analyze(snapshot);
        // 4. Make decision based on characteristics
        return decider.shouldUseAsync(characteristics, config);
    }
}
```
2.4.2 Decision Logic
Based on comprehensive judgment across multiple dimensions:
- Data Volume Dimension:
  - Total input records > 10M records/cycle → async
  - Shuffle data volume > 1GB/cycle → async
- Parallelism Dimension:
  - Required workers > available workers × 0.7 → async (resource constrained)
- Execution Time Dimension:
  - Average execution time > 30s/cycle → async
  - Slowest task vs. average time difference > 2x → skew exists, consider async
- Resource Pressure Dimension:
  - Process CPU > 80% or memory > 70% → async
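These four dimensions can be combined into a single illustrative decision function. The thresholds are hard-coded from the list above for brevity; in practice they would come from Configuration:

```java
// Illustrative OR-combination of the four decision dimensions.
class AsyncModeDecision {

    static boolean recommendAsync(long inputRecordsPerCycle,
                                  long shuffleBytesPerCycle,
                                  int requiredWorkers,
                                  int availableWorkers,
                                  long avgExecuteTimeMs,
                                  long slowestTaskMs,
                                  double cpuUtilization,
                                  double memoryUtilization) {
        // Data volume dimension: 10M records or 1 GB shuffle per cycle
        boolean dataVolume = inputRecordsPerCycle > 10_000_000L
            || shuffleBytesPerCycle > 1024L * 1024 * 1024;
        // Parallelism dimension: demand close to capacity
        boolean parallelism = requiredWorkers > availableWorkers * 0.7;
        // Execution time dimension: slow cycles or 2x skew
        boolean executionTime = avgExecuteTimeMs > 30_000L
            || (avgExecuteTimeMs > 0 && slowestTaskMs > 2L * avgExecuteTimeMs);
        // Resource pressure dimension
        boolean resourcePressure = cpuUtilization > 0.8 || memoryUtilization > 0.7;
        return dataVolume || parallelism || executionTime || resourcePressure;
    }
}
```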
2.5 Phase 5: Integration with Existing Framework
2.5.2 Modify AbstractScheduledWorkerManager
The current resource allocation path can be enhanced to consult WorkloadBasedStrategy before requestWorker(), using the workload analysis results to dynamically adjust requestResourceNum.
2.5.3 Integration with ResourceManager
Start WorkloadMetricsCollector during resource initialization in the ResourceManager.
III. Configuration Parameter Design
3.1 New Configuration Keys
Add to FrameworkConfigKeys.java:
```java
// Workload strategy configuration
public static final ConfigKey WORKLOAD_STRATEGY_ENABLE = ConfigKeys
    .key("geaflow.workload.strategy.enable")
    .defaultValue(false)
    .description("Enable workload-based strategy");

public static final ConfigKey WORKLOAD_INPUT_RECORDS_THRESHOLD = ConfigKeys
    .key("geaflow.workload.input.records.threshold")
    .defaultValue(10000000L)
    .description("Input records threshold for async mode");

public static final ConfigKey WORKLOAD_SHUFFLE_BYTES_THRESHOLD = ConfigKeys
    .key("geaflow.workload.shuffle.bytes.threshold")
    .defaultValue(1024L * 1024 * 1024)
    .description("Shuffle bytes threshold for async mode");

public static final ConfigKey WORKLOAD_METRICS_HISTORY_SIZE = ConfigKeys
    .key("geaflow.workload.metrics.history.size")
    .defaultValue(10)
    .description("Number of recent cycles to keep for analysis");

public static final ConfigKey WORKLOAD_CPU_THRESHOLD = ConfigKeys
    .key("geaflow.workload.cpu.threshold")
    .defaultValue(0.8)
    .description("CPU utilization threshold");

public static final ConfigKey WORKLOAD_MEMORY_THRESHOLD = ConfigKeys
    .key("geaflow.workload.memory.threshold")
    .defaultValue(0.7)
    .description("Memory utilization threshold");
```
IV. Implementation Steps
Phase 1: Infrastructure Preparation (1-2 weeks)
- Create workload package structure
- Implement WorkloadSnapshot and related data models
- Implement WorkloadMetricsCollector basic framework
- Integrate with HeartbeatManager and PipelineCycleScheduler
Phase 2: Metrics Collection and Storage (2-3 weeks)
- Enhance EventMetrics collection logic
- Implement HistoricalMetricsStore
- Implement metrics serialization and RPC transmission
- Add metrics query API
Phase 3: Workload Analysis (2-3 weeks)
- Implement DataVolumeAnalyzer
- Implement SkewDetector
- Implement BottleneckIdentifier
- Integrate into WorkloadAnalyzer
Phase 4: Decision Engine (2 weeks)
- Implement ResourceAllocationDecider
- Implement WorkloadBasedStrategy
- Integrate configuration parameters
- Add decision logging and monitoring
Phase 5: Integration and Testing (2-3 weeks)
- Modify PipelineUtil to support strategy pattern
- Update AbstractScheduledWorkerManager
- Comprehensive integration testing
- Performance testing and tuning
V. Monitoring and Observability
5.1 Key Metrics Exposure
Expose workload metrics through existing REST API with new endpoints:
- `/api/workload/metrics` - Current workload snapshot
- `/api/workload/decision` - Recent decision records
- `/api/workload/analysis` - Workload analysis results
5.2 Decision Logging
Record detailed information for each decision:
- Input metrics snapshot
- Analysis results
- Decision rationale
- Actual effect feedback
VI. Production Environment Considerations
6.1 Cold Start Handling
- Fall back to ConfigBasedStrategy on first execution or when historical data is missing
- Gradually enable intelligent decisions after accumulating historical data
6.2 Exception Handling
- Degradation strategy when metrics collection fails
- Default behavior on decision timeout
- Smoothing mechanism to prevent decision oscillation
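One simple smoothing mechanism is hysteresis: only flip the async decision after the opposite recommendation has been observed several evaluations in a row. A minimal sketch (the counter threshold is an assumption):

```java
// Hysteresis sketch: the decision only flips after `requiredStableCount`
// consecutive evaluations that disagree with the current decision.
class SmoothedDecision {

    private final int requiredStableCount;
    private boolean current;
    private int disagreeCount;

    SmoothedDecision(boolean initial, int requiredStableCount) {
        this.current = initial;
        this.requiredStableCount = requiredStableCount;
    }

    boolean update(boolean proposed) {
        if (proposed == current) {
            disagreeCount = 0; // agreement resets the streak
            return current;
        }
        disagreeCount++;
        if (disagreeCount >= requiredStableCount) {
            current = proposed; // stable disagreement: accept the flip
            disagreeCount = 0;
        }
        return current;
    }
}
```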
6.3 Performance Considerations
- Metrics collection performance overhead < 1%
- Asynchronous decision computation, non-blocking to main process
- Use caching to avoid redundant calculations
6.4 Configurability
- All thresholds are configurable
- Support runtime dynamic adjustment (via configuration center)
- Support A/B testing of different strategies
@kitalkuyo-gita Thank you very much for your proposal. It appears you have given some deep thought to the resource allocation aspect.
I need to clarify that the async mode design here is intended for OLAP scenarios, where multiple drivers are created to provide higher concurrency for query processing. Each driver involves considerations about how resources should be allocated, which graph shards should be cached on each worker, and how tasks submitted from different drivers can share read access to worker shards during runtime.
The current proposal focuses more on how drivers allocate resources based on queries, but it doesn't address the distribution of graph shards on workers or the shared read access issues. Therefore, there are some conflicts between this proposal and the original async mode problem we aimed to solve in this issue.
If we only consider the offline graph computing scenario—similar to Spark's approach of processing multiple computing tasks continuously or in parallel—the resource allocation based on workload discussed above does have some value. For that scenario, we'd better open a new issue to further discuss and break down the plan.
@cbqiao Okay, thank you for your reply. I will start a separate thread to continue discussing this matter.
@cbqiao Please allow me to introduce E2ern1ty, who has some experience in the field of graphs. I hope to work with him to discuss the detailed plan of this solution with you. However, I've been quite busy lately. Once I'm ready, I will open a new issue to introduce the current status of the solution and what can be done at present.
@kitalkuyo-gita Thanks for the introduction! Nice to meet @E2ern1ty. I appreciate you both wanting to collaborate on this solution. Whenever you're ready, I'll be happy to dive into the detailed planning with you both. Looking forward to it!