How We Pipe Data

31 July 2018

At Buzzvil, we keep track of the activities of more than 17 million users in more than 30 countries. These activities include various screen-related events, ad impressions, ad clicks, ad conversions, and beyond. The data comes from a multitude of sources and is usually stored in different types of databases (MySQL, DynamoDB, Redis, and even S3). By consolidating this data in a single storage, we are able to analyze cross-service and cross-platform metrics with ease.

Building and maintaining a data pipeline that directs the data from multiple sources to a single data warehouse was a challenge for our team. And here is how we did it!

Before we discuss the pipeline, Redshift, our main data warehouse, is worth mentioning in this blog post. It has surprised us since the beginning and continues to do so. Redshift is an AWS-managed, SQL-based columnar data warehouse suited for complex and large-scale analytics. It is a solution widely adopted by enterprise customers (Yelp, Coursera, Pinterest, etc.) to drive insight from the various types of data generated by their customers. At Buzzvil, we adopted Redshift for the following reasons:

  1. Columnar storage -> Only access the columns that you need.
  2. Complex queries involving multiple joins and aggregations are easily computable.
  3. Distributed storage.
  4. Fast data ingestion (ingest first, index and clean later).
  5. Horizontal scalability
    • No need for the extra complexity that comes from sharding or clustering.
    • Since data is innately distributed across nodes, horizontal scaling only involves adding additional nodes.
  6. Fast and easy integration with other AWS services (both a pro and a con).

However, there are some drawbacks as well:

  • Does not support multiple indices like other RDBMS
    • 1 Distribution Key and 1 Sort Key
  • Does not enforce uniqueness or foreign key constraints like MySQL or other RDBMS
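
To make the single distribution key and sort key constraint concrete, here is a minimal table definition sketch (the table and column choices are hypothetical, not our production schema), run through psycopg2 since Redshift speaks the PostgreSQL protocol:

import psycopg2

# Hypothetical cluster endpoint and credentials; adjust to your environment.
conn = psycopg2.connect(
    host="buzzvil.xxxxxxxxx.us-west-2.redshift.amazonaws.com",
    port=5439, dbname="sample_db", user="db_user", password="db_password",
)

CREATE_TABLE = """
CREATE TABLE user_activity (
    user_id       BIGINT,
    app_id        BIGINT,
    activity_type VARCHAR(32),
    "timestamp"   TIMESTAMP
)
DISTKEY (user_id)       -- exactly one distribution key per table
SORTKEY ("timestamp");  -- one (compound) sort key per table
"""

with conn, conn.cursor() as cur:
    cur.execute(CREATE_TABLE)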

So, how do we get data into Redshift?

There are mainly three ways we deliver data to Redshift.

1. Via Athena Preprocessing Batch Jobs (Lockscreen Activity, Ad Allocation)

Why?

The primary reason for this preprocessing step is the sheer size of the incoming data. Some data is too raw for analysts and data scientists to query directly. We therefore use AWS Athena (https://aws.amazon.com/athena/), which only charges us for the amount of data scanned by each query. If we were to preprocess the data through EMR or another MapReduce solution, we would have to spend much more on computing costs to process data at this scale.

How?

1.  Send data to S3
2.  Aggregate/Process using Athena
3.  Load the processed data into Redshift (COPY command)
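
As a rough sketch of these three steps (the database, table, bucket, and role names below are hypothetical placeholders, not our production values), the aggregation runs on Athena and the result is then COPY'd into Redshift:

import boto3

athena = boto3.client("athena", region_name="us-west-2")

# Step 2: aggregate the raw S3 data with Athena; only the bytes scanned are billed.
resp = athena.start_query_execution(
    QueryString="""
        SELECT user_id,
               date_trunc('hour', created_at) AS activity_hour,
               count(*) AS unlock_count
        FROM raw_lockscreen_activity
        GROUP BY 1, 2
    """,
    QueryExecutionContext={"Database": "buzzvil_raw"},
    ResultConfiguration={"OutputLocation": "s3://buzzvil-athena-results/lockscreen/"},
)

# Step 3: after polling get_query_execution until the query succeeds, load the
# CSV result into Redshift with a plain COPY (run through any PostgreSQL client).
copy_sql = f"""
COPY lockscreen_activity_hourly
FROM 's3://buzzvil-athena-results/lockscreen/{resp['QueryExecutionId']}.csv'
IAM_ROLE 'arn:aws:iam::xxxxxxxxxxxx:role/redshift_copy_role'
FORMAT AS CSV IGNOREHEADER 1;
"""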

Pros?

1.  Serverless (No need to manage EMR clusters or servers)
2.  Cheap ($5 for each 1TB read from S3)

Cons?

1.  During peak hours (12:00 AM UTC), some queries might fail. -> We do not recommend running mission-critical logic on Athena.
2.  Does not (yet) provide the full functionality of Presto.

2. Via Firehose (Impressions, Clicks, Devices, Events)

Why?

Kinesis Firehose is excellent: it not only provides a stable pipeline to various data destinations such as Redshift, Elasticsearch, and S3, but also integrates seamlessly with Fluentd, which provides a steady and stable stream of data from our servers to Firehose. With the Firehose-Fluentd integration, rather than having to manage two separate pipelines (SERVER -> S3, S3 -> Redshift), we only have to manage and monitor a single data pipeline from source to sink.

How?

1. Create a Firehose delivery stream with the correct data format and ingestion period of your choice (see the Firehose documentation: https://docs.aws.amazon.com/firehose/latest/dev/what-is-this-service.html).

conf["user_activity"] = {
    "DataTableName": "user_activity",
    "DataTableColumns": "user_id, app_id, activity_type, timestamp",
    "CopyOptions": "FORMAT AS JSON "s3://buzzvil-firehose/sample/user_activity/jsonpaths/user_activity_log-0001.jsonpaths" gzip TIMEFORMAT AS "YYYY-MM-DDTHH:MI:SS" ACCEPTINVCHARS TRUNCATECOLUMNS COMPUPDATE OFF STATUPDATE OFF",
    "jsonpaths_file": "buzzvil-firehose/sample/user_activity/jsonpaths/user_activity_log-0001.jsonpaths",
}

configuration = {
        "RoleARN": "arn:aws:iam::xxxxxxxxxxxx:role/firehose_delivery_role",
        "ClusterJDBCURL": "jdbc:redshift://buzzvil.xxxxxxxxx.us-west-2.redshift.amazonaws.com:5439/sample_db",
        "CopyCommand": {
            "DataTableName": sample_table,
            "DataTableColumns": conf[type]["DataTableColumns"],
            "CopyOptions": conf[type]["CopyOptions"],
        },
        "Username": db_user,
        "Password": db_password,
        "S3Configuration": {
            "RoleARN": "arn:aws:iam::xxxxxxxxxxxx:role/firehose_delivery_role",
            "BucketARN": "arn:aws:s3:::firehose_bucket",
            "Prefix": "buzzvil/user_activity/",
            "BufferingHints": {
                "SizeInMBs": 64,
                "IntervalInSeconds": 60
            },
            "CompressionFormat": "GZIP",
            "EncryptionConfiguration": {
                "NoEncryptionConfig": "NoEncryption",
            }
        }
}
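
The configuration dict above maps directly onto boto3's Firehose API; creating the Redshift-bound delivery stream is then a single call. A minimal sketch (the stream name below is an illustrative placeholder):

import boto3

firehose = boto3.client("firehose", region_name="us-west-2")

# Create the delivery stream from the Redshift destination configuration built above.
firehose.create_delivery_stream(
    DeliveryStreamName="prod-buzzad-user-activity-stream",
    RedshiftDestinationConfiguration=configuration,
)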

2. Set up and run Fluentd Docker containers on each server.

<source>
 @type tail
 path /var/log/containers/buzzad/impression.json
 pos_file /var/log/containers/td-agent/impression-json.pos
 format none
 tag firehose.impression
</source>

<match firehose.impression>
 @type kinesis_firehose
 region us-west-2
 delivery_stream_name "prod-buzzad-impression-stream"
 flush_interval 1s
 data_key message
</match>

3. Monitor the data as it is captured by Firehose and sent to Redshift.

Pros?

  1. Fast and Reliable Data Delivery
  2. Easy to monitor

Cons?

  1. Schema changes are not automatic (we have to manually alter the Redshift schema beforehand).
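
For example, when a new field starts being logged, we first alter the Redshift table by hand (and update the jsonpaths file referenced in CopyOptions) before the field appears in the stream. A small sketch with a hypothetical column:

import psycopg2

# Hypothetical new field: the target table must gain the column manually
# before the Firehose COPY can map it.
conn = psycopg2.connect(
    host="buzzvil.xxxxxxxxx.us-west-2.redshift.amazonaws.com",
    port=5439, dbname="sample_db", user="db_user", password="db_password",
)
with conn, conn.cursor() as cur:
    cur.execute("ALTER TABLE user_activity ADD COLUMN device_os VARCHAR(16);")
# The jsonpaths file on S3 then needs a matching "$.device_os" entry,
# and DataTableColumns in the delivery stream must list the new column.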

 

3. Via MySQL Asynchronous Loads (Ads, Contents, Ad Providers, Ad Publishers)

Why?

In order to sync subsets of data from multiple RDS MySQL databases, we had to employ three different techniques to copy the relevant data to our Redshift cluster (This is the least glamorous option of the three).

How?

  • FULL_COPY
    • Extract a full copy (dump) of a MySQL table and load it into Redshift as SQL inserts.
  • INCREMENTAL_COPY
    • Start from the last pkey, read any new rows, and copy them into Redshift (a sketch of this method follows this list).
  • UPDATE_LATEST_COPY
    • Start from the last updated_at timestamp, find any newly updated/created rows, and copy them (after removing duplicates) into Redshift.
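
Here is a rough sketch of the INCREMENTAL_COPY flow (connection details, table, and bucket names are hypothetical): the job looks up the last loaded pkey in Redshift, pulls anything newer from MySQL, stages it on S3, and issues a COPY.

import csv
import io

import boto3
import psycopg2
import pymysql

# Hypothetical connections; in practice these come from the job configuration.
mysql = pymysql.connect(host="prod-buzzad-mysql", user="reader",
                        password="reader_password", database="buzzad")
redshift = psycopg2.connect(host="buzzvil.xxxxxxxxx.us-west-2.redshift.amazonaws.com",
                            port=5439, dbname="sample_db",
                            user="db_user", password="db_password")
s3 = boto3.client("s3")

# 1. Find the last primary key already loaded into Redshift.
with redshift.cursor() as cur:
    cur.execute("SELECT COALESCE(MAX(id), 0) FROM ads;")
    last_pkey = cur.fetchone()[0]

# 2. Read any newer rows from MySQL.
with mysql.cursor() as cur:
    cur.execute("SELECT id, title, unit_price FROM ads WHERE id > %s ORDER BY id;",
                (last_pkey,))
    rows = cur.fetchall()

# 3. Stage the new rows on S3 and COPY them into Redshift.
if rows:
    buf = io.StringIO()
    csv.writer(buf).writerows(rows)
    s3.put_object(Bucket="buzzvil-sync", Key="ads/incremental.csv",
                  Body=buf.getvalue())
    with redshift, redshift.cursor() as cur:
        cur.execute("""
            COPY ads FROM 's3://buzzvil-sync/ads/incremental.csv'
            IAM_ROLE 'arn:aws:iam::xxxxxxxxxxxx:role/redshift_copy_role'
            FORMAT AS CSV;
        """)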

Pros?

  1. Fine tuned methods to meet the data characteristics
  2. Easier to manage compared to binlog replication methods.

Cons?

  1. We have to manage separate servers or Lambdas to handle the MySQL -> Redshift sync task.
  2. Redshift ORM support is not yet mature enough to allow smooth schema altering.

These three methods differ by the type of data being delivered to the target sink. For example, transactional, log-type data is delivered by Firehose or is aggregated and then loaded into Redshift, while fact tables from MySQL are synced to Redshift via a specific CDC (change data capture) method. The combination of these three methods allows our analysts and BD managers to query cross-service or cross-platform data with ease.
