psi icon indicating copy to clipboard operation
psi copied to clipboard

How to write asynchronous components to a store

Open hetingjane opened this issue 1 year ago • 0 comments

Discussed in https://github.com/microsoft/psi/discussions/251

Originally posted by hetingjane August 12, 2022 Hello there, I'm using Microsoft cognitive services Face API to recognize emotions from a video file. To do so, I wrote an EmotionDetector class and initialized a component EmotionDetectorComponent that took images and output Emotion strings and values. The component used an asynchronous receiver that's similar to the Cognitive services FaceRecognizer:

  protected override async Task ReceiveAsync(Shared<Image> data, Envelope e)
    {
        using Stream imageFileStream = new MemoryStream();
        try
        {
           
            data.Resource.ToBitmap(false).Save(imageFileStream, ImageFormat.Jpeg);
            imageFileStream.Seek(0, SeekOrigin.Begin);

            var detected = (await this.client.Face.DetectWithStreamAsync(imageFileStream, recognitionModel: this.configuration.RecognitionModelName)).Select(d => d.FaceId.Value).ToList();

   
            if (detected.Count > 0)
            {
                var identified = await this.client.Face.IdentifyAsync(detected, this.configuration.PersonGroupId);
                var results = identified.Select(p => (IList<(string, double)>)p.Candidates.Select(c => (this.people[c.PersonId].Name, c.Confidence)).ToList()).ToList();
                this.Out.Post(results, e.OriginatingTime);
            }
            else
            {
                this.Out.Post(Empty, e.OriginatingTime);
            }
        }
        catch (APIErrorException exception)
        {
      
            if (exception.Body.Error.Code == "RateLimitExceeded")
            {
                this.RateLimitExceeded.Post(true, e.OriginatingTime);
            }
            else
            {
                throw;
            }
        }
    }

Now I want to write its outputs into the same \PSI store and my pipeline looks like this:

player.Image.PipeTo(EmotionDetectorComponent.ImageIn, DeliveryPolicy.LatestMessage);
EmotionDetectorComponent.StringOut.Write("EmotionLabels", store, true, DeliveryPolicy.Unlimited);
EmotionDetectorComponent.ValueOut.Write("EmotionValues", store, true, DeliveryPolicy.Unlimited);

When I run it the terminal outputs: Screenshot 2022-08-12 120704 The bottom two lines indicated some messages with an earlier originating time (2:43:59 PM) are returned later than the later messages (2:43:59 PM). And I got an exception: System.InvalidOperationException: Attempted to post a message without strictly increasing originating time or that is out of order in wall-clock time: EmotionLabels I think it's caused by the message originating time messed up when writing to a store. Does anyone know how to fix this? Do I need to write a Fuse component that handles the input and output streams? Any help would be appreciated!

hetingjane avatar Aug 12 '22 19:08 hetingjane