BitSwanPump icon indicating copy to clipboard operation
BitSwanPump copied to clipboard

BitSwan Pump: A real-time stream processor for Python 3.5+

BSPump: A real-time stream processor for Python 3.5+

.. image:: https://readthedocs.org/projects/bspump/badge/?version=latest :target: https://docs.libertyaces.com/?badge=latest :alt: Documentation Status

.. image:: https://codecov.io/gh/LibertyAces/BitSwanPump/branch/master/graph/badge.svg?sanitize=true :alt: Code coverage :target: https://codecov.io/gh/LibertyAces/BitSwanPump

.. image:: https://badges.gitter.im/TeskaLabs/bspump.svg :alt: Join the chat at https://gitter.im/TeskaLabs/bspump :target: https://gitter.im/TeskaLabs/bspump?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge

Documentation

https://bitswanpump.readthedocs.io/en/latest/index.html

Principles

  • Write once, use many times
  • Everything is a stream
  • Schema-less
  • Kappa architecture
  • Real-time
  • High performance
  • Simple to use and well documented, so anyone can write their own stream processor
  • Asynchronous via Python 3.5+ async/await and asyncio
  • Event driven Architecture <https://en.wikipedia.org/wiki/Event-driven_architecture>_ / Reactor pattern <https://en.wikipedia.org/wiki/Reactor_pattern>_
  • Single-threaded core but compatible with threads
  • Compatible with pypy <http://pypy.org>_, Just-In-Time compiler capable of boosting Python code performace by more than 5x
  • Good citizen of the Python ecosystem
  • Modularized

Stream processor example

.. code:: python

#!/usr/bin/env python3
import bspump
import bspump.socket
import bspump.common
import bspump.elasticsearch

class MyPipeline(bspump.Pipeline):
    def __init__(self, app):
        super().__init__(app)
        self.build(
            bspump.socket.TCPStreamSource(app, self),
            bspump.common.JSONParserProcessor(app, self),
            bspump.elasticsearch.ElasticSearchSink(app, self, "ESConnection")
        )


if __name__ == '__main__':
    app = bspump.BSPumpApplication()
    svc = app.get_service("bspump.PumpService")
    svc.add_connection(bspump.elasticsearch.ElasticSearchConnection(app, "ESConnection"))
    svc.add_pipeline(MyPipeline(app))
    app.run()

Video tutorial ^^^^^^^^^^^^^^

.. image:: http://img.youtube.com/vi/QvjiPxO4w6w/0.jpg :target: https://www.youtube.com/watch?v=QvjiPxO4w6w&list=PLb0LvCJCZKt_1QcQwpJXqsm-AY_ty4udo

Build

Docker build ^^^^^^^^^^^^ Dockerfile and instructions are in separate repository <https://github.com/LibertyAces/docker-bspump/>_.

PyPI release ^^^^^^^^^^^^ Releases are happening from a git tag (format: vYY.MM) git tag -a v19.07

Following the PyPI packaging <https://packaging.python.org/tutorials/packaging-projects/#generating-distribution-archives>, generate distribution package <https://packaging.python.org/glossary/#term-distribution-package> and upload it <https://packaging.python.org/tutorials/packaging-projects/#uploading-the-distribution-archives>_ using following command python -m twine upload dist/*

Blank application setup

You can clone the blank application from its own repository <https://github.com/LibertyAces/BitSwanTelco-BlankApp>_.

Available technologies

  • bspump.amqp AMQP/RabbitMQ connection, source and sink

  • bspump.avro Apache Avro file source and sink

  • bspump.common Common processors and parsers

  • bspump.elasticsearch ElasticSearch connection, source and sink

  • bspump.file File sources and sinks (plain files, JSON, CSV)

  • bspump.filter Content, Attribute and TimeDrift filters

  • bspump.http.client HTTP client source, WebSocket client sink

  • bspump.http.web HTTP server source and sink, WebSocket server source

  • bspump.influxdb InfluxDB connection and sink

  • bspump.kafka Kafka connection, source and sink

  • bspump.mail SMTP connection and sink

  • bspump.mongodb MongoDB connection and lookup

  • bspump.mysql MySQL connection, source and sink

  • bspump.parquet Apache Parquet file sink

  • bspump.postgresql PostgreSQL connection and sink

  • bspump.slack Slack connection and sink

  • bspump.socket TCP source, UDP source

  • bspump.trigger Opportunistic, PubSub and Periodic triggers

  • bspump.crypto Cryptography

  • bspump.declarative Declarative processors and expressions

    • Hashing: SHA224, SHA256, SHA384, SHA512, SHA1, MD5, BLAKE2b, BLAKE2s
    • Symmetric Encryption: AES 128, AES 192, AES 256
  • bspump.analyzer

    • Time Window analyzer
    • Session analyzer
    • Geographical analyzer
    • Time Drift analyzer
  • bspump.lookup

    • GeoIP Lookup
  • bspump.unittest

    • Interface for testing Processors / Pipelines
  • bspump.web Pump API endpoints for pipelines, lookups etc.

Google Sheet with technological compatiblity matrix: https://docs.google.com/spreadsheets/d/1L1DvSuHuhKUyZ3FEFxqEKNpSoamPH2Z1ZaFuHyageoI/edit?usp=sharing

High-level architecture

.. image:: ./doc/_static/bspump-architecture.png :alt: Schema of BSPump high-level achitecture

Unit test

.. code:: python

from unittest.mock import MagicMock
from bspump.unittest import ProcessorTestCase


class MyProcessorTestCase(ProcessorTestCase):

    def test_my_processor(self):

        # setup processor for test
        self.set_up_processor(my_project.processor.MyProcessor, "proc-arg", proc="key_arg")

        # mock methods to suit your needs on pipeline ..
        self.Pipeline.method = MagicMock()

        # .. or instance of processor
        self.Pipeline.Processor.method = MagicMock()

        output = self.execute(
            [(None, {'foo': 'bar'})]  # Context, event
        )

        # assert output
        self.assertEqual(
            [event for context, event in output],
            [{'FOO': 'BAR'}]
        )

        # asssert expected calls on `self.Pipeline.method` or `self.Pipeline.Processor.method`
        self.Pipeline.Processor.method.assert_called_with(**expected)

Running of unit tests

python3 -m unittest test

You can replace test with a location of your unit test module.

Licence

BSPump is open source software, available under the BSD 3-Clause License.