Skip to content

Commit

Permalink
[FLINK-32647][pyflink] Support create catalog in pyflink table environment (#25986)
Browse files Browse the repository at this point in the history
  • Loading branch information
xuyangzhong authored Jan 17, 2025
1 parent c4bea91 commit 9a4f289
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 5 deletions.
16 changes: 16 additions & 0 deletions flink-python/docs/reference/pyflink.table/catalog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -234,3 +234,19 @@ A catalog implementation for Jdbc.
:toctree: api/

JdbcCatalog



CatalogDescriptor
-----------------

Describes a catalog with the catalog name and configuration.
A CatalogDescriptor is a template for creating a catalog instance. It closely resembles the
"CREATE CATALOG" SQL DDL statement, containing the catalog name and the catalog configuration.

.. currentmodule:: pyflink.table.catalog

.. autosummary::
:toctree: api/

CatalogDescriptor.of
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ keyword, thus must be escaped) in a catalog named 'cat.1' and database named 'db
TableEnvironment.list_user_defined_functions
TableEnvironment.list_views
TableEnvironment.load_module
TableEnvironment.create_catalog
TableEnvironment.register_catalog
TableEnvironment.set_python_requirements
TableEnvironment.sql_query
Expand Down Expand Up @@ -246,6 +247,7 @@ StreamTableEnvironment
StreamTableEnvironment.list_user_defined_functions
StreamTableEnvironment.list_views
StreamTableEnvironment.load_module
StreamTableEnvironment.create_catalog
StreamTableEnvironment.register_catalog
StreamTableEnvironment.set_python_requirements
StreamTableEnvironment.sql_query
Expand Down
2 changes: 2 additions & 0 deletions flink-python/pyflink/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@
from and to a catalog.
- :class:`catalog.HiveCatalog`
Responsible for reading and writing metadata stored in Hive.
- :class:`catalog.CatalogDescriptor`
Responsible for describing a Catalog with the catalog name and configuration.
Classes to define source & sink:
Expand Down
25 changes: 24 additions & 1 deletion flink-python/pyflink/table/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@
################################################################################
from py4j.java_gateway import java_import

from pyflink.common.configuration import Configuration
from pyflink.java_gateway import get_gateway
from pyflink.table.schema import Schema
from pyflink.table.table_schema import TableSchema
from typing import Dict, List, Optional

__all__ = ['Catalog', 'CatalogDatabase', 'CatalogBaseTable', 'CatalogPartition', 'CatalogFunction',
'Procedure', 'ObjectPath', 'CatalogPartitionSpec', 'CatalogTableStatistics',
'CatalogColumnStatistics', 'HiveCatalog']
'CatalogColumnStatistics', 'HiveCatalog', 'CatalogDescriptor']


class Catalog(object):
Expand Down Expand Up @@ -1368,3 +1369,25 @@ def __init__(self, catalog_name: str, default_database: str, username: str, pwd:
j_jdbc_catalog = gateway.jvm.org.apache.flink.connector.jdbc.catalog.JdbcCatalog(
catalog_name, default_database, username, pwd, base_url)
super(JdbcCatalog, self).__init__(j_jdbc_catalog)


class CatalogDescriptor:
    """
    Describes a catalog with the catalog name and configuration.

    A CatalogDescriptor is a template for creating a catalog instance. It closely resembles the
    "CREATE CATALOG" SQL DDL statement, containing the catalog name and the catalog configuration.

    Instances are thin wrappers around a Java ``CatalogDescriptor`` object and are normally
    obtained through :func:`CatalogDescriptor.of` rather than by calling the constructor.
    """

    def __init__(self, j_catalog_descriptor):
        """
        Wraps an already created Java catalog descriptor.

        :param j_catalog_descriptor: The Java ``CatalogDescriptor`` object to wrap.
        """
        self._j_catalog_descriptor = j_catalog_descriptor

    @staticmethod
    def of(catalog_name: str, configuration: Configuration,
           comment: Optional[str] = None) -> 'CatalogDescriptor':
        """
        Creates a CatalogDescriptor from a catalog name, its configuration and an optional comment.

        :param catalog_name: The name of the catalog. Must not be None.
        :param configuration: The configuration of the catalog. Must not be None.
        :param comment: An optional comment for the catalog; passed to the Java side as is.
        :return: A CatalogDescriptor wrapping the newly created Java descriptor.
        :raises AssertionError: If ``catalog_name`` or ``configuration`` is None.
        """
        assert catalog_name is not None, "catalog_name must not be None"
        assert configuration is not None, "configuration must not be None"

        # get_gateway is already imported at module level, so no function-local import is needed.
        gateway = get_gateway()

        j_catalog_descriptor = gateway.jvm.org.apache.flink.table.catalog.CatalogDescriptor.of(
            catalog_name, configuration._j_configuration, comment)
        return CatalogDescriptor(j_catalog_descriptor)
13 changes: 12 additions & 1 deletion flink-python/pyflink/table/table_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from pyflink.serializers import BatchedSerializer, PickleSerializer
from pyflink.table import Table, EnvironmentSettings, Expression, ExplainDetail, \
Module, ModuleEntry, Schema, ChangelogMode
from pyflink.table.catalog import Catalog
from pyflink.table.catalog import Catalog, CatalogDescriptor
from pyflink.table.serializers import ArrowSerializer
from pyflink.table.statement_set import StatementSet
from pyflink.table.table_config import TableConfig
Expand Down Expand Up @@ -117,6 +117,17 @@ def create(environment_settings: Union[EnvironmentSettings, Configuration]) \
j_tenv = gateway.jvm.TableEnvironment.create(environment_settings._j_environment_settings)
return TableEnvironment(j_tenv)

def create_catalog(self, catalog_name: str, catalog_descriptor: CatalogDescriptor):
    """
    Creates a :class:`~pyflink.table.catalog.Catalog` from the given
    :class:`~pyflink.table.catalog.CatalogDescriptor`. All tables registered in the
    :class:`~pyflink.table.catalog.Catalog` can be accessed.

    The call is delegated to ``createCatalog`` of the underlying Java table environment.

    :param catalog_name: The name under which the catalog will be created.
    :param catalog_descriptor: The catalog descriptor for creating catalog.
    """
    # Unwrap the Python descriptor to the Java object expected by the Java API.
    j_descriptor = catalog_descriptor._j_catalog_descriptor
    self._j_tenv.createCatalog(catalog_name, j_descriptor)

def register_catalog(self, catalog_name: str, catalog: Catalog):
"""
Registers a :class:`~pyflink.table.catalog.Catalog` under a unique name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ def excluded_methods(cls):
'compilePlanSql',
'executePlan',
'explainPlan',
# See FLINK-32647
'createCatalog',
'registerFunction',
'scan',
'registerTable'}
Expand Down
10 changes: 9 additions & 1 deletion flink-python/pyflink/table/tests/test_table_environment_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from pyflink.java_gateway import get_gateway
from pyflink.table import (DataTypes, StreamTableEnvironment, EnvironmentSettings, Module,
ResultKind, ModuleEntry, Schema)
from pyflink.table.catalog import ObjectPath, CatalogBaseTable
from pyflink.table.catalog import ObjectPath, CatalogBaseTable, CatalogDescriptor
from pyflink.table.explain_detail import ExplainDetail
from pyflink.table.expressions import col, source_watermark
from pyflink.table.table_descriptor import TableDescriptor
Expand Down Expand Up @@ -657,6 +657,14 @@ def process_element(self, value, ctx):
result.sort()
self.assertEqual(expected, result)

def test_create_catalog(self):
    """
    Checks that a catalog created from a CatalogDescriptor can be looked up by its name.
    """
    catalog_name = "mycat"

    # Describe a catalog of type "generic_in_memory" without a comment.
    options = Configuration()
    options.set_string("type", "generic_in_memory")
    descriptor = CatalogDescriptor.of(catalog_name, options, None)

    self.t_env.create_catalog(catalog_name, descriptor)

    self.assertIsNotNone(self.t_env.get_catalog(catalog_name))


class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):

Expand Down

0 comments on commit 9a4f289

Please sign in to comment.