Resource: awsKinesisanalyticsv2Application

Manages a Kinesis Analytics v2 Application. This resource can be used to manage both Kinesis Data Analytics for SQL applications and Kinesis Data Analytics for Apache Flink applications.

-> Note: Kinesis Data Analytics for SQL applications created using this resource cannot currently be viewed in the AWS Console. To manage Kinesis Data Analytics for SQL applications that can also be viewed in the AWS Console, use the awsKinesisAnalyticsApplication resource.

Example Usage

/*Provider bindings are generated by running cdktf get.
See https://cdk.tf/provider-generation for more details.*/
import * as aws from "./.gen/providers/aws";
const awsS3BucketExample = new aws.s3Bucket.S3Bucket(this, "example", {
  bucket: "example-flink-application",
});
const awsS3ObjectExample = new aws.s3Object.S3Object(this, "example_1", {
  bucket: awsS3BucketExample.id,
  key: "example-flink-application",
  source: "flink-app.jar",
});
/*This allows the Terraform resource name to match the original name. You can remove the call if you don't need them to match.*/
awsS3ObjectExample.overrideLogicalId("example");
const awsKinesisanalyticsv2ApplicationExample =
  new aws.kinesisanalyticsv2Application.Kinesisanalyticsv2Application(
    this,
    "example_2",
    {
      applicationConfiguration: {
        applicationCodeConfiguration: {
          codeContent: {
            s3ContentLocation: {
              bucketArn: awsS3BucketExample.arn,
              fileKey: awsS3ObjectExample.key,
            },
          },
          codeContentType: "ZIPFILE",
        },
        environmentProperties: {
          propertyGroup: [
            {
              propertyGroupId: "PROPERTY-GROUP-1",
              propertyMap: {
                key1: "Value1",
              },
            },
            {
              propertyGroupId: "PROPERTY-GROUP-2",
              propertyMap: {
                keyA: "ValueA",
                keyB: "ValueB",
              },
            },
          ],
        },
        flinkApplicationConfiguration: {
          checkpointConfiguration: {
            configurationType: "DEFAULT",
          },
          monitoringConfiguration: {
            configurationType: "CUSTOM",
            logLevel: "DEBUG",
            metricsLevel: "TASK",
          },
          parallelismConfiguration: {
            autoScalingEnabled: true,
            configurationType: "CUSTOM",
            parallelism: 10,
            parallelismPerKpu: 4,
          },
        },
      },
      name: "example-flink-application",
      runtimeEnvironment: "FLINK-1_8",
      serviceExecutionRole: "${aws_iam_role.example.arn}",
      tags: {
        Environment: "test",
      },
    }
  );
/*This allows the Terraform resource name to match the original name. You can remove the call if you don't need them to match.*/
awsKinesisanalyticsv2ApplicationExample.overrideLogicalId("example");

SQL Application

/*Provider bindings are generated by running cdktf get.
See https://cdk.tf/provider-generation for more details.*/
import * as aws from "./.gen/providers/aws";
const awsCloudwatchLogGroupExample =
  new aws.cloudwatchLogGroup.CloudwatchLogGroup(this, "example", {
    name: "example-sql-application",
  });
const awsCloudwatchLogStreamExample =
  new aws.cloudwatchLogStream.CloudwatchLogStream(this, "example_1", {
    logGroupName: awsCloudwatchLogGroupExample.name,
    name: "example-sql-application",
  });
/*This allows the Terraform resource name to match the original name. You can remove the call if you don't need them to match.*/
awsCloudwatchLogStreamExample.overrideLogicalId("example");
const awsKinesisanalyticsv2ApplicationExample =
  new aws.kinesisanalyticsv2Application.Kinesisanalyticsv2Application(
    this,
    "example_2",
    {
      applicationConfiguration: {
        applicationCodeConfiguration: {
          codeContent: {
            textContent: "SELECT 1;\n",
          },
          codeContentType: "PLAINTEXT",
        },
        sqlApplicationConfiguration: {
          input: {
            inputParallelism: {
              count: 3,
            },
            inputSchema: {
              recordColumn: [
                {
                  mapping: "MAPPING-1",
                  name: "COLUMN_1",
                  sqlType: "VARCHAR(8)",
                },
                {
                  name: "COLUMN_2",
                  sqlType: "DOUBLE",
                },
              ],
              recordEncoding: "UTF-8",
              recordFormat: {
                mappingParameters: {
                  csvMappingParameters: {
                    recordColumnDelimiter: ",",
                    recordRowDelimiter: "\n",
                  },
                },
                recordFormatType: "CSV",
              },
            },
            kinesisStreamsInput: {
              resourceArn: "${aws_kinesis_stream.example.arn}",
            },
            namePrefix: "PREFIX_1",
          },
          output: [
            {
              destinationSchema: {
                recordFormatType: "JSON",
              },
              lambdaOutput: {
                resourceArn: "${aws_lambda_function.example.arn}",
              },
              name: "OUTPUT_1",
            },
            {
              destinationSchema: {
                recordFormatType: "CSV",
              },
              kinesisFirehoseOutput: {
                resourceArn:
                  "${aws_kinesis_firehose_delivery_stream.example.arn}",
              },
              name: "OUTPUT_2",
            },
          ],
          referenceDataSource: {
            referenceSchema: {
              recordColumn: [
                {
                  name: "COLUMN_1",
                  sqlType: "INTEGER",
                },
              ],
              recordFormat: {
                mappingParameters: {
                  jsonMappingParameters: {
                    recordRowPath: "$",
                  },
                },
                recordFormatType: "JSON",
              },
            },
            s3ReferenceDataSource: {
              bucketArn: "${aws_s3_bucket.example.arn}",
              fileKey: "KEY-1",
            },
            tableName: "TABLE-1",
          },
        },
      },
      cloudwatchLoggingOptions: {
        logStreamArn: awsCloudwatchLogStreamExample.arn,
      },
      name: "example-sql-application",
      runtimeEnvironment: "SQL-1_0",
      serviceExecutionRole: "${aws_iam_role.example.arn}",
    }
  );
/*This allows the Terraform resource name to match the original name. You can remove the call if you don't need them to match.*/
awsKinesisanalyticsv2ApplicationExample.overrideLogicalId("example");

VPC Configuration

/*Provider bindings are generated by running cdktf get.
See https://cdk.tf/provider-generation for more details.*/
import * as aws from "./.gen/providers/aws";
const awsS3BucketExample = new aws.s3Bucket.S3Bucket(this, "example", {
  bucket: "example-flink-application",
});
const awsS3ObjectExample = new aws.s3Object.S3Object(this, "example_1", {
  bucket: awsS3BucketExample.id,
  key: "example-flink-application",
  source: "flink-app.jar",
});
/*This allows the Terraform resource name to match the original name. You can remove the call if you don't need them to match.*/
awsS3ObjectExample.overrideLogicalId("example");
const awsKinesisanalyticsv2ApplicationExample =
  new aws.kinesisanalyticsv2Application.Kinesisanalyticsv2Application(
    this,
    "example_2",
    {
      applicationConfiguration: {
        applicationCodeConfiguration: {
          codeContent: {
            s3ContentLocation: {
              bucketArn: awsS3BucketExample.arn,
              fileKey: awsS3ObjectExample.key,
            },
          },
          codeContentType: "ZIPFILE",
        },
        vpcConfiguration: {
          securityGroupIds: [
            "${aws_security_group.example[0].id}",
            "${aws_security_group.example[1].id}",
          ],
          subnetIds: ["${aws_subnet.example.id}"],
        },
      },
      name: "example-flink-application",
      runtimeEnvironment: "FLINK-1_8",
      serviceExecutionRole: "${aws_iam_role.example.arn}",
    }
  );
/*This allows the Terraform resource name to match the original name. You can remove the call if you don't need them to match.*/
awsKinesisanalyticsv2ApplicationExample.overrideLogicalId("example");

Argument Reference

The following arguments are supported:

  • name - (Required) The name of the application.
  • runtimeEnvironment - (Required) The runtime environment for the application. Valid values: SQL-1_0, FLINK-1_6, FLINK-1_8, FLINK-1_11, FLINK-1_13, FLINK-1_15.
  • serviceExecutionRole - (Required) The ARN of the IAM role used by the application to access Kinesis data streams, Kinesis Data Firehose delivery streams, Amazon S3 objects, and other external resources.
  • applicationConfiguration - (Optional) The application's configuration.
  • cloudwatchLoggingOptions - (Optional) A CloudWatch log stream to monitor application configuration errors.
  • description - (Optional) A summary description of the application.
  • forceStop - (Optional) Whether to force stop an unresponsive Flink-based application.
  • startApplication - (Optional) Whether to start or stop the application.
  • tags - (Optional) A map of tags to assign to the application. If configured with a provider defaultTags configuration block present, tags with matching keys will overwrite those defined at the provider-level.

The applicationConfiguration object supports the following:

  • applicationCodeConfiguration - (Required) The code location and type parameters for the application.
  • applicationSnapshotConfiguration - (Optional) Describes whether snapshots are enabled for a Flink-based application.
  • environmentProperties - (Optional) Describes execution properties for a Flink-based application.
  • flinkApplicationConfiguration - (Optional) The configuration of a Flink-based application.
  • runConfiguration - (Optional) Describes the starting properties for a Flink-based application.
  • sqlApplicationConfiguration - (Optional) The configuration of a SQL-based application.
  • vpcConfiguration - (Optional) The VPC configuration of a Flink-based application.

The applicationCodeConfiguration object supports the following:

  • codeContentType - (Required) Specifies whether the code content is in text or zip format. Valid values: PLAINTEXT, ZIPFILE.
  • codeContent - (Optional) The location and type of the application code.

The codeContent object supports the following:

  • s3ContentLocation - (Optional) Information about the Amazon S3 bucket containing the application code.
  • textContent - (Optional) The text-format code for the application.

The s3ContentLocation object supports the following:

  • bucketArn - (Required) The ARN for the S3 bucket containing the application code.
  • fileKey - (Required) The file key for the object containing the application code.
  • objectVersion - (Optional) The version of the object containing the application code.

The applicationSnapshotConfiguration object supports the following:

  • snapshotsEnabled - (Required) Describes whether snapshots are enabled for a Flink-based Kinesis Data Analytics application.

The environmentProperties object supports the following:

  • propertyGroup - (Required) Describes the execution property groups.

The propertyGroup object supports the following:

  • propertyGroupId - (Required) The key of the application execution property key-value map.
  • propertyMap - (Required) Application execution property key-value map.
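
The propertyGroup list can be tedious to write by hand when there are many groups. The sketch below shows one possible way to build it from a nested map. The PropertyGroup interface and the toPropertyGroups helper are hypothetical names introduced for illustration; they are not part of the generated provider bindings.

```typescript
// Hypothetical shape of one propertyGroup entry, mirroring the arguments above.
interface PropertyGroup {
  propertyGroupId: string;
  propertyMap: { [key: string]: string };
}

// Illustrative helper (an assumption, not a provider API): converts a nested
// map of { groupId: { key: value } } into the propertyGroup list form.
function toPropertyGroups(groups: {
  [groupId: string]: { [key: string]: string };
}): PropertyGroup[] {
  return Object.keys(groups).map((propertyGroupId) => ({
    propertyGroupId,
    // Copy the inner map so later changes to the input do not leak through.
    propertyMap: { ...groups[propertyGroupId] },
  }));
}

// Same groups as in the Flink example at the top of this page.
const propertyGroup = toPropertyGroups({
  "PROPERTY-GROUP-1": { key1: "Value1" },
  "PROPERTY-GROUP-2": { keyA: "ValueA", keyB: "ValueB" },
});
```

The resulting array could then be passed as environmentProperties.propertyGroup in the resource configuration.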

The flinkApplicationConfiguration object supports the following:

  • checkpointConfiguration - (Optional) Describes an application's checkpointing configuration.
  • monitoringConfiguration - (Optional) Describes configuration parameters for CloudWatch logging for an application.
  • parallelismConfiguration - (Optional) Describes parameters for how an application executes multiple tasks simultaneously.

The checkpointConfiguration object supports the following:

  • configurationType - (Required) Describes whether the application uses Kinesis Data Analytics' default checkpointing behavior. Valid values: CUSTOM, DEFAULT. Set this attribute to CUSTOM in order for any specified checkpointingEnabled, checkpointInterval, or minPauseBetweenCheckpoints attribute values to be effective. If this attribute is set to DEFAULT, the application will always use the following values:
      • checkpointingEnabled = true
      • checkpointInterval = 60000
      • minPauseBetweenCheckpoints = 5000
  • checkpointingEnabled - (Optional) Describes whether checkpointing is enabled for a Flink-based Kinesis Data Analytics application.
  • checkpointInterval - (Optional) Describes the interval in milliseconds between checkpoint operations.
  • minPauseBetweenCheckpoints - (Optional) Describes the minimum time in milliseconds after a checkpoint operation completes that a new checkpoint operation can start.
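
The interaction between configurationType and the other checkpoint attributes can be sketched as follows. This is a minimal illustration, not provider code: the CheckpointConfiguration interface and the effectiveCheckpointSettings helper are hypothetical, and the fallback to the listed defaults for attributes left unset under a custom configuration is an assumption.

```typescript
// Hypothetical local type mirroring the checkpointConfiguration arguments above.
interface CheckpointConfiguration {
  configurationType: "CUSTOM" | "DEFAULT";
  checkpointingEnabled?: boolean;
  checkpointInterval?: number; // milliseconds
  minPauseBetweenCheckpoints?: number; // milliseconds
}

// Values listed above for the default configuration type.
const CHECKPOINT_DEFAULTS = {
  checkpointingEnabled: true,
  checkpointInterval: 60000,
  minPauseBetweenCheckpoints: 5000,
};

// Illustrative helper: returns the settings that would take effect.
function effectiveCheckpointSettings(cfg: CheckpointConfiguration) {
  if (cfg.configurationType === "DEFAULT") {
    // Any other attribute values are ignored in this mode.
    return { ...CHECKPOINT_DEFAULTS };
  }
  // Assumption: unset attributes fall back to the defaults listed above.
  return {
    checkpointingEnabled:
      cfg.checkpointingEnabled ?? CHECKPOINT_DEFAULTS.checkpointingEnabled,
    checkpointInterval:
      cfg.checkpointInterval ?? CHECKPOINT_DEFAULTS.checkpointInterval,
    minPauseBetweenCheckpoints:
      cfg.minPauseBetweenCheckpoints ??
      CHECKPOINT_DEFAULTS.minPauseBetweenCheckpoints,
  };
}

// The 30-second interval only takes effect because configurationType is CUSTOM.
const customCheckpoints = effectiveCheckpointSettings({
  configurationType: "CUSTOM",
  checkpointInterval: 30000,
});
```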

The monitoringConfiguration object supports the following:

  • configurationType - (Required) Describes whether to use the default CloudWatch logging configuration for an application. Valid values: CUSTOM, DEFAULT. Set this attribute to CUSTOM in order for any specified logLevel or metricsLevel attribute values to be effective.
  • logLevel - (Optional) Describes the verbosity of the CloudWatch Logs for an application. Valid values: DEBUG, ERROR, INFO, WARN.
  • metricsLevel - (Optional) Describes the granularity of the CloudWatch Logs for an application. Valid values: APPLICATION, OPERATOR, PARALLELISM, TASK.

The parallelismConfiguration object supports the following:

  • configurationType - (Required) Describes whether the application uses the default parallelism for the Kinesis Data Analytics service. Valid values: CUSTOM, DEFAULT. Set this attribute to CUSTOM in order for any specified autoScalingEnabled, parallelism, or parallelismPerKpu attribute values to be effective.
  • autoScalingEnabled - (Optional) Describes whether the Kinesis Data Analytics service can increase the parallelism of the application in response to increased throughput.
  • parallelism - (Optional) Describes the initial number of parallel tasks that a Flink-based Kinesis Data Analytics application can perform.
  • parallelismPerKpu - (Optional) Describes the number of parallel tasks that a Flink-based Kinesis Data Analytics application can perform per Kinesis Processing Unit (KPU) used by the application.
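
To get a feel for how parallelism and parallelismPerKpu relate, the sketch below estimates the number of KPUs an application might use. It assumes the KPU count is roughly parallelism divided by parallelismPerKpu, rounded up; that relationship and the estimateKpus helper are assumptions made for illustration and should be checked against the AWS service documentation before being relied on for sizing or cost estimates.

```typescript
// Illustrative helper (hypothetical, not a provider API).
// Assumption: KPUs are roughly ceil(parallelism / parallelismPerKpu).
function estimateKpus(parallelism: number, parallelismPerKpu: number): number {
  if (parallelism < 1 || parallelismPerKpu < 1) {
    throw new RangeError("parallelism and parallelismPerKpu must be >= 1");
  }
  return Math.ceil(parallelism / parallelismPerKpu);
}

// Values from the Flink example at the top of this page:
// parallelism = 10, parallelismPerKpu = 4.
const estimatedKpus = estimateKpus(10, 4);
```

With the example values this gives an estimate of 3 KPUs under the stated assumption. When autoScalingEnabled is true, the service may change the parallelism at run time, so the estimate only reflects the initial setting.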

The runConfiguration object supports the following:

  • applicationRestoreConfiguration - (Optional) The restore behavior of a restarting application.
  • flinkRunConfiguration - (Optional) The starting parameters for a Flink-based Kinesis Data Analytics application.

The applicationRestoreConfiguration object supports the following:

  • applicationRestoreType - (Required) Specifies how the application should be restored. Valid values: RESTORE_FROM_CUSTOM_SNAPSHOT, RESTORE_FROM_LATEST_SNAPSHOT, SKIP_RESTORE_FROM_SNAPSHOT.
  • snapshotName - (Optional) The identifier of an existing snapshot of application state to use to restart an application. The application uses this value if RESTORE_FROM_CUSTOM_SNAPSHOT is specified for applicationRestoreType.
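
Because snapshotName is only used with one of the restore types, a small pre-flight check can catch mismatched settings before a deployment. The following is a sketch under stated assumptions: the types and the checkRestoreConfiguration helper are hypothetical, and treating a missing snapshotName as a problem for RESTORE_FROM_CUSTOM_SNAPSHOT is an assumption drawn from the description above, not a documented provider validation.

```typescript
// Hypothetical local types mirroring the arguments above.
type ApplicationRestoreType =
  | "RESTORE_FROM_CUSTOM_SNAPSHOT"
  | "RESTORE_FROM_LATEST_SNAPSHOT"
  | "SKIP_RESTORE_FROM_SNAPSHOT";

interface ApplicationRestoreConfiguration {
  applicationRestoreType: ApplicationRestoreType;
  snapshotName?: string;
}

// Illustrative helper: returns a list of human-readable problems (empty if none).
function checkRestoreConfiguration(
  cfg: ApplicationRestoreConfiguration
): string[] {
  const problems: string[] = [];
  const isCustom = cfg.applicationRestoreType === "RESTORE_FROM_CUSTOM_SNAPSHOT";
  if (isCustom && !cfg.snapshotName) {
    problems.push("snapshotName should be set for RESTORE_FROM_CUSTOM_SNAPSHOT");
  }
  if (!isCustom && cfg.snapshotName) {
    problems.push("snapshotName is only used with RESTORE_FROM_CUSTOM_SNAPSHOT");
  }
  return problems;
}

// "example-snapshot" is a placeholder snapshot name.
const restoreProblems = checkRestoreConfiguration({
  applicationRestoreType: "RESTORE_FROM_CUSTOM_SNAPSHOT",
  snapshotName: "example-snapshot",
});
```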

The flinkRunConfiguration object supports the following:

  • allowNonRestoredState - (Optional) When restoring from a snapshot, specifies whether the runtime is allowed to skip a state that cannot be mapped to the new program. Default is false.

The sqlApplicationConfiguration object supports the following:

  • input - (Optional) The input stream used by the application.
  • output - (Optional) The destination streams used by the application.
  • referenceDataSource - (Optional) The reference data source used by the application.

The input object supports the following:

  • inputSchema - (Required) Describes the format of the data in the streaming source, and how each data element maps to corresponding columns in the in-application stream that is being created.
  • namePrefix - (Required) The name prefix to use when creating an in-application stream.
  • inputParallelism - (Optional) Describes the number of in-application streams to create.
  • inputProcessingConfiguration - (Optional) The input processing configuration for the input. An input processor transforms records as they are received from the stream, before the application's SQL code executes.
  • inputStartingPositionConfiguration - (Optional) The point at which the application starts processing records from the streaming source.
  • kinesisFirehoseInput - (Optional) If the streaming source is a Kinesis Data Firehose delivery stream, identifies the delivery stream's ARN.
  • kinesisStreamsInput - (Optional) If the streaming source is a Kinesis data stream, identifies the stream's Amazon Resource Name (ARN).

The inputParallelism object supports the following:

  • count - (Optional) The number of in-application streams to create.

The inputProcessingConfiguration object supports the following:

  • inputLambdaProcessor - (Required) Describes the Lambda function that is used to preprocess the records in the stream before being processed by your application code.

The inputLambdaProcessor object supports the following:

  • resourceArn - (Required) The ARN of the Lambda function that operates on records in the stream.

The inputSchema object supports the following:

  • recordColumn - (Required) Describes the mapping of each data element in the streaming source to the corresponding column in the in-application stream.
  • recordFormat - (Required) Specifies the format of the records on the streaming source.
  • recordEncoding - (Optional) Specifies the encoding of the records in the streaming source. For example, UTF-8.

The recordColumn object supports the following:

  • name - (Required) The name of the column that is created in the in-application input stream or reference table.
  • sqlType - (Required) The type of column created in the in-application input stream or reference table.
  • mapping - (Optional) A reference to the data element in the streaming input or the reference data source.

The recordFormat object supports the following:

  • mappingParameters - (Required) Provides additional mapping information specific to the record format (such as JSON, CSV, or record fields delimited by some delimiter) on the streaming source.
  • recordFormatType - (Required) The type of record format. Valid values: CSV, JSON.

The mappingParameters object supports the following:

  • csvMappingParameters - (Optional) Provides additional mapping information when the record format uses delimiters (for example, CSV).
  • jsonMappingParameters - (Optional) Provides additional mapping information when JSON is the record format on the streaming source.

The csvMappingParameters object supports the following:

  • recordColumnDelimiter - (Required) The column delimiter. For example, in a CSV format, a comma (,) is the typical column delimiter.
  • recordRowDelimiter - (Required) The row delimiter. For example, in a CSV format, \n is the typical row delimiter.

The jsonMappingParameters object supports the following:

  • recordRowPath - (Required) The path to the top-level parent that contains the records.

The inputStartingPositionConfiguration object supports the following:

~> NOTE: To modify an application's starting position, first stop the application by setting startApplication = false, then update inputStartingPosition and set startApplication = true.

  • inputStartingPosition - (Required) The starting position on the stream. Valid values: LAST_STOPPED_POINT, NOW, TRIM_HORIZON.
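
The stop, update, start sequence described in the note above can be pictured as two successive deployments. The fragments below are a sketch only: the StartingPositionStep type and the two step objects are hypothetical and show just the values that change between deployments, not a complete resource configuration.

```typescript
// Hypothetical type describing the values that change in each deployment.
interface StartingPositionStep {
  startApplication: boolean;
  inputStartingPosition?: "LAST_STOPPED_POINT" | "NOW" | "TRIM_HORIZON";
}

// Deployment 1: stop the application and leave the starting position untouched.
const stopStep: StartingPositionStep = {
  startApplication: false,
};

// Deployment 2: set the new starting position and start the application again.
const restartStep: StartingPositionStep = {
  startApplication: true,
  inputStartingPosition: "TRIM_HORIZON",
};

const startingPositionSteps = [stopStep, restartStep];
```

In the resource configuration, the inputStartingPosition value belongs under sqlApplicationConfiguration.input.inputStartingPositionConfiguration, while startApplication is a top-level argument.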

The kinesisFirehoseInput object supports the following:

  • resourceArn - (Required) The ARN of the delivery stream.

The kinesisStreamsInput object supports the following:

  • resourceArn - (Required) The ARN of the input Kinesis data stream to read.

The output object supports the following:

  • destinationSchema - (Required) Describes the data format when records are written to the destination.
  • name - (Required) The name of the in-application stream.
  • kinesisFirehoseOutput - (Optional) Identifies a Kinesis Data Firehose delivery stream as the destination.
  • kinesisStreamsOutput - (Optional) Identifies a Kinesis data stream as the destination.
  • lambdaOutput - (Optional) Identifies a Lambda function as the destination.

The destinationSchema object supports the following:

  • recordFormatType - (Required) Specifies the format of the records on the output stream. Valid values: CSV, JSON.

The kinesisFirehoseOutput object supports the following:

  • resourceArn - (Required) The ARN of the destination delivery stream to write to.

The kinesisStreamsOutput object supports the following:

  • resourceArn - (Required) The ARN of the destination Kinesis data stream to write to.

The lambdaOutput object supports the following:

  • resourceArn - (Required) The ARN of the destination Lambda function to write to.

The referenceDataSource object supports the following:

  • referenceSchema - (Required) Describes the format of the data in the streaming source, and how each data element maps to corresponding columns created in the in-application stream.
  • s3ReferenceDataSource - (Required) Identifies the S3 bucket and object that contains the reference data.
  • tableName - (Required) The name of the in-application table to create.

The referenceSchema object supports the following:

  • recordColumn - (Required) Describes the mapping of each data element in the streaming source to the corresponding column in the in-application stream.
  • recordFormat - (Required) Specifies the format of the records on the streaming source.
  • recordEncoding - (Optional) Specifies the encoding of the records in the streaming source. For example, UTF-8.

The s3ReferenceDataSource object supports the following:

  • bucketArn - (Required) The ARN of the S3 bucket.
  • fileKey - (Required) The object key name containing the reference data.

The vpcConfiguration object supports the following:

  • securityGroupIds - (Required) The Security Group IDs used by the VPC configuration.
  • subnetIds - (Required) The Subnet IDs used by the VPC configuration.

The cloudwatchLoggingOptions object supports the following:

  • logStreamArn - (Required) The ARN of the CloudWatch log stream to receive application messages.

Attributes Reference

In addition to all arguments above, the following attributes are exported:

  • id - The application identifier.
  • arn - The ARN of the application.
  • createTimestamp - The timestamp when the application was created.
  • lastUpdateTimestamp - The timestamp when the application was last updated.
  • status - The status of the application.
  • versionId - The current application version. Kinesis Data Analytics updates the versionId each time the application is updated.
  • tagsAll - A map of tags assigned to the resource, including those inherited from the provider defaultTags configuration block.

Timeouts

Configuration options:

  • create - (Default 10m)
  • update - (Default 10m)
  • delete - (Default 10m)
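
As a hedged illustration, the fragment below shows what overriding these timeouts might look like. It assumes the generated bindings accept a timeouts object with create, update, and delete duration strings, as is usual for resources that support timeouts; the specific durations are arbitrary placeholders.

```typescript
// Fragment intended to be merged into the resource configuration object
// used in the examples above. Durations are placeholder values.
const timeouts = {
  create: "20m",
  update: "20m",
  delete: "15m",
};
```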

Import

awsKinesisanalyticsv2Application can be imported by using the application ARN, e.g.,

$ terraform import aws_kinesisanalyticsv2_application.example arn:aws:kinesisanalytics:us-west-2:123456789012:application/example-sql-application
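
Since the import identifier is the full application ARN, it can help to check an ARN's shape before running the import. The parser below is a minimal sketch: parseApplicationArn and the ApplicationArn interface are hypothetical helpers that only validate the general layout visible in the example above, not every rule AWS applies to ARNs.

```typescript
// Hypothetical result type for the illustrative parser below.
interface ApplicationArn {
  partition: string;
  region: string;
  accountId: string;
  applicationName: string;
}

// Illustrative helper (not part of Terraform or the provider bindings).
function parseApplicationArn(arn: string): ApplicationArn {
  const parts = arn.split(":");
  if (parts.length !== 6 || parts[0] !== "arn" || parts[2] !== "kinesisanalytics") {
    throw new Error("Not a Kinesis Analytics application ARN: " + arn);
  }
  const prefix = "application/";
  const resource = parts[5];
  // The resource part must start with "application/" and include a name.
  if (resource.indexOf(prefix) !== 0 || resource.length === prefix.length) {
    throw new Error("ARN resource is not an application: " + resource);
  }
  return {
    partition: parts[1],
    region: parts[3],
    accountId: parts[4],
    applicationName: resource.slice(prefix.length),
  };
}

// ARN taken from the import example above.
const parsedArn = parseApplicationArn(
  "arn:aws:kinesisanalytics:us-west-2:123456789012:application/example-sql-application"
);
```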