
✨ New Source: Select Star

vasilisgav opened this issue 2 years ago • 10 comments

What

Work on the new Select Star source: airbytehq/airbyte#29074

How

Using the low-code CDK approach.

Recommended reading order

  1. spec.yaml
  2. manifest.yml
  3. schemas/*
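
For readers new to the low-code CDK: the manifest declares the streams, requester, and check operation in YAML rather than Python. Below is a minimal sketch of the general shape such a manifest takes; the stream name, auth scheme, and response field path are illustrative assumptions, not details from this PR (only the url_base appears later in this thread):

   # Hedged sketch of a low-code manifest; names below are illustrative.
   version: "0.29.0"   # illustrative CDK manifest version

   definitions:
     requester:
       type: HttpRequester
       url_base: "https://api.production.selectstar.com/v1"
       http_method: "GET"
       authenticator:
         type: BearerAuthenticator
         api_token: "{{ config['api_token'] }}"

   streams:
     - type: DeclarativeStream
       name: tables   # illustrative stream name
       retriever:
         type: SimpleRetriever
         requester:
           $ref: "#/definitions/requester"
         record_selector:
           type: RecordSelector
           extractor:
             type: DpathExtractor
             field_path: ["results"]   # assumed response envelope

   check:
     type: CheckStream
     stream_names: ["tables"]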

vasilisgav · Aug 04 '23 14:08

Before Merging a Connector Pull Request

Wow! What a great pull request you have here! 🎉

To merge this PR, ensure the following has been done/considered for each connector added or updated:

  • [ ] PR name follows PR naming conventions
  • [ ] Breaking changes are considered. If a Breaking Change is being introduced, ensure an Airbyte engineer has created a Breaking Change Plan.
  • [ ] Connector version has been incremented in the Dockerfile and metadata.yaml according to our Semantic Versioning for Connectors guidelines
  • [ ] You've updated the connector's metadata.yaml file and made any other relevant changes, including a breakingChanges entry for major version bumps. See metadata.yaml docs
  • [ ] Secrets in the connector's spec are annotated with airbyte_secret (see the sketch after this list)
  • [ ] All documentation files are up to date. (README.md, bootstrap.md, docs.md, etc...)
  • [ ] Changelog updated in docs/integrations/<source or destination>/<name>.md with an entry for the new version. See changelog example
  • [ ] Migration guide updated in docs/integrations/<source or destination>/<name>-migrations.md with an entry for the new version, if the version is a breaking change. See migration guide example
  • [ ] If set, you've ensured the icon is present in the platform-internal repo. (Docs)
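
For reference, a hedged sketch of what the airbyte_secret annotation usually looks like in a connector's spec.yaml (the api_token property name here is illustrative):

   connectionSpecification:
     $schema: http://json-schema.org/draft-07/schema#
     title: Select Star Spec
     type: object
     required:
       - api_token
     properties:
       api_token:
         type: string
         title: API Token
         description: Token used to authenticate against the API.
         airbyte_secret: true   # masks the value in the Airbyte UI and logs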

If the checklist is complete but the CI check is failing:

  1. Check for hidden checklists in your PR description.

  2. Toggle the GitHub label checklist-action-run on/off to re-run the checklist CI.

github-actions[bot] · Aug 04 '23 14:08

Thanks for the contribution, @vasilisgav! Right now the team is entirely dedicated to reviewing Hackathon contributions; after the event is done, we'll return to your contribution. Is it possible to share the integration test output?

marcosmarxm · Aug 08 '23 14:08

Hello,

I work for Select Star.

When it comes to integration testing, we are happy to provide a free environment so you can monitor the connector to ensure it is functioning properly. I am not sure about the best process to make it happen. What do you think?

ad-m-ss · Aug 09 '23 05:08

Hi @marcosmarxm,

I have used an API token provided by @ad-m-ss in a testing environment, and I have added the integration tests.

The only change that's not committed is the url_base, which points to a testing environment (git diff below):

   requester:
     type: HttpRequester
-    url_base: "https://api.production.selectstar.com/v1"
+    url_base: "https://api.eu.selectstar.com/v1"

I believe that's not part of the spec, since when merged it will always point to production. If there is a more elegant way to express production vs. testing environments, let me know.
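
One pattern worth considering, sketched here under the assumption that the low-code interpolation context exposes config as a plain mapping: make the base URL an optional config field (hypothetically named api_url) that defaults to production, so a test environment can override it without editing the manifest:

   requester:
     type: HttpRequester
     # Falls back to production when the optional override is absent;
     # 'api_url' would be an optional (non-required) field in spec.yaml.
     url_base: "{{ config.get('api_url', 'https://api.production.selectstar.com/v1') }}"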

The unfortunate thing is that some integration tests are failing and I'm having a hard time locating why. The integration tests take about 35 minutes to run, which only makes things more difficult.

Here is the integration test output:

(airbyte3.9) vgavriil@VG-Work:~/workspace/forks/airbyte_select_star/airbyte-integrations/connectors/source-select-star (New-Source-Select-Star)$
./acceptance-test-docker.sh
Sending build context to Docker daemon  36.86kB
Step 1/18 : FROM python:3.9.11-alpine3.15 as base
 ---> df5be861e65d
Step 2/18 : FROM base as builder
 ---> df5be861e65d
Step 3/18 : WORKDIR /airbyte/integration_code
 ---> Using cache
 ---> ae70d13b6bbe
Step 4/18 : RUN apk --no-cache upgrade     && pip install --upgrade pip     && apk --no-cache add tzdata build-base
 ---> Using cache
 ---> 356d1c20630d
Step 5/18 : COPY setup.py ./
 ---> Using cache
 ---> d90d1704a7c8
Step 6/18 : RUN pip install --prefix=/install .
 ---> Using cache
 ---> 93191aa7a959
Step 7/18 : FROM base
 ---> df5be861e65d
Step 8/18 : WORKDIR /airbyte/integration_code
 ---> Using cache
 ---> ae70d13b6bbe
Step 9/18 : COPY --from=builder /install /usr/local
 ---> Using cache
 ---> 6422d49decec
Step 10/18 : COPY --from=builder /usr/share/zoneinfo/Etc/UTC /etc/localtime
 ---> Using cache
 ---> 21e70bbe5c12
Step 11/18 : RUN echo "Etc/UTC" > /etc/timezone
 ---> Using cache
 ---> 335df22361c1
Step 12/18 : RUN apk --no-cache add bash
 ---> Using cache
 ---> bf2a8f81fcb5
Step 13/18 : COPY main.py ./
 ---> Using cache
 ---> 93c050a1cd03
Step 14/18 : COPY source_select_star ./source_select_star
 ---> Using cache
 ---> 7d54c7c03151
Step 15/18 : ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ---> Using cache
 ---> a43f93728d35
Step 16/18 : ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
 ---> Using cache
 ---> 58d6f85a141e
Step 17/18 : LABEL io.airbyte.version=0.1.0
 ---> Using cache
 ---> 51d44f4806c6
Step 18/18 : LABEL io.airbyte.name=airbyte/source-select-star
 ---> Using cache
 ---> 0a926d992f6b
Successfully built 0a926d992f6b
Successfully tagged airbyte/source-select-star:dev

Use 'docker scan' to run Snyk tests against images to find vulnerabilities and learn how to fix them
latest: Pulling from airbyte/connector-acceptance-test
Digest: sha256:273f283618e92e4af6b167d5ba64c6e88cf389d199462995bff9fef64e4404a1
Status: Image is up to date for airbyte/connector-acceptance-test:latest
docker.io/airbyte/connector-acceptance-test:latest
============================= test session starts ==============================
platform linux -- Python 3.10.12, pytest-6.2.5, py-1.11.0, pluggy-1.2.0
rootdir: /test_input
plugins: anyio-3.7.1, timeout-1.4.2, cov-3.0.0, sugar-0.9.7, requests-mock-1.9.3, hypothesis-6.82.3, mock-3.6.1
collected 39 items / 1 skipped / 38 selected

test_core.py ....................E.............EsF.                      [ 97%]
test_full_refresh.py
FE                                                  [100%]


==================================== ERRORS ====================================
_______ ERROR at setup of TestSpec.test_backward_compatibility[inputs0] ________

self = Context(session=<gql.client.AsyncClientSession object at 0x7f3738661c00>, schema=<gql.dsl.DSLSchema object at 0x7f3738...ntainer', name='sync', args={}, children={})]), converter=<cattrs.preconf.json.JsonConverter object at 0x7f3737dfe8d0>)
query = DocumentNode

    @contextlib.contextmanager
    def _handle_execute(self, query: graphql.DocumentNode):
        # Reduces duplication when handling errors, between sync and async.
        try:
>           yield

/usr/local/lib/python3.10/site-packages/dagger/api/base.py:244:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = Context(session=<gql.client.AsyncClientSession object at 0x7f3738661c00>, schema=<gql.dsl.DSLSchema object at 0x7f3738...ntainer', name='sync', args={}, children={})]), converter=<cattrs.preconf.json.JsonConverter object at 0x7f3737dfe8d0>)
return_type = <class 'dagger.api.gen.ContainerID'>

    async def execute(self, return_type: type[T] | None = None) -> T | None:
        assert isinstance(self.session, AsyncClientSession)
        await self.resolve_ids()
        query = self.query()
        with self._handle_execute(query):
>           result = await self.session.execute(query)

/usr/local/lib/python3.10/site-packages/dagger/api/base.py:157:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <gql.client.AsyncClientSession object at 0x7f3738661c00>
document = DocumentNode, variable_values = None, operation_name = None
serialize_variables = None, parse_result = None, get_execution_result = False
kwargs = {}
result = ExecutionResult(data=None, errors=[{'message': 'pull access denied, repository does not exist or may require authoriza...e: insufficient_scope: authorization failed', 'locations': [{'line': 3, 'column': 5}], 'path': ['container', 'from']}])

    async def execute(
        self,
        document: DocumentNode,
        variable_values: Optional[Dict[str, Any]] = None,
        operation_name: Optional[str] = None,
        serialize_variables: Optional[bool] = None,
        parse_result: Optional[bool] = None,
        get_execution_result: bool = False,
        **kwargs,
    ) -> Union[Dict[str, Any], ExecutionResult]:
        """Coroutine to execute the provided document AST asynchronously using
        the async transport.

        Raises a TransportQueryError if an error has been returned in
            the ExecutionResult.

        :param document: GraphQL query as AST Node object.
        :param variable_values: Dictionary of input parameters.
        :param operation_name: Name of the operation that shall be executed.
        :param serialize_variables: whether the variable values should be
            serialized. Used for custom scalars and/or enums.
            By default use the serialize_variables argument of the client.
        :param parse_result: Whether gql will unserialize the result.
            By default use the parse_results argument of the client.
        :param get_execution_result: return the full ExecutionResult instance instead of
            only the "data" field. Necessary if you want to get the "extensions" field.

        The extra arguments are passed to the transport execute method."""

        # Validate and execute on the transport
        result = await self._execute(
            document,
            variable_values=variable_values,
            operation_name=operation_name,
            serialize_variables=serialize_variables,
            parse_result=parse_result,
            **kwargs,
        )

        # Raise an error if an error is returned in the ExecutionResult object
        if result.errors:
>           raise TransportQueryError(
                str(result.errors[0]),
                errors=result.errors,
                data=result.data,
                extensions=result.extensions,
            )
E           gql.transport.exceptions.TransportQueryError: {'message': 'pull access denied, repository does not exist or may require authorization: server message: insufficient_scope: authorization failed', 'locations': [{'line': 3, 'column': 5}], 'path': ['container', 'from']}

/usr/local/lib/python3.10/site-packages/gql/client.py:1231: TransportQueryError

The above exception was the direct cause of the following exception:

anyio_backend = 'asyncio', args = ()
kwargs = {'dagger_client': <dagger.api.gen.Client object at 0x7f3737fe7680>, 'previous_connector_image_name': 'airbyte/source-select-star:latest'}
backend_name = 'asyncio', backend_options = {}
runner = <anyio._backends._asyncio.TestRunner object at 0x7f3738905e10>

    def wrapper(*args, anyio_backend, **kwargs):  # type: ignore[no-untyped-def]
        backend_name, backend_options = extract_backend_and_options(anyio_backend)
        if has_backend_arg:
            kwargs["anyio_backend"] = anyio_backend

        with get_runner(backend_name, backend_options) as runner:
            if isasyncgenfunction(func):
                yield from runner.run_asyncgen_fixture(func, kwargs)
            else:
>               yield runner.run_fixture(func, kwargs)

/usr/local/lib/python3.10/site-packages/anyio/pytest_plugin.py:70:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py:2105: in run_fixture
    retval = self._loop.run_until_complete(fixture_func(**kwargs))
/usr/local/lib/python3.10/asyncio/base_events.py:649: in run_until_complete
    return future.result()
/app/connector_acceptance_test/conftest.py:189: in previous_connector_docker_runner_fixture
    await runner.load_container()
/app/connector_acceptance_test/utils/connector_runner.py:49: in load_container
    await self._connector_under_test_container.with_exec(["spec"])
/usr/local/lib/python3.10/site-packages/dagger/api/base.py:486: in async_wrapper
    return await bear(*args, **kwargs)
/usr/local/lib/python3.10/site-packages/dagger/api/gen.py:895: in sync
    _id = await _ctx.execute(ContainerID)
/usr/local/lib/python3.10/site-packages/dagger/api/base.py:156: in execute
    with self._handle_execute(query):
/usr/local/lib/python3.10/contextlib.py:153: in __exit__
    self.gen.throw(typ, value, traceback)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = Context(session=<gql.client.AsyncClientSession object at 0x7f3738661c00>, schema=<gql.dsl.DSLSchema object at 0x7f3738...ntainer', name='sync', args={}, children={})]), converter=<cattrs.preconf.json.JsonConverter object at 0x7f3737dfe8d0>)
query = DocumentNode

    @contextlib.contextmanager
    def _handle_execute(self, query: graphql.DocumentNode):
        # Reduces duplication when handling errors, between sync and async.
        try:
            yield

        except httpx.TimeoutException as e:
            msg = (
                "Request timed out. Try setting a higher value in 'execute_timeout' "
                "config for this `dagger.Connection()`."
            )
            raise ExecuteTimeoutError(msg) from e

        except httpx.RequestError as e:
            msg = f"Failed to make request: {e}"
            raise TransportError(msg) from e

        except TransportClosed as e:
            msg = (
                "Connection to engine has been closed. Make sure you're "
                "calling the API within a `dagger.Connection()` context."
            )
            raise TransportError(msg) from e

        except (TransportProtocolError, TransportServerError) as e:
            msg = f"Unexpected response from engine: {e}"
            raise TransportError(msg) from e

        except TransportQueryError as e:
            if error := QueryError.from_transport(e, query):
>               raise error from e
E               dagger.exceptions.QueryError: pull access denied, repository does not exist or may require authorization: server message: insufficient_scope: authorization failed

/usr/local/lib/python3.10/site-packages/dagger/api/base.py:270: QueryError
_____ ERROR at setup of TestDiscovery.test_backward_compatibility[inputs0] _____

self = Context(session=<gql.client.AsyncClientSession object at 0x7f3738661c00>, schema=<gql.dsl.DSLSchema object at 0x7f3738...ntainer', name='sync', args={}, children={})]), converter=<cattrs.preconf.json.JsonConverter object at 0x7f37370326c0>)
query = DocumentNode

    @contextlib.contextmanager
    def _handle_execute(self, query: graphql.DocumentNode):
        # Reduces duplication when handling errors, between sync and async.
        try:
>           yield

/usr/local/lib/python3.10/site-packages/dagger/api/base.py:244:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = Context(session=<gql.client.AsyncClientSession object at 0x7f3738661c00>, schema=<gql.dsl.DSLSchema object at 0x7f3738...ntainer', name='sync', args={}, children={})]), converter=<cattrs.preconf.json.JsonConverter object at 0x7f37370326c0>)
return_type = <class 'dagger.api.gen.ContainerID'>

    async def execute(self, return_type: type[T] | None = None) -> T | None:
        assert isinstance(self.session, AsyncClientSession)
        await self.resolve_ids()
        query = self.query()
        with self._handle_execute(query):
>           result = await self.session.execute(query)

/usr/local/lib/python3.10/site-packages/dagger/api/base.py:157:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <gql.client.AsyncClientSession object at 0x7f3738661c00>
document = DocumentNode, variable_values = None, operation_name = None
serialize_variables = None, parse_result = None, get_execution_result = False
kwargs = {}
result = ExecutionResult(data=None, errors=[{'message': 'pull access denied, repository does not exist or may require authoriza...e: insufficient_scope: authorization failed', 'locations': [{'line': 3, 'column': 5}], 'path': ['container', 'from']}])

    async def execute(
        self,
        document: DocumentNode,
        variable_values: Optional[Dict[str, Any]] = None,
        operation_name: Optional[str] = None,
        serialize_variables: Optional[bool] = None,
        parse_result: Optional[bool] = None,
        get_execution_result: bool = False,
        **kwargs,
    ) -> Union[Dict[str, Any], ExecutionResult]:
        """Coroutine to execute the provided document AST asynchronously using
        the async transport.

        Raises a TransportQueryError if an error has been returned in
            the ExecutionResult.

        :param document: GraphQL query as AST Node object.
        :param variable_values: Dictionary of input parameters.
        :param operation_name: Name of the operation that shall be executed.
        :param serialize_variables: whether the variable values should be
            serialized. Used for custom scalars and/or enums.
            By default use the serialize_variables argument of the client.
        :param parse_result: Whether gql will unserialize the result.
            By default use the parse_results argument of the client.
        :param get_execution_result: return the full ExecutionResult instance instead of
            only the "data" field. Necessary if you want to get the "extensions" field.

        The extra arguments are passed to the transport execute method."""

        # Validate and execute on the transport
        result = await self._execute(
            document,
            variable_values=variable_values,
            operation_name=operation_name,
            serialize_variables=serialize_variables,
            parse_result=parse_result,
            **kwargs,
        )

        # Raise an error if an error is returned in the ExecutionResult object
        if result.errors:
>           raise TransportQueryError(
                str(result.errors[0]),
                errors=result.errors,
                data=result.data,
                extensions=result.extensions,
            )
E           gql.transport.exceptions.TransportQueryError: {'message': 'pull access denied, repository does not exist or may require authorization: server message: insufficient_scope: authorization failed', 'locations': [{'line': 3, 'column': 5}], 'path': ['container', 'from']}

/usr/local/lib/python3.10/site-packages/gql/client.py:1231: TransportQueryError

The above exception was the direct cause of the following exception:

anyio_backend = 'asyncio', args = ()
kwargs = {'dagger_client': <dagger.api.gen.Client object at 0x7f3737fe7680>, 'previous_connector_image_name': 'airbyte/source-select-star:latest'}
backend_name = 'asyncio', backend_options = {}
runner = <anyio._backends._asyncio.TestRunner object at 0x7f3738905e10>

    def wrapper(*args, anyio_backend, **kwargs):  # type: ignore[no-untyped-def]
        backend_name, backend_options = extract_backend_and_options(anyio_backend)
        if has_backend_arg:
            kwargs["anyio_backend"] = anyio_backend

        with get_runner(backend_name, backend_options) as runner:
            if isasyncgenfunction(func):
                yield from runner.run_asyncgen_fixture(func, kwargs)
            else:
>               yield runner.run_fixture(func, kwargs)

/usr/local/lib/python3.10/site-packages/anyio/pytest_plugin.py:70:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py:2105: in run_fixture
    retval = self._loop.run_until_complete(fixture_func(**kwargs))
/usr/local/lib/python3.10/asyncio/base_events.py:649: in run_until_complete
    return future.result()
/app/connector_acceptance_test/conftest.py:189: in previous_connector_docker_runner_fixture
    await runner.load_container()
/app/connector_acceptance_test/utils/connector_runner.py:49: in load_container
    await self._connector_under_test_container.with_exec(["spec"])
/usr/local/lib/python3.10/site-packages/dagger/api/base.py:486: in async_wrapper
    return await bear(*args, **kwargs)
/usr/local/lib/python3.10/site-packages/dagger/api/gen.py:895: in sync
    _id = await _ctx.execute(ContainerID)
/usr/local/lib/python3.10/site-packages/dagger/api/base.py:156: in execute
    with self._handle_execute(query):
/usr/local/lib/python3.10/contextlib.py:153: in __exit__
    self.gen.throw(typ, value, traceback)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = Context(session=<gql.client.AsyncClientSession object at 0x7f3738661c00>, schema=<gql.dsl.DSLSchema object at 0x7f3738...ntainer', name='sync', args={}, children={})]), converter=<cattrs.preconf.json.JsonConverter object at 0x7f37370326c0>)
query = DocumentNode

    @contextlib.contextmanager
    def _handle_execute(self, query: graphql.DocumentNode):
        # Reduces duplication when handling errors, between sync and async.
        try:
            yield

        except httpx.TimeoutException as e:
            msg = (
                "Request timed out. Try setting a higher value in 'execute_timeout' "
                "config for this `dagger.Connection()`."
            )
            raise ExecuteTimeoutError(msg) from e

        except httpx.RequestError as e:
            msg = f"Failed to make request: {e}"
            raise TransportError(msg) from e

        except TransportClosed as e:
            msg = (
                "Connection to engine has been closed. Make sure you're "
                "calling the API within a `dagger.Connection()` context."
            )
            raise TransportError(msg) from e

        except (TransportProtocolError, TransportServerError) as e:
            msg = f"Unexpected response from engine: {e}"
            raise TransportError(msg) from e

        except TransportQueryError as e:
            if error := QueryError.from_transport(e, query):
>               raise error from e
E               dagger.exceptions.QueryError: pull access denied, repository does not exist or may require authorization: server message: insufficient_scope: authorization failed

/usr/local/lib/python3.10/site-packages/dagger/api/base.py:270: QueryError
_____ ERROR at teardown of TestFullRefresh.test_sequential_reads[inputs0] ______

map = {<class 'TimeoutError'>: <class 'httpcore.ReadTimeout'>, <class 'anyio.BrokenResourceError'>: <class 'httpcore.ReadError'>, <class 'anyio.ClosedResourceError'>: <class 'httpcore.ReadError'>}

    @contextlib.contextmanager
    def map_exceptions(map: ExceptionMapping) -> Iterator[None]:
        try:
>           yield

/usr/local/lib/python3.10/site-packages/httpcore/_exceptions.py:10:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <httpcore._backends.anyio.AnyIOStream object at 0x7f3736f39d50>
max_bytes = 65536, timeout = None

    async def read(
        self, max_bytes: int, timeout: typing.Optional[float] = None
    ) -> bytes:
        exc_map = {
            TimeoutError: ReadTimeout,
            anyio.BrokenResourceError: ReadError,
            anyio.ClosedResourceError: ReadError,
        }
        with map_exceptions(exc_map):
            with anyio.fail_after(timeout):
                try:
>                   return await self._stream.receive(max_bytes=max_bytes)

/usr/local/lib/python3.10/site-packages/httpcore/_backends/anyio.py:34:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <anyio._backends._asyncio.SocketStream object at 0x7f3736f3a530>
max_bytes = 65536

    async def receive(self, max_bytes: int = 65536) -> bytes:
        with self._receive_guard:
            await checkpoint()

            if (
                not self._protocol.read_event.is_set()
                and not self._transport.is_closing()
            ):
                self._transport.resume_reading()
                await self._protocol.read_event.wait()
                self._transport.pause_reading()

            try:
                chunk = self._protocol.read_queue.popleft()
            except IndexError:
                if self._closed:
>                   raise ClosedResourceError from None
E                   anyio.ClosedResourceError

/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py:1210: ClosedResourceError

The above exception was the direct cause of the following exception:

    @contextlib.contextmanager
    def map_httpcore_exceptions() -> typing.Iterator[None]:
        try:
>           yield

/usr/local/lib/python3.10/site-packages/httpx/_transports/default.py:60:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <httpx.AsyncHTTPTransport object at 0x7f37386638b0>
request = <Request('POST', 'http://127.0.0.1:35119/query')>

    async def handle_async_request(
        self,
        request: Request,
    ) -> Response:
        assert isinstance(request.stream, AsyncByteStream)

        req = httpcore.Request(
            method=request.method,
            url=httpcore.URL(
                scheme=request.url.raw_scheme,
                host=request.url.raw_host,
                port=request.url.port,
                target=request.url.raw_path,
            ),
            headers=request.headers.raw,
            content=request.stream,
            extensions=request.extensions,
        )
        with map_httpcore_exceptions():
>           resp = await self._pool.handle_async_request(req)

/usr/local/lib/python3.10/site-packages/httpx/_transports/default.py:353:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <httpcore.AsyncConnectionPool object at 0x7f37386603a0>
request = <Request [b'POST']>

    async def handle_async_request(self, request: Request) -> Response:
        """
        Send an HTTP request, and return an HTTP response.

        This is the core implementation that is called into by `.request()` or `.stream()`.
        """
        scheme = request.url.scheme.decode()
        if scheme == "":
            raise UnsupportedProtocol(
                "Request URL is missing an 'http://' or 'https://' protocol."
            )
        if scheme not in ("http", "https", "ws", "wss"):
            raise UnsupportedProtocol(
                f"Request URL has an unsupported protocol '{scheme}://'."
            )

        status = RequestStatus(request)

        async with self._pool_lock:
            self._requests.append(status)
            await self._close_expired_connections()
            await self._attempt_to_acquire_connection(status)

        while True:
            timeouts = request.extensions.get("timeout", {})
            timeout = timeouts.get("pool", None)
            try:
                connection = await status.wait_for_connection(timeout=timeout)
            except BaseException as exc:
                # If we timeout here, or if the task is cancelled, then make
                # sure to remove the request from the queue before bubbling
                # up the exception.
                async with self._pool_lock:
                    # Ensure only remove when task exists.
                    if status in self._requests:
                        self._requests.remove(status)
                    raise exc

            try:
                response = await connection.handle_async_request(request)
            except ConnectionNotAvailable:
                # The ConnectionNotAvailable exception is a special case, that
                # indicates we need to retry the request on a new connection.
                #
                # The most common case where this can occur is when multiple
                # requests are queued waiting for a single connection, which
                # might end up as an HTTP/2 connection, but which actually ends
                # up as HTTP/1.1.
                async with self._pool_lock:
                    # Maintain our position in the request queue, but reset the
                    # status so that the request becomes queued again.
                    status.unset_connection()
                    await self._attempt_to_acquire_connection(status)
            except BaseException as exc:
                with AsyncShieldCancellation():
                    await self.response_closed(status)
>               raise exc

/usr/local/lib/python3.10/site-packages/httpcore/_async/connection_pool.py:262:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <httpcore.AsyncConnectionPool object at 0x7f37386603a0>
request = <Request [b'POST']>

    async def handle_async_request(self, request: Request) -> Response:
        """
        Send an HTTP request, and return an HTTP response.

        This is the core implementation that is called into by `.request()` or `.stream()`.
        """
        scheme = request.url.scheme.decode()
        if scheme == "":
            raise UnsupportedProtocol(
                "Request URL is missing an 'http://' or 'https://' protocol."
            )
        if scheme not in ("http", "https", "ws", "wss"):
            raise UnsupportedProtocol(
                f"Request URL has an unsupported protocol '{scheme}://'."
            )

        status = RequestStatus(request)

        async with self._pool_lock:
            self._requests.append(status)
            await self._close_expired_connections()
            await self._attempt_to_acquire_connection(status)

        while True:
            timeouts = request.extensions.get("timeout", {})
            timeout = timeouts.get("pool", None)
            try:
                connection = await status.wait_for_connection(timeout=timeout)
            except BaseException as exc:
                # If we timeout here, or if the task is cancelled, then make
                # sure to remove the request from the queue before bubbling
                # up the exception.
                async with self._pool_lock:
                    # Ensure only remove when task exists.
                    if status in self._requests:
                        self._requests.remove(status)
                    raise exc

            try:
>               response = await connection.handle_async_request(request)

/usr/local/lib/python3.10/site-packages/httpcore/_async/connection_pool.py:245:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <AsyncHTTPConnection ['http://127.0.0.1:35119', HTTP/1.1, CLOSED, Request Count: 4]>
request = <Request [b'POST']>

    async def handle_async_request(self, request: Request) -> Response:
        if not self.can_handle_request(request.url.origin):
            raise RuntimeError(
                f"Attempted to send request to {request.url.origin} on connection to {self._origin}"
            )

        async with self._request_lock:
            if self._connection is None:
                try:
                    stream = await self._connect(request)

                    ssl_object = stream.get_extra_info("ssl_object")
                    http2_negotiated = (
                        ssl_object is not None
                        and ssl_object.selected_alpn_protocol() == "h2"
                    )
                    if http2_negotiated or (self._http2 and not self._http1):
                        from .http2 import AsyncHTTP2Connection

                        self._connection = AsyncHTTP2Connection(
                            origin=self._origin,
                            stream=stream,
                            keepalive_expiry=self._keepalive_expiry,
                        )
                    else:
                        self._connection = AsyncHTTP11Connection(
                            origin=self._origin,
                            stream=stream,
                            keepalive_expiry=self._keepalive_expiry,
                        )
                except Exception as exc:
                    self._connect_failed = True
                    raise exc
            elif not self._connection.is_available():
                raise ConnectionNotAvailable()

>       return await self._connection.handle_async_request(request)

/usr/local/lib/python3.10/site-packages/httpcore/_async/connection.py:96:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <AsyncHTTP11Connection ['http://127.0.0.1:35119', CLOSED, Request Count: 4]>
request = <Request [b'POST']>

    async def handle_async_request(self, request: Request) -> Response:
        if not self.can_handle_request(request.url.origin):
            raise RuntimeError(
                f"Attempted to send request to {request.url.origin} on connection "
                f"to {self._origin}"
            )

        async with self._state_lock:
            if self._state in (HTTPConnectionState.NEW, HTTPConnectionState.IDLE):
                self._request_count += 1
                self._state = HTTPConnectionState.ACTIVE
                self._expire_at = None
            else:
                raise ConnectionNotAvailable()

        try:
            kwargs = {"request": request}
            async with Trace("send_request_headers", logger, request, kwargs) as trace:
                await self._send_request_headers(**kwargs)
            async with Trace("send_request_body", logger, request, kwargs) as trace:
                await self._send_request_body(**kwargs)
            async with Trace(
                "receive_response_headers", logger, request, kwargs
            ) as trace:
                (
                    http_version,
                    status,
                    reason_phrase,
                    headers,
                ) = await self._receive_response_headers(**kwargs)
                trace.return_value = (
                    http_version,
                    status,
                    reason_phrase,
                    headers,
                )

            return Response(
                status=status,
                headers=headers,
                content=HTTP11ConnectionByteStream(self, request),
                extensions={
                    "http_version": http_version,
                    "reason_phrase": reason_phrase,
                    "network_stream": self._network_stream,
                },
            )
        except BaseException as exc:
            with AsyncShieldCancellation():
                async with Trace("response_closed", logger, request) as trace:
                    await self._response_closed()
>           raise exc

/usr/local/lib/python3.10/site-packages/httpcore/_async/http11.py:121:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <AsyncHTTP11Connection ['http://127.0.0.1:35119', CLOSED, Request Count: 4]>
request = <Request [b'POST']>

    async def handle_async_request(self, request: Request) -> Response:
        if not self.can_handle_request(request.url.origin):
            raise RuntimeError(
                f"Attempted to send request to {request.url.origin} on connection "
                f"to {self._origin}"
            )

        async with self._state_lock:
            if self._state in (HTTPConnectionState.NEW, HTTPConnectionState.IDLE):
                self._request_count += 1
                self._state = HTTPConnectionState.ACTIVE
                self._expire_at = None
            else:
                raise ConnectionNotAvailable()

        try:
            kwargs = {"request": request}
            async with Trace("send_request_headers", logger, request, kwargs) as trace:
                await self._send_request_headers(**kwargs)
            async with Trace("send_request_body", logger, request, kwargs) as trace:
                await self._send_request_body(**kwargs)
            async with Trace(
                "receive_response_headers", logger, request, kwargs
            ) as trace:
                (
                    http_version,
                    status,
                    reason_phrase,
                    headers,
>               ) = await self._receive_response_headers(**kwargs)

/usr/local/lib/python3.10/site-packages/httpcore/_async/http11.py:99:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <AsyncHTTP11Connection ['http://127.0.0.1:35119', CLOSED, Request Count: 4]>
request = <Request [b'POST']>

    async def _receive_response_headers(
        self, request: Request
    ) -> Tuple[bytes, int, bytes, List[Tuple[bytes, bytes]]]:
        timeouts = request.extensions.get("timeout", {})
        timeout = timeouts.get("read", None)

        while True:
>           event = await self._receive_event(timeout=timeout)

/usr/local/lib/python3.10/site-packages/httpcore/_async/http11.py:164:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <AsyncHTTP11Connection ['http://127.0.0.1:35119', CLOSED, Request Count: 4]>
timeout = None

    async def _receive_event(
        self, timeout: Optional[float] = None
    ) -> Union[h11.Event, Type[h11.PAUSED]]:
        while True:
            with map_exceptions({h11.RemoteProtocolError: RemoteProtocolError}):
                event = self._h11_state.next_event()

            if event is h11.NEED_DATA:
>               data = await self._network_stream.read(
                    self.READ_NUM_BYTES, timeout=timeout
                )

/usr/local/lib/python3.10/site-packages/httpcore/_async/http11.py:200:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <httpcore._backends.anyio.AnyIOStream object at 0x7f3736f39d50>
max_bytes = 65536, timeout = None

    async def read(
        self, max_bytes: int, timeout: typing.Optional[float] = None
    ) -> bytes:
        exc_map = {
            TimeoutError: ReadTimeout,
            anyio.BrokenResourceError: ReadError,
            anyio.ClosedResourceError: ReadError,
        }
>       with map_exceptions(exc_map):

/usr/local/lib/python3.10/site-packages/httpcore/_backends/anyio.py:31:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <contextlib._GeneratorContextManager object at 0x7f3736d76440>
typ = <class 'anyio.ClosedResourceError'>, value = ClosedResourceError()
traceback = <traceback object at 0x7f37381bae00>

    def __exit__(self, typ, value, traceback):
        if typ is None:
            try:
                next(self.gen)
            except StopIteration:
                return False
            else:
                raise RuntimeError("generator didn't stop")
        else:
            if value is None:
                # Need to force instantiation so we can reliably
                # tell if we get the same exception back
                value = typ()
            try:
>               self.gen.throw(typ, value, traceback)

/usr/local/lib/python3.10/contextlib.py:153:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

map = {<class 'TimeoutError'>: <class 'httpcore.ReadTimeout'>, <class 'anyio.BrokenResourceError'>: <class 'httpcore.ReadError'>, <class 'anyio.ClosedResourceError'>: <class 'httpcore.ReadError'>}

    @contextlib.contextmanager
    def map_exceptions(map: ExceptionMapping) -> Iterator[None]:
        try:
            yield
        except Exception as exc:  # noqa: PIE786
            for from_exc, to_exc in map.items():
                if isinstance(exc, from_exc):
>                   raise to_exc(exc) from exc
E                   httpcore.ReadError

/usr/local/lib/python3.10/site-packages/httpcore/_exceptions.py:14: ReadError

The above exception was the direct cause of the following exception:

self = Context(session=<gql.client.AsyncClientSession object at 0x7f3738661c00>, schema=<gql.dsl.DSLSchema object at 0x7f3738...ainer', name='stdout', args={}, children={})]), converter=<cattrs.preconf.json.JsonConverter object at 0x7f373704f5e0>)
query = DocumentNode

    @contextlib.contextmanager
    def _handle_execute(self, query: graphql.DocumentNode):
        # Reduces duplication when handling errors, between sync and async.
        try:
>           yield

/usr/local/lib/python3.10/site-packages/dagger/api/base.py:244:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = Context(session=<gql.client.AsyncClientSession object at 0x7f3738661c00>, schema=<gql.dsl.DSLSchema object at 0x7f3738...ainer', name='stdout', args={}, children={})]), converter=<cattrs.preconf.json.JsonConverter object at 0x7f373704f5e0>)
return_type = <class 'str'>

    async def execute(self, return_type: type[T] | None = None) -> T | None:
        assert isinstance(self.session, AsyncClientSession)
        await self.resolve_ids()
        query = self.query()
        with self._handle_execute(query):
>           result = await self.session.execute(query)

/usr/local/lib/python3.10/site-packages/dagger/api/base.py:157:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <gql.client.AsyncClientSession object at 0x7f3738661c00>
document = DocumentNode, variable_values = None, operation_name = None
serialize_variables = None, parse_result = None, get_execution_result = False
kwargs = {}

    async def execute(
        self,
        document: DocumentNode,
        variable_values: Optional[Dict[str, Any]] = None,
        operation_name: Optional[str] = None,
        serialize_variables: Optional[bool] = None,
        parse_result: Optional[bool] = None,
        get_execution_result: bool = False,
        **kwargs,
    ) -> Union[Dict[str, Any], ExecutionResult]:
        """Coroutine to execute the provided document AST asynchronously using
        the async transport.

        Raises a TransportQueryError if an error has been returned in
            the ExecutionResult.

        :param document: GraphQL query as AST Node object.
        :param variable_values: Dictionary of input parameters.
        :param operation_name: Name of the operation that shall be executed.
        :param serialize_variables: whether the variable values should be
            serialized. Used for custom scalars and/or enums.
            By default use the serialize_variables argument of the client.
        :param parse_result: Whether gql will unserialize the result.
            By default use the parse_results argument of the client.
        :param get_execution_result: return the full ExecutionResult instance instead of
            only the "data" field. Necessary if you want to get the "extensions" field.

        The extra arguments are passed to the transport execute method."""

        # Validate and execute on the transport
>       result = await self._execute(
            document,
            variable_values=variable_values,
            operation_name=operation_name,
            serialize_variables=serialize_variables,
            parse_result=parse_result,
            **kwargs,
        )

/usr/local/lib/python3.10/site-packages/gql/client.py:1220:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <gql.client.AsyncClientSession object at 0x7f3738661c00>
document = DocumentNode, variable_values = None, operation_name = None
serialize_variables = None, parse_result = None, kwargs = {}

    async def _execute(
        self,
        document: DocumentNode,
        variable_values: Optional[Dict[str, Any]] = None,
        operation_name: Optional[str] = None,
        serialize_variables: Optional[bool] = None,
        parse_result: Optional[bool] = None,
        **kwargs,
    ) -> ExecutionResult:
        """Coroutine to execute the provided document AST asynchronously using
        the async transport, returning an ExecutionResult object.

        * Validate the query with the schema if provided.
        * Serialize the variable_values if requested.

        :param document: GraphQL query as AST Node object.
        :param variable_values: Dictionary of input parameters.
        :param operation_name: Name of the operation that shall be executed.
        :param serialize_variables: whether the variable values should be
            serialized. Used for custom scalars and/or enums.
            By default use the serialize_variables argument of the client.
        :param parse_result: Whether gql will unserialize the result.
            By default use the parse_results argument of the client.

        The extra arguments are passed to the transport execute method."""

        # Validate document
        if self.client.schema:
            self.client.validate(document)

            # Parse variable values for custom scalars if requested
            if variable_values is not None:
                if serialize_variables or (
                    serialize_variables is None and self.client.serialize_variables
                ):
                    variable_values = serialize_variable_values(
                        self.client.schema,
                        document,
                        variable_values,
                        operation_name=operation_name,
                    )

        # Execute the query with the transport with a timeout
>       result = await asyncio.wait_for(
            self.transport.execute(
                document,
                variable_values=variable_values,
                operation_name=operation_name,
                **kwargs,
            ),
            self.client.execute_timeout,
        )

/usr/local/lib/python3.10/site-packages/gql/client.py:1126:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

fut = <coroutine object HTTPXAsyncTransport.execute at 0x7f37370757e0>
timeout = None

    async def wait_for(fut, timeout):
        """Wait for the single Future or coroutine to complete, with timeout.

        Coroutine will be wrapped in Task.

        Returns result of the Future or coroutine.  When a timeout occurs,
        it cancels the task and raises TimeoutError.  To avoid the task
        cancellation, wrap it in shield().

        If the wait is cancelled, the task is also cancelled.

        This function is a coroutine.
        """
        loop = events.get_running_loop()

        if timeout is None:
>           return await fut

/usr/local/lib/python3.10/asyncio/tasks.py:408:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <dagger.transport.httpx.HTTPXAsyncTransport object at 0x7f37386629e0>
document = DocumentNode, variable_values = None, operation_name = None
extra_args = None, upload_files = False

    async def execute(
        self,
        document: DocumentNode,
        variable_values: Optional[Dict[str, Any]] = None,
        operation_name: Optional[str] = None,
        extra_args: Optional[Dict[str, Any]] = None,
        upload_files: bool = False,
    ) -> ExecutionResult:
        if not self.client:
            raise TransportClosed("Transport is not connected")

        post_args = self._prepare_request(
            document,
            variable_values,
            operation_name,
            extra_args,
        )

>       response = await self.client.post(self.url, **post_args)

/usr/local/lib/python3.10/site-packages/dagger/transport/httpx.py:179:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <httpx.AsyncClient object at 0x7f37386637f0>
url = URL('http://127.0.0.1:35119/query')

    async def post(
        self,
        url: URLTypes,
        *,
        content: typing.Optional[RequestContent] = None,
        data: typing.Optional[RequestData] = None,
        files: typing.Optional[RequestFiles] = None,
        json: typing.Optional[typing.Any] = None,
        params: typing.Optional[QueryParamTypes] = None,
        headers: typing.Optional[HeaderTypes] = None,
        cookies: typing.Optional[CookieTypes] = None,
        auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
        follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
        timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
        extensions: typing.Optional[RequestExtensions] = None,
    ) -> Response:
        """
        Send a `POST` request.

        **Parameters**: See `httpx.request`.
        """
>       return await self.request(
            "POST",
            url,
            content=content,
            data=data,
            files=files,
            json=json,
            params=params,
            headers=headers,
            cookies=cookies,
            auth=auth,
            follow_redirects=follow_redirects,
            timeout=timeout,
            extensions=extensions,
        )

/usr/local/lib/python3.10/site-packages/httpx/_client.py:1848:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <httpx.AsyncClient object at 0x7f37386637f0>, method = 'POST'
url = URL('http://127.0.0.1:35119/query')

    async def request(
        self,
        method: str,
        url: URLTypes,
        *,
        content: typing.Optional[RequestContent] = None,
        data: typing.Optional[RequestData] = None,
        files: typing.Optional[RequestFiles] = None,
        json: typing.Optional[typing.Any] = None,
        params: typing.Optional[QueryParamTypes] = None,
        headers: typing.Optional[HeaderTypes] = None,
        cookies: typing.Optional[CookieTypes] = None,
        auth: typing.Union[AuthTypes, UseClientDefault, None] = USE_CLIENT_DEFAULT,
        follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
        timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
        extensions: typing.Optional[RequestExtensions] = None,
    ) -> Response:
        """
        Build and send a request.

        Equivalent to:

        ```python
        request = client.build_request(...)
        response = await client.send(request, ...)
        ```

        See `AsyncClient.build_request()`, `AsyncClient.send()`
        and [Merging of configuration][0] for how the various parameters
        are merged with client-level configuration.

        [0]: /advanced/#merging-of-configuration
        """
        request = self.build_request(
            method=method,
            url=url,
            content=content,
            data=data,
            files=files,
            json=json,
            params=params,
            headers=headers,
            cookies=cookies,
            timeout=timeout,
            extensions=extensions,
        )
>       return await self.send(request, auth=auth, follow_redirects=follow_redirects)

/usr/local/lib/python3.10/site-packages/httpx/_client.py:1530:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <httpx.AsyncClient object at 0x7f37386637f0>
request = <Request('POST', 'http://127.0.0.1:35119/query')>

    async def send(
        self,
        request: Request,
        *,
        stream: bool = False,
        auth: typing.Union[AuthTypes, UseClientDefault, None] = USE_CLIENT_DEFAULT,
        follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
    ) -> Response:
        """
        Send a request.

        The request is sent as-is, unmodified.

        Typically you'll want to build one with `AsyncClient.build_request()`
        so that any client-level configuration is merged into the request,
        but passing an explicit `httpx.Request()` is supported as well.

        See also: [Request instances][0]

        [0]: /advanced/#request-instances
        """
        if self._state == ClientState.CLOSED:
            raise RuntimeError("Cannot send a request, as the client has been closed.")

        self._state = ClientState.OPENED
        follow_redirects = (
            self.follow_redirects
            if isinstance(follow_redirects, UseClientDefault)
            else follow_redirects
        )

        auth = self._build_request_auth(request, auth)

>       response = await self._send_handling_auth(
            request,
            auth=auth,
            follow_redirects=follow_redirects,
            history=[],
        )

/usr/local/lib/python3.10/site-packages/httpx/_client.py:1617:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <httpx.AsyncClient object at 0x7f37386637f0>
request = <Request('POST', 'http://127.0.0.1:35119/query')>
auth = <httpx.BasicAuth object at 0x7f3738661b40>, follow_redirects = False
history = []

    async def _send_handling_auth(
        self,
        request: Request,
        auth: Auth,
        follow_redirects: bool,
        history: typing.List[Response],
    ) -> Response:
        auth_flow = auth.async_auth_flow(request)
        try:
            request = await auth_flow.__anext__()

            while True:
>               response = await self._send_handling_redirects(
                    request,
                    follow_redirects=follow_redirects,
                    history=history,
                )

/usr/local/lib/python3.10/site-packages/httpx/_client.py:1645:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <httpx.AsyncClient object at 0x7f37386637f0>
request = <Request('POST', 'http://127.0.0.1:35119/query')>
follow_redirects = False, history = []

    async def _send_handling_redirects(
        self,
        request: Request,
        follow_redirects: bool,
        history: typing.List[Response],
    ) -> Response:
        while True:
            if len(history) > self.max_redirects:
                raise TooManyRedirects(
                    "Exceeded maximum allowed redirects.", request=request
                )

            for hook in self._event_hooks["request"]:
                await hook(request)

>           response = await self._send_single_request(request)

/usr/local/lib/python3.10/site-packages/httpx/_client.py:1682:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <httpx.AsyncClient object at 0x7f37386637f0>
request = <Request('POST', 'http://127.0.0.1:35119/query')>

    async def _send_single_request(self, request: Request) -> Response:
        """
        Sends a single request, without handling any redirections.
        """
        transport = self._transport_for_url(request.url)
        timer = Timer()
        await timer.async_start()

        if not isinstance(request.stream, AsyncByteStream):
            raise RuntimeError(
                "Attempted to send an sync request with an AsyncClient instance."
            )

        with request_context(request=request):
>           response = await transport.handle_async_request(request)

/usr/local/lib/python3.10/site-packages/httpx/_client.py:1719:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <httpx.AsyncHTTPTransport object at 0x7f37386638b0>
request = <Request('POST', 'http://127.0.0.1:35119/query')>

    async def handle_async_request(
        self,
        request: Request,
    ) -> Response:
        assert isinstance(request.stream, AsyncByteStream)

        req = httpcore.Request(
            method=request.method,
            url=httpcore.URL(
                scheme=request.url.raw_scheme,
                host=request.url.raw_host,
                port=request.url.port,
                target=request.url.raw_path,
            ),
            headers=request.headers.raw,
            content=request.stream,
            extensions=request.extensions,
        )
>       with map_httpcore_exceptions():

/usr/local/lib/python3.10/site-packages/httpx/_transports/default.py:352:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <contextlib._GeneratorContextManager object at 0x7f3737415270>
typ = <class 'httpcore.ReadError'>, value = ReadError(ClosedResourceError())
traceback = <traceback object at 0x7f3736ecb500>

    def __exit__(self, typ, value, traceback):
        if typ is None:
            try:
                next(self.gen)
            except StopIteration:
                return False
            else:
                raise RuntimeError("generator didn't stop")
        else:
            if value is None:
                # Need to force instantiation so we can reliably
                # tell if we get the same exception back
                value = typ()
            try:
>               self.gen.throw(typ, value, traceback)

/usr/local/lib/python3.10/contextlib.py:153:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

    @contextlib.contextmanager
    def map_httpcore_exceptions() -> typing.Iterator[None]:
        try:
            yield
        except Exception as exc:  # noqa: PIE-786
            mapped_exc = None

            for from_exc, to_exc in HTTPCORE_EXC_MAP.items():
                if not isinstance(exc, from_exc):
                    continue
                # We want to map to the most specific exception we can find.
                # Eg if `exc` is an `httpcore.ReadTimeout`, we want to map to
                # `httpx.ReadTimeout`, not just `httpx.TimeoutException`.
                if mapped_exc is None or issubclass(to_exc, mapped_exc):
                    mapped_exc = to_exc

            if mapped_exc is None:  # pragma: no cover
                raise

            message = str(exc)
>           raise mapped_exc(message) from exc
E           httpx.ReadError

/usr/local/lib/python3.10/site-packages/httpx/_transports/default.py:77: ReadError

The above exception was the direct cause of the following exception:

anyio_backend = 'asyncio', args = (), kwargs = {'anyio_backend': 'asyncio'}
backend_name = 'asyncio', backend_options = {}
runner = <anyio._backends._asyncio.TestRunner object at 0x7f3738905e10>

    def wrapper(*args, anyio_backend, **kwargs):  # type: ignore[no-untyped-def]
        backend_name, backend_options = extract_backend_and_options(anyio_backend)
        if has_backend_arg:
            kwargs["anyio_backend"] = anyio_backend

        with get_runner(backend_name, backend_options) as runner:
            if isasyncgenfunction(func):
>               yield from runner.run_asyncgen_fixture(func, kwargs)

/usr/local/lib/python3.10/site-packages/anyio/pytest_plugin.py:68:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py:2098: in run_asyncgen_fixture
    self._raise_async_exceptions()
/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py:2054: in _raise_async_exceptions
    raise exceptions[0]
/app/connector_acceptance_test/tests/test_core.py:971: in test_read
    output = await docker_runner.call_read(connector_config, configured_catalog)
/app/connector_acceptance_test/utils/connector_runner.py:63: in call_read
    return await self._run(
/app/connector_acceptance_test/utils/connector_runner.py:187: in _run
    output = await self._read_output_from_stdout(airbyte_command, container)
/app/connector_acceptance_test/utils/connector_runner.py:202: in _read_output_from_stdout
    return await container.with_exec(airbyte_command).stdout()
/usr/local/lib/python3.10/site-packages/dagger/api/base.py:486: in async_wrapper
    return await bear(*args, **kwargs)
/usr/local/lib/python3.10/site-packages/dagger/api/gen.py:878: in stdout
    return await _ctx.execute(str)
/usr/local/lib/python3.10/site-packages/dagger/api/base.py:156: in execute
    with self._handle_execute(query):
/usr/local/lib/python3.10/contextlib.py:153: in __exit__
    self.gen.throw(typ, value, traceback)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = Context(session=<gql.client.AsyncClientSession object at 0x7f3738661c00>, schema=<gql.dsl.DSLSchema object at 0x7f3738...ainer', name='stdout', args={}, children={})]), converter=<cattrs.preconf.json.JsonConverter object at 0x7f373704f5e0>)
query = DocumentNode

    @contextlib.contextmanager
    def _handle_execute(self, query: graphql.DocumentNode):
        # Reduces duplication when handling errors, between sync and async.
        try:
            yield

        except httpx.TimeoutException as e:
            msg = (
                "Request timed out. Try setting a higher value in 'execute_timeout' "
                "config for this `dagger.Connection()`."
            )
            raise ExecuteTimeoutError(msg) from e

        except httpx.RequestError as e:
            msg = f"Failed to make request: {e}"
>           raise TransportError(msg) from e
E           dagger.exceptions.TransportError: Failed to make request:

/usr/local/lib/python3.10/site-packages/dagger/api/base.py:255: TransportError
=================================== FAILURES ===================================
_______________________ TestBasicRead.test_read[inputs0] _______________________

pyfuncitem = <Function test_read[inputs0]>

    @pytest.hookimpl(tryfirst=True)
    def pytest_pyfunc_call(pyfuncitem: Any) -> bool | None:
        def run_with_hypothesis(**kwargs: Any) -> None:
            with get_runner(backend_name, backend_options) as runner:
                runner.run_test(original_func, kwargs)

        backend = pyfuncitem.funcargs.get("anyio_backend")
        if backend:
            backend_name, backend_options = extract_backend_and_options(backend)

            if hasattr(pyfuncitem.obj, "hypothesis"):
                # Wrap the inner test function unless it's already wrapped
                original_func = pyfuncitem.obj.hypothesis.inner_test
                if original_func.__qualname__ != run_with_hypothesis.__qualname__:
                    if iscoroutinefunction(original_func):
                        pyfuncitem.obj.hypothesis.inner_test = run_with_hypothesis

                return None

            if iscoroutinefunction(pyfuncitem.obj):
                funcargs = pyfuncitem.funcargs
                testargs = {arg: funcargs[arg] for arg in pyfuncitem._fixtureinfo.argnames}
                with get_runner(backend_name, backend_options) as runner:
>                   runner.run_test(pyfuncitem.obj, testargs)

/usr/local/lib/python3.10/site-packages/anyio/pytest_plugin.py:117:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py:2113: in run_test
    self._loop.run_until_complete(test_func(**kwargs))
/usr/local/lib/python3.10/asyncio/base_events.py:636: in run_until_complete
    self.run_forever()
/usr/local/lib/python3.10/asyncio/base_events.py:603: in run_forever
    self._run_once()
/usr/local/lib/python3.10/asyncio/base_events.py:1871: in _run_once
    event_list = self._selector.select(timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <selectors.EpollSelector object at 0x7f37389043d0>, timeout = -1

    def select(self, timeout=None):
        if timeout is None:
            timeout = -1
        elif timeout <= 0:
            timeout = 0
        else:
            # epoll_wait() has a resolution of 1 millisecond, round away
            # from zero to wait *at least* timeout seconds.
            timeout = math.ceil(timeout * 1e3) * 1e-3

        # epoll_wait() expects `maxevents` to be greater than zero;
        # we want to make sure that `select()` can be called when no
        # FD is registered.
        max_ev = max(len(self._fd_to_key), 1)

        ready = []
        try:
>           fd_event_list = self._selector.poll(timeout, max_ev)
E           Failed: Timeout >600.0s

/usr/local/lib/python3.10/selectors.py:469: Failed
________________ TestFullRefresh.test_sequential_reads[inputs0] ________________

pyfuncitem = <Function test_sequential_reads[inputs0]>

    @pytest.hookimpl(tryfirst=True)
    def pytest_pyfunc_call(pyfuncitem: Any) -> bool | None:
        def run_with_hypothesis(**kwargs: Any) -> None:
            with get_runner(backend_name, backend_options) as runner:
                runner.run_test(original_func, kwargs)

        backend = pyfuncitem.funcargs.get("anyio_backend")
        if backend:
            backend_name, backend_options = extract_backend_and_options(backend)

            if hasattr(pyfuncitem.obj, "hypothesis"):
                # Wrap the inner test function unless it's already wrapped
                original_func = pyfuncitem.obj.hypothesis.inner_test
                if original_func.__qualname__ != run_with_hypothesis.__qualname__:
                    if iscoroutinefunction(original_func):
                        pyfuncitem.obj.hypothesis.inner_test = run_with_hypothesis

                return None

            if iscoroutinefunction(pyfuncitem.obj):
                funcargs = pyfuncitem.funcargs
                testargs = {arg: funcargs[arg] for arg in pyfuncitem._fixtureinfo.argnames}
                with get_runner(backend_name, backend_options) as runner:
>                   runner.run_test(pyfuncitem.obj, testargs)

/usr/local/lib/python3.10/site-packages/anyio/pytest_plugin.py:117:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py:2113: in run_test
    self._loop.run_until_complete(test_func(**kwargs))
/usr/local/lib/python3.10/asyncio/base_events.py:636: in run_until_complete
    self.run_forever()
/usr/local/lib/python3.10/asyncio/base_events.py:603: in run_forever
    self._run_once()
/usr/local/lib/python3.10/asyncio/base_events.py:1871: in _run_once
    event_list = self._selector.select(timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <selectors.EpollSelector object at 0x7f37389043d0>, timeout = -1

    def select(self, timeout=None):
        if timeout is None:
            timeout = -1
        elif timeout <= 0:
            timeout = 0
        else:
            # epoll_wait() has a resolution of 1 millisecond, round away
            # from zero to wait *at least* timeout seconds.
            timeout = math.ceil(timeout * 1e3) * 1e-3

        # epoll_wait() expects `maxevents` to be greater than zero;
        # we want to make sure that `select()` can be called when no
        # FD is registered.
        max_ev = max(len(self._fd_to_key), 1)

        ready = []
        try:
>           fd_event_list = self._selector.poll(timeout, max_ev)
E           Failed: Timeout >1200.0s

/usr/local/lib/python3.10/selectors.py:469: Failed
=============================== warnings summary ===============================
../usr/local/lib/python3.10/site-packages/hypothesis_jsonschema/_canonicalise.py:116
  /usr/local/lib/python3.10/site-packages/hypothesis_jsonschema/_canonicalise.py:116: DeprecationWarning: jsonschema.exceptions.RefResolutionError is deprecated as of version 4.18.0. If you wish to catch potential reference resolution errors, directly catch referencing.exceptions.Unresolvable.
    class HypothesisRefResolutionError(jsonschema.exceptions.RefResolutionError):

../usr/local/lib/python3.10/site-packages/hypothesis_jsonschema/_resolve.py:31
  /usr/local/lib/python3.10/site-packages/hypothesis_jsonschema/_resolve.py:31: DeprecationWarning: jsonschema.RefResolver is deprecated as of v4.18.0, in favor of the https://github.com/python-jsonschema/referencing library, which provides more compliant referencing behavior as well as more flexible APIs for customization. A future release will remove RefResolver. Please file a feature request (on referencing) if you are missing an API for the kind of customization you need.
    class LocalResolver(jsonschema.RefResolver):

-- Docs: https://docs.pytest.org/en/stable/warnings.html
=========================== short test summary info ============================
FAILED test_core.py::TestBasicRead::test_read[inputs0] - Failed: Timeout >600.0s
FAILED test_full_refresh.py::TestFullRefresh::test_sequential_reads[inputs0]
ERROR test_core.py::TestSpec::test_backward_compatibility[inputs0] - dagger.e...
ERROR test_core.py::TestDiscovery::test_backward_compatibility[inputs0] - dag...
ERROR test_full_refresh.py::TestFullRefresh::test_sequential_reads[inputs0]
SKIPPED [1] ../app/connector_acceptance_test/plugin.py:63: Skipping TestIncremental.test_two_sequential_reads: This connector does not implement incremental sync
SKIPPED [1] ../app/connector_acceptance_test/tests/test_core.py:720: This tests currently leads to too much failures. We need to fix the connectors at scale first.
== 2 failed, 34 passed, 2 skipped, 2 warnings, 3 errors in 2109.46s (0:35:09) ==
(airbyte3.9) vgavriil@VG-Work:~/workspace/forks/airbyte_select_star/airbyte-integrations/connectors/source-select-star (New-Source-Select-Star)$

vasilisgav avatar Aug 30 '23 08:08 vasilisgav

If you have different endpoints, you need to expose the base URL in the connector spec so we can inject it during tests or when running a sync.
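For example, the base URL could be declared as a spec option and referenced from the manifest. This is a minimal sketch assuming the low-code CDK's config interpolation syntax; the `api_url` property name and its default are hypothetical, not part of the current spec:

```yaml
# spec.yaml — expose the base URL so a sandbox can be injected (hypothetical `api_url` option)
connectionSpecification:
  $schema: http://json-schema.org/draft-07/schema#
  title: Select Star Spec
  type: object
  required:
    - api_token
  properties:
    api_token:
      type: string
      airbyte_secret: true
      description: API token used to authenticate against the Select Star API.
    api_url:
      type: string
      description: Base URL of the Select Star API; override to point at a testing environment.
      default: "https://api.production.selectstar.com/v1"
```

```yaml
# manifest.yaml — read the base URL from the user config instead of hard-coding it
requester:
  type: HttpRequester
  url_base: "{{ config['api_url'] }}"
```

With this shape, the production default stays in place, while the acceptance-test config (typically `secrets/config.json`) can set `api_url` to the testing endpoint without patching the manifest.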

marcosmarxm avatar Sep 04 '23 17:09 marcosmarxm

Sorry for the delay in reviewing the contribution @vasilisgav. The August Hackathon is almost over, and the team will return to the community backlog.

marcosmarxm avatar Sep 04 '23 17:09 marcosmarxm

Hey @vasilisgav, apologies again for the delay; we're looking into getting this merged. Before we restart this work, can you confirm you're still interested in continuing this contribution?

sh4sh avatar Oct 03 '23 15:10 sh4sh

Hi @sh4sh, sure, what else is needed?

vasilisgav avatar Oct 04 '23 07:10 vasilisgav

Hello @vasilisgav, sorry for the lack of updates. I'll continue the review process next week.

marcosmarxm avatar Nov 22 '23 17:11 marcosmarxm

https://github.com/airbytehq/airbyte-internal-issues/issues/7002

midavadim avatar Mar 26 '24 10:03 midavadim

Hi @vasilisgav, apologies for the delay in updating and reviewing your contribution. Regrettably, we had to decline it for now because the team was unable to set up a sandbox and conduct integration tests. We understand this can be disappointing, and we want to assure you that we recognize the issue. We dedicated a significant amount of effort in the previous quarter, and will continue to do so in the next, to improve the process for contributions like new connectors. The team will introduce a new process for contributing and publishing sources that doesn't depend on Airbyte having a proper sandbox to accept contributions and validate all tests. Please get in touch if you want to learn more about it and resubmit this amazing source :) Thanks a lot for all the effort you put into this contribution, and feel free to contact me on Slack.

marcosmarxm avatar Jun 14 '24 13:06 marcosmarxm