Setting up S3 for logs in Airflow
You need to set up the S3 connection through the Airflow UI: go to Admin -> Connections and create a new connection for S3.
An example configuration would be:
Conn Id: my_conn_S3
Conn Type: S3
Extra: {"aws_access_key_id":"your_aws_key_id", "aws_secret_access_key": "your_aws_secret_key"}
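If you would rather not use the UI (for example in a container), Airflow can also read a connection from an environment variable named AIRFLOW_CONN_<CONN_ID> that holds a URI. The sketch below builds both the Extra JSON shown above and such a variable. It assumes that your Airflow version maps the s3:// scheme to the S3 connection type and copies URI query parameters into the connection's Extra field (older releases may not). The helper names are hypothetical.

```python
import json
from typing import Tuple
from urllib.parse import urlencode


def s3_conn_extra(key_id: str, secret: str) -> str:
    """JSON for the connection's Extra field (same keys as the UI example)."""
    return json.dumps({"aws_access_key_id": key_id, "aws_secret_access_key": secret})


def s3_conn_env_var(conn_id: str, key_id: str, secret: str) -> Tuple[str, str]:
    """Return (variable name, URI) for an environment-variable connection.

    Assumption: Airflow reads AIRFLOW_CONN_<CONN_ID> (conn id upper-cased)
    and turns the URI's query parameters into the Extra field.
    """
    name = "AIRFLOW_CONN_" + conn_id.upper()
    # urlencode percent-escapes '/', '+' and '=', which often appear in AWS secret keys.
    query = urlencode({"aws_access_key_id": key_id, "aws_secret_access_key": secret})
    return name, f"s3://?{query}"


if __name__ == "__main__":
    name, uri = s3_conn_env_var("my_conn_S3", "your_aws_key_id", "your_aws_secret_key")
    print(f"export {name}='{uri}'")
```

Escaping the secret matters: an unescaped "/" or "+" in the key would be misread when the URI is parsed.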
Setting up S3 logging in Airflow
Solved:
- Upgraded to 1.9.
- Ran the steps described in this comment.
- Added the following to airflow.cfg:
  [core]
  remote_logging = True
- Ran:
  pip install --upgrade airflow[log]
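To confirm that the remote_logging setting is actually picked up, you can resolve it roughly the way Airflow does. This is a minimal sketch, assuming Airflow's convention that an AIRFLOW__{SECTION}__{KEY} environment variable overrides the value in airflow.cfg; remote_logging_enabled is a hypothetical helper, not an Airflow API, and configparser's boolean parsing is close to, but not guaranteed identical to, Airflow's own.

```python
import configparser
import os
from typing import Mapping, Optional


def remote_logging_enabled(cfg_text: str, environ: Optional[Mapping[str, str]] = None) -> bool:
    """Resolve [core] remote_logging from airflow.cfg text plus the environment.

    Hypothetical helper. Assumes AIRFLOW__CORE__REMOTE_LOGGING, when set,
    overrides the file (Airflow's AIRFLOW__{SECTION}__{KEY} rule).
    """
    env = os.environ if environ is None else environ
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    override = env.get("AIRFLOW__CORE__REMOTE_LOGGING")
    if override is not None:
        if not parser.has_section("core"):
            parser.add_section("core")
        parser.set("core", "remote_logging", override)
    # configparser's getboolean accepts true/false, yes/no, on/off, 1/0 (case-insensitive).
    return parser.getboolean("core", "remote_logging", fallback=False)
```

For example, remote_logging_enabled(open(os.path.expanduser("~/airflow/airflow.cfg")).read()) checks the config in the default AIRFLOW_HOME; adjust the path if you set AIRFLOW_HOME elsewhere.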
Airflow won't write logs to S3
I finally found a solution based on a StackOverflow answer, which covers most of the work; I then had to add one more step. I reproduce that answer here, adapted slightly to what I did:
Some things to check:
- Make sure you have the log_config.py file and that it is in the correct directory: ./config/log_config.py.
- Make sure you didn't forget the __init__.py file in that directory.
- Make sure you defined the s3.task handler and set its formatter to airflow.task.
- Make sure you set the airflow.task and airflow.task_runner loggers to use the s3.task handler.
- Set task_log_reader = s3.task in airflow.cfg.
- Pass S3_LOG_FOLDER to log_config.py. I did that by defining it as a config variable and retrieving it, as in the following log_config.py:
import os

from airflow import configuration as conf

# Read from airflow.cfg; each key can also come from an AIRFLOW__<SECTION>__<KEY> environment variable
LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
LOG_FORMAT = conf.get('core', 'log_format')
BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'child_process_log_directory')

FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'

# S3_LOG_FOLDER is a key you add under [core] yourself; conf.get raises an error if it is missing
S3_LOG_FOLDER = conf.get('core', 'S3_LOG_FOLDER')

LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'airflow.task': {
            'format': LOG_FORMAT,
        },
        'airflow.processor': {
            'format': LOG_FORMAT,
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'airflow.task',
            'stream': 'ext://sys.stdout'
        },
        'file.task': {
            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'filename_template': FILENAME_TEMPLATE,
        },
        'file.processor': {
            'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
            'formatter': 'airflow.processor',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
        },
        # Writes task logs under BASE_LOG_FOLDER and uploads them to S3_LOG_FOLDER
        's3.task': {
            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            's3_log_folder': S3_LOG_FOLDER,
            'filename_template': FILENAME_TEMPLATE,
        },
    },
    'loggers': {
        '': {
            'handlers': ['console'],
            'level': LOG_LEVEL
        },
        'airflow': {
            'handlers': ['console'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.processor': {
            'handlers': ['file.processor'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
        # Route task logs through the S3 handler instead of 'file.task'
        'airflow.task': {
            'handlers': ['s3.task'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.task_runner': {
            'handlers': ['s3.task'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
    }
}
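To catch the usual mistakes from the checklist before restarting Airflow, the LOGGING_CONFIG dict can be checked with plain Python. This is only a sketch: check_s3_logging_config is a hypothetical helper that inspects the dict structure and nothing else, so it will not notice a wrong task_log_reader in airflow.cfg or missing S3 permissions.

```python
from typing import Any, Dict, List


def check_s3_logging_config(config: Dict[str, Any]) -> List[str]:
    """Return a list of problems found in a LOGGING_CONFIG dict; empty means OK.

    Hypothetical helper encoding the checklist: an 's3.task' handler with the
    'airflow.task' formatter and an S3 folder, used by the task loggers.
    """
    problems = []
    handlers = config.get("handlers", {})
    loggers = config.get("loggers", {})

    s3_handler = handlers.get("s3.task")
    if s3_handler is None:
        problems.append("missing 's3.task' handler")
    else:
        if s3_handler.get("formatter") != "airflow.task":
            problems.append("'s3.task' formatter should be 'airflow.task'")
        if not s3_handler.get("s3_log_folder"):
            problems.append("'s3.task' has no s3_log_folder")

    # Both task loggers must send their records to the S3 handler.
    for name in ("airflow.task", "airflow.task_runner"):
        if "s3.task" not in loggers.get(name, {}).get("handlers", []):
            problems.append(f"logger '{name}' does not use 's3.task'")
    return problems
```

One possible use is a final line in log_config.py such as assert not check_s3_logging_config(LOGGING_CONFIG), so a broken config fails loudly at import time instead of silently logging elsewhere.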
Note that because log_config.py reads it with conf.get, S3_LOG_FOLDER can be set either in airflow.cfg or through the environment variable AIRFLOW__CORE__S3_LOG_FOLDER.
Airflow 2 on k8s S3 logging is not working
It seems the target S3 folder has to exist before the first log is written; creating it solved the issue. I hope this helps someone in the future!
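If, as reported above, the log prefix has to exist before the first task log is written, you can create it once up front. S3 has no real directories, so "creating a folder" here means putting an empty object whose key ends in "/". This is a minimal sketch assuming a boto3 S3 client (boto3.client("s3")); split_s3_url and ensure_s3_folder are hypothetical helpers, and the URL is whatever you configured as the remote log folder (on Airflow 2, remote_base_log_folder in the [logging] section).

```python
from typing import Tuple
from urllib.parse import urlsplit


def split_s3_url(url: str) -> Tuple[str, str]:
    """Split 's3://bucket/some/prefix' into ('bucket', 'some/prefix/')."""
    parts = urlsplit(url)
    if parts.scheme != "s3" or not parts.netloc:
        raise ValueError(f"not an s3:// URL: {url!r}")
    prefix = parts.path.lstrip("/")
    if prefix and not prefix.endswith("/"):
        prefix += "/"  # S3 console convention: a "folder" key ends with '/'
    return parts.netloc, prefix


def ensure_s3_folder(client, url: str) -> None:
    """Put an empty marker object at the prefix if nothing exists under it yet.

    `client` is assumed to be a boto3 S3 client; any object with the same
    list_objects_v2/put_object keyword arguments (e.g. a test fake) works.
    """
    bucket, prefix = split_s3_url(url)
    if not prefix:
        return  # bucket root: nothing to create
    resp = client.list_objects_v2(Bucket=bucket, Prefix=prefix, MaxKeys=1)
    if resp.get("KeyCount", 0) == 0:
        client.put_object(Bucket=bucket, Key=prefix, Body=b"")
```

Usage, with a placeholder bucket name: ensure_s3_folder(boto3.client("s3"), "s3://my-bucket/airflow/logs"). Checking KeyCount first keeps the call idempotent, so it is safe to run on every deployment.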