Examples of aiobotocore usage

Below is a list of examples from aiobotocore/examples.

Every example is a complete, runnable Python program.

Basic Usage

A simple put, get, and delete example for the S3 service:

import asyncio
import aiobotocore

AWS_ACCESS_KEY_ID = "xxx"
AWS_SECRET_ACCESS_KEY = "xxx"


@asyncio.coroutine
def go(loop):

    bucket = 'dataintake'
    filename = 'dummy.bin'
    folder = 'aiobotocore'
    key = '{}/{}'.format(folder, filename)

    session = aiobotocore.get_session(loop=loop)
    client = session.create_client('s3', region_name='us-west-2',
                                   aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
                                   aws_access_key_id=AWS_ACCESS_KEY_ID)
    # upload object to amazon s3
    data = b'\x01'*1024
    resp = yield from client.put_object(Bucket=bucket,
                                        Key=key,
                                        Body=data)
    print(resp)

    # fetch the ACL of the object we just uploaded
    resp = yield from client.get_object_acl(Bucket=bucket, Key=key)
    print(resp)

    # delete object from s3
    resp = yield from client.delete_object(Bucket=bucket, Key=key)
    print(resp)

    yield from client.close()


loop = asyncio.get_event_loop()
loop.run_until_complete(go(loop))
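
This document uses the older generator-based coroutine syntax (@asyncio.coroutine / yield from). On aiobotocore 1.x and later with Python 3.7+, the client is used as an async context manager and the same flow can be written with native async/await. A minimal sketch of the equivalent program, assuming such a version:

import asyncio

from aiobotocore.session import get_session

AWS_ACCESS_KEY_ID = "xxx"
AWS_SECRET_ACCESS_KEY = "xxx"


async def go():
    bucket = 'dataintake'
    key = 'aiobotocore/dummy.bin'

    session = get_session()
    # on newer releases create_client returns an async context manager,
    # so the client is closed automatically on exit
    async with session.create_client(
            's3', region_name='us-west-2',
            aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
            aws_access_key_id=AWS_ACCESS_KEY_ID) as client:
        # upload object to amazon s3
        resp = await client.put_object(Bucket=bucket, Key=key,
                                       Body=b'\x01' * 1024)
        print(resp)

        # fetch the ACL of the object we just uploaded
        resp = await client.get_object_acl(Bucket=bucket, Key=key)
        print(resp)

        # delete object from s3
        resp = await client.delete_object(Bucket=bucket, Key=key)
        print(resp)


asyncio.run(go())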

SQS

Queue Create

This snippet creates a queue, lists the queues, then deletes the queue.

# Boto should get credentials from ~/.aws/credentials or the environment
import asyncio

import aiobotocore


@asyncio.coroutine
def go(loop):
    session = aiobotocore.get_session(loop=loop)
    client = session.create_client('sqs', region_name='us-west-2')

    print('Creating test_queue1')
    response = yield from client.create_queue(QueueName='test_queue1')
    queue_url = response['QueueUrl']

    response = yield from client.list_queues()

    print('Queue URLs:')
    for url in response.get('QueueUrls', []):
        print(' ' + url)

    print('Deleting queue {0}'.format(queue_url))
    yield from client.delete_queue(QueueUrl=queue_url)

    print('Done')
    yield from client.close()


def main():
    try:
        loop = asyncio.get_event_loop()
        loop.run_until_complete(go(loop))
    except KeyboardInterrupt:
        pass


if __name__ == '__main__':
    main()

Producer Consumer

Here is a quick and simple producer/consumer example. The producer will put messages on the queue with a delay of up to 4 seconds between each put. The consumer will read off any messages on the queue, waiting up to 2 seconds for messages to appear before returning.

#!/usr/bin/env python3
"""
aiobotocore SQS Producer Example
"""
import asyncio
import random
import sys

import aiobotocore
import botocore.exceptions

QUEUE_NAME = 'test_queue12'


@asyncio.coroutine
def go(loop):
    # Boto should get credentials from ~/.aws/credentials or the environment
    session = aiobotocore.get_session(loop=loop)
    client = session.create_client('sqs', region_name='us-west-2')
    try:
        response = yield from client.get_queue_url(QueueName=QUEUE_NAME)
    except botocore.exceptions.ClientError as err:
        if err.response['Error']['Code'] == \
                'AWS.SimpleQueueService.NonExistentQueue':
            print("Queue {0} does not exist".format(QUEUE_NAME))
            yield from client.close()
            sys.exit(1)
        else:
            raise

    queue_url = response['QueueUrl']

    print('Putting messages on the queue')

    msg_no = 1
    while True:
        try:
            msg_body = 'Message #{0}'.format(msg_no)
            yield from client.send_message(
                QueueUrl=queue_url,
                MessageBody=msg_body
            )
            msg_no += 1

            print('Pushed "{0}" to queue'.format(msg_body))

            yield from asyncio.sleep(random.randint(1, 4))
        except KeyboardInterrupt:
            break

    print('Finished')
    yield from client.close()


def main():
    try:
        loop = asyncio.get_event_loop()
        loop.run_until_complete(go(loop))
    except KeyboardInterrupt:
        pass


if __name__ == '__main__':
    main()

#!/usr/bin/env python3
"""
aiobotocore SQS Consumer Example
"""
import asyncio
import sys

import aiobotocore
import botocore.exceptions

QUEUE_NAME = 'test_queue12'


@asyncio.coroutine
def go(loop):
    # Boto should get credentials from ~/.aws/credentials or the environment
    session = aiobotocore.get_session(loop=loop)
    client = session.create_client('sqs', region_name='us-west-2')
    try:
        response = yield from client.get_queue_url(QueueName=QUEUE_NAME)
    except botocore.exceptions.ClientError as err:
        if err.response['Error']['Code'] == \
                'AWS.SimpleQueueService.NonExistentQueue':
            print("Queue {0} does not exist".format(QUEUE_NAME))
            yield from client.close()
            sys.exit(1)
        else:
            raise

    queue_url = response['QueueUrl']

    print('Pulling messages off the queue')

    while True:
        try:
            # This loop won't spin really fast as there is
            # essentially a sleep in the receive_message call
            response = yield from client.receive_message(
                QueueUrl=queue_url,
                WaitTimeSeconds=2,
            )

            if 'Messages' in response:
                for msg in response['Messages']:
                    print('Got msg "{0}"'.format(msg['Body']))
                    # Need to remove msg from queue or else it'll reappear
                    yield from client.delete_message(
                        QueueUrl=queue_url,
                        ReceiptHandle=msg['ReceiptHandle']
                    )
            else:
                print('No messages in queue')
        except KeyboardInterrupt:
            break

    print('Finished')
    yield from client.close()


def main():
    try:
        loop = asyncio.get_event_loop()
        loop.run_until_complete(go(loop))
    except KeyboardInterrupt:
        pass


if __name__ == '__main__':
    main()

DynamoDB

Table Creation

When you create a DynamoDB table, it can take quite a while (especially if you add a few secondary indexes). Instead of polling describe_table yourself, boto3 provides “waiters” that do all the polling for you. The following snippet shows how to wait for a DynamoDB table to be created asynchronously.

# Boto should get credentials from ~/.aws/credentials or the environment
import uuid
import asyncio

import aiobotocore


@asyncio.coroutine
def go(loop):
    session = aiobotocore.get_session(loop=loop)
    client = session.create_client('dynamodb', region_name='us-west-2')
    # Create random table name
    table_name = 'aiobotocore-' + str(uuid.uuid4())

    print('Requesting table creation...')
    yield from client.create_table(
        TableName=table_name,
        AttributeDefinitions=[
            {
                'AttributeName': 'testKey',
                'AttributeType': 'S'
            },
        ],
        KeySchema=[
            {
                'AttributeName': 'testKey',
                'KeyType': 'HASH'
            },
        ],
        ProvisionedThroughput={
            'ReadCapacityUnits': 10,
            'WriteCapacityUnits': 10
        }
    )

    print("Waiting for table to be created...")
    waiter = client.get_waiter('table_exists')
    yield from waiter.wait(TableName=table_name)
    print("Table {0} created".format(table_name))

    yield from client.close()


def main():
    try:
        loop = asyncio.get_event_loop()
        loop.run_until_complete(go(loop))
    except KeyboardInterrupt:
        pass


if __name__ == '__main__':
    main()
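
The waiter above polls on the default schedule from the DynamoDB service model. If you want to control how often and how long it polls, botocore waiters accept a WaiterConfig argument (on reasonably recent botocore versions). A minimal sketch of just the waiting step, reusing client and table_name from the example above; the Delay/MaxAttempts values are arbitrary:

    # inside go() above: poll every 5 seconds, giving up after
    # 24 attempts (roughly two minutes)
    waiter = client.get_waiter('table_exists')
    yield from waiter.wait(
        TableName=table_name,
        WaiterConfig={'Delay': 5, 'MaxAttempts': 24}
    )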

Batch Insertion

Now if you have a massive amount of data to insert into Dynamo, I would suggest using an EMR data pipeline (there’s even an example for exactly this). But if you’re stubborn, here is an example of inserting lots of items into Dynamo (it’s not really that complicated once you’ve read it).

The code generates items (e.g. item0, item1, item2…) and writes them to a table “test” against a primary partition key called “pk” (the table is assumed to already exist, with 5 read and 5 write units and no auto-scaling).

The batch_write_item method only accepts a maximum of 25 items at a time, so the script generates 25 items, writes them, then does it all over again.
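
If your items are not already grouped in blocks of 25, you might chunk them before calling batch_write_item. A minimal, hypothetical helper (not part of the example below; the name chunk_put_requests is made up for illustration):

def chunk_put_requests(table_name, items, batch_size=25):
    """
    Yield batch_write_item request structures of at most batch_size items.

    items is a list of DynamoDB-formatted items,
    e.g. [{'pk': {'S': 'item0'}}, ...]
    """
    for i in range(0, len(items), batch_size):
        yield {
            table_name: [
                {'PutRequest': {'Item': item}}
                for item in items[i:i + batch_size]
            ]
        }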

Once Dynamo has had enough, it will start throttling you and return any items that were not written in the response. When the script is being throttled, it sleeps for 5 seconds at a time until the failed items have been written successfully, and then it exits.

# Boto should get credentials from ~/.aws/credentials or the environment
import asyncio

import aiobotocore


def get_items(start_num, num_items):
    """
    Generate a sequence of dynamo items

    :param start_num: Start index
    :type start_num: int
    :param num_items: Number of items
    :type num_items: int
    :return: List of dictionaries
    :rtype: list of dict
    """
    result = []
    for i in range(start_num, start_num+num_items):
        result.append({'pk': {'S': 'item{0}'.format(i)}})
    return result


def create_batch_write_structure(table_name, start_num, num_items):
    """
    Create item structure for passing to batch_write_item

    :param table_name: DynamoDB table name
    :type table_name: str
    :param start_num: Start index
    :type start_num: int
    :param num_items: Number of items
    :type num_items: int
    :return: dictionary of tables to write to
    :rtype: dict
    """
    return {
        table_name: [
            {'PutRequest': {'Item': item}}
            for item in get_items(start_num, num_items)
        ]
    }


@asyncio.coroutine
def go(loop):
    session = aiobotocore.get_session(loop=loop)
    client = session.create_client('dynamodb', region_name='us-west-2')
    table_name = 'test'

    print('Writing to dynamo')
    start = 0
    while True:
        # Loop adding 25 items to dynamo at a time
        request_items = create_batch_write_structure(table_name, start, 25)
        response = yield from client.batch_write_item(
            RequestItems=request_items
        )
        if len(response['UnprocessedItems']) == 0:
            print('Wrote 25 items to dynamo')
        else:
            # Hit the provisioned write limit
            print('Hit write limit, backing off then retrying')
            yield from asyncio.sleep(5)

            # Items left over that haven't been inserted
            unprocessed_items = response['UnprocessedItems']
            print('Resubmitting items')
            # Loop until unprocessed items are written
            while len(unprocessed_items) > 0:
                response = yield from client.batch_write_item(
                    RequestItems=unprocessed_items
                )
                # Any items that still could not be written are
                # returned in UnprocessedItems and retried next pass
                unprocessed_items = response['UnprocessedItems']

                # If there are items left over, we could do with
                # sleeping some more
                if len(unprocessed_items) > 0:
                    print('Backing off for 5 seconds')
                    yield from asyncio.sleep(5)

            # Inserted all the unprocessed items, exit loop
            print('Unprocessed items successfully inserted')
            break

        start += 25

    # See if DynamoDB has the last item we inserted
    final_item = 'item' + str(start + 24)
    print('Item "{0}" should exist'.format(final_item))

    response = yield from client.get_item(
        TableName=table_name,
        Key={'pk': {'S': final_item}}
    )
    print('Response: ' + str(response['Item']))

    yield from client.close()


def main():
    try:
        loop = asyncio.get_event_loop()
        loop.run_until_complete(go(loop))
    except KeyboardInterrupt:
        pass


if __name__ == '__main__':
    main()