Streaming data using AWS Database Migration Service and Kinesis

Written on September 13, 2019

Centers for Disease Control… nope… the other CDC

In a previous post we looked at streaming with Kafka; however, another common pattern we see is capturing the changes from a SQL database. Here we are going to use Change Data Capture, or CDC, which allows us to stream the changes happening on the database. CDC uses log sequence numbers and checkpoints so changes can be resumed and replayed, but these differ between database engines, so it's best to check what you need to enable this on your database. In this instance we are using MySQL, so we need to set the Binary Log Format to ‘ROW’, which captures all changes at a row level.

For this example, I have used the following:

  • RDS MySQL - source database
  • Database Migration Service - this will connect to MySQL and get the changes
  • Kinesis Stream - DMS will write the changes to the stream
  • Kinesis Firehose - will stream from Kinesis to S3
  • S3 - the location for our streamed data

For the actual data I have used the DMS sample data set, which can be found on GitHub, and there is a guide here on how to generate some activity to test out streaming changes. I’ve spun up an RDS instance, used the scripts to build the sample database and then taken a snapshot. The code below uses that snapshot to provision a new RDS instance, so if you are following along you will need to do the same, which is just a case of provisioning an RDS instance, connecting to it and running this script.

For the infrastructure piece, I’ve used the CDK again! I really love this as it lets me build a repeatable solution that I can share, while cutting down the time spent writing CloudFormation or pasting a whole lot of instructions and screenshots from the console.

However, saying that, there are still some things that don’t belong in stacks just yet. We need some IAM roles for DMS, so we set those up manually and then deploy via the CDK.

git clone https://github.com/msimpsonnz/aws-misc.git
cd aws-misc/dms-stream

aws iam create-role --role-name dms-vpc-role --assume-role-policy-document file://dmsAssumeRolePolicyDocument.json

aws iam attach-role-policy --role-name dms-vpc-role --policy-arn arn:aws:iam::aws:policy/service-role/AmazonDMSVPCManagementRole

cd cdk
cdk deploy

So now let’s walk through the CDK stack components to understand how this has been built out.

First we need an RDS instance running a copy of our snapshot, so here we create a new parameter group with the Binary Log Format turned on. We also need to place this in a VPC; to save creating a new one for this demo, we just look up the current ‘default’ VPC.

const vpc = ec2.Vpc.fromLookup(this, 'default', {
    isDefault: true
});

const dbParam = new ParameterGroup(this, 'dms-param-mysql5.7', {
    family: 'mysql5.7',
    parameters: {
        binlog_format: 'ROW'
    }
});

const sourcedb = new rds.DatabaseInstanceFromSnapshot(this, 'dms-rds-source', {
    engine: DatabaseInstanceEngine.MYSQL,
    instanceClass: ec2.InstanceType.of(InstanceClass.BURSTABLE2, InstanceSize.SMALL),
    snapshotIdentifier: 'tickets-mysql57',
    vpc: vpc,
    vpcPlacement: {
        subnetType: SubnetType.PUBLIC
    },
    parameterGroup: dbParam
});

Then we create the DMS replication instance, a Kinesis Stream as the destination, the IAM role DMS needs to write to the stream, and the source and target endpoints.

const dmsRep = new dms.CfnReplicationInstance(this, 'dms-replication', {
    replicationInstanceClass: 'dms.t2.medium',
    engineVersion: '3.3.0'
});

const stream = new kinesis.Stream(this, 'dms-stream');

const streamWriterRole = new Role(this, 'dms-stream-role', {
    assumedBy: new ServicePrincipal('dms.amazonaws.com')
});

streamWriterRole.addToPolicy(new PolicyStatement({
    resources: [stream.streamArn],
    actions: [
        'kinesis:DescribeStream',
        'kinesis:PutRecord',
        'kinesis:PutRecords'
    ]
}));

const source = new CfnEndpoint(this, 'dms-source', {
    endpointType: 'source',
    engineName: 'mysql',
    username: 'admin',
    password: 'Password1',
    serverName: sourcedb.dbInstanceEndpointAddress,
    port: 3306
});

const target = new CfnEndpoint(this, 'dms-target', {
    endpointType: 'target',
    engineName: 'kinesis',
    kinesisSettings: {
        messageFormat: 'JSON',
        streamArn: stream.streamArn,
        serviceAccessRoleArn: streamWriterRole.roleArn
    }
});

We then need to create the DMS replication task; this takes the source endpoint, the target endpoint and a table mapping describing the records we are going to extract.

const dmsTableMappings = {
    "rules": [
        {
            "rule-type": "selection",
            "rule-id": "1",
            "rule-name": "1",
            "object-locator": {
                "schema-name": "dms_sample",
                "table-name": "ticket_purchase_hist"
            },
            "rule-action": "include"
        }
    ]
};

new CfnReplicationTask(this, 'dms-stream-repTask', {
    replicationInstanceArn: dmsRep.ref,
    migrationType: 'full-load-and-cdc',
    sourceEndpointArn: source.ref,
    targetEndpointArn: target.ref,
    tableMappings: JSON.stringify(dmsTableMappings)
});
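
Deploying the stack creates the task but doesn’t start it; you can kick it off from the DMS console, or with a small script. Below is a minimal sketch using the AWS SDK for JavaScript; the task ARN is a placeholder you would replace with the one from the console or stack outputs.

import * as AWS from 'aws-sdk';

const dms = new AWS.DMS();

// Placeholder ARN - copy the real one from the DMS console or stack outputs
const replicationTaskArn = 'arn:aws:dms:REGION:ACCOUNT:task:EXAMPLE';

dms.startReplicationTask({
    ReplicationTaskArn: replicationTaskArn,
    StartReplicationTaskType: 'start-replication'
}).promise()
    .then(res => console.log('Task status:', res.ReplicationTask && res.ReplicationTask.Status))
    .catch(console.error);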

Finally, we create a role for Firehose to assume, an S3 bucket for the extracted records, and a Kinesis Firehose delivery stream that reads the records from the Kinesis Stream and delivers them to S3.

// Role for Firehose to read from the Kinesis Stream and write to S3
const firehoseRole = new Role(this, 'dms-stream-firehose-role', {
    assumedBy: new ServicePrincipal('firehose.amazonaws.com')
});

firehoseRole.addToPolicy(new PolicyStatement({
    resources: [stream.streamArn],
    actions: [
        'kinesis:DescribeStream',
        'kinesis:GetShardIterator',
        'kinesis:GetRecords'
    ]
}));

const s3bucket = new s3.Bucket(this, 'dms-stream-bucket');
s3bucket.grantReadWrite(firehoseRole);

const firehose = new CfnDeliveryStream(this, 'dms-stream-firehose', {
    deliveryStreamType: 'KinesisStreamAsSource',
    kinesisStreamSourceConfiguration: {
        kinesisStreamArn: stream.streamArn,
        roleArn: firehoseRole.roleArn
    },
    s3DestinationConfiguration: {
        bucketArn: s3bucket.bucketArn,
        bufferingHints: {
            intervalInSeconds: 300,
            sizeInMBs: 5
        },
        compressionFormat: 'GZIP',
        roleArn: firehoseRole.roleArn
    }
});

Once the DMS task kicks off, the initial full load of the table lands in S3, and we can see this has completed and the task is ready for replicated data.
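
If you would rather check this from code than the console, the DMS table statistics API reports the load state and change counts per table. A rough sketch, again with a placeholder task ARN:

import * as AWS from 'aws-sdk';

const dms = new AWS.DMS();

// Placeholder ARN - use the replication task created above
const replicationTaskArn = 'arn:aws:dms:REGION:ACCOUNT:task:EXAMPLE';

dms.describeTableStatistics({ ReplicationTaskArn: replicationTaskArn }).promise()
    .then(res => {
        for (const table of res.TableStatistics || []) {
            // TableState reports e.g. 'Full load' or 'Table completed';
            // Inserts/Updates/Deletes count the replicated changes
            console.log(table.TableName, table.TableState, table.FullLoadRows, table.Inserts);
        }
    })
    .catch(console.error);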

We then kick off our stored procedure on the MySQL source to generate some database activity, and once the changes have been processed through the streams we can run an S3 Select query to see the updated records landing in S3.
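
For completeness, here is a rough sketch of what that query could look like with the SDK; the bucket name and object key are placeholders for the GZIP objects Firehose writes under its date-based prefix, and the input settings assume the JSON records can be read line by line:

import * as AWS from 'aws-sdk';

const s3 = new AWS.S3();

s3.selectObjectContent({
    Bucket: 'REPLACE-WITH-BUCKET-NAME',                 // placeholder
    Key: '2019/09/13/04/REPLACE-WITH-OBJECT-KEY',       // placeholder
    ExpressionType: 'SQL',
    Expression: 'SELECT * FROM S3Object s LIMIT 10',
    InputSerialization: { CompressionType: 'GZIP', JSON: { Type: 'LINES' } },
    OutputSerialization: { JSON: {} }
}, (err, data) => {
    if (err) { console.error(err); return; }
    // The response payload is an event stream; Records events carry the selected bytes
    const eventStream: any = data.Payload;
    eventStream.on('data', (event: any) => {
        if (event.Records) {
            process.stdout.write(event.Records.Payload.toString());
        }
    });
});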

I really like this architecture: we have established a streaming pattern that delivers the result without any custom code; the only thing I have written is some TypeScript to output a CloudFormation stack!
