جار تحميل البيانات
جار تحميل البيانات
جار تحميل البيانات
select * from users limit 10Обязательно нужно узнать про уникальный ключ в этой таблице. Он нам поможет избежать дубликатов, так сказать uniqueness test. И еще можно отслеживать freshness таблица по какому-нибудь timestamp. Дальше нам надо придумать как эту табличку тянуть в хранилище данных, обычно в staging слой. И тут есть разные способы и инструменты (их не так много). Например, мы можем использовать Change Data Capture метод, который позволяет нам копировать только свежие и измененные данные, я это еще называю incremental загрузка. Альтернатива это full reload. Если вы посмотрите инструменты Fivetran, Airbyte, Meltano, Matillion - все они предлагают вам похожие способы забора и загрузки данных. Ок, нам повезло - наша таблица содержит два ключевых timestamp: -
created_at-
updated_atИ мы можем использовать подход с watermark, то есть хранить последнее значение (или находить его) перед запуском загрузки, и при каждом запуске ETL job, мы просто должны:
select * from users where created_at >= $parameter or updated_at >= $parameterНо не забывайте самое главное свойство ETL pipelines - idempotent - то есть если каким-то образом мы выполним один и тот же job много раз, результат будет всегда такой же. Для этого при incremental (инкрементальной) загрузке у нас есть варианты разные
DELETE/INSERT,
UPSERT,
MERGE,
UPDATE/INSERTи зависит это от данных и возможностей базы данных. Например для таблиц с логами, у нас история не меняется и всегда
APPEND, то есть данные добавляются, и отлично подходит
DELETE/INSERT, а для таблички с пользователями у нас для каждого
USER_IDмогу поменяться атрибуты, поэтому мы будем использовать
UPSERT. Для этого важно знать уникальный ключа в таблице! Выше я писал про свой подход с Snowflake Procedure, но это можно реализовать множеством других способов и инструментов. Главное суть остается та же. Теперь у нас есть таблица с пользователями, которая обновляется каждый день. И у для разработчика сразу должны возникнуть вопросы к заказчику, так сказать уточнение требований отчета:
IS_DELETED. А для этого нам уже придется сравнивать полный snapshot и текущею базу и находить удаленных клиентов. 4) И возможно, мы хотим видеть историю роста, вчера было 100 клиентов, а сегодня 110, то есть нам надо делать SNAPSHOTS раз в день. Казалось бы такая простая задача, а сколько возможностей. Самое интересное, что практически во всех организациях, где есть хранилище данных это делают. Часто терминология разная, но идея остается прежней, и ей уже лет 30 если не больше. И для этого не нужно знать ни python, ни Hadoop, ни streaming. Просто SQL и пару приемов, как данные сделать полезными для конечного потребителя. Поэтому прежде чем получать сертификаты dbt, snowflake, Databricks и тп, попробуйте на локальной базе разобраться с этими вещами и потом будет легче делать все тоже самое но уже на modern data stack за хорошую денюшку.
create or replace procedure CONFIG.UPDATE_WATERMARK_JOB(SCHEMA_NAME VARCHAR, PIPELINE_NAME VARCHAR, START_TIMESTAMP TIMESTAMP_NTZ, END_TIMESTAMP TIMESTAMP_NTZ, SOURCE VARCHAR, TABLE_NAME VARCHAR, LAST_CREATED_TIMESTAMP TIMESTAMP_NTZ, LAST_UPDATED_TIMESTAMP TIMESTAMP_NTZ) returns VARCHAR language SQL strict as $$ BEGIN INSERT INTO raw.config.watermark_table (schema_name, pipeline_name, start_timestamp, end_timestamp, source, table_name, last_created_timestamp, last_updated_timestamp) VALUES (schema_name, pipeline_name, start_timestamp, CONVERT_TIMEZONE('UTC', CURRENT_TIMESTAMP()), source, table_name, last_created_timestamp, last_updated_timestamp); RETURN 'Success'; END; $$;Получается, что всё новое — это хорошо забытое старое! Как у вас дела обстоят с хранимыми процедурами?
SELECT * FROM