pytest-rabbitmq
pytest-rabbitmq copied to clipboard
Accessing rabbitmq server path when using docker-compose
What action do you want to perform
I'm currently running rabbitMQ from docker-compose. To access the server directory I've created a docker volume to make it accessible from the host machine. When running the test I'm receiving an error.
Chown didn't fix the issue. After chmod -R 777 I can execute, read, write any files with my user. Tried os.mkdir() in the directory during the tests and it also works properly.
What are the results
test_bar_event_producer.py::TestBarEventProducer::test_emit ERROR [100%]
test setup failed
request = <SubRequest 'rabbitmq' for <Function test_emit>>
@pytest.fixture
def rabbitmq_factory(request):
"""
Client fixture for RabbitMQ.
#. Get module and config.
#. Connect to RabbitMQ using the parameters from config.
:param TCPExecutor rabbitmq_proc: tcp executor
:param FixtureRequest request: fixture request object
:rtype: rabbitpy.adapters.blocking_connection.BlockingConnection
:returns: instance of :class:`BlockingConnection`
"""
# load required process fixture
> process = request.getfixturevalue(process_fixture_name)
../../../../../../venv38/lib/python3.8/site-packages/pytest_rabbitmq/factories/client.py:102:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../../../../venv38/lib/python3.8/site-packages/pytest_rabbitmq/factories/process.py:112: in rabbitmq_proc_fixture
rabbit_executor.start()
../../../../../../venv38/lib/python3.8/site-packages/mirakuru/base.py:523: in start
super().start()
../../../../../../venv38/lib/python3.8/site-packages/mirakuru/base.py:263: in start
self.process = subprocess.Popen(command, **self._popen_kwargs)
/usr/lib/python3.8/subprocess.py:858: in __init__
self._execute_child(args, executable, preexec_fn, close_fds,
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <subprocess.Popen object at 0x7efe27ff8310>
args = ['/home/kprystasz/rabbitmq'], executable = b'/home/kprystasz/rabbitmq'
preexec_fn = <built-in function setsid>, close_fds = True, pass_fds = ()
cwd = None
env = {'BAMF_DESKTOP_FILE_HINT': '/var/lib/snapd/desktop/applications/pycharm-professional_pycharm-professional.desktop', 'C...ADDRESS': 'unix:path=/run/user/1001/bus', 'DB_AITBARS': 'XXXX://XXXX@mysqldb-bars:3307/ait', ...}
startupinfo = None, creationflags = 0, shell = False, p2cread = 11
p2cwrite = 12, c2pread = 13, c2pwrite = 14, errread = -1, errwrite = -1
restore_signals = True, start_new_session = False
def _execute_child(self, args, executable, preexec_fn, close_fds,
pass_fds, cwd, env,
startupinfo, creationflags, shell,
p2cread, p2cwrite,
c2pread, c2pwrite,
errread, errwrite,
restore_signals, start_new_session):
"""Execute program (POSIX version)"""
if isinstance(args, (str, bytes)):
args = [args]
elif isinstance(args, os.PathLike):
if shell:
raise TypeError('path-like args is not allowed when '
'shell is true')
args = [args]
else:
args = list(args)
if shell:
# On Android the default shell is at '/system/bin/sh'.
unix_shell = ('/system/bin/sh' if
hasattr(sys, 'getandroidapilevel') else '/bin/sh')
args = [unix_shell, "-c"] + args
if executable:
args[0] = executable
if executable is None:
executable = args[0]
sys.audit("subprocess.Popen", executable, args, cwd, env)
if (_USE_POSIX_SPAWN
and os.path.dirname(executable)
and preexec_fn is None
and not close_fds
and not pass_fds
and cwd is None
and (p2cread == -1 or p2cread > 2)
and (c2pwrite == -1 or c2pwrite > 2)
and (errwrite == -1 or errwrite > 2)
and not start_new_session):
self._posix_spawn(args, executable, env, restore_signals,
p2cread, p2cwrite,
c2pread, c2pwrite,
errread, errwrite)
return
orig_executable = executable
# For transferring possible exec failure from child to parent.
# Data format: "exception name:hex errno:description"
# Pickle is not used; it is complex and involves memory allocation.
errpipe_read, errpipe_write = os.pipe()
# errpipe_write must not be in the standard io 0, 1, or 2 fd range.
low_fds_to_close = []
while errpipe_write < 3:
low_fds_to_close.append(errpipe_write)
errpipe_write = os.dup(errpipe_write)
for low_fd in low_fds_to_close:
os.close(low_fd)
try:
try:
# We must avoid complex work that could involve
# malloc or free in the child process to avoid
# potential deadlocks, thus we do all this here.
# and pass it to fork_exec()
if env is not None:
env_list = []
for k, v in env.items():
k = os.fsencode(k)
if b'=' in k:
raise ValueError("illegal environment variable name")
env_list.append(k + b'=' + os.fsencode(v))
else:
env_list = None # Use execv instead of execve.
executable = os.fsencode(executable)
if os.path.dirname(executable):
executable_list = (executable,)
else:
# This matches the behavior of os._execvpe().
executable_list = tuple(
os.path.join(os.fsencode(dir), executable)
for dir in os.get_exec_path(env))
fds_to_keep = set(pass_fds)
fds_to_keep.add(errpipe_write)
self.pid = _posixsubprocess.fork_exec(
args, executable_list,
close_fds, tuple(sorted(map(int, fds_to_keep))),
cwd, env_list,
p2cread, p2cwrite, c2pread, c2pwrite,
errread, errwrite,
errpipe_read, errpipe_write,
restore_signals, start_new_session, preexec_fn)
self._child_created = True
finally:
# be sure the FD is closed no matter what
os.close(errpipe_write)
self._close_pipe_fds(p2cread, p2cwrite,
c2pread, c2pwrite,
errread, errwrite)
# Wait for exec to fail or succeed; possibly raising an
# exception (limited in size)
errpipe_data = bytearray()
while True:
part = os.read(errpipe_read, 50000)
errpipe_data += part
if not part or len(errpipe_data) > 50000:
break
finally:
# be sure the FD is closed no matter what
os.close(errpipe_read)
if errpipe_data:
try:
pid, sts = os.waitpid(self.pid, 0)
if pid == self.pid:
self._handle_exitstatus(sts)
else:
self.returncode = sys.maxsize
except ChildProcessError:
pass
try:
exception_name, hex_errno, err_msg = (
errpipe_data.split(b':', 2))
# The encoding here should match the encoding
# written in by the subprocess implementations
# like _posixsubprocess
err_msg = err_msg.decode()
except ValueError:
exception_name = b'SubprocessError'
hex_errno = b'0'
err_msg = 'Bad exception data from child: {!r}'.format(
bytes(errpipe_data))
child_exception_type = getattr(
builtins, exception_name.decode('ascii'),
SubprocessError)
if issubclass(child_exception_type, OSError) and hex_errno:
errno_num = int(hex_errno, 16)
child_exec_never_called = (err_msg == "noexec")
if child_exec_never_called:
err_msg = ""
# The error must be from chdir(cwd).
err_filename = cwd
else:
err_filename = orig_executable
if errno_num != 0:
err_msg = os.strerror(errno_num)
> raise child_exception_type(errno_num, err_msg, err_filename)
E PermissionError: [Errno 13] Permission denied: '/home/kprystasz/rabbitmq'
What are the expected results
@KajetanPrystasz you're running rabbitmq on a docker instance, and want to connect it within your tests? Do I understand you correctly?
The pytest-rabbitmq functionality is to start the server on the same machine you're running tests on.
If I understood correctly what you're trying to achieve, that's something that would have to be implemented, like here https://github.com/ClearcodeHQ/pytest-redis#connecting-to-already-existing-redis-database
@fizyk indeed, that's the functionality I'm looking for. Are you planning on creating it? Using Docker to spin up a rabbitmq server is currently the suggested way of running it. I believe that this feature would be beneficial for the project.
@KajetanPrystasz plan was there if it'd be needed. However, time is not something that would allow me to do it soon.
I'll review and accept PR however. If someone would want to develop it, the examples are there in pytest-redis or pytest-postgresql