
sdks/python: enrich data with Milvus Search [Vector, Keyword, Hybrid]

mohamedawnallah opened this pull request 6 months ago • 7 comments

Description

  • [x] Add MilvusSearchEnrichmentHandler
  • [x] Unit Test
  • [ ] Integration Test
  • [ ] Update Enrichment Docs
  • [x] Add milvus_enrichment_transform Jupyter Notebook Example

Part of #35046.
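
For context, a minimal, hypothetical sketch of how a handler like this is meant to plug into Beam's Enrichment transform; the constructor arguments below are illustrative placeholders, not the handler's actual signature:

    import apache_beam as beam
    from apache_beam.transforms.enrichment import Enrichment
    from apache_beam.ml.rag.enrichment.milvus_search import MilvusSearchEnrichmentHandler

    # Hypothetical configuration: these keyword arguments are placeholders.
    handler = MilvusSearchEnrichmentHandler(
        connection_params={"uri": "http://localhost:19530"},  # assumed shape
        collection_name="my_collection",                      # assumed parameter
    )

    with beam.Pipeline() as p:
        _ = (
            p
            | beam.Create([])      # Chunk objects with embeddings would go here
            | Enrichment(handler)  # enrich each element with Milvus search results
        )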


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • [x] Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • [ ] Update CHANGES.md with noteworthy changes.
  • [ ] If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels Python tests Java tests Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

mohamedawnallah avatar Jun 09 '25 17:06 mohamedawnallah

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

github-actions[bot] avatar Jun 09 '25 19:06 github-actions[bot]

Thanks! It looks like there are a bunch of precommits failing with failures like:

<testcase classname="apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment" name="test_chunks_batching" time="0.010">
<error message="failed on setup with "docker.errors.DockerException: Error while fetching server API version: ('Connection aborted.', FileNotFoundError(2, 'No such file or directory'))"">

    def connect(self):
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        sock.settimeout(self.timeout)
>       sock.connect(self.unix_socket)
E   FileNotFoundError: [Errno 2] No such file or directory
target/.tox/py310-macos/lib/python3.10/site-packages/docker/transport/unixconn.py:26: FileNotFoundError

During handling of the above exception, another exception occurred:
...
E   requests.exceptions.ConnectionError: ('Connection aborted.', FileNotFoundError(2, 'No such file or directory'))
target/.tox/py310-macos/lib/python3.10/site-packages/requests/adapters.py:682: ConnectionError

The above exception was the direct cause of the following exception:

    @pytest.fixture(scope="session")
    def milvus_container():
        # Start the container before any tests run.
>       container = MilvusEnrichmentTestHelper.start_milvus_search_db_container()
apache_beam/ml/rag/enrichment/milvus_search_it_test.py:93:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/ml/rag/enrichment/milvus_search_it_test.py:54: in start_milvus_search_db_container
    vector_db_container = MilvusContainer(image=image, port=19530)
target/.tox/py310-macos/lib/python3.10/site-packages/testcontainers/core/container.py:50: in __init__
    self._docker = DockerClient(**(docker_client_kw or {}))
target/.tox/py310-macos/lib/python3.10/site-packages/docker/api/client.py:207: in __init__
    self._version = self._retrieve_server_version()
...
E   docker.errors.DockerException: Error while fetching server API version: ('Connection aborted.', FileNotFoundError(2, 'No such file or directory'))
target/.tox/py310-macos/lib/python3.10/site-packages/docker/api/client.py:230: DockerException
</error>
</testcase>

Would you mind taking a look?

damccorm avatar Jun 10 '25 14:06 damccorm

Assigning reviewers:

R: @liferoad for label python.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

github-actions[bot] avatar Jun 14 '25 20:06 github-actions[bot]

waiting on author

mohamedawnallah avatar Jun 14 '25 20:06 mohamedawnallah

stop reviewer notifications

mohamedawnallah avatar Jun 14 '25 20:06 mohamedawnallah

Stopping reviewer notifications for this pull request: requested by reviewer. If you'd like to restart, comment assign set of reviewers

github-actions[bot] avatar Jun 14 '25 20:06 github-actions[bot]

@damccorm – CI tests are currently failing due to a dependency conflict. pymilvus currently requires protobuf >=5.27.2, as specified in pymilvus/pyproject.toml. Raising the protobuf lower bound to that major version conflicts with existing Beam requirements, as seen in the following error:

ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
grpcio-status 1.65.5 requires protobuf<6.0dev,>=5.26.1, but you have protobuf 4.25.8 which is incompatible.
pymilvus 2.5.11 requires grpcio<=1.67.1,>=1.49.1, but you have grpcio 1.73.0 which is incompatible.

This seems to mainly conflict with this comment in sdks/python/setup.py: https://github.com/apache/beam/blob/9057cc29f3f3ac1464d0351628b8fa26339cca16/sdks/python/setup.py#L389

How is this supposed to be handled moving forward?

mohamedawnallah avatar Jun 16 '25 09:06 mohamedawnallah

@damccorm – CI tests are currently failing due to a dependency conflict. pymilvus currently requires protobuf >=5.27.2, as specified in pymilvus/pyproject.toml. [...] How is this supposed to be handled moving forward?

It looks to me like the issue is actually more specifically the grpcio version - https://github.com/apache/beam/blob/9057cc29f3f3ac1464d0351628b8fa26339cca16/sdks/python/setup.py#L371

From the error you shared, "pymilvus 2.5.11 requires grpcio<=1.67.1,>=1.49.1, but you have grpcio 1.73.0 which is incompatible." Beam requires grpcio>=1.67.0 when using Python 3.12, so I'm guessing that is where we're seeing the issue. Are you using Python 3.12? If so, one temporary option would be to exclude Python 3.12 until pymilvus supports a higher grpcio version.

It looks like they are planning to change that soon - https://github.com/milvus-io/pymilvus/blob/cbc825e8857b74ccaf7b873fd91e4e64c64270dc/pyproject.toml#L25 - so we could also probably just pin to different versions of pymilvus for Python 3.12+ and Python <=3.11.

I'm guessing that is where the grpcio-status conflict is coming from as well.
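
For illustration, a rough sketch of what per-Python-version pins could look like using PEP 508 environment markers; the version numbers below are assumptions, not Beam's actual constraints:

    # Hypothetical sketch: version numbers are placeholders, not Beam's real pins.
    milvus_requirements = [
        # Exclude Python 3.12+ until pymilvus supports a newer grpcio ...
        'pymilvus>=2.5.10,<2.6;python_version<"3.12"',
        # ... or, once it does, pin a newer pymilvus release for 3.12+ instead:
        # 'pymilvus>=2.6.0;python_version>="3.12"',
    ]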

damccorm avatar Jun 16 '25 14:06 damccorm

It looks to me like the issue is actually more specifically the grpcio version -

Thanks, Danny, for bringing clarity here. Updating this solved the issue. Yeah, I was solely focused on the protobuf minimum version 👍

mohamedawnallah avatar Jun 16 '25 17:06 mohamedawnallah

~~I think the remaining CI errors are caused by updating the testcontainers version. Raised PR #35309 for easier debugging.~~

mohamedawnallah avatar Jun 16 '25 18:06 mohamedawnallah

So currently, test cases fail on the following platforms in https://github.com/apache/beam/blob/master/.github/workflows/python_tests.yml because MilvusContainer, used in sdks/python/apache_beam/ml/rag/enrichment/milvus_search_it_test.py, requires a Docker container runtime to be available:

  • macos-latest
  • windows-latest

The failures are due to a lack of support for nested virtualization on these GitHub-hosted runners.

I have attempted several workarounds, but most of them result in unreliable behavior. It's possible I missed something. For example, I tried using:

  • https://github.com/abiosoft/colima
  • https://github.com/docker/setup-docker-action

I have also run those test cases on ubuntu-latest and it worked fine.

How Can This Be Resolved?

There are a few possible solutions I can think of:

  1. Use a shared, accessible Milvus DB container
    Set up a Milvus container that is externally accessible and can be reused during tests.

    This could be a more long-term infrastructure solution.

  2. Conditionally run Docker-dependent tests on Linux only (Updated the PR to apply this proposed solution)

    • Use a Python-based condition like:
      @unittest.skipUnless(platform.system() == "Linux", "Test runs only on Linux/Ubuntu")
      
    • Ensure the job includes ubuntu-latest as the platform.

    This isn't the cleanest approach, but it's practical and works (see the sketch after this list).

  3. Wait for full nested virtualization support
    This would put Milvus integration tests in a waiting state until CI providers fully support nested virtualization on Windows and macOS runners.

    Not ideal, as it delays testing of critical functionality. This is a passive approach, though it may resolve itself over time as the technology landscape changes.
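
A minimal sketch of option 2, assuming a hypothetical Docker-dependent test class (the real integration tests live in milvus_search_it_test.py):

    import platform
    import unittest

    @unittest.skipUnless(
        platform.system() == "Linux", "Test runs only on Linux/Ubuntu")
    class MilvusDockerDependentTest(unittest.TestCase):
        def test_search(self):
            # Would start the Milvus test container and run search queries here.
            pass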

References

  1. Support container operations on Windows runners
    GitHub Actions currently does not support container operations on Windows runners:
    https://github.com/actions/runner/issues/904

  2. Nested virtualization on Apple silicon (M3)
    GitHub-hosted macOS runners do not support Docker due to the lack of nested virtualization support on M1/M2 chips. This can be revisited when GitHub enables nested virtualization on M3/macOS-15 runners: https://developer.apple.com/documentation/virtualization/vzgenericplatformconfiguration/isnestedvirtualizationsupported

mohamedawnallah avatar Jun 18 '25 10:06 mohamedawnallah

Finally, all CI tests passed 😁 I will try to push through the remaining work here so it's ready for review.

mohamedawnallah avatar Jun 18 '25 14:06 mohamedawnallah

Run Python_ML PreCommit 3.9

mohamedawnallah avatar Jun 22 '25 07:06 mohamedawnallah

Hey @damccorm,

Most of the main tasks in this PR are nearly complete. However, the beam_PreCommit_Python_ML job is currently failing due to an issue related to Docker container port mapping.

I've already tried a few workarounds, such as ensuring there are no port conflicts, but that turned out not to be the issue. The tests run fine on a typical GitHub-hosted Ubuntu runner. The runner for the beam_PreCommit_Python_ML job is self-hosted, which leads me to think this is likely a configuration issue specific to that environment.

Given that I have little insight into the setup or computing environment of the self-hosted runner for this job, how am I supposed to troubleshoot and debug this further?

cls = <class 'apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment'>

    @classmethod
    def setUpClass(cls):
>     cls._db = MilvusEnrichmentTestHelper.start_db_container(cls._version)

apache_beam/ml/rag/enrichment/milvus_search_it_test.py:420: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/ml/rag/enrichment/milvus_search_it_test.py:273: in start_db_container
    vector_db_container.start()
target/.tox-py39-ml/py39-ml/lib/python3.9/site-packages/testcontainers/milvus/__init__.py:83: in start
    self._connect()
target/.tox-py39-ml/py39-ml/lib/python3.9/site-packages/testcontainers/core/waiting_utils.py:59: in wrapper
    return wrapped(*args, **kwargs)
target/.tox-py39-ml/py39-ml/lib/python3.9/site-packages/testcontainers/milvus/__init__.py:65: in _connect
    self._healthcheck()
target/.tox-py39-ml/py39-ml/lib/python3.9/site-packages/testcontainers/core/waiting_utils.py:59: in wrapper
    return wrapped(*args, **kwargs)
target/.tox-py39-ml/py39-ml/lib/python3.9/site-packages/testcontainers/milvus/__init__.py:74: in _healthcheck
    healthcheck_url = self._get_healthcheck_url()
target/.tox-py39-ml/py39-ml/lib/python3.9/site-packages/testcontainers/milvus/__init__.py:69: in _get_healthcheck_url
    port = self.get_exposed_port(self.healthcheck_port)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

wrapped = <bound method DockerContainer.get_exposed_port of <testcontainers.milvus.MilvusContainer object at 0x7e27846ac4f0>>
instance = <testcontainers.milvus.MilvusContainer object at 0x7e27846ac4f0>
args = (9091,), kwargs = {}

    @wrapt.decorator
    def wrapper(wrapped: Callable, instance: Any, args: list, kwargs: dict) -> Any:
        from testcontainers.core.container import DockerContainer
    
        if isinstance(instance, DockerContainer):
            logger.info("Waiting for container %s with image %s to be ready ...", instance._container, instance.image)
        else:
            logger.info("Waiting for %s to be ready ...", instance)
    
        exception = None
        for attempt_no in range(config.max_tries):
            try:
                return wrapped(*args, **kwargs)
            except transient_exceptions as e:
                logger.debug(
                    f"Connection attempt '{attempt_no + 1}' of '{config.max_tries + 1}' "
                    f"failed: {traceback.format_exc()}"
                )
>               time.sleep(config.sleep_time)
E               Failed: Timeout (>600.0s) from pytest-timeout.

target/.tox-py39-ml/py39-ml/lib/python3.9/site-packages/testcontainers/core/waiting_utils.py:65: Failed
_ ERROR at setup of TestMilvusSearchEnrichment.test_vector_search_with_inner_product_similarity _
[gw2] linux -- Python 3.9.22 /runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39-ml/py39-ml/bin/python

wrapped = <bound method DockerContainer.get_exposed_port of <testcontainers.milvus.MilvusContainer object at 0x7e27846ac4f0>>
instance = <testcontainers.milvus.MilvusContainer object at 0x7e27846ac4f0>
args = (9091,), kwargs = {}

    @wrapt.decorator
    def wrapper(wrapped: Callable, instance: Any, args: list, kwargs: dict) -> Any:
        from testcontainers.core.container import DockerContainer
    
        if isinstance(instance, DockerContainer):
            logger.info("Waiting for container %s with image %s to be ready ...", instance._container, instance.image)
        else:
            logger.info("Waiting for %s to be ready ...", instance)
    
        exception = None
     
        for attempt_no in range(config.max_tries):
            try:
>               return wrapped(*args, **kwargs)

target/.tox-py39-ml/py39-ml/lib/python3.9/site-packages/testcontainers/core/waiting_utils.py:59: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39-ml/py39-ml/lib/python3.9/site-packages/testcontainers/core/container.py:155: in get_exposed_port
    return self.get_docker_client().port(self._container.id, port)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <testcontainers.core.docker_client.DockerClient object at 0x7e27846acdc0>
container_id = 'a261d8725a7cc154940828cafa5b084f506ab9a4f7c8b852c92be5d98638abdd'
port = 9091

    def port(self, container_id: str, port: int) -> int:
        """
        Lookup the public-facing port that is NAT-ed to :code:`port`.
        """
        port_mappings = self.client.api.port(container_id, port)
        if not port_mappings:
>           raise ConnectionError(f"Port mapping for container {container_id} and port {port} is " "not available")
E           ConnectionError: Port mapping for container a261d8725a7cc154940828cafa5b084f506ab9a4f7c8b852c92be5d98638abdd and port 9091 is not available
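
For local debugging outside the self-hosted runner, one way to check whether the 9091 mapping exists is the docker-py client that already appears in the traceback above; the image filter below is an assumption about the Milvus image name:

    # Hypothetical local check: list Milvus containers and print their 9091 bindings.
    import docker

    client = docker.from_env()
    for container in client.containers.list(filters={"ancestor": "milvusdb/milvus"}):
        # container.ports is empty for a port with no host mapping.
        print(container.name, container.ports.get("9091/tcp"))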

mohamedawnallah avatar Jun 23 '25 13:06 mohamedawnallah

This runs fine on a typical GitHub-hosted Ubuntu runner ... but fails on the self-hosted Beam environment for Ubuntu 20.04

I have written a workflow, Run Milvus Integration Tests, to confirm this, and all tests pass on the GitHub-hosted Ubuntu 24.04.2 runner: https://github.com/apache/beam/actions/runs/15828272812/job/44614105102?pr=35216

EDIT: It also looks like the Ubuntu 20.04 GitHub-hosted runner is no longer available: https://github.com/apache/beam/actions/runs/15828951633/job/44616419440?pr=35216 https://github.com/actions/runner-images/issues/11101

mohamedawnallah avatar Jun 23 '25 15:06 mohamedawnallah

Hey @damccorm,

Most of the main tasks in this PR are nearly complete. However, the beam_PreCommit_Python_ML job is currently failing due to an issue related to Docker container port mapping. [...] Given that I have little insight into the setup or computing environment of the self-hosted runner for this job, how am I supposed to troubleshoot and debug this further?

E           ConnectionError: Port mapping for container a261d8725a7cc154940828cafa5b084f506ab9a4f7c8b852c92be5d98638abdd and port 9091 is not available

It looks to me like this comes specifically from the healthcheck port - https://github.com/testcontainers/testcontainers-python/blob/f467c842b851613b9a087bd5f9a08d8c39577cb8/modules/milvus/testcontainers/milvus/__init__.py#L47

But I think the core problem is probably that we're already in a docker container in our CI environment and docker-in-docker can be problematic. Here is what I'd probably recommend:

  1. In setUpClass, if setting up the containers fails, skip the test (a sketch of this pattern follows after this list). This isn't ideal, but it will allow us to make progress (and we can manually verify the tests for now)
  2. In a future PR, we can add a workflow to execute tests which don't run correctly on self-hosted. This would involve:
  • Adding a marker like: no_self_hosted https://github.com/apache/beam/blob/b74c49602162cd752ae27d67481eed44dcfd06ea/sdks/python/pytest.ini#L32
  • Adding a Gradle task to run any tests with that marker (installing deps along the way). Initially this could just be the Milvus tests, but this is a problem I've seen elsewhere as well
  • Adding a new GitHub workflow to execute those tests on ubuntu-latest
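
A minimal sketch of recommendation (1); start_milvus_container() stands in for the PR's test helper and is assumed to raise when the Docker runtime is unavailable:

    import unittest

    def start_milvus_container():
        # Placeholder for MilvusEnrichmentTestHelper.start_db_container(...).
        raise RuntimeError("Docker runtime not available")

    class TestMilvusSearchEnrichment(unittest.TestCase):
        @classmethod
        def setUpClass(cls):
            try:
                cls._db = start_milvus_container()
            except Exception as e:
                # Raising SkipTest skips every test in the class instead of erroring.
                raise unittest.SkipTest(f"Could not start Milvus container: {e}") from e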

damccorm avatar Jun 23 '25 16:06 damccorm

If you have an example of the tests running on ubuntu-latest that you can link to eventually, that will help as well

damccorm avatar Jun 23 '25 16:06 damccorm

If you have an example of the tests running on ubuntu-latest that you can link to eventually, that will help as well

This one was on ubuntu-latest, which is Ubuntu 24.04.2, and the tests pass: https://github.com/apache/beam/actions/runs/15831387422/job/44624648315?pr=35216

PS: ~~Working on the proposed solution regarding skipping the tests if the container fails to start in setUpClass~~

EDIT: Skipped the tests if the container fails to start in setUpClass. Will do a follow-up PR after this one to mark and run tests that don't run correctly on self-hosted runners 👍

mohamedawnallah avatar Jun 23 '25 16:06 mohamedawnallah

Run Prism_Python PreCommit 3.9

mohamedawnallah avatar Jun 24 '25 09:06 mohamedawnallah

Run Python_Coverage PreCommit

mohamedawnallah avatar Jun 24 '25 12:06 mohamedawnallah

Hey @damccorm, I think this PR is now ready for review. The currently failing test cases are flaky - it looks like they are failing consistently across other PRs as well.

mohamedawnallah avatar Jun 24 '25 17:06 mohamedawnallah

Thanks a lot @damccorm for the thoughtful feedback. I've addressed it and left one comment explaining the addition of setuptools as a dependency. I think it is now ready for another round of review. Also, I raised a follow-up PR for updating/adding docs/examples for Milvus search: #35467

mohamedawnallah avatar Jun 27 '25 23:06 mohamedawnallah

Codecov Report

Attention: Patch coverage is 70.79646% with 66 lines in your changes missing coverage. Please review.

Project coverage is 56.52%. Comparing base (dd9552c) to head (65fc24b). Report is 3 commits behind head on master.

Files with missing lines Patch % Lines
...hon/apache_beam/ml/rag/enrichment/milvus_search.py 69.86% 66 Missing :warning:
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #35216      +/-   ##
============================================
+ Coverage     56.51%   56.52%   +0.01%     
  Complexity     3319     3319              
============================================
  Files          1198     1199       +1     
  Lines        182870   183091     +221     
  Branches       3426     3426              
============================================
+ Hits         103347   103496     +149     
- Misses        76223    76295      +72     
  Partials       3300     3300              
Flag Coverage Δ
python 80.77% <70.79%> (-0.04%) :arrow_down:

Flags with carried forward coverage won't be shown. Click here to find out more.

:umbrella: View full report in Codecov by Sentry.
:loudspeaker: Have feedback on the report? Share it here.


codecov[bot] avatar Jun 30 '25 16:06 codecov[bot]

Thanks @damccorm for taking another look at this PR. I've removed the explicit setuptools and pip dependencies in sdks/python/setup.py according to your feedback. I think it is now ready for review

P.S. The failed Go tests / Go Build job workflow appears unrelated to the changes introduced by this PR.

mohamedawnallah avatar Jun 30 '25 17:06 mohamedawnallah

Also, I plan to submit a follow-up PR, perhaps after this one and the docs PR #35467. It will primarily address two pieces of feedback you provided:

  1. Handling tests that can't run on self-hosted runners due to the Docker-in-Docker environment (https://github.com/apache/beam/pull/35216#issuecomment-2997045270).
  2. Moving the milvus dependency into its own optional extra (https://github.com/apache/beam/pull/35216#discussion_r2175149226).
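
A minimal sketch of what such an optional extra could look like in setup.py; the extra name and version bound are assumptions, not the PR's actual choice:

    # Hypothetical extras_require entry; installable via: pip install "apache-beam[milvus]"
    extras_require = {
        'milvus': ['pymilvus>=2.5.10,<3'],
    }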

mohamedawnallah avatar Jun 30 '25 17:06 mohamedawnallah