dcmjs-dimse
Added support for writing received DICOM content to Writable streams
An attempt was made to implement C-STORE SCP streaming support, based on the ideas discussed in https://github.com/PantelisGeorgiadis/dcmjs-dimse/issues/39.
More specifically, the Scp class was enriched with two extra functions that give the caller the opportunity to create the stream that will receive the incoming fragment buffers and to translate the accumulated fragments back into a Dataset.
The functions are createStoreWritableStream and createDatasetFromStoreWritableStream, respectively. The first provides the user with the accepted presentation context (to assess the dataset type and transfer syntax) and expects a Writable stream to be returned. The latter provides the user with the previously created Writable stream, the accepted presentation context, and a callback that can be used to return the parsed Dataset. The default implementation still uses memory buffers.
Below is a sample implementation of an Scp class that accumulates the incoming C-STORE data into temp file streams (using the temp library). Once the reception is over (last PDV fragment received), the file is read back into a Dataset, which is passed to the Scp.cStoreRequest method for further processing (this is not a mandatory step; the callback can return undefined if there is no interest in passing the dataset to the Scp.cStoreRequest method).
const dcmjsDimse = require('dcmjs-dimse');
const { Dataset, Scp } = dcmjsDimse;
const { CStoreResponse } = dcmjsDimse.responses;
const { PresentationContextResult, Status, StorageClass, TransferSyntax } = dcmjsDimse.constants;
const fs = require('fs');
const temp = require('temp');

class StreamingScp extends Scp {
  constructor(socket, opts) {
    super(socket, opts);
    this.association = undefined;
    // Track temp files so they are cleaned up automatically on process exit.
    temp.track();
  }

  // Creates the temp file stream that will receive the incoming fragment buffers.
  createStoreWritableStream(acceptedPresentationContext) {
    return temp.createWriteStream();
  }

  // Reads the accumulated temp file back into a Dataset once reception has finished.
  createDatasetFromStoreWritableStream(writable, acceptedPresentationContext, callback) {
    writable.on('finish', () => {
      const path = writable.path;
      const datasetBuffer = fs.readFileSync(path);
      const dataset = new Dataset(
        datasetBuffer,
        acceptedPresentationContext.getAcceptedTransferSyntaxUid()
      );
      callback(dataset);
    });
    writable.on('error', (err) => {
      callback(undefined);
    });
  }

  associationRequested(association) {
    this.association = association;
    const contexts = association.getPresentationContexts();
    contexts.forEach((c) => {
      const context = association.getPresentationContext(c.id);
      if (Object.values(StorageClass).includes(context.getAbstractSyntaxUid())) {
        const transferSyntaxes = context.getTransferSyntaxUids();
        transferSyntaxes.forEach((transferSyntax) => {
          if (
            transferSyntax === TransferSyntax.ImplicitVRLittleEndian ||
            transferSyntax === TransferSyntax.ExplicitVRLittleEndian
          ) {
            context.setResult(PresentationContextResult.Accept, transferSyntax);
          } else {
            context.setResult(PresentationContextResult.RejectTransferSyntaxesNotSupported);
          }
        });
      } else {
        context.setResult(PresentationContextResult.RejectAbstractSyntaxNotSupported);
      }
    });
    this.sendAssociationAccept();
  }

  cStoreRequest(request, callback) {
    console.log(request.getDataset());
    const response = CStoreResponse.fromRequest(request);
    response.setStatus(Status.Success);
    callback(response);
  }

  associationReleaseRequested() {
    this.sendAssociationReleaseResponse();
  }
}
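For completeness, wiring this class into a server follows the usual dcmjs-dimse pattern (the port number below is arbitrary):

const { Server } = require('dcmjs-dimse');

// Listen for incoming associations and dispatch them to the streaming SCP.
const server = new Server(StreamingScp);
server.listen(2104);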
@richard-viney, @kinsonho, please review and let me know your thoughts.
With createStoreWritableStream(), the result is just the dataset, not a full P10 object, right?
For createDatasetFromStoreWritableStream(), if I would like to have a filtered dataset for subsequent processing after the cStoreRequest, then I will need to split the store writable stream and create another accumulator writer stream to collect the attributes that I need, then pass it to createDatasetFromStoreWritableStream(). Correct?
Thanks for working on this. Just some quick comments when reviewing the changes for now. I will try it out later.
You are right, Kinson. The result accumulated in the writable created by createStoreWritableStream is just the dataset, not the full P10. However, this is still parsable by the Dataset class, as long as you know whether the syntax is implicit or explicit (hence the acceptedPresentationContext param). I just added a third param to the Dataset constructor that allows you to pass parsing options to the dcmjs library. These options can help in parsing datasets up to a specific tag (e.g. skipping the pixel data).
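For illustration, parsing a buffer only up to the pixel data would look something like this (datasetBuffer and the transfer syntax are placeholders; the option names are the ones used in the example further below):

// Parse up to, but not including, the pixel data.
const dataset = new Dataset(datasetBuffer, TransferSyntax.ExplicitVRLittleEndian, {
  ignoreErrors: true,
  untilTag: '7FE00010', // PixelData
  includeUntilTagValue: false,
});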
Regarding, the "need to split the StoreWriteableStream and create another accumulator writer stream to collect the attributes", I don't think that this is nessesary now that the Dataset
can read partial datasets. You can create something like a custom "hybrid" writable which can early-decide about the proper course of action and definitely not wait for the Scp.cStoreRequest
event to occur.
Here's a super-naive implementation (which actually works!), but you get the point:
const dcmjsDimse = require('dcmjs-dimse');
const { Dataset, Scp } = dcmjsDimse;
const { CStoreResponse } = dcmjsDimse.responses;
const { PresentationContextResult, Status, StorageClass, TransferSyntax } = dcmjsDimse.constants;
const { Writable } = require('stream');

class StreamingScp extends Scp {
  constructor(socket, opts) {
    super(socket, opts);
    this.association = undefined;
  }

  associationRequested(association) {
    this.association = association;
    const contexts = association.getPresentationContexts();
    contexts.forEach((c) => {
      const context = association.getPresentationContext(c.id);
      if (Object.values(StorageClass).includes(context.getAbstractSyntaxUid())) {
        const transferSyntaxes = context.getTransferSyntaxUids();
        transferSyntaxes.forEach((transferSyntax) => {
          if (
            transferSyntax === TransferSyntax.ImplicitVRLittleEndian ||
            transferSyntax === TransferSyntax.ExplicitVRLittleEndian
          ) {
            context.setResult(PresentationContextResult.Accept, transferSyntax);
          } else {
            context.setResult(PresentationContextResult.RejectTransferSyntaxesNotSupported);
          }
        });
      } else {
        context.setResult(PresentationContextResult.RejectAbstractSyntaxNotSupported);
      }
    });
    this.sendAssociationAccept();
  }

  createStoreWritableStream(acceptedPresentationContext) {
    // Create a custom Writable that will handle the incoming chunks.
    return new StreamingWritable(acceptedPresentationContext);
  }

  createDatasetFromStoreWritableStream(writable, acceptedPresentationContext, callback) {
    // At this point, if there is no interest in the full Dataset that includes pixel data
    // (because it was written to a file or uploaded to the cloud),
    // just return the metadata dataset to the Scp.cStoreRequest handler (could also be undefined).
    callback(writable.getMetadataDataset());
  }

  cStoreRequest(request, callback) {
    console.log(request.getDataset());
    const response = CStoreResponse.fromRequest(request);
    response.setStatus(Status.Success);
    callback(response);
  }

  associationReleaseRequested() {
    this.sendAssociationReleaseResponse();
  }
}

class StreamingWritable extends Writable {
  constructor(acceptedPresentationContext, options) {
    super(options);
    this.metadataDataset = undefined;
    this.shouldParse = true;
    this.acceptedPresentationContext = acceptedPresentationContext;
  }

  _write(chunk, encoding, callback) {
    if (this.shouldParse) {
      // First write occurred. There's a good chance that the complete metadata
      // is available, depending on the max PDU value.
      // Try to parse the chunk up to the pixel data and create a Dataset.
      this.metadataDataset = new Dataset(
        chunk,
        this.acceptedPresentationContext.getAcceptedTransferSyntaxUid(),
        {
          ignoreErrors: true,
          untilTag: '7FE00010', // PixelData
          includeUntilTagValue: false,
        }
      );
      this.shouldParse = false;
    }
    // Evaluate the metadata Dataset (this.metadataDataset) and decide the proper course of action,
    // i.e. write/append the chunk to a file, upload it to the cloud or just accumulate it in memory.
    // doStuffWithChunk(chunk);

    // Call the callback to notify that you have finished working on this chunk.
    callback(/* new Error('Something went wrong during chunk handling') */);
  }

  _final(callback) {
    // At this point no other chunk will be received.
    // Call the callback to notify that you have finished receiving chunks and to free resources.
    callback();
  }

  // Returns the parsed metadata dataset.
  getMetadataDataset() {
    return this.metadataDataset;
  }
}
@kinsonho, @richard-viney, did you have a chance to test this? Any feedback?
Sorry for the long delay. I am planning to work on it this week. I will let you know.
Sorry for the big delay; this looks good and we'll be testing it out next week. The branch is now in conflict with the master branch, but we can sort that out, no problem.
@richard-viney I just rebased to the latest master. My only concern is that we still haven't handled Kinson's comment regarding the stream drain event. Looking forward to your test results!
Yes, the drain comment is correct. In practice, if streaming is going to disk, the incoming network data will most likely arrive at a slower pace than local disk writes, so you'd get away with it, but for other use cases it could become an issue.
I haven't yet looked at the implications for the library/protocol of adding a wait on the stream here.
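For reference, this is the standard Node.js pattern the library would need to apply when pushing PDV fragments into the user-supplied writable (a sketch of the generic technique only, not of dcmjs-dimse internals):

const { once } = require('events');

// Writes a chunk and resolves once it is safe to write the next one,
// waiting for 'drain' when write() reports that the internal buffer is full.
async function writeWithBackpressure(writable, chunk) {
  if (!writable.write(chunk)) {
    await once(writable, 'drain');
  }
}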
Would it be possible to pass the CStoreRequest to createStoreWritableStream()? This would allow it access to information such as getAffectedSopInstanceUid(), which would be useful for writing the file meta information prior to the main data set coming in, which is what I'm aiming to do.
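A rough sketch of what that would enable (the two-argument signature follows the fork mentioned below, not the current API; createFileMetaInformation is a hypothetical helper, shown commented out):

const temp = require('temp');

class P10StreamingScp extends Scp {
  // Proposed two-argument signature: the C-STORE request is passed in
  // addition to the accepted presentation context.
  createStoreWritableStream(request, acceptedPresentationContext) {
    const writable = temp.createWriteStream();
    // The request makes values such as the affected SOP instance UID
    // available before the first fragment arrives...
    const sopInstanceUid = request.getAffectedSopInstanceUid();
    // ...so a P10 preamble and file meta information group could be composed
    // and written here, ahead of the streamed main dataset (elided).
    // writable.write(createFileMetaInformation(sopInstanceUid, acceptedPresentationContext));
    return writable;
  }
}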
I've implemented the above on a fork and it seems to work. Another couple of things have come up too:
- Have createStoreWritableStream take a callback to allow it to perform asynchronous actions (a sketch follows below).
- Should we allow createStoreWritableStream to return an error of some kind? Or do you think this should be handled downstream in the cStoreRequest method?
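A callback-based variant could look roughly like this (purely illustrative; openCloudUploadStream is a hypothetical asynchronous source of a Writable, and this signature was not adopted):

// Hypothetical asynchronous variant; the callback receives (error, writable).
createStoreWritableStream(request, acceptedPresentationContext, callback) {
  openCloudUploadStream(request.getAffectedSopInstanceUid())
    .then((writable) => callback(undefined, writable))
    .catch((error) => callback(error, undefined));
}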
I've got this working; I ended up writing a custom DICOM P10 header prior to streaming the rest to a temporary file, which results in a valid DICOM P10 file. More testing still to be done, though.
My previous comment about making createStoreWritableStream asynchronous isn't important, as I worked around it and deferred error handling to the existing cStoreRequest() method, which I think is reasonable given that it decides on the response to send to the SCU.
IMO the most important change to add to this branch is making the C-STORE request available, which I did in a fork here: https://github.com/HeartLab/dcmjs-dimse/commit/4192ec93237d0cd7b23f89a063c21a1426f98c03.
The only other issue was regarding drain() and backpressure; however, this change is still a big improvement even without that issue being addressed. Have you had a chance to look into it? If not, I'll likely do so in the next couple of days.
Thanks.
Great news, @richard-viney! Thanks for working on this! I incorporated your changes into the working branch and made an unsuccessful effort to handle stream backpressure, which I reverted… ☹ However, I'm pretty sure that we will find a way to handle it!