wagtail-streamfield-migration-toolkit

Feature request/discussion: make operations composable

Open: jams2 opened this issue 2 years ago • 1 comment

Is it useful to make operations composable? There may be benefits both to performance and developer experience.

Scenario

Consider the following scenario: I make the (questionable) decision to record purchase orders for my frobnicator business in a StreamField.

class PurchaseOrder(models.Model):
    products = StreamField([("product_id", IntegerBlock())])

This represents a customer's purchase: products is a stream of frobnicator IDs, which are Ints. An example order's JSON might look like the following:

[
    {"type": "product_id", "value": 1, "id": "block-1"},
    {"type": "product_id", "value": 1, "id": "block-2"},
    {"type": "product_id", "value": 7, "id": "block-3"}
]

Schema change

Upon deploying the system to production and taking many orders, I realize that my product IDs need to be alphanumeric (so their type should be String, not Int), and that I can remove some redundant data by storing a quantity field with each product_id rather than storing the same product_id multiple times. I change my block definition to:

class PurchaseOrder(models.Model):
    products = StreamField(
        [
            (
                "product",
                StructBlock(
                    [
                        ("product_id", CharBlock()),
                        ("quantity", IntegerBlock()),
                    ]
                ),
            )
        ]
    )

I decide that legacy product IDs should be cast to Strings and have "-LEGACY" appended to them, and that for the initial migration I don't care about folding multiples of the same product into a single item. The above example order should be transformed to the following.

[
    {"type": "product", "value": {"product_id": "1-LEGACY", "quantity": 1}, "id": "block-1"},
    {"type": "product", "value": {"product_id": "1-LEGACY", "quantity": 1}, "id": "block-2"},
    {"type": "product", "value": {"product_id": "7-LEGACY", "quantity": 1}, "id": "block-3"}
]

Transforming the data

The following operations need to be applied in a data migration:

  1. transform product_id values from Ints to Strings (I believe this would be handled automatically, but not sure - for the sake of example let's assume not);
  2. append the "-LEGACY" suffix to all product_id values; and
  3. transform stream children to struct blocks (StreamChildrenToStructBlockOperation).

The ordering could change: 3 -> 1 -> 2 would achieve the same result.
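As a rough check of the ordering claim, the three steps can be modelled as plain functions over the raw stream JSON. Everything here (map_product_id, to_struct, add_legacy) is a hypothetical stand-in rather than toolkit API, and to_struct is assumed to fill in quantity 1 so that the result matches the target JSON above.

```python
def map_product_id(stream, fn, nested):
    """Apply fn to every product_id, at the top level or inside the struct."""
    out = []
    for block in stream:
        if nested:
            # After step 3 the ID lives at value["product_id"].
            value = {**block["value"], "product_id": fn(block["value"]["product_id"])}
        else:
            # Before step 3 the block value is the ID itself.
            value = fn(block["value"])
        out.append({**block, "value": value})
    return out


def to_struct(stream):
    """Step 3: wrap each product_id child in a "product" struct (quantity assumed 1)."""
    return [
        {
            "type": "product",
            "value": {"product_id": block["value"], "quantity": 1},
            "id": block["id"],
        }
        for block in stream
    ]


def add_legacy(product_id):
    """Step 2: append the legacy suffix."""
    return product_id + "-LEGACY"


order = [
    {"type": "product_id", "value": 1, "id": "block-1"},
    {"type": "product_id", "value": 1, "id": "block-2"},
    {"type": "product_id", "value": 7, "id": "block-3"},
]

# Ordering 1 -> 2 -> 3: transform the bare values, then restructure.
one_two_three = to_struct(
    map_product_id(map_product_id(order, str, nested=False), add_legacy, nested=False)
)

# Ordering 3 -> 1 -> 2: restructure first, then transform the nested values.
three_one_two = map_product_id(
    map_product_id(to_struct(order), str, nested=True), add_legacy, nested=True
)

assert one_two_three == three_one_two
```

Note that the two orderings only agree because the path to product_id is adjusted once the struct exists.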

We will need a custom operation for steps 1 and 2 (step 3 is covered by StreamChildrenToStructBlockOperation), for which I define the following helper:

class TransformBlockValueOperation(BaseBlockOperation):
    def __init__(self, transformer):
        super().__init__()
        self.transformer = transformer

    def apply(self, block_value):
        return self.transformer(block_value)
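To see the helper in isolation, here is a minimal sketch that runs without Wagtail. The BaseBlockOperation below is a stand-in written for this example (an assumption: only the apply method matters here), not the toolkit's real base class.

```python
class BaseBlockOperation:
    """Stand-in for the toolkit's base class; assumed to expose only apply()."""

    def apply(self, block_value):
        raise NotImplementedError


class TransformBlockValueOperation(BaseBlockOperation):
    """Apply an arbitrary one-argument function to a block value."""

    def __init__(self, transformer):
        super().__init__()
        self.transformer = transformer

    def apply(self, block_value):
        return self.transformer(block_value)


to_string = TransformBlockValueOperation(str)
add_suffix = TransformBlockValueOperation(lambda x: x + "-LEGACY")

# Applying the two operations by hand, one after the other.
legacy_id = add_suffix.apply(to_string.apply(7))
assert legacy_id == "7-LEGACY"
```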

We will then need to define a migration:

class Migration(migrations.Migration):
    operations = [
        MigrateStreamData(
            app_name="frobnicators",
            model_name="PurchaseOrder",
            field_name="products",
            operations_and_block_paths=[
                (TransformBlockValueOperation(str), "product_id"),
                (
                    TransformBlockValueOperation(lambda x: x + "-LEGACY"),
                    "product_id",
                ),
                (StreamChildrenToStructBlockOperation("product_id", "product"), ""),
            ],
        )
    ]

Combining operations

The two TransformBlockValueOperation operations could of course be trivially combined into a single top-level function, but for the sake of example I have laid them out individually.

Under the current implementation, performing these three operations requires traversing the stream 3 times, where that could be trivially reduced to 2, and perhaps even to a single traversal. Can we define combinators on operations that will allow developers to create complex operations that only require traversing a stream once?

Consider the following definition:

class CombinedOperation:
    def __init__(self, first, then):
        self.first = first
        self.then = then

    def apply(self, value):
        # It might be nice to alias BaseBlockOperation.__call__ to apply, but this
        # is quite readable anyway.
        return self.then.apply(self.first.apply(value))

...and an embellished BaseBlockOperation:

class BaseBlockOperation:
    ...

    def __add__(self, other):
        return CombinedOperation(self, other)
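A self-contained sketch of how these two pieces might fit together follows. One deliberate difference from the definition above: CombinedOperation subclasses the (stand-in) BaseBlockOperation here, so that a combined operation also supports + and chains of three or more work. That is an assumption of this sketch, not something the toolkit provides.

```python
class BaseBlockOperation:
    """Stand-in base class for this sketch; not the toolkit's implementation."""

    def apply(self, block_value):
        raise NotImplementedError

    def __add__(self, other):
        # a + b means "apply a, then apply b to the result".
        return CombinedOperation(self, other)


class CombinedOperation(BaseBlockOperation):
    """Subclassing the base lets a combined operation be combined again."""

    def __init__(self, first, then):
        super().__init__()
        self.first = first
        self.then = then

    def apply(self, value):
        return self.then.apply(self.first.apply(value))


class TransformBlockValueOperation(BaseBlockOperation):
    def __init__(self, transformer):
        super().__init__()
        self.transformer = transformer

    def apply(self, block_value):
        return self.transformer(block_value)


pipeline = (
    TransformBlockValueOperation(str)
    + TransformBlockValueOperation(lambda x: x + "-LEGACY")
    + TransformBlockValueOperation(str.lower)
)

result = pipeline.apply(7)
assert result == "7-legacy"
```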

With these, the above data migration could be changed to:

 
class Migration(migrations.Migration):
    operations = [
        MigrateStreamData(
            app_name="frobnicators",
            model_name="PurchaseOrder",
            field_name="products",
            operations_and_block_paths=[
                (StreamChildrenToStructBlockOperation("product_id", "product"), ""),
                (
                    TransformBlockValueOperation(str)
                    + TransformBlockValueOperation(lambda x: x + "-LEGACY"),
                    "product.product_id",
                ),
            ],
        )
    ]

The use of the + operator isn't a recommendation - this could be a different operator, or a method named then or compose, for example.
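For comparison, a method-based spelling might look like the sketch below. The Operation class, its then method, and the compose helper are all hypothetical names chosen for illustration.

```python
from functools import reduce


class Operation:
    """Hypothetical minimal operation wrapping a one-argument function."""

    def __init__(self, fn):
        self.fn = fn

    def apply(self, value):
        return self.fn(value)

    def then(self, other):
        # Return a new operation that runs self first, then other.
        return Operation(lambda value: other.apply(self.apply(value)))


def compose(*operations):
    """Chain operations left to right; requires at least one operation."""
    return reduce(lambda acc, op: acc.then(op), operations)


to_legacy_id = compose(Operation(str), Operation(lambda x: x + "-LEGACY"))
legacy_id = to_legacy_id.apply(1)
assert legacy_id == "1-LEGACY"
```

A named method reads more explicitly than + and avoids suggesting that composition is commutative, at the cost of slightly noisier migrations.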

Composition

This is basically function composition with extra steps. With sufficient care we may be able to create ways of composing operations such that arbitrary operations can be combined declaratively, and operate on a stream in a single traversal. Care must be taken as not all of our operations operate on the same types, so may not trivially compose.

We have the following members in our family of types (abstracting over some of the details - notably the type x value dict containers):

  • Stream - A non-homogeneous sequence of Struct, List, Atom, Stream
  • Struct - A record type, inhabited by Atom x List x Stream
  • List - [Atom] | [Struct] | [Stream]
  • Atom - str | int | float | Decimal | bool (the basic types that back FieldBlocks)
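One way to guard against composing incompatible operations is to have each operation declare the kind of value it consumes and produces, and to check the tags at composition time. The sketch below is purely illustrative: TypedOperation, its input_type and output_type tags, and the string labels are assumptions, and the tags are far coarser than the types described above.

```python
class TypedOperation:
    """Hypothetical operation tagged with coarse input and output kinds."""

    def __init__(self, fn, input_type, output_type):
        self.fn = fn
        self.input_type = input_type
        self.output_type = output_type

    def apply(self, value):
        return self.fn(value)

    def then(self, other):
        # Refuse to compose when the output kind does not feed the next input.
        if self.output_type != other.input_type:
            raise TypeError(
                f"cannot compose {self.output_type} -> {other.input_type}"
            )
        return TypedOperation(
            lambda value: other.apply(self.apply(value)),
            self.input_type,
            other.output_type,
        )


to_string = TypedOperation(str, "Atom", "Atom")
add_suffix = TypedOperation(lambda x: x + "-LEGACY", "Atom", "Atom")
count_children = TypedOperation(len, "Stream", "Atom")

# Atom -> Atom composes with Atom -> Atom.
legacy = to_string.then(add_suffix).apply(7)

# Atom -> Atom does not compose with Stream -> Atom.
try:
    to_string.then(count_children)
    mismatch_detected = False
except TypeError:
    mismatch_detected = True
```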

Extra - aggregating multiple orders of the same product

Consider that we want to reduce redundancy in our PurchaseOrders, and fold repeated products together, summing the quantities. This would post-compose with StreamChildrenToStructBlockOperation, as:

  • our new operation's application would have the type Stream (Struct String Int) -> Stream (Struct String Int)
  • the application of StreamChildrenToStructBlockOperation has the type Stream Int -> Stream (Struct String Int)

from collections import Counter


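class AggregateProductQuantities(BaseBlockOperation):
    def __init__(self, child_type):
        super().__init__()
        self.child_type = child_type

    def apply(self, stream):
        # Sum quantities per product_id; a child without a quantity counts as 1.
        quantities = Counter()
        for child in stream:
            value = child["value"]
            quantities[value["product_id"]] += value.get("quantity", 1)
        # Note: the original block IDs are not carried over to the folded items.
        return [
            {"type": self.child_type, "value": {"product_id": p, "quantity": q}}
            for p, q in quantities.items()
        ]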
 
class Migration(migrations.Migration):
    operations = [
        MigrateStreamData(
            app_name="frobnicators",
            model_name="PurchaseOrder",
            field_name="products",
            operations_and_block_paths=[
                # Restructure (and aggregate) first, so that the
                # "product.product_id" path used below already exists.
                (
                    StreamChildrenToStructBlockOperation("product_id", "product")
                    + AggregateProductQuantities("product"),
                    "",
                ),
                (
                    TransformBlockValueOperation(str)
                    + TransformBlockValueOperation(lambda x: x + "-LEGACY"),
                    "product.product_id",
                ),
            ],
        )
    ]
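The aggregation step can be tried on the example order without Wagtail. The function below is a plain-Python sketch of the same folding logic (aggregate_product_quantities is a hypothetical name), assuming the stream has already been converted to "product" structs and that a missing quantity counts as 1.

```python
from collections import Counter


def aggregate_product_quantities(stream, child_type="product"):
    """Fold repeated products together, summing their quantities."""
    quantities = Counter()
    for child in stream:
        value = child["value"]
        quantities[value["product_id"]] += value.get("quantity", 1)
    # Counter keeps first-seen order, so products stay in stream order.
    return [
        {"type": child_type, "value": {"product_id": p, "quantity": q}}
        for p, q in quantities.items()
    ]


order = [
    {"type": "product", "value": {"product_id": "1-LEGACY", "quantity": 1}, "id": "block-1"},
    {"type": "product", "value": {"product_id": "1-LEGACY", "quantity": 1}, "id": "block-2"},
    {"type": "product", "value": {"product_id": "7-LEGACY", "quantity": 1}, "id": "block-3"},
]

aggregated = aggregate_product_quantities(order)
assert aggregated == [
    {"type": "product", "value": {"product_id": "1-LEGACY", "quantity": 2}},
    {"type": "product", "value": {"product_id": "7-LEGACY", "quantity": 1}},
]
```

The folded items carry no "id" key in this sketch, so a real migration would need to decide how block IDs are assigned.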

jams2, Nov 29 '22 19:11

A question was asked in the Wagtail Slack (in #support) about a migration for moving a block value from one StreamField to another. This could potentially be a use case for composing operations, although it might require making the field name an attribute of the operation rather than of the migration.

jams2, Mar 21 '23 15:03