df: Rust implementation
The Data Flow Facilitator portion of DFFML would be great to port to other languages, as it's just a helper for using a data flow architecture. I hear Rust has async support these days.
APIs should be kept the same. If needed we'll change the Python APIs to match so that it's all one API.
- References
- https://doc.rust-lang.org/stable/book/
- https://github.com/rust-lang/rustlings/tree/main/exercises
- https://rust-lang.github.io/async-book/
- https://github.com/apache/incubator-opendal#rust
- https://github.com/ucan-wg/rs-ucan
- https://github.com/rust-unofficial/awesome-rust
- https://seedwing.io
Data Flow Programming
Explainer on what data flow programming is and how it works. Alice thinks in parallel.
- Rolling Alice: Architecting Alice: She's Arriving When?
  - The following is reproduced from the above WIP tutorial on dataflows (originally #1279)
  - It provides a minimal example of concurrent download in Python.
  - The full execution loop exists within `dffml/df/memory.py`
- Tutorials on DFFML Operations / DataFlows
  - https://intel.github.io/dffml/main/examples/shouldi.html
  - https://intel.github.io/dffml/main/examples/dataflows.html
  - https://github.com/intel/dffml/blob/alice/entities/alice/CONTRIBUTING.rst#finding-data-types-to-work-with
- Talk snippet explaining the above tutorial
  - https://youtu.be/D9puJiKKKS8?t=873
- Conceptual docs on data flow execution
  - https://github.com/intel/dffml/blob/alice/docs/about.rst#what-is-key-objective-of-dataflows
  - https://intel.github.io/dffml/main/concepts/dataflow.html
- Misc. references
  - https://en.wikipedia.org/wiki/Dataflow_programming
  - https://www.gamedeveloper.com/programming/tips-on-writing-code-for-data-oriented-design
  - https://www.youtube.com/watch?v=aPh4Z3SioB8
We need to come up with several metrics to track and plot throughout. We also need to plot each metric in relation to the others for tradeoff analysis.
We could also make this a choose-your-own-adventure style tutorial: if you want to do it with threads, here are your output metrics. We can later show that we're getting these metrics by putting all the steps into a dataflow and getting the metrics out by running them. We could then show how we can ask the orchestrator to optimize for speed, memory, etc. Then add in how you can have the orchestrator take those optimization constraints from dynamic conditions, such as how much memory is on the machine you are running on, or whether you have access to a k8s cluster. We also talked about the power consumption vs. speed tradeoff for server vs. desktop. We could add in edge constraints like network latency.
We will need to add a metrics API, use it in various places in the orchestrators, and expose it to operations so they can report out. These will be the same APIs we'll use for stub operations to estimate time to completion, etc.
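A minimal sketch of what such a metrics API might look like; every name here is hypothetical, nothing below exists in DFFML yet:

```python
import time
import contextlib
import dataclasses
from typing import Dict, List


@dataclasses.dataclass
class MetricSample:
    # Hypothetical: one recorded measurement
    name: str
    value: float
    timestamp: float = dataclasses.field(default_factory=time.time)


@dataclasses.dataclass
class MetricsCollector:
    # Hypothetical collector the orchestrator would hand to operations
    samples: List[MetricSample] = dataclasses.field(default_factory=list)

    def record(self, name: str, value: float) -> None:
        self.samples.append(MetricSample(name, value))

    @contextlib.contextmanager
    def timer(self, name: str):
        # Record wall clock time of a block, e.g. one operation run
        start = time.perf_counter()
        try:
            yield
        finally:
            self.record(name, time.perf_counter() - start)

    def totals(self) -> Dict[str, float]:
        totals: Dict[str, float] = {}
        for sample in self.samples:
            totals[sample.name] = totals.get(sample.name, 0.0) + sample.value
        return totals


metrics = MetricsCollector()
with metrics.timer("download.seconds"):
    time.sleep(0.1)  # stand-in for real work
print(metrics.totals())
```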
- Make sure to measure speed and memory usage with both ProcessPoolExecutor and ThreadPoolExecutor (a measurement sketch follows this list). Make sure we take into account memory from all processes.
- Start to finish speed
  - Plot against number of requests made
- Memory consumed
  - Plot against number of requests made
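A sketch of how we might take those measurements. It assumes `psutil` is available for the cross-process memory accounting (summing RSS of this process and its children, which covers ProcessPoolExecutor workers):

```python
import time
import concurrent.futures

import psutil  # assumed available; used to sum memory across child processes


def work(n: int) -> int:
    # Stand-in for a CPU bound task such as HTML parsing
    return sum(i * i for i in range(n))


def total_rss_bytes() -> int:
    # Memory of this process plus all of its children (e.g. pool workers)
    proc = psutil.Process()
    return proc.memory_info().rss + sum(
        child.memory_info().rss for child in proc.children(recursive=True)
    )


def measure(executor_cls, n_tasks: int = 32) -> None:
    start = time.perf_counter()
    with executor_cls() as pool:
        list(pool.map(work, [200_000] * n_tasks))
        # Sample memory while the workers are still alive
        rss = total_rss_bytes()
    elapsed = time.perf_counter() - start
    print(f"{executor_cls.__name__}: {elapsed:.3f}s, {rss / 1e6:.1f} MB RSS")


if __name__ == "__main__":
    measure(concurrent.futures.ThreadPoolExecutor)
    measure(concurrent.futures.ProcessPoolExecutor)
```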
This could be done as an IPython notebook.
- Show basic downloader code (a serial vs. event loop sketch follows this outline)
  - Observe speed bottleneck due to downloading in series
- Parallelize download code
  - Observe increase in speed
  - Observe error handling issues
- Add in need to call out via subprocess (see the subprocess sketch after this outline)
  - Observe subprocess issues
- Move to event loop
  - Observe increase in speed (? Not sure on this yet)
  - Observe successful error handling
  - Observe need to track fine grained details
- Move to event based implementation with director (orchestrator, this file minus prev pointers in Base Event)
  - Observe visibility into each event state of each request
  - Observe lack of visibility into chain of events
- Add prev pointers
  - OpenLineage
- Move to data flow based implementation
- Demo full DFFML data flow using execution on k8s
  - Use k8s playground as target environment
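As referenced in the outline, a minimal sketch of the serial starting point versus the event loop step, fetching placeholder URLs serially and then concurrently with aiohttp:

```python
import time
import asyncio
import urllib.request

import aiohttp

# Placeholder URLs; any list of fetchable pages works for the comparison
URLS = ["https://example.com/"] * 5


def download_serial(urls):
    # Step 1: naive serial download, total time is the sum of all requests
    return [urllib.request.urlopen(url).read() for url in urls]


async def download_concurrent(urls):
    # Event loop step: requests overlap in flight
    async with aiohttp.ClientSession() as session:
        async def fetch(url):
            async with session.get(url) as response:
                return await response.read()
        # return_exceptions=True surfaces per-request errors without losing
        # the other results, part of the error handling story
        return await asyncio.gather(
            *(fetch(url) for url in urls), return_exceptions=True
        )


if __name__ == "__main__":
    start = time.perf_counter()
    download_serial(URLS)
    print(f"serial: {time.perf_counter() - start:.2f}s")

    start = time.perf_counter()
    asyncio.run(download_concurrent(URLS))
    print(f"asyncio: {time.perf_counter() - start:.2f}s")
```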
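And for the subprocess step, a sketch of calling out to an external command from the event loop without blocking it; the command is an illustrative placeholder:

```python
import asyncio


async def run_command(*args: str) -> str:
    # Run an external command without blocking the event loop
    proc = await asyncio.create_subprocess_exec(
        *args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    if proc.returncode != 0:
        # Failures surface as exceptions rather than silently bad output
        raise RuntimeError(f"{args}: {stderr.decode()!r}")
    return stdout.decode()


if __name__ == "__main__":
    # Placeholder command; any CLI tool the downloader shells out to works
    print(asyncio.run(run_command("echo", "hello")))
```

The full gist reproduced below ties these pieces together, adding session reuse, a thread pool for parsing, and login handling.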
"""
License: Public Domain
Source: https://gist.github.com/f6fe1a39bd4e66e7d0c6e7802872d3b5
"""
import os
import asyncio
import pathlib
import contextlib
import dataclasses
import urllib.parse
import concurrent.futures
import bs4
import dffml
import aiohttp
DOWNLOAD_PATH = pathlib.Path(__file__).parent.joinpath("downloads")
if not DOWNLOAD_PATH.is_dir():
DOWNLOAD_PATH.mkdir()
def parse_bs4(html_doc):
return bs4.BeautifulSoup(html_doc, "html.parser")
def mkurl(endpoint, **kwargs):
url = urllib.parse.urlparse(endpoint)
url = url._replace(**kwargs)
return urllib.parse.urlunparse(url)
@dataclasses.dataclass
class SiteContext:
username: str
password: str
endpoint: str = "https://site.com/"
headers: dict = dataclasses.field(
default_factory=lambda: {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:91.0) Gecko/20100101 Firefox/91.0",
"Connection": "keep-alive",
}
)
def mkurl(self, **kwargs):
return mkurl(self.endpoint, **kwargs)
async def main_with_stacks(ctx: SiteContext, loop, stack, astack):
# Create thread pool for CPU bound tasks
pool = stack.enter_context(concurrent.futures.ThreadPoolExecutor())
# Create aiohttp client session which pools TCP connections for reuse
session = await astack.enter_async_context(
aiohttp.ClientSession(headers=ctx.headers)
)
# Get initial web page
async with session.get(ctx.endpoint) as response:
# Read home page
initial_page_body_text = await response.read()
# Parse home page
initial_page_body_soup = await loop.run_in_executor(
pool, parse_bs4, initial_page_body_text
)
# Find authenticity_token (This is the CSRF token)
authenticity_token = initial_page_body_soup.find(
"input",
{"name": "authenticity_token"},
)
authenticity_token = authenticity_token.get("value")
# Login. Server sends cookie which is used to authenticate us in subsequent
# requests. The cookie is stored in the ClientSession.
# Cookie might be only sent if something like "rememeber" is sent.
# Use chrome devtools Network tab to see login request data, make sure to
# check "Preserve Log" before you trigger the login page. Incognito window
# can be helpful for getting a logged-out session.
await session.post(
ctx.mkurl(path="/login"),
data={
"authenticity_token": authenticity_token,
"username": ctx.username,
"password": ctx.password,
"remember": "on",
},
)
async def main():
# Grab loop
loop = asyncio.get_event_loop()
# Create a context using our credentials
ctx = SiteContext(
username=os.environ["USERNAME"],
password=os.environ["PASSWORD"],
)
with contextlib.ExitStack() as stack:
async with contextlib.AsyncExitStack() as astack:
await main_with_stacks(ctx, loop, stack, astack)
if __name__ == "__main__":
asyncio.run(main())
DFFML's Current Working Data Flow Execution Model
```mermaid
graph TD
    subgraph dataflow_execution[Data Flow Execution]
        inputs[New Inputs]
        operations[Operations]
        opimps[Operation Implementations]
        ictx[Input Network]
        opctx[Operation Network]
        opimpctx[Operation Implementation Network]
        rctx[Redundancy Checker]
        lctx[Lock Network]
        opctx_operations[Determine which Operations may have new parameter sets]
        ictx_gather_inputs[Generate Operation parameter set pairs]
        opimpctx_dispatch[Dispatch operation for running]
        opimpctx_run_operation[Run an operation using given parameter set as inputs]

        inputs --> ictx
        operations -->|Register With| opctx
        opimps -->|Register With| opimpctx
        ictx --> opctx_operations
        opctx --> opctx_operations
        opctx_operations --> ictx_gather_inputs
        ictx_gather_inputs --> rctx
        rctx --> |If operation has not been run with given parameter set before| opimpctx_dispatch
        opimpctx_dispatch --> opimpctx
        opimpctx --> lctx
        lctx --> |Lock any inputs that can't be used at the same time| opimpctx_run_operation
        opimpctx_run_operation --> |Outputs of Operation become inputs to other operations| inputs
    end
```
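A deliberately simplified, hypothetical sketch of the loop the diagram describes; the real implementation lives in `dffml/df/memory.py` and all names below are illustrative only:

```python
import asyncio
from typing import Any, Callable, Dict, List, Tuple

# Hypothetical, heavily simplified model of the diagram above: operations
# declare the input names they consume, and their outputs become new inputs.
Operation = Tuple[str, List[str], Callable[..., Dict[str, Any]]]


async def run_dataflow(operations: List[Operation], inputs: Dict[str, Any]):
    seen: set = set()  # Redundancy checker: (op name, parameter set) pairs
    pending = dict(inputs)  # Input network: all values seen so far
    new_names = set(pending)
    while new_names:
        new_names = set()
        for name, needs, implementation in operations:
            # Determine which operations have a complete parameter set
            if not all(need in pending for need in needs):
                continue
            parameter_set = tuple(pending[need] for need in needs)
            # Only dispatch if this operation has not run with these inputs
            if (name, parameter_set) in seen:
                continue
            seen.add((name, parameter_set))
            # Run the operation; its outputs become inputs to other operations
            outputs = implementation(*parameter_set)
            for key, value in outputs.items():
                if key not in pending:
                    new_names.add(key)
                pending[key] = value
    return pending


operations: List[Operation] = [
    ("add_one", ["number"], lambda n: {"plus_one": n + 1}),
    ("double", ["plus_one"], lambda n: {"doubled": n * 2}),
]
print(asyncio.run(run_dataflow(operations, {"number": 20})))
# {'number': 20, 'plus_one': 21, 'doubled': 42}
```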