#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from apache_beam.io.filesystems import FileSystems
from apache_beam.ml.anomaly.specifiable import Spec
from apache_beam.ml.anomaly.transforms import AnomalyDetection
from apache_beam.ml.anomaly.transforms import Specifiable
from apache_beam.utils import python_callable
from apache_beam.yaml.yaml_provider import InlineProvider
[docs]
def maybe_make_specifiable(v):
if isinstance(v, dict):
if "type" in v and "config" in v:
return Specifiable.from_spec(
Spec(type=v["type"], config=maybe_make_specifiable(v["config"])))
if "callable" in v:
if "path" in v or "name" in v:
raise ValueError(
"Cannot specify 'callable' with 'path' and 'name' for function.")
else:
return python_callable.PythonCallableWithSource(v["callable"])
if "path" in v and "name" in v:
return python_callable.PythonCallableWithSource.load_from_script(
FileSystems.open(v["path"]).read().decode(), v["name"])
ret = {k: maybe_make_specifiable(v[k]) for k in v}
return ret
else:
return v
[docs]
class SpecProvider(InlineProvider):
[docs]
def create_spec_providers():
return SpecProvider({"AnomalyDetection": AnomalyDetection})