connect
Memory is not released after the stream has finished
I used Connect to sync a BigQuery table with about 50 GB of storage, as a one-off batch synchronization. However, after the task completed, the memory usage stayed the same. I'm confused about this — could someone tell me how to deal with it? Thanks!
I ran this in streams mode.
My stream config:
input:
  label: "bigquery_package_versions"
  # The broker wraps a single child input; its job here is to apply the
  # batching policy below, which gcp_bigquery_select has no field for.
  broker:
    inputs:
      - gcp_bigquery_select:
          project: example
          table: bigquery-public-data.deps_dev_v1.PackageVersions
          columns:
            - SnapshotAt
            - System
            - Name
            - Version
            - Licenses
            - Links
            - Advisories
            - VersionInfo
            - Hashes
            - DependenciesProcessed
            - DependencyError
            - UpstreamPublishedAt
            - Registries
            - SLSAProvenance
            - UpstreamIdentifiers
          # The two `?` placeholders are bound, in order, to the array
          # produced by args_mapping.
          where: "SnapshotAt >= ? AND SnapshotAt < ?"
          auto_replay_nacks: true
          # job_labels: {}
          # priority: ""
          args_mapping: |
            root = ["2024-07-29", "2024-08-08"]
          # prefix: ""
          # 87212989
          # NOTE(review): LIMIT/OFFSET should not be needed to read every row;
          # the input presumably iterates all result pages on its own — confirm
          # and drop this suffix to scan the whole range in one run.
          # If you do page with OFFSET, add a deterministic ORDER BY first:
          # BigQuery does not guarantee row order without one, so successive
          # pages can overlap or skip rows.
          suffix: |
            LIMIT 15000000 OFFSET 0
    # A batch is flushed when its messages total 268435456 bytes (256 MiB)
    # or when the 10m period elapses, whichever comes first. Until a batch
    # is flushed and acknowledged downstream, its messages stay in memory.
    batching:
      byte_size: 268435456
      period: 10m
      processors:
        # Convert BigQuery timestamps to Unix nanoseconds so they match the
        # INT64 columns of the Parquet schema, and expose SnapshotAt as
        # metadata for the output path.
        - mapping: | #!blobl
            root = this
            root.SnapshotAt = root.SnapshotAt.ts_unix_nano()
            if root.exists("UpstreamPublishedAt") && root.UpstreamPublishedAt != null {
              root.UpstreamPublishedAt = root.UpstreamPublishedAt.ts_unix_nano()
            }
            meta SnapshotAt = root.SnapshotAt
pipeline:
  processors:
    # Collapses each incoming batch into a single Parquet-encoded message.
    # A whole input batch (up to the 256 MiB byte_size of the input batching
    # policy) is encoded in one go, so batch size bounds peak memory here.
    # Field names and nesting must match the documents built by the input
    # mapping.
    - parquet_encode:
        schema:
          - name: SnapshotAt
            type: INT64
          - name: System
            type: UTF8
          - name: Name
            type: UTF8
          - name: Version
            type: UTF8
          - name: Licenses
            type: UTF8
            repeated: true
          - name: Links
            repeated: true
            fields:
              - name: Label
                type: UTF8
              - name: URL
                type: UTF8
          - name: Advisories
            repeated: true
            fields:
              - name: Source
                type: UTF8
              - name: SourceID
                type: UTF8
          - name: VersionInfo
            optional: true
            fields:
              - name: IsRelease
                type: BOOLEAN
              - name: Ordinal
                type: INT64
          - name: Hashes
            repeated: true
            fields:
              - name: Type
                type: UTF8
              - name: Hash
                type: UTF8
          - name: DependenciesProcessed
            optional: true
            type: BOOLEAN
          - name: DependencyError
            optional: true
            type: BOOLEAN
          - name: UpstreamPublishedAt
            optional: true
            type: INT64
          - name: Registries
            type: UTF8
            repeated: true
          - name: SLSAProvenance
            optional: true
            fields:
              - name: SourceRepository
                type: UTF8
              - name: Commit
                type: UTF8
              - name: URL
                type: UTF8
              - name: Verified
                type: BOOLEAN
          - name: UpstreamIdentifiers
            repeated: true
            fields:
              - name: PackageName
                optional: true
                type: UTF8
              - name: VersionString
                optional: true
                type: UTF8
              - name: Source
                type: UTF8
        default_compression: zstd
output:
  file:
    # One file per message: the directory comes from the SnapshotAt metadata
    # set by the input mapping, and the file name from the current time in
    # nanoseconds. The base directory falls back to /opt/ysdb-worker/data
    # when YSDB_WORKER_DATA_HOME is unset.
    # NOTE(review): if one batch spans several SnapshotAt values, all of its
    # rows land in a single directory — confirm which message's metadata
    # parquet_encode carries over to its output.
    path: ${YSDB_WORKER_DATA_HOME:/opt/ysdb-worker/data}/package_versions/${! meta("SnapshotAt") }/${! timestamp_unix_nano() }.parquet
    # Write each message's raw bytes as the complete file content.
    codec: all-bytes
And the metrics:
# HELP batch_created Benthos Counter metric
# TYPE batch_created counter
batch_created{label="bigquery_package_versions",mechanism="check",path="root.input.batching",stream="version"} 0
batch_created{label="bigquery_package_versions",mechanism="count",path="root.input.batching",stream="version"} 0
batch_created{label="bigquery_package_versions",mechanism="period",path="root.input.batching",stream="version"} 0
batch_created{label="bigquery_package_versions",mechanism="size",path="root.input.batching",stream="version"} 71
# HELP go_gc_duration_seconds A summary of the pause duration of garbage collection cycles.
# TYPE go_gc_duration_seconds summary
go_gc_duration_seconds{quantile="0"} 0.000139994
go_gc_duration_seconds{quantile="0.25"} 0.000161236
go_gc_duration_seconds{quantile="0.5"} 0.000174105
go_gc_duration_seconds{quantile="0.75"} 0.00019372
go_gc_duration_seconds{quantile="1"} 0.00047506
go_gc_duration_seconds_sum 2.914484991
go_gc_duration_seconds_count 1168
# HELP go_goroutines Number of goroutines that currently exist.
# TYPE go_goroutines gauge
go_goroutines 378676
# HELP go_info Information about the Go environment.
# TYPE go_info gauge
go_info{version="go1.22.5"} 1
# HELP go_memstats_alloc_bytes Number of bytes allocated and still in use.
# TYPE go_memstats_alloc_bytes gauge
go_memstats_alloc_bytes 4.017885552e+09
# HELP go_memstats_alloc_bytes_total Total number of bytes allocated, even if freed.
# TYPE go_memstats_alloc_bytes_total counter
go_memstats_alloc_bytes_total 9.99175765096e+11
# HELP go_memstats_buck_hash_sys_bytes Number of bytes used by the profiling bucket hash table.
# TYPE go_memstats_buck_hash_sys_bytes gauge
go_memstats_buck_hash_sys_bytes 1.922458e+06
# HELP go_memstats_frees_total Total number of frees.
# TYPE go_memstats_frees_total counter
go_memstats_frees_total 1.1805682842e+10
# HELP go_memstats_gc_sys_bytes Number of bytes used for garbage collection system metadata.
# TYPE go_memstats_gc_sys_bytes gauge
go_memstats_gc_sys_bytes 1.1821236e+08
# HELP go_memstats_heap_alloc_bytes Number of heap bytes allocated and still in use.
# TYPE go_memstats_heap_alloc_bytes gauge
go_memstats_heap_alloc_bytes 4.017885552e+09
# HELP go_memstats_heap_idle_bytes Number of heap bytes waiting to be used.
# TYPE go_memstats_heap_idle_bytes gauge
go_memstats_heap_idle_bytes 8.290402304e+09
# HELP go_memstats_heap_inuse_bytes Number of heap bytes that are in use.
# TYPE go_memstats_heap_inuse_bytes gauge
go_memstats_heap_inuse_bytes 4.57490432e+09
# HELP go_memstats_heap_objects Number of allocated objects.
# TYPE go_memstats_heap_objects gauge
go_memstats_heap_objects 4.3511309e+07
# HELP go_memstats_heap_released_bytes Number of heap bytes released to OS.
# TYPE go_memstats_heap_released_bytes gauge
go_memstats_heap_released_bytes 7.828144128e+09
# HELP go_memstats_heap_sys_bytes Number of heap bytes obtained from system.
# TYPE go_memstats_heap_sys_bytes gauge
go_memstats_heap_sys_bytes 1.2865306624e+10
# HELP go_memstats_last_gc_time_seconds Number of seconds since 1970 of last garbage collection.
# TYPE go_memstats_last_gc_time_seconds gauge
go_memstats_last_gc_time_seconds 1.7234258623280537e+09
# HELP go_memstats_lookups_total Total number of pointer lookups.
# TYPE go_memstats_lookups_total counter
go_memstats_lookups_total 0
# HELP go_memstats_mallocs_total Total number of mallocs.
# TYPE go_memstats_mallocs_total counter
go_memstats_mallocs_total 1.1849194151e+10
# HELP go_memstats_mcache_inuse_bytes Number of bytes in use by mcache structures.
# TYPE go_memstats_mcache_inuse_bytes gauge
go_memstats_mcache_inuse_bytes 19200
# HELP go_memstats_mcache_sys_bytes Number of bytes used for mcache structures obtained from system.
# TYPE go_memstats_mcache_sys_bytes gauge
go_memstats_mcache_sys_bytes 31200
# HELP go_memstats_mspan_inuse_bytes Number of bytes in use by mspan structures.
# TYPE go_memstats_mspan_inuse_bytes gauge
go_memstats_mspan_inuse_bytes 9.207696e+07
# HELP go_memstats_mspan_sys_bytes Number of bytes used for mspan structures obtained from system.
# TYPE go_memstats_mspan_sys_bytes gauge
go_memstats_mspan_sys_bytes 2.2848e+08
# HELP go_memstats_next_gc_bytes Number of heap bytes when next garbage collection will take place.
# TYPE go_memstats_next_gc_bytes gauge
go_memstats_next_gc_bytes 8.278799096e+09
# HELP go_memstats_other_sys_bytes Number of bytes used for other system allocations.
# TYPE go_memstats_other_sys_bytes gauge
go_memstats_other_sys_bytes 2.129251e+07
# HELP go_memstats_stack_inuse_bytes Number of bytes in use by the stack allocator.
# TYPE go_memstats_stack_inuse_bytes gauge
go_memstats_stack_inuse_bytes 7.99637504e+08
# HELP go_memstats_stack_sys_bytes Number of bytes obtained from system for stack allocator.
# TYPE go_memstats_stack_sys_bytes gauge
go_memstats_stack_sys_bytes 7.99637504e+08
# HELP go_memstats_sys_bytes Number of bytes obtained from system.
# TYPE go_memstats_sys_bytes gauge
go_memstats_sys_bytes 1.4034882656e+10
# HELP go_threads Number of OS threads created.
# TYPE go_threads gauge
go_threads 22
# HELP input_connection_failed Benthos Counter metric
# TYPE input_connection_failed counter
input_connection_failed{label="",path="root.input.broker.inputs.0",stream="version"} 0
# HELP input_connection_lost Benthos Counter metric
# TYPE input_connection_lost counter
input_connection_lost{label="",path="root.input.broker.inputs.0",stream="version"} 0
# HELP input_connection_up Benthos Counter metric
# TYPE input_connection_up counter
input_connection_up{label="",path="root.input.broker.inputs.0",stream="version"} 2
# HELP input_latency_ns Benthos Timing metric
# TYPE input_latency_ns summary
input_latency_ns{label="",path="root.input.broker.inputs.0",stream="version",quantile="0.5"} NaN
input_latency_ns{label="",path="root.input.broker.inputs.0",stream="version",quantile="0.9"} NaN
input_latency_ns{label="",path="root.input.broker.inputs.0",stream="version",quantile="0.99"} NaN
input_latency_ns_sum{label="",path="root.input.broker.inputs.0",stream="version"} 3.854610930651667e+18
input_latency_ns_count{label="",path="root.input.broker.inputs.0",stream="version"} 1.484165e+07
# HELP input_received Benthos Counter metric
# TYPE input_received counter
input_received{label="",path="root.input.broker.inputs.0",stream="version"} 1.5220244e+07
# HELP output_batch_sent Benthos Counter metric
# TYPE output_batch_sent counter
output_batch_sent{label="",path="root.output",stream="version"} 70
# HELP output_connection_failed Benthos Counter metric
# TYPE output_connection_failed counter
output_connection_failed{label="",path="root.output",stream="version"} 0
# HELP output_connection_lost Benthos Counter metric
# TYPE output_connection_lost counter
output_connection_lost{label="",path="root.output",stream="version"} 0
# HELP output_connection_up Benthos Counter metric
# TYPE output_connection_up counter
output_connection_up{label="",path="root.output",stream="version"} 1
# HELP output_error Benthos Counter metric
# TYPE output_error counter
output_error{label="",path="root.output",stream="version"} 0
# HELP output_latency_ns Benthos Timing metric
# TYPE output_latency_ns summary
output_latency_ns{label="",path="root.output",stream="version",quantile="0.5"} NaN
output_latency_ns{label="",path="root.output",stream="version",quantile="0.9"} NaN
output_latency_ns{label="",path="root.output",stream="version",quantile="0.99"} NaN
output_latency_ns_sum{label="",path="root.output",stream="version"} 5.135906716e+09
output_latency_ns_count{label="",path="root.output",stream="version"} 70
# HELP output_sent Benthos Counter metric
# TYPE output_sent counter
output_sent{label="",path="root.output",stream="version"} 70
# HELP process_cpu_seconds_total Total user and system CPU time spent in seconds.
# TYPE process_cpu_seconds_total counter
process_cpu_seconds_total 14875.87
# HELP process_max_fds Maximum number of open file descriptors.
# TYPE process_max_fds gauge
process_max_fds 4096
# HELP process_open_fds Number of open file descriptors.
# TYPE process_open_fds gauge
process_open_fds 13
# HELP process_resident_memory_bytes Resident memory size in bytes.
# TYPE process_resident_memory_bytes gauge
process_resident_memory_bytes 1.0702270464e+10
# HELP process_start_time_seconds Start time of the process since unix epoch in seconds.
# TYPE process_start_time_seconds gauge
process_start_time_seconds 1.72329419936e+09
# HELP process_virtual_memory_bytes Virtual memory size in bytes.
# TYPE process_virtual_memory_bytes gauge
process_virtual_memory_bytes 1.5477092352e+10
# HELP process_virtual_memory_max_bytes Maximum amount of virtual memory available in bytes.
# TYPE process_virtual_memory_max_bytes gauge
process_virtual_memory_max_bytes 1.8446744073709552e+19
# HELP processor_batch_received Benthos Counter metric
# TYPE processor_batch_received counter
processor_batch_received{label="",path="root.input.batching.processors.0",stream="version"} 71
processor_batch_received{label="",path="root.pipeline.processors.0",stream="version"} 70
# HELP processor_batch_sent Benthos Counter metric
# TYPE processor_batch_sent counter
processor_batch_sent{label="",path="root.input.batching.processors.0",stream="version"} 71
processor_batch_sent{label="",path="root.pipeline.processors.0",stream="version"} 70
# HELP processor_error Benthos Counter metric
# TYPE processor_error counter
processor_error{label="",path="root.input.batching.processors.0",stream="version"} 0
processor_error{label="",path="root.pipeline.processors.0",stream="version"} 0
# HELP processor_latency_ns Benthos Timing metric
# TYPE processor_latency_ns summary
processor_latency_ns{label="",path="root.input.batching.processors.0",stream="version",quantile="0.5"} NaN
processor_latency_ns{label="",path="root.input.batching.processors.0",stream="version",quantile="0.9"} NaN
processor_latency_ns{label="",path="root.input.batching.processors.0",stream="version",quantile="0.99"} NaN
processor_latency_ns_sum{label="",path="root.input.batching.processors.0",stream="version"} 8.41241099571e+11
processor_latency_ns_count{label="",path="root.input.batching.processors.0",stream="version"} 71
processor_latency_ns{label="",path="root.pipeline.processors.0",stream="version",quantile="0.5"} NaN
processor_latency_ns{label="",path="root.pipeline.processors.0",stream="version",quantile="0.9"} NaN
processor_latency_ns{label="",path="root.pipeline.processors.0",stream="version",quantile="0.99"} NaN
processor_latency_ns_sum{label="",path="root.pipeline.processors.0",stream="version"} 5.17977569825e+11
processor_latency_ns_count{label="",path="root.pipeline.processors.0",stream="version"} 70
# HELP processor_received Benthos Counter metric
# TYPE processor_received counter
processor_received{label="",path="root.input.batching.processors.0",stream="version"} 1.5061893e+07
processor_received{label="",path="root.pipeline.processors.0",stream="version"} 1.484165e+07
# HELP processor_sent Benthos Counter metric
# TYPE processor_sent counter
processor_sent{label="",path="root.input.batching.processors.0",stream="version"} 1.5061893e+07
processor_sent{label="",path="root.pipeline.processors.0",stream="version"} 70
By the way, how can I build a pagination loop for the `gcp_bigquery_select` input, using `LIMIT` and `OFFSET`, so that it scans all rows of the table?