`IterableWrapper` fails on an SQLite cursor
🐛 Describe the bug
I am wrapping an sqlite3 cursor inside an IterableWrapper for further processing.
import sqlite3
from torchdata.nodes import IterableWrapper, Loader, ParallelMapper
db = sqlite3.connect(':memory:')
db.execute('create table data (value int)')
db.executemany('insert into data values (?)', [(i,) for i in range(100)])
node = IterableWrapper(db.execute('select value from data'))
node = ParallelMapper(node, map_fn=lambda x: x, num_workers=3, method='thread')
loader = Loader(node)
result = list(loader)
print(result)
This raises
sqlite3.ProgrammingError: SQLite objects created in a thread can only be used in that same thread.
I note that
- if I use a `Mapper`, it works
- surprisingly, if I use a `Mapper` followed by a `ParallelMapper`, I get the same error. I am not sure why `ParallelMapper` has any relationship with the original sqlite cursor at this point.
For some context, I am trying to reproduce a TensorFlow data pipeline based on `tf.data.experimental.SqlDataset`.
Versions
Collecting environment information... PyTorch version: 2.6.0+cpu Is debug build: False CUDA used to build PyTorch: None ROCM used to build PyTorch: N/A
OS: Ubuntu 22.04.5 LTS (x86_64) GCC version: (Ubuntu 11.4.0-1ubuntu1~22.04) 11.4.0 Clang version: Could not collect CMake version: Could not collect Libc version: glibc-2.35
Python version: 3.10.16 | packaged by conda-forge | (main, Dec 5 2024, 14:16:10) [GCC 13.3.0] (64-bit runtime) Python platform: Linux-6.8.0-52-generic-x86_64-with-glibc2.35 Is CUDA available: False CUDA runtime version: No CUDA CUDA_MODULE_LOADING set to: N/A GPU models and configuration: No CUDA Nvidia driver version: No CUDA cuDNN version: No CUDA HIP runtime version: N/A MIOpen runtime version: N/A Is XNNPACK available: True
CPU: Architecture: x86_64 CPU op-mode(s): 32-bit, 64-bit Address sizes: 39 bits physical, 48 bits virtual Byte Order: Little Endian CPU(s): 12 On-line CPU(s) list: 0-11 Vendor ID: GenuineIntel Model name: 12th Gen Intel(R) Core(TM) i7-1250U CPU family: 6 Model: 154 Thread(s) per core: 2 Core(s) per socket: 10 Socket(s): 1 Stepping: 4 CPU max MHz: 4700,0000 CPU min MHz: 400,0000 BogoMIPS: 3763.20 Flags: fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc art arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc cpuid aperfmperf tsc_known_freq pni pclmulqdq dtes64 monitor ds_cpl vmx smx est tm2 ssse3 sdbg fma cx16 xtpr pdcm pcid sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand lahf_lm abm 3dnowprefetch cpuid_fault epb ssbd ibrs ibpb stibp ibrs_enhanced tpr_shadow flexpriority ept vpid ept_ad fsgsbase tsc_adjust bmi1 avx2 smep bmi2 erms invpcid rdseed adx smap clflushopt clwb intel_pt sha_ni xsaveopt xsavec xgetbv1 xsaves split_lock_detect user_shstk avx_vnni dtherm ida arat pln pts hwp hwp_notify hwp_act_window hwp_epp hwp_pkg_req hfi vnmi umip pku ospke waitpkg gfni vaes vpclmulqdq rdpid movdiri movdir64b fsrm md_clear serialize arch_lbr ibt flush_l1d arch_capabilities Virtualization: VT-x L1d cache: 352 KiB (10 instances) L1i cache: 576 KiB (10 instances) L2 cache: 6,5 MiB (4 instances) L3 cache: 12 MiB (1 instance) NUMA node(s): 1 NUMA node0 CPU(s): 0-11 Vulnerability Gather data sampling: Not affected Vulnerability Itlb multihit: Not affected Vulnerability L1tf: Not affected Vulnerability Mds: Not affected Vulnerability Meltdown: Not affected Vulnerability Mmio stale data: Not affected Vulnerability Reg file data sampling: Mitigation; Clear Register File Vulnerability Retbleed: Not affected Vulnerability Spec rstack overflow: Not affected Vulnerability Spec store bypass: Mitigation; Speculative Store Bypass disabled via 
prctl Vulnerability Spectre v1: Mitigation; usercopy/swapgs barriers and __user pointer sanitization Vulnerability Spectre v2: Mitigation; Enhanced / Automatic IBRS; IBPB conditional; RSB filling; PBRSB-eIBRS SW sequence; BHI BHI_DIS_S Vulnerability Srbds: Not affected Vulnerability Tsx async abort: Not affected
Versions of relevant libraries: [pip3] flake8==3.9.2 [pip3] mypy-boto3-s3==1.34.162 [pip3] mypy-extensions==1.0.0 [pip3] numpy==1.26.4 [pip3] torch==2.6.0+cpu [pip3] torchdata==0.11.0 [conda] numpy 1.26.4 pypi_0 pypi [conda] torch 2.6.0+cpu pypi_0 pypi [conda] torchdata 0.11.0 pypi_0 pypi
Thanks @tazr for the issue!
You already answered my first question - it works with a Mapper.
The reason it fails with `ParallelMapper` is that `ParallelMapper` uses background threads (for reading, applying the transformation, and sorting work) on the data pulled from the `IterableWrapper` node. I am not super familiar with cursor objects, but judging by the error message, SQLite objects cannot simply be accessed from multiple threads.
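To illustrate the thread restriction on its own, here is a minimal sketch that uses only the standard library (no torchdata). The function `read_in_background` is a hypothetical stand-in for a background reader thread; it is an assumption for illustration, not the actual `ParallelMapper` code.

```python
import sqlite3
import threading

# check_same_thread defaults to True, so objects are tied to the creating thread.
db = sqlite3.connect(":memory:")
db.execute("create table data (value int)")
db.executemany("insert into data values (?)", [(i,) for i in range(5)])

# The cursor is created in the main thread.
cursor = db.execute("select value from data")

errors = []


def read_in_background():
    # Hypothetical stand-in for a reader thread pulling items from the source.
    try:
        list(cursor)
    except sqlite3.ProgrammingError as exc:
        errors.append(exc)


worker = threading.Thread(target=read_in_background)
worker.start()
worker.join()

# Report whether iterating the cursor from another thread failed.
print(type(errors[0]).__name__ if errors else "no error")
```

If the reader thread is what iterates the source, this would explain the error regardless of what `map_fn` does.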
Can you try `method='process'` in `ParallelMapper`? I think it would still fail, though.
Would `Mapper` be enough to unblock your use case?
Yes, even using "process" raises the error (still related to threading for some reason).
I understand that sqlite3 connections cannot be shared between threads (by default). However, I am surprised that I get the error downstream, even after interfacing with a `Mapper`. If using sqlite as the data source means that no downstream preprocessing can be parallel, this would be a bummer. I thought those mappings operate on the output of the predecessor (thus ignoring how it got there), but maybe my mental model is wrong and those mappings are closer to chaining Python iterators?
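The "chained iterators" model can be sketched in plain Python. This is only an analogy: the built-in `map` stands in for a lazy mapping step, and whether torchdata nodes behave the same way is an assumption. The helper `consume` is hypothetical. The point is that a lazy chain reads the cursor in whichever thread consumes the end of the chain, whereas rows fetched eagerly are already plain tuples.

```python
import sqlite3
import threading

db = sqlite3.connect(":memory:")
db.execute("create table data (value int)")
db.executemany("insert into data values (?)", [(i,) for i in range(5)])

# Lazy chain: the cursor is only read when the end of the chain is consumed.
lazy = map(lambda row: row[0], db.execute("select value from data"))

# Eager alternative: rows are fetched here, in the creating thread.
rows = db.execute("select value from data").fetchall()

outcome = {}


def consume(name, iterable):
    # Hypothetical consumer running in a different thread.
    try:
        outcome[name] = list(iterable)
    except sqlite3.ProgrammingError as exc:
        outcome[name] = exc


for name, iterable in [("lazy", lazy), ("eager", rows)]:
    thread = threading.Thread(target=consume, args=(name, iterable))
    thread.start()
    thread.join()

# Show what each variant produced when consumed from another thread.
for name, value in outcome.items():
    print(name, type(value).__name__)
```

Fetching everything up front only makes sense when the result set fits in memory.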
@tazr You are right overall. Each node operates on the outputs of the previous nodes. `ParallelMapper` uses background threads, which seems to be causing the error in your case. In typical cases, the outputs of source nodes are simple Python objects like a dict or a list. I'm not sure, but maybe with sqlite there is something else in the output that breaks when switching between threads.
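One possible workaround, sketched here with the standard library only, is to open the connection with `check_same_thread=False`, a documented parameter of `sqlite3.connect`. The sketch assumes that only one thread touches the connection at a time; `read_in_background` is again a hypothetical stand-in for a reader thread, and this has not been verified against torchdata itself. It also shows that the rows a cursor yields are plain tuples.

```python
import sqlite3
import threading

# Assumption: only one thread uses the connection at any given time.
# With check_same_thread=False, serializing access is the caller's job.
db = sqlite3.connect(":memory:", check_same_thread=False)
db.execute("create table data (value int)")
db.executemany("insert into data values (?)", [(i,) for i in range(5)])
cursor = db.execute("select value from data")

rows = []


def read_in_background():
    # Hypothetical stand-in for a single background reader thread.
    rows.extend(cursor)


worker = threading.Thread(target=read_in_background)
worker.start()
worker.join()

# Each row is an ordinary tuple, independent of the cursor that produced it.
values = [row[0] for row in rows]
print(values)
```

Whether this is acceptable depends on the pipeline: if several threads could reach the same connection concurrently, a lock around every use of it would be needed.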