We will use the latest Airflow version, 2.3.4, here, but this works with every version that has the TaskFlow API.

Let's say we have 3 sources and we want to create a DAG per source to do stuff on each source. These sources are user, product and order. For each source we want to apply a prepare and a load function. This is the DAG that loads all the raw data:

```python
import pendulum

from airflow.decorators import dag, task


def create_dag(source):
    # The dag_id, schedule_interval and start_date below are illustrative values.
    @dag(
        dag_id=f"load_raw_data_{source}",
        schedule_interval="@daily",
        start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
        catchup=False,
    )
    def load_raw_data():
        @task
        def prepare(source):
            ...  # prepare the raw data of this source

        @task
        def load(data):
            ...  # load the prepared data of this source

        load(prepare(source))

    return load_raw_data()


for source in ["user", "product", "order"]:
    globals()[f"load_raw_data_{source}"] = create_dag(source)
```

The important part of this code is the last line. It creates a global variable that contains the DAG object, which the Airflow DagBag will parse and add on every scheduler loop. In the Airflow UI you then see the three dynamic DAGs, one per source.

Dynamic DAGs with configurations

If you want to go further, you can also create a configuration per source. I recommend creating the configuration in Python rather than JSON. The main reason is that Python configuration can be linted, can be statically checked, and you can comment Python dicts. So you have a configuration folder called config in which you have the 3 sources' configurations, and the DAG file loads them:

```python
import os
from importlib.machinery import SourceFileLoader

# CONFIG_FOLDER is assumed to be the path of the "config" folder described above.
CONFIG_FOLDER = os.path.join(os.path.dirname(__file__), "config")

for file in os.listdir(CONFIG_FOLDER):
    if not file.endswith(".py"):
        continue
    filename = os.path.join(CONFIG_FOLDER, file)
    module = SourceFileLoader("module", filename).load_module()
    config = module.config  # assumed: each config file exposes a dict named `config`
    globals()[config["dag_id"]] = create_dag(config)
```

Here create_dag receives the whole config dict for a source instead of just its name, and the last line again registers the generated DAG in globals() so the DagBag picks it up.

Creating dynamic DAGs in Airflow is super easy. You can create DAG factories for all the repetitive tasks you may have, and thanks to this you'll be able to unit test your ETL code.

For the testing part, I have a Python module, my_module/, where my_test.py is defined as follows. We can not use a Mock instance as the PythonOperator callable function, or some tests fail with a TypeError: Object of type Mock is not JSON serializable. Instead, build_recording_function returns a callable that records custom Call objects for further testing, replacing the Mock.assert_called_with assertion method.

```python
from __future__ import print_function, unicode_literals

import unittest

from airflow.models import TaskInstance as TI, DAG, DagRun
from airflow.operators.python import PythonOperator  # airflow.operators.python_operator on 1.x
from airflow.utils import timezone

DEFAULT_DATE = timezone.datetime(2016, 1, 1)
FROZEN_NOW = timezone.datetime(2016, 1, 2, 12, 1, 1)

TI_CONTEXT_ENV_VARS = ['AIRFLOW_CTX_DAG_ID',
                       'AIRFLOW_CTX_TASK_ID',
                       'AIRFLOW_CTX_EXECUTION_DATE',
                       'AIRFLOW_CTX_DAG_RUN_ID']


class Call:
    # A small record of one call: the positional and keyword arguments it received.
    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs


def build_recording_function(calls_collection):
    # We can not use a Mock instance as a PythonOperator callable function,
    # or some tests fail with:
    #   TypeError: Object of type Mock is not JSON serializable
    # so this custom function records Call objects for further testing
    # (replacing the Mock.assert_called_with assertion method).
    def recording_function(*args, **kwargs):
        calls_collection.append(Call(*args, **kwargs))
    return recording_function


class PythonOperatorTest(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        super(PythonOperatorTest, cls).setUpClass()

    def tearDown(self):
        super(PythonOperatorTest, self).tearDown()

    def _assertCallsEqual(self, first, second):
        self.assertTupleEqual(first.args, second.args)
        self.assertDictEqual(first.kwargs, second.kwargs)
```
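As one way to use the recorded Call objects in an actual test, here is a minimal sketch that wires the recording function to a PythonOperator, reusing the my_test.py helpers above. The dag_id, task_id, op_args and op_kwargs values are illustrative, not from the original post, and running it requires an initialized Airflow metadata database, just like Airflow's own operator tests.

```python
from datetime import timedelta

from airflow.models import DAG
from airflow.operators.python import PythonOperator

# Reuses the helpers defined in my_test.py above; adjust the import to your layout.
from my_test import DEFAULT_DATE, PythonOperatorTest, build_recording_function


class RecordingFunctionUsageTest(PythonOperatorTest):
    def test_python_callable_arguments_are_recorded(self):
        # dag_id, task_id, op_args and op_kwargs are illustrative values.
        dag = DAG(
            dag_id="test_record_calls",
            default_args={"owner": "airflow", "start_date": DEFAULT_DATE},
            schedule_interval=timedelta(days=1),
        )
        recorded_calls = []

        task = PythonOperator(
            task_id="record_args",
            python_callable=build_recording_function(recorded_calls),
            op_args=[4, "hello"],
            op_kwargs={"source": "user"},
            dag=dag,
        )
        # Runs the task for a single schedule; this needs an initialized Airflow
        # metadata database, exactly like Airflow's own operator tests.
        task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

        # Assertions on the recorded Call replace Mock.assert_called_with.
        self.assertEqual(1, len(recorded_calls))
        self.assertTupleEqual((4, "hello"), recorded_calls[0].args)
        self.assertEqual("user", recorded_calls[0].kwargs.get("source"))
```

Asserting on the positional args and on individual kwargs keys keeps the test stable across Airflow versions, since newer versions also pass the task context to a callable that accepts **kwargs.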