python-zeep
python-zeep copied to clipboard
Idea: let a single client work in both synchronous and asynchronous mode at the same time.
I had to make the following modifications myself to achieve this; please make the corresponding changes in zeep to correct these architectural errors:
class AsyncTransport(OriginalAsyncTransport): # noqa
def __init__(self, auth: httpx.BasicAuth):
params = dict(
default_encoding=r'utf-8-sig',
timeout=httpx.Timeout(read=300, write=10, connect=1, pool=1),
)
super().__init__(client=httpx.AsyncClient(**params), wsdl_client=httpx.Client(**params, auth=auth))
def post(self, address, message, headers) -> httpx.Response | Awaitable[httpx.Response]:
return (self.wsdl_client if brom_sync_mode.get() else self.client) \
.post(address, content=message, headers=headers, auth=brom_soap_auth.get())
def post_xml(self, address, envelope, headers) -> httpx.Response | Awaitable[httpx.Response]:
return super(OriginalAsyncTransport, self).post_xml(address, envelope, headers)
@deprecated(r'Не нужный метод')
def new_response[T](self, response: T) -> T:
raise NotImplementedError()
class AsyncOperationProxy(OriginalAsyncOperationProxy):  # noqa
    """Operation proxy that adds a ``sync`` entry point."""

    def sync(self, *args, **kwargs):
        """Invoke the operation via the ``__call__`` found after
        ``OriginalAsyncOperationProxy`` in the MRO.

        All positional and keyword arguments are forwarded unchanged and the
        result of that ``__call__`` is returned. NOTE(review): presumably this
        is zeep's synchronous ``OperationProxy.__call__`` -- confirm.
        """
        parent_call = super(OriginalAsyncOperationProxy, self).__call__
        return parent_call(*args, **kwargs)
class AsyncServiceProxy(OriginalServiceProxy):  # noqa
    """Service proxy whose operations are ``AsyncOperationProxy`` instances."""

    def __init__(self: Self, client, binding, **binding_options):  # noqa
        """Initialise the parent proxy, then replace its operation table.

        After the parent initializer runs, ``self._operations`` is rebuilt as
        a mapping from every name yielded by ``self._binding.all()`` to an
        ``AsyncOperationProxy`` bound to this service proxy.
        """
        super().__init__(client, binding, **binding_options)
        async_operations = {}
        for operation_name in self._binding.all():
            async_operations[operation_name] = AsyncOperationProxy(self, operation_name)
        # Assign once at the end so the table is swapped in a single step.
        self._operations = async_operations
class AsyncClient(OriginalAsyncClient):  # noqa
    """Client pre-configured for the ``brom_api`` service.

    Uses ``AsyncTransport`` as the transport, an ``Interceptor`` plugin, and
    the ``brom_apiSoap12`` port.
    """

    def __init__(self, wsdl: pydantic.HttpUrl, auth: httpx.BasicAuth):
        """Build the client.

        Args:
            wsdl: Location of the WSDL document; forwarded unchanged to the
                base client. NOTE(review): confirm the base client accepts a
                ``pydantic.HttpUrl`` object and not only ``str``.
            auth: Credentials handed to ``AsyncTransport``.
        """
        super().__init__(
            settings=Settings(raw_response=False),
            wsdl=wsdl,
            service_name=r'brom_api',
            port_name=r'brom_apiSoap12',
            transport=AsyncTransport(auth=auth),
            plugins=[Interceptor()],
        )

    def bind(self, service_name: str | None = None,
             port_name: str | None = None) -> AsyncServiceProxy | None:
        """Return a service proxy for the requested service and port.

        Args:
            service_name: Passed to ``self._get_service``.
            port_name: Passed to ``self._get_port``.

        Returns:
            An ``AsyncServiceProxy`` for the resolved port's binding, or
            ``None`` when ``self.wsdl.services`` is empty.
        """
        # The method returns None here rather than never returning, so the
        # annotation is ``| None`` (``NoReturn`` would hide this from checkers).
        if not self.wsdl.services:
            return None
        service = self._get_service(service_name)
        port = self._get_port(service, port_name)
        return AsyncServiceProxy(self, port.binding, **port.binding_options)