FASTER icon indicating copy to clipboard operation
FASTER copied to clipboard

Null reference exception on Upsert

Open lance-cognitiv opened this issue 8 months ago • 0 comments

Stack trace:

 ---> System.NullReferenceException: Object reference not set to an instance of an object.
   at FASTER.core.FasterKV`2.HandleRetryStatus[Input,Output,Context,FasterSession](OperationStatus internalStatus, FasterSession fasterSession, PendingContext`3& pendingContext)
   at Cognitiv.Leo.DataApi.Server.MemoryCache`1.AddOrUpdate(TKey key, Int32 payloadSize, Action`1 serializer) in /home/lance/repos/leo/Services/DataApi/Cognitiv.Leo.DataApi.Server/MessageCache.cs:line 156
   --- End of inner exception stack trace ---
   at Cognitiv.Leo.DataApi.Server.MemoryCache`1.AddOrUpdate(TKey key, Int32 payloadSize, Action`1 serializer) in /home/lance/repos/leo/Services/DataApi/Cognitiv.Leo.DataApi.Server/MessageCache.cs:line 237
   at Cognitiv.Leo.DataApi.Server.MessageCache.AddOrUpdate(Int64 partitionKey, Int32 messageId, MessageContainer messageContainer) in /home/lance/repos/leo/Services/DataApi/Cognitiv.Leo.DataApi.Server/MessageCache.cs:line 293
   at Cognitiv.Leo.DataApi.Server.ProjectionEngine.ExecuteQueryAsync(Dictionary`2 featureSetBases, Dictionary`2 features, Object featureLock, Dictionary`2 messages, Object messageLock, QueryMessagesOperation operation, Func`2 valueAllocator, Func`2 arrayAllocator) in /home/lance/repos/leo/Services/DataApi/Cognitiv.Leo.DataApi.Server/ProjectionEngine.cs:line 267
   at Cognitiv.Leo.DataApi.Server.ProjectionEngine.ProjectAsync(ProjectionRequest request, FlatBufferBuilder responseBuilder) in /home/lance/repos/leo/Services/DataApi/Cognitiv.Leo.DataApi.Server/ProjectionEngine.cs:line 677
   at Cognitiv.Leo.DataApi.Server.ProjectRequestHandler.HandleAsync(HttpContext httpContext) in /home/lance/repos/leo/Services/DataApi/Cognitiv.Leo.DataApi.Server/ProjectRequestHandler.cs:line 85

I added logging to the exception handler in an attempt to capture the underlying issue. This seems to happen under high load, and I have not been able to reproduce it. I am currently using FASTER version 2.6.5.

The error occurs once, after which the server becomes unresponsive and the CPU gets pinned.

Perf metrics:

  49.24%  [kernel]                            [k] arch_local_irq_enable
   7.59%  memfd:doublemapper (deleted)        [.] 0x00000000028492ec
   6.80%  [kernel]                            [k] do_sched_yield
   6.29%  memfd:doublemapper (deleted)        [.] 0x00000000028493c4
   4.31%  memfd:doublemapper (deleted)        [.] 0x00000000023e0290
   3.29%  [kernel]                            [k] el0_svc
   1.70%  libc.so.6 (deleted)                 [.] 0x00000000000ceccc
   1.27%  [kernel]                            [k] get_random_u16
   1.11%  memfd:doublemapper (deleted)        [.] 0x00000000023e02c0
   0.78%  memfd:doublemapper (deleted)        [.] 0x00000000023e0284
   0.75%  memfd:doublemapper (deleted)        [.] 0x0000000002849e8c
   0.74%  memfd:doublemapper (deleted)        [.] 0x00000000023e02e8
   0.65%  [kernel]                            [k] schedule_debug.constprop.0
   0.56%  [kernel]                            [k] __schedule
   0.53%  [kernel]                            [k] do_notify_resume
   0.52%  [kernel]                            [k] invoke_syscall
   0.48%  memfd:doublemapper (deleted)        [.] 0x0000000002848d9c

Nominal perf metrics:

   3.93%  libcoreclr.so                       [.] 0x0000000000316988
   3.79%  [kernel]                            [k] arch_local_irq_enable
   3.06%  libcoreclr.so                       [.] 0x00000000001f139c
   2.11%  libcoreclr.so                       [.] 0x00000000001f144c
   1.85%  memfd:doublemapper (deleted)        [.] 0x000000000240314c
   1.41%  libcoreclr.so                       [.] 0x00000000001f1398
   1.29%  libcoreclr.so                       [.] 0x000000000035ccf8
   1.22%  agent                               [.] _start
   0.90%  memfd:doublemapper (deleted)        [.] 0x0000000000000078
   0.89%  [kernel]                            [k] el0_svc
   0.87%  memfd:doublemapper (deleted)        [.] 0x00000000010edeb0
   0.84%  memfd:doublemapper (deleted)        [.] 0x0000000000000038
   0.80%  [kernel]                            [k] arch_local_irq_restore
   0.79%  memfd:doublemapper (deleted)        [.] 0x00000000022887f8
   0.78%  memfd:doublemapper (deleted)        [.] 0x00000000023ec6b8
   0.76%  [vdso]                              [.] 0x0000000000000548
   0.71%  memfd:doublemapper (deleted)        [.] 0x00000000019f4278
   0.66%  libcoreclr.so                       [.] 0x000000000035ccf4
   0.63%  memfd:doublemapper (deleted)        [.] 0x00000000023eee68
   0.59%  memfd:doublemapper (deleted)        [.] 0x0000000002403390
   0.58%  [kernel]                            [k] default_idle_call
   0.53%  libcoreclr.so                       [.] 0x0000000000316990
   0.53%  [kernel]                            [k] do_sched_yield
   0.51%  memfd:doublemapper (deleted)        [.] 0x00000000019edf68

Memory:

ubuntu@ip-10-38-64-176:~$ free -h
               total        used        free      shared  buff/cache   available
Mem:            61Gi        14Gi        42Gi        50Mi       4.6Gi        46Gi
Swap:             0B          0B          0B

Code:

using System.Runtime.InteropServices;
using Cognitiv.Leo.Shared;
using FASTER.core;
using StatsdClient;

namespace Cognitiv.Leo.DataApi.Server;

/// <summary>
/// Fixed-size metadata header holding a single 32-bit hit counter,
/// encoded as a little-endian <see cref="int"/>.
/// </summary>
[StructLayout(LayoutKind.Sequential)]
public struct EntryInfo
{
	/// <summary>Hit counter carried in the header.</summary>
	public int HitCount;

	/// <summary>Number of bytes the serialized header occupies.</summary>
	public static int Size => sizeof(int);

	/// <summary>
	/// Writes the header into the start of <paramref name="buffer"/> in little-endian order.
	/// </summary>
	/// <param name="buffer">Destination; must be at least <see cref="Size"/> bytes long.</param>
	public void Serialize(Span<byte> buffer) =>
		BinaryPrimitives.WriteInt32LittleEndian(buffer, HitCount);

	/// <summary>
	/// Reads a header from the start of <paramref name="buffer"/>.
	/// </summary>
	/// <param name="buffer">Source; must be at least <see cref="Size"/> bytes long.</param>
	/// <returns>The decoded header.</returns>
	public static EntryInfo Deserialize(ReadOnlySpan<byte> buffer)
	{
		var info = default(EntryInfo);
		info.HitCount = BinaryPrimitives.ReadInt32LittleEndian(buffer);
		return info;
	}
}

/// <summary>
/// FASTER callback functions for <c>SpanByte</c> keys and values whose read output is an
/// <see cref="ArraySegment{T}"/> of bytes obtained through <c>ArrayAllocator.Allocate</c>.
/// </summary>
public class SpanByteFunctions : SpanByteFunctions<SpanByte, ArraySegment<byte>, Empty>
{
	/// <summary>
	/// Optional allocator passed to <c>ArrayAllocator.Allocate</c> when a read produces output.
	/// May be null; how a null allocator is handled is decided by <c>ArrayAllocator</c>.
	/// </summary>
	public Func<int, ArraySegment<byte>>? Allocator { get; set; }

	/// <summary>Copies the stored value into a freshly allocated output segment. Always returns true.</summary>
	public override bool SingleReader(ref SpanByte key, ref SpanByte input, ref SpanByte value, ref ArraySegment<byte> dst, ref ReadInfo readInfo)
		=> CopyValueToOutput(ref value, ref dst);

	/// <summary>Copies the stored value into a freshly allocated output segment. Always returns true.</summary>
	public override bool ConcurrentReader(ref SpanByte key, ref SpanByte input, ref SpanByte value, ref ArraySegment<byte> dst, ref ReadInfo readInfo)
		=> CopyValueToOutput(ref value, ref dst);

	// Shared body of both reader callbacks: allocate the output for the value's payload
	// length, then copy the value bytes into it.
	private bool CopyValueToOutput(ref SpanByte value, ref ArraySegment<byte> dst)
	{
		var payloadLength = value.LengthWithoutMetadata;
		dst = ArrayAllocator.Allocate(payloadLength, Allocator);

		var source = value.AsSpan();
		source.CopyTo(dst);
		return true;
	}
}

/// <summary>
/// Base class for a cache that stores serialized entries in a FASTER <c>FasterKV</c> store.
/// Keys are serialized by the derived class into a fixed-size buffer; every stored value is an
/// <see cref="EntryInfo"/> header followed by the caller's payload bytes.
/// A FASTER session is created lazily for each thread that uses the cache.
/// </summary>
/// <typeparam name="TKey">Value-type key that the derived class knows how to serialize.</typeparam>
public abstract class MemoryCache<TKey> where TKey : struct
{
	// Largest value (header + payload) that AddOrUpdate serializes on the stack;
	// anything larger goes through a pooled buffer.
	private const int MaxStackallocSize = 1024;
	private readonly FasterKV<SpanByte, SpanByte> store;
	private readonly ThreadLocal<(SpanByteFunctions functions, ClientSession<SpanByte, SpanByte, SpanByte, ArraySegment<byte>, Empty, SpanByteFunctions> session)> sessions;
	private readonly SimpleObjectPool<UnmanagedBuffer> payloadBufferPool;

	/// <summary>
	/// Creates the backing store with the read cache disabled.
	/// </summary>
	/// <param name="memorySizeBytes">Value assigned to the store's <c>MemorySize</c> setting.</param>
	protected MemoryCache(long memorySizeBytes)
	{
		// NOTE(review): a null base directory presumably means "no on-disk log" — confirm against the FASTER docs.
		var settings = new FasterKVSettings<SpanByte, SpanByte>(null)
		{
			MemorySize = memorySizeBytes,
			ReadCacheEnabled = false
		};
		store = new FasterKV<SpanByte, SpanByte>(settings);
		sessions = new ThreadLocal<(SpanByteFunctions functions, ClientSession<SpanByte, SpanByte, SpanByte, ArraySegment<byte>, Empty, SpanByteFunctions> session)>(
			() => CreateSession(),
			trackAllValues: true);
		payloadBufferPool = new SimpleObjectPool<UnmanagedBuffer>(() => new UnmanagedBuffer(1 << 14));
	}

	/// <summary>Size in bytes of the buffer that <see cref="SerializeKey"/> fills.</summary>
	protected abstract int GetKeySize();

	/// <summary>Writes <paramref name="key"/> into <paramref name="buffer"/>, which is <see cref="GetKeySize"/> bytes long.</summary>
	protected abstract void SerializeKey(TKey key, Span<byte> buffer);

	/// <summary>
	/// Looks up <paramref name="key"/> and, when found, splits the stored bytes into header and payload.
	/// </summary>
	/// <param name="key">Key to look up.</param>
	/// <param name="valueAllocator">Allocator handed to the read callbacks for the output buffer; may be null.</param>
	/// <param name="entryInfo">Decoded header when the method returns true; default otherwise.</param>
	/// <param name="value">Payload bytes following the header when the method returns true; default otherwise.</param>
	/// <returns>True when the read status reports the key as found; false otherwise.</returns>
	/// <exception cref="Exception">
	/// Any failure is wrapped in an exception whose message lists how far the operation got.
	/// Before rethrowing, the calling thread's session is replaced with a new one.
	/// </exception>
	public bool Read(
		TKey key, 
		Func<int, ArraySegment<byte>>? valueAllocator,
		out EntryInfo entryInfo,
		out ArraySegment<byte> value)
	{
		ClientSession<SpanByte, SpanByte, SpanByte, ArraySegment<byte>, Empty, SpanByteFunctions>? session = null;
		SpanByteFunctions? functions = null;

		// Progress flags; they are only reported in the wrapping exception message.
		var gotSession = false;
		var keySerialized = false;
		var spanByteSerialized = false;
		var read = false;
		var entryDeserialized = false;

		try
		{
			entryInfo = default;
			value = default;

			var current = sessions.Value;
			gotSession = true;

			functions = current.functions;
			functions.Allocator = valueAllocator;
			session = current.session;

			// The key lives on this stack frame, so the read has to be finished before returning.
			// NOTE(review): no CompletePending call is made here; a status that is not Found
			// (including a pending one, if the store can produce it) is reported as a miss.
			Span<byte> keyBuffer = stackalloc byte[GetKeySize()];
			SerializeKey(key, keyBuffer);
			keySerialized = true;

			var keySpanByte = SpanByte.FromFixedSpan(keyBuffer);
			spanByteSerialized = true;

			var status = session.Read(keySpanByte, out var output);
			read = true;
			if (!status.Found)
				return false;

			entryInfo = EntryInfo.Deserialize(output[..EntryInfo.Size]);
			entryDeserialized = true;

			value = output[EntryInfo.Size..];
			return true;
		}
		catch (Exception e)
		{
			var keySerializedInException = TryFormatKey(key, out var keyHex);
			var createdNewSession = TryReplaceSession();

			var sessionIsNull = session is null;
			throw new Exception($"Exception on Read. Key={keyHex}, GotSession={gotSession}, SessionIsNull={sessionIsNull}, KeySerialized={keySerialized}, SpanByteSerialized={spanByteSerialized}, Read={read}, EntryDeserialized={entryDeserialized}, KeySerializedInException={keySerializedInException}, CreatedNewSession={createdNewSession}", e);
		}
		finally
		{
			// Do not keep the caller's allocator reachable from the thread-local functions
			// object after the call; the next Read assigns its own allocator before reading.
			if (functions is not null)
				functions.Allocator = null;
		}
	}

	/// <summary>
	/// Serializes a zero-hit-count header plus the caller's payload and upserts it under <paramref name="key"/>.
	/// </summary>
	/// <param name="key">Key to insert or overwrite.</param>
	/// <param name="payloadSize">Number of payload bytes <paramref name="serializer"/> will write.</param>
	/// <param name="serializer">Callback that writes the payload into the span it is given.</param>
	/// <exception cref="ArgumentOutOfRangeException">
	/// <paramref name="payloadSize"/> is negative or too large to add the header size to.
	/// </exception>
	/// <exception cref="Exception">
	/// Any other failure is wrapped in an exception whose message lists how far the operation got.
	/// Before rethrowing, the calling thread's session is replaced with a new one.
	/// </exception>
	public void AddOrUpdate(
		TKey key, 
		int payloadSize, 
		Action<Span<byte>> serializer)
	{
		// Reject sizes that would make (header + payload) negative or overflow: such a value
		// would pass the "fits on the stack" check below and reach stackalloc with a negative length.
		if (payloadSize < 0 || payloadSize > int.MaxValue - EntryInfo.Size)
			throw new ArgumentOutOfRangeException(nameof(payloadSize), payloadSize, "Payload size must be non-negative and leave room for the entry header.");

		ClientSession<SpanByte, SpanByte, SpanByte, ArraySegment<byte>, Empty, SpanByteFunctions>? session = null;

		// Progress flags; they are only reported in the wrapping exception message.
		var gotSession = false;
		var keySerialized = false;
		var spanByteSerialized = false;
		var isSmallPayloadSerialized = false;
		var isLargePayloadSerialized = false;
		var isUpserted = false;
		try
		{
			var current = sessions.Value;
			gotSession = true;
			session = current.session;

			Span<byte> keyBuffer = stackalloc byte[GetKeySize()];
			SerializeKey(key, keyBuffer);
			keySerialized = true;

			var keySpanByte = SpanByte.FromFixedSpan(keyBuffer);
			spanByteSerialized = true;

			var entryInfo = new EntryInfo { HitCount = 0 };

			var size = EntryInfo.Size + payloadSize;
			if (size <= MaxStackallocSize)
			{
				// Small entry: build header + payload directly on the stack.
				Span<byte> buffer = stackalloc byte[size];

				entryInfo.Serialize(buffer[..EntryInfo.Size]);
				serializer(buffer[EntryInfo.Size..]);
				isSmallPayloadSerialized = true;

				session.Upsert(keySpanByte, SpanByte.FromFixedSpan(buffer));
				isUpserted = true;
			}
			else
			{
				// Large entry: borrow a pooled buffer and always hand it back.
				var buffer = payloadBufferPool.Get();
				try
				{
					var span = buffer.AsSpan(size);

					entryInfo.Serialize(span[..EntryInfo.Size]);
					serializer(span[EntryInfo.Size..]);
					isLargePayloadSerialized = true;

					session.Upsert(keySpanByte, SpanByte.FromFixedSpan(span));
					isUpserted = true;
				}
				finally
				{
					payloadBufferPool.Return(buffer);
				}
			}
		}
		catch (Exception e)
		{
			var keySerializedInException = TryFormatKey(key, out var keyHex);
			var createdNewSession = TryReplaceSession();

			var sessionIsNull = session is null;
			throw new Exception($"Exception on AddOrUpdate. Key={keyHex}, GotSession={gotSession}, SessionIsNull={sessionIsNull}, KeySerialized={keySerialized}, SpanByteSerialized={spanByteSerialized}, IsSmallPayloadSerialized={isSmallPayloadSerialized}, IsLargePayloadSerialized={isLargePayloadSerialized}, IsUpserted={isUpserted}, KeySerializedInException={keySerializedInException}, CreatedNewSession={createdNewSession}", e);
		}
	}

	// Creates a functions object together with a new session on the store that uses it.
	private (SpanByteFunctions functions, ClientSession<SpanByte, SpanByte, SpanByte, ArraySegment<byte>, Empty, SpanByteFunctions> session) CreateSession()
	{
		var functions = new SpanByteFunctions();
		var session = store.For(functions).NewSession<SpanByteFunctions>();
		return (functions, session);
	}

	// Best-effort hex rendering of the serialized key for diagnostics.
	// Returns false (and an empty string) when the key cannot be serialized.
	private bool TryFormatKey(TKey key, out string keyHex)
	{
		try
		{
			var keyBuffer = new byte[GetKeySize()];
			SerializeKey(key, keyBuffer);
			keyHex = Convert.ToHexString(keyBuffer);
			return true;
		}
		catch (Exception)
		{
			keyHex = string.Empty;
			return false;
		}
	}

	// Best-effort replacement of the calling thread's session after a failed operation.
	// Returns false when a new session could not be created.
	// NOTE(review): the previous session is not disposed here (same as before this refactor);
	// confirm whether disposing a session that just faulted is safe before adding it.
	private bool TryReplaceSession()
	{
		try
		{
			sessions.Value = CreateSession();
			return true;
		}
		catch (Exception)
		{
			return false;
		}
	}
}

/// <summary>
/// Cache of serialized <c>MessageContainer</c> instances keyed by (partition key, message id).
/// Cached entries are only served while they are younger than the TTL configured for their message id.
/// </summary>
public class MessageCache : MemoryCache<(long, int)>
{
	// Memory size handed to the base cache: 10 GiB.
	private const long CacheSizeBytes = 10L * 1024L * 1024L * 1024L;

	// Serialized key layout: 8-byte partition key followed by 4-byte message id.
	private const int PartitionKeyBytes = sizeof(long);
	private const int MessageIdBytes = sizeof(int);

	// Message id -> maximum allowed staleness, compared against millisecond timestamps.
	private Dictionary<int, long> messageTtls = new();

	public MessageCache() : base(CacheSizeBytes)
	{
	}

	/// <summary>Replaces the whole TTL table with <paramref name="ttls"/> (the instance is stored, not copied).</summary>
	public void SetTtls(Dictionary<int, long> ttls) => messageTtls = ttls;

	/// <inheritdoc />
	protected override int GetKeySize() => MessageIdBytes + PartitionKeyBytes;

	/// <inheritdoc />
	protected override void SerializeKey((long, int) key, Span<byte> buffer)
	{
		var (partitionKey, messageId) = key;
		BitConverter.TryWriteBytes(buffer.Slice(0, PartitionKeyBytes), partitionKey);
		BitConverter.TryWriteBytes(buffer.Slice(PartitionKeyBytes, MessageIdBytes), messageId);
	}

	/// <summary>
	/// Fills <paramref name="messageContainer"/> from the cache when a sufficiently fresh entry exists.
	/// </summary>
	/// <param name="partitionKey">Partition part of the cache key.</param>
	/// <param name="messageId">Message id part of the cache key; also selects the TTL.</param>
	/// <param name="messageContainer">Container that is deserialized into on success.</param>
	/// <param name="valueAllocator">Optional allocator for the buffer the cached bytes are read into.</param>
	/// <param name="arrayAllocator">Optional allocator forwarded to the container's deserialization.</param>
	/// <returns>
	/// False when no TTL is configured for the message id, the key is not cached,
	/// or the entry is older than its TTL; true after the container has been populated.
	/// </returns>
	public bool TryPopulate(
		long partitionKey, 
		int messageId, 
		MessageContainer messageContainer,
		Func<int, ArraySegment<byte>>? valueAllocator=null,
		Func<int, ArraySegment<ArraySegment<byte>>>? arrayAllocator=null)
	{
		// A message id without a TTL is never served from cache; the lookup is skipped entirely.
		if (!messageTtls.TryGetValue(messageId, out var maxAgeMs) ||
			!Read((partitionKey, messageId), valueAllocator, out _, out var cached))
			return false;

		var writtenAtMs = MessageContainer.DeserializeReadEpochMs(cached);
		var nowMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
		if (nowMs - writtenAtMs > maxAgeMs)
			return false;

		messageContainer.Deserialize(cached, 0, arrayAllocator);
		return true;
	}

	/// <summary>
	/// Serializes <paramref name="messageContainer"/> and stores it under (partition key, message id).
	/// </summary>
	public void AddOrUpdate(
		long partitionKey, 
		int messageId, 
		MessageContainer messageContainer)
	{
		AddOrUpdate(
			(partitionKey, messageId),
			messageContainer.GetSerializedSize(),
			messageContainer.Serialize);
	}
}

lance-cognitiv avatar Jun 23 '25 20:06 lance-cognitiv