Skip to main content

Concurrency

The Concurrency module provides comprehensive concurrency and parallelism capabilities for Soul, including goroutines, channels, mutexes, wait groups, promises, and other synchronization primitives for building concurrent applications.

Basic Concurrency

spawn - Execute function in goroutine

Spawn a function to run in a separate goroutine and return a promise:
// Simple spawned function
promise = Concurrency.spawn(soul() {
    println("Running in goroutine")
    return "completed"
})

// Wait for result
result = await promise
println(result)  // "completed"

run - Fire-and-forget execution

Run a function in a goroutine without waiting for results:
// Fire-and-forget execution
Concurrency.run(soul() {
    println("Background task running")
    // Do some work
})

// Continue with other work immediately
println("Main thread continues")

spawn with arguments

Pass arguments to spawned functions:
soul processData(data, multiplier) {
    return data * multiplier
}

promise = Concurrency.spawn(processData, 10, 2)
result = await promise
println(result)  // 20

Channels

createChannel - Create channels for communication

Create channels for goroutine communication:
// Unbuffered channel
ch = Concurrency.createChannel()

// Buffered channel
bufferedCh = Concurrency.createChannel(10)

// Send and receive
Concurrency.spawn(soul() {
    ch.send("hello")
})

message = ch.receive()
println(message)  // "hello"

Channel operations

Use channels for producer-consumer patterns:
soul producer(ch) {
    for (i = 0; i < 5; i++) {
        ch.send("item " + i)
        println("Sent: item " + i)
    }
    ch.close()
}

soul consumer(ch) {
    while (true) {
        try {
            item = ch.receive()
            println("Received: " + item)
        } catch (error) {
            if (error.message.contains("closed")) {
                break
            }
        }
    }
}

channel = Concurrency.createChannel(3)
Concurrency.spawn(producer, channel)
Concurrency.spawn(consumer, channel)

Synchronization Primitives

Mutex - Mutual exclusion

Use mutexes for protecting shared resources:
mutex = Concurrency.createMutex()
sharedCounter = 0

soul incrementCounter() {
    mutex.lock()
    try {
        sharedCounter = sharedCounter + 1
        println("Counter: " + sharedCounter)
    } finally {
        mutex.unlock()
    }
}

// Spawn multiple goroutines
for (i = 0; i < 5; i++) {
    Concurrency.spawn(incrementCounter)
}

tryLock - Non-blocking lock

Try to acquire a lock without blocking:
mutex = Concurrency.createMutex()

soul tryWork() {
    if (mutex.tryLock()) {
        try {
            println("Got lock, doing work")
            // Do work
        } finally {
            mutex.unlock()
        }
    } else {
        println("Could not get lock, skipping work")
    }
}

Concurrency.spawn(tryWork)

WaitGroup - Wait for multiple goroutines

Coordinate multiple goroutines:
wg = Concurrency.createWaitGroup()

soul worker(id) {
    println("Worker " + id + " starting")
    Concurrency.sleep(1000)  // Simulate work
    println("Worker " + id + " done")
    wg.done()
}

// Start multiple workers
numWorkers = 5
wg.add(numWorkers)

for (i = 0; i < numWorkers; i++) {
    Concurrency.spawn(worker, i)
}

// Wait for all workers to complete
wg.wait()
println("All workers completed")

Promises

createPromise - Create promises

Create promises for async operations:
promise = Concurrency.createPromise()

// Resolve the promise
Concurrency.spawn(soul() {
    Concurrency.sleep(1000)
    promise.resolve("success")
})

// Wait for resolution
result = await promise
println(result)  // "success"

Promise.all - Wait for all promises

Wait for multiple promises to complete:
promises = []

// Create multiple async operations
for (i = 0; i < 3; i++) {
    promise = Concurrency.spawn(soul(id) {
        Concurrency.sleep(1000 + (id * 500))
        return "Task " + id + " completed"
    }, i)
    promises.push(promise)
}

// Wait for all to complete
allResults = await Concurrency.promiseAll(promises)
for (result in allResults) {
    println(result)
}

Promise.race - Wait for first promise

Wait for the first promise to complete:
promises = []

// Create multiple racing operations
for (i = 0; i < 3; i++) {
    promise = Concurrency.spawn(soul(id) {
        Concurrency.sleep(Math.random() * 2000)
        return "Task " + id + " won"
    }, i)
    promises.push(promise)
}

// Wait for first to complete
winner = await Concurrency.promiseRace(promises)
println(winner)  // "Task X won" (whichever completes first)

Timing and Timeouts

sleep - Pause execution

Sleep for a specified duration:
println("Starting...")
Concurrency.sleep(2000)  // Sleep for 2 seconds
println("After 2 seconds")

createTimeout - Timeout operations

Create timeouts for operations:
timeout = Concurrency.createTimeout(5000)  // 5 second timeout

promise = timeout.execute(soul() {
    Concurrency.sleep(3000)  // This will complete before timeout
    return "completed"
})

try {
    result = await promise
    println(result)  // "completed"
} catch (error) {
    println("Timed out: " + error.message)
}

Timeout cancellation

Cancel timeouts manually:
timeout = Concurrency.createTimeout(10000)

// Cancel the timeout
timeout.cancel()

// Check if timeout is done
if (timeout.isDone()) {
    println("Timeout was cancelled")
}

Advanced Concurrency Patterns

Worker Pool Pattern

soul createWorkerPool(numWorkers, jobs) {
    jobChannel = Concurrency.createChannel(len(jobs))
    resultChannel = Concurrency.createChannel(len(jobs))
    wg = Concurrency.createWaitGroup()
    
    // Start workers
    for (i = 0; i < numWorkers; i++) {
        wg.add(1)
        Concurrency.spawn(soul(workerId) {
            while (true) {
                try {
                    job = jobChannel.receive()
                    result = processJob(job)
                    resultChannel.send(result)
                } catch (error) {
                    if (error.message.contains("closed")) {
                        break
                    }
                }
            }
            wg.done()
        }, i)
    }
    
    // Send jobs
    for (job in jobs) {
        jobChannel.send(job)
    }
    jobChannel.close()
    
    // Collect results
    results = []
    for (i = 0; i < len(jobs); i++) {
        result = resultChannel.receive()
        results.push(result)
    }
    
    // Wait for workers to finish
    wg.wait()
    
    return results
}

soul processJob(job) {
    // Simulate work
    Concurrency.sleep(100)
    return "Processed: " + job
}

jobs = ["task1", "task2", "task3", "task4", "task5"]
results = createWorkerPool(3, jobs)
for (result in results) {
    println(result)
}

Producer-Consumer with Multiple Producers

soul multiProducerConsumer() {
    channel = Concurrency.createChannel(10)
    wg = Concurrency.createWaitGroup()
    
    // Multiple producers
    numProducers = 3
    wg.add(numProducers)
    
    for (i = 0; i < numProducers; i++) {
        Concurrency.spawn(soul(producerId) {
            for (j = 0; j < 5; j++) {
                item = "P" + producerId + "-Item" + j
                channel.send(item)
                println("Produced: " + item)
                Concurrency.sleep(100)
            }
            wg.done()
        }, i)
    }
    
    // Consumer
    Concurrency.spawn(soul() {
        itemCount = 0
        while (itemCount < numProducers * 5) {
            item = channel.receive()
            println("Consumed: " + item)
            itemCount++
        }
    })
    
    // Wait for all producers
    wg.wait()
    println("All producers finished")
}

multiProducerConsumer()

Fan-out/Fan-in Pattern

soul fanOutFanIn() {
    input = Concurrency.createChannel()
    output = Concurrency.createChannel()
    
    // Fan-out to multiple workers
    numWorkers = 3
    for (i = 0; i < numWorkers; i++) {
        Concurrency.spawn(soul(workerId) {
            while (true) {
                try {
                    job = input.receive()
                    result = "Worker" + workerId + " processed " + job
                    output.send(result)
                } catch (error) {
                    if (error.message.contains("closed")) {
                        break
                    }
                }
            }
        }, i)
    }
    
    // Send work
    Concurrency.spawn(soul() {
        for (i = 0; i < 10; i++) {
            input.send("job" + i)
        }
        input.close()
    })
    
    // Fan-in results
    for (i = 0; i < 10; i++) {
        result = output.receive()
        println(result)
    }
}

fanOutFanIn()

One-time Execution

createOnce - Ensure function runs once

Ensure a function runs only once across multiple goroutines:
once = Concurrency.createOnce()
initialized = false

soul initialize() {
    println("Initializing...")
    initialized = true
    return "initialized"
}

// Multiple goroutines try to initialize
for (i = 0; i < 5; i++) {
    Concurrency.spawn(soul() {
        result = once.do(initialize)
        println("Goroutine got: " + result)
    })
}

Concurrency.sleep(1000)
println("Initialized: " + initialized)  // Only runs once

Error Handling in Concurrent Code

Promise error handling

promise = Concurrency.spawn(soul() {
    Concurrency.sleep(1000)
    throw "Something went wrong"
})

try {
    result = await promise
    println(result)
} catch (error) {
    println("Caught error: " + error.message)
}

Channel error handling

channel = Concurrency.createChannel()

Concurrency.spawn(soul() {
    channel.send("message")
    channel.close()
})

try {
    message1 = channel.receive()
    println(message1)  // "message"
    
    message2 = channel.receive()  // This will throw
} catch (error) {
    println("Channel error: " + error.message)
}

Complete Example: Concurrent Web Scraper

soul concurrentWebScraper() {
    urls = [
        "https://example.com",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2"
    ]
    
    results = Concurrency.createChannel(len(urls))
    wg = Concurrency.createWaitGroup()
    
    soul scrapeUrl(url) {
        try {
            // Simulate web scraping
            Concurrency.sleep(1000)
            result = "Scraped: " + url
            results.send(result)
        } catch (error) {
            results.send("Error scraping " + url + ": " + error.message)
        } finally {
            wg.done()
        }
    }
    
    // Start scrapers
    for (url in urls) {
        wg.add(1)
        Concurrency.spawn(scrapeUrl, url)
    }
    
    // Collect results
    Concurrency.spawn(soul() {
        wg.wait()
        results.close()
    })
    
    // Process results
    scrapedResults = []
    while (true) {
        try {
            result = results.receive()
            scrapedResults.push(result)
        } catch (error) {
            if (error.message.contains("closed")) {
                break
            }
        }
    }
    
    return scrapedResults
}

results = concurrentWebScraper()
for (result in results) {
    println(result)
}

Best Practices

  1. Resource cleanup: Always clean up resources (close channels, unlock mutexes)
  2. Error handling: Handle errors in concurrent code properly
  3. Avoid race conditions: Use proper synchronization primitives
  4. Timeout operations: Use timeouts for potentially long-running operations
  5. Graceful shutdown: Implement proper shutdown mechanisms
// Good - proper resource cleanup
soul properResourceManagement() {
    mutex = Concurrency.createMutex()
    
    soul safeOperation() {
        mutex.lock()
        try {
            // Do work
            return "success"
        } finally {
            mutex.unlock()
        }
    }
    
    return safeOperation()
}

// Good - timeout for operations
soul timeoutOperation() {
    timeout = Concurrency.createTimeout(5000)
    
    promise = timeout.execute(soul() {
        // Potentially long operation
        Concurrency.sleep(3000)
        return "completed"
    })
    
    try {
        result = await promise
        return result
    } catch (error) {
        println("Operation timed out")
        return null
    }
}

// Good - error handling in concurrent code
soul robustConcurrency() {
    promises = []
    
    for (i = 0; i < 3; i++) {
        promise = Concurrency.spawn(soul(id) {
            if (Math.random() > 0.5) {
                throw "Random error in task " + id
            }
            return "Task " + id + " success"
        }, i)
        promises.push(promise)
    }
    
    // Handle each promise individually
    for (promise in promises) {
        try {
            result = await promise
            println("Success: " + result)
        } catch (error) {
            println("Error: " + error.message)
        }
    }
}
The Concurrency module provides powerful tools for building concurrent and parallel applications in Soul, enabling efficient resource utilization and responsive program behavior.
I