guide-rpc-framework icon indicating copy to clipboard operation
guide-rpc-framework copied to clipboard

关于ProtostuffSerializer 的使用问题

Open victordjj opened this issue 5 years ago • 5 comments

https://github.com/Snailclimb/guide-rpc-framework/blob/master/rpc-framework-simple/src/main/java/github/javaguide/serialize/protostuff/ProtostuffSerializer.java

/**
 * Avoid re applying buffer space every time serialization
 */
private static final LinkedBuffer BUFFER = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);

@Override
public byte[] serialize(Object obj) {
    Class<?> clazz = obj.getClass();
    Schema schema = RuntimeSchema.getSchema(clazz);
    byte[] bytes;
    try {
        bytes = ProtostuffIOUtil.toByteArray(obj, schema, BUFFER);
    } finally {
        BUFFER.clear();
    }
    return bytes;
}

@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
    Schema<T> schema = RuntimeSchema.getSchema(clazz);
    T obj = schema.newMessage();
    ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
    return obj;
}

1,其中LinkedBuffer 为静态,在并发情况下其中一个线程执行这个方法BUFFER.clear(),其他的是否会报错 2,Schema schema 这个可以用一个ConcurrentHashMap 存储起来,性能会更好 下面是我修改的代码:

private static Map<Class<?>, Schema<?>> schemaMap = new ConcurrentHashMap<>();
private static <T>Schema<T> getSchema(Class<?> clazz) {
    if (schemaMap.containsKey(clazz)) {
        return (Schema<T>)schemaMap.get(clazz);
    }
    Schema schema = RuntimeSchema.getSchema(clazz);
    // 双重检查
    if (schema != null) {
        schemaMap.put(clazz, schema);
    }
    return schema;
}

@Override
public byte[] serialize(Object object) {
    Class<?> clazz = object.getClass();
    LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
    try {
        Schema schema = getSchema(clazz);
        return ProtostuffIOUtil.toByteArray(object, schema, buffer);
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    } finally {
        buffer.clear();
    }
}

@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
    Schema<T> schema = getSchema(clazz);
    T obj = schema.newMessage();
    ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
    return obj;
}

victordjj avatar Nov 29 '20 13:11 victordjj

https://github.com/Snailclimb/guide-rpc-framework/blob/master/rpc-framework-simple/src/main/java/github/javaguide/serialize/protostuff/ProtostuffSerializer.java

/**
 * Avoid re applying buffer space every time serialization
 */
private static final LinkedBuffer BUFFER = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);

@Override
public byte[] serialize(Object obj) {
    Class<?> clazz = obj.getClass();
    Schema schema = RuntimeSchema.getSchema(clazz);
    byte[] bytes;
    try {
        bytes = ProtostuffIOUtil.toByteArray(obj, schema, BUFFER);
    } finally {
        BUFFER.clear();
    }
    return bytes;
}

@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
    Schema<T> schema = RuntimeSchema.getSchema(clazz);
    T obj = schema.newMessage();
    ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
    return obj;
}

1,其中LinkedBuffer 为静态,在并发情况下其中一个线程执行这个方法BUFFER.clear(),其他的是否会报错 2,Schema schema 这个可以用一个ConcurrentHashMap 存储起来,性能会更好 下面是我修改的代码:

private static Map<Class<?>, Schema<?>> schemaMap = new ConcurrentHashMap<>();
private static <T>Schema<T> getSchema(Class<?> clazz) {
    if (schemaMap.containsKey(clazz)) {
        return (Schema<T>)schemaMap.get(clazz);
    }
    Schema schema = RuntimeSchema.getSchema(clazz);
    // 双重检查
    if (schema != null) {
        schemaMap.put(clazz, schema);
    }
    return schema;
}

@Override
public byte[] serialize(Object object) {
    Class<?> clazz = object.getClass();
    LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
    try {
        Schema schema = getSchema(clazz);
        return ProtostuffIOUtil.toByteArray(object, schema, buffer);
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    } finally {
        buffer.clear();
    }
}

@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
    Schema<T> schema = getSchema(clazz);
    T obj = schema.newMessage();
    ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
    return obj;
}

老哥 可以提交一个pr么?

Snailclimb avatar Dec 02 '20 16:12 Snailclimb

https://github.com/Snailclimb/guide-rpc-framework/blob/master/rpc-framework-simple/src/main/java/github/javaguide/serialize/protostuff/ProtostuffSerializer.java

/**
 * Avoid re applying buffer space every time serialization
 */
private static final LinkedBuffer BUFFER = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);

@Override
public byte[] serialize(Object obj) {
    Class<?> clazz = obj.getClass();
    Schema schema = RuntimeSchema.getSchema(clazz);
    byte[] bytes;
    try {
        bytes = ProtostuffIOUtil.toByteArray(obj, schema, BUFFER);
    } finally {
        BUFFER.clear();
    }
    return bytes;
}

@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
    Schema<T> schema = RuntimeSchema.getSchema(clazz);
    T obj = schema.newMessage();
    ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
    return obj;
}

1,其中LinkedBuffer 为静态,在并发情况下其中一个线程执行这个方法BUFFER.clear(),其他的是否会报错 2,Schema schema 这个可以用一个ConcurrentHashMap 存储起来,性能会更好 下面是我修改的代码:

private static Map<Class<?>, Schema<?>> schemaMap = new ConcurrentHashMap<>();
private static <T>Schema<T> getSchema(Class<?> clazz) {
    if (schemaMap.containsKey(clazz)) {
        return (Schema<T>)schemaMap.get(clazz);
    }
    Schema schema = RuntimeSchema.getSchema(clazz);
    // 双重检查
    if (schema != null) {
        schemaMap.put(clazz, schema);
    }
    return schema;
}

@Override
public byte[] serialize(Object object) {
    Class<?> clazz = object.getClass();
    LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
    try {
        Schema schema = getSchema(clazz);
        return ProtostuffIOUtil.toByteArray(object, schema, buffer);
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    } finally {
        buffer.clear();
    }
}

@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
    Schema<T> schema = getSchema(clazz);
    T obj = schema.newMessage();
    ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
    return obj;
}

老哥 可以提交一个pr么?

private static final LinkedBuffer BUFFER = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);

guide哥,我想问下,这个buffer不会有线程安全问题么? 我看他内部好像没能保证线程安全

ChoKhoOu avatar Jan 06 '21 06:01 ChoKhoOu

想到了同样的问题,这里肯定是有线程安全问题的

microhardsmith avatar May 06 '21 06:05 microhardsmith

该问题我在压测负载均衡功能时也出现了

  1. 在多线程场景下,如果只有一个provider,那么所有并发请求都会交由同一个Channel进行处理,而Channel是天然线程安全的,一个Channel只会对应唯一一个线程进行执行,那么使用ProtostuffSerializer进行序列化时不会有线程安全问题
  2. 当有多个provider时,并发请求进来就会拿到多个Channel,从而多个Channel同时进行序列化,ProtostuffSerializer会出现线程安全的问题,如Buffer previously used and had not been reset.

对此,我的解决办法如下

public class ProtostuffSerializer implements Serializer {

    /**
     * Avoid re applying buffer space every time serialization
     *
     */
//    private static final LinkedBuffer BUFFER = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE); 线程不安全
    private static final LinkedBuffer[] BUFFERS = new LinkedBuffer[]{LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE),LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE)};

    @Override
    public byte[] serialize(Object obj) {
        System.out.println(Thread.currentThread().getId());
        Class<?> clazz = obj.getClass();
        Schema schema = RuntimeSchema.getSchema(clazz);
        byte[] bytes;
        try {
            bytes = ProtostuffIOUtil.toByteArray(obj, schema, BUFFERS[(int) (Thread.currentThread().getId()%2)]);
        } finally {
            BUFFERS[(int) (Thread.currentThread().getId()%2)].clear();
        }
        return bytes;
    }

    @Override
    public <T> T deserialize(byte[] bytes, Class<T> clazz) {
        Schema<T> schema = RuntimeSchema.getSchema(clazz);
        T obj = schema.newMessage();
        ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
        return obj;
    }
}

huangxiao1234 avatar Aug 10 '21 09:08 huangxiao1234

双重检查一样会有线程安全问题,建议修改为computeIfAbsent()方法,会保证本次操作是线程安全的 1654051649(1)

zhaojiale1213 avatar Jun 01 '22 02:06 zhaojiale1213