apache/incubator-wayang

use config buffer size

Status: Open · github-actions[bot] opened this issue 2 years ago · 0 comments

use config buffer size

https://github.com/apache/incubator-wayang/blob/d859a97d43a8c3c3c964150eaff8f3833e41ea75/wayang-api/wayang-api-python/src/main/java/org/apache/wayang/api/python/executor/ProcessFeeder.java#L63


/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.wayang.api.python.executor;

import org.apache.wayang.api.python.function.PythonCode;
import org.apache.wayang.api.python.function.PythonUDF;
import org.apache.wayang.core.api.exception.WayangException;

import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.Socket;
import java.net.SocketException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Map;

/**
 * Writes a serialized UDF followed by its input elements to the peer of a socket
 * (presumably the Python worker process).
 *
 * <p>Wire format produced by this class: every payload is a 4-byte big-endian length, as written
 * by {@link DataOutputStream#writeInt}, followed by that many bytes. A {@code null} element is
 * written as the single marker {@link #NULL}. A {@link Map.Entry} is written as its key followed
 * by its value. The stream is terminated by {@link #END_OF_DATA_SECTION}.
 *
 * @param <Input>  type of the elements taken from the input iterable
 * @param <Output> output type of the UDF; stored but not used by this class
 */
public class ProcessFeeder<Input, Output> {

    /** Output buffer size, in bytes, used when no explicit size is given to the constructor. */
    public static final int DEFAULT_BUFFER_SIZE = 8 * 1024;

    private final Socket socket;
    private final PythonUDF<Input, Output> udf;
    private final PythonCode serializedUDF;
    private final Iterable<Input> input;
    private final int bufferSize;

    //TODO add to a config file
    /** Marker written after the last element to signal the end of the data section. */
    final int END_OF_DATA_SECTION = -1;
    /** Marker written in place of a length prefix when an element is {@code null}. */
    final int NULL = -5;

    /**
     * Creates a feeder whose output buffer is {@link #DEFAULT_BUFFER_SIZE} bytes.
     *
     * @param socket        connected socket whose output stream receives the data
     * @param udf           the UDF being executed; stored only
     * @param serializedUDF serialized UDF code, sent before the input elements
     * @param input         elements to send; must not be {@code null}
     * @throws WayangException if {@code input} is {@code null}
     */
    public ProcessFeeder(
            Socket socket,
            PythonUDF<Input, Output> udf,
            PythonCode serializedUDF,
            Iterable<Input> input){

        this(socket, udf, serializedUDF, input, DEFAULT_BUFFER_SIZE);
    }

    /**
     * Creates a feeder with an explicit output buffer size.
     *
     * <p>Callers can pass a value read from their configuration.
     *
     * @param socket        connected socket whose output stream receives the data
     * @param udf           the UDF being executed; stored only
     * @param serializedUDF serialized UDF code, sent before the input elements
     * @param input         elements to send; must not be {@code null}
     * @param bufferSize    size in bytes of the buffer in front of the socket stream; must be &gt; 0
     * @throws WayangException          if {@code input} is {@code null}
     * @throws IllegalArgumentException if {@code bufferSize} is not positive
     */
    public ProcessFeeder(
            Socket socket,
            PythonUDF<Input, Output> udf,
            PythonCode serializedUDF,
            Iterable<Input> input,
            int bufferSize){

        if(input == null) throw new WayangException("Nothing to process with Python API");
        // Fail here with a clear message; BufferedOutputStream would also reject it, but only
        // later, inside send().
        if(bufferSize <= 0) {
            throw new IllegalArgumentException("Buffer size must be positive, got " + bufferSize);
        }

        this.socket = socket;
        this.udf = udf;
        this.serializedUDF = serializedUDF;
        this.input = input;
        this.bufferSize = bufferSize;
    }

    /**
     * Sends the serialized UDF, then every input element, then {@link #END_OF_DATA_SECTION}, and
     * flushes.
     *
     * <p>I/O failures stop the transfer at the first error and are printed to standard error. They
     * are not propagated to the caller. A {@link WayangException} raised for an unsupported element
     * type does propagate.
     */
    public void send(){

        try{
            // The streams are flushed but deliberately not closed: closing the stream returned by
            // Socket#getOutputStream closes the whole socket.
            DataOutputStream dataOut = new DataOutputStream(
                    new BufferedOutputStream(socket.getOutputStream(), bufferSize));

            writeUDF(serializedUDF, dataOut);
            this.writeIteratorToStream(input.iterator(), dataOut);
            dataOut.writeInt(END_OF_DATA_SECTION);
            dataOut.flush();

        } catch (IOException e) {
            e.printStackTrace();
        } catch (UncheckedIOException e) {
            // Raised by the element writers so that the first I/O failure aborts the whole
            // transfer, instead of being reported again for every remaining element.
            e.printStackTrace();
        }
    }

    /**
     * Writes the serialized UDF as a length-prefixed byte payload.
     *
     * @param serializedUDF serialized UDF whose {@code toByteArray()} result is sent
     * @param dataOut       destination stream
     * @throws UncheckedIOException if writing fails
     */
    public void writeUDF(PythonCode serializedUDF, DataOutputStream dataOut){

        writeBytes(serializedUDF.toByteArray(), dataOut);
        System.out.println("UDF written");

    }

    /**
     * Writes every remaining element of {@code iter} using {@link #write(Object, DataOutputStream)}.
     *
     * @param iter    elements to write; consumed by this call
     * @param dataOut destination stream
     * @throws IOException          declared for source compatibility with existing callers
     * @throws UncheckedIOException if writing an element fails
     */
    public void writeIteratorToStream(Iterator<Input> iter, DataOutputStream dataOut)
        throws IOException {

        System.out.println("iterator being send");
        while (iter.hasNext()) {
            write(iter.next(), dataOut);
        }
    }

    /*TODO Missing case PortableDataStream */
    /**
     * Writes a single element according to its runtime type.
     *
     * <p>Supported types:
     * <ul>
     *   <li>{@code null}: written as {@link #NULL}</li>
     *   <li>{@code byte[]} / {@code Byte[]}: a length-prefixed byte payload</li>
     *   <li>{@code String}: a length-prefixed UTF-8 payload</li>
     *   <li>{@code Map.Entry}: the key, then the value, each written recursively</li>
     * </ul>
     *
     * @param obj     element to write, may be {@code null}
     * @param dataOut destination stream
     * @throws WayangException      if the element type is not supported
     * @throws UncheckedIOException if writing fails
     */
    public void write(Object obj, DataOutputStream dataOut){
        try {
            if (obj == null) {
                dataOut.writeInt(this.NULL);
            } else if (obj instanceof Byte[] || obj instanceof byte[]) {
                System.out.println("Writing Bytes");
                writeBytes(obj, dataOut);
            } else if (obj instanceof String) {
                writeUTF((String) obj, dataOut);
            } else if (obj instanceof Map.Entry) {
                writeKeyValue((Map.Entry<?, ?>) obj, dataOut);
            } else {
                throw new WayangException("Unexpected element type " + obj.getClass());
            }
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to write element to the Python worker", e);
        }
    }

    /**
     * Writes a {@code byte[]} or {@code Byte[]} as a 4-byte length followed by the raw bytes.
     *
     * @param obj     a {@code byte[]} or {@code Byte[]}; a {@code Byte[]} must not contain
     *                {@code null} elements (unboxing would throw {@link NullPointerException})
     * @param dataOut destination stream
     * @throws WayangException      if {@code obj} is neither a {@code byte[]} nor a {@code Byte[]},
     *                              because writing nothing would desynchronize the length-prefixed
     *                              stream
     * @throws UncheckedIOException if writing fails
     */
    public void writeBytes(Object obj, DataOutputStream dataOut){

        final byte[] bytes;
        if (obj instanceof byte[]) {
            bytes = (byte[]) obj;
        } else if (obj instanceof Byte[]) {
            // Unboxing Byte values. (Byte[] to byte[])
            Byte[] boxed = (Byte[]) obj;
            bytes = new byte[boxed.length];
            for (int i = 0; i < boxed.length; i++) {
                bytes[i] = boxed[i];
            }
        } else {
            throw new WayangException("Expected byte[] or Byte[] but got "
                    + (obj == null ? "null" : obj.getClass().getName()));
        }

        try{
            dataOut.writeInt(bytes.length);
            dataOut.write(bytes);
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to write bytes to the Python worker", e);
        }
    }

    /**
     * Writes a string as a 4-byte length followed by its UTF-8 bytes.
     *
     * <p>Unlike {@link DataOutputStream#writeUTF(String)}, this uses an int length prefix and
     * standard (not modified) UTF-8.
     *
     * @param str     string to write
     * @param dataOut destination stream
     * @throws UncheckedIOException if writing fails, including a {@code SocketException}
     *                              (for example a broken connection); every later write would fail
     *                              too, so it is not ignored
     */
    public void writeUTF(String str, DataOutputStream dataOut){

        byte[] bytes = str.getBytes(StandardCharsets.UTF_8);

        try {
            dataOut.writeInt(bytes.length);
            dataOut.write(bytes);
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to write string to the Python worker", e);
        }
    }

    /**
     * Writes a key/value pair as the key followed by the value, each written via
     * {@link #write(Object, DataOutputStream)}.
     *
     * @param obj     entry to write
     * @param dataOut destination stream
     * @throws WayangException      if the key or value type is not supported
     * @throws UncheckedIOException if writing fails
     */
    public void writeKeyValue(Map.Entry<?, ?> obj, DataOutputStream dataOut){

        write(obj.getKey(), dataOut);
        write(obj.getValue(), dataOut);
    }

}

84ea6d0fccdb4ad32b164d5ac8df30be52535b1b

Posted by github-actions[bot] on Jun 19 '22 22:06