Source code for dicom2nifti.convert_ge

# -*- coding: utf-8 -*-
"""
dicom2nifti

@author: abrys
"""
from dicom2nifti.exceptions import ConversionError

import itertools
import os
from math import pow

import logging
import nibabel
import numpy

from pydicom.tag import Tag

import dicom2nifti.common as common
import dicom2nifti.convert_generic as convert_generic

logger = logging.getLogger(__name__)


[docs]def dicom_to_nifti(dicom_input, output_file=None): """ This is the main dicom to nifti conversion fuction for ge images. As input ge images are required. It will then determine the type of images and do the correct conversion Examples: See unit test :param output_file: the filepath to the output nifti file :param dicom_input: list with dicom objects """ assert common.is_ge(dicom_input) # remove duplicate slices based on position and data dicom_input = convert_generic.remove_duplicate_slices(dicom_input) # remove localizers based on image type dicom_input = convert_generic.remove_localizers_by_imagetype(dicom_input) # remove_localizers based on image orientation (only valid if slicecount is validated) dicom_input = convert_generic.remove_localizers_by_orientation(dicom_input) logger.info('Reading and sorting dicom files') grouped_dicoms = _get_grouped_dicoms(dicom_input) if _is_4d(grouped_dicoms): logger.info('Found sequence type: 4D') return _4d_to_nifti(grouped_dicoms, output_file) logger.info('Assuming anatomical data') return convert_generic.dicom_to_nifti(dicom_input, output_file)
def _is_4d(grouped_dicoms): """ Use this function to detect if a dicom series is a ge 4d dataset NOTE: Only the first slice will be checked so you can only provide an already sorted dicom directory (containing one series) """ # read dicom header header = grouped_dicoms[0][0] # check if contains multiple stacks if len(grouped_dicoms) > 1: return True return False def _is_diffusion_imaging(grouped_dicoms): """ Use this function to detect if a dicom series is a ge dti dataset NOTE: We already assume this is a 4D dataset """ # we already assume 4D images as input # check if contains dti bval information bval_tag = Tag(0x0043, 0x1039) # put this there as this is a slow step and used a lot found_bval = False for header in list(itertools.chain.from_iterable(grouped_dicoms)): if bval_tag in header and int(header[bval_tag].value[0]) != 0: found_bval = True break if not found_bval: return False return True def _4d_to_nifti(grouped_dicoms, output_file): """ This function will convert ge 4d series to a nifti """ # Create mosaic block logger.info('Creating data block') full_block = _get_full_block(grouped_dicoms) logger.info('Creating affine') # Create the nifti header info affine, slice_increment = common.create_affine(grouped_dicoms[0]) logger.info('Creating nifti') # Convert to nifti nii_image = nibabel.Nifti1Image(full_block, affine) common.set_tr_te(nii_image, float(grouped_dicoms[0][0].RepetitionTime), float(grouped_dicoms[0][0].EchoTime)) logger.info('Saving nifti to disk %s' % output_file) # Save to disk if output_file is not None: nii_image.header.set_slope_inter(1, 0) nii_image.header.set_xyzt_units(2) # set units for xyz (leave t as unknown) nii_image.to_filename(output_file) if _is_diffusion_imaging(grouped_dicoms): bval_file = None bvec_file = None # Create the bval en bevec files if output_file is not None: base_path = os.path.dirname(output_file) base_name = os.path.splitext(os.path.splitext(os.path.basename(output_file))[0])[0] logger.info('Creating bval en bvec files') bval_file = '%s/%s.bval' % (base_path, base_name) bvec_file = '%s/%s.bvec' % (base_path, base_name) bval, bvec = _create_bvals_bvecs(grouped_dicoms, bval_file, bvec_file) return {'NII_FILE': output_file, 'BVAL_FILE': bval_file, 'BVEC_FILE': bvec_file, 'NII': nii_image, 'BVAL': bval, 'BVEC': bvec, 'MAX_SLICE_INCREMENT': slice_increment } return {'NII_FILE': output_file, 'NII': nii_image} def _get_full_block(grouped_dicoms): """ Generate a full datablock containing all timepoints """ # For each slice / mosaic create a data volume block data_blocks = [] for index in range(0, len(grouped_dicoms)): logger.info('Creating block %s of %s' % (index + 1, len(grouped_dicoms))) data_blocks.append(_timepoint_to_block(grouped_dicoms[index])) # Add the data_blocks together to one 4d block size_x = numpy.shape(data_blocks[0])[0] size_y = numpy.shape(data_blocks[0])[1] size_z = numpy.shape(data_blocks[0])[2] size_t = len(data_blocks) full_block = numpy.zeros((size_x, size_y, size_z, size_t), dtype=data_blocks[0].dtype) for index in range(0, size_t): if full_block[:, :, :, index].shape != data_blocks[index].shape: logger.warning('Missing slices (slice count mismatch between timepoint %s and %s)' % (index - 1, index)) logger.warning('---------------------------------------------------------') logger.warning(full_block[:, :, :, index].shape) logger.warning(data_blocks[index].shape) logger.warning('---------------------------------------------------------') raise ConversionError("MISSING_DICOM_FILES") full_block[:, :, :, index] = data_blocks[index] return full_block def _timepoint_to_block(timepoint_dicoms): """ Convert slices to a block of data by reading the headers and appending """ # similar way of getting the block to anatomical however here we are creating the dicom series our selves return common.get_volume_pixeldata(timepoint_dicoms) def _get_grouped_dicoms(dicom_input): """ Search all dicoms in the dicom directory, sort and validate them fast_read = True will only read the headers not the data """ # Order all dicom files by InstanceNumber dicoms = sorted(dicom_input, key=lambda x: x.InstanceNumber) # now group per stack grouped_dicoms = [[]] # list with first element a list stack_index = 0 # loop over all sorted dicoms and sort them by stack # for this we use the position and direction of the slices so we can detect a new stack easily previous_position = None previous_direction = None for dicom_ in dicoms: current_direction = None # if the stack number decreases we moved to the next stack if previous_position is not None: current_direction = numpy.array(dicom_.ImagePositionPatient) - previous_position current_direction = current_direction / numpy.linalg.norm(current_direction) if current_direction is not None and \ previous_direction is not None and \ not numpy.allclose(current_direction, previous_direction, rtol=0.05, atol=0.05): previous_position = numpy.array(dicom_.ImagePositionPatient) previous_direction = None stack_index += 1 else: previous_position = numpy.array(dicom_.ImagePositionPatient) previous_direction = current_direction if stack_index >= len(grouped_dicoms): grouped_dicoms.append([]) grouped_dicoms[stack_index].append(dicom_) return grouped_dicoms def _get_bvals_bvecs(grouped_dicoms): """ Write the bvals from the sorted dicom files to a bval file """ # loop over all timepoints and create a list with all bvals and bvecs bvals = numpy.zeros([len(grouped_dicoms)], dtype=numpy.int32) bvecs = numpy.zeros([len(grouped_dicoms), 3]) for group_index in range(0, len(grouped_dicoms)): dicom_ = grouped_dicoms[group_index][0] # 0019:10bb: Diffusion X # 0019:10bc: Diffusion Y # 0019:10bd: Diffusion Z # 0043:1039: B-values (4 values, 1st value is actual B value) # bval can be stored both in string as number format in dicom so implement both # some workarounds needed for implicit transfer syntax to work if isinstance(dicom_[Tag(0x0043, 0x1039)].value, str): # this works for python2.7 original_bval = float(dicom_[Tag(0x0043, 0x1039)].value.split('\\')[0]) elif isinstance(dicom_[Tag(0x0043, 0x1039)].value, bytes): # this works for python3.o original_bval = float(dicom_[Tag(0x0043, 0x1039)].value.decode("utf-8").split('\\')[0]) else: original_bval = dicom_[Tag(0x0043, 0x1039)][0] original_bvec = numpy.array([0, 0, 0], dtype=numpy.float) original_bvec[0] = -float(dicom_[Tag(0x0019, 0x10bb)].value) # invert based upon mricron output original_bvec[1] = float(dicom_[Tag(0x0019, 0x10bc)].value) original_bvec[2] = float(dicom_[Tag(0x0019, 0x10bd)].value) # Add calculated B Value if original_bval != 0: # only normalize if there is a value corrected_bval = original_bval * pow(numpy.linalg.norm(original_bvec), 2) if numpy.linalg.norm(original_bvec) != 0: normalized_bvec = original_bvec / numpy.linalg.norm(original_bvec) else: normalized_bvec = original_bvec else: corrected_bval = original_bval normalized_bvec = original_bvec bvals[group_index] = int(round(corrected_bval)) # we want the original numbers back as in the protocol bvecs[group_index, :] = normalized_bvec return bvals, bvecs def _create_bvals_bvecs(grouped_dicoms, bval_file, bvec_file): """ Write the bvals from the sorted dicom files to a bval file """ # get the bvals and bvecs bvals, bvecs = _get_bvals_bvecs(grouped_dicoms) # save the found bvecs to the file common.write_bval_file(bvals, bval_file) common.write_bvec_file(bvecs, bvec_file) return bvals, bvecs