Setting Up S3 for Logs in Airflow

You need to set up the S3 connection through the Airflow UI. Go to Admin -> Connections in the UI and create a new record for your S3 connection.

An example configuration would be:

Conn Id: my_conn_S3

Conn Type: S3

Extra: {"aws_access_key_id":"your_aws_key_id", "aws_secret_access_key": "your_aws_secret_key"}

Setting up S3 logging in Airflow

Solved:

  1. upgraded to 1.9
  2. ran the steps described in this comment
  3. added

    [core]

    remote_logging = True

    to airflow.cfg

  4. ran

    pip install --upgrade airflow[log]

Everything's working fine now.
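For reference, the [core] remote-logging options usually end up looking something like the snippet below. This is a sketch rather than the poster's exact configuration; the bucket path and connection id are placeholders:

    [core]
    remote_logging = True
    remote_base_log_folder = s3://my-log-bucket/airflow/logs
    remote_log_conn_id = my_conn_S3
    encrypt_s3_logs = False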

Airflow won't write logs to S3

I finally found the answer with the help of a StackOverflow answer, which covered most of the work; I then had to add one more step. I reproduce that answer here, adapted slightly to what I did:

Some things to check:

  1. Make sure you have the log_config.py file and that it is in the correct directory: ./config/log_config.py.
  2. Make sure you didn't forget the __init__.py file in that directory.
  3. Make sure you defined the s3.task handler and set its formatter to airflow.task.
  4. Make sure the airflow.task and airflow.task_runner loggers use the s3.task handler.
  5. Set task_log_reader = s3.task in airflow.cfg.
  6. Pass S3_LOG_FOLDER to log_config.py. I did that with a variable, retrieved as in the following log_config.py.

Here is a log_config.py that works:

import os

from airflow import configuration as conf

LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
LOG_FORMAT = conf.get('core', 'log_format')

BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'child_process_log_directory')

FILENAME_TEMPLATE = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log'
PROCESSOR_FILENAME_TEMPLATE = '{{ filename }}.log'

S3_LOG_FOLDER = conf.get('core', 'S3_LOG_FOLDER')

LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'airflow.task': {
            'format': LOG_FORMAT,
        },
        'airflow.processor': {
            'format': LOG_FORMAT,
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'airflow.task',
            'stream': 'ext://sys.stdout'
        },
        'file.task': {
            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'filename_template': FILENAME_TEMPLATE,
        },
        'file.processor': {
            'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
            'formatter': 'airflow.processor',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
        },
        's3.task': {
            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            's3_log_folder': S3_LOG_FOLDER,
            'filename_template': FILENAME_TEMPLATE,
        },
    },
    'loggers': {
        '': {
            'handlers': ['console'],
            'level': LOG_LEVEL
        },
        'airflow': {
            'handlers': ['console'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.processor': {
            'handlers': ['file.processor'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
        'airflow.task': {
            'handlers': ['s3.task'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.task_runner': {
            'handlers': ['s3.task'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
    }
}

Note that this way S3_LOG_FOLDER can be specified in airflow.cfg or as the environment variable AIRFLOW__CORE__S3_LOG_FOLDER.
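The checklist above touches airflow.cfg in a couple of places; pulled together, the relevant entries would look roughly like the sketch below. Note that logging_config_class is my addition and an assumption rather than part of the answer above; it presumes the ./config directory containing log_config.py is on the PYTHONPATH, and the bucket path is a placeholder:

    [core]
    task_log_reader = s3.task
    # Assumption: point Airflow at the custom logging config above.
    logging_config_class = log_config.LOGGING_CONFIG
    s3_log_folder = s3://my-log-bucket/airflow/logs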

Airflow 2 on k8s: S3 logging is not working

It turns out that the S3 target folder has to exist before the first log is written; creating it up front solved the issue. I hope this helps someone in the future!
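A minimal sketch of that pre-creation step with boto3; the bucket and prefix names are placeholders, not values from the original report:

    import boto3

    s3 = boto3.client('s3')
    bucket = 'my-log-bucket'       # replace with your log bucket
    prefix = 'airflow/logs/'       # replace with your remote log folder prefix

    s3.head_bucket(Bucket=bucket)  # raises if the bucket is missing or inaccessible
    # S3 has no real directories; writing a zero-byte marker object makes the
    # "folder" exist before Airflow writes its first task log.
    s3.put_object(Bucket=bucket, Key=prefix)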


