How We Pipe Data

  • |
  • 31 July 2018
Image not Found

At Buzzvil, we are keeping track of  activities of more than 17 million user in more than 30 nations. Such activities include various screen-related activities, AD impressions, AD clicks, AD conversions and beyond. These data comes from multitude of sources and usually get stored in different types of databases ( MySQL, DynamoDB, Redis, and even S3). By having these data in a single storage, we were able to analyze cross-service and cross-platform metrics with ease.

Building and maintaining a data pipeline that directs the data from multiple sources to a single data warehouse was a challenge for our team. And here is how we did it!

Before we discuss the pipeline, Redshift, our main data warehouse is worth mentioning in this blog post. It has surprised us since the beginning and is continuing to do so. Redshift is an AWS-managed, SQL based columnar data warehouse suited for complex and large-scale analytics. It is a solution well adapted by many enterprises customers (Yelp, Coursera, Pinterest, etc.), to drive insight from various types of data generated from their customers. At Buzzvil, we adapted Redshift because of the following reasons:

  1. Columnar storage -> Only access columns that you need.
  2. Complex queries involving multiple joins and aggregations are easily computable
  3. Distributed Storage
  4. Fast Data Ingestion (Ingest first, index and clean later)
  • Horizontal Scalability

  1. No need for extra complexity that comes from sharding or clustering
  2. Since data are innately stored in nodes, horizontal scaling only involves adding additional nodes.
  3. Fast and easy integration with other AWS services (Both Pro and Con)

However, there are some drawbacks as well:

  • Does not support multiple indices like other RDBMS
    • 1 Distribution Key and 1 Sort Key
  • Does not enforce uniqueness or foreign key constraints like MySQL or other RDBMS

So, how do we get data into Redshift?

There are mainly 3 ways that we use to deliver data to Redshift.

1.Via Athena Preprocessing Batch jobs (Lockscreen activity, AD Allocation)

Why?

The primary reason for such preprocessing step is the sheer size of the incoming data. Some data might be too raw for analysts and data scientists to query directly. Thus, we utilized AWS Athena (https://aws.amazon.com/athena/) to leverage the fact that AWS only charges us for the size of the data read by the Athena query. If we are to preprocess the data through EMR or other MapReduce solutions, we might have to spend much more on computing costs in order to process more than

How?

1.  Send data to S3
2.  Aggregate/Process using Athena
3.  Load the processed data into Redshift (COPY command)

Pros?

1.  Serverless (No need to manage EMR clusters or servers)
2.  Cheap ($5 for each 1TB read from S3)

Cons?

1.  During peak hours (12:00 AM UTC), some queries might fail. -> We do not recommend running mission-critical logics on Athena.
2.  Does not (yet) provide the full functionality of PRESTO DB.

2.Via Firehose (Impression, Clicks, Device, Events)

Why?

Kinesis Firehose is excellent as it not only provides a stable pipeline to various data destinations such as Redshift, Elasticsearch and S3, but also seamlessly integrates with Fluentd which provides a steady and stable stream of data from servers to firehose.With firehose-fluentd integration, rather than having to manage two separate pipelines ( SERVER -> S3, S3 -> Redshift ), we only have to manage and monitor a single data pipeline from the source to the sink.

How?

(https://docs.aws.amazon.com/firehose/latest/dev/what-is-this-service.html) 1.Create Firehose delivery stream with correct data format and ingestion period of your choice.

conf["user_activity"] = {
    "DataTableName": "user_activity",
    "DataTableColumns": "user_id, app_id, activity_type, timestamp",
    "CopyOptions": "FORMAT AS JSON "s3://buzzvil-firehose/sample/user_activity/jsonpaths/user_activity_log-0001.jsonpaths" gzip TIMEFORMAT AS "YYYY-MM-DDTHH:MI:SS" ACCEPTINVCHARS TRUNCATECOLUMNS COMPUPDATE OFF STATUPDATE OFF",
    "jsonpaths_file": "buzzvil-firehose/sample/user_activity/jsonpaths/user_activity_log-0001.jsonpaths",
}

configuration = {
        "RoleARN": "arn:aws:iam::xxxxxxxxxxxx:role/firehose_delivery_role",
        "ClusterJDBCURL": "jdbc:redshift://buzzvil.xxxxxxxxx.us-west-2.redshift.amazonaws.com:5439/sample_db",
        "CopyCommand": {
            "DataTableName": sample_table,
            "DataTableColumns": conf[type]["DataTableColumns"],
            "CopyOptions": conf[type]["CopyOptions"],
        },
        "Username": db_user,
        "Password": db_password,
        "S3Configuration": {
            "RoleARN": "arn:aws:iam::xxxxxxxxxxxx:role/firehose_delivery_role",
            "BucketARN": "arn:aws:s3:::firehose_bucket",
            "Prefix": "buzzvil/user_activity/",
            "BufferingHints": {
                "SizeInMBs": 64,
                "IntervalInSeconds": 60
            },
            "CompressionFormat": "GZIP",
            "EncryptionConfiguration": {
                "NoEncryptionConfig": "NoEncryption",
            }
        }
}

2.Setup and run Fluentd docker containers in each server.

<source>
 @type tail
 path /var/log/containers/buzzad/impression.json
 pos_file /var/log/containers/td-agent/impression-json.pos
 format none
 tag firehose.impression
</source>

<match firehose.impression>
 @type kinesis_firehose
 region us-west-2
 delivery_stream_name "prod-buzzad-impression-stream"
 flush_interval 1s
 data_key message
</match>

3.Monitor as data is captured by Firehose and are sent to Redshift.

Pros?

  1. Fast and Reliable Data Delivery
  2. Easy to monitor

Cons?

  1. Schema change is not automatic (I have to manually alter the Redshift schema before)

 

3) Via MySQL Asynchronous Loads (Ads, Contents, Ad Provider, Ad Publishers)

Why?

In order to sync subsets of data from multiple RDS MySQL databases, we had to employ three different techniques to copy the relevant data to our Redshift cluster (This is the least glamorous option of the three).

How?

  • FULL_COPY

    • Extract a full copy (dump) of a MySQL table and load them as SQL inserts into Redshift)
  • INCREMENTAL_COPY

    • Start from the last pkey, read any new rows and copy them into Redshift
  • UPDATE_LATEST_COPY

    • Start from the last updated_at timestamp, find any newly updated/created rows and copy them (after removing duplicates) into Redshift

Pros?

  1. Fine tuned methods to meet the data characteristics
  2. Easier to manage compared to binlog replication methods.

Cons?

  1. Have to manage separate servers or lambdas to handle the MySQL -> Redshift sync task
  2. Redshift ORM is yet mature to support smooth schema altering.

These three methods differ by the type of data that is being delivered to the target sink. For example, transactional, log type data would be delivered by firehose or be aggregated and then loaded to Redshift. On the other hand, fact tables from MySQL might be synced to Redshift via a specific type of CDC (change data capture) sync method. A combination of these 3 methods allows our analysts and BD managers to query cross-service or cross-platform data with ease.

You May Also Like

버즈빌, 아마도 당신이 원하던 회사!

지원하기