PrefectInterfaces

Documentation for PrefectInterfaces.

PrefectInterfaces

PrefectInterfaces.PrefectInterfacesModule

PrefectInterfaces.jl provides composite types to enable:

  • Interactive REPL connections to a Prefect server.
  • Prefect deployed julia processes that can be called from the python API with parameters.

Use of this package requires a running Prefect server to make connections to the Prefect server REST API, configured with the environment variable PREFECT_API_URL or by specifying the endpoint url in constructor calls. Calling PrefectBlock(blockname::String) retrieves Prefect block information by name, and thus julia modules can be built to connect to resources defined by those blocks.

Examples

julia> using PrefectInterfaces
julia> ENV["PREFECT_API_URL"] = "http://127.0.0.1:4300/api";

julia> ls()
5-element Vector{Any}:
 "aws-credentials/subdivisions"
 "docker-container/lamneth"
 "local-file-system/willowdata"
 "process/red-barchetta"
 "string/syrinx"

julia> dump(PrefectBlock("local-file-system/willowdata"))
PrefectBlock
  blockname: String "local-file-system/willowdata"
  block: LocalFSBlock
    blockname: String "local-file-system/willowdata"
    blocktype: String "local-file-system"
    basepath: String "/Users/mahiki/willowdata/dev"
    read_path: #4 (function of type PrefectInterfaces.var"#4#5"{String})
      basepath: String "/Users/mahiki/willowdata/dev"
source
PrefectInterfaces.LocalFSBlockType
LocalFSBlock(blockname::String, blocktype::String, basepath::String)

Corresponds with the Prefect LocalFileSystem block. Attached functions:

read_path("path/to/file.csv")
write_path("path/to/file.csv", df::AbstractDataFrame)

Returns or writes a DataFrame file at "LocalFSBlock.basepath/path/to/file.csv".

Examples:

julia> fsblock = PrefectBlock("local-file-system/willowdata");

julia> df = fsblock.block.read_path("extracts/csv/dataset=test_table/rundate=2023-05-25/data.csv")
1×2 DataFrame
 Row │ column1                            result
     │ String                             String7
─────┼────────────────────────────────────────────
   1 │ If you can select this table you…  PASSED
source
PrefectInterfaces.PrefectAPIType
PrefectAPI(url::String, key::SecretString) <:AbstractPrefectInterface
PrefectAPI(url::String)
PrefectAPI()

Mutable struct tha stores the Prefect server api endpoint. All PrefectInterface operations depend on connecting to a running Prefect server to pull block information. Constructor with no arguments assigns env variables PREFECT_API_URL, PREFECT_API_KEY

If PREFECT_API_KEY does not exist then an empty string is assigned to key. For Prefect Server with no authentication (or with auth managed by connection string) the empty key will not interfere with API calls.

Examples:

julia> using PrefectInterfaces

julia> ENV["PREFECT_API_URL"] = "http://127.0.0.1:4300/api"; ENV["PREFECT_API_KEY"] = "zyxw4321";

julia> api = PrefectAPI()
PrefectAPI("http://127.0.0.1:4300/api", ####Secret####)

julia> api.url
"http://127.0.0.1:4300/api"

julia> api.key.secret
"zyxw4321"

julia> api.url = "https://api.prefect.cloud/api/accounts/0eEXAMPLE";

julia> api.key = SecretString("abcd1234")
####Secret####

julia> api = PrefectAPI("https://api.prefect.cloud/api/accounts/0eEXAMPLE", "abcd1234")
PrefectAPI("https://api.prefect.cloud/api/accounts/0eEXAMPLE", ####Secret####)
source
PrefectInterfaces.PrefectBlockType
PrefectBlock <: AbstractPrefectBlock

Constructors:

PrefectBlock(blockname::String)
PrefectBlock(blockname::String, api::PrefectAPI)
PrefectBlock(blockname::String, block::AbstractPrefectBlock)

Returns a Prefect Block from the Prefect server, the block data is stored in the block field. Prefect Block names are strings called 'slugs', formatted as block-type-name/block-name. A Prefect Block is uniquely specified by its name and the Prefect DB where it is stored, therefore the API URL is necessary for the constructor. The single-argument constructor takes the default PrefectAPI() as the Prefect Server endpoint.

A non-server block can be constructed by supplying an AbstractPrefectBlock object.

The AbstractPrefectBlock types are meant to mirror the functionality defined in the Prefect Python API, for example LocalFSBlock has a write_path() method attached which only writes to paths relative from the block basepath.

Examples:

julia> using PrefectInterfaces

julia> spec_fsblock = LocalFSBlock("local-file-system/xanadu", "local-file-system", "/usr/mahiki/xanadu/dev");

julia> fsblock = PrefectBlock("local-file-system/xanadu", spec_fsblock);

julia> fsblock.blockname
"local-file-system/xanadu"

julia> propertynames(fsblock.block)
(:blockname, :blocktype, :basepath, :read_path, :write_path)

julia> fsblock.block.basepath
"/usr/mahiki/xanadu/dev"
source
PrefectInterfaces.S3BucketBlockType
S3BucketBlock(
    blockname, blocktype, bucket_name, bucket_folder
    , region_name, aws_access_key_id, aws_secret_access_key)

Corresponds with the Prefect S3Bucket block in the prefect-aws integration. Attached functions:

read_path("path/to/object.csv")
write_path("path/to/object.csv", df::AbstractDataFrame)

Returns or writes a DataFrame csv object at a relative key from the block-defined s3:://bucket_name/bucket_folder/path/to/object.csv.

Examples:

# pull hypothetical existing block from Prefect DB server

julia> s3block = PrefectBlock("s3-bucket/willowdata")
S3BucketBlock("s3-bucket/willowdata", "s3-bucket", "willowdata", "data-folder/dev", "us-west-2"
, "AKIAEXAMPLEXXX", ####Secret####, ...)

julia> df = s3block.block.read_path("extracts/csv/dataset=test_table/rundate=2023-05-25/data.csv");

julia> s3block.block.write_path("testfolder/xanadu-test.csv", df)
p"s3://willowdata/data-folder/dev/testfolder/xanadu-test.csv"
source
PrefectInterfaces.SecretBlockType
SecretBlock(blockname::String, blocktype::String, value::SecretString) <: AbstractPrefectBlock

A struct for storing a Prefect Block with a SecretString as value field. This permits retrieving secrets from the Prefect Server/Prefect Cloud. The secret field is accessible via the value.secret field.

NOTE: This is not an ecrypted secrets store, it is a log obfuscator.

Example:

julia> using PrefectInterfaces

julia> secretblock = SecretBlock("secret", "necromancer", "abcd1234")
SecretBlock("secret", "necromancer", ####Secret####)

julia> dump(secretblock, maxdepth = 10)
SecretBlock
  blockname: String "secret"
  blocktype: String "necromancer"
  value: SecretString

julia> secretblock.value.secret
"abcd1234"
source
PrefectInterfaces.SecretStringType
SecretString(secret::String) <:Any

A struct for storing secret values with overrides of show and dump to prevent its field from being exposed in plaintext in logs. The secret field is accessible via the secret field.

NOTE: This is not an ecrypted secrets store, it is a log obfuscator.

Example:

julia> using PrefectInterfaces

julia> password = SecretString("abcd1234")
####Secret####

julia> show(password)
####Secret####
julia> password.secret
"abcd1234"
source
PrefectInterfaces.getblockMethod
getblock(blockname::String; api::AbstractPrefectConfig = PrefectAPI())

Makes an HTTP.get() call to provided URL endpoint, default endpoint constructed by PrefectAPI. Returns a Dict containing the Prefect Block specification.

source
PrefectInterfaces.lsMethod
ls(; type="block", api::PrefectAPI = PrefectAPI())

Calls the Prefect server and returns a list of all defined blocks as Vector{String}. Default is to list all blocks, later implementation could include "flows", "deployments", "work-pool" etc. See PrefectAPI docs for details about authentication if needed.

Examples:

julia> ENV["PREFECT_API_URL"] = "http://127.0.0.1:4300/api";

julia> ls()
11-element Vector{Any}:
 "aws-credentials/subdivisions"
 "docker-container/lamneth"
 "github/dev"
 "github/main"
 "local-file-system/willowdata"
 "process/red-barchetta"
 "s3/necromancer"
 "s3-bucket/willowdata"
 "secret/necromancer"
 "slack-webhook/bytor-alert"
 "string/environment"
source
PrefectInterfaces.makeblockMethod
makeblock(block_api_data::Dict)

Instantiates a new PrefectBlock, choosing the concrete type based on block data returned from the api call for block documents.

source

Datasets

PrefectInterfaces.Datasets.DatasetType
Dataset(dataset_name=str::String; kwargs...)

An object that stores configuration, and file path locations. Assertions constrain valid field values. If rundate is not the current date, the latest_path will not be used.

NOTE: No positional arguments allowed b/c of @kw_def, keyworded args only.

Supported keyword arguments (default show first):

dataset_name::String (required)
datastore_type ∈ ["local", "remote"]
dataset_type ∈ ["extracts", "reports", "images"]
file_format ∈ ["csv"]
rundate::Date = Datest.today()
rundate_type ∈ ["latest", "specific"]

Read/write behavior depends on rundate/rundate_type combination as follows:

rundate_type|  rundate   || read    |  write
------------|------------||---------|-----------------------------------------
latest      |  == today  || latest  |  [latest, rundate]   default option
latest      |  != today  || latest  |  [latest]            dont write to date partition (rare)
specific    |  == today  || rundate |  [rundate]
specific    |  != today  || rundate |  [rundate]

Examples

julia> begin
    ENV["PREFECT_DATA_BLOCK_LOCAL"] = "local-file-system/willowdata"
    ENV["PREFECT_API_URL"] = "http://127.0.0.1:4300/api"
end;

julia> ds = Dataset(dataset_name="test_table", datastore_type="local")
Dataset
  dataset_name: String "test_table"
  datastore_type: String "local"
  dataset_type: String "extracts"
  file_format: String "csv"
  rundate: Dates.Date
  rundate_type: String "latest"
  dataset_path: String "extracts/csv/dataset=test_table/rundate=2023-07-24/data.csv"
  latest_path: String "extracts/csv/latest/dataset=test_table/data.csv"
  image_path: String "extracts/dataset=test_table/rundate=2023-07-24"

julia> df = read(ds)
1×2 DataFrame
 Row │ column1                            result
     │ String                             String7
─────┼────────────────────────────────────────────
   1 │ If you can select this table you…  PASSED
source
PrefectInterfaces.Datasets.PrefectDatastoreNamesType
PrefectDatastoreNames(remote::String, local::String, default::String) <: AbstractPrefectInterface

A struct to store the names of Prefect blocks which reference local and remote file storage. The default constructor pulls the names from ENV variables, and default field default is "local".

PREFECT_DATA_BLOCK_REMOTE
PREFECT_DATA_BLOCK_LOCAL
PREFECT_DATASTORE_DEFAULT_TYPE ∈ ["local", "remote"]
source
Base.readMethod
read(ds::Dataset)

Returns a DataFrame by calling CSV.read on a filepath defined by the Dataset type.

NOTE: A prefect server must be available to use Dataset read function.

Examples

julia> begin
    ENV["PREFECT_API_URL"] = "http://127.0.0.1:4300/api"
    ENV["PREFECT_DATA_BLOCK_LOCAL"] = "local-file-system/willowdata"
end;

julia> df = read(Dataset(dataset_name="test_table", datastore_type="local"))
1×2 DataFrame
 Row │ column1                            result
     │ String                             String7
─────┼────────────────────────────────────────────
   1 │ If you can select this table you…  PASSED
source
Base.writeMethod
write(ds::Dataset, df::DataFrame)

Writes a DataFrame via CSV.write to a filepath defined by the Dataset type.

NOTE: A prefect server must be available to use Dataset read function.

source