Version: Self Hosted Turbo

Monitor AWS Managed Workflows for Apache Airflow

Overview

The Amazon Managed Workflows for Apache Airflow (MWAA) plugin for SnappyFlow collects MWAA metrics and logs from CloudWatch and visualizes them in the SnappyFlow dashboard. This integration is designed to give you comprehensive insight into your MWAA environments.

Prerequisite

To collect metrics from MWAA, you need an IAM role with CloudWatch access and sfPoller set up in your AWS environment. Click here to learn more about setting up sfPoller in your AWS environment.
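
Before proceeding, you can optionally confirm that the credentials sfPoller will use can reach CloudWatch. The snippet below is a minimal sketch, assuming boto3 and the default AWS credential chain; the region value is a placeholder.

      import boto3

      # Sanity-check sketch: confirm the credentials can reach STS and CloudWatch.
      # The region is a placeholder; adjust it to where your MWAA environment runs.
      session = boto3.Session(region_name="us-east-1")

      identity = session.client("sts").get_caller_identity()
      print("Calling as:", identity["Arn"])

      # MWAA publishes its metrics under the AmazonMWAA CloudWatch namespace.
      cloudwatch = session.client("cloudwatch")
      metrics = cloudwatch.list_metrics(Namespace="AmazonMWAA")["Metrics"]
      print("Visible AmazonMWAA metrics:", len(metrics))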

Create CloudWatch Access for IAM Role

  1. Sign in to the AWS Management Console and open the IAM console at https://console.aws.amazon.com/iam/.

  2. Follow the steps below to create a policy in the IAM console.

    • Navigate to Access Management > Policies

    • In the Create policy window, go to the JSON tab

    • Copy and paste the following JSON into the policy editor

      {
        "Version": "2012-10-17",
        "Statement": [
          {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
              "airflow:ListEnvironments",
              "airflow:GetEnvironment",
              "airflow:ListTagsForResource"
            ],
            "Resource": "*"
          }
        ]
      }
  • Click the Next: Tags > Next: Review button

  • In the Review policy window, enter a Name and an optional Description for the policy and review the list of permissions

  • Click the Create policy button

  • Attach the policy to a dedicated IAM Role for read-only access
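
After attaching the policy, you can optionally confirm that the role can call the MWAA APIs it grants. The snippet below is a minimal sketch using boto3, assuming the role's credentials are already active (for example, via an instance profile or an assumed-role session); the region is a placeholder.

      import boto3

      # Minimal check that the attached policy grants the three MWAA actions above.
      # Assumes the role's credentials are active in the default credential chain.
      mwaa = boto3.client("mwaa", region_name="us-east-1")  # placeholder region

      # airflow:ListEnvironments
      env_names = mwaa.list_environments()["Environments"]
      print("MWAA environments:", env_names)

      if env_names:
          # airflow:GetEnvironment
          env = mwaa.get_environment(Name=env_names[0])["Environment"]
          # airflow:ListTagsForResource
          tags = mwaa.list_tags_for_resource(ResourceArn=env["Arn"])["Tags"]
          print(env["Name"], "status:", env["Status"], "tags:", tags)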

note

The TaskID and DagID values are obtained from the task log stream names. Therefore, to gather TaskLevel, TaskStateLevel, and DagLevel metrics, you must enable CloudWatch task logs for the environment.
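
To see where these values come from, you can list a few task log streams yourself. The sketch below assumes boto3, that CloudWatch task logging is enabled, and that the task log group follows MWAA's usual airflow-<environment-name>-Task naming; the environment name is hypothetical.

      import boto3

      # Illustrative sketch: MWAA task log streams are typically named
      # "<dag_id>/<task_id>/<execution_date>/<try_number>.log", which is where the
      # DagID and TaskID values come from. Log group and environment names are assumed.
      logs = boto3.client("logs", region_name="us-east-1")  # placeholder region

      log_group = "airflow-my-environment-Task"  # hypothetical environment name
      streams = logs.describe_log_streams(logGroupName=log_group, limit=5)["logStreams"]

      for stream in streams:
          parts = stream["logStreamName"].split("/")
          if len(parts) >= 2:
              print("DagID:", parts[0], "TaskID:", parts[1])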

Configure sfPoller to Collect Metrics

Follow the steps below to add endpoints and plugins in sfPoller.

  1. Go to the Application tab of sfPoller and navigate to your Project > Application.

  2. Select the application; this takes you to the Endpoint page.

  3. On the Endpoint page, select the Add Endpoint button, add the following details, and save.

    • Service Type: Select AWS Service
    • Account Name: aws
    • Endpoint Type: MWAA
    • Name: Give a meaningful name to the endpoint
    • Instance Name: Give the table name as the instance name

  4. In the Plugins window, select the +Add button.

  5. In the Add Plugin window, enter the following details.

    • Plugin Type: Metric
    • Plugin: cloudwatch-mwaa
    • Interval: Choose an interval value. The minimum interval is 300 seconds
    • Status: By default, the status is Enabled

  6. Select the Save button.

  7. Select the +Add button again and, in the Add Plugin window, add the following details to collect MWAA logs.

    • Plugin Type: Logger
    • Plugin: cloudwatch-mwaa-log
    • Interval: Choose an interval value. The minimum interval is 300 seconds
    • Status: By default, the status is Enabled

  8. Select the Save button.

  9. Click the global Save button in the window's top right corner to save all the changes made so far.
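
For reference, the sketch below shows roughly the kind of CloudWatch query a metric plugin polling every 300 seconds corresponds to. It is illustrative only, not sfPoller's implementation; it assumes boto3, takes TaskInstanceFailures from the metric list later on this page, and discovers the metric's dimensions with list_metrics rather than hard-coding them.

      import datetime
      import boto3

      # Illustrative only: roughly the CloudWatch call a 300-second metric poll maps to.
      cloudwatch = boto3.client("cloudwatch", region_name="us-east-1")  # placeholder region

      end = datetime.datetime.now(datetime.timezone.utc)
      start = end - datetime.timedelta(seconds=300)  # one polling interval

      # Discover the dimension sets that exist for this metric instead of assuming them.
      for metric in cloudwatch.list_metrics(
          Namespace="AmazonMWAA", MetricName="TaskInstanceFailures"
      )["Metrics"]:
          stats = cloudwatch.get_metric_statistics(
              Namespace="AmazonMWAA",
              MetricName="TaskInstanceFailures",
              Dimensions=metric["Dimensions"],
              StartTime=start,
              EndTime=end,
              Period=300,
              Statistics=["Sum"],
          )
          for point in stats["Datapoints"]:
              print(metric["Dimensions"], point["Timestamp"], point["Sum"])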

View MWAA Metrics and Logs

Follow the steps below to view the metrics collected from MWAA.

  1. Go to the Application tab in SnappyFlow and navigate to your Project > Application > Dashboard.

  2. You can view the MWAA metrics in the Metrics section and logs in the Log Management section.

    note

    Once the plugins are added to sfPoller, they are automatically detected in the Metrics and Log Management sections. However, if the plugins are not detected, you can import the template to view the corresponding metrics and logs.



  3. To access the unprocessed data gathered by the plugins, navigate to the Browse data section and choose the Index: Metric, Instance: Endpoint, Plugin, and Document Type.

Template Details

  • Template: MWAA
  • Plugin: cloudwatch-mwaa
  • Document Type: EnvironmentLevel, FunctionLevel, DagIDLevel, TaskLevel, TaskStateLevel
  • Description: Collects metrics from MWAA

Metric List

Environment Level

  • TaskInstanceFailures: Count of task instances that have failed during execution. A task instance represents a specific execution of a task within a workflow.
  • TaskInstanceSuccesses: Count of task instances that have successfully completed during execution. It indicates the number of task executions that ended without errors.
  • ZombiesKilled: Count of zombie task instances that have been detected and killed during execution. Zombie tasks are those that are not properly managed by Airflow, and killing them helps maintain workflow integrity.
  • DagCallbackExceptions: Count of exceptions encountered during the execution of DAG (Directed Acyclic Graph) callback functions. Callback functions are user-defined functions that can be executed at various points in the workflow.

Function Level

  • CriticalSectionBusy: Count of times a critical section, which is a region of code that must be executed atomically, has been busy or locked. This metric can indicate potential contention for critical resources.
  • DagBagSize: Count representing the size of the DAG (Directed Acyclic Graph) bag, which holds all the DAGs available to Airflow. It indicates the number of DAGs currently loaded into Airflow.
  • ImportErrors: Count of import errors encountered during the loading of DAGs or other Python modules. Import errors can indicate issues with module availability or compatibility.
  • TotalParseTime: Average time (in seconds) taken to parse a DAG definition file. It measures the time required for Airflow to read and interpret the DAG configuration.
  • TasksExecutable: Maximum count of tasks that are considered executable and ready for execution. It represents the peak concurrency of executable tasks.
  • TasksRunning: Average count of tasks that are currently running or actively executing. It provides insight into the average task concurrency.
  • TasksStarving: Maximum count of tasks that are waiting for resources or dependencies before they can start execution. It highlights peak periods of potential resource contention.
  • CriticalSectionDuration: Average duration (in milliseconds) spent in critical sections. Critical sections are regions of code that must be executed atomically, and this metric helps assess their efficiency.
  • ProcessorTimeouts: Count of processor timeouts, indicating the number of times a task execution exceeded its allotted time and triggered a timeout.
  • QueuedTasks: Count of tasks that are currently queued and waiting for execution. It helps assess the workload backlog.
  • RunningTasks: Count of tasks that are currently in the running state, actively executing. It provides insight into current task concurrency.
  • OpenSlots: Minimum count of available execution slots for tasks. It represents the lowest number of available resources during a period.
  • OrphanedTasksAdopted: Maximum count of orphaned tasks that have been adopted and assigned to an executor for execution.
  • OrphanedTasksCleared: Maximum count of orphaned tasks that have been cleared or removed from the execution queue.
  • TriggeredDagRuns: Count of DAG runs that have been triggered. A DAG run represents the execution of a DAG.

Dag Level

  • DAGDurationSuccess: Average duration (in milliseconds) of successful DAG runs. It measures the time taken for successful DAG executions.
  • DAGScheduleDelay: Average delay (in milliseconds) between the scheduled start time and the actual start time of DAG runs. It helps assess scheduling efficiency.
  • FirstTaskSchedulingDelay: Average delay (in milliseconds) before the first task within a DAG is scheduled for execution. It provides insights into DAG initiation.
  • DAGDependencyCheck: Average time (in milliseconds) spent on checking task dependencies within a DAG. It measures the time required for dependency resolution.
  • DAGDurationFailed: Average duration (in milliseconds) of failed DAG runs. It measures the time taken for DAG executions that ended in failure.

Task State Level

  • TaskInstanceFinished: Sum of task instances that have finished execution. It represents the total count of completed task instances.

Task Level

  • TaskInstanceDuration: Average duration (in milliseconds) of task instances. It measures the typical time spent on task execution, accounting for variations.
  • TaskInstanceStarted: Total count of task instances that have started, offering insights into task initiation rates.