amplify-flutter icon indicating copy to clipboard operation
amplify-flutter copied to clipboard

(Notice): Pinpoint End Of Support

Open ekjotmultani opened this issue 6 months ago • 7 comments

Description

AWS will end support for Amazon Pinpoint on October 30, 2026,, and is no longer accepting any new users as of May 20 (see the linked doc). The guidance is to use AWS End User Messaging for push notifications and SMS, Amazon Simple Email Service for sending emails, and Amazon Connect for campaigns, journeys, endpoints, and engagement analytics. Finally they recommend Amazon Kinesis for event collection and mobile analytics.

This assortment of services will together make up what Pinpoint has offered in the past, and this ticket is tracking interest in these services and migration from Pinpoint.

Categories

  • [ ] Analytics
  • [ ] API (REST)
  • [ ] API (GraphQL)
  • [ ] Auth
  • [ ] Authenticator
  • [ ] DataStore
  • [x] Notifications (Push)
  • [ ] Storage

Steps to Reproduce

Please thumbs up and/ or comment on this ticket with any thoughts you have!

Screenshots

No response

Platforms

  • [ ] iOS
  • [ ] Android
  • [ ] Web
  • [ ] macOS
  • [ ] Windows
  • [ ] Linux

Flutter Version

3.32

Amplify Flutter Version

2.6.2

Deployment Method

Amplify CLI (Gen 1)

Schema


ekjotmultani avatar Jun 05 '25 17:06 ekjotmultani

This issue was opened by a maintainer of this repository; updates will be posted here. If you are also experiencing this issue, please comment here with any relevant information so that we're aware and can prioritize accordingly.

github-actions[bot] avatar Jun 05 '25 18:06 github-actions[bot]

Hello!

We're using Pinpoint Notifications and Analytics in our apps. As I understand, the notifications capabilities won't be affected, but we're also using campaigns, segments, and endpoints. We're also using analytics events and metrics.

It would be great to know what is the plan of Amplify Flutter, if it will support the services mentioned as a replacement. Since we rely on those services, it would be great if we have time to migrate and make sure everything is working correctly.

Thank you!

jamilsaadeh97 avatar Jun 05 '25 19:06 jamilsaadeh97

Hello @jamilsaadeh97, current users can continue to use Pinpoint until October 30, 2026 without issue. The recommended replacement for Pinpoint Analytics is Amazon Kinesis, however Amplify does not have an interface for this service.

We are still researching a path forward with Amplify and will respond here when we have an update. Our intent is to give ample time to migrate and verify any changes.

tyllark avatar Jun 10 '25 01:06 tyllark

Hello, I have a question. I’m only using Pinpoint to manage push notifications on mobile devices — is there anything I should be concerned about? Will I be affected by this change?

The notice says that Mobile Push won’t be impacted. Does that mean:

  1. If I’m using aws-amplify/push-notifications (from Amplify version 6) to listen for incoming notifications and retrieve device tokens, can I expect it to continue working without any changes on my side? I use those device tokens to send push notifications to specific users via a Lambda function.

  2. If I’m sending notifications through a Lambda function that uses the Pinpoint library from the AWS SDK for Node.js, will that continue to work as well? I created the Lambda function using the Amplify CLI, which granted the necessary permissions to access Pinpoint. I also use the ANALYTICS_NEURALIZEPINPOINT_ID environment variable (created by Amplify) to target the correct Pinpoint instance.

For example, to notify users when they receive a message:

  1. I obtain the user’s device token using the aws-amplify/push-notifications library.
  2. I save the device token in my database, associated with that user.
  3. When another user sends a message to this user, I retrieve the target user’s device token from the database.
  4. I pass the device token to my Lambda function as part of the request.
  5. The Lambda function uses the AWS SDK’s Pinpoint module, together with the environment variable provided by Amplify, to send the push notification through the appropriate channel.
  6. On the receiving user end I use aws-amplify/push-notifications to listen whenever a notification arrives and change states accordingly.

Is there anything I should rework from this?

Thank you for clarifying!

GERASM1 avatar Jun 19 '25 22:06 GERASM1

Hello, I have a question. I’m only using Pinpoint to manage push notifications on mobile devices — is there anything I should be concerned about? Will I be affected by this change?

The notice says that Mobile Push won’t be impacted. Does that mean:

  1. If I’m using aws-amplify/push-notifications (from Amplify version 6) to listen for incoming notifications and retrieve device tokens, can I expect it to continue working without any changes on my side? I use those device tokens to send push notifications to specific users via a Lambda function.
  2. If I’m sending notifications through a Lambda function that uses the Pinpoint library from the AWS SDK for Node.js, will that continue to work as well? I created the Lambda function using the Amplify CLI, which granted the necessary permissions to access Pinpoint. I also use the ANALYTICS_NEURALIZEPINPOINT_ID environment variable (created by Amplify) to target the correct Pinpoint instance.

For example, to notify users when they receive a message:

  1. I obtain the user’s device token using the aws-amplify/push-notifications library.
  2. I save the device token in my database, associated with that user.
  3. When another user sends a message to this user, I retrieve the target user’s device token from the database.
  4. I pass the device token to my Lambda function as part of the request.
  5. The Lambda function uses the AWS SDK’s Pinpoint module, together with the environment variable provided by Amplify, to send the push notification through the appropriate channel.
  6. On the receiving user end I use aws-amplify/push-notifications to listen whenever a notification arrives and change states accordingly.

Is there anything I should rework from this?

Thank you for clarifying!

@GERASM1 I have have the same issue as i started using this amplify_analytics_pinpoint: ^2.6.3 but it had error message indicating not available.

I looked at the alternative i.e. Kinesis and used AWSSigV4Signer approach to get connected as per my sequence listed below.

@ekjotmultani @tyllark I will appreciate if you will be able to review below procedure and provide any recommendations on this as workaround whilst a formal amplify_kinesis package awaited...it will be of huge help... Thanks!

AWS Packages Used:

import 'package:amplify_auth_cognito/amplify_auth_cognito.dart';
import 'package:amplify_flutter/amplify_flutter.dart';
import 'package:aws_signature_v4/aws_signature_v4.dart';

PART A: AWS Console Configuration

Step 1. Set correct AWS profile:

export AWS_PROFILE=YourAWSProfile`

Step 2: Add Kinesis permissions to Cognito policy:

AWS Console → IAM → Roles → YourCognitoAuthenticatedUserPolicy → Edit

And add below (or amend as needed) to Cognito policy:

{
    "Sid": "AllowKinesisDataStreams",
    "Effect": "Allow",
    "Action": [
        "kinesis:PutRecord",
        "kinesis:PutRecords",
        "kinesis:DescribeStream",
        "kinesis:ListStreams"
    ],
    "Resource": [
        "arn:aws:kinesis:*:*:stream/mobile-analytics-stream",
        "arn:aws:kinesis:*:*:stream/analytics-*",
        "arn:aws:kinesis:*:*:stream/app-events-*"
    ]
},
{
    "Sid": "AllowKinesisFirehose",
    "Effect": "Allow",
    "Action": [
        "firehose:PutRecord",
        "firehose:PutRecordBatch",
        "firehose:DescribeDeliveryStream"
    ],
    "Resource": [
        "arn:aws:firehose:*:*:deliverystream/analytics-*",
        "arn:aws:firehose:*:*:deliverystream/mobile-*"
    ]
} 

Step 3: Create Kinesis stream:

Go to:

Amazon Kinesis → Data streams → Create Stream

Example Stream Name

Stream name: mobile-analytics-stream


PART B: Dart Client Setup

Step 1: Configure region:


class KinesisSpecifications {
  // Static constants for regions you commonly use
  static const String usEast1 = 'us-east-1';

  // Default regions for different use cases
  static const String defaultRegion = usEast1;
  // Current region variable that can be updated
  static String myRegion = defaultRegion;

  // Method to set region based on use case or requirements
  static String getRegionBasedOnUseCase(KinesisUseCase useCase) {
    switch (useCase) {
      case KinesisUseCase.mobileAnalytics:
        myRegion = defaultRegion;
        break;
      case KinesisUseCase.realTimeProcessing:
        myRegion = defaultRegion;
        break;
      case KinesisUseCase.dataWarehouse:
        myRegion = defaultRegion;
        break;
    }
    return myRegion;
  }

  // Method to set region manually
  static String setRegion(String region) {
    myRegion = region;
    return myRegion;
  }

  // Dynamic getters for Kinesis endpoints using the current region
  static String get kinesisEndpoint => 'kinesis.$myRegion.amazonaws.com';
  static String get firehoseEndpoint => 'firehose.$myRegion.amazonaws.com';
  static String get kinesisAnalyticsEndpoint =>
      'kinesisanalytics.$myRegion.amazonaws.com';

  // Full HTTPS URLs for direct use
  static String get kinesisEndpointUrl => 'https://$kinesisEndpoint';
  static String get firehoseEndpointUrl => 'https://$firehoseEndpoint';
  static String get kinesisAnalyticsEndpointUrl =>
      'https://$kinesisAnalyticsEndpoint';

  // Headers and content types for Kinesis API
  static const String contentType = 'application/x-amz-json-1.1';
  static const String kinesisTargetPrefix = 'Kinesis_20131202';
  static const String firehoseTargetPrefix = 'Firehose_20150804';

  // Common X-Amz-Target headers for different operations
  static const String putRecordTarget = '$kinesisTargetPrefix.PutRecord';
  static const String putRecordsTarget = '$kinesisTargetPrefix.PutRecords';
  static const String describeStreamTarget =
      '$kinesisTargetPrefix.DescribeStream';
  static const String listStreamsTarget = '$kinesisTargetPrefix.ListStreams';

  // Firehose targets
  static const String firehosePutRecordTarget =
      '$firehoseTargetPrefix.PutRecord';
  static const String firehosePutRecordBatchTarget =
      '$firehoseTargetPrefix.PutRecordBatch';

  // Method to get X-Amz-Target header for specific operations
  static String getTargetHeader(KinesisOperation operation) {
    switch (operation) {
      case KinesisOperation.putRecord:
        return putRecordTarget;
      case KinesisOperation.putRecords:
        return putRecordsTarget;
      case KinesisOperation.describeStream:
        return describeStreamTarget;
      case KinesisOperation.listStreams:
        return listStreamsTarget;
      case KinesisOperation.firehosePutRecord:
        return firehosePutRecordTarget;
      case KinesisOperation.firehosePutRecordBatch:
        return firehosePutRecordBatchTarget;
    }
  }
}

// Enum for different Kinesis use cases
enum KinesisUseCase {
  mobileAnalytics,
  realTimeProcessing,
  dataWarehouse,
}

// Enum for different Kinesis operations
enum KinesisOperation {
  putRecord,
  putRecords,
  describeStream,
  listStreams,
  firehosePutRecord,
  firehosePutRecordBatch,
}


Step 2: Create Kinesis Client:

class KinesisResponse {
  final bool success;
  final String? errorMessage;
  final int? statusCode;
  final Map<String, dynamic>? data;

  KinesisResponse._({
    required this.success,
    this.errorMessage,
    this.statusCode,
    this.data,
  });

  factory KinesisResponse.success([Map<String, dynamic>? data]) {
    return KinesisResponse._(success: true, data: data);
  }

  factory KinesisResponse.failure(String error, [int? statusCode]) {
    return KinesisResponse._(
      success: false,
      errorMessage: error,
      statusCode: statusCode,
    );
  }
}

// ===== CORE CLIENT =====
class AmazonKinesisClient {
  AWSCredentials? _credentials;
  final Duration timeout;
  final String region;

  AmazonKinesisClient({
    this.timeout = const Duration(seconds: 30),
    required this.region,
  }) {
    KinesisSpecifications.setRegion(region);
  }

  // Core method - send single record
  Future<KinesisResponse> putRecord({
    required String streamName,
    required Map<String, dynamic> data,
    required String partitionKey,
  }) async {
    try {
      final credentials = await _fetchCurrentAuthSession();
      if (credentials == null) {
        return KinesisResponse.failure('Failed to retrieve AWS credentials');
      }

      final request = await _createKinesisRequest(
        operation: KinesisOperation.putRecord,
        payload: {
          'StreamName': streamName,
          'Data': base64Encode(utf8.encode(jsonEncode(data))),
          'PartitionKey': partitionKey,
        },
      );

      final signedRequest = await _signRequest(request, credentials);
      final response = await signedRequest.send().response.timeout(timeout);

      if (response.statusCode == 200) {
        final responseData = await _parseSuccessResponse(response);
        return KinesisResponse.success(responseData);
      } else {
        final errorMessage = await _parseErrorResponse(response);
        return KinesisResponse.failure(errorMessage, response.statusCode);
      }
    } on TimeoutException catch (e) {
      return KinesisResponse.failure('Request timed out: ${e.message}');
    } catch (e) {
      return KinesisResponse.failure('Error in Kinesis request: $e');
    }
  }




  // ===== PRIVATE METHODS =====
  Future<AWSCredentials?> _fetchCurrentAuthSession() async {
    try {
      // Check if cached credentials are still valid
      if (_credentials?.expiration?.isAfter(DateTime.now()) ?? false) {
        return _credentials;
      }

      final cognitoPlugin =
          Amplify.Auth.getPlugin(AmplifyAuthCognito.pluginKey);
      final session = await cognitoPlugin.fetchAuthSession();

      if (session.isSignedIn) {
        final fetchedCredentials = session.credentialsResult.value;
        if (fetchedCredentials.expiration?.isAfter(DateTime.now()) ?? false) {
          _credentials = fetchedCredentials;
          return _credentials;
        }
      }
      return null;
    } catch (e) {
      AppLog.log("AmazonKinesisClient").e('Error fetching auth session: $e');
      return null;
    }
  }

  Future<AWSSignedRequest> _signRequest(
    AWSHttpRequest request,
    AWSCredentials credentials,
  ) async {
    final credentialsProvider = AWSCredentialsProvider(credentials);
    final signer = AWSSigV4Signer(credentialsProvider: credentialsProvider);
    final scope = AWSCredentialScope(
      region: region,
      service: const AWSService('kinesis'),
    );

    return await signer.sign(
      request,
      credentialScope: scope,
    );
  }

  Future<AWSHttpRequest> _createKinesisRequest({
    required KinesisOperation operation,
    required Map<String, dynamic> payload,
  }) async {
    final body = jsonEncode(payload);
    return AWSHttpRequest(
      method: AWSHttpMethod.post,
      uri: Uri.parse(KinesisSpecifications.kinesisEndpointUrl),
      headers: {
        'Content-Type': KinesisSpecifications.contentType,
        'X-Amz-Target': KinesisSpecifications.getTargetHeader(operation),
        'Content-Length': utf8.encode(body).length.toString(),
      },
      body: utf8.encode(body),
    );
  }



  Future<Map<String, dynamic>> _parseSuccessResponse(
    AWSBaseHttpResponse response,
  ) async {
    try {
      final bodyBytes = await response.body.toList();
      final flatBytes = bodyBytes.expand((x) => x).toList();
      final responseBody = utf8.decode(flatBytes);
      return jsonDecode(responseBody) as Map<String, dynamic>;
    } catch (e) {
      return {'message': 'Success but failed to parse response'};
    }
  }

  Future<String> _parseErrorResponse(AWSBaseHttpResponse response) async {
    try {
      final bodyChunks = await response.body.toList();
      final flatBytes = bodyChunks.expand((chunk) {
        return chunk;
      }).toList();

      final errorBody = utf8.decode(flatBytes);

      try {
        final decoded = jsonDecode(errorBody);
        if (decoded is Map<String, dynamic>) {
          return decoded['message']?.toString() ??
              decoded['__type']?.toString() ??
              errorBody;
        } else {
          return errorBody;
        }
      } catch (_) {
        return errorBody;
      }
    } catch (e) {
      return 'Failed to parse error response: $e';
    }
  }
}
   

Step 3: Test Kinesis Client:


class KinesisTestingService {
  final AmazonKinesisClient _client;
  final String streamName;

  KinesisTestingService({
    required AmazonKinesisClient client,
    required this.streamName,
  }) : _client = client;

  Future<bool> quickConnectivityTest() async {
    AppLog.log("KinesisTestingService").i('🔍 Testing Kinesis connection...');

    final testEvent = {
      'test_id': 'connectivity_${DateTime.now().millisecondsSinceEpoch}',
      'timestamp': DateTime.now().toIso8601String(),
      'event_type': 'connectivity_test',
    };

    final response = await _client.putRecord(
      streamName: streamName,
      data: testEvent,
      partitionKey: 'connectivity_test',
    );

    if (response.success) {
      AppLog.log("KinesisTestingService")
          .i('✅ Connection successful - Kinesis is working!');
      return true;
    } else {
      AppLog.log("KinesisTestingService")
          .e('❌ Connection failed: ${response.errorMessage}');
      return false;
    }
  }
}


USAGE

      final kinesisClient = AmazonKinesisClient(
      region: KinesisSpecifications.usEast1,
      timeout: const Duration(seconds: 30),
    );

    // Test connection
    final testService = KinesisTestingService(
        client: kinesisClient, streamName: 'mobile-analytics-stream');

    if (await testService.quickConnectivityTest()) {
      // Send analytics event
      final event = {
        'user_id': '12345',
        'action': 'button_click',
        'screen': 'home_page',
        'timestamp': DateTime.now().toIso8601String(),
      };

      await kinesisClient.putRecord(
        streamName: 'mobile-analytics-stream',
        data: event,
        partitionKey: 'user_12345',
      );
    }

TEST RESULTS


flutter: 16:19:29 ℹ️ INFO  │ [KinesisTestingService] 🔍 Quick connectivity test...
flutter: 16:19:31 ℹ️ INFO  │ [KinesisTestingService] 🎉 Connectivity test PASSED - Kinesis is working!

dkliss avatar Jun 25 '25 06:06 dkliss

[GERASM1] Were you able to get device token using Aws Amplify V6 ?I am struggling to get the device token from AWS amplify library though the same is being done using firebase messaging service

itmsroohim avatar Oct 01 '25 12:10 itmsroohim

@dkliss sorry for the late reply, but this approach looks good to me. Also checking in to say that we are still working on the kinesis package!

ekjotmultani avatar Oct 06 '25 15:10 ekjotmultani