Skip to content

Commit

Permalink
[Internal] Revert "Rewriting DLT pipelines using SDK" (#3838)
Browse files Browse the repository at this point in the history
Reverts #3792
  • Loading branch information
Divyansh-db authored Jul 31, 2024
1 parent 25b2725 commit cca2965
Show file tree
Hide file tree
Showing 8 changed files with 902 additions and 838 deletions.
26 changes: 1 addition & 25 deletions docs/resources/pipeline.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,26 +76,13 @@ The following arguments are supported:
* `library` blocks - Specifies pipeline code and required artifacts. Syntax resembles [library](cluster.md#library-configuration-block) configuration block with the addition of a special `notebook` & `file` library types that should have the `path` attribute. *Right now only the `notebook` & `file` types are supported.*
* `cluster` blocks - [Clusters](cluster.md) to run the pipeline. If none is specified, pipelines will automatically select a default cluster configuration for the pipeline. *Please note that DLT pipeline clusters are supporting only subset of attributes as described in [documentation](https://docs.databricks.com/data-engineering/delta-live-tables/delta-live-tables-api-guide.html#pipelinesnewcluster).* Also, note that `autoscale` block is extended with the `mode` parameter that controls the autoscaling algorithm (possible values are `ENHANCED` for new, enhanced autoscaling algorithm, or `LEGACY` for old algorithm).
* `continuous` - A flag indicating whether to run the pipeline continuously. The default value is `false`.
* `development` - A flag indicating whether to run the pipeline in development mode. The default value is `false`.
* `development` - A flag indicating whether to run the pipeline in development mode. The default value is `true`.
* `photon` - A flag indicating whether to use Photon engine. The default value is `false`.
* `serverless` - An optional flag indicating if serverless compute should be used for this DLT pipeline. Requires `catalog` to be set, as it could be used only with Unity Catalog.
* `catalog` - The name of catalog in Unity Catalog. *Change of this parameter forces recreation of the pipeline.* (Conflicts with `storage`).
* `target` - The name of a database (in either the Hive metastore or in a UC catalog) for persisting pipeline output data. Configuring the target setting allows you to view and query the pipeline output data from the Databricks UI.
* `edition` - optional name of the [product edition](https://docs.databricks.com/data-engineering/delta-live-tables/delta-live-tables-concepts.html#editions). Supported values are: `CORE`, `PRO`, `ADVANCED` (default). Not required when `serverless` is set to `true`.
* `channel` - optional name of the release channel for Spark version used by DLT pipeline. Supported values are: `CURRENT` (default) and `PREVIEW`.
* `allow_duplicate_names` - Optional boolean flag. If false, deployment will fail if name conflicts with that of another pipeline. default is `false`.
* `deployment` - Deployment type of this pipeline. Supports following attributes:
* `kind` - The deployment method that manages the pipeline.
* `metadata_file_path` - The path to the file containing metadata about the deployment.
* `filters` - Filters on which Pipeline packages to include in the deployed graph. This block consists of following attributes:
* `include` - Paths to include.
* `exclude` - Paths to exclude.
* `gateway_definition` - The definition of a gateway pipeline to support CDC. Consists of following attributes:
* `connection_id` - Immutable. The Unity Catalog connection this gateway pipeline uses to communicate with the source.
* `gateway_storage_catalog` - Required, Immutable. The name of the catalog for the gateway pipeline's storage location.
* `gateway_storage_name` - Required. The Unity Catalog-compatible naming for the gateway storage location. This is the destination to use for the data that is extracted by the gateway. Delta Live Tables system will automatically create the storage location under the catalog and schema.
* `gateway_storage_schema` - Required, Immutable. The name of the schema for the gateway pipelines's storage location.


### notification block

Expand All @@ -108,17 +95,6 @@ DLT allows to specify one or more notification blocks to get notifications about
* `on-update-fatal-failure` - a pipeline update fails with a non-retryable (fatal) error.
* `on-flow-failure` - a single data flow fails.

### ingestion_definition block

The configuration for a managed ingestion pipeline. These settings cannot be used with the `library`, `target` or `catalog` settings. This block consists of following attributes:

* `connection_name` - Immutable. The Unity Catalog connection this ingestion pipeline uses to communicate with the source. Specify either ingestion_gateway_id or connection_name.
* `ingestion_gateway_id` - Immutable. Identifier for the ingestion gateway used by this ingestion pipeline to communicate with the source. Specify either ingestion_gateway_id or connection_name.
* `objects` - Required. Settings specifying tables to replicate and the destination for the replicated tables.
* `table_configuration` - Configuration settings to control the ingestion of tables. These settings are applied to all tables in the pipeline.



## Attribute Reference

In addition to all arguments above, the following attributes are exported:
Expand Down
36 changes: 18 additions & 18 deletions exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/databricks/databricks-sdk-go/service/iam"
sdk_jobs "github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/databricks/databricks-sdk-go/service/ml"
"github.com/databricks/databricks-sdk-go/service/pipelines"
"github.com/databricks/databricks-sdk-go/service/serving"
"github.com/databricks/databricks-sdk-go/service/settings"
"github.com/databricks/databricks-sdk-go/service/sharing"
Expand All @@ -30,6 +29,7 @@ import (
"github.com/databricks/terraform-provider-databricks/commands"
"github.com/databricks/terraform-provider-databricks/common"
"github.com/databricks/terraform-provider-databricks/jobs"
"github.com/databricks/terraform-provider-databricks/pipelines"
"github.com/databricks/terraform-provider-databricks/qa"
"github.com/databricks/terraform-provider-databricks/repos"
"github.com/databricks/terraform-provider-databricks/scim"
Expand Down Expand Up @@ -252,7 +252,7 @@ var emptyPipelines = qa.HTTPFixture{
Method: "GET",
ReuseRequest: true,
Resource: "/api/2.0/pipelines?max_results=50",
Response: pipelines.ListPipelinesResponse{},
Response: pipelines.PipelineListResponse{},
}

var emptyClusterPolicies = qa.HTTPFixture{
Expand Down Expand Up @@ -1951,10 +1951,10 @@ func TestImportingDLTPipelines(t *testing.T) {
{
Method: "GET",
Resource: "/api/2.0/pipelines?max_results=50",
Response: pipelines.ListPipelinesResponse{
Response: pipelines.PipelineListResponse{
Statuses: []pipelines.PipelineStateInfo{
{
PipelineId: "123",
PipelineID: "123",
Name: "Pipeline1",
},
},
Expand Down Expand Up @@ -2009,7 +2009,7 @@ func TestImportingDLTPipelines(t *testing.T) {
},
{
Method: "GET",
Resource: "/api/2.0/pipelines/123?",
Resource: "/api/2.0/pipelines/123",
Response: getJSONObject("test-data/get-dlt-pipeline.json"),
},
{
Expand Down Expand Up @@ -2130,22 +2130,22 @@ func TestImportingDLTPipelinesMatchingOnly(t *testing.T) {
{
Method: "GET",
Resource: "/api/2.0/pipelines?max_results=50",
Response: pipelines.ListPipelinesResponse{
Response: pipelines.PipelineListResponse{
Statuses: []pipelines.PipelineStateInfo{
{
PipelineId: "123",
PipelineID: "123",
Name: "Pipeline1 test",
},
{
PipelineId: "124",
PipelineID: "124",
Name: "Pipeline1",
},
},
},
},
{
Method: "GET",
Resource: "/api/2.0/pipelines/123?",
Resource: "/api/2.0/pipelines/123",
Response: getJSONObject("test-data/get-dlt-pipeline.json"),
},
{
Expand Down Expand Up @@ -2494,33 +2494,33 @@ func TestIncrementalDLTAndMLflowWebhooks(t *testing.T) {
{
Method: "GET",
Resource: "/api/2.0/pipelines?max_results=50",
Response: pipelines.ListPipelinesResponse{
Response: pipelines.PipelineListResponse{
Statuses: []pipelines.PipelineStateInfo{
{
PipelineId: "abc",
PipelineID: "abc",
Name: "abc",
},
{
PipelineId: "def",
PipelineID: "def",
Name: "def",
},
},
},
},
{
Method: "GET",
Resource: "/api/2.0/pipelines/abc?",
Response: pipelines.GetPipelineResponse{
PipelineId: "abc",
Resource: "/api/2.0/pipelines/abc",
Response: pipelines.PipelineInfo{
PipelineID: "abc",
Name: "abc",
LastModified: 1681466931226,
},
},
{
Method: "GET",
Resource: "/api/2.0/pipelines/def?",
Response: pipelines.GetPipelineResponse{
PipelineId: "def",
Resource: "/api/2.0/pipelines/def",
Response: pipelines.PipelineInfo{
PipelineID: "def",
Name: "def",
LastModified: 1690156900000,
Spec: &pipelines.PipelineSpec{
Expand Down
31 changes: 12 additions & 19 deletions exporter/importables.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/databricks/databricks-sdk-go/service/iam"
sdk_jobs "github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/databricks/databricks-sdk-go/service/ml"
"github.com/databricks/databricks-sdk-go/service/pipelines"
"github.com/databricks/databricks-sdk-go/service/serving"
"github.com/databricks/databricks-sdk-go/service/settings"
"github.com/databricks/databricks-sdk-go/service/sharing"
Expand All @@ -32,6 +31,7 @@ import (
"github.com/databricks/terraform-provider-databricks/jobs"
"github.com/databricks/terraform-provider-databricks/mws"
"github.com/databricks/terraform-provider-databricks/permissions"
"github.com/databricks/terraform-provider-databricks/pipelines"
"github.com/databricks/terraform-provider-databricks/repos"
tfsharing "github.com/databricks/terraform-provider-databricks/sharing"
tfsql "github.com/databricks/terraform-provider-databricks/sql"
Expand Down Expand Up @@ -1943,13 +1943,8 @@ var resourcesMap map[string]importable = map[string]importable{
return name + "_" + d.Id()
},
List: func(ic *importContext) error {
w, err := ic.Client.WorkspaceClient()
if err != nil {
return err
}
pipelinesList, err := w.Pipelines.ListPipelinesAll(ic.Context, pipelines.ListPipelinesRequest{
MaxResults: 50,
})
api := pipelines.NewPipelinesAPI(ic.Context, ic.Client)
pipelinesList, err := api.List(50, "")
if err != nil {
return err
}
Expand All @@ -1959,17 +1954,15 @@ var resourcesMap map[string]importable = map[string]importable{
}
var modifiedAt int64
if ic.incremental {
pipeline, err := w.Pipelines.Get(ic.Context, pipelines.GetPipelineRequest{
PipelineId: q.PipelineId,
})
pipeline, err := api.Read(q.PipelineID)
if err != nil {
return err
}
modifiedAt = pipeline.LastModified
}
ic.EmitIfUpdatedAfterMillis(&resource{
Resource: "databricks_pipeline",
ID: q.PipelineId,
ID: q.PipelineID,
}, modifiedAt, fmt.Sprintf("DLT Pipeline '%s'", q.Name))
log.Printf("[INFO] Imported %d of %d DLT Pipelines", i+1, len(pipelinesList))
}
Expand Down Expand Up @@ -2003,25 +1996,25 @@ var resourcesMap map[string]importable = map[string]importable{
ID: cluster.AwsAttributes.InstanceProfileArn,
})
}
if cluster.InstancePoolId != "" {
if cluster.InstancePoolID != "" {
ic.Emit(&resource{
Resource: "databricks_instance_pool",
ID: cluster.InstancePoolId,
ID: cluster.InstancePoolID,
})
}
if cluster.DriverInstancePoolId != "" {
if cluster.DriverInstancePoolID != "" {
ic.Emit(&resource{
Resource: "databricks_instance_pool",
ID: cluster.DriverInstancePoolId,
ID: cluster.DriverInstancePoolID,
})
}
if cluster.PolicyId != "" {
if cluster.PolicyID != "" {
ic.Emit(&resource{
Resource: "databricks_cluster_policy",
ID: cluster.PolicyId,
ID: cluster.PolicyID,
})
}
ic.emitInitScripts(cluster.InitScripts)
ic.emitInitScriptsLegacy(cluster.InitScripts)
ic.emitSecretsFromSecretsPathMap(cluster.SparkConf)
ic.emitSecretsFromSecretsPathMap(cluster.SparkEnvVars)
}
Expand Down
24 changes: 11 additions & 13 deletions exporter/importables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/databricks/databricks-sdk-go/service/iam"
sdk_jobs "github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/databricks/databricks-sdk-go/service/pipelines"
"github.com/databricks/databricks-sdk-go/service/sharing"
sdk_workspace "github.com/databricks/databricks-sdk-go/service/workspace"
tfcatalog "github.com/databricks/terraform-provider-databricks/catalog"
Expand All @@ -24,8 +23,7 @@ import (
"github.com/databricks/terraform-provider-databricks/common"
"github.com/databricks/terraform-provider-databricks/jobs"
"github.com/databricks/terraform-provider-databricks/permissions"

dlt_pipelines "github.com/databricks/terraform-provider-databricks/pipelines"
"github.com/databricks/terraform-provider-databricks/pipelines"
"github.com/databricks/terraform-provider-databricks/policies"
"github.com/databricks/terraform-provider-databricks/pools"
"github.com/databricks/terraform-provider-databricks/provider"
Expand Down Expand Up @@ -293,7 +291,7 @@ func TestRepoIgnore(t *testing.T) {

func TestDLTIgnore(t *testing.T) {
ic := importContextForTest()
d := dlt_pipelines.ResourcePipeline().ToResource().TestResourceData()
d := pipelines.ResourcePipeline().ToResource().TestResourceData()
d.SetId("12345")
r := &resource{ID: "12345", Data: d}
// job without libraries
Expand Down Expand Up @@ -1342,33 +1340,33 @@ func TestIncrementalListDLT(t *testing.T) {
{
Method: "GET",
Resource: "/api/2.0/pipelines?max_results=50",
Response: pipelines.ListPipelinesResponse{
Response: pipelines.PipelineListResponse{
Statuses: []pipelines.PipelineStateInfo{
{
PipelineId: "abc",
PipelineID: "abc",
Name: "abc",
},
{
PipelineId: "def",
PipelineID: "def",
Name: "def",
},
},
},
},
{
Method: "GET",
Resource: "/api/2.0/pipelines/abc?",
Response: pipelines.GetPipelineResponse{
PipelineId: "abc",
Resource: "/api/2.0/pipelines/abc",
Response: pipelines.PipelineInfo{
PipelineID: "abc",
Name: "abc",
LastModified: 1681466931226,
},
},
{
Method: "GET",
Resource: "/api/2.0/pipelines/def?",
Response: pipelines.GetPipelineResponse{
PipelineId: "def",
Resource: "/api/2.0/pipelines/def",
Response: pipelines.PipelineInfo{
PipelineID: "def",
Name: "def",
LastModified: 1690156900000,
},
Expand Down
Loading

0 comments on commit cca2965

Please sign in to comment.