关于ProtostuffSerializer 的使用问题
https://github.com/Snailclimb/guide-rpc-framework/blob/master/rpc-framework-simple/src/main/java/github/javaguide/serialize/protostuff/ProtostuffSerializer.java
/**
* Avoid re applying buffer space every time serialization
*/
private static final LinkedBuffer BUFFER = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
@Override
public byte[] serialize(Object obj) {
Class<?> clazz = obj.getClass();
Schema schema = RuntimeSchema.getSchema(clazz);
byte[] bytes;
try {
bytes = ProtostuffIOUtil.toByteArray(obj, schema, BUFFER);
} finally {
BUFFER.clear();
}
return bytes;
}
@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
Schema<T> schema = RuntimeSchema.getSchema(clazz);
T obj = schema.newMessage();
ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
return obj;
}
1,其中LinkedBuffer 为静态,在并发情况下其中一个线程执行这个方法BUFFER.clear(),其他的是否会报错 2,Schema schema 这个可以用一个ConcurrentHashMap 存储起来,性能会更好 下面是我修改的代码:
private static Map<Class<?>, Schema<?>> schemaMap = new ConcurrentHashMap<>();
private static <T>Schema<T> getSchema(Class<?> clazz) {
if (schemaMap.containsKey(clazz)) {
return (Schema<T>)schemaMap.get(clazz);
}
Schema schema = RuntimeSchema.getSchema(clazz);
// 双重检查
if (schema != null) {
schemaMap.put(clazz, schema);
}
return schema;
}
@Override
public byte[] serialize(Object object) {
Class<?> clazz = object.getClass();
LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
try {
Schema schema = getSchema(clazz);
return ProtostuffIOUtil.toByteArray(object, schema, buffer);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
} finally {
buffer.clear();
}
}
@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
Schema<T> schema = getSchema(clazz);
T obj = schema.newMessage();
ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
return obj;
}
https://github.com/Snailclimb/guide-rpc-framework/blob/master/rpc-framework-simple/src/main/java/github/javaguide/serialize/protostuff/ProtostuffSerializer.java
/** * Avoid re applying buffer space every time serialization */ private static final LinkedBuffer BUFFER = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE); @Override public byte[] serialize(Object obj) { Class<?> clazz = obj.getClass(); Schema schema = RuntimeSchema.getSchema(clazz); byte[] bytes; try { bytes = ProtostuffIOUtil.toByteArray(obj, schema, BUFFER); } finally { BUFFER.clear(); } return bytes; } @Override public <T> T deserialize(byte[] bytes, Class<T> clazz) { Schema<T> schema = RuntimeSchema.getSchema(clazz); T obj = schema.newMessage(); ProtostuffIOUtil.mergeFrom(bytes, obj, schema); return obj; }1,其中LinkedBuffer 为静态,在并发情况下其中一个线程执行这个方法BUFFER.clear(),其他的是否会报错 2,Schema schema 这个可以用一个ConcurrentHashMap 存储起来,性能会更好 下面是我修改的代码:
private static Map<Class<?>, Schema<?>> schemaMap = new ConcurrentHashMap<>(); private static <T>Schema<T> getSchema(Class<?> clazz) { if (schemaMap.containsKey(clazz)) { return (Schema<T>)schemaMap.get(clazz); } Schema schema = RuntimeSchema.getSchema(clazz); // 双重检查 if (schema != null) { schemaMap.put(clazz, schema); } return schema; } @Override public byte[] serialize(Object object) { Class<?> clazz = object.getClass(); LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE); try { Schema schema = getSchema(clazz); return ProtostuffIOUtil.toByteArray(object, schema, buffer); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } finally { buffer.clear(); } } @Override public <T> T deserialize(byte[] bytes, Class<T> clazz) { Schema<T> schema = getSchema(clazz); T obj = schema.newMessage(); ProtostuffIOUtil.mergeFrom(bytes, obj, schema); return obj; }
老哥 可以提交一个pr么?
https://github.com/Snailclimb/guide-rpc-framework/blob/master/rpc-framework-simple/src/main/java/github/javaguide/serialize/protostuff/ProtostuffSerializer.java
/** * Avoid re applying buffer space every time serialization */ private static final LinkedBuffer BUFFER = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE); @Override public byte[] serialize(Object obj) { Class<?> clazz = obj.getClass(); Schema schema = RuntimeSchema.getSchema(clazz); byte[] bytes; try { bytes = ProtostuffIOUtil.toByteArray(obj, schema, BUFFER); } finally { BUFFER.clear(); } return bytes; } @Override public <T> T deserialize(byte[] bytes, Class<T> clazz) { Schema<T> schema = RuntimeSchema.getSchema(clazz); T obj = schema.newMessage(); ProtostuffIOUtil.mergeFrom(bytes, obj, schema); return obj; }1,其中LinkedBuffer 为静态,在并发情况下其中一个线程执行这个方法BUFFER.clear(),其他的是否会报错 2,Schema schema 这个可以用一个ConcurrentHashMap 存储起来,性能会更好 下面是我修改的代码:
private static Map<Class<?>, Schema<?>> schemaMap = new ConcurrentHashMap<>(); private static <T>Schema<T> getSchema(Class<?> clazz) { if (schemaMap.containsKey(clazz)) { return (Schema<T>)schemaMap.get(clazz); } Schema schema = RuntimeSchema.getSchema(clazz); // 双重检查 if (schema != null) { schemaMap.put(clazz, schema); } return schema; } @Override public byte[] serialize(Object object) { Class<?> clazz = object.getClass(); LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE); try { Schema schema = getSchema(clazz); return ProtostuffIOUtil.toByteArray(object, schema, buffer); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } finally { buffer.clear(); } } @Override public <T> T deserialize(byte[] bytes, Class<T> clazz) { Schema<T> schema = getSchema(clazz); T obj = schema.newMessage(); ProtostuffIOUtil.mergeFrom(bytes, obj, schema); return obj; }老哥 可以提交一个pr么?
private static final LinkedBuffer BUFFER = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
guide哥,我想问下,这个buffer不会有线程安全问题么? 我看他内部好像没能保证线程安全
想到了同样的问题,这里肯定是有线程安全问题的
该问题我在压测负载均衡功能时也出现了
- 在多线程场景下,如果只有一个provider,那么所有并发请求都会交由同一个Channel进行处理,而Channel是天然线程安全的,一个Channel只会对应唯一一个线程进行执行,那么使用ProtostuffSerializer进行序列化时不会有线程安全问题
- 当有多个provider时,并发请求进来就会拿到多个Channel,从而多个Channel同时进行序列化,ProtostuffSerializer会出现线程安全的问题,如
Buffer previously used and had not been reset.
对此,我的解决办法如下
public class ProtostuffSerializer implements Serializer {
/**
* Avoid re applying buffer space every time serialization
*
*/
// private static final LinkedBuffer BUFFER = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE); 线程不安全
private static final LinkedBuffer[] BUFFERS = new LinkedBuffer[]{LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE),LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE)};
@Override
public byte[] serialize(Object obj) {
System.out.println(Thread.currentThread().getId());
Class<?> clazz = obj.getClass();
Schema schema = RuntimeSchema.getSchema(clazz);
byte[] bytes;
try {
bytes = ProtostuffIOUtil.toByteArray(obj, schema, BUFFERS[(int) (Thread.currentThread().getId()%2)]);
} finally {
BUFFERS[(int) (Thread.currentThread().getId()%2)].clear();
}
return bytes;
}
@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
Schema<T> schema = RuntimeSchema.getSchema(clazz);
T obj = schema.newMessage();
ProtostuffIOUtil.mergeFrom(bytes, obj, schema);
return obj;
}
}
双重检查一样会有线程安全问题,建议修改为computeIfAbsent()方法,会保证本次操作是线程安全的
