
System-wise detection added

master
Devesh Kumar committed 5 years ago
commit af76d18e21
3 changed files with 452 additions and 0 deletions
  1. examples/build_System_Wise_Detection_pipeline.py (+74, -0)
  2. tods/detection_algorithm/SystemWiseDetection.py (+375, -0)
  3. tods/resources/.entry_points.ini (+3, -0)

examples/build_System_Wise_Detection_pipeline.py (+74, -0)

@@ -0,0 +1,74 @@
from d3m import index
from d3m.metadata.base import ArgumentType
from d3m.metadata.pipeline import Pipeline, PrimitiveStep

# Pipeline: dataset_to_dataframe -> column_parser -> extract_columns_by_semantic_types(attributes) -> pyod_ae -> system_wise_detection
# extract_columns_by_semantic_types(targets) is extracted as well but is not consumed by any later step

# Creating pipeline
pipeline_description = Pipeline()
pipeline_description.add_input(name='inputs')

# Step 0: dataset_to_dataframe
step_0 = PrimitiveStep(primitive=index.get_primitive('d3m.primitives.data_transformation.dataset_to_dataframe.Common'))
step_0.add_argument(name='inputs', argument_type=ArgumentType.CONTAINER, data_reference='inputs.0')
step_0.add_output('produce')
pipeline_description.add_step(step_0)

# Step 1: column_parser
step_1 = PrimitiveStep(primitive=index.get_primitive('d3m.primitives.data_transformation.column_parser.Common'))
step_1.add_argument(name='inputs', argument_type=ArgumentType.CONTAINER, data_reference='steps.0.produce')
step_1.add_output('produce')
pipeline_description.add_step(step_1)

# Step 2: extract_columns_by_semantic_types(attributes)
step_2 = PrimitiveStep(primitive=index.get_primitive('d3m.primitives.data_transformation.extract_columns_by_semantic_types.Common'))
step_2.add_argument(name='inputs', argument_type=ArgumentType.CONTAINER, data_reference='steps.1.produce')
step_2.add_output('produce')
step_2.add_hyperparameter(name='semantic_types', argument_type=ArgumentType.VALUE,
                          data=['https://metadata.datadrivendiscovery.org/types/Attribute'])
pipeline_description.add_step(step_2)

# Step 3: extract_columns_by_semantic_types(targets)
step_3 = PrimitiveStep(primitive=index.get_primitive('d3m.primitives.data_transformation.extract_columns_by_semantic_types.Common'))
step_3.add_argument(name='inputs', argument_type=ArgumentType.CONTAINER, data_reference='steps.0.produce')
step_3.add_output('produce')
step_3.add_hyperparameter(name='semantic_types', argument_type=ArgumentType.VALUE,
                          data=['https://metadata.datadrivendiscovery.org/types/TrueTarget'])
pipeline_description.add_step(step_3)

attributes = 'steps.2.produce'
targets = 'steps.3.produce'

# Step 4: auto encoder
step_4 = PrimitiveStep(primitive=index.get_primitive('d3m.primitives.tods.detection_algorithm.pyod_ae'))
step_4.add_argument(name='inputs', argument_type=ArgumentType.CONTAINER, data_reference=attributes)
step_4.add_output('produce_score')
#step_4.add_hyperparameter(name='use_columns', argument_type=ArgumentType.VALUE, data=[2])
#step_4.add_hyperparameter(name='use_semantic_types', argument_type=ArgumentType.VALUE, data=True)
step_4.add_hyperparameter(name='return_result', argument_type=ArgumentType.VALUE, data='append')
pipeline_description.add_step(step_4)

# Step 5: system-wise detection over the point-wise scores
step_5 = PrimitiveStep(primitive=index.get_primitive('d3m.primitives.tods.detection_algorithm.system_wise_detection'))
step_5.add_argument(name='inputs', argument_type=ArgumentType.CONTAINER, data_reference='steps.4.produce_score')
step_5.add_hyperparameter(name='return_result', argument_type=ArgumentType.VALUE, data='new')

step_5.add_output('produce')
pipeline_description.add_step(step_5)


# Final Output
pipeline_description.add_output(name='output predictions', data_reference='steps.5.produce')

# Output to YAML
# yaml = pipeline_description.to_yaml()
# with open('pipeline.yml', 'w') as f:
#     f.write(yaml)
# print(yaml)

# Output to JSON
data = pipeline_description.to_json()
with open('example_pipeline.json', 'w') as f:
    f.write(data)
print(data)
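
Not part of the commit, but as a quick sanity check the JSON written above can be loaded back through the d3m pipeline API. A minimal sketch, assuming the same d3m package and the example_pipeline.json file produced by this script:

from d3m.metadata.pipeline import Pipeline

# Reload the saved pipeline description and list the primitive used at each step.
with open('example_pipeline.json', 'r') as f:
    pipeline = Pipeline.from_json(f.read())

for i, step in enumerate(pipeline.steps):
    print(i, step.primitive.metadata.query()['python_path'])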

tods/detection_algorithm/SystemWiseDetection.py (+375, -0)

@@ -0,0 +1,375 @@
import os
import time
from typing import Any, Optional, List
from collections import OrderedDict

import numpy
import numpy as np
from numpy import ndarray
from scipy import sparse

from d3m import container, utils as d3m_utils
from d3m.container import DataFrame as d3m_dataframe
from d3m.metadata import hyperparams, params, base as metadata_base
from d3m.primitive_interfaces import base, transformer
from d3m.base import utils as base_utils
from d3m.exceptions import PrimitiveNotFittedError

__all__ = ('SystemWiseDetectionPrimitive',)

Inputs = container.DataFrame
Outputs = container.DataFrame

class Params(params.Params):
    # TODO: how to make params dynamic
    use_column_names: Optional[Any]



class Hyperparams(hyperparams.Hyperparams):

    # Tuning parameter: the default of -1 means the entire time series is considered.
    window_size = hyperparams.Hyperparameter(default=-1, semantic_types=[
        'https://metadata.datadrivendiscovery.org/types/TuningParameter',
    ], description="Window size for decomposition")

    method_type = hyperparams.Enumeration(
        values=['max', 'avg', 'majority_voting_sum'],
        default='avg',
        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
        description="The type of method used to find the anomalous system",
    )
    contamination = hyperparams.Uniform(
        lower=0.,
        upper=0.5,
        default=0.1,
        description='The amount of contamination of the data set, i.e. the proportion of outliers in the data set.',
        semantic_types=['https://metadata.datadrivendiscovery.org/types/TuningParameter']
    )

    # Control parameters
    use_columns = hyperparams.Set(
        elements=hyperparams.Hyperparameter[int](-1),
        default=(),
        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
        description="A set of column indices to force the primitive to operate on. If any specified column cannot be parsed, it is skipped.",
    )
    exclude_columns = hyperparams.Set(
        elements=hyperparams.Hyperparameter[int](-1),
        default=(),
        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
        description="A set of column indices to not operate on. Applicable only if \"use_columns\" is not provided.",
    )
    return_result = hyperparams.Enumeration(
        values=['append', 'replace', 'new'],
        default='new',
        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
        description="Should parsed columns be appended, should they replace original columns, or should only parsed columns be returned? This hyperparam is ignored if use_semantic_types is set to false.",
    )
    use_semantic_types = hyperparams.UniformBool(
        default=False,
        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
        description="Controls whether semantic_types metadata will be used for filtering columns in the input dataframe. Setting this to false makes the code ignore return_result and produce only the output dataframe."
    )
    add_index_columns = hyperparams.UniformBool(
        default=False,
        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
        description="Also include primary index columns if input data has them. Applicable only if \"return_result\" is set to \"new\".",
    )
    error_on_no_input = hyperparams.UniformBool(
        default=True,
        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
        description="Throw an exception if no input column is selected/provided. Defaults to true to behave like sklearn. To prevent pipelines from breaking, set this to False.",
    )

    return_semantic_type = hyperparams.Enumeration[str](
        values=['https://metadata.datadrivendiscovery.org/types/Attribute',
                'https://metadata.datadrivendiscovery.org/types/ConstructedAttribute'],
        default='https://metadata.datadrivendiscovery.org/types/Attribute',
        description='Decides what semantic type to attach to generated attributes',
        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter']
    )



class SystemWiseDetectionPrimitive(transformer.TransformerPrimitiveBase[Inputs, Outputs, Hyperparams]):
    """
    Primitive that flags anomalous systems from system-wise collections of time-series outlier scores.
    """

    __author__ = "DATA Lab at Texas A&M University"
    metadata = metadata_base.PrimitiveMetadata(
        {
            'id': '3726fa29-28c5-4529-aec5-2f8b4ff2ef9e',
            'version': '0.1.0',
            'name': 'System_Wise_Anomaly_Detection_Primitive',
            'python_path': 'd3m.primitives.tods.detection_algorithm.system_wise_detection',
            'keywords': ['Time Series', 'Anomalous System'],
            'hyperparams_to_tune': ['window_size', 'method_type', 'contamination'],
            'source': {
                'name': 'DATA Lab at Texas A&M University',
                'uris': ['https://gitlab.com/lhenry15/tods.git', 'https://gitlab.com/lhenry15/tods/-/blob/devesh/tods/feature_analysis/StatisticalAbsEnergy.py'],
                'contact': 'mailto:khlai037@tamu.edu'
            },
            'installation': [
                {'type': metadata_base.PrimitiveInstallationType.PIP,
                 'package_uri': 'git+https://gitlab.com/lhenry15/tods.git@{git_commit}#egg=TODS'.format(
                     git_commit=d3m_utils.current_git_commit(os.path.dirname(__file__)),
                 ),
                 }
            ],
            'algorithm_types': [
                metadata_base.PrimitiveAlgorithmType.DATA_PROFILING,
            ],
            'primitive_family': metadata_base.PrimitiveFamily.ANOMALY_DETECTION,
        }
    )

    def __init__(self, *, hyperparams: Hyperparams) -> None:
        super().__init__(hyperparams=hyperparams)
        self.primitiveNo = 0

    def produce(self, *, inputs: Inputs, timeout: float = None, iterations: int = None) -> base.CallResult[Outputs]:
        """
        Args:
            inputs: Container DataFrame
            timeout: Default
            iterations: Default

        Returns:
            Container DataFrame with one row per system, labeling whether that system is anomalous
        """

        self.logger.info('System-wise detection primitive called')

        # Get columns to fit.
        self._fitted = False
        self._training_inputs, self._training_indices = self._get_columns_to_fit(inputs, self.hyperparams)
        self._input_column_names = self._training_inputs.columns

        if len(self._training_indices) > 0:
            # self._clf.fit(self._training_inputs)
            self._fitted = True
        else:
            if self.hyperparams['error_on_no_input']:
                raise RuntimeError("No input columns were selected")
            self.logger.warn("No input columns were selected")

        if not self._fitted:
            raise PrimitiveNotFittedError("Primitive not fitted.")

        system_wise_detection_input = inputs
        if self.hyperparams['use_semantic_types']:
            system_wise_detection_input = inputs.iloc[:, self._training_indices]

        output_columns = []
        if len(self._training_indices) > 0:
            system_wise_detection_output = self._system_wise_detection(
                system_wise_detection_input,
                self.hyperparams["method_type"],
                self.hyperparams["window_size"],
                self.hyperparams["contamination"],
            )
            outputs = system_wise_detection_output

            if sparse.issparse(system_wise_detection_output):
                system_wise_detection_output = system_wise_detection_output.toarray()
            outputs = self._wrap_predictions(inputs, system_wise_detection_output)

            # if len(outputs.columns) == len(self._input_column_names):
            #     outputs.columns = self._input_column_names

            output_columns = [outputs]
        else:
            if self.hyperparams['error_on_no_input']:
                raise RuntimeError("No input columns were selected")
            self.logger.warn("No input columns were selected")

        self.logger.info('System-wise detection primitive returned')
        outputs = base_utils.combine_columns(return_result=self.hyperparams['return_result'],
                                             add_index_columns=self.hyperparams['add_index_columns'],
                                             inputs=inputs, column_indices=self._training_indices,
                                             columns_list=output_columns)
        return base.CallResult(outputs)

    @classmethod
    def _get_columns_to_fit(cls, inputs: Inputs, hyperparams: Hyperparams):
        """
        Select columns to fit.
        Args:
            inputs: Container DataFrame
            hyperparams: d3m.metadata.hyperparams.Hyperparams

        Returns:
            DataFrame of selected columns and the list of their column indices
        """
        if not hyperparams['use_semantic_types']:
            return inputs, list(range(len(inputs.columns)))

        inputs_metadata = inputs.metadata

        def can_produce_column(column_index: int) -> bool:
            return cls._can_produce_column(inputs_metadata, column_index, hyperparams)

        use_columns = hyperparams['use_columns']
        exclude_columns = hyperparams['exclude_columns']

        columns_to_produce, columns_not_to_produce = base_utils.get_columns_to_use(inputs_metadata,
                                                                                   use_columns=use_columns,
                                                                                   exclude_columns=exclude_columns,
                                                                                   can_use_column=can_produce_column)
        return inputs.iloc[:, columns_to_produce], columns_to_produce

    @classmethod
    def _can_produce_column(cls, inputs_metadata: metadata_base.DataMetadata, column_index: int,
                            hyperparams: Hyperparams) -> bool:
        """
        Output whether a column can be processed.
        Args:
            inputs_metadata: d3m.metadata.base.DataMetadata
            column_index: int

        Returns:
            bool
        """
        column_metadata = inputs_metadata.query((metadata_base.ALL_ELEMENTS, column_index))

        accepted_structural_types = (int, float, numpy.integer, numpy.float64)
        accepted_semantic_types = set()
        accepted_semantic_types.add("https://metadata.datadrivendiscovery.org/types/Attribute")
        if not issubclass(column_metadata['structural_type'], accepted_structural_types):
            return False

        semantic_types = set(column_metadata.get('semantic_types', []))
        # NOTE: the early return below bypasses the semantic-type filtering that follows,
        # so any column with an accepted structural type is accepted.
        return True
        if len(semantic_types) == 0:
            cls.logger.warning("No semantic types found in column metadata")
            return False

        # Making sure all accepted_semantic_types are available in semantic_types.
        if len(accepted_semantic_types - semantic_types) == 0:
            return True

        return False

    @classmethod
    def _update_predictions_metadata(cls, inputs_metadata: metadata_base.DataMetadata, outputs: Optional[Outputs],
                                     target_columns_metadata: List[OrderedDict]) -> metadata_base.DataMetadata:
        """
        Update metadata for selected columns.
        Args:
            inputs_metadata: metadata_base.DataMetadata
            outputs: Container DataFrame
            target_columns_metadata: list

        Returns:
            d3m.metadata.base.DataMetadata
        """
        outputs_metadata = metadata_base.DataMetadata().generate(value=outputs)

        for column_index, column_metadata in enumerate(target_columns_metadata):
            column_metadata.pop("structural_type", None)
            outputs_metadata = outputs_metadata.update_column(column_index, column_metadata)

        return outputs_metadata

    def _wrap_predictions(self, inputs: Inputs, predictions: ndarray) -> Outputs:
        """
        Wrap predictions into a dataframe.
        Args:
            inputs: Container DataFrame
            predictions: array-like data (n_samples, n_features)

        Returns:
            DataFrame
        """
        outputs = d3m_dataframe(predictions, generate_metadata=True)
        target_columns_metadata = self._add_target_columns_metadata(outputs.metadata, self.hyperparams, self.primitiveNo)
        outputs.metadata = self._update_predictions_metadata(inputs.metadata, outputs, target_columns_metadata)

        return outputs

    @classmethod
    def _add_target_columns_metadata(cls, outputs_metadata: metadata_base.DataMetadata, hyperparams, primitiveNo):
        """
        Add target columns metadata.
        Args:
            outputs_metadata: metadata.base.DataMetadata
            hyperparams: d3m.metadata.hyperparams.Hyperparams

        Returns:
            List[OrderedDict]
        """
        outputs_length = outputs_metadata.query((metadata_base.ALL_ELEMENTS,))['dimension']['length']
        target_columns_metadata: List[OrderedDict] = []
        for column_index in range(outputs_length):
            column_name = "{0}{1}_{2}".format(cls.metadata.query()['name'], primitiveNo, column_index)
            column_metadata = OrderedDict()
            semantic_types = set()
            semantic_types.add(hyperparams["return_semantic_type"])
            column_metadata['semantic_types'] = list(semantic_types)

            column_metadata["name"] = str(column_name)
            target_columns_metadata.append(column_metadata)

        return target_columns_metadata

    def _write(self, inputs: Inputs):
        inputs.to_csv(str(time.time()) + '.csv')




    def _system_wise_detection(self, X, method_type, window_size, contamination):
        # NOTE: window_size is currently unused, and only 'max' and 'avg' are implemented;
        # 'majority_voting_sum' (listed in the method_type hyperparameter) is not handled yet.
        systemIds = X.system_id.unique()
        groupedX = X.groupby(X.system_id)

        transformed_X = []
        if method_type == "max":
            # Per-system score: maximum outlier score within the system.
            maxOutlierScorePerSystemList = []
            for systemId in systemIds:
                systemDf = groupedX.get_group(systemId)
                maxOutlierScorePerSystemList.append(np.max(systemDf["value_0"].values))

            scores = np.array(maxOutlierScorePerSystemList)
            ranking = np.sort(scores)
            threshold = ranking[min(int((1 - contamination) * len(ranking)), len(ranking) - 1)]
            self.threshold = threshold
            # Label systems in their original order (not in sorted order): 1 = anomalous.
            labels = (scores >= threshold).astype(int)
            for idx in range(len(systemIds)):
                transformed_X.append([systemIds[idx], labels[idx]])

        if method_type == "avg":
            # Per-system score: mean absolute outlier score within the system.
            meanOutlierScorePerSystemList = []
            for systemId in systemIds:
                systemDf = groupedX.get_group(systemId)
                meanOutlierScorePerSystemList.append(np.mean(np.abs(systemDf["value_0"].values)))

            scores = np.array(meanOutlierScorePerSystemList)
            ranking = np.sort(scores)
            threshold = ranking[min(int((1 - contamination) * len(ranking)), len(ranking) - 1)]
            self.threshold = threshold
            labels = (scores >= threshold).astype(int)
            for idx in range(len(systemIds)):
                transformed_X.append([systemIds[idx], labels[idx]])

        return transformed_X
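
Not part of the commit, but a small self-contained sketch of the contamination-based cut that _system_wise_detection performs for the 'max' method; the toy frame, column names, and contamination value below are illustrative assumptions.

import numpy as np
import pandas as pd

# Toy input in the shape _system_wise_detection expects: a 'system_id' column and
# per-point outlier scores in 'value_0'.
df = pd.DataFrame({
    'system_id': ['A', 'A', 'B', 'B', 'C', 'C'],
    'value_0':   [0.1, 0.2, 0.9, 0.8, 0.1, 0.3],
})

contamination = 0.4  # proportion of systems to flag (coarse with only 3 systems)
per_system = df.groupby('system_id')['value_0'].max()     # per-system score, 'max' method
scores = per_system.values
threshold = np.sort(scores)[min(int((1 - contamination) * len(scores)), len(scores) - 1)]
labels = (scores >= threshold).astype(int).tolist()        # 1 = anomalous system
print(dict(zip(per_system.index, labels)))                 # {'A': 0, 'B': 1, 'C': 1}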





tods/resources/.entry_points.ini (+3, -0)

@@ -79,3 +79,6 @@ tods.detection_algorithm.telemanom = tods.detection_algorithm.Telemanom:Telemano
tods.detection_algorithm.Ensemble = tods.detection_algorithm.Ensemble:Ensemble

tods.reinforcement.rule_filter = tods.reinforcement.RuleBasedFilter:RuleBasedFilter


tods.detection_algorithm.system_wise_detection = tods.detection_algorithm.SystemWiseDetection:SystemWiseDetectionPrimitive
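
Once the package is reinstalled so this entry point is registered, the primitive should resolve by its python path. A minimal sketch of such a check, assuming the d3m index API already used in the example pipeline above:

from d3m import index

# Resolve the primitive registered by the entry point above and print its metadata name.
primitive = index.get_primitive('d3m.primitives.tods.detection_algorithm.system_wise_detection')
print(primitive.metadata.query()['name'])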
