streamz
Fail to extend a Stream with new methods
Inheritance on a Stream seems to be broken.
The following example fails with "'map' object has no attribute 'draw'":
import streamz

class MyStream(streamz.Stream):
    def draw(self):
        print('called draw')

@MyStream.register_api()
class draw(MyStream):
    def __init__(self, *args, **kwargs):
        print('called draw')

s = MyStream()
s.map(lambda x: x + 5).draw()
s.emit(99)
I couldn't find a class factory in the source, and it looks like all the nodes are instantiated directly with Stream when they should use the type of the main instance.
It would work with @Stream.register_api - will that suffice?
@martindurant, the issue with @Stream.register_api is that it mutates the original Stream class, which affects all instances everywhere else. So it doesn't fulfill the requirement for extending a stream. For instance, if I were to override the sink method, it would break the other streams. What I'm expecting is basic polymorphism.
It's called subclassing in the pandas documentation:
https://pandas.pydata.org/docs/development/extending.html#subclassing-pandas-data-structures
I think I see what you are trying to do, but you face the following problem: map is a method on Stream (and defined by the map class), so that when you call .map, there is no easy way to know that this was called from within a subclass - map is not a child of your custom class and cannot be made so dynamically.
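To make the problem concrete, here is a minimal sketch of the registration pattern described above. It assumes a toy `Stream` class written only for this illustration, not the library's actual code; the wrapper always instantiates the node class that was registered, regardless of the type of the caller.

```python
import functools


class Stream:
    """Toy stand-in for streamz.Stream (hypothetical, for illustration only)."""

    def __init__(self, upstream=None):
        self.upstream = upstream

    @classmethod
    def register_api(cls):
        def decorator(node_cls):
            @functools.wraps(node_cls)
            def wrapped(self, *args, **kwargs):
                # The node class was fixed at registration time, so the
                # caller's type (type(self)) plays no role here.
                return node_cls(self, *args, **kwargs)
            setattr(cls, node_cls.__name__, wrapped)
            return node_cls
        return decorator


@Stream.register_api()
class map(Stream):
    def __init__(self, upstream, func):
        super().__init__(upstream)
        self.func = func


class MyStream(Stream):
    def draw(self):
        return "called draw"


node = MyStream().map(lambda x: x + 5)
print(type(node).__name__)         # map
print(isinstance(node, MyStream))  # False
print(hasattr(node, "draw"))       # False
```

The node returned by `.map` is a plain `map` instance, so any method defined only on `MyStream` is missing from it, which matches the reported error.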
That leaves you with the two obvious paths, as you point out: come up with unique names for all node types, or allow name clashes and accept that you may clobber methods for all streams. For your example, I'm not totally sure how you expect the method draw to be called, since it doesn't actually take any events. All existing nodes are implemented at the update (incoming events) and emit (outgoing events) points.
However, another possibility may be what you are actually after. You can use Stream.connect to tie together nodes without using the register_api route. For example, the following are equivalent, where the second and third versions avoid using the automatically created method.
s = Stream()
m = s.map(func)
or
s = Stream()
m = streamz.core.map(s, func)
or
s = Stream()
m = streamz.core.map(None, func)
s.connect(m)
map is a method on Stream (and defined by the map class), so that when you call .map, there is no easy way to know that this was called from within a subclass
map is a wrapped function. The class/subclass is available at the time it's registered and called:
https://github.com/python-streamz/streamz/blob/745343193fd71793569be25b882b61ac43f93b55/streamz/core.py#L161
map is not a child of your custom class and cannot be made so dynamically.
The following Stream.register_api() would add the expected inheritance at registration:
@classmethod
def register_api(cls, func):
    @functools.wraps(func)
    def wrapped(self, *args, **kwargs):
        clazz = type(func.__name__, (func, type(self)), {})
        node = clazz(self, *args, **kwargs)
        return node
    setattr(Stream, func.__name__, wrapped)
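As a rough check of this idea, the sketch below applies the same dynamic-subclass trick to a toy `Stream` class. The class is a stand-in written for this illustration and is an assumption, not the library's code; unlike the snippet above, it also returns the node class from the decorator and sets the attribute on `cls`.

```python
import functools


class Stream:
    """Toy stand-in for streamz.Stream (hypothetical, for illustration only)."""

    def __init__(self, upstream=None):
        self.upstream = upstream

    @classmethod
    def register_api(cls, node_cls):
        @functools.wraps(node_cls)
        def wrapped(self, *args, **kwargs):
            # Build a class that inherits from both the node type and the
            # caller's type, so methods of the caller's subclass survive.
            combined = type(node_cls.__name__, (node_cls, type(self)), {})
            return combined(self, *args, **kwargs)
        setattr(cls, node_cls.__name__, wrapped)
        return node_cls


@Stream.register_api
class map(Stream):
    def __init__(self, upstream, func):
        super().__init__(upstream)
        self.func = func


class MyStream(Stream):
    def draw(self):
        return "called draw"


node = MyStream().map(lambda x: x + 5)
print(type(node).__name__)         # map
print(isinstance(node, MyStream))  # True
print(node.draw())                 # called draw
```

One caveat of this toy version: calling `.map` again on the returned node raises a TypeError, because the new class would list `map` before a class that already derives from `map`, and Python cannot build a consistent method resolution order for that. A real implementation would need to handle chaining, for example by combining the node class with the original user subclass rather than with `type(self)`.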
For your example, I'm not totally sure how you expect the method draw to be called, since it doesn't actually take any events.
This is just an example to demonstrate the issue with inheritance. Don't read too much into it.
You can use Stream.connect to tie together nodes without using the register_api route.
This library treats its methods as plugins but doesn't keep track of them.
So I'd have to rewrite all the methods that are registered by Stream.register_api to connect the nodes. It kind of defeats the purpose of inheritance.
I haven't had a chance yet to understand what difference your alternate decorator has, but please do propose a PR to demonstrate it! I daresay there are others who have thought along these lines and not known how to proceed.