From aebbb06116184ead8e5f98a4c7588d75e5e3a54f Mon Sep 17 00:00:00 2001 From: ronanstokes-db Date: Sat, 25 Mar 2023 17:18:57 -0700 Subject: [PATCH 01/16] wip --- docs/source/extending_text_generation.rst | 6 +- docs/source/generating_cdc_data.rst | 53 ++++++------- docs/source/generating_json_data.rst | 91 +++++++++++----------- docs/source/multi_table_data.rst | 91 +++++++++++----------- docs/source/troubleshooting.rst | 93 ++++++++++++----------- 5 files changed, 174 insertions(+), 160 deletions(-) diff --git a/docs/source/extending_text_generation.rst b/docs/source/extending_text_generation.rst index 7f0df8a4..f59e7171 100644 --- a/docs/source/extending_text_generation.rst +++ b/docs/source/extending_text_generation.rst @@ -38,7 +38,7 @@ extended syntax. .withColumn("address", text=fakerText("address" )) .withColumn("email", text=fakerText("ascii_company_email") ) .withColumn("ip_address", text=fakerText("ipv4_private" )) - .withColumn("faker_text", text=fakerText("sentence", ext_word_list=my_word_list) ) + .withColumn("faker_text", text=fakerText("sentence", ext_word_list=my_word_list)) ) dfFakerOnly = fakerDataspec.build() @@ -91,7 +91,9 @@ The following code shows use of a custom Python function to generate text: pluginDataspec = (DataGenerator(spark, rows=data_rows, partitions=partitions_requested, randomSeedMethod="hash_fieldname") - .withColumn("text", text=PyfuncText(text_generator, initFn=initPluginContext)) + .withColumn("text", + text=PyfuncText(text_generator, + initFn=initPluginContext)) ) dfPlugin = pluginDataspec.build() diff --git a/docs/source/generating_cdc_data.rst b/docs/source/generating_cdc_data.rst index 1633ce41..ccbf16b4 100644 --- a/docs/source/generating_cdc_data.rst +++ b/docs/source/generating_cdc_data.rst @@ -1,7 +1,7 @@ .. Test Data Generator documentation master file, created by - sphinx-quickstart on Sun Jun 21 10:54:30 2020. - You can adapt this file completely to your liking, but it should at least - contain the root `toctree` directive. +sphinx-quickstart on Sun Jun 21 10:54:30 2020. +You can adapt this file completely to your liking, but it should at least +contain the root `toctree` directive. Generating Change Data Capture Data =================================== @@ -47,28 +47,30 @@ We'll add a timestamp for when the row was generated and a memo field to mark wh uniqueCustomers = 10 * 1000000 - dataspec = (dg.DataGenerator(spark, rows=data_rows, partitions=partitions_requested) - .withColumn("customer_id","long", uniqueValues=uniqueCustomers) - .withColumn("name", percentNulls=0.01, template=r'\\w \\w|\\w a. \\w') - .withColumn("alias", percentNulls=0.01, template=r'\\w \\w|\\w a. 
\\w') - .withColumn("payment_instrument_type", values=['paypal', 'Visa', 'Mastercard', - 'American Express', 'discover', 'branded visa', 'branded mastercard'], - random=True, distribution="normal") - .withColumn("int_payment_instrument", "int", minValue=0000, maxValue=9999, baseColumn="customer_id", - baseColumnType="hash", omit=True) - .withColumn("payment_instrument", expr="format_number(int_payment_instrument, '**** ****** *####')", - baseColumn="int_payment_instrument") - .withColumn("email", template=r'\\w.\\w@\\w.com|\\w-\\w@\\w') - .withColumn("email2", template=r'\\w.\\w@\\w.com') - .withColumn("ip_address", template=r'\\n.\\n.\\n.\\n') - .withColumn("md5_payment_instrument", - expr="md5(concat(payment_instrument_type, ':', payment_instrument))", - base_column=['payment_instrument_type', 'payment_instrument']) - .withColumn("customer_notes", text=dg.ILText(words=(1,8))) - .withColumn("created_ts", "timestamp", expr="now()") - .withColumn("modified_ts", "timestamp", expr="now()") - .withColumn("memo", expr="'original data'") - ) + dataspec = ( + dg.DataGenerator(spark, rows=data_rows, partitions=partitions_requested) + .withColumn("customer_id","long", uniqueValues=uniqueCustomers) + .withColumn("name", percentNulls=0.01, template=r'\\w \\w|\\w a. \\w') + .withColumn("alias", percentNulls=0.01, template=r'\\w \\w|\\w a. \\w') + .withColumn("payment_instrument_type", values=['paypal', 'Visa', 'Mastercard', + 'American Express', 'discover', 'branded visa', 'branded mastercard'], + random=True, distribution="normal") + .withColumn("int_payment_instrument", "int", minValue=0000, maxValue=9999, + baseColumn="customer_id", baseColumnType="hash", omit=True) + .withColumn("payment_instrument", + expr="format_number(int_payment_instrument, '**** ****** *####')", + baseColumn="int_payment_instrument") + .withColumn("email", template=r'\\w.\\w@\\w.com|\\w-\\w@\\w') + .withColumn("email2", template=r'\\w.\\w@\\w.com') + .withColumn("ip_address", template=r'\\n.\\n.\\n.\\n') + .withColumn("md5_payment_instrument", + expr="md5(concat(payment_instrument_type, ':', payment_instrument))", + base_column=['payment_instrument_type', 'payment_instrument']) + .withColumn("customer_notes", text=dg.ILText(words=(1,8))) + .withColumn("created_ts", "timestamp", expr="now()") + .withColumn("modified_ts", "timestamp", expr="now()") + .withColumn("memo", expr="'original data'") + ) df1 = dataspec.build() # write table @@ -168,7 +170,6 @@ values of the columns from the source table will be used. ]) print(sqlStmt) - spark.sql(sqlStmt) That's all that's required to perform merges with the data generation framework. diff --git a/docs/source/generating_json_data.rst b/docs/source/generating_json_data.rst index 56989d76..a26db3ee 100644 --- a/docs/source/generating_json_data.rst +++ b/docs/source/generating_json_data.rst @@ -195,51 +195,52 @@ functions such as `named_struct` and `to_json`. 
lines = ['delta', 'xyzzy', 'lakehouse', 'gadget', 'droid'] - testDataSpec = (dg.DataGenerator(spark, name="device_data_set", rows=1000000, - partitions=8, - randomSeedMethod='hash_fieldname') - .withIdOutput() - # we'll use hash of the base field to generate the ids to - # avoid a simple incrementing sequence - .withColumn("internal_device_id", LongType(), minValue=0x1000000000000, - uniqueValues=device_population, omit=True, baseColumnType="hash") - - # note for format strings, we must use "%lx" not "%x" as the - # underlying value is a long - .withColumn("device_id", StringType(), format="0x%013x", - baseColumn="internal_device_id") - - # the device / user attributes will be the same for the same device id - # so lets use the internal device id as the base column for these attribute - .withColumn("country", StringType(), values=country_codes, - weights=country_weights, - baseColumn="internal_device_id") - - .withColumn("manufacturer", StringType(), values=manufacturers, - baseColumn="internal_device_id", omit=True) - .withColumn("line", StringType(), values=lines, baseColumn="manufacturer", - baseColumnType="hash", omit=True) - .withColumn("manufacturer_info", "string", - expr="to_json(named_struct('line', line, 'manufacturer', manufacturer))", - baseColumn=['manufacturer', 'line']) - - - .withColumn("model_ser", IntegerType(), minValue=1, maxValue=11, - baseColumn="device_id", - baseColumnType="hash", omit=True) - - .withColumn("event_type", StringType(), - values=["activation", "deactivation", "plan change", - "telecoms activity", "internet activity", "device error"], - random=True, omit=True) - .withColumn("event_ts", "timestamp", - begin="2020-01-01 01:00:00", end="2020-12-31 23:59:00", - interval="1 minute", random=True, omit=True) - - .withColumn("event_info", "string", - expr="to_json(named_struct('event_type', event_type, 'event_ts', event_ts))", - baseColumn=['event_type', 'event_ts']) - ) + testDataSpec = ( + dg.DataGenerator(spark, name="device_data_set", rows=1000000, + partitions=8, + randomSeedMethod='hash_fieldname') + .withIdOutput() + # we'll use hash of the base field to generate the ids to + # avoid a simple incrementing sequence + .withColumn("internal_device_id", LongType(), minValue=0x1000000000000, + uniqueValues=device_population, omit=True, baseColumnType="hash") + + # note for format strings, we must use "%lx" not "%x" as the + # underlying value is a long + .withColumn("device_id", StringType(), format="0x%013x", + baseColumn="internal_device_id") + + # the device / user attributes will be the same for the same device id + # so lets use the internal device id as the base column for these attribute + .withColumn("country", StringType(), values=country_codes, + weights=country_weights, + baseColumn="internal_device_id") + + .withColumn("manufacturer", StringType(), values=manufacturers, + baseColumn="internal_device_id", omit=True) + .withColumn("line", StringType(), values=lines, baseColumn="manufacturer", + baseColumnType="hash", omit=True) + .withColumn("manufacturer_info", "string", + expr="to_json(named_struct('line', line, 'manufacturer', manufacturer))", + baseColumn=['manufacturer', 'line']) + + + .withColumn("model_ser", IntegerType(), minValue=1, maxValue=11, + baseColumn="device_id", + baseColumnType="hash", omit=True) + + .withColumn("event_type", StringType(), + values=["activation", "deactivation", "plan change", + "telecoms activity", "internet activity", "device error"], + random=True, omit=True) + .withColumn("event_ts", "timestamp", + 
begin="2020-01-01 01:00:00", end="2020-12-31 23:59:00", + interval="1 minute", random=True, omit=True) + + .withColumn("event_info", "string", + expr="to_json(named_struct('event_type', event_type, 'event_ts', event_ts))", + baseColumn=['event_type', 'event_ts']) + ) dfTestData = testDataSpec.build() diff --git a/docs/source/multi_table_data.rst b/docs/source/multi_table_data.rst index 5eab313d..d5aa5ec2 100644 --- a/docs/source/multi_table_data.rst +++ b/docs/source/multi_table_data.rst @@ -1,7 +1,7 @@ .. Test Data Generator documentation master file, created by - sphinx-quickstart on Sun Jun 21 10:54:30 2020. - You can adapt this file completely to your liking, but it should at least - contain the root `toctree` directive. +sphinx-quickstart on Sun Jun 21 10:54:30 2020. +You can adapt this file completely to your liking, but it should at least +contain the root `toctree` directive. Generating and Using Data with Multiple Tables ============================================== @@ -73,7 +73,9 @@ Here we use a simple sequence for our plan ids. import dbldatagen as dg import pyspark.sql.functions as F - spark.catalog.clearCache() # clear cache so that if we run multiple times to check performance, we're not relying on cache + # clear cache so that if we run multiple times to check performance, + # we're not relying on cache + spark.catalog.clearCache() UNIQUE_PLANS = 20 PLAN_MIN_VALUE = 100 @@ -87,36 +89,35 @@ Here we use a simple sequence for our plan ids. spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 20000) - plan_dataspec = (dg.DataGenerator(spark, rows=data_rows, partitions=partitions_requested) - .withColumn("plan_id","int", minValue=PLAN_MIN_VALUE, uniqueValues=UNIQUE_PLANS) - # use plan_id as root value - .withColumn("plan_name", prefix="plan", baseColumn="plan_id") - - # note default step is 1 so you must specify a step for small number ranges, - .withColumn("cost_per_mb", "decimal(5,3)", minValue=0.005, maxValue=0.050, - step=0.005, random=True) - .withColumn("cost_per_message", "decimal(5,3)", minValue=0.001, maxValue=0.02, - step=0.001, random=True) - .withColumn("cost_per_minute", "decimal(5,3)", minValue=0.001, maxValue=0.01, - step=0.001, random=True) - - # we're modelling long distance and international prices simplistically - - # each is a multiplier thats applied to base rate - .withColumn("ld_multiplier", "decimal(5,3)", minValue=1.5, maxValue=3, step=0.05, - random=True, distribution="normal", omit=True) - .withColumn("ld_cost_per_minute", "decimal(5,3)", - expr="cost_per_minute * ld_multiplier", - baseColumns=['cost_per_minute', 'ld_multiplier']) - .withColumn("intl_multiplier", "decimal(5,3)", minValue=2, maxValue=4, step=0.05, - random=True, distribution="normal", omit=True) - .withColumn("intl_cost_per_minute", "decimal(5,3)", - expr="cost_per_minute * intl_multiplier", - baseColumns=['cost_per_minute', 'intl_multiplier']) + plan_dataspec = ( + dg.DataGenerator(spark, rows=data_rows, partitions=partitions_requested) + .withColumn("plan_id","int", minValue=PLAN_MIN_VALUE, uniqueValues=UNIQUE_PLANS) + # use plan_id as root value + .withColumn("plan_name", prefix="plan", baseColumn="plan_id") + + # note default step is 1 so you must specify a step for small number ranges, + .withColumn("cost_per_mb", "decimal(5,3)", minValue=0.005, maxValue=0.050, + step=0.005, random=True) + .withColumn("cost_per_message", "decimal(5,3)", minValue=0.001, maxValue=0.02, + step=0.001, random=True) + .withColumn("cost_per_minute", "decimal(5,3)", minValue=0.001, 
maxValue=0.01, + step=0.001, random=True) + + # we're modelling long distance and international prices simplistically - + # each is a multiplier thats applied to base rate + .withColumn("ld_multiplier", "decimal(5,3)", minValue=1.5, maxValue=3, step=0.05, + random=True, distribution="normal", omit=True) + .withColumn("ld_cost_per_minute", "decimal(5,3)", + expr="cost_per_minute * ld_multiplier", + baseColumns=['cost_per_minute', 'ld_multiplier']) + .withColumn("intl_multiplier", "decimal(5,3)", minValue=2, maxValue=4, step=0.05, + random=True, distribution="normal", omit=True) + .withColumn("intl_cost_per_minute", "decimal(5,3)", + expr="cost_per_minute * intl_multiplier", + baseColumns=['cost_per_minute', 'intl_multiplier']) ) - df_plans = (plan_dataspec.build() - .cache() - ) + df_plans = plan_dataspec.build().cache() display(df_plans) @@ -195,10 +196,11 @@ when using hashed values, the range of the hashes produced can be large. effective_customers = df_customers.count() - print(stripMargin(f"""revised customers : {df_customers.count()}, - | unique customers: {df_customers.select(F.countDistinct('customer_id')).take(1)[0][0]}, - | unique device ids: {df_customers.select(F.countDistinct('device_id')).take(1)[0][0]}, - | unique phone numbers: {df_customers.select(F.countDistinct('phone_number')).take(1)[0][0]}""") + print(stripMargin( + f"""revised customers : {df_customers.count()}, + | unique customers: {df_customers.select(F.countDistinct('customer_id')).take(1)[0][0]}, + | unique device ids: {df_customers.select(F.countDistinct('device_id')).take(1)[0][0]}, + | unique phone numbers: {df_customers.select(F.countDistinct('phone_number')).take(1)[0][0]}""") ) display(df_customers) @@ -247,7 +249,8 @@ A simple approach is simply to multiply the # use random seed method of 'hash_fieldname' for better spread - default in later builds events_dataspec = (dg.DataGenerator(spark, rows=data_rows, partitions=partitions_requested, randomSeed=42, randomSeedMethod="hash_fieldname") - # use same logic as per customers dataset to ensure matching keys - but make them random + # use same logic as per customers dataset to ensure matching keys + # but make them random .withColumn("device_id_base","decimal(10)", minValue=CUSTOMER_MIN_VALUE, uniqueValues=UNIQUE_CUSTOMERS, random=True, omit=True) @@ -260,12 +263,14 @@ A simple approach is simply to multiply the weights=[50, 50, 20, 10, 5 ], random=True) # use Gamma distribution for skew towards short calls - .withColumn("base_minutes","decimal(7,2)", minValue=1.0, maxValue=100.0, step=0.1, + .withColumn("base_minutes","decimal(7,2)", + minValue=1.0, maxValue=100.0, step=0.1, distribution=dg.distributions.Gamma(shape=1.5, scale=2.0), random=True, omit=True) # use Gamma distribution for skew towards short transfers - .withColumn("base_bytes_transferred","decimal(12)", minValue=K_1, maxValue=MB_100, + .withColumn("base_bytes_transferred","decimal(12)", + minValue=K_1, maxValue=MB_100, distribution=dg.distributions.Gamma(shape=0.75, scale=2.0), random=True, omit=True) @@ -308,8 +313,7 @@ Let's compute the customers and associated plans import pyspark.sql.functions as F import pyspark.sql.types as T - df_customer_pricing = df_customers.join(df_plans, - df_plans.plan_id == df_customers.plan) + df_customer_pricing = df_customers.join(df_plans, df_plans.plan_id == df_customers.plan) display(df_customer_pricing) @@ -365,8 +369,9 @@ now let's compute the invoices .. 
code-block:: python - df_customer_summary = (df_customer_pricing.join(df_summary, - df_customer_pricing.device_id == df_summary.device_id ) + df_customer_summary = ( + df_customer_pricing.join(df_summary, + df_customer_pricing.device_id == df_summary.device_id ) .createOrReplaceTempView("customer_summary")) df_invoices = spark.sql(""" diff --git a/docs/source/troubleshooting.rst b/docs/source/troubleshooting.rst index 386d35ab..83d721b3 100644 --- a/docs/source/troubleshooting.rst +++ b/docs/source/troubleshooting.rst @@ -165,50 +165,55 @@ In these cases, we use the `baseColumn` attribute to ensure the correct column b lines = ['delta', 'xyzzy', 'lakehouse', 'gadget', 'droid'] - testDataSpec = (dg.DataGenerator(spark, name="device_data_set", rows=1000000, - partitions=8, - randomSeedMethod='hash_fieldname') - # we'll use hash of the base field to generate the ids to - # avoid a simple incrementing sequence - .withColumn("internal_device_id", "long", minValue=0x1000000000000, - uniqueValues=device_population, omit=True, baseColumnType="hash") - - # note for format strings, we must use "%lx" not "%x" as the - # underlying value is a long - .withColumn("device_id", "string", format="0x%013x", - baseColumn="internal_device_id") - - # the device / user attributes will be the same for the same device id - # so lets use the internal device id as the base column for these attribute - .withColumn("country", "string", values=country_codes, - weights=country_weights, - baseColumn="internal_device_id") - - .withColumn("manufacturer", "string", values=manufacturers, - baseColumn="internal_device_id", omit=True) - - .withColumn("line", StringType(), values=lines, baseColumn="manufacturer", - baseColumnType="hash", omit=True) - - # note use of baseColumn to control column build ordering - .withColumn("manufacturer_info", "string", - expr="to_json(named_struct('line', line, 'manufacturer', manufacturer))", - baseColumn=["line", "manufacturer"] - ) - - .withColumn("event_type", "string", - values=["activation", "deactivation", "plan change", - "telecoms activity", "internet activity", "device error"], - random=True, omit=True) - - .withColumn("event_ts", "timestamp", begin="2020-01-01 01:00:00", end="2020-12-31 23:59:00", - interval="1 minute", random=True, omit=True) - - # note use of baseColumn to control column build ordering - .withColumn("event_info", "string", - expr="to_json(named_struct('event_type', event_type, 'event_ts', event_ts))", - baseColumn=["event_type", "event_ts"]) - ) + testDataSpec = ( + dg.DataGenerator(spark, name="device_data_set", rows=1000000, + partitions=8, + randomSeedMethod='hash_fieldname') + # we'll use hash of the base field to generate the ids to + # avoid a simple incrementing sequence + .withColumn("internal_device_id", "long", minValue=0x1000000000000, + uniqueValues=device_population, omit=True, baseColumnType="hash") + + # note for format strings, we must use "%lx" not "%x" as the + # underlying value is a long + .withColumn("device_id", "string", format="0x%013x", + baseColumn="internal_device_id") + + # the device / user attributes will be the same for the same device id + # so lets use the internal device id as the base column for these attribute + .withColumn("country", "string", values=country_codes, + weights=country_weights, + baseColumn="internal_device_id") + + .withColumn("manufacturer", "string", values=manufacturers, + baseColumn="internal_device_id", omit=True) + + .withColumn("line", StringType(), values=lines, baseColumn="manufacturer", + 
baseColumnType="hash", omit=True) + + # note use of baseColumn to control column build ordering + .withColumn("manufacturer_info", "string", + expr="to_json(named_struct('line', line, 'manufacturer', manufacturer))", + baseColumn=["line", "manufacturer"] + ) + + .withColumn("event_type", "string", + values=["activation", "deactivation", "plan change", + "telecoms activity", "internet activity", "device error"], + random=True, omit=True) + + .withColumn("event_ts", "timestamp", + begin="2020-01-01 01:00:00", + end="2020-12-31 23:59:00", + interval="1 minute", + random=True, + omit=True) + + # note use of baseColumn to control column build ordering + .withColumn("event_info", "string", + expr="to_json(named_struct('event_type', event_type, 'event_ts', event_ts))", + baseColumn=["event_type", "event_ts"]) + ) dfTestData = testDataSpec.build() From c4fdc3bc8e77a7eeaaa2ff03797cf160675cb79b Mon Sep 17 00:00:00 2001 From: ronanstokes-db Date: Tue, 9 May 2023 11:59:45 -0700 Subject: [PATCH 02/16] wip --- python/dev_require.txt | 1 - python/require.txt | 1 - 2 files changed, 2 deletions(-) diff --git a/python/dev_require.txt b/python/dev_require.txt index 8d53d810..a34ed3b2 100644 --- a/python/dev_require.txt +++ b/python/dev_require.txt @@ -31,7 +31,6 @@ pypandoc ipython==7.22.0 recommonmark sphinx-markdown-builder -rst2pdf==0.98 Jinja2 < 3.1 sphinx-copybutton diff --git a/python/require.txt b/python/require.txt index 53f80fde..5f0e30a4 100644 --- a/python/require.txt +++ b/python/require.txt @@ -30,7 +30,6 @@ pypandoc ipython==7.22.0 recommonmark sphinx-markdown-builder -rst2pdf==0.98 Jinja2 < 3.1 sphinx-copybutton From a1b03aa3068de08d1133a43a7eafc1f3da5fdb33 Mon Sep 17 00:00:00 2001 From: ronanstokes-db Date: Wed, 14 Jun 2023 12:54:53 -0700 Subject: [PATCH 03/16] added support for inferred types --- CHANGELOG.md | 1 + dbldatagen/__init__.py | 3 +- dbldatagen/data_generator.py | 22 ++- dbldatagen/datagen_constants.py | 2 + docs/source/generating_json_data.rst | 28 +++ makefile | 6 +- tests/test_complex_columns.py | 267 +++++++++++++++++++++++++++ 7 files changed, 324 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index afb34eee..34794e1e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ All notable changes to the Databricks Labs Data Generator will be documented in #### Changed * Added formatting of generated code as Html for script methods +* Allow use of inferred types on `withColumn` method when `expr` attribute is used ### Version 0.3.4 Post 3 diff --git a/dbldatagen/__init__.py b/dbldatagen/__init__.py index 49eea723..3c41a5d7 100644 --- a/dbldatagen/__init__.py +++ b/dbldatagen/__init__.py @@ -25,7 +25,8 @@ from .data_generator import DataGenerator from .datagen_constants import DEFAULT_RANDOM_SEED, RANDOM_SEED_RANDOM, RANDOM_SEED_FIXED, \ - RANDOM_SEED_HASH_FIELD_NAME, MIN_PYTHON_VERSION, MIN_SPARK_VERSION + RANDOM_SEED_HASH_FIELD_NAME, MIN_PYTHON_VERSION, MIN_SPARK_VERSION, \ + INFER_DATATYPE from .utils import ensure, topologicalSort, mkBoundsList, coalesce_values, \ deprecated, parse_time_interval, DataGenError, split_list_matching_condition, strip_margins, \ json_value_from_path, system_time_millis diff --git a/dbldatagen/data_generator.py b/dbldatagen/data_generator.py index 02095490..c62186f2 100644 --- a/dbldatagen/data_generator.py +++ b/dbldatagen/data_generator.py @@ -14,7 +14,9 @@ from .column_generation_spec import ColumnGenerationSpec from .datagen_constants import DEFAULT_RANDOM_SEED, RANDOM_SEED_FIXED, RANDOM_SEED_HASH_FIELD_NAME, \ 
DEFAULT_SEED_COLUMN, SPARK_RANGE_COLUMN, MIN_SPARK_VERSION, \
-    OPTION_RANDOM, OPTION_RANDOM_SEED, OPTION_RANDOM_SEED_METHOD
+    OPTION_RANDOM, OPTION_RANDOM_SEED, OPTION_RANDOM_SEED_METHOD, \
+    INFER_DATATYPE
+
 from .utils import ensure, topologicalSort, DataGenError, deprecated, split_list_matching_condition
 from .html_utils import HtmlUtils
 
@@ -607,7 +609,8 @@ def withColumnSpecs(self, patterns=None, fields=None, matchTypes=None, **kwargs)
            Builder pattern
 
         .. note::
-           matchTypes may also take SQL type strings or a list of SQL type strings such as "array"
+           matchTypes may also take SQL type strings or a list of SQL type strings such as "array". However,
+           you may not use ``INFER_DATATYPE`` as part of the matchTypes list.
 
         You may also add a variety of options to further control the test data generation process.
         For full list of options, see :doc:`/reference/api/dbldatagen.column_spec_options`.
 
@@ -645,6 +648,9 @@ def withColumnSpecs(self, patterns=None, fields=None, matchTypes=None, **kwargs)
             for typ in matchTypes:
                 if isinstance(typ, str):
+                    if typ == INFER_DATATYPE:
+                        raise ValueError("You cannot use INFER_DATATYPE with the method `withColumnSpecs`")
+
                     effective_types.append(SchemaParser.columnTypeFromString(typ))
                 else:
                     effective_types.append(typ)
@@ -762,6 +768,15 @@ def withColumn(self, colName, colType=StringType(), minValue=None, maxValue=None
         :returns: modified in-place instance of test data generator allowing for chaining of calls
                   following Builder pattern
 
+        .. note::
+           If the value ``None`` is used for the ``colType`` parameter, the method will try to use the underlying
+           datatype derived from the base columns.
+
+           If the value ``INFER_DATATYPE`` is used for the ``colType`` parameter and a SQL expression has been supplied
+           via the ``expr`` parameter, the method will try to infer the column datatype from the SQL expression.
+
+           Inferred data types can only be used if the ``expr`` parameter is specified.
+
         You may also add a variety of additional options to further control the test data generation process.
         For full list of options, see :doc:`/reference/api/dbldatagen.column_spec_options`.
 
@@ -821,6 +836,9 @@ def _generateColumnDefinition(self, colName, colType=None, baseColumn=None,
         if colType is None:
             colType = self.getColumnType(baseColumn)
 
+            if colType == INFER_DATATYPE:
+                raise ValueError("When base column(s) have inferred datatype, you must specify the column type")
+
         new_props = {}
         new_props.update(kwargs)
 
diff --git a/dbldatagen/datagen_constants.py b/dbldatagen/datagen_constants.py
index f19c2c41..34ff44dd 100644
--- a/dbldatagen/datagen_constants.py
+++ b/dbldatagen/datagen_constants.py
@@ -41,3 +41,5 @@
 OPTION_RANDOM = "random"
 OPTION_RANDOM_SEED_METHOD = "randomSeedMethod"
 OPTION_RANDOM_SEED = "randomSeed"
+
+INFER_DATATYPE = "__infer__"
\ No newline at end of file
diff --git a/docs/source/generating_json_data.rst b/docs/source/generating_json_data.rst
index f7534c5b..f33a55d8 100644
--- a/docs/source/generating_json_data.rst
+++ b/docs/source/generating_json_data.rst
@@ -178,6 +178,18 @@ written as:
                 expr="named_struct('event_type', event_type, 'event_ts', event_ts)",
                 baseColumn=['event_type', 'event_ts'])
 
+To simplify the specification of struct valued columns, the keyword "__infer__" can be used in place of the datatype
+when the `expr` attribute is specified. This will cause the datatype to be inferred from the expression.
+
+In this case, the previous code would be written as follows:
+
+.. 
code-block:: python
+
+    .withColumn("event_info",
+                "__infer__",
+                expr="named_struct('event_type', event_type, 'event_ts', event_ts)",
+                baseColumn=['event_type', 'event_ts'])
+
 Generating JSON valued fields
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
@@ -293,6 +305,22 @@ variable length subsets of the ``r`` columns.
 
 .. note:: Note the use of the `baseColumn` attribute here to ensure correct ordering and separation of phases.
 
+Using inferred datatypes
+^^^^^^^^^^^^^^^^^^^^^^^^
+
+When building columns with complex data types such as structs, especially nested structs, it can be repetitive and
+error-prone to specify the datatypes - especially when the column is based on the results of a SQL expression
+(as specified by the ``expr`` attribute).
+
+You may use the constant ``INFER_DATATYPE`` in place of the actual datatype when the ``expr`` attribute is used.
+
+When the ``INFER_DATATYPE`` constant is used for the datatype, the actual datatype for the column will be inferred
+from the SQL expression passed using the ``expr`` parameter. This is only supported when the ``expr`` parameter is
+populated.
+
+This is illustrated in the earlier ``event_info`` example, where the ``__infer__`` keyword (the value of the
+``INFER_DATATYPE`` constant) is used in place of the datatype.
+
 Using multi feature columns to generate arrays
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
diff --git a/makefile b/makefile
index 7c006b3e..e76e0952 100644
--- a/makefile
+++ b/makefile
@@ -89,11 +89,13 @@ dev-test:
 
 dev-lint-report:
 	@echo "$(OK_COLOR)=> Running Prospector lint reporting  $(PWD) $(NO_COLOR)"
-	prospector --profile prospector.yaml > prospector_report.txt
+	prospector --profile prospector.yaml dbldatagen > prospector_report.txt
+	prospector --profile prospector.yaml tests >> prospector_report.txt
 
 dev-lint:
 	@echo "$(OK_COLOR)=> Running Prospector lint reporting  $(PWD) $(NO_COLOR)"
-	prospector --profile prospector.yaml
+	prospector --profile prospector.yaml dbldatagen
+	prospector --profile prospector.yaml tests
 
 dev-test-with-html-report:
 	@echo "$(OK_COLOR)=> Running unit tests with HTML test coverage report$(NO_COLOR)"
diff --git a/tests/test_complex_columns.py b/tests/test_complex_columns.py
index 213897c1..52283f1b 100644
--- a/tests/test_complex_columns.py
+++ b/tests/test_complex_columns.py
@@ -396,3 +396,270 @@ def test_map_values(self):
 
         for r in rows:
             assert r['test'] is not None
+
+    def test_inferred_column_types1(self, setupLogging):
+        column_count = 10
+        data_rows = 10 * 1000
+        import dbldatagen as dg
+
+        from pyspark.sql.types import LongType, FloatType, IntegerType, \
+            StringType, DoubleType, BooleanType, ShortType, \
+            TimestampType, DateType, DecimalType, ByteType, \
+            BinaryType, ArrayType, MapType, StructType, StructField
+
+        from collections import namedtuple
+        import dbldatagen as dg
+        from dbldatagen import INFER_DATATYPE
+        import logging
+
+        os = ['MacOS', 'Linux', 'Windows', 'iOS', 'Android']
+        os_edition = ['RHEL 5.0', 'Windows 7', 'Windows 10', 'Windows XP', 'Mac OS X 10.8']
+        linux_distro = ['Ubuntu', 'Fedora', 'Kubuntu', 'Arch', 'Alpine']
+        boolean_values = ['True']
+        dot_net_version = ['.NET 8.0', '.NET 7.0', '.NET 6.0', '.NET Core 3.0']
+        browser_label = ['Chrome', 'Edge', 'Firefox', 'Safari']
+        browserver = ['Version 113.0.5672.126']
+        osver = ['13.3.1 (22E261)']
+
+        dataspec = (
+            dg.DataGenerator(spark, name="device_data_set", rows=10000,
+                             partitions=8
+                             , randomSeedMethod='hash_fieldname')
+        )
+
+        logging.info(dataspec.partitions)
+
+        dataspec = (dataspec
+                    # Design
+                    #
+                    # v2.0 - updated so that script outputs JSON only to mimic flow from Cisco edge
+
+                    # Reporting Identifiers
+                    .withColumn("dd_id", "long", 
minValue=1, maxValue=100000, random=True, omit=True) + .withColumn("internal_device_id", "integer", baseColumnType="hash", + omit=True) # internal id is hash of `id` column + .withColumn("duo_device_id", "string", format="0x%010x", baseColumnType="values", + baseColumn="internal_device_id") # format hash value as string + .withColumn("org_id", "integer", minValue=1, maxValue=100000, random=True, omit=True) + .withColumn("user_id", "integer", minValue=1000000, maxValue=9999999, random=True, omit=True) + .withColumn("anyconnect_id", "string", format="0x%015x", minValue=1, maxValue=1000000, + random=True, omit=True) + .withColumn("ztna_device_id", "string", format="0x%015x", minValue=1, maxValue=1000000, + random=True, omit=True) + .withColumn("reporting_identifiers", INFER_DATATYPE, + expr="named_struct('org_id', org_id ,'dd_id', dd_id ,'user_id',user_id,'anyconnect_id',anyconnect_id,'duo_device_id',duo_device_id,'ztna_device_id',ztna_device_id)", + baseColumn=['org_id', 'dd_id', 'user_id', 'anyconnect_id', 'duo_device_id', + 'ztna_device_id'] + , omit=True) + + .withColumn("txid", "string", template=r'\XXX-XXXXXXXXXXXXXXX', random=True, + omit=True) + .withColumn("duo_client_version", "string", format="1.0.%x", minValue=1, maxValue=9, + random=True, omit=True) + .withColumn("os", "string", values=os, random=True, omit=True) + .withColumn("os_version", "string", values=osver, omit=True) + # .withColumn("os_version", "string", format="10.10.%x", minValue=1, maxValue=9, random=True, omit=True) + .withColumn("os_build", "string", format="19042.%x", minValue=1000, maxValue=9999, random=True, + omit=True) + .withColumn("os_edition", "string", values=os_edition, random=True, omit=True) + .withColumn("linux_distro", "string", values=linux_distro, random=True, omit=True) + .withColumn("device_name", "string", template=r'\W', random=True, omit=True) + .withColumn("is_password_set", "boolean", values=boolean_values, random=True, omit=True) + .withColumn("is_encryption_enabled", "boolean", values=boolean_values, random=True, omit=True) + .withColumn("is_firewall_enabled", "boolean", values=boolean_values, random=True, omit=True) + .withColumn("is_screen_lock_enabled", "boolean", values=boolean_values, random=True, omit=True) + .withColumn("is_auto_login_enabled", "boolean", values=boolean_values, random=True, omit=True) + .withColumn("seconds_before_screen_lock", "integer", minValue=1, maxValue=60, random=True, + omit=True) + + .withColumn("is_active", "boolean", values=boolean_values, random=True, omit=True) + .withColumn("product_label", "string", template=r'\W', random=True, omit=True) + .withColumn("version", "string", template=r'\W', random=True, omit=True) + .withColumn("instance_guid", "string", template=r'\XXXXXX-\XXXXXX-\XXXXXX-\XXXXXX', random=True, + omit=True) + + # struct + .withColumn("security_agents", INFER_DATATYPE, + expr="named_struct('is_active', is_active, 'product_label', product_label,'version', version, 'instance_guid', instance_guid)", + baseColumn=['is_active', 'product_label', 'version', 'instance_guid'] + , omit=True) + + # .withColumn("security_agents", "struct", + # expr="named_struct('is_active', is_active, 'product_label', product_label,'version', version, 'instance_guid', instance_guid)", + # baseColumn=['is_active', 'product_label','version','instance_guid'] + # ,omit=True) + + .withColumn("communication_scheme", "string", template=r'\W', random=True, omit=True) + .withColumn("health_check_end_timestamp", "string", percentNulls=1, omit=True) + 
.withColumn("health_check_start_timestamp", "string", percentNulls=1, omit=True) + .withColumn("health_check_length_millis", "string", percentNulls=1, omit=True) + .withColumn("browser_label", "string", percentNulls=1, omit=True) + # .withColumn("browser_version", "string", percentNulls=1, omit=True) + .withColumn("browser_version", "string", values=browserver, omit=True) + .withColumn("browser_process_name", "string", percentNulls=1, omit=True) + + .withColumn("valid", "string", percentNulls=1, omit=True) + .withColumn("common_name", "string", percentNulls=1, omit=True) + .withColumn("team_id", "string", percentNulls=1, omit=True) + .withColumn("browser_process_info", INFER_DATATYPE, + expr="named_struct('valid', valid, 'common_name', common_name,'team_id', team_id)", + baseColumn=['valid', 'common_name', 'team_id'] + , omit=True) + + .withColumn("is_virtual_machine", "boolean", values=boolean_values, random=True, omit=True) + .withColumn("is_health_check_retrying", "string", percentNulls=1, omit=True) + .withColumn("domain_name", "string", template=r'www.\w.com|\w.\w.co.u\k', omit=True) + .withColumn("host_name", "string", template=r'\w', omit=True) + .withColumn("dot_net_version", "string", values=dot_net_version, random=True, omit=True) + .withColumn("wifi_fingerprint", "string", template=r'\XXX-XXXXXXXXXXXXXXX', random=True, + omit=True) + .withColumn("wifi_fingerprint_age_seconds", "integer", minValue=1, maxValue=720, random=True, + omit=True) + .withColumn("wifi_fingerprint_includes_bssid", "boolean", values=boolean_values, random=True, + omit=True) + + .withColumn("desktop_session_token", "string", template=r'\XXX-XXXXXXXXXXXX-XXX', random=True, + omit=True) + .withColumn("desktop_session", StructType([StructField('desktop_session_token', StringType())]), + expr="named_struct('desktop_session_token', desktop_session_token)", + baseColumn=['desktop_session_token'] + , omit=True) + + .withColumn("machine_guid", "string", template=r'\XXXXXX-\XXXXXX-\XXXXXX', random=True, + omit=True) + .withColumn("hardware_uuid", "string", template=r'\XXXXXXXX-\XXXX-\XXXX-\XXXX-\XXXXXXXXXXXX', + random=True, omit=True) + .withColumn("domain_sid", "string", format="0x%010x", minValue=1, maxValue=1000000, random=True, + omit=True) + .withColumn("computer_sid", "string", format="0x%010x", minValue=1, maxValue=1000000, + random=True, omit=True) + .withColumn("intune_id", "string", format="0x%010x", minValue=1, maxValue=1000000, random=True, + omit=True) + .withColumn("amp_guid", "string", template=r'\XXXXXX-\XXXXXX-\XXXXXX', random=True, omit=True) + .withColumn("omadm_device_client_id", "string", format="0x%010x", minValue=1, maxValue=1000000, + random=True, omit=True) + .withColumn("cpu_id", "string", format="0x%010x", minValue=1, maxValue=1000000, random=True, + omit=True) + .withColumn("identifiers", INFER_DATATYPE, + expr="named_struct('machine_guid', machine_guid, 'hardware_uuid', hardware_uuid,'domain_sid', domain_sid, \ + 'computer_sid', computer_sid, 'intune_id', intune_id, 'amp_guid', amp_guid, 'omadm_device_client_id', omadm_device_client_id, 'cpu_id',cpu_id )", + baseColumn=['machine_guid', 'hardware_uuid', 'domain_sid', 'computer_sid'] + , omit=True) + + .withColumn("mdm_id_collection_errors", "string", format="0x%010x", minValue=1, + maxValue=1000000, random=True, omit=True) + + .withColumn("password_fl", FloatType(), percentNulls=1, omit=True) + .withColumn("device_name_fl", FloatType(), percentNulls=1, omit=True) + .withColumn("os_version_fl", FloatType(), percentNulls=1, omit=True) + 
.withColumn("biometrics_fl", FloatType(), percentNulls=1, omit=True) + .withColumn("encryption_fl", FloatType(), percentNulls=1, omit=True) + .withColumn("firewall_fl", FloatType(), percentNulls=1, omit=True) + .withColumn("security_agents_total_fl", FloatType(), percentNulls=1, omit=True) + .withColumn("lock_screen_fl", FloatType(), percentNulls=1, omit=True) + .withColumn("virtual_machine_fl", FloatType(), percentNulls=1, omit=True) + .withColumn("auto_login_fl", FloatType(), percentNulls=1, omit=True) + .withColumn("domain_name_fl", FloatType(), percentNulls=1, omit=True) + .withColumn("host_name_fl", FloatType(), percentNulls=1, omit=True) + .withColumn("machine_guid_fl", FloatType(), percentNulls=1, omit=True) + .withColumn("domain_sid_fl", FloatType(), percentNulls=1, omit=True) + .withColumn("compute_sid_fl", FloatType(), percentNulls=1, omit=True) + .withColumn("intune_id_fl", FloatType(), percentNulls=1, omit=True) + .withColumn("amp_guid_fl", FloatType(), percentNulls=1, omit=True) + .withColumn("omadm_device_client_id_fl", FloatType(), percentNulls=1, omit=True) + .withColumn("desktop_session_fl", FloatType(), percentNulls=1, omit=True) + .withColumn("signature_validation_fl", FloatType(), percentNulls=1, omit=True) + .withColumn("individual_health_check_durations_millis", + INFER_DATATYPE, + expr="named_struct('password_fl', password_fl, 'device_name_fl', device_name_fl,'os_version_fl', os_version_fl,'biometrics_fl', biometrics_fl, 'encryption_fl', encryption_fl, 'firewall_fl', firewall_fl, 'security_agents_total_fl', security_agents_total_fl, 'lock_screen_fl',lock_screen_fl, 'virtual_machine_fl',virtual_machine_fl,'auto_login_fl',auto_login_fl,'domain_name_fl',domain_name_fl,'host_name_fl',host_name_fl,'machine_guid_fl',machine_guid_fl,'domain_sid_fl',domain_sid_fl,'compute_sid_fl',compute_sid_fl,'intune_id_fl',intune_id_fl,'amp_guid_fl',amp_guid_fl,'omadm_device_client_id_fl',omadm_device_client_id_fl,'desktop_session_fl',desktop_session_fl,'signature_validation_fl',signature_validation_fl)", + baseColumn=['password_fl', 'device_name_fl', 'os_version', 'biometrics_fl', + 'encryption_fl', 'firewall_fl', 'security_agents_total_fl', + 'lock_screen_fl', 'virtual_machine_fl', 'auto_login_fl', 'domain_name_fl', + 'host_name_fl', 'machine_guid_fl', 'domain_sid_fl', 'compute_sid_fl', + 'intune_id_fl', 'amp_guid_fl', 'omadm_device_client_id_fl', + 'desktop_session_fl', 'signature_validation_fl'] + , omit=True) + + .withColumn("app_updates_enabled", "boolean", values=boolean_values, random=True, omit=True) + .withColumn("auto_launch_enabled", "boolean", values=boolean_values, random=True, omit=True) + .withColumn("diagnostic_log_enabled", "boolean", values=boolean_values, random=True, omit=True) + .withColumn("desktop_sessions_enabled", "boolean", values=boolean_values, random=True, omit=True) + .withColumn("user_configuration", INFER_DATATYPE, + expr="named_struct('app_updates_enabled', app_updates_enabled, 'auto_launch_enabled', auto_launch_enabled,'diagnostic_log_enabled', diagnostic_log_enabled, 'desktop_sessions_enabled',desktop_sessions_enabled)", + baseColumn=['app_updates_enabled', 'auto_launch_enabled', 'diagnostic_log_enabled', + 'desktop_sessions_enabled'] + , omit=True) + + .withColumn("app_updates_enabled_sys", "boolean", values=boolean_values, random=True, omit=True) + .withColumn("row_app_hidden", "boolean", values=boolean_values, random=True, omit=True) + .withColumn("row_password_hidden", "boolean", values=boolean_values, random=True, omit=True) + 
.withColumn("row_encryption_hidden", "boolean", values=boolean_values, random=True, omit=True) + .withColumn("row_firewall_hidden", "boolean", values=boolean_values, random=True, omit=True) + .withColumn("row_updates_hidden", "boolean", values=boolean_values, random=True, omit=True) + .withColumn("automatic_updates_enabled", "boolean", values=boolean_values, random=True, omit=True) + .withColumn("updater_disabled_by_admin", "boolean", values=boolean_values, random=True, omit=True) + .withColumn("system_configuration", + INFER_DATATYPE, + expr="named_struct('app_updates_enabled_sys', app_updates_enabled_sys, 'row_app_hidden', row_app_hidden,'row_password_hidden', row_password_hidden, 'row_encryption_hidden',row_encryption_hidden, 'row_firewall_hidden', row_firewall_hidden,'row_updates_hidden',row_updates_hidden, 'automatic_updates_enabled',automatic_updates_enabled,'updater_disabled_by_admin',updater_disabled_by_admin )", + baseColumn=['app_updates_enabled_sys', 'row_app_hidden', 'row_password_hidden', + 'row_encryption_hidden', 'row_firewall_hidden', 'row_updates_hidden', + 'automatic_updates_enabled', 'updater_disabled_by_admin'] + , omit=True) + + .withColumn("feature_enabled", "boolean", values=boolean_values, random=True, omit=True) + .withColumn("connected_hostnames", "string", template=r'\w', omit=True) + .withColumn("duoconnect", INFER_DATATYPE, + expr="named_struct('feature_enabled', feature_enabled, 'connected_hostnames', connected_hostnames)", + baseColumn=['feature_enabled', 'connected_hostnames'] + , omit=True) + + .withColumn("supports_hardware_security", "boolean", values=boolean_values, random=True, omit=True) + + # Posture payload + .withColumn("posture_payload", INFER_DATATYPE, + expr="named_struct('reporting_identifiers', reporting_identifiers,'txid', txid ,'duo_client_version', duo_client_version ,'os', os,'os_version', os_version ,'os_build', os_build ,'os_edition', os_edition,'linux_distro', linux_distro ,'device_name', device_name ,'is_password_set', is_password_set,'is_encryption_enabled', is_encryption_enabled ,'is_firewall_enabled', is_firewall_enabled ,'is_screen_lock_enabled',is_screen_lock_enabled,'is_auto_login_enabled', is_auto_login_enabled ,'seconds_before_screen_lock', seconds_before_screen_lock, 'security_agents', security_agents, 'communication_scheme',communication_scheme,'health_check_end_timestamp',health_check_end_timestamp,'health_check_start_timestamp',health_check_start_timestamp,'health_check_length_millis', health_check_length_millis,'browser_label', browser_label,'browser_version',browser_version, 'browser_process_name', browser_process_name, 'browser_process_info',browser_process_info, 'is_virtual_machine',is_virtual_machine, 'is_health_check_retrying', is_health_check_retrying,'domain_name',domain_name, 'host_name',host_name, 'dot_net_version', dot_net_version, 'wifi_fingerprint', wifi_fingerprint,'wifi_fingerprint_age_seconds',wifi_fingerprint_age_seconds,'wifi_fingerprint_includes_bssid',wifi_fingerprint_includes_bssid, 'desktop_session', desktop_session, 'identifiers',identifiers, 'mdm_id_collection_errors',mdm_id_collection_errors, 'individual_health_check_durations_millis',individual_health_check_durations_millis, 'user_configuration' , user_configuration, 'system_configuration', system_configuration, 'duoconnect',duoconnect, 'supports_hardware_security', supports_hardware_security )", + baseColumn=['reporting_identifiers', 'txid', 'duo_client_version', 'os', 'os_version', + 'os_build', 'os_edition', 'linux_distro', 'device_name', 
'is_password_set', + 'is_encryption_enabled', 'is_firewall_enabled', 'is_screen_lock_enabled', + 'is_auto_login_enabled', 'seconds_before_screen_lock', 'security_agents', + 'communication_scheme', 'health_check_end_timestamp', + 'health_check_start_timestamp', 'health_check_length_millis', + 'browser_label', 'browser_version', 'browser_process_name', + 'browser_process_info', 'is_virtual_machine', 'is_health_check_retrying', + 'domain_name', 'host_name', 'dot_net_version', 'wifi_fingerprint', + 'wifi_fingerprint_age_seconds', 'wifi_fingerprint_includes_bssid', + 'desktop_session', 'identifiers', 'mdm_id_collection_errors', + 'individual_health_check_durations_millis', 'user_configuration', + 'system_configuration', 'duoconnect', 'supports_hardware_security'] + ) # Close Posture Payload + .withColumn("last_modified", "timestamp", expr="now()") + ) + + dfTestData = dataspec.build() + + print(dfTestData.schema) + + data = dfTestData.limit(10).collect() + + for x in data: + print(x) + + def test_inferred_column_types2(self, setupLogging): + with pytest.raises(ValueError): + column_count = 10 + data_rows = 10 * 1000 + df_spec = (dg.DataGenerator(spark, name="test_data_set1", rows=data_rows) + .withIdOutput() + .withColumn("r", FloatType(), expr="floor(rand() * 350) * (86400 + 3600)", + numColumns=column_count, structType="array") + .withColumn("code1", "integer", minValue=100, maxValue=200) + .withColumn("code2", "integer", minValue=0, maxValue=10) + .withColumn("code3", dg.INFER_DATATYPE, values=['a', 'b', 'c']) + .withColumn("code4", StringType(), values=['a', 'b', 'c'], random=True) + .withColumn("code5", StringType(), values=['a', 'b', 'c'], random=True, weights=[9, 1, 1]) + ) + + df = df_spec.build() + df.show() + + From b3cd64495707476877b6028e5fd64263ccdc382a Mon Sep 17 00:00:00 2001 From: ronanstokes-db Date: Wed, 14 Jun 2023 16:18:34 -0700 Subject: [PATCH 04/16] added support for inferred types --- dbldatagen/column_generation_spec.py | 14 +++++++- dbldatagen/data_generator.py | 29 ++++++++------- tests/test_complex_columns.py | 54 ++++++++++++++++++++++++---- 3 files changed, 74 insertions(+), 23 deletions(-) diff --git a/dbldatagen/column_generation_spec.py b/dbldatagen/column_generation_spec.py index 6456a389..81deba1c 100644 --- a/dbldatagen/column_generation_spec.py +++ b/dbldatagen/column_generation_spec.py @@ -20,7 +20,7 @@ from .column_spec_options import ColumnSpecOptions from .datagen_constants import RANDOM_SEED_FIXED, RANDOM_SEED_HASH_FIELD_NAME, RANDOM_SEED_RANDOM, \ - DEFAULT_SEED_COLUMN, OPTION_RANDOM, OPTION_RANDOM_SEED, OPTION_RANDOM_SEED_METHOD + DEFAULT_SEED_COLUMN, OPTION_RANDOM, OPTION_RANDOM_SEED, OPTION_RANDOM_SEED_METHOD, INFER_DATATYPE from .daterange import DateRange from .distributions import Normal, DataDistribution @@ -109,6 +109,12 @@ def __init__(self, name, colType=None, minValue=0, maxValue=None, step=1, prefix if colType is None: # default to integer field if none specified colType = IntegerType() + self._inferDataType = False + elif colType == INFER_DATATYPE: + colType = StringType() # default inferred data type to string until exact type is known + self._inferDataType = True + else: + self._inferDataType = False assert isinstance(colType, DataType), f"colType `{colType}` is not instance of DataType" @@ -399,6 +405,12 @@ def textGenerator(self): """ Get the text generator for the column spec""" return self._textGenerator + @property + def inferDatatype(self): + """ If True indicates that datatype should be inferred to be result of computing SQL 
expression
+        """
+        return self._inferDataType
+
     @property
     def baseColumns(self):
         """ Return base columns as list of strings"""
diff --git a/dbldatagen/data_generator.py b/dbldatagen/data_generator.py
index c62186f2..df800344 100644
--- a/dbldatagen/data_generator.py
+++ b/dbldatagen/data_generator.py
@@ -10,18 +10,17 @@
 import re
 
 from pyspark.sql.types import LongType, IntegerType, StringType, StructType, StructField, DataType
-from .spark_singleton import SparkSingleton
+
+from ._version import _get_spark_version
 from .column_generation_spec import ColumnGenerationSpec
 from .datagen_constants import DEFAULT_RANDOM_SEED, RANDOM_SEED_FIXED, RANDOM_SEED_HASH_FIELD_NAME, \
-    DEFAULT_SEED_COLUMN, SPARK_RANGE_COLUMN, MIN_SPARK_VERSION, \
-    OPTION_RANDOM, OPTION_RANDOM_SEED, OPTION_RANDOM_SEED_METHOD, \
-    INFER_DATATYPE
+    DEFAULT_SEED_COLUMN, SPARK_RANGE_COLUMN, MIN_SPARK_VERSION, \
+    OPTION_RANDOM, OPTION_RANDOM_SEED, OPTION_RANDOM_SEED_METHOD, \
+    INFER_DATATYPE
 from .html_utils import HtmlUtils
-from . _version import _get_spark_version
 from .schema_parser import SchemaParser
+from .spark_singleton import SparkSingleton
+from .utils import ensure, topologicalSort, DataGenError, deprecated, split_list_matching_condition
 
 _OLD_MIN_OPTION = 'min'
 _OLD_MAX_OPTION = 'max'
@@ -64,7 +63,8 @@ class DataGenerator:
 
     # restrict spurious messages from java gateway
     logging.getLogger("py4j").setLevel(logging.WARNING)
-    #logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.NOTSET)
+
+    # logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.NOTSET)
 
     def __init__(self, sparkSession=None, name=None, randomSeedMethod=None, rows=1000000, startingId=0, randomSeed=None, partitions=None, verbose=False,
@@ -185,7 +185,7 @@ def _checkSparkVersion(cls, sparkVersion, minSparkVersion):
 
         if sparkVersionInfo < minSparkVersion:
             logging.warning(f"*** Minimum version of Spark supported is {minSparkVersion} - found version %s ",
-                            sparkVersionInfo )
+                            sparkVersionInfo)
             return False
 
         return True
@@ -809,7 +809,7 @@ def withColumn(self, colName, colType=StringType(), minValue=None, maxValue=None
         new_props = {}
         new_props.update(kwargs)
 
-        if type(colType) == str:
+        if type(colType) == str and colType != INFER_DATATYPE:
             colType = SchemaParser.columnTypeFromString(colType)
 
         self.logger.info("effective range: %s, %s, %s args: %s", minValue, maxValue, step, kwargs)
@@ -830,8 +830,7 @@ def _generateColumnDefinition(self, colName, colType=None, baseColumn=None,
         we'll mark that the build plan needs to be regenerated.
         For our purposes, the build plan determines the order of column generation etc. 
-        :returns: modified in-place instance of test data generator allowing for chaining of calls
-                  following Builder pattern
+        :returns: Newly added column_spec
         """
         if colType is None:
             colType = self.getColumnType(baseColumn)
 
@@ -891,7 +890,7 @@ def _generateColumnDefinition(self, colName, colType=None, baseColumn=None,
         # mark that the build plan needs to be regenerated
         self._markForPlanRegen()
 
-        return self
+        return column_spec
 
     def _getBaseDataFrame(self, startId=0, streaming=False, options=None):
         """ generate the base data frame and seed column (which defaults to `id`) , partitioning the data if necessary
@@ -972,7 +971,7 @@ def _computeColumnBuildOrder(self):
 
         self.logger.info("columnBuildOrder: %s", str(self._buildOrder))
 
-        self._buildOrder = self._adjustBuildOrderForSqlDependencies(self._buildOrder,  self._columnSpecsByName)
+        self._buildOrder = self._adjustBuildOrderForSqlDependencies(self._buildOrder, self._columnSpecsByName)
 
         return self._buildOrder
 
diff --git a/tests/test_complex_columns.py b/tests/test_complex_columns.py
index 52283f1b..495e9497 100644
--- a/tests/test_complex_columns.py
+++ b/tests/test_complex_columns.py
@@ -421,13 +421,7 @@ def test_inferred_column_types1(self, setupLogging):
         browserver = ['Version 113.0.5672.126']
         osver = ['13.3.1 (22E261)']
 
-        dataspec = (
-            dg.DataGenerator(spark, name="device_data_set", rows=10000,
-                             partitions=8
-                             , randomSeedMethod='hash_fieldname')
-        )
-
-        logging.info(dataspec.partitions)
+        dataspec = dg.DataGenerator(spark, name="device_data_set", rows=10000, randomSeedMethod='hash_fieldname')
 
         dataspec = (dataspec
                     # Design
@@ -662,4 +656,50 @@ def test_inferred_column_types2(self, setupLogging):
 
             df = df_spec.build()
             df.show()
 
+    def test_inferred_with_schema(self):
+        """Test use of schema"""
+        schema = StructType([
+            StructField("region_id", IntegerType(), True),
+            StructField("region_cd", StringType(), True),
+            StructField("c", StringType(), True),
+            StructField("c1", StringType(), True),
+            StructField("state1", StringType(), True),
+            StructField("state2", StringType(), True),
+            StructField("st_desc", StringType(), True),
+
+        ])
+
+        testDataSpec = (dg.DataGenerator(spark, name="test_data_set1", rows=10000)
+                        .withSchema(schema)
+                        )
+
+        with pytest.raises(ValueError):
+            testDataSpec2 = testDataSpec.withColumnSpecs(matchTypes=[dg.INFER_DATATYPE], minValue=0, maxValue=100)
+            df = testDataSpec2.build()
+            df.show()
+
+    def test_inferred_column_types3(self, setupLogging):
+        column_count = 10
+        data_rows = 10 * 1000
+        df_spec = (dg.DataGenerator(spark, name="test_data_set1", rows=data_rows)
+                   .withIdOutput()
+                   .withColumn("r", FloatType(), expr="floor(rand() * 350) * (86400 + 3600)",
+                               numColumns=column_count, structType="array")
+                   .withColumn("code1", "integer", minValue=100, maxValue=200)
+                   .withColumn("code2", "integer", minValue=0, maxValue=10)
+                   .withColumn("code3", dg.INFER_DATATYPE, expr="code1 + code2")
+                   .withColumn("code4", StringType(), values=['a', 'b', 'c'], random=True)
+                   .withColumn("code5", StringType(), values=['a', 'b', 'c'], random=True, weights=[9, 1, 1])
+                   )
+
+        # check the inferred-datatype flag on the underlying column specs
+        # (accessed via the generator's internal `_columnSpecsByName` map)
+        columnSpec1 = df_spec._columnSpecsByName["code1"]
+        assert columnSpec1.inferDatatype is False
+
+        columnSpec2 = df_spec._columnSpecsByName["code3"]
+        assert columnSpec2.inferDatatype is True
+
+
+
 
From 61e1044ebc64b2bfbd1c0a45e1d944b0f0f4c035 Mon Sep 17 00:00:00 2001
From: ronanstokes-db
Date: Mon, 26 Jun 2023 14:43:53 -0700
Subject: [PATCH 05/16] wip

---
 dbldatagen/column_generation_spec.py |  12 +-
 dbldatagen/data_generator.py         | 128 +++++++-
 tests/test_complex_columns.py        | 455 
++++++++++++--------------- 3 files changed, 330 insertions(+), 265 deletions(-) diff --git a/dbldatagen/column_generation_spec.py b/dbldatagen/column_generation_spec.py index 81deba1c..e952b1b3 100644 --- a/dbldatagen/column_generation_spec.py +++ b/dbldatagen/column_generation_spec.py @@ -27,11 +27,13 @@ from .nrange import NRange from .text_generators import TemplateGenerator from .utils import ensure, coalesce_values +from .schema_parser import SchemaParser HASH_COMPUTE_METHOD = "hash" VALUES_COMPUTE_METHOD = "values" RAW_VALUES_COMPUTE_METHOD = "raw_values" AUTO_COMPUTE_METHOD = "auto" +EXPR_OPTION = "expr" COMPUTE_METHOD_VALID_VALUES = [HASH_COMPUTE_METHOD, AUTO_COMPUTE_METHOD, VALUES_COMPUTE_METHOD, @@ -107,14 +109,18 @@ def __init__(self, name, colType=None, minValue=0, maxValue=None, step=1, prefix # set up default range and type for column self._dataRange = NRange(None, None, None) # by default range of values for column is unconstrained + self._inferDataType = False if colType is None: # default to integer field if none specified colType = IntegerType() - self._inferDataType = False elif colType == INFER_DATATYPE: colType = StringType() # default inferred data type to string until exact type is known self._inferDataType = True - else: - self._inferDataType = False + + if EXPR_OPTION not in kwargs: + raise ValueError(f"Column generation spec must have `expr` attribute specified if datatype is inferred") + + elif type(colType) == str and colType != INFER_DATATYPE: + colType = SchemaParser.columnTypeFromString(colType) assert isinstance(colType, DataType), f"colType `{colType}` is not instance of DataType" diff --git a/dbldatagen/data_generator.py b/dbldatagen/data_generator.py index df800344..d455e190 100644 --- a/dbldatagen/data_generator.py +++ b/dbldatagen/data_generator.py @@ -491,12 +491,23 @@ def schema(self): """ infer spark output schema definition from the field specifications :returns: Spark SQL `StructType` for schema + + ..note:: + If the data generation specification contains columns for which the datatype is inferred, the schema type + for inferred columns may not be correct until the build command has completed. + """ return StructType(self.schemaFields) @property def inferredSchema(self): - """ infer spark interim schema definition from the field specifications""" + """ infer spark interim schema definition from the field specifications + + ..note:: + If the data generation specification contains columns for which the datatype is inferred, the schema type + for inferred columns may not be correct until the build command has completed. + + """ self._checkFieldList() return StructType(self._inferredSchemaFields) @@ -773,10 +784,14 @@ def withColumn(self, colName, colType=StringType(), minValue=None, maxValue=None datatype derived from the base columns. If the value ``INFER_DATATYPE`` is used for the ``colType`` parameter and a SQL expression has been supplied - via the ``expr`` parameter, the method will try to infer the column datatype from the SQL expression. + via the ``expr`` parameter, the method will try to infer the column datatype from the SQL expression when + the ``build()`` method is called. Inferred data types can only be used if the ``expr`` parameter is specified. + Note that properties which return a schema based on the specification may not be accurate until the + ``build()`` method is called. Prior to this, the schema may indicate a default column type for those fields. 
+
         You may also add a variety of additional options to further control the test data generation
         process. For a full list of options, see :doc:`/reference/api/dbldatagen.column_spec_options`.
@@ -809,19 +824,114 @@ def withColumn(self, colName, colType=StringType(), minValue=None, maxValue=None
         new_props = {}
         new_props.update(kwargs)
 
-        if type(colType) == str and colType != INFER_DATATYPE:
-            colType = SchemaParser.columnTypeFromString(colType)
+        # if type(colType) == str and colType != INFER_DATATYPE:
+        #    colType = SchemaParser.columnTypeFromString(colType)
 
         self.logger.info("effective range: %s, %s, %s args: %s", minValue, maxValue, step, kwargs)
         self.logger.info("adding column - `%s` with baseColumn : `%s`, implicit : %s , omit %s",
                          colName, baseColumn, implicit, omit)
-        self._generateColumnDefinition(colName, colType, minValue=minValue, maxValue=maxValue,
-                                       step=step, prefix=prefix, random=random,
-                                       distribution=distribution, baseColumn=baseColumn, dataRange=dataRange,
-                                       implicit=implicit, omit=omit, **new_props)
-        self._inferredSchemaFields.append(StructField(colName, colType, nullable))
+        newColumn = self._generateColumnDefinition(colName, colType, minValue=minValue, maxValue=maxValue,
+                                                   step=step, prefix=prefix, random=random,
+                                                   distribution=distribution, baseColumn=baseColumn,
+                                                   dataRange=dataRange,
+                                                   implicit=implicit, omit=omit, **new_props)
+
+        # note: for inferred columns, the column type is initially set to a StringType
+        # but may be superseded later
+        self._inferredSchemaFields.append(StructField(colName, newColumn.datatype, nullable))
         return self
 
+    def _mkSqlStructFromList(self, fields):
+        """
+        Create a SQL struct expression from a list of fields
+
+        :param fields: a list of elements that make up the SQL struct expression (each being a string or tuple)
+        :returns: SQL expression to generate the struct
+
+        .. note::
+          This method is used internally when creating struct columns. It is not intended for general use.
+
+        Each element of the list may be a simple string, or a tuple.
+        When the element is specified as a simple string, it must be the name of a previously defined column which
+        will be used as both the field name within the struct and the SQL expression to generate the field value.
+
+        When the element is specified as a tuple, it must be a tuple of two elements. The first element must be the
+        name of the field within the struct. The second element must be a SQL expression that will be used to generate
+        the field value, and may reference previously defined columns.
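+
+        A brief sketch of the intended behaviour (a hypothetical example; ``event_type`` is assumed to be
+        a previously defined column):
+
+        .. code-block:: python
+
+           fields = ["event_type", ("ts", "now()")]
+           # expected result of _mkSqlStructFromList(fields):
+           #     "named_struct('event_type',event_type,'ts',now())"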
+ """ + assert fields is not None and isinstance(fields, list), \ + "Fields must be a non-empty list of fields that make up the struct elements" + assert len(fields) >= 1, "Fields must be a non-empty list of fields that make up the struct elements" + + struct_expressions = [] + + for fieldSpec in fields: + if isinstance(fieldSpec, str): + struct_expressions.append(f"'{fieldSpec}'") + struct_expressions.append(fieldSpec) + elif isinstance(fieldSpec, tuple): + assert len(fieldSpec) == 2, "tuple must be field name and SQL expression strings" + assert isinstance(fieldSpec[0], str), "First element must be field name string" + assert isinstance(fieldSpec[1], str), "Second element must be field value SQL string" + struct_expressions.append(f"'{fieldSpec[0]}'") + struct_expressions.append(fieldSpec[1]) + + struct_expression = f"named_struct({','.join(struct_expressions)})" + return struct_expression + + def _mkStructFromDict(self,fields): + assert fields is not None and isinstance(fields, dict), \ + "Fields must be a non-empty dict of fields that make up the struct elements" + struct_expressions = [] + + for key, value in fields.items(): + struct_expressions.append(f"'{key}'") + if isinstance(value, str): + struct_expressions.append(str(value)) + elif isinstance(value, dict): + struct_expressions.append(self._mkStructFromDict(value)) + elif isinstance(value, list): + array_expressions = ",".join([str(x) for x in value]) + struct_expressions.append(f"array({array_expressions})") + else: + raise ValueError(f"Invalid field element for field `{key}`") + + struct_expression = f"named_struct({','.join(struct_expressions)})" + return struct_expression + + def withStructColumn(self, colName, fields=None, asJson=False, **kwargs): + """ + Add a struct column to the synthetic data generation specification. This will add a new column composed of + a struct of the specified fields. + + :param colName: name of column + :param fields: list of fields to compose as a struct valued column + :param asJson: If False, generate a struct valued column. If True, generate a JSON string column + :return: A modified in-place instance of data generator allowing for chaining of calls + following the Builder pattern + + .. note:: + Additional options for the field specification may be specified as keyword arguments. 
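+
+        A minimal usage sketch (a hypothetical example; ``code1`` and ``code2`` are assumed to be
+        previously defined columns):
+
+        .. code-block:: python
+
+           df_spec = (df_spec
+                      .withStructColumn("struct1", fields=["code1", "code2"])
+                      .withStructColumn("struct2", fields=[("a", "code1"), ("b", "code1 + code2")])
+                      )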
+ + """ + assert fields is not None and type(fields) is list and len(fields) > 0, \ + "Must specify at least one field for struct column" + assert type(colName) is str and len(colName) > 0, "Must specify a column name" + + if isinstance(fields, list): + return self.withColumn(colName, INFER_DATATYPE, expr=self._mkSqlStructFromList(fields), **kwargs) + elif isinstance(fields, dict): + return self.withColumn(colName, INFER_DATATYPE, expr=self._mkStructFromDict(fields), **kwargs) + + for fld in fields: + assert fld in self.getInferredColumnNames(), f"Field `{fld}` not found in column specs" + + fieldExprs = [f"'{fld}', fld" for fld in l] + outputExpr = f"named_struct({','.join(fieldExprs)})" + if asJson: + outputExpr = f"to_json({outputExpr})" + + return self.withColumn(colName, INFER_DATATYPE, expr=outputExpr, baseColumn=fields, **kwargs) + def _generateColumnDefinition(self, colName, colType=None, baseColumn=None, implicit=False, omit=False, nullable=True, **kwargs): """ generate field definition and column spec diff --git a/tests/test_complex_columns.py b/tests/test_complex_columns.py index 495e9497..eb6246ce 100644 --- a/tests/test_complex_columns.py +++ b/tests/test_complex_columns.py @@ -3,7 +3,7 @@ import pytest from pyspark.sql import functions as F from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, ArrayType, MapType, \ - BinaryType, LongType + BinaryType, LongType, DateType import dbldatagen as dg @@ -22,6 +22,15 @@ class TestComplexColumns: row_count = 1000 column_count = 10 + @staticmethod + def getFieldType(schema, fieldName): + fields = [fld for fld in schema.fields if fld.name == fieldName] + + if fields is not None and len(fields) > 0: + return fields[0].dataType + else: + return None + @pytest.mark.parametrize("complexFieldType, expectedType, invalidValueCondition", [("array", ArrayType(IntegerType()), "complex_field is not Null"), ("array>", ArrayType(ArrayType(StringType())), "complex_field is not Null"), @@ -397,248 +406,7 @@ def test_map_values(self): for r in rows: assert r['test'] is not None - def test_inferred_column_types1(self, setupLogging): - column_count = 10 - data_rows = 10 * 1000 - import dbldatagen as dg - - from pyspark.sql.types import LongType, FloatType, IntegerType, \ - StringType, DoubleType, BooleanType, ShortType, \ - TimestampType, DateType, DecimalType, ByteType, \ - BinaryType, ArrayType, MapType, StructType, StructField - - from collections import namedtuple - import dbldatagen as dg - from dbldatagen import INFER_DATATYPE - import logging - - os = ['MacOS', 'Linux', 'Windows', 'iOS', 'Andoid'] - os_edition = ['RHEL 5.0', 'Windows 7', 'Windows 10', 'Windows XP', 'Mac OS X 10.8'] - linux_distro = ['Ubuntu', 'Fedora', 'Kubuntu', 'Arch', 'Alpine'] - boolean_values = ['True'] - dot_net_version = ['.NET 8.0', '.NET 7.0', '.NET 6.0', '.NET Core 3.0'] - browser_label = ['Chrome', 'Edge', 'Firefox', 'Safari'] - browserver = ['Version 113.0.5672.126'] - osver = ['13.3.1 (22E261)'] - - dataspec = dg.DataGenerator(spark, name="device_data_set", rows=10000, randomSeedMethod='hash_fieldname') - - dataspec = (dataspec - # Design - # - # v2.0 - updated so taht script outputs JSON only to mimic flow from Cisco edge - - # Reporting Identifiers - .withColumn("dd_id", "long", minValue=1, maxValue=100000, random=True, omit=True) - .withColumn("internal_device_id", "integer", baseColumnType="hash", - omit=True) # internal id is hash of `id` column - .withColumn("duo_device_id", "string", format="0x%010x", 
baseColumnType="values", - baseColumn="internal_device_id") # format hash value as string - .withColumn("org_id", "integer", minValue=1, maxValue=100000, random=True, omit=True) - .withColumn("user_id", "integer", minValue=1000000, maxValue=9999999, random=True, omit=True) - .withColumn("anyconnect_id", "string", format="0x%015x", minValue=1, maxValue=1000000, - random=True, omit=True) - .withColumn("ztna_device_id", "string", format="0x%015x", minValue=1, maxValue=1000000, - random=True, omit=True) - .withColumn("reporting_identifiers", INFER_DATATYPE, - expr="named_struct('org_id', org_id ,'dd_id', dd_id ,'user_id',user_id,'anyconnect_id',anyconnect_id,'duo_device_id',duo_device_id,'ztna_device_id',ztna_device_id)", - baseColumn=['org_id', 'dd_id', 'user_id', 'anyconnect_id', 'duo_device_id', - 'ztna_device_id'] - , omit=True) - - .withColumn("txid", "string", template=r'\XXX-XXXXXXXXXXXXXXX', random=True, - omit=True) - .withColumn("duo_client_version", "string", format="1.0.%x", minValue=1, maxValue=9, - random=True, omit=True) - .withColumn("os", "string", values=os, random=True, omit=True) - .withColumn("os_version", "string", values=osver, omit=True) - # .withColumn("os_version", "string", format="10.10.%x", minValue=1, maxValue=9, random=True, omit=True) - .withColumn("os_build", "string", format="19042.%x", minValue=1000, maxValue=9999, random=True, - omit=True) - .withColumn("os_edition", "string", values=os_edition, random=True, omit=True) - .withColumn("linux_distro", "string", values=linux_distro, random=True, omit=True) - .withColumn("device_name", "string", template=r'\W', random=True, omit=True) - .withColumn("is_password_set", "boolean", values=boolean_values, random=True, omit=True) - .withColumn("is_encryption_enabled", "boolean", values=boolean_values, random=True, omit=True) - .withColumn("is_firewall_enabled", "boolean", values=boolean_values, random=True, omit=True) - .withColumn("is_screen_lock_enabled", "boolean", values=boolean_values, random=True, omit=True) - .withColumn("is_auto_login_enabled", "boolean", values=boolean_values, random=True, omit=True) - .withColumn("seconds_before_screen_lock", "integer", minValue=1, maxValue=60, random=True, - omit=True) - - .withColumn("is_active", "boolean", values=boolean_values, random=True, omit=True) - .withColumn("product_label", "string", template=r'\W', random=True, omit=True) - .withColumn("version", "string", template=r'\W', random=True, omit=True) - .withColumn("instance_guid", "string", template=r'\XXXXXX-\XXXXXX-\XXXXXX-\XXXXXX', random=True, - omit=True) - - # struct - .withColumn("security_agents", INFER_DATATYPE, - expr="named_struct('is_active', is_active, 'product_label', product_label,'version', version, 'instance_guid', instance_guid)", - baseColumn=['is_active', 'product_label', 'version', 'instance_guid'] - , omit=True) - - # .withColumn("security_agents", "struct", - # expr="named_struct('is_active', is_active, 'product_label', product_label,'version', version, 'instance_guid', instance_guid)", - # baseColumn=['is_active', 'product_label','version','instance_guid'] - # ,omit=True) - - .withColumn("communication_scheme", "string", template=r'\W', random=True, omit=True) - .withColumn("health_check_end_timestamp", "string", percentNulls=1, omit=True) - .withColumn("health_check_start_timestamp", "string", percentNulls=1, omit=True) - .withColumn("health_check_length_millis", "string", percentNulls=1, omit=True) - .withColumn("browser_label", "string", percentNulls=1, omit=True) - # 
.withColumn("browser_version", "string", percentNulls=1, omit=True) - .withColumn("browser_version", "string", values=browserver, omit=True) - .withColumn("browser_process_name", "string", percentNulls=1, omit=True) - - .withColumn("valid", "string", percentNulls=1, omit=True) - .withColumn("common_name", "string", percentNulls=1, omit=True) - .withColumn("team_id", "string", percentNulls=1, omit=True) - .withColumn("browser_process_info", INFER_DATATYPE, - expr="named_struct('valid', valid, 'common_name', common_name,'team_id', team_id)", - baseColumn=['valid', 'common_name', 'team_id'] - , omit=True) - - .withColumn("is_virtual_machine", "boolean", values=boolean_values, random=True, omit=True) - .withColumn("is_health_check_retrying", "string", percentNulls=1, omit=True) - .withColumn("domain_name", "string", template=r'www.\w.com|\w.\w.co.u\k', omit=True) - .withColumn("host_name", "string", template=r'\w', omit=True) - .withColumn("dot_net_version", "string", values=dot_net_version, random=True, omit=True) - .withColumn("wifi_fingerprint", "string", template=r'\XXX-XXXXXXXXXXXXXXX', random=True, - omit=True) - .withColumn("wifi_fingerprint_age_seconds", "integer", minValue=1, maxValue=720, random=True, - omit=True) - .withColumn("wifi_fingerprint_includes_bssid", "boolean", values=boolean_values, random=True, - omit=True) - - .withColumn("desktop_session_token", "string", template=r'\XXX-XXXXXXXXXXXX-XXX', random=True, - omit=True) - .withColumn("desktop_session", StructType([StructField('desktop_session_token', StringType())]), - expr="named_struct('desktop_session_token', desktop_session_token)", - baseColumn=['desktop_session_token'] - , omit=True) - - .withColumn("machine_guid", "string", template=r'\XXXXXX-\XXXXXX-\XXXXXX', random=True, - omit=True) - .withColumn("hardware_uuid", "string", template=r'\XXXXXXXX-\XXXX-\XXXX-\XXXX-\XXXXXXXXXXXX', - random=True, omit=True) - .withColumn("domain_sid", "string", format="0x%010x", minValue=1, maxValue=1000000, random=True, - omit=True) - .withColumn("computer_sid", "string", format="0x%010x", minValue=1, maxValue=1000000, - random=True, omit=True) - .withColumn("intune_id", "string", format="0x%010x", minValue=1, maxValue=1000000, random=True, - omit=True) - .withColumn("amp_guid", "string", template=r'\XXXXXX-\XXXXXX-\XXXXXX', random=True, omit=True) - .withColumn("omadm_device_client_id", "string", format="0x%010x", minValue=1, maxValue=1000000, - random=True, omit=True) - .withColumn("cpu_id", "string", format="0x%010x", minValue=1, maxValue=1000000, random=True, - omit=True) - .withColumn("identifiers", INFER_DATATYPE, - expr="named_struct('machine_guid', machine_guid, 'hardware_uuid', hardware_uuid,'domain_sid', domain_sid, \ - 'computer_sid', computer_sid, 'intune_id', intune_id, 'amp_guid', amp_guid, 'omadm_device_client_id', omadm_device_client_id, 'cpu_id',cpu_id )", - baseColumn=['machine_guid', 'hardware_uuid', 'domain_sid', 'computer_sid'] - , omit=True) - - .withColumn("mdm_id_collection_errors", "string", format="0x%010x", minValue=1, - maxValue=1000000, random=True, omit=True) - - .withColumn("password_fl", FloatType(), percentNulls=1, omit=True) - .withColumn("device_name_fl", FloatType(), percentNulls=1, omit=True) - .withColumn("os_version_fl", FloatType(), percentNulls=1, omit=True) - .withColumn("biometrics_fl", FloatType(), percentNulls=1, omit=True) - .withColumn("encryption_fl", FloatType(), percentNulls=1, omit=True) - .withColumn("firewall_fl", FloatType(), percentNulls=1, omit=True) - 
.withColumn("security_agents_total_fl", FloatType(), percentNulls=1, omit=True) - .withColumn("lock_screen_fl", FloatType(), percentNulls=1, omit=True) - .withColumn("virtual_machine_fl", FloatType(), percentNulls=1, omit=True) - .withColumn("auto_login_fl", FloatType(), percentNulls=1, omit=True) - .withColumn("domain_name_fl", FloatType(), percentNulls=1, omit=True) - .withColumn("host_name_fl", FloatType(), percentNulls=1, omit=True) - .withColumn("machine_guid_fl", FloatType(), percentNulls=1, omit=True) - .withColumn("domain_sid_fl", FloatType(), percentNulls=1, omit=True) - .withColumn("compute_sid_fl", FloatType(), percentNulls=1, omit=True) - .withColumn("intune_id_fl", FloatType(), percentNulls=1, omit=True) - .withColumn("amp_guid_fl", FloatType(), percentNulls=1, omit=True) - .withColumn("omadm_device_client_id_fl", FloatType(), percentNulls=1, omit=True) - .withColumn("desktop_session_fl", FloatType(), percentNulls=1, omit=True) - .withColumn("signature_validation_fl", FloatType(), percentNulls=1, omit=True) - .withColumn("individual_health_check_durations_millis", - INFER_DATATYPE, - expr="named_struct('password_fl', password_fl, 'device_name_fl', device_name_fl,'os_version_fl', os_version_fl,'biometrics_fl', biometrics_fl, 'encryption_fl', encryption_fl, 'firewall_fl', firewall_fl, 'security_agents_total_fl', security_agents_total_fl, 'lock_screen_fl',lock_screen_fl, 'virtual_machine_fl',virtual_machine_fl,'auto_login_fl',auto_login_fl,'domain_name_fl',domain_name_fl,'host_name_fl',host_name_fl,'machine_guid_fl',machine_guid_fl,'domain_sid_fl',domain_sid_fl,'compute_sid_fl',compute_sid_fl,'intune_id_fl',intune_id_fl,'amp_guid_fl',amp_guid_fl,'omadm_device_client_id_fl',omadm_device_client_id_fl,'desktop_session_fl',desktop_session_fl,'signature_validation_fl',signature_validation_fl)", - baseColumn=['password_fl', 'device_name_fl', 'os_version', 'biometrics_fl', - 'encryption_fl', 'firewall_fl', 'security_agents_total_fl', - 'lock_screen_fl', 'virtual_machine_fl', 'auto_login_fl', 'domain_name_fl', - 'host_name_fl', 'machine_guid_fl', 'domain_sid_fl', 'compute_sid_fl', - 'intune_id_fl', 'amp_guid_fl', 'omadm_device_client_id_fl', - 'desktop_session_fl', 'signature_validation_fl'] - , omit=True) - - .withColumn("app_updates_enabled", "boolean", values=boolean_values, random=True, omit=True) - .withColumn("auto_launch_enabled", "boolean", values=boolean_values, random=True, omit=True) - .withColumn("diagnostic_log_enabled", "boolean", values=boolean_values, random=True, omit=True) - .withColumn("desktop_sessions_enabled", "boolean", values=boolean_values, random=True, omit=True) - .withColumn("user_configuration", INFER_DATATYPE, - expr="named_struct('app_updates_enabled', app_updates_enabled, 'auto_launch_enabled', auto_launch_enabled,'diagnostic_log_enabled', diagnostic_log_enabled, 'desktop_sessions_enabled',desktop_sessions_enabled)", - baseColumn=['app_updates_enabled', 'auto_launch_enabled', 'diagnostic_log_enabled', - 'desktop_sessions_enabled'] - , omit=True) - - .withColumn("app_updates_enabled_sys", "boolean", values=boolean_values, random=True, omit=True) - .withColumn("row_app_hidden", "boolean", values=boolean_values, random=True, omit=True) - .withColumn("row_password_hidden", "boolean", values=boolean_values, random=True, omit=True) - .withColumn("row_encryption_hidden", "boolean", values=boolean_values, random=True, omit=True) - .withColumn("row_firewall_hidden", "boolean", values=boolean_values, random=True, omit=True) - .withColumn("row_updates_hidden", 
"boolean", values=boolean_values, random=True, omit=True) - .withColumn("automatic_updates_enabled", "boolean", values=boolean_values, random=True, omit=True) - .withColumn("updater_disabled_by_admin", "boolean", values=boolean_values, random=True, omit=True) - .withColumn("system_configuration", - INFER_DATATYPE, - expr="named_struct('app_updates_enabled_sys', app_updates_enabled_sys, 'row_app_hidden', row_app_hidden,'row_password_hidden', row_password_hidden, 'row_encryption_hidden',row_encryption_hidden, 'row_firewall_hidden', row_firewall_hidden,'row_updates_hidden',row_updates_hidden, 'automatic_updates_enabled',automatic_updates_enabled,'updater_disabled_by_admin',updater_disabled_by_admin )", - baseColumn=['app_updates_enabled_sys', 'row_app_hidden', 'row_password_hidden', - 'row_encryption_hidden', 'row_firewall_hidden', 'row_updates_hidden', - 'automatic_updates_enabled', 'updater_disabled_by_admin'] - , omit=True) - - .withColumn("feature_enabled", "boolean", values=boolean_values, random=True, omit=True) - .withColumn("connected_hostnames", "string", template=r'\w', omit=True) - .withColumn("duoconnect", INFER_DATATYPE, - expr="named_struct('feature_enabled', feature_enabled, 'connected_hostnames', connected_hostnames)", - baseColumn=['feature_enabled', 'connected_hostnames'] - , omit=True) - - .withColumn("supports_hardware_security", "boolean", values=boolean_values, random=True, omit=True) - - # Posture payload - .withColumn("posture_payload", INFER_DATATYPE, - expr="named_struct('reporting_identifiers', reporting_identifiers,'txid', txid ,'duo_client_version', duo_client_version ,'os', os,'os_version', os_version ,'os_build', os_build ,'os_edition', os_edition,'linux_distro', linux_distro ,'device_name', device_name ,'is_password_set', is_password_set,'is_encryption_enabled', is_encryption_enabled ,'is_firewall_enabled', is_firewall_enabled ,'is_screen_lock_enabled',is_screen_lock_enabled,'is_auto_login_enabled', is_auto_login_enabled ,'seconds_before_screen_lock', seconds_before_screen_lock, 'security_agents', security_agents, 'communication_scheme',communication_scheme,'health_check_end_timestamp',health_check_end_timestamp,'health_check_start_timestamp',health_check_start_timestamp,'health_check_length_millis', health_check_length_millis,'browser_label', browser_label,'browser_version',browser_version, 'browser_process_name', browser_process_name, 'browser_process_info',browser_process_info, 'is_virtual_machine',is_virtual_machine, 'is_health_check_retrying', is_health_check_retrying,'domain_name',domain_name, 'host_name',host_name, 'dot_net_version', dot_net_version, 'wifi_fingerprint', wifi_fingerprint,'wifi_fingerprint_age_seconds',wifi_fingerprint_age_seconds,'wifi_fingerprint_includes_bssid',wifi_fingerprint_includes_bssid, 'desktop_session', desktop_session, 'identifiers',identifiers, 'mdm_id_collection_errors',mdm_id_collection_errors, 'individual_health_check_durations_millis',individual_health_check_durations_millis, 'user_configuration' , user_configuration, 'system_configuration', system_configuration, 'duoconnect',duoconnect, 'supports_hardware_security', supports_hardware_security )", - baseColumn=['reporting_identifiers', 'txid', 'duo_client_version', 'os', 'os_version', - 'os_build', 'os_edition', 'linux_distro', 'device_name', 'is_password_set', - 'is_encryption_enabled', 'is_firewall_enabled', 'is_screen_lock_enabled', - 'is_auto_login_enabled', 'seconds_before_screen_lock', 'security_agents', - 'communication_scheme', 'health_check_end_timestamp', - 
'health_check_start_timestamp', 'health_check_length_millis', - 'browser_label', 'browser_version', 'browser_process_name', - 'browser_process_info', 'is_virtual_machine', 'is_health_check_retrying', - 'domain_name', 'host_name', 'dot_net_version', 'wifi_fingerprint', - 'wifi_fingerprint_age_seconds', 'wifi_fingerprint_includes_bssid', - 'desktop_session', 'identifiers', 'mdm_id_collection_errors', - 'individual_health_check_durations_millis', 'user_configuration', - 'system_configuration', 'duoconnect', 'supports_hardware_security'] - ) # Close Posture Payload - .withColumn("last_modified", "timestamp", expr="now()") - ) - - dfTestData = dataspec.build() - - print(dfTestData.schema) - - data = dfTestData.limit(10).collect() - - for x in data: - print(x) - - def test_inferred_column_types2(self, setupLogging): + def test_inferred_column_types_disallowed1(self, setupLogging): with pytest.raises(ValueError): column_count = 10 data_rows = 10 * 1000 @@ -654,9 +422,10 @@ def test_inferred_column_types2(self, setupLogging): ) df = df_spec.build() - df.show() - def test_inferred_with_schema(self): + assert df is not None + + def test_inferred_disallowed_with_schema(self): """Test use of schema""" schema = StructType([ StructField("region_id", IntegerType(), True), @@ -678,7 +447,7 @@ def test_inferred_with_schema(self): df = testDataSpec2.build() df.show() - def test_inferred_column_types3(self, setupLogging): + def test_inferred_column_basic(self, setupLogging): column_count = 10 data_rows = 10 * 1000 df_spec = (dg.DataGenerator(spark, name="test_data_set1", rows=data_rows) @@ -687,19 +456,199 @@ def test_inferred_column_types3(self, setupLogging): numColumns=column_count, structType="array") .withColumn("code1", "integer", minValue=100, maxValue=200) .withColumn("code2", "integer", minValue=0, maxValue=10) - .withColumn("code3", dg.INFER_DATATYPE, expr="code1 + code2") - .withColumn("code4", StringType(), values=['a', 'b', 'c'], random=True) - .withColumn("code5", StringType(), values=['a', 'b', 'c'], random=True, weights=[9, 1, 1]) + .withColumn("code3", "string", values=['one', 'two', 'three']) + .withColumn("code4", "string", values=['one', 'two', 'three']) + .withColumn("code5", dg.INFER_DATATYPE, expr="current_date()") + .withColumn("code6", dg.INFER_DATATYPE, expr="code1 + code2") + .withColumn("code7", dg.INFER_DATATYPE, expr="concat(code3, code4)") ) - columnSpec1 = df_spec.getColumnType("code1") - assert columnSpec1.infertype is False + columnSpec1 = df_spec.getColumnSpec("code1") + assert columnSpec1.inferDatatype is False + + columnSpec5 = df_spec.getColumnSpec("code5") + assert columnSpec5.inferDatatype is True + + columnSpec6 = df_spec.getColumnSpec("code6") + assert columnSpec6.inferDatatype is True + + columnSpec7 = df_spec.getColumnSpec("code7") + assert columnSpec7.inferDatatype is True + + def test_inferred_column_validate_types(self, setupLogging): + column_count = 10 + data_rows = 10 * 1000 + df_spec = (dg.DataGenerator(spark, name="test_data_set1", rows=data_rows) + .withIdOutput() + .withColumn("r", FloatType(), expr="floor(rand() * 350) * (86400 + 3600)", + numColumns=column_count, structType="array") + .withColumn("code1", "integer", minValue=100, maxValue=200) + .withColumn("code2", "integer", minValue=0, maxValue=10) + .withColumn("code3", "string", values=['one', 'two', 'three']) + .withColumn("code4", "string", values=['one', 'two', 'three']) + .withColumn("code5", dg.INFER_DATATYPE, expr="current_date()") + .withColumn("code6", dg.INFER_DATATYPE, expr="code1 + 
code2") + .withColumn("code7", dg.INFER_DATATYPE, expr="concat(code3, code4)") + ) + + df = df_spec.build() + + type1 = self.getFieldType(df.schema, "code5") + assert type1 == DateType() + + type2 = self.getFieldType(df.schema, "code6") + assert type2 == IntegerType() - columnSpec2 = df_spec.getColumnType("code3") - assert columnSpec3.infertype is True + type3 = self.getFieldType(df.schema, "code") + assert type3 == StringType() + def test_inferred_column_structs1(self, setupLogging): + column_count = 10 + data_rows = 10 * 1000 + column_count = 10 + data_rows = 10 * 1000 + df_spec = (dg.DataGenerator(spark, name="test_data_set1", rows=data_rows) + .withIdOutput() + .withColumn("r", FloatType(), expr="floor(rand() * 350) * (86400 + 3600)", + numColumns=column_count, structType="array") + .withColumn("code1", "integer", minValue=100, maxValue=200) + .withColumn("code2", "integer", minValue=0, maxValue=10) + .withColumn("code3", "string", values=['one', 'two', 'three']) + .withColumn("code4", "string", values=['one', 'two', 'three']) + .withColumn("code5", dg.INFER_DATATYPE, expr="current_date()") + .withColumn("code6", dg.INFER_DATATYPE, expr="concat(code3, code4)") + .withColumn("struct1", dg.INFER_DATATYPE, expr="named_struct('a', code1, 'b', code2)") + .withColumn("struct2", dg.INFER_DATATYPE, expr="named_struct('a', code5, 'b', code6)") + ) + + df = df_spec.build() + type1 = self.getFieldType(df.schema, "struct1") + assert type1 == StructType([StructField('a', IntegerType()), StructField('b', IntegerType())]) + type2 = self.getFieldType(df.schema, "struct2") + assert type2 == StructType([StructField('a', DateType()), StructField('b', StringType())]) + def test_inferred_column_structs2(self, setupLogging): + column_count = 10 + data_rows = 10 * 1000 + + column_count = 10 + data_rows = 10 * 1000 + df_spec = (dg.DataGenerator(spark, name="test_data_set1", rows=data_rows) + .withIdOutput() + .withColumn("r", FloatType(), expr="floor(rand() * 350) * (86400 + 3600)", + numColumns=column_count, structType="array") + .withColumn("code1", "integer", minValue=100, maxValue=200) + .withColumn("code2", "integer", minValue=0, maxValue=10) + .withColumn("code3", "string", values=['one', 'two', 'three']) + .withColumn("code4", "string", values=['one', 'two', 'three']) + .withColumn("code5", dg.INFER_DATATYPE, expr="current_date()") + .withColumn("code6", dg.INFER_DATATYPE, expr="concat(code3, code4)") + .withColumn("struct1", dg.INFER_DATATYPE, expr="named_struct('a', code1, 'b', code2)") + .withColumn("struct2", dg.INFER_DATATYPE, expr="named_struct('a', code5, 'b', code6)") + .withColumn("struct3", dg.INFER_DATATYPE, expr="named_struct('a', struct1, 'b', struct2)") + ) + df = df_spec.build() + + type1 = self.getFieldType(df.schema, "struct1") + assert type1 == StructType([StructField('a', IntegerType()), StructField('b', IntegerType())]) + type2 = self.getFieldType(df.schema, "struct2") + assert type2 == StructType([StructField('a', DateType()), StructField('b', StringType())]) + type3 = self.getFieldType(df.schema, "struct3") + assert type3 == StructType( + [StructField('a', StructType([StructField('a', IntegerType()), StructField('b', IntegerType())])), + StructField('b', StructType([StructField('a', DateType()), StructField('b', StringType())]))]) + + def test_with_struct_column1(self, setupLogging): + column_count = 10 + data_rows = 10 * 1000 + + column_count = 10 + data_rows = 10 * 1000 + df_spec = (dg.DataGenerator(spark, name="test_data_set1", rows=data_rows) + .withIdOutput() + 
.withColumn("r", FloatType(), expr="floor(rand() * 350) * (86400 + 3600)", + numColumns=column_count, structType="array") + .withColumn("code1", "integer", minValue=100, maxValue=200) + .withColumn("code2", "integer", minValue=0, maxValue=10) + .withColumn("code3", "string", values=['one', 'two', 'three']) + .withColumn("code4", "string", values=['one', 'two', 'three']) + .withColumn("code5", dg.INFER_DATATYPE, expr="current_date()") + .withColumn("code6", dg.INFER_DATATYPE, expr="concat(code3, code4)") + .withStructColumn("struct1", fields=[('a', 'code1'), ('b', 'code2')]) + .withStructColumn("struct2", fields=[('a', 'code5'), ('b', 'code6')]) + ) + + df = df_spec.build() + + type1 = self.getFieldType(df.schema, "struct1") + assert type1 == StructType([StructField('a', IntegerType()), StructField('b', IntegerType())]) + type2 = self.getFieldType(df.schema, "struct2") + assert type2 == StructType([StructField('a', DateType()), StructField('b', StringType())]) + + def test_inferred_column_structs2(self, setupLogging): + column_count = 10 + data_rows = 10 * 1000 + + column_count = 10 + data_rows = 10 * 1000 + df_spec = (dg.DataGenerator(spark, name="test_data_set1", rows=data_rows) + .withIdOutput() + .withColumn("r", FloatType(), expr="floor(rand() * 350) * (86400 + 3600)", + numColumns=column_count, structType="array") + .withColumn("code1", "integer", minValue=100, maxValue=200) + .withColumn("code2", "integer", minValue=0, maxValue=10) + .withColumn("code3", "string", values=['one', 'two', 'three']) + .withColumn("code4", "string", values=['one', 'two', 'three']) + .withColumn("code5", dg.INFER_DATATYPE, expr="current_date()") + .withColumn("code6", dg.INFER_DATATYPE, expr="concat(code3, code4)") + .withStructColumn("struct1", fields=[('a', 'code1'), ('b', 'code2')]) + .withStructColumn("struct2", fields=[('a', 'code5'), ('b', 'code6')]) + .withStructColumn("struct3", fields=[('a', 'struct1'), ('b', 'struct2')]) + ) + + df = df_spec.build() + + type1 = self.getFieldType(df.schema, "struct1") + assert type1 == StructType([StructField('a', IntegerType()), StructField('b', IntegerType())]) + type2 = self.getFieldType(df.schema, "struct2") + assert type2 == StructType([StructField('a', DateType()), StructField('b', StringType())]) + type3 = self.getFieldType(df.schema, "struct3") + assert type3 == StructType( + [StructField('a', StructType([StructField('a', IntegerType()), StructField('b', IntegerType())])), + StructField('b', StructType([StructField('a', DateType()), StructField('b', StringType())]))]) + + def test_inferred_column_structs3(self, setupLogging): + column_count = 10 + data_rows = 10 * 1000 + + column_count = 10 + data_rows = 10 * 1000 + df_spec = (dg.DataGenerator(spark, name="test_data_set1", rows=data_rows) + .withIdOutput() + .withColumn("r", FloatType(), expr="floor(rand() * 350) * (86400 + 3600)", + numColumns=column_count, structType="array") + .withColumn("code1", "integer", minValue=100, maxValue=200) + .withColumn("code2", "integer", minValue=0, maxValue=10) + .withColumn("code3", "string", values=['one', 'two', 'three']) + .withColumn("code4", "string", values=['one', 'two', 'three']) + .withColumn("code5", dg.INFER_DATATYPE, expr="current_date()") + .withColumn("code6", dg.INFER_DATATYPE, expr="concat(code3, code4)") + .withStructColumn("struct1", fields=[('a', 'code1'), ('b', 'code2')]) + .withStructColumn("struct2", fields=[('a', 'code5'), ('b', 'code6')]) + .withStructColumn("struct3", + fields={'a': {'a': 'code1', 'b': 'code2'}, 'b': {'a': 'code5', 'b': 
'code6'}}) + ) + + df = df_spec.build() + type1 = self.getFieldType(df.schema, "struct1") + assert type1 == StructType([StructField('a', IntegerType()), StructField('b', IntegerType())]) + type2 = self.getFieldType(df.schema, "struct2") + assert type2 == StructType([StructField('a', DateType()), StructField('b', StringType())]) + type3 = self.getFieldType(df.schema, "struct3") + assert type3 == StructType( + [StructField('a', StructType([StructField('a', IntegerType()), StructField('b', IntegerType())])), + StructField('b', StructType([StructField('a', DateType()), StructField('b', StringType())]))]) From 713e1bb5802dea0275190f4a7a4fd6bd42741453 Mon Sep 17 00:00:00 2001 From: ronanstokes-db Date: Mon, 26 Jun 2023 15:11:09 -0700 Subject: [PATCH 06/16] wip --- dbldatagen/column_generation_spec.py | 2 +- dbldatagen/data_generator.py | 4 ++-- tests/test_complex_columns.py | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/dbldatagen/column_generation_spec.py b/dbldatagen/column_generation_spec.py index e952b1b3..2445195d 100644 --- a/dbldatagen/column_generation_spec.py +++ b/dbldatagen/column_generation_spec.py @@ -117,7 +117,7 @@ def __init__(self, name, colType=None, minValue=0, maxValue=None, step=1, prefix self._inferDataType = True if EXPR_OPTION not in kwargs: - raise ValueError(f"Column generation spec must have `expr` attribute specified if datatype is inferred") + raise ValueError("Column generation spec must have `expr` attribute specified if datatype is inferred") elif type(colType) == str and colType != INFER_DATATYPE: colType = SchemaParser.columnTypeFromString(colType) diff --git a/dbldatagen/data_generator.py b/dbldatagen/data_generator.py index d455e190..3de8df69 100644 --- a/dbldatagen/data_generator.py +++ b/dbldatagen/data_generator.py @@ -878,7 +878,7 @@ def _mkSqlStructFromList(self, fields): struct_expression = f"named_struct({','.join(struct_expressions)})" return struct_expression - def _mkStructFromDict(self,fields): + def _mkStructFromDict(self, fields): assert fields is not None and isinstance(fields, dict), \ "Fields must be a non-empty dict of fields that make up the struct elements" struct_expressions = [] @@ -925,7 +925,7 @@ def withStructColumn(self, colName, fields=None, asJson=False, **kwargs): for fld in fields: assert fld in self.getInferredColumnNames(), f"Field `{fld}` not found in column specs" - fieldExprs = [f"'{fld}', fld" for fld in l] + fieldExprs = [f"'{fld}', fld" for fld in fields] outputExpr = f"named_struct({','.join(fieldExprs)})" if asJson: outputExpr = f"to_json({outputExpr})" diff --git a/tests/test_complex_columns.py b/tests/test_complex_columns.py index eb6246ce..1d218de8 100644 --- a/tests/test_complex_columns.py +++ b/tests/test_complex_columns.py @@ -588,7 +588,7 @@ def test_with_struct_column1(self, setupLogging): type2 = self.getFieldType(df.schema, "struct2") assert type2 == StructType([StructField('a', DateType()), StructField('b', StringType())]) - def test_inferred_column_structs2(self, setupLogging): + def test_with_struct_column2(self, setupLogging): column_count = 10 data_rows = 10 * 1000 @@ -620,7 +620,7 @@ def test_inferred_column_structs2(self, setupLogging): [StructField('a', StructType([StructField('a', IntegerType()), StructField('b', IntegerType())])), StructField('b', StructType([StructField('a', DateType()), StructField('b', StringType())]))]) - def test_inferred_column_structs3(self, setupLogging): + def test_with_struct_column3(self, setupLogging): column_count = 10 data_rows = 10 * 1000 From 
9762f39087e950da9ba6b85e8bb3f2b31d3979b1 Mon Sep 17 00:00:00 2001
From: ronanstokes-db
Date: Tue, 27 Jun 2023 20:54:26 -0700
Subject: [PATCH 07/16] wip

---
 dbldatagen/column_generation_spec.py | 32 +++++++++--
 dbldatagen/data_generator.py         |  7 +++
 tests/test_complex_columns.py        | 81 ++++++++++++++++++++++----
 3 files changed, 104 insertions(+), 16 deletions(-)

diff --git a/dbldatagen/column_generation_spec.py b/dbldatagen/column_generation_spec.py
index 2445195d..caa77f56 100644
--- a/dbldatagen/column_generation_spec.py
+++ b/dbldatagen/column_generation_spec.py
@@ -1048,11 +1048,12 @@ def _makeSingleGenerationExpression(self, index=None, use_pandas_optimizations=T
         # TODO: add full support for date value generation
         if self.expr is not None:
             # note use of SQL expression ignores range specifications
-            new_def = expr(self.expr).astype(self.datatype)
-
-            # record execution history
+            new_def = expr(self.expr)
             self.executionHistory.append(f".. using SQL expression `{self.expr}` as base")
-            self.executionHistory.append(f".. casting to `{self.datatype}`")
+
+            if not self._inferDataType:
+                new_def = new_def.astype(self.datatype)
+                self.executionHistory.append(f".. casting to `{self.datatype}`")
         elif type(self.datatype) in [ArrayType, MapType, StructType] and self.values is None:
             new_def = expr("NULL")
         elif self._dataRange is not None and self._dataRange.isFullyPopulated():
@@ -1101,6 +1102,24 @@ def _makeSingleGenerationExpression(self, index=None, use_pandas_optimizations=T
             new_def = self._applyComputePercentNullsExpression(new_def, percent_nulls)
         return new_def
 
+    def _onSelect(self, df):
+        """
+        The _onSelect method is called when the column specification's expression, as produced by the
+        method ``_makeSingleGenerationExpression``, is used in a select statement.
+
+        :param df: Dataframe in which expression is used
+        :return: nothing
+
+        .. note:: The purpose of this method is to allow for introspection of information such as datatype
+          which can only be determined when the column specification's expression is used.
+        """
+        self.logger.warning(f"_onSelect: Column {self.name} is of type {self.datatype} ")
+
+        if self._inferDataType:
+            inferred_type = df.schema[self.name].dataType
+            self.logger.warning("Inferred datatype for column %s as %s", self.name, str(inferred_type))
+            self._csOptions.options['type'] = inferred_type
+
     def _applyTextFormatExpression(self, new_def, sformat):
         # note :
         # while it seems like this could use a shared instance, this does not work if initialized
@@ -1159,6 +1178,9 @@ def _applyFinalCastExpression(self, col_type, new_def):
         # cast the result to the appropriate type. For dates, cast first to timestamp, then to date
         if type(col_type) is DateType:
             new_def = new_def.astype(TimestampType()).astype(col_type)
+        elif self._inferDataType:
+            # don't apply cast when column has an inferred data type
+            pass
         else:
             new_def = new_def.astype(col_type)

diff --git a/dbldatagen/data_generator.py b/dbldatagen/data_generator.py
index 3de8df69..b73fa845 100644
--- a/dbldatagen/data_generator.py
+++ b/dbldatagen/data_generator.py
@@ -1256,6 +1256,7 @@ def build(self, withTempView=False, withView=False, withStreaming=False, options
 
         return df1
 
+    # noinspection PyProtectedMember
     def _buildColumnExpressionsWithSelects(self, df1):
         """ Build column generation expressions with selects
 
@@ -1270,6 +1271,7 @@ def _buildColumnExpressionsWithSelects(self, df1):
         # are generated resulting in shorter lineage
         for colNames in self.build_order:
             build_round = ["*"]
+            column_specs_applied = []
             inx_col = 0
             self.executionHistory.append(f"building stage for columns: {colNames}")
             for colName in colNames:
@@ -1285,9 +1287,14 @@ def _buildColumnExpressionsWithSelects(self, df1):
                         i += 1
                 else:
                     build_round.append(column_generators.alias(colName))
+                    column_specs_applied.append(col1)
                 inx_col = inx_col + 1
 
             df1 = df1.select(*build_round)
+
+            # apply any post select processing
+            for cs in column_specs_applied:
+                cs._onSelect(df1)
         return df1
 
     def _sqlTypeFromSparkType(self, dt):

diff --git a/tests/test_complex_columns.py b/tests/test_complex_columns.py
index 1d218de8..90b8ea38 100644
--- a/tests/test_complex_columns.py
+++ b/tests/test_complex_columns.py
@@ -499,7 +499,7 @@ def test_inferred_column_validate_types(self, setupLogging):
         type2 = self.getFieldType(df.schema, "code6")
         assert type2 == IntegerType()
 
-        type3 = self.getFieldType(df.schema, "code")
+        type3 = self.getFieldType(df.schema, "code7")
         assert type3 == StringType()
 
     def test_inferred_column_structs1(self, setupLogging):
@@ -525,9 +525,12 @@ def test_inferred_column_structs1(self, setupLogging):
         df = df_spec.build()
 
         type1 = self.getFieldType(df.schema, "struct1")
-        assert type1 == StructType([StructField('a', IntegerType()), StructField('b', IntegerType())])
+        expectedType = StructType([StructField('a', IntegerType()), StructField('b', IntegerType())])
+        assert type1 == expectedType
+
         type2 = self.getFieldType(df.schema, "struct2")
-        assert type2 == StructType([StructField('a', DateType()), StructField('b', StringType())])
+        expectedType2 = StructType([StructField('a', DateType(), False), StructField('b', StringType())])
+        assert type2 == expectedType2
 
     def test_inferred_column_structs2(self, setupLogging):
@@ -555,11 +558,12 @@ def test_inferred_column_structs2(self, setupLogging):
         type1 = self.getFieldType(df.schema, "struct1")
         assert type1 == StructType([StructField('a', IntegerType()), StructField('b', IntegerType())])
         type2 = self.getFieldType(df.schema, "struct2")
-        assert type2 == StructType([StructField('a', DateType()), StructField('b', StringType())])
+        assert type2 == StructType([StructField('a', DateType(), False), StructField('b', StringType())])
         type3 = self.getFieldType(df.schema, "struct3")
         assert type3 == StructType(
-            [StructField('a', StructType([StructField('a', IntegerType()), StructField('b', IntegerType())])),
-             StructField('b', StructType([StructField('a', DateType()), StructField('b', StringType())]))])
+            [StructField('a', StructType([StructField('a', IntegerType()), StructField('b', IntegerType())]), False),
+             StructField('b', StructType([StructField('a', DateType(), False), StructField('b',
StringType())]), False)] + ) def test_with_struct_column1(self, setupLogging): column_count = 10 @@ -586,12 +590,66 @@ def test_with_struct_column1(self, setupLogging): type1 = self.getFieldType(df.schema, "struct1") assert type1 == StructType([StructField('a', IntegerType()), StructField('b', IntegerType())]) type2 = self.getFieldType(df.schema, "struct2") - assert type2 == StructType([StructField('a', DateType()), StructField('b', StringType())]) + assert type2 == StructType([StructField('a', DateType(), False), StructField('b', StringType())]) def test_with_struct_column2(self, setupLogging): column_count = 10 data_rows = 10 * 1000 + column_count = 10 + data_rows = 10 * 1000 + df_spec = (dg.DataGenerator(spark, name="test_data_set1", rows=data_rows) + .withIdOutput() + .withColumn("r", FloatType(), expr="floor(rand() * 350) * (86400 + 3600)", + numColumns=column_count, structType="array") + .withColumn("code1", "integer", minValue=100, maxValue=200) + .withColumn("code2", "integer", minValue=0, maxValue=10) + .withColumn("code3", "string", values=['one', 'two', 'three']) + .withColumn("code4", "string", values=['one', 'two', 'three']) + .withColumn("code5", dg.INFER_DATATYPE, expr="current_date()") + .withColumn("code6", dg.INFER_DATATYPE, expr="concat(code3, code4)") + .withStructColumn("struct1", fields=['code1', 'code2']) + .withStructColumn("struct2", fields=['code5', 'code6']) + ) + + df = df_spec.build() + + type1 = self.getFieldType(df.schema, "struct1") + assert type1 == StructType([StructField('code1', IntegerType()), StructField('code2', IntegerType())]) + type2 = self.getFieldType(df.schema, "struct2") + assert type2 == StructType([StructField('code5', DateType(), False), StructField('code6', StringType())]) + + def test_with_json_struct_column(self, setupLogging): + column_count = 10 + data_rows = 10 * 1000 + + column_count = 10 + data_rows = 10 * 1000 + df_spec = (dg.DataGenerator(spark, name="test_data_set1", rows=data_rows) + .withIdOutput() + .withColumn("r", FloatType(), expr="floor(rand() * 350) * (86400 + 3600)", + numColumns=column_count, structType="array") + .withColumn("code1", "integer", minValue=100, maxValue=200) + .withColumn("code2", "integer", minValue=0, maxValue=10) + .withColumn("code3", "string", values=['one', 'two', 'three']) + .withColumn("code4", "string", values=['one', 'two', 'three']) + .withColumn("code5", dg.INFER_DATATYPE, expr="current_date()") + .withColumn("code6", dg.INFER_DATATYPE, expr="concat(code3, code4)") + .withStructColumn("struct1", fields=['code1', 'code2'], toJson=True) + .withStructColumn("struct2", fields=['code5', 'code6'], toJson=True) + ) + + df = df_spec.build() + + type1 = self.getFieldType(df.schema, "struct1") + assert type1 == StructType([StructField('code1', IntegerType()), StructField('code2', IntegerType())]) + type2 = self.getFieldType(df.schema, "struct2") + assert type2 == StructType([StructField('code5', DateType(), False), StructField('code6', StringType())]) + + def test_with_struct_column3(self, setupLogging): + column_count = 10 + data_rows = 10 * 1000 + column_count = 10 data_rows = 10 * 1000 df_spec = (dg.DataGenerator(spark, name="test_data_set1", rows=data_rows) @@ -614,13 +672,14 @@ def test_with_struct_column2(self, setupLogging): type1 = self.getFieldType(df.schema, "struct1") assert type1 == StructType([StructField('a', IntegerType()), StructField('b', IntegerType())]) type2 = self.getFieldType(df.schema, "struct2") - assert type2 == StructType([StructField('a', DateType()), StructField('b', 
StringType())])
+        assert type2 == StructType([StructField('a', DateType(), False), StructField('b', StringType())])
         type3 = self.getFieldType(df.schema, "struct3")
         assert type3 == StructType(
-            [StructField('a', StructType([StructField('a', IntegerType()), StructField('b', IntegerType())])),
-             StructField('b', StructType([StructField('a', DateType()), StructField('b', StringType())]))])
+            [StructField('a', StructType([StructField('a', IntegerType()), StructField('b', IntegerType())]), False),
+             StructField('b', StructType([StructField('a', DateType(), False), StructField('b', StringType())]),
+             False)])
 
-    def test_with_struct_column3(self, setupLogging):
+    def test_with_struct_column4(self, setupLogging):
         column_count = 10
         data_rows = 10 * 1000

From c5b3da6c3c1319bb18e3fd3bd4bd2ad7c7bdcf7f Mon Sep 17 00:00:00 2001
From: ronanstokes-db
Date: Tue, 27 Jun 2023 21:47:01 -0700
Subject: [PATCH 08/16] updates and fixes to unit tests

---
 dbldatagen/column_generation_spec.py |  4 +---
 dbldatagen/data_generator.py         | 30 ++++++++++++++++----------
 docs/source/generating_json_data.rst | 16 ++++++++++-----
 tests/test_complex_columns.py        | 16 ++++++++-------
 4 files changed, 41 insertions(+), 25 deletions(-)

diff --git a/dbldatagen/column_generation_spec.py b/dbldatagen/column_generation_spec.py
index caa77f56..db535ce3 100644
--- a/dbldatagen/column_generation_spec.py
+++ b/dbldatagen/column_generation_spec.py
@@ -1113,11 +1113,9 @@ def _onSelect(self, df):
         .. note:: The purpose of this method is to allow for introspection of information such as datatype
           which can only be determined when the column specification's expression is used.
         """
-        self.logger.warning(f"_onSelect: Column {self.name} is of type {self.datatype} ")
-
         if self._inferDataType:
             inferred_type = df.schema[self.name].dataType
-            self.logger.warning("Inferred datatype for column %s as %s", self.name, str(inferred_type))
+            self.logger.info("Inferred datatype for column %s as %s", self.name, str(inferred_type))
             self._csOptions.options['type'] = inferred_type
 
diff --git a/dbldatagen/data_generator.py b/dbldatagen/data_generator.py
index b73fa845..fee72e2f 100644
--- a/dbldatagen/data_generator.py
+++ b/dbldatagen/data_generator.py
@@ -912,25 +912,35 @@ def withStructColumn(self, colName, fields=None, asJson=False, **kwargs):
 
         .. note::
           Additional options for the field specification may be specified as keyword arguments.
 
+          The field specification may be:
+
+          - a list of field references (strings), which will be used as both the field name and the SQL expression
+          - a list of tuples of the form (field_name, field_expression), where field_name is the name of the field
+            and field_expression is a SQL expression string used to generate the field value
+          - a Python dict outlining the structure of the struct column. The keys of the dict are the field names
+
+          When using the ``dict`` form of the field specifications, a field whose value is a list will be treated
+          as creating a SQL array literal.
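+
+          A sketch of the ``dict`` form (a hypothetical example; ``code1``, ``code2`` and ``code5`` are
+          assumed to be previously defined columns, and the list value produces a SQL array literal):
+
+          .. code-block:: python
+
+             df_spec = df_spec.withStructColumn("struct3",
+                                                fields={'a': 'code1',
+                                                        'b': {'x': 'code2', 'y': 'code5'},
+                                                        'codes': ['code1', 'code2']})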
+ """ - assert fields is not None and type(fields) is list and len(fields) > 0, \ + assert fields is not None and isinstance(fields, (list, dict)), \ "Must specify at least one field for struct column" assert type(colName) is str and len(colName) > 0, "Must specify a column name" if isinstance(fields, list): - return self.withColumn(colName, INFER_DATATYPE, expr=self._mkSqlStructFromList(fields), **kwargs) + assert len(fields) > 0, \ + "Must specify at least one field for struct column" + struct_expr = self._mkSqlStructFromList(fields) elif isinstance(fields, dict): - return self.withColumn(colName, INFER_DATATYPE, expr=self._mkStructFromDict(fields), **kwargs) - - for fld in fields: - assert fld in self.getInferredColumnNames(), f"Field `{fld}` not found in column specs" + struct_expr = self._mkStructFromDict(fields) + else: + raise ValueError(f"Invalid field specification for struct column `{colName}`") - fieldExprs = [f"'{fld}', fld" for fld in fields] - outputExpr = f"named_struct({','.join(fieldExprs)})" if asJson: - outputExpr = f"to_json({outputExpr})" + output_expr = f"to_json({struct_expr})" + newDf = self.withColumn(colName, StringType(), expr=output_expr, **kwargs) + else: + newDf = self.withColumn(colName, INFER_DATATYPE, expr=struct_expr, **kwargs) - return self.withColumn(colName, INFER_DATATYPE, expr=outputExpr, baseColumn=fields, **kwargs) + return newDf def _generateColumnDefinition(self, colName, colType=None, baseColumn=None, implicit=False, omit=False, nullable=True, **kwargs): diff --git a/docs/source/generating_json_data.rst b/docs/source/generating_json_data.rst index f33a55d8..0c830e74 100644 --- a/docs/source/generating_json_data.rst +++ b/docs/source/generating_json_data.rst @@ -178,17 +178,18 @@ written as: expr="named_struct('event_type', event_type, 'event_ts', event_ts)", baseColumn=['event_type', 'event_ts']) - To simplify the specification of struct valued columns, the keyword "__infer__" can be used in place of the datatype -when the `expr` attribute is specified. This will cause the datatype to be inferred from the expression. + To simplify the specification of struct valued columns, the defined value of `INFER_DATATYPE` can be used in place of +the datatype when the `expr` attribute is specified. This will cause the datatype to be inferred from the expression. In this case, the previous code would be written as follows: .. code-block:: python .withColumn("event_info", - "__infer__", - expr="named_struct('event_type', event_type, 'event_ts', event_ts)", - baseColumn=['event_type', 'event_ts']) + dg.INFER_DATATYPE, + expr="named_struct('event_type', event_type, 'event_ts', event_ts)") + +The helper method ``withStructColumn`` can also be used to simplify the specification of struct valued columns. Generating JSON valued fields ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -267,6 +268,11 @@ functions such as `named_struct` and `to_json`. #dfTestData.write.format("json").mode("overwrite").save("/tmp/jsonData2") display(dfTestData) +The helper method ``withStructColumn`` in the DataGenerator class can also be used to simplify the specification +of struct valued columns. When the argument ``asJson`` is set to ``True``, the resulting structure +will be transformed to JSON. + + Generating complex column data ------------------------------ There are several methods for columns with arrays, structs and maps. 
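+As a minimal end-to-end sketch of these methods (a hypothetical example - the column names and the
+``spark`` session are illustrative only):
+
+.. code-block:: python
+
+   import dbldatagen as dg
+
+   df_spec = (dg.DataGenerator(spark, rows=1000)
+              .withColumn("event_type", "string", values=["start", "stop"])
+              .withColumn("event_ts", "timestamp", expr="now()")
+              .withStructColumn("event_info", fields=["event_type", "event_ts"])
+              .withStructColumn("event_info_json", fields=["event_type", "event_ts"], asJson=True)
+              )
+   df = df_spec.build()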
diff --git a/tests/test_complex_columns.py b/tests/test_complex_columns.py index 90b8ea38..453405bc 100644 --- a/tests/test_complex_columns.py +++ b/tests/test_complex_columns.py @@ -635,16 +635,17 @@ def test_with_json_struct_column(self, setupLogging): .withColumn("code4", "string", values=['one', 'two', 'three']) .withColumn("code5", dg.INFER_DATATYPE, expr="current_date()") .withColumn("code6", dg.INFER_DATATYPE, expr="concat(code3, code4)") - .withStructColumn("struct1", fields=['code1', 'code2'], toJson=True) - .withStructColumn("struct2", fields=['code5', 'code6'], toJson=True) + .withStructColumn("struct1", fields=['code1', 'code2'], asJson=True) + .withStructColumn("struct2", fields=['code5', 'code6'], asJson=True) ) df = df_spec.build() type1 = self.getFieldType(df.schema, "struct1") - assert type1 == StructType([StructField('code1', IntegerType()), StructField('code2', IntegerType())]) + print("type1", type1) + assert type1 == StringType() type2 = self.getFieldType(df.schema, "struct2") - assert type2 == StructType([StructField('code5', DateType(), False), StructField('code6', StringType())]) + assert type2 == StringType() def test_with_struct_column3(self, setupLogging): column_count = 10 @@ -706,8 +707,9 @@ def test_with_struct_column4(self, setupLogging): type1 = self.getFieldType(df.schema, "struct1") assert type1 == StructType([StructField('a', IntegerType()), StructField('b', IntegerType())]) type2 = self.getFieldType(df.schema, "struct2") - assert type2 == StructType([StructField('a', DateType()), StructField('b', StringType())]) + assert type2 == StructType([StructField('a', DateType(), False), StructField('b', StringType())]) type3 = self.getFieldType(df.schema, "struct3") assert type3 == StructType( - [StructField('a', StructType([StructField('a', IntegerType()), StructField('b', IntegerType())])), - StructField('b', StructType([StructField('a', DateType()), StructField('b', StringType())]))]) + [StructField('a', StructType([StructField('a', IntegerType()), StructField('b', IntegerType())]), False), + StructField('b', StructType([StructField('a', DateType(), False), StructField('b', StringType())]), + False)]) From 7a1cfe06f4400f991b4d8d9e3fe22306d375d759 Mon Sep 17 00:00:00 2001 From: ronanstokes-db Date: Tue, 27 Jun 2023 21:53:16 -0700 Subject: [PATCH 09/16] wip --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 34794e1e..d108fcf1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,10 @@ All notable changes to the Databricks Labs Data Generator will be documented in * Added formatting of generated code as Html for script methods * Allow use of inferred types on `withColumn` method when `expr` attribute is used +#### Changed +* Added ``withStructColumn`` method to allow simplified generation of struct and JSON columns + + ### Version 0.3.4 Post 3 ### Changed From 72db0b25f0727c504cb9df26adc512bee94dbffa Mon Sep 17 00:00:00 2001 From: ronanstokes-db Date: Tue, 27 Jun 2023 22:26:29 -0700 Subject: [PATCH 10/16] updated pipfile due to upstream changes in pipenv --- Pipfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Pipfile b/Pipfile index ee4d7609..5b3807b4 100644 --- a/Pipfile +++ b/Pipfile @@ -8,9 +8,9 @@ pytest = "*" pytest-cov = "*" sphinx = ">=2.0.0,<3.1.0" nbsphinx = "*" -numpydoc = "0.8" +numpydoc = "==0.8.0" pypandoc = "*" -ipython = "7.31.1" +ipython = "==7.31.1" pydata-sphinx-theme = "*" recommonmark = "*" sphinx-markdown-builder = "*" From 
0ccd114acc547284b62a551dd62c8f9bc2263d10 Mon Sep 17 00:00:00 2001 From: ronanstokes-db Date: Tue, 27 Jun 2023 22:54:04 -0700 Subject: [PATCH 11/16] additional tests --- tests/test_complex_columns.py | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/tests/test_complex_columns.py b/tests/test_complex_columns.py index 453405bc..9369ecf3 100644 --- a/tests/test_complex_columns.py +++ b/tests/test_complex_columns.py @@ -647,6 +647,36 @@ def test_with_json_struct_column(self, setupLogging): type2 = self.getFieldType(df.schema, "struct2") assert type2 == StringType() + def test_with_json_struct_column2(self, setupLogging): + column_count = 10 + data_rows = 10 * 1000 + + column_count = 10 + data_rows = 10 * 1000 + df_spec = (dg.DataGenerator(spark, name="test_data_set1", rows=data_rows) + .withIdOutput() + .withColumn("r", FloatType(), expr="floor(rand() * 350) * (86400 + 3600)", + numColumns=column_count, structType="array") + .withColumn("code1", "integer", minValue=100, maxValue=200) + .withColumn("code2", "integer", minValue=0, maxValue=10) + .withColumn("code3", "string", values=['one', 'two', 'three']) + .withColumn("code4", "string", values=['one', 'two', 'three']) + .withColumn("code5", dg.INFER_DATATYPE, expr="current_date()") + .withColumn("code6", dg.INFER_DATATYPE, expr="concat(code3, code4)") + .withStructColumn("struct1", fields={'codes': ["code6", "code6"]}, asJson=True) + .withStructColumn("struct2", fields=['code5', 'code6'], asJson=True) + ) + + df = df_spec.build() + + type1 = self.getFieldType(df.schema, "struct1") + print("type1", type1) + assert type1 == StringType() + + type2 = df_spec.inferredSchema["struct1"].dataType + assert type2 == StringType() + + def test_with_struct_column3(self, setupLogging): column_count = 10 data_rows = 10 * 1000 From 1cb14c84809c85bcbbeb45944c92ac76cef32c10 Mon Sep 17 00:00:00 2001 From: ronanstokes-db Date: Tue, 27 Jun 2023 23:00:29 -0700 Subject: [PATCH 12/16] wip --- tests/test_complex_columns.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_complex_columns.py b/tests/test_complex_columns.py index 9369ecf3..02dfd53f 100644 --- a/tests/test_complex_columns.py +++ b/tests/test_complex_columns.py @@ -676,7 +676,6 @@ def test_with_json_struct_column2(self, setupLogging): type2 = df_spec.inferredSchema["struct1"].dataType assert type2 == StringType() - def test_with_struct_column3(self, setupLogging): column_count = 10 data_rows = 10 * 1000 From 03c823fec185af6d15fe5d660453a6674417e264 Mon Sep 17 00:00:00 2001 From: ronanstokes-db Date: Tue, 27 Jun 2023 23:17:48 -0700 Subject: [PATCH 13/16] wip --- dbldatagen/data_generator.py | 2 +- tests/test_complex_columns.py | 65 ++++++++++++++++++++++------------- 2 files changed, 42 insertions(+), 25 deletions(-) diff --git a/dbldatagen/data_generator.py b/dbldatagen/data_generator.py index fee72e2f..1045e0e7 100644 --- a/dbldatagen/data_generator.py +++ b/dbldatagen/data_generator.py @@ -922,7 +922,7 @@ def withStructColumn(self, colName, fields=None, asJson=False, **kwargs): """ assert fields is not None and isinstance(fields, (list, dict)), \ - "Must specify at least one field for struct column" + "Fields argument must be a list of field specifications or dict outlining the target structure " assert type(colName) is str and len(colName) > 0, "Must specify a column name" if isinstance(fields, list): diff --git a/tests/test_complex_columns.py b/tests/test_complex_columns.py index 02dfd53f..27b35e8c 100644 --- a/tests/test_complex_columns.py +++ 
+++ b/tests/test_complex_columns.py
@@ -503,9 +503,6 @@ def test_inferred_column_validate_types(self, setupLogging):
         assert type3 == StringType()
 
     def test_inferred_column_structs1(self, setupLogging):
-        column_count = 10
-        data_rows = 10 * 1000
-
         column_count = 10
         data_rows = 10 * 1000
         df_spec = (dg.DataGenerator(spark, name="test_data_set1", rows=data_rows)
@@ -533,9 +530,6 @@ def test_inferred_column_structs1(self, setupLogging):
         assert type2 == expectedType2
 
     def test_inferred_column_structs2(self, setupLogging):
-        column_count = 10
-        data_rows = 10 * 1000
-
         column_count = 10
         data_rows = 10 * 1000
         df_spec = (dg.DataGenerator(spark, name="test_data_set1", rows=data_rows)
@@ -566,9 +560,6 @@ def test_inferred_column_structs2(self, setupLogging):
                    )
 
     def test_with_struct_column1(self, setupLogging):
-        column_count = 10
-        data_rows = 10 * 1000
-
         column_count = 10
         data_rows = 10 * 1000
         df_spec = (dg.DataGenerator(spark, name="test_data_set1", rows=data_rows)
@@ -593,9 +584,6 @@ def test_with_struct_column1(self, setupLogging):
         assert type2 == StructType([StructField('a', DateType(), False), StructField('b', StringType())])
 
     def test_with_struct_column2(self, setupLogging):
-        column_count = 10
-        data_rows = 10 * 1000
-
         column_count = 10
         data_rows = 10 * 1000
         df_spec = (dg.DataGenerator(spark, name="test_data_set1", rows=data_rows)
@@ -620,9 +608,6 @@ def test_with_struct_column2(self, setupLogging):
         assert type2 == StructType([StructField('code5', DateType(), False), StructField('code6', StringType())])
 
     def test_with_json_struct_column(self, setupLogging):
-        column_count = 10
-        data_rows = 10 * 1000
-
         column_count = 10
         data_rows = 10 * 1000
         df_spec = (dg.DataGenerator(spark, name="test_data_set1", rows=data_rows)
@@ -648,9 +633,6 @@ def test_with_json_struct_column(self, setupLogging):
         assert type2 == StringType()
 
     def test_with_json_struct_column2(self, setupLogging):
-        column_count = 10
-        data_rows = 10 * 1000
-
         column_count = 10
         data_rows = 10 * 1000
         df_spec = (dg.DataGenerator(spark, name="test_data_set1", rows=data_rows)
@@ -677,9 +659,6 @@ def test_with_json_struct_column2(self, setupLogging):
         assert type2 == StringType()
 
     def test_with_struct_column3(self, setupLogging):
-        column_count = 10
-        data_rows = 10 * 1000
-
         column_count = 10
         data_rows = 10 * 1000
         df_spec = (dg.DataGenerator(spark, name="test_data_set1", rows=data_rows)
@@ -710,9 +689,6 @@
                          False)])
 
     def test_with_struct_column4(self, setupLogging):
-        column_count = 10
-        data_rows = 10 * 1000
-
         column_count = 10
         data_rows = 10 * 1000
         df_spec = (dg.DataGenerator(spark, name="test_data_set1", rows=data_rows)
@@ -742,3 +718,44 @@
             [StructField('a', StructType([StructField('a', IntegerType()), StructField('b', IntegerType())]), False),
              StructField('b', StructType([StructField('a', DateType(), False), StructField('b', StringType())]),
                          False)])
+
+    def test_with_struct_column_err1(self, setupLogging):
+        column_count = 10
+        data_rows = 10 * 1000
+
+        with pytest.raises(ValueError):
+            df_spec = (dg.DataGenerator(spark, name="test_data_set1", rows=data_rows)
+                       .withIdOutput()
+                       .withColumn("r", FloatType(), expr="floor(rand() * 350) * (86400 + 3600)",
+                                   numColumns=column_count, structType="array")
+                       .withColumn("code1", "integer", minValue=100, maxValue=200)
+                       .withColumn("code2", "integer", minValue=0, maxValue=10)
+                       .withColumn("code3", "string", values=['one', 'two', 'three'])
+                       .withColumn("code4", "string", values=['one', 'two', 'three'])
.withColumn("code5", dg.INFER_DATATYPE, expr="current_date()") + .withColumn("code6", dg.INFER_DATATYPE, expr="concat(code3, code4)") + .withStructColumn("struct1", fields={'BAD_FIELD': 45}) + ) + + df = df_spec.build() + + def test_with_struct_column_err2(self, setupLogging): + column_count = 10 + data_rows = 10 * 1000 + + with pytest.raises(Exception): + df_spec = (dg.DataGenerator(spark, name="test_data_set1", rows=data_rows) + .withIdOutput() + .withColumn("r", FloatType(), expr="floor(rand() * 350) * (86400 + 3600)", + numColumns=column_count, structType="array") + .withColumn("code1", "integer", minValue=100, maxValue=200) + .withColumn("code2", "integer", minValue=0, maxValue=10) + .withColumn("code3", "string", values=['one', 'two', 'three']) + .withColumn("code4", "string", values=['one', 'two', 'three']) + .withColumn("code5", dg.INFER_DATATYPE, expr="current_date()") + .withColumn("code6", dg.INFER_DATATYPE, expr="concat(code3, code4)") + .withStructColumn("struct1", fields=23) + ) + + df = df_spec.build() + From 0ee24f91f73488d80e5819b38001af768dd233cb Mon Sep 17 00:00:00 2001 From: ronanstokes-db Date: Tue, 27 Jun 2023 23:28:18 -0700 Subject: [PATCH 14/16] wip --- tests/test_complex_columns.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_complex_columns.py b/tests/test_complex_columns.py index 27b35e8c..179614cf 100644 --- a/tests/test_complex_columns.py +++ b/tests/test_complex_columns.py @@ -758,4 +758,4 @@ def test_with_struct_column_err2(self, setupLogging): ) df = df_spec.build() - + \ No newline at end of file From bb256f6af2009e12e64baf1e07d518b6eef86ec6 Mon Sep 17 00:00:00 2001 From: ronanstokes-db Date: Fri, 30 Jun 2023 17:09:54 -0700 Subject: [PATCH 15/16] wip --- tests/test_complex_columns.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_complex_columns.py b/tests/test_complex_columns.py index 179614cf..58cdf13f 100644 --- a/tests/test_complex_columns.py +++ b/tests/test_complex_columns.py @@ -758,4 +758,3 @@ def test_with_struct_column_err2(self, setupLogging): ) df = df_spec.build() - \ No newline at end of file From 6376f6b0833beea8a9d04703394f67c34d1518af Mon Sep 17 00:00:00 2001 From: ronanstokes-db Date: Fri, 7 Jul 2023 17:43:23 -0700 Subject: [PATCH 16/16] removed old code --- dbldatagen/data_generator.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/dbldatagen/data_generator.py b/dbldatagen/data_generator.py index 1045e0e7..dadb4c32 100644 --- a/dbldatagen/data_generator.py +++ b/dbldatagen/data_generator.py @@ -824,9 +824,6 @@ def withColumn(self, colName, colType=StringType(), minValue=None, maxValue=None new_props = {} new_props.update(kwargs) - # if type(colType) == str and colType != INFER_DATATYPE: - # colType = SchemaParser.columnTypeFromString(colType) - self.logger.info("effective range: %s, %s, %s args: %s", minValue, maxValue, step, kwargs) self.logger.info("adding column - `%s` with baseColumn : `%s`, implicit : %s , omit %s", colName, baseColumn, implicit, omit)