diff --git a/doxec/__init__.py b/doxec/__init__.py index 64ca33f30d02ab3a23cbfbcd903c3c235f003a34..301bb25e4ceb819a96018f4c6c1d6c3757d778a9 100644 --- a/doxec/__init__.py +++ b/doxec/__init__.py @@ -1,6 +1,15 @@ import abc import re +import subprocess + +class TestException(Exception): + """ + This exception should be raised, if an operation performed tests and one + of these tests fails. + """ + pass + class Operation(metaclass=abc.ABCMeta): """ @@ -55,10 +64,38 @@ class OpWrite(Operation): command = "write" def execute(self): - pass + with open(self.args, "w") as f: + for line in self.content: + print(line, file=f) + +class OpAppend(Operation): + """ + This operation performs a 'append to file' operation. + """ + command = "append" + + def execute(self): + with open(self.args, "a") as f: + for line in self.content: + print(line, file=f) + +class OpConsole(Operation): + """ + This operation runs all lines starting with $ in the console. The + operation raises an error, if the return code is not zero. + """ + command = "console" + + def execute(self): + job = subprocess.Popen("/bin/bash", stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + script = "\n".join([l[1:] for l in self.content if l.startswith("$")]) + (stdoutdata, stderrdata) = job.communicate(script.encode('utf8')) + if job.returncode != 0: + raise TestException("Script failed with %d:" % job.returncode, + stdoutdata.decode('utf8'), stderrdata.decode('utf8')) + -# add write operation to op_store -Operation.op_store.append(OpWrite) class OpConsoleOutput(Operation): """ @@ -67,11 +104,14 @@ class OpConsoleOutput(Operation): """ command = "console_output" - def executed(self): + def execute(self): pass -# add write operation to op_store +# add operations to op_store +Operation.op_store.append(OpConsole) Operation.op_store.append(OpConsoleOutput) +Operation.op_store.append(OpAppend) +Operation.op_store.append(OpWrite) class DoxecSyntax(metaclass=abc.ABCMeta): """ diff --git a/doxec/tests/__init__.py b/doxec/tests/__init__.py index 49e1f469f2a5585bcb2bab71ad7c9c28926659ec..4d4992fc96b31c2684b3f95beb13afdf7ca442b3 100644 --- a/doxec/tests/__init__.py +++ b/doxec/tests/__init__.py @@ -1,2 +1,7 @@ from .markdown import MarkdownSyntaxTestCase +from .operation import OperationTestCase +from .operation import OpWriteTestCase +from .operation import OpAppendTestCase +from .operation import OpConsoleTestCase +from .operation import OpConsoleOutputTestCase diff --git a/doxec/tests/operation.py b/doxec/tests/operation.py index 0a81249c21422c68815fe15cbec5801a929dd012..3f31c185dc9c3cea5baeafd301599ecba5e5f07f 100644 --- a/doxec/tests/operation.py +++ b/doxec/tests/operation.py @@ -1,7 +1,11 @@ +import os import unittest +import random +from tempfile import NamedTemporaryFile -from doxec import Operation +from doxec import Operation, OpWrite, OpAppend, OpConsole, OpConsoleOutput, \ + TestException class ToyOperationA(Operation): """ @@ -56,5 +60,113 @@ class OperationTestCase(unittest.TestCase): op_obj = Operation.factory("op_c", "args", "content") self.assertIsNone(op_obj) + def test_factory_completeness(self): + """ + Test that the factory knows all common operations. + """ + op_obj = Operation.factory("write", "args", "content") + self.assertIsNotNone(op_obj) + + op_obj = Operation.factory("append", "args", "content") + self.assertIsNotNone(op_obj) + + op_obj = Operation.factory("console", "args", "content") + self.assertIsNotNone(op_obj) + + op_obj = Operation.factory("console_output", "args", "content") + self.assertIsNotNone(op_obj) +class OpWriteTestCase(unittest.TestCase): + """ + Test the functionality of the write-operation. This test case focuses on + the execute method. + """ + + def test_execute(self): + """ + Create a write operation for a temporary file and check the file's + contents after calling execute. + """ + tmp_file = NamedTemporaryFile(delete=False) + tmp_path = tmp_file.name + tmp_file.close() + + with open(tmp_path, "w") as f: + print("this should be overwritten", file=f) + + write = OpWrite(tmp_path, ["Hello", " World!"]) + write.execute() + + with open(tmp_path) as f: + self.assertEqual(f.read(), "Hello\n World!\n") + + os.remove(tmp_path) + + +class OpAppendTestCase(unittest.TestCase): + """ + Test the functionality of the append-operation. This test case focuses on + the execute method. + """ + + def test_execute(self): + """ + Create two append operations for a temporary file and check the file's + contents after calling execute. + """ + tmp_file = NamedTemporaryFile(delete=False) + tmp_path = tmp_file.name + tmp_file.close() + + append_1 = OpAppend(tmp_path, ["Hello"]) + append_2 = OpAppend(tmp_path, [" World", "!"]) + + append_1.execute() + append_2.execute() + + + with open(tmp_path) as f: + self.assertEqual(f.read(), "Hello\n World\n!\n") + + os.remove(tmp_path) + +class OpConsoleTestCase(unittest.TestCase): + """ + Test the functionality of the console-operation. This test case focuses on + the execute method. + """ + + def test_execute_pass(self): + """ + Create a OpConsole operation and check that no exception is raised, + when the return code is zero. + """ + tmp_file = NamedTemporaryFile(delete=False) + tmp_path = tmp_file.name + tmp_file.close() + + r = random.random() + + console = OpConsole(None, ['$ echo "%s" > %s' % (r, tmp_path)]) + console.execute() + + with open(tmp_path) as f: + self.assertEqual(float(f.read()), r) + + os.remove(tmp_path) + + def test_execute_fail(self): + """ + Create a OpConsole operation and check that an exception is raised, + when the return code is non-zero. + """ + console = OpConsole(None, ["$ exit 1"]) + self.assertRaises(TestException, console.execute) + +class OpConsoleOutputTestCase(unittest.TestCase): + """ + Test the functionality of the console-output-operation. This test case focuses on + the execute method. + """ + pass