incubator-wayang icon indicating copy to clipboard operation
incubator-wayang copied to clipboard

add the logic to execute the plan

Open github-actions[bot] opened this issue 2 years ago • 0 comments

add the logic to execute the plan

https://github.com/apache/incubator-wayang/blob/4cc0bfdca06dda171b661822daae5fa438d9d475/python/src/pywy/dataquanta.py#L103


#
#  Licensed to the Apache Software Foundation (ASF) under one or more
#  contributor license agreements.  See the NOTICE file distributed with
#  this work for additional information regarding copyright ownership.
#  The ASF licenses this file to You under the Apache License, Version 2.0
#  (the "License"); you may not use this file except in compliance with
#  the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#

from typing import Set, List, cast

from pywy.core import Translator
from pywy.operators.base import PO_T
from pywy.types import (GenericTco, Predicate, Function, FlatmapFunction, IterableOut, T, In, Out)
from pywy.operators import *
from pywy.core import PywyPlan
from pywy.core import Plugin


class WayangContext:
    """
    This is the entry point for users to work with Wayang.
    """
    plugins: Set[Plugin]

    def __init__(self):
        self.plugins = set()

    """
    add a :class:`Plugin` to the :class:`Context`
    """

    def register(self, *plugins: Plugin):
        for p in plugins:
            self.plugins.add(p)
        return self

    """
    remove a :class:`Plugin` from the :class:`Context`
    """

    def unregister(self, *plugins: Plugin):
        for p in plugins:
            self.plugins.remove(p)
        return self

    def textfile(self, file_path: str) -> 'DataQuanta[str]':
        return DataQuanta(self, TextFileSource(file_path))

    def __str__(self):
        return "Plugins: {}".format(str(self.plugins))

    def __repr__(self):
        return self.__str__()


class DataQuanta(GenericTco):
    """
    Represents an intermediate result/data flow edge in a [[WayangPlan]].
    """
    context: WayangContext

    def __init__(self, context: WayangContext, operator: PywyOperator):
        self.operator = operator
        self.context = context

    def filter(self: "DataQuanta[T]", p: Predicate) -> "DataQuanta[T]":
        return DataQuanta(self.context, self._connect(FilterOperator(p)))

    def map(self: "DataQuanta[In]", f: Function) -> "DataQuanta[Out]":
        return DataQuanta(self.context, self._connect(MapOperator(f)))

    def flatmap(self: "DataQuanta[In]", f: FlatmapFunction) -> "DataQuanta[IterableOut]":
        return DataQuanta(self.context, self._connect(FlatmapOperator(f)))

    def store_textfile(self: "DataQuanta[In]", path: str, end_line: str = None):
        last: List[SinkOperator] = [
            cast(
                SinkOperator,
                self._connect(
                    TextFileSink(
                        path,
                        self.operator.outputSlot[0],
                        end_line
                    )
                )
            )
        ]
        plan = PywyPlan(self.context.plugins, last)

        plug = self.context.plugins.pop()
        trs: Translator = Translator(plug, plan)
        new_plan = trs.translate()
        plug.get_executor().execute(new_plan)
        # TODO add the logic to execute the plan

    def _connect(self, op: PO_T, port_op: int = 0) -> PywyOperator:
        self.operator.connect(0, op, port_op)
        return op

    def __str__(self):
        return str(self.operator)

    def __repr__(self):
        return self.__str__()

4a29f0081231beb08b25fb64d6ece42227be765c

github-actions[bot] avatar Apr 08 '22 18:04 github-actions[bot]