def fields(
self,
fieldname,
data_type,
str_native_data_type,
fielddescription,
recursive=False,
jsonPath=None,
nullable=False,
tags=None,
):
fields = {
"fieldPath": fieldname,
"jsonPath": jsonPath,
"nullable": nullable,
"description": {"string": fielddescription},
"type": {"type": {f"{data_type}": {}}},
"nativeDataType": str_native_data_type,
"recursive": recursive,
}
if tags is not None and str(tags) != "none" and len(tags) > 0:
fields["globalTags"] = self.get_tag(tags)
return fields
def time_stamp(self, user_name="Datahub", impersonator=None):
return {
"time": self.current_time_stamp,
"actor": "urn:li:corpuser:" + user_name,
"impersonator": impersonator,
}
def table(
self,
list_schema,
list_field,
database_name,
table_name,
owner_table=None,
table_tags=None,
database_type=None,
):
json_table_aspect = {
"auditHeader": None,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": f"urn:li:dataset:(urn:li:dataPlatform:{database_type},{table_name},PROD)",
"aspects": [
{"com.linkedin.pegasus2avro.common.Status": {"removed": False}},
{
"com.linkedin.pegasus2avro.schema.SchemaMetadata": {
"schemaName": f"{database_name}",
"platform": "urn:li:dataPlatform:mysql",
"version": 0,
"created": self.time_stamp(
user_name="Datahub", impersonator=None
),
"lastModified": self.time_stamp(
user_name="Datahub", impersonator=None
),
"deleted": None,
"dataset": None,
"cluster": None,
"hash": "",
"platformSchema": {
"com.linkedin.pegasus2avro.schema.KafkaSchema": {
"documentSchema": '{"type":"record","name":"'
+ table_name
+ '",'
'"namespace":"com.linkedin.dataset","fields":'
+ str(list_schema)
+ "}",
"documentSchemaType": "AVRO",
}
},
"fields": list_field,
"primaryKeys": None,
"foreignKeysSpecs": None,
}
},
],
}
},
"proposedDelta": None,
}
pegasus_dataset = "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot"
list_aspect = json_table_aspect["proposedSnapshot"][pegasus_dataset]["aspects"]
if owner_table is not None:
owner_data = self.get_ownership(owner_table, self.current_time_stamp)
if owner_data is not None and len(owner_data) > 0:
list_aspect.append(owner_data)
if table_tags is not None:
table_tag = self.table_tagging(table_tags)
if table_tag is not None and len(table_tag) > 0:
list_aspect.append(table_tag)
json_table_aspect["proposedSnapshot"][pegasus_dataset]["aspects"] = list_aspect
return json_table_aspect
-
Python 2024-04-27 16:01:10 What are some innovative use cases for Python in real-world applications?
-
Python 2024-04-27 01:42:48 What are some interesting use cases for metaclasses in Python?
-
Python 2024-04-22 20:29:58 What are some practical use cases for decorators in Python?
-
Python 2024-04-22 11:53:56 What are some practical use cases of metaclasses in Python?
-
Python 2024-04-18 11:16:32 What are some practical use cases for generators in Python?
-
Python 2024-04-16 04:11:18 How can I implement memoization in Python?
-
Python 2024-04-15 09:56:13 What are some innovative use cases of Python in real-world applications?
-
Python 2024-04-07 14:41:20 What are some practical applications of metaprogramming in Python?