Creating a Streaming Data Pipeline for a Real-Time Dashboard with Dataflow
Things went wrong partway through, so this post will be updated again later.
Task 1. Create a Pub/Sub topic and BigQuery dataset
Create a BigQuery dataset named taxirides.
$ bq mk taxirides
Dataset 'qwiklabs-gcp-xxxxxxx:taxirides' successfully created.
Create the taxirides.realtime table (an empty schema to stream into later).
bq mk \
--time_partitioning_field timestamp \
--schema ride_id:string,point_idx:integer,latitude:float,longitude:float,\
timestamp:timestamp,meter_reading:float,meter_increment:float,ride_status:string,\
passenger_count:integer -t taxirides.realtime
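The --schema value above is a comma-separated list of name:type pairs. As a rough illustration of what "an empty schema to stream into later" implies, the Python sketch below parses that string and checks one message against it. The sample message and the type mapping are assumptions made for illustration (timestamps are treated as plain strings); they are not taken from the lab data.

```python
import json

# Schema string copied from the bq mk command above.
SCHEMA = (
    "ride_id:string,point_idx:integer,latitude:float,longitude:float,"
    "timestamp:timestamp,meter_reading:float,meter_increment:float,"
    "ride_status:string,passenger_count:integer"
)

# Simplified mapping from column types to Python types (an assumption:
# a timestamp is only checked to be a string, not parsed).
PYTHON_TYPES = {"string": str, "integer": int, "float": (int, float), "timestamp": str}

# Hypothetical message payload; the values are made up.
SAMPLE_MESSAGE = json.loads("""
{"ride_id": "ride-0001", "point_idx": 217, "latitude": 40.75,
 "longitude": -73.99, "timestamp": "2024-01-01T10:00:05Z",
 "meter_reading": 12.5, "meter_increment": 0.05,
 "ride_status": "enroute", "passenger_count": 1}
""")


def parse_schema(schema):
    """Turn 'name:type,name:type' into a {name: type} dict."""
    return dict(field.split(":") for field in schema.split(","))


def mismatches(message, columns):
    """List the ways a message differs from the table columns."""
    problems = []
    for name, col_type in columns.items():
        if name not in message:
            problems.append(f"missing {name}")
        elif not isinstance(message[name], PYTHON_TYPES[col_type]):
            problems.append(f"{name} is not {col_type}")
    for name in message:
        if name not in columns:
            problems.append(f"unexpected {name}")
    return problems


columns = parse_schema(SCHEMA)
print(mismatches(SAMPLE_MESSAGE, columns))  # prints []
```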
Go to BigQuery from the console menu.
☰ > Big Data > BigQuery
Create a dataset.
Click ⋮ next to the project ID, then click Create dataset.
Task 2. Create a Cloud Storage bucket
Go to ☰ > Cloud Storage.
Click CREATE BUCKET to create a bucket.
The bucket name must be globally unique.
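Global uniqueness can only be confirmed by Cloud Storage itself when the bucket is created, but the format of a candidate name can be checked locally first. The Python sketch below covers a simplified subset of the naming rules (it rejects dotted names, which follow extra rules); the function name and sample names are hypothetical. Reusing the project ID as the bucket name is one common choice, since project IDs are already unique.

```python
import re

# Simplified subset of the bucket naming rules: 3 to 63 characters,
# lowercase letters, digits, dashes and underscores, starting and
# ending with a letter or digit. Dotted names are not handled here.
_BUCKET_NAME = re.compile(r"[a-z0-9][a-z0-9_-]{1,61}[a-z0-9]")


def looks_like_valid_bucket_name(name):
    """Local format check only; it cannot tell whether the name is taken."""
    if not _BUCKET_NAME.fullmatch(name):
        return False
    # Names may not start with "goog" or contain "google".
    if name.startswith("goog") or "google" in name:
        return False
    return True


print(looks_like_valid_bucket_name("qwiklabs-gcp-0123456789ab"))  # prints True
print(looks_like_valid_bucket_name("My_Bucket"))                  # prints False
```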
Task 3. Set up a Dataflow Pipeline
Go to ☰ > Big Data > Dataflow.
CREATE JOB FROM TEMPLATE.
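The notes do not record which template or parameters were used, so the following is only a sketch of a possible command-line equivalent. It assumes the classic Pub/Sub Topic to BigQuery template (gs://dataflow-templates/latest/PubSub_to_BigQuery), the public taxirides-realtime topic as input, and the taxirides.realtime table created in Task 1 as output. The job name, project ID, bucket, and region are placeholders. The function only builds the gcloud arguments and does not run anything.

```python
import shlex


def dataflow_template_command(project_id, bucket, region,
                              job_name="streaming-taxi-pipeline"):
    """Build (not run) a gcloud command that starts a template job.

    Template path, topic, and parameter names are assumptions, not
    values recorded in this post.
    """
    parameters = ",".join([
        "inputTopic=projects/pubsub-public-data/topics/taxirides-realtime",
        f"outputTableSpec={project_id}:taxirides.realtime",
    ])
    return [
        "gcloud", "dataflow", "jobs", "run", job_name,
        "--gcs-location", "gs://dataflow-templates/latest/PubSub_to_BigQuery",
        "--region", region,
        "--staging-location", f"gs://{bucket}/tmp",
        "--parameters", parameters,
    ]


# Placeholder values; replace them with your own project, bucket, and region.
command = dataflow_template_command("my-project", "my-bucket", "us-central1")
print(shlex.join(command))
```

The printed line can be pasted into Cloud Shell, or the list can be passed to subprocess.run(command, check=True) once the placeholders are filled in.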
Task 4. Analyze the taxi data using BigQuery
Menu: ☰ > BigQuery
SELECT * FROM taxirides.realtime LIMIT 10
Task 5. Perform aggregations on the stream for reporting
WITH streaming_data AS (

SELECT
  timestamp,
  TIMESTAMP_TRUNC(timestamp, HOUR, 'UTC') AS hour,
  TIMESTAMP_TRUNC(timestamp, MINUTE, 'UTC') AS minute,
  TIMESTAMP_TRUNC(timestamp, SECOND, 'UTC') AS second,
  ride_id,
  latitude,
  longitude,
  meter_reading,
  ride_status,
  passenger_count
FROM
  taxirides.realtime
WHERE ride_status = 'dropoff'
ORDER BY timestamp DESC
LIMIT 100000

)

# calculate aggregations on stream for reporting:
SELECT
  ROW_NUMBER() OVER() AS dashboard_sort,
  minute,
  COUNT(DISTINCT ride_id) AS total_rides,
  SUM(meter_reading) AS total_revenue,
  SUM(passenger_count) AS total_passengers
FROM streaming_data
GROUP BY minute, timestamp
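To see what the GROUP BY in the query above does, here is a small Python sketch over made-up rows (the ride IDs, timestamps, and amounts are hypothetical, not lab data). It mimics TIMESTAMP_TRUNC(timestamp, MINUTE, 'UTC') and compares grouping by minute and timestamp, as the query does, with grouping by minute alone.

```python
from datetime import datetime, timezone

# Hypothetical rows standing in for taxirides.realtime.
ROWS = [
    {"ride_id": "a", "timestamp": "2024-01-01T10:00:05+00:00",
     "meter_reading": 12.5, "passenger_count": 1, "ride_status": "dropoff"},
    {"ride_id": "b", "timestamp": "2024-01-01T10:00:40+00:00",
     "meter_reading": 7.0, "passenger_count": 2, "ride_status": "dropoff"},
    {"ride_id": "c", "timestamp": "2024-01-01T10:01:10+00:00",
     "meter_reading": 20.0, "passenger_count": 3, "ride_status": "dropoff"},
    {"ride_id": "d", "timestamp": "2024-01-01T10:01:30+00:00",
     "meter_reading": 3.0, "passenger_count": 1, "ride_status": "enroute"},
]


def truncate_to_minute(ts):
    """Mimic TIMESTAMP_TRUNC(timestamp, MINUTE, 'UTC')."""
    return ts.astimezone(timezone.utc).replace(second=0, microsecond=0)


def aggregate(rows, group_by):
    """Aggregate dropoff rows, grouped by the given column names."""
    groups = {}
    for row in rows:
        if row["ride_status"] != "dropoff":  # WHERE ride_status = 'dropoff'
            continue
        ts = datetime.fromisoformat(row["timestamp"])
        columns = {"timestamp": ts, "minute": truncate_to_minute(ts)}
        key = tuple(columns[name] for name in group_by)
        group = groups.setdefault(key, {
            "minute": columns["minute"], "ride_ids": set(),
            "total_revenue": 0.0, "total_passengers": 0,
        })
        group["ride_ids"].add(row["ride_id"])           # COUNT(DISTINCT ride_id)
        group["total_revenue"] += row["meter_reading"]  # SUM(meter_reading)
        group["total_passengers"] += row["passenger_count"]
    return [
        {"minute": g["minute"], "total_rides": len(g["ride_ids"]),
         "total_revenue": g["total_revenue"],
         "total_passengers": g["total_passengers"]}
        for _, g in sorted(groups.items(), key=lambda item: item[0])
    ]


per_timestamp = aggregate(ROWS, ("minute", "timestamp"))  # as in the query
per_minute = aggregate(ROWS, ("minute",))
print(len(per_timestamp), len(per_minute))  # prints 3 2
```

Because timestamp is part of the grouping key, every distinct timestamp gets its own output row. Grouping by minute alone would produce one row per minute.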
Open https://datastudio.google.com/ in a new incognito window.
Reports > Start with a Template > Blank Report
Return to BigQuery.
EXPLORE DATA > Explore with Data Studio
- Chart type: Combo chart
- Date range Dimension: dashboard_sort
- Dimension: dashboard_sort
- Drill Down: dashboard_sort (Make sure that Drill down option is turned ON)
- Metric: SUM() total_rides, SUM() total_passengers, SUM() total_revenue
- Sort: dashboard_sort, Ascending (latest rides first)
Task 6. Create a real-time dashboard
Task 7. Create a time series dashboard
CUSTOM QUERY
SELECT * FROM taxirides.realtime WHERE ride_status = 'dropoff'
Task 8. Stop the Dataflow job
Stop the job from Menu: ☰ > Dataflow.