indexing
	description: "A general purpose thread safe job queue."
	author: "Patrick Ruckstuhl "
	date: "$Date$"
	revision: "$Revision$"

class
	JOB_QUEUE

create
	make

feature {NONE} -- Initialization

	make is
			-- Create an empty queue together with its mutex, both
			-- condition variables and the per-tag job counting table.
		do
			create queue.make
			create monitor.make
			create not_empty.make
			create tag_job_table.make (20)
			create tag_jobs_completed.make
		end

feature -- Access

	All_jobs_tag: STRING is "All"
			-- Reserved tag under which every job is counted,
			-- whether or not it also carries a tag of its own.

	actual_jobs: like job_count is
			-- Number of all current running or waiting jobs
		do
			Result := actual_jobs_for_tag (all_jobs_tag)
		end

	actual_jobs_for_tag (a_tag: like job_tag): like job_count is
			-- Number of current running or waiting jobs for `a_tag'.
			-- Zero if no job was ever queued with `a_tag'.
		require
			Tag_valid: a_tag /= Void
		do
			monitor.lock
			if tag_job_table.has (a_tag) then
				Result := tag_job_table.item (a_tag)
			end
			monitor.unlock
		end

feature -- Command

	execute_all is
			-- Execute all queued jobs.
			-- Blocks until at least one job is queued, then runs jobs
			-- one after the other until the queue is observed empty.
		do
			monitor.lock
			wait_until_not_empty
			from
			until
				queue.is_empty
			loop
				process_next_job
			end
			monitor.unlock
		end

	execute is
			-- Execute the top job in the queue.
			-- Blocks until a job is available.
		do
			monitor.lock
			wait_until_not_empty
			process_next_job
			monitor.unlock
		end

	join_jobs is
			-- Wait until all current jobs have finished
		do
			join_jobs_for_tag (all_jobs_tag)
		end

	join_jobs_for_tag (a_tag: like job_tag) is
			-- Wait until all current jobs with `a_tag' have finished.
			-- Returns immediately if `a_tag' was never used.
		require
			Tag_valid: a_tag /= Void
		do
			monitor.lock
			if tag_job_table.has (a_tag) then
				from
				until
					tag_job_table.item (a_tag) = 0
				loop
						-- The condition variable is shared by all tags, so a
						-- wake-up may concern another tag: re-check the count.
					tag_jobs_completed.wait (monitor)
				end
			end
			monitor.unlock
		end

feature -- Update

	force (a_job: like job_type) is
			-- Add `a_job' to the queue (counted under `All_jobs_tag' only).
		require
			Job_valid: a_job /= Void
		do
			monitor.lock
			queue.force ([a_job, Void])
			increment_tag_job (all_jobs_tag)
			not_empty.signal
			monitor.unlock
		end

	force_with_tag (a_job: like job_type; a_tag: like job_tag) is
			-- Add `a_job' to the queue with `a_tag'.
			-- The job is counted under both `a_tag' and `All_jobs_tag'.
		require
			Job_valid: a_job /= Void
			Tag_valid: a_tag /= Void and not a_tag.is_equal (all_jobs_tag)
		do
			monitor.lock
			queue.force ([a_job, a_tag])
			increment_tag_job (all_jobs_tag)
			increment_tag_job (a_tag)
			not_empty.signal
			monitor.unlock
		end

feature {NONE} -- Implementation

	queue: LINKED_QUEUE [TUPLE [like job_type, like job_tag]]
			-- The job queue itself; each entry is [job, tag],
			-- where tag is Void for untagged jobs.

	monitor: MUTEX
			-- Protection for the queue and for `tag_job_table'.

	not_empty: CONDITION_VARIABLE
			-- Signalled whenever a job is added to `queue'.

	tag_job_table: HASH_TABLE [like job_count, like job_tag]
			-- Job counting table: running or waiting jobs per tag.

	tag_jobs_completed: CONDITION_VARIABLE
			-- Notified whenever the job count of some tag drops to zero.

	wait_until_not_empty is
			-- Block until `queue' holds at least one job.
			-- Must be called with `monitor' locked; `monitor' is
			-- released while waiting and locked again on return.
		do
			from
			until
				not queue.is_empty
			loop
				not_empty.wait (monitor)
			end
		end

	process_next_job is
			-- Remove the top job from `queue', run it and update the counters.
			-- Must be called with `monitor' locked; `monitor' is released
			-- while the job runs and is locked again on return.
			-- NOTE(review): there is no rescue clause, so if the job raises
			-- an exception the counters are not decremented -- confirm that
			-- jobs are expected never to fail.
		require
			Queue_not_empty: not queue.is_empty
		local
			cur_procedure: like job_type
			cur_tag: like job_tag
		do
			cur_procedure ?= queue.item.item (1)
			cur_tag ?= queue.item.item (2)
			queue.remove
				-- Run the job outside of the critical section so that other
				-- threads can queue or fetch jobs in the meantime.
			monitor.unlock
			cur_procedure.call ([])
			monitor.lock
			if cur_tag /= Void then
				decrement_tag_job (cur_tag)
			end
			decrement_tag_job (all_jobs_tag)
		end

	increment_tag_job (a_tag: like job_tag) is
			-- Add a job for `a_tag' (unsynchronized).
		require
			Tag_valid: a_tag /= Void
		do
			if tag_job_table.has (a_tag) then
				tag_job_table.force (tag_job_table.item (a_tag) + 1, a_tag)
			else
				tag_job_table.extend (1, a_tag)
			end
		ensure
			Count_set: tag_job_table.item (a_tag) > 0
		end

	decrement_tag_job (a_tag: like job_tag) is
			-- Remove a job for `a_tag' (unsynchronized).
		require
				-- `has' rather than `has_key': assertions must not alter
				-- the search state of the table.
			Tag_valid: a_tag /= Void and tag_job_table.has (a_tag)
			Count_valid: tag_job_table.item (a_tag) > 0
		do
			tag_job_table.force (tag_job_table.item (a_tag) - 1, a_tag)
			if tag_job_table.item (a_tag) = 0 then
					-- Wake every waiter: joiners for different tags share this
					-- condition variable, and a single `signal' could wake a
					-- thread waiting for another tag while the thread waiting
					-- for `a_tag' keeps sleeping forever.
				tag_jobs_completed.broadcast
			end
		ensure
			Count_set: tag_job_table.item (a_tag) >= 0
		end

feature {NONE} -- Type anchors

	job_type: PROCEDURE [ANY, TUPLE []]
			-- Agent type for the jobs.

	job_tag: STRING
			-- Type for tags

	job_count: NATURAL
			-- Type for job count

end