✨ New Source: Select Star
What
Work on the new Select Star source (airbytehq/airbyte#29074).
How
Using the low-code CDK approach.
Recommended reading order
- spec.yaml
- manifest.yml
- schemas/*
Before Merging a Connector Pull Request
Wow! What a great pull request you have here! 🎉
To merge this PR, ensure the following has been done/considered for each connector added or updated:
- [ ] PR name follows PR naming conventions
- [ ] Breaking changes are considered. If a Breaking Change is being introduced, ensure an Airbyte engineer has created a Breaking Change Plan.
- [ ] Connector version has been incremented in the Dockerfile and metadata.yaml according to our Semantic Versioning for Connectors guidelines
- [ ] You've updated the connector's metadata.yaml file with any other relevant changes, including a breakingChanges entry for major version bumps. See metadata.yaml docs
- [ ] Secrets in the connector's spec are annotated with airbyte_secret
- [ ] All documentation files are up to date. (README.md, bootstrap.md, docs.md, etc...)
- [ ] Changelog updated in docs/integrations/<source or destination>/<name>.md with an entry for the new version. See changelog example
- [ ] Migration guide updated in docs/integrations/<source or destination>/<name>-migrations.md with an entry for the new version, if the version is a breaking change. See migration guide example
- [ ] If set, you've ensured the icon is present in the platform-internal repo. (Docs)
If the checklist is complete, but the CI check is failing:
- Check for hidden checklists in your PR description
- Toggle the github label checklist-action-run on/off to re-run the checklist CI.
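As a rough illustration of the airbyte_secret item in the checklist above, the following sketch flags spec properties that look like secrets but are not annotated. It is only a heuristic under stated assumptions: the `unannotated_secrets` helper, the `SECRET_HINTS` list, and the example property names are hypothetical and are not taken from this connector's spec.yaml or from any Airbyte tooling.

```python
# Hypothetical helper: list spec properties whose names look secret-like
# but that are not annotated with airbyte_secret: true.
SECRET_HINTS = ("token", "password", "secret", "api_key")


def unannotated_secrets(connection_specification: dict) -> list:
    """Return names of top-level properties that look like secrets but lack airbyte_secret."""
    missing = []
    for name, schema in connection_specification.get("properties", {}).items():
        looks_secret = any(hint in name.lower() for hint in SECRET_HINTS)
        if looks_secret and not schema.get("airbyte_secret", False):
            missing.append(name)
    return missing


if __name__ == "__main__":
    # Illustrative spec fragment; the field names are assumptions.
    example_spec = {
        "type": "object",
        "properties": {
            "api_token": {"type": "string", "airbyte_secret": True},
            "password": {"type": "string"},
            "start_date": {"type": "string"},
        },
    }
    print(unannotated_secrets(example_spec))
```

The check only inspects top-level properties and relies on name matching, so it can miss nested or unusually named secrets; it is meant as a quick local sanity check, not a replacement for the acceptance tests.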
Thanks for the contribution, @vasilisgav. Right now the team is entirely dedicated to reviewing Hackathon contributions; after the event is done, we'll return to your contribution. Is it possible to share the integration test output?
Hello,
I work for Select Star.
When it comes to integration testing, we are happy to provide a free environment so you can monitor the connector to ensure it is functioning properly. I am not sure about the best process to make it happen. What do you think?
Hi @marcosmarxm,
I have used an API token provided by @ad-m-ss in a testing environment, and I added the integration tests.
The only change that's not committed is the url_base, which points to a testing environment (git diff):
requester:
type: HttpRequester
- url_base: "https://api.production.selectstar.com/v1"
+ url_base: "https://api.eu.selectstar.com/v1"
I believe that's not part of the spec, since once merged it will always point to production. If there is a more elegant way to express production vs. testing environments, let me know.
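One possible way to avoid hand-editing url_base is to derive the host from the connector config. The sketch below is plain Python rather than CDK code, and it is only an illustration: the `environment` config key and the `build_url_base` helper are hypothetical and not part of this PR, while the two host names are the ones shown in the diff above. In a low-code manifest, the same idea would presumably be expressed with a `{{ config[...] }}` interpolation inside url_base.

```python
# Hypothetical sketch: choose the Select Star API host from the connector config.
# The "environment" key is an assumption; the hosts come from the diff above.
HOSTS = {
    "production": "api.production.selectstar.com",
    "eu": "api.eu.selectstar.com",
}


def build_url_base(config: dict) -> str:
    """Return the API base URL, defaulting to production when no environment is set."""
    environment = config.get("environment", "production")
    try:
        host = HOSTS[environment]
    except KeyError:
        raise ValueError(f"Unknown environment: {environment!r}") from None
    return f"https://{host}/v1"


if __name__ == "__main__":
    print(build_url_base({}))
    print(build_url_base({"environment": "eu"}))
```

Defaulting to production keeps the merged connector's behaviour unchanged for users who never set the extra field, while the test config could opt into the other host.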
Unfortunately, some integration tests are failing and I'm having a hard time locating why. The integration tests take about 35 minutes, which only makes things more difficult.
Here is the integration test output:
(airbyte3.9) vgavriil@VG-Work:~/workspace/forks/airbyte_select_star/airbyte-integrations/connectors/source-select-star (New-Source-Select-Star)$ ./acceptance-test-docker.sh
Sending build context to Docker daemon 36.86kB
Step 1/18 : FROM python:3.9.11-alpine3.15 as base
---> df5be861e65d
Step 2/18 : FROM base as builder
---> df5be861e65d
Step 3/18 : WORKDIR /airbyte/integration_code
---> Using cache
---> ae70d13b6bbe
Step 4/18 : RUN apk --no-cache upgrade && pip install --upgrade pip && apk --no-cache add tzdata build-base
---> Using cache
---> 356d1c20630d
Step 5/18 : COPY setup.py ./
---> Using cache
---> d90d1704a7c8
Step 6/18 : RUN pip install --prefix=/install .
---> Using cache
---> 93191aa7a959
Step 7/18 : FROM base
---> df5be861e65d
Step 8/18 : WORKDIR /airbyte/integration_code
---> Using cache
---> ae70d13b6bbe
Step 9/18 : COPY --from=builder /install /usr/local
---> Using cache
---> 6422d49decec
Step 10/18 : COPY --from=builder /usr/share/zoneinfo/Etc/UTC /etc/localtime
---> Using cache
---> 21e70bbe5c12
Step 11/18 : RUN echo "Etc/UTC" > /etc/timezone
---> Using cache
---> 335df22361c1
Step 12/18 : RUN apk --no-cache add bash
---> Using cache
---> bf2a8f81fcb5
Step 13/18 : COPY main.py ./
---> Using cache
---> 93c050a1cd03
Step 14/18 : COPY source_select_star ./source_select_star
---> Using cache
---> 7d54c7c03151
Step 15/18 : ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
---> Using cache
---> a43f93728d35
Step 16/18 : ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
---> Using cache
---> 58d6f85a141e
Step 17/18 : LABEL io.airbyte.version=0.1.0
---> Using cache
---> 51d44f4806c6
Step 18/18 : LABEL io.airbyte.name=airbyte/source-select-star
---> Using cache
---> 0a926d992f6b
Successfully built 0a926d992f6b
Successfully tagged airbyte/source-select-star:dev
Use 'docker scan' to run Snyk tests against images to find vulnerabilities and learn how to fix them
latest: Pulling from airbyte/connector-acceptance-test
Digest: sha256:273f283618e92e4af6b167d5ba64c6e88cf389d199462995bff9fef64e4404a1
Status: Image is up to date for airbyte/connector-acceptance-test:latest
docker.io/airbyte/connector-acceptance-test:latest
============================= test session starts ==============================
platform linux -- Python 3.10.12, pytest-6.2.5, py-1.11.0, pluggy-1.2.0
rootdir: /test_input
plugins: anyio-3.7.1, timeout-1.4.2, cov-3.0.0, sugar-0.9.7, requests-mock-1.9.3, hypothesis-6.82.3, mock-3.6.1
collected 39 items / 1 skipped / 38 selected
test_core.py ....................E.............EsF. [ 97%]
test_full_refresh.py FE [100%]
==================================== ERRORS ====================================
_______ ERROR at setup of TestSpec.test_backward_compatibility[inputs0] ________
self = Context(session=<gql.client.AsyncClientSession object at 0x7f3738661c00>, schema=<gql.dsl.DSLSchema object at 0x7f3738...ntainer', name='sync', args={}, children={})]), converter=<cattrs.preconf.json.JsonConverter object at 0x7f3737dfe8d0>)
query = DocumentNode
@contextlib.contextmanager
def _handle_execute(self, query: graphql.DocumentNode):
# Reduces duplication when handling errors, between sync and async.
try:
> yield
/usr/local/lib/python3.10/site-packages/dagger/api/base.py:244:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = Context(session=<gql.client.AsyncClientSession object at 0x7f3738661c00>, schema=<gql.dsl.DSLSchema object at 0x7f3738...ntainer', name='sync', args={}, children={})]), converter=<cattrs.preconf.json.JsonConverter object at 0x7f3737dfe8d0>)
return_type = <class 'dagger.api.gen.ContainerID'>
async def execute(self, return_type: type[T] | None = None) -> T | None:
assert isinstance(self.session, AsyncClientSession)
await self.resolve_ids()
query = self.query()
with self._handle_execute(query):
> result = await self.session.execute(query)
/usr/local/lib/python3.10/site-packages/dagger/api/base.py:157:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <gql.client.AsyncClientSession object at 0x7f3738661c00>
document = DocumentNode, variable_values = None, operation_name = None
serialize_variables = None, parse_result = None, get_execution_result = False
kwargs = {}
result = ExecutionResult(data=None, errors=[{'message': 'pull access denied, repository does not exist or may require authoriza...e: insufficient_scope: authorization failed', 'locations': [{'line': 3, 'column': 5}], 'path': ['container', 'from']}])
async def execute(
self,
document: DocumentNode,
variable_values: Optional[Dict[str, Any]] = None,
operation_name: Optional[str] = None,
serialize_variables: Optional[bool] = None,
parse_result: Optional[bool] = None,
get_execution_result: bool = False,
**kwargs,
) -> Union[Dict[str, Any], ExecutionResult]:
"""Coroutine to execute the provided document AST asynchronously using
the async transport.
Raises a TransportQueryError if an error has been returned in
the ExecutionResult.
:param document: GraphQL query as AST Node object.
:param variable_values: Dictionary of input parameters.
:param operation_name: Name of the operation that shall be executed.
:param serialize_variables: whether the variable values should be
serialized. Used for custom scalars and/or enums.
By default use the serialize_variables argument of the client.
:param parse_result: Whether gql will unserialize the result.
By default use the parse_results argument of the client.
:param get_execution_result: return the full ExecutionResult instance instead of
only the "data" field. Necessary if you want to get the "extensions" field.
The extra arguments are passed to the transport execute method."""
# Validate and execute on the transport
result = await self._execute(
document,
variable_values=variable_values,
operation_name=operation_name,
serialize_variables=serialize_variables,
parse_result=parse_result,
**kwargs,
)
# Raise an error if an error is returned in the ExecutionResult object
if result.errors:
> raise TransportQueryError(
str(result.errors[0]),
errors=result.errors,
data=result.data,
extensions=result.extensions,
)
E gql.transport.exceptions.TransportQueryError: {'message': 'pull access denied, repository does not exist or may require authorization: server message: insufficient_scope: authorization failed', 'locations': [{'line': 3, 'column': 5}], 'path': ['container', 'from']}
/usr/local/lib/python3.10/site-packages/gql/client.py:1231: TransportQueryError
The above exception was the direct cause of the following exception:
anyio_backend = 'asyncio', args = ()
kwargs = {'dagger_client': <dagger.api.gen.Client object at 0x7f3737fe7680>, 'previous_connector_image_name': 'airbyte/source-select-star:latest'}
backend_name = 'asyncio', backend_options = {}
runner = <anyio._backends._asyncio.TestRunner object at 0x7f3738905e10>
def wrapper(*args, anyio_backend, **kwargs): # type: ignore[no-untyped-def]
backend_name, backend_options = extract_backend_and_options(anyio_backend)
if has_backend_arg:
kwargs["anyio_backend"] = anyio_backend
with get_runner(backend_name, backend_options) as runner:
if isasyncgenfunction(func):
yield from runner.run_asyncgen_fixture(func, kwargs)
else:
> yield runner.run_fixture(func, kwargs)
/usr/local/lib/python3.10/site-packages/anyio/pytest_plugin.py:70:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py:2105: in run_fixture
retval = self._loop.run_until_complete(fixture_func(**kwargs))
/usr/local/lib/python3.10/asyncio/base_events.py:649: in run_until_complete
return future.result()
/app/connector_acceptance_test/conftest.py:189: in previous_connector_docker_runner_fixture
await runner.load_container()
/app/connector_acceptance_test/utils/connector_runner.py:49: in load_container
await self._connector_under_test_container.with_exec(["spec"])
/usr/local/lib/python3.10/site-packages/dagger/api/base.py:486: in async_wrapper
return await bear(*args, **kwargs)
/usr/local/lib/python3.10/site-packages/dagger/api/gen.py:895: in sync
_id = await _ctx.execute(ContainerID)
/usr/local/lib/python3.10/site-packages/dagger/api/base.py:156: in execute
with self._handle_execute(query):
/usr/local/lib/python3.10/contextlib.py:153: in __exit__
self.gen.throw(typ, value, traceback)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = Context(session=<gql.client.AsyncClientSession object at 0x7f3738661c00>, schema=<gql.dsl.DSLSchema object at 0x7f3738...ntainer', name='sync', args={}, children={})]), converter=<cattrs.preconf.json.JsonConverter object at 0x7f3737dfe8d0>)
query = DocumentNode
@contextlib.contextmanager
def _handle_execute(self, query: graphql.DocumentNode):
# Reduces duplication when handling errors, between sync and async.
try:
yield
except httpx.TimeoutException as e:
msg = (
"Request timed out. Try setting a higher value in 'execute_timeout' "
"config for this `dagger.Connection()`."
)
raise ExecuteTimeoutError(msg) from e
except httpx.RequestError as e:
msg = f"Failed to make request: {e}"
raise TransportError(msg) from e
except TransportClosed as e:
msg = (
"Connection to engine has been closed. Make sure you're "
"calling the API within a `dagger.Connection()` context."
)
raise TransportError(msg) from e
except (TransportProtocolError, TransportServerError) as e:
msg = f"Unexpected response from engine: {e}"
raise TransportError(msg) from e
except TransportQueryError as e:
if error := QueryError.from_transport(e, query):
> raise error from e
E dagger.exceptions.QueryError: pull access denied, repository does not exist or may require authorization: server message: insufficient_scope: authorization failed
/usr/local/lib/python3.10/site-packages/dagger/api/base.py:270: QueryError
_____ ERROR at setup of TestDiscovery.test_backward_compatibility[inputs0] _____
self = Context(session=<gql.client.AsyncClientSession object at 0x7f3738661c00>, schema=<gql.dsl.DSLSchema object at 0x7f3738...ntainer', name='sync', args={}, children={})]), converter=<cattrs.preconf.json.JsonConverter object at 0x7f37370326c0>)
query = DocumentNode
@contextlib.contextmanager
def _handle_execute(self, query: graphql.DocumentNode):
# Reduces duplication when handling errors, between sync and async.
try:
> yield
/usr/local/lib/python3.10/site-packages/dagger/api/base.py:244:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = Context(session=<gql.client.AsyncClientSession object at 0x7f3738661c00>, schema=<gql.dsl.DSLSchema object at 0x7f3738...ntainer', name='sync', args={}, children={})]), converter=<cattrs.preconf.json.JsonConverter object at 0x7f37370326c0>)
return_type = <class 'dagger.api.gen.ContainerID'>
async def execute(self, return_type: type[T] | None = None) -> T | None:
assert isinstance(self.session, AsyncClientSession)
await self.resolve_ids()
query = self.query()
with self._handle_execute(query):
> result = await self.session.execute(query)
/usr/local/lib/python3.10/site-packages/dagger/api/base.py:157:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <gql.client.AsyncClientSession object at 0x7f3738661c00>
document = DocumentNode, variable_values = None, operation_name = None
serialize_variables = None, parse_result = None, get_execution_result = False
kwargs = {}
result = ExecutionResult(data=None, errors=[{'message': 'pull access denied, repository does not exist or may require authoriza...e: insufficient_scope: authorization failed', 'locations': [{'line': 3, 'column': 5}], 'path': ['container', 'from']}])
async def execute(
self,
document: DocumentNode,
variable_values: Optional[Dict[str, Any]] = None,
operation_name: Optional[str] = None,
serialize_variables: Optional[bool] = None,
parse_result: Optional[bool] = None,
get_execution_result: bool = False,
**kwargs,
) -> Union[Dict[str, Any], ExecutionResult]:
"""Coroutine to execute the provided document AST asynchronously using
the async transport.
Raises a TransportQueryError if an error has been returned in
the ExecutionResult.
:param document: GraphQL query as AST Node object.
:param variable_values: Dictionary of input parameters.
:param operation_name: Name of the operation that shall be executed.
:param serialize_variables: whether the variable values should be
serialized. Used for custom scalars and/or enums.
By default use the serialize_variables argument of the client.
:param parse_result: Whether gql will unserialize the result.
By default use the parse_results argument of the client.
:param get_execution_result: return the full ExecutionResult instance instead of
only the "data" field. Necessary if you want to get the "extensions" field.
The extra arguments are passed to the transport execute method."""
# Validate and execute on the transport
result = await self._execute(
document,
variable_values=variable_values,
operation_name=operation_name,
serialize_variables=serialize_variables,
parse_result=parse_result,
**kwargs,
)
# Raise an error if an error is returned in the ExecutionResult object
if result.errors:
> raise TransportQueryError(
str(result.errors[0]),
errors=result.errors,
data=result.data,
extensions=result.extensions,
)
E gql.transport.exceptions.TransportQueryError: {'message': 'pull access denied, repository does not exist or may require authorization: server message: insufficient_scope: authorization failed', 'locations': [{'line': 3, 'column': 5}], 'path': ['container', 'from']}
/usr/local/lib/python3.10/site-packages/gql/client.py:1231: TransportQueryError
The above exception was the direct cause of the following exception:
anyio_backend = 'asyncio', args = ()
kwargs = {'dagger_client': <dagger.api.gen.Client object at 0x7f3737fe7680>, 'previous_connector_image_name': 'airbyte/source-select-star:latest'}
backend_name = 'asyncio', backend_options = {}
runner = <anyio._backends._asyncio.TestRunner object at 0x7f3738905e10>
def wrapper(*args, anyio_backend, **kwargs): # type: ignore[no-untyped-def]
backend_name, backend_options = extract_backend_and_options(anyio_backend)
if has_backend_arg:
kwargs["anyio_backend"] = anyio_backend
with get_runner(backend_name, backend_options) as runner:
if isasyncgenfunction(func):
yield from runner.run_asyncgen_fixture(func, kwargs)
else:
> yield runner.run_fixture(func, kwargs)
/usr/local/lib/python3.10/site-packages/anyio/pytest_plugin.py:70:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py:2105: in run_fixture
retval = self._loop.run_until_complete(fixture_func(**kwargs))
/usr/local/lib/python3.10/asyncio/base_events.py:649: in run_until_complete
return future.result()
/app/connector_acceptance_test/conftest.py:189: in previous_connector_docker_runner_fixture
await runner.load_container()
/app/connector_acceptance_test/utils/connector_runner.py:49: in load_container
await self._connector_under_test_container.with_exec(["spec"])
/usr/local/lib/python3.10/site-packages/dagger/api/base.py:486: in async_wrapper
return await bear(*args, **kwargs)
/usr/local/lib/python3.10/site-packages/dagger/api/gen.py:895: in sync
_id = await _ctx.execute(ContainerID)
/usr/local/lib/python3.10/site-packages/dagger/api/base.py:156: in execute
with self._handle_execute(query):
/usr/local/lib/python3.10/contextlib.py:153: in __exit__
self.gen.throw(typ, value, traceback)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = Context(session=<gql.client.AsyncClientSession object at 0x7f3738661c00>, schema=<gql.dsl.DSLSchema object at 0x7f3738...ntainer', name='sync', args={}, children={})]), converter=<cattrs.preconf.json.JsonConverter object at 0x7f37370326c0>)
query = DocumentNode
@contextlib.contextmanager
def _handle_execute(self, query: graphql.DocumentNode):
# Reduces duplication when handling errors, between sync and async.
try:
yield
except httpx.TimeoutException as e:
msg = (
"Request timed out. Try setting a higher value in 'execute_timeout' "
"config for this `dagger.Connection()`."
)
raise ExecuteTimeoutError(msg) from e
except httpx.RequestError as e:
msg = f"Failed to make request: {e}"
raise TransportError(msg) from e
except TransportClosed as e:
msg = (
"Connection to engine has been closed. Make sure you're "
"calling the API within a `dagger.Connection()` context."
)
raise TransportError(msg) from e
except (TransportProtocolError, TransportServerError) as e:
msg = f"Unexpected response from engine: {e}"
raise TransportError(msg) from e
except TransportQueryError as e:
if error := QueryError.from_transport(e, query):
> raise error from e
E dagger.exceptions.QueryError: pull access denied, repository does not exist or may require authorization: server message: insufficient_scope: authorization failed
/usr/local/lib/python3.10/site-packages/dagger/api/base.py:270: QueryError
_____ ERROR at teardown of TestFullRefresh.test_sequential_reads[inputs0] ______
map = {<class 'TimeoutError'>: <class 'httpcore.ReadTimeout'>, <class 'anyio.BrokenResourceError'>: <class 'httpcore.ReadError'>, <class 'anyio.ClosedResourceError'>: <class 'httpcore.ReadError'>}
@contextlib.contextmanager
def map_exceptions(map: ExceptionMapping) -> Iterator[None]:
try:
> yield
/usr/local/lib/python3.10/site-packages/httpcore/_exceptions.py:10:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <httpcore._backends.anyio.AnyIOStream object at 0x7f3736f39d50>
max_bytes = 65536, timeout = None
async def read(
self, max_bytes: int, timeout: typing.Optional[float] = None
) -> bytes:
exc_map = {
TimeoutError: ReadTimeout,
anyio.BrokenResourceError: ReadError,
anyio.ClosedResourceError: ReadError,
}
with map_exceptions(exc_map):
with anyio.fail_after(timeout):
try:
> return await self._stream.receive(max_bytes=max_bytes)
/usr/local/lib/python3.10/site-packages/httpcore/_backends/anyio.py:34:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <anyio._backends._asyncio.SocketStream object at 0x7f3736f3a530>
max_bytes = 65536
async def receive(self, max_bytes: int = 65536) -> bytes:
with self._receive_guard:
await checkpoint()
if (
not self._protocol.read_event.is_set()
and not self._transport.is_closing()
):
self._transport.resume_reading()
await self._protocol.read_event.wait()
self._transport.pause_reading()
try:
chunk = self._protocol.read_queue.popleft()
except IndexError:
if self._closed:
> raise ClosedResourceError from None
E anyio.ClosedResourceError
/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py:1210: ClosedResourceError
The above exception was the direct cause of the following exception:
@contextlib.contextmanager
def map_httpcore_exceptions() -> typing.Iterator[None]:
try:
> yield
/usr/local/lib/python3.10/site-packages/httpx/_transports/default.py:60:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <httpx.AsyncHTTPTransport object at 0x7f37386638b0>
request = <Request('POST', 'http://127.0.0.1:35119/query')>
async def handle_async_request(
self,
request: Request,
) -> Response:
assert isinstance(request.stream, AsyncByteStream)
req = httpcore.Request(
method=request.method,
url=httpcore.URL(
scheme=request.url.raw_scheme,
host=request.url.raw_host,
port=request.url.port,
target=request.url.raw_path,
),
headers=request.headers.raw,
content=request.stream,
extensions=request.extensions,
)
with map_httpcore_exceptions():
> resp = await self._pool.handle_async_request(req)
/usr/local/lib/python3.10/site-packages/httpx/_transports/default.py:353:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <httpcore.AsyncConnectionPool object at 0x7f37386603a0>
request = <Request [b'POST']>
async def handle_async_request(self, request: Request) -> Response:
"""
Send an HTTP request, and return an HTTP response.
This is the core implementation that is called into by `.request()` or `.stream()`.
"""
scheme = request.url.scheme.decode()
if scheme == "":
raise UnsupportedProtocol(
"Request URL is missing an 'http://' or 'https://' protocol."
)
if scheme not in ("http", "https", "ws", "wss"):
raise UnsupportedProtocol(
f"Request URL has an unsupported protocol '{scheme}://'."
)
status = RequestStatus(request)
async with self._pool_lock:
self._requests.append(status)
await self._close_expired_connections()
await self._attempt_to_acquire_connection(status)
while True:
timeouts = request.extensions.get("timeout", {})
timeout = timeouts.get("pool", None)
try:
connection = await status.wait_for_connection(timeout=timeout)
except BaseException as exc:
# If we timeout here, or if the task is cancelled, then make
# sure to remove the request from the queue before bubbling
# up the exception.
async with self._pool_lock:
# Ensure only remove when task exists.
if status in self._requests:
self._requests.remove(status)
raise exc
try:
response = await connection.handle_async_request(request)
except ConnectionNotAvailable:
# The ConnectionNotAvailable exception is a special case, that
# indicates we need to retry the request on a new connection.
#
# The most common case where this can occur is when multiple
# requests are queued waiting for a single connection, which
# might end up as an HTTP/2 connection, but which actually ends
# up as HTTP/1.1.
async with self._pool_lock:
# Maintain our position in the request queue, but reset the
# status so that the request becomes queued again.
status.unset_connection()
await self._attempt_to_acquire_connection(status)
except BaseException as exc:
with AsyncShieldCancellation():
await self.response_closed(status)
> raise exc
/usr/local/lib/python3.10/site-packages/httpcore/_async/connection_pool.py:262:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <httpcore.AsyncConnectionPool object at 0x7f37386603a0>
request = <Request [b'POST']>
async def handle_async_request(self, request: Request) -> Response:
"""
Send an HTTP request, and return an HTTP response.
This is the core implementation that is called into by `.request()` or `.stream()`.
"""
scheme = request.url.scheme.decode()
if scheme == "":
raise UnsupportedProtocol(
"Request URL is missing an 'http://' or 'https://' protocol."
)
if scheme not in ("http", "https", "ws", "wss"):
raise UnsupportedProtocol(
f"Request URL has an unsupported protocol '{scheme}://'."
)
status = RequestStatus(request)
async with self._pool_lock:
self._requests.append(status)
await self._close_expired_connections()
await self._attempt_to_acquire_connection(status)
while True:
timeouts = request.extensions.get("timeout", {})
timeout = timeouts.get("pool", None)
try:
connection = await status.wait_for_connection(timeout=timeout)
except BaseException as exc:
# If we timeout here, or if the task is cancelled, then make
# sure to remove the request from the queue before bubbling
# up the exception.
async with self._pool_lock:
# Ensure only remove when task exists.
if status in self._requests:
self._requests.remove(status)
raise exc
try:
> response = await connection.handle_async_request(request)
/usr/local/lib/python3.10/site-packages/httpcore/_async/connection_pool.py:245:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <AsyncHTTPConnection ['http://127.0.0.1:35119', HTTP/1.1, CLOSED, Request Count: 4]>
request = <Request [b'POST']>
async def handle_async_request(self, request: Request) -> Response:
if not self.can_handle_request(request.url.origin):
raise RuntimeError(
f"Attempted to send request to {request.url.origin} on connection to {self._origin}"
)
async with self._request_lock:
if self._connection is None:
try:
stream = await self._connect(request)
ssl_object = stream.get_extra_info("ssl_object")
http2_negotiated = (
ssl_object is not None
and ssl_object.selected_alpn_protocol() == "h2"
)
if http2_negotiated or (self._http2 and not self._http1):
from .http2 import AsyncHTTP2Connection
self._connection = AsyncHTTP2Connection(
origin=self._origin,
stream=stream,
keepalive_expiry=self._keepalive_expiry,
)
else:
self._connection = AsyncHTTP11Connection(
origin=self._origin,
stream=stream,
keepalive_expiry=self._keepalive_expiry,
)
except Exception as exc:
self._connect_failed = True
raise exc
elif not self._connection.is_available():
raise ConnectionNotAvailable()
> return await self._connection.handle_async_request(request)
/usr/local/lib/python3.10/site-packages/httpcore/_async/connection.py:96:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <AsyncHTTP11Connection ['http://127.0.0.1:35119', CLOSED, Request Count: 4]>
request = <Request [b'POST']>
async def handle_async_request(self, request: Request) -> Response:
if not self.can_handle_request(request.url.origin):
raise RuntimeError(
f"Attempted to send request to {request.url.origin} on connection "
f"to {self._origin}"
)
async with self._state_lock:
if self._state in (HTTPConnectionState.NEW, HTTPConnectionState.IDLE):
self._request_count += 1
self._state = HTTPConnectionState.ACTIVE
self._expire_at = None
else:
raise ConnectionNotAvailable()
try:
kwargs = {"request": request}
async with Trace("send_request_headers", logger, request, kwargs) as trace:
await self._send_request_headers(**kwargs)
async with Trace("send_request_body", logger, request, kwargs) as trace:
await self._send_request_body(**kwargs)
async with Trace(
"receive_response_headers", logger, request, kwargs
) as trace:
(
http_version,
status,
reason_phrase,
headers,
) = await self._receive_response_headers(**kwargs)
trace.return_value = (
http_version,
status,
reason_phrase,
headers,
)
return Response(
status=status,
headers=headers,
content=HTTP11ConnectionByteStream(self, request),
extensions={
"http_version": http_version,
"reason_phrase": reason_phrase,
"network_stream": self._network_stream,
},
)
except BaseException as exc:
with AsyncShieldCancellation():
async with Trace("response_closed", logger, request) as trace:
await self._response_closed()
> raise exc
/usr/local/lib/python3.10/site-packages/httpcore/_async/http11.py:121:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <AsyncHTTP11Connection ['http://127.0.0.1:35119', CLOSED, Request Count: 4]>
request = <Request [b'POST']>
async def handle_async_request(self, request: Request) -> Response:
if not self.can_handle_request(request.url.origin):
raise RuntimeError(
f"Attempted to send request to {request.url.origin} on connection "
f"to {self._origin}"
)
async with self._state_lock:
if self._state in (HTTPConnectionState.NEW, HTTPConnectionState.IDLE):
self._request_count += 1
self._state = HTTPConnectionState.ACTIVE
self._expire_at = None
else:
raise ConnectionNotAvailable()
try:
kwargs = {"request": request}
async with Trace("send_request_headers", logger, request, kwargs) as trace:
await self._send_request_headers(**kwargs)
async with Trace("send_request_body", logger, request, kwargs) as trace:
await self._send_request_body(**kwargs)
async with Trace(
"receive_response_headers", logger, request, kwargs
) as trace:
(
http_version,
status,
reason_phrase,
headers,
> ) = await self._receive_response_headers(**kwargs)
/usr/local/lib/python3.10/site-packages/httpcore/_async/http11.py:99:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <AsyncHTTP11Connection ['http://127.0.0.1:35119', CLOSED, Request Count: 4]>
request = <Request [b'POST']>
async def _receive_response_headers(
self, request: Request
) -> Tuple[bytes, int, bytes, List[Tuple[bytes, bytes]]]:
timeouts = request.extensions.get("timeout", {})
timeout = timeouts.get("read", None)
while True:
> event = await self._receive_event(timeout=timeout)
/usr/local/lib/python3.10/site-packages/httpcore/_async/http11.py:164:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <AsyncHTTP11Connection ['http://127.0.0.1:35119', CLOSED, Request Count: 4]>
timeout = None
async def _receive_event(
self, timeout: Optional[float] = None
) -> Union[h11.Event, Type[h11.PAUSED]]:
while True:
with map_exceptions({h11.RemoteProtocolError: RemoteProtocolError}):
event = self._h11_state.next_event()
if event is h11.NEED_DATA:
> data = await self._network_stream.read(
self.READ_NUM_BYTES, timeout=timeout
)
/usr/local/lib/python3.10/site-packages/httpcore/_async/http11.py:200:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <httpcore._backends.anyio.AnyIOStream object at 0x7f3736f39d50>
max_bytes = 65536, timeout = None
async def read(
self, max_bytes: int, timeout: typing.Optional[float] = None
) -> bytes:
exc_map = {
TimeoutError: ReadTimeout,
anyio.BrokenResourceError: ReadError,
anyio.ClosedResourceError: ReadError,
}
> with map_exceptions(exc_map):
/usr/local/lib/python3.10/site-packages/httpcore/_backends/anyio.py:31:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <contextlib._GeneratorContextManager object at 0x7f3736d76440>
typ = <class 'anyio.ClosedResourceError'>, value = ClosedResourceError()
traceback = <traceback object at 0x7f37381bae00>
def __exit__(self, typ, value, traceback):
if typ is None:
try:
next(self.gen)
except StopIteration:
return False
else:
raise RuntimeError("generator didn't stop")
else:
if value is None:
# Need to force instantiation so we can reliably
# tell if we get the same exception back
value = typ()
try:
> self.gen.throw(typ, value, traceback)
/usr/local/lib/python3.10/contextlib.py:153:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
map = {<class 'TimeoutError'>: <class 'httpcore.ReadTimeout'>, <class 'anyio.BrokenResourceError'>: <class 'httpcore.ReadError'>, <class 'anyio.ClosedResourceError'>: <class 'httpcore.ReadError'>}
@contextlib.contextmanager
def map_exceptions(map: ExceptionMapping) -> Iterator[None]:
try:
yield
except Exception as exc: # noqa: PIE786
for from_exc, to_exc in map.items():
if isinstance(exc, from_exc):
> raise to_exc(exc) from exc
E httpcore.ReadError
/usr/local/lib/python3.10/site-packages/httpcore/_exceptions.py:14: ReadError
The above exception was the direct cause of the following exception:
self = Context(session=<gql.client.AsyncClientSession object at 0x7f3738661c00>, schema=<gql.dsl.DSLSchema object at 0x7f3738...ainer', name='stdout', args={}, children={})]), converter=<cattrs.preconf.json.JsonConverter object at 0x7f373704f5e0>)
query = DocumentNode
@contextlib.contextmanager
def _handle_execute(self, query: graphql.DocumentNode):
# Reduces duplication when handling errors, between sync and async.
try:
> yield
/usr/local/lib/python3.10/site-packages/dagger/api/base.py:244:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = Context(session=<gql.client.AsyncClientSession object at 0x7f3738661c00>, schema=<gql.dsl.DSLSchema object at 0x7f3738...ainer', name='stdout', args={}, children={})]), converter=<cattrs.preconf.json.JsonConverter object at 0x7f373704f5e0>)
return_type = <class 'str'>
async def execute(self, return_type: type[T] | None = None) -> T | None:
assert isinstance(self.session, AsyncClientSession)
await self.resolve_ids()
query = self.query()
with self._handle_execute(query):
> result = await self.session.execute(query)
/usr/local/lib/python3.10/site-packages/dagger/api/base.py:157:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <gql.client.AsyncClientSession object at 0x7f3738661c00>
document = DocumentNode, variable_values = None, operation_name = None
serialize_variables = None, parse_result = None, get_execution_result = False
kwargs = {}
async def execute(
self,
document: DocumentNode,
variable_values: Optional[Dict[str, Any]] = None,
operation_name: Optional[str] = None,
serialize_variables: Optional[bool] = None,
parse_result: Optional[bool] = None,
get_execution_result: bool = False,
**kwargs,
) -> Union[Dict[str, Any], ExecutionResult]:
"""Coroutine to execute the provided document AST asynchronously using
the async transport.
Raises a TransportQueryError if an error has been returned in
the ExecutionResult.
:param document: GraphQL query as AST Node object.
:param variable_values: Dictionary of input parameters.
:param operation_name: Name of the operation that shall be executed.
:param serialize_variables: whether the variable values should be
serialized. Used for custom scalars and/or enums.
By default use the serialize_variables argument of the client.
:param parse_result: Whether gql will unserialize the result.
By default use the parse_results argument of the client.
:param get_execution_result: return the full ExecutionResult instance instead of
only the "data" field. Necessary if you want to get the "extensions" field.
The extra arguments are passed to the transport execute method."""
# Validate and execute on the transport
> result = await self._execute(
document,
variable_values=variable_values,
operation_name=operation_name,
serialize_variables=serialize_variables,
parse_result=parse_result,
**kwargs,
)
/usr/local/lib/python3.10/site-packages/gql/client.py:1220:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <gql.client.AsyncClientSession object at 0x7f3738661c00>
document = DocumentNode, variable_values = None, operation_name = None
serialize_variables = None, parse_result = None, kwargs = {}
async def _execute(
self,
document: DocumentNode,
variable_values: Optional[Dict[str, Any]] = None,
operation_name: Optional[str] = None,
serialize_variables: Optional[bool] = None,
parse_result: Optional[bool] = None,
**kwargs,
) -> ExecutionResult:
"""Coroutine to execute the provided document AST asynchronously using
the async transport, returning an ExecutionResult object.
* Validate the query with the schema if provided.
* Serialize the variable_values if requested.
:param document: GraphQL query as AST Node object.
:param variable_values: Dictionary of input parameters.
:param operation_name: Name of the operation that shall be executed.
:param serialize_variables: whether the variable values should be
serialized. Used for custom scalars and/or enums.
By default use the serialize_variables argument of the client.
:param parse_result: Whether gql will unserialize the result.
By default use the parse_results argument of the client.
The extra arguments are passed to the transport execute method."""
# Validate document
if self.client.schema:
self.client.validate(document)
# Parse variable values for custom scalars if requested
if variable_values is not None:
if serialize_variables or (
serialize_variables is None and self.client.serialize_variables
):
variable_values = serialize_variable_values(
self.client.schema,
document,
variable_values,
operation_name=operation_name,
)
# Execute the query with the transport with a timeout
> result = await asyncio.wait_for(
self.transport.execute(
document,
variable_values=variable_values,
operation_name=operation_name,
**kwargs,
),
self.client.execute_timeout,
)
/usr/local/lib/python3.10/site-packages/gql/client.py:1126:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
fut = <coroutine object HTTPXAsyncTransport.execute at 0x7f37370757e0>
timeout = None
async def wait_for(fut, timeout):
"""Wait for the single Future or coroutine to complete, with timeout.
Coroutine will be wrapped in Task.
Returns result of the Future or coroutine. When a timeout occurs,
it cancels the task and raises TimeoutError. To avoid the task
cancellation, wrap it in shield().
If the wait is cancelled, the task is also cancelled.
This function is a coroutine.
"""
loop = events.get_running_loop()
if timeout is None:
> return await fut
/usr/local/lib/python3.10/asyncio/tasks.py:408:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <dagger.transport.httpx.HTTPXAsyncTransport object at 0x7f37386629e0>
document = DocumentNode, variable_values = None, operation_name = None
extra_args = None, upload_files = False
async def execute(
self,
document: DocumentNode,
variable_values: Optional[Dict[str, Any]] = None,
operation_name: Optional[str] = None,
extra_args: Optional[Dict[str, Any]] = None,
upload_files: bool = False,
) -> ExecutionResult:
if not self.client:
raise TransportClosed("Transport is not connected")
post_args = self._prepare_request(
document,
variable_values,
operation_name,
extra_args,
)
> response = await self.client.post(self.url, **post_args)
/usr/local/lib/python3.10/site-packages/dagger/transport/httpx.py:179:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <httpx.AsyncClient object at 0x7f37386637f0>
url = URL('http://127.0.0.1:35119/query')
async def post(
self,
url: URLTypes,
*,
content: typing.Optional[RequestContent] = None,
data: typing.Optional[RequestData] = None,
files: typing.Optional[RequestFiles] = None,
json: typing.Optional[typing.Any] = None,
params: typing.Optional[QueryParamTypes] = None,
headers: typing.Optional[HeaderTypes] = None,
cookies: typing.Optional[CookieTypes] = None,
auth: typing.Union[AuthTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
extensions: typing.Optional[RequestExtensions] = None,
) -> Response:
"""
Send a `POST` request.
**Parameters**: See `httpx.request`.
"""
> return await self.request(
"POST",
url,
content=content,
data=data,
files=files,
json=json,
params=params,
headers=headers,
cookies=cookies,
auth=auth,
follow_redirects=follow_redirects,
timeout=timeout,
extensions=extensions,
)
/usr/local/lib/python3.10/site-packages/httpx/_client.py:1848:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <httpx.AsyncClient object at 0x7f37386637f0>, method = 'POST'
url = URL('http://127.0.0.1:35119/query')
async def request(
self,
method: str,
url: URLTypes,
*,
content: typing.Optional[RequestContent] = None,
data: typing.Optional[RequestData] = None,
files: typing.Optional[RequestFiles] = None,
json: typing.Optional[typing.Any] = None,
params: typing.Optional[QueryParamTypes] = None,
headers: typing.Optional[HeaderTypes] = None,
cookies: typing.Optional[CookieTypes] = None,
auth: typing.Union[AuthTypes, UseClientDefault, None] = USE_CLIENT_DEFAULT,
follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
timeout: typing.Union[TimeoutTypes, UseClientDefault] = USE_CLIENT_DEFAULT,
extensions: typing.Optional[RequestExtensions] = None,
) -> Response:
"""
Build and send a request.
Equivalent to:
```python
request = client.build_request(...)
response = await client.send(request, ...)
```
See `AsyncClient.build_request()`, `AsyncClient.send()`
and [Merging of configuration][0] for how the various parameters
are merged with client-level configuration.
[0]: /advanced/#merging-of-configuration
"""
request = self.build_request(
method=method,
url=url,
content=content,
data=data,
files=files,
json=json,
params=params,
headers=headers,
cookies=cookies,
timeout=timeout,
extensions=extensions,
)
> return await self.send(request, auth=auth, follow_redirects=follow_redirects)
/usr/local/lib/python3.10/site-packages/httpx/_client.py:1530:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <httpx.AsyncClient object at 0x7f37386637f0>
request = <Request('POST', 'http://127.0.0.1:35119/query')>
async def send(
self,
request: Request,
*,
stream: bool = False,
auth: typing.Union[AuthTypes, UseClientDefault, None] = USE_CLIENT_DEFAULT,
follow_redirects: typing.Union[bool, UseClientDefault] = USE_CLIENT_DEFAULT,
) -> Response:
"""
Send a request.
The request is sent as-is, unmodified.
Typically you'll want to build one with `AsyncClient.build_request()`
so that any client-level configuration is merged into the request,
but passing an explicit `httpx.Request()` is supported as well.
See also: [Request instances][0]
[0]: /advanced/#request-instances
"""
if self._state == ClientState.CLOSED:
raise RuntimeError("Cannot send a request, as the client has been closed.")
self._state = ClientState.OPENED
follow_redirects = (
self.follow_redirects
if isinstance(follow_redirects, UseClientDefault)
else follow_redirects
)
auth = self._build_request_auth(request, auth)
> response = await self._send_handling_auth(
request,
auth=auth,
follow_redirects=follow_redirects,
history=[],
)
/usr/local/lib/python3.10/site-packages/httpx/_client.py:1617:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <httpx.AsyncClient object at 0x7f37386637f0>
request = <Request('POST', 'http://127.0.0.1:35119/query')>
auth = <httpx.BasicAuth object at 0x7f3738661b40>, follow_redirects = False
history = []
async def _send_handling_auth(
self,
request: Request,
auth: Auth,
follow_redirects: bool,
history: typing.List[Response],
) -> Response:
auth_flow = auth.async_auth_flow(request)
try:
request = await auth_flow.__anext__()
while True:
> response = await self._send_handling_redirects(
request,
follow_redirects=follow_redirects,
history=history,
)
/usr/local/lib/python3.10/site-packages/httpx/_client.py:1645:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <httpx.AsyncClient object at 0x7f37386637f0>
request = <Request('POST', 'http://127.0.0.1:35119/query')>
follow_redirects = False, history = []
async def _send_handling_redirects(
self,
request: Request,
follow_redirects: bool,
history: typing.List[Response],
) -> Response:
while True:
if len(history) > self.max_redirects:
raise TooManyRedirects(
"Exceeded maximum allowed redirects.", request=request
)
for hook in self._event_hooks["request"]:
await hook(request)
> response = await self._send_single_request(request)
/usr/local/lib/python3.10/site-packages/httpx/_client.py:1682:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <httpx.AsyncClient object at 0x7f37386637f0>
request = <Request('POST', 'http://127.0.0.1:35119/query')>
async def _send_single_request(self, request: Request) -> Response:
"""
Sends a single request, without handling any redirections.
"""
transport = self._transport_for_url(request.url)
timer = Timer()
await timer.async_start()
if not isinstance(request.stream, AsyncByteStream):
raise RuntimeError(
"Attempted to send an sync request with an AsyncClient instance."
)
with request_context(request=request):
> response = await transport.handle_async_request(request)
/usr/local/lib/python3.10/site-packages/httpx/_client.py:1719:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <httpx.AsyncHTTPTransport object at 0x7f37386638b0>
request = <Request('POST', 'http://127.0.0.1:35119/query')>
async def handle_async_request(
self,
request: Request,
) -> Response:
assert isinstance(request.stream, AsyncByteStream)
req = httpcore.Request(
method=request.method,
url=httpcore.URL(
scheme=request.url.raw_scheme,
host=request.url.raw_host,
port=request.url.port,
target=request.url.raw_path,
),
headers=request.headers.raw,
content=request.stream,
extensions=request.extensions,
)
> with map_httpcore_exceptions():
/usr/local/lib/python3.10/site-packages/httpx/_transports/default.py:352:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <contextlib._GeneratorContextManager object at 0x7f3737415270>
typ = <class 'httpcore.ReadError'>, value = ReadError(ClosedResourceError())
traceback = <traceback object at 0x7f3736ecb500>
def __exit__(self, typ, value, traceback):
if typ is None:
try:
next(self.gen)
except StopIteration:
return False
else:
raise RuntimeError("generator didn't stop")
else:
if value is None:
# Need to force instantiation so we can reliably
# tell if we get the same exception back
value = typ()
try:
> self.gen.throw(typ, value, traceback)
/usr/local/lib/python3.10/contextlib.py:153:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
@contextlib.contextmanager
def map_httpcore_exceptions() -> typing.Iterator[None]:
try:
yield
except Exception as exc: # noqa: PIE-786
mapped_exc = None
for from_exc, to_exc in HTTPCORE_EXC_MAP.items():
if not isinstance(exc, from_exc):
continue
# We want to map to the most specific exception we can find.
# Eg if `exc` is an `httpcore.ReadTimeout`, we want to map to
# `httpx.ReadTimeout`, not just `httpx.TimeoutException`.
if mapped_exc is None or issubclass(to_exc, mapped_exc):
mapped_exc = to_exc
if mapped_exc is None: # pragma: no cover
raise
message = str(exc)
> raise mapped_exc(message) from exc
E httpx.ReadError
/usr/local/lib/python3.10/site-packages/httpx/_transports/default.py:77: ReadError
The above exception was the direct cause of the following exception:
anyio_backend = 'asyncio', args = (), kwargs = {'anyio_backend': 'asyncio'}
backend_name = 'asyncio', backend_options = {}
runner = <anyio._backends._asyncio.TestRunner object at 0x7f3738905e10>
def wrapper(*args, anyio_backend, **kwargs): # type: ignore[no-untyped-def]
backend_name, backend_options = extract_backend_and_options(anyio_backend)
if has_backend_arg:
kwargs["anyio_backend"] = anyio_backend
with get_runner(backend_name, backend_options) as runner:
if isasyncgenfunction(func):
> yield from runner.run_asyncgen_fixture(func, kwargs)
/usr/local/lib/python3.10/site-packages/anyio/pytest_plugin.py:68:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py:2098: in run_asyncgen_fixture
self._raise_async_exceptions()
/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py:2054: in _raise_async_exceptions
raise exceptions[0]
/app/connector_acceptance_test/tests/test_core.py:971: in test_read
output = await docker_runner.call_read(connector_config, configured_catalog)
/app/connector_acceptance_test/utils/connector_runner.py:63: in call_read
return await self._run(
/app/connector_acceptance_test/utils/connector_runner.py:187: in _run
output = await self._read_output_from_stdout(airbyte_command, container)
/app/connector_acceptance_test/utils/connector_runner.py:202: in _read_output_from_stdout
return await container.with_exec(airbyte_command).stdout()
/usr/local/lib/python3.10/site-packages/dagger/api/base.py:486: in async_wrapper
return await bear(*args, **kwargs)
/usr/local/lib/python3.10/site-packages/dagger/api/gen.py:878: in stdout
return await _ctx.execute(str)
/usr/local/lib/python3.10/site-packages/dagger/api/base.py:156: in execute
with self._handle_execute(query):
/usr/local/lib/python3.10/contextlib.py:153: in __exit__
self.gen.throw(typ, value, traceback)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = Context(session=<gql.client.AsyncClientSession object at 0x7f3738661c00>, schema=<gql.dsl.DSLSchema object at 0x7f3738...ainer', name='stdout', args={}, children={})]), converter=<cattrs.preconf.json.JsonConverter object at 0x7f373704f5e0>)
query = DocumentNode
@contextlib.contextmanager
def _handle_execute(self, query: graphql.DocumentNode):
# Reduces duplication when handling errors, between sync and async.
try:
yield
except httpx.TimeoutException as e:
msg = (
"Request timed out. Try setting a higher value in 'execute_timeout' "
"config for this `dagger.Connection()`."
)
raise ExecuteTimeoutError(msg) from e
except httpx.RequestError as e:
msg = f"Failed to make request: {e}"
> raise TransportError(msg) from e
E dagger.exceptions.TransportError: Failed to make request:
/usr/local/lib/python3.10/site-packages/dagger/api/base.py:255: TransportError
=================================== FAILURES ===================================
_______________________ TestBasicRead.test_read[inputs0] _______________________
pyfuncitem = <Function test_read[inputs0]>
@pytest.hookimpl(tryfirst=True)
def pytest_pyfunc_call(pyfuncitem: Any) -> bool | None:
def run_with_hypothesis(**kwargs: Any) -> None:
with get_runner(backend_name, backend_options) as runner:
runner.run_test(original_func, kwargs)
backend = pyfuncitem.funcargs.get("anyio_backend")
if backend:
backend_name, backend_options = extract_backend_and_options(backend)
if hasattr(pyfuncitem.obj, "hypothesis"):
# Wrap the inner test function unless it's already wrapped
original_func = pyfuncitem.obj.hypothesis.inner_test
if original_func.__qualname__ != run_with_hypothesis.__qualname__:
if iscoroutinefunction(original_func):
pyfuncitem.obj.hypothesis.inner_test = run_with_hypothesis
return None
if iscoroutinefunction(pyfuncitem.obj):
funcargs = pyfuncitem.funcargs
testargs = {arg: funcargs[arg] for arg in pyfuncitem._fixtureinfo.argnames}
with get_runner(backend_name, backend_options) as runner:
> runner.run_test(pyfuncitem.obj, testargs)
/usr/local/lib/python3.10/site-packages/anyio/pytest_plugin.py:117:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py:2113: in run_test
self._loop.run_until_complete(test_func(**kwargs))
/usr/local/lib/python3.10/asyncio/base_events.py:636: in run_until_complete
self.run_forever()
/usr/local/lib/python3.10/asyncio/base_events.py:603: in run_forever
self._run_once()
/usr/local/lib/python3.10/asyncio/base_events.py:1871: in _run_once
event_list = self._selector.select(timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <selectors.EpollSelector object at 0x7f37389043d0>, timeout = -1
def select(self, timeout=None):
if timeout is None:
timeout = -1
elif timeout <= 0:
timeout = 0
else:
# epoll_wait() has a resolution of 1 millisecond, round away
# from zero to wait *at least* timeout seconds.
timeout = math.ceil(timeout * 1e3) * 1e-3
# epoll_wait() expects `maxevents` to be greater than zero;
# we want to make sure that `select()` can be called when no
# FD is registered.
max_ev = max(len(self._fd_to_key), 1)
ready = []
try:
> fd_event_list = self._selector.poll(timeout, max_ev)
E Failed: Timeout >600.0s
/usr/local/lib/python3.10/selectors.py:469: Failed
________________ TestFullRefresh.test_sequential_reads[inputs0] ________________
pyfuncitem = <Function test_sequential_reads[inputs0]>
@pytest.hookimpl(tryfirst=True)
def pytest_pyfunc_call(pyfuncitem: Any) -> bool | None:
def run_with_hypothesis(**kwargs: Any) -> None:
with get_runner(backend_name, backend_options) as runner:
runner.run_test(original_func, kwargs)
backend = pyfuncitem.funcargs.get("anyio_backend")
if backend:
backend_name, backend_options = extract_backend_and_options(backend)
if hasattr(pyfuncitem.obj, "hypothesis"):
# Wrap the inner test function unless it's already wrapped
original_func = pyfuncitem.obj.hypothesis.inner_test
if original_func.__qualname__ != run_with_hypothesis.__qualname__:
if iscoroutinefunction(original_func):
pyfuncitem.obj.hypothesis.inner_test = run_with_hypothesis
return None
if iscoroutinefunction(pyfuncitem.obj):
funcargs = pyfuncitem.funcargs
testargs = {arg: funcargs[arg] for arg in pyfuncitem._fixtureinfo.argnames}
with get_runner(backend_name, backend_options) as runner:
> runner.run_test(pyfuncitem.obj, testargs)
/usr/local/lib/python3.10/site-packages/anyio/pytest_plugin.py:117:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py:2113: in run_test
self._loop.run_until_complete(test_func(**kwargs))
/usr/local/lib/python3.10/asyncio/base_events.py:636: in run_until_complete
self.run_forever()
/usr/local/lib/python3.10/asyncio/base_events.py:603: in run_forever
self._run_once()
/usr/local/lib/python3.10/asyncio/base_events.py:1871: in _run_once
event_list = self._selector.select(timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <selectors.EpollSelector object at 0x7f37389043d0>, timeout = -1
def select(self, timeout=None):
if timeout is None:
timeout = -1
elif timeout <= 0:
timeout = 0
else:
# epoll_wait() has a resolution of 1 millisecond, round away
# from zero to wait *at least* timeout seconds.
timeout = math.ceil(timeout * 1e3) * 1e-3
# epoll_wait() expects `maxevents` to be greater than zero;
# we want to make sure that `select()` can be called when no
# FD is registered.
max_ev = max(len(self._fd_to_key), 1)
ready = []
try:
> fd_event_list = self._selector.poll(timeout, max_ev)
E Failed: Timeout >1200.0s
/usr/local/lib/python3.10/selectors.py:469: Failed
=============================== warnings summary ===============================
../usr/local/lib/python3.10/site-packages/hypothesis_jsonschema/_canonicalise.py:116
/usr/local/lib/python3.10/site-packages/hypothesis_jsonschema/_canonicalise.py:116: DeprecationWarning: jsonschema.exceptions.RefResolutionError is deprecated as of version 4.18.0. If you wish to catch potential reference resolution errors, directly catch referencing.exceptions.Unresolvable.
class HypothesisRefResolutionError(jsonschema.exceptions.RefResolutionError):
../usr/local/lib/python3.10/site-packages/hypothesis_jsonschema/_resolve.py:31
/usr/local/lib/python3.10/site-packages/hypothesis_jsonschema/_resolve.py:31: DeprecationWarning: jsonschema.RefResolver is deprecated as of v4.18.0, in favor of the https://github.com/python-jsonschema/referencing library, which provides more compliant referencing behavior as well as more flexible APIs for customization. A future release will remove RefResolver. Please file a feature request (on referencing) if you are missing an API for the kind of customization you need.
class LocalResolver(jsonschema.RefResolver):
-- Docs: https://docs.pytest.org/en/stable/warnings.html
=========================== short test summary info ============================
FAILED test_core.py::TestBasicRead::test_read[inputs0] - Failed: Timeout >600.0s
FAILED test_full_refresh.py::TestFullRefresh::test_sequential_reads[inputs0]
ERROR test_core.py::TestSpec::test_backward_compatibility[inputs0] - dagger.e...
ERROR test_core.py::TestDiscovery::test_backward_compatibility[inputs0] - dag...
ERROR test_full_refresh.py::TestFullRefresh::test_sequential_reads[inputs0]
SKIPPED [1] ../app/connector_acceptance_test/plugin.py:63: Skipping TestIncremental.test_two_sequential_reads: This connector does not implement incremental sync
SKIPPED [1] ../app/connector_acceptance_test/tests/test_core.py:720: This tests currently leads to too much failures. We need to fix the connectors at scale first.
== 2 failed, 34 passed, 2 skipped, 2 warnings, 3 errors in 2109.46s (0:35:09) ==
(airbyte3.9) vgavriil@VG-Work:~/workspace/forks/airbyte_select_star/airbyte-integrations/connectors/source-select-star (New-Source-Select-Star)$
If you have different endpoints, you need to add them to the connector spec so we can inject them during tests or when running the sync.
Sorry for the delay in reviewing the contribution, @vasilisgav. The August Hackathon is almost over, and the team will return to the community backlog.
Hey @vasilisgav, apologies again for the delay. We're looking into getting this merged. Before we restart this work, can you confirm you're still interested in continuing this contribution?
Hi @sh4sh, sure, what else is needed?
Hello @vasilisgav, sorry for the lack of updates. I'll continue the review process next week.
https://github.com/airbytehq/airbyte-internal-issues/issues/7002
Hi @vasilisgav, apologies for the delay in updating and reviewing your contribution. Regrettably, we had to decline it for now because the team was unable to set up a sandbox and conduct integration tests. We understand this can be disappointing, and we want to assure you that we recognize this issue. We dedicated a significant amount of effort in the previous quarter, and will continue to do so in the next, to improve the process for contributions like new connectors. In the future, the team will introduce a new process for contributing and publishing sources that doesn't depend on Airbyte having a proper sandbox to accept contributions and ensure all tests are valid. Please get in touch if you want to learn more about it and resubmit the amazing source :) Thanks a lot for all the effort you put into this contribution, and feel free to contact me on Slack.