sdks/python: enrich data with Milvus Search [Vector, Keyword, Hybrid]
Description
- [x] Add `MilvusSearchEnrichmentHandler`
- [x] Unit Test
- [ ] Integration Test
- [ ] Update Enrichment Docs
- [x] Add `milvus_enrichment_transform` Jupyter Notebook Example
Part of #35046.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
- [x] Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache Individual Contributor License Agreement.
See the Contributor Guide for more tips on how to make the review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment `assign set of reviewers`.
Thanks! It looks like there are a bunch of precommits failing with failures like:
<testcase classname="apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment" name="test_chunks_batching" time="0.010">
<error message="failed on setup with "docker.errors.DockerException: Error while fetching server API version: ('Connection aborted.', FileNotFoundError(2, 'No such file or directory'))"">self = <docker.transport.unixconn.UnixHTTPConnectionPool object at 0x138666680> method = 'GET', url = '/version', body = None headers = {'User-Agent': 'docker-sdk-python/7.1.0', 'Accept-Encoding': 'gzip, deflate, zstd', 'Accept': '*/*', 'Connection': 'keep-alive'} retries = Retry(total=0, connect=None, read=False, redirect=None, status=None) redirect = False, assert_same_host = False timeout = Timeout(connect=60, read=60, total=None), pool_timeout = None release_conn = False, chunked = False, body_pos = None, preload_content = False decode_content = False, response_kw = {} parsed_url = Url(scheme=None, auth=None, host=None, port=None, path='/version', query=None, fragment=None) destination_scheme = None, conn = None, release_this_conn = True http_tunnel_required = False, err = None, clean_exit = False def urlopen( # type: ignore[override] self, method: str, url: str, body: _TYPE_BODY | None = None, headers: typing.Mapping[str, str] | None = None, retries: Retry | bool | int | None = None, redirect: bool = True, assert_same_host: bool = True, timeout: _TYPE_TIMEOUT = _DEFAULT_TIMEOUT, pool_timeout: int | None = None, release_conn: bool | None = None, chunked: bool = False, body_pos: _TYPE_BODY_POSITION | None = None, preload_content: bool = True, decode_content: bool = True, **response_kw: typing.Any, ) -> BaseHTTPResponse: """ Get a connection from the pool and perform an HTTP request. This is the lowest level call for making a request, so you'll need to specify all the raw details. .. note:: More commonly, it's appropriate to use a convenience method such as :meth:`request`. .. note:: `release_conn` will only behave as expected if `preload_content=False` because we want to make `preload_content=False` the default behaviour someday soon without breaking backwards compatibility. 
:param method: HTTP request method (such as GET, POST, PUT, etc.) :param url: The URL to perform the request on. :param body: Data to send in the request body, either :class:`str`, :class:`bytes`, an iterable of :class:`str`/:class:`bytes`, or a file-like object. :param headers: Dictionary of custom headers to send, such as User-Agent, If-None-Match, etc. If None, pool headers are used. If provided, these headers completely replace any pool-specific headers. :param retries: Configure the number of retries to allow before raising a :class:`~urllib3.exceptions.MaxRetryError` exception. If ``None`` (default) will retry 3 times, see ``Retry.DEFAULT``. Pass a :class:`~urllib3.util.retry.Retry` object for fine-grained control over different types of retries. Pass an integer number to retry connection errors that many times, but no other types of errors. Pass zero to never retry. If ``False``, then retries are disabled and any exception is raised immediately. Also, instead of raising a MaxRetryError on redirects, the redirect response will be returned. :type retries: :class:`~urllib3.util.retry.Retry`, False, or an int. :param redirect: If True, automatically handle redirects (status codes 301, 302, 303, 307, 308). Each redirect counts as a retry. Disabling retries will disable redirect, too. :param assert_same_host: If ``True``, will make sure that the host of the pool requests is consistent else will raise HostChangedError. When ``False``, you can use the pool on an HTTP proxy and request foreign hosts. :param timeout: If specified, overrides the default timeout for this one request. It may be a float (in seconds) or an instance of :class:`urllib3.util.Timeout`. :param pool_timeout: If set and the pool is set to block=True, then this method will block for ``pool_timeout`` seconds and raise EmptyPoolError if no connection is available within the time period. :param bool preload_content: If True, the response's body will be preloaded into memory. 
:param bool decode_content: If True, will attempt to decode the body based on the 'content-encoding' header. :param release_conn: If False, then the urlopen call will not release the connection back into the pool once a response is received (but will release if you read the entire contents of the response such as when `preload_content=True`). This is useful if you're not preloading the response's content immediately. You will need to call ``r.release_conn()`` on the response ``r`` to return the connection back into the pool. If None, it takes the value of ``preload_content`` which defaults to ``True``. :param bool chunked: If True, urllib3 will send the body using chunked transfer encoding. Otherwise, urllib3 will send the body using the standard content-length form. Defaults to False. :param int body_pos: Position to seek to in file-like body in the event of a retry or redirect. Typically this won't need to be set because urllib3 will auto-populate the value when needed. """ parsed_url = parse_url(url) destination_scheme = parsed_url.scheme if headers is None: headers = self.headers if not isinstance(retries, Retry): retries = Retry.from_int(retries, redirect=redirect, default=self.retries) if release_conn is None: release_conn = preload_content # Check host if assert_same_host and not self.is_same_host(url): raise HostChangedError(self, url, retries) # Ensure that the URL we're connecting to is properly encoded if url.startswith("/"): url = to_str(_encode_target(url)) else: url = to_str(parsed_url.url) conn = None # Track whether `conn` needs to be released before # returning/raising/recursing. Update this variable if necessary, and # leave `release_conn` constant throughout the function. That way, if # the function recurses, the original value of `release_conn` will be # passed down into the recursive call, and its value will be respected. # # See issue #651 [1] for details. 
# # [1] <https://github.com/urllib3/urllib3/issues/651> release_this_conn = release_conn http_tunnel_required = connection_requires_http_tunnel( self.proxy, self.proxy_config, destination_scheme ) # Merge the proxy headers. Only done when not using HTTP CONNECT. We # have to copy the headers dict so we can safely change it without those # changes being reflected in anyone else's copy. if not http_tunnel_required: headers = headers.copy() # type: ignore[attr-defined] headers.update(self.proxy_headers) # type: ignore[union-attr] # Must keep the exception bound to a separate variable or else Python 3 # complains about UnboundLocalError. err = None # Keep track of whether we cleanly exited the except block. This # ensures we do proper cleanup in finally. clean_exit = False # Rewind body position, if needed. Record current position # for future rewinds in the event of a redirect/retry. body_pos = set_file_position(body, body_pos) try: # Request a connection from the queue. timeout_obj = self._get_timeout(timeout) conn = self._get_conn(timeout=pool_timeout) conn.timeout = timeout_obj.connect_timeout # type: ignore[assignment] # Is this a closed/new connection that requires CONNECT tunnelling? if self.proxy is not None and http_tunnel_required and conn.is_closed: try: self._prepare_proxy(conn) except (BaseSSLError, OSError, SocketTimeout) as e: self._raise_timeout( err=e, url=self.proxy.url, timeout_value=conn.timeout ) raise # If we're going to release the connection in ``finally:``, then # the response doesn't need to know about the connection. Otherwise # it will also try to release it and we'll have a double-release # mess. 
response_conn = conn if not release_conn else None # Make the request on the HTTPConnection object > response = self._make_request( conn, method, url, timeout=timeout_obj, body=body, headers=headers, chunked=chunked, retries=retries, response_conn=response_conn, preload_content=preload_content, decode_content=decode_content, **response_kw, ) target/.tox/py310-macos/lib/python3.10/site-packages/urllib3/connectionpool.py:787: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ target/.tox/py310-macos/lib/python3.10/site-packages/urllib3/connectionpool.py:493: in _make_request conn.request( target/.tox/py310-macos/lib/python3.10/site-packages/urllib3/connection.py:445: in request self.endheaders() /Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/http/client.py:1278: in endheaders self._send_output(message_body, encode_chunked=encode_chunked) /Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/http/client.py:1038: in _send_output self.send(msg) /Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/http/client.py:976: in send self.connect() _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <docker.transport.unixconn.UnixHTTPConnection object at 0x138665240> def connect(self): sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.settimeout(self.timeout) > sock.connect(self.unix_socket) E FileNotFoundError: [Errno 2] No such file or directory target/.tox/py310-macos/lib/python3.10/site-packages/docker/transport/unixconn.py:26: FileNotFoundError During handling of the above exception, another exception occurred: self = <docker.transport.unixconn.UnixHTTPAdapter object at 0x138666d40> request = <PreparedRequest [GET]>, stream = False timeout = Timeout(connect=60, read=60, total=None), verify = True, cert = None proxies = OrderedDict() def send( self, request, stream=False, timeout=None, verify=True, cert=None, proxies=None ): """Sends PreparedRequest object. 
Returns Response object. :param request: The :class:`PreparedRequest <PreparedRequest>` being sent. :param stream: (optional) Whether to stream the request content. :param timeout: (optional) How long to wait for the server to send data before giving up, as a float, or a :ref:`(connect timeout, read timeout) <timeouts>` tuple. :type timeout: float or tuple or urllib3 Timeout object :param verify: (optional) Either a boolean, in which case it controls whether we verify the server's TLS certificate, or a string, in which case it must be a path to a CA bundle to use :param cert: (optional) Any user-provided SSL certificate to be trusted. :param proxies: (optional) The proxies dictionary to apply to the request. :rtype: requests.Response """ try: conn = self.get_connection_with_tls_context( request, verify, proxies=proxies, cert=cert ) except LocationValueError as e: raise InvalidURL(e, request=request) self.cert_verify(conn, request.url, verify, cert) url = self.request_url(request, proxies) self.add_headers( request, stream=stream, timeout=timeout, verify=verify, cert=cert, proxies=proxies, ) chunked = not (request.body is None or "Content-Length" in request.headers) if isinstance(timeout, tuple): try: connect, read = timeout timeout = TimeoutSauce(connect=connect, read=read) except ValueError: raise ValueError( f"Invalid timeout {timeout}. Pass a (connect, read) timeout tuple, " f"or a single float to set both timeouts to the same value." 
) elif isinstance(timeout, TimeoutSauce): pass else: timeout = TimeoutSauce(connect=timeout, read=timeout) try: > resp = conn.urlopen( method=request.method, url=url, body=request.body, headers=request.headers, redirect=False, assert_same_host=False, preload_content=False, decode_content=False, retries=self.max_retries, timeout=timeout, chunked=chunked, ) target/.tox/py310-macos/lib/python3.10/site-packages/requests/adapters.py:667: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ target/.tox/py310-macos/lib/python3.10/site-packages/urllib3/connectionpool.py:841: in urlopen retries = retries.increment( target/.tox/py310-macos/lib/python3.10/site-packages/urllib3/util/retry.py:474: in increment raise reraise(type(error), error, _stacktrace) target/.tox/py310-macos/lib/python3.10/site-packages/urllib3/util/util.py:38: in reraise raise value.with_traceback(tb) target/.tox/py310-macos/lib/python3.10/site-packages/urllib3/connectionpool.py:787: in urlopen response = self._make_request( target/.tox/py310-macos/lib/python3.10/site-packages/urllib3/connectionpool.py:493: in _make_request conn.request( target/.tox/py310-macos/lib/python3.10/site-packages/urllib3/connection.py:445: in request self.endheaders() /Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/http/client.py:1278: in endheaders self._send_output(message_body, encode_chunked=encode_chunked) /Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/http/client.py:1038: in _send_output self.send(msg) /Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/http/client.py:976: in send self.connect() _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <docker.transport.unixconn.UnixHTTPConnection object at 0x138665240> def connect(self): sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.settimeout(self.timeout) > sock.connect(self.unix_socket) E urllib3.exceptions.ProtocolError: ('Connection aborted.', 
FileNotFoundError(2, 'No such file or directory')) target/.tox/py310-macos/lib/python3.10/site-packages/docker/transport/unixconn.py:26: ProtocolError During handling of the above exception, another exception occurred: self = <docker.api.client.APIClient object at 0x138665600> def _retrieve_server_version(self): try: > return self.version(api_version=False)["ApiVersion"] target/.tox/py310-macos/lib/python3.10/site-packages/docker/api/client.py:223: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ target/.tox/py310-macos/lib/python3.10/site-packages/docker/api/daemon.py:181: in version return self._result(self._get(url), json=True) target/.tox/py310-macos/lib/python3.10/site-packages/docker/utils/decorators.py:44: in inner return f(self, *args, **kwargs) target/.tox/py310-macos/lib/python3.10/site-packages/docker/api/client.py:246: in _get return self.get(url, **self._set_request_timeout(kwargs)) target/.tox/py310-macos/lib/python3.10/site-packages/requests/sessions.py:602: in get return self.request("GET", url, **kwargs) target/.tox/py310-macos/lib/python3.10/site-packages/requests/sessions.py:589: in request resp = self.send(prep, **send_kwargs) target/.tox/py310-macos/lib/python3.10/site-packages/requests/sessions.py:703: in send r = adapter.send(request, **kwargs) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <docker.transport.unixconn.UnixHTTPAdapter object at 0x138666d40> request = <PreparedRequest [GET]>, stream = False timeout = Timeout(connect=60, read=60, total=None), verify = True, cert = None proxies = OrderedDict() def send( self, request, stream=False, timeout=None, verify=True, cert=None, proxies=None ): """Sends PreparedRequest object. Returns Response object. :param request: The :class:`PreparedRequest <PreparedRequest>` being sent. :param stream: (optional) Whether to stream the request content. 
:param timeout: (optional) How long to wait for the server to send data before giving up, as a float, or a :ref:`(connect timeout, read timeout) <timeouts>` tuple. :type timeout: float or tuple or urllib3 Timeout object :param verify: (optional) Either a boolean, in which case it controls whether we verify the server's TLS certificate, or a string, in which case it must be a path to a CA bundle to use :param cert: (optional) Any user-provided SSL certificate to be trusted. :param proxies: (optional) The proxies dictionary to apply to the request. :rtype: requests.Response """ try: conn = self.get_connection_with_tls_context( request, verify, proxies=proxies, cert=cert ) except LocationValueError as e: raise InvalidURL(e, request=request) self.cert_verify(conn, request.url, verify, cert) url = self.request_url(request, proxies) self.add_headers( request, stream=stream, timeout=timeout, verify=verify, cert=cert, proxies=proxies, ) chunked = not (request.body is None or "Content-Length" in request.headers) if isinstance(timeout, tuple): try: connect, read = timeout timeout = TimeoutSauce(connect=connect, read=read) except ValueError: raise ValueError( f"Invalid timeout {timeout}. Pass a (connect, read) timeout tuple, " f"or a single float to set both timeouts to the same value." 
) elif isinstance(timeout, TimeoutSauce): pass else: timeout = TimeoutSauce(connect=timeout, read=timeout) try: resp = conn.urlopen( method=request.method, url=url, body=request.body, headers=request.headers, redirect=False, assert_same_host=False, preload_content=False, decode_content=False, retries=self.max_retries, timeout=timeout, chunked=chunked, ) except (ProtocolError, OSError) as err: > raise ConnectionError(err, request=request) E requests.exceptions.ConnectionError: ('Connection aborted.', FileNotFoundError(2, 'No such file or directory')) target/.tox/py310-macos/lib/python3.10/site-packages/requests/adapters.py:682: ConnectionError The above exception was the direct cause of the following exception: @pytest.fixture(scope="session") def milvus_container(): # Start the container before any tests run. > container = MilvusEnrichmentTestHelper.start_milvus_search_db_container() apache_beam/ml/rag/enrichment/milvus_search_it_test.py:93: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ apache_beam/ml/rag/enrichment/milvus_search_it_test.py:75: in start_milvus_search_db_container raise e apache_beam/ml/rag/enrichment/milvus_search_it_test.py:54: in start_milvus_search_db_container vector_db_container = MilvusContainer(image=image, port=19530) target/.tox/py310-macos/lib/python3.10/site-packages/testcontainers/milvus/__init__.py:45: in __init__ super().__init__(image=image, **kwargs) target/.tox/py310-macos/lib/python3.10/site-packages/testcontainers/core/container.py:50: in __init__ self._docker = DockerClient(**(docker_client_kw or {})) target/.tox/py310-macos/lib/python3.10/site-packages/testcontainers/core/docker_client.py:70: in __init__ self.client = docker.from_env(**kwargs) target/.tox/py310-macos/lib/python3.10/site-packages/docker/client.py:94: in from_env return cls( target/.tox/py310-macos/lib/python3.10/site-packages/docker/client.py:45: in __init__ self.api = APIClient(*args, **kwargs) 
target/.tox/py310-macos/lib/python3.10/site-packages/docker/api/client.py:207: in __init__ self._version = self._retrieve_server_version() _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <docker.api.client.APIClient object at 0x138665600> def _retrieve_server_version(self): try: return self.version(api_version=False)["ApiVersion"] except KeyError as ke: raise DockerException( 'Invalid response from docker daemon: key "ApiVersion"' ' is missing.' ) from ke except Exception as e: > raise DockerException( f'Error while fetching server API version: {e}' ) from e E docker.errors.DockerException: Error while fetching server API version: ('Connection aborted.', FileNotFoundError(2, 'No such file or directory')) target/.tox/py310-macos/lib/python3.10/site-packages/docker/api/client.py:230: DockerException</error>
</testcase>
Would you mind taking a look?
Assigning reviewers:
R: @liferoad for label python.
Note: If you would like to opt out of this review, comment `assign to next reviewer`.
Available commands:
- `stop reviewer notifications` - opt out of the automated review tooling
- `remind me after tests pass` - tag the comment author after tests pass
- `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
The PR bot will only process comments in the main thread (not review comments).
waiting on author
stop reviewer notifications
Stopping reviewer notifications for this pull request: requested by reviewer. If you'd like to restart, comment `assign set of reviewers`.
@damccorm – CI tests are currently failing due to a dependency conflict. pymilvus currently requires protobuf >=5.27.2, as specified in pymilvus/pyproject.toml. Raising protobuf's lower bound to that major version conflicts with existing Beam requirements, as seen in the following error:
```
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
grpcio-status 1.65.5 requires protobuf<6.0dev,>=5.26.1, but you have protobuf 4.25.8 which is incompatible.
pymilvus 2.5.11 requires grpcio<=1.67.1,>=1.49.1, but you have grpcio 1.73.0 which is incompatible.
```
This mainly seems to conflict with the constraints described in this comment in `sdks/python/setup.py`:
https://github.com/apache/beam/blob/9057cc29f3f3ac1464d0351628b8fa26339cca16/sdks/python/setup.py#L389
How is this supposed to be handled moving forward?
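To make the conflict concrete, here is a small stdlib-only sketch (not Beam or pip code; the bounds are copied from the pip error above, and the comparison is a simplified stand-in for full PEP 440 semantics) that checks the installed versions against the declared ranges:

```python
# Simplified, stdlib-only illustration of the resolver conflict reported by
# pip. The version bounds come from the pip error above; real resolution uses
# full PEP 440 semantics, so this only handles plain dotted versions.

def parse(version: str) -> tuple:
    """Parse a simple dotted version string into a comparable tuple."""
    return tuple(int(part) for part in version.split("."))

def in_range(version: str, lower: str, upper: str) -> bool:
    """True if lower <= version <= upper (inclusive bounds)."""
    return parse(lower) <= parse(version) <= parse(upper)

# pymilvus 2.5.11 declares grpcio>=1.49.1,<=1.67.1, but the env has 1.73.0:
print("grpcio 1.73.0 in pymilvus range:",
      in_range("1.73.0", "1.49.1", "1.67.1"))  # False -> conflict

# grpcio-status 1.65.5 declares protobuf>=5.26.1, but the env has 4.25.8:
print("protobuf 4.25.8 meets grpcio-status floor:",
      parse("4.25.8") >= parse("5.26.1"))  # False -> conflict
```

Both checks come out False, matching the two incompatibilities pip reports.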
It looks to me like the issue is actually more specifically the grpcio version - https://github.com/apache/beam/blob/9057cc29f3f3ac1464d0351628b8fa26339cca16/sdks/python/setup.py#L371
From the error you shared: `pymilvus 2.5.11 requires grpcio<=1.67.1,>=1.49.1, but you have grpcio 1.73.0 which is incompatible`. Beam requires `grpcio>=1.67.0` when using Python 3.12, so I'm guessing that is where we're seeing the issue. Are you using Python 3.12? If so, one temporary option would be to exclude Python 3.12 until pymilvus supports a higher grpcio version.
It looks like they are planning to change that soon - https://github.com/milvus-io/pymilvus/blob/cbc825e8857b74ccaf7b873fd91e4e64c64270dc/pyproject.toml#L25 - so we could also probably just pin to different versions of pymilvus for Python 3.12+ and Python <=3.11.
I'm guessing that is where the grpcio-status conflict is coming from as well.
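One way to express the "different pymilvus pin per Python version" idea is with PEP 508 environment markers in setup.py. The sketch below is hypothetical: the pymilvus version numbers are placeholders, not confirmed releases, and the real bounds would come from pymilvus's own pyproject.toml:

```python
import sys

# Hypothetical per-Python-version pins, as a setup.py extras list could
# express them with PEP 508 environment markers. Version numbers are
# placeholders, not confirmed pymilvus releases.
MILVUS_REQUIREMENTS = [
    'pymilvus>=2.5.10,<2.6.0; python_version < "3.12"',  # older grpcio cap
    'pymilvus>=2.6.0; python_version >= "3.12"',         # assumed raised cap
]

def active_requirement(version_info=sys.version_info) -> str:
    """Pick the pin that would apply to a given interpreter version."""
    if tuple(version_info[:2]) >= (3, 12):
        return 'pymilvus>=2.6.0'
    return 'pymilvus>=2.5.10,<2.6.0'

print(active_requirement((3, 10)))  # pymilvus>=2.5.10,<2.6.0
print(active_requirement((3, 12)))  # pymilvus>=2.6.0
```

pip evaluates the markers itself at install time; the `active_requirement` helper only mirrors that selection for illustration.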
> It looks to me like the issue is actually more specifically the grpcio version
Thanks Danny for bringing clarity here. Updating this solved the issue. Yeah, I was solely focused on the protobuf minimum version 👍
~~I think the remaining CI errors are caused by the testcontainers version update. Raised PR #35309 for easier debugging~~
So currently, test cases fail on the following platforms in https://github.com/apache/beam/blob/master/.github/workflows/python_tests.yml, because `MilvusContainer` in `sdks/python/apache_beam/ml/rag/enrichment/milvus_search_it_test.py` requires a Docker container runtime to be available:
- `macos-latest`
- `windows-latest`
The failures are due to a lack of support for nested virtualization on these GitHub-hosted runners.
I have attempted several workarounds, but most of them result in unreliable behavior. It's possible I missed something. For example, I tried using:
- https://github.com/abiosoft/colima
- https://github.com/docker/setup-docker-action

I have also run those test cases on `ubuntu-latest` and they worked fine.
How Can This Be Resolved?
There are a few possible solutions as far as I can think of:

- **Use a shared, accessible Milvus DB container**

  Set up a Milvus container that is externally accessible and can be reused during tests. This could be a more long-term infrastructure solution.

- **Conditionally run Docker-dependent tests on Linux only** (updated the PR to apply this proposed solution)

  - Use a Python-based condition like `@unittest.skipUnless(platform.system() == "Linux", "Test runs only on Linux/Ubuntu")`
  - Ensure the job includes `ubuntu-latest` as the platform.

  This isn't the cleanest approach, but it's practical and works.

- **Wait for full nested virtualization support**

  This would put Milvus integration tests on hold until CI providers fully support nested virtualization on Windows and macOS. Not ideal, as it delays testing of critical functionality. This is a passive approach, and it may resolve itself with time as the technology landscape changes.
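The Linux-only option above can be sketched as a self-contained example. `platform.system()` returns "Linux", "Darwin", or "Windows", so the test body only executes on Linux and is reported as skipped elsewhere, which keeps the suite green on every platform:

```python
import platform
import unittest

class TestMilvusDockerDependent(unittest.TestCase):
    """Illustrative stand-in for the Docker-dependent Milvus tests."""

    @unittest.skipUnless(
        platform.system() == "Linux", "Test runs only on Linux/Ubuntu")
    def test_needs_docker(self):
        # On macOS/Windows runners this body never executes; the runner
        # reports the test as skipped instead of failed.
        self.assertTrue(True)

if __name__ == "__main__":
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(
        TestMilvusDockerDependent)
    result = unittest.TextTestRunner(verbosity=0).run(suite)
    # Whether the test ran or was skipped, the suite succeeds everywhere.
    assert result.wasSuccessful()
```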
References
- **Support container operations on Windows runners**

  GitHub Actions currently does not support container operations on Windows runners: https://github.com/actions/runner/issues/904

- **Nested virtualization on Apple silicon (M3)**

  macOS runners do not support Docker due to the lack of nested virtualization support on M1/M2 chips. This can be revisited when GitHub enables nested virtualization on M3/macOS-15 runners: https://developer.apple.com/documentation/virtualization/vzgenericplatformconfiguration/isnestedvirtualizationsupported
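Related to the references above: one way to make Docker-dependent tests fail fast with a clear message, rather than the opaque tracebacks earlier in this thread, is to probe for a usable Docker daemon before starting any testcontainers. This is a heuristic sketch, not the detection logic testcontainers itself uses:

```python
import os
import shutil

def docker_daemon_reachable() -> bool:
    """Heuristic check for a usable Docker daemon.

    A sketch only: it checks for a docker CLI plus either a DOCKER_HOST
    override or the default Unix socket, which covers the common Linux CI
    case but not every Docker setup.
    """
    if shutil.which("docker") is None:
        return False
    if os.environ.get("DOCKER_HOST"):
        return True
    return os.path.exists("/var/run/docker.sock")

print("Docker reachable:", docker_daemon_reachable())
```

A test module could combine this with `unittest.skipUnless` so unsupported runners skip with "Docker daemon not reachable" instead of timing out in container setup.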
Finally, all CI tests passed 😁 Will try to push through the remaining work here to be ready for review
Run Python_ML PreCommit 3.9
Hey @damccorm,
Most of the main tasks in this PR are nearly complete. However, the `beam_PreCommit_Python_ML` job is currently failing due to an issue related to Docker container port mapping.
I've already tried a few workarounds, such as ensuring there are no port conflicts, and it turned out that is not the issue. This runs fine on a typical GitHub-hosted Ubuntu runner. The runner for this `beam_PreCommit_Python_ML` job is self-hosted, which leads me to think this is likely a configuration issue specific to the self-hosted environment.
Given that I have little input into the setup or computing environment of the self-hosted runner for this job, how am I supposed to troubleshoot and debug this further?
cls = <class 'apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment'>
@classmethod
def setUpClass(cls):
> cls._db = MilvusEnrichmentTestHelper.start_db_container(cls._version)
apache_beam/ml/rag/enrichment/milvus_search_it_test.py:420:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/ml/rag/enrichment/milvus_search_it_test.py:273: in start_db_container
vector_db_container.start()
target/.tox-py39-ml/py39-ml/lib/python3.9/site-packages/testcontainers/milvus/__init__.py:83: in start
self._connect()
target/.tox-py39-ml/py39-ml/lib/python3.9/site-packages/testcontainers/core/waiting_utils.py:59: in wrapper
return wrapped(*args, **kwargs)
target/.tox-py39-ml/py39-ml/lib/python3.9/site-packages/testcontainers/milvus/__init__.py:65: in _connect
self._healthcheck()
target/.tox-py39-ml/py39-ml/lib/python3.9/site-packages/testcontainers/core/waiting_utils.py:59: in wrapper
return wrapped(*args, **kwargs)
target/.tox-py39-ml/py39-ml/lib/python3.9/site-packages/testcontainers/milvus/__init__.py:74: in _healthcheck
healthcheck_url = self._get_healthcheck_url()
target/.tox-py39-ml/py39-ml/lib/python3.9/site-packages/testcontainers/milvus/__init__.py:69: in _get_healthcheck_url
port = self.get_exposed_port(self.healthcheck_port)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
wrapped = <bound method DockerContainer.get_exposed_port of <testcontainers.milvus.MilvusContainer object at 0x7e27846ac4f0>>
instance = <testcontainers.milvus.MilvusContainer object at 0x7e27846ac4f0>
args = (9091,), kwargs = {}
@wrapt.decorator
def wrapper(wrapped: Callable, instance: Any, args: list, kwargs: dict) -> Any:
from testcontainers.core.container import DockerContainer
if isinstance(instance, DockerContainer):
logger.info("Waiting for container %s with image %s to be ready ...", instance._container, instance.image)
else:
logger.info("Waiting for %s to be ready ...", instance)
exception = None
for attempt_no in range(config.max_tries):
try:
return wrapped(*args, **kwargs)
except transient_exceptions as e:
logger.debug(
f"Connection attempt '{attempt_no + 1}' of '{config.max_tries + 1}' "
f"failed: {traceback.format_exc()}"
)
> time.sleep(config.sleep_time)
E Failed: Timeout (>600.0s) from pytest-timeout.
target/.tox-py39-ml/py39-ml/lib/python3.9/site-packages/testcontainers/core/waiting_utils.py:65: Failed
_ ERROR at setup of TestMilvusSearchEnrichment.test_vector_search_with_inner_product_similarity _
[gw2] linux -- Python 3.9.22 /runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39-ml/py39-ml/bin/python
wrapped = <bound method DockerContainer.get_exposed_port of <testcontainers.milvus.MilvusContainer object at 0x7e27846ac4f0>>
instance = <testcontainers.milvus.MilvusContainer object at 0x7e27846ac4f0>
args = (9091,), kwargs = {}
@wrapt.decorator
def wrapper(wrapped: Callable, instance: Any, args: list, kwargs: dict) -> Any:
from testcontainers.core.container import DockerContainer
if isinstance(instance, DockerContainer):
logger.info("Waiting for container %s with image %s to be ready ...", instance._container, instance.image)
else:
logger.info("Waiting for %s to be ready ...", instance)
exception = None
for attempt_no in range(config.max_tries):
try:
> return wrapped(*args, **kwargs)
target/.tox-py39-ml/py39-ml/lib/python3.9/site-packages/testcontainers/core/waiting_utils.py:59:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py39-ml/py39-ml/lib/python3.9/site-packages/testcontainers/core/container.py:155: in get_exposed_port
return self.get_docker_client().port(self._container.id, port)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <testcontainers.core.docker_client.DockerClient object at 0x7e27846acdc0>
container_id = 'a261d8725a7cc154940828cafa5b084f506ab9a4f7c8b852c92be5d98638abdd'
port = 9091
def port(self, container_id: str, port: int) -> int:
"""
Lookup the public-facing port that is NAT-ed to :code:`port`.
"""
port_mappings = self.client.api.port(container_id, port)
if not port_mappings:
> raise ConnectionError(f"Port mapping for container {container_id} and port {port} is " "not available")
E ConnectionError: Port mapping for container a261d8725a7cc154940828cafa5b084f506ab9a4f7c8b852c92be5d98638abdd and port 9091 is not available
This runs fine on a typical GitHub-hosted Ubuntu runner, but fails on the self-hosted Beam environment (ubuntu-20.04).
I have written a workflow, Run Milvus Integration Tests, to confirm this, and all tests pass on a GitHub-hosted Ubuntu runner (version 24.04.2):
https://github.com/apache/beam/actions/runs/15828272812/job/44614105102?pr=35216
EDIT: It looks like the ubuntu-20.04 GitHub-hosted runner is no longer working either: https://github.com/apache/beam/actions/runs/15828951633/job/44616419440?pr=35216 https://github.com/actions/runner-images/issues/11101
Hey @damccorm,
Most of the main tasks in this PR are nearly complete. However, the
beam_PreCommit_Python_MLjob is currently failing due to an issue related to Docker container port mapping.I've already tried a few workarounds such as ensuring there are no port conflicts and it turned out this is not the issue. This runs fine on a typical GitHub hosted Ubuntu runner. The runner for this
beam_PreCommit_Python_MLjob is self-hosted which leads me to mostly think this is likely a configuration issue specific to the self-hosted environment.Given I have little input into the setup or computing environment of the self-hosted runner for this job, How am I supposed to troubleshoot and debug this further?
cls = <class 'apache_beam.ml.rag.enrichment.milvus_search_it_test.TestMilvusSearchEnrichment'>
@classmethod
def setUpClass(cls):
>   cls._db = MilvusEnrichmentTestHelper.start_db_container(cls._version)
apache_beam/ml/rag/enrichment/milvus_search_it_test.py:420:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/ml/rag/enrichment/milvus_search_it_test.py:273: in start_db_container
    vector_db_container.start()
target/.tox-py39-ml/py39-ml/lib/python3.9/site-packages/testcontainers/milvus/__init__.py:83: in start
    self._connect()
target/.tox-py39-ml/py39-ml/lib/python3.9/site-packages/testcontainers/core/waiting_utils.py:59: in wrapper
    return wrapped(*args, **kwargs)
target/.tox-py39-ml/py39-ml/lib/python3.9/site-packages/testcontainers/milvus/__init__.py:65: in _connect
    self._healthcheck()
target/.tox-py39-ml/py39-ml/lib/python3.9/site-packages/testcontainers/core/waiting_utils.py:59: in wrapper
    return wrapped(*args, **kwargs)
target/.tox-py39-ml/py39-ml/lib/python3.9/site-packages/testcontainers/milvus/__init__.py:74: in _healthcheck
    healthcheck_url = self._get_healthcheck_url()
target/.tox-py39-ml/py39-ml/lib/python3.9/site-packages/testcontainers/milvus/__init__.py:69: in _get_healthcheck_url
    port = self.get_exposed_port(self.healthcheck_port)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
(the retry wrapper then re-attempts the exposed-port lookup until the suite times out)
E Failed: Timeout (>600.0s) from pytest-timeout.
target/.tox-py39-ml/py39-ml/lib/python3.9/site-packages/testcontainers/core/waiting_utils.py:65: Failed
It looks to me like this comes specifically from the healthcheck port - https://github.com/testcontainers/testcontainers-python/blob/f467c842b851613b9a087bd5f9a08d8c39577cb8/modules/milvus/testcontainers/milvus/__init__.py#L47
But I think the core problem is probably that we're already in a docker container in our CI environment and docker-in-docker can be problematic. Here is what I'd probably recommend:
- In `setUpClass`, if setting up the containers fails, skip the test. This isn't ideal, but it will allow us to make progress (and we can manually verify the tests for now)
- In a future PR, we can add a workflow to execute tests which don't run correctly on self-hosted. This would involve:
  - Adding a marker like `no_self_hosted` (see https://github.com/apache/beam/blob/b74c49602162cd752ae27d67481eed44dcfd06ea/sdks/python/pytest.ini#L32)
  - Adding a gradle task to run any tests with that marker (installing deps along the way). Initially this could just be the Milvus tests, but this is a problem I've seen elsewhere as well
  - Adding a new GitHub workflow to execute those tests on ubuntu-latest
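The first suggestion (skip when the container can't start) can be sketched roughly as below. This is a minimal illustration, not the PR's actual test code: `start_db_container` here is a stand-in that simulates the Docker-in-Docker failure instead of calling `MilvusEnrichmentTestHelper.start_db_container`.

```python
import unittest


def start_db_container():
    # Stand-in for MilvusEnrichmentTestHelper.start_db_container; here it
    # simulates the port-mapping failure seen on the self-hosted runner.
    raise ConnectionError(
        "Port mapping for container and port 9091 is not available")


class TestMilvusSearchEnrichment(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        try:
            cls._db = start_db_container()
        except Exception as e:
            # Raising SkipTest from setUpClass marks every test in the class
            # as skipped instead of errored, so the precommit can still pass.
            raise unittest.SkipTest(
                f"Could not start Milvus container: {e}") from e

    def test_vector_search(self):
        # Never executes when setUpClass skips the class.
        self.fail("should have been skipped")
```

Run under pytest or unittest, the class is reported as skipped (with the container error as the reason) rather than erroring out the whole suite.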
If you have an example of the tests running on ubuntu-latest that you can link to eventually, that will help as well
> If you have an example of the tests running on ubuntu-latest that you can link to eventually, that will help as well
This one was on ubuntu-latest, which is Ubuntu 24.04.2, and the tests pass:
https://github.com/apache/beam/actions/runs/15831387422/job/44624648315?pr=35216
PS:
~~Working on the proposed solution regarding skipping the tests if the container start fails in `setUpClass`~~
EDIT:
Skipped the tests if the container start fails in `setUpClass`. Will do a follow-up PR after this one to mark and run tests that don't run correctly on self-hosted 👍
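For the planned follow-up, the marker approach could look roughly like this. The marker name `no_self_hosted` is taken from the suggestion above and is hypothetical; it would still need to be registered in sdks/python/pytest.ini to avoid an unknown-mark warning.

```python
import pytest


# Hypothetical use of the proposed marker. Registration in pytest.ini
# would look like:
#   markers =
#       no_self_hosted: tests that cannot run on self-hosted runners
@pytest.mark.no_self_hosted
class TestMilvusSearchEnrichment:
    def test_vector_search(self):
        # Real assertions against a Milvus testcontainer would go here.
        assert True
```

Self-hosted CI could then run `pytest -m "not no_self_hosted"`, while a dedicated ubuntu-latest workflow runs `pytest -m no_self_hosted`.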
Run Prism_Python PreCommit 3.9
Run Python_Coverage PreCommit
Hey @damccorm, I think this PR is now ready for review. The currently failing test cases are flaky - they appear to be failing consistently across other PRs as well.
Thanks a lot @damccorm for the thoughtful feedback. I've addressed it and left one comment explaining the addition of setuptools as a dependency. I think it is now ready for another round of review. I also raised a follow-up PR, #35467, for updating/adding docs/examples for Milvus search.
Codecov Report
Attention: Patch coverage is 70.79646% with 66 lines in your changes missing coverage. Please review.
Project coverage is 56.52%. Comparing base (dd9552c) to head (65fc24b). Report is 3 commits behind head on master.
| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...hon/apache_beam/ml/rag/enrichment/milvus_search.py | 69.86% | 66 Missing :warning: |
Additional details and impacted files
@@ Coverage Diff @@
## master #35216 +/- ##
============================================
+ Coverage 56.51% 56.52% +0.01%
Complexity 3319 3319
============================================
Files 1198 1199 +1
Lines 182870 183091 +221
Branches 3426 3426
============================================
+ Hits 103347 103496 +149
- Misses 76223 76295 +72
Partials 3300 3300
| Flag | Coverage Δ | |
|---|---|---|
| python | 80.77% <70.79%> (-0.04%) :arrow_down: | |
Flags with carried forward coverage won't be shown. Click here to find out more.
:umbrella: View full report in Codecov by Sentry.
Thanks @damccorm for taking another look at this PR. I've removed the explicit setuptools and pip dependencies in sdks/python/setup.py according to your feedback. I think it is now ready for review
P.S. The failed Go tests / Go Build job workflow appears unrelated to the changes introduced by this PR.
Also, I plan to submit a follow-up PR, likely after this one and the docs PR #35467. It will primarily address two pieces of feedback you provided:
- Handling tests that can't run on self-hosted runners due to the Docker-in-Docker environment (https://github.com/apache/beam/pull/35216#issuecomment-2997045270).
- Moving the `milvus` dependency into its own optional extra (https://github.com/apache/beam/pull/35216#discussion_r2175149226).
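The optional-extra change could be sketched roughly as below. The client package name `pymilvus` is the real Milvus Python client, but the version pin is illustrative, and Beam's actual setup.py defines many more extras than shown here.

```python
# Sketch of an extras_require entry that a setup.py could pass to
# setuptools.setup(extras_require=EXTRAS_REQUIRE) so the Milvus client
# is only installed on demand (version pin is illustrative).
EXTRAS_REQUIRE = {
    "milvus": ["pymilvus>=2.4.0"],
}


def requirements_for(extra):
    # Helper for this sketch: look up the dependencies an extra pulls in.
    return EXTRAS_REQUIRE.get(extra, [])
```

Users would then opt in with `pip install "apache-beam[milvus]"` instead of the client being an unconditional dependency.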