Working with Realm in multiple threads


#1

To work with multiple threads, I create a new Realm instance inside each parse function that needs it. Assuming my sync call uses 30-40 parse functions, all of which need Realm, is it efficient to do it as in the example below?

I already tried initializing Realm in synchProcessQueue and passing it to the parse functions as a parameter, but I received errors such as “Realm accessed from different thread…”.
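Passing one instance into the parse functions can work, provided the Realm is opened and used inside the same dispatched block: a single block runs on one thread from start to finish, whereas storing the instance and touching it from a later async block is what can trigger the thread error. A minimal sketch, assuming the parse functions are changed to take the Realm as a parameter; the Row model, the parseUpserts/parseDeletions/process functions and the queue label are hypothetical, and update: .modified assumes a RealmSwift version that provides that update policy:

```swift
import Foundation
import RealmSwift

// Hypothetical model standing in for one of the synced tables.
class Row: Object {
    @objc dynamic var id = 0
    @objc dynamic var name = ""
    override static func primaryKey() -> String? { return "id" }
}

// Hypothetical parse functions: they receive the caller's Realm instead of opening their own.
func parseUpserts(_ rows: [Int: String], in realm: Realm) {
    for (id, name) in rows {
        _ = realm.create(Row.self, value: ["id": id, "name": name], update: .modified)
    }
}

func parseDeletions(_ ids: [Int], in realm: Realm) {
    for id in ids {
        if let row = realm.object(ofType: Row.self, forPrimaryKey: id) {
            realm.delete(row)
        }
    }
}

let processQueue = DispatchQueue(label: "com.example.processQueue") // serial

func process(upserts: [Int: String], deletions: [Int], completion: @escaping (Error?) -> Void) {
    processQueue.async {
        // One block runs on one thread from start to finish, so the Realm opened here
        // is only ever used on the thread that opened it.
        autoreleasepool {
            do {
                let realm = try Realm()
                try realm.write {
                    parseUpserts(upserts, in: realm)      // independent table first
                    parseDeletions(deletions, in: realm)  // then the dependent one
                }
                completion(nil)
            } catch {
                completion(error)
            }
        }
    }
}
```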

let synchQueue = DispatchQueue(label: "com.dmapp.synchQueue", qos: .default, attributes: .concurrent)

let synchProcessQueue = DispatchQueue(label: "com.dmapp.processQueue", qos: .default, attributes: .concurrent)

class Sync {

  func synchLatest() {
    while(someconditions) {

      // grpc call

      var table1DispatchWorkItem: DispatchWorkItem?
      var table2DispatchWorkItem: DispatchWorkItem?

      if grpccall.success == 1 {
        // now that I received all info, and populated realm local db, I can manipulate realm db tables that related
        // to some data that I needed to make sure existed. That's why I am using DispatchWorkItem now
        synchProcessQueue.sync {
          table1DispatchWorkItem?.perform()
          table1DispatchWorkItem = nil
          table2DispatchWorkItem?.perform()
          table2DispatchWorkItem = nil
        }

        if grpccall.needAgain {
          synchLatest()
        } else {
          DispatchQueue.main.async {
            let realm = try! Realm()
            realm.refresh() // not sure if needed
            // update UI
          }
        }
      }

      synchQueue.async {
            ...
            let response = try grpcCall.receive()
            ...
            synchProcessQueue.async {
                ....
                if response.data == "table1" {
                  table1DispatchWorkItem = DispatchWorkItem {
                    parseTable1(response.data.table1)
                  }
                } else if response.data == "table5" {
                  parseTable5(response.data.table5)
                } else if response.data == "table6" {
                  parseTable6(response.data.table6)
                }
                ....
            }
      }
    }
  }

  func parseTable1(_ table1: [Grpc_Response_DataMsg_Table1]) {
    let realm = try! Realm()

    try! realm.write{
        ....
        ...
        ....
    }
  }

  func parseTable2(_ table2: [Grpc_Response_DataMsg_Table2]) {
    let realm = try! Realm()

    try! realm.write{
        ....
        ...
        ....
    }
  }

}

#2

Why don't you use a dedicated Realm thread to do all the Realm-related work?
It's not a good idea to use the main thread; it could get blocked.

Also, never use let realm = try! Realm() in production code; always catch the error when opening a realm.
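A minimal sketch of opening a realm without try!, assuming the caller can recover when nil is returned (openRealm is a hypothetical helper name):

```swift
import RealmSwift

// Returns nil instead of crashing when the realm cannot be opened,
// for example because of a file-access error or a schema mismatch.
func openRealm(configuration: Realm.Configuration = .defaultConfiguration) -> Realm? {
    do {
        return try Realm(configuration: configuration)
    } catch {
        // Log and let the caller decide how to recover.
        print("Could not open realm: \(error)")
        return nil
    }
}
```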

The “Realm accessed from different thread” issue will remain if you try to move a managed object to another thread. You can fix that with thread-safe references (ThreadSafeReference).
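A minimal sketch of handing an object to another thread with RealmSwift's ThreadSafeReference: only the reference crosses threads, and the background code resolves it against its own Realm instance. The object must already be managed by a realm; the Person model and the renameInBackground function are hypothetical:

```swift
import Foundation
import RealmSwift

// Hypothetical model for the sketch.
class Person: Object {
    @objc dynamic var id = 0
    @objc dynamic var name = ""
    override static func primaryKey() -> String? { return "id" }
}

/// Renames a managed `person` on a background queue without passing the object itself.
func renameInBackground(_ person: Person, to newName: String,
                        configuration: Realm.Configuration = .defaultConfiguration,
                        completion: @escaping (Error?) -> Void) {
    // Only the reference crosses threads; the managed object stays on its own thread.
    let reference = ThreadSafeReference(to: person)
    DispatchQueue.global().async {
        autoreleasepool {
            do {
                let realm = try Realm(configuration: configuration)
                // resolve(_:) returns nil if the object was deleted in the meantime.
                if let backgroundPerson = realm.resolve(reference) {
                    try realm.write { backgroundPerson.name = newName }
                }
                completion(nil)
            } catch {
                completion(error)
            }
        }
    }
}
```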


#3

I am a bit confused about the queues I am working with and the Realm instance. I keep getting the error that Realm is accessed from a different thread, and I am having trouble figuring out what to do. I have evolved the code into this:

let serialQueue = DispatchQueue(label: "com.app.sync.serialQueue", qos: .default)
let serialProcessQueue = DispatchQueue(label: "com.app.sync.serialProcessQueue", qos: .default)

class Sync {
    static private var realm: Realm!
    
    static func sync(_ view: UIView?, showLoading: Bool = false, showNotifications: Bool = true, callback: @escaping ((Bool) -> Void)) {
        var progressBar: UIView? = nil
        if view != nil &&  showLoading {
            progressBar = showProgressBar(view!)
        }
        
        serialQueue.async {
            do {
                let service = Apiupdate_ApiUpdateServiceClient(address: API_UPDATE_ADDRESS, secure: secure, arguments: [.maxReceiveMessageLength(-1)])
                
                try? service.metadata.add(key: "token", value: token)
                try? service.metadata.add(key: "user_token", value: user_token)
                service.channel.timeout = 180

                let request = Apiupdate_Request()

                let getUpdateCall = try service.getUpdate(request) { _ in
                }
                
                var table3DispatchWorkItem: DispatchWorkItem?
                var table4DispatchWorkItem: DispatchWorkItem?
                                
                serialProcessQueue.async {
                    realm = try! Realm()
                    realm.beginWrite()
                }
                
                while true {
                    guard let response = try getUpdateCall.receive() else {
                        break
                    }
                    
                    if response.errorCode > 0 {                        
                        serialProcessQueue.async {
                            if realm.isInWriteTransaction {
                                realm.cancelWrite()
                                realm = nil
                            }
                        }
                        
                        DispatchQueue.main.async {
                            callback(false)
                            
                            progressBar?.hide(animated: true)
                            let realm = try! Realm()
                            
                            if response.errorCode == 123 {
                                if CURRENT_USER != nil {
                                    try! realm.write {
                                        CURRENT_USER!.token = ""
                                    }
                                }
                                let appDelegate = UIApplication.shared.delegate as! AppDelegate
                                appDelegate.showUser()
                            }
                        }
                        return
                    }
                    
                    if response.success == 1 && response.timestamp != "" {
                        
                        if table3DispatchWorkItem != nil {
                            serialProcessQueue.async(execute: table3DispatchWorkItem!)
                        }
                        if table4DispatchWorkItem != nil {
                            serialProcessQueue.async(execute: table4DispatchWorkItem!)
                        }
                        
                        table3DispatchWorkItem = nil
                        table4DispatchWorkItem = nil
                                                
                        serialProcessQueue.async {
                            if realm.isInWriteTransaction {
                                try? realm.commitWrite()
                                realm = nil
                            }
                            if !response.last {
                                DispatchQueue.main.async {
                                    sync(view, showLoading: false, callback: callback)
                                }
                            }
                        }
                        
                        if response.last {
                            DispatchQueue.main.async {
                                let realm = try! Realm()
                                realm.refresh()
                                callback(true)
                                progressBar?.hide(animated: true)
                            }
                            return
                        }
                    }
                    
                    // MARK: - PROCESS QUEUE
                    serialProcessQueue.async {                        
                        do {
                            
                            // Processing response
                            switch response.tableName {
                            ....
                            case "table1": parseTable1(response.data.table1); break
                            case "table2": parseTable2(response.data.table2); break
                            case "table3":
                                table3DispatchWorkItem = DispatchWorkItem {
                                    parseTable3(response.data.table3)
                                }
                                break
                            case "table4":
                                table4DispatchWorkItem = DispatchWorkItem {
                                    parseTable4(response.data.table4)
                                }
                                break
                            default: break
                            }
                        } catch {
                        }
                    }
                }
            } catch {
                print("Error: \(error)")
            }
            
            
        }
    }

    static func parseTable1(_ table1: [Grpc_Response_DataMsg_Table1]) {
      for headerType in table1.headerTypes {
        var headerTypeObj: HeaderType!
        if let realmObj = realm.object(ofType: HeaderType.self, forPrimaryKey: Int(headerType.id)) {
            headerTypeObj = realmObj
        } else {
            headerTypeObj = HeaderType()
            headerTypeObj.id = Int(headerType.id)
        }
        
        headerTypeObj.name = headerType.name

        realm.add(headerTypeObj, update: true)
      }
    }
}

I read that using a serial queue makes sure my work is processed in series, one task after the other, but does not guarantee that the tasks will run on the same thread. I think that is why I keep getting the error that Realm is accessed from a different thread.
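A serial queue guarantees that its blocks run one at a time and in order, but not that they run on the same thread; GCD may hand consecutive blocks to different worker threads. The sketch below (Foundation only; the function name is hypothetical) records which threads a serial queue's blocks ran on. The count varies from run to run and can be 1, so it only shows that nothing pins the queue to one thread. This appears consistent with the stack trace below, where -[RLMRealm dealloc] tries to cancel an open write transaction and throws because the Realm is being released on a thread other than the one that opened it.

```swift
import Foundation

/// Runs `count` blocks on one serial queue, letting the queue go idle between them,
/// and returns the distinct threads the blocks ran on (may undercount if a Thread
/// object's address is reused after its thread exits).
func threadsUsedBySerialQueue(blocks count: Int) -> Set<ObjectIdentifier> {
    let queue = DispatchQueue(label: "com.example.serial")
    var seen = Set<ObjectIdentifier>()   // only touched on `queue`, so no lock is needed
    for _ in 0..<count {
        queue.async { _ = seen.insert(ObjectIdentifier(Thread.current)) }
        usleep(2_000)   // pause so the queue goes idle between blocks
    }
    // sync waits until every earlier block has finished.
    return queue.sync { seen }
}

let threads = threadsUsedBySerialQueue(blocks: 20)
print("20 blocks on one serial queue ran on \(threads.count) thread(s)")
```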

This is the error log:

*** Terminating app due to uncaught exception 'RLMException', reason: 'Realm accessed from incorrect thread.'
*** First throw call stack:
(
	0   CoreFoundation                      0x0000000112ff86fb __exceptionPreprocess + 331
	1   libobjc.A.dylib                     0x0000000111e55ac5 objc_exception_throw + 48
	2   Realm                               0x000000010b699cc3 -[RLMRealm cancelWriteTransaction] + 147
	3   Realm                               0x000000010b69b199 -[RLMRealm dealloc] + 441
	4   libobjc.A.dylib                     0x0000000111e6872c _ZN11objc_object17sidetable_releaseEb + 202
	5   RealmSwift                          0x000000010d02b8ab $s10RealmSwift0A0Cfd + 315
	6   RealmSwift                          0x000000010d02ba09 $s10RealmSwift0A0CfD + 265
	7   libswiftCore.dylib                  0x0000000113cdd870 _swift_release_dealloc + 16
	8   App                       0x0000000101ade979 $s13App4SyncC15sync_11showLoading0G13Notifications8callbackySo6UIViewCSg_S2bySbctFZyycfU_yycfU3_ + 2617
	9   App                       0x0000000101c3e21e $s13App4SyncC15sync_11showLoading0G13Notifications8callbackySo6UIViewCSg_S2bySbctFZyycfU_yycfU3_TA + 46
	10  App                       0x00000001018a955e $sIeg_IeyB_TR + 142
	11  libclang_rt.asan_iossim_dynamic.dylib 0x0000000105b2d4b6 __wrap_dispatch_async_block_invoke + 214
	12  libdispatch.dylib                   0x00000001146bfd7f _dispatch_call_block_and_release + 12
	13  libdispatch.dylib                   0x00000001146c0db5 _dispatch_client_callout + 8
	14  libdispatch.dylib                   0x00000001146c8225 _dispatch_lane_serial_drain + 778
	15  libdispatch.dylib                   0x00000001146c8e9c _dispatch_lane_invoke + 425
	16  libdispatch.dylib                   0x00000001146d2ea3 _dispatch_workloop_worker_thread + 733
	17  libsystem_pthread.dylib             0x0000000114aa9611 _pthread_wqthread + 421
	18  libsystem_pthread.dylib             0x0000000114aa93fd start_wqthread + 13
)
libc++abi.dylib: terminating with uncaught exception of type NSException
(Recorded stack frame)

How do I get it to work?
With the code above I am trying to receive chunks of a response from a call with gRPC Swift, and after every chunk I call the same method to fetch the next chunk with another timestamp (omitted from the code as not relevant). When a chunk's response arrives, I process some tables (processed meaning stored in Realm) at the very end, because they depend on the precondition that the other tables, the ones not wrapped in a DispatchWorkItem, have already been processed and stored in Realm. Then, after the last chunk is processed, I call the callback passed by whoever called this static function sync(...), and UI work continues from there.

I would also like each chunk to be processed in a single transaction, so that if I receive an error from the backend while processing a chunk, I can cancel the transaction instead of writing half of that chunk's data.
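A minimal sketch of one write transaction per chunk, assuming every message of a chunk is handled on the thread that opened the realm (for example, inside one block on a dedicated thread). The ChunkMessage enum, the SyncedRow model and applyChunk are hypothetical stand-ins for the gRPC messages and parse functions; the deferred closures play the role of the DispatchWorkItems, and an error message cancels everything written for the chunk so far:

```swift
import Foundation
import RealmSwift

// Hypothetical model standing in for the synced tables.
class SyncedRow: Object {
    @objc dynamic var id = 0
    @objc dynamic var table = ""
    override static func primaryKey() -> String? { return "id" }
}

// Hypothetical stand-in for the messages that make up one chunk.
enum ChunkMessage {
    case table(name: String, ids: [Int], dependsOnOthers: Bool)
    case error(code: Int)
}

/// Applies one chunk in a single write transaction. Must be called on the thread that
/// opened `realm`. Returns false if the chunk was cancelled because of an error message.
func applyChunk(_ messages: [ChunkMessage], in realm: Realm) throws -> Bool {
    var deferred: [() -> Void] = []   // dependent tables, run after all the others
    realm.beginWrite()
    for message in messages {
        switch message {
        case let .table(name, ids, dependsOnOthers):
            let parse: () -> Void = {
                for id in ids {
                    _ = realm.create(SyncedRow.self, value: ["id": id, "table": name], update: .modified)
                }
            }
            if dependsOnOthers { deferred.append(parse) } else { parse() }
        case .error:
            realm.cancelWrite()   // discard everything written for this chunk
            return false
        }
    }
    deferred.forEach { $0() }
    do {
        try realm.commitWrite()
    } catch {
        if realm.isInWriteTransaction { realm.cancelWrite() }
        throw error
    }
    return true
}
```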


#4

Oh, sorry, I misinterpreted.
The code seems fairly okay; can you try it without the serialQueue?

Here is what my functions look like:

public func delete(exerciseId: String,
                   callback: @escaping (Result<Bool, NSError>) -> Void) {
    DispatchQueue(label: "realm").async { [weak self] in
        autoreleasepool {
            if let realm = try? Realm() {
                if let currentExercise = self?.getExerciseFrom(exerciseId: exerciseId) {
                    do {
                        try realm.write {
                            for route in currentExercise.routePoints {
                                realm.delete(route)
                            }
                            realm.delete(currentExercise)
                        }
                        // Report success only after the write has been committed.
                        callback(.success(true))
                    } catch {
                        callback(.failure(error as NSError))
                    }
                } else {
                    callback(.failure(TepNSError.create(message: "Cant find exercise with id:" + exerciseId)))
                }
            } else {
                callback(.failure(TepNSError.create(message: "RealmExerciseFactory.Realm doesnt exist")))
            }
        }
    }
}

I always create a new DispatchQueue with the same label. Maybe that's not a good thing, but I have never had problems with it.
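Each call to DispatchQueue(label:) creates a new, independent queue; the label only identifies the queue in debugging tools, so two queues created with the same label do not serialize work against each other. A minimal sketch that creates the serial queue once and reuses it (ExerciseStore and its methods are hypothetical, and the array stands in for Realm work). As with any serial queue, this orders the work but does not pin it to one thread, so a Realm still has to be opened inside each block:

```swift
import Foundation

final class ExerciseStore {
    // Created once and reused: every call below goes through this one serial queue.
    private let realmQueue = DispatchQueue(label: "realm")
    private var completedIDs: [Int] = []   // stands in for Realm work in this sketch

    func markCompleted(_ id: Int) {
        realmQueue.async {
            // With Realm, open the realm here, inside the block: the queue orders work
            // but may still run consecutive blocks on different threads.
            self.completedIDs.append(id)
        }
    }

    func completedCount() -> Int {
        return realmQueue.sync { completedIDs.count }
    }
}

let store = ExerciseStore()
// 100 calls from many threads at once; the shared serial queue prevents a data race.
DispatchQueue.concurrentPerform(iterations: 100) { store.markCompleted($0) }
print(store.completedCount())   // prints 100
```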


#5

I need it to be serial because some tables depend on others, and I have around 20 parse functions for 20 different tables coming from my gRPC call. I'm not sure how to try that in my codebase; I guess I would need lots of changes, since I have all those parsing functions. What I just tried instead is creating my own worker thread that runs a serial queue of work asynchronously:

class AsyncSerialQueueWorker {
    
    private var thread: Thread?
    
    private let semaphore = DispatchSemaphore(value: 0)
    
    private let lock = NSRecursiveLock()
    
    private var queue = [() -> Void]()
    
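    public func enqueue(_ block: @escaping () -> Void) {
        locked {
            queue.append(block)
            // Start the single worker thread lazily, under the lock, so two
            // concurrent callers cannot each create a thread.
            if thread == nil {
                thread = Thread(block: work)
                thread?.start()
            }
        }
        
        semaphore.signal()
    }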
    
    private func work() {
        while true {
            semaphore.wait()
           
            let block = locked { queue.removeFirst() }
            block()
        }
    }
   
    private func locked<T>(do block: () -> T) -> T {
        lock.lock(); defer { lock.unlock() }
        return block()
    }
}

When I use it instead of my serialProcessQueue, it works great: everything I do with Realm now goes through that worker thread, and I no longer get any exceptions from Realm.

let workerThread = AsyncSerialQueueWorker()
....
workerThread.enqueue{...}
....
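The worker above works because every block runs on one Thread that never changes, which is exactly what thread-confined Realm instances need. The same idea can be sketched with NSCondition, creating the thread once in init so that there is no check-then-create step at all. DedicatedWorker is a hypothetical name, and its thread intentionally lives for the lifetime of the app:

```swift
import Foundation

/// Runs submitted blocks one at a time, in order, always on the same dedicated thread.
final class DedicatedWorker {
    private let condition = NSCondition()
    private var pending: [() -> Void] = []

    init(name: String) {
        // Strong capture on purpose: the worker and its thread live for the app's lifetime.
        let thread = Thread { self.processForever() }
        thread.name = name
        thread.start()
    }

    func enqueue(_ block: @escaping () -> Void) {
        condition.lock()
        pending.append(block)
        condition.signal()   // wake the worker if it is waiting
        condition.unlock()
    }

    private func processForever() {
        while true {
            condition.lock()
            while pending.isEmpty { condition.wait() }   // loop guards against spurious wakeups
            let block = pending.removeFirst()
            condition.unlock()
            block()   // run outside the lock so a block may enqueue further work
        }
    }
}

// Usage: record which thread each of 50 blocks ran on.
let worker = DedicatedWorker(name: "realm-worker")
var threadIDs = Set<ObjectIdentifier>()   // only mutated on the worker thread
let finished = DispatchSemaphore(value: 0)
for i in 0..<50 {
    worker.enqueue {
        threadIDs.insert(ObjectIdentifier(Thread.current))
        if i == 49 { finished.signal() }
    }
}
finished.wait()
print(threadIDs.count)   // prints 1
```

Because all blocks share that thread, a Realm opened in one enqueued block could be kept and reused by later blocks, and a write transaction begun in one block can be committed or cancelled in another.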

Thanks again for the support and for pointing out issues in my codebase 🙂


#6

Hmm, interesting.
I will check a few things out for you.


#7

I could be mistaken, but I don’t think even a named serial queue guarantees that each operation will run on the same thread. I use NSOperation quite extensively, but I always pass object IDs around and then open a realm and fetch the objects inside each operation.
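A minimal sketch of the pass-IDs-not-objects pattern with Operation: the operation receives only primary keys and opens its own realm on whatever thread it runs on. The Node model and VisitNodesOperation are hypothetical, and the refresh() call is a precaution for threads without a run loop, where a Realm instance does not advance to newer versions on its own:

```swift
import Foundation
import RealmSwift

// Hypothetical model for the sketch.
class Node: Object {
    @objc dynamic var id = ""
    @objc dynamic var visits = 0
    override static func primaryKey() -> String? { return "id" }
}

/// Receives only primary keys, never managed objects, and opens its own realm.
final class VisitNodesOperation: Operation {
    private let nodeIDs: [String]
    private let configuration: Realm.Configuration
    private(set) var error: Error?

    init(nodeIDs: [String], configuration: Realm.Configuration = .defaultConfiguration) {
        self.nodeIDs = nodeIDs
        self.configuration = configuration
        super.init()
    }

    override func main() {
        guard !isCancelled else { return }
        autoreleasepool {
            do {
                let realm = try Realm(configuration: configuration)
                // An instance still cached on a reused worker thread may be at an older
                // version; refresh() advances it to the latest committed data.
                realm.refresh()
                try realm.write {
                    for id in nodeIDs {
                        // Missing IDs are skipped: the object may have been deleted.
                        realm.object(ofType: Node.self, forPrimaryKey: id)?.visits += 1
                    }
                }
            } catch {
                self.error = error
            }
        }
    }
}
```

To cap concurrency as described below (one job at a time on constrained devices, up to four otherwise), the operations can be added to an OperationQueue whose maxConcurrentOperationCount is 1 or 4.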

For very large jobs, don’t do this, because opening lots of copies of the same realm consumes a lot of memory. In that case I make sure the whole process runs in one operation, or I split it into maybe 4 big chunks and run each in a separate operation: if the computer is constrained I run just one job at a time, and if not I run up to 4 concurrent jobs, on desktop or mobile devices.

So far this seems to work fine; I regularly traverse a hierarchy of ~1M objects. Recently, however, I encountered one case where some changes were made and a report was run immediately afterwards, and the report seemed to have stale copies of the data. The report's background process only opened the realm after the user had made the changes, so I would not have expected any stale data.