sqlmesh.utils.pandas
from __future__ import annotations

import typing as t
from functools import lru_cache

from sqlglot import exp

if t.TYPE_CHECKING:
    import pandas as pd


@lru_cache()
def get_pandas_type_mappings() -> t.Dict[t.Any, exp.DataType]:
    """Return the lookup table from pandas/numpy dtypes to SQL data types.

    pandas and numpy are imported lazily inside the function. Because of
    ``lru_cache`` the table is built once and the same dict object is handed
    back on every subsequent call.
    """
    import pandas as pd
    import numpy as np

    # (dtype, SQL type name) pairs; order is the insertion order of the table.
    dtype_to_sql_name = [
        (np.dtype("int8"), "tinyint"),
        (np.dtype("int16"), "smallint"),
        (np.dtype("int32"), "int"),
        (np.dtype("int64"), "bigint"),
        (np.dtype("float16"), "float"),
        (np.dtype("float32"), "float"),
        (np.dtype("float64"), "double"),
        (np.dtype("O"), "text"),
        (np.dtype("bool"), "boolean"),
        (np.dtype("datetime64"), "timestamp"),
        (np.dtype("datetime64[ns]"), "timestamp"),
        (np.dtype("datetime64[us]"), "timestamp"),
        (pd.Int8Dtype(), "tinyint"),
        (pd.Int16Dtype(), "smallint"),
        (pd.Int32Dtype(), "int"),
        (pd.Int64Dtype(), "bigint"),
        (pd.Float32Dtype(), "float"),
        (pd.Float64Dtype(), "double"),
        (pd.StringDtype(), "text"),  # type: ignore
        (pd.BooleanDtype(), "boolean"),
    ]
    # Each entry gets its own freshly built DataType instance.
    mappings = {
        dtype: exp.DataType.build(sql_name) for dtype, sql_name in dtype_to_sql_name
    }

    try:
        import pyarrow  # type: ignore # noqa

        # The pyarrow-backed string dtype is registered only when pyarrow is installed.
        mappings.update({pd.StringDtype("pyarrow"): exp.DataType.build("text")})
    except ImportError:
        pass

    return mappings


def columns_to_types_from_df(df: pd.DataFrame) -> t.Dict[str, exp.DataType]:
    """Map each column of ``df`` to a SQL data type based on its dtype.

    Delegates to ``columns_to_types_from_dtypes`` with the
    ``(column name, dtype)`` pairs taken from ``df.dtypes``.
    """
    name_dtype_pairs = df.dtypes.items()
    return columns_to_types_from_dtypes(name_dtype_pairs)


def columns_to_types_from_dtypes(
    dtypes: t.Iterable[t.Tuple[t.Hashable, t.Any]],
) -> t.Dict[str, exp.DataType]:
    """Translate ``(column name, dtype)`` pairs into a name -> SQL type dict.

    Args:
        dtypes: Iterable of ``(column name, dtype)`` pairs.

    Returns:
        A dict keyed by the stringified column name.

    Raises:
        ValueError: If a dtype is neither a ``DatetimeTZDtype`` nor present in
            the table returned by ``get_pandas_type_mappings``.
    """
    import pandas as pd

    columns_to_types = {}
    for name, dtype in dtypes:
        # Timezone-aware datetimes are parameterised by tz, so they are matched
        # by class instead of by a lookup in the dtype table.
        is_tz_aware = hasattr(pd, "DatetimeTZDtype") and isinstance(dtype, pd.DatetimeTZDtype)
        sql_type: t.Optional[exp.DataType] = (
            exp.DataType.build("timestamptz")
            if is_tz_aware
            else get_pandas_type_mappings().get(dtype)
        )
        if not sql_type:
            raise ValueError(f"Unsupported pandas type '{dtype}'")
        columns_to_types[str(name)] = sql_type
    return columns_to_types
@lru_cache()
def
get_pandas_type_mappings() -> Dict[Any, sqlglot.expressions.datatypes.DataType]:
@lru_cache()
def get_pandas_type_mappings() -> t.Dict[t.Any, exp.DataType]:
    """Return the lookup table from pandas/numpy dtypes to SQL data types.

    pandas and numpy are imported lazily inside the function. Because of
    ``lru_cache`` the table is built once and the same dict object is handed
    back on every subsequent call.
    """
    import pandas as pd
    import numpy as np

    # (dtype, SQL type name) pairs; order is the insertion order of the table.
    dtype_to_sql_name = [
        (np.dtype("int8"), "tinyint"),
        (np.dtype("int16"), "smallint"),
        (np.dtype("int32"), "int"),
        (np.dtype("int64"), "bigint"),
        (np.dtype("float16"), "float"),
        (np.dtype("float32"), "float"),
        (np.dtype("float64"), "double"),
        (np.dtype("O"), "text"),
        (np.dtype("bool"), "boolean"),
        (np.dtype("datetime64"), "timestamp"),
        (np.dtype("datetime64[ns]"), "timestamp"),
        (np.dtype("datetime64[us]"), "timestamp"),
        (pd.Int8Dtype(), "tinyint"),
        (pd.Int16Dtype(), "smallint"),
        (pd.Int32Dtype(), "int"),
        (pd.Int64Dtype(), "bigint"),
        (pd.Float32Dtype(), "float"),
        (pd.Float64Dtype(), "double"),
        (pd.StringDtype(), "text"),  # type: ignore
        (pd.BooleanDtype(), "boolean"),
    ]
    # Each entry gets its own freshly built DataType instance.
    mappings = {
        dtype: exp.DataType.build(sql_name) for dtype, sql_name in dtype_to_sql_name
    }

    try:
        import pyarrow  # type: ignore # noqa

        # The pyarrow-backed string dtype is registered only when pyarrow is installed.
        mappings.update({pd.StringDtype("pyarrow"): exp.DataType.build("text")})
    except ImportError:
        pass

    return mappings
def
columns_to_types_from_df( df: pandas.core.frame.DataFrame) -> Dict[str, sqlglot.expressions.datatypes.DataType]:
def
columns_to_types_from_dtypes( dtypes: Iterable[Tuple[Hashable, Any]]) -> Dict[str, sqlglot.expressions.datatypes.DataType]:
def columns_to_types_from_dtypes(
    dtypes: t.Iterable[t.Tuple[t.Hashable, t.Any]],
) -> t.Dict[str, exp.DataType]:
    """Translate ``(column name, dtype)`` pairs into a name -> SQL type dict.

    Args:
        dtypes: Iterable of ``(column name, dtype)`` pairs.

    Returns:
        A dict keyed by the stringified column name.

    Raises:
        ValueError: If a dtype is neither a ``DatetimeTZDtype`` nor present in
            the table returned by ``get_pandas_type_mappings``.
    """
    import pandas as pd

    columns_to_types = {}
    for name, dtype in dtypes:
        # Timezone-aware datetimes are parameterised by tz, so they are matched
        # by class instead of by a lookup in the dtype table.
        is_tz_aware = hasattr(pd, "DatetimeTZDtype") and isinstance(dtype, pd.DatetimeTZDtype)
        sql_type: t.Optional[exp.DataType] = (
            exp.DataType.build("timestamptz")
            if is_tz_aware
            else get_pandas_type_mappings().get(dtype)
        )
        if not sql_type:
            raise ValueError(f"Unsupported pandas type '{dtype}'")
        columns_to_types[str(name)] = sql_type
    return columns_to_types