Creating a Streaming Data Pipeline for a Real-Time Dashboard with Dataflow

Things went wrong partway through the lab, so this post will be updated later.

Task 1. Create a Pub/Sub topic and BigQuery dataset

Create a BigQuery dataset named taxirides.

$ bq mk taxirides
Dataset 'qwiklabs-gcp-xxxxxxx:taxirides' successfully created.

Create the taxirides.realtime table. It starts out empty and the data will be streamed into it later.

bq mk \
--time_partitioning_field timestamp \
--schema ride_id:string,point_idx:integer,latitude:float,longitude:float,\
timestamp:timestamp,meter_reading:float,meter_increment:float,ride_status:string,\
passenger_count:integer -t taxirides.realtime
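
To make the `--schema` string concrete, here is a minimal local Python sketch (not something the lab itself runs) that splits the schema into (column, type) pairs, checks a sample row against it, and computes which daily partition a row would land in. It assumes the default DAY partitioning that `--time_partitioning_field timestamp` uses when no partition type is given. The `PY_TYPES` mapping, the helper names and the sample row are hypothetical.

```python
from datetime import datetime, timezone

# The schema passed to `bq mk --schema` above, with the shell line continuations joined.
SCHEMA = (
    "ride_id:string,point_idx:integer,latitude:float,longitude:float,"
    "timestamp:timestamp,meter_reading:float,meter_increment:float,ride_status:string,"
    "passenger_count:integer"
)

# Hypothetical mapping from BigQuery type names to Python types for a local sanity check.
PY_TYPES = {"string": str, "integer": int, "float": (int, float), "timestamp": datetime}


def parse_schema(schema):
    """Split 'name:type,name:type,...' into a list of (name, type) pairs."""
    fields = []
    for item in schema.split(","):
        name, _, ftype = item.partition(":")
        fields.append((name.strip(), ftype.strip().lower()))
    return fields


def check_row(row, fields):
    """Return a list of problems; an empty list means the row matches the schema."""
    problems = []
    for name, ftype in fields:
        if name not in row:
            problems.append(f"missing {name}")
        elif not isinstance(row[name], PY_TYPES[ftype]):
            problems.append(f"{name} is not {ftype}")
    return problems


def partition_id(ts):
    """Daily partition ID (YYYYMMDD in UTC) for a value of the partitioning column."""
    return ts.astimezone(timezone.utc).strftime("%Y%m%d")


# Hypothetical sample row shaped like one taxi ride event.
fields = parse_schema(SCHEMA)
row = {
    "ride_id": "r-001", "point_idx": 10, "latitude": 40.75, "longitude": -73.98,
    "timestamp": datetime(2021, 3, 1, 23, 59, tzinfo=timezone.utc),
    "meter_reading": 12.5, "meter_increment": 0.05, "ride_status": "dropoff",
    "passenger_count": 1,
}
print(len(fields))                      # 9
print(check_row(row, fields))           # []
print(partition_id(row["timestamp"]))   # 20210301
```

Partition boundaries are computed in UTC, which is why the sketch converts the timestamp before formatting it.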

Go to the BigQuery console.

☰ > Big Data > BigQuery

Alternatively, create the dataset from the console: click the menu icon next to your project ID, then click Create dataset.

Task 2. Create a Cloud Storage bucket

Go to ☰ > Cloud Storage.

  • Click CREATE BUCKET.

The bucket name must be globally unique across all of Cloud Storage.
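
Global uniqueness can only be confirmed by Cloud Storage when the bucket is actually created, but the syntax of a name can be checked beforehand. Below is a minimal Python sketch covering only a simplified subset of the naming rules (names without dots: 3 to 63 characters, lowercase letters, digits, hyphens and underscores, starting and ending with a letter or digit, no `goog` prefix, no `google` substring). The function name is hypothetical and this is not a complete implementation of the official rules.

```python
import re

# Simplified subset of the Cloud Storage bucket naming rules (dot-free names only):
# first and last characters are a lowercase letter or digit, the middle may also
# contain hyphens and underscores, and the total length is 3 to 63 characters.
_NAME_RE = re.compile(r"[a-z0-9][a-z0-9_-]{1,61}[a-z0-9]")


def is_valid_bucket_name(name):
    """Syntax check only; it cannot tell whether the name is already taken."""
    if not _NAME_RE.fullmatch(name):
        return False
    # Names may not start with "goog" or contain "google".
    return not name.startswith("goog") and "google" not in name


for candidate in ["qwiklabs-gcp-xxxxxxx", "My_Bucket", "ab", "goog-taxi"]:
    print(candidate, is_valid_bucket_name(candidate))
# qwiklabs-gcp-xxxxxxx True
# My_Bucket False
# ab False
# goog-taxi False
```

Because project IDs are already globally unique, reusing yours as the bucket name is a convenient way to avoid collisions, though Cloud Storage still has the final say.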

Task 3. Set up a Dataflow Pipeline

Go to ☰ > Big Data > Dataflow and click CREATE JOB FROM TEMPLATE.
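
The post does not record the values entered in the template form. As a hedged sketch, assuming the Google-provided classic Pub/Sub Topic to BigQuery template (`gs://dataflow-templates/latest/PubSub_to_BigQuery`), the public topic `projects/pubsub-public-data/topics/taxirides-realtime`, and an example region, the Python below only builds the equivalent `gcloud dataflow jobs run` command string so you can see how the Task 1 table and the Task 2 bucket plug into the job. It does not execute anything, and the function name, job name and sample values are hypothetical.

```python
import shlex

# Assumptions: the public taxi topic and the classic Google-provided template path.
TAXI_TOPIC = "projects/pubsub-public-data/topics/taxirides-realtime"
TEMPLATE = "gs://dataflow-templates/latest/PubSub_to_BigQuery"


def dataflow_run_command(job_name, project_id, bucket, region="us-central1"):
    """Build (but do not run) a `gcloud dataflow jobs run` command line."""
    params = {
        "inputTopic": TAXI_TOPIC,
        # The Task 1 table, written as PROJECT:DATASET.TABLE.
        "outputTableSpec": f"{project_id}:taxirides.realtime",
    }
    param_str = ",".join(f"{key}={value}" for key, value in params.items())
    args = [
        "gcloud", "dataflow", "jobs", "run", job_name,
        f"--gcs-location={TEMPLATE}",
        f"--region={region}",
        f"--staging-location=gs://{bucket}/tmp",  # temporary files go to the Task 2 bucket
        f"--parameters={param_str}",
    ]
    # shlex.join quotes any argument that the shell would otherwise misinterpret.
    return shlex.join(args)


print(dataflow_run_command("taxi-stream", "qwiklabs-gcp-xxxxxxx", "qwiklabs-gcp-xxxxxxx"))
```

Filling in the same fields through CREATE JOB FROM TEMPLATE in the console achieves the same result; the command form just makes the dependencies between the tasks explicit.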

Task 4. Analyze the taxi data using BigQuery

SELECT * FROM taxirides.realtime LIMIT 10

Task 5. Perform aggregations on the stream for reporting

WITH streaming_data AS (
  SELECT
    timestamp,
    TIMESTAMP_TRUNC(timestamp, HOUR, 'UTC') AS hour,
    TIMESTAMP_TRUNC(timestamp, MINUTE, 'UTC') AS minute,
    TIMESTAMP_TRUNC(timestamp, SECOND, 'UTC') AS second,
    ride_id,
    latitude,
    longitude,
    meter_reading,
    ride_status,
    passenger_count
  FROM
    taxirides.realtime
  WHERE ride_status = 'dropoff'
  ORDER BY timestamp DESC
  LIMIT 100000
)
# calculate aggregations on stream for reporting:
SELECT
  ROW_NUMBER() OVER (ORDER BY minute DESC) AS dashboard_sort,  # 1 = most recent minute
  minute,
  COUNT(DISTINCT ride_id) AS total_rides,
  SUM(meter_reading) AS total_revenue,
  SUM(passenger_count) AS total_passengers
FROM streaming_data
GROUP BY minute  # one row per one-minute bucket
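
To see what this aggregation computes, here is a small local Python sketch over a few hypothetical rows; it illustrates the logic only and is not how BigQuery executes the query. It keeps only `dropoff` events, truncates each timestamp to the minute like `TIMESTAMP_TRUNC(timestamp, MINUTE, 'UTC')`, and per minute counts distinct `ride_id`s and sums `meter_reading` and `passenger_count`. Minutes are numbered latest-first, so an ascending sort on `dashboard_sort` shows the latest rides first. The `LIMIT 100000` step is skipped, and the helper names are hypothetical.

```python
from datetime import datetime, timezone


def truncate_to_minute(ts):
    """Local equivalent of TIMESTAMP_TRUNC(ts, MINUTE, 'UTC')."""
    return ts.astimezone(timezone.utc).replace(second=0, microsecond=0)


def aggregate_per_minute(rows):
    """Per-minute totals over dropoff events, numbered latest minute first."""
    buckets = {}
    for r in rows:
        if r["ride_status"] != "dropoff":
            continue
        b = buckets.setdefault(truncate_to_minute(r["timestamp"]),
                               {"ride_ids": set(), "revenue": 0.0, "passengers": 0})
        b["ride_ids"].add(r["ride_id"])          # COUNT(DISTINCT ride_id)
        b["revenue"] += r["meter_reading"]       # SUM(meter_reading)
        b["passengers"] += r["passenger_count"]  # SUM(passenger_count)
    result = []
    for sort_key, minute in enumerate(sorted(buckets, reverse=True), start=1):
        b = buckets[minute]
        result.append({"dashboard_sort": sort_key, "minute": minute,
                       "total_rides": len(b["ride_ids"]),
                       "total_revenue": b["revenue"],
                       "total_passengers": b["passengers"]})
    return result


def ts(h, m, s):
    """Hypothetical helper: a UTC timestamp on a fixed sample date."""
    return datetime(2021, 3, 1, h, m, s, tzinfo=timezone.utc)


rows = [  # hypothetical sample events
    {"ride_id": "a", "timestamp": ts(10, 0, 5), "ride_status": "dropoff", "meter_reading": 10.0, "passenger_count": 1},
    {"ride_id": "b", "timestamp": ts(10, 0, 40), "ride_status": "dropoff", "meter_reading": 7.5, "passenger_count": 2},
    {"ride_id": "c", "timestamp": ts(10, 1, 10), "ride_status": "dropoff", "meter_reading": 4.0, "passenger_count": 1},
    {"ride_id": "d", "timestamp": ts(10, 1, 20), "ride_status": "enroute", "meter_reading": 3.0, "passenger_count": 3},
]
for r in aggregate_per_minute(rows):
    print(r["dashboard_sort"], r["minute"].strftime("%H:%M"),
          r["total_rides"], r["total_revenue"], r["total_passengers"])
# 1 10:01 1 4.0 1
# 2 10:00 2 17.5 3
```

The grouping key here is the truncated minute alone; adding the raw timestamp to the key would split each minute into one bucket per distinct timestamp.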

Open https://datastudio.google.com/ in a new incognito window.

Reports > Start with a Template > Blank Report

Return to BigQuery.

Click EXPLORE DATA > Explore with Data Studio, then configure the chart as follows:

  • Chart type: Combo chart
  • Date range Dimension: dashboard_sort
  • Dimension: dashboard_sort
  • Drill Down: dashboard_sort (Make sure that Drill down option is turned ON)
  • Metric: SUM() total_rides, SUM() total_passengers, SUM() total_revenue
  • Sort: dashboard_sort, Ascending (latest rides first)

Task 6. Create a real-time dashboard

Task 7. Create a time series dashboard

Add a BigQuery data source using CUSTOM QUERY with the following query:

SELECT
  *
FROM
  taxirides.realtime
WHERE
  ride_status = 'dropoff'
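
The custom query hands Data Studio every individual dropoff row, and the time series chart then aggregates them along its time dimension. The Python sketch below mimics that idea locally with hypothetical rows: it counts dropoffs per UTC minute and fills minutes with no dropoffs with 0 so the series is evenly spaced. The per-minute granularity, the zero-filling and the helper names are assumptions for illustration; the chart's actual bucketing depends on the dimension and granularity you choose.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone


def dropoffs_per_minute(rows):
    """Chronological (minute, dropoff_count) pairs; minutes without dropoffs get 0."""
    counts = Counter(
        r["timestamp"].astimezone(timezone.utc).replace(second=0, microsecond=0)
        for r in rows
        if r["ride_status"] == "dropoff"  # same filter as the custom query
    )
    if not counts:
        return []
    series = []
    minute, last = min(counts), max(counts)
    while minute <= last:
        series.append((minute, counts[minute]))  # Counter returns 0 for missing keys
        minute += timedelta(minutes=1)
    return series


def ts(h, m, s):
    """Hypothetical helper: a UTC timestamp on a fixed sample date."""
    return datetime(2021, 3, 1, h, m, s, tzinfo=timezone.utc)


rows = [  # hypothetical sample events
    {"timestamp": ts(10, 0, 5), "ride_status": "dropoff"},
    {"timestamp": ts(10, 0, 50), "ride_status": "dropoff"},
    {"timestamp": ts(10, 1, 0), "ride_status": "pickup"},
    {"timestamp": ts(10, 2, 30), "ride_status": "dropoff"},
]
for minute, n in dropoffs_per_minute(rows):
    print(minute.strftime("%H:%M"), n)
# 10:00 2
# 10:01 0
# 10:02 1
```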

Task 8. Stop the Dataflow job

Go to ☰ > Dataflow and stop the running job.

Links