import os
import typing

from d3m import container, utils as d3m_utils
from d3m.metadata import base as metadata_base, hyperparams
from d3m.primitive_interfaces import base, transformer
from d3m.contrib.primitives import compute_scores

import common_primitives

__all__ = ('ConstructPredictionsPrimitive',)

Inputs = container.DataFrame
Outputs = container.DataFrame


class Hyperparams(hyperparams.Hyperparams):
    use_columns = hyperparams.Set(
        elements=hyperparams.Hyperparameter[int](-1),
        default=(),
        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
        description="A set of column indices to force the primitive to operate on. If metadata reconstruction happens, this is used for reference columns."
                    " If any specified column is not a primary index or a predicted target, it is skipped.",
    )
    exclude_columns = hyperparams.Set(
        elements=hyperparams.Hyperparameter[int](-1),
        default=(),
        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
        description="A set of column indices to not operate on. If metadata reconstruction happens, this is used for reference columns. Applicable only if \"use_columns\" is not provided.",
    )
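
# A hypothetical example of overriding these hyper-parameters: by default both sets
# are empty and the primitive operates on all suitable columns. To restrict it to
# columns 0 and 2 one could use (a sketch, assuming the standard d3m hyper-params API):
#
#     hp = Hyperparams.defaults().replace({'use_columns': (0, 2)})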


class ConstructPredictionsPrimitive(transformer.TransformerPrimitiveBase[Inputs, Outputs, Hyperparams]):
    """
    A primitive which takes as input a DataFrame and outputs a DataFrame in the Lincoln Labs predictions
    format: the first column is a d3mIndex column (and other primary index columns, e.g., for the object
    detection problem), followed by predicted targets, each in its own column, followed by optional
    confidence column(s).

    It supports input columns annotated with semantic types (``https://metadata.datadrivendiscovery.org/types/PrimaryKey``,
    ``https://metadata.datadrivendiscovery.org/types/PrimaryMultiKey``, ``https://metadata.datadrivendiscovery.org/types/PredictedTarget``,
    ``https://metadata.datadrivendiscovery.org/types/Confidence``), and it can also try to reconstruct missing metadata.
    This is why the primitive also takes an additional input of a reference DataFrame which should
    have metadata to help reconstruct the missing metadata. If metadata is missing, the primitive
    assumes that all ``inputs`` columns are predicted targets, without confidence column(s).
    """

    metadata = metadata_base.PrimitiveMetadata(
        {
            'id': '8d38b340-f83f-4877-baaa-162f8e551736',
            'version': '0.3.0',
            'name': "Construct pipeline predictions output",
            'python_path': 'd3m.primitives.tods.data_processing.construct_predictions',
            'source': {
                'name': common_primitives.__author__,
                'contact': 'mailto:mitar.commonprimitives@tnode.com',
                'uris': [
                    'https://gitlab.com/datadrivendiscovery/common-primitives/blob/master/common_primitives/construct_predictions.py',
                    'https://gitlab.com/datadrivendiscovery/common-primitives.git',
                ],
            },
            'installation': [{
                'type': metadata_base.PrimitiveInstallationType.PIP,
                'package_uri': 'git+https://gitlab.com/datadrivendiscovery/common-primitives.git@{git_commit}#egg=common_primitives'.format(
                    git_commit=d3m_utils.current_git_commit(os.path.dirname(__file__)),
                ),
            }],
            'algorithm_types': [
                metadata_base.PrimitiveAlgorithmType.DATA_CONVERSION,
            ],
            'primitive_family': metadata_base.PrimitiveFamily.DATA_TRANSFORMATION,
        },
    )

    def produce(self, *, inputs: Inputs, reference: Inputs, timeout: float = None, iterations: int = None) -> base.CallResult[Outputs]:  # type: ignore
        index_columns = inputs.metadata.get_index_columns()
        target_columns = inputs.metadata.list_columns_with_semantic_types(('https://metadata.datadrivendiscovery.org/types/PredictedTarget',))

        # Target columns cannot also be index columns. This should not really happen,
        # but it could happen with buggy primitives.
        target_columns = [target_column for target_column in target_columns if target_column not in index_columns]

        if index_columns and target_columns:
            outputs = self._produce_using_semantic_types(inputs, index_columns, target_columns)
        else:
            outputs = self._produce_reconstruct(inputs, reference, index_columns, target_columns)

        outputs = compute_scores.ComputeScoresPrimitive._encode_columns(outputs)

        # Generally we do not care about column names in the DataFrame itself (we use column
        # names from metadata), but in this case setting column names makes it easier to assure
        # that the "to_csv" call produces correct output.
        # See: https://gitlab.com/datadrivendiscovery/d3m/issues/147
        column_names = []
        for column_index in range(len(outputs.columns)):
            column_names.append(outputs.metadata.query_column(column_index).get('name', outputs.columns[column_index]))
        outputs.columns = column_names

        return base.CallResult(outputs)

    def _filter_index_columns(self, inputs_metadata: metadata_base.DataMetadata, index_columns: typing.Sequence[int]) -> typing.Sequence[int]:
        if self.hyperparams['use_columns']:
            index_columns = [index_column_index for index_column_index in index_columns if index_column_index in self.hyperparams['use_columns']]
            if not index_columns:
                raise ValueError("No index columns listed in \"use_columns\" hyper-parameter, but index columns are required.")

        else:
            index_columns = [index_column_index for index_column_index in index_columns if index_column_index not in self.hyperparams['exclude_columns']]
            if not index_columns:
                raise ValueError("All index columns listed in \"exclude_columns\" hyper-parameter, but index columns are required.")

        names = []
        for index_column in index_columns:
            index_metadata = inputs_metadata.query_column(index_column)
            # We do not care about empty strings for names either.
            if index_metadata.get('name', None):
                names.append(index_metadata['name'])

        if 'd3mIndex' not in names:
            raise ValueError("\"d3mIndex\" index column is missing.")

        names_set = set(names)
        if len(names) != len(names_set):
            # Make a copy so that removing elements does not modify "names" itself.
            duplicate_names = list(names)
            for name in names_set:
                # Removes just the first occurrence.
                duplicate_names.remove(name)
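
            # For example (hypothetical values): names == ['d3mIndex', 'd3mIndex'] gives
            # names_set == {'d3mIndex'}; removing one occurrence of each set member from
            # the copy leaves duplicate_names == ['d3mIndex'], which is then reported.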

            self.logger.warning("Duplicate names for index columns: %(duplicate_names)s", {
                'duplicate_names': list(set(duplicate_names)),
            })

        return index_columns

    def _get_columns(self, inputs_metadata: metadata_base.DataMetadata, index_columns: typing.Sequence[int], target_columns: typing.Sequence[int]) -> typing.List[int]:
        assert index_columns
        assert target_columns

        index_columns = self._filter_index_columns(inputs_metadata, index_columns)

        if self.hyperparams['use_columns']:
            target_columns = [target_column_index for target_column_index in target_columns if target_column_index in self.hyperparams['use_columns']]
            if not target_columns:
                raise ValueError("No target columns listed in \"use_columns\" hyper-parameter, but target columns are required.")

        else:
            target_columns = [target_column_index for target_column_index in target_columns if target_column_index not in self.hyperparams['exclude_columns']]
            if not target_columns:
                raise ValueError("All target columns listed in \"exclude_columns\" hyper-parameter, but target columns are required.")

        assert index_columns
        assert target_columns

        return list(index_columns) + list(target_columns)

    def _get_confidence_columns(self, inputs_metadata: metadata_base.DataMetadata) -> typing.List[int]:
        confidence_columns = inputs_metadata.list_columns_with_semantic_types(('https://metadata.datadrivendiscovery.org/types/Confidence',))

        if self.hyperparams['use_columns']:
            confidence_columns = [confidence_column_index for confidence_column_index in confidence_columns if confidence_column_index in self.hyperparams['use_columns']]
        else:
            confidence_columns = [confidence_column_index for confidence_column_index in confidence_columns if confidence_column_index not in self.hyperparams['exclude_columns']]

        return confidence_columns

    def _produce_using_semantic_types(self, inputs: Inputs, index_columns: typing.Sequence[int],
                                      target_columns: typing.Sequence[int]) -> Outputs:
        confidence_columns = self._get_confidence_columns(inputs.metadata)

        output_columns = self._get_columns(inputs.metadata, index_columns, target_columns) + confidence_columns

        # "get_index_columns" makes sure that "d3mIndex" is always listed first.
        # And "select_columns" selects columns in the order listed, which then
        # always puts "d3mIndex" first.
        outputs = inputs.select_columns(output_columns)
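
        # For example (hypothetical indices): with index_columns == [0], target_columns == [2],
        # and confidence_columns == [3], "select_columns" receives [0, 2, 3], so "d3mIndex"
        # ends up first in the output.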

        if confidence_columns:
            outputs.metadata = self._update_confidence_columns(outputs.metadata, confidence_columns)

        return outputs

    def _update_confidence_columns(self, inputs_metadata: metadata_base.DataMetadata, confidence_columns: typing.Sequence[int]) -> metadata_base.DataMetadata:
        output_columns_length = inputs_metadata.query((metadata_base.ALL_ELEMENTS,))['dimension']['length']

        outputs_metadata = inputs_metadata

        # Confidence columns were appended after all other output columns, so they occupy
        # the last "len(confidence_columns)" positions.
        # All confidence columns have to be named "confidence".
        for column_index in range(output_columns_length - len(confidence_columns), output_columns_length):
            outputs_metadata = outputs_metadata.update((metadata_base.ALL_ELEMENTS, column_index), {
                'name': 'confidence',
            })

        return outputs_metadata

    def _produce_reconstruct(self, inputs: Inputs, reference: Inputs, index_columns: typing.Sequence[int], target_columns: typing.Sequence[int]) -> Outputs:
        if not index_columns:
            reference_index_columns = reference.metadata.get_index_columns()

            if not reference_index_columns:
                raise ValueError("Cannot find an index column in reference data, but an index column is required.")

            filtered_index_columns = self._filter_index_columns(reference.metadata, reference_index_columns)
            index = reference.select_columns(filtered_index_columns)
        else:
            filtered_index_columns = self._filter_index_columns(inputs.metadata, index_columns)
            index = inputs.select_columns(filtered_index_columns)

        if not target_columns:
            if index_columns:
                raise ValueError("No target columns in input data, but index column(s) present.")

            # We assume all inputs are targets.
            targets = inputs

            # We want to make sure at least basic metadata is generated correctly, so we regenerate it.
            targets.metadata = targets.metadata.generate(targets)

            # We set target column names from the reference. We also set semantic types.
            targets.metadata = self._update_targets_metadata(targets.metadata, self._get_target_names(reference.metadata))

        else:
            targets = inputs.select_columns(target_columns)

        return index.append_columns(targets)

    def multi_produce(self, *, produce_methods: typing.Sequence[str], inputs: Inputs, reference: Inputs, timeout: float = None, iterations: int = None) -> base.MultiCallResult:  # type: ignore
        return self._multi_produce(produce_methods=produce_methods, timeout=timeout, iterations=iterations, inputs=inputs, reference=reference)

    def fit_multi_produce(self, *, produce_methods: typing.Sequence[str], inputs: Inputs, reference: Inputs, timeout: float = None, iterations: int = None) -> base.MultiCallResult:  # type: ignore
        return self._fit_multi_produce(produce_methods=produce_methods, timeout=timeout, iterations=iterations, inputs=inputs, reference=reference)

    def _get_target_names(self, metadata: metadata_base.DataMetadata) -> typing.List[typing.Union[str, None]]:
        target_names = []

        for column_index in metadata.list_columns_with_semantic_types(('https://metadata.datadrivendiscovery.org/types/TrueTarget',)):
            column_metadata = metadata.query((metadata_base.ALL_ELEMENTS, column_index))

            target_names.append(column_metadata.get('name', None))

        return target_names

    def _update_targets_metadata(self, metadata: metadata_base.DataMetadata, target_names: typing.Sequence[typing.Union[str, None]]) -> metadata_base.DataMetadata:
        targets_length = metadata.query((metadata_base.ALL_ELEMENTS,))['dimension']['length']

        if targets_length != len(target_names):
            raise ValueError("Unexpected number of target columns to apply names to. Expected {target_names}, provided {targets_length}.".format(
                target_names=len(target_names),
                targets_length=targets_length,
            ))

        for column_index, target_name in enumerate(target_names):
            metadata = metadata.add_semantic_type((metadata_base.ALL_ELEMENTS, column_index), 'https://metadata.datadrivendiscovery.org/types/Target')
            metadata = metadata.add_semantic_type((metadata_base.ALL_ELEMENTS, column_index), 'https://metadata.datadrivendiscovery.org/types/PredictedTarget')

            # We do not have a name for this column, so we skip it and hope for the best.
            if target_name is None:
                continue

            metadata = metadata.update_column(column_index, {
                'name': target_name,
            })

        return metadata
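

# A minimal usage sketch (hypothetical variable names), assuming `model_output` is a
# DataFrame of predictions and `dataframe` is the reference DataFrame with metadata
# (e.g., the output of a dataset-to-dataframe step):
#
#     hyperparams_class = ConstructPredictionsPrimitive.metadata.get_hyperparams()
#     primitive = ConstructPredictionsPrimitive(hyperparams=hyperparams_class.defaults())
#     predictions = primitive.produce(inputs=model_output, reference=dataframe).value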