Concurrency

The Concurrency module provides comprehensive concurrency and parallelism capabilities for Soul, including goroutines, channels, mutexes, wait groups, promises, and other synchronization primitives for building concurrent applications.

Basic Concurrency

spawn - Execute function in goroutine

Spawn a function to run in a separate goroutine and return a promise:
// Simple spawned function
promise = Concurrency.spawn(soul() {
    println("Running in goroutine")
    return "completed"
})

// Wait for result
result = await promise
println(result)  // "completed"

run - Fire-and-forget execution

Run a function in a goroutine without waiting for results:
// Fire-and-forget execution
Concurrency.run(soul() {
    println("Background task running")
    // Do some work
})

// Continue with other work immediately
println("Main thread continues")

spawn with arguments

Pass arguments to spawned functions:
soul processData(data, multiplier) {
    return data * multiplier
}

promise = Concurrency.spawn(processData, 10, 2)
result = await promise
println(result)  // 20

Channels

createChannel - Create channels for communication

Create channels for goroutine communication:
// Unbuffered channel
ch = Concurrency.createChannel()

// Buffered channel
bufferedCh = Concurrency.createChannel(10)

// Send and receive
Concurrency.spawn(soul() {
    ch.send("hello")
})

message = ch.receive()
println(message)  // "hello"

Channel operations

Use channels for producer-consumer patterns:
soul producer(ch) {
    for (i = 0; i < 5; i++) {
        ch.send("item " + i)
        println("Sent: item " + i)
    }
    ch.close()
}

soul consumer(ch) {
    while (true) {
        try {
            item = ch.receive()
            println("Received: " + item)
        } catch (error) {
            if (error.message.contains("closed")) {
                break
            }
        }
    }
}

channel = Concurrency.createChannel(3)
Concurrency.spawn(producer, channel)
Concurrency.spawn(consumer, channel)

Synchronization Primitives

Mutex - Mutual exclusion

Use mutexes for protecting shared resources:
mutex = Concurrency.createMutex()
sharedCounter = 0

soul incrementCounter() {
    mutex.lock()
    try {
        sharedCounter = sharedCounter + 1
        println("Counter: " + sharedCounter)
    } finally {
        mutex.unlock()
    }
}

// Spawn multiple goroutines
for (i = 0; i < 5; i++) {
    Concurrency.spawn(incrementCounter)
}

tryLock - Non-blocking lock

Try to acquire a lock without blocking:
mutex = Concurrency.createMutex()

soul tryWork() {
    if (mutex.tryLock()) {
        try {
            println("Got lock, doing work")
            // Do work
        } finally {
            mutex.unlock()
        }
    } else {
        println("Could not get lock, skipping work")
    }
}

Concurrency.spawn(tryWork)

WaitGroup - Wait for multiple goroutines

Coordinate multiple goroutines:
wg = Concurrency.createWaitGroup()

soul worker(id) {
    println("Worker " + id + " starting")
    Concurrency.sleep(1000)  // Simulate work
    println("Worker " + id + " done")
    wg.done()
}

// Start multiple workers
numWorkers = 5
wg.add(numWorkers)

for (i = 0; i < numWorkers; i++) {
    Concurrency.spawn(worker, i)
}

// Wait for all workers to complete
wg.wait()
println("All workers completed")

Promises

createPromise - Create promises

Create promises for async operations:
promise = Concurrency.createPromise()

// Resolve the promise
Concurrency.spawn(soul() {
    Concurrency.sleep(1000)
    promise.resolve("success")
})

// Wait for resolution
result = await promise
println(result)  // "success"

Promise.all - Wait for all promises

Wait for multiple promises to complete:
promises = []

// Create multiple async operations
for (i = 0; i < 3; i++) {
    promise = Concurrency.spawn(soul(id) {
        Concurrency.sleep(1000 + (id * 500))
        return "Task " + id + " completed"
    }, i)
    promises.push(promise)
}

// Wait for all to complete
allResults = await Concurrency.promiseAll(promises)
for (result in allResults) {
    println(result)
}

Promise.race - Wait for first promise

Wait for the first promise to complete:
promises = []

// Create multiple racing operations
for (i = 0; i < 3; i++) {
    promise = Concurrency.spawn(soul(id) {
        Concurrency.sleep(Math.random() * 2000)
        return "Task " + id + " won"
    }, i)
    promises.push(promise)
}

// Wait for first to complete
winner = await Concurrency.promiseRace(promises)
println(winner)  // "Task X won" (whichever completes first)

Timing and Timeouts

sleep - Pause execution

Sleep for a specified duration:
println("Starting...")
Concurrency.sleep(2000)  // Sleep for 2 seconds
println("After 2 seconds")

createTimeout - Timeout operations

Create timeouts for operations:
timeout = Concurrency.createTimeout(5000)  // 5 second timeout

promise = timeout.execute(soul() {
    Concurrency.sleep(3000)  // This will complete before timeout
    return "completed"
})

try {
    result = await promise
    println(result)  // "completed"
} catch (error) {
    println("Timed out: " + error.message)
}

Timeout cancellation

Cancel timeouts manually:
timeout = Concurrency.createTimeout(10000)

// Cancel the timeout
timeout.cancel()

// Check if timeout is done
if (timeout.isDone()) {
    println("Timeout was cancelled")
}

Advanced Concurrency Patterns

Worker Pool Pattern

soul createWorkerPool(numWorkers, jobs) {
    jobChannel = Concurrency.createChannel(len(jobs))
    resultChannel = Concurrency.createChannel(len(jobs))
    wg = Concurrency.createWaitGroup()
    
    // Start workers
    for (i = 0; i < numWorkers; i++) {
        wg.add(1)
        Concurrency.spawn(soul(workerId) {
            while (true) {
                try {
                    job = jobChannel.receive()
                    result = processJob(job)
                    resultChannel.send(result)
                } catch (error) {
                    if (error.message.contains("closed")) {
                        break
                    }
                }
            }
            wg.done()
        }, i)
    }
    
    // Send jobs
    for (job in jobs) {
        jobChannel.send(job)
    }
    jobChannel.close()
    
    // Collect results
    results = []
    for (i = 0; i < len(jobs); i++) {
        result = resultChannel.receive()
        results.push(result)
    }
    
    // Wait for workers to finish
    wg.wait()
    
    return results
}

soul processJob(job) {
    // Simulate work
    Concurrency.sleep(100)
    return "Processed: " + job
}

jobs = ["task1", "task2", "task3", "task4", "task5"]
results = createWorkerPool(3, jobs)
for (result in results) {
    println(result)
}

Producer-Consumer with Multiple Producers

soul multiProducerConsumer() {
    channel = Concurrency.createChannel(10)
    wg = Concurrency.createWaitGroup()
    
    // Multiple producers
    numProducers = 3
    wg.add(numProducers)
    
    for (i = 0; i < numProducers; i++) {
        Concurrency.spawn(soul(producerId) {
            for (j = 0; j < 5; j++) {
                item = "P" + producerId + "-Item" + j
                channel.send(item)
                println("Produced: " + item)
                Concurrency.sleep(100)
            }
            wg.done()
        }, i)
    }
    
    // Consumer
    Concurrency.spawn(soul() {
        itemCount = 0
        while (itemCount < numProducers * 5) {
            item = channel.receive()
            println("Consumed: " + item)
            itemCount++
        }
    })
    
    // Wait for all producers
    wg.wait()
    println("All producers finished")
}

multiProducerConsumer()

Fan-out/Fan-in Pattern

soul fanOutFanIn() {
    input = Concurrency.createChannel()
    output = Concurrency.createChannel()
    
    // Fan-out to multiple workers
    numWorkers = 3
    for (i = 0; i < numWorkers; i++) {
        Concurrency.spawn(soul(workerId) {
            while (true) {
                try {
                    job = input.receive()
                    result = "Worker" + workerId + " processed " + job
                    output.send(result)
                } catch (error) {
                    if (error.message.contains("closed")) {
                        break
                    }
                }
            }
        }, i)
    }
    
    // Send work
    Concurrency.spawn(soul() {
        for (i = 0; i < 10; i++) {
            input.send("job" + i)
        }
        input.close()
    })
    
    // Fan-in results
    for (i = 0; i < 10; i++) {
        result = output.receive()
        println(result)
    }
}

fanOutFanIn()

One-time Execution

createOnce - Ensure function runs once

Ensure a function runs only once across multiple goroutines:
once = Concurrency.createOnce()
initialized = false

soul initialize() {
    println("Initializing...")
    initialized = true
    return "initialized"
}

// Multiple goroutines try to initialize
for (i = 0; i < 5; i++) {
    Concurrency.spawn(soul() {
        result = once.do(initialize)
        println("Goroutine got: " + result)
    })
}

Concurrency.sleep(1000)
println("Initialized: " + initialized)  // Only runs once

Error Handling in Concurrent Code

Promise error handling

promise = Concurrency.spawn(soul() {
    Concurrency.sleep(1000)
    throw "Something went wrong"
})

try {
    result = await promise
    println(result)
} catch (error) {
    println("Caught error: " + error.message)
}

Channel error handling

channel = Concurrency.createChannel()

Concurrency.spawn(soul() {
    channel.send("message")
    channel.close()
})

try {
    message1 = channel.receive()
    println(message1)  // "message"
    
    message2 = channel.receive()  // This will throw
} catch (error) {
    println("Channel error: " + error.message)
}

Complete Example: Concurrent Web Scraper

soul concurrentWebScraper() {
    urls = [
        "https://example.com",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2"
    ]
    
    results = Concurrency.createChannel(len(urls))
    wg = Concurrency.createWaitGroup()
    
    soul scrapeUrl(url) {
        try {
            // Simulate web scraping
            Concurrency.sleep(1000)
            result = "Scraped: " + url
            results.send(result)
        } catch (error) {
            results.send("Error scraping " + url + ": " + error.message)
        } finally {
            wg.done()
        }
    }
    
    // Start scrapers
    for (url in urls) {
        wg.add(1)
        Concurrency.spawn(scrapeUrl, url)
    }
    
    // Collect results
    Concurrency.spawn(soul() {
        wg.wait()
        results.close()
    })
    
    // Process results
    scrapedResults = []
    while (true) {
        try {
            result = results.receive()
            scrapedResults.push(result)
        } catch (error) {
            if (error.message.contains("closed")) {
                break
            }
        }
    }
    
    return scrapedResults
}

results = concurrentWebScraper()
for (result in results) {
    println(result)
}

Best Practices

  1. Resource cleanup: Always clean up resources (close channels, unlock mutexes)
  2. Error handling: Handle errors in concurrent code properly
  3. Avoid race conditions: Use proper synchronization primitives
  4. Timeout operations: Use timeouts for potentially long-running operations
  5. Graceful shutdown: Implement proper shutdown mechanisms
// Good - proper resource cleanup
soul properResourceManagement() {
    mutex = Concurrency.createMutex()
    
    soul safeOperation() {
        mutex.lock()
        try {
            // Do work
            return "success"
        } finally {
            mutex.unlock()
        }
    }
    
    return safeOperation()
}

// Good - timeout for operations
soul timeoutOperation() {
    timeout = Concurrency.createTimeout(5000)
    
    promise = timeout.execute(soul() {
        // Potentially long operation
        Concurrency.sleep(3000)
        return "completed"
    })
    
    try {
        result = await promise
        return result
    } catch (error) {
        println("Operation timed out")
        return null
    }
}

// Good - error handling in concurrent code
soul robustConcurrency() {
    promises = []
    
    for (i = 0; i < 3; i++) {
        promise = Concurrency.spawn(soul(id) {
            if (Math.random() > 0.5) {
                throw "Random error in task " + id
            }
            return "Task " + id + " success"
        }, i)
        promises.push(promise)
    }
    
    // Handle each promise individually
    for (promise in promises) {
        try {
            result = await promise
            println("Success: " + result)
        } catch (error) {
            println("Error: " + error.message)
        }
    }
}
The Concurrency module provides powerful tools for building concurrent and parallel applications in Soul, enabling efficient resource utilization and responsive program behavior.