Chapter 9

Composing the asynchronous

chores()
games()





chores games





9.1 Data processing

0212: Turnbill, Geralyn
0055: Spielvogel, Cierra
0072: Renyer, Connie
0011: Engholm, Ciara
0088: Gitting, Estrella





fs = require 'fs'
raw = fs.readFileSync 'competitors', 'utf-8'
competitors = raw.split /\n/





readFile = (file, callback) ->
  fs.readFile file, 'utf-8', (error, response) ->
    throw error if error
    callback response





readFileAsArray = (file, callback) ->     
  asArray = (data, delimiter) ->        
    callback data.split(delimiter)      
  readFile(file, asArray) 





readFileAsArray 'competitors', (result) ->
  console.log result
# ['0212: Turnbill, Geralyn'
# '0055: Spielvogel, Cierra'
# '0072: Renyer, Connie'
# '0011: Engholm, Ciara'
# '0088: Gitting, Estrella']





[4,3,4,7,6].sort()
# [3,4,4,6,7]
['aardvark', 'zebra', 'porcupine'].sort() # ['aardvark', 'porcupine', 'zebra']





competitors.sort()
# ['0011: Engholm, Ciara',
#  '0055: Spielvogel, Cierra',
#  '0072: Renyer, Connie',
#  '0088: Gitting, Estrella',
#  '0212: Turnbill, Geralyn']





sortedCompetitors = competitors.sort()
sortedCompetitors is competitors
# true





compareOnLastName = (competitors...) ->
  lastName = (s) ->                                    
    s.split(/\s/g)[1]                                  
  if lastName(competitors[0]) > lastName(competitors[1])   
    1
  else
-1
compareOnLastName "0212: Turnbill, Geralyn", "0072: Renyer, Connie" # 1





competitors.sort compareOnLastName
# ["0011: Engholm, Ciara"
#  "0088: Gitting, Estrella"
#  "0072: Renyer, Connie"
#  "0455: Spielvogel, Cierra"
#  "0212: Turnbill, Geralyn"]





Listing 9.1 Displaying names sorted on a field inefficiently

fs = require 'fs'
http = require 'http'
readFile = (file, strategy) -> fs.readFile file, 'utf-8', (error, response) -> throw error if error
strategy response
readFileAsArray = (file, delimiter, callback) -> asArray = (data) -> callback data.split(delimiter).slice(0,-1)
readFile(file, asArray)
compareOnLastName = (a,b) -> lastName = (s) -> s.split(/\s+/g)[1].replace /,/, ',' if !a or !b 1 else if lastName(a) >= lastName(b) 1 else
-1
sortedCompetitorsFromFile = (fileName, callback) -> newline = /\n/gi readFileAsArray fileName, newline, (array) ->
callback array.sort(compareOnLastName)
makeServer = -> responseData = '' server = http.createServer (request, response) -> response.writeHead 200, 'Content-Type': 'text/html' response.end JSON.stringify responseData server.listen 8888, '127.0.0.1' (data) ->
responseData = data
main = (fileName) ->
server = makeServer()
loadData = -> console.log 'Loading data' sortedCompetitorsFromFile fileName, (data) -> console.log 'Data loaded'
server data
loadData()
fs.watchFile fileName, loadData
if process.argv[2] main process.argv[2] console.log "starting server on port 8888" else console.log "usage: coffee 9.1.coffee [file]"





> coffee listing.91.coffee competitors.txt
Loading data
Starting server on port 8888
Data loaded





begin = new Date             
end = new Date               
difference = begin ? end   





loadData = ->
  start = new Date()
  console.log 'Loading and processing data'
  sortedCompetitorsFromFile fileName, (data) ->
    elapsed = new Date() - start
    console.log "Data loaded in #{elapsed/1000} seconds"
    server data





random = (size) -> Math.floor Math.random()*size           
random150000 = (random(150000) for number in [1..150000])
begin = new Date random150000.sort() end = new Date console.log end - begin # 50





compareOnLastName = (a,b) ->
  lastName = (s) ->
    s.split(/\s+/g)[1].replace /,/, ''     
  if !a or !b                           
    1                                   
  else if lastName(a) >= lastName(b)
    1
  else
    -1





Listing 9.2 Decorate-sort-undecorate

decorateSortUndecorate = (array, sortRule) ->
  decorate = (array) ->                                         
{original: item, sortOn: sortRule item} for item in array
undecorate = (array) ->
item.original for item in array
comparator = (left, right) -> if left.sortOn > right.sortOn 1 else
-1
decorated = decorate array sorted = decorated.sort comparator undecorate sorted





lastName = (s) -> s.split(/\s+/g)[1].replace /,/, ''
sortRule = (name) -> lastName name
decorate = (array) ->
{original: item, sortOn: sortRule item} for item in array
decorate ['0011: Engholm, Ciara'] # [{original: '0011: Engholm, Ciara', sortOn: 'Engholm'}]





undecorate = (array) ->
item.original for item in array
undecorate [{original: '0011: Engholm, Ciara', sortOn: 'Engholm'}] # ['0011: Engholm, Ciara']





9.2 Event loops

Loading and processing data
Starting server on port 8888
Data loaded in 5.482 seconds





start = new Date()
setInterval ->
  console.log "Clock tick after #{(new Date()-start)/1000} seconds"
, 1000





Loading and processing data
Starting server on port 8888
Data loaded in 4.636 seconds
Clock tick at 4.643 seconds
Clock tick at 4.643 seconds





setInterval ->
  console.log "Clock tick after #{(new Date()-start)/1000} seconds"
, 1000





fs.readFile 'myFile.txt', (err, data) ->
  console.log 'invoked as the handler when the event occurs'





dataForMyMassiveFile = fs.readFileSync 'myMassiveFile.mpg'





loop 0





9.3 Event emitters

UP = 38       
DOWN = 40
paddle = up: ->
down: ->
document.onkeydown = (event) -> switch event.keyCode when UP then paddle.up() when DOWN then paddle.down()





UP, DOWN, DOWN, DOWN, UP, DOWN, UP, UP, UP





Loading and processing data
Starting server on port 8888
Data loaded in 4.636 seconds
Clock tick at 4.643 seconds
Clock tick at 4.643 seconds





# competitor data
["0011: Engholm, Ciara", "0088: Gitting, Estrella", "0072: Renyer, Connie"]
# keypress data [UP, DOWN, DOWN, DOWN, UP, DOWN, UP, UP, UP]





ONE_SECOND = 1000
start = new Date() competitorEmitter = (callback) -> setInterval -> callback 'A competitor'
, ONE_SECOND
receiver = (data) -> now = new Date() elapsed = now - start
console.log "#{data} received after #{elapsed/ONE_SECOND} seconds"
competitorEmitter receiver
# A competitor received after 0.995 seconds # A competitor received after 1.995 seconds # A competitor received after 2.995 seconds # A competitor received after 3.995 seconds





class DataEmitter                              
  constructor: (interval) ->                   
    @listeners = []                            
    setInterval ->                             
      listener() for listener in listeners     
    , interval                                 
  ondata: (listener) ->                        
listeners.push listener
emitter = new DataEmitter 1.5*ONE_SECOND
emitter.ondata -> console.log "Service responds at #{difference()} seconds past minute"





{EventEmitter} = require 'events'
class CompetitorEmitter extends EventEmitter





fs = require 'fs'
sourceStream = fs.createReadStream 'competitors.txt'





Listing 9.3 Sort competitors from stream

fs = require 'fs'
{EventEmitter} = require 'events'
ONE_SECOND = 1000
lastName = (s) -> try s.split(/\s+/g)[1].replace /,/, ',' catch e
''
undecorate = (array) ->
item.original for item in array
class CompetitorsEmitter extends EventEmitter
validCompetitor = (string) ->
/^[0-9]+:\s[a-zA-Z],\s[a-zA-Z]\n/.test string
lines = (data) -> chunk = data.split /\n/ first = chunk[0] last = chunk[chunk.length-1]
{chunk, first, last}
insertionSort = (array, items) -> insertAt = 0 for item in items toInsert = original: item, sortOn: lastName(item) for existing in array if toInsert.lastName > existing.lastName insertAt++
array.splice insertAt, 0, toInsert
constructor: (source) -> @competitors = [] stream = fs.createReadStream source, {flags: 'r', encoding: 'utf-8'} stream.on 'data', (data) => {chunk, first, last} = lines data if not validCompetitor last @remainder = last chunk.pop() if not validCompetitor first chunk[0] = @remainder + first insertionSort @competitors, chunk
@emit 'data', @competitors
path = require 'path' if !fs.existsSync 'competitors.15000.txt' console.error 'Error: File competitors.15000.txt not found'
process.exit()
competitors = new CompetitorsEmitter 'competitors.15000.txt' competitors.on 'data', (competitors) ->
console.log "There are #{competitors.length} competitors"
start = new Date() setInterval -> now = new Date() console.log "Tick at #{(now - start)/ONE_SECOND}" , ONE_SECOND/10





> coffee 9.3.coffee
There are 1468 competitors
There are 2937 competitors
There are 4406 competitors
Tick at 0.121
...





0212: Turnbill, Geralyn, '12:13'
0055: Spielvogel, Cierra, '11:55'
0072: Renyer, Connie, '11:33'
0011: Engholm, Ciara, '14:10'





for item in chunk
  insertAt = 0
  if scoreBetterThan item, '12:00'
    toInsert = { original: item, sortOn: lastName(item) }
    for competitor in @competitors
      if toInsert.lastName > competitor.lastName
        insertAt++
    @competitors.splice insertAt, 0, toInsert





fasterThan = (n) ->        
  (z) ->                   
z.time < n
lastNameStartsWith = (letter) ->
(s) -> competitor.lastName[0] is letter
result = \ competitors .filter(fasterThan '12:00') .filter(lastNameStartsWith 'a')





9.4 Event composition

['Geralyn Turnbull', 'Connie Renyer']





-> ['Geralyn Turnbull', 'Connie Renyer']





phoneNumbers = [         
  { name: 'hannibal', number: '555-5551', relationship: 'friend' }
  { name: 'darth', number: '555-5552', relationship: 'colleague' }
  { name: 'hal_9000', number: 'disconnected', relationship: 'friend' }
  { name: 'freddy', number: '555-5554', relationship: 'friend' }
  { name: 'T-800', number: '555-5555', relationship: 'colleague' }
]





relationshipIs = (relationship) ->
(item) -> item.relationship is relationship
phoneNumbers .filter(relationshipIs 'friend')





getPhoneNumbers = -> phoneNumbers





withResultOf(getPhoneNumbers)
.filter(relationshipIs 'friend')





withResultOf = (fn) ->
  filter: ->





withResultOf = (fn) ->
  runInOrder = []
  { 
    filter: (filterFn) ->
      runInOrder.push (data) -> item for item in data when filterFn item
      @                                     
    evaluate: ->
      data = fn()
      for processFn in runInOrder
        processFn data
  }





withResultOf(getPhoneNumbers)
.filter(relationshipIs 'friend')
.evaluate()
# [ # { name: 'hannibal', number: '555-5551', relationship: 'friend' }, # { name: 'hal_9000', number: 'disconnected', relationship: 'friend' }, # { name: 'freddy', number: '555-5554', relationship: 'friend' } # ]





withResultOf(getPhoneNumbers)
.filter(relationshipIs 'business')





suppliedContacts.evaluate()





fs.readFile, 'filename', callback





phonebookData = (callback) ->
  callback [         
    { name: 'hannibal', number: '555-5551', relationship: 'friend' }
    { name: 'darth', number: '555-5552', relationship: 'colleague' }
    { name: 'hal_9000', number: 'disconnected', relationship: 'friend' }
    { name: 'freddy', number: '555-5554', relationship: 'friend' }
    { name: 'T-800', number: '555-5555', relationship: 'colleague' }
  ]





phonebook.on 'data', callback





[ UP, DOWN, DOWN, DOWN, UP, DOWN, UP, UP, UP ]





phonebook = \
withEvents(phonebookEmitter)
.filter(relationship 'friend') 





{EventEmitter} = require 'events'
even = (number) -> number%2 is 0
emitter = new EventEmitter
evenNumberEvents = withEvents(emitter, 'number').filter(even)
emitter.emit 'number', 2 emitter.emit 'number', 5





evenNumberEvents.evaluate()
# [2]





emitter.emit 'number', 4
emitter.emit 'number', 3
evenNumberEvents.evaluate() # [2,4]





withEvents(emitter, 'number').filter(even)





hannibal,555-5551,friend
darth,555-5552,colleague
hal_9000,disconnected,friend
freddy,555-5554,friend
T-800,555-5555,colleague
dolly,555-3322,associate





Listing 9.4 Phone book with data loaded from event stream

fs = require 'fs'
{EventEmitter} = require 'events'
withEvents = (emitter, event) -> pipeline = []
data = []
reset = ->
pipeline = []
run = -> result = data for processor in pipeline if processor.filter? result = result.filter processor.filter else if processor.map? result = result.map processor.map
result
emitter.on event, (datum) ->
data.push datum
filter: (filter) -> pipeline.push {filter: filter} @ map: (map) -> pipeline.push {map: map} @ evaluate: -> result = run() reset()
result
class CSVRowEmitter extends EventEmitter
valid = (row) ->
/[^,]+,[^,]+,[^,]+/.test row
constructor: (source) -> @remainder = '' @numbers = [] stream = fs.createReadStream source, {flags: 'r', encoding: 'utf-8'} stream.on 'data', (data) => chunk = data.split /\n/ firstRow = chunk[0] lastRow = chunk[chunk.length-1] if not valid firstRow and @remainder chunk[0] = @remainder + firstRow if not valid lastRow @remainder = lastRow chunk.pop()
else @remainder = ''
@emit('row', row) for row in chunk when valid row
class PhoneBook asObject = (row) -> [name, number, relationship] = row.split ','
{ name, number, relationship }
asString = (data) ->
"#{data.name}: #{data.number} (#{data.relationship})"
print = (s) ->
s.join '\n'
relationshipIs = (relationship) ->
(data) -> data.relationship is relationship
nameIs = (name) ->
(data) -> data.name is name
constructor: (sourceCsv) -> csv = new CSVRowEmitter sourceCsv
@numbers = withEvents(csv, 'row')
list: (relationship) -> evaluated = \ if relationship @numbers .map(asObject) .filter(relationshipIs relationship) .evaluate() else @numbers .map(asObject)
.evaluate()
print(asString data for data in evaluated)
get: (name) -> evaluated = \ @numbers .map(asObject) .filter(nameIs name)
.evaluate()
print(asString data for data in evaluated)
console.log "Phonebook. Commands are get, list and exit."
process.stdin.setEncoding 'utf8'
stdin = process.openStdin()
phonebook = new PhoneBook 'phone_numbers.csv'
stdin.on 'data', (chunk) -> args = chunk.split ' ' command = args[0].trim() name = relationship = args[1].trim() if args[1] console.log switch command when 'get' phonebook.get name when 'list' phonebook.list relationship when 'exit' process.exit 1 else 'Unknown command'





<!DOCTYPE html>
<html dir="ltr" lang="en-US">
  <head>
    <meta http-equiv="Content-Type" content="text/html; charset=utf-8">
    <title>Pong</title>
  </head>
  <body>
    <div id="pong"></div>
  </body>
  <script src="9.5.js"></script>
</html>





document.on 'keypress', (event) ->
  console.log 'The keyboard was pressed'





Listing 9.5 Controlling paddles in a web browser

withEvents = (emitter, event) ->
  pipeline = []
data = []
reset = ->
pipeline = []
run = -> result = data for processor in pipeline if processor.filter? result = result.filter processor.filter else if processor.map? result = result.map processor.map
result
emitter.on event, (datum) ->
data.push datum
filter: (filter) -> pipeline.push {filter: filter} @ map: (map) -> pipeline.push {map: map} @ drain: (fn) -> emitter.on event, (datum) -> result = run() data = [] fn result evaluate: -> result = run() reset()
result
UP = 38 DOWN = 40 Q = 81
A = 65
doc = on: (event, fn) -> old = document["on#{event}"] || -> document["on#{event}"] = (e) -> old e
fn e
class Paddle
constructor: (@top=0, @left=0) ->
@render()
move: (displacement) -> @top += displacement*5
@paddle.style.top = @top + 'px'
render: -> @paddle = document.createElement 'div' @paddle.className = 'paddle' @paddle.style.backgroundColor = 'black' @paddle.style.position = 'absolute' @paddle.style.top = "#{@top}px" @paddle.style.left = "#{@left}px" @paddle.style.width = '20px' @paddle.style.height = '100px'
document.querySelector('#pong').appendChild @paddle
displacement = ([up,down]) -> (event) -> switch event.keyCode when up then -1 when down then 1
else 0
move = (paddle) -> (moves) -> for displacement in moves
paddle.move displacement
keys = (expected) -> (pressed) ->
pressed.keyCode in expected
paddle1 = new Paddle 0,0
paddle1.keys = [Q,A]
paddle2 = new Paddle 0,200
paddle2.keys = [UP,DOWN]
withEvents(doc, 'keydown') .filter(keys paddle1.keys) .map(displacement paddle1.keys)
.drain(move paddle1)
withEvents(doc, 'keydown') .filter(keys paddle2.keys) .map(displacement paddle2.keys) .drain(move paddle2)





source1 = [13,14,15]
source2 = [23,24,25]





zip = (a, b) ->
  zipped = []
  while a.length or b.length
    do ->
      fromA = a.pop()
      fromB = b.pop()
      if fromB then zipped.push fromB
      if fromB then zipped.push fromA
zipped.reverse()
zip source1, source2 # [13,23,14,24,15,25]





9.5 Summary