|
| 1 | +// Created by Michael Kirk on 12/19/16. |
| 2 | +// Copyright © 2016 Open Whisper Systems. All rights reserved. |
| 3 | + |
| 4 | +import Foundation |
| 5 | +import PromiseKit |
| 6 | + |
| 7 | +@objc(OWSMessageFetcherJob) |
| 8 | +class MessageFetcherJob: NSObject { |
| 9 | + |
| 10 | + let TAG = "[MessageFetcherJob]" |
| 11 | + var timer: Timer? |
| 12 | + |
| 13 | + // MARK: injected dependencies |
| 14 | + let networkManager: TSNetworkManager |
| 15 | + let messagesManager: TSMessagesManager |
| 16 | + let messageSender: MessageSender |
| 17 | + let signalService: OWSSignalService |
| 18 | + |
| 19 | +// var fallbackTransport = false |
| 20 | + // ENABLED FOR DEBUG. DO NOT COMMIT! |
| 21 | + var fallbackTransport = true |
| 22 | + var runPromises = [Double: Promise<Void>]() |
| 23 | + |
| 24 | + init(messagesManager: TSMessagesManager, messageSender: MessageSender, networkManager: TSNetworkManager, signalService: OWSSignalService) { |
| 25 | + self.messagesManager = messagesManager |
| 26 | + self.networkManager = networkManager |
| 27 | + self.messageSender = messageSender |
| 28 | + self.signalService = signalService |
| 29 | + } |
| 30 | + |
| 31 | + func runAsync() { |
| 32 | + Logger.debug("\(TAG) \(#function)") |
| 33 | + guard signalService.isCensored else { |
| 34 | + Logger.debug("\(self.TAG) delegating message fetching to SocketManager since we're using normal transport.") |
| 35 | + TSSocketManager.becomeActive(fromBackgroundExpectMessage: true) |
| 36 | + return |
| 37 | + } |
| 38 | + |
| 39 | + Logger.info("\(TAG) using fallback message fetching.") |
| 40 | + |
| 41 | + let promiseId = NSDate().timeIntervalSince1970 |
| 42 | + Logger.debug("\(self.TAG) starting promise: \(promiseId)") |
| 43 | + let runPromise = self.fetchUndeliveredMessages().then { (envelopes: [OWSSignalServiceProtosEnvelope], more: Bool) -> () in |
| 44 | + for envelope in envelopes { |
| 45 | + Logger.info("\(self.TAG) received envelope.") |
| 46 | + self.messagesManager.handleReceivedEnvelope(envelope); |
| 47 | + |
| 48 | + self.acknowledgeDelivery(envelope: envelope) |
| 49 | + } |
| 50 | + if more { |
| 51 | + Logger.info("\(self.TAG) more messages, so recursing.") |
| 52 | + // recurse |
| 53 | + self.runAsync() |
| 54 | + } |
| 55 | + }.always { |
| 56 | + Logger.debug("\(self.TAG) cleaning up promise: \(promiseId)") |
| 57 | + self.runPromises[promiseId] = nil |
| 58 | + } |
| 59 | + |
| 60 | + // maintain reference to make sure it's not de-alloced prematurely. |
| 61 | + runPromises[promiseId] = runPromise |
| 62 | + } |
| 63 | + |
| 64 | + // use in DEBUG or wherever you can't receive push notifications to poll for messages. |
| 65 | + // Do not use in production. |
| 66 | + func startRunLoop(timeInterval: Double) { |
| 67 | + Logger.error("\(TAG) Starting message fetch polling. This should not be used in production."); |
| 68 | + timer = Timer.scheduledTimer(timeInterval: timeInterval, target: self, selector: #selector(runAsync), userInfo: nil, repeats: true) |
| 69 | + } |
| 70 | + |
| 71 | + func stopRunLoop() { |
| 72 | + timer?.invalidate() |
| 73 | + timer = nil |
| 74 | + } |
| 75 | + |
| 76 | + func parseMessagesResponse(responseObject: Any?) -> (envelopes: [OWSSignalServiceProtosEnvelope], more: Bool)? { |
| 77 | + guard let responseObject = responseObject else { |
| 78 | + Logger.error("\(self.TAG) response object was surpringly nil") |
| 79 | + return nil |
| 80 | + } |
| 81 | + |
| 82 | + guard let responseDict = responseObject as? [String: Any] else { |
| 83 | + Logger.error("\(self.TAG) response object was not a dictionary") |
| 84 | + return nil |
| 85 | + } |
| 86 | + |
| 87 | + guard let messageDicts = responseDict["messages"] as? [[String: Any]] else { |
| 88 | + Logger.error("\(self.TAG) messages object was not a list of dictionaries") |
| 89 | + return nil |
| 90 | + } |
| 91 | + |
| 92 | + let moreMessages = { () -> Bool in |
| 93 | + if let responseMore = responseDict["more"] as? Bool { |
| 94 | + return responseMore |
| 95 | + } else { |
| 96 | + Logger.warn("\(self.TAG) more object was not a bool. Assuming no more") |
| 97 | + return false |
| 98 | + } |
| 99 | + }() |
| 100 | + |
| 101 | + let envelopes = messageDicts.map { buildEnvelope(messageDict: $0) }.filter { $0 != nil }.map { $0! } |
| 102 | + |
| 103 | + return ( |
| 104 | + envelopes: envelopes, |
| 105 | + more: moreMessages |
| 106 | + ) |
| 107 | + } |
| 108 | + |
| 109 | + func buildEnvelope(messageDict: [String: Any]) -> OWSSignalServiceProtosEnvelope? { |
| 110 | + let builder = OWSSignalServiceProtosEnvelopeBuilder() |
| 111 | + |
| 112 | + guard let typeInt = messageDict["type"] as? Int32 else { |
| 113 | + Logger.error("\(TAG) message body didn't have type") |
| 114 | + return nil |
| 115 | + } |
| 116 | + |
| 117 | + guard let type = OWSSignalServiceProtosEnvelopeType(rawValue:typeInt) else { |
| 118 | + Logger.error("\(TAG) message body type was invalid") |
| 119 | + return nil |
| 120 | + } |
| 121 | + builder.setType(type) |
| 122 | + |
| 123 | + if let relay = messageDict["relay"] as? String { |
| 124 | + builder.setRelay(relay) |
| 125 | + } |
| 126 | + |
| 127 | + guard let timestamp = messageDict["timestamp"] as? UInt64 else { |
| 128 | + Logger.error("\(TAG) message body didn't have timestamp") |
| 129 | + return nil |
| 130 | + } |
| 131 | + builder.setTimestamp(timestamp) |
| 132 | + |
| 133 | + guard let source = messageDict["source"] as? String else { |
| 134 | + Logger.error("\(TAG) message body didn't have source") |
| 135 | + return nil |
| 136 | + } |
| 137 | + builder.setSource(source) |
| 138 | + |
| 139 | + guard let sourceDevice = messageDict["sourceDevice"] as? UInt32 else { |
| 140 | + Logger.error("\(TAG) message body didn't have sourceDevice") |
| 141 | + return nil |
| 142 | + } |
| 143 | + builder.setSourceDevice(sourceDevice) |
| 144 | + |
| 145 | + if let encodedLegacyMessage = messageDict["message"] as? String { |
| 146 | + Logger.debug("\(TAG) message body had legacyMessage") |
| 147 | + if let legacyMessage = Data(base64Encoded: encodedLegacyMessage) { |
| 148 | + builder.setLegacyMessage(legacyMessage) |
| 149 | + } |
| 150 | + } |
| 151 | + |
| 152 | + if let encodedContent = messageDict["content"] as? String { |
| 153 | + Logger.debug("\(TAG) message body had content") |
| 154 | + if let content = Data(base64Encoded: encodedContent) { |
| 155 | + builder.setContent(content) |
| 156 | + } |
| 157 | + } |
| 158 | + |
| 159 | + return builder.build() |
| 160 | + } |
| 161 | + |
| 162 | + func fetchUndeliveredMessages() -> Promise<(envelopes: [OWSSignalServiceProtosEnvelope], more: Bool)> { |
| 163 | + return Promise { fulfill, reject in |
| 164 | + let messagesRequest = OWSGetMessagesRequest() |
| 165 | + |
| 166 | + self.networkManager.makeRequest( |
| 167 | + messagesRequest, |
| 168 | + success: { (task: URLSessionDataTask?, responseObject: Any?) -> () in |
| 169 | + guard let (envelopes, more) = self.parseMessagesResponse(responseObject: responseObject) else { |
| 170 | + Logger.error("\(self.TAG) response object had unexpected content") |
| 171 | + return reject(OWSErrorMakeUnableToProcessServerResponseError()) |
| 172 | + } |
| 173 | + |
| 174 | + fulfill((envelopes: envelopes, more: more)) |
| 175 | + }, |
| 176 | + failure: { (task: URLSessionDataTask?, error: Error?) in |
| 177 | + guard let error = error else { |
| 178 | + Logger.error("\(self.TAG) error was surpringly nil. sheesh rough day.") |
| 179 | + return reject(OWSErrorMakeUnableToProcessServerResponseError()) |
| 180 | + } |
| 181 | + |
| 182 | + reject(error) |
| 183 | + }) |
| 184 | + } |
| 185 | + } |
| 186 | + |
| 187 | + func acknowledgeDelivery(envelope: OWSSignalServiceProtosEnvelope) { |
| 188 | + let request = OWSAcknowledgeMessageDeliveryRequest(source: envelope.source, timestamp: envelope.timestamp) |
| 189 | + self.networkManager.makeRequest(request, |
| 190 | + success: { (task: URLSessionDataTask?, responseObject: Any?) -> () in |
| 191 | + Logger.debug("\(self.TAG) acknowledged delivery for message at timestamp: \(envelope.timestamp)") |
| 192 | + }, |
| 193 | + failure: { (task: URLSessionDataTask?, error: Error?) in |
| 194 | + Logger.debug("\(self.TAG) acknowledging delivery for message at timestamp: \(envelope.timestamp) failed with error: \(error)") |
| 195 | + }) |
| 196 | + } |
| 197 | +} |
0 commit comments