Adds basic test for convert command.

This commit is contained in:
Romain Dorgueil
2017-11-04 14:55:08 +01:00
parent 25e919ab96
commit 0b969d31e0
18 changed files with 420 additions and 353 deletions

View File

@ -6,8 +6,8 @@ from bonobo.util.resolvers import _resolve_transformations, _resolve_options
class ConvertCommand(BaseCommand):
def add_arguments(self, parser):
parser.add_argument('input-filename', help='Input filename.')
parser.add_argument('output-filename', help='Output filename.')
parser.add_argument('input_filename', help='Input filename.')
parser.add_argument('output_filename', help='Output filename.')
parser.add_argument(
'--' + READER,
'-r',

View File

@ -1,3 +1,4 @@
import logging
import sys
from contextlib import contextmanager
from logging import WARNING, ERROR
@ -38,6 +39,10 @@ class LoopingExecutionContext(Wrapper):
def stopped(self):
return self._stopped
@property
def defunct(self):
return self._defunct
@property
def alive(self):
return self._started and not self._stopped
@ -45,6 +50,8 @@ class LoopingExecutionContext(Wrapper):
@property
def status(self):
"""One character status for this node. """
if self._defunct:
return '!'
if not self.started:
return ' '
if not self.stopped:
@ -65,7 +72,7 @@ class LoopingExecutionContext(Wrapper):
else:
self.services = None
self._started, self._stopped = False, False
self._started, self._stopped, self._defunct = False, False, False
self._stack = None
def __enter__(self):
@ -81,15 +88,17 @@ class LoopingExecutionContext(Wrapper):
self._started = True
self._stack = ContextCurrifier(self.wrapped, *self._get_initial_context())
if isconfigurabletype(self.wrapped):
# Not normal to have a partially configured object here, so let's warn the user instead of having get into
# the hard trouble of understanding that by himself.
raise TypeError(
'The Configurable should be fully instanciated by now, unfortunately I got a PartiallyConfigured object...'
)
self._stack.setup(self)
try:
self._stack = ContextCurrifier(self.wrapped, *self._get_initial_context())
if isconfigurabletype(self.wrapped):
# Not normal to have a partially configured object here, so let's warn the user instead of having get into
# the hard trouble of understanding that by himself.
raise TypeError(
'The Configurable should be fully instanciated by now, unfortunately I got a PartiallyConfigured object...'
)
self._stack.setup(self)
except Exception:
return self.fatal(sys.exc_info())
def loop(self):
"""Generic loop. A bit boring. """
@ -113,14 +122,17 @@ class LoopingExecutionContext(Wrapper):
finally:
self._stopped = True
def handle_error(self, exctype, exc, tb):
mondrian.excepthook(
exctype, exc, tb, level=WARNING, context='{} in {}'.format(exctype.__name__, get_name(self)), logger=logger
)
def _get_initial_context(self):
if self.parent:
return self.parent.services.args_for(self.wrapped)
if self.services:
return self.services.args_for(self.wrapped)
return ()
def handle_error(self, exctype, exc, tb, *, level=logging.ERROR):
logging.getLogger(__name__).log(level, repr(self), exc_info=(exctype, exc, tb))
def fatal(self, exc_info):
self._defunct = True
self.input.shutdown()
self.handle_error(*exc_info, level=logging.CRITICAL)

View File

@ -1,3 +1,4 @@
import logging
import sys
from queue import Empty
from time import sleep
@ -12,6 +13,7 @@ from bonobo.structs.tokens import Token
from bonobo.util import get_name, iserrorbag, isloopbackbag, isbag, istuple
from bonobo.util.compat import deprecated_alias
from bonobo.util.statistics import WithStatistics
from mondrian import term
class NodeExecutionContext(WithStatistics, LoopingExecutionContext):
@ -39,10 +41,12 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext):
return '<{}({}{}){}>'.format(type_name, self.status, name, self.get_statistics_as_string(prefix=' '))
def get_flags_as_string(self):
if self._defunct:
return term.red('[defunct]')
if self.killed:
return '[killed]'
return term.lightred('[killed]')
if self.stopped:
return '[done]'
return term.lightblack('[done]')
return ''
def write(self, *messages):
@ -92,13 +96,13 @@ class NodeExecutionContext(WithStatistics, LoopingExecutionContext):
self.increment('in')
return row
def should_loop(self):
return not any((self.defunct, self.killed))
def loop(self):
while not self._killed:
while self.should_loop():
try:
self.step()
except KeyboardInterrupt:
self.handle_error(*sys.exc_info())
break
except InactiveReadableError:
break
except Empty:

View File

@ -27,7 +27,11 @@ class ExecutorStrategy(Strategy):
futures = []
with self.create_executor() as executor:
context.start(self.get_starter(executor, futures))
try:
context.start(self.get_starter(executor, futures))
except:
logging.getLogger(__name__
).warning('KeyboardInterrupt received. Trying to terminate the nodes gracefully.')
while context.alive:
try:
@ -50,12 +54,17 @@ class ExecutorStrategy(Strategy):
try:
with node:
node.loop()
except BaseException as exc:
logging.getLogger(__name__).info(
'Got {} in {} runner.'.format(get_name(exc), node), exc_info=sys.exc_info()
except:
logging.getLogger(__name__).critical(
'Uncaught exception in node execution for {}.'.format(node), exc_info=True
)
node.shutdown()
node.stop()
futures.append(executor.submit(_runner))
try:
futures.append(executor.submit(_runner))
except:
logging.getLogger(__name__).critical('futures.append', exc_info=sys.exc_info())
return starter

View File

@ -1,4 +1,7 @@
from fs.errors import ResourceNotFound
from bonobo.config import Configurable, ContextProcessor, Option, Service
from bonobo.errors import UnrecoverableError
class FileHandler(Configurable):

View File

@ -152,3 +152,13 @@ def parse_args(mixed=None):
del os.environ[name]
else:
os.environ[name] = value
@contextmanager
def change_working_directory(path):
old_dir = os.getcwd()
os.chdir(str(path))
try:
yield
finally:
os.chdir(old_dir)

View File

@ -72,6 +72,7 @@ def _resolve_transformations(transformations):
:return: tuple(object)
"""
registry = _ModulesRegistry()
transformations = transformations or []
for t in transformations:
try:
mod, attr = t.split(':', 1)

View File

@ -1,6 +1,15 @@
from contextlib import contextmanager
import functools
import io
import os
import runpy
import sys
from contextlib import contextmanager, redirect_stdout, redirect_stderr
from unittest.mock import patch
from bonobo import open_fs, Token
import pytest
from bonobo import open_fs, Token, __main__, get_examples_path
from bonobo.commands import entrypoint
from bonobo.execution.contexts.graph import GraphExecutionContext
from bonobo.execution.contexts.node import NodeExecutionContext
@ -64,3 +73,68 @@ class BufferingGraphExecutionContext(BufferingContext, GraphExecutionContext):
def create_node_execution_context_for(self, node):
return self.NodeExecutionContextType(node, parent=self, buffer=self.buffer)
def runner(f):
@functools.wraps(f)
def wrapped_runner(*args, catch_errors=False):
with redirect_stdout(io.StringIO()) as stdout, redirect_stderr(io.StringIO()) as stderr:
try:
f(list(args))
except BaseException as exc:
if not catch_errors:
raise
elif isinstance(catch_errors, BaseException) and not isinstance(exc, catch_errors):
raise
return stdout.getvalue(), stderr.getvalue(), exc
return stdout.getvalue(), stderr.getvalue()
return wrapped_runner
@runner
def runner_entrypoint(args):
""" Run bonobo using the python command entrypoint directly (bonobo.commands.entrypoint). """
return entrypoint(args)
@runner
def runner_module(args):
""" Run bonobo using the bonobo.__main__ file, which is equivalent as doing "python -m bonobo ..."."""
with patch.object(sys, 'argv', ['bonobo', *args]):
return runpy.run_path(__main__.__file__, run_name='__main__')
all_runners = pytest.mark.parametrize('runner', [runner_entrypoint, runner_module])
all_environ_targets = pytest.mark.parametrize(
'target', [
(get_examples_path('environ.py'), ),
(
'-m',
'bonobo.examples.environ',
),
]
)
@all_runners
@all_environ_targets
class EnvironmentTestCase():
def run_quiet(self, runner, *args):
return runner('run', '--quiet', *args)
def run_environ(self, runner, *args, environ=None):
_environ = {'PATH': '/usr/bin'}
if environ:
_environ.update(environ)
with patch.dict('os.environ', _environ, clear=True):
out, err = self.run_quiet(runner, *args)
assert 'SECRET' not in os.environ
assert 'PASSWORD' not in os.environ
if 'PATH' in _environ:
assert 'PATH' in os.environ
assert os.environ['PATH'] == _environ['PATH']
assert err == ''
return dict(map(lambda line: line.split(' ', 1), filter(None, out.split('\n'))))

View File

@ -0,0 +1,25 @@
import pkg_resources
from bonobo.util.testing import all_runners
def test_entrypoint():
commands = {}
for command in pkg_resources.iter_entry_points('bonobo.commands'):
commands[command.name] = command
assert not {
'convert',
'init',
'inspect',
'run',
'version',
}.difference(set(commands))
@all_runners
def test_no_command(runner):
_, err, exc = runner(catch_errors=True)
assert type(exc) == SystemExit
assert 'error: the following arguments are required: command' in err

View File

@ -0,0 +1,13 @@
from bonobo.util.environ import change_working_directory
from bonobo.util.testing import all_runners
@all_runners
def test_convert(runner, tmpdir):
csv_content = 'id;name\n1;Romain'
tmpdir.join('in.csv').write(csv_content)
with change_working_directory(tmpdir):
runner('convert', 'in.csv', 'out.csv')
assert tmpdir.join('out.csv').read().strip() == csv_content

View File

@ -0,0 +1,44 @@
import io
from unittest.mock import patch
import pytest
from bonobo.commands.download import EXAMPLES_BASE_URL
from bonobo.util.testing import all_runners
@all_runners
def test_download_works_for_examples(runner):
expected_bytes = b'hello world'
class MockResponse(object):
def __init__(self):
self.status_code = 200
def iter_content(self, *args, **kwargs):
return [expected_bytes]
def __enter__(self):
return self
def __exit__(self, *args, **kwargs):
pass
fout = io.BytesIO()
fout.close = lambda: None
with patch('bonobo.commands.download._open_url') as mock_open_url, \
patch('bonobo.commands.download.open') as mock_open:
mock_open_url.return_value = MockResponse()
mock_open.return_value = fout
runner('download', 'examples/datasets/coffeeshops.txt')
expected_url = EXAMPLES_BASE_URL + 'datasets/coffeeshops.txt'
mock_open_url.assert_called_once_with(expected_url)
assert fout.getvalue() == expected_bytes
@all_runners
def test_download_fails_non_example(runner):
with pytest.raises(ValueError):
runner('download', 'something/entirely/different.txt')

View File

@ -0,0 +1,15 @@
import os
from bonobo.util.testing import all_runners
@all_runners
def test_init_file(runner, tmpdir):
target = tmpdir.join('foo.py')
target_filename = str(target)
runner('init', target_filename)
assert os.path.exists(target_filename)
out, err = runner('run', target_filename)
assert out.replace('\n', ' ').strip() == 'Hello World'
assert not err

View File

@ -0,0 +1,48 @@
import os
from unittest.mock import patch
from bonobo import get_examples_path
from bonobo.util.testing import all_runners
@all_runners
def test_run(runner):
out, err = runner('run', '--quiet', get_examples_path('types/strings.py'))
out = out.split('\n')
assert out[0].startswith('Foo ')
assert out[1].startswith('Bar ')
assert out[2].startswith('Baz ')
@all_runners
def test_run_module(runner):
out, err = runner('run', '--quiet', '-m', 'bonobo.examples.types.strings')
out = out.split('\n')
assert out[0].startswith('Foo ')
assert out[1].startswith('Bar ')
assert out[2].startswith('Baz ')
@all_runners
def test_run_path(runner):
out, err = runner('run', '--quiet', get_examples_path('types'))
out = out.split('\n')
assert out[0].startswith('Foo ')
assert out[1].startswith('Bar ')
assert out[2].startswith('Baz ')
@all_runners
def test_install_requirements_for_dir(runner):
dirname = get_examples_path('types')
with patch('bonobo.commands.run._install_requirements') as install_mock:
runner('run', '--install', dirname)
install_mock.assert_called_once_with(os.path.join(dirname, 'requirements.txt'))
@all_runners
def test_install_requirements_for_file(runner):
dirname = get_examples_path('types')
with patch('bonobo.commands.run._install_requirements') as install_mock:
runner('run', '--install', os.path.join(dirname, 'strings.py'))
install_mock.assert_called_once_with(os.path.join(dirname, 'requirements.txt'))

View File

@ -0,0 +1,109 @@
import pytest
from bonobo.util.testing import EnvironmentTestCase
@pytest.fixture
def env1(tmpdir):
env_file = tmpdir.join('.env_one')
env_file.write('\n'.join((
'SECRET=unknown',
'PASSWORD=sweet',
'PATH=first',
)))
return str(env_file)
@pytest.fixture
def env2(tmpdir):
env_file = tmpdir.join('.env_two')
env_file.write('\n'.join((
'PASSWORD=bitter',
"PATH='second'",
)))
return str(env_file)
class TestDefaultEnvFile(EnvironmentTestCase):
def test_run_with_default_env_file(self, runner, target, env1):
env = self.run_environ(runner, *target, '--default-env-file', env1)
assert env.get('SECRET') == 'unknown'
assert env.get('PASSWORD') == 'sweet'
assert env.get('PATH') == '/usr/bin'
def test_run_with_multiple_default_env_files(self, runner, target, env1, env2):
env = self.run_environ(runner, *target, '--default-env-file', env1, '--default-env-file', env2)
assert env.get('SECRET') == 'unknown'
assert env.get('PASSWORD') == 'sweet'
assert env.get('PATH') == '/usr/bin'
env = self.run_environ(runner, *target, '--default-env-file', env2, '--default-env-file', env1)
assert env.get('SECRET') == 'unknown'
assert env.get('PASSWORD') == 'bitter'
assert env.get('PATH') == '/usr/bin'
class TestEnvFile(EnvironmentTestCase):
def test_run_with_file(self, runner, target, env1):
env = self.run_environ(runner, *target, '--env-file', env1)
assert env.get('SECRET') == 'unknown'
assert env.get('PASSWORD') == 'sweet'
assert env.get('PATH') == 'first'
def test_run_with_multiple_files(self, runner, target, env1, env2):
env = self.run_environ(runner, *target, '--env-file', env1, '--env-file', env2)
assert env.get('SECRET') == 'unknown'
assert env.get('PASSWORD') == 'bitter'
assert env.get('PATH') == 'second'
env = self.run_environ(runner, *target, '--env-file', env2, '--env-file', env1)
assert env.get('SECRET') == 'unknown'
assert env.get('PASSWORD') == 'sweet'
assert env.get('PATH') == 'first'
class TestEnvFileCombinations(EnvironmentTestCase):
def test_run_with_both_env_files(self, runner, target, env1, env2):
env = self.run_environ(runner, *target, '--default-env-file', env1, '--env-file', env2)
assert env.get('SECRET') == 'unknown'
assert env.get('PASSWORD') == 'bitter'
assert env.get('PATH') == 'second'
def test_run_with_both_env_files_then_overrides(self, runner, target, env1, env2):
env = self.run_environ(
runner, *target, '--default-env-file', env1, '--env-file', env2, '--env', 'PASSWORD=mine', '--env',
'SECRET=s3cr3t'
)
assert env.get('SECRET') == 's3cr3t'
assert env.get('PASSWORD') == 'mine'
assert env.get('PATH') == 'second'
class TestEnvVars(EnvironmentTestCase):
def test_run_no_env(self, runner, target):
env = self.run_environ(runner, *target, environ={'USER': 'romain'})
assert env.get('USER') == 'romain'
def test_run_env(self, runner, target):
env = self.run_environ(runner, *target, '--env', 'USER=serious', environ={'USER': 'romain'})
assert env.get('USER') == 'serious'
def test_run_env_mixed(self, runner, target):
env = self.run_environ(runner, *target, '--env', 'ONE=1', '--env', 'TWO="2"', environ={'USER': 'romain'})
assert env.get('USER') == 'romain'
assert env.get('ONE') == '1'
assert env.get('TWO') == '2'
def test_run_default_env(self, runner, target):
env = self.run_environ(runner, *target, '--default-env', 'USER=clown')
assert env.get('USER') == 'clown'
env = self.run_environ(runner, *target, '--default-env', 'USER=clown', environ={'USER': 'romain'})
assert env.get('USER') == 'romain'
env = self.run_environ(
runner, *target, '--env', 'USER=serious', '--default-env', 'USER=clown', environ={
'USER': 'romain'
}
)
assert env.get('USER') == 'serious'

View File

@ -0,0 +1,20 @@
from bonobo import __version__
from bonobo.util.testing import all_runners
@all_runners
def test_version(runner):
out, err = runner('version')
out = out.strip()
assert out.startswith('bonobo ')
assert __version__ in out
out, err = runner('version', '-q')
out = out.strip()
assert out.startswith('bonobo ')
assert __version__ in out
out, err = runner('version', '-qq')
out = out.strip()
assert not out.startswith('bonobo ')
assert __version__ in out

View File

@ -185,6 +185,7 @@ def test_node_tuple_dict():
assert output[0] == ('foo', 'bar', {'id': 1})
assert output[1] == ('foo', 'baz', {'id': 2})
def test_node_lifecycle_natural():
func = MagicMock()
@ -203,6 +204,7 @@ def test_node_lifecycle_natural():
ctx.stop()
assert all((ctx.started, ctx.stopped)) and not any((ctx.alive, ctx.killed))
def test_node_lifecycle_with_kill():
func = MagicMock()
@ -223,7 +225,3 @@ def test_node_lifecycle_with_kill():
ctx.stop()
assert all((ctx.started, ctx.killed, ctx.stopped)) and not ctx.alive

View File

@ -10,6 +10,7 @@ def test_names():
event_name = getattr(events, name.upper())
assert event_name == '.'.join(('execution', name))
def test_event_object():
# Same logic as above.
c = Mock()

View File

@ -1,319 +0,0 @@
import functools
import io
import os
import runpy
import sys
from contextlib import redirect_stdout, redirect_stderr
from unittest.mock import patch
import pkg_resources
import pytest
from bonobo import __main__, __version__, get_examples_path
from bonobo.commands import entrypoint
from bonobo.commands.download import EXAMPLES_BASE_URL
def runner(f):
@functools.wraps(f)
def wrapped_runner(*args, catch_errors=False):
with redirect_stdout(io.StringIO()) as stdout, redirect_stderr(io.StringIO()) as stderr:
try:
f(list(args))
except BaseException as exc:
if not catch_errors:
raise
elif isinstance(catch_errors, BaseException) and not isinstance(exc, catch_errors):
raise
return stdout.getvalue(), stderr.getvalue(), exc
return stdout.getvalue(), stderr.getvalue()
return wrapped_runner
@runner
def runner_entrypoint(args):
""" Run bonobo using the python command entrypoint directly (bonobo.commands.entrypoint). """
return entrypoint(args)
@runner
def runner_module(args):
""" Run bonobo using the bonobo.__main__ file, which is equivalent as doing "python -m bonobo ..."."""
with patch.object(sys, 'argv', ['bonobo', *args]):
return runpy.run_path(__main__.__file__, run_name='__main__')
all_runners = pytest.mark.parametrize('runner', [runner_entrypoint, runner_module])
def test_entrypoint():
commands = {}
for command in pkg_resources.iter_entry_points('bonobo.commands'):
commands[command.name] = command
assert not {
'convert',
'init',
'inspect',
'run',
'version',
}.difference(set(commands))
@all_runners
def test_no_command(runner):
_, err, exc = runner(catch_errors=True)
assert type(exc) == SystemExit
assert 'error: the following arguments are required: command' in err
@all_runners
def test_run(runner):
out, err = runner('run', '--quiet', get_examples_path('types/strings.py'))
out = out.split('\n')
assert out[0].startswith('Foo ')
assert out[1].startswith('Bar ')
assert out[2].startswith('Baz ')
@all_runners
def test_run_module(runner):
out, err = runner('run', '--quiet', '-m', 'bonobo.examples.types.strings')
out = out.split('\n')
assert out[0].startswith('Foo ')
assert out[1].startswith('Bar ')
assert out[2].startswith('Baz ')
@all_runners
def test_run_path(runner):
out, err = runner('run', '--quiet', get_examples_path('types'))
out = out.split('\n')
assert out[0].startswith('Foo ')
assert out[1].startswith('Bar ')
assert out[2].startswith('Baz ')
@all_runners
def test_install_requirements_for_dir(runner):
dirname = get_examples_path('types')
with patch('bonobo.commands.run._install_requirements') as install_mock:
runner('run', '--install', dirname)
install_mock.assert_called_once_with(os.path.join(dirname, 'requirements.txt'))
@all_runners
def test_install_requirements_for_file(runner):
dirname = get_examples_path('types')
with patch('bonobo.commands.run._install_requirements') as install_mock:
runner('run', '--install', os.path.join(dirname, 'strings.py'))
install_mock.assert_called_once_with(os.path.join(dirname, 'requirements.txt'))
@all_runners
def test_init_file(runner, tmpdir):
target = tmpdir.join('foo.py')
target_filename = str(target)
runner('init', target_filename)
assert os.path.exists(target_filename)
out, err = runner('run', target_filename)
assert out.replace('\n', ' ').strip() == 'Hello World'
assert not err
@all_runners
def test_version(runner):
out, err = runner('version')
out = out.strip()
assert out.startswith('bonobo ')
assert __version__ in out
out, err = runner('version', '-q')
out = out.strip()
assert out.startswith('bonobo ')
assert __version__ in out
out, err = runner('version', '-qq')
out = out.strip()
assert not out.startswith('bonobo ')
assert __version__ in out
@all_runners
def test_download_works_for_examples(runner):
expected_bytes = b'hello world'
class MockResponse(object):
def __init__(self):
self.status_code = 200
def iter_content(self, *args, **kwargs):
return [expected_bytes]
def __enter__(self):
return self
def __exit__(self, *args, **kwargs):
pass
fout = io.BytesIO()
fout.close = lambda: None
with patch('bonobo.commands.download._open_url') as mock_open_url, \
patch('bonobo.commands.download.open') as mock_open:
mock_open_url.return_value = MockResponse()
mock_open.return_value = fout
runner('download', 'examples/datasets/coffeeshops.txt')
expected_url = EXAMPLES_BASE_URL + 'datasets/coffeeshops.txt'
mock_open_url.assert_called_once_with(expected_url)
assert fout.getvalue() == expected_bytes
@all_runners
def test_download_fails_non_example(runner):
with pytest.raises(ValueError):
runner('download', 'something/entirely/different.txt')
@pytest.fixture
def env1(tmpdir):
env_file = tmpdir.join('.env_one')
env_file.write('\n'.join((
'SECRET=unknown',
'PASSWORD=sweet',
'PATH=first',
)))
return str(env_file)
@pytest.fixture
def env2(tmpdir):
env_file = tmpdir.join('.env_two')
env_file.write('\n'.join((
'PASSWORD=bitter',
"PATH='second'",
)))
return str(env_file)
all_environ_targets = pytest.mark.parametrize(
'target', [
(get_examples_path('environ.py'), ),
(
'-m',
'bonobo.examples.environ',
),
]
)
@all_runners
@all_environ_targets
class EnvironmentTestCase():
def run_quiet(self, runner, *args):
return runner('run', '--quiet', *args)
def run_environ(self, runner, *args, environ=None):
_environ = {'PATH': '/usr/bin'}
if environ:
_environ.update(environ)
with patch.dict('os.environ', _environ, clear=True):
out, err = self.run_quiet(runner, *args)
assert 'SECRET' not in os.environ
assert 'PASSWORD' not in os.environ
if 'PATH' in _environ:
assert 'PATH' in os.environ
assert os.environ['PATH'] == _environ['PATH']
assert err == ''
return dict(map(lambda line: line.split(' ', 1), filter(None, out.split('\n'))))
class TestDefaultEnvFile(EnvironmentTestCase):
def test_run_with_default_env_file(self, runner, target, env1):
env = self.run_environ(runner, *target, '--default-env-file', env1)
assert env.get('SECRET') == 'unknown'
assert env.get('PASSWORD') == 'sweet'
assert env.get('PATH') == '/usr/bin'
def test_run_with_multiple_default_env_files(self, runner, target, env1, env2):
env = self.run_environ(runner, *target, '--default-env-file', env1, '--default-env-file', env2)
assert env.get('SECRET') == 'unknown'
assert env.get('PASSWORD') == 'sweet'
assert env.get('PATH') == '/usr/bin'
env = self.run_environ(runner, *target, '--default-env-file', env2, '--default-env-file', env1)
assert env.get('SECRET') == 'unknown'
assert env.get('PASSWORD') == 'bitter'
assert env.get('PATH') == '/usr/bin'
class TestEnvFile(EnvironmentTestCase):
def test_run_with_file(self, runner, target, env1):
env = self.run_environ(runner, *target, '--env-file', env1)
assert env.get('SECRET') == 'unknown'
assert env.get('PASSWORD') == 'sweet'
assert env.get('PATH') == 'first'
def test_run_with_multiple_files(self, runner, target, env1, env2):
env = self.run_environ(runner, *target, '--env-file', env1, '--env-file', env2)
assert env.get('SECRET') == 'unknown'
assert env.get('PASSWORD') == 'bitter'
assert env.get('PATH') == 'second'
env = self.run_environ(runner, *target, '--env-file', env2, '--env-file', env1)
assert env.get('SECRET') == 'unknown'
assert env.get('PASSWORD') == 'sweet'
assert env.get('PATH') == 'first'
class TestEnvFileCombinations(EnvironmentTestCase):
def test_run_with_both_env_files(self, runner, target, env1, env2):
env = self.run_environ(runner, *target, '--default-env-file', env1, '--env-file', env2)
assert env.get('SECRET') == 'unknown'
assert env.get('PASSWORD') == 'bitter'
assert env.get('PATH') == 'second'
def test_run_with_both_env_files_then_overrides(self, runner, target, env1, env2):
env = self.run_environ(
runner, *target, '--default-env-file', env1, '--env-file', env2, '--env', 'PASSWORD=mine', '--env',
'SECRET=s3cr3t'
)
assert env.get('SECRET') == 's3cr3t'
assert env.get('PASSWORD') == 'mine'
assert env.get('PATH') == 'second'
class TestEnvVars(EnvironmentTestCase):
def test_run_no_env(self, runner, target):
env = self.run_environ(runner, *target, environ={'USER': 'romain'})
assert env.get('USER') == 'romain'
def test_run_env(self, runner, target):
env = self.run_environ(runner, *target, '--env', 'USER=serious', environ={'USER': 'romain'})
assert env.get('USER') == 'serious'
def test_run_env_mixed(self, runner, target):
env = self.run_environ(runner, *target, '--env', 'ONE=1', '--env', 'TWO="2"', environ={'USER': 'romain'})
assert env.get('USER') == 'romain'
assert env.get('ONE') == '1'
assert env.get('TWO') == '2'
def test_run_default_env(self, runner, target):
env = self.run_environ(runner, *target, '--default-env', 'USER=clown')
assert env.get('USER') == 'clown'
env = self.run_environ(runner, *target, '--default-env', 'USER=clown', environ={'USER': 'romain'})
assert env.get('USER') == 'romain'
env = self.run_environ(
runner, *target, '--env', 'USER=serious', '--default-env', 'USER=clown', environ={
'USER': 'romain'
}
)
assert env.get('USER') == 'serious'