SwayDB
SwayDB copied to clipboard
High rate of ClosedChannelException during compaction
I'm getting a high rate of ClosedChannelException
with the persistent map configuration described below.
- does it have any impact on data consistency?
- is there any way to optimize compaction to reduce such errors?
java.nio.channels.ClosedChannelException: null
at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
at sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:790)
at swaydb.core.io.file.ChannelFile.read(ChannelFile.scala:94)
at swaydb.core.io.file.DBFile.read(DBFile.scala:412)
at swaydb.core.io.reader.FileReader.read(FileReader.scala:73)
at swaydb.core.segment.format.a.block.reader.BlockReader$.readFullBlock(BlockReader.scala:97)
at swaydb.core.segment.format.a.block.reader.BlockReaderBase.readFullBlock(BlockReaderBase.scala:65)
at swaydb.core.segment.format.a.block.reader.BlockReaderBase.readFullBlock$(BlockReaderBase.scala:64)
at swaydb.core.segment.format.a.block.reader.UnblockedReader.readFullBlock(UnblockedReader.scala:81)
at swaydb.core.segment.format.a.block.reader.UnblockedReader.readAllAndGetReader(UnblockedReader.scala:123)
at swaydb.core.segment.format.a.block.Block$.unblock(Block.scala:299)
at swaydb.core.segment.format.a.block.reader.UnblockedReader$.apply(UnblockedReader.scala:69)
at swaydb.core.segment.format.a.block.segment.SegmentBlockCache.$anonfun$buildBlockReaderCache$6(SegmentBlockCache.scala:202)
at swaydb.core.segment.format.a.block.segment.SegmentBlockCache.$anonfun$buildBlockReaderCache$5(SegmentBlockCache.scala:197)
at swaydb.data.cache.SynchronisedIO.$anonfun$value$7(Cache.scala:323)
at swaydb.data.cache.LazyIO.$anonfun$getOrSet$3(Lazy.scala:165)
at swaydb.data.cache.LazyValue.$anonfun$getOrSet$2(Lazy.scala:96)
at scala.Option.getOrElse(Option.scala:189)
at swaydb.data.cache.LazyValue.$anonfun$getOrSet$1(Lazy.scala:95)
at scala.Option.getOrElse(Option.scala:189)
at swaydb.data.cache.LazyValue.getOrSet(Lazy.scala:93)
at swaydb.data.cache.LazyIO.getOrSet(Lazy.scala:165)
at swaydb.data.cache.SynchronisedIO.value(Cache.scala:323)
at swaydb.data.cache.DeferredIO.value(Cache.scala:295)
at swaydb.core.segment.format.a.block.segment.SegmentBlockCache.$anonfun$createReader$1(SegmentBlockCache.scala:330)
at scala.Option.getOrElse(Option.scala:189)
at swaydb.data.cache.DeferredIO.getOrElse(Cache.scala:302)
at swaydb.core.segment.format.a.block.segment.SegmentBlockCache.createReader(SegmentBlockCache.scala:328)
at swaydb.core.segment.format.a.block.segment.SegmentBlockCache.createSortedIndexReader(SegmentBlockCache.scala:470)
at swaydb.core.segment.format.a.block.segment.SegmentBlockCache.iterator(SegmentBlockCache.scala:531)
at swaydb.core.segment.SegmentRef.iterator(SegmentRef.scala:889)
at swaydb.core.segment.SegmentRef$.put(SegmentRef.scala:635)
at swaydb.core.segment.PersistentSegmentOne.put(PersistentSegmentOne.scala:288)
at swaydb.core.level.Level.$anonfun$putAssignedKeyValues$2(Level.scala:1160)
at swaydb.core.level.Level.$anonfun$putAssignedKeyValues$1(Level.scala:1158)
at swaydb.IO$IterableIOImplicit.mapRecoverIO(IO.scala:227)
at swaydb.core.level.Level.putAssignedKeyValues(Level.scala:1155)
at swaydb.core.level.Level.$anonfun$putKeyValues$2(Level.scala:1106)
at swaydb.IO$Right.$anonfun$flatMap$1(IO.scala:570)
at swaydb.IO$Right.flatMap(IO.scala:404)
at swaydb.core.level.Level.putKeyValues(Level.scala:1098)
at swaydb.core.level.Level.$anonfun$put$11(Level.scala:704)
at swaydb.core.level.Level.$anonfun$reserveAndRelease$2(Level.scala:578)
at swaydb.IO$Right.$anonfun$map$1(IO.scala:560)
at swaydb.IO$Right.map(IO.scala:396)
at swaydb.core.level.Level.$anonfun$reserveAndRelease$1(Level.scala:576)
at swaydb.IO$Right.$anonfun$map$1(IO.scala:560)
at swaydb.IO$Right.map(IO.scala:396)
at swaydb.core.level.Level.reserveAndRelease(Level.scala:574)
at swaydb.core.level.Level.put(Level.scala:694)
at swaydb.core.level.compaction.throttle.ThrottleCompaction$.pushForward(ThrottleCompaction.scala:191)
at swaydb.core.level.compaction.throttle.ThrottleCompaction$.pushForward(ThrottleCompaction.scala:176)
at swaydb.core.level.compaction.throttle.ThrottleCompaction$.pushForward(ThrottleCompaction.scala:155)
at swaydb.core.level.compaction.throttle.ThrottleCompaction$.runJob(ThrottleCompaction.scala:131)
at swaydb.core.level.compaction.throttle.ThrottleCompaction$.runJobs(ThrottleCompaction.scala:107)
at swaydb.core.level.compaction.throttle.ThrottleCompaction$.runNow(ThrottleCompaction.scala:73)
at swaydb.core.level.compaction.throttle.ThrottleCompaction$.run(ThrottleCompaction.scala:59)
at swaydb.core.level.compaction.throttle.ThrottleCompaction$.run(ThrottleCompaction.scala:48)
at swaydb.core.level.compaction.throttle.ThrottleCompactor$.doWakeUp(ThrottleCompactor.scala:287)
at swaydb.core.level.compaction.throttle.ThrottleCompactor$.wakeUp(ThrottleCompactor.scala:309)
at swaydb.core.level.compaction.throttle.ThrottleCompactor$.wakeUp(ThrottleCompactor.scala:51)
at swaydb.core.level.compaction.throttle.ThrottleCompactor$.$anonfun$sendWakeUp$1(ThrottleCompactor.scala:232)
at swaydb.core.level.compaction.throttle.ThrottleCompactor$.$anonfun$sendWakeUp$1$adapted(ThrottleCompactor.scala:228)
at swaydb.ActorWire.$anonfun$send$2$adapted(ActorWire.scala:148)
at swaydb.ActorWire.$anonfun$actor$2$adapted(ActorWire.scala:61)
at swaydb.Actor.swaydb$Actor$$receive(Actor.scala:773)
at swaydb.Actor.$anonfun$swaydb$Actor$$basicWakeUp$1(Actor.scala:569)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
at scala.util.Success.$anonfun$map$1(Try.scala:255)
at scala.util.Success.map(Try.scala:213)
at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.lang.Thread.run(Thread.java:834)
/*
 * Whether key-value IDs are cached. There are over 1300 key-value IDs; when caching is
 * disabled an on-disk binary search is performed to find them.
 * https://swaydb.io/configuration/cacheKeyValueIds/?language=scala
 */
val cacheKeyValueIdsOverride: Boolean = false

/*
 * Memory cache setting; MemoryCache.off is selected here.
 * The linked page describes the other modes (caching the raw bytes stored in a
 * Persistent Segment versus caching the actual key-values).
 * https://swaydb.io/configuration/memoryCache/?language=scala
 */
val memoryCacheOverride = MemoryCache.off

/*
 * mmap setting, reused for both `mmapMaps` and the segment config in this snippet.
 * With mmap on, LevelZero writes key-values to memory-mapped write-ahead log files;
 * with it off, java.nio.FileChannel is used.
 * https://swaydb.io/configuration/mmap/map/?language=scala
 */
val mmapDisabled = MMAP.Off(ForceSave.Off)

// Alternative that was tried/considered: ThreadStateCache.off
// NOTE(review): this value is not passed to mapCreate in this snippet - confirm whether
// that is intentional.
val threadStateCacheOverride = ThreadStateCache.Limit(hashMapMaxSize = 10, maxProbe = 1)
/*
 * Configuration applied to every Persistent Segment file of the Level.
 * Starts from DefaultConfigs.segmentConfig() and overrides:
 *   - mmap                       -> mmapDisabled
 *   - cacheSegmentBlocksOnCreate -> false
 *   - file-open IO strategy      -> SynchronisedIO(cacheOnAccess = true)
 *   - block IO strategy          -> SynchronisedIO(cacheOnAccess = false), whatever the action
 * https://swaydb.io/configuration/segmentConfig/?language=scala
 */
val segmentConfigOverride =
  DefaultConfigs
    .segmentConfig()
    .copyWithMmap(mmapDisabled)
    .copyWithCacheSegmentBlocksOnCreate(false)
    .copyWithFileOpenIOStrategy(IOStrategy.SynchronisedIO(cacheOnAccess = true))
    .copyWithBlockIOStrategy(
      blockIOStrategy = _ => IOStrategy.SynchronisedIO(cacheOnAccess = false)
    )
// Intended to disable caching on the file read/write layer.
// NOTE(review): the first argument to FileCache.On (0) is presumably the maximum number
// of files kept open - confirm against the FileCache documentation. If so, files are
// eligible for closing immediately, which may be related to the ClosedChannelException
// seen during compaction.
val fileCacheOverride = {
  // Actor configuration handed to the file cache: a time loop with a one second delay.
  val timeLoopConfig = ActorConfig.TimeLoop(
    name = s"${this.getClass.getName} - FileCache TimeLoop Actor",
    delay = 1.seconds,
    ec = DefaultExecutionContext.sweeperEC
  )

  FileCache.On(0, timeLoopConfig)
}
/*
Sorted key index: stores all keys in sorted order.
NOTE(review): the `false` argument presumably turns off caching of this block on access,
here and in the four definitions below - confirm against DefaultConfigs.
https://swaydb.io/configuration/sortedKeyIndex/?language=scala&q=sortedKeyIndex
*/
val sortedKeyIndexOverride = DefaultConfigs.sortedKeyIndex(false)
/*
Random search (hash) index: hash indexes can double random-read performance and reduce
IOps, which increases overall DB performance.
https://swaydb.io/configuration/randomKeyIndex/?language=scala
*/
val randomSearchIndexOverride = DefaultConfigs.randomSearchIndex(false)
/*
Binary search index: gives faster random reads than the sorted key index or linear search,
and is essential for fast forward and reverse iteration.
https://swaydb.io/configuration/binarySearchIndex/?language=scala
*/
val binarySearchIndexOverride = DefaultConfigs.binarySearchIndex(false)
/*
Might-contain index (BloomFilter): a small byte array that can be created in each persistent
segment and is used to determine whether a key exists in the segment without actually
searching the Segment.
https://swaydb.io/configuration/mightContainKeyIndex/?language=scala
*/
val mightContainIndexOverride = DefaultConfigs.mightContainIndex(false)
/*
Values config: configures storage for values within a persistent segment.
https://swaydb.io/configuration/valuesConfig/?language=scala&q=mightContainIndex
*/
val valuesConfigOverride = DefaultConfigs.valuesConfig(false)
/**
 * Builds a persistent `Map[String, Int, Nothing, Glass]` using the override values
 * defined in this file.
 *
 * The map is stored in a directory obtained from `File.newTemporaryDirectory(name)`,
 * which is also registered with `deleteOnExit()`.
 *
 * @param name passed to `File.newTemporaryDirectory` (presumably used as the directory
 *             name prefix - confirm against the imported `File` type)
 */
def mapCreate(name: String) = {
  // Resolve the storage location first; it is the only argument computed per call.
  val storageDir = File.newTemporaryDirectory(name).deleteOnExit().path

  persistent.Map[String, Int, Nothing, Glass](
    dir = storageDir,
    mmapMaps = mmapDisabled,
    cacheKeyValueIds = cacheKeyValueIdsOverride,
    segmentConfig = segmentConfigOverride,
    sortedKeyIndex = sortedKeyIndexOverride,
    randomSearchIndex = randomSearchIndexOverride,
    binarySearchIndex = binarySearchIndexOverride,
    mightContainIndex = mightContainIndexOverride,
    valuesConfig = valuesConfigOverride,
    fileCache = fileCacheOverride,
    memoryCache = memoryCacheOverride
  )
}
This will not cause any data consistency issue.
The problem here is a known caching issue: compaction is trying to merge a Segment file using a FileChannel
that has already been closed. It should attempt to reopen the file instead. This could cause compaction to stop progressing if it keeps failing.
I've been wanting to fix this but just haven't had the time.
Thank you for reporting these issues.