diff --git a/tods/data_processing/ContructPredictions.py b/tods/data_processing/ContructPredictions.py
deleted file mode 100644
index ecc89cf..0000000
--- a/tods/data_processing/ContructPredictions.py
+++ /dev/null
@@ -1,261 +0,0 @@
-import os
-import typing
-
-from d3m import container, utils as d3m_utils
-from d3m.metadata import base as metadata_base, hyperparams
-from d3m.primitive_interfaces import base, transformer
-from d3m.contrib.primitives import compute_scores
-
-import common_primitives
-
-__all__ = ('ConstructPredictionsPrimitive',)
-
-Inputs = container.DataFrame
-Outputs = container.DataFrame
-
-
-class Hyperparams(hyperparams.Hyperparams):
-    use_columns = hyperparams.Set(
-        elements=hyperparams.Hyperparameter[int](-1),
-        default=(),
-        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
-        description="A set of column indices to force primitive to operate on. If metadata reconstruction happens, this is used for reference columns."
-                    " If any specified column is not a primary index or a predicted target, it is skipped.",
-    )
-    exclude_columns = hyperparams.Set(
-        elements=hyperparams.Hyperparameter[int](-1),
-        default=(),
-        semantic_types=['https://metadata.datadrivendiscovery.org/types/ControlParameter'],
-        description="A set of column indices to not operate on. If metadata reconstruction happens, this is used for reference columns. Applicable only if \"use_columns\" is not provided.",
-    )
-
-
-class ConstructPredictionsPrimitive(transformer.TransformerPrimitiveBase[Inputs, Outputs, Hyperparams]):
-    """
-    A primitive which takes as input a DataFrame and outputs a DataFrame in Lincoln Labs predictions
-    format: first column is a d3mIndex column (and other primary index columns, e.g., for object detection
-    problem), and then predicted targets, each in its column, followed by optional confidence column(s).
-
-    It supports both input columns annotated with semantic types (``https://metadata.datadrivendiscovery.org/types/PrimaryKey``,
-    ``https://metadata.datadrivendiscovery.org/types/PrimaryMultiKey``, ``https://metadata.datadrivendiscovery.org/types/PredictedTarget``,
-    ``https://metadata.datadrivendiscovery.org/types/Confidence``), or trying to reconstruct metadata.
-    This is why the primitive takes also additional input of a reference DataFrame which should
-    have metadata to help reconstruct missing metadata. If metadata is missing, the primitive
-    assumes that all ``inputs`` columns are predicted targets, without confidence column(s).
-    """
-
-    metadata = metadata_base.PrimitiveMetadata(
-        {
-            'id': '8d38b340-f83f-4877-baaa-162f8e551736',
-            'version': '0.3.0',
-            'name': "Construct pipeline predictions output",
-            'python_path': 'd3m.primitives.tods.data_processing.construct_predictions',
-            'source': {
-                'name': common_primitives.__author__,
-                'contact': 'mailto:mitar.commonprimitives@tnode.com',
-                'uris': [
-                    'https://gitlab.com/datadrivendiscovery/common-primitives/blob/master/common_primitives/construct_predictions.py',
-                    'https://gitlab.com/datadrivendiscovery/common-primitives.git',
-                ],
-            },
-            'installation': [{
-                'type': metadata_base.PrimitiveInstallationType.PIP,
-                'package_uri': 'git+https://gitlab.com/datadrivendiscovery/common-primitives.git@{git_commit}#egg=common_primitives'.format(
-                    git_commit=d3m_utils.current_git_commit(os.path.dirname(__file__)),
-                ),
-            }],
-            'algorithm_types': [
-                metadata_base.PrimitiveAlgorithmType.DATA_CONVERSION,
-            ],
-            'primitive_family': metadata_base.PrimitiveFamily.DATA_TRANSFORMATION,
-        },
-    )
-
-    def produce(self, *, inputs: Inputs, reference: Inputs, timeout: float = None, iterations: int = None) -> base.CallResult[Outputs]:  # type: ignore
-        index_columns = inputs.metadata.get_index_columns()
-        target_columns = inputs.metadata.list_columns_with_semantic_types(('https://metadata.datadrivendiscovery.org/types/PredictedTarget',))
-
-        # Target columns cannot be also index columns. This should not really happen,
-        # but it could happen with buggy primitives.
-        target_columns = [target_column for target_column in target_columns if target_column not in index_columns]
-
-        if index_columns and target_columns:
-            outputs = self._produce_using_semantic_types(inputs, index_columns, target_columns)
-        else:
-            outputs = self._produce_reconstruct(inputs, reference, index_columns, target_columns)
-
-        outputs = compute_scores.ComputeScoresPrimitive._encode_columns(outputs)
-
-        # Generally we do not care about column names in DataFrame itself (but use names of columns from metadata),
-        # but in this case setting column names makes it easier to assure that "to_csv" call produces correct output.
-        # See: https://gitlab.com/datadrivendiscovery/d3m/issues/147
-        column_names = []
-        for column_index in range(len(outputs.columns)):
-            column_names.append(outputs.metadata.query_column(column_index).get('name', outputs.columns[column_index]))
-        outputs.columns = column_names
-
-        return base.CallResult(outputs)
-
-    def _filter_index_columns(self, inputs_metadata: metadata_base.DataMetadata, index_columns: typing.Sequence[int]) -> typing.Sequence[int]:
-        if self.hyperparams['use_columns']:
-            index_columns = [index_column_index for index_column_index in index_columns if index_column_index in self.hyperparams['use_columns']]
-            if not index_columns:
-                raise ValueError("No index columns listed in \"use_columns\" hyper-parameter, but index columns are required.")
-
-        else:
-            index_columns = [index_column_index for index_column_index in index_columns if index_column_index not in self.hyperparams['exclude_columns']]
-            if not index_columns:
-                raise ValueError("All index columns listed in \"exclude_columns\" hyper-parameter, but index columns are required.")
-
-        names = []
-        for index_column in index_columns:
-            index_metadata = inputs_metadata.query_column(index_column)
-            # We do not care about empty strings for names either.
-            if index_metadata.get('name', None):
-                names.append(index_metadata['name'])
-
-        if 'd3mIndex' not in names:
-            raise ValueError("\"d3mIndex\" index column is missing.")
-
-        names_set = set(names)
-        if len(names) != len(names_set):
-            duplicate_names = names
-            for name in names_set:
-                # Removes just the first occurrence.
-                duplicate_names.remove(name)
-
-            self.logger.warning("Duplicate names for index columns: %(duplicate_names)s", {
-                'duplicate_names': list(set(duplicate_names)),
-            })
-
-        return index_columns
-
-    def _get_columns(self, inputs_metadata: metadata_base.DataMetadata, index_columns: typing.Sequence[int], target_columns: typing.Sequence[int]) -> typing.List[int]:
-        assert index_columns
-        assert target_columns
-
-        index_columns = self._filter_index_columns(inputs_metadata, index_columns)
-
-        if self.hyperparams['use_columns']:
-            target_columns = [target_column_index for target_column_index in target_columns if target_column_index in self.hyperparams['use_columns']]
-            if not target_columns:
-                raise ValueError("No target columns listed in \"use_columns\" hyper-parameter, but target columns are required.")
-
-        else:
-            target_columns = [target_column_index for target_column_index in target_columns if target_column_index not in self.hyperparams['exclude_columns']]
-            if not target_columns:
-                raise ValueError("All target columns listed in \"exclude_columns\" hyper-parameter, but target columns are required.")
-
-        assert index_columns
-        assert target_columns
-
-        return list(index_columns) + list(target_columns)
-
-    def _get_confidence_columns(self, inputs_metadata: metadata_base.DataMetadata) -> typing.List[int]:
-        confidence_columns = inputs_metadata.list_columns_with_semantic_types(('https://metadata.datadrivendiscovery.org/types/Confidence',))
-
-        if self.hyperparams['use_columns']:
-            confidence_columns = [confidence_column_index for confidence_column_index in confidence_columns if confidence_column_index in self.hyperparams['use_columns']]
-        else:
-            confidence_columns = [confidence_column_index for confidence_column_index in confidence_columns if confidence_column_index not in self.hyperparams['exclude_columns']]
-
-        return confidence_columns
-
-    def _produce_using_semantic_types(self, inputs: Inputs, index_columns: typing.Sequence[int],
-                                      target_columns: typing.Sequence[int]) -> Outputs:
-        confidence_columns = self._get_confidence_columns(inputs.metadata)
-
-        output_columns = self._get_columns(inputs.metadata, index_columns, target_columns) + confidence_columns
-
-        # "get_index_columns" makes sure that "d3mIndex" is always listed first.
-        # And "select_columns" selects columns in order listed, which then
-        # always puts "d3mIndex" first.
-        outputs = inputs.select_columns(output_columns)
-
-        if confidence_columns:
-            outputs.metadata = self._update_confidence_columns(outputs.metadata, confidence_columns)
-
-        return outputs
-
-    def _update_confidence_columns(self, inputs_metadata: metadata_base.DataMetadata, confidence_columns: typing.Sequence[int]) -> metadata_base.DataMetadata:
-        output_columns_length = inputs_metadata.query((metadata_base.ALL_ELEMENTS,))['dimension']['length']
-
-        outputs_metadata = inputs_metadata
-
-        # All confidence columns have to be named "confidence".
-        for column_index in range(output_columns_length - len(confidence_columns), output_columns_length):
-            outputs_metadata = outputs_metadata.update((metadata_base.ALL_ELEMENTS, column_index), {
-                'name': 'confidence',
-            })
-
-        return outputs_metadata
-
-    def _produce_reconstruct(self, inputs: Inputs, reference: Inputs, index_columns: typing.Sequence[int], target_columns: typing.Sequence[int]) -> Outputs:
-        if not index_columns:
-            reference_index_columns = reference.metadata.get_index_columns()
-
-            if not reference_index_columns:
-                raise ValueError("Cannot find an index column in reference data, but index column is required.")
-
-            filtered_index_columns = self._filter_index_columns(reference.metadata, reference_index_columns)
-            index = reference.select_columns(filtered_index_columns)
-        else:
-            filtered_index_columns = self._filter_index_columns(inputs.metadata, index_columns)
-            index = inputs.select_columns(filtered_index_columns)
-
-        if not target_columns:
-            if index_columns:
-                raise ValueError("No target columns in input data, but index column(s) present.")
-
-            # We assume all inputs are targets.
-            targets = inputs
-
-            # We make sure at least basic metadata is generated correctly, so we regenerate metadata.
-            targets.metadata = targets.metadata.generate(targets)
-
-            # We set target column names from the reference. We set semantic types.
-            targets.metadata = self._update_targets_metadata(targets.metadata, self._get_target_names(reference.metadata))
-
-        else:
-            targets = inputs.select_columns(target_columns)
-
-        return index.append_columns(targets)
-
-    def multi_produce(self, *, produce_methods: typing.Sequence[str], inputs: Inputs, reference: Inputs, timeout: float = None, iterations: int = None) -> base.MultiCallResult:  # type: ignore
-        return self._multi_produce(produce_methods=produce_methods, timeout=timeout, iterations=iterations, inputs=inputs, reference=reference)
-
-    def fit_multi_produce(self, *, produce_methods: typing.Sequence[str], inputs: Inputs, reference: Inputs, timeout: float = None, iterations: int = None) -> base.MultiCallResult:  # type: ignore
-        return self._fit_multi_produce(produce_methods=produce_methods, timeout=timeout, iterations=iterations, inputs=inputs, reference=reference)
-
-    def _get_target_names(self, metadata: metadata_base.DataMetadata) -> typing.List[typing.Union[str, None]]:
-        target_names = []
-
-        for column_index in metadata.list_columns_with_semantic_types(('https://metadata.datadrivendiscovery.org/types/TrueTarget',)):
-            column_metadata = metadata.query((metadata_base.ALL_ELEMENTS, column_index))
-
-            target_names.append(column_metadata.get('name', None))
-
-        return target_names
-
-    def _update_targets_metadata(self, metadata: metadata_base.DataMetadata, target_names: typing.Sequence[typing.Union[str, None]]) -> metadata_base.DataMetadata:
-        targets_length = metadata.query((metadata_base.ALL_ELEMENTS,))['dimension']['length']
-
-        if targets_length != len(target_names):
-            raise ValueError("Not an expected number of target columns to apply names for. Expected {target_names}, provided {targets_length}.".format(
-                target_names=len(target_names),
-                targets_length=targets_length,
-            ))
-
-        for column_index, target_name in enumerate(target_names):
-            metadata = metadata.add_semantic_type((metadata_base.ALL_ELEMENTS, column_index), 'https://metadata.datadrivendiscovery.org/types/Target')
-            metadata = metadata.add_semantic_type((metadata_base.ALL_ELEMENTS, column_index), 'https://metadata.datadrivendiscovery.org/types/PredictedTarget')
-
-            # We do not have it, let's skip it and hope for the best.
-            if target_name is None:
-                continue
-
-            metadata = metadata.update_column(column_index, {
-                'name': target_name,
-            })
-
-        return metadata