In typical fMRI experiments the researcher creates a sequence of stimuli or events they want to show in advance and then measures the participant's brain activity in response to this pre-specified task. This provides an assessment of the relationship between behavior in the task and BOLD activity. However, although tempting, it is not possible to conclude from this correlation alone that active brain regions are causing the behavior. Such causal inferences have typically required directly intervening to disrupt or enhance the functioning of a brain region (e.g., via stimulation, lesioning, cooling, etc.) and then examining the impact on behavior. Such experimental manipulations of brain function, especially in a regionally specific way, require invasive techniques and so are not possible in healthy humans for ethical reasons.
Real-time fMRI is a step in the direction of (safely) manipulating brain regions, enabling more valid causal inferences about the brain and behavior. Specifically, rather than brain activity being just a dependent variable, it becomes part of the experimental design as a kind of independent variable. For more on real-time fMRI, see here or here.
Take for instance a recent study using real-time fMRI to train attention deBettencourt et al. (2015). In this study, patterns of brain activity were used to make the task easier or harder depending on the participant's attentional state, with the goal of training them to attend better. However, if the brain activity was too noisy or analyzed inappropriately, read out from brain regions that didn't contain information about attentional state, or taken from another participant's brain (as in the control condition), then the participant should not improve.
There are no second chances with real-time fMRI because your analysis is part of data collection. For example, you can't later decide to use a different preprocessing step, analysis algorithm, parameter setting, feedback type, etc. This is why preparation, piloting, and efficient code is critical for real-time fMRI. Think of it as mandatory pre-registration!
There are many different types of real-time fMRI, but here we focus on using the multivariate methods from earlier notebooks to analyze data on the fly and generate feedback for participants in a closed-loop manner.
1. Learn to design a multivariate real-time fMRI experiment
2. Run a real-time fMRI analysis using simulated data
1.1. Data file preparation
1.2. File watcher
1.3. Preprocessing a volume
1.4. Training a model
1.5. Classifying new volumes
1.6. Modifying stimuli for feedback
2. Running a real-time simulation
3. Adaptive real-time experiments
Exercises
The following sequence of steps are necessary for successfully running a real-time fMRI analysis.
Data file preparation: Setup a folder where fMRI volumes will be stored as they are created.
File watcher: A function that looks for a volume to process.
Preprocessing a volume: Preprocess the TR to prepare it for classification.
Training a model: Take the data you have set aside for training and create your classifier model.
Classifying new volumes: Take an epoch of data and classify that, assigning it a label.
Modifying stimuli for feedback The classifier results influence the next stimulus shown to the participant.
Here is an example pipeline from deBettencourt et al. (2015):
import os
import time
import numpy as np # type: ignore
import matplotlib.pyplot as plt
%matplotlib inline
from sklearn.linear_model import LogisticRegression # type: ignore
from watchdog.events import PatternMatchingEventHandler # type: ignore
from watchdog.observers import Observer # type: ignore
from queue import Queue
from sklearn import svm
from sklearn import linear_model
import scipy.stats
from IPython import display
import shutil
from utils import results_path
As you know, an fMRI data volume is generated for each TR. The scanner outputs these files one-by-one in DICOM format. We will not cover here how to create a network link between your scanner and your analysis computer, but you can find example instructions for Siemens here. Once transferred to the analysis computer, you will first need to read the DICOM data into a numpy array. The code below will then process the numpy array. One way to read DICOM files into numpy is by using the package dicom-numpy.
For this notebook, we will generate simulated data using the BrainIAK fmrism module. The function generate_data.py simulates a two condition experiment where each condition is blocked for 10s with no gap between events. This function simulates fMRI noise and then inserts signal in two regions corresponding to the two conditions with the appropriate hemodynamic response function.
Self-Study: Investigate the generate_data.py function to learn how it is working and what brain activity distinguishes between conditions.
A:
To simulate real-time data, this function outputs a TR every 2 s and saves it as a numpy matrix in the fmrisim/data/ folder. To run the script, run the cell below.
# Run the generate_data function
if shutil.which('sbatch') is not None:
!sbatch ./13-real-time/run_generate_data.sh
else:
%run ./13-real-time/generate_data.py
Any time you want to simulate the acquisition of data you should delete the contents of this folder (rm 13-real-time/fmrisim/data/*
) and then run it again.
Below we will set the paths to be used. We also want to specify the proportion of trials that will be used for training the model (time until the real-time neurofeedback kicks in).
data_dir = os.path.join(results_path, '13-real-time/data/') # Where is information stored that is needed for generating data
file_pattern = 'rt_{0:0>3}.npy' # What is the pattern of file that wil be created
train_count = 40 # Number of trials that will be used for training.
For real-time fMRI, we need to monitor as every TR comes in. The simple way to do this is to just set up a loop that checks whether the file exists and waits until it does.
# Create a file watching algorithm
def tr_watcher_simple(filename,
sleep_time=0.1, # temporal resolution of the file watcher
):
# While the file doesn't exist, loop and wait
while not os.path.exists(filename):
time.sleep(sleep_time) # How long do you want to wait for
# When the file exists, load it and output it
vol = np.load(filename)
return vol
However, this procedure is inefficient and prone to error. For instance, if a volume is added just after a check, this code waits the sleep time before checking again. Moreover, this procedure can crash since the file names often exist before the file contents are created.
Instead, to process volumes as soon as they are finished being created, a file watcher function continuously polls for new files. Once a new file is found, it is added to a queue and triggers a call to the next processing step. The watchdog package is used for ths purpose.
# Create a file watching algorithm
def tr_watcher(filename, file_queue):
# Does the file exist?
file_exists = os.path.exists(filename)
# While the file doesn't exist, loop and wait
while not file_exists:
# look for file creation event
event = file_queue.get()
# If there is an event then save it
if event.src_path == filename:
file_exists = True
# When the file exists, load it and output it
vol = np.load(filename)
return vol
# Create a class of events for
class file_notify_handler(PatternMatchingEventHandler):
# Initialize the object being created
def __init__(self, queue, file_pattern):
super().__init__(patterns=file_pattern)
self.q = queue
# When an event occurs, put it in the queue
def on_created(self, event):
self.q.put(event)
We are going to check out this file watcher in action. Remember to clear your 13-real-time/fmrisim/data
directory and re-launch run_generate_data.sh. This script will first play catch up and then print every time a new TR comes in until the training set is acquired.
file_observer = Observer()
file_queue = Queue() # type: ignore
# set up the notifications for when a new TR is created
notify_file_pattern = '*.npy'
file_notify = file_notify_handler(file_queue, [notify_file_pattern])
file_observer.schedule(file_notify, data_dir, recursive=False)
file_observer.start()
for idx in range(train_count):
# What file name are you going to load
next_filename = data_dir + file_pattern.format(idx)
vol = tr_watcher(next_filename, file_queue)
# When the file exists, load it and output it
print('Recieved:', next_filename)
file_observer.stop()
As each volume is received, it is necessary to preprocess it prior to analysis. Given the constraint of wanting to do this as close as possible to real-time, you will need to choose steps carefully and decide on the minimal set of steps that provide the most benefit. For example, motion correction, masking, and normalization might be sufficient. The simulator does not generate any motion artifacts so that step is unnecessary in this notebook. Masking is critical because we don't want to feed the model irrelevant features. Normalization in space (as we will do) is easy because each time point can be treated independently; however, normalizing across time is hard since each additional TR will change the mean and SD, and thus the z-values of all preceding TRs. This is a more general consequence of real-time, wherein you know the past but not the future. This also impacts temporal filtering, which must be done with "causal" filters. For this reason, procedures for normalizing and filtering over time are still being developed/refined.
Self-study: Look into normalizing voxels across time. Helpful information is found in StandardScaler. For temporal filtering, compare the filtfilt and lfilter functions in scipy.
# Insert code here
def preprocess_vol(vol, mask):
# Insert code here
# load mask
mask_file = data_dir +'mask.npy'
mask = np.load(mask_file)
# allocate a matrix to hold the TR data, preset to NaNs. Each row is a TR vector
tr_data = np.full((train_count, mask.sum()), np.nan)
# set up the notifications for when a new TR is created
file_observer = Observer()
file_queue = Queue() # type: ignore
notify_file_pattern = '*.npy'
file_notify = file_notify_handler(file_queue, [notify_file_pattern])
file_observer.schedule(file_notify, data_dir, recursive=False)
file_observer.start()
# Cycle through TRs
for idx in range(train_count):
# Start the timer
# What file name are you going to load
next_filename = data_dir + file_pattern.format(idx)
vol = tr_watcher(next_filename, file_queue)
# Store the volume as a preprocessed vector
tr_data[idx, :] = preprocess_vol(vol, mask)
# End the timer
# Print the timing
file_observer.stop()
After we have collected enough volumes we then train our classifier. Like deBettencourt et al. (2015) we will use an L2 regularized logistic regression. Below a function is created to take in time by voxel training data and a list of labels for each volume and then fit a regression function to it.
# Train 2 Logistic Regression models
def train_logistic(training_data,
training_labels,
parameters='l2'):
# Train the model predicting state 2 (so that true means 1 and false means 0)
clf = LogisticRegression(penalty=parameters)
clf.fit(training_data, training_labels == 2)
return clf
A critical step when training the model is shifting the labels. We expect that a stimulus will evoke activity 4-6 s after its onset because of the hemodynamic lag?.
# Load the labels
label_file = data_dir + 'labels.npy'
labels = np.load(label_file)
# How much do you need to shift the labels by in terms of TRs
tr_shift = 3
training_data = tr_data[tr_shift:train_count, :]
training_labels = labels[0:train_count - tr_shift]
# Fit the model
clf = train_logistic(training_data, training_labels[:,0])
We can now visualize this training in order to see what the model has learned.
coefs = np.zeros(mask.shape)
coefs[mask==1]=clf.coef_[0,:]
plt.imshow(coefs[:,:,16])
plt.title('Classifier coeffients for slice through the brain')
plt.colorbar()
Now that we have trained a model, we can classify incoming volumes with the learned weights. Since most classifiers come with the .fit and .predict formulation in scikit-learn, this is very easy to do (after some reshaping). In the case of a logistic regression, the outcome is simply True
or False
. In this case, True
means condition 2 and False
means condition 1.
# Get a new volume
next_filename = data_dir + file_pattern.format(train_count + 1)
vol = tr_watcher(next_filename, file_queue)
# Store the volume as a preprocessed vector
new_data = preprocess_vol(vol, mask)
prediction = clf.predict(new_data.reshape(1, -1))
print('Prediction for TR %d is condition %d and the label is %d' % (train_count + 1, prediction + 1, labels[idx - tr_shift]))
The final step of closed-loop multivariate real-time fMRI is to use the classifier results and provide feedback to the participant in the scanner. This could involve changing the composition of the stimulus, changing the task difficulty, or providing feedback directly via a scale or gauge. Although we won't do this here, such displays can be easily implemented in experiment code from Psychtoolbox or PsychoPy.
Now that we have all parts of the real-time workflow, we can put them together to run a simulated real-time experiment. The function below named realtime
does this.
This takes as an input the fmrisim
directory. It also takes the number of volumes that are used for training and a function that specifies how to run the classifier (e.g., train_logistic
). The final input incremental_batch
will be discussed later.
def realtime(data_dir,
train_count,
clf_obj=train_logistic,
incremental_batch=0,
):
# While the file doesn't exist, loop and wait
label_file = data_dir + 'labels.npy'
while not os.path.exists(label_file):
time.sleep(0.1) # How long do you want to wait for
# load labels
labels = np.load(label_file)
# How much do you need to shift the labels by in terms of TRs
tr_shift = 3
# load mask
mask_file = data_dir +'mask.npy'
mask = np.load(mask_file)
# allocate a matrix to hold the TR data, preset to NaNs. Each row is a TR vector
tr_data = np.full((len(labels), mask.sum()), np.nan)
# Set up the figure
plt.figure() # Set up figure
plt.plot((train_count, train_count), (0, 3), 'g')
plt.xlim((0, len(labels)))
plt.ylim((0, 3))
plt.title('Searching for the first TR')
is_print=0
# set up the notifications for when a new TR is created
file_observer = Observer()
file_queue = Queue() # type: ignore
file_notify = file_notify_handler(file_queue, [notify_file_pattern])
file_observer.schedule(file_notify, data_dir, recursive=False)
file_observer.start()
# Listen for TRs
num_correct = 0 # Preset the number of correct answers to zero
for idx in range(len(labels)):
# What file name are you going to load
next_filename = data_dir + file_pattern.format(idx, '02d')
vol = tr_watcher(next_filename, file_queue)
# Store the volume as a preprocessed vector
tr_data[idx, :] = preprocess_vol(vol, mask)
plt.plot(range(idx), labels[0:idx], 'r-')
# Collect TRs for training
if idx < train_count:
plt.title('TR: %d for training' % idx)
elif idx == train_count: # Is this time to train the classifier
# Train the classifier
trainStart = time.time()
plt.title("Sufficient TRs collected, training the model")
# Train the classifier
clf = clf_obj(tr_data[tr_shift:train_count, :], labels[0:train_count - tr_shift][:,0])
# Report the training duration
print("Completed training in %0.2f sec" % (time.time() - trainStart))
elif idx > train_count: # Is this after training (is it testing)
# Pull out the predictions of the model for this TR
prediction = clf.predict(tr_data[idx, :].reshape(1, -1))
# If it is a boolean (0 or 1) then add 1 to turn it into the labels
if prediction.dtype=='bool':
prediction = prediction + 1
if prediction == labels[idx - tr_shift]:
num_correct += 1
plt.scatter(idx, prediction)
accuracy = num_correct / (idx - train_count)
plt.title('TR: %d; Total accuracy: %0.2f' % (idx + 1, accuracy))
# Do you want to create a new batch for training if doing an incremental fit
if incremental_batch > 0 and np.mod((idx - train_count), incremental_batch) == 0:
# When does this batch start
start_idx = idx - incremental_batch
# Feed in the classifier to be updated with the current batch size
clf = clf_obj(tr_data[start_idx + tr_shift:idx, :], labels[start_idx:idx - tr_shift], clf)
# Mark where the new batch was loaded in
plt.plot((idx, idx), (0, 3), 'k')
# Plot the figure
display.clear_output(wait=True)
display.display(plt.gcf())
plt.xlim((0, len(labels)))
plt.ylim((0, 3))
# Stop listening
file_observer.stop()
# Return the accuracy overall
return accuracy
try:
realtime(data_dir=data_dir,
train_count=train_count,
clf_obj=train_logistic,
incremental_batch=0,
)
except Exception as err:
file_observer.stop()
print("Exception: {}".format(err))
# Insert new classifier object
# Run the realtime function with this new classifier object
Exercise 5: Create a new copy of the realtime
function below and rename it. Then edit it in order to plot the classifier decision evidence rather than the labels. Make sure that you have an appropriate classifier that outputs the decision evidence for each prediction (and re-scale your figure appropriately).
# Insert code below
The goal of real-time fMRI is often to change brain function via neurofeedback. For instance, neural representations might change because the experiment trains participants to use different parts of their brain to process a stimulus. In such cases, a classifier trained at the start of the experiment will be wrong by the end of the experiment because the underlying feature space across voxels in the brain has changed. There are algorithms that can be used in these cases, which allow the model fit to be updated incrementally as new training examples come in. This can also be used to refine your model with more training data even if the underlying features are stable.
This approach to dynamically training classifiers is called incremental or online learning and scikit-learn has a number of classifiers that allow this. Below we specify one classifier with this functionality. First we will run it without incremental updating as a baseline.
Beware: The initialization of these functions can have a dramatic effect on performance, which you can evaluate by using a random seed.
def train_incremental(training_data,
training_labels,
parameters=None,
):
all_classes = np.array([1, 2]) # Need to say all the labels, in case you want to test out of sample
# Get the clf
if parameters is None:
# Create the linear if it hasn't already been passed in by parameters
clf = linear_model.SGDClassifier()
else:
clf = parameters # Pull out the classifier
# Fit the training data (either initializing the clf or updating it)
clf.partial_fit(training_data,
training_labels,
classes=all_classes,
)
return clf
try:
np.random.seed(0)
realtime(data_dir=data_dir,
train_count=train_count,
clf_obj=train_incremental,
incremental_batch=0,
)
except Exception as err:
file_observer.stop()
print("Exception: {}".format(err))
Now that we have run the classifier with static weights, we can try dynamically updating the classifier as we acquire more data. This is accomplished by telling realtime
how often you want to update (here, after every additional 20 TRs). The incremental_batch
variable specifies how many volumes are used in each batch of training.
try:
np.random.seed(0)
realtime(data_dir=data_dir,
train_count=train_count,
clf_obj=train_incremental,
incremental_batch=40,
)
except Exception as err:
file_observer.stop()
print("Exception: {}".format(err))
A:
Now that we have a classifier that can update its weights dynamically, we will perform a simulation in which the responses in the fMRI data change part way through the study. The generate_data.py
script allows this with a few parameter changes.
# Insert code here
# Insert code here
Take this to the next level by doing a parameter search over the different combinations of parameters.
batch_values = range(10, 100, 10)
train_values = range(10, 100, 10)
results = np.zeros((len(batch_values), len(train_values)))
for batch_counter in range(len(batch_values)):
for train_counter in range(len(train_values)):
# Pull out the values
batch_size = batch_values[batch_counter]
train_size = train_values[train_counter]
np.random.seed(0)
result = realtime(data_dir=data_dir,
train_count=train_size,
clf_obj=train_incremental,
incremental_batch=batch_size,
)
# Store the results
results[batch_counter, train_counter] = result
# We can visualize this parameter search by making a heat map
plt.figure(figsize = (12,8))
plt.imshow(results, interpolation = 'none')
plt.xlabel("Incremental batch value")
plt.ylabel("# TRs used for training")
plt.title("Heatmap of SDG classification accuracy with different input parameters")
Novel contribution: Be creative and make one new discovery by adding an analysis, visualization, or optimization.
Here are some ideas:
G. Wallace for providing initial code.
M. Kumar, C. Ellis and N. Turk-Browne produced the initial notebook 04/2018.
G. Wallace enabled execution in non cluster environments.