incubator-wayang icon indicating copy to clipboard operation
incubator-wayang copied to clipboard

Validate whether this is valid for several outputs

Open github-actions[bot] opened this issue 2 years ago • 0 comments

Validate whether this is valid for several outputs

https://github.com/apache/incubator-wayang/blob/4cc0bfdca06dda171b661822daae5fa438d9d475/python/src/pywy/platforms/python/execution.py#L81


#
#  Licensed to the Apache Software Foundation (ASF) under one or more
#  contributor license agreements.  See the NOTICE file distributed with
#  this work for additional information regarding copyright ownership.
#  The ASF licenses this file to You under the Apache License, Version 2.0
#  (the "License"); you may not use this file except in compliance with
#  the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#

from pywy.graph.types import WGraphOfOperator, NodeOperator
from pywy.core import ChannelDescriptor
from pywy.core import Executor
from pywy.core import PywyPlan
from pywy.operators import TextFileSource
from pywy.platforms.python.channels import PY_ITERATOR_CHANNEL_DESCRIPTOR
from pywy.platforms.python.operator.py_execution_operator import PyExecutionOperator


class PyExecutor(Executor):
    """Executor that runs a plan's operators directly as Python execution operators.

    The plan's operator graph is traversed pair by pair; for every pair a
    channel is chosen to carry the output of the current operator into the
    next one, and the current operator is executed.
    """

    def __init__(self):
        super(PyExecutor, self).__init__()

    def execute(self, plan):
        """Execute ``plan`` by traversing the operator graph built from its sinks.

        Args:
            plan: the plan to run. It is treated as a ``PywyPlan`` and its
                ``sinks`` attribute is used to build the operator graph.

        Raises:
            Exception: if two connected operators have no channel descriptor
                in common, or have several in common while no default
                descriptor is available to choose between them.
        """
        pywyPlan: PywyPlan = plan
        graph = WGraphOfOperator(pywyPlan.sinks)

        # TODO get this information by a configuration and ideally by the context
        descriptor_default: ChannelDescriptor = PY_ITERATOR_CHANNEL_DESCRIPTOR
        # Iterables provided by TextFileSource output channels; they are
        # closed once the traversal is over.
        files_pool = []

        def execute(op_current: NodeOperator, op_next: NodeOperator):
            """Run ``op_current`` and hand its output channel to ``op_next``."""
            if op_current is None:
                return

            py_current: PyExecutionOperator = op_current.current
            # An operator without outputs is executed with its input channel
            # and an empty output list; there is nothing to wire afterwards.
            if py_current.outputs == 0:
                py_current.execute(py_current.inputChannel, [])
                return

            if op_next is None:
                return
            py_next: PyExecutionOperator = op_next.current
            outputs = py_current.get_output_channeldescriptors()
            inputs = py_next.get_input_channeldescriptors()

            # Channel descriptors that both sides of the connection accept.
            intersect = outputs.intersection(inputs)
            if len(intersect) == 0:
                raise Exception(
                    "The operator(A) {} can't connect with (B) {}, "
                    "because the output of (A) is {} and the input of (B) is {} ".format(
                        py_current,
                        py_next,
                        outputs,
                        inputs
                    )
                )

            if len(intersect) > 1:
                if descriptor_default is None:
                    raise Exception(
                        "The interaction between the operator (A) {} and (B) {}, "
                        "can't be decided because are several channel availables {}".format(
                            py_current,
                            py_next,
                            intersect
                        )
                    )
                # NOTE(review): the default descriptor is used without
                # checking that it belongs to ``intersect`` -- confirm that
                # this is intended.
                descriptor = descriptor_default
            else:
                descriptor = intersect.pop()

            # TODO validate whether this is valid for operators with several
            #  outputs; only the first output channel is populated here.
            py_current.outputChannel[0] = descriptor.create_instance()

            py_current.execute(py_current.inputChannel, py_current.outputChannel)

            py_next.inputChannel = py_current.outputChannel

            if isinstance(py_current, TextFileSource):
                files_pool.append(py_current.outputChannel[0].provide_iterable())

        try:
            graph.traversal(graph.starting_nodes, execute)
        finally:
            # Close the files used during the execution, even when the
            # traversal fails part-way, so already-opened handles do not leak.
            for f in files_pool:
                f.close()

1ad962a42f5e49779547a717aa0d44a67ac01d12

github-actions[bot] avatar Apr 08 '22 18:04 github-actions[bot]