psi
psi copied to clipboard
How to write asynchronous components to a store
Discussed in https://github.com/microsoft/psi/discussions/251
Originally posted by hetingjane August 12, 2022 Hello there, I'm using Microsoft cognitive services Face API to recognize emotions from a video file. To do so, I wrote an EmotionDetector class and initialized a component EmotionDetectorComponent that took images and output Emotion strings and values. The component used an asynchronous receiver that's similar to the Cognitive services FaceRecognizer:
protected override async Task ReceiveAsync(Shared<Image> data, Envelope e)
{
using Stream imageFileStream = new MemoryStream();
try
{
data.Resource.ToBitmap(false).Save(imageFileStream, ImageFormat.Jpeg);
imageFileStream.Seek(0, SeekOrigin.Begin);
var detected = (await this.client.Face.DetectWithStreamAsync(imageFileStream, recognitionModel: this.configuration.RecognitionModelName)).Select(d => d.FaceId.Value).ToList();
if (detected.Count > 0)
{
var identified = await this.client.Face.IdentifyAsync(detected, this.configuration.PersonGroupId);
var results = identified.Select(p => (IList<(string, double)>)p.Candidates.Select(c => (this.people[c.PersonId].Name, c.Confidence)).ToList()).ToList();
this.Out.Post(results, e.OriginatingTime);
}
else
{
this.Out.Post(Empty, e.OriginatingTime);
}
}
catch (APIErrorException exception)
{
if (exception.Body.Error.Code == "RateLimitExceeded")
{
this.RateLimitExceeded.Post(true, e.OriginatingTime);
}
else
{
throw;
}
}
}
Now I want to write its outputs into the same \PSI store and my pipeline looks like this:
player.Image.PipeTo(EmotionDetectorComponent.ImageIn, DeliveryPolicy.LatestMessage);
EmotionDetectorComponent.StringOut.Write("EmotionLabels", store, true, DeliveryPolicy.Unlimited);
EmotionDetectorComponent.ValueOut.Write("EmotionValues", store, true, DeliveryPolicy.Unlimited);
When I run it the terminal outputs:
The bottom two lines indicated some messages with an earlier originating time (2:43:59 PM) are returned later than the later messages (2:43:59 PM). And I got an exception:
System.InvalidOperationException: Attempted to post a message without strictly increasing originating time or that is out of order in wall-clock time: EmotionLabels
I think it's caused by the message originating time messed up when writing to a store. Does anyone know how to fix this?
Do I need to write a Fuse component that handles the input and output streams?
Any help would be appreciated!